cdc3364a46891c766a56c0f8f845647e1cc7ab97
[m6w6/atick] / lib / atick / Ticker.php
1 <?php
2
3 namespace atick;
4
5 /**
6 * Asynchronnous resource handling, optionally (ab)using ticks
7 *
8 * Example with ticks:
9 * <code>
10 * <?php
11 * declare(ticks=1);
12 *
13 * $conn = new \pq\Connection;
14 * $conn->execAsync("SELECT * FROM foo", function ($rs) {
15 * var_dump($rs);
16 * });
17 *
18 * $ticker = new \atick\Ticker;
19 * $ticker->register();
20 * $ticker->read($conn->socket, function($fd) use ($conn) {
21 * $conn->poll();
22 * if ($conn->busy) {
23 * return false;
24 * }
25 * $conn->getResult();
26 * return true;
27 * });
28 *
29 * while (count($ticker));
30 * ?>
31 * </code>
32 *
33 * And an example without ticks:
34 * <code>
35 * <?php
36 * $conn = new \pq\Connection;
37 * $conn->execAsync("SELECT * FROM foo", function ($r) {
38 * var_dump($r);
39 * });
40 *
41 * $ticker = new \atick\Ticker;
42 * $ticker->read($conn->socket, function($fd) use ($conn) {
43 * $conn->poll();
44 * if ($conn->busy) {
45 * return false;
46 * }
47 * $conn->getResult();
48 * return true;
49 * });
50 *
51 * while($ticker());
52 * ?>
53 * </code>
54 */
55 class Ticker implements \Countable
56 {
57 /**
58 * @var array
59 */
60 protected $read = array();
61
62 /**
63 * @var array
64 */
65 protected $write = array();
66
67 /**
68 * Register the ticker as tick function
69 * @return \atick\Ticker
70 */
71 function register() {
72 register_tick_function(array($this, "__invoke"));
73 return $this;
74 }
75
76 /**
77 * Unregister the ticker as tick function
78 * @return \atick\Ticker
79 */
80 function unregister() {
81 unregister_tick_function(array($this, "__invoke"));
82 return $this;
83 }
84
85 /**
86 * The tick handler; calls atick\Ticker::wait(0)
87 * @return int
88 */
89 function __invoke() {
90 return $this->wait(0);
91 }
92
93 /**
94 * Wait for read/write readiness on the watched fds
95 * @param float $timeout
96 * @return int count of wached fds
97 */
98 function wait($timeout = 1) {
99 $r = $w = $e = array();
100 foreach ($this->read as $s) {
101 $r[] = $s[0];
102 }
103 foreach ($this->write as $s) {
104 $w[] = $s[0];
105 }
106 $s = (int) $timeout;
107 $u = (int) (($timeout - $s) * 1000000);
108 if (($r || $w) && stream_select($r, $w, $e, $s, $u)) {
109 foreach ($r as $s) {
110 if ($this->read[(int)$s][1]($s)) {
111 unset($this->read[(int)$s]);
112 }
113 }
114 foreach ($w as $s) {
115 if ($this->write[(int)$s][1]($s)) {
116 unset($this->write[(int)$s]);
117 }
118 }
119 }
120 return $this->count();
121 }
122
123 /**
124 * @implements \Countable
125 * @return int
126 */
127 function count() {
128 return count($this->read) + count($this->write);
129 }
130
131 /**
132 * Attach a read handler; let the callback return true, to stop watching the fd.
133 * @param resource $fd
134 * @param callable $cb
135 * @return \atick\Ticker
136 */
137 function read($fd, callable $cb) {
138 $this->read[(int)$fd] = array($fd, $cb);
139 return $this;
140 }
141
142 /**
143 * Attach a write handler; let the callback return true, to stop watching the fd.
144 * @param int $fd
145 * @param callable $cb
146 * @return \atick\Ticker
147 */
148 function write($fd, callable $cb) {
149 $this->write[(int)$fd] = array($fd, $cb);
150 return $this;
151 }
152 }