X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fmemslap.cc;h=a8c0da1ed7b61a531dbbe66cbc86682eae2b22e6;hb=5c09a7ed2edbb9876493f30d59433331b63f6bfc;hp=d8927e7ececbf3030b9d9da55b45449ecfd80e24;hpb=67456d74f5bd4f354a360d70da503dc58cbe5971;p=m6w6%2Flibmemcached diff --git a/clients/memslap.cc b/clients/memslap.cc index d8927e7e..a8c0da1e 100644 --- a/clients/memslap.cc +++ b/clients/memslap.cc @@ -37,18 +37,23 @@ #include -#include -#include -#include -#include -#include -#include -#include + +#include +#include +#include +#include #include -#include #include +#include #include -#include +#include +#include +#include +#include +#include +#include + +#include #include @@ -65,23 +70,16 @@ #define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers." /* Global Thread counter */ -volatile unsigned int thread_counter; -pthread_mutex_t counter_mutex; -pthread_cond_t count_threshhold; volatile unsigned int master_wakeup; pthread_mutex_t sleeper_mutex; pthread_cond_t sleep_threshhold; -void *run_task(void *p); - /* Types */ -typedef struct conclusions_st conclusions_st; -typedef struct thread_context_st thread_context_st; -typedef enum { +enum test_t { SET_TEST, GET_TEST, MGET_TEST -} test_type; +}; struct thread_context_st { unsigned int key_count; @@ -91,8 +89,37 @@ struct thread_context_st { unsigned int execute_number; char **keys; size_t *key_lengths; - test_type test; + test_t test; memcached_st *memc; + const memcached_st* root; + + thread_context_st(const memcached_st* memc_arg, test_t test_arg) : + key_count(0), + initial_pairs(NULL), + initial_number(0), + execute_pairs(NULL), + execute_number(0), + keys(0), + key_lengths(NULL), + test(test_arg), + memc(NULL), + root(memc_arg) + { + } + + void init() + { + memc= memcached_clone(NULL, root); + } + + ~thread_context_st() + { + if (execute_pairs) + { + pairs_free(execute_pairs); + } + memcached_free(memc); + } }; struct conclusions_st { @@ -100,6 +127,13 @@ struct conclusions_st { long int read_time; unsigned int rows_loaded; unsigned int rows_read; + + conclusions_st() : + load_time(0), + read_time(0), + rows_loaded(0), + rows_read() + { } }; /* Prototypes */ @@ -110,7 +144,7 @@ pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded); void flush_all(memcached_st *memc); -static int opt_binary= 0; +static bool opt_binary= 0; static int opt_verbose= 0; static int opt_flush= 0; static int opt_non_blocking_io= 0; @@ -120,36 +154,73 @@ static unsigned int opt_createial_load= 0; static unsigned int opt_concurrency= 0; static int opt_displayflag= 0; static char *opt_servers= NULL; -static int opt_udp_io= 0; -test_type opt_test= SET_TEST; +static bool opt_udp_io= false; +test_t opt_test= SET_TEST; + +extern "C" { + +static void *run_task(void *p) +{ + thread_context_st *context= (thread_context_st *)p; + + context->init(); + + pthread_mutex_lock(&sleeper_mutex); + while (master_wakeup) + { + pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); + } + pthread_mutex_unlock(&sleeper_mutex); + + /* Do Stuff */ + switch (context->test) + { + case SET_TEST: + assert(context->execute_pairs); + execute_set(context->memc, context->execute_pairs, context->execute_number); + break; + + case GET_TEST: + execute_get(context->memc, context->initial_pairs, context->initial_number); + break; + + case MGET_TEST: + execute_mget(context->memc, (const char*const*)context->keys, context->key_lengths, context->initial_number); + break; + } + + delete context; + + pthread_exit(0); +} + +} + int main(int argc, char *argv[]) { conclusions_st conclusion; - memcached_server_st *servers; - - memset(&conclusion, 0, sizeof(conclusions_st)); srandom((unsigned int)time(NULL)); options_parse(argc, argv); - if (!opt_servers) + if (opt_servers == NULL) { char *temp; if ((temp= getenv("MEMCACHED_SERVERS"))) + { opt_servers= strdup(temp); + } else { fprintf(stderr, "No Servers provided\n"); - exit(1); + return EXIT_FAILURE; } } - servers= memcached_servers_parse(opt_servers); + memcached_server_st *servers= memcached_servers_parse(opt_servers); - pthread_mutex_init(&counter_mutex, NULL); - pthread_cond_init(&count_threshhold, NULL); pthread_mutex_init(&sleeper_mutex, NULL); pthread_cond_init(&sleep_threshhold, NULL); @@ -157,56 +228,53 @@ int main(int argc, char *argv[]) free(opt_servers); - (void)pthread_mutex_destroy(&counter_mutex); - (void)pthread_cond_destroy(&count_threshhold); (void)pthread_mutex_destroy(&sleeper_mutex); (void)pthread_cond_destroy(&sleep_threshhold); conclusions_print(&conclusion); memcached_server_list_free(servers); - return 0; + return EXIT_SUCCESS; } void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { unsigned int actual_loaded= 0; /* Fix warning */ - memcached_st *memc; struct timeval start_time, end_time; - pthread_t mainthread; /* Thread descriptor */ - pthread_attr_t attr; /* Thread attributes */ pairs_st *pairs= NULL; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, - PTHREAD_CREATE_DETACHED); + memcached_st *memc= memcached_create(NULL); - memc= memcached_create(NULL); + memcached_server_push(memc, servers); /* We need to set udp behavior before adding servers to the client */ if (opt_udp_io) { - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, - (uint64_t)opt_udp_io); - for (uint32_t x= 0; x < memcached_server_list_count(servers); x++ ) + if (memcached_failed(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io))) { - servers[x].type= MEMCACHED_CONNECTION_UDP; + std::cerr << "Failed to enable UDP." << std::endl; + memcached_free(memc); + exit(EXIT_FAILURE); } } - memcached_server_push(memc, servers); memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, (uint64_t)opt_binary); if (opt_flush) + { flush_all(memc); + } + if (opt_createial_load) + { pairs= load_create_data(memc, opt_createial_load, &actual_loaded); + } char **keys= static_cast(calloc(actual_loaded, sizeof(char*))); size_t *key_lengths= static_cast(calloc(actual_loaded, sizeof(size_t))); - if (keys == NULL || key_lengths == NULL) + if (keys == NULL or key_lengths == NULL) { free(keys); free(key_lengths); @@ -226,23 +294,25 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { if (opt_non_blocking_io) memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1); + if (opt_tcp_nodelay) memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1); } - pthread_mutex_lock(&counter_mutex); - thread_counter= 0; - pthread_mutex_lock(&sleeper_mutex); master_wakeup= 1; pthread_mutex_unlock(&sleeper_mutex); - for (uint32_t x= 0; x < opt_concurrency; x++) + pthread_t *threads= new (std::nothrow) pthread_t[opt_concurrency]; + + if (not threads) { - thread_context_st *context; - context= (thread_context_st *)calloc(1, sizeof(thread_context_st)); + exit(EXIT_FAILURE); + } - context->memc= memcached_clone(NULL, memc); + for (uint32_t x= 0; x < opt_concurrency; x++) + { + thread_context_st *context= new thread_context_st(memc, opt_test); context->test= opt_test; context->initial_pairs= pairs; @@ -257,31 +327,25 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) } /* now you create the thread */ - if (pthread_create(&mainthread, &attr, run_task, - (void *)context) != 0) + if (pthread_create(threads +x, NULL, run_task, (void *)context) != 0) { fprintf(stderr,"Could not create thread\n"); exit(1); } - thread_counter++; } - pthread_mutex_unlock(&counter_mutex); - pthread_attr_destroy(&attr); - pthread_mutex_lock(&sleeper_mutex); master_wakeup= 0; pthread_mutex_unlock(&sleeper_mutex); pthread_cond_broadcast(&sleep_threshhold); - gettimeofday(&start_time, NULL); - /* - We loop until we know that all children have cleaned up. - */ - pthread_mutex_lock(&counter_mutex); - while (thread_counter) - pthread_cond_wait(&count_threshhold, &counter_mutex); - pthread_mutex_unlock(&counter_mutex); + + for (uint32_t x= 0; x < opt_concurrency; x++) + { + void *retval; + pthread_join(threads[x], &retval); + } + delete [] threads; gettimeofday(&end_time, NULL); @@ -304,6 +368,7 @@ void options_parse(int argc, char *argv[]) { {(OPTIONSTRING)"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, {(OPTIONSTRING)"debug", no_argument, &opt_verbose, OPT_DEBUG}, + {(OPTIONSTRING)"quiet", no_argument, NULL, OPT_QUIET}, {(OPTIONSTRING)"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, {(OPTIONSTRING)"flag", no_argument, &opt_displayflag, OPT_FLAG}, {(OPTIONSTRING)"flush", no_argument, &opt_flush, OPT_FLUSH}, @@ -320,17 +385,20 @@ void options_parse(int argc, char *argv[]) {0, 0, 0, 0}, }; + bool opt_help= false; + bool opt_version= false; int option_index= 0; - int option_rv; - while (1) { - option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); + int option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); + if (option_rv == -1) break; + switch (option_rv) { case 0: break; + case OPT_UDP: if (opt_test == GET_TEST) { @@ -338,67 +406,98 @@ void options_parse(int argc, char *argv[]) "does not currently support get ops.\n"); exit(1); } - opt_udp_io= 1; + opt_udp_io= true; break; + case OPT_BINARY: - opt_binary = 1; + opt_binary= true; break; + case OPT_VERBOSE: /* --verbose or -v */ - opt_verbose = OPT_VERBOSE; + opt_verbose= OPT_VERBOSE; break; + case OPT_DEBUG: /* --debug or -d */ opt_verbose = OPT_DEBUG; break; + case OPT_VERSION: /* --version or -V */ - version_command(PROGRAM_NAME); + opt_version= true; break; + case OPT_HELP: /* --help or -h */ - help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options); + opt_help= true; break; + case OPT_SERVERS: /* --servers or -s */ opt_servers= strdup(optarg); break; + case OPT_SLAP_TEST: - if (!strcmp(optarg, "get")) + if (strcmp(optarg, "get") == 0) { if (opt_udp_io == 1) { fprintf(stderr, "You can not run a get test in UDP mode. UDP mode " "does not currently support get ops.\n"); - exit(1); + exit(EXIT_FAILURE); } opt_test= GET_TEST ; } - else if (!strcmp(optarg, "set")) + else if (strcmp(optarg, "set") == 0) + { opt_test= SET_TEST; - else if (!strcmp(optarg, "mget")) + } + else if (strcmp(optarg, "mget") == 0) { opt_test= MGET_TEST; } else { fprintf(stderr, "Your test, %s, is not a known test\n", optarg); - exit(1); + exit(EXIT_FAILURE); } break; + case OPT_SLAP_CONCURRENCY: opt_concurrency= (unsigned int)strtoul(optarg, (char **)NULL, 10); break; + case OPT_SLAP_EXECUTE_NUMBER: opt_execute_number= (unsigned int)strtoul(optarg, (char **)NULL, 10); break; + case OPT_SLAP_INITIAL_LOAD: opt_createial_load= (unsigned int)strtoul(optarg, (char **)NULL, 10); break; + + case OPT_QUIET: + close_stdio(); + break; + + case '?': /* getopt_long already printed an error message. */ - exit(1); + exit(EXIT_FAILURE); + default: abort(); } } - if ((opt_test == GET_TEST || opt_test == MGET_TEST) && opt_createial_load == 0) + if (opt_version) + { + version_command(PROGRAM_NAME); + exit(EXIT_SUCCESS); + } + + if (opt_help) + { + help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options); + exit(EXIT_SUCCESS); + } + + if ((opt_test == GET_TEST or opt_test == MGET_TEST) and opt_createial_load == 0) opt_createial_load= DEFAULT_INITIAL_LOAD; if (opt_execute_number == 0) @@ -423,54 +522,6 @@ void conclusions_print(conclusions_st *conclusion) conclusion->read_time % 1000); } -void *run_task(void *p) -{ - thread_context_st *context= (thread_context_st *)p; - memcached_st *memc; - - memc= context->memc; - - pthread_mutex_lock(&sleeper_mutex); - while (master_wakeup) - { - pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); - } - pthread_mutex_unlock(&sleeper_mutex); - - /* Do Stuff */ - switch (context->test) - { - case SET_TEST: - assert(context->execute_pairs); - execute_set(memc, context->execute_pairs, context->execute_number); - break; - case GET_TEST: - execute_get(memc, context->initial_pairs, context->initial_number); - break; - case MGET_TEST: - execute_mget(memc, (const char*const*)context->keys, context->key_lengths, - context->initial_number); - break; - default: - WATCHPOINT_ASSERT(context->test); - break; - } - - memcached_free(memc); - - if (context->execute_pairs) - pairs_free(context->execute_pairs); - - free(context); - - pthread_mutex_lock(&counter_mutex); - thread_counter--; - pthread_cond_signal(&count_threshhold); - pthread_mutex_unlock(&counter_mutex); - - return NULL; -} - void flush_all(memcached_st *memc) { memcached_flush(memc, 0); @@ -479,14 +530,11 @@ void flush_all(memcached_st *memc) pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded) { - memcached_st *memc_clone; - pairs_st *pairs; - - memc_clone= memcached_clone(NULL, memc); + memcached_st *memc_clone= memcached_clone(NULL, memc); /* We always used non-blocking IO for load since it is faster */ memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); - pairs= pairs_generate(number_of, 400); + pairs_st *pairs= pairs_generate(number_of, 400); *actual_loaded= execute_set(memc_clone, pairs, number_of); memcached_free(memc_clone);