return $this;
}
+ function dispatch() {
+ pcntl_signal_dispatch();
+ return $this;
+ }
+
+ function on($signal, $action) {
+ pcntl_signal($signal, $action);
+ return $this;
+ }
+
/**
* The tick handler; calls atick\Ticker::wait(0)
* @return int
return count($this->read) + count($this->write);
}
-
+
/**
* Attach a read handler
* @param resource $fd
* @return \atick\Ticker
*/
function read($fd, callable $onread, callable $verify = null) {
+ if ($fd instanceof IO) {
+ $fd = $fd->getOutput();
+ }
$this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
return is_resource($fd) && !feof($fd);
});
* @return \atick\Ticker
*/
function write($fd, callable $onwrite, callable $verify = null) {
+ if ($fd instanceof IO) {
+ $fd = $fd->getInput();
+ }
$this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
return is_resource($fd) && !feof($fd);
});
return $this;
}
+
+ /**
+ * Pipe
+ * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
+ * @param IO ...
+ * @return \atick\Ticker
+ */
+ function pipe(/*IO ...*/) {
+ $io = func_get_args();
+ reset($io);
+
+ do {
+ $r = current($io);
+ $w = next($io);
+
+ $this->read($r, $w ?: function($fd) {
+ stream_copy_to_stream($fd, STDOUT);
+ });
+ } while ($w);
+
+ return $this;
+ }
}