From: Michael Wallner Date: Fri, 15 Mar 2013 14:21:10 +0000 (+0100) Subject: flush X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fatick;a=commitdiff_plain;h=69e47998dafb560e9b5ee127a56dbb1bbba40cf7 flush --- diff --git a/lib/atick/Able.php b/lib/atick/Able.php new file mode 100644 index 0000000..8bcac42 --- /dev/null +++ b/lib/atick/Able.php @@ -0,0 +1,49 @@ +producer = $producer; + $this->consumer = $consumer; + + $this->producer->read(function($fd) { + if (strlen($data = fread($fd, 8192))) { + $this->consumer->write($data); + } + }); + } + + function __toString() { + return "$this->producer | $this->consumer"; + } + + function close($what = self::CLOSED) { + echo "PIPE KILL $this $what\n"; + return $this; + } + + function stat() { + echo "STAT $this\n"; + if (!($this->producer->stat() & self::READABLE)) { + if ($this->consumer->stat() & self::WRITABLE) { + $this->consumer->close(self::WRITABLE); + return self::READABLE; + } else { + $this->consumer->close(self::READABLE); + return self::CLOSED; + } + } + return ($this->producer->stat() & self::WRITABLE) + | ($this->consumer->stat() & self::READABLE); + } + + function write($data) { + return $this->producer->write($data); + } + + function read($into) { + $this->consumer->read($into); + return $this; + } + + function error($into) { + $this->consumer->error($into); + return $this; + } + + function with(Ticker $ticker, callable $verify = null) { + $this->producer->with($ticker, $verify ?: array($this, "stat")); + $this->consumer->with($ticker, $verify ?: array($this, "stat")); + } +} diff --git a/lib/atick/Proc.php b/lib/atick/Proc.php new file mode 100644 index 0000000..13e240c --- /dev/null +++ b/lib/atick/Proc.php @@ -0,0 +1,187 @@ +command = $command; + $this->proc = proc_open($command, [["pipe","r"],["pipe","w"],["pipe","w"]], $this->pipes, $cwd, $env); + + if (!is_resource($this->proc) || !($status = proc_get_status($this->proc))) { + throw new \RuntimeException("Could not open proc '$command': " . error_get_last()["message"]); + } + + stream_set_blocking($this->pipes[1], false); + stream_set_blocking($this->pipes[2], false); + } + + /** + * Returns the command string + * @return string + */ + function __toString() { + return (string) $this->command; + } + + /** + * Cleanup pipes and proc handle + */ + function __destruct() { + $this->close(); + } + + /** + * @inheritdoc + * @implements \aticker\Able + * @param \atick\Ticker $ticker + * @param callable $verify + */ + function with(Ticker $ticker, callable $verify = null) { + if (is_callable($this->read)) { + $ticker->read($this->pipes[1], $this->read, $verify ?: array($this, "stat")); + } elseif (is_resource($this->read)) { + $ticker->read($this->pipes[1], function($fd) { + if (strlen($data = fread($fd, 8192))) { + fwrite($this->read, $data); + } + }, $verify ?: array($this, "stat")); + } else { + $ticker->read($this->pipes[1], function($fd) { + /* nirvana */ + fread($fd, 8192); + }, $verify ?: array($this, "stat")); + } + + if (is_callable($this->error)) { + $ticker->read($this->pipes[2], $this->error, $verify ?: array($this, "stat")); + } elseif (is_resource($this->error)) { + $ticker->read($this->pipes[2], function($fd) { + if (strlen($data = fread($fd, 8192))) { + fwrite($this->error, $data); + } + }, $verify ?: array($this, "stat")); + } else { + $ticker->read($this->pipes[2], function($fd) { + /* nirvana */ + fread($fd, 8192); + }, $verify ?: array($this, "stat")); + } + } + + function stat() { + echo "STAT $this\n"; + if ($this->proc && proc_get_status($this->proc)["running"]) { + if ((isset($this->pipes[1]) && is_resource($this->pipes[1]) && !feof($this->pipes[1])) + && (isset($this->pipes[2]) && is_resource($this->pipes[2]) && !feof($this->pipes[2])) + ) { + if (isset($this->pipes[0]) && is_resource($this->pipes[0]) && !feof($this->pipes[0])) { + return self::READABLE | self::WRITABLE; + } + return self::READABLE; + } + } + $this->close(); + return self::CLOSED; + } + + function close($what = self::CLOSED) { + echo "PROC KILL $this $what\n"; + + if (!$what || ($what & self::WRITABLE)) { + if (is_resource($this->pipes[0])) { + fclose($this->pipes[0]); + } + $this->pipes[0] = null; + } + + if (!$what || ($what & self::READABLE)) { + if (is_resource($this->pipes[1])) { + fclose($this->pipes[1]); + } + $this->pipes[1] = null; + if (is_resource($this->read)) { + fclose($this->read); + } + if (is_resource($this->pipes[2])) { + fclose($this->pipes[2]); + } + $this->pipes[2] = null; + if (is_resource($this->error)) { + fclose($this->error); + } + } + + if (!$what && is_resource($this->proc)) { + proc_close($this->proc); + $this->proc = null; + } + } + + /** + * @inheritdoc + * @implements \aticker\Able + * @param string $data + * @return int + */ + function write($data) { + return fwrite($this->pipes[0], $data); + } + + /** + * Where to read STDOUT into + * @param resource|callable $into + * @return \atick\Proc + * @throws \InvalidArgumentException + */ + function read($into) { + if (is_resource($into) || is_callable($into)) { + $this->read = $into; + } else { + throw new \InvalidArgumentException("Not a valid resource or callback"); + } + return $this; + } + + /** + * Where to pass STDERR into + * @param resource|callable $into + * @return \atick\Proc + * @throws \InvalidArgumentException + */ + function error($into) { + if (is_resource($into) || is_callable($into)) { + $this->error = $into; + } else { + throw new \InvalidArgumentException("Not a valid resource or callback"); + } + return $this; + } +} diff --git a/lib/atick/Ticker.php b/lib/atick/Ticker.php index 1dbf4e8..f009b5e 100644 --- a/lib/atick/Ticker.php +++ b/lib/atick/Ticker.php @@ -86,8 +86,8 @@ class Ticker implements \Countable * The tick handler; calls atick\Ticker::wait(0) * @return int */ - function __invoke() { - return $this->wait(0); + function __invoke($timeout = 0) { + return $this->wait($timeout); } /** @@ -97,26 +97,27 @@ class Ticker implements \Countable */ function wait($timeout = 1) { $r = $w = $e = array(); + foreach ($this->read as $s) { - $r[] = $s[0]; + is_resource($s[0]) and $r[] = $s[0]; } + foreach ($this->write as $s) { - $w[] = $s[0]; + is_resource($s[0]) and $w[] = $s[0]; } - $s = (int) $timeout; - $u = (int) (($timeout - $s) * 1000000); - if (($r || $w) && stream_select($r, $w, $e, $s, $u)) { + + $t = (int) $timeout; + $u = (int) (($timeout - $t) * 1000000); + + if (($r || $w) && stream_select($r, $w, $e, $t, $u)) { foreach ($r as $s) { - if ($this->read[(int)$s][1]($s)) { - unset($this->read[(int)$s]); - } + $this->read[(int)$s][1]($s); } foreach ($w as $s) { - if ($this->write[(int)$s][1]($s)) { - unset($this->write[(int)$s]); - } + $this->write[(int)$s][1]($s); } } + return $this->count(); } @@ -126,28 +127,48 @@ class Ticker implements \Countable * @return int */ function count() { + foreach ($this->read as $i => $s) { + list($fd,,$verify) = $s; + if (!$verify($fd)) { + unset($this->read[$i]); + } + } + + foreach ($this->write as $i => $s) { + list($fd,,$verify) = $s; + if (!$verify($fd)) { + unset($this->write[$i]); + } + } + return count($this->read) + count($this->write); } /** - * Attach a read handler; let the callback return true, to stop watching the fd. + * Attach a read handler * @param resource $fd - * @param callable $cb + * @param callable $onread void($fd) the descriptor is readable, read data, now! + * @param callable $verify bool($fd) wheter the fd is still valid and should be watched * @return \atick\Ticker */ - function read($fd, callable $cb) { - $this->read[(int)$fd] = array($fd, $cb); + function read($fd, callable $onread, callable $verify = null) { + $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) { + return is_resource($fd) && !feof($fd); + }); return $this; } /** - * Attach a write handler; let the callback return true, to stop watching the fd. + * Attach a write handler * @param resource $fd - * @param callable $cb + * @param callable $onwrite void($fd) the descriptor is writable, write data. + * @param callable $verify bool($fd) wheter the fd is still valid and should be watched * @return \atick\Ticker */ - function write($fd, callable $cb) { - $this->write[(int)$fd] = array($fd, $cb); + function write($fd, callable $onwrite, callable $verify = null) { + $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) { + return is_resource($fd) && !feof($fd); + }); return $this; } }