From: Michael Wallner Date: Fri, 20 Nov 2020 10:16:29 +0000 (+0100) Subject: bin: consolidate clients X-Git-Tag: 1.1.0-beta1~97 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=e1ba5b9e4eb179295274026ad8fd40a68eb6c67f;p=m6w6%2Flibmemcached bin: consolidate clients --- diff --git a/src/bin/common/checks.hpp b/src/bin/common/checks.hpp index dc0ed92c..9c963522 100644 --- a/src/bin/common/checks.hpp +++ b/src/bin/common/checks.hpp @@ -54,6 +54,22 @@ bool check_memcached(const client_options &opt, memcached_st &memc) { return true; } +bool check_return(const client_options &opt, memcached_st &memc, memcached_return_t rc) { + if (!memcached_success(rc)) { + if (!opt.isset("quiet")) { + if (!memcached_fatal(rc)) { + if (opt.isset("verbose")) { + std::cerr << "Failed: " << memcached_strerror(&memc, rc) << "\n";; + } + } else { + std::cerr << "Fatal error: " << memcached_last_error_message(&memc) << "\n"; + } + } + return false; + } + return true; +} + bool check_return(const client_options &opt, memcached_st &memc, const char *key, memcached_return_t rc) { if (!memcached_success(rc)) { diff --git a/src/bin/common/random.hpp b/src/bin/common/random.hpp new file mode 100644 index 00000000..eab844f1 --- /dev/null +++ b/src/bin/common/random.hpp @@ -0,0 +1,44 @@ +/* + +--------------------------------------------------------------------+ + | libmemcached - C/C++ Client Library for memcached | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted under the terms of the BSD license. | + | You should have received a copy of the license in a bundled file | + | named LICENSE; in case you did not receive a copy you can review | + | the terms online at: https://opensource.org/licenses/BSD-3-Clause | + +--------------------------------------------------------------------+ + | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ | + | Copyright (c) 2020 Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#pragma once + +#include "time.hpp" +#include + +class random64 { +public: + using typ = std::mt19937_64::result_type; + + random64() + : gen{static_cast(time_clock::now().time_since_epoch().count())} + , dst{} + {} + + typ operator()(typ min = 0, typ max = std::numeric_limits::max()) { + return (dst(gen) % (max - min)) + min; + } + + void fill(char *buf, size_t len, + const std::string &set = "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz") { + for (auto i = 0ul; i < len; ++i) { + buf[i] = set[(*this)(0, set.length()-1)]; + } + } + +private: + std::mt19937_64 gen; + std::uniform_int_distribution dst; +}; diff --git a/src/bin/common/time.hpp b/src/bin/common/time.hpp new file mode 100644 index 00000000..18eee30c --- /dev/null +++ b/src/bin/common/time.hpp @@ -0,0 +1,21 @@ +/* + +--------------------------------------------------------------------+ + | libmemcached - C/C++ Client Library for memcached | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted under the terms of the BSD license. | + | You should have received a copy of the license in a bundled file | + | named LICENSE; in case you did not receive a copy you can review | + | the terms online at: https://opensource.org/licenses/BSD-3-Clause | + +--------------------------------------------------------------------+ + | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ | + | Copyright (c) 2020 Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#include + +using time_clock = std::chrono::high_resolution_clock; +using time_point = std::chrono::time_point; +using time_format = std::chrono::duration>; +using time_format_ms = std::chrono::duration>; diff --git a/src/bin/memslap.cc b/src/bin/memslap.cc index a9fb731f..d1c58cd6 100644 --- a/src/bin/memslap.cc +++ b/src/bin/memslap.cc @@ -15,586 +15,369 @@ #include "mem_config.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "libmemcached-1.0/memcached.h" - -#include "client_options.h" -#include "utilities.h" -#include "generator.h" - -#define DEFAULT_INITIAL_LOAD 10000 -#define DEFAULT_EXECUTE_NUMBER 10000 -#define DEFAULT_CONCURRENCY 1 - -#define VALUE_BYTES 4096 - #define PROGRAM_NAME "memslap" -#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers." - -/* Global Thread counter */ -volatile unsigned int master_wakeup; -pthread_mutex_t sleeper_mutex; -pthread_cond_t sleep_threshhold; - -/* Types */ -enum test_t { SET_TEST, GET_TEST, MGET_TEST }; - -struct thread_context_st { - unsigned int key_count; - pairs_st *initial_pairs; - unsigned int initial_number; - pairs_st *execute_pairs; - unsigned int execute_number; - char **keys; - size_t *key_lengths; - test_t test; - memcached_st *memc; - const memcached_st *root; - - thread_context_st(const memcached_st *memc_arg, test_t test_arg) - : key_count(0) - , initial_pairs(NULL) - , initial_number(0) - , execute_pairs(NULL) - , execute_number(0) - , keys(0) - , key_lengths(NULL) - , test(test_arg) - , memc(NULL) - , root(memc_arg) {} - - void init() { - memc = memcached_clone(NULL, root); - } +#define PROGRAM_DESCRIPTION "Generate load against a cluster of memcached servers." +#define PROGRAM_VERSION "1.1" - ~thread_context_st() { - if (execute_pairs) { - pairs_free(execute_pairs); - } - memcached_free(memc); - } -}; +#define DEFAULT_INITIAL_LOAD 10000ul +#define DEFAULT_EXECUTE_NUMBER 10000ul +#define DEFAULT_CONCURRENCY 1ul -struct conclusions_st { - long int load_time; - long int read_time; - unsigned int rows_loaded; - unsigned int rows_read; - - conclusions_st() - : load_time(0) - , read_time(0) - , rows_loaded(0) - , rows_read() {} -}; +#include "common/options.hpp" +#include "common/checks.hpp" +#include "common/time.hpp" +#include "common/random.hpp" -/* Prototypes */ -void options_parse(int argc, char *argv[]); -void conclusions_print(conclusions_st *conclusion); -void scheduler(memcached_server_st *servers, conclusions_st *conclusion); -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded); -void flush_all(memcached_st *memc); - -static bool opt_binary = 0; -static int opt_verbose = 0; -static int opt_flush = 0; -static int opt_non_blocking_io = 0; -static int opt_tcp_nodelay = 0; -static unsigned int opt_execute_number = 0; -static unsigned int opt_createial_load = 0; -static unsigned int opt_concurrency = 0; -static int opt_displayflag = 0; -static char *opt_servers = NULL; -static bool opt_udp_io = false; -test_t opt_test = SET_TEST; - - -unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { - uint32_t count = 0; - for (; count < number_of; ++count) { - memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length, - pairs[count].value, pairs[count].value_length, 0, 0); - if (memcached_failed(rc)) { - fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count, - memcached_last_error_message(memc), (unsigned int) pairs[count].key_length, - pairs[count].key); - - // We will try to reconnect and see if that fixes the issue - memcached_quit(memc); - - return count; - } - } +#include +#include +#include - return count; -} +static std::atomic_bool wakeup; -/* - Execute a memcached_get() on a set of pairs. - Return the number of rows retrieved. -*/ -static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) { - unsigned int x; - unsigned int retrieved; +static memcached_return_t counter(const memcached_st *, memcached_result_st *, void *ctx) { + auto c = static_cast(ctx); + ++(*c); + return MEMCACHED_SUCCESS; +} - for (retrieved = 0, x = 0; x < number_of; x++) { - size_t value_length; - uint32_t flags; +struct keyval_st { + struct data { + std::vector chr; + std::vector len; + explicit data(size_t num) + : chr(num) + , len(num) + {} + }; - unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of); + data key; + data val; - memcached_return_t rc; - char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length, - &value_length, &flags, &rc); + size_t num; + random64 rnd; - if (memcached_failed(rc)) { - fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__, - memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length, - pairs[fetch_key].key); - } else { - retrieved++; + explicit keyval_st(size_t num_) + : key{num_} + , val{num_} + , num{num_} + , rnd{} + { + for (auto i = 0u; i < num; ++i) { + gen(key.chr[i], key.len[i], val.chr[i], val.len[i]); } + } - ::free(value); + ~keyval_st() { + for (auto i = 0u; i < num; ++i) { + delete [] key.chr[i]; + delete [] val.chr[i]; + } } - return retrieved; -} +private: + void gen_key(char *&key_chr, size_t &key_len) { + key_len = rnd(20,60); + key_chr = new char[key_len + 1]; + rnd.fill(key_chr, key_len); + key_chr[key_len] = 0; + } + void gen_val(const char *key_chr, const size_t key_len, char *&val_chr, size_t &val_len) { + val_len = rnd(50, 5000); + val_chr = new char[val_len]; + + for (auto len = 0u; len < val_len; len += key_len) { + auto val_pos = val_chr + len; + auto rem_len = len + key_len > val_len ? val_len % key_len : key_len; + memcpy(val_pos, key_chr, rem_len); + } + } + void gen(char *&key_chr, size_t &key_len, char *&val_chr, size_t &val_len) { + gen_key(key_chr, key_len); + gen_val(key_chr, key_len, val_chr, val_len); + } +}; -/** - * Callback function to count the number of results - */ -static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result, - void *context) { - (void) ptr; - (void) result; - unsigned int *counter = (unsigned int *) context; - *counter = *counter + 1; +static size_t execute_get(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + size_t retrieved = 0; + random64 rnd{}; - return MEMCACHED_SUCCESS; -} + for (auto i = 0u; i < kv.num; ++i) { + memcached_return_t rc; + auto r = rnd(0, kv.num); + free(memcached_get(&memc, kv.key.chr[r], kv.key.len[r], nullptr, nullptr, &rc)); -/** - * Try to run a large mget to get all of the keys - * @param memc memcached handle - * @param keys the keys to get - * @param key_length the length of the keys - * @param number_of the number of keys to try to get - * @return the number of keys received - */ -static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length, - unsigned int number_of) { - unsigned int retrieved = 0; - memcached_execute_fn callbacks[] = {callback_counter}; - memcached_return_t rc; - rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1); - - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED - || rc == MEMCACHED_END) - { - rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) { - fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, - memcached_strerror(memc, rc)); - memcached_quit(memc); - return 0; + if (check_return(opt, memc, kv.key.chr[r], rc)) { + ++retrieved; } - } else { - fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__, - memcached_strerror(memc, rc)); - memcached_quit(memc); - return 0; } return retrieved; } -extern "C" { +static size_t execute_mget(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + size_t retrieved = 0; + memcached_execute_fn cb[] = {&counter}; -static __attribute__((noreturn)) void *run_task(void *p) { - thread_context_st *context = (thread_context_st *) p; + auto rc = memcached_mget_execute(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num, cb, &retrieved, 1); - context->init(); - - pthread_mutex_lock(&sleeper_mutex); - while (master_wakeup) { - pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); - } - pthread_mutex_unlock(&sleeper_mutex); - - /* Do Stuff */ - switch (context->test) { - case SET_TEST: - assert(context->execute_pairs); - execute_set(context->memc, context->execute_pairs, context->execute_number); - break; - - case GET_TEST: - execute_get(context->memc, context->initial_pairs, context->initial_number); - break; - - case MGET_TEST: - execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths, - context->initial_number); - break; + while (rc != MEMCACHED_END && memcached_success(rc)) { + rc = memcached_fetch_execute(&memc, cb, &retrieved, 1); } - delete context; - - pthread_exit(0); -} - -} // extern "C" - - -int main(int argc, char *argv[]) { - conclusions_st conclusion; - - srandom((unsigned int) time(NULL)); - options_parse(argc, argv); - - if (opt_servers == NULL) { - char *temp; - - if ((temp = getenv("MEMCACHED_SERVERS"))) { - opt_servers = strdup(temp); - } - - if (opt_servers == NULL) { - std::cerr << "No servers provided" << std::endl; - exit(EXIT_FAILURE); + if (memcached_fatal(rc)) { + if (!opt.isset("quiet")) { + std::cerr << "Failed mget: " << memcached_strerror(&memc, rc) << ": " + << memcached_last_error_message(&memc); } } + return retrieved; +} - memcached_server_st *servers = memcached_servers_parse(opt_servers); - if (servers == NULL or memcached_server_list_count(servers) == 0) { - std::cerr << "Invalid server list provided:" << opt_servers << std::endl; - return EXIT_FAILURE; - } - - pthread_mutex_init(&sleeper_mutex, NULL); - pthread_cond_init(&sleep_threshhold, NULL); +static size_t execute_set(const client_options &opt, memcached_st &memc, const keyval_st &kv) { + for (auto i = 0u; i < kv.num; ++i) { + auto rc = memcached_set(&memc, kv.key.chr[i], kv.key.len[i], kv.val.chr[i], kv.val.len[i], 0, 0); - int error_code = EXIT_SUCCESS; - try { - scheduler(servers, &conclusion); - } catch (std::exception &e) { - std::cerr << "Died with exception: " << e.what() << std::endl; - error_code = EXIT_FAILURE; + if (!check_return(opt, memc, kv.key.chr[i], rc)) { + memcached_quit(&memc); + return i; + } } - free(opt_servers); - - (void) pthread_mutex_destroy(&sleeper_mutex); - (void) pthread_cond_destroy(&sleep_threshhold); - conclusions_print(&conclusion); - memcached_server_list_free(servers); - - return error_code; + return kv.num; } -void scheduler(memcached_server_st *servers, conclusions_st *conclusion) { - unsigned int actual_loaded = 0; /* Fix warning */ - - struct timeval start_time, end_time; - pairs_st *pairs = NULL; - - memcached_st *memc = memcached_create(NULL); - - memcached_server_push(memc, servers); +class thread_context { +public: + thread_context(const client_options &opt_, const memcached_st &memc_, const keyval_st &kv_) + : opt{opt_} + , kv{kv_} + , count{} + , root(memc_) + , memc{} + , thread([this]{ execute(); }) + {} + + ~thread_context() { + memcached_free(&memc); + } - /* We need to set udp behavior before adding servers to the client */ - if (opt_udp_io) { - if (memcached_failed(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io))) { - std::cerr << "Failed to enable UDP." << std::endl; - memcached_free(memc); - exit(EXIT_FAILURE); - } + size_t complete() { + thread.join(); + return count; } - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, (uint64_t) opt_binary); +private: + const client_options &opt; + const keyval_st &kv; + size_t count; + const memcached_st &root; + memcached_st memc; + std::thread thread; - if (opt_flush) { - flush_all(memc); - } + void execute() { + memcached_clone(&memc, &root); - if (opt_createial_load) { - pairs = load_create_data(memc, opt_createial_load, &actual_loaded); - } + while (!wakeup.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } - char **keys = static_cast(calloc(actual_loaded, sizeof(char *))); - size_t *key_lengths = static_cast(calloc(actual_loaded, sizeof(size_t))); - - if (keys == NULL or key_lengths == NULL) { - free(keys); - free(key_lengths); - keys = NULL; - key_lengths = NULL; - } else { - for (uint32_t x = 0; x < actual_loaded; ++x) { - keys[x] = pairs[x].key; - key_lengths[x] = pairs[x].key_length; + if (!strcmp("set", opt.argof("test"))) { + count = execute_set(opt, memc, kv); + } else if (!strcmp("mget", opt.argof("test"))) { + count = execute_mget(opt, memc, kv); + } else { + if (strcmp("get", opt.argof("test"))) { + if (!opt.isset("quiet")) { + std::cerr << "Unknown --test: '" << opt.argof("test") << "'.\n"; + } + } + count = execute_get(opt, memc, kv); } } +}; - /* We set this after we have loaded */ - { - if (opt_non_blocking_io) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1); - - if (opt_tcp_nodelay) - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1); - } +using opt_apply = std::function; - pthread_mutex_lock(&sleeper_mutex); - master_wakeup = 1; - pthread_mutex_unlock(&sleeper_mutex); +static opt_apply wrap_stoul(unsigned long &ul) { + return [&ul](const client_options &, const client_options::extended_option &ext, memcached_st *) { + if (ext.arg && *ext.arg) { + auto c = std::stoul(ext.arg); + if (c) { + ul = c; + } + } + return true; + }; +} - pthread_t *threads = new (std::nothrow) pthread_t[opt_concurrency]; +int main(int argc, char *argv[]) { + client_options opt{PROGRAM_NAME, PROGRAM_VERSION, PROGRAM_DESCRIPTION}; + auto concurrency = DEFAULT_CONCURRENCY; + auto load_count = DEFAULT_INITIAL_LOAD; + auto test_count = DEFAULT_EXECUTE_NUMBER; - if (threads == NULL) { - exit(EXIT_FAILURE); + for (const auto &def : opt.defaults) { + opt.add(def); } - for (uint32_t x = 0; x < opt_concurrency; x++) { - thread_context_st *context = new thread_context_st(memc, opt_test); - context->test = opt_test; - - context->initial_pairs = pairs; - context->initial_number = actual_loaded; - context->keys = keys; - context->key_lengths = key_lengths; - - if (opt_test == SET_TEST) { - context->execute_pairs = pairs_generate(opt_execute_number, VALUE_BYTES); - context->execute_number = opt_execute_number; + opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.") + .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) { + if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) { + if(!opt_.isset("quiet")) { + std::cerr << memcached_last_error_message(memc); + } + return false; } - - /* now you create the thread */ - if (pthread_create(threads + x, NULL, run_task, (void *) context)) { - fprintf(stderr, "Could not create thread\n"); - exit(1); + return true; + }; + opt.add("udp", 'U', no_argument, "Use UDP.") + .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) { + if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, ext.set)) { + if (!opt_.isset("quiet")) { + std::cerr << memcached_last_error_message(memc) << "\n"; + } + return false; } + return true; + }; + opt.add("flush", 'F', no_argument, "Flush all servers prior test."); + opt.add("test", 't', required_argument, "Test to perform (options: get,mget,set; default: get)."); + opt.add("concurrency", 'c', required_argument, "Concurrency (number of threads to start; default: 1).") + .apply = wrap_stoul(concurrency); + opt.add("execute-number", 'e', required_argument, "Number of times to execute the tests (default: 10000).") + .apply = wrap_stoul(test_count); + opt.add("initial-load", 'l', required_argument, "Number of keys to load before executing tests (default: 10000)." + "\n\t\tDEPRECATED: --execute-number takes precedence.") + .apply = wrap_stoul(load_count); + + char set[] = "set"; + opt.set("test", true, set); + + if (!opt.parse(argc, argv)) { + exit(EXIT_FAILURE); } - pthread_mutex_lock(&sleeper_mutex); - master_wakeup = 0; - pthread_mutex_unlock(&sleeper_mutex); - pthread_cond_broadcast(&sleep_threshhold); - gettimeofday(&start_time, NULL); - - for (uint32_t x = 0; x < opt_concurrency; x++) { - void *retval; - pthread_join(threads[x], &retval); + memcached_st memc; + if (!check_memcached(opt, memc)) { + exit(EXIT_FAILURE); } - delete[] threads; - - gettimeofday(&end_time, NULL); - conclusion->load_time = timedif(end_time, start_time); - conclusion->read_time = timedif(end_time, start_time); - free(keys); - free(key_lengths); - pairs_free(pairs); - memcached_free(memc); -} + if (!opt.apply(&memc)) { + memcached_free(&memc); + exit(EXIT_FAILURE); + } -void options_parse(int argc, char *argv[]) { - memcached_programs_help_st help_options[] = { - {0}, - }; + auto total_start = time_clock::now(); + std::cout << std::fixed << std::setprecision(3); - static struct option long_options[] = { - {(OPTIONSTRING) "concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY}, - {(OPTIONSTRING) "debug", no_argument, &opt_verbose, OPT_DEBUG}, - {(OPTIONSTRING) "quiet", no_argument, NULL, OPT_QUIET}, - {(OPTIONSTRING) "execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER}, - {(OPTIONSTRING) "flag", no_argument, &opt_displayflag, OPT_FLAG}, - {(OPTIONSTRING) "flush", no_argument, &opt_flush, OPT_FLUSH}, - {(OPTIONSTRING) "help", no_argument, NULL, OPT_HELP}, - {(OPTIONSTRING) "initial-load", required_argument, NULL, - OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */ - {(OPTIONSTRING) "non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK}, - {(OPTIONSTRING) "servers", required_argument, NULL, OPT_SERVERS}, - {(OPTIONSTRING) "tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY}, - {(OPTIONSTRING) "test", required_argument, NULL, OPT_SLAP_TEST}, - {(OPTIONSTRING) "verbose", no_argument, &opt_verbose, OPT_VERBOSE}, - {(OPTIONSTRING) "version", no_argument, NULL, OPT_VERSION}, - {(OPTIONSTRING) "binary", no_argument, NULL, OPT_BINARY}, - {(OPTIONSTRING) "udp", no_argument, NULL, OPT_UDP}, - {0, 0, 0, 0}, + auto align = [](std::ostream &io) -> std::ostream &{ + return io << std::right << std::setw(8); }; - bool opt_help = false; - bool opt_version = false; - int option_index = 0; - while (1) { - int option_rv = getopt_long(argc, argv, "Vhvds:", long_options, &option_index); - - if (option_rv == -1) - break; - - switch (option_rv) { - case 0: - break; - - case OPT_UDP: - if (opt_test == GET_TEST) { - fprintf(stderr, - "You can not run a get test in UDP mode. UDP mode " - "does not currently support get ops.\n"); - exit(1); - } - opt_udp_io = true; - break; - - case OPT_BINARY: - opt_binary = true; - break; - - case OPT_VERBOSE: /* --verbose or -v */ - opt_verbose = OPT_VERBOSE; - break; - - case OPT_DEBUG: /* --debug or -d */ - opt_verbose = OPT_DEBUG; - break; - - case OPT_VERSION: /* --version or -V */ - opt_version = true; - break; - - case OPT_HELP: /* --help or -h */ - opt_help = true; - break; - - case OPT_SERVERS: /* --servers or -s */ - opt_servers = strdup(optarg); - break; - - case OPT_SLAP_TEST: - if (strcmp(optarg, "get") == 0) { - if (opt_udp_io == 1) { - fprintf(stderr, - "You can not run a get test in UDP mode. UDP mode " - "does not currently support get ops.\n"); - exit(EXIT_FAILURE); - } - opt_test = GET_TEST; - } else if (strcmp(optarg, "set") == 0) { - opt_test = SET_TEST; - } else if (strcmp(optarg, "mget") == 0) { - opt_test = MGET_TEST; - } else { - fprintf(stderr, "Your test, %s, is not a known test\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_CONCURRENCY: - errno = 0; - opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for concurrency: %s\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_EXECUTE_NUMBER: - errno = 0; - opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for execute: %s\n", optarg); - exit(EXIT_FAILURE); - } - break; - - case OPT_SLAP_INITIAL_LOAD: - errno = 0; - opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10); - if (errno) { - fprintf(stderr, "Invalid value for initial load: %s\n", optarg); - exit(EXIT_FAILURE); + if (opt.isset("flush")) { + if (opt.isset("verbose")) { + std::cout << "- Flushing servers ...\n"; + } + auto flush_start = time_clock::now(); + auto rc = memcached_flush(&memc, 0); + auto flush_elapsed = time_clock::now() - flush_start; + if (!memcached_success(rc)) { + if (!opt.isset("quiet")) { + std::cerr << "Failed to FLUSH: " << memcached_last_error_message(&memc) << "\n"; } - break; - - case OPT_QUIET: - close_stdio(); - break; - - case '?': - /* getopt_long already printed an error message. */ + memcached_free(&memc); exit(EXIT_FAILURE); - - default: - abort(); + } + if (!opt.isset("quiet")) { + std::cout << "Time to flush " << align + << memcached_server_count(&memc) + << " servers: " + << align << time_format(flush_elapsed).count() + << " seconds.\n"; } } - if (opt_version) { - version_command(PROGRAM_NAME); - exit(EXIT_SUCCESS); + if (opt.isset("verbose")) { + std::cout << "- Generating random test data ...\n"; } - - if (opt_help) { - help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options); - exit(EXIT_SUCCESS); + auto keyval_start = time_clock::now(); + keyval_st kv{test_count}; + auto keyval_elapsed = time_clock::now() - keyval_start; + + if (!opt.isset("quiet")) { + std::cout << "Time to generate " + << align << test_count + << " test keys: " + << align << time_format(keyval_elapsed).count() + << " seconds.\n"; } - if ((opt_test == GET_TEST or opt_test == MGET_TEST) and opt_createial_load == 0) - opt_createial_load = DEFAULT_INITIAL_LOAD; - - if (opt_execute_number == 0) - opt_execute_number = DEFAULT_EXECUTE_NUMBER; - - if (opt_concurrency == 0) - opt_concurrency = DEFAULT_CONCURRENCY; -} - -void conclusions_print(conclusions_st *conclusion) { - printf("\tThreads connecting to servers %u\n", opt_concurrency); -#ifdef NOT_FINISHED - printf("\tLoaded %u rows\n", conclusion->rows_loaded); - printf("\tRead %u rows\n", conclusion->rows_read); -#endif - if (opt_test == SET_TEST) - printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, - conclusion->load_time % 1000); - else - printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, - conclusion->read_time % 1000); -} - -void flush_all(memcached_st *memc) { - memcached_flush(memc, 0); -} - -pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, - unsigned int *actual_loaded) { - memcached_st *memc_clone = memcached_clone(NULL, memc); - /* We always used non-blocking IO for load since it is faster */ - memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0); - - pairs_st *pairs = pairs_generate(number_of, VALUE_BYTES); - *actual_loaded = execute_set(memc_clone, pairs, number_of); - - memcached_free(memc_clone); + if (strcmp(opt.argof("test"), "set")) { + if (opt.isset("verbose")) { + std::cout << "- Feeding initial load to servers ...\n"; + } + auto feed_start = time_clock::now(); + auto count = execute_set(opt, memc, kv); + check_return(opt, memc, memcached_flush_buffers(&memc)); + auto feed_elapsed = time_clock::now() - feed_start; + + if (!opt.isset("quiet")) { + std::cout << "Time to set " + << align << count + << " keys: " + << align << time_format(feed_elapsed).count() + << " seconds.\n"; + } + } - return pairs; + if (opt.isset("verbose")) { + std::cout << "- Starting " << concurrency << " threads ...\n"; + } + auto thread_start = time_clock::now(); + std::vector threads{}; + threads.reserve(concurrency); + for (auto i = 0ul; i < concurrency; ++i) { + threads.push_back(new thread_context(opt, memc, kv)); + } + auto thread_elapsed = time_clock::now() - thread_start; + if (!opt.isset("quiet")) { + std::cout << "Time to start " + << align << concurrency + << " threads: " + << time_format(thread_elapsed).count() + << " seconds.\n"; + } + if (opt.isset("verbose")) { + std::cout << "- Starting test: " << test_count + << " x " << opt.argof("test") + << " x " << concurrency + << " ...\n"; + } + auto count = 0ul; + auto test_start = time_clock::now(); + wakeup.store(true, std::memory_order_release); + for (auto &thread : threads) { + count += thread->complete(); + delete thread; + } + auto test_elapsed = time_clock::now() - test_start; + + if (!opt.isset("quiet")) { + std::cout << "--------------------------------------------------------------------\n" + << "Time to " << std::left << std::setw(4) + << opt.argof("test") << " " + << align << count + << " keys by " + << std::setw(4) + << concurrency << " threads: " + << align << time_format(test_elapsed).count() + << " seconds.\n"; + + std::cout << "--------------------------------------------------------------------\n" + << "Time total: " + << align << std::setw(12) + << time_format(time_clock::now() - total_start).count() + << " seconds.\n"; + } + exit(EXIT_SUCCESS); } diff --git a/src/bin/memstat.cc b/src/bin/memstat.cc index 5208fb4d..638297ac 100644 --- a/src/bin/memstat.cc +++ b/src/bin/memstat.cc @@ -19,12 +19,14 @@ #define PROGRAM_DESCRIPTION "Print stats/version of or analyze a memcached cluster." #define PROGRAM_VERSION "1.1" +#define DEFAULT_LATENCY_ITERATIONS 100 // update help string, if changed + #include "common/options.hpp" #include "common/checks.hpp" +#include "common/time.hpp" #include "common/utilities.h" #include -#include #include static memcached_return_t print_server_version(const memcached_st *, @@ -83,11 +85,6 @@ static bool analyze_stat(const client_options &opt, memcached_st *memc, memcache return true; } -using time_clock = std::chrono::high_resolution_clock; -using time_point = std::chrono::time_point; -using time_format = std::chrono::duration>; -using time_format_ms = std::chrono::duration>; - static void latency_test(uint32_t iterations, std::vector &servers) { const char *test_key = "libmemcached_test_key"; size_t test_key_len = strlen(test_key); @@ -146,7 +143,7 @@ static void latency_test(uint32_t iterations, std::vector &servers } static bool analyze_latency(client_options &opt, memcached_st *root) { - uint32_t num_of_tests = 100; + uint32_t num_of_tests = DEFAULT_LATENCY_ITERATIONS; if (auto iter_str = opt.argof("iterations")) { num_of_tests = std::stoul(iter_str); diff --git a/test/setup.cpp b/test/setup.cpp index 22c7c2ad..a506976a 100644 --- a/test/setup.cpp +++ b/test/setup.cpp @@ -59,6 +59,7 @@ static inline void setup_signals() { # define LSAN_OPTIONS \ "suppressions=" SOURCES_ROOT "/test/LeakSanitizer.suppressions," \ "" + static inline void setup_asan(char **argv) { const auto set = getenv("ASAN_OPTIONS"); @@ -82,6 +83,24 @@ static inline void setup_lsan(char **argv) { # define setup_lsan(a) (void) a #endif +#if HAVE_TSAN +# define TSAN_OPTIONS \ + "abort_on_error=0," \ + "halt_on_error=0" \ + "" +static inline void setup_tsan(char **argv) { + const auto set = getenv("TSAN_OPTIONS"); + + if (!set || !*set) { + SET_ENV_EX(tsan, "TSAN_OPTIONS", TSAN_OPTIONS, 0); + execvp(argv[0], argv); + perror("exec()"); + } +} +#else +# define setup_tsan(a) (void) a +#endif + #if LIBMEMCACHED_WITH_SASL_SUPPORT static inline void setup_sasl() { SET_ENV_EX(sasl_pwdb, "MEMCACHED_SASL_PWDB", LIBMEMCACHED_WITH_SASL_PWDB, 0); @@ -100,6 +119,7 @@ int setup(int &, char ***argv) { setup_random(); setup_asan(*argv); setup_lsan(*argv); + setup_tsan(*argv); setup_sasl(); return 0; diff --git a/test/tests/bin/memslap.cpp b/test/tests/bin/memslap.cpp index dee7ebc3..cc640cdd 100644 --- a/test/tests/bin/memslap.cpp +++ b/test/tests/bin/memslap.cpp @@ -10,26 +10,27 @@ TEST_CASE("bin/memslap") { SECTION("no servers provided") { string output; REQUIRE_FALSE(sh.run("memslap", output)); - REQUIRE(output == "No servers provided\n"); + REQUIRE(output == "No servers provided.\n"); } SECTION("--help") { string output; REQUIRE(sh.run("memslap --help", output)); - REQUIRE_THAT(output, Contains("memslap")); - REQUIRE_THAT(output, Contains("v1")); - REQUIRE_THAT(output, Contains("help")); - REQUIRE_THAT(output, Contains("version")); - REQUIRE_THAT(output, Contains("option")); - REQUIRE_THAT(output, Contains("--")); - REQUIRE_THAT(output, Contains("=")); + REQUIRE_THAT(output, Contains("memslap v1")); + REQUIRE_THAT(output, Contains("Usage:")); + REQUIRE_THAT(output, Contains("Options:")); + REQUIRE_THAT(output, Contains("-h|--help")); + REQUIRE_THAT(output, Contains("-V|--version")); + REQUIRE_THAT(output, Contains("--concurrency")); + REQUIRE_THAT(output, Contains("Environment:")); + REQUIRE_THAT(output, Contains("MEMCACHED_SERVERS")); } SECTION("with servers") { auto test = MemcachedCluster::udp(); auto flags = {"--binary", "--udp", "--flush", "--test=mget", "--test=get", "--tcp-nodelay", - "--non-blocking", "--initial-load=1000"}; + "--non-blocking", "--execute-number=1000"}; string servers{"--servers="}; for (const auto &server : test.cluster.getServers()) {