X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fbin%2Fmemslap.cc;h=d1c58cd6030e33d70e222634af76826180f6ce33;hb=e1ba5b9e4eb179295274026ad8fd40a68eb6c67f;hp=a9fb731ff09c671e7a2fea2a0eef186399d0f7ee;hpb=af58c4c6067fa41da3b4c8f01200065226070e37;p=awesomized%2Flibmemcached diff --git a/src/bin/memslap.cc b/src/bin/memslap.cc index a9fb731f..d1c58cd6 100644 --- a/src/bin/memslap.cc +++ b/src/bin/memslap.cc @@ -15,586 +15,369 @@ #include "mem_config.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "libmemcached-1.0/memcached.h" - -#include "client_options.h" -#include "utilities.h" -#include "generator.h" - -#define DEFAULT_INITIAL_LOAD 10000 -#define DEFAULT_EXECUTE_NUMBER 10000 -#define DEFAULT_CONCURRENCY 1 - -#define VALUE_BYTES 4096 - #define PROGRAM_NAME "memslap" -#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers." - -/* Global Thread counter */ -volatile unsigned int master_wakeup; -pthread_mutex_t sleeper_mutex; -pthread_cond_t sleep_threshhold; - -/* Types */ -enum test_t { SET_TEST, GET_TEST, MGET_TEST }; - -struct thread_context_st { - unsigned int key_count; - pairs_st *initial_pairs; - unsigned int initial_number; - pairs_st *execute_pairs; - unsigned int execute_number; - char **keys; - size_t *key_lengths; - test_t test; - memcached_st *memc; - const memcached_st *root; - - thread_context_st(const memcached_st *memc_arg, test_t test_arg) - : key_count(0) - , initial_pairs(NULL) - , initial_number(0) - , execute_pairs(NULL) - , execute_number(0) - , keys(0) - , key_lengths(NULL) - , test(test_arg) - , memc(NULL) - , root(memc_arg) {} - - void init() { - memc = memcached_clone(NULL, root); - } +#define PROGRAM_DESCRIPTION "Generate load against a cluster of memcached servers." +#define PROGRAM_VERSION "1.1" - ~thread_context_st() { - if (execute_pairs) { - pairs_free(execute_pairs); - } - memcached_free(memc); - } -}; +#define DEFAULT_INITIAL_LOAD 10000ul +#define DEFAULT_EXECUTE_NUMBER 10000ul +#define DEFAULT_CONCURRENCY 1ul -struct conclusions_st { - long int load_time; - long int read_time; - unsigned int rows_loaded; - unsigned int rows_read; - - conclusions_st() - : load_time(0) - , read_time(0) - , rows_loaded(0) - , rows_read() {} -}; +#include "common/options.hpp" +#include "common/checks.hpp" +#include "common/time.hpp" +#include "common/random.hpp" -/* Prototypes */ -void options_parse(int argc, char *argv[]); -void conclusions_print(conclusions_st *conclusion); -void scheduler(memcached_server_st *servers, conclusions_st *conclusion); -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded); -void flush_all(memcached_st *memc); - -static bool opt_binary = 0; -static int opt_verbose = 0; -static int opt_flush = 0; -static int opt_non_blocking_io = 0; -static int opt_tcp_nodelay = 0; -static unsigned int opt_execute_number = 0; -static unsigned int opt_createial_load = 0; -static unsigned int opt_concurrency = 0; -static int opt_displayflag = 0; -static char *opt_servers = NULL; -static bool opt_udp_io = false; -test_t opt_test = SET_TEST; - - -unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { - uint32_t count = 0; - for (; count < number_of; ++count) { - memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length, - pairs[count].value, pairs[count].value_length, 0, 0); - if (memcached_failed(rc)) { - fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count, - memcached_last_error_message(memc), (unsigned int) pairs[count].key_length, - pairs[count].key); - - // We will try to reconnect and see if that fixes the issue - memcached_quit(memc); - - return count; - } - } +#include +#include +#include - return count; -} +static std::atomic_bool wakeup; -/* - Execute a memcached_get() on a set of pairs. - Return the number of rows retrieved. -*/ -static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { - unsigned int x; - unsigned int retrieved; +static memcached_return_t counter(const memcached_st *, memcached_result_st *, void *ctx) { + auto c = static_cast(ctx); + ++(*c); + return MEMCACHED_SUCCESS; +} - for (retrieved = 0, x = 0; x < number_of; x++) { - size_t value_length; - uint32_t flags; +struct keyval_st { + struct data { + std::vector chr; + std::vector len; + explicit data(size_t num) + : chr(num) + , len(num) + {} + }; - unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of); + data key; + data val; - memcached_return_t rc; - char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length, - &value_length, &flags, &rc); + size_t num; + random64 rnd; - if (memcached_failed(rc)) { - fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__, - memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length, - pairs[fetch_key].key); - } else { - retrieved++; + explicit keyval_st(size_t num_) + : key{num_} + , val{num_} + , num{num_} + , rnd{} + { + for (auto i = 0u; i < num; ++i) { + gen(key.chr[i], key.len[i], val.chr[i], val.len[i]); } + } - ::free(value); + ~keyval_st() { + for (auto i = 0u; i < num; ++i) { + delete [] key.chr[i]; + delete [] val.chr[i]; + } } - return retrieved; -} +private: + void gen_key(char *&key_chr, size_t &key_len) { + key_len = rnd(20,60); + key_chr = new char[key_len + 1]; + rnd.fill(key_chr, key_len); + key_chr[key_len] = 0; + } + void gen_val(const char *key_chr, const size_t key_len, char *&val_chr, size_t &val_len) { + val_len = rnd(50, 5000); + val_chr = new char[val_len]; + + for (auto len = 0u; len < val_len; len += key_len) { + auto val_pos = val_chr + len; + auto rem_len = len + key_len > val_len ? val_len % key_len : key_len; + memcpy(val_pos, key_chr, rem_len); + } + } + void gen(char *&key_chr, size_t &key_len, char *&val_chr, size_t &val_len) { + gen_key(key_chr, key_len); + gen_val(key_chr, key_len, val_chr, val_len); + } +}; -/** - * Callback function to count the number of results - */ -static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result, - void *context) { - (void) ptr; - (void) result; - unsigned int *counter = (unsigned int *) context; - *counter = *counter + 1; +static size_t execute_get(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + size_t retrieved = 0; + random64 rnd{}; - return MEMCACHED_SUCCESS; -} + for (auto i = 0u; i < kv.num; ++i) { + memcached_return_t rc; + auto r = rnd(0, kv.num); + free(memcached_get(&memc, kv.key.chr[r], kv.key.len[r], nullptr, nullptr, &rc)); -/** - * Try to run a large mget to get all of the keys - * @param memc memcached handle - * @param keys the keys to get - * @param key_length the length of the keys - * @param number_of the number of keys to try to get - * @return the number of keys received - */ -static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length, - unsigned int number_of) { - unsigned int retrieved = 0; - memcached_execute_fn callbacks[] = {callback_counter}; - memcached_return_t rc; - rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1); - - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED - || rc == MEMCACHED_END) - { - rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) { - fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, - memcached_strerror(memc, rc)); - memcached_quit(memc); - return 0; + if (check_return(opt, memc, kv.key.chr[r], rc)) { + ++retrieved; } - } else { - fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, - memcached_strerror(memc, rc)); - memcached_quit(memc); - return 0; } return retrieved; } -extern "C" { +static size_t execute_mget(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + size_t retrieved = 0; + memcached_execute_fn cb[] = {&counter}; -static __attribute__((noreturn)) void *run_task(void *p) { - thread_context_st *context = (thread_context_st *) p; + auto rc = memcached_mget_execute(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num, cb, &retrieved, 1); - context->init(); - - pthread_mutex_lock(&sleeper_mutex); - while (master_wakeup) { - pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); - } - pthread_mutex_unlock(&sleeper_mutex); - - /* Do Stuff */ - switch (context->test) { - case SET_TEST: - assert(context->execute_pairs); - execute_set(context->memc, context->execute_pairs, context->execute_number); - break; - - case GET_TEST: - execute_get(context->memc, context->initial_pairs, context->initial_number); - break; - - case MGET_TEST: - execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths, - context->initial_number); - break; + while (rc != MEMCACHED_END && memcached_success(rc)) { + rc = memcached_fetch_execute(&memc, cb, &retrieved, 1); } - delete context; - - pthread_exit(0); -} - -} // extern "C" - - -int main(int argc, char *argv[]) { - conclusions_st conclusion; - - srandom((unsigned int) time(NULL)); - options_parse(argc, argv); - - if (opt_servers == NULL) { - char *temp; - - if ((temp = getenv("MEMCACHED_SERVERS"))) { - opt_servers = strdup(temp); - } - - if (opt_servers == NULL) { - std::cerr << "No servers provided" << std::endl; - exit(EXIT_FAILURE); + if (memcached_fatal(rc)) { + if (!opt.isset("quiet")) { + std::cerr << "Failed mget: " << memcached_strerror(&memc, rc) << ": " + << memcached_last_error_message(&memc); } } + return retrieved; +} - memcached_server_st *servers = memcached_servers_parse(opt_servers); - if (servers == NULL or memcached_server_list_count(servers) == 0) { - std::cerr << "Invalid server list provided:" << opt_servers << std::endl; - return EXIT_FAILURE; - } - - pthread_mutex_init(&sleeper_mutex, NULL); - pthread_cond_init(&sleep_threshhold, NULL); +static size_t execute_set(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + for (auto i = 0u; i < kv.num; ++i) { + auto rc = memcached_set(&memc, kv.key.chr[i], kv.key.len[i], kv.val.chr[i], kv.val.len[i], 0, 0); - int error_code = EXIT_SUCCESS; - try { - scheduler(servers, &conclusion); - } catch (std::exception &e) { - std::cerr << "Died with exception: " << e.what() << std::endl; - error_code = EXIT_FAILURE; + if (!check_return(opt, memc, kv.key.chr[i], rc)) { + memcached_quit(&memc); + return i; + } } - free(opt_servers); - - (void) pthread_mutex_destroy(&sleeper_mutex); - (void) pthread_cond_destroy(&sleep_threshhold); - conclusions_print(&conclusion); - memcached_server_list_free(servers); - - return error_code; + return kv.num; } -void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { - unsigned int actual_loaded = 0; /* Fix warning */ - - struct timeval start_time, end_time; - pairs_st *pairs = NULL; - - memcached_st *memc = memcached_create(NULL); - - memcached_server_push(memc, servers); +class thread_context { +public: + thread_context(const client_options &opt_, const memcached_st &memc_, const keyval_st &kv_) + : opt{opt_} + , kv{kv_} + , count{} + , root(memc_) + , memc{} + , thread([this]{ execute(); }) + {} + + ~thread_context() { + memcached_free(&memc); + } - /* We need to set udp behavior before adding servers to the client */ - if (opt_udp_io) { - if (memcached_failed(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io))) { - std::cerr << "Failed to enable UDP." << std::endl; - memcached_free(memc); - exit(EXIT_FAILURE); - } + size_t complete() { + thread.join(); + return count; } - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, (uint64_t) opt_binary); +private: + const client_options &opt; + const keyval_st &kv; + size_t count; + const memcached_st &root; + memcached_st memc; + std::thread thread; - if (opt_flush) { - flush_all(memc); - } + void execute() { + memcached_clone(&memc, &root); - if (opt_createial_load) { - pairs = load_create_data(memc, opt_createial_load, &actual_loaded); - } + while (!wakeup.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } - char **keys = static_cast(calloc(actual_loaded, sizeof(char *))); - size_t *key_lengths = static_cast(calloc(actual_loaded, sizeof(size_t))); - - if (keys == NULL or key_lengths == NULL) { - free(keys); - free(key_lengths); - keys = NULL; - key_lengths = NULL; - } else { - for (uint32_t x = 0; x < actual_loaded; ++x) { - keys[x] = pairs[x].key; - key_lengths[x] = pairs[x].key_length; + if (!strcmp("set", opt.argof("test"))) { + count = execute_set(opt, memc, kv); + } else if (!strcmp("mget", opt.argof("test"))) { + count = execute_mget(opt, memc, kv); + } else { + if (strcmp("get", opt.argof("test"))) { + if (!opt.isset("quiet")) { + std::cerr << "Unknown --test: '" << opt.argof("test") << "'.\n"; + } + } + count = execute_get(opt, memc, kv); } } +}; - /* We set this after we have loaded */ - { - if (opt_non_blocking_io) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1); - - if (opt_tcp_nodelay) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1); - } +using opt_apply = std::function; - pthread_mutex_lock(&sleeper_mutex); - master_wakeup = 1; - pthread_mutex_unlock(&sleeper_mutex); +static opt_apply wrap_stoul(unsigned long &ul) { + return [&ul](const client_options &, const client_options::extended_option &ext, memcached_st *) { + if (ext.arg && *ext.arg) { + auto c = std::stoul(ext.arg); + if (c) { + ul = c; + } + } + return true; + }; +} - pthread_t *threads = new (std::nothrow) pthread_t[opt_concurrency]; +int main(int argc, char *argv[]) { + client_options opt{PROGRAM_NAME, PROGRAM_VERSION, PROGRAM_DESCRIPTION}; + auto concurrency = DEFAULT_CONCURRENCY; + auto load_count = DEFAULT_INITIAL_LOAD; + auto test_count = DEFAULT_EXECUTE_NUMBER; - if (threads == NULL) { - exit(EXIT_FAILURE); + for (const auto &def : opt.defaults) { + opt.add(def); } - for (uint32_t x = 0; x < opt_concurrency; x++) { - thread_context_st *context = new thread_context_st(memc, opt_test); - context->test = opt_test; - - context->initial_pairs = pairs; - context->initial_number = actual_loaded; - context->keys = keys; - context->key_lengths = key_lengths; - - if (opt_test == SET_TEST) { - context->execute_pairs = pairs_generate(opt_execute_number, VALUE_BYTES); - context->execute_number = opt_execute_number; + opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.") + .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) { + if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) { + if(!opt_.isset("quiet")) { + std::cerr << memcached_last_error_message(memc); + } + return false; } - - /* now you create the thread */ - if (pthread_create(threads + x, NULL, run_task, (void *) context)) { - fprintf(stderr, "Could not create thread\n"); - exit(1); + return true; + }; + opt.add("udp", 'U', no_argument, "Use UDP.") + .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) { + if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, ext.set)) { + if (!opt_.isset("quiet")) { + std::cerr << memcached_last_error_message(memc) << "\n"; + } + return false; } + return true; + }; + opt.add("flush", 'F', no_argument, "Flush all servers prior test."); + opt.add("test", 't', required_argument, "Test to perform (options: get,mget,set; default: get)."); + opt.add("concurrency", 'c', required_argument, "Concurrency (number of threads to start; default: 1).") + .apply = wrap_stoul(concurrency); + opt.add("execute-number", 'e', required_argument, "Number of times to execute the tests (default: 10000).") + .apply = wrap_stoul(test_count); + opt.add("initial-load", 'l', required_argument, "Number of keys to load before executing tests (default: 10000)." + "\n\t\tDEPRECATED: --execute-number takes precedence.") + .apply = wrap_stoul(load_count); + + char set[] = "set"; + opt.set("test", true, set); + + if (!opt.parse(argc, argv)) { + exit(EXIT_FAILURE); } - pthread_mutex_lock(&sleeper_mutex); - master_wakeup = 0; - pthread_mutex_unlock(&sleeper_mutex); - pthread_cond_broadcast(&sleep_threshhold); - gettimeofday(&start_time, NULL); - - for (uint32_t x = 0; x < opt_concurrency; x++) { - void *retval; - pthread_join(threads[x], &retval); + memcached_st memc; + if (!check_memcached(opt, memc)) { + exit(EXIT_FAILURE); } - delete[] threads; - - gettimeofday(&end_time, NULL); - conclusion->load_time = timedif(end_time, start_time); - conclusion->read_time = timedif(end_time, start_time); - free(keys); - free(key_lengths); - pairs_free(pairs); - memcached_free(memc); -} + if (!opt.apply(&memc)) { + memcached_free(&memc); + exit(EXIT_FAILURE); + } -void options_parse(int argc, char *argv[]) { - memcached_programs_help_st help_options[] = { - {0}, - }; + auto total_start = time_clock::now(); + std::cout << std::fixed << std::setprecision(3); - static struct option long_options[] = { - {(OPTIONSTRING) "concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, - {(OPTIONSTRING) "debug", no_argument, &opt_verbose, OPT_DEBUG}, - {(OPTIONSTRING) "quiet", no_argument, NULL, OPT_QUIET}, - {(OPTIONSTRING) "execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, - {(OPTIONSTRING) "flag", no_argument, &opt_displayflag, OPT_FLAG}, - {(OPTIONSTRING) "flush", no_argument, &opt_flush, OPT_FLUSH}, - {(OPTIONSTRING) "help", no_argument, NULL, OPT_HELP}, - {(OPTIONSTRING) "initial-load", required_argument, NULL, - OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */ - {(OPTIONSTRING) "non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK}, - {(OPTIONSTRING) "servers", required_argument, NULL, OPT_SERVERS}, - {(OPTIONSTRING) "tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY}, - {(OPTIONSTRING) "test", required_argument, NULL, OPT_SLAP_TEST}, - {(OPTIONSTRING) "verbose", no_argument, &opt_verbose, OPT_VERBOSE}, - {(OPTIONSTRING) "version", no_argument, NULL, OPT_VERSION}, - {(OPTIONSTRING) "binary", no_argument, NULL, OPT_BINARY}, - {(OPTIONSTRING) "udp", no_argument, NULL, OPT_UDP}, - {0, 0, 0, 0}, + auto align = [](std::ostream &io) -> std::ostream &{ + return io << std::right << std::setw(8); }; - bool opt_help = false; - bool opt_version = false; - int option_index = 0; - while (1) { - int option_rv = getopt_long(argc, argv, "Vhvds:", long_options, &option_index); - - if (option_rv == -1) - break; - - switch (option_rv) { - case 0: - break; - - case OPT_UDP: - if (opt_test == GET_TEST) { - fprintf(stderr, - "You can not run a get test in UDP mode. UDP mode " - "does not currently support get ops.\n"); - exit(1); - } - opt_udp_io = true; - break; - - case OPT_BINARY: - opt_binary = true; - break; - - case OPT_VERBOSE: /* --verbose or -v */ - opt_verbose = OPT_VERBOSE; - break; - - case OPT_DEBUG: /* --debug or -d */ - opt_verbose = OPT_DEBUG; - break; - - case OPT_VERSION: /* --version or -V */ - opt_version = true; - break; - - case OPT_HELP: /* --help or -h */ - opt_help = true; - break; - - case OPT_SERVERS: /* --servers or -s */ - opt_servers = strdup(optarg); - break; - - case OPT_SLAP_TEST: - if (strcmp(optarg, "get") == 0) { - if (opt_udp_io == 1) { - fprintf(stderr, - "You can not run a get test in UDP mode. UDP mode " - "does not currently support get ops.\n"); - exit(EXIT_FAILURE); - } - opt_test = GET_TEST; - } else if (strcmp(optarg, "set") == 0) { - opt_test = SET_TEST; - } else if (strcmp(optarg, "mget") == 0) { - opt_test = MGET_TEST; - } else { - fprintf(stderr, "Your test, %s, is not a known test\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_CONCURRENCY: - errno = 0; - opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for concurrency: %s\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_EXECUTE_NUMBER: - errno = 0; - opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for execute: %s\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_INITIAL_LOAD: - errno = 0; - opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for initial load: %s\n", optarg); - exit(EXIT_FAILURE); + if (opt.isset("flush")) { + if (opt.isset("verbose")) { + std::cout << "- Flushing servers ...\n"; + } + auto flush_start = time_clock::now(); + auto rc = memcached_flush(&memc, 0); + auto flush_elapsed = time_clock::now() - flush_start; + if (!memcached_success(rc)) { + if (!opt.isset("quiet")) { + std::cerr << "Failed to FLUSH: " << memcached_last_error_message(&memc) << "\n"; } - break; - - case OPT_QUIET: - close_stdio(); - break; - - case '?': - /* getopt_long already printed an error message. */ + memcached_free(&memc); exit(EXIT_FAILURE); - - default: - abort(); + } + if (!opt.isset("quiet")) { + std::cout << "Time to flush " << align + << memcached_server_count(&memc) + << " servers: " + << align << time_format(flush_elapsed).count() + << " seconds.\n"; } } - if (opt_version) { - version_command(PROGRAM_NAME); - exit(EXIT_SUCCESS); + if (opt.isset("verbose")) { + std::cout << "- Generating random test data ...\n"; } - - if (opt_help) { - help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options); - exit(EXIT_SUCCESS); + auto keyval_start = time_clock::now(); + keyval_st kv{test_count}; + auto keyval_elapsed = time_clock::now() - keyval_start; + + if (!opt.isset("quiet")) { + std::cout << "Time to generate " + << align << test_count + << " test keys: " + << align << time_format(keyval_elapsed).count() + << " seconds.\n"; } - if ((opt_test == GET_TEST or opt_test == MGET_TEST) and opt_createial_load == 0) - opt_createial_load = DEFAULT_INITIAL_LOAD; - - if (opt_execute_number == 0) - opt_execute_number = DEFAULT_EXECUTE_NUMBER; - - if (opt_concurrency == 0) - opt_concurrency = DEFAULT_CONCURRENCY; -} - -void conclusions_print(conclusions_st *conclusion) { - printf("\tThreads connecting to servers %u\n", opt_concurrency); -#ifdef NOT_FINISHED - printf("\tLoaded %u rows\n", conclusion->rows_loaded); - printf("\tRead %u rows\n", conclusion->rows_read); -#endif - if (opt_test == SET_TEST) - printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, - conclusion->load_time % 1000); - else - printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, - conclusion->read_time % 1000); -} - -void flush_all(memcached_st *memc) { - memcached_flush(memc, 0); -} - -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, - unsigned int *actual_loaded) { - memcached_st *memc_clone = memcached_clone(NULL, memc); - /* We always used non-blocking IO for load since it is faster */ - memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); - - pairs_st *pairs = pairs_generate(number_of, VALUE_BYTES); - *actual_loaded = execute_set(memc_clone, pairs, number_of); - - memcached_free(memc_clone); + if (strcmp(opt.argof("test"), "set")) { + if (opt.isset("verbose")) { + std::cout << "- Feeding initial load to servers ...\n"; + } + auto feed_start = time_clock::now(); + auto count = execute_set(opt, memc, kv); + check_return(opt, memc, memcached_flush_buffers(&memc)); + auto feed_elapsed = time_clock::now() - feed_start; + + if (!opt.isset("quiet")) { + std::cout << "Time to set " + << align << count + << " keys: " + << align << time_format(feed_elapsed).count() + << " seconds.\n"; + } + } - return pairs; + if (opt.isset("verbose")) { + std::cout << "- Starting " << concurrency << " threads ...\n"; + } + auto thread_start = time_clock::now(); + std::vector threads{}; + threads.reserve(concurrency); + for (auto i = 0ul; i < concurrency; ++i) { + threads.push_back(new thread_context(opt, memc, kv)); + } + auto thread_elapsed = time_clock::now() - thread_start; + if (!opt.isset("quiet")) { + std::cout << "Time to start " + << align << concurrency + << " threads: " + << time_format(thread_elapsed).count() + << " seconds.\n"; + } + if (opt.isset("verbose")) { + std::cout << "- Starting test: " << test_count + << " x " << opt.argof("test") + << " x " << concurrency + << " ...\n"; + } + auto count = 0ul; + auto test_start = time_clock::now(); + wakeup.store(true, std::memory_order_release); + for (auto &thread : threads) { + count += thread->complete(); + delete thread; + } + auto test_elapsed = time_clock::now() - test_start; + + if (!opt.isset("quiet")) { + std::cout << "--------------------------------------------------------------------\n" + << "Time to " << std::left << std::setw(4) + << opt.argof("test") << " " + << align << count + << " keys by " + << std::setw(4) + << concurrency << " threads: " + << align << time_format(test_elapsed).count() + << " seconds.\n"; + + std::cout << "--------------------------------------------------------------------\n" + << "Time total: " + << align << std::setw(12) + << time_format(time_clock::now() - total_start).count() + << " seconds.\n"; + } + exit(EXIT_SUCCESS); }