X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fseekat;a=blobdiff_plain;f=lib%2FAPI%2FConsumer.php;h=8db2b4d614c86993c0e887a14cbe3eda039a1fdc;hp=6e677fe0dde7e3e1dde39f1ac6a80f1aab47b608;hb=654d736df2c46ec2520f73e9089d06a44f2b9c50;hpb=626d8937c75f6d8fca463fa2b374f645068b2d6d diff --git a/lib/API/Consumer.php b/lib/API/Consumer.php index 6e677fe..8db2b4d 100644 --- a/lib/API/Consumer.php +++ b/lib/API/Consumer.php @@ -2,56 +2,53 @@ namespace seekat\API; +use Exception; use Generator; -use http\Client; -use React\Promise\{ - Deferred, - ExtendedPromiseInterface, - PromiseInterface, - function all -}; - -final class Consumer extends Deferred -{ - /** - * The HTTP client - * @var Client - */ - private $client; +use seekat\API; +use seekat\Exception\{function exception}; +final class Consumer { /** * The return value of the generator * @var mixed */ - private $result; + private mixed $result = null; /** * Cancellation flag - * @var bool */ - private $cancelled = false; + private bool $cancelled = false; /** - * Create a new generator invoker - * @param Client $client + * Promise */ - function __construct(Client $client) { - $this->client = $client; + private object $promise; + + private \Closure $resolve; + private \Closure $reject; + private \Closure $reduce; - parent::__construct(function($resolve, $reject) { - return $this->cancel($resolve, $reject); + /** + * Create a new generator consumer + */ + function __construct(private readonly Future $future, private readonly \Closure $loop) { + $context = $future->createContext(function() { + $this->cancelled = true; }); + $this->promise = $future->getPromise($context); + $this->resolve = $future->resolver($context); + $this->reject = $future->rejecter($context); + $this->reduce = $future->reducer(); } /** * Iterate over $gen, a \Generator yielding promises * * @param Generator $gen - * @return ExtendedPromiseInterface + * @return mixed promise */ - function __invoke(Generator $gen) : ExtendedPromiseInterface { + function __invoke(Generator $gen) : mixed { $this->cancelled = false; - foreach ($gen as $promise) { if ($this->cancelled) { break; @@ -59,50 +56,43 @@ final class Consumer extends Deferred $this->give($promise, $gen); } - if (!$this->cancelled) { - $this->resolve($this->result = $gen->getReturn()); + if ($this->cancelled) { + ($this->reject)("Cancelled"); + } else if (!$gen->valid()) { + try { + $this->result = $gen->getReturn(); + } catch (Exception $e) { + assert($e->getMessage() === "Cannot get return value of a generator that hasn't returned"); + } } + ($this->resolve)($this->result); - return $this->promise(); + return $this->promise; } /** * Promise handler * - * @param array|PromiseInterface $promise + * @param mixed $promise * @param Generator $gen */ - private function give($promise, Generator $gen) { - if ($promise instanceof PromiseInterface) { - $promise->then(function($result) use($gen) { - if (($promise = $gen->send($result))) { - $this->give($promise, $gen); - } - }); - } else { - all($promise)->then(function($results) use($gen) { - if (($promise = $gen->send($results))) { - $this->give($promise, $gen); - } - }); + private function give(mixed $promise, Generator $gen) : void { + if ($promise instanceof \Traversable) { + $promise = iterator_to_array($promise); + } + if (is_array($promise)) { + $promise = ($this->reduce)($promise); } - $this->client->send(); - } - /** - * Cancellation callback - * - * @param callable $resolve - * @param callable $reject - */ - private function cancel(callable $resolve, callable $reject) { - $this->cancelled = true; + $this->future->handlePromise($promise, function($result) use($gen) { + if (($promise = $gen->send($result))) { + $this->give($promise, $gen); + } + }, function($error) use($gen) { + $gen->throw(exception($error)); + }); - /* did we finish in the meantime? */ - if ($this->result) { - $resolve($this->result); - } else { - $reject("Cancelled"); - } + /* FIXME: external loop */ + ($this->loop)(); } }