Leftover prototypes from deleted code removed.
[awesomized/libmemcached] / src / memslap.c
index 6945b7d9caf78ad902f87dab50a359d43dfa0f81..486ff7cedf5ffaf915a3a5bda798ee9440a8f4f3 100644 (file)
@@ -1,10 +1,12 @@
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/mman.h>
 #include <fcntl.h>
+#include <assert.h>
 #include <sys/time.h>
 #include <getopt.h>
 #include <pthread.h>
 #include "client_options.h"
 #include "utilities.h"
 #include "generator.h"
+#include "execute.h"
+
+#define DEFAULT_INITIAL_LOAD 10000
+#define DEFAULT_EXECUTE_NUMBER 10000
+#define DEFAULT_CONCURRENCY 1
+
+#define PROGRAM_NAME "memslap"
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
 
 /* Global Thread counter */
 unsigned int thread_counter;
@@ -29,14 +39,18 @@ void *run_task(void *p);
 typedef struct conclusions_st conclusions_st;
 typedef struct thread_context_st thread_context_st;
 typedef enum {
-  AC_SET,
-  AC_GET,
-} run_action;
+  SET_TEST,
+  GET_TEST,
+} test_type;
 
 struct thread_context_st {
-  unsigned int x;
-  pairs_st *pairs;
-  run_action action;
+  unsigned int key_count;
+  pairs_st *initial_pairs;
+  unsigned int initial_number;
+  pairs_st *execute_pairs;
+  unsigned int execute_number;
+  test_type test;
+  memcached_st *memc;
 };
 
 struct conclusions_st {
@@ -49,68 +63,96 @@ struct conclusions_st {
 /* Prototypes */
 void options_parse(int argc, char *argv[]);
 void conclusions_print(conclusions_st *conclusion);
-void scheduler(conclusions_st *conclusion);
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
+pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, 
+                           unsigned int *actual_loaded);
+void flush_all(memcached_st *memc);
 
 static int opt_verbose= 0;
-static unsigned int opt_default_pairs= 100;
-static unsigned int opt_concurrency= 1;
+static int opt_flush= 0;
+static int opt_non_blocking_io= 0;
+static int opt_tcp_nodelay= 0;
+static unsigned int opt_execute_number= 0;
+static unsigned int opt_createial_load= 0;
+static unsigned int opt_concurrency= 0;
 static int opt_displayflag= 0;
 static char *opt_servers= NULL;
+test_type opt_test= SET_TEST;
 
 int main(int argc, char *argv[])
 {
-  unsigned int x;
-  memcached_return rc;
-  memcached_st *memc;
-  pairs_st *pairs;
   conclusions_st conclusion;
+  memcached_server_st *servers;
 
   memset(&conclusion, 0, sizeof(conclusions_st));
 
   srandom(time(NULL));
-  memc= memcached_init(NULL);
   options_parse(argc, argv);
 
   if (!opt_servers)
-    exit(0);
+  {
+    char *temp;
 
-  parse_opt_servers(memc, opt_servers);
+    if ((temp= getenv("MEMCACHED_SERVERS")))
+      opt_servers= strdup(temp);
+    else
+      exit(1);
+  }
 
-  pairs= pairs_generate(opt_default_pairs);
+  servers= memcached_servers_parse(opt_servers);
 
   pthread_mutex_init(&counter_mutex, NULL);
   pthread_cond_init(&count_threshhold, NULL);
   pthread_mutex_init(&sleeper_mutex, NULL);
   pthread_cond_init(&sleep_threshhold, NULL);
 
-  scheduler(&conclusion);
-
-  pairs_free(pairs);
+  scheduler(servers, &conclusion);
 
   free(opt_servers);
 
-  memcached_deinit(memc);
-
-  (void)pthread_mutex_init(&counter_mutex, NULL);
-  (void)pthread_cond_init(&count_threshhold, NULL);
-  (void)pthread_mutex_init(&sleeper_mutex, NULL);
-  (void)pthread_cond_init(&sleep_threshhold, NULL);
+  (void)pthread_mutex_destroy(&counter_mutex);
+  (void)pthread_cond_destroy(&count_threshhold);
+  (void)pthread_mutex_destroy(&sleeper_mutex);
+  (void)pthread_cond_destroy(&sleep_threshhold);
   conclusions_print(&conclusion);
+  memcached_server_list_free(servers);
 
   return 0;
 }
 
-void scheduler(conclusions_st *conclusion)
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
 {
   unsigned int x;
+  unsigned int actual_loaded= 0; /* Fix warning */
+  memcached_st *memc;
+
   struct timeval start_time, end_time;
   pthread_t mainthread;            /* Thread descriptor */
   pthread_attr_t attr;          /* Thread attributes */
+  pairs_st *pairs= NULL;
 
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr,
                               PTHREAD_CREATE_DETACHED);
 
+  memc= memcached_create(NULL);
+  memcached_server_push(memc, servers);
+
+  if (opt_flush)
+    flush_all(memc);
+  if (opt_createial_load)
+    pairs= load_create_data(memc, opt_createial_load, &actual_loaded);
+
+  /* We set this after we have loaded */
+  {
+    unsigned int value= 1;
+    if (opt_non_blocking_io)
+      memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &value);
+    if (opt_tcp_nodelay)
+      memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &value);
+  }
+
+
   pthread_mutex_lock(&counter_mutex);
   thread_counter= 0;
 
