a309ce62799cf535810870868b1dc38d10459834
[m6w6/pq-gateway] / lib / pq / Query / AsyncExecutor.php
1 <?php
2
3 namespace pq\Query;
4
5 use \pq\Query\Executor;
6 use \pq\Query\WriterInterface;
7
8 /**
9 * An asynchronous query executor
10 */
11 class AsyncExecutor extends Executor
12 {
13 /**
14 * Context initializer
15 * @var callable
16 */
17 protected $init;
18
19 /**
20 * Result resolver
21 * @var callable
22 */
23 protected $done;
24
25 /**
26 * Callback queue
27 * @var callable
28 */
29 protected $then;
30
31 /**
32 * Set (promise) callbacks
33 *
34 * Example with reactphp:
35 * <code>
36 * use \React\Promise\Deferred;
37 *
38 * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
39 * $exec->setCallbacks(
40 * # init context
41 * function() {
42 * return new Deferred;
43 * },
44 * # done
45 * function(Deferred $context, $result) {
46 * $context->resolver()->resolve($result);
47 * },
48 * # then
49 * function(Deferred $context, callable $cb) {
50 * return $context->then($cb);
51 * });
52 * $exec->execute($queryWriter, function($result){});
53 * </code>
54 *
55 * Example with amphp:
56 * <code>
57 * use Amp\Future;
58 * use function Amp\reactor;
59 *
60 * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
61 * $exec->setCallbacks(
62 * # init context
63 * function() {
64 * return new Future(reactor());
65 * },
66 * # done
67 * function(Future $context, $result) {
68 * $context->succeed($result);
69 * },
70 * # then
71 * function(Future $context, callable $cb) {
72 * return $context->when(function ($error, $result) use ($cb) {
73 * $cb($result);
74 * });
75 * });
76 * $exec->execute($queryWriter, function($result){});
77 * </code>
78 *
79 * @param callable $init context initializer as function()
80 * @param callable $done result receiver as function($context, $result)
81 * @param callable $then callback queue as function($context, $callback)
82 */
83 function setCallbacks(callable $init, callable $done, callable $then) {
84 $this->init = $init;
85 $this->done = $done;
86 $this->then = $then;
87 }
88
89 /**
90 * Get (promise) callbacks previously set
91 * @return array(callable)
92 */
93 function getCallbacks() {
94 return array($this->init, $this->done, $this->then);
95 }
96
97 /**
98 * Prepare (promise) callbacks
99 * @param callable $callback
100 * @return array($context, $resolver)
101 */
102 protected function prepareCallbacks(callable $callback/*, ... */) {
103 list($init, $done, $then) = $this->getCallbacks();
104
105 $context = $init();
106 foreach (func_get_args() as $cb) {
107 $then($context, $cb);
108 }
109
110 return array($context, function($result) use ($context, $done) {
111 $done($context, $result);
112 });
113 }
114
115 /**
116 * Execute the query asynchronously through \pq\Connection::execParamsAsync()
117 * @param \pq\Query\WriterInterface $query
118 * @param callable $callback result callback
119 * @return mixed context created by the init callback
120 */
121 function execute(WriterInterface $query, callable $callback) {
122 $this->result = null;
123 $this->query = $query;
124 $this->notify();
125
126 list($context, $resolver) = $this->prepareCallbacks(
127 function(\pq\Result $result) {
128 $this->result = $result;
129 $this->notify();
130 }, $callback);
131 $this->getConnection()->execParamsAsync($query, $query->getParams(),
132 $query->getTypes(), $resolver);
133 return $context;
134 }
135 }