drop async-interop
[m6w6/seekat] / lib / API / Consumer.php
1 <?php
2
3 namespace seekat\API;
4
5 use Generator;
6 use http\Client;
7 use seekat\API;
8 use seekat\Exception\{
9 InvalidArgumentException, UnexpectedValueException, function exception
10 };
11
12 final class Consumer
13 {
14 /**
15 * Loop
16 * @var callable
17 */
18 private $loop;
19
20 /**
21 * The return value of the generator
22 * @var mixed
23 */
24 private $result;
25
26 /**
27 * Cancellation flag
28 * @var bool
29 */
30 private $cancelled = false;
31
32 /**
33 * @var Future
34 */
35 private $future;
36
37 /**
38 * @var mixed
39 */
40 private $context;
41
42 /**
43 * @var \Closure
44 */
45 private $resolve;
46
47 /**
48 * @var \Closure
49 */
50 private $reject;
51
52 /**
53 * @var \Closure
54 */
55 private $reduce;
56
57 /**
58 * Create a new generator consumer
59 * @param Future $future
60 * @param callable $loop
61 */
62 function __construct(Future $future, callable $loop) {
63 $this->loop = $loop;
64
65 $this->future = $future;
66 $this->context = $future->createContext(function() {
67 $this->cancelled = true;
68 });
69 $this->resolve = API\Future\resolver($future, $this->context);
70 $this->reject = API\Future\rejecter($future, $this->context);
71 $this->reduce = API\Future\reducer($future, $this->context);
72 }
73
74 /**
75 * Iterate over $gen, a \Generator yielding promises
76 *
77 * @param Generator $gen
78 * @return mixed promise
79 */
80 function __invoke(Generator $gen) {
81 $this->cancelled = false;
82
83 foreach ($gen as $promise) {
84 if ($this->cancelled) {
85 break;
86 }
87 $this->give($promise, $gen);
88 }
89
90 #($this->loop)();
91
92 if (!$this->cancelled) {
93 $this->result = $gen->getReturn();
94 }
95 if (isset($this->result)) {
96 ($this->resolve)($this->result);
97 } else {
98 ($this->reject)("Cancelled");
99 }
100
101 return $this->context->promise();
102 }
103
104 /**
105 * Promise handler
106 *
107 * @param mixed $promise
108 * @param Generator $gen
109 */
110 private function give($promise, Generator $gen) {
111 if ($promise instanceof \Traversable) {
112 $promise = iterator_to_array($promise);
113 }
114 if (is_array($promise)) {
115 $promise = ($this->reduce)($promise);
116 }
117
118 $this->future->handlePromise($promise, function($result) use($gen) {
119 if (($promise = $gen->send($result))) {
120 $this->give($promise, $gen);
121 }
122 }, function($error) use($gen) {
123 $gen->throw(exception($error));
124 });
125
126 /* FIXME: external loop */
127 ($this->loop)();
128 }
129 }