Missing hosts file.
[awesomized/libmemcached] / src / memslap.c
index 3551536b75f95830acede837f604f6e6e394ccd6..c9c65771d94fc020e0d2da741dffc86bd63697d4 100644 (file)
 #include <sys/types.h>
 #include <sys/mman.h>
 #include <fcntl.h>
+#include <sys/time.h>
 #include <getopt.h>
+#include <pthread.h>
 
 #include <memcached.h>
 
 #include "client_options.h"
 #include "utilities.h"
+#include "generator.h"
+#include "execute.h"
 
+#define DEFAULT_INITIAL_LOAD 10000
+#define DEFAULT_EXECUTE_NUMBER 10000
+#define DEFAULT_CONCURRENCY 1
 
-/* Use this for string generation */
-static const char ALPHANUMERICS[]=
-  "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
+#define PROGRAM_NAME "memslap"
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
 
-#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
 
 /* Types */
-typedef struct pairs_st pairs_st;
+typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+  SET_TEST,
+  GET_TEST,
+} test_type;
+
+struct thread_context_st {
+  unsigned int key_count;
+  pairs_st *initial_pairs;
+  unsigned int initial_number;
+  pairs_st *execute_pairs;
+  unsigned int execute_number;
+  test_type test;
+  memcached_server_st *servers;
+};
 
-struct pairs_st {
-  char *key;
-  size_t key_length;
-  char *value;
-  size_t value_length;
+struct conclusions_st {
+  long int load_time;
+  long int read_time;
+  unsigned int rows_loaded;
+  unsigned int rows_read;
 };
 
 /* Prototypes */
 void options_parse(int argc, char *argv[]);
-static pairs_st *pairs_generate(void);
-static void pairs_free(pairs_st *pairs);
-static void get_random_string(char *buffer, size_t size);
+void conclusions_print(conclusions_st *conclusion);
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
+pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_of, 
+                            unsigned int *actual_loaded);
 
 static int opt_verbose= 0;
-static int opt_default_pairs= 100;
+static unsigned int opt_execute_number= 0;
+static unsigned int opt_createial_load= 0;
+static unsigned int opt_concurrency= 0;
 static int opt_displayflag= 0;
 static char *opt_servers= NULL;
+test_type opt_test= SET_TEST;
 
 int main(int argc, char *argv[])
 {
-  unsigned int x;
-  memcached_return rc;
-  memcached_st *memc;
-  pairs_st *pairs;
+  conclusions_st conclusion;
+  memcached_server_st *servers;
+
+  memset(&conclusion, 0, sizeof(conclusions_st));
 
   srandom(time(NULL));
-  memc= memcached_init(NULL);
   options_parse(argc, argv);
 
   if (!opt_servers)
     exit(0);
 
-  parse_opt_servers(memc, opt_servers);
+  servers= parse_opt_servers(opt_servers);
+
+  pthread_mutex_init(&counter_mutex, NULL);
+  pthread_cond_init(&count_threshhold, NULL);
+  pthread_mutex_init(&sleeper_mutex, NULL);
+  pthread_cond_init(&sleep_threshhold, NULL);
+
+  scheduler(servers, &conclusion);
+
+  free(opt_servers);
+
+  (void)pthread_mutex_destroy(&counter_mutex);
+  (void)pthread_cond_destroy(&count_threshhold);
+  (void)pthread_mutex_destroy(&sleeper_mutex);
+  (void)pthread_cond_destroy(&sleep_threshhold);
+  conclusions_print(&conclusion);
+  memcached_server_list_free(servers);
+
+  return 0;
+}
+
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
+{
+  unsigned int x;
+  unsigned int actual_loaded;
+
+  struct timeval start_time, end_time;
+  pthread_t mainthread;            /* Thread descriptor */
+  pthread_attr_t attr;          /* Thread attributes */
+  pairs_st *pairs= NULL;
 
-  pairs= pairs_generate();
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr,
+                              PTHREAD_CREATE_DETACHED);
 
+  if (opt_createial_load)
+    pairs= load_createial_data(servers, opt_createial_load, &actual_loaded);
 