@@ -122,6 +164,19 @@ void scheduler(conclusions_st *conclusion)
   {
     thread_context_st *context;
     context= (thread_context_st *)malloc(sizeof(thread_context_st));
+    memset(context, 0, sizeof(thread_context_st));
+
+    context->memc= memcached_clone(NULL, memc);
+    context->test= opt_test;
+
+    context->initial_pairs= pairs;
+    context->initial_number= actual_loaded;
+
+    if (opt_test == SET_TEST)
+    {
+      context->execute_pairs= pairs_generate(opt_execute_number);
+      context->execute_number= opt_execute_number;
+    }
 
     /* now you create the thread */
     if (pthread_create(&mainthread, &attr, run_task,
@@ -161,19 +216,31 @@ void scheduler(conclusions_st *conclusion)
 
   conclusion->load_time= timedif(end_time, start_time);
   conclusion->read_time= timedif(end_time, start_time);
+  pairs_free(pairs);
 }
 
 void options_parse(int argc, char *argv[])
 {
+  memcached_programs_help_st help_options[]=
+  {
+    {0},
+  };
+
   static struct option long_options[]=
     {
-      {"version", no_argument, NULL, OPT_VERSION},
-      {"help", no_argument, NULL, OPT_HELP},
-      {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+      {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
       {"debug", no_argument, &opt_verbose, OPT_DEBUG},
-      {"servers", required_argument, NULL, OPT_SERVERS},
+      {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
       {"flag", no_argument, &opt_displayflag, OPT_FLAG},
-      {"default-pairs", required_argument, NULL, OPT_SLAP_DEFAULT_PAIRS},
+      {"flush", no_argument, &opt_flush, OPT_FLUSH},
+      {"help", no_argument, NULL, OPT_HELP},
+      {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
+      {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
+      {"servers", required_argument, NULL, OPT_SERVERS},
+      {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
+      {"test", required_argument, NULL, OPT_SLAP_TEST},
+      {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+      {"version", no_argument, NULL, OPT_VERSION},
       {0, 0, 0, 0},
     };
 
@@ -195,18 +262,32 @@ void options_parse(int argc, char *argv[])
       opt_verbose = OPT_DEBUG;
       break;
     case OPT_VERSION: /* --version or -V */
-      printf("memcache tools, memcat, v1.0\n");
-      exit(0);
+      version_command(PROGRAM_NAME);
       break;
     case OPT_HELP: /* --help or -h */
-      printf("useful help messages go here\n");
-      exit(0);
+      help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
       break;
     case OPT_SERVERS: /* --servers or -s */
       opt_servers= strdup(optarg);
       break;
-    case OPT_SLAP_DEFAULT_PAIRS:
-      opt_default_pairs= strtol(optarg, (char **)NULL, 10);
+    case OPT_SLAP_TEST:
+      if (!strcmp(optarg, "get"))
+        opt_test= GET_TEST ;
+      else if (!strcmp(optarg, "set"))
+        opt_test= SET_TEST;
+      else 
+      {
+        fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
+        exit(1);
+      }
+      break;
+    case OPT_SLAP_CONCURRENCY:
+      opt_concurrency= strtol(optarg, (char **)NULL, 10);
+    case OPT_SLAP_EXECUTE_NUMBER:
+      opt_execute_number= strtol(optarg, (char **)NULL, 10);
+      break;
+    case OPT_SLAP_INITIAL_LOAD:
+      opt_createial_load= strtol(optarg, (char **)NULL, 10);
       break;
     case '?':
       /* getopt_long already printed an error message. */
@@ -215,22 +296,38 @@ void options_parse(int argc, char *argv[])
       abort();
     }
   }
+
+  if (opt_test == GET_TEST && opt_createial_load == 0)
+    opt_createial_load= DEFAULT_INITIAL_LOAD;
+
+  if (opt_execute_number == 0)
+    opt_execute_number= DEFAULT_EXECUTE_NUMBER;
+
+  if (opt_concurrency == 0)
+    opt_concurrency= DEFAULT_CONCURRENCY;
 }
 
 void conclusions_print(conclusions_st *conclusion)
 {
+  printf("\tThreads connecting to servers %u\n", opt_concurrency);
+#ifdef NOT_FINISHED
   printf("\tLoaded %u rows\n", conclusion->rows_loaded);
   printf("\tRead %u rows\n", conclusion->rows_read);
-  printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, 
-         conclusion->load_time % 1000);
-  printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, 
-         conclusion->read_time % 1000);
+#endif
+  if (opt_test == SET_TEST)
+    printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, 
+           conclusion->load_time % 1000);
+  else
+    printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, 
+           conclusion->read_time % 1000);
 }
 
 void *run_task(void *p)
 {
-  unsigned int x;
   thread_context_st *context= (thread_context_st *)p;
+  memcached_st *memc;
+
+  memc= context->memc;
 
   pthread_mutex_lock(&sleeper_mutex);
   while (master_wakeup)
@@ -240,45 +337,49 @@ void *run_task(void *p)
   pthread_mutex_unlock(&sleeper_mutex);
 
   /* Do Stuff */
-
-  switch (context->action)
+  switch (context->test)
   {
-  case AC_SET:
-    for (x= 0; x < opt_default_pairs; x++)
-    {
-      rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
-                        pairs[x].value, pairs[x].value_length,
-                        0, 0);
-      if (rc != MEMCACHED_SUCCESS)
-        fprintf(stderr, "Failured on insert of %.*s\n", 
-                (unsigned int)pairs[x].key_length, pairs[x].key);
-      conclusion->rows_loaded++;
-  }
+  case SET_TEST:
+    execute_set(memc, context->execute_pairs, context->execute_number);
     break;
-  case AC_GET:
-    for (x= 0; x < opt_default_pairs; x++)
-    {
-      char *value;
-      size_t value_length;
-      uint16_t flags;
-
-      value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
-                           &value_length,
-                           &flags, &rc);
-
-      if (rc != MEMCACHED_SUCCESS)
-        fprintf(stderr, "Failured on read of %.*s\n", 
-                (unsigned int)pairs[x].key_length, pairs[x].key);
-      conclusion->rows_read++;
-      free(value);
-    }
+  case GET_TEST:
+    execute_get(memc, context->initial_pairs, context->initial_number);
     break;
   }
 
+  memcached_free(memc);
+
+  if (context->execute_pairs)
+    pairs_free(context->execute_pairs);
+  free(context);
+
   pthread_mutex_lock(&counter_mutex);
   thread_counter--;
   pthread_cond_signal(&count_threshhold);
   pthread_mutex_unlock(&counter_mutex);
 
-  free(context);
+  return NULL;
+}
+
+void flush_all(memcached_st *memc)
+{
+  memcached_flush(memc, 0);
+}
+
+pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, 
+                           unsigned int *actual_loaded)
+{
+  memcached_st *clone;
+  pairs_st *pairs;
+
+  clone= memcached_clone(NULL, memc);
+  /* We always used non-blocking IO for load since it is faster */
+  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
+
+  pairs= pairs_generate(number_of);
+  *actual_loaded= execute_set(clone, pairs, number_of);
+
+  memcached_free(clone);
+
+  return pairs;
 }