basic async-interop support; generator consumer missing
[m6w6/seekat] / lib / API / Call / Deferred.php
index 7fc44de1ba607c50c36e22c4ee9c39fae130b887..9bdc030ba955e07c562b20f6913c8d88b6c62287 100644 (file)
@@ -2,15 +2,17 @@
 
 namespace seekat\API\Call;
 
+use AsyncInterop\Promise;
 use Exception;
 use http\{
        Client, Client\Request, Client\Response
 };
+use Psr\Log\LoggerInterface;
 use seekat\API;
 use SplObserver;
 use SplSubject;
 
-final class Deferred extends \React\Promise\Deferred implements SplObserver
+final class Deferred implements SplObserver
 {
        /**
         * The response importer
@@ -33,6 +35,11 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
         */
        private $cache;
 
+       /**
+        * @var LoggerInterface
+        */
+       private $logger;
+
        /**
         * The executed request
         *
@@ -47,6 +54,26 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
         */
        private $response;
 
+       /**
+        * @var Promise
+        */
+       private $promise;
+
+       /**
+        * @var \Closure
+        */
+       private $resolve;
+
+       /**
+        * @var \Closure
+        */
+       private $reject;
+
+       /**
+        * @var \Closure
+        */
+       private $update;
+
        /**
         * Create a deferred promise for the response of $request
         *
@@ -55,26 +82,38 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
         * @param Cache\Service $cache
         */
        function __construct(API $api, Request $request, Cache\Service $cache = null) {
-               parent::__construct(function ($resolve, $reject) {
-                       return $this->cancel($resolve, $reject);
-               });
-
                $this->request = $request;
                $this->client = $api->getClient();
+               $this->logger = $api->getLogger();
                $this->result = new Result($api);
                $this->cache = new Cache($cache);
 
+               $future = $api->getFuture();
+               $context = $future->createContext(function() {
+                       if ($this->response) {
+                               /* we did finish in the meantime */
+                               $this->complete();
+                       } else {
+                               $this->client->detach($this);
+                               $this->client->dequeue($this->request);
+                               ($this->reject)("Cancelled");
+                       }
+               });
+               $this->promise = $future->getPromise($context);
+               $this->resolve = API\Future\resolver($future, $context);
+               $this->reject = API\Future\rejecter($future, $context);
+               $this->update = API\Future\updater($future, $context);
+       }
+
+       function __invoke() : Promise {
                if ($this->cache->load($this->request, $cached)) {
-                       $api->getLogger()->info("deferred -> cached", [
-                               "method" => $request->getRequestMethod(),
-                               "url" => $request->getRequestUrl(),
+                       $this->logger->info("deferred -> cached", [
+                               "method" => $this->request->getRequestMethod(),
+                               "url" => $this->request->getRequestUrl(),
                        ]);
 
                        $this->response = $cached;
-                       $this->complete(
-                               [$this, "resolve"],
-                               [$this, "reject"]
-                       );
+                       $this->complete();
                } else {
                        $this->client->attach($this);
                        $this->client->enqueue($this->request, function(Response $response) use($cached) {
@@ -83,19 +122,18 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
                                } else {
                                        $this->response = $response;
                                }
-                               $this->complete(
-                                       [$this, "resolve"],
-                                       [$this, "reject"]
-                               );
+                               $this->complete();
                                return true;
                        });
-                       $api->getLogger()->info("deferred -> enqueued", [
-                               "method" => $request->getRequestMethod(),
-                               "url" => $request->getRequestUrl(),
+                       $this->logger->info("deferred -> enqueued", [
+                               "method" => $this->request->getRequestMethod(),
+                               "url" => $this->request->getRequestUrl(),
                        ]);
                        /* start off */
                        $this->client->once();
                }
+
+               return $this->promise;
        }
 
        /**
@@ -112,7 +150,7 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
                        return;
                }
 
-               $this->notify((object) compact("client", "request", "progress"));
+               ($this->update)((object) compact("client", "request", "progress"));
        }
 
        /**
@@ -120,7 +158,7 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
         * @param callable $resolve
         * @param callable $reject
         */
-       private function complete(callable $resolve, callable $reject) {
+       private function complete() {
                $this->client->detach($this);
 
                if ($this->response) {
@@ -129,28 +167,13 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver
 
                                $this->cache->save($this->request, $this->response);
 
-                               $resolve($api);
+                               ($this->resolve)($api);
                        } catch (Exception $e) {
-                               $reject($e);
+                               ($this->reject)($e);
                        }
                } else {
-                       $reject($this->client->getTransferInfo($this->request)->error);
+                       ($this->reject)($this->client->getTransferInfo($this->request)->error);
                }
        }
 
-       /**
-        * Cancellation callback
-        * @param callable $resolve
-        * @param callable $reject
-        */
-       private function cancel(callable $resolve, callable $reject) {
-               /* did we finish in the meantime? */
-               if ($this->response) {
-                       $this->complete($resolve, $reject);
-               } else {
-                       $this->client->detach($this);
-                       $this->client->dequeue($this->request);
-                       $reject("Cancelled");
-               }
-       }
 }