flush
authorMichael Wallner <mike@php.net>
Fri, 15 Mar 2013 14:21:10 +0000 (15:21 +0100)
committerMichael Wallner <mike@php.net>
Sun, 24 Nov 2013 10:07:52 +0000 (11:07 +0100)
lib/atick/Able.php [new file with mode: 0644]
lib/atick/Pipe.php [new file with mode: 0644]
lib/atick/Proc.php [new file with mode: 0644]
lib/atick/Ticker.php

diff --git a/lib/atick/Able.php b/lib/atick/Able.php
new file mode 100644 (file)
index 0000000..8bcac42
--- /dev/null
@@ -0,0 +1,49 @@
+<?php
+
+namespace atick;
+
+interface Able
+{
+       const CLOSED = 0;
+       const READABLE = 1;
+       const WRITABLE = 2;
+
+       /**
+        * Register any output streams with the ticker
+        * @param \atick\Ticker $ticker
+        * @param callable $verify
+        */
+       function with(Ticker $ticker, callable $verify = null);
+
+       /**
+        * Pass data to the input stream
+        * @param string $data
+        */
+       function write($data);
+
+       /**
+        * Where to send output to
+        * @param resource|callable $into
+        * @return \atick\Able
+        */
+       function read($into);
+
+       /**
+        * Where to send error output to
+        * @param resource|callable $into
+        * @return \atick\Able
+        */
+       function error($into);
+
+       /**
+        * Whether the pipe/proc is alive
+        * @return int
+        */
+       function stat();
+
+       /**
+        * Shutdown the pipe/proc
+        * @param int $what
+        */
+       function close($what = self::CLOSED);
+}
diff --git a/lib/atick/Pipe.php b/lib/atick/Pipe.php
new file mode 100644 (file)
index 0000000..393e2ef
--- /dev/null
@@ -0,0 +1,77 @@
+<?php
+
+namespace atick;
+
+class Pipe implements Able
+{
+       /**
+        * Output producing proc
+        * @var \atick\Able
+        */
+       protected $producer;
+
+       /**
+        * Input consuming proc
+        * @var \atick\Able
+        */
+       protected $consumer;
+
+       /**
+        * Create a pipe between two procs or other pipes
+        * @param \atick\Able $producer
+        * @param \atick\Able $consumer
+        */
+       function __construct(Able $producer, Able $consumer) {
+               $this->producer = $producer;
+               $this->consumer = $consumer;
+
+               $this->producer->read(function($fd) {
+                       if (strlen($data = fread($fd, 8192))) {
+                               $this->consumer->write($data);
+                       }
+               });
+       }
+
+       function __toString() {
+               return "$this->producer | $this->consumer";
+       }
+
+       function close($what = self::CLOSED) {
+               echo "PIPE KILL $this $what\n";
+               return $this;
+       }
+
+       function stat() {
+               echo "STAT $this\n";
+               if (!($this->producer->stat() & self::READABLE)) {
+                       if ($this->consumer->stat() & self::WRITABLE) {
+                               $this->consumer->close(self::WRITABLE);
+                               return self::READABLE;
+                       } else {
+                               $this->consumer->close(self::READABLE);
+                               return self::CLOSED;
+                       }
+               }
+               return ($this->producer->stat() & self::WRITABLE)
+                       | ($this->consumer->stat() & self::READABLE);
+       }
+
+       function write($data) {
+               return $this->producer->write($data);
+       }
+
+       function read($into) {
+               $this->consumer->read($into);
+               return $this;
+       }
+
+       function error($into) {
+               $this->consumer->error($into);
+               return $this;
+       }
+
+       function with(Ticker $ticker, callable $verify = null) {
+               $this->producer->with($ticker, $verify ?: array($this, "stat"));
+               $this->consumer->with($ticker, $verify ?: array($this, "stat"));
+       }
+}
diff --git a/lib/atick/Proc.php b/lib/atick/Proc.php
new file mode 100644 (file)
index 0000000..13e240c
--- /dev/null
@@ -0,0 +1,187 @@
+<?php
+
+namespace atick;
+
+class Proc implements Able
+{
+       /**
+        * Command string
+        * @var string
+        */
+       protected $command;
+
+       /**
+        * Process handle
+        * @var resource
+        */
+       protected $proc;
+
+       /**
+        * Proc's pipes
+        * @var array
+        */
+       protected $pipes;
+
+       protected $read;
+       protected $error;
+
+       /**
+        * @param string $command
+        * @param string $cwd
+        * @param array $env
+        * @throws \RuntimeException
+        */
+       function __construct($command, $cwd = null, array $env = null) {
+               $this->command = $command;
+               $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
+
+               if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) {
+                       throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
+               }
+
+               stream_set_blocking($this->pipes[1], false);
+               stream_set_blocking($this->pipes[2], false);
+       }
+
+       /**
+        * Returns the command string
+        * @return string
+        */
+       function __toString() {
+               return (string) $this->command;
+       }
+
+       /**
+        * Cleanup pipes and proc handle
+        */
+       function __destruct() {
+               $this->close();
+       }
+
+       /**
+        * @inheritdoc
+        * @implements \aticker\Able
+        * @param \atick\Ticker $ticker
+        * @param callable $verify
+        */
+       function with(Ticker $ticker, callable $verify = null) {
+               if (is_callable($this->read)) {
+                       $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat"));
+               } elseif (is_resource($this->read)) {
+                       $ticker->read($this->pipes[1], function($fd) {
+                               if (strlen($data = fread($fd, 8192))) {
+                                       fwrite($this->read, $data);
+                               }
+                       }, $verify ?: array($this, "stat"));
+               } else {
+                       $ticker->read($this->pipes[1], function($fd) {
+                               /* nirvana */
+                               fread($fd, 8192);
+                       }, $verify ?: array($this, "stat"));
+               }
+
+               if (is_callable($this->error)) {
+                       $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat"));
+               } elseif (is_resource($this->error)) {
+                       $ticker->read($this->pipes[2], function($fd) {
+                               if (strlen($data = fread($fd, 8192))) {
+                                       fwrite($this->error, $data);
+                               }
+                       }, $verify ?: array($this, "stat"));
+               } else {
+                       $ticker->read($this->pipes[2], function($fd) {
+                               /* nirvana */
+                               fread($fd, 8192);
+                       }, $verify ?: array($this, "stat"));
+               }
+       }
+
+       function stat() {
+               echo "STAT $this\n";
+               if ($this->proc && proc_get_status($this->proc)["running"]) {
+                       if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1]))
+                       &&  (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2]))
+                       ) {
+                               if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) {
+                                       return self::READABLE | self::WRITABLE;
+                               }
+                               return self::READABLE;
+                       }
+               }
+               $this->close();
+               return self::CLOSED;
+       }
+
+       function close($what = self::CLOSED) {
+               echo "PROC KILL $this $what\n";
+
+               if (!$what || ($what & self::WRITABLE)) {
+                       if (is_resource($this->pipes[0])) {
+                               fclose($this->pipes[0]);
+                       }
+                       $this->pipes[0] = null;
+               }
+
+               if (!$what || ($what & self::READABLE)) {
+                       if (is_resource($this->pipes[1])) {
+                               fclose($this->pipes[1]);
+                       }
+                       $this->pipes[1] = null;
+                       if (is_resource($this->read)) {
+                               fclose($this->read);
+                       }
+                       if (is_resource($this->pipes[2])) {
+                               fclose($this->pipes[2]);
+                       }
+                       $this->pipes[2] = null;
+                       if (is_resource($this->error)) {
+                               fclose($this->error);
+                       }
+               }
+
+               if (!$what && is_resource($this->proc)) {
+                       proc_close($this->proc);
+                       $this->proc = null;
+               }
+       }
+
+       /**
+        * @inheritdoc
+        * @implements \aticker\Able
+        * @param string $data
+        * @return int
+        */
+       function write($data) {
+               return fwrite($this->pipes[0], $data);
+       }
+
+       /**
+        * Where to read STDOUT into
+        * @param resource|callable $into
+        * @return \atick\Proc
+        * @throws \InvalidArgumentException
+        */
+       function read($into) {
+               if (is_resource($into) || is_callable($into)) {
+                       $this->read = $into;
+               } else {
+                       throw new \InvalidArgumentException("Not a valid resource or callback");
+               }
+               return $this;
+       }
+
+       /**
+        * Where to pass STDERR into
+        * @param resource|callable $into
+        * @return \atick\Proc
+        * @throws \InvalidArgumentException
+        */
+       function error($into) {
+               if (is_resource($into) || is_callable($into)) {
+                       $this->error = $into;
+               } else {
+                       throw new \InvalidArgumentException("Not a valid resource or callback");
+               }
+               return $this;
+       }
+}
index 1dbf4e8744f1733de2720f91ad55ea7811d49f1b..f009b5edc64bea17c76cd3e0572efb2800f832dd 100644 (file)
@@ -86,8 +86,8 @@ class Ticker implements \Countable
         * The tick handler; calls atick\Ticker::wait(0)
         * @return int
         */
