From: Michael Wallner Date: Wed, 20 May 2015 15:42:36 +0000 (+0200) Subject: Merge branch 'async' X-Git-Tag: v2.1.0~1 X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fpq-gateway;a=commitdiff_plain;h=0861b6cd0d8f86add1a068fd3d9740428c8418cd;hp=3aa333045c0bb93b83e87e190b4ed49333d106f0 Merge branch 'async' --- diff --git a/composer.json b/composer.json index 07c3d53..2fd2af6 100644 --- a/composer.json +++ b/composer.json @@ -17,7 +17,8 @@ "pq\\Query": "lib" } }, - "suggest": { - "react/promise": "1.0.*" + "require-dev": { + "react/promise": "~2.2", + "amphp/amp": "^1.0-beta" } } diff --git a/lib/pq/Query/AsyncExecutor.php b/lib/pq/Query/AsyncExecutor.php index 737c0eb..a61f87a 100644 --- a/lib/pq/Query/AsyncExecutor.php +++ b/lib/pq/Query/AsyncExecutor.php @@ -1,38 +1,135 @@ + * use React\Promise\Deferred; + * + * $exec = new pq\Query\AsyncExecutor(new pq\Connection); + * $exec->setCallbacks( + * # init context + * function() { + * return new Deferred; + * }, + * # done + * function(Deferred $context, $result) { + * $context->resolve($result); + * }, + * # then + * function(Deferred $context, callable $cb) { + * return $context->promise()->then($cb); + * }); + * $exec->execute($queryWriter, function($result){}); + * + * + * Example with amphp: + * + * use Amp\Deferred; + * + * $exec = new pq\Query\AsyncExecutor(new pq\Connection); + * $exec->setCallbacks( + * # init context + * function() { + * return new Deferred; + * }, + * # done + * function(Deferred $context, $result) { + * $context->succeed($result); + * }, + * # then + * function(Deferred $context, callable $cb) { + * return $context->promise()->when(function($error, $result) use ($cb) { + * $cb($result); + * }); + * }); + * $exec->execute($queryWriter, function($result){}); + * + * + * @param callable $init context initializer as function() + * @param callable $done result receiver as function($context, $result) + * @param callable $then callback queue as function($context, $callback) + */ + function setCallbacks(callable $init, callable $done, callable $then) { + $this->init = $init; + $this->done = $done; + $this->then = $then; + } + + /** + * Get (promise) callbacks previously set + * @return array(callable) + */ + function getCallbacks() { + return array($this->init, $this->done, $this->then); + } + + /** + * Prepare (promise) callbacks + * @param callable $callback + * @return array($context, $resolver) + */ + protected function prepareCallbacks(callable $callback/*, ... */) { + list($init, $done, $then) = $this->getCallbacks(); + + $context = $init(); + foreach (func_get_args() as $cb) { + $then($context, $cb); + } + + return array($context, function($result) use ($context, $done) { + $done($context, $result); + }); + } + /** * Execute the query asynchronously through \pq\Connection::execParamsAsync() * @param \pq\Query\WriterInterface $query - * @param callable $callback - * @return \React\Promise\DeferredPromise + * @param callable $callback result callback + * @return mixed context created by the init callback */ function execute(WriterInterface $query, callable $callback) { $this->result = null; $this->query = $query; $this->notify(); - $deferred = new Deferred; - $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(), - array($deferred->resolver(), "resolve")); - - return $deferred->then(function($result) { - $this->result = $result; - $this->notify(); - })->then($callback); + list($context, $resolver) = $this->prepareCallbacks( + function(\pq\Result $result) { + $this->result = $result; + $this->notify(); + return $result; + }, $callback); + $this->getConnection()->execParamsAsync($query, $query->getParams(), + $query->getTypes(), $resolver); + return $context; } } diff --git a/tests/lib/pq/Query/AsyncExecutorTest.php b/tests/lib/pq/Query/AsyncExecutorTest.php new file mode 100644 index 0000000..ad68a91 --- /dev/null +++ b/tests/lib/pq/Query/AsyncExecutorTest.php @@ -0,0 +1,72 @@ +conn = new Connection(PQ_TEST_DSN); + $this->query = new Writer("SELECT \$1::int,\$2::int", [1,2]); + } + + function testReact() { + $exec = new AsyncExecutor($this->conn); + $exec->setCallbacks( + # init context + function() { + return new Reacted; + }, + # done + function(Reacted $context, $result) { + $context->resolve($result); + }, + # then + function(Reacted $context, callable $cb) { + return $context->promise()->then($cb); + }); + + $guard = new \stdClass; + $exec->execute($this->query, function($result) use($guard) { + $guard->result = $result; + }); + $this->conn->getResult(); + $this->assertTrue(!empty($guard->result), "guard is empty"); + $this->assertInstanceOf("pq\\Result", $guard->result); + $this->assertSame([[1,2]], $guard->result->fetchAll()); + } + + function testAmp() { + $exec = new AsyncExecutor($this->conn); + $exec->setCallbacks( + # init context + function() { + return new Amped; + }, + # done + function(Amped $context, $result) { + $context->succeed($result); + }, + # then + function(Amped $context, callable $cb) { + return $context->promise()->when(function($error, $result) use ($cb) { + $cb($result); + }); + }); + $guard = new \stdClass; + $exec->execute($this->query, function($result) use($guard) { + $guard->result = $result; + }); + $this->conn->getResult(); + $this->assertTrue(!empty($guard->result), "guard is empty"); + $this->assertInstanceOf("pq\\Result", $guard->result); + $this->assertSame([[1,2]], $guard->result->fetchAll()); + } +} diff --git a/tests/setup.inc b/tests/setup.inc index 9c7a932..bdee9d3 100644 --- a/tests/setup.inc +++ b/tests/setup.inc @@ -39,7 +39,7 @@ const PQ_TEST_TEARDOWN_SQL = <<getConnection();