X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fatick;a=blobdiff_plain;f=lib%2Fatick%2FPipe.php;fp=lib%2Fatick%2FPipe.php;h=393e2efa6b67eb97ae417a4480e0a5f5811bb312;hp=0000000000000000000000000000000000000000;hb=69e47998dafb560e9b5ee127a56dbb1bbba40cf7;hpb=69b8d48db614677e819a0292d95ea7d3ea681732 diff --git a/lib/atick/Pipe.php b/lib/atick/Pipe.php new file mode 100644 index 0000000..393e2ef --- /dev/null +++ b/lib/atick/Pipe.php @@ -0,0 +1,77 @@ +producer = $producer; + $this->consumer = $consumer; + + $this->producer->read(function($fd) { + if (strlen($data = fread($fd, 8192))) { + $this->consumer->write($data); + } + }); + } + + function __toString() { + return "$this->producer | $this->consumer"; + } + + function close($what = self::CLOSED) { + echo "PIPE KILL $this $what\n"; + return $this; + } + + function stat() { + echo "STAT $this\n"; + if (!($this->producer->stat() & self::READABLE)) { + if ($this->consumer->stat() & self::WRITABLE) { + $this->consumer->close(self::WRITABLE); + return self::READABLE; + } else { + $this->consumer->close(self::READABLE); + return self::CLOSED; + } + } + return ($this->producer->stat() & self::WRITABLE) + | ($this->consumer->stat() & self::READABLE); + } + + function write($data) { + return $this->producer->write($data); + } + + function read($into) { + $this->consumer->read($into); + return $this; + } + + function error($into) { + $this->consumer->error($into); + return $this; + } + + function with(Ticker $ticker, callable $verify = null) { + $this->producer->with($ticker, $verify ?: array($this, "stat")); + $this->consumer->with($ticker, $verify ?: array($this, "stat")); + } +}