10450dac846897b7b79c4c18427aac0a89cb65df
[m6w6/libmemcached] / tests / libmemcached-1.0 / pool.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached Client and Server
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <config.h>
39 #include <libtest/test.hpp>
40
41 using namespace libtest;
42
43 #include <vector>
44 #include <iostream>
45 #include <string>
46 #include <cerrno>
47
48 #include <semaphore.h>
49
50 #include <libmemcached-1.0/memcached.h>
51 #include <libmemcachedutil-1.0/util.h>
52 #include <libmemcached/is.h>
53 #include <tests/pool.h>
54
55 #include <pthread.h>
56 #include <poll.h>
57
58 #ifndef __INTEL_COMPILER
59 #pragma GCC diagnostic ignored "-Wstrict-aliasing"
60 #endif
61
62
63 test_return_t memcached_pool_test(memcached_st *)
64 {
65 memcached_return_t rc;
66 const char *config_string= "--SERVER=host10.example.com --SERVER=host11.example.com --SERVER=host10.example.com --POOL-MIN=10 --POOL-MAX=32";
67
68 char buffer[2048];
69 rc= libmemcached_check_configuration(config_string, sizeof(config_string) -1, buffer, sizeof(buffer));
70
71 test_true_got(rc != MEMCACHED_SUCCESS, buffer);
72
73 memcached_pool_st* pool= memcached_pool(config_string, strlen(config_string));
74 test_true_got(pool, strerror(errno));
75
76 memcached_st *memc= memcached_pool_pop(pool, false, &rc);
77
78 test_true(rc == MEMCACHED_SUCCESS);
79 test_true(memc);
80
81 /*
82 Release the memc_ptr that was pulled from the pool
83 */
84 memcached_pool_push(pool, memc);
85
86 /*
87 Destroy the pool.
88 */
89 memcached_pool_destroy(pool);
90
91 return TEST_SUCCESS;
92 }
93
94
95 #define POOL_SIZE 10
96 test_return_t connection_pool_test(memcached_st *memc)
97 {
98 memcached_pool_st* pool= memcached_pool_create(memc, 5, POOL_SIZE);
99 test_true(pool);
100 memcached_st *mmc[POOL_SIZE];
101
102 // Fill up our array that we will store the memc that are in the pool
103 for (size_t x= 0; x < POOL_SIZE; ++x)
104 {
105 memcached_return_t rc;
106 mmc[x]= memcached_pool_fetch(pool, NULL, &rc);
107 test_compare(MEMCACHED_SUCCESS, rc);
108 test_true(mmc[x]);
109 }
110
111 // All memc should be gone
112 {
113 memcached_return_t rc;
114 test_null(memcached_pool_fetch(pool, NULL, &rc));
115 test_compare(MEMCACHED_NOTFOUND, rc);
116 }
117
118 // Release them..
119 for (size_t x= 0; x < POOL_SIZE; ++x)
120 {
121 if (mmc[x])
122 {
123 test_compare(MEMCACHED_SUCCESS, memcached_pool_release(pool, mmc[x]));
124 }
125 }
126 test_true(memcached_pool_destroy(pool) == memc);
127
128 return TEST_SUCCESS;
129 }
130
131 test_return_t connection_pool2_test(memcached_st *memc)
132 {
133 memcached_pool_st* pool= memcached_pool_create(memc, 5, POOL_SIZE);
134 test_true(pool);
135 memcached_st *mmc[POOL_SIZE];
136
137 // Fill up our array that we will store the memc that are in the pool
138 for (size_t x= 0; x < POOL_SIZE; ++x)
139 {
140 memcached_return_t rc;
141 mmc[x]= memcached_pool_fetch(pool, NULL, &rc);
142 test_compare(MEMCACHED_SUCCESS, rc);
143 test_true(mmc[x]);
144 }
145
146 // All memc should be gone
147 {
148 memcached_return_t rc;
149 test_null(memcached_pool_fetch(pool, NULL, &rc));
150 test_compare(MEMCACHED_NOTFOUND, rc);
151 }
152
153 // verify that I can do ops with all connections
154 test_compare(MEMCACHED_SUCCESS,
155 memcached_set(mmc[0],
156 test_literal_param("key"),
157 "0", 1, 0, 0));
158
159 for (uint64_t x= 0; x < POOL_SIZE; ++x)
160 {
161 uint64_t number_value;
162 test_compare(MEMCACHED_SUCCESS,
163 memcached_increment(mmc[x],
164 test_literal_param("key"),
165 1, &number_value));
166 test_compare(number_value, (x+1));
167 }
168
169 // Release them..
170 for (size_t x= 0; x < POOL_SIZE; ++x)
171 {
172 test_compare(MEMCACHED_SUCCESS, memcached_pool_release(pool, mmc[x]));
173 }
174
175
176 /* verify that I can set behaviors on the pool when I don't have all
177 * of the connections in the pool. It should however be enabled
178 * when I push the item into the pool
179 */
180 mmc[0]= memcached_pool_fetch(pool, NULL, NULL);
181 test_true(mmc[0]);
182
183 test_compare(MEMCACHED_SUCCESS,
184 memcached_pool_behavior_set(pool, MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK, 9999));
185
186 {
187 memcached_return_t rc;
188 mmc[1]= memcached_pool_fetch(pool, NULL, &rc);
189 test_true(mmc[1]);
190 test_compare(MEMCACHED_SUCCESS, rc);
191 }
192
193 test_compare(UINT64_C(9999), memcached_behavior_get(mmc[1], MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK));
194 test_compare(MEMCACHED_SUCCESS, memcached_pool_release(pool, mmc[1]));
195 test_compare(MEMCACHED_SUCCESS, memcached_pool_release(pool, mmc[0]));
196
197 {
198 memcached_return_t rc;
199 mmc[0]= memcached_pool_fetch(pool, NULL, &rc);
200 test_true(mmc[0]);
201 test_compare(MEMCACHED_SUCCESS, rc);
202 }
203
204 test_compare(UINT64_C(9999), memcached_behavior_get(mmc[0], MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK));
205 test_compare(MEMCACHED_SUCCESS, memcached_pool_release(pool, mmc[0]));
206
207 test_true(memcached_pool_destroy(pool) == memc);
208
209 return TEST_SUCCESS;
210 }
211
212 struct test_pool_context_st {
213 volatile memcached_return_t rc;
214 memcached_pool_st* pool;
215 memcached_st* mmc;
216 sem_t _lock;
217
218 test_pool_context_st(memcached_pool_st *pool_arg, memcached_st *memc_arg):
219 rc(MEMCACHED_FAILURE),
220 pool(pool_arg),
221 mmc(memc_arg)
222 {
223 sem_init(&_lock, 0, 0);
224 }
225
226 void wait()
227 {
228 sem_wait(&_lock);
229 }
230
231 void release()
232 {
233 sem_post(&_lock);
234 }
235
236 ~test_pool_context_st()
237 {
238 sem_destroy(&_lock);
239 }
240 };
241
242 static void* connection_release(void *arg)
243 {
244 test_pool_context_st *resource= static_cast<test_pool_context_st *>(arg);
245 if (resource == NULL)
246 {
247 fatal_message("resource == NULL");
248 }
249
250 // Release all of the memc we are holding
251 resource->rc= memcached_pool_release(resource->pool, resource->mmc);
252 resource->release();
253
254 pthread_exit(arg);
255 }
256
257 test_return_t connection_pool3_test(memcached_st *memc)
258 {
259 #ifdef __APPLE__
260 return TEST_SKIPPED;
261 #endif
262
263 memcached_pool_st* pool= memcached_pool_create(memc, 1, 1);
264 test_true(pool);
265
266 memcached_st *pool_memc;
267 {
268 memcached_return_t rc;
269 pool_memc= memcached_pool_fetch(pool, NULL, &rc);
270 test_compare(MEMCACHED_SUCCESS, rc);
271 test_true(pool_memc);
272 }
273
274 /*
275 @note This comment was written to describe what was believed to be the original authors intent.
276
277 This portion of the test creates a thread that will wait until told to free a memcached_st
278 that will be grabbed by the main thread.
279
280 It is believed that this tests whether or not we are handling ownership correctly.
281 */
282 pthread_t tid;
283 test_pool_context_st item(pool, pool_memc);
284
285 test_zero(pthread_create(&tid, NULL, connection_release, &item));
286 item.wait();
287
288 memcached_return_t rc;
289 memcached_st *pop_memc;
290 // We do a hard loop, and try N times
291 int counter= 5;
292 do
293 {
294 struct timespec relative_time= { 0, 0 };
295 pop_memc= memcached_pool_fetch(pool, &relative_time, &rc);
296
297 if (memcached_success(rc))
298 {
299 break;
300 }
301
302 if (memcached_failed(rc))
303 {
304 test_null(pop_memc);
305 test_true(rc != MEMCACHED_TIMEOUT); // As long as relative_time is zero, MEMCACHED_TIMEOUT is invalid
306 }
307 } while (--counter);
308
309 if (memcached_failed(rc)) // Cleanup thread since we will exit once we test.
310 {
311 pthread_join(tid, NULL);
312 test_compare(MEMCACHED_SUCCESS, rc);
313 }
314
315 {
316 int pthread_ret= pthread_join(tid, NULL);
317 test_true(pthread_ret == 0 or pthread_ret == ESRCH);
318 }
319 test_compare(MEMCACHED_SUCCESS, rc);
320 test_true(pool_memc == pop_memc);
321
322 test_true(memcached_pool_destroy(pool) == memc);
323
324 return TEST_SUCCESS;
325 }
326
327 static memcached_st * create_single_instance_memcached(const memcached_st *original_memc, const char *options)
328 {
329 /*
330 If no options are given, copy over at least the binary flag.
331 */
332 char options_buffer[1024]= { 0 };
333 if (options == NULL)
334 {
335 if (memcached_is_binary(original_memc))
336 {
337 snprintf(options_buffer, sizeof(options_buffer), "--BINARY");
338 }
339 }
340
341 /*
342 * I only want to hit _one_ server so I know the number of requests I'm
343 * sending in the pipeline.
344 */
345 memcached_server_instance_st instance= memcached_server_instance_by_position(original_memc, 0);
346
347 char server_string[1024];
348 int server_string_length;
349 if (instance->type == MEMCACHED_CONNECTION_UNIX_SOCKET)
350 {
351 if (options)
352 {
353 server_string_length= snprintf(server_string, sizeof(server_string), "--SOCKET=\"%s\" %s",
354 memcached_server_name(instance), options);
355 }
356 else
357 {
358 server_string_length= snprintf(server_string, sizeof(server_string), "--SOCKET=\"%s\"",
359 memcached_server_name(instance));
360 }
361 }
362 else
363 {
364 if (options)
365 {
366 server_string_length= snprintf(server_string, sizeof(server_string), "--server=%s:%d %s",
367 memcached_server_name(instance), int(memcached_server_port(instance)),
368 options);
369 }
370 else
371 {
372 server_string_length= snprintf(server_string, sizeof(server_string), "--server=%s:%d",
373 memcached_server_name(instance), int(memcached_server_port(instance)));
374 }
375 }
376
377 if (server_string_length <= 0)
378 {
379 return NULL;
380 }
381
382 char errror_buffer[1024];
383 if (memcached_failed(libmemcached_check_configuration(server_string, server_string_length, errror_buffer, sizeof(errror_buffer))))
384 {
385 Error << "Failed to parse (" << server_string << ") " << errror_buffer;
386 return NULL;
387 }
388
389 return memcached(server_string, server_string_length);
390 }
391
// Flag read by the worker threads through running() and written through
// set_running(); both functions hold the mutex below while touching it.
static bool _running= false;
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
394
395 static void set_running(const bool arg)
396 {
397 int error;
398 if ((error= pthread_mutex_lock(&mutex)) != 0)
399 {
400 fatal_message(strerror(error));
401 }
402
403 _running= arg;
404
405 if ((error= pthread_mutex_unlock(&mutex)) != 0)
406 {
407 fatal_message(strerror(error));
408 }
409 }
410
411 static bool running()
412 {
413 int error;
414 bool ret;
415
416 if ((error= pthread_mutex_lock(&mutex)) != 0)
417 {
418 fatal_message(strerror(error));
419 }
420
421 ret= _running;
422
423 if ((error= pthread_mutex_unlock(&mutex)) != 0)
424 {
425 fatal_message(strerror(error));
426 }
427
428 return ret;
429 }
430
431 static void *worker_thread(void *ctx)
432 {
433 memcached_pool_st *pool= (memcached_pool_st *)ctx;
434
435 while (running())
436 {
437 memcached_return_t rc;
438 memcached_st *mc= memcached_pool_pop(pool, true, &rc);
439
440 if (mc == NULL)
441 {
442 Error << "failed to fetch a connection from the pool" << memcached_strerror(NULL, rc);
443 dream(1, 0);
444 continue;
445 }
446
447 rc= memcached_set(mc, "test:kv", 7, "value", 5, 600, 0);
448 if (memcached_failed(rc))
449 {
450 Out << "failed memcached_set()";
451 }
452
453 rc= memcached_pool_push(pool, mc);
454 if (memcached_failed(rc))
455 {
456 Error << "failed to release a connection to the pool" << memcached_strerror(NULL, rc);
457 }
458 }
459
460 return NULL;
461 }
462
463 #define NUM_THREADS 20
464 test_return_t regression_bug_962815(memcached_st *memc)
465 {
466 pthread_t pid[NUM_THREADS];
467
468 test_false(running());
469
470 memcached_st *master = create_single_instance_memcached(memc, 0);
471 test_true(master);
472
473 memcached_pool_st *pool= memcached_pool_create(master, 5, 10);
474
475 test_true(pool);
476
477 set_running(true);
478
479 for (size_t x=0; x < NUM_THREADS; x++)
480 {
481 test_compare(0, pthread_create(&pid[x], NULL, worker_thread, (void*)pool));
482 }
483
484 {
485 pollfd fds[1];
486 memset(fds, 0, sizeof(pollfd));
487 fds[0].fd= -1; //STDIN_FILENO;
488 fds[0].events= POLLIN;
489 fds[0].revents= 0;
490
491 int active_fd;
492 if ((active_fd= poll(fds, 1, 5000)) == -1)
493 {
494 Error << "poll() failed with:" << strerror(errno);
495 }
496 set_running(false);
497 }
498
499 for (size_t x=0; x < NUM_THREADS; x++)
500 {
501 test_compare(0, pthread_join(pid[x], NULL));
502 }
503
504 if (pool)
505 {
506 memcached_pool_destroy(pool);
507 }
508
509 if (master)
510 {
511 memcached_free(master);
512 }
513
514 return TEST_SUCCESS;
515 }