dont depend on react
[m6w6/pq-gateway] / lib / pq / Query / AsyncExecutor.php
index 737c0ebf5492429f36d58183222682dda11d4c28..e7c1c35998f1847a4b544ac0ece47337fc1418bc 100644 (file)
@@ -5,34 +5,138 @@ namespace pq\Query\Executor;
 use \pq\Query\Executor;
 use \pq\Query\WriterInterface;
 
-/**
- * @requires \React\Promise
- */
-use \React\Promise\Deferred;
-
 /**
  * An asynchronous query executor
  */
-class Async extends Executor
+class AsyncExecutor extends Executor
 {
+       /**
+        * Context initializer
+        * @var callable
+        */
+       protected $init;
+       
+       /**
+        * Result resolver
+        * @var callable
+        */
+       protected $done;
+       
+       /**
+        * Callback queue
+        * @var callable
+        */
+       protected $then;
+       
+       /**
+        * Set (promise) callbacks
+        * 
+        * Example with reactphp:
+        * <code>
+        * use \React\Promise\Deferred;
+        * 
+        * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
+        * $exec->setCallbacks(
+        * # init context
+        * function() {
+        *              return new Deferred;
+        * },
+        * # done
+        * function(Deferred $context, $result) {
+        *              $context->resolver()->resolve($result);
+        * },
+        * # then
+        * function(Deferred $context, callable $cb) {
+        *              return $context->then($cb);
+        * });
+        * $exec->execute($queryWriter, function($result){});
+        * </code>
+        * 
+        * Example with amphp:
+        * <code>
+        * use amp\Future;
+        * use function amp\reactor;
+        * 
+        * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
+        * $exec->setCallbacks(
+        * # init context
+        * function() {
+        *              return new Future(reactor());
+        * },
+        * # done
+        * function(Future $context, $result) {
+        *              $context->succeed($result);
+        * },
+        * # then
+        * function(Future $context, callable $cb) {
+        *              return $context->when(function ($error, $result) {
+        *                      $cb($result);
+        *              });
+        * });
+        * $exec->execute($queryWriter, function($result){});
+        * </code>
+        * 
+        * @param callable $init context initializer as function()
+        * @param callable $done result receiver as function($context, $result)
+        * @param callable $then callback queue as function($context, $callback)
+        */
+       function setCallbacks(callable $init, callable $done, callable $then) {
+               $this->init = $init;
+               $this->done = $done;
+               $this->then = $then;
+       }
+       
+       /**
+        * Get (promise) callbacks previously set
+        * @return array(callable)
+        */
+       function getCallbacks() {
+               return array($this->init, $this->done, $this->then);
+       }
+       
+       /**
+        * Prepare (promise) callbacks
+        * @param callable $callback
+        * @return array($context, $resolver)
+        */
+       protected function prepareCallback(callable $callback/*, ... */) {
+               list($init, $done, $then) = $this->getCallbacks();
+               
+               $context = $init();
+               foreach (func_get_args() as $cb) {
+                       $then($context, $cb);
+               }
+               $then($context, $callback);
+               
+               return array($context, function($result) use ($context, $done) {
+                       $done($context, $result);
+               });
+       }
+       
+       /**
+        * Result callback
+        * @param \pq\Result $result
+        */
+       protected function receiveResult(\pq\Result $result) {
+               $this->result = $result;
+               $this->notify();
+       }
+       
        /**
         * Execute the query asynchronously through \pq\Connection::execParamsAsync()
         * @param \pq\Query\WriterInterface $query
-        * @param callable $callback
-        * @return \React\Promise\DeferredPromise
+        * @param callable $callback result callback
+        * @return mixed context created by the init callback
         */
        function execute(WriterInterface $query, callable $callback) {
                $this->result = null;
                $this->query = $query;
                $this->notify();
                
-               $deferred = new Deferred;
-               $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(), 
-                       array($deferred->resolver(), "resolve"));
-               
-               return $deferred->then(function($result) {
-                       $this->result = $result;
-                       $this->notify();
-               })->then($callback);
+               list($context, $resolver) = $this->prepareCallback(
+                       array($this, "receiveResult"), $callback);
+               $this->getConnection()->execParamsAsync($query, $query->getParams(), 
+                       $query->getTypes(), $resolver);
+               return $context;
        }
 }