5 use AsyncInterop\Promise
;
10 InvalidArgumentException
, UnexpectedValueException
, function exception
22 * The return value of the generator
31 private $cancelled = false;
54 * Create a new generator consumer
55 * @param Future $future
56 * @param callable $loop
58 function __construct(Future
$future, callable
$loop) {
61 $context = $future->createContext(function() {
62 $this->cancelled
= true;
64 $this->promise
= $future->getPromise($context);
65 $this->resolve
= API\Future\resolver
($future, $context);
66 $this->reject
= API\Future\rejecter
($future, $context);
67 $this->reduce
= API\Future\reducer
($future, $context);
71 * Iterate over $gen, a \Generator yielding promises
73 * @param Generator $gen
76 function __invoke(Generator
$gen) : Promise
{
77 $this->cancelled
= false;
79 foreach ($gen as $promise) {
80 if ($this->cancelled
) {
83 $this->give($promise, $gen);
88 if (!$this->cancelled
) {
89 $this->result
= $gen->getReturn();
91 if (isset($this->result
)) {
92 ($this->resolve
)($this->result
);
94 ($this->reject
)("Cancelled");
97 return $this->promise
;
103 * @param array|Promise $promise
104 * @param Generator $gen
106 private function give($promise, Generator
$gen) {
107 if ($promise instanceof \Traversable
) {
108 $promise = iterator_to_array($promise);
110 if (is_array($promise)) {
111 $promise = ($this->reduce
)($promise);
113 if ($promise instanceof Promise
) {
114 $promise->when(function($error, $result) use($gen) {
116 $gen->throw(exception($error));
118 if (($promise = $gen->send($result))) {
119 $this->give($promise, $gen);
123 $gen->throw(new UnexpectedValueException(
124 "Expected Promise or array of Promises; got ".\seekat\typeof
($promise)));
126 /* FIXME: external loop */