#include "mem_config.h"
-#include <cassert>
-#include <cerrno>
-#include <cstdio>
-#include <cstdlib>
-#include <cstring>
-#include <fcntl.h>
-#include <getopt.h>
-#include <memory>
-#include <pthread.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <iostream>
-
-#include "libmemcached-1.0/memcached.h"
-
-#include "client_options.h"
-#include "utilities.h"
-#include "generator.h"
-
-#define DEFAULT_INITIAL_LOAD 10000
-#define DEFAULT_EXECUTE_NUMBER 10000
-#define DEFAULT_CONCURRENCY 1
-
-#define VALUE_BYTES 4096
-
#define PROGRAM_NAME "memslap"
-#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
-
-/* Global Thread counter */
-volatile unsigned int master_wakeup;
-pthread_mutex_t sleeper_mutex;
-pthread_cond_t sleep_threshhold;
-
-/* Types */
-enum test_t { SET_TEST, GET_TEST, MGET_TEST };
-
-struct thread_context_st {
- unsigned int key_count;
- pairs_st *initial_pairs;
- unsigned int initial_number;
- pairs_st *execute_pairs;
- unsigned int execute_number;
- char **keys;
- size_t *key_lengths;
- test_t test;
- memcached_st *memc;
- const memcached_st *root;
-
- thread_context_st(const memcached_st *memc_arg, test_t test_arg)
- : key_count(0)
- , initial_pairs(NULL)
- , initial_number(0)
- , execute_pairs(NULL)
- , execute_number(0)
- , keys(0)
- , key_lengths(NULL)
- , test(test_arg)
- , memc(NULL)
- , root(memc_arg) {}
-
- void init() {
- memc = memcached_clone(NULL, root);
- }
+#define PROGRAM_DESCRIPTION "Generate load against a cluster of memcached servers."
+#define PROGRAM_VERSION "1.1"
- ~thread_context_st() {
- if (execute_pairs) {
- pairs_free(execute_pairs);
- }
- memcached_free(memc);
- }
-};
+#define DEFAULT_INITIAL_LOAD 10000ul
+#define DEFAULT_EXECUTE_NUMBER 10000ul
+#define DEFAULT_CONCURRENCY 1ul
-struct conclusions_st {
- long int load_time;
- long int read_time;
- unsigned int rows_loaded;
- unsigned int rows_read;
-
- conclusions_st()
- : load_time(0)
- , read_time(0)
- , rows_loaded(0)
- , rows_read() {}
-};
+#include "common/options.hpp"
+#include "common/checks.hpp"
+#include "common/time.hpp"
+#include "common/random.hpp"
-/* Prototypes */
-void options_parse(int argc, char *argv[]);
-void conclusions_print(conclusions_st *conclusion);
-void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
-pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded);
-void flush_all(memcached_st *memc);
-
-static bool opt_binary = 0;
-static int opt_verbose = 0;
-static int opt_flush = 0;
-static int opt_non_blocking_io = 0;
-static int opt_tcp_nodelay = 0;
-static unsigned int opt_execute_number = 0;
-static unsigned int opt_createial_load = 0;
-static unsigned int opt_concurrency = 0;
-static int opt_displayflag = 0;
-static char *opt_servers = NULL;
-static bool opt_udp_io = false;
-test_t opt_test = SET_TEST;
-
-
-unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
- uint32_t count = 0;
- for (; count < number_of; ++count) {
- memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length,
- pairs[count].value, pairs[count].value_length, 0, 0);
- if (memcached_failed(rc)) {
- fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count,
- memcached_last_error_message(memc), (unsigned int) pairs[count].key_length,
- pairs[count].key);
-
- // We will try to reconnect and see if that fixes the issue
- memcached_quit(memc);
-
- return count;
- }
- }
+#include <atomic>
+#include <thread>
+#include <iomanip>
- return count;
-}
+static std::atomic_bool wakeup;
-/*
- Execute a memcached_get() on a set of pairs.
- Return the number of rows retrieved.
-*/
-static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
- unsigned int x;
- unsigned int retrieved;
+static memcached_return_t counter(const memcached_st *, memcached_result_st *, void *ctx) {
+ auto c = static_cast<size_t *>(ctx);
+ ++(*c);
+ return MEMCACHED_SUCCESS;
+}
- for (retrieved = 0, x = 0; x < number_of; x++) {
- size_t value_length;
- uint32_t flags;
+struct keyval_st {
+ struct data {
+ std::vector<char *> chr;
+ std::vector<size_t> len;
+ explicit data(size_t num)
+ : chr(num)
+ , len(num)
+ {}
+ };
- unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of);
+ data key;
+ data val;
- memcached_return_t rc;
- char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length,
- &value_length, &flags, &rc);
+ size_t num;
+ random64 rnd;
- if (memcached_failed(rc)) {
- fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__,
- memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length,
- pairs[fetch_key].key);
- } else {
- retrieved++;
+ explicit keyval_st(size_t num_)
+ : key{num_}
+ , val{num_}
+ , num{num_}
+ , rnd{}
+ {
+ for (auto i = 0u; i < num; ++i) {
+ gen(key.chr[i], key.len[i], val.chr[i], val.len[i]);
}
+ }
- ::free(value);
+ ~keyval_st() {
+ for (auto i = 0u; i < num; ++i) {
+ delete [] key.chr[i];
+ delete [] val.chr[i];
+ }
}
- return retrieved;
-}
+private:
+ void gen_key(char *&key_chr, size_t &key_len) {
+ key_len = rnd(20,60);
+ key_chr = new char[key_len + 1];
+ rnd.fill(key_chr, key_len);
+ key_chr[key_len] = 0;
+ }
+ void gen_val(const char *key_chr, const size_t key_len, char *&val_chr, size_t &val_len) {
+ val_len = rnd(50, 5000);
+ val_chr = new char[val_len];
+
+ for (auto len = 0u; len < val_len; len += key_len) {
+ auto val_pos = val_chr + len;
+ auto rem_len = len + key_len > val_len ? val_len % key_len : key_len;
+ memcpy(val_pos, key_chr, rem_len);
+ }
+ }
+ void gen(char *&key_chr, size_t &key_len, char *&val_chr, size_t &val_len) {
+ gen_key(key_chr, key_len);
+ gen_val(key_chr, key_len, val_chr, val_len);
+ }
+};
-/**
- * Callback function to count the number of results
- */
-static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result,
- void *context) {
- (void) ptr;
- (void) result;
- unsigned int *counter = (unsigned int *) context;
- *counter = *counter + 1;
+static size_t execute_get(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
+ size_t retrieved = 0;
+ random64 rnd{};
- return MEMCACHED_SUCCESS;
-}
+ for (auto i = 0u; i < kv.num; ++i) {
+ memcached_return_t rc;
+ auto r = rnd(0, kv.num);
+ free(memcached_get(&memc, kv.key.chr[r], kv.key.len[r], nullptr, nullptr, &rc));
-/**
- * Try to run a large mget to get all of the keys
- * @param memc memcached handle
- * @param keys the keys to get
- * @param key_length the length of the keys
- * @param number_of the number of keys to try to get
- * @return the number of keys received
- */
-static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length,
- unsigned int number_of) {
- unsigned int retrieved = 0;
- memcached_execute_fn callbacks[] = {callback_counter};
- memcached_return_t rc;
- rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1);
-
- if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED
- || rc == MEMCACHED_END)
- {
- rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1);
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) {
- fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
- memcached_strerror(memc, rc));
- memcached_quit(memc);
- return 0;
+ if (check_return(opt, memc, kv.key.chr[r], rc)) {
+ ++retrieved;
}
- } else {
- fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
- memcached_strerror(memc, rc));
- memcached_quit(memc);
- return 0;
}
return retrieved;
}
-extern "C" {
+static size_t execute_mget(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
+ size_t retrieved = 0;
+ memcached_execute_fn cb[] = {&counter};
-static __attribute__((noreturn)) void *run_task(void *p) {
- thread_context_st *context = (thread_context_st *) p;
+ auto rc = memcached_mget_execute(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num, cb, &retrieved, 1);
- context->init();
-
- pthread_mutex_lock(&sleeper_mutex);
- while (master_wakeup) {
- pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
- }
- pthread_mutex_unlock(&sleeper_mutex);
-
- /* Do Stuff */
- switch (context->test) {
- case SET_TEST:
- assert(context->execute_pairs);
- execute_set(context->memc, context->execute_pairs, context->execute_number);
- break;
-
- case GET_TEST:
- execute_get(context->memc, context->initial_pairs, context->initial_number);
- break;
-
- case MGET_TEST:
- execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths,
- context->initial_number);
- break;
+ while (rc != MEMCACHED_END && memcached_success(rc)) {
+ rc = memcached_fetch_execute(&memc, cb, &retrieved, 1);
}
- delete context;
-
- pthread_exit(0);
-}
-
-} // extern "C"
-
-
-int main(int argc, char *argv[]) {
- conclusions_st conclusion;
-
- srandom((unsigned int) time(NULL));
- options_parse(argc, argv);
-
- if (opt_servers == NULL) {
- char *temp;
-
- if ((temp = getenv("MEMCACHED_SERVERS"))) {
- opt_servers = strdup(temp);
- }
-
- if (opt_servers == NULL) {
- std::cerr << "No servers provided" << std::endl;
- exit(EXIT_FAILURE);
+ if (memcached_fatal(rc)) {
+ if (!opt.isset("quiet")) {
+ std::cerr << "Failed mget: " << memcached_strerror(&memc, rc) << ": "
+ << memcached_last_error_message(&memc);
}
}
+ return retrieved;
+}
- memcached_server_st *servers = memcached_servers_parse(opt_servers);
- if (servers == NULL or memcached_server_list_count(servers) == 0) {
- std::cerr << "Invalid server list provided:" << opt_servers << std::endl;
- return EXIT_FAILURE;
- }
-
- pthread_mutex_init(&sleeper_mutex, NULL);
- pthread_cond_init(&sleep_threshhold, NULL);
+static size_t execute_set(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
+ for (auto i = 0u; i < kv.num; ++i) {
+ auto rc = memcached_set(&memc, kv.key.chr[i], kv.key.len[i], kv.val.chr[i], kv.val.len[i], 0, 0);
- int error_code = EXIT_SUCCESS;
- try {
- scheduler(servers, &conclusion);
- } catch (std::exception &e) {
- std::cerr << "Died with exception: " << e.what() << std::endl;
- error_code = EXIT_FAILURE;
+ if (!check_return(opt, memc, kv.key.chr[i], rc)) {
+ memcached_quit(&memc);
+ return i;
+ }
}
- free(opt_servers);
-
- (void) pthread_mutex_destroy(&sleeper_mutex);
- (void) pthread_cond_destroy(&sleep_threshhold);
- conclusions_print(&conclusion);
- memcached_server_list_free(servers);
-
- return error_code;
+ return kv.num;
}
-void scheduler(memcached_server_st *servers, conclusions_st *conclusion) {
- unsigned int actual_loaded = 0; /* Fix warning */
-
- struct timeval start_time, end_time;
- pairs_st *pairs = NULL;
-
- memcached_st *memc = memcached_create(NULL);
-
- memcached_server_push(memc, servers);
+class thread_context {
+public:
+ thread_context(const client_options &opt_, const memcached_st &memc_, const keyval_st &kv_)
+ : opt{opt_}
+ , kv{kv_}
+ , count{}
+ , root(memc_)
+ , memc{}
+ , thread([this]{ execute(); })
+ {}
+
+ ~thread_context() {
+ memcached_free(&memc);
+ }
- /* We need to set udp behavior before adding servers to the client */
- if (opt_udp_io) {
- if (memcached_failed(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io))) {
- std::cerr << "Failed to enable UDP." << std::endl;
- memcached_free(memc);
- exit(EXIT_FAILURE);
- }
+ size_t complete() {
+ thread.join();
+ return count;
}
- memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, (uint64_t) opt_binary);
+private:
+ const client_options &opt;
+ const keyval_st &kv;
+ size_t count;
+ const memcached_st &root;
+ memcached_st memc;
+ std::thread thread;
- if (opt_flush) {
- flush_all(memc);
- }
+ void execute() {
+ memcached_clone(&memc, &root);
- if (opt_createial_load) {
- pairs = load_create_data(memc, opt_createial_load, &actual_loaded);
- }
+ while (!wakeup.load(std::memory_order_acquire)) {
+ std::this_thread::yield();
+ }
- char **keys = static_cast<char **>(calloc(actual_loaded, sizeof(char *)));
- size_t *key_lengths = static_cast<size_t *>(calloc(actual_loaded, sizeof(size_t)));
-
- if (keys == NULL or key_lengths == NULL) {
- free(keys);
- free(key_lengths);
- keys = NULL;
- key_lengths = NULL;
- } else {
- for (uint32_t x = 0; x < actual_loaded; ++x) {
- keys[x] = pairs[x].key;
- key_lengths[x] = pairs[x].key_length;
+ if (!strcmp("set", opt.argof("test"))) {
+ count = execute_set(opt, memc, kv);
+ } else if (!strcmp("mget", opt.argof("test"))) {
+ count = execute_mget(opt, memc, kv);
+ } else {
+ if (strcmp("get", opt.argof("test"))) {
+ if (!opt.isset("quiet")) {
+ std::cerr << "Unknown --test: '" << opt.argof("test") << "'.\n";
+ }
+ }
+ count = execute_get(opt, memc, kv);
}
}
+};
- /* We set this after we have loaded */
- {
- if (opt_non_blocking_io)
- memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
-
- if (opt_tcp_nodelay)
- memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
- }
+using opt_apply = std::function<bool(const client_options &, const client_options::extended_option &ext, memcached_st *)>;
- pthread_mutex_lock(&sleeper_mutex);
- master_wakeup = 1;
- pthread_mutex_unlock(&sleeper_mutex);
+static opt_apply wrap_stoul(unsigned long &ul) {
+ return [&ul](const client_options &, const client_options::extended_option &ext, memcached_st *) {
+ if (ext.arg && *ext.arg) {
+ auto c = std::stoul(ext.arg);
+ if (c) {
+ ul = c;
+ }
+ }
+ return true;
+ };
+}
- pthread_t *threads = new (std::nothrow) pthread_t[opt_concurrency];
+int main(int argc, char *argv[]) {
+ client_options opt{PROGRAM_NAME, PROGRAM_VERSION, PROGRAM_DESCRIPTION};
+ auto concurrency = DEFAULT_CONCURRENCY;
+ auto load_count = DEFAULT_INITIAL_LOAD;
+ auto test_count = DEFAULT_EXECUTE_NUMBER;
- if (threads == NULL) {
- exit(EXIT_FAILURE);
+ for (const auto &def : opt.defaults) {
+ opt.add(def);
}
- for (uint32_t x = 0; x < opt_concurrency; x++) {
- thread_context_st *context = new thread_context_st(memc, opt_test);
- context->test = opt_test;
-
- context->initial_pairs = pairs;
- context->initial_number = actual_loaded;
- context->keys = keys;
- context->key_lengths = key_lengths;
-
- if (opt_test == SET_TEST) {
- context->execute_pairs = pairs_generate(opt_execute_number, VALUE_BYTES);
- context->execute_number = opt_execute_number;
+ opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.")
+ .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
+ if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) {
+ if(!opt_.isset("quiet")) {
+ std::cerr << memcached_last_error_message(memc);
+ }
+ return false;
}
-
- /* now you create the thread */
- if (pthread_create(threads + x, NULL, run_task, (void *) context)) {
- fprintf(stderr, "Could not create thread\n");
- exit(1);
+ return true;
+ };
+ opt.add("udp", 'U', no_argument, "Use UDP.")
+ .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
+ if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, ext.set)) {
+ if (!opt_.isset("quiet")) {
+ std::cerr << memcached_last_error_message(memc) << "\n";
+ }
+ return false;
}
+ return true;
+ };
+ opt.add("flush", 'F', no_argument, "Flush all servers prior test.");
+ opt.add("test", 't', required_argument, "Test to perform (options: get,mget,set; default: get).");
+ opt.add("concurrency", 'c', required_argument, "Concurrency (number of threads to start; default: 1).")
+ .apply = wrap_stoul(concurrency);
+ opt.add("execute-number", 'e', required_argument, "Number of times to execute the tests (default: 10000).")
+ .apply = wrap_stoul(test_count);
+ opt.add("initial-load", 'l', required_argument, "Number of keys to load before executing tests (default: 10000)."
+ "\n\t\tDEPRECATED: --execute-number takes precedence.")
+ .apply = wrap_stoul(load_count);
+
+ char set[] = "set";
+ opt.set("test", true, set);
+
+ if (!opt.parse(argc, argv)) {
+ exit(EXIT_FAILURE);
}
- pthread_mutex_lock(&sleeper_mutex);
- master_wakeup = 0;
- pthread_mutex_unlock(&sleeper_mutex);
- pthread_cond_broadcast(&sleep_threshhold);
- gettimeofday(&start_time, NULL);
-
- for (uint32_t x = 0; x < opt_concurrency; x++) {
- void *retval;
- pthread_join(threads[x], &retval);
+ memcached_st memc;
+ if (!check_memcached(opt, memc)) {
+ exit(EXIT_FAILURE);
}
- delete[] threads;
-
- gettimeofday(&end_time, NULL);
- conclusion->load_time = timedif(end_time, start_time);
- conclusion->read_time = timedif(end_time, start_time);
- free(keys);
- free(key_lengths);
- pairs_free(pairs);
- memcached_free(memc);
-}
+ if (!opt.apply(&memc)) {
+ memcached_free(&memc);
+ exit(EXIT_FAILURE);
+ }
-void options_parse(int argc, char *argv[]) {
- memcached_programs_help_st help_options[] = {
- {0},
- };
+ auto total_start = time_clock::now();
+ std::cout << std::fixed << std::setprecision(3);
- static struct option long_options[] = {
- {(OPTIONSTRING) "concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
- {(OPTIONSTRING) "debug", no_argument, &opt_verbose, OPT_DEBUG},
- {(OPTIONSTRING) "quiet", no_argument, NULL, OPT_QUIET},
- {(OPTIONSTRING) "execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
- {(OPTIONSTRING) "flag", no_argument, &opt_displayflag, OPT_FLAG},
- {(OPTIONSTRING) "flush", no_argument, &opt_flush, OPT_FLUSH},
- {(OPTIONSTRING) "help", no_argument, NULL, OPT_HELP},
- {(OPTIONSTRING) "initial-load", required_argument, NULL,
- OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
- {(OPTIONSTRING) "non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
- {(OPTIONSTRING) "servers", required_argument, NULL, OPT_SERVERS},
- {(OPTIONSTRING) "tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
- {(OPTIONSTRING) "test", required_argument, NULL, OPT_SLAP_TEST},
- {(OPTIONSTRING) "verbose", no_argument, &opt_verbose, OPT_VERBOSE},
- {(OPTIONSTRING) "version", no_argument, NULL, OPT_VERSION},
- {(OPTIONSTRING) "binary", no_argument, NULL, OPT_BINARY},
- {(OPTIONSTRING) "udp", no_argument, NULL, OPT_UDP},
- {0, 0, 0, 0},
+ auto align = [](std::ostream &io) -> std::ostream &{
+ return io << std::right << std::setw(8);
};
- bool opt_help = false;
- bool opt_version = false;
- int option_index = 0;
- while (1) {
- int option_rv = getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
-
- if (option_rv == -1)
- break;
-
- switch (option_rv) {
- case 0:
- break;
-
- case OPT_UDP:
- if (opt_test == GET_TEST) {
- fprintf(stderr,
- "You can not run a get test in UDP mode. UDP mode "
- "does not currently support get ops.\n");
- exit(1);
- }
- opt_udp_io = true;
- break;
-
- case OPT_BINARY:
- opt_binary = true;
- break;
-
- case OPT_VERBOSE: /* --verbose or -v */
- opt_verbose = OPT_VERBOSE;
- break;
-
- case OPT_DEBUG: /* --debug or -d */
- opt_verbose = OPT_DEBUG;
- break;
-
- case OPT_VERSION: /* --version or -V */
- opt_version = true;
- break;
-
- case OPT_HELP: /* --help or -h */
- opt_help = true;
- break;
-
- case OPT_SERVERS: /* --servers or -s */
- opt_servers = strdup(optarg);
- break;
-
- case OPT_SLAP_TEST:
- if (strcmp(optarg, "get") == 0) {
- if (opt_udp_io == 1) {
- fprintf(stderr,
- "You can not run a get test in UDP mode. UDP mode "
- "does not currently support get ops.\n");
- exit(EXIT_FAILURE);
- }
- opt_test = GET_TEST;
- } else if (strcmp(optarg, "set") == 0) {
- opt_test = SET_TEST;
- } else if (strcmp(optarg, "mget") == 0) {
- opt_test = MGET_TEST;
- } else {
- fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
- exit(EXIT_FAILURE);
- }
- break;
-
- case OPT_SLAP_CONCURRENCY:
- errno = 0;
- opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno) {
- fprintf(stderr, "Invalid value for concurrency: %s\n", optarg);
- exit(EXIT_FAILURE);
- }
- break;
-
- case OPT_SLAP_EXECUTE_NUMBER:
- errno = 0;
- opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno) {
- fprintf(stderr, "Invalid value for execute: %s\n", optarg);
- exit(EXIT_FAILURE);
- }
- break;
-
- case OPT_SLAP_INITIAL_LOAD:
- errno = 0;
- opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno) {
- fprintf(stderr, "Invalid value for initial load: %s\n", optarg);
- exit(EXIT_FAILURE);
+ if (opt.isset("flush")) {
+ if (opt.isset("verbose")) {
+ std::cout << "- Flushing servers ...\n";
+ }
+ auto flush_start = time_clock::now();
+ auto rc = memcached_flush(&memc, 0);
+ auto flush_elapsed = time_clock::now() - flush_start;
+ if (!memcached_success(rc)) {
+ if (!opt.isset("quiet")) {
+ std::cerr << "Failed to FLUSH: " << memcached_last_error_message(&memc) << "\n";
}
- break;
-
- case OPT_QUIET:
- close_stdio();
- break;
-
- case '?':
- /* getopt_long already printed an error message. */
+ memcached_free(&memc);
exit(EXIT_FAILURE);
-
- default:
- abort();
+ }
+ if (!opt.isset("quiet")) {
+ std::cout << "Time to flush " << align
+ << memcached_server_count(&memc)
+ << " servers: "
+ << align << time_format(flush_elapsed).count()
+ << " seconds.\n";
}
}
- if (opt_version) {
- version_command(PROGRAM_NAME);
- exit(EXIT_SUCCESS);
+ if (opt.isset("verbose")) {
+ std::cout << "- Generating random test data ...\n";
}
-
- if (opt_help) {
- help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
- exit(EXIT_SUCCESS);
+ auto keyval_start = time_clock::now();
+ keyval_st kv{test_count};
+ auto keyval_elapsed = time_clock::now() - keyval_start;
+
+ if (!opt.isset("quiet")) {
+ std::cout << "Time to generate "
+ << align << test_count
+ << " test keys: "
+ << align << time_format(keyval_elapsed).count()
+ << " seconds.\n";
}
- if ((opt_test == GET_TEST or opt_test == MGET_TEST) and opt_createial_load == 0)
- opt_createial_load = DEFAULT_INITIAL_LOAD;
-
- if (opt_execute_number == 0)
- opt_execute_number = DEFAULT_EXECUTE_NUMBER;
-
- if (opt_concurrency == 0)
- opt_concurrency = DEFAULT_CONCURRENCY;
-}
-
-void conclusions_print(conclusions_st *conclusion) {
- printf("\tThreads connecting to servers %u\n", opt_concurrency);
-#ifdef NOT_FINISHED
- printf("\tLoaded %u rows\n", conclusion->rows_loaded);
- printf("\tRead %u rows\n", conclusion->rows_read);
-#endif
- if (opt_test == SET_TEST)
- printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
- conclusion->load_time % 1000);
- else
- printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
- conclusion->read_time % 1000);
-}
-
-void flush_all(memcached_st *memc) {
- memcached_flush(memc, 0);
-}
-
-pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
- unsigned int *actual_loaded) {
- memcached_st *memc_clone = memcached_clone(NULL, memc);
- /* We always used non-blocking IO for load since it is faster */
- memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
-
- pairs_st *pairs = pairs_generate(number_of, VALUE_BYTES);
- *actual_loaded = execute_set(memc_clone, pairs, number_of);
-
- memcached_free(memc_clone);
+ if (strcmp(opt.argof("test"), "set")) {
+ if (opt.isset("verbose")) {
+ std::cout << "- Feeding initial load to servers ...\n";
+ }
+ auto feed_start = time_clock::now();
+ auto count = execute_set(opt, memc, kv);
+ check_return(opt, memc, memcached_flush_buffers(&memc));
+ auto feed_elapsed = time_clock::now() - feed_start;
+
+ if (!opt.isset("quiet")) {
+ std::cout << "Time to set "
+ << align << count
+ << " keys: "
+ << align << time_format(feed_elapsed).count()
+ << " seconds.\n";
+ }
+ }
- return pairs;
+ if (opt.isset("verbose")) {
+ std::cout << "- Starting " << concurrency << " threads ...\n";
+ }
+ auto thread_start = time_clock::now();
+ std::vector<thread_context *> threads{};
+ threads.reserve(concurrency);
+ for (auto i = 0ul; i < concurrency; ++i) {
+ threads.push_back(new thread_context(opt, memc, kv));
+ }
+ auto thread_elapsed = time_clock::now() - thread_start;
+ if (!opt.isset("quiet")) {
+ std::cout << "Time to start "
+ << align << concurrency
+ << " threads: "
+ << time_format(thread_elapsed).count()
+ << " seconds.\n";
+ }
+ if (opt.isset("verbose")) {
+ std::cout << "- Starting test: " << test_count
+ << " x " << opt.argof("test")
+ << " x " << concurrency
+ << " ...\n";
+ }
+ auto count = 0ul;
+ auto test_start = time_clock::now();
+ wakeup.store(true, std::memory_order_release);
+ for (auto &thread : threads) {
+ count += thread->complete();
+ delete thread;
+ }
+ auto test_elapsed = time_clock::now() - test_start;
+
+ if (!opt.isset("quiet")) {
+ std::cout << "--------------------------------------------------------------------\n"
+ << "Time to " << std::left << std::setw(4)
+ << opt.argof("test") << " "
+ << align << count
+ << " keys by "
+ << std::setw(4)
+ << concurrency << " threads: "
+ << align << time_format(test_elapsed).count()
+ << " seconds.\n";
+
+ std::cout << "--------------------------------------------------------------------\n"
+ << "Time total: "
+ << align << std::setw(12)
+ << time_format(time_clock::now() - total_start).count()
+ << " seconds.\n";
+ }
+ exit(EXIT_SUCCESS);
}