From 0dcd2b11631fcf57602514e13fae9d31bbb79000 Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Fri, 13 May 2016 00:33:52 +0200 Subject: [PATCH] generators+promises FTW --- examples/generator.php | 43 ++++++++++ examples/hooks.php | 49 +++++++++++ examples/promise.php | 30 +++++++ examples/readme.php | 8 ++ lib/API.php | 41 +++++---- lib/API/{Deferred.php => Call.php} | 34 ++++---- lib/API/Invoker.php | 130 +++++++++++++++++++++++++++++ 7 files changed, 304 insertions(+), 31 deletions(-) create mode 100755 examples/generator.php create mode 100755 examples/hooks.php create mode 100755 examples/promise.php create mode 100755 examples/readme.php rename lib/API/{Deferred.php => Call.php} (79%) create mode 100644 lib/API/Invoker.php diff --git a/examples/generator.php b/examples/generator.php new file mode 100755 index 0000000..d488e97 --- /dev/null +++ b/examples/generator.php @@ -0,0 +1,43 @@ +#!/usr/bin/env php +pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO)); + +$cli = new http\Client("curl", "seekat"); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, $cli, $log); + +$api(function() use($api) { + $count = 0; + $events = yield $api->repos->m6w6->{"ext-http"}->issues->events(); + while ($events) { + /* pro-actively queue the next request */ + $next = $events->next(); + + foreach ($events as $event) { + if ($event->event == "labeled") { + continue; + } + ++$count; + printf("@%s %s issue #%d (%s) at %s\n", + $event->actor->login, + $event->event, + (int) (string) $event->issue->number, + $event->issue->title, + $event->created_at + ); + } + $events = yield $next; + } + return $count; +})->done(function($count) { + printf("Listed %d events\n", $count); +}); + diff --git a/examples/hooks.php b/examples/hooks.php new file mode 100755 index 0000000..ad23692 --- /dev/null +++ b/examples/hooks.php @@ -0,0 +1,49 @@ +#!/usr/bin/env php +configure([ + "max_host_connections" => 10, + "max_total_connections" => 50, +]); + +$log = new Monolog\Logger("seekat"); +$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::WARNING)); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, $cli, $log); + +$api(function() use($api) { + $repos = yield $api->users->m6w6->repos([ + "visibility" => "public", + "affiliation" => "owner" + ]); + while ($repos) { + $next = $repos->next(); + + $batch = []; + foreach ($repos as $repo) { + $batch[] = $repo->hooks(); + } + foreach (yield $batch as $key => $hooks) { + if (!count($hooks)) { + continue; + } + printf("%s:\n", $repos->{$key}->name); + foreach ($hooks as $hook) { + if ($hook->name == "web") { + printf("\t%s\n", $hook->config->url); + } else { + printf("\t%s\n", $hook->name); + } + } + } + + $repos = yield $next; + } +}); diff --git a/examples/promise.php b/examples/promise.php new file mode 100755 index 0000000..81c7f10 --- /dev/null +++ b/examples/promise.php @@ -0,0 +1,30 @@ +#!/usr/bin/env php +pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO)); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, null, $log); + +$api->users->m6w6->gists()->done(function($gists) { + foreach ($gists as $gist) { + $gist->commits()->then(function($commits) use($gist) { + foreach ($commits as $i => $commit) { + if (!$i) { + printf("\nGist %s, %s:\n", $gist->id, $gist->description ?: ""); + } + $cs = $commit->change_status; + printf("\t%s: ", substr($commit->version,0,8)); + printf("-%s+%s=%s\n", $cs->deletions, $cs->additions, $cs->total); + } + }); + } +}); + +$api->send(); \ No newline at end of file diff --git a/examples/readme.php b/examples/readme.php new file mode 100755 index 0000000..93cc464 --- /dev/null +++ b/examples/readme.php @@ -0,0 +1,8 @@ +#!/usr/bin/env php +repos->m6w6->seekat->readme->as("raw")->get(); +}); diff --git a/lib/API.php b/lib/API.php index c3877ec..f0b7620 100644 --- a/lib/API.php +++ b/lib/API.php @@ -19,6 +19,7 @@ use Psr\Log\NullLogger; use React\Promise\ExtendedPromiseInterface; use function React\Promise\resolve; use function React\Promise\reject; +use function React\Promise\map; class API implements \IteratorAggregate, \Countable { /** @@ -459,27 +460,33 @@ class API implements \IteratorAggregate, \Countable { } /** - * Run the send loop once + * Run the send loop through a generator * - * @param callable $timeout as function(\seekat\API $api) : float, returning any applicable select timeout - * @return bool + * @param callable|\Generator $cbg A \Generator or a factory of a \Generator yielding promises + * @return \React\Promise\ExtendedPromiseInterface The promise of the generator's return value */ - function __invoke(callable $timeout = null) : bool { + function __invoke($cbg) : ExtendedPromiseInterface { $this->__log->debug(__FUNCTION__); - if (count($this->__client)) { - if ($this->__client->once()) { - if ($timeout) { - $timeout = $timeout($this); - } - - $this->__log->debug(__FUNCTION__.": wait", compact("timeout")); - - $this->__client->wait($timeout); - return 0 < count($this->__client); - } + $invoker = new API\Invoker($this->__client); + + if ($cbg instanceof \Generator) { + return $invoker->iterate($cbg)->promise(); + } + + if (is_callable($cbg)) { + return $invoker->invoke(function() use($cbg) { + return $cbg($this); + })->promise(); } - return false; + + throw \InvalidArgumentException( + "Expected callable or Generator, got ".( + is_object($cbg) + ? "instance of ".get_class($cbg) + : gettype($cbg).": ".var_export($cbg, true) + ) + ); } /** @@ -552,6 +559,6 @@ class API implements \IteratorAggregate, \Countable { "headers" => $headers, ]); - return (new API\Deferred($this, $this->__client, $request))->promise(); + return (new API\Call($this, $this->__client, $request))->promise(); } } diff --git a/lib/API/Deferred.php b/lib/API/Call.php similarity index 79% rename from lib/API/Deferred.php rename to lib/API/Call.php index 4384684..56e15dc 100644 --- a/lib/API/Deferred.php +++ b/lib/API/Call.php @@ -2,12 +2,16 @@ namespace seekat\API; -use seekat\API; +use Exception; use http\Client; use http\Client\Request; use http\Client\Response; +use React\Promise\Deferred; +use seekat\API; +use SplObserver; +use SplSubject; -class Deferred extends \React\Promise\Deferred implements \SplObserver +class Call extends Deferred implements SplObserver { /** * The endpoint @@ -17,19 +21,19 @@ class Deferred extends \React\Promise\Deferred implements \SplObserver /** * The HTTP client - * @var \http\Client + * @var Client */ private $client; /** * The executed request - * @var \http\Client\Request + * @var Request */ private $request; /** * The promised response - * @var \http\Client\Response + * @var Response */ private $response; @@ -37,20 +41,22 @@ class Deferred extends \React\Promise\Deferred implements \SplObserver * Create a deferred promise for the response of $request * * @var \seekat\API $api The endpoint of the request - * @var \http\Client $client The HTTP client to send the request - * @var \http\Client\Request The request to execute + * @var Client $client The HTTP client to send the request + * @var Request The request to execute */ function __construct(API $api, Client $client, Request $request) { $this->api = $api; $this->client = $client; $this->request = $request; - $client->attach($this); - $client->enqueue($request); - parent::__construct(function($resolve, $reject) { return $this->cancel($resolve, $reject); }); + + $client->attach($this); + $client->enqueue($request); + /* start off */ + $client->once(); } /** @@ -58,11 +64,11 @@ class Deferred extends \React\Promise\Deferred implements \SplObserver * * Import the response's data on success and resolve the promise. * - * @var \SplSubject $client The observed HTTP client - * @var \http\Client\Request The request which generated the update + * @var SplSubject $client The observed HTTP client + * @var Request The request which generated the update * @var object $progress The progress information */ - function update(\SplSubject $client, Request $request = null, $progress = null) { + function update(SplSubject $client, Request $request = null, $progress = null) { if ($request !== $this->request) { return; } @@ -89,7 +95,7 @@ class Deferred extends \React\Promise\Deferred implements \SplObserver if ($this->response) { try { $resolve($this->api->import($this->response)); - } catch (\Exception $e) { + } catch (Exception $e) { $reject($e); } } else { diff --git a/lib/API/Invoker.php b/lib/API/Invoker.php new file mode 100644 index 0000000..ff3a91e --- /dev/null +++ b/lib/API/Invoker.php @@ -0,0 +1,130 @@ +client = $client; + + parent::__construct(function($resolve, $reject) { + return $this->cancel($resolve, $reject); + }); + } + + /** + * Invoke $generator to create a \Generator which yields promises + * + * @param callable $generator as function() : \Generator, creating a generator yielding promises + * @return \seekat\API\Invoker + */ + function invoke(callable $generator) : Invoker { + $this->iterate($generator()); + return $this; + } + + /** + * Iterate over $gen, a \Generator yielding promises + * + * @param \Generator $gen + * @return \seekat\API\Invoker + */ + function iterate(Generator $gen) : Invoker { + $this->cancelled = false; + + foreach ($gen as $promise) { + if ($this->cancelled) { + break; + } + $this->queue($promise, $gen); + } + + if (!$this->cancelled) { + $this->resolve($this->result = $gen->getReturn()); + } + return $this; + } + + /** + * Get the generator's result + * + * @return \React\Promise\ExtendedPromiseInterface + */ + function result() : ExtendedPromiseInterface { + return $this->promise(); + } + + /** + * Promise handler + * + * @param \React\Promise\PromiseInterface $promise + * @param \Generator $to + */ + private function give(PromiseInterface $promise, Generator $to) { + $promise->then(function($result) use($to) { + if (($promise = $to->send($result))) { + $this->queue($promise, $to); + } + }); + } + + private function queue($promise, Generator $gen) { + if ($promise instanceof PromiseInterface) { + $this->give($promise, $gen); + } else { + all($promise)->then(function($results) use($gen) { + if (($promise = $gen->send($results))) { + $this->queue($promise, $gen); + } + }); + } + $this->client->send(); + } + + /** + * Cancellation callback + * + * @param callable $resolve + * @param callable $reject + */ + private function cancel(callable $resolve, callable $reject) { + $this->cancelled = true; + + /* did we finish in the meantime? */ + if ($this->result) { + $resolve($this->result); + } else { + $reject("Cancelled"); + } + } +} -- 2.30.2