bin: consolidate clients
[m6w6/libmemcached] / src / bin / memslap.cc
index b7b606e97e9bbc05870e583b879c8f17fa40706a..a9fb731ff09c671e7a2fea2a0eef186399d0f7ee 100644 (file)
@@ -37,7 +37,6 @@
 #include "client_options.h"
 #include "utilities.h"
 #include "generator.h"
-#include "execute.h"
 
 #define DEFAULT_INITIAL_LOAD   10000
 #define DEFAULT_EXECUTE_NUMBER 10000
@@ -46,7 +45,7 @@
 #define VALUE_BYTES 4096
 
 #define PROGRAM_NAME        "memslap"
-#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
 
 /* Global Thread counter */
 volatile unsigned int master_wakeup;
@@ -80,7 +79,9 @@ struct thread_context_st {
   , memc(NULL)
   , root(memc_arg) {}
 
-  void init() { memc = memcached_clone(NULL, root); }
+  void init() {
+    memc = memcached_clone(NULL, root);
+  }
 
   ~thread_context_st() {
     if (execute_pairs) {
@@ -123,6 +124,107 @@ static char *opt_servers = NULL;
 static bool opt_udp_io = false;
 test_t opt_test = SET_TEST;
 
+
+unsigned int execute_set(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
+  uint32_t count = 0;
+  for (; count < number_of; ++count) {
+    memcached_return_t rc = memcached_set(memc, pairs[count].key, pairs[count].key_length,
+        pairs[count].value, pairs[count].value_length, 0, 0);
+    if (memcached_failed(rc)) {
+      fprintf(stderr, "%s:%d Failure on %u insert (%s) of %.*s\n", __FILE__, __LINE__, count,
+          memcached_last_error_message(memc), (unsigned int) pairs[count].key_length,
+          pairs[count].key);
+
+      // We will try to reconnect and see if that fixes the issue
+      memcached_quit(memc);
+
+      return count;
+    }
+  }
+
+  return count;
+}
+
+/*
+  Execute a memcached_get() on a set of pairs.
+  Return the number of rows retrieved.
+*/
+static unsigned int execute_get(memcached_st *memc, pairs_st *pairs, unsigned int number_of) {
+  unsigned int x;
+  unsigned int retrieved;
+
+  for (retrieved = 0, x = 0; x < number_of; x++) {
+    size_t value_length;
+    uint32_t flags;
+
+    unsigned int fetch_key = (unsigned int) ((unsigned int) random() % number_of);
+
+    memcached_return_t rc;
+    char *value = memcached_get(memc, pairs[fetch_key].key, pairs[fetch_key].key_length,
+        &value_length, &flags, &rc);
+
+    if (memcached_failed(rc)) {
+      fprintf(stderr, "%s:%d Failure on read(%s) of %.*s\n", __FILE__, __LINE__,
+          memcached_last_error_message(memc), (unsigned int) pairs[fetch_key].key_length,
+          pairs[fetch_key].key);
+    } else {
+      retrieved++;
+    }
+
+    ::free(value);
+  }
+
+  return retrieved;
+}
+
+/**
+ * Callback function to count the number of results
+ */
+static memcached_return_t callback_counter(const memcached_st *ptr, memcached_result_st *result,
+    void *context) {
+  (void) ptr;
+  (void) result;
+  unsigned int *counter = (unsigned int *) context;
+  *counter = *counter + 1;
+
+  return MEMCACHED_SUCCESS;
+}
+
+/**
+ * Try to run a large mget to get all of the keys
+ * @param memc memcached handle
+ * @param keys the keys to get
+ * @param key_length the length of the keys
+ * @param number_of the number of keys to try to get
+ * @return the number of keys received
+ */
+static unsigned int execute_mget(memcached_st *memc, const char *const *keys, size_t *key_length,
+    unsigned int number_of) {
+  unsigned int retrieved = 0;
+  memcached_execute_fn callbacks[] = {callback_counter};
+  memcached_return_t rc;
+  rc = memcached_mget_execute(memc, keys, key_length, (size_t) number_of, callbacks, &retrieved, 1);
+
+  if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_NOTFOUND || rc == MEMCACHED_BUFFERED
+      || rc == MEMCACHED_END)
+  {
+    rc = memcached_fetch_execute(memc, callbacks, (void *) &retrieved, 1);
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND && rc != MEMCACHED_END) {
+      fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
+          memcached_strerror(memc, rc));
+      memcached_quit(memc);
+      return 0;
+    }
+  } else {
+    fprintf(stderr, "%s:%d Failed to execute mget: %s\n", __FILE__, __LINE__,
+        memcached_strerror(memc, rc));
+    memcached_quit(memc);
+    return 0;
+  }
+
+  return retrieved;
+}
+
 extern "C" {
 
 static __attribute__((noreturn)) void *run_task(void *p) {
@@ -143,11 +245,13 @@ static __attribute__((noreturn)) void *run_task(void *p) {
     execute_set(context->memc, context->execute_pairs, context->execute_number);
     break;
 
-  case GET_TEST: execute_get(context->memc, context->initial_pairs, context->initial_number); break;
+  case GET_TEST:
+    execute_get(context->memc, context->initial_pairs, context->initial_number);
+    break;
 
   case MGET_TEST:
     execute_mget(context->memc, (const char *const *) context->keys, context->key_lengths,
-                 context->initial_number);
+        context->initial_number);
     break;
   }
 
@@ -155,7 +259,9 @@ static __attribute__((noreturn)) void *run_task(void *p) {
 
   pthread_exit(0);
 }
-}
+
+} // extern "C"
+
 
 int main(int argc, char *argv[]) {
   conclusions_st conclusion;
@@ -281,7 +387,7 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) {
     }
 
     /* now you create the thread */
