namespace seekat\API;
-use AsyncInterop\Promise;
+use Exception;
use Generator;
-use http\Client;
use seekat\API;
-use seekat\Exception\{
- InvalidArgumentException, UnexpectedValueException, function exception
-};
-
-final class Consumer
-{
- /**
- * Loop
- * @var callable
- */
- private $loop;
+use seekat\Exception\{function exception};
+final class Consumer {
/**
* The return value of the generator
* @var mixed
*/
- private $result;
+ private mixed $result = null;
/**
* Cancellation flag
- * @var bool
- */
- private $cancelled = false;
-
- /**
- * @var Promise
*/
- private $promise;
+ private bool $cancelled = false;
/**
- * @var \Closure
+ * Promise
*/
- private $resolve;
+ private object $promise;
- /**
- * @var \Closure
- */
- private $reject;
-
- /**
- * @var \Closure
- */
- private $reduce;
+ private \Closure $resolve;
+ private \Closure $reject;
+ private \Closure $reduce;
/**
* Create a new generator consumer
- * @param Future $future
- * @param callable $loop
*/
- function __construct(Future $future, callable $loop) {
- $this->loop = $loop;
-
+ function __construct(private readonly Future $future, private readonly \Closure $loop) {
$context = $future->createContext(function() {
$this->cancelled = true;
});
$this->promise = $future->getPromise($context);
- $this->resolve = API\Future\resolver($future, $context);
- $this->reject = API\Future\rejecter($future, $context);
- $this->reduce = API\Future\reducer($future, $context);
+ $this->resolve = $future->resolver($context);
+ $this->reject = $future->rejecter($context);
+ $this->reduce = $future->reducer();
}
/**
* Iterate over $gen, a \Generator yielding promises
*
* @param Generator $gen
- * @return Promise
+ * @return mixed promise
*/
- function __invoke(Generator $gen) : Promise {
+ function __invoke(Generator $gen) : mixed {
$this->cancelled = false;
-
foreach ($gen as $promise) {
if ($this->cancelled) {
break;
$this->give($promise, $gen);
}
- #($this->loop)();
-
- if (!$this->cancelled) {
- $this->result = $gen->getReturn();
- }
- if (isset($this->result)) {
- ($this->resolve)($this->result);
- } else {
+ if ($this->cancelled) {
($this->reject)("Cancelled");
+ } else if (!$gen->valid()) {
+ try {
+ $this->result = $gen->getReturn();
+ } catch (Exception $e) {
+ assert($e->getMessage() === "Cannot get return value of a generator that hasn't returned");
+ }
}
+ ($this->resolve)($this->result);
return $this->promise;
}
/**
* Promise handler
*
- * @param array|Promise $promise
+ * @param mixed $promise
* @param Generator $gen
*/
- private function give($promise, Generator $gen) {
+ private function give(mixed $promise, Generator $gen) : void {
if ($promise instanceof \Traversable) {
$promise = iterator_to_array($promise);
}
if (is_array($promise)) {
$promise = ($this->reduce)($promise);
}
- if ($promise instanceof Promise) {
- $promise->when(function($error, $result) use($gen) {
- if ($error) {
- $gen->throw(exception($error));
- }
- if (($promise = $gen->send($result))) {
- $this->give($promise, $gen);
- }
- });
- } else {
- $gen->throw(new UnexpectedValueException(
- "Expected Promise or array of Promises; got ".\seekat\typeof($promise)));
- }
+
+ $this->future->handlePromise($promise, function($result) use($gen) {
+ if (($promise = $gen->send($result))) {
+ $this->give($promise, $gen);
+ }
+ }, function($error) use($gen) {
+ $gen->throw(exception($error));
+ });
+
/* FIXME: external loop */
($this->loop)();
}