a9fb731ff09c671e7a2fea2a0eef186399d0f7ee
[m6w6/libmemcached] / src / bin / memslap.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #include <cassert>
19 #include <cerrno>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cstring>
23 #include <fcntl.h>
24 #include <getopt.h>
25 #include <memory>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/time.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32
33 #include <iostream>
34
35 #include "libmemcached-1.0/memcached.h"
36
37 #include "client_options.h"
38 #include "utilities.h"
39 #include "generator.h"
40
41 #define DEFAULT_INITIAL_LOAD 10000
42 #define DEFAULT_EXECUTE_NUMBER 10000
43 #define DEFAULT_CONCURRENCY 1
44
45 #define VALUE_BYTES 4096
46
47 #define PROGRAM_NAME "memslap"
48 #define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
49
/*
 * Start gate shared by scheduler() and the worker threads.
 *
 * scheduler() sets master_wakeup to 1 before spawning the workers and back
 * to 0 once all of them exist, then broadcasts on sleep_threshhold.  Each
 * worker waits on sleep_threshhold (holding sleeper_mutex) for as long as
 * master_wakeup is non-zero.
 */
pthread_cond_t sleep_threshhold;
pthread_mutex_t sleeper_mutex;
volatile unsigned int master_wakeup;
54
/* Types */

/* Workload a worker thread runs; chosen with --test (set, get or mget). */
enum test_t {
  SET_TEST = 0,
  GET_TEST = 1,
  MGET_TEST = 2
};
57
58 struct thread_context_st {
59 unsigned int key_count;
60 pairs_st *initial_pairs;
61 unsigned int initial_number;
62 pairs_st *execute_pairs;
63 unsigned int execute_number;
64 char **keys;
65 size_t *key_lengths;
66 test_t test;
67 memcached_st *memc;
68 const memcached_st *root;
69
70 thread_context_st(const memcached_st *memc_arg, test_t test_arg)
71 : key_count(0)
72 , initial_pairs(NULL)
73 , initial_number(0)
74 , execute_pairs(NULL)
75 , execute_number(0)
76 , keys(0)
77 , key_lengths(NULL)
78 , test(test_arg)
79 , memc(NULL)
80 , root(memc_arg) {}
81
82 void init() {
83 memc = memcached_clone(NULL, root);
84 }
85
86 ~thread_context_st() {
87 if (execute_pairs) {
88 pairs_free(execute_pairs);
89 }
90 memcached_free(memc);
91 }
92 };
93
/*
 * Results of one benchmark run.
 *
 * conclusions_print() reports the two times as seconds with a three digit
 * fraction (value / 1000 and value % 1000).
 */
struct conclusions_st {
  long int load_time;
  long int read_time;
  unsigned int rows_loaded;
  unsigned int rows_read;

