libmemcached: fix #35 (handling of MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS)
[awesomized/libmemcached] / libmemcachedutil / pool.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcachedutil/common.h>
40
41 #include <cassert>
42 #include <cerrno>
43 #include <pthread.h>
44 #include <memory>
45
46 struct memcached_pool_st
47 {
48 pthread_mutex_t mutex;
49 pthread_cond_t cond;
50 memcached_st *master;
51 memcached_st **server_pool;
52 int firstfree;
53 const uint32_t size;
54 uint32_t current_size;
55 bool _owns_master;
56 struct timespec _timeout;
57
58 memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
59 master(master_arg),
60 server_pool(NULL),
61 firstfree(-1),
62 size(uint32_t(max_arg)),
63 current_size(0),
64 _owns_master(false)
65 {
66 pthread_mutex_init(&mutex, NULL);
67 pthread_cond_init(&cond, NULL);
68 _timeout.tv_sec= 5;
69 _timeout.tv_nsec= 0;
70 }
71
72 const struct timespec& timeout() const
73 {
74 return _timeout;
75 }
76
77 bool release(memcached_st*, memcached_return_t& rc);
78
79 memcached_st *fetch(memcached_return_t& rc);
80 memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
81
82 bool init(uint32_t initial);
83
84 ~memcached_pool_st()
85 {
86 for (int x= 0; x <= firstfree; ++x)
87 {
88 memcached_free(server_pool[x]);
89 server_pool[x]= NULL;
90 }
91
92 int error;
93 if ((error= pthread_mutex_destroy(&mutex)) != 0)
94 {
95 assert_vmsg(error != 0, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
96 }
97
98 if ((error= pthread_cond_destroy(&cond)) != 0)
99 {
100 assert_vmsg(error != 0, "pthread_cond_destroy() %s", strerror(error));
101 }
102
103 delete [] server_pool;
104 if (_owns_master)
105 {
106 memcached_free(master);
107 }
108 }
109
110 void increment_version()
111 {
112 ++master->configure.version;
113 }
114
115 bool compare_version(const memcached_st *arg) const
116 {
117 return (arg->configure.version == version());
118 }
119
120 int32_t version() const
121 {
122 return master->configure.version;
123 }
124 };
125
126
127 /**
128 * Grow the connection pool by creating a connection structure and clone the
129 * original memcached handle.
130 */
131 static bool grow_pool(memcached_pool_st* pool)
132 {
133 assert(pool);
134
135 memcached_st *obj;
136 if (not (obj= memcached_clone(NULL, pool->master)))
137 {
138 return false;
139 }
140
141 pool->server_pool[++pool->firstfree]= obj;
142 pool->current_size++;
143 obj->configure.version= pool->version();
144
145 return true;
146 }
147
148 bool memcached_pool_st::init(uint32_t initial)
149 {
150 server_pool= new (std::nothrow) memcached_st *[size];
151 if (server_pool == NULL)
152 {
153 return false;
154 }
155
156 /*
157 Try to create the initial size of the pool. An allocation failure at
158 this time is not fatal..
159 */
160 for (unsigned int x= 0; x < initial; ++x)
161 {
162 if (grow_pool(this) == false)
163 {
164 break;
165 }
166 }
167
168 return true;
169 }
170
171
172 static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
173 {
174 if (initial == 0 or max == 0 or (initial > max))
175 {
176 return NULL;
177 }
178
179 memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
180 if (object == NULL)
181 {
182 return NULL;
183 }
184
185 /*
186 Try to create the initial size of the pool. An allocation failure at
187 this time is not fatal..
188 */
189 if (not object->init(initial))
190 {
191 delete object;
192 return NULL;
193 }
194
195 return object;
196 }
197
198 memcached_pool_st *memcached_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
199 {
200 return _pool_create(master, initial, max);
201 }
202
203 memcached_pool_st * memcached_pool(const char *option_string, size_t option_string_length)
204 {
205 memcached_st *memc= memcached(option_string, option_string_length);
206
207 if (memc == NULL)
208 {
209 return NULL;
210 }
211
212 memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
213 if (self == NULL)
214 {
215 memcached_free(memc);
216 return NULL;
217 }
218
219 self->_owns_master= true;
220
221 return self;
222 }
223
224 memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
225 {
226 if (pool == NULL)
227 {
228 return NULL;
229 }
230
231 // Legacy that we return the original structure
232 memcached_st *ret= NULL;
233 if (pool->_owns_master)
234 { }
235 else
236 {
237 ret= pool->master;
238 }
239
240 delete pool;
241
242 return ret;
243 }
244
245 memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
246 {
247 static struct timespec relative_time= { 0, 0 };
248 return fetch(relative_time, rc);
249 }
250
251 memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
252 {
253 rc= MEMCACHED_SUCCESS;
254
255 int error;
256 if ((error= pthread_mutex_lock(&mutex)) != 0)
257 {
258 rc= MEMCACHED_IN_PROGRESS;
259 return NULL;
260 }
261
262 memcached_st *ret= NULL;
263 do
264 {
265 if (firstfree > -1)
266 {
267 ret= server_pool[firstfree--];
268 }
269 else if (current_size == size)
270 {
271 if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
272 {
273 error= pthread_mutex_unlock(&mutex);
274 rc= MEMCACHED_NOTFOUND;
275
276 return NULL;
277 }
278
279 struct timespec time_to_wait= {0, 0};
280 time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
281 time_to_wait.tv_nsec= relative_time.tv_nsec;
282
283 int thread_ret;
284 if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
285 {
286 int unlock_error;
287 if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
288 {
289 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
290 }
291
292 if (thread_ret == ETIMEDOUT)
293 {
294 rc= MEMCACHED_TIMEOUT;
295 }
296 else
297 {
298 errno= thread_ret;
299 rc= MEMCACHED_ERRNO;
300 }
301
302 return NULL;
303 }
304 }
305 else if (grow_pool(this) == false)
306 {
307 int unlock_error;
308 if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
309 {
310 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
311 }
312
313 return NULL;
314 }
315 } while (ret == NULL);
316
317 if ((error= pthread_mutex_unlock(&mutex)) != 0)
318 {
319 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
320 }
321
322 return ret;
323 }
324
325 bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
326 {
327 rc= MEMCACHED_SUCCESS;
328 if (released == NULL)
329 {
330 rc= MEMCACHED_INVALID_ARGUMENTS;
331 return false;
332 }
333
334 int error;
335 if ((error= pthread_mutex_lock(&mutex)))
336 {
337 rc= MEMCACHED_IN_PROGRESS;
338 return false;
339 }
340
341 /*
342 Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
343 */
344 if (compare_version(released) == false)
345 {
346 memcached_st *memc;
347 if ((memc= memcached_clone(NULL, master)))
348 {
349 memcached_free(released);
350 released= memc;
351 }
352 }
353
354 server_pool[++firstfree]= released;
355
356 if (firstfree == 0 and current_size == size)
357 {
358 /* we might have people waiting for a connection.. wake them up :-) */
359 if ((error= pthread_cond_broadcast(&cond)) != 0)
360 {
361 assert_vmsg(error != 0, "pthread_cond_broadcast() %s", strerror(error));
362 }
363 }
364
365 if ((error= pthread_mutex_unlock(&mutex)) != 0)
366 {
367 }
368
369 return true;
370 }
371
372 memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
373 {
374 if (pool == NULL)
375 {
376 return NULL;
377 }
378
379 memcached_return_t unused;
380 if (rc == NULL)
381 {
382 rc= &unused;
383 }
384
385 if (relative_time == NULL)
386 {
387 return pool->fetch(*rc);
388 }
389
390 return pool->fetch(*relative_time, *rc);
391 }
392
393 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
394 bool block,
395 memcached_return_t *rc)
396 {
397 if (pool == NULL)
398 {
399 return NULL;
400 }
401
402 memcached_return_t unused;
403 if (rc == NULL)
404 {
405 rc= &unused;
406 }
407
408 memcached_st *memc;
409 if (block)
410 {
411 memc= pool->fetch(pool->timeout(), *rc);
412 }
413 else
414 {
415 memc= pool->fetch(*rc);
416 }
417
418 return memc;
419 }
420
421 memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
422 {
423 if (pool == NULL)
424 {
425 return MEMCACHED_INVALID_ARGUMENTS;
426 }
427
428 memcached_return_t rc;
429
430 (void) pool->release(released, rc);
431
432 return rc;
433 }
434
435 memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
436 {
437 return memcached_pool_release(pool, released);
438 }
439
440
441 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
442 memcached_behavior_t flag,
443 uint64_t data)
444 {
445 if (pool == NULL)
446 {
447 return MEMCACHED_INVALID_ARGUMENTS;
448 }
449
450 int error;
451 if ((error= pthread_mutex_lock(&pool->mutex)))
452 {
453 return MEMCACHED_IN_PROGRESS;
454 }
455
456 /* update the master */
457 memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
458 if (memcached_failed(rc))
459 {
460 if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
461 {
462 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
463 }
464 return rc;
465 }
466
467 pool->increment_version();
468 /* update the clones */
469 for (int xx= 0; xx <= pool->firstfree; ++xx)
470 {
471 if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
472 {
473 pool->server_pool[xx]->configure.version= pool->version();
474 }
475 else
476 {
477 memcached_st *memc;
478 if ((memc= memcached_clone(NULL, pool->master)))
479 {
480 memcached_free(pool->server_pool[xx]);
481 pool->server_pool[xx]= memc;
482 /* I'm not sure what to do in this case.. this would happen
483 if we fail to push the server list inside the client..
484 I should add a testcase for this, but I believe the following
485 would work, except that you would add a hole in the pool list..
486 in theory you could end up with an empty pool....
487 */
488 }
489 }
490 }
491
492 if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
493 {
494 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
495 }
496
497 return rc;
498 }
499
500 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
501 memcached_behavior_t flag,
502 uint64_t *value)
503 {
504 if (pool == NULL)
505 {
506 return MEMCACHED_INVALID_ARGUMENTS;
507 }
508
509 int error;
510 if ((error= pthread_mutex_lock(&pool->mutex)))
511 {
512 return MEMCACHED_IN_PROGRESS;
513 }
514
515 *value= memcached_behavior_get(pool->master, flag);
516
517 if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
518 {
519 assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
520 }
521
522 return MEMCACHED_SUCCESS;
523 }