IO
authorMichael Wallner <mike@php.net>
Sun, 24 Nov 2013 10:04:05 +0000 (11:04 +0100)
committerMichael Wallner <mike@php.net>
Sun, 24 Nov 2013 10:07:53 +0000 (11:07 +0100)
lib/atick/Able.php [deleted file]
lib/atick/IO.php [new file with mode: 0644]
lib/atick/IO/Filter.php [new file with mode: 0644]
lib/atick/IO/Process.php [new file with mode: 0644]
lib/atick/Pipe.php [deleted file]
lib/atick/Proc.php [deleted file]
lib/atick/Ticker.php

diff --git a/lib/atick/Able.php b/lib/atick/Able.php
deleted file mode 100644 (file)
index 8bcac42..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-<?php
-
-namespace atick;
-
-interface Able
-{
-       const CLOSED = 0;
-       const READABLE = 1;
-       const WRITABLE = 2;
-
-       /**
-        * Register any output streams with the ticker
-        * @param \atick\Ticker $ticker
-        * @param callable $verify
-        */
-       function with(Ticker $ticker, callable $verify = null);
-
-       /**
-        * Pass data to the input stream
-        * @param string $data
-        */
-       function write($data);
-
-       /**
-        * Where to send output to
-        * @param resource|callable $into
-        * @return \atick\Able
-        */
-       function read($into);
-
-       /**
-        * Where to send error output to
-        * @param resource|callable $into
-        * @return \atick\Able
-        */
-       function error($into);
-
-       /**
-        * Whether the pipe/proc is alive
-        * @return int
-        */
-       function stat();
-
-       /**
-        * Shutdown the pipe/proc
-        * @param int $what
-        */
-       function close($what = self::CLOSED);
-}
diff --git a/lib/atick/IO.php b/lib/atick/IO.php
new file mode 100644 (file)
index 0000000..3a37e72
--- /dev/null
@@ -0,0 +1,41 @@
+<?php
+
+namespace atick\IO;
+
+function copy($from, $to, $len = 4096) {
+       $data = fread($from, $len);
+
+       if (!strlen($data)) {
+               if (feof($from)) {
+                       /* forward EOF */
+                       fclose($to);
+               }
+               return;
+       }
+
+       return fwrite($to, $data);
+}
+
+namespace atick;
+
+interface IO
+{
+       /**
+        * Retrieve the output stream
+        * @return resource
+        */
+       function getOutput();
+
+       /**
+        * Retrieve the input stream
+        * @return resource
+        */
+       function getInput();
+
+       /**
+        * Pass input from FD to input and return output stream
+        * @param resource $fd
+        * @return resource
+        */
+       function __invoke($fd);
+}
diff --git a/lib/atick/IO/Filter.php b/lib/atick/IO/Filter.php
new file mode 100644 (file)
index 0000000..bb28c07
--- /dev/null
@@ -0,0 +1,103 @@
+<?php
+
+namespace atick\IO;
+
+use atick\IO;
+
+class Filter implements IO
+{
+       /**
+        * Input stream
+        * @var resource
+        */
+       protected $input;
+
+       /**
+        * Output stream
+        * @var resource
+        */
+       protected $output;
+
+       /**
+        * @param callable $func filter proc
+        * @param callable $ctor constructor
+        * @param callable $dtor destructor
+        */
+       function __construct(callable $func, callable $ctor = null, callable $dtor = null) {
+               /*
+                * We don't have pipe(2) support, so we'll use socketpair(2) instead.
+                */
+               list($this->input, $this->output) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+               stream_filter_append($this->input, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE, compact("func", "ctor", "dtor"));
+               stream_set_blocking($this->output, false);
+       }
+
+       /**
+        * Cleanup socketpair(2) resources
+        */
+       function __destruct() {
+               if (is_resource($this->input)) {
+                       fclose($this->input);
+               }
+               if (is_resource($this->output)) {
+                       fclose($this->output);
+               }
+       }
+
+       /**
+        * @inheritdoc
+        * @return resource
+        */
+       function getOutput() {
+               return $this->output;
+       }
+
+       /**
+        * @inheritdoc
+        * @return resource
+        */
+       function getInput() {
+               return $this->input;
+       }
+
+       /**
+        * @inheritdoc
+        * @param resource $fd
+        * @return resource
+        */
+       function __invoke($fd = null) {
+               if ($fd) {
+                       copy($fd, $this->getInput());
+               }
+               return $this->getOutput();
+       }
+}
+
+class StreamFilter extends \php_user_filter
+{
+       public $filtername = "atick\\IO\\Func";
+       public $params;
+
+       function filter($in, $out, &$consumed, $closing) {
+               while ($bucket = stream_bucket_make_writeable($in)) {
+                       $consumed += $bucket->datalen;
+                       $bucket->data = call_user_func($this->params["func"], $this, $bucket->data, $closing);
+                       stream_bucket_append($out, $bucket);
+               }
+               return PSFS_PASS_ON;
+       }
+
+       function onClose() {
+               if (!empty($this->params["dtor"])) {
+                       call_user_func($this->params["dtor"], $this);
+               }
+       }
+
+       function onCreate() {
+               if (!empty($this->params["ctor"])) {
+                       call_user_func($this->params["ctor"], $this);
+               }
+       }
+}
+
+stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter");
diff --git a/lib/atick/IO/Process.php b/lib/atick/IO/Process.php
new file mode 100644 (file)
index 0000000..e4d9bc3
--- /dev/null
@@ -0,0 +1,77 @@
+<?php
+
+namespace atick\IO;
+
+use atick\IO;
+
+class Process implements IO
+{
+       /**
+        * Process handle
+        * @var resource
+        */
+       protected $process;
+
+       /**
+        * Process' stdio pipes
+        * @var array
+        */
+       protected $pipes;
+
+       /**
+        * @param string $command
+        * @param string $cwd
+        * @param array $env
+        * @throws \RuntimeException
+        */
+       function __construct($command, $cwd = null, array $env = null) {
+               $this->process = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
+
+               if (!is_resource($this->process) || !($status = proc_get_status($this->process))) {
+                       throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
+               }
+
+               stream_set_blocking($this->pipes[1], false);
+               stream_set_blocking($this->pipes[2], false);
+       }
+
+       /**
+        * Cleanup pipes and proc handle
+        */
+       function __destruct() {
+               foreach ($this->pipes as $fd) {
+                       if (is_resource($fd)) {
+                               fclose($fd);
+                       }
+               }
+               proc_close($this->process);
+       }
+
+       /**
+        * @inheritdoc
+        * @return resource
+        */
+       function getOutput() {
+               return $this->pipes[1];
+       }
+
+       /**
+        * @inheritdoc
+        * @return resource
+        */
+       function getInput() {
+               return $this->pipes[0];
+       }
+
+       /**
+        * @inheritdoc
+        * @param resource $fd
+        * @return resource
+        */
+       function __invoke($fd) {
+               if ($fd) {
+                       copy($fd, $this->getInput());
+               }
+               return $this->getOutput();
+       }
+}
diff --git a/lib/atick/Pipe.php b/lib/atick/Pipe.php
deleted file mode 100644 (file)
index 393e2ef..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-<?php
-
-namespace atick;
-
-class Pipe implements Able
-{
-       /**
-        * Output producing proc
-        * @var \atick\Able
-        */
-       protected $producer;
-
-       /**
-        * Input consuming proc
-        * @var \atick\Able
-        */
-       protected $consumer;
-
-       /**
-        * Create a pipe between two procs or other pipes
-        * @param \atick\Able $producer
-        * @param \atick\Able $consumer
-        */
-       function __construct(Able $producer, Able $consumer) {
-               $this->producer = $producer;
-               $this->consumer = $consumer;
-
-               $this->producer->read(function($fd) {
-                       if (strlen($data = fread($fd, 8192))) {
-                               $this->consumer->write($data);
-                       }
-               });
-       }
-
-       function __toString() {
-               return "$this->producer | $this->consumer";
-       }
-
-       function close($what = self::CLOSED) {
-               echo "PIPE KILL $this $what\n";
-               return $this;
-       }
-
-       function stat() {
-               echo "STAT $this\n";
-               if (!($this->producer->stat() & self::READABLE)) {
-                       if ($this->consumer->stat() & self::WRITABLE) {
-                               $this->consumer->close(self::WRITABLE);
-                               return self::READABLE;
-                       } else {
-                               $this->consumer->close(self::READABLE);
-                               return self::CLOSED;
-                       }
-               }
-               return ($this->producer->stat() & self::WRITABLE)
-                       | ($this->consumer->stat() & self::READABLE);
-       }
-
-       function write($data) {
-               return $this->producer->write($data);
-       }
-
-       function read($into) {
-               $this->consumer->read($into);
-               return $this;
-       }
-
-       function error($into) {
-               $this->consumer->error($into);
-               return $this;
-       }
-
-       function with(Ticker $ticker, callable $verify = null) {
-               $this->producer->with($ticker, $verify ?: array($this, "stat"));
-               $this->consumer->with($ticker, $verify ?: array($this, "stat"));
-       }
-}
diff --git a/lib/atick/Proc.php b/lib/atick/Proc.php
deleted file mode 100644 (file)
index 13e240c..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-<?php
-
-namespace atick;
-
-class Proc implements Able
-{
-       /**
-        * Command string
-        * @var string
-        */
-       protected $command;
-
-       /**
-        * Process handle
-        * @var resource
-        */
-       protected $proc;
-
-       /**
-        * Proc's pipes
-        * @var array
-        */
-       protected $pipes;
-
-       protected $read;
-       protected $error;
-
-       /**
-        * @param string $command
-        * @param string $cwd
-        * @param array $env
-        * @throws \RuntimeException
-        */
-       function __construct($command, $cwd = null, array $env = null) {
-               $this->command = $command;
-               $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
-
-               if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) {
-                       throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
-               }
-
-               stream_set_blocking($this->pipes[1], false);
-               stream_set_blocking($this->pipes[2], false);
-       }
-
-       /**
-        * Returns the command string
-        * @return string
-        */
-       function __toString() {
-               return (string) $this->command;
-       }
-
-       /**
-        * Cleanup pipes and proc handle
-        */
-       function __destruct() {
-               $this->close();
-       }
-
-       /**
-        * @inheritdoc
-        * @implements \aticker\Able
-        * @param \atick\Ticker $ticker
-        * @param callable $verify
-        */
-       function with(Ticker $ticker, callable $verify = null) {
-               if (is_callable($this->read)) {
-                       $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat"));
-               } elseif (is_resource($this->read)) {
-                       $ticker->read($this->pipes[1], function($fd) {
-                               if (strlen($data = fread($fd, 8192))) {
-                                       fwrite($this->read, $data);
-                               }
-                       }, $verify ?: array($this, "stat"));
-               } else {
-                       $ticker->read($this->pipes[1], function($fd) {
-                               /* nirvana */
-                               fread($fd, 8192);
-                       }, $verify ?: array($this, "stat"));
-               }
-
-               if (is_callable($this->error)) {
-                       $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat"));
-               } elseif (is_resource($this->error)) {
-                       $ticker->read($this->pipes[2], function($fd) {
-                               if (strlen($data = fread($fd, 8192))) {
-                                       fwrite($this->error, $data);
-                               }
-                       }, $verify ?: array($this, "stat"));
-               } else {
-                       $ticker->read($this->pipes[2], function($fd) {
-                               /* nirvana */
-                               fread($fd, 8192);
-                       }, $verify ?: array($this, "stat"));
-               }
-       }
-
-       function stat() {
-               echo "STAT $this\n";
-               if ($this->proc && proc_get_status($this->proc)["running"]) {
-                       if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1]))
-                       &&  (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2]))
-                       ) {
-                               if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) {
-                                       return self::READABLE | self::WRITABLE;
-                               }
-                               return self::READABLE;
-                       }
-               }
-               $this->close();
-               return self::CLOSED;
-       }
-
-       function close($what = self::CLOSED) {
-               echo "PROC KILL $this $what\n";
-
-               if (!$what || ($what & self::WRITABLE)) {
-                       if (is_resource($this->pipes[0])) {
-                               fclose($this->pipes[0]);
-                       }
-                       $this->pipes[0] = null;
-               }
-
-               if (!$what || ($what & self::READABLE)) {
-                       if (is_resource($this->pipes[1])) {
-                               fclose($this->pipes[1]);
-                       }
-                       $this->pipes[1] = null;
-                       if (is_resource($this->read)) {
-                               fclose($this->read);
-                       }
-                       if (is_resource($this->pipes[2])) {
-                               fclose($this->pipes[2]);
-                       }
-                       $this->pipes[2] = null;
-                       if (is_resource($this->error)) {
-                               fclose($this->error);
-                       }
-               }
-
-               if (!$what && is_resource($this->proc)) {
-                       proc_close($this->proc);
-                       $this->proc = null;
-               }
-       }
-
-       /**
-        * @inheritdoc
-        * @implements \aticker\Able
-        * @param string $data
-        * @return int
-        */
-       function write($data) {
-               return fwrite($this->pipes[0], $data);
-       }
-
-       /**
-        * Where to read STDOUT into
-        * @param resource|callable $into
-        * @return \atick\Proc
-        * @throws \InvalidArgumentException
-        */
-       function read($into) {
-               if (is_resource($into) || is_callable($into)) {
-                       $this->read = $into;
-               } else {
-                       throw new \InvalidArgumentException("Not a valid resource or callback");
-               }
-               return $this;
-       }
-
-       /**
-        * Where to pass STDERR into
-        * @param resource|callable $into
-        * @return \atick\Proc
-        * @throws \InvalidArgumentException
-        */
-       function error($into) {
-               if (is_resource($into) || is_callable($into)) {
-                       $this->error = $into;
-               } else {
-                       throw new \InvalidArgumentException("Not a valid resource or callback");
-               }
-               return $this;
-       }
-}
index f009b5edc64bea17c76cd3e0572efb2800f832dd..1255977fc55a86df13a96ba2930fce9b92e448c6 100644 (file)
@@ -82,6 +82,16 @@ class Ticker implements \Countable
                return $this;
        }
        
