X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fseekat;a=blobdiff_plain;f=lib%2FAPI%2FConsumer.php;fp=lib%2FAPI%2FConsumer.php;h=926eb736e756f066d61d9b7db1b497dfa0b18a4e;hp=d593588aee7883c47302b9c4b03c953793e7a929;hb=d38b3ae03472ba2f9af5009778574b23472bb3f7;hpb=e368287b3cd2dd40945ac8d1a1946bc32268007d diff --git a/lib/API/Consumer.php b/lib/API/Consumer.php index d593588..926eb73 100644 --- a/lib/API/Consumer.php +++ b/lib/API/Consumer.php @@ -2,7 +2,6 @@ namespace seekat\API; -use AsyncInterop\Promise; use Generator; use http\Client; use seekat\API; @@ -31,9 +30,14 @@ final class Consumer private $cancelled = false; /** - * @var Promise + * @var Future */ - private $promise; + private $future; + + /** + * @var mixed + */ + private $context; /** * @var \Closure @@ -58,22 +62,22 @@ final class Consumer function __construct(Future $future, callable $loop) { $this->loop = $loop; - $context = $future->createContext(function() { + $this->future = $future; + $this->context = $future->createContext(function() { $this->cancelled = true; }); - $this->promise = $future->getPromise($context); - $this->resolve = API\Future\resolver($future, $context); - $this->reject = API\Future\rejecter($future, $context); - $this->reduce = API\Future\reducer($future, $context); + $this->resolve = API\Future\resolver($future, $this->context); + $this->reject = API\Future\rejecter($future, $this->context); + $this->reduce = API\Future\reducer($future, $this->context); } /** * Iterate over $gen, a \Generator yielding promises * * @param Generator $gen - * @return Promise + * @return mixed promise */ - function __invoke(Generator $gen) : Promise { + function __invoke(Generator $gen) { $this->cancelled = false; foreach ($gen as $promise) { @@ -94,13 +98,13 @@ final class Consumer ($this->reject)("Cancelled"); } - return $this->promise; + return $this->context->promise(); } /** * Promise handler * - * @param array|Promise $promise + * @param mixed $promise * @param Generator $gen */ private function give($promise, Generator $gen) { @@ -110,19 +114,15 @@ final class Consumer if (is_array($promise)) { $promise = ($this->reduce)($promise); } - if ($promise instanceof Promise) { - $promise->when(function($error, $result) use($gen) { - if ($error) { - $gen->throw(exception($error)); - } - if (($promise = $gen->send($result))) { - $this->give($promise, $gen); - } - }); - } else { - $gen->throw(new UnexpectedValueException( - "Expected Promise or array of Promises; got ".\seekat\typeof($promise))); - } + + $this->future->handlePromise($promise, function($result) use($gen) { + if (($promise = $gen->send($result))) { + $this->give($promise, $gen); + } + }, function($error) use($gen) { + $gen->throw(exception($error)); + }); + /* FIXME: external loop */ ($this->loop)(); }