flush
[m6w6/seekat] / lib / API / Consumer.php
index 6e677fe0dde7e3e1dde39f1ac6a80f1aab47b608..d593588aee7883c47302b9c4b03c953793e7a929 100644 (file)
@@ -2,22 +2,21 @@
 
 namespace seekat\API;
 
+use AsyncInterop\Promise;
 use Generator;
 use http\Client;
-use React\Promise\{
-       Deferred,
-       ExtendedPromiseInterface,
-       PromiseInterface,
-       function all
+use seekat\API;
+use seekat\Exception\{
+       InvalidArgumentException, UnexpectedValueException, function exception
 };
 
-final class Consumer extends Deferred
+final class Consumer
 {
        /**
-        * The HTTP client
-        * @var Client
+        * Loop
+        * @var callable
         */
-       private $client;
+       private $loop;
 
        /**
         * The return value of the generator
@@ -32,24 +31,49 @@ final class Consumer extends Deferred
        private $cancelled = false;
 
        /**
-        * Create a new generator invoker
-        * @param Client $client
+        * @var Promise
         */
-       function __construct(Client $client) {
-               $this->client = $client;
+       private $promise;
 
-               parent::__construct(function($resolve, $reject) {
-                       return $this->cancel($resolve, $reject);
+       /**
+        * @var \Closure
+        */
+       private $resolve;
+
+       /**
+        * @var \Closure
+        */
+       private $reject;
+
+       /**
+        * @var \Closure
+        */
+       private $reduce;
+
+       /**
+        * Create a new generator consumer
+        * @param Future $future
+        * @param callable $loop
+        */
+       function __construct(Future $future, callable $loop) {
+               $this->loop = $loop;
+
+               $context = $future->createContext(function() {
+                       $this->cancelled = true;
                });
+               $this->promise = $future->getPromise($context);
+               $this->resolve = API\Future\resolver($future, $context);
+               $this->reject = API\Future\rejecter($future, $context);
+               $this->reduce = API\Future\reducer($future, $context);
        }
 
        /**
         * Iterate over $gen, a \Generator yielding promises
         *
         * @param Generator $gen
-        * @return ExtendedPromiseInterface
+        * @return Promise
         */
-       function __invoke(Generator $gen) : ExtendedPromiseInterface {
+       function __invoke(Generator $gen) : Promise {
                $this->cancelled = false;
 
                foreach ($gen as $promise) {
@@ -59,50 +83,47 @@ final class Consumer extends Deferred
                        $this->give($promise, $gen);
                }
 
+               #($this->loop)();
+
                if (!$this->cancelled) {
-                       $this->resolve($this->result = $gen->getReturn());
+                       $this->result = $gen->getReturn();
+               }
+               if (isset($this->result)) {
+                       ($this->resolve)($this->result);
+               } else {
+                       ($this->reject)("Cancelled");
                }
 
-               return $this->promise();
+               return $this->promise;
        }
 
        /**
         * Promise handler
         *
-        * @param array|PromiseInterface $promise
+        * @param array|Promise $promise
         * @param Generator $gen
         */
        private function give($promise, Generator $gen) {
-               if ($promise instanceof PromiseInterface) {
-                       $promise->then(function($result) use($gen) {
-                               if (($promise = $gen->send($result))) {
-                                       $this->give($promise, $gen);
+               if ($promise instanceof \Traversable) {
+                       $promise = iterator_to_array($promise);
+               }
+               if (is_array($promise)) {
+                       $promise = ($this->reduce)($promise);
+               }
+               if ($promise instanceof Promise) {
+                       $promise->when(function($error, $result) use($gen) {
+                               if ($error) {
+                                       $gen->throw(exception($error));
                                }
-                       });
-               } else {
-                       all($promise)->then(function($results) use($gen) {
-                               if (($promise = $gen->send($results))) {
+                               if (($promise = $gen->send($result))) {
                                        $this->give($promise, $gen);
                                }
                        });
-               }
-               $this->client->send();
-       }
-
-       /**
-        * Cancellation callback
-        *
-        * @param callable $resolve
-        * @param callable $reject
-        */
-       private function cancel(callable $resolve, callable $reject) {
-               $this->cancelled = true;
-
-               /* did we finish in the meantime? */
-               if ($this->result) {
-                       $resolve($this->result);
                } else {
-                       $reject("Cancelled");
+                       $gen->throw(new UnexpectedValueException(
+                               "Expected Promise or array of Promises; got ".\seekat\typeof($promise)));
                }
+               /* FIXME: external loop */
+               ($this->loop)();
        }
 }