3af614caa9355335c70278f36395f2f5449f8d17
[m6w6/seekat] / lib / API / Consumer.php
1 <?php
2
3 namespace seekat\API;
4
5 use Exception;
6 use Generator;
7 use seekat\API;
8 use seekat\Exception\{function exception};
9
final class Consumer {
	/**
	 * Loop; called after every promise handed to the future, so that the
	 * promise can make progress
	 * @var callable
	 */
	private $loop;

	/**
	 * The return value of the generator
	 * @var mixed
	 */
	private $result;

	/**
	 * Cancellation flag; set by the callback passed to Future::createContext()
	 * @var bool
	 */
	private $cancelled = false;

	/**
	 * The future used to create the context and to handle yielded promises
	 * @var Future
	 */
	private $future;

	/**
	 * The context created by the future; provides the promise returned
	 * from __invoke()
	 * @var mixed
	 */
	private $context;

	/**
	 * Resolver created by API\Future\resolver()
	 * @var \Closure
	 */
	private $resolve;

	/**
	 * Rejecter created by API\Future\rejecter()
	 * @var \Closure
	 */
	private $reject;

	/**
	 * Reducer created by API\Future\reducer(); applied to arrays of promises
	 * @var \Closure
	 */
	private $reduce;

	/**
	 * Create a new generator consumer
	 *
	 * @param Future $future the future to create the context and its
	 *                       resolver, rejecter and reducer from
	 * @param callable $loop called without arguments to run the loop
	 */
	public function __construct(Future $future, callable $loop) {
		$this->loop = $loop;

		$this->future = $future;
		$this->context = $future->createContext(function() {
			$this->cancelled = true;
		});
		$this->resolve = API\Future\resolver($future, $this->context);
		$this->reject = API\Future\rejecter($future, $this->context);
		$this->reduce = API\Future\reducer($future, $this->context);
	}

	/**
	 * Iterate over $gen, a \Generator yielding promises
	 *
	 * Rejects with "Cancelled" if the cancellation flag was set while
	 * consuming the generator, else resolves with the generator's return
	 * value (NULL if it did not provide one).
	 *
	 * @param Generator $gen
	 * @return mixed promise
	 */
	public function __invoke(Generator $gen) {
		$this->cancelled = false;
		/* forget the return value of a previously consumed generator,
		 * so it cannot leak into this run if getReturn() fails below
		 */
		$this->result = null;

		foreach ($gen as $promise) {
			if ($this->cancelled) {
				break;
			}
			$this->give($promise, $gen);
		}

		if ($this->cancelled) {
			($this->reject)("Cancelled");
		} else {
			try {
				$this->result = $gen->getReturn();
			} catch (Exception $e) {
				/* deliberately ignored: Generator::getReturn() throws if
				 * the generator has not returned; resolve with NULL then
				 */
			}
			($this->resolve)($this->result);
		}

		return $this->context->promise();
	}

	/**
	 * Promise handler
	 *
	 * Hands $promise to the future and runs the loop. The outcome of the
	 * promise is fed back into the generator, and any promise the generator
	 * yields in response is handled the same way.
	 *
	 * @param mixed $promise a promise, or an array/\Traversable of promises
	 *                       which gets reduced to a single promise
	 * @param Generator $gen
	 */
	private function give($promise, Generator $gen) {
		if ($promise instanceof \Traversable) {
			$promise = iterator_to_array($promise);
		}
		if (is_array($promise)) {
			$promise = ($this->reduce)($promise);
		}

		$this->future->handlePromise($promise, function($result) use($gen) {
			if (($promise = $gen->send($result))) {
				$this->give($promise, $gen);
			}
		}, function($error) use($gen) {
			/* the generator may catch the exception and yield another
			 * promise; Generator::throw() returns that yielded value, and
			 * it has to be handled here, because the foreach loop in
			 * __invoke() would just advance past it
			 */
			if (($promise = $gen->throw(exception($error)))) {
				$this->give($promise, $gen);
			}
		});

		/* FIXME: external loop */
		($this->loop)();
	}
}