flush
[m6w6/atick] / lib / atick / Pipe.php
diff --git a/lib/atick/Pipe.php b/lib/atick/Pipe.php
new file mode 100644 (file)
index 0000000..393e2ef
--- /dev/null
@@ -0,0 +1,77 @@
+<?php
+
+namespace atick;
+
+class Pipe implements Able
+{
+       /**
+        * Output producing proc
+        * @var \atick\Able
+        */
+       protected $producer;
+
+       /**
+        * Input consuming proc
+        * @var \atick\Able
+        */
+       protected $consumer;
+
+       /**
+        * Create a pipe between two procs or other pipes
+        * @param \atick\Able $producer
+        * @param \atick\Able $consumer
+        */
+       function __construct(Able $producer, Able $consumer) {
+               $this->producer = $producer;
+               $this->consumer = $consumer;
+
+               $this->producer->read(function($fd) {
+                       if (strlen($data = fread($fd, 8192))) {
+                               $this->consumer->write($data);
+                       }
+               });
+       }
+
+       function __toString() {
+               return "$this->producer | $this->consumer";
+       }
+
+       function close($what = self::CLOSED) {
+               echo "PIPE KILL $this $what\n";
+               return $this;
+       }
+
+       function stat() {
+               echo "STAT $this\n";
+               if (!($this->producer->stat() & self::READABLE)) {
+                       if ($this->consumer->stat() & self::WRITABLE) {
+                               $this->consumer->close(self::WRITABLE);
+                               return self::READABLE;
+                       } else {
+                               $this->consumer->close(self::READABLE);
+                               return self::CLOSED;
+                       }
+               }
+               return ($this->producer->stat() & self::WRITABLE)
+                       | ($this->consumer->stat() & self::READABLE);
+       }
+
+       function write($data) {
+               return $this->producer->write($data);
+       }
+
+       function read($into) {
+               $this->consumer->read($into);
+               return $this;
+       }
+
+       function error($into) {
+               $this->consumer->error($into);
+               return $this;
+       }
+
+       function with(Ticker $ticker, callable $verify = null) {
+               $this->producer->with($ticker, $verify ?: array($this, "stat"));
+               $this->consumer->with($ticker, $verify ?: array($this, "stat"));
+       }
+}