2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
16 #include "mem_config.h"
30 #include <sys/types.h>
35 #include "libmemcached-1.0/memcached.h"
37 #include "client_options.h"
38 #include "utilities.h"
39 #include "generator.h"
/*
 * Fallbacks applied at the end of options_parse() when the matching option is
 * still 0. DEFAULT_INITIAL_LOAD is only applied for the get and mget tests.
 */
41 #define DEFAULT_INITIAL_LOAD 10000
42 #define DEFAULT_EXECUTE_NUMBER 10000
43 #define DEFAULT_CONCURRENCY 1
/* Second argument to every pairs_generate() call in this file; presumably the
   size in bytes of each generated value -- confirm against generator.h. */
45 #define VALUE_BYTES 4096
/* Name and description passed to version_command() and help_command(). */
47 #define PROGRAM_NAME "memslap"
48 #define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
50 /* Global Thread counter */
/*
 * Start gate for the worker threads: run_task() waits on sleep_threshhold
 * (holding sleeper_mutex) for as long as master_wakeup is non-zero, and
 * scheduler() broadcasts sleep_threshhold after the threads have been created.
 * NOTE(review): "volatile" is not a synchronization primitive; the read in
 * run_task() happens with sleeper_mutex held -- confirm every write does too.
 */
51 volatile unsigned int master_wakeup
;
52 pthread_mutex_t sleeper_mutex
;
53 pthread_cond_t sleep_threshhold
;
/*
 * Workload selector. run_task() switches on a value of this type, and
 * options_parse() assigns it from the --test argument ("set", "get", "mget").
 */
56 enum test_t
{ SET_TEST
, GET_TEST
, MGET_TEST
};
/*
 * Per-thread work description. scheduler() allocates one instance per worker,
 * fills in the fields and hands it to run_task() through pthread_create().
 */
