+++ /dev/null
-<?php
-
-namespace atick;
-
-interface Able
-{
- const CLOSED = 0;
- const READABLE = 1;
- const WRITABLE = 2;
-
- /**
- * Register any output streams with the ticker
- * @param \atick\Ticker $ticker
- * @param callable $verify
- */
- function with(Ticker $ticker, callable $verify = null);
-
- /**
- * Pass data to the input stream
- * @param string $data
- */
- function write($data);
-
- /**
- * Where to send output to
- * @param resource|callable $into
- * @return \atick\Able
- */
- function read($into);
-
- /**
- * Where to send error output to
- * @param resource|callable $into
- * @return \atick\Able
- */
- function error($into);
-
- /**
- * Whether the pipe/proc is alive
- * @return int
- */
- function stat();
-
- /**
- * Shutdown the pipe/proc
- * @param int $what
- */
- function close($what = self::CLOSED);
-}
--- /dev/null
+<?php
+
+namespace atick\IO;
+
+function copy($from, $to, $len = 4096) {
+ $data = fread($from, $len);
+
+ if (!strlen($data)) {
+ if (feof($from)) {
+ /* forward EOF */
+ fclose($to);
+ }
+ return;
+ }
+
+ return fwrite($to, $data);
+}
+
+namespace atick;
+
+interface IO
+{
+ /**
+ * Retrieve the output stream
+ * @return resource
+ */
+ function getOutput();
+
+ /**
+ * Retrieve the input stream
+ * @return resource
+ */
+ function getInput();
+
+ /**
+ * Pass input from FD to input and return output stream
+ * @param resource $fd
+ * @return resource
+ */
+ function __invoke($fd);
+}
--- /dev/null
+<?php
+
+namespace atick\IO;
+
+use atick\IO;
+
+class Filter implements IO
+{
+ /**
+ * Input stream
+ * @var resource
+ */
+ protected $input;
+
+ /**
+ * Output stream
+ * @var resource
+ */
+ protected $output;
+
+ /**
+ * @param callable $func filter proc
+ * @param callable $ctor constructor
+ * @param callable $dtor destructor
+ */
+ function __construct(callable $func, callable $ctor = null, callable $dtor = null) {
+ /*
+ * We don't have pipe(2) support, so we'll use socketpair(2) instead.
+ */
+ list($this->input, $this->output) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+ stream_filter_append($this->input, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE, compact("func", "ctor", "dtor"));
+ stream_set_blocking($this->output, false);
+ }
+
+ /**
+ * Cleanup socketpair(2) resources
+ */
+ function __destruct() {
+ if (is_resource($this->input)) {
+ fclose($this->input);
+ }
+ if (is_resource($this->output)) {
+ fclose($this->output);
+ }
+ }
+
+ /**
+ * @inheritdoc
+ * @return resource
+ */
+ function getOutput() {
+ return $this->output;
+ }
+
+ /**
+ * @inheritdoc
+ * @return resource
+ */
+ function getInput() {
+ return $this->input;
+ }
+
+ /**
+ * @inheritdoc
+ * @param resource $fd
+ * @return resource
+ */
+ function __invoke($fd = null) {
+ if ($fd) {
+ copy($fd, $this->getInput());
+ }
+ return $this->getOutput();
+ }
+}
+
+class StreamFilter extends \php_user_filter
+{
+ public $filtername = "atick\\IO\\Func";
+ public $params;
+
+ function filter($in, $out, &$consumed, $closing) {
+ while ($bucket = stream_bucket_make_writeable($in)) {
+ $consumed += $bucket->datalen;
+ $bucket->data = call_user_func($this->params["func"], $this, $bucket->data, $closing);
+ stream_bucket_append($out, $bucket);
+ }
+ return PSFS_PASS_ON;
+ }
+
+ function onClose() {
+ if (!empty($this->params["dtor"])) {
+ call_user_func($this->params["dtor"], $this);
+ }
+ }
+
+ function onCreate() {
+ if (!empty($this->params["ctor"])) {
+ call_user_func($this->params["ctor"], $this);
+ }
+ }
+}
+
+stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter");
--- /dev/null
+<?php
+
+namespace atick\IO;
+
+use atick\IO;
+
+class Process implements IO
+{
+ /**
+ * Process handle
+ * @var resource
+ */
+ protected $process;
+
+ /**
+ * Process' stdio pipes
+ * @var array
+ */
+ protected $pipes;
+
+ /**
+ * @param string $command
+ * @param string $cwd
+ * @param array $env
+ * @throws \RuntimeException
+ */
+ function __construct($command, $cwd = null, array $env = null) {
+ $this->process = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
+
+ if (!is_resource($this->process) || !($status = proc_get_status($this->process))) {
+ throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
+ }
+
+ stream_set_blocking($this->pipes[1], false);
+ stream_set_blocking($this->pipes[2], false);
+ }
+
+ /**
+ * Cleanup pipes and proc handle
+ */
+ function __destruct() {
+ foreach ($this->pipes as $fd) {
+ if (is_resource($fd)) {
+ fclose($fd);
+ }
+ }
+ proc_close($this->process);
+ }
+
+ /**
+ * @inheritdoc
+ * @return resource
+ */
+ function getOutput() {
+ return $this->pipes[1];
+ }
+
+ /**
+ * @inheritdoc
+ * @return resource
+ */
+ function getInput() {
+ return $this->pipes[0];
+ }
+
+ /**
+ * @inheritdoc
+ * @param resource $fd
+ * @return resource
+ */
+ function __invoke($fd) {
+ if ($fd) {
+ copy($fd, $this->getInput());
+ }
+ return $this->getOutput();
+ }
+}
+++ /dev/null
-<?php
-
-namespace atick;
-
-class Pipe implements Able
-{
- /**
- * Output producing proc
- * @var \atick\Able
- */
- protected $producer;
-
- /**
- * Input consuming proc
- * @var \atick\Able
- */
- protected $consumer;
-
- /**
- * Create a pipe between two procs or other pipes
- * @param \atick\Able $producer
- * @param \atick\Able $consumer
- */
- function __construct(Able $producer, Able $consumer) {
- $this->producer = $producer;
- $this->consumer = $consumer;
-
- $this->producer->read(function($fd) {
- if (strlen($data = fread($fd, 8192))) {
- $this->consumer->write($data);
- }
- });
- }
-
- function __toString() {
- return "$this->producer | $this->consumer";
- }
-
- function close($what = self::CLOSED) {
- echo "PIPE KILL $this $what\n";
- return $this;
- }
-
- function stat() {
- echo "STAT $this\n";
- if (!($this->producer->stat() & self::READABLE)) {
- if ($this->consumer->stat() & self::WRITABLE) {
- $this->consumer->close(self::WRITABLE);
- return self::READABLE;
- } else {
- $this->consumer->close(self::READABLE);
- return self::CLOSED;
- }
- }
- return ($this->producer->stat() & self::WRITABLE)
- | ($this->consumer->stat() & self::READABLE);
- }
-
- function write($data) {
- return $this->producer->write($data);
- }
-
- function read($into) {
- $this->consumer->read($into);
- return $this;
- }
-
- function error($into) {
- $this->consumer->error($into);
- return $this;
- }
-
- function with(Ticker $ticker, callable $verify = null) {
- $this->producer->with($ticker, $verify ?: array($this, "stat"));
- $this->consumer->with($ticker, $verify ?: array($this, "stat"));
- }
-}
+++ /dev/null
-<?php
-
-namespace atick;
-
-class Proc implements Able
-{
- /**
- * Command string
- * @var string
- */
- protected $command;
-
- /**
- * Process handle
- * @var resource
- */
- protected $proc;
-
- /**
- * Proc's pipes
- * @var array
- */
- protected $pipes;
-
- protected $read;
- protected $error;
-
- /**
- * @param string $command
- * @param string $cwd
- * @param array $env
- * @throws \RuntimeException
- */
- function __construct($command, $cwd = null, array $env = null) {
- $this->command = $command;
- $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
-
- if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) {
- throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
- }
-
- stream_set_blocking($this->pipes[1], false);
- stream_set_blocking($this->pipes[2], false);
- }
-
- /**
- * Returns the command string
- * @return string
- */
- function __toString() {
- return (string) $this->command;
- }
-
- /**
- * Cleanup pipes and proc handle
- */
- function __destruct() {
- $this->close();
- }
-
- /**
- * @inheritdoc
- * @implements \aticker\Able
- * @param \atick\Ticker $ticker
- * @param callable $verify
- */
- function with(Ticker $ticker, callable $verify = null) {
- if (is_callable($this->read)) {
- $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat"));
- } elseif (is_resource($this->read)) {
- $ticker->read($this->pipes[1], function($fd) {
- if (strlen($data = fread($fd, 8192))) {
- fwrite($this->read, $data);
- }
- }, $verify ?: array($this, "stat"));
- } else {
- $ticker->read($this->pipes[1], function($fd) {
- /* nirvana */
- fread($fd, 8192);
- }, $verify ?: array($this, "stat"));
- }
-
- if (is_callable($this->error)) {
- $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat"));
- } elseif (is_resource($this->error)) {
- $ticker->read($this->pipes[2], function($fd) {
- if (strlen($data = fread($fd, 8192))) {
- fwrite($this->error, $data);
- }
- }, $verify ?: array($this, "stat"));
- } else {
- $ticker->read($this->pipes[2], function($fd) {
- /* nirvana */
- fread($fd, 8192);
- }, $verify ?: array($this, "stat"));
- }
- }
-
- function stat() {
- echo "STAT $this\n";
- if ($this->proc && proc_get_status($this->proc)["running"]) {
- if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1]))
- && (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2]))
- ) {
- if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) {
- return self::READABLE | self::WRITABLE;
- }
- return self::READABLE;
- }
- }
- $this->close();
- return self::CLOSED;
- }
-
- function close($what = self::CLOSED) {
- echo "PROC KILL $this $what\n";
-
- if (!$what || ($what & self::WRITABLE)) {
- if (is_resource($this->pipes[0])) {
- fclose($this->pipes[0]);
- }
- $this->pipes[0] = null;
- }
-
- if (!$what || ($what & self::READABLE)) {
- if (is_resource($this->pipes[1])) {
- fclose($this->pipes[1]);
- }
- $this->pipes[1] = null;
- if (is_resource($this->read)) {
- fclose($this->read);
- }
- if (is_resource($this->pipes[2])) {
- fclose($this->pipes[2]);
- }
- $this->pipes[2] = null;
- if (is_resource($this->error)) {
- fclose($this->error);
- }
- }
-
- if (!$what && is_resource($this->proc)) {
- proc_close($this->proc);
- $this->proc = null;
- }
- }
-
- /**
- * @inheritdoc
- * @implements \aticker\Able
- * @param string $data
- * @return int
- */
- function write($data) {
- return fwrite($this->pipes[0], $data);
- }
-
- /**
- * Where to read STDOUT into
- * @param resource|callable $into
- * @return \atick\Proc
- * @throws \InvalidArgumentException
- */
- function read($into) {
- if (is_resource($into) || is_callable($into)) {
- $this->read = $into;
- } else {
- throw new \InvalidArgumentException("Not a valid resource or callback");
- }
- return $this;
- }
-
- /**
- * Where to pass STDERR into
- * @param resource|callable $into
- * @return \atick\Proc
- * @throws \InvalidArgumentException
- */
- function error($into) {
- if (is_resource($into) || is_callable($into)) {
- $this->error = $into;
- } else {
- throw new \InvalidArgumentException("Not a valid resource or callback");
- }
- return $this;
- }
-}
return $this;
}
+ function dispatch() {
+ pcntl_signal_dispatch();
+ return $this;
+ }
+
+ function on($signal, $action) {
+ pcntl_signal($signal, $action);
+ return $this;
+ }
+
/**
* The tick handler; calls atick\Ticker::wait(0)
* @return int
return count($this->read) + count($this->write);
}
-
+
/**
* Attach a read handler
* @param resource $fd
* @return \atick\Ticker
*/
function read($fd, callable $onread, callable $verify = null) {
+ if ($fd instanceof IO) {
+ $fd = $fd->getOutput();
+ }
$this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
return is_resource($fd) && !feof($fd);
});
* @return \atick\Ticker
*/
function write($fd, callable $onwrite, callable $verify = null) {
+ if ($fd instanceof IO) {
+ $fd = $fd->getInput();
+ }
$this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
return is_resource($fd) && !feof($fd);
});
return $this;
}
+
+ /**
+ * Pipe
+ * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
+ * @param IO ...
+ * @return \atick\Ticker
+ */
+ function pipe(/*IO ...*/) {
+ $io = func_get_args();
+ reset($io);
+
+ do {
+ $r = current($io);
+ $w = next($io);
+
+ $this->read($r, $w ?: function($fd) {
+ stream_copy_to_stream($fd, STDOUT);
+ });
+ } while ($w);
+
+ return $this;
+ }
}