update to current promise APIs
[m6w6/pq-gateway] / lib / pq / Query / AsyncExecutor.php
1 <?php
2
3 namespace pq\Query;
4
5 use \pq\Query\Executor;
6 use \pq\Query\WriterInterface;
7
/**
 * An asynchronous query executor
 *
 * Rather than handing the result back directly, execute() returns a "context"
 * (typically a deferred/promise object) that is created, resolved and
 * subscribed to through three user supplied callbacks. Those callbacks have
 * to be registered with setCallbacks() before a query is executed.
 */
class AsyncExecutor extends Executor
{
    /**
     * Context initializer
     * @var callable
     */
    protected $init;

    /**
     * Result resolver
     * @var callable
     */
    protected $done;

    /**
     * Callback queue
     * @var callable
     */
    protected $then;

    /**
     * Set (promise) callbacks
     *
     * Example with reactphp:
     * <code>
     * use React\Promise\Deferred;
     *
     * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
     * $exec->setCallbacks(
     * # init context
     * function() {
     *     return new Deferred;
     * },
     * # done
     * function(Deferred $context, $result) {
     *     $context->resolve($result);
     * },
     * # then
     * function(Deferred $context, callable $cb) {
     *     return $context->promise()->then($cb);
     * });
     * $exec->execute($queryWriter, function($result){});
     * </code>
     *
     * Example with amphp:
     * <code>
     * use Amp\Deferred;
     *
     * $exec = new pq\Query\AsyncExecutor(new pq\Connection);
     * $exec->setCallbacks(
     * # init context
     * function() {
     *     return new Deferred;
     * },
     * # done
     * function(Deferred $context, $result) {
     *     $context->succeed($result);
     * },
     * # then
     * function(Deferred $context, callable $cb) {
     *     return $context->promise()->when(function($error, $result) use ($cb) {
     *         $cb($result);
     *     });
     * });
     * $exec->execute($queryWriter, function($result){});
     * </code>
     *
     * @param callable $init context initializer as function()
     * @param callable $done result receiver as function($context, $result)
     * @param callable $then callback queue as function($context, $callback)
     */
    public function setCallbacks(callable $init, callable $done, callable $then) {
        $this->init = $init;
        $this->done = $done;
        $this->then = $then;
    }

    /**
     * Get (promise) callbacks previously set
     *
     * The entries are NULL as long as setCallbacks() has not been called.
     *
     * @return array(callable) the init, done and then callbacks, in that order
     */
    public function getCallbacks() {
        return array($this->init, $this->done, $this->then);
    }

    /**
     * Prepare (promise) callbacks
     *
     * Creates a fresh context through the init callback and registers every
     * callback passed to this method on it through the then callback, in
     * argument order. Whatever the then callback returns is discarded here.
     *
     * @param callable $callback first callback to register; any further
     *        arguments are registered the same way
     * @return array($context, $resolver) the created context and a closure
     *         function($result) which passes the result to the done callback
     * @throws \LogicException if the callbacks have not been set
     */
    protected function prepareCallbacks(callable $callback/*, ... */) {
        list($init, $done, $then) = $this->getCallbacks();

        // Fail with a clear message instead of the engine error that
        // invoking an unset (NULL) callback would trigger below.
        if (!is_callable($init) || !is_callable($done) || !is_callable($then)) {
            throw new \LogicException(
                "No (promise) callbacks available; call setCallbacks() before executing a query"
            );
        }

        $context = $init();
        foreach (func_get_args() as $cb) {
            $then($context, $cb);
        }

        return array($context, function($result) use ($context, $done) {
            $done($context, $result);
        });
    }

    /**
     * Execute the query asynchronously through \pq\Connection::execParamsAsync()
     *
     * Resets the stored result, remembers the query and calls notify() before
     * the query is sent. An internal callback, registered ahead of the user
     * callback, stores the received result and calls notify() once more.
     *
     * @param \pq\Query\WriterInterface $query
     * @param callable $callback result callback
     * @return mixed context created by the init callback
     * @throws \LogicException if setCallbacks() has not been called before
     */
    public function execute(WriterInterface $query, callable $callback) {
        $this->result = null;
        $this->query = $query;
        $this->notify();

        // Registered first, so the executor's own state is updated before
        // the user callback gets to see the result.
        $store = function(\pq\Result $result) {
            $this->result = $result;
            $this->notify();
            return $result;
        };

        list($context, $resolver) = $this->prepareCallbacks($store, $callback);

        $this->getConnection()->execParamsAsync(
            $query,
            $query->getParams(),
            $query->getTypes(),
            $resolver
        );

        return $context;
    }
}