X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fmemslap.c;h=18ac2dd1f2e187074d615bed58b02ea14da4fa17;hb=fe5b4854b046a8b0613c89303b04d7e60e4c8223;hp=01f11e2d75dd08d7d1efb494a882538e5455298a;hpb=aadde7bff9b295789dbf4a1eeb9b37825a970c02;p=m6w6%2Flibmemcached diff --git a/clients/memslap.c b/clients/memslap.c index 01f11e2d..18ac2dd1 100644 --- a/clients/memslap.c +++ b/clients/memslap.c @@ -1,389 +1,883 @@ -#include +/* + * memslap + * + * (c) Copyright 2009, Schooner Information Technology, Inc. + * All rights reserved. + * http://www.schoonerinfotech.com/ + * + * Use and distribution licensed under the BSD license. See + * the COPYING file for full text. + * + * Authors: + * Brian Aker + * Mingqiang Zhuang + * + */ +#include "config.h" + #include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include - -#include - -#include "client_options.h" -#include "utilities.h" -#include "generator.h" -#include "execute.h" - -#define DEFAULT_INITIAL_LOAD 10000 -#define DEFAULT_EXECUTE_NUMBER 10000 -#define DEFAULT_CONCURRENCY 1 - -#define PROGRAM_NAME "memslap" -#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers." - -/* Global Thread counter */ -volatile unsigned int thread_counter; -pthread_mutex_t counter_mutex; -pthread_cond_t count_threshhold; -volatile unsigned int master_wakeup; -pthread_mutex_t sleeper_mutex; -pthread_cond_t sleep_threshhold; - -void *run_task(void *p); - -/* Types */ -typedef struct conclusions_st conclusions_st; -typedef struct thread_context_st thread_context_st; -typedef enum { - SET_TEST, - GET_TEST, -} test_type; - -struct thread_context_st { - unsigned int key_count; - pairs_st *initial_pairs; - unsigned int initial_number; - pairs_st *execute_pairs; - unsigned int execute_number; - test_type test; - memcached_st *memc; -}; +#include +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif + + +#include "ms_sigsegv.h" +#include "ms_setting.h" +#include "ms_thread.h" + +#define PROGRAM_NAME "memslap" +#define PROGRAM_DESCRIPTION \ + "Generates workload against memcached servers." + +#ifdef __sun + /* For some odd reason the option struct on solaris defines the argument + * as char* and not const char* + */ +#define OPTIONSTRING char* +#else +#define OPTIONSTRING const char* +#endif -struct conclusions_st { - long int load_time; - long int read_time; - unsigned int rows_loaded; - unsigned int rows_read; +/* options */ +static struct option long_options[]= +{ + { (OPTIONSTRING)"servers", required_argument, NULL, + OPT_SERVERS }, + { (OPTIONSTRING)"threads", required_argument, NULL, + OPT_THREAD_NUMBER }, + { (OPTIONSTRING)"concurrency", required_argument, NULL, + OPT_CONCURRENCY }, + { (OPTIONSTRING)"conn_sock", required_argument, NULL, + OPT_SOCK_PER_CONN }, + { (OPTIONSTRING)"execute_number", required_argument, NULL, + OPT_EXECUTE_NUMBER }, + { (OPTIONSTRING)"time", required_argument, NULL, + OPT_TIME }, + { (OPTIONSTRING)"cfg_cmd", required_argument, NULL, + OPT_CONFIG_CMD }, + { (OPTIONSTRING)"win_size", required_argument, NULL, + OPT_WINDOW_SIZE }, + { (OPTIONSTRING)"fixed_size", required_argument, NULL, + OPT_FIXED_LTH }, + { (OPTIONSTRING)"verify", required_argument, NULL, + OPT_VERIFY }, + { (OPTIONSTRING)"division", required_argument, NULL, + OPT_GETS_DIVISION }, + { (OPTIONSTRING)"stat_freq", required_argument, NULL, + OPT_STAT_FREQ }, + { (OPTIONSTRING)"exp_verify", required_argument, NULL, + OPT_EXPIRE }, + { (OPTIONSTRING)"overwrite", required_argument, NULL, + OPT_OVERWRITE }, + { (OPTIONSTRING)"reconnect", no_argument, NULL, + OPT_RECONNECT }, + { (OPTIONSTRING)"udp", no_argument, NULL, + OPT_UDP }, + { (OPTIONSTRING)"facebook", no_argument, NULL, + OPT_FACEBOOK_TEST }, + { (OPTIONSTRING)"binary", no_argument, NULL, + OPT_BINARY_PROTOCOL }, + { (OPTIONSTRING)"tps", required_argument, NULL, + OPT_TPS }, + { (OPTIONSTRING)"rep_write", required_argument, NULL, + OPT_REP_WRITE_SRV }, + { (OPTIONSTRING)"verbose", no_argument, NULL, + OPT_VERBOSE }, + { (OPTIONSTRING)"help", no_argument, NULL, + OPT_HELP }, + { (OPTIONSTRING)"version", no_argument, NULL, + OPT_VERSION }, + { 0, 0, 0, 0 }, }; /* Prototypes */ -void options_parse(int argc, char *argv[]); -void conclusions_print(conclusions_st *conclusion); -void scheduler(memcached_server_st *servers, conclusions_st *conclusion); -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, - unsigned int *actual_loaded); -void flush_all(memcached_st *memc); - -static int opt_verbose= 0; -static int opt_flush= 0; -static int opt_non_blocking_io= 0; -static int opt_tcp_nodelay= 0; -static unsigned int opt_execute_number= 0; -static unsigned int opt_createial_load= 0; -static unsigned int opt_concurrency= 0; -static int opt_displayflag= 0; -static char *opt_servers= NULL; -test_type opt_test= SET_TEST; - -int main(int argc, char *argv[]) +static void ms_sync_lock_init(void); +static void ms_sync_lock_destroy(void); +static void ms_global_struct_init(void); +static void ms_global_struct_destroy(void); +static void ms_version_command(const char *command_name); +static const char *ms_lookup_help(ms_options_t option); +static int64_t ms_parse_time(void); +static int64_t ms_parse_size(void); +static void ms_options_parse(int argc, char *argv[]); +static int ms_check_para(void); +static void ms_statistic_init(void); +static void ms_stats_init(void); +static void ms_print_statistics(int time); +static void ms_print_memslap_stats(struct timeval *start_time, + struct timeval *end_time); +static void ms_monitor_slap_mode(void); +void ms_help_command(const char *command_name, const char *description); + + +/* initialize the global locks */ +static void ms_sync_lock_init() { - conclusions_st conclusion; - memcached_server_st *servers; + ms_global.init_lock.count= 0; + pthread_mutex_init(&ms_global.init_lock.lock, NULL); + pthread_cond_init(&ms_global.init_lock.cond, NULL); - memset(&conclusion, 0, sizeof(conclusions_st)); + ms_global.warmup_lock.count = 0; + pthread_mutex_init(&ms_global.warmup_lock.lock, NULL); + pthread_cond_init(&ms_global.warmup_lock.cond, NULL); - srandom(time(NULL)); - options_parse(argc, argv); + ms_global.run_lock.count= 0; + pthread_mutex_init(&ms_global.run_lock.lock, NULL); + pthread_cond_init(&ms_global.run_lock.cond, NULL); - if (!opt_servers) - { - char *temp; + pthread_mutex_init(&ms_global.quit_mutex, NULL); + pthread_mutex_init(&ms_global.seq_mutex, NULL); +} /* ms_sync_lock_init */ - if ((temp= getenv("MEMCACHED_SERVERS"))) - opt_servers= strdup(temp); - else - { - fprintf(stderr, "No Servers provided\n"); - exit(1); - } - } - servers= memcached_servers_parse(opt_servers); +/* destroy the global locks */ +static void ms_sync_lock_destroy() +{ + pthread_mutex_destroy(&ms_global.init_lock.lock); + pthread_cond_destroy(&ms_global.init_lock.cond); + + pthread_mutex_destroy(&ms_global.warmup_lock.lock); + pthread_cond_destroy(&ms_global.warmup_lock.cond); - pthread_mutex_init(&counter_mutex, NULL); - pthread_cond_init(&count_threshhold, NULL); - pthread_mutex_init(&sleeper_mutex, NULL); - pthread_cond_init(&sleep_threshhold, NULL); + pthread_mutex_destroy(&ms_global.run_lock.lock); + pthread_cond_destroy(&ms_global.run_lock.cond); - scheduler(servers, &conclusion); + pthread_mutex_destroy(&ms_global.quit_mutex); + pthread_mutex_destroy(&ms_global.seq_mutex); - free(opt_servers); + if (ms_setting.stat_freq > 0) + { + pthread_mutex_destroy(&ms_statistic.stat_mutex); + } +} /* ms_sync_lock_destroy */ - (void)pthread_mutex_destroy(&counter_mutex); - (void)pthread_cond_destroy(&count_threshhold); - (void)pthread_mutex_destroy(&sleeper_mutex); - (void)pthread_cond_destroy(&sleep_threshhold); - conclusions_print(&conclusion); - memcached_server_list_free(servers); - return 0; +/* initialize the global structure */ +static void ms_global_struct_init() +{ + ms_sync_lock_init(); + ms_global.finish_warmup= false; + ms_global.time_out= false; } -void scheduler(memcached_server_st *servers, conclusions_st *conclusion) + +/* destroy the global structure */ +static void ms_global_struct_destroy() { - unsigned int x; - unsigned int actual_loaded= 0; /* Fix warning */ - memcached_st *memc; + ms_sync_lock_destroy(); +} - struct timeval start_time, end_time; - pthread_t mainthread; /* Thread descriptor */ - pthread_attr_t attr; /* Thread attributes */ - pairs_st *pairs= NULL; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, - PTHREAD_CREATE_DETACHED); +/** + * output the version information + * + * @param command_name, the string of this process + */ +static void ms_version_command(const char *command_name) +{ + printf("%s v%u.%u\n", command_name, 1U, 0U); + exit(0); +} - memc= memcached_create(NULL); - memcached_server_push(memc, servers); - if (opt_flush) - flush_all(memc); - if (opt_createial_load) - pairs= load_create_data(memc, opt_createial_load, &actual_loaded); +/** + * get the description of the option + * + * @param option, option of command line + * + * @return char*, description of the command option + */ +static const char *ms_lookup_help(ms_options_t option) +{ + switch (option) + { + case OPT_SERVERS: + return + "List one or more servers to connect. Servers count must be less than\n" + " threads count. e.g.: --servers=localhost:1234,localhost:11211"; + + case OPT_VERSION: + return "Display the version of the application and then exit."; + + case OPT_HELP: + return "Display this message and then exit."; + + case OPT_EXECUTE_NUMBER: + return "Number of operations(get and set) to execute for the\n" + " given test. Default 1000000."; + + case OPT_THREAD_NUMBER: + return + "Number of threads to startup, better equal to CPU numbers. Default 8."; + + case OPT_CONCURRENCY: + return "Number of concurrency to simulate with load. Default 128."; + + case OPT_FIXED_LTH: + return "Fixed length of value."; + + case OPT_VERIFY: + return "The proportion of date verification, e.g.: --verify=0.01"; + + case OPT_GETS_DIVISION: + return "Number of keys to multi-get once. Default 1, means single get."; + + case OPT_TIME: + return + "How long the test to run, suffix: s-seconds, m-minutes, h-hours,\n" + " d-days e.g.: --time=2h."; + + case OPT_CONFIG_CMD: + return + "Load the configure file to get command,key and value distribution list."; + + case OPT_WINDOW_SIZE: + return + "Task window size of each concurrency, suffix: K, M e.g.: --win_size=10k.\n" + " Default 10k."; + + case OPT_UDP: + return + "UDP support, default memslap uses TCP, TCP port and UDP port of\n" + " server must be same."; + + case OPT_EXPIRE: + return + "The proportion of objects with expire time, e.g.: --exp_verify=0.01.\n" + " Default no object with expire time"; + + case OPT_OVERWRITE: + return + "The proportion of objects need overwrite, e.g.: --overwrite=0.01.\n" + " Default never overwrite object."; + + case OPT_STAT_FREQ: + return + "Frequency of dumping statistic information. suffix: s-seconds,\n" + " m-minutes, e.g.: --resp_freq=10s."; + + case OPT_SOCK_PER_CONN: + return "Number of TCP socks per concurrency. Default 1."; + + case OPT_RECONNECT: + return + "Reconnect support, when connection is closed it will be reconnected."; + + case OPT_VERBOSE: + return + "Whether it outputs detailed information when verification fails."; + + case OPT_FACEBOOK_TEST: + return + "Whether it enables facebook test feature, set with TCP and multi-get with UDP."; + + case OPT_BINARY_PROTOCOL: + return + "Whether it enables binary protocol. Default with ASCII protocol."; + + case OPT_TPS: + return "Expected throughput, suffix: K, e.g.: --tps=10k."; + + case OPT_REP_WRITE_SRV: + return "The first nth servers can write data, e.g.: --rep_write=2."; + + default: + return "Forgot to document this option :)"; + } /* switch */ +} /* ms_lookup_help */ + + +/** + * output the help information + * + * @param command_name, the string of this process + * @param description, description of this process + * @param long_options, global options array + */ +void ms_help_command(const char *command_name, const char *description) +{ + char *help_message= NULL; - /* We set this after we have loaded */ + printf("%s v%u.%u\n", command_name, 1U, 0U); + printf(" %s\n\n", description); + printf( + "Usage:\n" + " memslap -hV | -s servers [-F config_file] [-t time | -x exe_num] [...]\n\n" + "Options:\n"); + + for (int x= 0; long_options[x].name; x++) { - if (opt_non_blocking_io) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1); - if (opt_tcp_nodelay) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1); + printf(" -%c, --%s%c\n", long_options[x].val, long_options[x].name, + long_options[x].has_arg ? '=' : ' '); + + if ((help_message= (char *)ms_lookup_help(long_options[x].val)) != NULL) + { + printf(" %s\n", help_message); + } } + printf( + "\nExamples:\n" + " memslap -s 127.0.0.1:11211 -S 5s\n" + " memslap -s 127.0.0.1:11211 -t 2m -v 0.2 -e 0.05 -b\n" + " memslap -s 127.0.0.1:11211 -F config -t 2m -w 40k -S 20s -o 0.2\n" + " memslap -s 127.0.0.1:11211 -F config -t 2m -T 4 -c 128 -d 20 -P 40k\n" + " memslap -s 127.0.0.1:11211 -F config -t 2m -d 50 -a -n 40\n" + " memslap -s 127.0.0.1:11211,127.0.0.1:11212 -F config -t 2m\n" + " memslap -s 127.0.0.1:11211,127.0.0.1:11212 -F config -t 2m -p 2\n\n"); + + exit(0); +} /* ms_help_command */ - pthread_mutex_lock(&counter_mutex); - thread_counter= 0; - pthread_mutex_lock(&sleeper_mutex); - master_wakeup= 1; - pthread_mutex_unlock(&sleeper_mutex); +/* used to parse the time string */ +static int64_t ms_parse_time() +{ + int64_t ret= 0; + char unit= optarg[strlen(optarg) - 1]; + + optarg[strlen(optarg) - 1]= '\0'; + ret= atoi(optarg); - for (x= 0; x < opt_concurrency; x++) + switch (unit) { - thread_context_st *context; - context= (thread_context_st *)malloc(sizeof(thread_context_st)); - memset(context, 0, sizeof(thread_context_st)); + case 'd': + case 'D': + ret*= 24; - context->memc= memcached_clone(NULL, memc); - context->test= opt_test; + case 'h': + case 'H': + ret*= 60; - context->initial_pairs= pairs; - context->initial_number= actual_loaded; + case 'm': + case 'M': + ret*= 60; - if (opt_test == SET_TEST) - { - context->execute_pairs= pairs_generate(opt_execute_number, 400); - context->execute_number= opt_execute_number; - } + case 's': + case 'S': + break; - /* now you create the thread */ - if (pthread_create(&mainthread, &attr, run_task, - (void *)context) != 0) - { - fprintf(stderr,"Could not create thread\n"); - exit(1); - } - thread_counter++; - } + default: + ret= -1; + break; + } /* switch */ - pthread_mutex_unlock(&counter_mutex); - pthread_attr_destroy(&attr); + return ret; +} /* ms_parse_time */ - pthread_mutex_lock(&sleeper_mutex); - master_wakeup= 0; - pthread_mutex_unlock(&sleeper_mutex); - pthread_cond_broadcast(&sleep_threshhold); - gettimeofday(&start_time, NULL); - /* - We loop until we know that all children have cleaned up. - */ - pthread_mutex_lock(&counter_mutex); - while (thread_counter) - { - struct timespec abstime; +/* used to parse the size string */ +static int64_t ms_parse_size() +{ + int64_t ret= -1; + char unit= optarg[strlen(optarg) - 1]; - memset(&abstime, 0, sizeof(struct timespec)); - abstime.tv_sec= 1; + optarg[strlen(optarg) - 1]= '\0'; + ret= strtoll(optarg, (char **)NULL, 10); - pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime); - } - pthread_mutex_unlock(&counter_mutex); + switch (unit) + { + case 'k': + case 'K': + ret*= 1024; + break; - gettimeofday(&end_time, NULL); + case 'm': + case 'M': + ret*= 1024 * 1024; + break; - conclusion->load_time= timedif(end_time, start_time); - conclusion->read_time= timedif(end_time, start_time); - pairs_free(pairs); - memcached_free(memc); -} + case 'g': + case 'G': + ret*= 1024 * 1024 * 1024; + break; -void options_parse(int argc, char *argv[]) -{ - memcached_programs_help_st help_options[]= - { - {0}, - }; + default: + ret= -1; + break; + } /* switch */ + + return ret; +} /* ms_parse_size */ - static struct option long_options[]= - { - {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, - {"debug", no_argument, &opt_verbose, OPT_DEBUG}, - {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, - {"flag", no_argument, &opt_displayflag, OPT_FLAG}, - {"flush", no_argument, &opt_flush, OPT_FLUSH}, - {"help", no_argument, NULL, OPT_HELP}, - {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */ - {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK}, - {"servers", required_argument, NULL, OPT_SERVERS}, - {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY}, - {"test", required_argument, NULL, OPT_SLAP_TEST}, - {"verbose", no_argument, &opt_verbose, OPT_VERBOSE}, - {"version", no_argument, NULL, OPT_VERSION}, - {0, 0, 0, 0}, - }; +/* used to parse the options of command line */ +static void ms_options_parse(int argc, char *argv[]) +{ int option_index= 0; int option_rv; - while (1) + while ((option_rv= getopt_long(argc, argv, "VhURbaBs:x:T:c:X:v:d:" + "t:S:F:w:e:o:n:P:p:", + long_options, &option_index)) != -1) { - option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); - if (option_rv == -1) break; switch (option_rv) { case 0: break; - case OPT_VERBOSE: /* --verbose or -v */ - opt_verbose = OPT_VERBOSE; + + case OPT_VERSION: /* --version or -V */ + ms_version_command(PROGRAM_NAME); + break; + + case OPT_HELP: /* --help or -h */ + ms_help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION); + break; + + case OPT_SERVERS: /* --servers or -s */ + ms_setting.srv_str= strdup(optarg); + break; + + case OPT_CONCURRENCY: /* --concurrency or -c */ + ms_setting.nconns= (uint32_t)strtoul(optarg, (char **) NULL, 10); + if (ms_setting.nconns <= 0) + { + fprintf(stderr, "Concurrency must be greater than 0.:-)\n"); + exit(1); + } + break; + + case OPT_EXECUTE_NUMBER: /* --execute_number or -x */ + ms_setting.exec_num= (int)strtol(optarg, (char **) NULL, 10); + if (ms_setting.exec_num <= 0) + { + fprintf(stderr, "Execute number must be greater than 0.:-)\n"); + exit(1); + } + break; + + case OPT_THREAD_NUMBER: /* --threads or -T */ + ms_setting.nthreads= (uint32_t)strtoul(optarg, (char **) NULL, 10); + if (ms_setting.nthreads <= 0) + { + fprintf(stderr, "Threads number must be greater than 0.:-)\n"); + exit(1); + } + break; + + case OPT_FIXED_LTH: /* --fixed_size or -X */ + ms_setting.fixed_value_size= (size_t)strtoull(optarg, (char **) NULL, 10); + if ((ms_setting.fixed_value_size <= 0) + || (ms_setting.fixed_value_size > MAX_VALUE_SIZE)) + { + fprintf(stderr, "Value size must be between 0 and 1M.:-)\n"); + exit(1); + } + break; + + case OPT_VERIFY: /* --verify or -v */ + ms_setting.verify_percent= atof(optarg); + if ((ms_setting.verify_percent <= 0) + || (ms_setting.verify_percent > 1.0)) + { + fprintf(stderr, "Data verification rate must be " + "greater than 0 and less than 1.0. :-)\n"); + exit(1); + } + break; + + case OPT_GETS_DIVISION: /* --division or -d */ + ms_setting.mult_key_num= (int)strtol(optarg, (char **) NULL, 10); + if (ms_setting.mult_key_num <= 0) + { + fprintf(stderr, "Multi-get key number must be greater than 0.:-)\n"); + exit(1); + } + break; + + case OPT_TIME: /* --time or -t */ + ms_setting.run_time= (int)ms_parse_time(); + if (ms_setting.run_time == -1) + { + fprintf(stderr, "Please specify the run time. :-)\n" + "'s' for second, 'm' for minute, 'h' for hour, " + "'d' for day. e.g.: --time=24h (means 24 hours).\n"); + exit(1); + } + + if (ms_setting.run_time == 0) + { + fprintf(stderr, "Running time can not be 0. :-)\n"); + exit(1); + } + break; + + case OPT_CONFIG_CMD: /* --cfg_cmd or -F */ + ms_setting.cfg_file= strdup(optarg); + break; + + case OPT_WINDOW_SIZE: /* --win_size or -w */ + ms_setting.win_size= (size_t)ms_parse_size(); + if (ms_setting.win_size == (size_t)-1) + { + fprintf( + stderr, + "Please specify the item window size. :-)\n" + "e.g.: --win_size=10k (means 10k task window size).\n"); + exit(1); + } break; - case OPT_DEBUG: /* --debug or -d */ - opt_verbose = OPT_DEBUG; + + case OPT_UDP: /* --udp or -U*/ + ms_setting.udp= true; break; - case OPT_VERSION: /* --version or -V */ - version_command(PROGRAM_NAME); + + case OPT_EXPIRE: /* --exp_verify or -e */ + ms_setting.exp_ver_per= atof(optarg); + if ((ms_setting.exp_ver_per <= 0) || (ms_setting.exp_ver_per > 1.0)) + { + fprintf(stderr, "Expire time verification rate must be " + "greater than 0 and less than 1.0. :-)\n"); + exit(1); + } break; - case OPT_HELP: /* --help or -h */ - help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options); + + case OPT_OVERWRITE: /* --overwrite or -o */ + ms_setting.overwrite_percent= atof(optarg); + if ((ms_setting.overwrite_percent <= 0) + || (ms_setting.overwrite_percent > 1.0)) + { + fprintf(stderr, "Objects overwrite rate must be " + "greater than 0 and less than 1.0. :-)\n"); + exit(1); + } break; - case OPT_SERVERS: /* --servers or -s */ - opt_servers= strdup(optarg); + + case OPT_STAT_FREQ: /* --stat_freq or -S */ + ms_setting.stat_freq= (int)ms_parse_time(); + if (ms_setting.stat_freq == -1) + { + fprintf(stderr, "Please specify the frequency of dumping " + "statistic information. :-)\n" + "'s' for second, 'm' for minute, 'h' for hour, " + "'d' for day. e.g.: --time=24h (means 24 hours).\n"); + exit(1); + } + + if (ms_setting.stat_freq == 0) + { + fprintf(stderr, "The frequency of dumping statistic information " + "can not be 0. :-)\n"); + exit(1); + } break; - case OPT_SLAP_TEST: - if (!strcmp(optarg, "get")) - opt_test= GET_TEST ; - else if (!strcmp(optarg, "set")) - opt_test= SET_TEST; - else + + case OPT_SOCK_PER_CONN: /* --conn_sock or -n */ + ms_setting.sock_per_conn= (uint32_t)strtoul(optarg, (char **) NULL, 10); + if (ms_setting.sock_per_conn <= 0) { - fprintf(stderr, "Your test, %s, is not a known test\n", optarg); + fprintf(stderr, "Number of socks of each concurrency " + "must be greater than 0.:-)\n"); exit(1); } break; - case OPT_SLAP_CONCURRENCY: - opt_concurrency= strtol(optarg, (char **)NULL, 10); - case OPT_SLAP_EXECUTE_NUMBER: - opt_execute_number= strtol(optarg, (char **)NULL, 10); + + case OPT_RECONNECT: /* --reconnect or -R */ + ms_setting.reconnect= true; + break; + + case OPT_VERBOSE: /* --verbose or -b */ + ms_setting.verbose= true; + break; + + case OPT_FACEBOOK_TEST: /* --facebook or -a */ + ms_setting.facebook_test= true; + break; + + case OPT_BINARY_PROTOCOL: /* --binary or -B */ + ms_setting.binary_prot= true; break; - case OPT_SLAP_INITIAL_LOAD: - opt_createial_load= strtol(optarg, (char **)NULL, 10); + + case OPT_TPS: /* --tps or -P */ + ms_setting.expected_tps= (int)ms_parse_size(); + if (ms_setting.expected_tps == -1) + { + fprintf(stderr, + "Please specify the item expected throughput. :-)\n" + "e.g.: --tps=10k (means 10k throughput).\n"); + exit(1); + } break; + + case OPT_REP_WRITE_SRV: /* --rep_write or -p */ + ms_setting.rep_write_srv= (uint32_t)strtoul(optarg, (char **) NULL, 10); + if (ms_setting.rep_write_srv <= 0) + { + fprintf(stderr, + "Number of replication writing server must be greater " + "than 0.:-)\n"); + exit(1); + } + break; + case '?': /* getopt_long already printed an error message. */ exit(1); + default: abort(); + } /* switch */ + } +} /* ms_options_parse */ + + +static int ms_check_para() +{ + if (ms_setting.srv_str == NULL) + { + char *temp; + + if ((temp= getenv("MEMCACHED_SERVERS"))) + { + ms_setting.srv_str= strdup(temp); } + else + { + fprintf(stderr, "No Servers provided\n\n"); + return -1; + } + } + + if (ms_setting.nconns % (uint32_t)ms_setting.nthreads != 0) + { + fprintf(stderr, "Concurrency must be the multiples of threads count.\n"); + return -1; } - if (opt_test == GET_TEST && opt_createial_load == 0) - opt_createial_load= DEFAULT_INITIAL_LOAD; + if (ms_setting.win_size % UNIT_ITEMS_COUNT != 0) + { + fprintf(stderr, "Window size must be the multiples of 1024.\n\n"); + return -1; + } - if (opt_execute_number == 0) - opt_execute_number= DEFAULT_EXECUTE_NUMBER; + return 0; +} /* ms_check_para */ - if (opt_concurrency == 0) - opt_concurrency= DEFAULT_CONCURRENCY; -} -void conclusions_print(conclusions_st *conclusion) +/* initialize the statistic structure */ +static void ms_statistic_init() { - printf("\tThreads connecting to servers %u\n", opt_concurrency); -#ifdef NOT_FINISHED - printf("\tLoaded %u rows\n", conclusion->rows_loaded); - printf("\tRead %u rows\n", conclusion->rows_read); -#endif - if (opt_test == SET_TEST) - printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, - conclusion->load_time % 1000); - else - printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, - conclusion->read_time % 1000); -} + pthread_mutex_init(&ms_statistic.stat_mutex, NULL); + ms_init_stats(&ms_statistic.get_stat, "Get"); + ms_init_stats(&ms_statistic.set_stat, "Set"); + ms_init_stats(&ms_statistic.total_stat, "Total"); +} /* ms_statistic_init */ + + +/* initialize the global state structure */ +static void ms_stats_init() +{ + memset(&ms_stats, 0, sizeof(ms_stats_t)); + if (ms_setting.stat_freq > 0) + { + ms_statistic_init(); + } +} /* ms_stats_init */ + -void *run_task(void *p) +/* use to output the statistic */ +static void ms_print_statistics(int in_time) { - thread_context_st *context= (thread_context_st *)p; - memcached_st *memc; + int obj_size= (int)(ms_setting.avg_key_size + ms_setting.avg_val_size); - memc= context->memc; + printf("\033[1;1H\033[2J\n"); + ms_dump_format_stats(&ms_statistic.get_stat, in_time, + ms_setting.stat_freq, obj_size); + ms_dump_format_stats(&ms_statistic.set_stat, in_time, + ms_setting.stat_freq, obj_size); + ms_dump_format_stats(&ms_statistic.total_stat, in_time, + ms_setting.stat_freq, obj_size); +} /* ms_print_statistics */ - pthread_mutex_lock(&sleeper_mutex); - while (master_wakeup) + +/* used to print the states of memslap */ +static void ms_print_memslap_stats(struct timeval *start_time, + struct timeval *end_time) +{ + char buf[1024]; + char *pos= buf; + + pos+= sprintf(pos, "cmd_get: %zu\n", + ms_stats.cmd_get); + pos+= sprintf(pos, "cmd_set: %zu\n", + ms_stats.cmd_set); + pos+= sprintf(pos, "get_misses: %zu\n", + ms_stats.get_misses); + + if (ms_setting.verify_percent > 0) { - pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); - } - pthread_mutex_unlock(&sleeper_mutex); + pos+= sprintf(pos, "verify_misses: %zu\n", + ms_stats.vef_miss); + pos+= sprintf(pos, "verify_failed: %zu\n", + ms_stats.vef_failed); + } - /* Do Stuff */ - switch (context->test) + if (ms_setting.exp_ver_per > 0) { - case SET_TEST: - execute_set(memc, context->execute_pairs, context->execute_number); - break; - case GET_TEST: - execute_get(memc, context->initial_pairs, context->initial_number); - break; + pos+= sprintf(pos, "expired_get: %zu\n", + ms_stats.exp_get); + pos+= sprintf(pos, "unexpired_unget: %zu\n", + ms_stats.unexp_unget); } - memcached_free(memc); + pos+= sprintf(pos, + "written_bytes: %zu\n", + ms_stats.bytes_written); + pos+= sprintf(pos, "read_bytes: %zu\n", + ms_stats.bytes_read); + pos+= sprintf(pos, "object_bytes: %zu\n", + ms_stats.obj_bytes); - if (context->execute_pairs) - pairs_free(context->execute_pairs); + if (ms_setting.udp || ms_setting.facebook_test) + { + pos+= sprintf(pos, "packet_disorder: %zu\n", + ms_stats.pkt_disorder); + pos+= sprintf(pos, "packet_drop: %zu\n", + ms_stats.pkt_drop); + pos+= sprintf(pos, "udp_timeout: %zu\n", + ms_stats.udp_timeout); + } - free(context); + if (ms_setting.stat_freq > 0) + { + ms_dump_stats(&ms_statistic.get_stat); + ms_dump_stats(&ms_statistic.set_stat); + ms_dump_stats(&ms_statistic.total_stat); + } - pthread_mutex_lock(&counter_mutex); - thread_counter--; - pthread_cond_signal(&count_threshhold); - pthread_mutex_unlock(&counter_mutex); + int64_t time_diff= ms_time_diff(start_time, end_time); + pos+= sprintf( + pos, + "\nRun time: %.1fs Ops: %llu TPS: %.0Lf Net_rate: %.1fM/s\n", + (double)time_diff / 1000000, + (unsigned long long)(ms_stats.cmd_get + ms_stats.cmd_set), + (ms_stats.cmd_get + + ms_stats.cmd_set) / ((long double)time_diff / 1000000), + (double)( + ms_stats.bytes_written + + ms_stats.bytes_read) / 1024 / 1024 + / ((double)time_diff / 1000000)); + + fprintf(stdout, "%s", buf); + fflush(stdout); +} /* ms_print_memslap_stats */ + + +/* the loop of the main thread, wait the work threads to complete */ +static void ms_monitor_slap_mode() +{ + int second= 0; + struct timeval start_time, end_time; - return NULL; -} + /* Wait all the threads complete initialization. */ + pthread_mutex_lock(&ms_global.init_lock.lock); + while (ms_global.init_lock.count < ms_setting.nthreads) + { + pthread_cond_wait(&ms_global.init_lock.cond, + &ms_global.init_lock.lock); + } + pthread_mutex_unlock(&ms_global.init_lock.lock); -void flush_all(memcached_st *memc) -{ - memcached_flush(memc, 0); -} + /* only when there is no set operation it need warm up */ + if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) + { + /* Wait all the connects complete warm up. */ + pthread_mutex_lock(&ms_global.warmup_lock.lock); + while (ms_global.warmup_lock.count < ms_setting.nconns) + { + pthread_cond_wait(&ms_global.warmup_lock.cond, &ms_global.warmup_lock.lock); + } + pthread_mutex_unlock(&ms_global.warmup_lock.lock); + } + ms_global.finish_warmup= true; + + /* running in "run time" mode, user specify run time */ + if (ms_setting.run_time > 0) + { + gettimeofday(&start_time, NULL); + while (1) + { + sleep(1); + second++; + + if ((ms_setting.stat_freq > 0) && (second % ms_setting.stat_freq == 0) + && (ms_stats.active_conns >= ms_setting.nconns) + && (ms_stats.active_conns <= INT_MAX)) + { + ms_print_statistics(second); + } + + if (ms_setting.run_time <= second) + { + ms_global.time_out= true; + break; + } + + /* all connections disconnect */ + if ((second > 5) && (ms_stats.active_conns == 0)) + { + break; + } + } + gettimeofday(&end_time, NULL); + sleep(1); /* wait all threads clean up */ + } + else + { + /* running in "execute number" mode, user specify execute number */ + gettimeofday(&start_time, NULL); + + /* + * We loop until we know that all connects have cleaned up. + */ + pthread_mutex_lock(&ms_global.run_lock.lock); + while (ms_global.run_lock.count < ms_setting.nconns) + { + pthread_cond_wait(&ms_global.run_lock.cond, &ms_global.run_lock.lock); + } + pthread_mutex_unlock(&ms_global.run_lock.lock); + + gettimeofday(&end_time, NULL); + } -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, - unsigned int *actual_loaded) + ms_print_memslap_stats(&start_time, &end_time); +} /* ms_monitor_slap_mode */ + + +/* the main function */ +int main(int argc, char *argv[]) { - memcached_st *clone; - pairs_st *pairs; + srandom((unsigned int)time(NULL)); + ms_global_struct_init(); - clone= memcached_clone(NULL, memc); - /* We always used non-blocking IO for load since it is faster */ - memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); + /* initialization */ + ms_setting_init_pre(); + ms_options_parse(argc, argv); + if (ms_check_para()) + { + ms_help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION); + exit(1); + } + ms_setting_init_post(); + ms_stats_init(); + ms_thread_init(); - pairs= pairs_generate(number_of, 400); - *actual_loaded= execute_set(clone, pairs, number_of); + /* waiting work thread complete its task */ + ms_monitor_slap_mode(); - memcached_free(clone); + /* clean up */ + ms_thread_cleanup(); + ms_global_struct_destroy(); + ms_setting_cleanup(); - return pairs; -} + return 0; +} /* main */