-    if (pthread_create(threads + x, NULL, run_task, (void *) context) != 0) {
+    if (pthread_create(threads + x, NULL, run_task, (void *) context)) {
       fprintf(stderr, "Could not create thread\n");
       exit(1);
     }
@@ -345,7 +451,8 @@ void options_parse(int argc, char *argv[]) {
       break;
 
     switch (option_rv) {
-    case 0: break;
+    case 0:
+      break;
 
     case OPT_UDP:
       if (opt_test == GET_TEST) {
@@ -357,17 +464,29 @@ void options_parse(int argc, char *argv[]) {
       opt_udp_io = true;
       break;
 
-    case OPT_BINARY: opt_binary = true; break;
+    case OPT_BINARY:
+      opt_binary = true;
+      break;
 
-    case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break;
+    case OPT_VERBOSE: /* --verbose or -v */
+      opt_verbose = OPT_VERBOSE;
+      break;
 
-    case OPT_DEBUG: /* --debug or -d */ opt_verbose = OPT_DEBUG; break;
+    case OPT_DEBUG: /* --debug or -d */
+      opt_verbose = OPT_DEBUG;
+      break;
 
-    case OPT_VERSION: /* --version or -V */ opt_version = true; break;
+    case OPT_VERSION: /* --version or -V */
+      opt_version = true;
+      break;
 
-    case OPT_HELP: /* --help or -h */ opt_help = true; break;
+    case OPT_HELP: /* --help or -h */
+      opt_help = true;
+      break;
 
-    case OPT_SERVERS: /* --servers or -s */ opt_servers = strdup(optarg); break;
+    case OPT_SERVERS: /* --servers or -s */
+      opt_servers = strdup(optarg);
+      break;
 
     case OPT_SLAP_TEST:
       if (strcmp(optarg, "get") == 0) {
@@ -391,7 +510,7 @@ void options_parse(int argc, char *argv[]) {
     case OPT_SLAP_CONCURRENCY:
       errno = 0;
       opt_concurrency = (unsigned int) strtoul(optarg, (char **) NULL, 10);
-      if (errno != 0) {
+      if (errno) {
         fprintf(stderr, "Invalid value for concurrency: %s\n", optarg);
         exit(EXIT_FAILURE);
       }
@@ -400,7 +519,7 @@ void options_parse(int argc, char *argv[]) {
     case OPT_SLAP_EXECUTE_NUMBER:
       errno = 0;
       opt_execute_number = (unsigned int) strtoul(optarg, (char **) NULL, 10);
-      if (errno != 0) {
+      if (errno) {
         fprintf(stderr, "Invalid value for execute: %s\n", optarg);
         exit(EXIT_FAILURE);
       }
@@ -409,19 +528,22 @@ void options_parse(int argc, char *argv[]) {
     case OPT_SLAP_INITIAL_LOAD:
       errno = 0;
       opt_createial_load = (unsigned int) strtoul(optarg, (char **) NULL, 10);
-      if (errno != 0) {
+      if (errno) {
         fprintf(stderr, "Invalid value for initial load: %s\n", optarg);
         exit(EXIT_FAILURE);
       }
       break;
 
-    case OPT_QUIET: close_stdio(); break;
+    case OPT_QUIET:
+      close_stdio();
+      break;
 
     case '?':
       /* getopt_long already printed an error message. */
       exit(EXIT_FAILURE);
 
-    default: abort();
+    default:
+      abort();
     }
   }