namespace seekat\API;
+use AsyncInterop\Promise;
use Generator;
use http\Client;
-use React\Promise\{
- Deferred,
- ExtendedPromiseInterface,
- PromiseInterface,
- function all
+use seekat\API;
+use seekat\Exception\{
+ InvalidArgumentException, UnexpectedValueException, function exception
};
-final class Consumer extends Deferred
+final class Consumer
{
/**
- * The HTTP client
- * @var Client
+ * Loop
+ * @var callable
*/
- private $client;
+ private $loop;
/**
* The return value of the generator
private $cancelled = false;
/**
- * Create a new generator invoker
- * @param Client $client
+ * @var Promise
*/
- function __construct(Client $client) {
- $this->client = $client;
+ private $promise;
- parent::__construct(function($resolve, $reject) {
- return $this->cancel($resolve, $reject);
+ /**
+ * @var \Closure
+ */
+ private $resolve;
+
+ /**
+ * @var \Closure
+ */
+ private $reject;
+
+ /**
+ * @var \Closure
+ */
+ private $reduce;
+
+ /**
+ * Create a new generator consumer
+ * @param Future $future
+ * @param callable $loop
+ */
+ function __construct(Future $future, callable $loop) {
+ $this->loop = $loop;
+
+ $context = $future->createContext(function() {
+ $this->cancelled = true;
});
+ $this->promise = $future->getPromise($context);
+ $this->resolve = API\Future\resolver($future, $context);
+ $this->reject = API\Future\rejecter($future, $context);
+ $this->reduce = API\Future\reducer($future, $context);
}
/**
* Iterate over $gen, a \Generator yielding promises
*
* @param Generator $gen
- * @return ExtendedPromiseInterface
+ * @return Promise
*/
- function __invoke(Generator $gen) : ExtendedPromiseInterface {
+ function __invoke(Generator $gen) : Promise {
$this->cancelled = false;
foreach ($gen as $promise) {
$this->give($promise, $gen);
}
+ #($this->loop)();
+
if (!$this->cancelled) {
- $this->resolve($this->result = $gen->getReturn());
+ $this->result = $gen->getReturn();
+ }
+ if (isset($this->result)) {
+ ($this->resolve)($this->result);
+ } else {
+ ($this->reject)("Cancelled");
}
- return $this->promise();
+ return $this->promise;
}
/**
* Promise handler
*
- * @param array|PromiseInterface $promise
+ * @param array|Promise $promise
* @param Generator $gen
*/
private function give($promise, Generator $gen) {
- if ($promise instanceof PromiseInterface) {
- $promise->then(function($result) use($gen) {
- if (($promise = $gen->send($result))) {
- $this->give($promise, $gen);
+ if ($promise instanceof \Traversable) {
+ $promise = iterator_to_array($promise);
+ }
+ if (is_array($promise)) {
+ $promise = ($this->reduce)($promise);
+ }
+ if ($promise instanceof Promise) {
+ $promise->when(function($error, $result) use($gen) {
+ if ($error) {
+ $gen->throw(exception($error));
}
- });
- } else {
- all($promise)->then(function($results) use($gen) {
- if (($promise = $gen->send($results))) {
+ if (($promise = $gen->send($result))) {
$this->give($promise, $gen);
}
});
- }
- $this->client->send();
- }
-
- /**
- * Cancellation callback
- *
- * @param callable $resolve
- * @param callable $reject
- */
- private function cancel(callable $resolve, callable $reject) {
- $this->cancelled = true;
-
- /* did we finish in the meantime? */
- if ($this->result) {
- $resolve($this->result);
} else {
- $reject("Cancelled");
+ $gen->throw(new UnexpectedValueException(
+ "Expected Promise or array of Promises; got ".\seekat\typeof($promise)));
}
+ /* FIXME: external loop */
+ ($this->loop)();
}
}