#include "client_options.h"
#include "utilities.h"
#include "generator.h"
-#include "execute.h"
#define DEFAULT_INITIAL_LOAD 10000
#define DEFAULT_EXECUTE_NUMBER 10000
#define VALUE_BYTES 4096
#define PROGRAM_NAME "memslap"
-#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
/* Global Thread counter */
volatile unsigned int master_wakeup;
, memc(NULL)
, root(memc_arg) {}
- void init() { memc = memcached_clone(NULL, root); }
+ void init() {
+ memc = memcached_clone(NULL, root);
+ }
~thread_context_st() {
if (execute_pairs) {
static bool opt_udp_io = false;
test_t opt_test = SET_TEST;
+
+unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
+ uint32_t count = 0;
+ for (; count < number_of; ++count) {
+ memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length,
+ pairs[count].value, pairs[count].value_length, 0, 0);
+ if (memcached_failed(rc)) {
+ fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count,
+ memcached_last_error_message(memc), (unsigned int) pairs[count].key_length,
+ pairs[count].key);
+
+ // We will try to reconnect and see if that fixes the issue
+ memcached_quit(memc);
+
+ return count;
+ }
+ }
+
+ return count;
+}
+
+/*
+ Execute a memcached_get() on a set of pairs.
+ Return the number of rows retrieved.
+*/
+static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
+ unsigned int x;
+ unsigned int retrieved;
+
+ for (retrieved = 0, x = 0; x < number_of; x++) {
+ size_t value_length;
+ uint32_t flags;
+
+ unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of);
+
+ memcached_return_t rc;
+ char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length,
+ &value_length, &flags, &rc);
+
+ if (memcached_failed(rc)) {
+ fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__,
+ memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length,
+ pairs[fetch_key].key);
+ } else {
+ retrieved++;
+ }
+
+ ::free(value);
+ }
+
+ return retrieved;
+}
+
+/**
+ * Callback function to count the number of results
+ */
+static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result,
+ void *context) {
+ (void) ptr;
+ (void) result;
+ unsigned int *counter = (unsigned int *) context;
+ *counter = *counter + 1;
+
+ return MEMCACHED_SUCCESS;
+}
+
+/**
+ * Try to run a large mget to get all of the keys
+ * @param memc memcached handle
+ * @param keys the keys to get
+ * @param key_length the length of the keys
+ * @param number_of the number of keys to try to get
+ * @return the number of keys received
+ */
+static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length,
+ unsigned int number_of) {
+ unsigned int retrieved = 0;
+ memcached_execute_fn callbacks[] = {callback_counter};
+ memcached_return_t rc;
+ rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1);
+
+ if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED
+ || rc == MEMCACHED_END)
+ {
+ rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1);
+ if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) {
+ fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
+ memcached_strerror(memc, rc));
+ memcached_quit(memc);
+ return 0;
+ }
+ } else {
+ fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
+ memcached_strerror(memc, rc));
+ memcached_quit(memc);
+ return 0;
+ }
+
+ return retrieved;
+}
+
extern "C" {
static __attribute__((noreturn)) void *run_task(void *p) {
execute_set(context->memc, context->execute_pairs, context->execute_number);
break;
- case GET_TEST: execute_get(context->memc, context->initial_pairs, context->initial_number); break;
+ case GET_TEST:
+ execute_get(context->memc, context->initial_pairs, context->initial_number);
+ break;
case MGET_TEST:
execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths,
- context->initial_number);
+ context->initial_number);
break;
}
pthread_exit(0);
}
-}
+
+} // extern "C"
+
int main(int argc, char *argv[]) {
conclusions_st conclusion;
}
/* now you create the thread */
- if (pthread_create(threads + x, NULL, run_task, (void *) context) != 0) {
+ if (pthread_create(threads + x, NULL, run_task, (void *) context)) {
fprintf(stderr, "Could not create thread\n");
exit(1);
}
break;
switch (option_rv) {
- case 0: break;
+ case 0:
+ break;
case OPT_UDP:
if (opt_test == GET_TEST) {
opt_udp_io = true;
break;
- case OPT_BINARY: opt_binary = true; break;
+ case OPT_BINARY:
+ opt_binary = true;
+ break;
- case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break;
+ case OPT_VERBOSE: /* --verbose or -v */
+ opt_verbose = OPT_VERBOSE;
+ break;
- case OPT_DEBUG: /* --debug or -d */ opt_verbose = OPT_DEBUG; break;
+ case OPT_DEBUG: /* --debug or -d */
+ opt_verbose = OPT_DEBUG;
+ break;
- case OPT_VERSION: /* --version or -V */ opt_version = true; break;
+ case OPT_VERSION: /* --version or -V */
+ opt_version = true;
+ break;
- case OPT_HELP: /* --help or -h */ opt_help = true; break;
+ case OPT_HELP: /* --help or -h */
+ opt_help = true;
+ break;
- case OPT_SERVERS: /* --servers or -s */ opt_servers = strdup(optarg); break;
+ case OPT_SERVERS: /* --servers or -s */
+ opt_servers = strdup(optarg);
+ break;
case OPT_SLAP_TEST:
if (strcmp(optarg, "get") == 0) {
case OPT_SLAP_CONCURRENCY:
errno = 0;
opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno != 0) {
+ if (errno) {
fprintf(stderr, "Invalid value for concurrency: %s\n", optarg);
exit(EXIT_FAILURE);
}
case OPT_SLAP_EXECUTE_NUMBER:
errno = 0;
opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno != 0) {
+ if (errno) {
fprintf(stderr, "Invalid value for execute: %s\n", optarg);
exit(EXIT_FAILURE);
}
case OPT_SLAP_INITIAL_LOAD:
errno = 0;
opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10);
- if (errno != 0) {
+ if (errno) {
fprintf(stderr, "Invalid value for initial load: %s\n", optarg);
exit(EXIT_FAILURE);
}
break;
- case OPT_QUIET: close_stdio(); break;
+ case OPT_QUIET:
+ close_stdio();
+ break;
case '?':
/* getopt_long already printed an error message. */
exit(EXIT_FAILURE);
- default: abort();
+ default:
+ abort();
}
}