autoload
[m6w6/atick] / lib / atick / Pipe.php
1 <?php
2
3 namespace atick;
4
5 class Pipe implements Able
6 {
7 /**
8 * Output producing proc
9 * @var \atick\Able
10 */
11 protected $producer;
12
13 /**
14 * Input consuming proc
15 * @var \atick\Able
16 */
17 protected $consumer;
18
19 /**
20 * Create a pipe between two procs or other pipes
21 * @param \atick\Able $producer
22 * @param \atick\Able $consumer
23 */
24 function __construct(Able $producer, Able $consumer) {
25 $this->producer = $producer;
26 $this->consumer = $consumer;
27
28 $this->producer->read(function($fd) {
29 if (strlen($data = fread($fd, 8192))) {
30 $this->consumer->write($data);
31 }
32 });
33 }
34
35 function __toString() {
36 return "$this->producer | $this->consumer";
37 }
38
39 function close($what = self::CLOSED) {
40 echo "PIPE KILL $this $what\n";
41 return $this;
42 }
43
44 function stat() {
45 echo "STAT $this\n";
46 if (!($this->producer->stat() & self::READABLE)) {
47 if ($this->consumer->stat() & self::WRITABLE) {
48 $this->consumer->close(self::WRITABLE);
49 return self::READABLE;
50 } else {
51 $this->consumer->close(self::READABLE);
52 return self::CLOSED;
53 }
54 }
55 return ($this->producer->stat() & self::WRITABLE)
56 | ($this->consumer->stat() & self::READABLE);
57 }
58
59 function write($data) {
60 return $this->producer->write($data);
61 }
62
63 function read($into) {
64 $this->consumer->read($into);
65 return $this;
66 }
67
68 function error($into) {
69 $this->consumer->error($into);
70 return $this;
71 }
72
73 function with(Ticker $ticker, callable $verify = null) {
74 $this->producer->with($ticker, $verify ?: array($this, "stat"));
75 $this->consumer->with($ticker, $verify ?: array($this, "stat"));
76 }
77 }