From: Michael Wallner Date: Tue, 17 Jan 2017 09:09:26 +0000 (+0100) Subject: basic async-interop support; generator consumer missing X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=f4aa6beaf2e1f0dc9c877782cbbad5a989194517;p=m6w6%2Fseekat basic async-interop support; generator consumer missing --- diff --git a/composer.json b/composer.json index b9fe1ec..c734070 100644 --- a/composer.json +++ b/composer.json @@ -14,19 +14,27 @@ "seekat\\": "lib/" }, "files": [ - "lib/functions.php" + "lib/functions.php", + "lib/Exception/functions.php", + "lib/API/Future/functions.php", + "lib/API/Links/functions.php" ] }, + "minimum-stability": "beta", + "prefer-stable": true, "require": { "php": "^7.0", "ext-http": "^3.0", - "react/promise": "^2.4", "seebz/uri-template": "dev-master", "psr/log": "^1.0", "psr/cache": "^1.0", - "psr/simple-cache": "^1.0" + "psr/simple-cache": "^1.0", + "async-interop/promise": "^0.4.0" }, "require-dev": { + "react/promise": "dev-async-interop", + "amphp/amp": "dev-master", + "amphp/loop": "dev-master", "peridot-php/peridot": "^1.15", "monolog/monolog": "^1.19", "peridot-php/leo": "^1.5", diff --git a/examples/promise.php b/examples/promise.php index 81c7f10..c26e911 100755 --- a/examples/promise.php +++ b/examples/promise.php @@ -4,17 +4,16 @@ require_once __DIR__."/../vendor/autoload.php"; use seekat\API; +use seekat\API\Future; $log = new Monolog\Logger("seekat"); -$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO)); +$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::NOTICE)); -$api = new API([ - "Authorization" => "token ".getenv("GITHUB_TOKEN") -], null, null, $log); +$api = new API(Future\amp(), API\auth("token", getenv("GITHUB_TOKEN")), null, null, $log); -$api->users->m6w6->gists()->done(function($gists) { +$api->users->m6w6->gists()->when(function($error, $gists) { foreach ($gists as $gist) { - $gist->commits()->then(function($commits) use($gist) { + $gist->commits()->when(function($error, $commits) use($gist) { foreach ($commits as $i => $commit) { if (!$i) { printf("\nGist %s, %s:\n", $gist->id, $gist->description ?: ""); @@ -27,4 +26,4 @@ $api->users->m6w6->gists()->done(function($gists) { } }); -$api->send(); \ No newline at end of file +$api->send(); diff --git a/lib/API.php b/lib/API.php index 63fd004..546dd88 100644 --- a/lib/API.php +++ b/lib/API.php @@ -2,6 +2,7 @@ namespace seekat; +use AsyncInterop\Promise; use Countable; use Generator; use http\{ @@ -11,12 +12,10 @@ use IteratorAggregate; use Psr\Log\{ LoggerInterface, NullLogger }; -use React\Promise\{ - ExtendedPromiseInterface, function resolve -}; -use seekat\{ - API\Call, API\Consumer, API\ContentType, API\Iterator, API\Links, Exception\InvalidArgumentException +use seekat\API\{ + Call, Consumer, ContentType, Future, Iterator, Links }; +use seekat\Exception\InvalidArgumentException; class API implements IteratorAggregate, Countable { /** @@ -37,6 +36,12 @@ class API implements IteratorAggregate, Countable { */ private $cache; + /** + * Promisor + * @var Future + */ + private $future; + /** * The HTTP client * @var Client @@ -70,12 +75,15 @@ class API implements IteratorAggregate, Countable { /** * Create a new API endpoint root * + * @param Future $future pretending to fulfill promises * @param array $headers Standard request headers, defaults to ["Accept" => "application/vnd.github.v3+json"] * @param Url $url The API's endpoint, defaults to https://api.github.com * @param Client $client The HTTP client to use for executing requests * @param LoggerInterface $log A logger + * @param Call\Cache\Service $cache A cache */ - function __construct(array $headers = null, Url $url = null, Client $client = null, LoggerInterface $log = null, Call\Cache\Service $cache = null) { + function __construct(Future $future, array $headers = null, Url $url = null, Client $client = null, LoggerInterface $log = null, Call\Cache\Service $cache = null) { + $this->future = $future; $this->cache = $cache; $this->logger = $log ?? new NullLogger; $this->url = $url ?? new Url("https://api.github.com"); @@ -117,26 +125,31 @@ class API implements IteratorAggregate, Countable { * * @param string $method The API's "path" element to ascend into * @param array $args Array of arguments forwarded to \seekat\API::get() - * @return ExtendedPromiseInterface + * @return Promise */ - function __call(string $method, array $args) : ExtendedPromiseInterface { + function __call(string $method, array $args) : Promise { /* We cannot implement an explicit then() method, * because the Promise implementation might think * we're actually implementing Thenable, * which might cause an infinite loop. + * FIXXME: then/when */ - if ($method === "then") { - return $this->get()->then(...$args); + if ($method === "when") { + $promise = $this->get(); + $promise->when(...$args); + return $promise; } /* * very short-hand version: - * ->users->m6w6->gists->get()->then(...) + * ->users->m6w6->gists->get()->when(...) * vs: * ->users->m6w6->gists(...) */ if (is_callable(current($args))) { - return $this->api->get()->then(current($args)); + $promise = $this->get(); + $promise->when(current($args)); + return $promise; } return (new Call($this, $method))($args); @@ -146,10 +159,10 @@ class API implements IteratorAggregate, Countable { * Run the send loop through a generator * * @param callable|Generator $cbg A \Generator or a factory of a \Generator yielding promises - * @return ExtendedPromiseInterface The promise of the generator's return value + * @return Promise The promise of the generator's return value * @throws InvalidArgumentException */ - function __invoke($cbg) : ExtendedPromiseInterface { + function __invoke($cbg) : Promise { $this->logger->debug(__FUNCTION__); $consumer = new Consumer($this->client); @@ -222,6 +235,13 @@ class API implements IteratorAggregate, Countable { return $this->logger; } + /** + * @return Future + */ + function getFuture() { + return $this->future; + } + /** * @return Client */ @@ -340,9 +360,9 @@ class API implements IteratorAggregate, Countable { * * @param mixed $args The HTTP query string parameters * @param array $headers The request's additional HTTP headers - * @return ExtendedPromiseInterface + * @return Promise */ - function get($args = null, array $headers = null, $cache = null) : ExtendedPromiseInterface { + function get($args = null, array $headers = null, $cache = null) : Promise { return $this->request("GET", $args, null, $headers, $cache); } @@ -351,9 +371,9 @@ class API implements IteratorAggregate, Countable { * * @param mixed $args The HTTP query string parameters * @param array $headers The request's additional HTTP headers - * @return ExtendedPromiseInterface + * @return Promise */ - function delete($args = null, array $headers = null) : ExtendedPromiseInterface { + function delete($args = null, array $headers = null) : Promise { return $this->request("DELETE", $args, null, $headers); } @@ -363,9 +383,9 @@ class API implements IteratorAggregate, Countable { * @param mixed $body The HTTP message's body * @param mixed $args The HTTP query string parameters * @param array $headers The request's additional HTTP headers - * @return ExtendedPromiseInterface + * @return Promise */ - function post($body = null, $args = null, array $headers = null) : ExtendedPromiseInterface { + function post($body = null, $args = null, array $headers = null) : Promise { return $this->request("POST", $args, $body, $headers); } @@ -375,9 +395,9 @@ class API implements IteratorAggregate, Countable { * @param mixed $body The HTTP message's body * @param mixed $args The HTTP query string parameters * @param array $headers The request's additional HTTP headers - * @return ExtendedPromiseInterface + * @return Promise */ - function put($body = null, $args = null, array $headers = null) : ExtendedPromiseInterface { + function put($body = null, $args = null, array $headers = null) : Promise { return $this->request("PUT", $args, $body, $headers); } @@ -387,9 +407,9 @@ class API implements IteratorAggregate, Countable { * @param mixed $body The HTTP message's body * @param mixed $args The HTTP query string parameters * @param array $headers The request's additional HTTP headers - * @return ExtendedPromiseInterface + * @return Promise */ - function patch($body = null, $args = null, array $headers = null) : ExtendedPromiseInterface { + function patch($body = null, $args = null, array $headers = null) : Promise { return $this->request("PATCH", $args, $body, $headers); } @@ -444,19 +464,19 @@ class API implements IteratorAggregate, Countable { * @param mixed $body Thee HTTP message's body * @param array $headers The request's additional HTTP headers * @param Call\Cache\Service $cache - * @return ExtendedPromiseInterface + * @return Promise */ - private function request(string $method, $args = null, $body = null, array $headers = null, Call\Cache\Service $cache = null) : ExtendedPromiseInterface { + private function request(string $method, $args = null, $body = null, array $headers = null, Call\Cache\Service $cache = null) : Promise { if (isset($this->data)) { $this->logger->debug("request -> resolve", [ "method" => $method, - "url" => (string)$this->url, + "url" => (string) $this->url, "args" => $args, "body" => $body, "headers" => $headers, ]); - return resolve($this); + return Future\resolve($this->future, $this); } $url = $this->url->mod(["query" => new QueryString($args)]); @@ -474,6 +494,6 @@ class API implements IteratorAggregate, Countable { "headers" => $headers, ]); - return (new Call\Deferred($this, $request, $cache ?: $this->cache))->promise(); + return (new Call\Deferred($this, $request, $cache ?: $this->cache))(); } } diff --git a/lib/API/Call.php b/lib/API/Call.php index f10d6e8..ba51ca3 100644 --- a/lib/API/Call.php +++ b/lib/API/Call.php @@ -2,8 +2,8 @@ namespace seekat\API; +use AsyncInterop\Promise; use http\Url; -use React\Promise\ExtendedPromiseInterface; use seekat\API; use seekat\Exception; @@ -24,12 +24,15 @@ final class Call $this->call = $call; } - function __invoke(array $args) : ExtendedPromiseInterface { + function __invoke(array $args) : Promise { $promise = $this->api->{$this->call}->get(...$args); /* fetch resource, unless already localized, and try for {$method}_url */ if (!$this->api->exists($this->call)) { - $promise = $promise->otherwise(function($error) use($args) { + $promise->when(function($error, $value) use($args) { + if (!isset($error)) { + return $value; + } if ($this->api->exists($this->call."_url", $url)) { $url = new Url(uri_template($url, (array)current($args))); return $this->api->withUrl($url)->get(...$args); diff --git a/lib/API/Call/Deferred.php b/lib/API/Call/Deferred.php index 7fc44de..9bdc030 100644 --- a/lib/API/Call/Deferred.php +++ b/lib/API/Call/Deferred.php @@ -2,15 +2,17 @@ namespace seekat\API\Call; +use AsyncInterop\Promise; use Exception; use http\{ Client, Client\Request, Client\Response }; +use Psr\Log\LoggerInterface; use seekat\API; use SplObserver; use SplSubject; -final class Deferred extends \React\Promise\Deferred implements SplObserver +final class Deferred implements SplObserver { /** * The response importer @@ -33,6 +35,11 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver */ private $cache; + /** + * @var LoggerInterface + */ + private $logger; + /** * The executed request * @@ -47,6 +54,26 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver */ private $response; + /** + * @var Promise + */ + private $promise; + + /** + * @var \Closure + */ + private $resolve; + + /** + * @var \Closure + */ + private $reject; + + /** + * @var \Closure + */ + private $update; + /** * Create a deferred promise for the response of $request * @@ -55,26 +82,38 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver * @param Cache\Service $cache */ function __construct(API $api, Request $request, Cache\Service $cache = null) { - parent::__construct(function ($resolve, $reject) { - return $this->cancel($resolve, $reject); - }); - $this->request = $request; $this->client = $api->getClient(); + $this->logger = $api->getLogger(); $this->result = new Result($api); $this->cache = new Cache($cache); + $future = $api->getFuture(); + $context = $future->createContext(function() { + if ($this->response) { + /* we did finish in the meantime */ + $this->complete(); + } else { + $this->client->detach($this); + $this->client->dequeue($this->request); + ($this->reject)("Cancelled"); + } + }); + $this->promise = $future->getPromise($context); + $this->resolve = API\Future\resolver($future, $context); + $this->reject = API\Future\rejecter($future, $context); + $this->update = API\Future\updater($future, $context); + } + + function __invoke() : Promise { if ($this->cache->load($this->request, $cached)) { - $api->getLogger()->info("deferred -> cached", [ - "method" => $request->getRequestMethod(), - "url" => $request->getRequestUrl(), + $this->logger->info("deferred -> cached", [ + "method" => $this->request->getRequestMethod(), + "url" => $this->request->getRequestUrl(), ]); $this->response = $cached; - $this->complete( - [$this, "resolve"], - [$this, "reject"] - ); + $this->complete(); } else { $this->client->attach($this); $this->client->enqueue($this->request, function(Response $response) use($cached) { @@ -83,19 +122,18 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver } else { $this->response = $response; } - $this->complete( - [$this, "resolve"], - [$this, "reject"] - ); + $this->complete(); return true; }); - $api->getLogger()->info("deferred -> enqueued", [ - "method" => $request->getRequestMethod(), - "url" => $request->getRequestUrl(), + $this->logger->info("deferred -> enqueued", [ + "method" => $this->request->getRequestMethod(), + "url" => $this->request->getRequestUrl(), ]); /* start off */ $this->client->once(); } + + return $this->promise; } /** @@ -112,7 +150,7 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver return; } - $this->notify((object) compact("client", "request", "progress")); + ($this->update)((object) compact("client", "request", "progress")); } /** @@ -120,7 +158,7 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver * @param callable $resolve * @param callable $reject */ - private function complete(callable $resolve, callable $reject) { + private function complete() { $this->client->detach($this); if ($this->response) { @@ -129,28 +167,13 @@ final class Deferred extends \React\Promise\Deferred implements SplObserver $this->cache->save($this->request, $this->response); - $resolve($api); + ($this->resolve)($api); } catch (Exception $e) { - $reject($e); + ($this->reject)($e); } } else { - $reject($this->client->getTransferInfo($this->request)->error); + ($this->reject)($this->client->getTransferInfo($this->request)->error); } } - /** - * Cancellation callback - * @param callable $resolve - * @param callable $reject - */ - private function cancel(callable $resolve, callable $reject) { - /* did we finish in the meantime? */ - if ($this->response) { - $this->complete($resolve, $reject); - } else { - $this->client->detach($this); - $this->client->dequeue($this->request); - $reject("Cancelled"); - } - } } diff --git a/lib/API/Call/Result.php b/lib/API/Call/Result.php index 8afc57e..22ce2ef 100644 --- a/lib/API/Call/Result.php +++ b/lib/API/Call/Result.php @@ -35,9 +35,7 @@ final class Result throw $e; } - $this->api = $this->api->with(compact("type", "data", "links")); - - return $this->api; + return $this->api = $this->api->with(compact("type", "data", "links")); } /** @@ -56,13 +54,11 @@ final class Result throw $e; } - if (($link = $response->getHeader("Link", Header::class))) { - $links = new API\Links($link); - } else { - $links = null; + if (!($link = $response->getHeader("Link", Header::class))) { + $link = null; } - return $links; + return new API\Links($link); } private function checkResponseType(Response $response) { diff --git a/lib/API/Future.php b/lib/API/Future.php new file mode 100644 index 0000000..8c3571c --- /dev/null +++ b/lib/API/Future.php @@ -0,0 +1,41 @@ +createContext(); + $future->onSuccess($promisor, $value); + return $future->getPromise($promisor); +} + +/** + * @param Future $future + * @param mixed $reason + * @return Promise + */ +function reject(Future $future, $reason) { + $promisor = $future->createContext(); + $future->onFailure($promisor, $reason); + return $future->getPromise($promisor); +} + +/** + * @param Future $future + * @param mixed $context Promisor + * @return \Closure + */ +function resolver(Future $future, $context) { + return function($value) use($future, $context) { + return $future->onSuccess($context, $value); + }; +} + +/** + * @param Future $future + * @param mixed $context Promisor + * @return \Closure + */ +function rejecter(Future $future, $context) { + return function($reason) use($future, $context) { + return $future->onFailure($context, $reason); + }; +} + +/** + * @param Future $future + * @param mixed $context Promisor + * @return \Closure + */ +function updater(Future $future, $context) { + return function($update) use($future, $context) { + return $future->onUpdate($context, $update); + }; +} + +/** + * @return Future + */ +function react() { + return new class implements Future { + /** + * @param callable|null $onCancel + * @return ReactDeferred + */ + function createContext(callable $onCancel = null) { + return new ReactDeferred($onCancel); + } + + function getPromise($context) : Promise { + /* @var $context ReactDeferred */ + return $context->promise(); + } + + function onSuccess($context, $value) { + /* @var $context ReactDeferred */ + $context->resolve($value); + } + + function onFailure($context, $reason) { + /* @var $context ReactDeferred */ + $context->reject($reason); + } + + function onUpdate($context, $update) { + /* @var $context ReactDeferred */ + $context->notify($update); + } + }; +} + +/** + * @return Future + */ +function amp() { + return new class implements Future { + /** + * @return AmpDeferred + */ + function createContext(callable $onCancel = null) { + return new AmpDeferred(); + } + + function getPromise($context) : Promise { + /* @var $context AmpDeferred */ + return $context->promise(); + } + + function onSuccess($context, $value) { + /* @var $context AmpDeferred */ + $context->resolve($value); + } + + function onFailure($context, $reason) { + /* @var $context AmpDeferred */ + $context->fail($reason); + } + + function onUpdate($context, $update) { + /* @var $context AmpDeferred */ + /* noop */ + } + }; +} + +/** + * @return Future + */ +function icicle() { + return new class implements Future { + /** + * @param callable|null $onCancel + * @return IcicleDeferred + */ + function createContext(callable $onCancel = null) { + return new IcicleDeferred($onCancel); + } + + function getPromise($context): Promise { + /* @var $context IcicleDeferred */ + return $context->getPromise(); + } + + function onSuccess($context, $value) { + /* @var $context IcicleDeferred */ + $context->resolve($value); + } + + function onFailure($context, $reason) { + /* @var $context IcicleDeferred */ + $context->reject($reason); + } + + function onUpdate($context, $update) { + /* @var $context IcicleDeferred */ + /* noop */ + } + }; +} diff --git a/lib/API/Iterator.php b/lib/API/Iterator.php index 33d5132..f8b6495 100644 --- a/lib/API/Iterator.php +++ b/lib/API/Iterator.php @@ -3,10 +3,9 @@ namespace seekat\API; use http\Url; -use Iterator as BaseIterator; use seekat\API; -final class Iterator implements BaseIterator +final class Iterator implements \Iterator { /** * The endpoint diff --git a/lib/API/Links.php b/lib/API/Links.php index 85d24bd..322eb8e 100644 --- a/lib/API/Links.php +++ b/lib/API/Links.php @@ -31,11 +31,13 @@ final class Links implements Serializable * @param Header $links The Link header * @throws UnexpectedValueException */ - function __construct(Header $links) { - if (strcasecmp($links->name, "Link")) { - throw new UnexpectedValueException("Expected 'Link' header, got: '{$links->name}'"); + function __construct(Header $links = null) { + if ($links) { + if (strcasecmp($links->name, "Link")) { + throw new UnexpectedValueException("Expected 'Link' header, got: '{$links->name}'"); + } + $this->unserialize($links->value); } - $this->unserialize($links->value); } /** diff --git a/lib/API/Links/functions.php b/lib/API/Links/functions.php new file mode 100644 index 0000000..eb9b495 --- /dev/null +++ b/lib/API/Links/functions.php @@ -0,0 +1,61 @@ +getLinks(); + if ($links && ($first = $links->getFirst())) { + return $api->withUrl($first)->get(null, null, $cache); + } + return Future\reject($api->getFuture(), $links); +} + +/** + * Perform a GET request against the link's "prev" relation + * + * @return Promise + */ +function prev(API $api, Cache\Service $cache = null) : Promise { + $links = $api->getLinks(); + if ($links && ($prev = $links->getPrev())) { + return $api->withUrl($prev)->get(null, null, $cache); + } + return Future\reject($api->getFuture(), $links); +} + +/** + * Perform a GET request against the link's "next" relation + * + * @return Promise + */ +function next(API $api, Cache\Service $cache = null) : Promise { + $links = $api->getLinks(); + if ($links && ($next = $links->getNext())) { + return $api->withUrl($next)->get(null, null, $cache); + } + return Future\reject($api->getFuture(), $links); +} + +/** + * Perform a GET request against the link's "last" relation + * + * @return Promise + */ +function last(API $api, Cache\Service $cache = null) : Promise { + $links = $api->getLinks(); + if ($links && ($last = $links->getLast())) { + return $api->withUrl($last)->get(null, null, $cache); + } + return Future\reject($api->getFuture(), $links); +} + diff --git a/lib/API/functions.php b/lib/API/functions.php new file mode 100644 index 0000000..950791d --- /dev/null +++ b/lib/API/functions.php @@ -0,0 +1,12 @@ + "$type $value"]; +} diff --git a/lib/Exception/RequestException.php b/lib/Exception/RequestException.php index 7f91535..575097f 100644 --- a/lib/Exception/RequestException.php +++ b/lib/Exception/RequestException.php @@ -2,7 +2,6 @@ namespace seekat\Exception; -use Exception as BaseException; use http\ { Client\Response, Header @@ -12,7 +11,7 @@ use seekat\Exception; /** * @code-coverage-ignore */ -class RequestException extends BaseException implements Exception +class RequestException extends \Exception implements Exception { /** * JSON errors diff --git a/lib/Exception/functions.php b/lib/Exception/functions.php new file mode 100644 index 0000000..08c8ddb --- /dev/null +++ b/lib/Exception/functions.php @@ -0,0 +1,18 @@ +getMessage(); + } else { + $message = $error; + $error = new \Exception($error); + } + return $message; +} diff --git a/lib/functions.php b/lib/functions.php index 9776da9..fdd9ea0 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -19,81 +19,3 @@ function typeof($arg, $export = false) { return $type; } -namespace seekat\Exception; - -/** - * Canonicalize an error message from a string or Exception - * @param string|Exception $error - * @return string - */ -function message(&$error) : string { - if ($error instanceof \Throwable) { - $message = $error->getMessage(); - } else { - $message = $error; - $error = new \Exception($error); - } - return $message; -} - -namespace seekat\API\Links; - -use React\Promise\{ - ExtendedPromiseInterface, - function reject -}; -use seekat\API; -use seekat\API\Call\Cache; - -/** - * Perform a GET request against the link's "first" relation - * - * @return ExtendedPromiseInterface - */ -function first(API $api, Cache\Service $cache = null) : ExtendedPromiseInterface { - $links = $api->getLinks(); - if ($links && ($first = $links->getFirst())) { - return $api->withUrl($first)->get(null, null, $cache); - } - return reject($links); -} - -/** - * Perform a GET request against the link's "prev" relation - * - * @return ExtendedPromiseInterface - */ -function prev(API $api, Cache\Service $cache = null) : ExtendedPromiseInterface { - $links = $api->getLinks(); - if ($links && ($prev = $links->getPrev())) { - return $api->withUrl($prev)->get(null, null, $cache); - } - return reject($links); -} - -/** - * Perform a GET request against the link's "next" relation - * - * @return ExtendedPromiseInterface - */ -function next(API $api, Cache\Service $cache = null) : ExtendedPromiseInterface { - $links = $api->getLinks(); - if ($links && ($next = $links->getNext())) { - return $api->withUrl($next)->get(null, null, $cache); - } - return reject($links); -} - -/** - * Perform a GET request against the link's "last" relation - * - * @return ExtendedPromiseInterface - */ -function last(API $api, Cache\Service $cache = null) : ExtendedPromiseInterface { - $links = $api->getLinks(); - if ($links && ($last = $links->getLast())) { - return $api->withUrl($last)->get(null, null, $cache); - } - return reject($links); -} -