flush
[m6w6/seekat] / lib / API / Consumer.php
1 <?php
2
3 namespace seekat\API;
4
5 use AsyncInterop\Promise;
6 use Generator;
7 use http\Client;
8 use seekat\API;
9 use seekat\Exception\{
10 InvalidArgumentException, UnexpectedValueException, function exception
11 };
12
13 final class Consumer
14 {
15 /**
16 * Loop
17 * @var callable
18 */
19 private $loop;
20
21 /**
22 * The return value of the generator
23 * @var mixed
24 */
25 private $result;
26
27 /**
28 * Cancellation flag
29 * @var bool
30 */
31 private $cancelled = false;
32
33 /**
34 * @var Promise
35 */
36 private $promise;
37
38 /**
39 * @var \Closure
40 */
41 private $resolve;
42
43 /**
44 * @var \Closure
45 */
46 private $reject;
47
48 /**
49 * @var \Closure
50 */
51 private $reduce;
52
53 /**
54 * Create a new generator consumer
55 * @param Future $future
56 * @param callable $loop
57 */
58 function __construct(Future $future, callable $loop) {
59 $this->loop = $loop;
60
61 $context = $future->createContext(function() {
62 $this->cancelled = true;
63 });
64 $this->promise = $future->getPromise($context);
65 $this->resolve = API\Future\resolver($future, $context);
66 $this->reject = API\Future\rejecter($future, $context);
67 $this->reduce = API\Future\reducer($future, $context);
68 }
69
70 /**
71 * Iterate over $gen, a \Generator yielding promises
72 *
73 * @param Generator $gen
74 * @return Promise
75 */
76 function __invoke(Generator $gen) : Promise {
77 $this->cancelled = false;
78
79 foreach ($gen as $promise) {
80 if ($this->cancelled) {
81 break;
82 }
83 $this->give($promise, $gen);
84 }
85
86 #($this->loop)();
87
88 if (!$this->cancelled) {
89 $this->result = $gen->getReturn();
90 }
91 if (isset($this->result)) {
92 ($this->resolve)($this->result);
93 } else {
94 ($this->reject)("Cancelled");
95 }
96
97 return $this->promise;
98 }
99
100 /**
101 * Promise handler
102 *
103 * @param array|Promise $promise
104 * @param Generator $gen
105 */
106 private function give($promise, Generator $gen) {
107 if ($promise instanceof \Traversable) {
108 $promise = iterator_to_array($promise);
109 }
110 if (is_array($promise)) {
111 $promise = ($this->reduce)($promise);
112 }
113 if ($promise instanceof Promise) {
114 $promise->when(function($error, $result) use($gen) {
115 if ($error) {
116 $gen->throw(exception($error));
117 }
118 if (($promise = $gen->send($result))) {
119 $this->give($promise, $gen);
120 }
121 });
122 } else {
123 $gen->throw(new UnexpectedValueException(
124 "Expected Promise or array of Promises; got ".\seekat\typeof($promise)));
125 }
126 /* FIXME: external loop */
127 ($this->loop)();
128 }
129 }