3af614caa9355335c70278f36395f2f5449f8d17
8 use seekat\Exception\
{function exception
};
10 final class Consumer
{
18 * The return value of the generator
27 private $cancelled = false;
55 * Create a new generator consumer
56 * @param Future $future
57 * @param callable $loop
// Constructor: keeps the Future, asks it for a context, and builds the
// resolve/reject/reduce callables from that future/context pair.
// NOTE(review): this listing is a gapped line sample; closing braces and some
// statements are not visible. $loop is not used on any visible line -- confirm
// it is stored on a line that is omitted here.
59 function __construct(Future
$future, callable
$loop) {
62 $this->future
= $future;
// The closure handed to createContext() sets the cancelled flag to true.
// NOTE(review): presumably the context runs it on cancellation -- confirm
// against Future::createContext().
63 $this->context
= $future->createContext(function() {
64 $this->cancelled
= true;
// Callables produced by the API\Future helper functions; elsewhere in this
// class they are invoked as ($this->resolve)(...), ($this->reject)(...) and
// ($this->reduce)(...).
66 $this->resolve
= API\Future\resolver
($future, $this->context
);
67 $this->reject
= API\Future\rejecter
($future, $this->context
);
68 $this->reduce
= API\Future\reducer
($future, $this->context
);
72 * Iterate over $gen, a \Generator yielding promises
74 * @param Generator $gen
75 * @return mixed promise
// Drives the generator: iterates $gen, hands every yielded value to give(),
// then settles via the reject/resolve callables and returns
// $this->context->promise().
// NOTE(review): lines between the numbered statements (branch bodies, the
// `try` opener, closing braces) are not visible here; the comments below
// describe only the statements that are shown.
77 function __invoke(Generator
$gen) {
// Clear the cancellation flag before iterating.
78 $this->cancelled
= false;
79 foreach ($gen as $promise) {
// Cancellation is checked before each yielded value is handled; the body of
// this branch is not shown.
80 if ($this->cancelled
) {
83 $this->give($promise, $gen);
// When the flag is set, the reject callable is called with "Cancelled".
// NOTE(review): presumably this check sits after the loop -- confirm.
86 if ($this->cancelled
) {
87 ($this->reject
)("Cancelled");
// Store the generator's return value; a catch clause for Exception follows,
// its body is not shown.
90 $this->result
= $gen->getReturn();
91 } catch (Exception
$e) {
// Call the resolve callable with the stored result.
93 ($this->resolve
)($this->result
);
96 return $this->context
->promise();
102 * @param mixed $promise
103 * @param Generator $gen
// Passes one yielded value to the future via handlePromise() and registers two
// callbacks that feed the outcome back into the generator.
// NOTE(review): closing braces and the end of this method are not visible in
// this listing.
105 private function give($promise, Generator
$gen) {
// A Traversable is converted to an array first ...
106 if ($promise instanceof \Traversable
) {
107 $promise = iterator_to_array($promise);
// ... and an array is run through the reduce callable, whose result replaces
// $promise.
109 if (is_array($promise)) {
110 $promise = ($this->reduce
)($promise);
// First callback (receives $result): sends the result into the generator; when
// send() returns a truthy value, that value is passed to give() in turn.
113 $this->future
->handlePromise($promise, function($result) use($gen) {
114 if (($promise = $gen->send($result))) {
115 $this->give($promise, $gen);
// Second callback (receives $error): passes exception($error) to $gen->throw().
117 }, function($error) use($gen) {
118 $gen->throw(exception($error));
121 /* FIXME: external loop */