5e72ef2a0aa91f9e362e44c0dcf52c9af474dcfe
[m6w6/seekat] / lib / API / Consumer.php
1 <?php
2
3 namespace seekat\API;
4
5 use Generator;
6 use http\Client;
7 use React\Promise\{
8 Deferred,
9 ExtendedPromiseInterface,
10 PromiseInterface,
11 function all
12 };
13
14 class Consumer extends Deferred
15 {
16 /**
17 * The HTTP client
18 * @var Client
19 */
20 private $client;
21
22 /**
23 * The return value of the generator
24 * @var mixed
25 */
26 private $result;
27
28 /**
29 * Cancellation flag
30 * @var bool
31 */
32 private $cancelled = false;
33
34 /**
35 * Create a new generator invoker
36 * @param Client $client
37 */
38 function __construct(Client $client) {
39 $this->client = $client;
40
41 parent::__construct(function($resolve, $reject) {
42 return $this->cancel($resolve, $reject);
43 });
44 }
45
46 /**
47 * Iterate over $gen, a \Generator yielding promises
48 *
49 * @param Generator $gen
50 * @return ExtendedPromiseInterface
51 */
52 function __invoke(Generator $gen) : ExtendedPromiseInterface {
53 $this->cancelled = false;
54
55 foreach ($gen as $promise) {
56 if ($this->cancelled) {
57 break;
58 }
59 $this->give($promise, $gen);
60 }
61
62 if (!$this->cancelled) {
63 $this->resolve($this->result = $gen->getReturn());
64 }
65
66 return $this->promise();
67 }
68
69 /**
70 * Promise handler
71 *
72 * @param array|PromiseInterface $promise
73 * @param Generator $gen
74 */
75 private function give($promise, Generator $gen) {
76 if ($promise instanceof PromiseInterface) {
77 $promise->then(function($result) use($gen) {
78 if (($promise = $gen->send($result))) {
79 $this->give($promise, $gen);
80 }
81 });
82 } else {
83 all($promise)->then(function($results) use($gen) {
84 if (($promise = $gen->send($results))) {
85 $this->give($promise, $gen);
86 }
87 });
88 }
89 $this->client->send();
90 }
91
92 /**
93 * Cancellation callback
94 *
95 * @param callable $resolve
96 * @param callable $reject
97 */
98 private function cancel(callable $resolve, callable $reject) {
99 $this->cancelled = true;
100
101 /* did we finish in the meantime? */
102 if ($this->result) {
103 $resolve($this->result);
104 } else {
105 $reject("Cancelled");
106 }
107 }
108 }