8db2b4d614c86993c0e887a14cbe3eda039a1fdc
8 use seekat\Exception\
{function exception
};
10 final class Consumer
{
12 * The return value of the generator
15 private mixed $result = null;
20 private bool $cancelled = false;
25 private object $promise;
27 private \Closure
$resolve;
28 private \Closure
$reject;
29 private \Closure
$reduce;
32 * Create a new generator consumer
34 function __construct(private readonly Future
$future, private readonly \Closure
$loop) {
35 $context = $future->createContext(function() {
36 $this->cancelled
= true;
38 $this->promise
= $future->getPromise($context);
39 $this->resolve
= $future->resolver($context);
40 $this->reject
= $future->rejecter($context);
41 $this->reduce
= $future->reducer();
45 * Iterate over $gen, a \Generator yielding promises
47 * @param Generator $gen
48 * @return mixed promise
50 function __invoke(Generator
$gen) : mixed {
51 $this->cancelled
= false;
52 foreach ($gen as $promise) {
53 if ($this->cancelled
) {
56 $this->give($promise, $gen);
59 if ($this->cancelled
) {
60 ($this->reject
)("Cancelled");
61 } else if (!$gen->valid()) {
63 $this->result
= $gen->getReturn();
64 } catch (Exception
$e) {
65 assert($e->getMessage() === "Cannot get return value of a generator that hasn't returned");
68 ($this->resolve
)($this->result
);
70 return $this->promise
;
76 * @param mixed $promise
77 * @param Generator $gen
79 private function give(mixed $promise, Generator
$gen) : void
{
80 if ($promise instanceof \Traversable
) {
81 $promise = iterator_to_array($promise);
83 if (is_array($promise)) {
84 $promise = ($this->reduce
)($promise);
87 $this->future
->handlePromise($promise, function($result) use($gen) {
88 if (($promise = $gen->send($result))) {
89 $this->give($promise, $gen);
91 }, function($error) use($gen) {
92 $gen->throw(exception($error));
95 /* FIXME: external loop */