IO
[m6w6/atick] / lib / atick / Ticker.php
1 <?php
2
3 namespace atick;
4
5 /**
6 * Asynchronnous resource handling, optionally (ab)using ticks
7 *
8 * Example with ticks:
9 * <code>
10 * <?php
11 * declare(ticks=1);
12 *
13 * $conn = new \pq\Connection;
14 * $conn->execAsync("SELECT * FROM foo", function ($rs) {
15 * var_dump($rs);
16 * });
17 *
18 * $ticker = new \atick\Ticker;
19 * $ticker->register();
20 * $ticker->read($conn->socket, function($fd) use ($conn) {
21 * $conn->poll();
22 * if ($conn->busy) {
23 * return false;
24 * }
25 * $conn->getResult();
26 * return true;
27 * });
28 *
29 * while (count($ticker));
30 * ?>
31 * </code>
32 *
33 * And an example without ticks:
34 * <code>
35 * <?php
36 * $conn = new \pq\Connection;
37 * $conn->execAsync("SELECT * FROM foo", function ($r) {
38 * var_dump($r);
39 * });
40 *
41 * $ticker = new \atick\Ticker;
42 * $ticker->read($conn->socket, function($fd) use ($conn) {
43 * $conn->poll();
44 * if ($conn->busy) {
45 * return false;
46 * }
47 * $conn->getResult();
48 * return true;
49 * });
50 *
51 * while($ticker());
52 * ?>
53 * </code>
54 */
55 class Ticker implements \Countable
56 {
57 /**
58 * @var array
59 */
60 protected $read = array();
61
62 /**
63 * @var array
64 */
65 protected $write = array();
66
67 /**
68 * Register the ticker as tick function
69 * @return \atick\Ticker
70 */
71 function register() {
72 register_tick_function(array($this, "__invoke"));
73 return $this;
74 }
75
76 /**
77 * Unregister the ticker as tick function
78 * @return \atick\Ticker
79 */
80 function unregister() {
81 unregister_tick_function(array($this, "__invoke"));
82 return $this;
83 }
84
85 function dispatch() {
86 pcntl_signal_dispatch();
87 return $this;
88 }
89
90 function on($signal, $action) {
91 pcntl_signal($signal, $action);
92 return $this;
93 }
94
95 /**
96 * The tick handler; calls atick\Ticker::wait(0)
97 * @return int
98 */
99 function __invoke($timeout = 0) {
100 return $this->wait($timeout);
101 }
102
103 /**
104 * Wait for read/write readiness on the watched fds
105 * @param float $timeout
106 * @return int count of wached fds
107 */
108 function wait($timeout = 1) {
109 $r = $w = $e = array();
110
111 foreach ($this->read as $s) {
112 is_resource($s[0]) and $r[] = $s[0];
113 }
114
115 foreach ($this->write as $s) {
116 is_resource($s[0]) and $w[] = $s[0];
117 }
118
119 $t = (int) $timeout;
120 $u = (int) (($timeout - $t) * 1000000);
121
122 if (($r || $w) && stream_select($r, $w, $e, $t, $u)) {
123 foreach ($r as $s) {
124 $this->read[(int)$s][1]($s);
125 }
126 foreach ($w as $s) {
127 $this->write[(int)$s][1]($s);
128 }
129 }
130
131 return $this->count();
132 }
133
134 /**
135 * Returns the count of watched fds
136 * @implements \Countable
137 * @return int
138 */
139 function count() {
140 foreach ($this->read as $i => $s) {
141 list($fd,,$verify) = $s;
142 if (!$verify($fd)) {
143 unset($this->read[$i]);
144 }
145 }
146
147 foreach ($this->write as $i => $s) {
148 list($fd,,$verify) = $s;
149 if (!$verify($fd)) {
150 unset($this->write[$i]);
151 }
152 }
153
154 return count($this->read) + count($this->write);
155 }
156
157 /**
158 * Attach a read handler
159 * @param resource $fd
160 * @param callable $onread void($fd) the descriptor is readable, read data, now!
161 * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
162 * @return \atick\Ticker
163 */
164 function read($fd, callable $onread, callable $verify = null) {
165 if ($fd instanceof IO) {
166 $fd = $fd->getOutput();
167 }
168 $this->read[(int)$fd] = array($fd, $onread, $verify ?: function($fd) {
169 return is_resource($fd) && !feof($fd);
170 });
171 return $this;
172 }
173
174 /**
175 * Attach a write handler
176 * @param resource $fd
177 * @param callable $onwrite void($fd) the descriptor is writable, write data.
178 * @param callable $verify bool($fd) wheter the fd is still valid and should be watched
179 * @return \atick\Ticker
180 */
181 function write($fd, callable $onwrite, callable $verify = null) {
182 if ($fd instanceof IO) {
183 $fd = $fd->getInput();
184 }
185 $this->write[(int)$fd] = array($fd, $onwrite, $verify ?: function($fd) {
186 return is_resource($fd) && !feof($fd);
187 });
188 return $this;
189 }
190
191 /**
192 * Pipe
193 * e.g. $ticker->pipe(STDIN, new IO\Process("gzip"), new IO\Process("base64"), STDOUT);
194 * @param IO ...
195 * @return \atick\Ticker
196 */
197 function pipe(/*IO ...*/) {
198 $io = func_get_args();
199 reset($io);
200
201 do {
202 $r = current($io);
203 $w = next($io);
204
205 $this->read($r, $w ?: function($fd) {
206 stream_copy_to_stream($fd, STDOUT);
207 });
208 } while ($w);
209
210 return $this;
211 }
212 }