c2185141b6c3e6cec7d788c8c2cf4c6816299f87
[m6w6/libmemcached] / src / libmemcachedutil / pool.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcachedutil/common.h"
17
18 #include <alloca.h>
19 #include <cassert>
20 #include <cerrno>
21 #include <cstring>
22 #include <pthread.h>
23 #include <memory>
24
25 struct memcached_pool_st {
26 pthread_mutex_t mutex;
27 pthread_cond_t cond;
28 memcached_st *master;
29 memcached_st **server_pool;
30 int firstfree;
31 const uint32_t size;
32 uint32_t current_size;
33 bool _owns_master;
34 struct timespec _timeout;
35
36 memcached_pool_st(memcached_st *master_arg, size_t max_arg)
37 : master(master_arg)
38 , server_pool(NULL)
39 , firstfree(-1)
40 , size(uint32_t(max_arg))
41 , current_size(0)
42 , _owns_master(false) {
43 pthread_mutex_init(&mutex, NULL);
44 pthread_cond_init(&cond, NULL);
45 _timeout.tv_sec = 5;
46 _timeout.tv_nsec = 0;
47 }
48
49 const struct timespec &timeout() const {
50 return _timeout;
51 }
52
53 bool release(memcached_st *, memcached_return_t &rc);
54
55 memcached_st *fetch(memcached_return_t &rc);
56 memcached_st *fetch(const struct timespec &, memcached_return_t &rc);
57
58 bool init(uint32_t initial);
59
60 ~memcached_pool_st() {
61 for (int x = 0; x <= firstfree; ++x) {
62 memcached_free(server_pool[x]);
63 server_pool[x] = NULL;
64 }
65
66 int error;
67 if ((error = pthread_mutex_destroy(&mutex))) {
68 assert_vmsg(error, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
69 }
70
71 if ((error = pthread_cond_destroy(&cond))) {
72 assert_vmsg(error, "pthread_cond_destroy() %s", strerror(error));
73 }
74
75 delete[] server_pool;
76 if (_owns_master) {
77 memcached_free(master);
78 }
79 }
80
81 void increment_version() {
82 ++master->configure.version;
83 }
84
85 bool compare_version(const memcached_st *arg) const {
86 return (arg->configure.version == version());
87 }
88
89 int32_t version() const {
90 return master->configure.version;
91 }
92 };
93
94 /**
95 * Grow the connection pool by creating a connection structure and clone the
96 * original memcached handle.
97 */
98 static bool grow_pool(memcached_pool_st *pool) {
99 assert(pool);
100
101 memcached_st *obj;
102 if (not(obj = memcached_clone(NULL, pool->master))) {
103 return false;
104 }
105
106 pool->server_pool[++pool->firstfree] = obj;
107 pool->current_size++;
108 obj->configure.version = pool->version();
109
110 return true;
111 }
112
113 bool memcached_pool_st::init(uint32_t initial) {
114 server_pool = new (std::nothrow) memcached_st *[size];
115 if (server_pool == NULL) {
116 return false;
117 }
118
119 /*
120 Try to create the initial size of the pool. An allocation failure at
121 this time is not fatal..
122 */
123 for (unsigned int x = 0; x < initial; ++x) {
124 if (grow_pool(this) == false) {
125 break;
126 }
127 }
128
129 return true;
130 }
131
132 static inline memcached_pool_st *_pool_create(memcached_st *master, uint32_t initial,
133 uint32_t max) {
134 if (initial == 0 or max == 0 or (initial > max)) {
135 return NULL;
136 }
137
138 memcached_pool_st *object = new (std::nothrow) memcached_pool_st(master, max);
139 if (object == NULL) {
140 return NULL;
141 }
142
143 /*
144 Try to create the initial size of the pool. An allocation failure at
145 this time is not fatal..
146 */
147 if (not object->init(initial)) {
148 delete object;
149 return NULL;
150 }
151
152 return object;
153 }
154
155 memcached_pool_st *memcached_pool_create(memcached_st *master, uint32_t initial, uint32_t max) {
156 return _pool_create(master, initial, max);
157 }
158
159 memcached_pool_st *memcached_pool(const char *option_string, size_t option_string_length) {
160 memcached_st *memc = memcached(option_string, option_string_length);
161
162 if (memc == NULL) {
163 return NULL;
164 }
165
166 memcached_pool_st *self =
167 memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
168 if (self == NULL) {
169 memcached_free(memc);
170 return NULL;
171 }
172
173 self->_owns_master = true;
174
175 return self;
176 }
177
178 memcached_st *memcached_pool_destroy(memcached_pool_st *pool) {
179 if (pool == NULL) {
180 return NULL;
181 }
182
183 // Legacy that we return the original structure
184 memcached_st *ret = NULL;
185 if (pool->_owns_master) {
186 } else {
187 ret = pool->master;
188 }
189
190 delete pool;
191
192 return ret;
193 }
194
195 memcached_st *memcached_pool_st::fetch(memcached_return_t &rc) {
196 static struct timespec relative_time = {0, 0};
197 return fetch(relative_time, rc);
198 }
199
200 memcached_st *memcached_pool_st::fetch(const struct timespec &relative_time,
201 memcached_return_t &rc) {
202 rc = MEMCACHED_SUCCESS;
203
204 int error;
205 if ((error = pthread_mutex_lock(&mutex))) {
206 rc = MEMCACHED_IN_PROGRESS;
207 return NULL;
208 }
209
210 memcached_st *ret = NULL;
211 do {
212 if (firstfree > -1) {
213 ret = server_pool[firstfree--];
214 } else if (current_size == size) {
215 if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0) {
216 error = pthread_mutex_unlock(&mutex);
217 rc = MEMCACHED_NOTFOUND;
218
219 return NULL;
220 }
221
222 struct timespec time_to_wait = {0, 0};
223 time_to_wait.tv_sec = time(NULL) + relative_time.tv_sec;
224 time_to_wait.tv_nsec = relative_time.tv_nsec;
225
226 int thread_ret;
227 if ((thread_ret = pthread_cond_timedwait(&cond, &mutex, &time_to_wait))) {
228 int unlock_error;
229 if ((unlock_error = pthread_mutex_unlock(&mutex))) {
230 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
231 }
232
233 if (thread_ret == ETIMEDOUT) {
234 rc = MEMCACHED_TIMEOUT;
235 } else {
236 errno = thread_ret;
237 rc = MEMCACHED_ERRNO;
238 }
239
240 return NULL;
241 }
242 } else if (grow_pool(this) == false) {
243 int unlock_error;
244 if ((unlock_error = pthread_mutex_unlock(&mutex))) {
245 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
246 }
247
248 return NULL;
249 }
250 } while (ret == NULL);
251
252 if ((error = pthread_mutex_unlock(&mutex))) {
253 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
254 }
255
256 return ret;
257 }
258
259 bool memcached_pool_st::release(memcached_st *released, memcached_return_t &rc) {
260 rc = MEMCACHED_SUCCESS;
261 if (released == NULL) {
262 rc = MEMCACHED_INVALID_ARGUMENTS;
263 return false;
264 }
265
266 int error;
267 if ((error = pthread_mutex_lock(&mutex))) {
268 rc = MEMCACHED_IN_PROGRESS;
269 return false;
270 }
271
272 /*
273 Someone updated the behavior on the object, so we clone a new memcached_st with the new
274 settings. If we fail to clone, we keep the old one around.
275 */
276 if (compare_version(released) == false) {
277 memcached_st *memc;
278 if ((memc = memcached_clone(NULL, master))) {
279 memcached_free(released);
280 released = memc;
281 }
282 }
283
284 server_pool[++firstfree] = released;
285
286 if (firstfree == 0 and current_size == size) {
287 /* we might have people waiting for a connection.. wake them up :-) */
288 if ((error = pthread_cond_broadcast(&cond))) {
289 assert_vmsg(error, "pthread_cond_broadcast() %s", strerror(error));
290 }
291 }
292
293 if ((error = pthread_mutex_unlock(&mutex))) {
294 }
295
296 return true;
297 }
298
299 memcached_st *memcached_pool_fetch(memcached_pool_st *pool, struct timespec *relative_time,
300 memcached_return_t *rc) {
301 if (pool == NULL) {
302 return NULL;
303 }
304
305 memcached_return_t unused;
306 if (rc == NULL) {
307 rc = &unused;
308 }
309
310 if (relative_time == NULL) {
311 return pool->fetch(*rc);
312 }
313
314 return pool->fetch(*relative_time, *rc);
315 }
316
317 memcached_st *memcached_pool_pop(memcached_pool_st *pool, bool block, memcached_return_t *rc) {
318 if (pool == NULL) {
319 return NULL;
320 }
321
322 memcached_return_t unused;
323 if (rc == NULL) {
324 rc = &unused;
325 }
326
327 memcached_st *memc;
328 if (block) {
329 memc = pool->fetch(pool->timeout(), *rc);
330 } else {
331 memc = pool->fetch(*rc);
332 }
333
334 return memc;
335 }
336
337 memcached_return_t memcached_pool_release(memcached_pool_st *pool, memcached_st *released) {
338 if (pool == NULL) {
339 return MEMCACHED_INVALID_ARGUMENTS;
340 }
341
342 memcached_return_t rc;
343
344 (void) pool->release(released, rc);
345
346 return rc;
347 }
348
349 memcached_return_t memcached_pool_push(memcached_pool_st *pool, memcached_st *released) {
350 return memcached_pool_release(pool, released);
351 }
352
353 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool, memcached_behavior_t flag,
354 uint64_t data) {
355 if (pool == NULL) {
356 return MEMCACHED_INVALID_ARGUMENTS;
357 }
358
359 int error;
360 if ((error = pthread_mutex_lock(&pool->mutex))) {
361 return MEMCACHED_IN_PROGRESS;
362 }
363
364 /* update the master */
365 memcached_return_t rc = memcached_behavior_set(pool->master, flag, data);
366 if (memcached_failed(rc)) {
367 if ((error = pthread_mutex_unlock(&pool->mutex))) {
368 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
369 }
370 return rc;
371 }
372
373 pool->increment_version();
374 /* update the clones */
375 for (int xx = 0; xx <= pool->firstfree; ++xx) {
376 if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data))) {
377 pool->server_pool[xx]->configure.version = pool->version();
378 } else {
379 memcached_st *memc;
380 if ((memc = memcached_clone(NULL, pool->master))) {
381 memcached_free(pool->server_pool[xx]);
382 pool->server_pool[xx] = memc;
383 /* I'm not sure what to do in this case.. this would happen
384 if we fail to push the server list inside the client..
385 I should add a testcase for this, but I believe the following
386 would work, except that you would add a hole in the pool list..
387 in theory you could end up with an empty pool....
388 */
389 }
390 }
391 }
392
393 if ((error = pthread_mutex_unlock(&pool->mutex))) {
394 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
395 }
396
397 return rc;
398 }
399
400 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool, memcached_behavior_t flag,
401 uint64_t *value) {
402 if (pool == NULL) {
403 return MEMCACHED_INVALID_ARGUMENTS;
404 }
405
406 int error;
407 if ((error = pthread_mutex_lock(&pool->mutex))) {
408 return MEMCACHED_IN_PROGRESS;
409 }
410
411 *value = memcached_behavior_get(pool->master, flag);
412
413 if ((error = pthread_mutex_unlock(&pool->mutex))) {
414 assert_vmsg(error, "pthread_mutex_unlock() %s", strerror(error));
415 }
416
417 return MEMCACHED_SUCCESS;
418 }