namespace seekat\API;
+use Exception;
use Generator;
-use http\Client;
-use React\Promise\{
- Deferred,
- ExtendedPromiseInterface,
- PromiseInterface,
- function all
-};
-
-final class Consumer extends Deferred
-{
- /**
- * The HTTP client
- * @var Client
- */
- private $client;
+use seekat\API;
+use seekat\Exception\{function exception};
+final class Consumer {
/**
* The return value of the generator
* @var mixed
*/
- private $result;
+ private mixed $result = null;
/**
* Cancellation flag
- * @var bool
*/
- private $cancelled = false;
+ private bool $cancelled = false;
/**
- * Create a new generator invoker
- * @param Client $client
+ * Promise
*/
- function __construct(Client $client) {
- $this->client = $client;
+ private object $promise;
+
+ private \Closure $resolve;
+ private \Closure $reject;
+ private \Closure $reduce;
- parent::__construct(function($resolve, $reject) {
- return $this->cancel($resolve, $reject);
+ /**
+ * Create a new generator consumer
+ */
+ function __construct(private readonly Future $future, private readonly \Closure $loop) {
+ $context = $future->createContext(function() {
+ $this->cancelled = true;
});
+ $this->promise = $future->getPromise($context);
+ $this->resolve = $future->resolver($context);
+ $this->reject = $future->rejecter($context);
+ $this->reduce = $future->reducer();
}
/**
* Iterate over $gen, a \Generator yielding promises
*
* @param Generator $gen
- * @return ExtendedPromiseInterface
+ * @return mixed promise
*/
- function __invoke(Generator $gen) : ExtendedPromiseInterface {
+ function __invoke(Generator $gen) : mixed {
$this->cancelled = false;
-
foreach ($gen as $promise) {
if ($this->cancelled) {
break;
$this->give($promise, $gen);
}
- if (!$this->cancelled) {
- $this->resolve($this->result = $gen->getReturn());
+ if ($this->cancelled) {
+ ($this->reject)("Cancelled");
+ } else if (!$gen->valid()) {
+ try {
+ $this->result = $gen->getReturn();
+ } catch (Exception $e) {
+ assert($e->getMessage() === "Cannot get return value of a generator that hasn't returned");
+ }
}
+ ($this->resolve)($this->result);
- return $this->promise();
+ return $this->promise;
}
/**
* Promise handler
*
- * @param array|PromiseInterface $promise
+ * @param mixed $promise
* @param Generator $gen
*/
- private function give($promise, Generator $gen) {
- if ($promise instanceof PromiseInterface) {
- $promise->then(function($result) use($gen) {
- if (($promise = $gen->send($result))) {
- $this->give($promise, $gen);
- }
- });
- } else {
- all($promise)->then(function($results) use($gen) {
- if (($promise = $gen->send($results))) {
- $this->give($promise, $gen);
- }
- });
+ private function give(mixed $promise, Generator $gen) : void {
+ if ($promise instanceof \Traversable) {
+ $promise = iterator_to_array($promise);
+ }
+ if (is_array($promise)) {
+ $promise = ($this->reduce)($promise);
}
- $this->client->send();
- }
- /**
- * Cancellation callback
- *
- * @param callable $resolve
- * @param callable $reject
- */
- private function cancel(callable $resolve, callable $reject) {
- $this->cancelled = true;
+ $this->future->handlePromise($promise, function($result) use($gen) {
+ if (($promise = $gen->send($result))) {
+ $this->give($promise, $gen);
+ }
+ }, function($error) use($gen) {
+ $gen->throw(exception($error));
+ });
- /* did we finish in the meantime? */
- if ($this->result) {
- $resolve($this->result);
- } else {
- $reject("Cancelled");
- }
+ /* FIXME: external loop */
+ ($this->loop)();
}
}