X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fmemslap.c;h=0d77fd6e172be763f923752a1dc5b243e348c360;hb=6e9731947eace4be83d553a242a44b36b3adcf22;hp=52abb4b1e2707fcf071bed31a4a18b50cbbd7341;hpb=3d1c182f0918fc41bb8a580494771f7504c5deb2;p=m6w6%2Flibmemcached diff --git a/clients/memslap.c b/clients/memslap.c index 52abb4b1..0d77fd6e 100644 --- a/clients/memslap.c +++ b/clients/memslap.c @@ -1,3 +1,42 @@ +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * Libmemcached library + * + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ + * Copyright (C) 2006-2009 Brian Aker All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +#include #include #include #include @@ -6,10 +45,10 @@ #include #include #include -#include #include #include #include +#include #include @@ -41,6 +80,7 @@ typedef struct thread_context_st thread_context_st; typedef enum { SET_TEST, GET_TEST, + MGET_TEST } test_type; struct thread_context_st { @@ -49,6 +89,8 @@ struct thread_context_st { unsigned int initial_number; pairs_st *execute_pairs; unsigned int execute_number; + char **keys; + size_t *key_lengths; test_type test; memcached_st *memc; }; @@ -64,10 +106,11 @@ struct conclusions_st { void options_parse(int argc, char *argv[]); void conclusions_print(conclusions_st *conclusion); void scheduler(memcached_server_st *servers, conclusions_st *conclusion); -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, +pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded); void flush_all(memcached_st *memc); +static int opt_binary= 0; static int opt_verbose= 0; static int opt_flush= 0; static int opt_non_blocking_io= 0; @@ -77,6 +120,7 @@ static unsigned int opt_createial_load= 0; static unsigned int opt_concurrency= 0; static int opt_displayflag= 0; static char *opt_servers= NULL; +static int opt_udp_io= 0; test_type opt_test= SET_TEST; int main(int argc, char *argv[]) @@ -86,7 +130,7 @@ int main(int argc, char *argv[]) memset(&conclusion, 0, sizeof(conclusions_st)); - srandom(time(NULL)); + srandom((unsigned int)time(NULL)); options_parse(argc, argv); if (!opt_servers) @@ -125,7 +169,6 @@ int main(int argc, char *argv[]) void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { - unsigned int x; unsigned int actual_loaded= 0; /* Fix warning */ memcached_st *memc; @@ -139,13 +182,46 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) PTHREAD_CREATE_DETACHED); memc= memcached_create(NULL); + + /* We need to set udp behavior before adding servers to the client */ + if (opt_udp_io) + { + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, + (uint64_t)opt_udp_io); + for (uint32_t x= 0; x < memcached_server_list_count(servers); x++ ) + { + servers[x].type= MEMCACHED_CONNECTION_UDP; + } + } memcached_server_push(memc, servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, + (uint64_t)opt_binary); + if (opt_flush) flush_all(memc); if (opt_createial_load) pairs= load_create_data(memc, opt_createial_load, &actual_loaded); + char **keys= calloc(actual_loaded, sizeof(char*)); + size_t *key_lengths= calloc(actual_loaded, sizeof(size_t)); + + if (keys == NULL || key_lengths == NULL) + { + free(keys); + free(key_lengths); + keys= NULL; + key_lengths= NULL; + } + else + { + for (uint32_t x= 0; x < actual_loaded; ++x) + { + keys[x]= pairs[x].key; + key_lengths[x]= pairs[x].key_length; + } + } + /* We set this after we have loaded */ { if (opt_non_blocking_io) @@ -154,7 +230,6 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1); } - pthread_mutex_lock(&counter_mutex); thread_counter= 0; @@ -162,17 +237,18 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) master_wakeup= 1; pthread_mutex_unlock(&sleeper_mutex); - for (x= 0; x < opt_concurrency; x++) + for (uint32_t x= 0; x < opt_concurrency; x++) { thread_context_st *context; - context= (thread_context_st *)malloc(sizeof(thread_context_st)); - memset(context, 0, sizeof(thread_context_st)); + context= (thread_context_st *)calloc(1, sizeof(thread_context_st)); context->memc= memcached_clone(NULL, memc); context->test= opt_test; context->initial_pairs= pairs; context->initial_number= actual_loaded; + context->keys= keys; + context->key_lengths= key_lengths; if (opt_test == SET_TEST) { @@ -204,20 +280,15 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) */ pthread_mutex_lock(&counter_mutex); while (thread_counter) - { - struct timespec abstime; - - memset(&abstime, 0, sizeof(struct timespec)); - abstime.tv_sec= 1; - - pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime); - } + pthread_cond_wait(&count_threshhold, &counter_mutex); pthread_mutex_unlock(&counter_mutex); gettimeofday(&end_time, NULL); conclusion->load_time= timedif(end_time, start_time); conclusion->read_time= timedif(end_time, start_time); + free(keys); + free(key_lengths); pairs_free(pairs); memcached_free(memc); } @@ -231,26 +302,28 @@ void options_parse(int argc, char *argv[]) static struct option long_options[]= { - {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, - {"debug", no_argument, &opt_verbose, OPT_DEBUG}, - {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, - {"flag", no_argument, &opt_displayflag, OPT_FLAG}, - {"flush", no_argument, &opt_flush, OPT_FLUSH}, - {"help", no_argument, NULL, OPT_HELP}, - {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */ - {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK}, - {"servers", required_argument, NULL, OPT_SERVERS}, - {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY}, - {"test", required_argument, NULL, OPT_SLAP_TEST}, - {"verbose", no_argument, &opt_verbose, OPT_VERBOSE}, - {"version", no_argument, NULL, OPT_VERSION}, + {(OPTIONSTRING)"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, + {(OPTIONSTRING)"debug", no_argument, &opt_verbose, OPT_DEBUG}, + {(OPTIONSTRING)"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, + {(OPTIONSTRING)"flag", no_argument, &opt_displayflag, OPT_FLAG}, + {(OPTIONSTRING)"flush", no_argument, &opt_flush, OPT_FLUSH}, + {(OPTIONSTRING)"help", no_argument, NULL, OPT_HELP}, + {(OPTIONSTRING)"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */ + {(OPTIONSTRING)"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK}, + {(OPTIONSTRING)"servers", required_argument, NULL, OPT_SERVERS}, + {(OPTIONSTRING)"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY}, + {(OPTIONSTRING)"test", required_argument, NULL, OPT_SLAP_TEST}, + {(OPTIONSTRING)"verbose", no_argument, &opt_verbose, OPT_VERBOSE}, + {(OPTIONSTRING)"version", no_argument, NULL, OPT_VERSION}, + {(OPTIONSTRING)"binary", no_argument, NULL, OPT_BINARY}, + {(OPTIONSTRING)"udp", no_argument, NULL, OPT_UDP}, {0, 0, 0, 0}, }; int option_index= 0; int option_rv; - while (1) + while (1) { option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); if (option_rv == -1) break; @@ -258,6 +331,18 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_UDP: + if (opt_test == GET_TEST) + { + fprintf(stderr, "You can not run a get test in UDP mode. UDP mode " + "does not currently support get ops.\n"); + exit(1); + } + opt_udp_io= 1; + break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; @@ -275,22 +360,35 @@ void options_parse(int argc, char *argv[]) break; case OPT_SLAP_TEST: if (!strcmp(optarg, "get")) + { + if (opt_udp_io == 1) + { + fprintf(stderr, "You can not run a get test in UDP mode. UDP mode " + "does not currently support get ops.\n"); + exit(1); + } opt_test= GET_TEST ; + } else if (!strcmp(optarg, "set")) opt_test= SET_TEST; - else + else if (!strcmp(optarg, "mget")) + { + opt_test= MGET_TEST; + } + else { fprintf(stderr, "Your test, %s, is not a known test\n", optarg); exit(1); } break; case OPT_SLAP_CONCURRENCY: - opt_concurrency= strtol(optarg, (char **)NULL, 10); + opt_concurrency= (unsigned int)strtoul(optarg, (char **)NULL, 10); + break; case OPT_SLAP_EXECUTE_NUMBER: - opt_execute_number= strtol(optarg, (char **)NULL, 10); + opt_execute_number= (unsigned int)strtoul(optarg, (char **)NULL, 10); break; case OPT_SLAP_INITIAL_LOAD: - opt_createial_load= strtol(optarg, (char **)NULL, 10); + opt_createial_load= (unsigned int)strtoul(optarg, (char **)NULL, 10); break; case '?': /* getopt_long already printed an error message. */ @@ -300,7 +398,7 @@ void options_parse(int argc, char *argv[]) } } - if (opt_test == GET_TEST && opt_createial_load == 0) + if ((opt_test == GET_TEST || opt_test == MGET_TEST) && opt_createial_load == 0) opt_createial_load= DEFAULT_INITIAL_LOAD; if (opt_execute_number == 0) @@ -318,10 +416,10 @@ void conclusions_print(conclusions_st *conclusion) printf("\tRead %u rows\n", conclusion->rows_read); #endif if (opt_test == SET_TEST) - printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, + printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, conclusion->load_time % 1000); else - printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, + printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, conclusion->read_time % 1000); } @@ -336,18 +434,26 @@ void *run_task(void *p) while (master_wakeup) { pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); - } + } pthread_mutex_unlock(&sleeper_mutex); /* Do Stuff */ switch (context->test) { case SET_TEST: + assert(context->execute_pairs); execute_set(memc, context->execute_pairs, context->execute_number); break; case GET_TEST: execute_get(memc, context->initial_pairs, context->initial_number); break; + case MGET_TEST: + execute_mget(memc, (const char*const*)context->keys, context->key_lengths, + context->initial_number); + break; + default: + WATCHPOINT_ASSERT(context->test); + break; } memcached_free(memc); @@ -355,9 +461,6 @@ void *run_task(void *p) if (context->execute_pairs) pairs_free(context->execute_pairs); - if (context->initial_pairs) - pairs_free(context->initial_pairs); - free(context); pthread_mutex_lock(&counter_mutex); @@ -373,20 +476,20 @@ void flush_all(memcached_st *memc) memcached_flush(memc, 0); } -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, +pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded) { - memcached_st *clone; + memcached_st *memc_clone; pairs_st *pairs; - clone= memcached_clone(NULL, memc); + memc_clone= memcached_clone(NULL, memc); /* We always used non-blocking IO for load since it is faster */ - memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); + memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); pairs= pairs_generate(number_of, 400); - *actual_loaded= execute_set(clone, pairs, number_of); + *actual_loaded= execute_set(memc_clone, pairs, number_of); - memcached_free(clone); + memcached_free(memc_clone); return pairs; }