From 363315d4d8cbca1850183e27243e317359e261e5 Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Sun, 24 Nov 2013 11:04:05 +0100 Subject: [PATCH 1/1] IO --- lib/atick/Able.php | 49 ---------- lib/atick/IO.php | 41 +++++++++ lib/atick/IO/Filter.php | 103 +++++++++++++++++++++ lib/atick/IO/Process.php | 77 ++++++++++++++++ lib/atick/Pipe.php | 77 ---------------- lib/atick/Proc.php | 187 --------------------------------------- lib/atick/Ticker.php | 40 ++++++++- 7 files changed, 260 insertions(+), 314 deletions(-) delete mode 100644 lib/atick/Able.php create mode 100644 lib/atick/IO.php create mode 100644 lib/atick/IO/Filter.php create mode 100644 lib/atick/IO/Process.php delete mode 100644 lib/atick/Pipe.php delete mode 100644 lib/atick/Proc.php diff --git a/lib/atick/Able.php b/lib/atick/Able.php deleted file mode 100644 index 8bcac42..0000000 --- a/lib/atick/Able.php +++ /dev/null @@ -1,49 +0,0 @@ -input, $this->output) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + stream_filter_append($this->input, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE, compact("func", "ctor", "dtor")); + stream_set_blocking($this->output, false); + } + + /** + * Cleanup socketpair(2) resources + */ + function __destruct() { + if (is_resource($this->input)) { + fclose($this->input); + } + if (is_resource($this->output)) { + fclose($this->output); + } + } + + /** + * @inheritdoc + * @return resource + */ + function getOutput() { + return $this->output; + } + + /** + * @inheritdoc + * @return resource + */ + function getInput() { + return $this->input; + } + + /** + * @inheritdoc + * @param resource $fd + * @return resource + */ + function __invoke($fd = null) { + if ($fd) { + copy($fd, $this->getInput()); + } + return $this->getOutput(); + } +} + +class StreamFilter extends \php_user_filter +{ + public $filtername = "atick\\IO\\Func"; + public $params; + + function filter($in, $out, &$consumed, $closing) { + while ($bucket = stream_bucket_make_writeable($in)) { + $consumed += $bucket->datalen; + $bucket->data = call_user_func($this->params["func"], $this, $bucket->data, $closing); + stream_bucket_append($out, $bucket); + } + return PSFS_PASS_ON; + } + + function onClose() { + if (!empty($this->params["dtor"])) { + call_user_func($this->params["dtor"], $this); + } + } + + function onCreate() { + if (!empty($this->params["ctor"])) { + call_user_func($this->params["ctor"], $this); + } + } +} + +stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter"); diff --git a/lib/atick/IO/Process.php b/lib/atick/IO/Process.php new file mode 100644 index 0000000..e4d9bc3 --- /dev/null +++ b/lib/atick/IO/Process.php @@ -0,0 +1,77 @@ +process = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env); + + if (!is_resource($this->process) || !($status = proc_get_status($this->process))) { + throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]); + } + + stream_set_blocking($this->pipes[1], false); + stream_set_blocking($this->pipes[2], false); + } + + /** + * Cleanup pipes and proc handle + */ + function __destruct() { + foreach ($this->pipes as $fd) { + if (is_resource($fd)) { + fclose($fd); + } + } + proc_close($this->process); + } + + /** + * @inheritdoc + * @return resource + */ + function getOutput() { + return $this->pipes[1]; + } + + /** + * @inheritdoc + * @return resource + */ + function getInput() { + return $this->pipes[0]; + } + + /** + * @inheritdoc + * @param resource $fd + * @return resource + */ + function __invoke($fd) { + if ($fd) { + copy($fd, $this->getInput()); + } + return $this->getOutput(); + } +} diff --git a/lib/atick/Pipe.php b/lib/atick/Pipe.php deleted file mode 100644 index 393e2ef..0000000 --- a/lib/atick/Pipe.php +++ /dev/null @@ -1,77 +0,0 @@ -producer = $producer; - $this->consumer = $consumer; - - $this->producer->read(function($fd) { - if (strlen($data = fread($fd, 8192))) { - $this->consumer->write($data); - } - }); - } - - function __toString() { - return "$this->producer | $this->consumer"; - } - - function close($what = self::CLOSED) { - echo "PIPE KILL $this $what\n"; - return $this; - } - - function stat() { - echo "STAT $this\n"; - if (!($this->producer->stat() & self::READABLE)) { - if ($this->consumer->stat() & self::WRITABLE) { - $this->consumer->close(self::WRITABLE); - return self::READABLE; - } else { - $this->consumer->close(self::READABLE); - return self::CLOSED; - } - } - return ($this->producer->stat() & self::WRITABLE) - | ($this->consumer->stat() & self::READABLE); - } - - function write($data) { - return $this->producer->write($data); - } - - function read($into) { - $this->consumer->read($into); - return $this; - } - - function error($into) { - $this->consumer->error($into); - return $this; - } - - function with(Ticker $ticker, callable $verify = null) { - $this->producer->with($ticker, $verify ?: array($this, "stat")); - $this->consumer->with($ticker, $verify ?: array($this, "stat")); - } -} diff --git a/lib/atick/Proc.php b/lib/atick/Proc.php deleted file mode 100644 index 13e240c..0000000 --- a/lib/atick/Proc.php +++ /dev/null @@ -1,187 +0,0 @@ -command = $command; - $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env); - - if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) { - throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]); - } - - stream_set_blocking($this->pipes[1], false); - stream_set_blocking($this->pipes[2], false); - } - - /** - * Returns the command string - * @return string - */ - function __toString() { - return (string) $this->command; - } - - /** - * Cleanup pipes and proc handle - */ - function __destruct() { - $this->close(); - } - - /** - * @inheritdoc - * @implements \aticker\Able - * @param \atick\Ticker $ticker - * @param callable $verify - */ - function with(Ticker $ticker, callable $verify = null) { - if (is_callable($this->read)) { - $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat")); - } elseif (is_resource($this->read)) { - $ticker->read($this->pipes[1], function($fd) { - if (strlen($data = fread($fd, 8192))) { - fwrite($this->read, $data); - } - }, $verify ?: array($this, "stat")); - } else { - $ticker->read($this->pipes[1], function($fd) { - /* nirvana */ - fread($fd, 8192); - }, $verify ?: array($this, "stat")); - } - - if (is_callable($this->error)) { - $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat")); - } elseif (is_resource($this->error)) { - $ticker->read($this->pipes[2], function($fd) { - if (strlen($data = fread($fd, 8192))) { - fwrite($this->error, $data); - } - }, $verify ?: array($this, "stat")); - } else { - $ticker->read($this->pipes[2], function($fd) { - /* nirvana */ - fread($fd, 8192); - }, $verify ?: array($this, "stat")); - } - } - - function stat() { - echo "STAT $this\n"; - if ($this->proc && proc_get_status($this->proc)["running"]) { - if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1])) - && (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2])) - ) { - if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) { - return self::READABLE | self::WRITABLE; - } - return self::READABLE; - } - } - $this->close(); - return self::CLOSED; - } - - function close($what = self::CLOSED) { - echo "PROC KILL $this $what\n"; - - if (!$what || ($what & self::WRITABLE)) { - if (is_resource($this->pipes[0])) { - fclose($this->pipes[0]); - } - $this->pipes[0] = null; - } - - if (!$what || ($what & self::READABLE)) { - if (is_resource($this->pipes[1])) { - fclose($this->pipes[1]); - } - $this->pipes[1] = null; - if (is_resource($this->read)) { - fclose($this->read); - } - if (is_resource($this->pipes[2])) { - fclose($this->pipes[2]); - } - $this->pipes[2] = null; - if (is_resource($this->error)) { - fclose($this->error); - } - } - - if (!$what && is_resource($this->proc)) { - proc_close($this->proc); - $this->proc = null; - } - } - - /** - * @inheritdoc - * @implements \aticker\Able - * @param string $data - * @return int - */ - function write($data) { - return fwrite($this->pipes[0], $data); - } - - /** - * Where to read STDOUT into - * @param resource|callable $into - * @return \atick\Proc - * @throws \InvalidArgumentException - */ - function read($into) { - if (is_resource($into) || is_callable($into)) { - $this->read = $into; - } else { - throw new \InvalidArgumentException("Not a valid resource or callback"); - } - return $this; - } - - /** - * Where to pass STDERR into - * @param resource|callable $into - * @return \atick\Proc - * @throws \InvalidArgumentException - */ - function error($into) { - if (is_resource($into) || is_callable($into)) { - $this->error = $into; - } else { - throw new \InvalidArgumentException("Not a valid resource or callback"); - } - return $this; - } -} diff --git a/lib/atick/Ticker.php b/lib/atick/Ticker.php index f009b5e..1255977 100644 --- a/lib/atick/Ticker.php +++ b/lib/atick/Ticker.php @@ -82,6 +82,16 @@ class Ticker implements \Countable return $this; } + function dispatch() { + pcntl_signal_dispatch(); + return $this; + } + + function on($signal, $action) { + pcntl_signal($signal, $action); + return $this; + } + /** * The tick handler; calls atick\Ticker::wait(0) * @return int @@ -143,7 +153,7 @@ class Ticker implements \Countable return count($this->read) + count($this->write); } - + /** * Attach a read handler * @param resource $fd @@ -152,6 +162,9 @@ class Ticker implements \Countable * @return \atick\Ticker */ function read($fd, callable $onread, callable $verify = null) { + if ($fd instanceof IO) { + $fd = $fd->getOutput(); + } $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) { return is_resource($fd) && !feof($fd); }); @@ -166,9 +179,34 @@ class Ticker implements \Countable * @return \atick\Ticker */ function write($fd, callable $onwrite, callable $verify = null) { + if ($fd instanceof IO) { + $fd = $fd->getInput(); + } $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) { return is_resource($fd) && !feof($fd); }); return $this; } + + /** + * Pipe + * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT); + * @param IO ... + * @return \atick\Ticker + */ + function pipe(/*IO ...*/) { + $io = func_get_args(); + reset($io); + + do { + $r = current($io); + $w = next($io); + + $this->read($r, $w ?: function($fd) { + stream_copy_to_stream($fd, STDOUT); + }); + } while ($w); + + return $this; + } } -- 2.30.2