  /* Start with every counter and timer at zero. */
  conclusions_st()
  : load_time(0L)
  , read_time(0L)
  , rows_loaded(0U)
  , rows_read(0U) {}
};
106
107 /* Prototypes */
108 void options_parse(int argc, char *argv[]);
109 void conclusions_print(conclusions_st *conclusion);
110 void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
111 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded);
112 void flush_all(memcached_st *memc);
113
114 static bool opt_binary = 0;
115 static int opt_verbose = 0;
116 static int opt_flush = 0;
117 static int opt_non_blocking_io = 0;
118 static int opt_tcp_nodelay = 0;
119 static unsigned int opt_execute_number = 0;
120 static unsigned int opt_createial_load = 0;
121 static unsigned int opt_concurrency = 0;
122 static int opt_displayflag = 0;
123 static char *opt_servers = NULL;
124 static bool opt_udp_io = false;
125 test_t opt_test = SET_TEST;
126
127
128 unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
129 uint32_t count = 0;
130 for (; count < number_of; ++count) {
131 memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length,
132 pairs[count].value, pairs[count].value_length, 0, 0);
133 if (memcached_failed(rc)) {
134 fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count,
135 memcached_last_error_message(memc), (unsigned int) pairs[count].key_length,
136 pairs[count].key);
137
138 // We will try to reconnect and see if that fixes the issue
139 memcached_quit(memc);
140
141 return count;
142 }
143 }
144
145 return count;
146 }
147
148 /*
149 Execute a memcached_get() on a set of pairs.
150 Return the number of rows retrieved.
151 */
152 static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
153 unsigned int x;
154 unsigned int retrieved;
155
156 for (retrieved = 0, x = 0; x < number_of; x++) {
157 size_t value_length;
158 uint32_t flags;
159
160 unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of);
161
162 memcached_return_t rc;
163 char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length,
164 &value_length, &flags, &rc);
165
166 if (memcached_failed(rc)) {
167 fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__,
168 memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length,
169 pairs[fetch_key].key);
170 } else {
171 retrieved++;
172 }
173
174 ::free(value);
175 }
176
177 return retrieved;
178 }
179
180 /**
181 * Callback function to count the number of results
182 */
183 static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result,
184 void *context) {
185 (void) ptr;
186 (void) result;
187 unsigned int *counter = (unsigned int *) context;
188 *counter = *counter + 1;
189
190 return MEMCACHED_SUCCESS;
191 }
192
193 /**
194 * Try to run a large mget to get all of the keys
195 * @param memc memcached handle
196 * @param keys the keys to get
197 * @param key_length the length of the keys
198 * @param number_of the number of keys to try to get
199 * @return the number of keys received
200 */
201 static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length,
202 unsigned int number_of) {
203 unsigned int retrieved = 0;
204 memcached_execute_fn callbacks[] = {callback_counter};
205 memcached_return_t rc;
206 rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1);
207
208 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED
209 || rc == MEMCACHED_END)
210 {
211 rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1);
212 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) {
213 fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
214 memcached_strerror(memc, rc));
215 memcached_quit(memc);
216 return 0;
217 }
218 } else {
219 fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
220 memcached_strerror(memc, rc));
221 memcached_quit(memc);
222 return 0;
223 }
224
225 return retrieved;
226 }
227
228 extern "C" {
229
230 static __attribute__((noreturn)) void *run_task(void *p) {
231 thread_context_st *context = (thread_context_st *) p;
232
233 context->init();
234
235 pthread_mutex_lock(&sleeper_mutex);
236 while (master_wakeup) {
237 pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
238 }
239 pthread_mutex_unlock(&sleeper_mutex);
240
241 /* Do Stuff */
242 switch (context->test) {
243 case SET_TEST:
244 assert(context->execute_pairs);
245 execute_set(context->memc, context->execute_pairs, context->execute_number);
246 break;
247
248 case GET_TEST:
249 execute_get(context->memc, context->initial_pairs, context->initial_number);
250 break;
251
252 case MGET_TEST:
253 execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths,
254 context->initial_number);
255 break;
256 }
257
258 delete context;
259
260 pthread_exit(0);
261 }
262
263 } // extern "C"
264
265
266 int main(int argc, char *argv[]) {
267 conclusions_st conclusion;
268
269 srandom((unsigned int) time(NULL));
270 options_parse(argc, argv);
271
272 if (opt_servers == NULL) {
273 char *temp;
274
275 if ((temp = getenv("MEMCACHED_SERVERS"))) {
276 opt_servers = strdup(temp);
277 }
278
279 if (opt_servers == NULL) {
280 std::cerr << "No servers provided" << std::endl;
281 exit(EXIT_FAILURE);
282 }
283 }
284
285 memcached_server_st *servers = memcached_servers_parse(opt_servers);
286 if (servers == NULL or memcached_server_list_count(servers) == 0) {
287 std::cerr << "Invalid server list provided:" << opt_servers << std::endl;
288 return EXIT_FAILURE;
289 }
290
291 pthread_mutex_init(&sleeper_mutex, NULL);
292 pthread_cond_init(&sleep_threshhold, NULL);
293
294 int error_code = EXIT_SUCCESS;
295 try {
296 scheduler(servers, &conclusion);
297 } catch (std::exception &e) {
298 std::cerr << "Died with exception: " << e.what() << std::endl;
299 error_code = EXIT_FAILURE;
300 }
301
302 free(opt_servers);
303
304 (void) pthread_mutex_destroy(&sleeper_mutex);
305 (void) pthread_cond_destroy(&sleep_threshhold);
306 conclusions_print(&conclusion);
307 memcached_server_list_free(servers);
308
309 return error_code;
310 }
311
312 void scheduler(memcached_server_st *servers, conclusions_st *conclusion) {
313 unsigned int actual_loaded = 0; /* Fix warning */
314
315 struct timeval start_time, end_time;
316 pairs_st *pairs = NULL;
317
318 memcached_st *memc = memcached_create(NULL);
319
320 memcached_server_push(memc, servers);
321
322 /* We need to set udp behavior before adding servers to the client */
323 if (opt_udp_io) {
324 if (memcached_failed(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io))) {
325 std::cerr << "Failed to enable UDP." << std::endl;
326 memcached_free(memc);
327 exit(EXIT_FAILURE);
328 }
329 }
330
331 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, (uint64_t) opt_binary);
332
333 if (opt_flush) {
334 flush_all(memc);
335 }
336
337 if (opt_createial_load) {
338 pairs = load_create_data(memc, opt_createial_load, &actual_loaded);
339 }
340
341 char **keys = static_cast<char **>(calloc(actual_loaded, sizeof(char *)));
342 size_t *key_lengths = static_cast<size_t *>(calloc(actual_loaded, sizeof(size_t)));
343
344 if (keys == NULL or key_lengths == NULL) {
345 free(keys);
346 free(key_lengths);
347 keys = NULL;
348 key_lengths = NULL;
349 } else {
350 for (uint32_t x = 0; x < actual_loaded; ++x) {
351 keys[x] = pairs[x].key;
352 key_lengths[x] = pairs[x].key_length;
353 }
354 }
355
356 /* We set this after we have loaded */
357 {
358 if (opt_non_blocking_io)
359 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
360
361 if (opt_tcp_nodelay)
362 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
363 }
364
365 pthread_mutex_lock(&sleeper_mutex);
366 master_wakeup = 1;
367 pthread_mutex_unlock(&sleeper_mutex);
368
369 pthread_t *threads = new (std::nothrow) pthread_t[opt_concurrency];
370
371 if (threads == NULL) {
372 exit(EXIT_FAILURE);
373 }
374
375 for (uint32_t x = 0; x < opt_concurrency; x++) {
376 thread_context_st *context = new thread_context_st(memc, opt_test);
377 context->test = opt_test;
378
379 context->initial_pairs = pairs;
380 context->initial_number = actual_loaded;
381 context->keys = keys;
382 context->key_lengths = key_lengths;
383
384 if (opt_test == SET_TEST) {
385 context->execute_pairs = pairs_generate(opt_execute_number, VALUE_BYTES);
386 context->execute_number = opt_execute_number;
387 }
388
389 /* now you create the thread */
390 if (pthread_create(threads + x, NULL, run_task, (void *) context)) {
391 fprintf(stderr, "Could not create thread\n");
392 exit(1);
393 }
394 }
395
396 pthread_mutex_lock(&sleeper_mutex);
397 master_wakeup = 0;
398 pthread_mutex_unlock(&sleeper_mutex);
399 pthread_cond_broadcast(&sleep_threshhold);
400 gettimeofday(&start_time, NULL);
401
402 for (uint32_t x = 0; x < opt_concurrency; x++) {
403 void *retval;
404 pthread_join(threads[x], &retval);
405 }
406 delete[] threads;
407
408 gettimeofday(&end_time, NULL);
409
410 conclusion->load_time = timedif(end_time, start_time);
411 conclusion->read_time = timedif(end_time, start_time);
412 free(keys);
413 free(key_lengths);
414 pairs_free(pairs);
415 memcached_free(memc);
416 }
417
418 void options_parse(int argc, char *argv[]) {
419 memcached_programs_help_st help_options[] = {
420 {0},
421 };
422
423 static struct option long_options[] = {
424 {(OPTIONSTRING) "concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
425 {(OPTIONSTRING) "debug", no_argument, &opt_verbose, OPT_DEBUG},
426 {(OPTIONSTRING) "quiet", no_argument, NULL, OPT_QUIET},
427 {(OPTIONSTRING) "execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
428 {(OPTIONSTRING) "flag", no_argument, &opt_displayflag, OPT_FLAG},
429 {(OPTIONSTRING) "flush", no_argument, &opt_flush, OPT_FLUSH},
430 {(OPTIONSTRING) "help", no_argument, NULL, OPT_HELP},
431 {(OPTIONSTRING) "initial-load", required_argument, NULL,
432 OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
433 {(OPTIONSTRING) "non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
434 {(OPTIONSTRING) "servers", required_argument, NULL, OPT_SERVERS},
435 {(OPTIONSTRING) "tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
436 {(OPTIONSTRING) "test", required_argument, NULL, OPT_SLAP_TEST},
437 {(OPTIONSTRING) "verbose", no_argument, &opt_verbose, OPT_VERBOSE},
438 {(OPTIONSTRING) "version", no_argument, NULL, OPT_VERSION},
439 {(OPTIONSTRING) "binary", no_argument, NULL, OPT_BINARY},
440 {(OPTIONSTRING) "udp", no_argument, NULL, OPT_UDP},
441 {0, 0, 0, 0},
442 };
443
444 bool opt_help = false;
445 bool opt_version = false;
446 int option_index = 0;
447 while (1) {
448 int option_rv = getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
449
450 if (option_rv == -1)
451 break;
452
453 switch (option_rv) {
454 case 0:
455 break;
456
457 case OPT_UDP:
458 if (opt_test == GET_TEST) {
459 fprintf(stderr,
460 "You can not run a get test in UDP mode. UDP mode "
461 "does not currently support get ops.\n");
462 exit(1);
463 }
464 opt_udp_io = true;
465 break;
466
467 case OPT_BINARY:
468 opt_binary = true;
469 break;
470
471 case OPT_VERBOSE: /* --verbose or -v */
472 opt_verbose = OPT_VERBOSE;
473 break;
474
475 case OPT_DEBUG: /* --debug or -d */
476 opt_verbose = OPT_DEBUG;
477 break;
478
479 case OPT_VERSION: /* --version or -V */
480 opt_version = true;
481 break;
482
483 case OPT_HELP: /* --help or -h */
484 opt_help = true;
485 break;
486
487 case OPT_SERVERS: /* --servers or -s */
488 opt_servers = strdup(optarg);
489 break;
490
491 case OPT_SLAP_TEST:
492 if (strcmp(optarg, "get") == 0) {
493 if (opt_udp_io == 1) {
494 fprintf(stderr,
495 "You can not run a get test in UDP mode. UDP mode "
496 "does not currently support get ops.\n");
497 exit(EXIT_FAILURE);
498 }
499 opt_test = GET_TEST;
500 } else if (strcmp(optarg, "set") == 0) {
501 opt_test = SET_TEST;
502 } else if (strcmp(optarg, "mget") == 0) {
503 opt_test = MGET_TEST;
504 } else {
505 fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
506 exit(EXIT_FAILURE);
507 }
508 break;
509
510 case OPT_SLAP_CONCURRENCY:
511 errno = 0;
512 opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10);
513 if (errno) {
514 fprintf(stderr, "Invalid value for concurrency: %s\n", optarg);
515 exit(EXIT_FAILURE);
516 }
517 break;
518
519 case OPT_SLAP_EXECUTE_NUMBER:
520 errno = 0;
521 opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10);
522 if (errno) {
523 fprintf(stderr, "Invalid value for execute: %s\n", optarg);
524 exit(EXIT_FAILURE);
525 }
526 break;
527
528 case OPT_SLAP_INITIAL_LOAD:
529 errno = 0;
530 opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10);
531 if (errno) {
532 fprintf(stderr, "Invalid value for initial load: %s\n", optarg);
533 exit(EXIT_FAILURE);
534 }
535 break;
536
537 case OPT_QUIET:
538 close_stdio();
539 break;
540
541 case '?':
542 /* getopt_long already printed an error message. */
543 exit(EXIT_FAILURE);
544
545 default:
546 abort();
547 }
548 }
549
550 if (opt_version) {
551 version_command(PROGRAM_NAME);
552 exit(EXIT_SUCCESS);
553 }
554
555 if (opt_help) {
556 help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
557 exit(EXIT_SUCCESS);
558 }
559
560 if ((opt_test == GET_TEST or opt_test == MGET_TEST) and opt_createial_load == 0)
561 opt_createial_load = DEFAULT_INITIAL_LOAD;
562
563 if (opt_execute_number == 0)
564 opt_execute_number = DEFAULT_EXECUTE_NUMBER;
565
566 if (opt_concurrency == 0)
567 opt_concurrency = DEFAULT_CONCURRENCY;
568 }
569
570 void conclusions_print(conclusions_st *conclusion) {
571 printf("\tThreads connecting to servers %u\n", opt_concurrency);
572 #ifdef NOT_FINISHED
573 printf("\tLoaded %u rows\n", conclusion->rows_loaded);
574 printf("\tRead %u rows\n", conclusion->rows_read);
575 #endif
576 if (opt_test == SET_TEST)
577 printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
578 conclusion->load_time % 1000);
579 else
580 printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
581 conclusion->read_time % 1000);
582 }
583
584 void flush_all(memcached_st *memc) {
585 memcached_flush(memc, 0);
586 }
587
588 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
589 unsigned int *actual_loaded) {
590 memcached_st *memc_clone = memcached_clone(NULL, memc);
591 /* We always used non-blocking IO for load since it is faster */
592 memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
593
594 pairs_st *pairs = pairs_generate(number_of, VALUE_BYTES);
595 *actual_loaded = execute_set(memc_clone, pairs, number_of);
596
597 memcached_free(memc_clone);
598
599 return pairs;
600 }