9bdc030ba955e07c562b20f6913c8d88b6c62287
[m6w6/seekat] / lib / API / Call / Deferred.php
1 <?php
2
3 namespace seekat\API\Call;
4
5 use AsyncInterop\Promise;
6 use Exception;
7 use http\{
8 Client, Client\Request, Client\Response
9 };
10 use Psr\Log\LoggerInterface;
11 use seekat\API;
12 use SplObserver;
13 use SplSubject;
14
/**
 * Deferred execution of a single API request.
 *
 * An instance observes the HTTP client for progress updates of its request
 * and settles a promise (created through the API's future) once a response
 * is available from the cache or from the network.
 */
final class Deferred implements SplObserver
{
	/**
	 * The response importer
	 *
	 * @var Result
	 */
	private $result;

	/**
	 * The HTTP client
	 *
	 * @var Client
	 */
	private $client;

	/**
	 * Request cache
	 *
	 * @var Cache
	 */
	private $cache;

	/**
	 * @var LoggerInterface
	 */
	private $logger;

	/**
	 * The executed request
	 *
	 * @var Request
	 */
	private $request;

	/**
	 * The promised response
	 *
	 * @var Response
	 */
	private $response;

	/**
	 * @var Promise
	 */
	private $promise;

	/**
	 * Settles the promise successfully
	 *
	 * @var \Closure
	 */
	private $resolve;

	/**
	 * Settles the promise with a failure
	 *
	 * @var \Closure
	 */
	private $reject;

	/**
	 * Publishes progress information on the promise
	 *
	 * @var \Closure
	 */
	private $update;

	/**
	 * Create a deferred promise for the response of $request
	 *
	 * @param API $api The endpoint of the request
	 * @param Request $request The request to execute
	 * @param Cache\Service|null $cache Optional cache backend handed to the request cache
	 */
	public function __construct(API $api, Request $request, Cache\Service $cache = null) {
		$this->request = $request;
		$this->client = $api->getClient();
		$this->logger = $api->getLogger();
		$this->result = new Result($api);
		$this->cache = new Cache($cache);

		$future = $api->getFuture();
		/* NOTE(review): the callback is presumably invoked by the future
		 * implementation when the promise gets cancelled -- confirm */
		$context = $future->createContext(function() {
			if ($this->response) {
				/* we did finish in the meantime */
				$this->complete();
			} else {
				/* no response yet: stop observing, withdraw the request, reject */
				$this->client->detach($this);
				$this->client->dequeue($this->request);
				($this->reject)("Cancelled");
			}
		});
		$this->promise = $future->getPromise($context);
		$this->resolve = API\Future\resolver($future, $context);
		$this->reject = API\Future\rejecter($future, $context);
		$this->update = API\Future\updater($future, $context);
	}

	/**
	 * Start processing the request
	 *
	 * Completes immediately if the cache can serve the request; otherwise
	 * the request is enqueued with the HTTP client and the transfer is
	 * kicked off.
	 *
	 * @return Promise The promise of the response
	 */
	public function __invoke() : Promise {
		if ($this->cache->load($this->request, $cached)) {
			$this->logger->info("deferred -> cached", [
				"method" => $this->request->getRequestMethod(),
				"url" => $this->request->getRequestUrl(),
			]);

			$this->response = $cached;
			$this->complete();
		} else {
			$this->client->attach($this);
			/* $cached is captured by value as left behind by the failed load() */
			$this->client->enqueue($this->request, function(Response $response) use($cached) {
				if ($response->getResponseCode() === 304) {
					/* not modified: keep using the cached copy */
					$this->response = $cached;
				} else {
					$this->response = $response;
				}
				$this->complete();
				/* NOTE(review): per pecl_http, returning true should make
				 * the client dequeue the request -- confirm */
				return true;
			});
			$this->logger->info("deferred -> enqueued", [
				"method" => $this->request->getRequestMethod(),
				"url" => $this->request->getRequestUrl(),
			]);
			/* start off */
			$this->client->once();
		}

		return $this->promise;
	}

	/**
	 * Progress observer
	 *
	 * Forwards progress information of this instance's request to the
	 * promise; notifications about any other request are ignored.
	 *
	 * @param SplSubject $client The observed HTTP client
	 * @param Request $request The request which generated the update
	 * @param object $progress The progress information
	 */
	public function update(SplSubject $client, Request $request = null, $progress = null) {
		if ($request !== $this->request) {
			return;
		}

		($this->update)((object) compact("client", "request", "progress"));
	}

	/**
	 * Completion callback
	 *
	 * Stops observing the client, then either imports and caches the
	 * response and resolves the promise, or rejects the promise with the
	 * failure that occurred.
	 */
	private function complete() {
		$this->client->detach($this);

		if ($this->response) {
			try {
				$api = ($this->result)($this->response);

				$this->cache->save($this->request, $this->response);

				($this->resolve)($api);
			} catch (\Throwable $e) {
				/* catch errors as well as exceptions, so that the promise
				 * is rejected instead of the failure escaping this method */
				($this->reject)($e);
			}
		} else {
			/* no response at all: reject with the client's transfer error */
			($this->reject)($this->client->getTransferInfo($this->request)->error);
		}
	}

}