generators+promises FTW
authorMichael Wallner <mike@php.net>
Thu, 12 May 2016 22:33:52 +0000 (00:33 +0200)
committerMichael Wallner <mike@php.net>
Fri, 13 May 2016 13:04:46 +0000 (15:04 +0200)
examples/generator.php [new file with mode: 0755]
examples/hooks.php [new file with mode: 0755]
examples/promise.php [new file with mode: 0755]
examples/readme.php [new file with mode: 0755]
lib/API.php
lib/API/Call.php [new file with mode: 0644]
lib/API/Deferred.php [deleted file]
lib/API/Invoker.php [new file with mode: 0644]

diff --git a/examples/generator.php b/examples/generator.php
new file mode 100755 (executable)
index 0000000..d488e97
--- /dev/null
@@ -0,0 +1,43 @@
+#!/usr/bin/env php
+<?php
+
+require_once __DIR__."/../vendor/autoload.php";
+
+use seekat\API;
+
+$log = new Monolog\Logger("seekat");
+$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO));
+
+$cli = new http\Client("curl", "seekat");
+
+$api = new API([
+       "Authorization" => "token ".getenv("GITHUB_TOKEN")
+], null, $cli, $log);
+
+$api(function() use($api) {
+       $count = 0;
+       $events = yield $api->repos->m6w6->{"ext-http"}->issues->events();
+       while ($events) {
+               /* pro-actively queue the next request */
+               $next = $events->next();
+
+               foreach ($events as $event) {
+                       if ($event->event == "labeled") {
+                               continue;
+                       }
+                       ++$count;
+                       printf("@%s %s issue #%d (%s) at %s\n",
+                               $event->actor->login,
+                               $event->event,
+                               (int) (string) $event->issue->number,
+                               $event->issue->title,
+                               $event->created_at
+                       );
+               }
+               $events = yield $next;
+       }
+       return $count;
+})->done(function($count) {
+       printf("Listed %d events\n", $count);
+});
+
diff --git a/examples/hooks.php b/examples/hooks.php
new file mode 100755 (executable)
index 0000000..ad23692
--- /dev/null
@@ -0,0 +1,49 @@
+#!/usr/bin/env php
+<?php
+
+require_once __DIR__."/../vendor/autoload.php";
+
+use seekat\API;
+
+$cli = new http\Client("curl", "seekat");
+$cli->configure([
+       "max_host_connections" => 10,
+       "max_total_connections" => 50,
+]);
+
+$log = new Monolog\Logger("seekat");
+$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::WARNING));
+
+$api = new API([
+       "Authorization" => "token ".getenv("GITHUB_TOKEN")
+], null, $cli, $log);
+
+$api(function() use($api) {
+       $repos = yield $api->users->m6w6->repos([
+               "visibility" => "public",
+               "affiliation" => "owner"
+       ]);
+       while ($repos) {
+               $next = $repos->next();
+
+               $batch = [];
+               foreach ($repos as $repo) {
+                       $batch[] = $repo->hooks();
+               }
+               foreach (yield $batch as $key => $hooks) {
+                       if (!count($hooks)) {
+                               continue;
+                       }
+                       printf("%s:\n", $repos->{$key}->name);
+                       foreach ($hooks as $hook) {
+                               if ($hook->name == "web") {
+                                       printf("\t%s\n", $hook->config->url);
+                               } else {
+                                       printf("\t%s\n", $hook->name);
+                               }
+                       }
+               }
+
+               $repos = yield $next;
+       }
+});
diff --git a/examples/promise.php b/examples/promise.php
new file mode 100755 (executable)
index 0000000..81c7f10
--- /dev/null
@@ -0,0 +1,30 @@
+#!/usr/bin/env php
+<?php
+
+require_once __DIR__."/../vendor/autoload.php";
+
+use seekat\API;
+
+$log = new Monolog\Logger("seekat");
+$log->pushHandler((new Monolog\Handler\StreamHandler(STDERR))->setLevel(Monolog\Logger::INFO));
+
+$api = new API([
+       "Authorization" => "token ".getenv("GITHUB_TOKEN")
+], null, null, $log);
+
+$api->users->m6w6->gists()->done(function($gists) {
+       foreach ($gists as $gist) {
+               $gist->commits()->then(function($commits) use($gist) {
+                       foreach ($commits as $i => $commit) {
+                               if (!$i) {
+                                       printf("\nGist %s, %s:\n", $gist->id, $gist->description ?: "<no title>");
+                               }
+                               $cs = $commit->change_status;
+                               printf("\t%s: ", substr($commit->version,0,8));
+                               printf("-%s+%s=%s\n", $cs->deletions, $cs->additions, $cs->total);
+                       }
+               });
+       }
+});
+
+$api->send();
\ No newline at end of file
diff --git a/examples/readme.php b/examples/readme.php
new file mode 100755 (executable)
index 0000000..93cc464
--- /dev/null
@@ -0,0 +1,8 @@
+#!/usr/bin/env php
+<?php
+
+require_once __DIR__."/../vendor/autoload.php";
+
+(new seekat\API)(function($api) {
+       echo yield $api->repos->m6w6->seekat->readme->as("raw")->get();
+});
index c3877ec28061185405fcdc3a383f7fd291d79a0f..f0b7620994d6516cb8f52e5efee4a2b5a908f700 100644 (file)
@@ -19,6 +19,7 @@ use Psr\Log\NullLogger;
 use React\Promise\ExtendedPromiseInterface;
 use function React\Promise\resolve;
 use function React\Promise\reject;
