From: Michael Wallner Date: Thu, 12 May 2016 22:33:52 +0000 (+0200) Subject: generators+promises FTW X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=0dcd2b11631fcf57602514e13fae9d31bbb79000;p=m6w6%2Fseekat generators+promises FTW --- diff --git a/examples/generator.php b/examples/generator.php new file mode 100755 index 0000000..d488e97 --- /dev/null +++ b/examples/generator.php @@ -0,0 +1,43 @@ +#!/usr/bin/env php +pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO)); + +$cli = new http\Client("curl", "seekat"); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, $cli, $log); + +$api(function() use($api) { + $count = 0; + $events = yield $api->repos->m6w6->{"ext-http"}->issues->events(); + while ($events) { + /* pro-actively queue the next request */ + $next = $events->next(); + + foreach ($events as $event) { + if ($event->event == "labeled") { + continue; + } + ++$count; + printf("@%s %s issue #%d (%s) at %s\n", + $event->actor->login, + $event->event, + (int) (string) $event->issue->number, + $event->issue->title, + $event->created_at + ); + } + $events = yield $next; + } + return $count; +})->done(function($count) { + printf("Listed %d events\n", $count); +}); + diff --git a/examples/hooks.php b/examples/hooks.php new file mode 100755 index 0000000..ad23692 --- /dev/null +++ b/examples/hooks.php @@ -0,0 +1,49 @@ +#!/usr/bin/env php +configure([ + "max_host_connections" => 10, + "max_total_connections" => 50, +]); + +$log = new Monolog\Logger("seekat"); +$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::WARNING)); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, $cli, $log); + +$api(function() use($api) { + $repos = yield $api->users->m6w6->repos([ + "visibility" => "public", + "affiliation" => "owner" + ]); + while ($repos) { + $next = $repos->next(); + + $batch = []; + foreach ($repos as $repo) { + $batch[] = $repo->hooks(); + } + foreach (yield $batch as $key => $hooks) { + if (!count($hooks)) { + continue; + } + printf("%s:\n", $repos->{$key}->name); + foreach ($hooks as $hook) { + if ($hook->name == "web") { + printf("\t%s\n", $hook->config->url); + } else { + printf("\t%s\n", $hook->name); + } + } + } + + $repos = yield $next; + } +}); diff --git a/examples/promise.php b/examples/promise.php new file mode 100755 index 0000000..81c7f10 --- /dev/null +++ b/examples/promise.php @@ -0,0 +1,30 @@ +#!/usr/bin/env php +pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO)); + +$api = new API([ + "Authorization" => "token ".getenv("GITHUB_TOKEN") +], null, null, $log); + +$api->users->m6w6->gists()->done(function($gists) { + foreach ($gists as $gist) { + $gist->commits()->then(function($commits) use($gist) { + foreach ($commits as $i => $commit) { + if (!$i) { + printf("\nGist %s, %s:\n", $gist->id, $gist->description ?: ""); + } + $cs = $commit->change_status; + printf("\t%s: ", substr($commit->version,0,8)); + printf("-%s+%s=%s\n", $cs->deletions, $cs->additions, $cs->total); + } + }); + } +}); + +$api->send(); \ No newline at end of file diff --git a/examples/readme.php b/examples/readme.php new file mode 100755 index 0000000..93cc464 --- /dev/null +++ b/examples/readme.php @@ -0,0 +1,8 @@ +#!/usr/bin/env php +repos->m6w6->seekat->readme->as("raw")->get(); +}); diff --git a/lib/API.php b/lib/API.php index c3877ec..f0b7620 100644 --- a/lib/API.php +++ b/lib/API.php @@ -19,6 +19,7 @@ use Psr\Log\NullLogger; use React\Promise\ExtendedPromiseInterface; use function React\Promise\resolve; use function React\Promise\reject; +use function React\Promise\map; class API implements \IteratorAggregate, \Countable { /** @@ -459,27 +460,33 @@ class API implements \IteratorAggregate, \Countable { } /** - * Run the send loop once + * Run the send loop through a generator * - * @param callable $timeout as function(\seekat\API $api) : float, returning any applicable select timeout - * @return bool + * @param callable|\Generator $cbg A \Generator or a factory of a \Generator yielding promises + * @return \React\Promise\ExtendedPromiseInterface The promise of the generator's return value */ - function __invoke(callable $timeout = null) : bool { + function __invoke($cbg) : ExtendedPromiseInterface { $this->__log->debug(__FUNCTION__); - if (count($this->__client)) { - if ($this->__client->once()) { - if ($timeout) { - $timeout = $timeout($this); - } - - $this->__log->debug(__FUNCTION__.": wait", compact("timeout")); - - $this->__client->wait($timeout); - return 0 < count($this->__client); - } + $invoker = new API\Invoker($this->__client); + + if ($cbg instanceof \Generator) { + return $invoker->iterate($cbg)->promise(); + } + + if (is_callable($cbg)) { + return $invoker->invoke(function() use($cbg) { + return $cbg($this); + })->promise(); } - return false; + + throw \InvalidArgumentException( + "Expected callable or Generator, got ".( + is_object($cbg) + ? "instance of ".get_class($cbg) + : gettype($cbg).": ".var_export($cbg, true) + ) + ); } /** @@ -552,6 +559,6 @@ class API implements \IteratorAggregate, \Countable { "headers" => $headers, ]); - return (new API\Deferred($this, $this->__client, $request))->promise(); + return (new API\Call($this, $this->__client, $request))->promise(); } } diff --git a/lib/API/Call.php b/lib/API/Call.php new file mode 100644 index 0000000..56e15dc --- /dev/null +++ b/lib/API/Call.php @@ -0,0 +1,123 @@ +api = $api; + $this->client = $client; + $this->request = $request; + + parent::__construct(function($resolve, $reject) { + return $this->cancel($resolve, $reject); + }); + + $client->attach($this); + $client->enqueue($request); + /* start off */ + $client->once(); + } + + /** + * Progress observer + * + * Import the response's data on success and resolve the promise. + * + * @var SplSubject $client The observed HTTP client + * @var Request The request which generated the update + * @var object $progress The progress information + */ + function update(SplSubject $client, Request $request = null, $progress = null) { + if ($request !== $this->request) { + return; + } + + $this->notify((object) compact("client", "request", "progress")); + + if ($progress->info === "finished") { + $this->response = $this->client->getResponse(); + $this->complete( + [$this, "resolve"], + [$this, "reject"] + ); + } + } + + /** + * Completion callback + * @param callable $resolve + * @param callable $reject + */ + private function complete(callable $resolve, callable $reject) { + $this->client->detach($this); + + if ($this->response) { + try { + $resolve($this->api->import($this->response)); + } catch (Exception $e) { + $reject($e); + } + } else { + $reject($this->client->getTransferInfo($this->request)["error"]); + } + + $this->client->dequeue($this->request); + } + + /** + * Cancellation callback + * @param callable $resolve + * @param callable $reject + */ + private function cancel(callable $resolve, callable $reject) { + /* did we finish in the meantime? */ + if ($this->response) { + $this->complete($resolve, $reject); + } else { + $this->client->detach($this); + $this->client->dequeue($this->request); + $reject("Cancelled"); + } + } +} diff --git a/lib/API/Deferred.php b/lib/API/Deferred.php deleted file mode 100644 index 4384684..0000000 --- a/lib/API/Deferred.php +++ /dev/null @@ -1,117 +0,0 @@ -api = $api; - $this->client = $client; - $this->request = $request; - - $client->attach($this); - $client->enqueue($request); - - parent::__construct(function($resolve, $reject) { - return $this->cancel($resolve, $reject); - }); - } - - /** - * Progress observer - * - * Import the response's data on success and resolve the promise. - * - * @var \SplSubject $client The observed HTTP client - * @var \http\Client\Request The request which generated the update - * @var object $progress The progress information - */ - function update(\SplSubject $client, Request $request = null, $progress = null) { - if ($request !== $this->request) { - return; - } - - $this->notify((object) compact("client", "request", "progress")); - - if ($progress->info === "finished") { - $this->response = $this->client->getResponse(); - $this->complete( - [$this, "resolve"], - [$this, "reject"] - ); - } - } - - /** - * Completion callback - * @param callable $resolve - * @param callable $reject - */ - private function complete(callable $resolve, callable $reject) { - $this->client->detach($this); - - if ($this->response) { - try { - $resolve($this->api->import($this->response)); - } catch (\Exception $e) { - $reject($e); - } - } else { - $reject($this->client->getTransferInfo($this->request)["error"]); - } - - $this->client->dequeue($this->request); - } - - /** - * Cancellation callback - * @param callable $resolve - * @param callable $reject - */ - private function cancel(callable $resolve, callable $reject) { - /* did we finish in the meantime? */ - if ($this->response) { - $this->complete($resolve, $reject); - } else { - $this->client->detach($this); - $this->client->dequeue($this->request); - $reject("Cancelled"); - } - } -} diff --git a/lib/API/Invoker.php b/lib/API/Invoker.php new file mode 100644 index 0000000..ff3a91e --- /dev/null +++ b/lib/API/Invoker.php @@ -0,0 +1,130 @@ +client = $client; + + parent::__construct(function($resolve, $reject) { + return $this->cancel($resolve, $reject); + }); + } + + /** + * Invoke $generator to create a \Generator which yields promises + * + * @param callable $generator as function() : \Generator, creating a generator yielding promises + * @return \seekat\API\Invoker + */ + function invoke(callable $generator) : Invoker { + $this->iterate($generator()); + return $this; + } + + /** + * Iterate over $gen, a \Generator yielding promises + * + * @param \Generator $gen + * @return \seekat\API\Invoker + */ + function iterate(Generator $gen) : Invoker { + $this->cancelled = false; + + foreach ($gen as $promise) { + if ($this->cancelled) { + break; + } + $this->queue($promise, $gen); + } + + if (!$this->cancelled) { + $this->resolve($this->result = $gen->getReturn()); + } + return $this; + } + + /** + * Get the generator's result + * + * @return \React\Promise\ExtendedPromiseInterface + */ + function result() : ExtendedPromiseInterface { + return $this->promise(); + } + + /** + * Promise handler + * + * @param \React\Promise\PromiseInterface $promise + * @param \Generator $to + */ + private function give(PromiseInterface $promise, Generator $to) { + $promise->then(function($result) use($to) { + if (($promise = $to->send($result))) { + $this->queue($promise, $to); + } + }); + } + + private function queue($promise, Generator $gen) { + if ($promise instanceof PromiseInterface) { + $this->give($promise, $gen); + } else { + all($promise)->then(function($results) use($gen) { + if (($promise = $gen->send($results))) { + $this->queue($promise, $gen); + } + }); + } + $this->client->send(); + } + + /** + * Cancellation callback + * + * @param callable $resolve + * @param callable $reject + */ + private function cancel(callable $resolve, callable $reject) { + $this->cancelled = true; + + /* did we finish in the meantime? */ + if ($this->result) { + $resolve($this->result); + } else { + $reject("Cancelled"); + } + } +}