X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fbin%2Fmemslap.cc;h=a9fb731ff09c671e7a2fea2a0eef186399d0f7ee;hb=2c5690e3e3384b98c1b4dc78c076ae9d59eb7d22;hp=b7b606e97e9bbc05870e583b879c8f17fa40706a;hpb=7c2da91b9897c54f66e7fc634a675fcaeef32167;p=m6w6%2Flibmemcached diff --git a/src/bin/memslap.cc b/src/bin/memslap.cc index b7b606e9..a9fb731f 100644 --- a/src/bin/memslap.cc +++ b/src/bin/memslap.cc @@ -37,7 +37,6 @@ #include "client_options.h" #include "utilities.h" #include "generator.h" -#include "execute.h" #define DEFAULT_INITIAL_LOAD 10000 #define DEFAULT_EXECUTE_NUMBER 10000 @@ -46,7 +45,7 @@ #define VALUE_BYTES 4096 #define PROGRAM_NAME "memslap" -#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers." +#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers." /* Global Thread counter */ volatile unsigned int master_wakeup; @@ -80,7 +79,9 @@ struct thread_context_st { , memc(NULL) , root(memc_arg) {} - void init() { memc = memcached_clone(NULL, root); } + void init() { + memc = memcached_clone(NULL, root); + } ~thread_context_st() { if (execute_pairs) { @@ -123,6 +124,107 @@ static char *opt_servers = NULL; static bool opt_udp_io = false; test_t opt_test = SET_TEST; + +unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { + uint32_t count = 0; + for (; count < number_of; ++count) { + memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length, + pairs[count].value, pairs[count].value_length, 0, 0); + if (memcached_failed(rc)) { + fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count, + memcached_last_error_message(memc), (unsigned int) pairs[count].key_length, + pairs[count].key); + + // We will try to reconnect and see if that fixes the issue + memcached_quit(memc); + + return count; + } + } + + return count; +} + +/* + Execute a memcached_get() on a set of pairs. + Return the number of rows retrieved. +*/ +static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { + unsigned int x; + unsigned int retrieved; + + for (retrieved = 0, x = 0; x < number_of; x++) { + size_t value_length; + uint32_t flags; + + unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of); + + memcached_return_t rc; + char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length, + &value_length, &flags, &rc); + + if (memcached_failed(rc)) { + fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__, + memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length, + pairs[fetch_key].key); + } else { + retrieved++; + } + + ::free(value); + } + + return retrieved; +} + +/** + * Callback function to count the number of results + */ +static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result, + void *context) { + (void) ptr; + (void) result; + unsigned int *counter = (unsigned int *) context; + *counter = *counter + 1; + + return MEMCACHED_SUCCESS; +} + +/** + * Try to run a large mget to get all of the keys + * @param memc memcached handle + * @param keys the keys to get + * @param key_length the length of the keys + * @param number_of the number of keys to try to get + * @return the number of keys received + */ +static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length, + unsigned int number_of) { + unsigned int retrieved = 0; + memcached_execute_fn callbacks[] = {callback_counter}; + memcached_return_t rc; + rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1); + + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED + || rc == MEMCACHED_END) + { + rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1); + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) { + fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, + memcached_strerror(memc, rc)); + memcached_quit(memc); + return 0; + } + } else { + fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, + memcached_strerror(memc, rc)); + memcached_quit(memc); + return 0; + } + + return retrieved; +} + extern "C" { static __attribute__((noreturn)) void *run_task(void *p) { @@ -143,11 +245,13 @@ static __attribute__((noreturn)) void *run_task(void *p) { execute_set(context->memc, context->execute_pairs, context->execute_number); break; - case GET_TEST: execute_get(context->memc, context->initial_pairs, context->initial_number); break; + case GET_TEST: + execute_get(context->memc, context->initial_pairs, context->initial_number); + break; case MGET_TEST: execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths, - context->initial_number); + context->initial_number); break; } @@ -155,7 +259,9 @@ static __attribute__((noreturn)) void *run_task(void *p) { pthread_exit(0); } -} + +} // extern "C" + int main(int argc, char *argv[]) { conclusions_st conclusion; @@ -281,7 +387,7 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { } /* now you create the thread */ - if (pthread_create(threads + x, NULL, run_task, (void *) context) != 0) { + if (pthread_create(threads + x, NULL, run_task, (void *) context)) { fprintf(stderr, "Could not create thread\n"); exit(1); } @@ -345,7 +451,8 @@ void options_parse(int argc, char *argv[]) { break; switch (option_rv) { - case 0: break; + case 0: + break; case OPT_UDP: if (opt_test == GET_TEST) { @@ -357,17 +464,29 @@ void options_parse(int argc, char *argv[]) { opt_udp_io = true; break; - case OPT_BINARY: opt_binary = true; break; + case OPT_BINARY: + opt_binary = true; + break; - case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; + case OPT_VERBOSE: /* --verbose or -v */ + opt_verbose = OPT_VERBOSE; + break; - case OPT_DEBUG: /* --debug or -d */ opt_verbose = OPT_DEBUG; break; + case OPT_DEBUG: /* --debug or -d */ + opt_verbose = OPT_DEBUG; + break; - case OPT_VERSION: /* --version or -V */ opt_version = true; break; + case OPT_VERSION: /* --version or -V */ + opt_version = true; + break; - case OPT_HELP: /* --help or -h */ opt_help = true; break; + case OPT_HELP: /* --help or -h */ + opt_help = true; + break; - case OPT_SERVERS: /* --servers or -s */ opt_servers = strdup(optarg); break; + case OPT_SERVERS: /* --servers or -s */ + opt_servers = strdup(optarg); + break; case OPT_SLAP_TEST: if (strcmp(optarg, "get") == 0) { @@ -391,7 +510,7 @@ void options_parse(int argc, char *argv[]) { case OPT_SLAP_CONCURRENCY: errno = 0; opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno != 0) { + if (errno) { fprintf(stderr, "Invalid value for concurrency: %s\n", optarg); exit(EXIT_FAILURE); } @@ -400,7 +519,7 @@ void options_parse(int argc, char *argv[]) { case OPT_SLAP_EXECUTE_NUMBER: errno = 0; opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno != 0) { + if (errno) { fprintf(stderr, "Invalid value for execute: %s\n", optarg); exit(EXIT_FAILURE); } @@ -409,19 +528,22 @@ void options_parse(int argc, char *argv[]) { case OPT_SLAP_INITIAL_LOAD: errno = 0; opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno != 0) { + if (errno) { fprintf(stderr, "Invalid value for initial load: %s\n", optarg); exit(EXIT_FAILURE); } break; - case OPT_QUIET: close_stdio(); break; + case OPT_QUIET: + close_stdio(); + break; case '?': /* getopt_long already printed an error message. */ exit(EXIT_FAILURE); - default: abort(); + default: + abort(); } }