448526519d3331f7d33406c89531d3a3df7770cf
[awesomized/libmemcached] / src / libmemcachedutil / pool.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcachedutil/common.h"
17
18 #include <cassert>
19 #include <cerrno>
20 #include <pthread.h>
21 #include <memory>
22
23 struct memcached_pool_st {
24 pthread_mutex_t mutex;
25 pthread_cond_t cond;
26 memcached_st *master;
27 memcached_st **server_pool;
28 int firstfree;
29 const uint32_t size;
30 uint32_t current_size;
31 bool _owns_master;
32 struct timespec _timeout;
33
34 memcached_pool_st(memcached_st *master_arg, size_t max_arg)
35 : master(master_arg)
36 , server_pool(NULL)
37 , firstfree(-1)
38 , size(uint32_t(max_arg))
39 , current_size(0)
40 , _owns_master(false) {
41 pthread_mutex_init(&mutex, NULL);
42 pthread_cond_init(&cond, NULL);
43 _timeout.tv_sec = 5;
44 _timeout.tv_nsec = 0;
45 }
46
47 const struct timespec &timeout() const { return _timeout; }
48
49 bool release(memcached_st *, memcached_return_t &rc);
50
51 memcached_st *fetch(memcached_return_t &rc);
52 memcached_st *fetch(const struct timespec &, memcached_return_t &rc);
53
54 bool init(uint32_t initial);
55
56 ~memcached_pool_st() {
57 for (int x = 0; x <= firstfree; ++x) {
58 memcached_free(server_pool[x]);
59 server_pool[x] = NULL;
60 }
61
62 int error;
63 if ((error = pthread_mutex_destroy(&mutex)) != 0) {
64 assert_vmsg(error != 0, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
65 }
66
67 if ((error = pthread_cond_destroy(&cond)) != 0) {
68 assert_vmsg(error != 0, "pthread_cond_destroy() %s", strerror(error));
69 }
70
71 delete[] server_pool;
72 if (_owns_master) {
73 memcached_free(master);
74 }
75 }
76
77 void increment_version() { ++master->configure.version; }
78
79 bool compare_version(const memcached_st *arg) const {
80 return (arg->configure.version == version());
81 }
82
83 int32_t version() const { return master->configure.version; }
84 };
85
86 /**
87 * Grow the connection pool by creating a connection structure and clone the
88 * original memcached handle.
89 */
90 static bool grow_pool(memcached_pool_st *pool) {
91 assert(pool);
92
93 memcached_st *obj;
94 if (not(obj = memcached_clone(NULL, pool->master))) {
95 return false;
96 }
97
98 pool->server_pool[++pool->firstfree] = obj;
99 pool->current_size++;
100 obj->configure.version = pool->version();
101
102 return true;
103 }
104
105 bool memcached_pool_st::init(uint32_t initial) {
106 server_pool = new (std::nothrow) memcached_st *[size];
107 if (server_pool == NULL) {
108 return false;
109 }
110
111 /*
112 Try to create the initial size of the pool. An allocation failure at
113 this time is not fatal..
114 */
115 for (unsigned int x = 0; x < initial; ++x) {
116 if (grow_pool(this) == false) {
117 break;
118 }
119 }
120
121 return true;
122 }
123
124 static inline memcached_pool_st *_pool_create(memcached_st *master, uint32_t initial,
125 uint32_t max) {
126 if (initial == 0 or max == 0 or (initial > max)) {
127 return NULL;
128 }
129
130 memcached_pool_st *object = new (std::nothrow) memcached_pool_st(master, max);
131 if (object == NULL) {
132 return NULL;
133 }
134
135 /*
136 Try to create the initial size of the pool. An allocation failure at
137 this time is not fatal..
138 */
139 if (not object->init(initial)) {
140 delete object;
141 return NULL;
142 }
143
144 return object;
145 }
146
147 memcached_pool_st *memcached_pool_create(memcached_st *master, uint32_t initial, uint32_t max) {
148 return _pool_create(master, initial, max);
149 }
150
151 memcached_pool_st *memcached_pool(const char *option_string, size_t option_string_length) {
152 memcached_st *memc = memcached(option_string, option_string_length);
153
154 if (memc == NULL) {
155 return NULL;
156 }
157
158 memcached_pool_st *self =
159 memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
160 if (self == NULL) {
161 memcached_free(memc);
162 return NULL;
163 }
164
165 self->_owns_master = true;
166
167 return self;
168 }
169
170 memcached_st *memcached_pool_destroy(memcached_pool_st *pool) {
171 if (pool == NULL) {
172 return NULL;
173 }
174
175 // Legacy that we return the original structure
176 memcached_st *ret = NULL;
177 if (pool->_owns_master) {
178 } else {
179 ret = pool->master;
180 }
181
182 delete pool;
183
184 return ret;
185 }
186
187 memcached_st *memcached_pool_st::fetch(memcached_return_t &rc) {
188 static struct timespec relative_time = {0, 0};
189 return fetch(relative_time, rc);
190 }
191
192 memcached_st *memcached_pool_st::fetch(const struct timespec &relative_time,
193 memcached_return_t &rc) {
194 rc = MEMCACHED_SUCCESS;
195
196 int error;
197 if ((error = pthread_mutex_lock(&mutex)) != 0) {
198 rc = MEMCACHED_IN_PROGRESS;
199 return NULL;
200 }
201
202 memcached_st *ret = NULL;
203 do {
204 if (firstfree > -1) {
205 ret = server_pool[firstfree--];
206 } else if (current_size == size) {
207 if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0) {
208 error = pthread_mutex_unlock(&mutex);
209 rc = MEMCACHED_NOTFOUND;
210
211 return NULL;
212 }
213
214 struct timespec time_to_wait = {0, 0};
215 time_to_wait.tv_sec = time(NULL) + relative_time.tv_sec;
216 time_to_wait.tv_nsec = relative_time.tv_nsec;
217
218 int thread_ret;
219 if ((thread_ret = pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0) {
220 int unlock_error;
221 if ((unlock_error = pthread_mutex_unlock(&mutex)) != 0) {
222 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
223 }
224
225 if (thread_ret == ETIMEDOUT) {
226 rc = MEMCACHED_TIMEOUT;
227 } else {
228 errno = thread_ret;
229 rc = MEMCACHED_ERRNO;
230 }
231
232 return NULL;
233 }
234 } else if (grow_pool(this) == false) {
235 int unlock_error;
236 if ((unlock_error = pthread_mutex_unlock(&mutex)) != 0) {
237 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
238 }
239
240 return NULL;
241 }
242 } while (ret == NULL);
243
244 if ((error = pthread_mutex_unlock(&mutex)) != 0) {
245 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
246 }
247
248 return ret;
249 }
250
251 bool memcached_pool_st::release(memcached_st *released, memcached_return_t &rc) {
252 rc = MEMCACHED_SUCCESS;
253 if (released == NULL) {
254 rc = MEMCACHED_INVALID_ARGUMENTS;
255 return false;
256 }
257
258 int error;
259 if ((error = pthread_mutex_lock(&mutex))) {
260 rc = MEMCACHED_IN_PROGRESS;
261 return false;
262 }
263
264 /*
265 Someone updated the behavior on the object, so we clone a new memcached_st with the new
266 settings. If we fail to clone, we keep the old one around.
267 */
268 if (compare_version(released) == false) {
269 memcached_st *memc;
270 if ((memc = memcached_clone(NULL, master))) {
271 memcached_free(released);
272 released = memc;
273 }
274 }
275
276 server_pool[++firstfree] = released;
277
278 if (firstfree == 0 and current_size == size) {
279 /* we might have people waiting for a connection.. wake them up :-) */
280 if ((error = pthread_cond_broadcast(&cond)) != 0) {
281 assert_vmsg(error != 0, "pthread_cond_broadcast() %s", strerror(error));
282 }
283 }
284
285 if ((error = pthread_mutex_unlock(&mutex)) != 0) {
286 }
287
288 return true;
289 }
290
291 memcached_st *memcached_pool_fetch(memcached_pool_st *pool, struct timespec *relative_time,
292 memcached_return_t *rc) {
293 if (pool == NULL) {
294 return NULL;
295 }
296
297 memcached_return_t unused;
298 if (rc == NULL) {
299 rc = &unused;
300 }
301
302 if (relative_time == NULL) {
303 return pool->fetch(*rc);
304 }
305
306 return pool->fetch(*relative_time, *rc);
307 }
308
309 memcached_st *memcached_pool_pop(memcached_pool_st *pool, bool block, memcached_return_t *rc) {
310 if (pool == NULL) {
311 return NULL;
312 }
313
314 memcached_return_t unused;
315 if (rc == NULL) {
316 rc = &unused;
317 }
318
319 memcached_st *memc;
320 if (block) {
321 memc = pool->fetch(pool->timeout(), *rc);
322 } else {
323 memc = pool->fetch(*rc);
324 }
325
326 return memc;
327 }
328
329 memcached_return_t memcached_pool_release(memcached_pool_st *pool, memcached_st *released) {
330 if (pool == NULL) {
331 return MEMCACHED_INVALID_ARGUMENTS;
332 }
333
334 memcached_return_t rc;
335
336 (void) pool->release(released, rc);
337
338 return rc;
339 }
340
341 memcached_return_t memcached_pool_push(memcached_pool_st *pool, memcached_st *released) {
342 return memcached_pool_release(pool, released);
343 }
344
345 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool, memcached_behavior_t flag,
346 uint64_t data) {
347 if (pool == NULL) {
348 return MEMCACHED_INVALID_ARGUMENTS;
349 }
350
351 int error;
352 if ((error = pthread_mutex_lock(&pool->mutex))) {
353 return MEMCACHED_IN_PROGRESS;
354 }
355
356 /* update the master */
357 memcached_return_t rc = memcached_behavior_set(pool->master, flag, data);
358 if (memcached_failed(rc)) {
359 if ((error = pthread_mutex_unlock(&pool->mutex)) != 0) {
360 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
361 }
362 return rc;
363 }
364
365 pool->increment_version();
366 /* update the clones */
367 for (int xx = 0; xx <= pool->firstfree; ++xx) {
368 if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data))) {
369 pool->server_pool[xx]->configure.version = pool->version();
370 } else {
371 memcached_st *memc;
372 if ((memc = memcached_clone(NULL, pool->master))) {
373 memcached_free(pool->server_pool[xx]);
374 pool->server_pool[xx] = memc;
375 /* I'm not sure what to do in this case.. this would happen
376 if we fail to push the server list inside the client..
377 I should add a testcase for this, but I believe the following
378 would work, except that you would add a hole in the pool list..
379 in theory you could end up with an empty pool....
380 */
381 }
382 }
383 }
384
385 if ((error = pthread_mutex_unlock(&pool->mutex)) != 0) {
386 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
387 }
388
389 return rc;
390 }
391
392 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool, memcached_behavior_t flag,
393 uint64_t *value) {
394 if (pool == NULL) {
395 return MEMCACHED_INVALID_ARGUMENTS;
396 }
397
398 int error;
399 if ((error = pthread_mutex_lock(&pool->mutex))) {
400 return MEMCACHED_IN_PROGRESS;
401 }
402
403 *value = memcached_behavior_get(pool->master, flag);
404
405 if ((error = pthread_mutex_unlock(&pool->mutex)) != 0) {
406 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
407 }
408
409 return MEMCACHED_SUCCESS;
410 }