-       function __invoke() {
-               return $this->wait(0);
+       function __invoke($timeout = 0) {
+               return $this->wait($timeout);
        }
        
        /**
@@ -97,26 +97,27 @@ class Ticker implements \Countable
         */
        function wait($timeout = 1) {
                $r = $w = $e = array();
+
                foreach ($this->read as $s) {
-                       $r[] = $s[0];
+                       is_resource($s[0]) and $r[] = $s[0];
                }
+
                foreach ($this->write as $s) {
-                       $w[] = $s[0];
+                       is_resource($s[0]) and $w[] = $s[0];
                }
-               $s = (int) $timeout;
-               $u = (int) (($timeout - $s) * 1000000);
-               if (($r || $w) && stream_select($r, $w, $e, $s, $u)) {
+
+               $t = (int) $timeout;
+               $u = (int) (($timeout - $t) * 1000000);
+
+               if (($r || $w) && stream_select($r, $w, $e, $t, $u)) {
                        foreach ($r as $s) {
-                               if ($this->read[(int)$s][1]($s)) {
-                                       unset($this->read[(int)$s]);
-                               }
+                               $this->read[(int)$s][1]($s);
                        }
                        foreach ($w as $s) {
-                               if ($this->write[(int)$s][1]($s)) {
-                                       unset($this->write[(int)$s]);
-                               }
+                               $this->write[(int)$s][1]($s);
                        }
                }
+
                return $this->count();
        }
        
@@ -126,28 +127,48 @@ class Ticker implements \Countable
         * @return int
         */
        function count() {
+               foreach ($this->read as $i => $s) {
+                       list($fd,,$verify) = $s;
+                       if (!$verify($fd)) {
+                               unset($this->read[$i]);
+                       }
+               }
+
+               foreach ($this->write as $i => $s) {
+                       list($fd,,$verify) = $s;
+                       if (!$verify($fd)) {
+                               unset($this->write[$i]);
+                       }
+               }
+
                return count($this->read) + count($this->write);
        }
        
        /**
-        * Attach a read handler; let the callback return true, to stop watching the fd.
+        * Attach a read handler
         * @param resource $fd
-        * @param callable $cb
+        * @param callable $onread void($fd) the descriptor is readable, read data, now!
+        * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
         * @return \atick\Ticker
         */
-       function read($fd, callable $cb) {
-               $this->read[(int)$fd] = array($fd, $cb);
+       function read($fd, callable $onread, callable $verify = null) {
+               $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
+                       return is_resource($fd) && !feof($fd);
+               });
                return $this;
        }
        
        /**
-        * Attach a write handler; let the callback return true, to stop watching the fd.
+        * Attach a write handler
         * @param resource $fd
-        * @param callable $cb
+        * @param callable $onwrite void($fd) the descriptor is writable, write data.
+        * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
         * @return \atick\Ticker
         */
-       function write($fd, callable $cb) {
-               $this->write[(int)$fd] = array($fd, $cb);
+       function write($fd, callable $onwrite, callable $verify = null) {
+               $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
+                       return is_resource($fd) && !feof($fd);
+               });
                return $this;
        }
 }