58 struct thread_context_st
{
59 unsigned int key_count
;
/* Pairs and count that run_task() passes to execute_get(); initial_number is
   also the key count passed to execute_mget(). */
60 pairs_st
*initial_pairs
;
61 unsigned int initial_number
;
/* Pairs and count that run_task() passes to execute_set(); the destructor
   releases execute_pairs with pairs_free(). */
62 pairs_st
*execute_pairs
;
63 unsigned int execute_number
;
/* Handle that memcached_clone() copies from to produce memc. */
68 const memcached_st
*root
;
/* NOTE(review): presumably stores memc_arg as root and test_arg as test --
   confirm against the member initializers. */
70 thread_context_st(const memcached_st
*memc_arg
, test_t test_arg
)
/* memc is a clone of root; run_task() passes memc to the execute_* helpers. */
83 memc
= memcached_clone(NULL
, root
);
/* Releases the pairs generated for the set test. */
86 ~thread_context_st() {
88 pairs_free(execute_pairs
);
/* Results of a run: scheduler() receives a pointer to it and
   conclusions_print() prints its contents. */
94 struct conclusions_st
{
/* Row counters printed by conclusions_print() with %u. */
97 unsigned int rows_loaded
;
98 unsigned int rows_read
;
/* Forward declarations for functions that main() and scheduler() use and that
   are defined further down in this file. */
108 void options_parse(int argc
, char *argv
[]);
109 void conclusions_print(conclusions_st
*conclusion
);
110 void scheduler(memcached_server_st
*servers
, conclusions_st
*conclusion
);
/* actual_loaded is an out-parameter: it receives execute_set()'s return value. */
111 pairs_st
*load_create_data(memcached_st
*memc
, unsigned int number_of
, unsigned int *actual_loaded
);
112 void flush_all(memcached_st
*memc
);
/* Command-line settings, filled in by options_parse(). */

/* Passed to MEMCACHED_BEHAVIOR_BINARY_PROTOCOL in scheduler(). */
114 static bool opt_binary
= 0;
/* The next five are also referenced as getopt_long() flag targets in
   long_options (--debug/--verbose, --flush, --non-blocking, --tcp-nodelay). */
115 static int opt_verbose
= 0;
116 static int opt_flush
= 0;
117 static int opt_non_blocking_io
= 0;
118 static int opt_tcp_nodelay
= 0;
/* Numeric options; 0 means "use the DEFAULT_* value" (see the end of
   options_parse()). */
119 static unsigned int opt_execute_number
= 0;
/* Set from --initial-load. NOTE(review): the name looks like a typo of
   "opt_initial_load". */
120 static unsigned int opt_createial_load
= 0;
121 static unsigned int opt_concurrency
= 0;
/* Flag target of --flag in long_options. */
122 static int opt_displayflag
= 0;
/* Server list string: strdup()ed from --servers or, in main(), from the
   MEMCACHED_SERVERS environment variable. */
123 static char *opt_servers
= NULL
;
/* Passed to MEMCACHED_BEHAVIOR_USE_UDP in scheduler(). */
124 static bool opt_udp_io
= false;
/* Selected workload; defaults to the set test. NOTE(review): unlike the other
   option variables this one is not static. */
125 test_t opt_test
= SET_TEST
;
/*
 * Store pairs on the server, one memcached_set() call per pair, for indexes
 * below number_of. A failed set is reported on stderr and followed by
 * memcached_quit() on the handle.
 * load_create_data() records the return value as the number actually loaded;
 * presumably that is the count of successful sets -- confirm.
 */
128 unsigned int execute_set(memcached_st
*memc
, pairs_st
*pairs
, unsigned int number_of
) {
130 for (; count
< number_of
; ++count
) {
// The two trailing zeros are memcached_set()'s expiration and flags arguments.
131 memcached_return_t rc
= memcached_set(memc
, pairs
[count
].key
, pairs
[count
].key_length
,
132 pairs
[count
].value
, pairs
[count
].value_length
, 0, 0);
133 if (memcached_failed(rc
)) {
// NOTE(review): "%.*s" takes an int precision; the argument is cast to
// unsigned int instead.
134 fprintf(stderr
, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__
, __LINE__
, count
,
135 memcached_last_error_message(memc
), (unsigned int) pairs
[count
].key_length
,
138 // We will try to reconnect and see if that fixes the issue
139 memcached_quit(memc
);
149 Execute a memcached_get() on a set of pairs.
150 Return the number of rows retrieved.
/*
 * Issue number_of memcached_get() calls, each for a key picked at random from
 * the first number_of entries of pairs. A failed get is reported on stderr
 * together with the library's last error message.
 */
152 static unsigned int execute_get(memcached_st
*memc
, pairs_st
*pairs
, unsigned int number_of
) {
154 unsigned int retrieved
;
156 for (retrieved
= 0, x
= 0; x
< number_of
; x
++) {
// Random index into pairs; the modulo keeps it below number_of. random() is
// seeded once in main() via srandom().
160 unsigned int fetch_key
= (unsigned int) ((unsigned int) random() % number_of
);
162 memcached_return_t rc
;
// rc, value_length and flags are out-parameters of memcached_get().
163 char *value
= memcached_get(memc
, pairs
[fetch_key
].key
, pairs
[fetch_key
].key_length
,
164 &value_length
, &flags
, &rc
);
166 if (memcached_failed(rc
)) {
// NOTE(review): "%.*s" takes an int precision; the argument is cast to
// unsigned int instead.
167 fprintf(stderr
, "%s:%d Failure on read(%s) of %.*s\n", __FILE__
, __LINE__
,
168 memcached_last_error_message(memc
), (unsigned int) pairs
[fetch_key
].key_length
,
169 pairs
[fetch_key
].key
);
181 * Callback function to count the number of results
/*
 * Result callback installed by execute_mget(). It treats context as a pointer
 * to an unsigned int, adds one to it on every call, and always returns
 * MEMCACHED_SUCCESS. ptr and result are not used in the statements shown.
 */
183 static memcached_return_t
callback_counter(const memcached_st
*ptr
, memcached_result_st
*result
,
187 unsigned int *counter
= (unsigned int *) context
;
188 *counter
= *counter
+ 1;
190 return MEMCACHED_SUCCESS
;
194 * Try to run a large mget to get all of the keys
195 * @param memc memcached handle
196 * @param keys the keys to get
197 * @param key_length the length of the keys
198 * @param number_of the number of keys to try to get
199 * @return the number of keys received
/*
 * Request number_of keys in one memcached_mget_execute() call, then drain the
 * remaining results with memcached_fetch_execute(). callback_counter is the
 * only callback and counts each delivered result into retrieved.
 */
201 static unsigned int execute_mget(memcached_st
*memc
, const char *const *keys
, size_t *key_length
,
202 unsigned int number_of
) {
203 unsigned int retrieved
= 0;
204 memcached_execute_fn callbacks
[] = {callback_counter
};
205 memcached_return_t rc
;
// The trailing 1 is the number of entries in callbacks.
206 rc
= memcached_mget_execute(memc
, keys
, key_length
, (size_t) number_of
, callbacks
, &retrieved
, 1);
// Any of these return codes lets the function go on to fetch results.
208 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_NOTFOUND
|| rc
== MEMCACHED_BUFFERED
209 || rc
== MEMCACHED_END
)
211 rc
= memcached_fetch_execute(memc
, callbacks
, (void *) &retrieved
, 1);
// A fetch result other than SUCCESS / NOTFOUND / END is reported and followed
// by memcached_quit() on the handle.
212 if (rc
!= MEMCACHED_SUCCESS
&& rc
!= MEMCACHED_NOTFOUND
&& rc
!= MEMCACHED_END
) {
213 fprintf(stderr
, "%s:%d Failed to execute mget: %s\n", __FILE__
, __LINE__
,
214 memcached_strerror(memc
, rc
));
215 memcached_quit(memc
);
// NOTE(review): same message text as the fetch failure above; presumably this
// is the branch for a failed memcached_mget_execute() -- confirm, and consider
// distinguishing the two messages.
219 fprintf(stderr
, "%s:%d Failed to execute mget: %s\n", __FILE__
, __LINE__
,
220 memcached_strerror(memc
, rc
));
221 memcached_quit(memc
);
/*
 * Thread entry point passed to pthread_create() by scheduler(). p is the
 * thread_context_st allocated for this worker. The thread waits at the start
 * gate and then runs the workload selected by context->test.
 * NOTE(review): declared noreturn, so it presumably ends through
 * pthread_exit() rather than a return -- confirm.
 */
230 static __attribute__((noreturn
)) void *run_task(void *p
) {
231 thread_context_st
*context
= (thread_context_st
*) p
;
// Start gate: block until master_wakeup is zero. pthread_cond_wait() releases
// sleeper_mutex while waiting and re-acquires it before returning; the loop
// re-checks the condition after every wakeup.
235 pthread_mutex_lock(&sleeper_mutex
);
236 while (master_wakeup
) {
237 pthread_cond_wait(&sleep_threshhold
, &sleeper_mutex
);
239 pthread_mutex_unlock(&sleeper_mutex
);
242 switch (context
->test
) {
// Set workload: writes the pairs generated for this thread, which must exist.
244 assert(context
->execute_pairs
);
245 execute_set(context
->memc
, context
->execute_pairs
, context
->execute_number
);
// Get workload: reads keys from the initially loaded pairs.
249 execute_get(context
->memc
, context
->initial_pairs
, context
->initial_number
);
// Multi-get workload: uses the flat keys / key_lengths arrays built by
// scheduler().
253 execute_mget(context
->memc
, (const char *const *) context
->keys
, context
->key_lengths
,
254 context
->initial_number
);
/*
 * Program entry point: parses options, resolves and parses the server list,
 * runs the benchmark through scheduler(), prints the conclusions and releases
 * the server list.
 */
266 int main(int argc
, char *argv
[]) {
267 conclusions_st conclusion
;
// Seed random(), which execute_get() uses to pick keys.
269 srandom((unsigned int) time(NULL
));
270 options_parse(argc
, argv
);
// No --servers given: fall back to the MEMCACHED_SERVERS environment variable.
272 if (opt_servers
== NULL
) {
275 if ((temp
= getenv("MEMCACHED_SERVERS"))) {
276 opt_servers
= strdup(temp
);
279 if (opt_servers
== NULL
) {
280 std::cerr
<< "No servers provided" << std::endl
;
285 memcached_server_st
*servers
= memcached_servers_parse(opt_servers
);
286 if (servers
== NULL
or memcached_server_list_count(servers
) == 0) {
287 std::cerr
<< "Invalid server list provided:" << opt_servers
<< std::endl
;
// The mutex and condition variable form the start gate used by scheduler()
// and run_task(); they are destroyed again after the run.
291 pthread_mutex_init(&sleeper_mutex
, NULL
);
292 pthread_cond_init(&sleep_threshhold
, NULL
);
294 int error_code
= EXIT_SUCCESS
;
// A std::exception escaping scheduler() is reported on stderr and turns the
// exit status into EXIT_FAILURE.
296 scheduler(servers
, &conclusion
);
297 } catch (std::exception
&e
) {
298 std::cerr
<< "Died with exception: " << e
.what() << std::endl
;
299 error_code
= EXIT_FAILURE
;
304 (void) pthread_mutex_destroy(&sleeper_mutex
);
305 (void) pthread_cond_destroy(&sleep_threshhold
);
306 conclusions_print(&conclusion
);
307 memcached_server_list_free(servers
);
/*
 * Runs the benchmark: creates the client handle, optionally pre-loads data,
 * starts opt_concurrency worker threads, releases them together, joins them,
 * and stores the time between release and the last join in *conclusion.
 *
 * servers     parsed server list pushed onto the new handle.
 * conclusion  receives load_time and read_time.
 */
312 void scheduler(memcached_server_st
*servers
, conclusions_st
*conclusion
) {
313 unsigned int actual_loaded
= 0; /* Fix warning */
315 struct timeval start_time
, end_time
;
316 pairs_st
*pairs
= NULL
;
318 memcached_st
*memc
= memcached_create(NULL
);
320 memcached_server_push(memc
, servers
);
322 /* We need to set udp behavior before adding servers to the client */
// NOTE(review): memcached_server_push() above has already run by the time this
// behavior is set, which contradicts the comment -- confirm the intended order.
324 if (memcached_failed(memcached_behavior_set(memc
, MEMCACHED_BEHAVIOR_USE_UDP
, opt_udp_io
))) {
325 std::cerr
<< "Failed to enable UDP." << std::endl
;
326 memcached_free(memc
);
331 memcached_behavior_set(memc
, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL
, (uint64_t) opt_binary
);
// Optional pre-load. The returned pairs are what the get and mget workloads
// read back; actual_loaded receives the count reported by execute_set().
337 if (opt_createial_load
) {
338 pairs
= load_create_data(memc
, opt_createial_load
, &actual_loaded
);
// Flat key / length arrays over the loaded pairs, in the form execute_mget()
// passes to memcached_mget_execute().
341 char **keys
= static_cast<char **>(calloc(actual_loaded
, sizeof(char *)));
342 size_t *key_lengths
= static_cast<size_t *>(calloc(actual_loaded
, sizeof(size_t)));
// NOTE(review): when actual_loaded is 0, calloc() is allowed to return NULL
// without that being an error -- confirm this check tolerates that case.
344 if (keys
== NULL
or key_lengths
== NULL
) {
350 for (uint32_t x
= 0; x
< actual_loaded
; ++x
) {
351 keys
[x
] = pairs
[x
].key
;
352 key_lengths
[x
] = pairs
[x
].key_length
;
356 /* We set this after we have loaded */
358 if (opt_non_blocking_io
)
359 memcached_behavior_set(memc
, MEMCACHED_BEHAVIOR_NO_BLOCK
, 1);
362 memcached_behavior_set(memc
, MEMCACHED_BEHAVIOR_TCP_NODELAY
, 1);
// NOTE(review): presumably master_wakeup is armed between this lock/unlock
// pair so that new threads wait in run_task() -- confirm.
365 pthread_mutex_lock(&sleeper_mutex
);
367 pthread_mutex_unlock(&sleeper_mutex
);
// nothrow new: an allocation failure yields NULL, which is checked below.
369 pthread_t
*threads
= new (std::nothrow
) pthread_t
[opt_concurrency
];
371 if (threads
== NULL
) {
// One context and one thread per unit of concurrency. Every context shares
// the same pre-loaded pairs and key arrays.
375 for (uint32_t x
= 0; x
< opt_concurrency
; x
++) {
376 thread_context_st
*context
= new thread_context_st(memc
, opt_test
);
// NOTE(review): opt_test was already passed to the constructor; this
// assignment is presumably redundant -- confirm.
377 context
->test
= opt_test
;
379 context
->initial_pairs
= pairs
;
380 context
->initial_number
= actual_loaded
;
381 context
->keys
= keys
;
382 context
->key_lengths
= key_lengths
;
// Only the set test gets its own freshly generated pairs to write.
384 if (opt_test
== SET_TEST
) {
385 context
->execute_pairs
= pairs_generate(opt_execute_number
, VALUE_BYTES
);
386 context
->execute_number
= opt_execute_number
;
389 /* now you create the thread */
390 if (pthread_create(threads
+ x
, NULL
, run_task
, (void *) context
)) {
391 fprintf(stderr
, "Could not create thread\n");
// Release the workers, then start the clock.
// NOTE(review): presumably master_wakeup is cleared between this lock/unlock
// pair before the broadcast -- confirm.
396 pthread_mutex_lock(&sleeper_mutex
);
398 pthread_mutex_unlock(&sleeper_mutex
);
399 pthread_cond_broadcast(&sleep_threshhold
);
400 gettimeofday(&start_time
, NULL
);
// Wait for every worker to finish before stopping the clock.
402 for (uint32_t x
= 0; x
< opt_concurrency
; x
++) {
404 pthread_join(threads
[x
], &retval
);
408 gettimeofday(&end_time
, NULL
);
// NOTE(review): load_time and read_time receive the same interval;
// conclusions_print() prints load_time when the set test was selected.
410 conclusion
->load_time
= timedif(end_time
, start_time
);
411 conclusion
->read_time
= timedif(end_time
, start_time
);
415 memcached_free(memc
);
/*
 * Parses the command line with getopt_long() into the opt_* globals, handles
 * the version and help requests, and finally applies the DEFAULT_* values to
 * numeric options that are still 0.
 */
418 void options_parse(int argc
, char *argv
[]) {
419 memcached_programs_help_st help_options
[] = {
// Long option table. For entries with a non-NULL third field, getopt_long()
// stores the fourth field into that variable instead of returning it.
423 static struct option long_options
[] = {
424 {(OPTIONSTRING
) "concurrency", required_argument
, NULL
, OPT_SLAP_CONCURRENCY
},
425 {(OPTIONSTRING
) "debug", no_argument
, &opt_verbose
, OPT_DEBUG
},
426 {(OPTIONSTRING
) "quiet", no_argument
, NULL
, OPT_QUIET
},
427 {(OPTIONSTRING
) "execute-number", required_argument
, NULL
, OPT_SLAP_EXECUTE_NUMBER
},
428 {(OPTIONSTRING
) "flag", no_argument
, &opt_displayflag
, OPT_FLAG
},
429 {(OPTIONSTRING
) "flush", no_argument
, &opt_flush
, OPT_FLUSH
},
430 {(OPTIONSTRING
) "help", no_argument
, NULL
, OPT_HELP
},
431 {(OPTIONSTRING
) "initial-load", required_argument
, NULL
,
432 OPT_SLAP_INITIAL_LOAD
}, /* Number to load initially */
433 {(OPTIONSTRING
) "non-blocking", no_argument
, &opt_non_blocking_io
, OPT_SLAP_NON_BLOCK
},
434 {(OPTIONSTRING
) "servers", required_argument
, NULL
, OPT_SERVERS
},
435 {(OPTIONSTRING
) "tcp-nodelay", no_argument
, &opt_tcp_nodelay
, OPT_SLAP_TCP_NODELAY
},
436 {(OPTIONSTRING
) "test", required_argument
, NULL
, OPT_SLAP_TEST
},
437 {(OPTIONSTRING
) "verbose", no_argument
, &opt_verbose
, OPT_VERBOSE
},
438 {(OPTIONSTRING
) "version", no_argument
, NULL
, OPT_VERSION
},
439 {(OPTIONSTRING
) "binary", no_argument
, NULL
, OPT_BINARY
},
440 {(OPTIONSTRING
) "udp", no_argument
, NULL
, OPT_UDP
},
444 bool opt_help
= false;
445 bool opt_version
= false;
446 int option_index
= 0;
// Short options accepted: -V, -h, -v, -d and -s <servers>.
448 int option_rv
= getopt_long(argc
, argv
, "Vhvds:", long_options
, &option_index
);
// UDP mode and the get test are mutually exclusive. The same message appears
// again under --test get, so the check presumably works in either option
// order -- confirm.
458 if (opt_test
== GET_TEST
) {
460 "You can not run a get test in UDP mode. UDP mode "
461 "does not currently support get ops.\n");
471 case OPT_VERBOSE
: /* --verbose or -v */
472 opt_verbose
= OPT_VERBOSE
;
475 case OPT_DEBUG
: /* --debug or -d */
476 opt_verbose
= OPT_DEBUG
;
479 case OPT_VERSION
: /* --version or -V */
483 case OPT_HELP
: /* --help or -h */
487 case OPT_SERVERS
: /* --servers or -s */
// opt_servers holds its own copy of the argument.
488 opt_servers
= strdup(optarg
);
// --test accepts "get", "set" or "mget"; anything else is reported as unknown.
492 if (strcmp(optarg
, "get") == 0) {
493 if (opt_udp_io
== 1) {
495 "You can not run a get test in UDP mode. UDP mode "
496 "does not currently support get ops.\n");
500 } else if (strcmp(optarg
, "set") == 0) {
502 } else if (strcmp(optarg
, "mget") == 0) {
503 opt_test
= MGET_TEST
;
505 fprintf(stderr
, "Your test, %s, is not a known test\n", optarg
);
// The three numeric options are parsed base-10 with strtoul().
// NOTE(review): the cast to unsigned int silently truncates values that do not
// fit, and no end pointer is passed, so trailing garbage is not detected here.
510 case OPT_SLAP_CONCURRENCY
:
512 opt_concurrency
= (unsigned int) strtoul(optarg
, (char **) NULL
, 10);
514 fprintf(stderr
, "Invalid value for concurrency: %s\n", optarg
);
519 case OPT_SLAP_EXECUTE_NUMBER
:
521 opt_execute_number
= (unsigned int) strtoul(optarg
, (char **) NULL
, 10);
523 fprintf(stderr
, "Invalid value for execute: %s\n", optarg
);
528 case OPT_SLAP_INITIAL_LOAD
:
530 opt_createial_load
= (unsigned int) strtoul(optarg
, (char **) NULL
, 10);
532 fprintf(stderr
, "Invalid value for initial load: %s\n", optarg
);
542 /* getopt_long already printed an error message. */
551 version_command(PROGRAM_NAME
);
556 help_command(PROGRAM_NAME
, PROGRAM_DESCRIPTION
, long_options
, help_options
);
// Defaults. The get and mget tests need data to read, so they get an initial
// load even when none was requested.
560 if ((opt_test
== GET_TEST
or opt_test
== MGET_TEST
) and opt_createial_load
== 0)
561 opt_createial_load
= DEFAULT_INITIAL_LOAD
;
563 if (opt_execute_number
== 0)
564 opt_execute_number
= DEFAULT_EXECUTE_NUMBER
;
566 if (opt_concurrency
== 0)
567 opt_concurrency
= DEFAULT_CONCURRENCY
;
/*
 * Prints the run summary to stdout: the thread count, the row counters from
 * *conclusion and the elapsed time.
 */
570 void conclusions_print(conclusions_st
*conclusion
) {
571 printf("\tThreads connecting to servers %u\n", opt_concurrency
);
573 printf("\tLoaded %u rows\n", conclusion
->rows_loaded
);
574 printf("\tRead %u rows\n", conclusion
->rows_read
);
// The time is printed as seconds with three decimals by splitting the stored
// value into value / 1000 and value % 1000, i.e. it is treated as a
// millisecond count.
576 if (opt_test
== SET_TEST
)
577 printf("\tTook %ld.%03ld seconds to load data\n", conclusion
->load_time
/ 1000,
578 conclusion
->load_time
% 1000);
580 printf("\tTook %ld.%03ld seconds to read data\n", conclusion
->read_time
/ 1000,
581 conclusion
->read_time
% 1000);
/* Calls memcached_flush() on memc with 0 as its second (expiration) argument.
   The return code is not checked. */
584 void flush_all(memcached_st
*memc
) {
585 memcached_flush(memc
, 0);
/*
 * Generates number_of key/value pairs and stores them on the server through a
 * temporary clone of memc, which is freed again before the function ends.
 *
 * memc           handle to clone; the clone is what execute_set() writes with.
 * number_of      how many pairs to generate and try to store.
 * actual_loaded  out-parameter: receives execute_set()'s return value.
 *
 * scheduler() keeps the returned pointer as the pairs that the get and mget
 * workloads read back.
 */
588 pairs_st
*load_create_data(memcached_st
*memc
, unsigned int number_of
,
589 unsigned int *actual_loaded
) {
590 memcached_st
*memc_clone
= memcached_clone(NULL
, memc
);
591 /* We always used non-blocking IO for load since it is faster */
// NOTE(review): the value passed below is 0, which does not enable
// MEMCACHED_BEHAVIOR_NO_BLOCK and so is at odds with the comment above --
// confirm which of the two is intended.
592 memcached_behavior_set(memc_clone
, MEMCACHED_BEHAVIOR_NO_BLOCK
, 0);
594 pairs_st
*pairs
= pairs_generate(number_of
, VALUE_BYTES
);
595 *actual_loaded
= execute_set(memc_clone
, pairs
, number_of
);
597 memcached_free(memc_clone
);