namespace seekat\API\Call;
-use Exception;
-use http\{
- Client, Client\Request, Client\Response
-};
+use http\{Client, Client\Request, Client\Response};
+use Psr\Log\LoggerInterface;
use seekat\API;
-use SplObserver;
-use SplSubject;
-final class Deferred extends \React\Promise\Deferred implements SplObserver
-{
+final class Deferred {
/**
* The response importer
*
*/
private $cache;
+ /**
+ * @var LoggerInterface
+ */
+ private $logger;
+
/**
* The executed request
*
*/
private $response;
+ /**
+ * @var mixed
+ */
+ private $promise;
+
+ /**
+ * @var \Closure
+ */
+ private $resolve;
+
+ /**
+ * @var \Closure
+ */
+ private $reject;
+
/**
* Create a deferred promise for the response of $request
*
* @param Cache\Service $cache
*/
function __construct(API $api, Request $request, Cache\Service $cache = null) {
- parent::__construct(function ($resolve, $reject) {
- return $this->cancel($resolve, $reject);
- });
-
$this->request = $request;
$this->client = $api->getClient();
+ $this->logger = $api->getLogger();
$this->result = new Result($api);
$this->cache = new Cache($cache);
- if ($this->cache->load($this->request, $cached)) {
- $api->getLogger()->info("deferred -> cached", [
- "method" => $request->getRequestMethod(),
- "url" => $request->getRequestUrl(),
- ]);
+ $future = $api->getFuture();
+ $context = $future->createContext(function() {
+ if ($this->response) {
+ /* we did finish in the meantime */
+ $this->complete();
+ } else {
+ $this->client->dequeue($this->request);
+ ($this->reject)("Cancelled");
+ }
+ });
+ $this->promise = $future->getPromise($context);
+ $this->resolve = API\Future\resolver($future, $context);
+ $this->reject = API\Future\rejecter($future, $context);
+ }
- $this->response = $cached;
- $this->complete(
- [$this, "resolve"],
- [$this, "reject"]
- );
- } else {
- $this->client->attach($this);
- $this->client->enqueue($this->request, function(Response $response) use($cached) {
- if ($response->getResponseCode() == 304) {
- $this->response = $cached;
- } else {
- $this->response = $response;
- }
- $this->complete(
- [$this, "resolve"],
- [$this, "reject"]
- );
- return true;
- });
- $api->getLogger()->info("deferred -> enqueued", [
- "method" => $request->getRequestMethod(),
- "url" => $request->getRequestUrl(),
- ]);
- /* start off */
- $this->client->once();
+ function __invoke() {
+ if (!$this->cached($cached)) {
+ $this->refresh($cached);
}
+
+ return $this->promise;
}
/**
- * Progress observer
+ * Peek into cache
*
- * Import the response's data on success and resolve the promise.
- *
- * @param SplSubject $client The observed HTTP client
- * @param Request $request The request which generated the update
- * @param object $progress The progress information
+ * @param Response $cached
+ * @return bool
*/
- function update(SplSubject $client, Request $request = null, $progress = null) {
- if ($request !== $this->request) {
- return;
+ private function cached(Response &$cached = null) : bool {
+ $fresh = $this->cache->load($this->request, $cachedResponse);
+
+ if (!$cachedResponse) {
+ return false;
+ } else {
+ $cached = $cachedResponse;
+
+ $this->logger->info("deferred -> cached", [
+ "method" => $this->request->getRequestMethod(),
+ "url" => $this->request->getRequestUrl(),
+ ]);
+
+
+ if (!$fresh) {
+ $this->logger->info("cached -> stale", [
+ "method" => $this->request->getRequestMethod(),
+ "url" => $this->request->getRequestUrl(),
+ ]);
+ return false;
+ }
}
- $this->notify((object) compact("client", "request", "progress"));
+ $this->response = $cached;
+ $this->complete("cached");
+ return true;
}
/**
- * Completion callback
- * @param callable $resolve
- * @param callable $reject
+ * Refresh
+ *
+ * @param Response|null $cached
*/
- private function complete(callable $resolve, callable $reject) {
- $this->client->detach($this);
-
- if ($this->response) {
- try {
- $api = ($this->result)($this->response);
+ private function refresh(Response $cached = null) {
+ $this->client->enqueue($this->request, function(Response $response) use($cached) {
+ $this->response = $response;
+ $this->complete();
+ return true;
+ });
- $this->cache->save($this->request, $this->response);
+ $this->logger->info(($cached ? "stale" : "deferred") . " -> enqueued", [
+ "method" => $this->request->getRequestMethod(),
+ "url" => $this->request->getRequestUrl(),
+ ]);
- $resolve($api);
- } catch (Exception $e) {
- $reject($e);
- }
- } else {
- $reject($this->client->getTransferInfo($this->request)->error);
- }
+ /* start off */
+ $this->client->once();
}
/**
- * Cancellation callback
- * @param callable $resolve
- * @param callable $reject
+ * Completion callback
*/
- private function cancel(callable $resolve, callable $reject) {
- /* did we finish in the meantime? */
+ private function complete(string $by = "enqueued") {
if ($this->response) {
- $this->complete($resolve, $reject);
+ $this->logger->info("$by -> response", [
+ "url" => $this->request->getRequestUrl(),
+ "info" => $this->response->getInfo(),
+ ]);
+
+ try {
+ $this->cache->update($this->request, $this->response);
+ ($this->resolve)(($this->result)($this->response));
+ } catch (\Throwable $e) {
+ ($this->reject)($e);
+ }
} else {
- $this->client->detach($this);
- $this->client->dequeue($this->request);
- $reject("Cancelled");
+ $info = $this->client->getTransferInfo($this->request);
+
+ $this->logger->warning("$by -> no response", [
+ "url" => $this->request->getRequestUrl(),
+ "info" => $info
+ ]);
+
+ ($this->reject)($info->error);
}
}
+
}