+++ /dev/null
-<?php
-
-namespace atick;
-
-class Pipe implements Able
-{
- /**
- * Output producing proc
- * @var \atick\Able
- */
- protected $producer;
-
- /**
- * Input consuming proc
- * @var \atick\Able
- */
- protected $consumer;
-
- /**
- * Create a pipe between two procs or other pipes
- * @param \atick\Able $producer
- * @param \atick\Able $consumer
- */
- function __construct(Able $producer, Able $consumer) {
- $this->producer = $producer;
- $this->consumer = $consumer;
-
- $this->producer->read(function($fd) {
- if (strlen($data = fread($fd, 8192))) {
- $this->consumer->write($data);
- }
- });
- }
-
- function __toString() {
- return "$this->producer | $this->consumer";
- }
-
- function close($what = self::CLOSED) {
- echo "PIPE KILL $this $what\n";
- return $this;
- }
-
- function stat() {
- echo "STAT $this\n";
- if (!($this->producer->stat() & self::READABLE)) {
- if ($this->consumer->stat() & self::WRITABLE) {
- $this->consumer->close(self::WRITABLE);
- return self::READABLE;
- } else {
- $this->consumer->close(self::READABLE);
- return self::CLOSED;
- }
- }
- return ($this->producer->stat() & self::WRITABLE)
- | ($this->consumer->stat() & self::READABLE);
- }
-
- function write($data) {
- return $this->producer->write($data);
- }
-
- function read($into) {
- $this->consumer->read($into);
- return $this;
- }
-
- function error($into) {
- $this->consumer->error($into);
- return $this;
- }
-
- function with(Ticker $ticker, callable $verify = null) {
- $this->producer->with($ticker, $verify ?: array($this, "stat"));
- $this->consumer->with($ticker, $verify ?: array($this, "stat"));
- }
-}