IO
[m6w6/atick] / lib / atick / Ticker.php
index f009b5edc64bea17c76cd3e0572efb2800f832dd..1255977fc55a86df13a96ba2930fce9b92e448c6 100644 (file)
@@ -82,6 +82,16 @@ class Ticker implements \Countable
                return $this;
        }
        
+       function dispatch() {
+               pcntl_signal_dispatch();
+               return $this;
+       }
+
+       function on($signal, $action) {
+               pcntl_signal($signal, $action);
+               return $this;
+       }
+
        /**
         * The tick handler; calls atick\Ticker::wait(0)
         * @return int
@@ -143,7 +153,7 @@ class Ticker implements \Countable
 
                return count($this->read) + count($this->write);
        }
-       
+
        /**
         * Attach a read handler
         * @param resource $fd
@@ -152,6 +162,9 @@ class Ticker implements \Countable
         * @return \atick\Ticker
         */
        function read($fd, callable $onread, callable $verify = null) {
+               if ($fd instanceof IO) {
+                       $fd = $fd->getOutput();
+               }
                $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
                        return is_resource($fd) && !feof($fd);
                });
@@ -166,9 +179,34 @@ class Ticker implements \Countable
         * @return \atick\Ticker
         */
        function write($fd, callable $onwrite, callable $verify = null) {
+               if ($fd instanceof IO) {
+                       $fd = $fd->getInput();
+               }
                $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
                        return is_resource($fd) && !feof($fd);
                });
                return $this;
        }
+
+       /**
+        * Pipe
+        * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
+        * @param IO ...
+        * @return \atick\Ticker
+        */
+       function pipe(/*IO ...*/) {
+               $io = func_get_args();
+               reset($io);
+
+               do {
+                       $r = current($io);
+                       $w = next($io);
+
+                       $this->read($r, $w ?: function($fd) {
+                               stream_copy_to_stream($fd, STDOUT);
+                       });
+               } while ($w);
+
+               return $this;
+       }
 }