+use function React\Promise\map;
 
 class API implements \IteratorAggregate, \Countable {
        /**
@@ -459,27 +460,33 @@ class API implements \IteratorAggregate, \Countable {
        }
        
        /**
-        * Run the send loop once
+        * Run the send loop through a generator
         *
-        * @param callable $timeout as function(\seekat\API $api) : float, returning any applicable select timeout
-        * @return bool
+        * @param callable|\Generator $cbg A \Generator or a factory of a \Generator yielding promises
+        * @return \React\Promise\ExtendedPromiseInterface The promise of the generator's return value
         */
-       function __invoke(callable $timeout = null) : bool {
+       function __invoke($cbg) : ExtendedPromiseInterface {
                $this->__log->debug(__FUNCTION__);
                
-               if (count($this->__client)) {
-                       if ($this->__client->once()) {
-                               if ($timeout) {
-                                       $timeout = $timeout($this);
-                               }
-                               
-                               $this->__log->debug(__FUNCTION__.": wait", compact("timeout"));
-                               
-                               $this->__client->wait($timeout);
-                               return 0 < count($this->__client);
-                       }
+               $invoker = new API\Invoker($this->__client);
+
+               if ($cbg instanceof \Generator) {
+                       return $invoker->iterate($cbg)->promise();
+               }
+
+               if (is_callable($cbg)) {
+                       return $invoker->invoke(function() use($cbg) {
+                               return $cbg($this);
+                       })->promise();
                }
-               return false;
+
+               throw \InvalidArgumentException(
+                       "Expected callable or Generator, got ".(
+                               is_object($cbg)
+                                       ? "instance of ".get_class($cbg)
+                                       : gettype($cbg).": ".var_export($cbg, true)
+                       )
+               );
        }
        
        /**
@@ -552,6 +559,6 @@ class API implements \IteratorAggregate, \Countable {
                        "headers" => $headers,
                ]);
                
-               return (new API\Deferred($this, $this->__client, $request))->promise();
+               return (new API\Call($this, $this->__client, $request))->promise();
        }
 }
diff --git a/lib/API/Call.php b/lib/API/Call.php
new file mode 100644 (file)
index 0000000..56e15dc
--- /dev/null
@@ -0,0 +1,123 @@
+<?php
+
+namespace seekat\API;
+
+use Exception;
+use http\Client;
+use http\Client\Request;
+use http\Client\Response;
+use React\Promise\Deferred;
+use seekat\API;
+use SplObserver;
+use SplSubject;
+
+class Call extends Deferred implements SplObserver
+{
+       /**
+        * The endpoint
+        * @var \seekat\API
+        */
+       private $api;
+
+       /**
+        * The HTTP client
+        * @var Client
+        */
+       private $client;
+
+       /**
+        * The executed request
+        * @var Request
+        */
+       private $request;
+
+       /**
+        * The promised response
+        * @var Response
+        */
+       private $response;
+
+       /**
+        * Create a deferred promise for the response of $request
+        *
+        * @var \seekat\API $api The endpoint of the request
+        * @var Client $client The HTTP client to send the request
+        * @var Request The request to execute
+        */
+       function __construct(API $api, Client $client, Request $request) {
+               $this->api = $api;
+               $this->client = $client;
+               $this->request = $request;
+
+               parent::__construct(function($resolve, $reject) {
+                       return $this->cancel($resolve, $reject);
+               });
+               
+               $client->attach($this);
+               $client->enqueue($request);
+               /* start off */
+               $client->once();
+       }
+
+       /**
+        * Progress observer
+        *
+        * Import the response's data on success and resolve the promise.
+        *
+        * @var SplSubject $client The observed HTTP client
+        * @var Request The request which generated the update
+        * @var object $progress The progress information
+        */
+       function update(SplSubject $client, Request $request = null, $progress = null) {
+               if ($request !== $this->request) {
+                       return;
+               }
+
+               $this->notify((object) compact("client", "request", "progress"));
+
+               if ($progress->info === "finished") {
+                       $this->response = $this->client->getResponse();
+                       $this->complete(
+                               [$this, "resolve"],
+                               [$this, "reject"]
+                       );
+               }
+       }
+
+       /**
+        * Completion callback
+        * @param callable $resolve
+        * @param callable $reject
+        */
+       private function complete(callable $resolve, callable $reject) {
+               $this->client->detach($this);
+
+               if ($this->response) {
+                       try {
+                               $resolve($this->api->import($this->response));
+                       } catch (Exception $e) {
+                               $reject($e);
+                       }
+               } else {
+                       $reject($this->client->getTransferInfo($this->request)["error"]);
+               }
+
+               $this->client->dequeue($this->request);
+       }
+
+       /**
+        * Cancellation callback
+        * @param callable $resolve
+        * @param callable $reject
+        */
+       private function cancel(callable $resolve, callable $reject) {
+               /* did we finish in the meantime? */
+               if ($this->response) {
+                       $this->complete($resolve, $reject);
+               } else {
+                       $this->client->detach($this);
+                       $this->client->dequeue($this->request);
+                       $reject("Cancelled");
+               }
+       }
+}
diff --git a/lib/API/Deferred.php b/lib/API/Deferred.php
deleted file mode 100644 (file)
index 4384684..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-<?php
-
-namespace seekat\API;
-
-use seekat\API;
-use http\Client;
-use http\Client\Request;
-use http\Client\Response;
-
-class Deferred extends \React\Promise\Deferred implements \SplObserver
-{
-       /**
-        * The endpoint
-        * @var \seekat\API
-        */
-       private $api;
-
-       /**
-        * The HTTP client
-        * @var \http\Client
-        */
-       private $client;
-
-       /**
-        * The executed request
-        * @var \http\Client\Request
-        */
-       private $request;
-
-       /**
-        * The promised response
-        * @var \http\Client\Response
-        */
-       private $response;
-
-       /**
-        * Create a deferred promise for the response of $request
-        *
-        * @var \seekat\API $api The endpoint of the request
-        * @var \http\Client $client The HTTP client to send the request
-        * @var \http\Client\Request The request to execute
-        */
-       function __construct(API $api, Client $client, Request $request) {
-               $this->api = $api;
-               $this->client = $client;
-               $this->request = $request;
-
-               $client->attach($this);
-               $client->enqueue($request);
-
-               parent::__construct(function($resolve, $reject) {
-                       return $this->cancel($resolve, $reject);
-               });
-       }
-
-       /**
-        * Progress observer
-        *
-        * Import the response's data on success and resolve the promise.
-        *
-        * @var \SplSubject $client The observed HTTP client
-        * @var \http\Client\Request The request which generated the update
-        * @var object $progress The progress information
-        */
-       function update(\SplSubject $client, Request $request = null, $progress = null) {
-               if ($request !== $this->request) {
-                       return;
-               }
-
-               $this->notify((object) compact("client", "request", "progress"));
-
-               if ($progress->info === "finished") {
-                       $this->response = $this->client->getResponse();
-                       $this->complete(
-                               [$this, "resolve"],
-                               [$this, "reject"]
-                       );
-               }
-       }
-
-       /**
-        * Completion callback
-        * @param callable $resolve
-        * @param callable $reject
-        */
-       private function complete(callable $resolve, callable $reject) {
-               $this->client->detach($this);
-
-               if ($this->response) {
-                       try {
-                               $resolve($this->api->import($this->response));
-                       } catch (\Exception $e) {
-                               $reject($e);
-                       }
-               } else {
-                       $reject($this->client->getTransferInfo($this->request)["error"]);
-               }
-
-               $this->client->dequeue($this->request);
-       }
-
-       /**
-        * Cancellation callback
-        * @param callable $resolve
-        * @param callable $reject
-        */
-       private function cancel(callable $resolve, callable $reject) {
-               /* did we finish in the meantime? */
-               if ($this->response) {
-                       $this->complete($resolve, $reject);
-               } else {
-                       $this->client->detach($this);
-                       $this->client->dequeue($this->request);
-                       $reject("Cancelled");
-               }
-       }
-}
diff --git a/lib/API/Invoker.php b/lib/API/Invoker.php
new file mode 100644 (file)
index 0000000..ff3a91e
--- /dev/null
@@ -0,0 +1,130 @@
+<?php
+
+namespace seekat\API;
+
+use Generator;
+use http\Client;
+use React\Promise\Deferred;
+use React\Promise\PromiseInterface;
+use React\Promise\ExtendedPromiseInterface;
+
+use function React\Promise\all;
+
+class Invoker extends Deferred
+{
+       /**
+        * The HTTP client
+        * @var Client
+        */
+       private $client;
+
+       /**
+        * The return value of the generator
+        * @var mixed
+        */
+       private $result;
+
+       /**
+        * Cancellation flag
+        * @var bool
+        */
+       private $cancelled = false;
+
+       /**
+        * Create a new generator invoker
+        * @param \http\Client $client
+        */
+       function __construct(Client $client) {
+               $this->client = $client;
+
+               parent::__construct(function($resolve, $reject) {
+                       return $this->cancel($resolve, $reject);
+               });
+       }
+
+       /**
+        * Invoke $generator to create a \Generator which yields promises
+        *
+        * @param callable $generator as function() : \Generator, creating a generator yielding promises
+        * @return \seekat\API\Invoker
+        */
+       function invoke(callable $generator) : Invoker {
+               $this->iterate($generator());
+               return $this;
+       }
+
+       /**
+        * Iterate over $gen, a \Generator yielding promises
+        *
+        * @param \Generator $gen
+        * @return \seekat\API\Invoker
+        */
+       function iterate(Generator $gen) : Invoker {
+               $this->cancelled = false;
+
+               foreach ($gen as $promise) {
+                       if ($this->cancelled) {
+                               break;
+                       }
+                       $this->queue($promise, $gen);
+               }
+
+               if (!$this->cancelled) {
+                       $this->resolve($this->result = $gen->getReturn());
+               }
+               return $this;
+       }
+
+       /**
+        * Get the generator's result
+        * 
+        * @return \React\Promise\ExtendedPromiseInterface
+        */
+       function result() : ExtendedPromiseInterface {
+               return $this->promise();
+       }
+
+       /**
+        * Promise handler
+        * 
+        * @param \React\Promise\PromiseInterface $promise
+        * @param \Generator $to
+        */
+       private function give(PromiseInterface $promise, Generator $to) {
+               $promise->then(function($result) use($to) {
+                       if (($promise = $to->send($result))) {
+                               $this->queue($promise, $to);
+                       }
+               });
+       }
+
+       private function queue($promise, Generator $gen) {
+               if ($promise instanceof PromiseInterface) {
+                               $this->give($promise, $gen);
+               } else {
+                       all($promise)->then(function($results) use($gen) {
+                               if (($promise = $gen->send($results))) {
+                                       $this->queue($promise, $gen);
+                               }
+                       });
+               }
+               $this->client->send();
+       }
+
+       /**
+        * Cancellation callback
+        *
+        * @param callable $resolve
+        * @param callable $reject
+        */
+       private function cancel(callable $resolve, callable $reject) {
+               $this->cancelled = true;
+
+               /* did we finish in the meantime? */
+               if ($this->result) {
+                       $resolve($this->result);
+               } else {
+                       $reject("Cancelled");
+               }
+       }
+}