-  for (x= 0; x < opt_default_pairs; x++)
+  pthread_mutex_lock(&counter_mutex);
+  thread_counter= 0;
+
+  pthread_mutex_lock(&sleeper_mutex);
+  master_wakeup= 1;
+  pthread_mutex_unlock(&sleeper_mutex);
+
+  for (x= 0; x < opt_concurrency; x++)
   {
-    printf("Key(%u) %.10s \t%.10s\n", x, pairs[x].key, pairs[x].value);
-    rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
-                      pairs[x].value, pairs[x].value_length,
-                      0, 0);
-    if (rc != MEMCACHED_SUCCESS)
-      fprintf(stderr, "Failured on insert of %.*s\n", 
-              (unsigned int)pairs[x].key_length, pairs[x].key);
+    thread_context_st *context;
+    context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+    context->servers= servers;
+    context->test= opt_test;
+
+    context->initial_pairs= pairs;
+    context->initial_number= actual_loaded;
+
+    if (opt_test == SET_TEST)
+    {
+      context->execute_pairs= pairs_generate(opt_execute_number);
+      context->execute_number= opt_execute_number;
+    }
+
+    /* now you create the thread */
+    if (pthread_create(&mainthread, &attr, run_task,
+                       (void *)context) != 0)
+    {
+      fprintf(stderr,"Could not create thread\n");
+      exit(1);
+    }
+    thread_counter++;
   }
 
-  for (x= 0; x < opt_default_pairs; x++)
+  pthread_mutex_unlock(&counter_mutex);
+  pthread_attr_destroy(&attr);
+
+  pthread_mutex_lock(&sleeper_mutex);
+  master_wakeup= 0;
+  pthread_mutex_unlock(&sleeper_mutex);
+  pthread_cond_broadcast(&sleep_threshhold);
+
+  gettimeofday(&start_time, NULL);
+  /*
+    We loop until we know that all children have cleaned up.
+  */
+  pthread_mutex_lock(&counter_mutex);
+  while (thread_counter)
   {
-    printf("Key(%u) %.10s \n", x, pairs[x].key);
-    char *value;
-    size_t value_length;
-    uint16_t flags;
-
-    value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
-                         &value_length,
-                         &flags, &rc);
-
-    WATCHPOINT_ERROR(rc);
-    if (rc != MEMCACHED_SUCCESS)
-      fprintf(stderr, "Failured on read of %.*s\n", 
-              (unsigned int)pairs[x].key_length, pairs[x].key);
-    printf("\t%.10s\n", value);
-    free(value);
-  }
+    struct timespec abstime;
 
-  pairs_free(pairs);
+    memset(&abstime, 0, sizeof(struct timespec));
+    abstime.tv_sec= 1;
 
-  free(opt_servers);
+    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+  }
+  pthread_mutex_unlock(&counter_mutex);
 
-  memcached_deinit(memc);
+  gettimeofday(&end_time, NULL);
 
