Extend pool API for fetch/release. Fix concurrency issue in test case.
[awesomized/libmemcached] / libmemcached / util / pool.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40 #include <libmemcached/memcached_util.h>
41
42 #include <libmemcached/error.hpp>
43
44 #include <cassert>
45 #include <cerrno>
46 #include <pthread.h>
47 #include <memory>
48
49 struct memcached_pool_st
50 {
51 pthread_mutex_t mutex;
52 pthread_cond_t cond;
53 memcached_st *master;
54 memcached_st **server_pool;
55 int firstfree;
56 const uint32_t size;
57 uint32_t current_size;
58 bool _owns_master;
59 struct timespec _timeout;
60
61 memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
62 master(master_arg),
63 server_pool(NULL),
64 firstfree(-1),
65 size(max_arg),
66 current_size(0),
67 _owns_master(false)
68 {
69 pthread_mutex_init(&mutex, NULL);
70 pthread_cond_init(&cond, NULL);
71 _timeout.tv_sec= 5;
72 _timeout.tv_nsec= 0;
73 }
74
75 const struct timespec& timeout() const
76 {
77 return _timeout;
78 }
79
80 bool release(memcached_st*, memcached_return_t& rc);
81
82 memcached_st *fetch(memcached_return_t& rc);
83 memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
84
85 bool init(uint32_t initial);
86
87 ~memcached_pool_st()
88 {
89 for (int x= 0; x <= firstfree; ++x)
90 {
91 memcached_free(server_pool[x]);
92 server_pool[x] = NULL;
93 }
94
95 pthread_mutex_destroy(&mutex);
96 pthread_cond_destroy(&cond);
97 delete [] server_pool;
98 if (_owns_master)
99 {
100 memcached_free(master);
101 }
102 }
103
104 void increment_version()
105 {
106 ++master->configure.version;
107 }
108
109 bool compare_version(const memcached_st *arg) const
110 {
111 return (arg->configure.version == version());
112 }
113
114 int32_t version() const
115 {
116 return master->configure.version;
117 }
118 };
119
120
121 /**
122 * Grow the connection pool by creating a connection structure and clone the
123 * original memcached handle.
124 */
125 static bool grow_pool(memcached_pool_st* pool)
126 {
127 assert(pool);
128
129 memcached_st *obj;
130 if (not (obj= memcached_clone(NULL, pool->master)))
131 {
132 return false;
133 }
134
135 pool->server_pool[++pool->firstfree]= obj;
136 pool->current_size++;
137 obj->configure.version= pool->version();
138
139 return true;
140 }
141
142 bool memcached_pool_st::init(uint32_t initial)
143 {
144 server_pool= new (std::nothrow) memcached_st *[size];
145 if (not server_pool)
146 return false;
147
148 /*
149 Try to create the initial size of the pool. An allocation failure at
150 this time is not fatal..
151 */
152 for (unsigned int x= 0; x < initial; ++x)
153 {
154 if (grow_pool(this) == false)
155 {
156 break;
157 }
158 }
159
160 return true;
161 }
162
163
164 static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
165 {
166 if (initial == 0 or max == 0 or (initial > max))
167 {
168 return NULL;
169 }
170
171 memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
172 if (object == NULL)
173 {
174 return NULL;
175 }
176
177 /*
178 Try to create the initial size of the pool. An allocation failure at
179 this time is not fatal..
180 */
181 if (not object->init(initial))
182 {
183 delete object;
184 return NULL;
185 }
186
187 return object;
188 }
189
190 memcached_pool_st *memcached_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
191 {
192 return _pool_create(master, initial, max);
193 }
194
195 memcached_pool_st * memcached_pool(const char *option_string, size_t option_string_length)
196 {
197 memcached_st *memc= memcached(option_string, option_string_length);
198
199 if (memc == NULL)
200 {
201 return NULL;
202 }
203
204 memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
205 if (self == NULL)
206 {
207 memcached_free(memc);
208 return NULL;
209 }
210
211 self->_owns_master= true;
212
213 return self;
214 }
215
216 memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
217 {
218 if (pool == NULL)
219 {
220 return NULL;
221 }
222
223 // Legacy that we return the original structure
224 memcached_st *ret= NULL;
225 if (pool->_owns_master)
226 { }
227 else
228 {
229 ret= pool->master;
230 }
231
232 delete pool;
233
234 return ret;
235 }
236
237 memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
238 {
239 static struct timespec relative_time= { 0, 0 };
240 return fetch(relative_time, rc);
241 }
242
243 memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
244 {
245 rc= MEMCACHED_SUCCESS;
246
247 if (pthread_mutex_lock(&mutex))
248 {
249 rc= MEMCACHED_IN_PROGRESS;
250 return NULL;
251 }
252
253 memcached_st *ret= NULL;
254 do
255 {
256 if (firstfree > -1)
257 {
258 ret= server_pool[firstfree--];
259 }
260 else if (current_size == size)
261 {
262 if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
263 {
264 pthread_mutex_unlock(&mutex);
265 rc= MEMCACHED_NOTFOUND;
266
267 return NULL;
268 }
269
270 struct timespec time_to_wait= {0, 0};
271 time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
272 time_to_wait.tv_nsec= relative_time.tv_nsec;
273
274 int thread_ret;
275 if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
276 {
277 pthread_mutex_unlock(&mutex);
278
279 if (thread_ret == ETIMEDOUT)
280 {
281 rc= MEMCACHED_TIMEOUT;
282 }
283 else
284 {
285 errno= thread_ret;
286 rc= MEMCACHED_ERRNO;
287 }
288
289 return NULL;
290 }
291 }
292 else if (grow_pool(this) == false)
293 {
294 (void)pthread_mutex_unlock(&mutex);
295 return NULL;
296 }
297 } while (ret == NULL);
298
299 pthread_mutex_unlock(&mutex);
300
301 return ret;
302 }
303
304 bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
305 {
306 rc= MEMCACHED_SUCCESS;
307 if (released == NULL)
308 {
309 rc= MEMCACHED_INVALID_ARGUMENTS;
310 return false;
311 }
312
313 if (pthread_mutex_lock(&mutex))
314 {
315 rc= MEMCACHED_IN_PROGRESS;
316 return false;
317 }
318
319 /*
320 Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
321 */
322 if (compare_version(released) == false)
323 {
324 memcached_st *memc;
325 if ((memc= memcached_clone(NULL, master)))
326 {
327 memcached_free(released);
328 released= memc;
329 }
330 }
331
332 server_pool[++firstfree]= released;
333
334 if (firstfree == 0 and current_size == size)
335 {
336 /* we might have people waiting for a connection.. wake them up :-) */
337 pthread_cond_broadcast(&cond);
338 }
339
340 (void)pthread_mutex_unlock(&mutex);
341
342 return true;
343 }
344
345 memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
346 {
347 if (pool == NULL)
348 {
349 return NULL;
350 }
351
352 memcached_return_t unused;
353 if (rc == NULL)
354 {
355 rc= &unused;
356 }
357
358 if (relative_time == NULL)
359 {
360 return pool->fetch(*rc);
361 }
362
363 return pool->fetch(*relative_time, *rc);
364 }
365
366 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
367 bool block,
368 memcached_return_t *rc)
369 {
370 if (pool == NULL)
371 {
372 return NULL;
373 }
374
375 memcached_return_t unused;
376 if (rc == NULL)
377 {
378 rc= &unused;
379 }
380
381 memcached_st *memc;
382 if (block)
383 {
384 memc= pool->fetch(pool->timeout(), *rc);
385 }
386 else
387 {
388 memc= pool->fetch(*rc);
389 }
390
391 return memc;
392 }
393
394 memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
395 {
396 if (pool == NULL)
397 {
398 return MEMCACHED_INVALID_ARGUMENTS;
399 }
400
401 memcached_return_t rc;
402
403 (void) pool->release(released, rc);
404
405 return rc;
406 }
407
408 memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
409 {
410 return memcached_pool_release(pool, released);
411 }
412
413
414 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
415 memcached_behavior_t flag,
416 uint64_t data)
417 {
418 if (pool == NULL)
419 {
420 return MEMCACHED_INVALID_ARGUMENTS;
421 }
422
423 if (pthread_mutex_lock(&pool->mutex))
424 {
425 return MEMCACHED_IN_PROGRESS;
426 }
427
428 /* update the master */
429 memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
430 if (memcached_failed(rc))
431 {
432 (void)pthread_mutex_unlock(&pool->mutex);
433 return rc;
434 }
435
436 pool->increment_version();
437 /* update the clones */
438 for (int xx= 0; xx <= pool->firstfree; ++xx)
439 {
440 if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
441 {
442 pool->server_pool[xx]->configure.version= pool->version();
443 }
444 else
445 {
446 memcached_st *memc;
447 if ((memc= memcached_clone(NULL, pool->master)))
448 {
449 memcached_free(pool->server_pool[xx]);
450 pool->server_pool[xx]= memc;
451 /* I'm not sure what to do in this case.. this would happen
452 if we fail to push the server list inside the client..
453 I should add a testcase for this, but I believe the following
454 would work, except that you would add a hole in the pool list..
455 in theory you could end up with an empty pool....
456 */
457 }
458 }
459 }
460
461 (void)pthread_mutex_unlock(&pool->mutex);
462
463 return rc;
464 }
465
466 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
467 memcached_behavior_t flag,
468 uint64_t *value)
469 {
470 if (pool == NULL)
471 {
472 return MEMCACHED_INVALID_ARGUMENTS;
473 }
474
475 if (pthread_mutex_lock(&pool->mutex))
476 {
477 return MEMCACHED_IN_PROGRESS;
478 }
479
480 *value= memcached_behavior_get(pool->master, flag);
481
482 (void)pthread_mutex_unlock(&pool->mutex);
483
484 return MEMCACHED_SUCCESS;
485 }