b57cc761665c94f25a49fb9731fdc68c566b7dc2
[m6w6/libmemcached] / src / libmemcachedutil / pool.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcachedutil/common.h"
17
18 #include <cassert>
19 #include <cerrno>
20 #include <pthread.h>
21 #include <memory>
22
23 struct memcached_pool_st {
24 pthread_mutex_t mutex;
25 pthread_cond_t cond;
26 memcached_st *master;
27 memcached_st **server_pool;
28 int firstfree;
29 const uint32_t size;
30 uint32_t current_size;
31 bool _owns_master;
32 struct timespec _timeout;
33
34 memcached_pool_st(memcached_st *master_arg, size_t max_arg)
35 : master(master_arg)
36 , server_pool(NULL)
37 , firstfree(-1)
38 , size(uint32_t(max_arg))
39 , current_size(0)
40 , _owns_master(false) {
41 pthread_mutex_init(&mutex, NULL);
42 pthread_cond_init(&cond, NULL);
43 _timeout.tv_sec = 5;
44 _timeout.tv_nsec = 0;
45 }
46
47 const struct timespec &timeout() const {
48 return _timeout;
49 }
50
51 bool release(memcached_st *, memcached_return_t &rc);
52
53 memcached_st *fetch(memcached_return_t &rc);
54 memcached_st *fetch(const struct timespec &, memcached_return_t &rc);
55
56 bool init(uint32_t initial);
57
58 ~memcached_pool_st() {
59 for (int x = 0; x <= firstfree; ++x) {
60 memcached_free(server_pool[x]);
61 server_pool[x] = NULL;
62 }
63
64 int error;
65 if ((error = pthread_mutex_destroy(&mutex))) {
66 assert_vmsg(error, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
67 }
68
69 if ((error = pthread_cond_destroy(&cond))) {
70 assert_vmsg(error, "pthread_cond_destroy() %s", strerror(error));
71 }
72
73 delete[] server_pool;
74 if (_owns_master) {
75 memcached_free(master);
76 }
77 }
78
79 void increment_version() {
80 ++master->configure.version;
81 }
82
83 bool compare_version(const memcached_st *arg) const {
84 return (arg->configure.version == version());
85 }
86
87 int32_t version() const {
88 return master->configure.version;
89 }
90 };
91
92 /**
93 * Grow the connection pool by creating a connection structure and clone the
94 * original memcached handle.
95 */
96 static bool grow_pool(memcached_pool_st *pool) {
97 assert(pool);
98
99 memcached_st *obj;
100 if (not(obj = memcached_clone(NULL, pool->master))) {
101 return false;
102 }
103
104 pool->server_pool[++pool->firstfree] = obj;
105 pool->current_size++;
106 obj->configure.version = pool->version();
107
108 return true;
109 }
110
111 bool memcached_pool_st::init(uint32_t initial) {
112 server_pool = new (std::nothrow) memcached_st *[size];
113 if (server_pool == NULL) {
114 return false;
115 }
116
117 /*
118 Try to create the initial size of the pool. An allocation failure at
119 this time is not fatal..
120 */
121 for (unsigned int x = 0; x < initial; ++x) {
122 if (grow_pool(this) == false) {
123 break;
124 }
125 }
126
127 return true;
128 }
129
130 static inline memcached_pool_st *_pool_create(memcached_st *master, uint32_t initial,
131 uint32_t max) {
132 if (initial == 0 or max == 0 or (initial > max)) {
133 return NULL;
134 }
135
136 memcached_pool_st *object = new (std::nothrow) memcached_pool_st(master, max);
137 if (object == NULL) {
138 return NULL;
139 }
140
141 /*
142 Try to create the initial size of the pool. An allocation failure at
143 this time is not fatal..
144 */
145 if (not object->init(initial)) {
146 delete object;
147 return NULL;
148 }
149
150 return object;
151 }
152
153 memcached_pool_st *memcached_pool_create(memcached_st *master, uint32_t initial, uint32_t max) {
154 return _pool_create(master, initial, max);
155 }
156
157 memcached_pool_st *memcached_pool(const char *option_string, size_t option_string_length) {
158 memcached_st *memc = memcached(option_string, option_string_length);
159
160 if (memc == NULL) {
161 return NULL;
162 }
163
164 memcached_pool_st *self =
165 memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
166 if (self == NULL) {
167 memcached_free(memc);
168 return NULL;
169 }
170
171 self->_owns_master = true;
172
173 return self;
174 }
175
176 memcached_st *memcached_pool_destroy(memcached_pool_st *pool) {
177 if (pool == NULL) {
178 return NULL;
179 }
180
181 // Legacy that we return the original structure
182 memcached_st *ret = NULL;
183 if (pool->_owns_master) {
184 } else {
185 ret = pool->master;
186 }
187
188 delete pool;
189
190 return ret;
191 }
192
193 memcached_st *memcached_pool_st::fetch(memcached_return_t &rc) {
194 static struct timespec relative_time = {0, 0};
195 return fetch(relative_time, rc);
196 }
197
198 memcached_st *memcached_pool_st::fetch(const struct timespec &relative_time,
199 memcached_return_t &rc) {
200 rc = MEMCACHED_SUCCESS;
201
202 int error;
203 if ((error = pthread_mutex_lock(&mutex))) {
204 rc = MEMCACHED_IN_PROGRESS;
205 return NULL;
206 }
207
208 memcached_st *ret = NULL;
209 do {
210 if (firstfree > -1) {
211 ret = server_pool[firstfree--];
212 } else if (current_size == size) {
213 if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0) {
214 error = pthread_mutex_unlock(&mutex);
215 rc = MEMCACHED_NOTFOUND;
216
217 return NULL;
218 }
219
220 struct timespec time_to_wait = {0, 0};
221 time_to_wait.tv_sec = time(NULL) + relative_time.tv_sec;
222 time_to_wait.tv_nsec = relative_time.tv_nsec;
223
224 int thread_ret;
225 if ((thread_ret = pthread_cond_timedwait(&cond, &mutex, &time_to_wait))) {
226 int unlock_error;
227 if ((unlock_error = pthread_mutex_unlock(&mutex))) {
228 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
229 }
230
231 if (thread_ret == ETIMEDOUT) {
232 rc = MEMCACHED_TIMEOUT;
233 } else {
234 errno = thread_ret;
235 rc = MEMCACHED_ERRNO;
236 }
237
238 return NULL;
239 }
240 } else if (grow_pool(this) == false) {
241 int unlock_error;
242 if ((unlock_error = pthread_mutex_unlock(&mutex))) {
243 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
244 }
245
246 return NULL;
247 }
248 } while (ret == NULL);
249
250 if ((error = pthread_mutex_unlock(&mutex))) {
251 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
252 }
253
254 return ret;
255 }
256
257 bool memcached_pool_st::release(memcached_st *released, memcached_return_t &rc) {
258 rc = MEMCACHED_SUCCESS;
259 if (released == NULL) {
260 rc = MEMCACHED_INVALID_ARGUMENTS;
261 return false;
262 }
263
264 int error;
265 if ((error = pthread_mutex_lock(&mutex))) {
266 rc = MEMCACHED_IN_PROGRESS;
267 return false;
268 }
269
270 /*
271 Someone updated the behavior on the object, so we clone a new memcached_st with the new
272 settings. If we fail to clone, we keep the old one around.
273 */
274 if (compare_version(released) == false) {
275 memcached_st *memc;
276 if ((memc = memcached_clone(NULL, master))) {
277 memcached_free(released);
278 released = memc;
279 }
280 }
281
282 server_pool[++firstfree] = released;
283
284 if (firstfree == 0 and current_size == size) {
285 /* we might have people waiting for a connection.. wake them up :-) */
286 if ((error = pthread_cond_broadcast(&cond))) {
287 assert_vmsg(error, "pthread_cond_broadcast() %s", strerror(error));
288 }
289 }
290
291 if ((error = pthread_mutex_unlock(&mutex))) {
292 }
293
294 return true;
295 }
296
297 memcached_st *memcached_pool_fetch(memcached_pool_st *pool, struct timespec *relative_time,
298 memcached_return_t *rc) {
299 if (pool == NULL) {
300 return NULL;
301 }
302
303 memcached_return_t unused;
304 if (rc == NULL) {
305 rc = &unused;
306 }
307
308 if (relative_time == NULL) {
309 return pool->fetch(*rc);
310 }
311
312 return pool->fetch(*relative_time, *rc);
313 }
314
315 memcached_st *memcached_pool_pop(memcached_pool_st *pool, bool block, memcached_return_t *rc) {
316 if (pool == NULL) {
317 return NULL;
318 }
319
320 memcached_return_t unused;
321 if (rc == NULL) {
322 rc = &unused;
323 }
324
325 memcached_st *memc;
326 if (block) {
327 memc = pool->fetch(pool->timeout(), *rc);
328 } else {
329 memc = pool->fetch(*rc);
330 }
331
332 return memc;
333 }
334
335 memcached_return_t memcached_pool_release(memcached_pool_st *pool, memcached_st *released) {
336 if (pool == NULL) {
337 return MEMCACHED_INVALID_ARGUMENTS;
338 }
339
340 memcached_return_t rc;
341
342 (void) pool->release(released, rc);
343
344 return rc;
345 }
346
347 memcached_return_t memcached_pool_push(memcached_pool_st *pool, memcached_st *released) {
348 return memcached_pool_release(pool, released);
349 }
350
351 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool, memcached_behavior_t flag,
352 uint64_t data) {
353 if (pool == NULL) {
354 return MEMCACHED_INVALID_ARGUMENTS;
355 }
356
357 int error;
358 if ((error = pthread_mutex_lock(&pool->mutex))) {
359 return MEMCACHED_IN_PROGRESS;
360 }
361
362 /* update the master */
363 memcached_return_t rc = memcached_behavior_set(pool->master, flag, data);
364 if (memcached_failed(rc)) {
365 if ((error = pthread_mutex_unlock(&pool->mutex))) {
366 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
367 }
368 return rc;
369 }
370
371 pool->increment_version();
372 /* update the clones */
373 for (int xx = 0; xx <= pool->firstfree; ++xx) {
374 if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data))) {
375 pool->server_pool[xx]->configure.version = pool->version();
376 } else {
377 memcached_st *memc;
378 if ((memc = memcached_clone(NULL, pool->master))) {
379 memcached_free(pool->server_pool[xx]);
380 pool->server_pool[xx] = memc;
381 /* I'm not sure what to do in this case.. this would happen
382 if we fail to push the server list inside the client..
383 I should add a testcase for this, but I believe the following
384 would work, except that you would add a hole in the pool list..
385 in theory you could end up with an empty pool....
386 */
387 }
388 }
389 }
390
391 if ((error = pthread_mutex_unlock(&pool->mutex))) {
392 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
393 }
394
395 return rc;
396 }
397
398 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool, memcached_behavior_t flag,
399 uint64_t *value) {
400 if (pool == NULL) {
401 return MEMCACHED_INVALID_ARGUMENTS;
402 }
403
404 int error;
405 if ((error = pthread_mutex_lock(&pool->mutex))) {
406 return MEMCACHED_IN_PROGRESS;
407 }
408
409 *value = memcached_behavior_get(pool->master, flag);
410
411 if ((error = pthread_mutex_unlock(&pool->mutex))) {
412 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
413 }
414
415 return MEMCACHED_SUCCESS;
416 }