-  return 0;
+  conclusion->load_time= timedif(end_time, start_time);
+  conclusion->read_time= timedif(end_time, start_time);
+  pairs_free(pairs);
 }
 
 void options_parse(int argc, char *argv[])
 {
+  memcached_programs_help_st help_options[]=
+  {
+    {0},
+  };
+
   static struct option long_options[]=
     {
-      {"version", no_argument, NULL, OPT_VERSION},
-      {"help", no_argument, NULL, OPT_HELP},
-      {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+      {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
       {"debug", no_argument, &opt_verbose, OPT_DEBUG},
-      {"servers", required_argument, NULL, OPT_SERVERS},
+      {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
       {"flag", no_argument, &opt_displayflag, OPT_FLAG},
-      {"default-pairs", required_argument, NULL, OPT_SLAP_DEFAULT_PAIRS},
+      {"help", no_argument, NULL, OPT_HELP},
+      {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
+      {"servers", required_argument, NULL, OPT_SERVERS},
+      {"test", required_argument, NULL, OPT_SLAP_TEST},
+      {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+      {"version", no_argument, NULL, OPT_VERSION},
       {0, 0, 0, 0},
     };
 
@@ -130,18 +229,32 @@ void options_parse(int argc, char *argv[])
       opt_verbose = OPT_DEBUG;
       break;
     case OPT_VERSION: /* --version or -V */
-      printf("memcache tools, memcat, v1.0\n");
-      exit(0);
+      version_command(PROGRAM_NAME);
       break;
     case OPT_HELP: /* --help or -h */
-      printf("useful help messages go here\n");
-      exit(0);
+      help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
       break;
     case OPT_SERVERS: /* --servers or -s */
       opt_servers= strdup(optarg);
       break;
-    case OPT_SLAP_DEFAULT_PAIRS:
-      opt_default_pairs= strtol(optarg, (char **)NULL, 10);
+    case OPT_SLAP_TEST:
+      if (!strcmp(optarg, "get"))
+        opt_test= GET_TEST ;
+      else if (!strcmp(optarg, "set"))
+        opt_test= SET_TEST;
+      else 
+      {
+        fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
+        exit(1);
+      }
+      break;
+    case OPT_SLAP_CONCURRENCY:
+      opt_concurrency= strtol(optarg, (char **)NULL, 10);
+    case OPT_SLAP_EXECUTE_NUMBER:
+      opt_execute_number= strtol(optarg, (char **)NULL, 10);
+      break;
+    case OPT_SLAP_INITIAL_LOAD:
+      opt_createial_load= strtol(optarg, (char **)NULL, 10);
       break;
     case '?':
       /* getopt_long already printed an error message. */
@@ -150,57 +263,85 @@ void options_parse(int argc, char *argv[])
       abort();
     }
   }
-}
 
-static void pairs_free(pairs_st *pairs)
-{
-  unsigned int x;
+  if (opt_test == GET_TEST && opt_createial_load == 0)
+    opt_createial_load= DEFAULT_INITIAL_LOAD;
 
-  for (x= 0; x < opt_default_pairs; x++)
-  {
-    free(pairs[x].key);
-    free(pairs[x].value);
-  }
+  if (opt_execute_number == 0)
+    opt_execute_number= DEFAULT_EXECUTE_NUMBER;
 
-  free(pairs);
+  if (opt_concurrency == 0)
+    opt_concurrency= DEFAULT_CONCURRENCY;
 }
 
-static pairs_st *pairs_generate(void)
+void conclusions_print(conclusions_st *conclusion)
 {
-  unsigned int x;
-  pairs_st *pairs;
+  printf("\tThreads connecting to servers %u\n", opt_concurrency);
+#ifdef NOT_FINISHED
+  printf("\tLoaded %u rows\n", conclusion->rows_loaded);
+  printf("\tRead %u rows\n", conclusion->rows_read);
+#endif
+  if (opt_test == SET_TEST)
+    printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, 
+           conclusion->load_time % 1000);
+  else
+    printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, 
+           conclusion->read_time % 1000);
+}
 
-  pairs= (pairs_st*)malloc(sizeof(pairs_st) * opt_default_pairs);
+void *run_task(void *p)
+{
+  thread_context_st *context= (thread_context_st *)p;
+  memcached_st *memc;
 
-  if (!pairs)
-    goto error;
+  memc= memcached_create(NULL);
+  
+  memcached_server_push(memc, context->servers);
 
-  for (x= 0; x < opt_default_pairs; x++)
+  pthread_mutex_lock(&sleeper_mutex);
+  while (master_wakeup)
   {
-    pairs[x].key= (char *)malloc(sizeof(char) * 100);
-    if (!pairs[x].key)
-      goto error;
-    get_random_string(pairs[x].key, 100);
-    pairs[x].key_length= 100;
-
-    pairs[x].value= (char *)malloc(sizeof(char) * 400);
-    if (!pairs[x].value)
-      goto error;
-    get_random_string(pairs[x].value, 400);
-    pairs[x].value_length= 400;
+    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+  } 
+  pthread_mutex_unlock(&sleeper_mutex);
+
+  /* Do Stuff */
+  switch (context->test)
+  {
+  case SET_TEST:
+    execute_set(memc, context->execute_pairs, context->execute_number);
+    break;
+  case GET_TEST:
+    execute_get(memc, context->initial_pairs, context->initial_number);
+    break;
   }
 
-  return pairs;
-error:
-    fprintf(stderr, "Memory Allocation failure in pairs_generate.\n");
-    exit(0);
+  pthread_mutex_lock(&counter_mutex);
+  thread_counter--;
+  pthread_cond_signal(&count_threshhold);
+  pthread_mutex_unlock(&counter_mutex);
+  memcached_free(memc);
+
+  if (context->execute_pairs)
+    pairs_free(context->execute_pairs);
+  free(context);
+
+  return NULL;
 }
 
-static void get_random_string(char *buffer, size_t size)
+pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_of, 
+                            unsigned int *actual_loaded)
 {
-  char *buffer_ptr= buffer;
+  memcached_st *memc;
+  pairs_st *pairs;
 
-  while (--size)
-    *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
-  *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
+  memc= memcached_create(NULL);
+  memcached_server_push(memc, servers);
+
+  pairs= pairs_generate(number_of);
+  *actual_loaded= execute_set(memc, pairs, number_of);
+
+  memcached_free(memc);
+
+  return pairs;
 }