6 * Asynchronnous resource handling, optionally (ab)using ticks
13 * $conn = new \pq\Connection;
14 * $conn->execAsync("SELECT * FROM foo", function ($rs) {
18 * $ticker = new \atick\Ticker;
19 * $ticker->register();
20 * $ticker->read($conn->socket, function($fd) use ($conn) {
29 * while (count($ticker));
33 * And an example without ticks:
36 * $conn = new \pq\Connection;
37 * $conn->execAsync("SELECT * FROM foo", function ($r) {
41 * $ticker = new \atick\Ticker;
42 * $ticker->read($conn->socket, function($fd) use ($conn) {
55 class Ticker
implements \Countable
60 protected $read = array();
65 protected $write = array();
68 * Register the ticker as tick function
69 * @return \atick\Ticker
72 register_tick_function(array($this, "__invoke"));
77 * Unregister the ticker as tick function
78 * @return \atick\Ticker
80 function unregister() {
81 unregister_tick_function(array($this, "__invoke"));
86 pcntl_signal_dispatch();
90 function on($signal, $action) {
91 pcntl_signal($signal, $action);
96 * The tick handler; calls atick\Ticker::wait(0)
99 function __invoke($timeout = 0) {
100 return $this->wait($timeout);
104 * Wait for read/write readiness on the watched fds
105 * @param float $timeout
106 * @return int count of wached fds
108 function wait($timeout = 1) {
109 $r = $w = $e = array();
111 foreach ($this->read
as $s) {
112 is_resource($s[0]) and $r[] = $s[0];
115 foreach ($this->write
as $s) {
116 is_resource($s[0]) and $w[] = $s[0];
120 $u = (int) (($timeout - $t) * 1000000);
122 if (($r ||
$w) && stream_select($r, $w, $e, $t, $u)) {
124 $this->read
[(int)$s][1]($s);
127 $this->write
[(int)$s][1]($s);
131 return $this->count();
135 * Returns the count of watched fds
136 * @implements \Countable
140 foreach ($this->read
as $i => $s) {
141 list($fd,,$verify) = $s;
143 unset($this->read
[$i]);
147 foreach ($this->write
as $i => $s) {
148 list($fd,,$verify) = $s;
150 unset($this->write
[$i]);
154 return count($this->read
) +
count($this->write
);
158 * Attach a read handler
159 * @param resource $fd
160 * @param callable $onread void($fd) the descriptor is readable, read data, now!
161 * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
162 * @return \atick\Ticker
164 function read($fd, callable
$onread, callable
$verify = null) {
165 if ($fd instanceof IO
) {
166 $fd = $fd->getOutput();
168 $this->read
[(int)$fd] = array($fd, $onread, $verify ?
: function($fd) {
169 return is_resource($fd) && !feof($fd);
175 * Attach a write handler
176 * @param resource $fd
177 * @param callable $onwrite void($fd) the descriptor is writable, write data.
178 * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
179 * @return \atick\Ticker
181 function write($fd, callable
$onwrite, callable
$verify = null) {
182 if ($fd instanceof IO
) {
183 $fd = $fd->getInput();
185 $this->write
[(int)$fd] = array($fd, $onwrite, $verify ?
: function($fd) {
186 return is_resource($fd) && !feof($fd);
193 * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
195 * @return \atick\Ticker
197 function pipe(/*IO ...*/) {
198 $io = func_get_args();
205 $this->read($r, $w ?
: function($fd) {
206 stream_copy_to_stream($fd, STDOUT
);