+       function dispatch() {
+               pcntl_signal_dispatch();
+               return $this;
+       }
+
+       function on($signal, $action) {
+               pcntl_signal($signal, $action);
+               return $this;
+       }
+
        /**
         * The tick handler; calls atick\Ticker::wait(0)
         * @return int
@@ -143,7 +153,7 @@ class Ticker implements \Countable
 
                return count($this->read) + count($this->write);
        }
-       
+
        /**
         * Attach a read handler
         * @param resource $fd
@@ -152,6 +162,9 @@ class Ticker implements \Countable
         * @return \atick\Ticker
         */
        function read($fd, callable $onread, callable $verify = null) {
+               if ($fd instanceof IO) {
+                       $fd = $fd->getOutput();
+               }
                $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
                        return is_resource($fd) && !feof($fd);
                });
@@ -166,9 +179,34 @@ class Ticker implements \Countable
         * @return \atick\Ticker
         */
        function write($fd, callable $onwrite, callable $verify = null) {
+               if ($fd instanceof IO) {
+                       $fd = $fd->getInput();
+               }
                $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
                        return is_resource($fd) && !feof($fd);
                });
                return $this;
        }
+
+       /**
+        * Pipe
+        * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
+        * @param IO ...
+        * @return \atick\Ticker
+        */
+       function pipe(/*IO ...*/) {
+               $io = func_get_args();
+               reset($io);
+
+               do {
+                       $r = current($io);
+                       $w = next($io);
+
+                       $this->read($r, $w ?: function($fd) {
+                               stream_copy_to_stream($fd, STDOUT);
+                       });
+               } while ($w);
+
+               return $this;
+       }
 }