--- /dev/null
+<?php
+
+namespace atick;
+
+interface Able
+{
+ const CLOSED = 0;
+ const READABLE = 1;
+ const WRITABLE = 2;
+
+ /**
+ * Register any output streams with the ticker
+ * @param \atick\Ticker $ticker
+ * @param callable $verify
+ */
+ function with(Ticker $ticker, callable $verify = null);
+
+ /**
+ * Pass data to the input stream
+ * @param string $data
+ */
+ function write($data);
+
+ /**
+ * Where to send output to
+ * @param resource|callable $into
+ * @return \atick\Able
+ */
+ function read($into);
+
+ /**
+ * Where to send error output to
+ * @param resource|callable $into
+ * @return \atick\Able
+ */
+ function error($into);
+
+ /**
+ * Whether the pipe/proc is alive
+ * @return int
+ */
+ function stat();
+
+ /**
+ * Shutdown the pipe/proc
+ * @param int $what
+ */
+ function close($what = self::CLOSED);
+}
--- /dev/null
+<?php
+
+namespace atick;
+
+class Pipe implements Able
+{
+ /**
+ * Output producing proc
+ * @var \atick\Able
+ */
+ protected $producer;
+
+ /**
+ * Input consuming proc
+ * @var \atick\Able
+ */
+ protected $consumer;
+
+ /**
+ * Create a pipe between two procs or other pipes
+ * @param \atick\Able $producer
+ * @param \atick\Able $consumer
+ */
+ function __construct(Able $producer, Able $consumer) {
+ $this->producer = $producer;
+ $this->consumer = $consumer;
+
+ $this->producer->read(function($fd) {
+ if (strlen($data = fread($fd, 8192))) {
+ $this->consumer->write($data);
+ }
+ });
+ }
+
+ function __toString() {
+ return "$this->producer | $this->consumer";
+ }
+
+ function close($what = self::CLOSED) {
+ echo "PIPE KILL $this $what\n";
+ return $this;
+ }
+
+ function stat() {
+ echo "STAT $this\n";
+ if (!($this->producer->stat() & self::READABLE)) {
+ if ($this->consumer->stat() & self::WRITABLE) {
+ $this->consumer->close(self::WRITABLE);
+ return self::READABLE;
+ } else {
+ $this->consumer->close(self::READABLE);
+ return self::CLOSED;
+ }
+ }
+ return ($this->producer->stat() & self::WRITABLE)
+ | ($this->consumer->stat() & self::READABLE);
+ }
+
+ function write($data) {
+ return $this->producer->write($data);
+ }
+
+ function read($into) {
+ $this->consumer->read($into);
+ return $this;
+ }
+
+ function error($into) {
+ $this->consumer->error($into);
+ return $this;
+ }
+
+ function with(Ticker $ticker, callable $verify = null) {
+ $this->producer->with($ticker, $verify ?: array($this, "stat"));
+ $this->consumer->with($ticker, $verify ?: array($this, "stat"));
+ }
+}
--- /dev/null
+<?php
+
+namespace atick;
+
+class Proc implements Able
+{
+ /**
+ * Command string
+ * @var string
+ */
+ protected $command;
+
+ /**
+ * Process handle
+ * @var resource
+ */
+ protected $proc;
+
+ /**
+ * Proc's pipes
+ * @var array
+ */
+ protected $pipes;
+
+ protected $read;
+ protected $error;
+
+ /**
+ * @param string $command
+ * @param string $cwd
+ * @param array $env
+ * @throws \RuntimeException
+ */
+ function __construct($command, $cwd = null, array $env = null) {
+ $this->command = $command;
+ $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env);
+
+ if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) {
+ throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]);
+ }
+
+ stream_set_blocking($this->pipes[1], false);
+ stream_set_blocking($this->pipes[2], false);
+ }
+
+ /**
+ * Returns the command string
+ * @return string
+ */
+ function __toString() {
+ return (string) $this->command;
+ }
+
+ /**
+ * Cleanup pipes and proc handle
+ */
+ function __destruct() {
+ $this->close();
+ }
+
+ /**
+ * @inheritdoc
+ * @implements \aticker\Able
+ * @param \atick\Ticker $ticker
+ * @param callable $verify
+ */
+ function with(Ticker $ticker, callable $verify = null) {
+ if (is_callable($this->read)) {
+ $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat"));
+ } elseif (is_resource($this->read)) {
+ $ticker->read($this->pipes[1], function($fd) {
+ if (strlen($data = fread($fd, 8192))) {
+ fwrite($this->read, $data);
+ }
+ }, $verify ?: array($this, "stat"));
+ } else {
+ $ticker->read($this->pipes[1], function($fd) {
+ /* nirvana */
+ fread($fd, 8192);
+ }, $verify ?: array($this, "stat"));
+ }
+
+ if (is_callable($this->error)) {
+ $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat"));
+ } elseif (is_resource($this->error)) {
+ $ticker->read($this->pipes[2], function($fd) {
+ if (strlen($data = fread($fd, 8192))) {
+ fwrite($this->error, $data);
+ }
+ }, $verify ?: array($this, "stat"));
+ } else {
+ $ticker->read($this->pipes[2], function($fd) {
+ /* nirvana */
+ fread($fd, 8192);
+ }, $verify ?: array($this, "stat"));
+ }
+ }
+
+ function stat() {
+ echo "STAT $this\n";
+ if ($this->proc && proc_get_status($this->proc)["running"]) {
+ if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1]))
+ && (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2]))
+ ) {
+ if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) {
+ return self::READABLE | self::WRITABLE;
+ }
+ return self::READABLE;
+ }
+ }
+ $this->close();
+ return self::CLOSED;
+ }
+
+ function close($what = self::CLOSED) {
+ echo "PROC KILL $this $what\n";
+
+ if (!$what || ($what & self::WRITABLE)) {
+ if (is_resource($this->pipes[0])) {
+ fclose($this->pipes[0]);
+ }
+ $this->pipes[0] = null;
+ }
+
+ if (!$what || ($what & self::READABLE)) {
+ if (is_resource($this->pipes[1])) {
+ fclose($this->pipes[1]);
+ }
+ $this->pipes[1] = null;
+ if (is_resource($this->read)) {
+ fclose($this->read);
+ }
+ if (is_resource($this->pipes[2])) {
+ fclose($this->pipes[2]);
+ }
+ $this->pipes[2] = null;
+ if (is_resource($this->error)) {
+ fclose($this->error);
+ }
+ }
+
+ if (!$what && is_resource($this->proc)) {
+ proc_close($this->proc);
+ $this->proc = null;
+ }
+ }
+
+ /**
+ * @inheritdoc
+ * @implements \aticker\Able
+ * @param string $data
+ * @return int
+ */
+ function write($data) {
+ return fwrite($this->pipes[0], $data);
+ }
+
+ /**
+ * Where to read STDOUT into
+ * @param resource|callable $into
+ * @return \atick\Proc
+ * @throws \InvalidArgumentException
+ */
+ function read($into) {
+ if (is_resource($into) || is_callable($into)) {
+ $this->read = $into;
+ } else {
+ throw new \InvalidArgumentException("Not a valid resource or callback");
+ }
+ return $this;
+ }
+
+ /**
+ * Where to pass STDERR into
+ * @param resource|callable $into
+ * @return \atick\Proc
+ * @throws \InvalidArgumentException
+ */
+ function error($into) {
+ if (is_resource($into) || is_callable($into)) {
+ $this->error = $into;
+ } else {
+ throw new \InvalidArgumentException("Not a valid resource or callback");
+ }
+ return $this;
+ }
+}
* The tick handler; calls atick\Ticker::wait(0)
* @return int
*/
- function __invoke() {
- return $this->wait(0);
+ function __invoke($timeout = 0) {
+ return $this->wait($timeout);
}
/**
*/
function wait($timeout = 1) {
$r = $w = $e = array();
+
foreach ($this->read as $s) {
- $r[] = $s[0];
+ is_resource($s[0]) and $r[] = $s[0];
}
+
foreach ($this->write as $s) {
- $w[] = $s[0];
+ is_resource($s[0]) and $w[] = $s[0];
}
- $s = (int) $timeout;
- $u = (int) (($timeout - $s) * 1000000);
- if (($r || $w) && stream_select($r, $w, $e, $s, $u)) {
+
+ $t = (int) $timeout;
+ $u = (int) (($timeout - $t) * 1000000);
+
+ if (($r || $w) && stream_select($r, $w, $e, $t, $u)) {
foreach ($r as $s) {
- if ($this->read[(int)$s][1]($s)) {
- unset($this->read[(int)$s]);
- }
+ $this->read[(int)$s][1]($s);
}
foreach ($w as $s) {
- if ($this->write[(int)$s][1]($s)) {
- unset($this->write[(int)$s]);
- }
+ $this->write[(int)$s][1]($s);
}
}
+
return $this->count();
}
* @return int
*/
function count() {
+ foreach ($this->read as $i => $s) {
+ list($fd,,$verify) = $s;
+ if (!$verify($fd)) {
+ unset($this->read[$i]);
+ }
+ }
+
+ foreach ($this->write as $i => $s) {
+ list($fd,,$verify) = $s;
+ if (!$verify($fd)) {
+ unset($this->write[$i]);
+ }
+ }
+
return count($this->read) + count($this->write);
}
/**
- * Attach a read handler; let the callback return true, to stop watching the fd.
+ * Attach a read handler
* @param resource $fd
- * @param callable $cb
+ * @param callable $onread void($fd) the descriptor is readable, read data, now!
+ * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
* @return \atick\Ticker
*/
- function read($fd, callable $cb) {
- $this->read[(int)$fd] = array($fd, $cb);
+ function read($fd, callable $onread, callable $verify = null) {
+ $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
+ return is_resource($fd) && !feof($fd);
+ });
return $this;
}
/**
- * Attach a write handler; let the callback return true, to stop watching the fd.
+ * Attach a write handler
* @param resource $fd
- * @param callable $cb
+ * @param callable $onwrite void($fd) the descriptor is writable, write data.
+ * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
* @return \atick\Ticker
*/
- function write($fd, callable $cb) {
- $this->write[(int)$fd] = array($fd, $cb);
+ function write($fd, callable $onwrite, callable $verify = null) {
+ $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
+ return is_resource($fd) && !feof($fd);
+ });
return $this;
}
}