5 class Pipe
implements Able
8 * Output producing proc
14 * Input consuming proc
/**
 * Create a pipe between two procs or other pipes.
 *
 * Stores both ends, then registers a callback on the producer's read()
 * that reads a chunk from the handle it is given and passes every
 * non-empty chunk on to the consumer's write().
 *
 * @param \atick\Able $producer
 * @param \atick\Able $consumer
 */
function __construct(Able $producer, Able $consumer) {
    $this->producer = $producer;
    $this->consumer = $consumer;

    $this->producer->read(function($fd) {
        // Pull at most 8192 bytes per invocation.
        $data = fread($fd, 8192);

        // fread() yields false on failure and "" when nothing was read;
        // neither is forwarded to the consumer.
        if ($data !== false && $data !== '') {
            $this->consumer->write($data);
        }
    });
}
/**
 * Render the pipe as "<producer> | <consumer>".
 *
 * Both ends are interpolated into the string, so each is converted to
 * its own string form.
 *
 * @return string
 */
function __toString() {
    return "$this->producer | $this->consumer";
}
39 function close($what = self
::CLOSED
) {
40 echo "PIPE KILL $this $what\n";
46 if (!($this->producer
->stat() & self
::READABLE
)) {
47 if ($this->consumer
->stat() & self
::WRITABLE
) {
48 $this->consumer
->close(self
::WRITABLE
);
49 return self
::READABLE
;
51 $this->consumer
->close(self
::READABLE
);
55 return ($this->producer
->stat() & self
::WRITABLE
)
56 |
($this->consumer
->stat() & self
::READABLE
);
/**
 * Write into the pipe by delegating to the producer end.
 *
 * @param mixed $data passed through unchanged to the producer's write()
 * @return mixed whatever the producer's write() returns
 */
function write($data) {
    return $this->producer->write($data);
}
/**
 * Delegate read() to the consumer end of the pipe.
 *
 * @param mixed $into passed through unchanged to the consumer's read()
 */
function read($into) {
    $this->consumer->read($into);
    // NOTE(review): the listing this method was recovered from skips a
    // line after the call above; confirm no statement (e.g. a return)
    // was dropped.
}
/**
 * Delegate error() to the consumer end of the pipe.
 *
 * @param mixed $into passed through unchanged to the consumer's error()
 */
function error($into) {
    $this->consumer->error($into);
    // NOTE(review): the listing this method was recovered from skips a
    // line after the call above; confirm no statement (e.g. a return)
    // was dropped.
}
73 function with(Ticker
$ticker, callable
$verify = null) {
74 $this->producer
->with($ticker, $verify ?
: array($this, "stat"));
75 $this->consumer
->with($ticker, $verify ?
: array($this, "stat"));