Added/restructured all additional hostname information
authorBrian Aker <brian@tangent.org>
Sun, 30 Sep 2007 17:18:42 +0000 (10:18 -0700)
committerBrian Aker <brian@tangent.org>
Sun, 30 Sep 2007 17:18:42 +0000 (10:18 -0700)
include/memcached.h
lib/Makefile.am
lib/memcached.c
lib/memcached_connect.c
lib/memcached_delete.c
lib/memcached_get.c
lib/memcached_response.c
lib/memcached_strerror.c
src/memslap.c
tests/output.res
tests/test.c

index 634f8884f0fad2306ff4923a25246315892bf44f..23688d552e6d6e296b4a4ec4c56ca9723a79016b 100644 (file)
@@ -60,6 +60,9 @@ typedef enum {
   MEMCACHED_PARTIAL_READ,
   MEMCACHED_SOME_ERRORS,
   MEMCACHED_NO_SERVERS,
+  MEMCACHED_END,
+  MEMCACHED_DELETED,
+  MEMCACHED_VALUE,
   MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */
 } memcached_return;
 
@@ -175,6 +178,13 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length,
 memcached_return memcached_server_add(memcached_st *ptr, char *hostname, 
                                       unsigned int port);
 
+memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, 
+                                             char *hostname, unsigned int port, 
+                                             memcached_return *error);
+void memcached_server_list_free(memcached_server_st *ptr);
+memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list);
+unsigned int memcached_server_list_count(memcached_server_st *ptr);
+
 /* These are all private, do not use. */
 memcached_return memcached_connect(memcached_st *ptr);
 memcached_return memcached_response(memcached_st *ptr, 
@@ -205,6 +215,8 @@ void memcached_string_free(memcached_st *ptr, memcached_string_st *string);
 /* Some personal debugging functions */
 #define WATCHPOINT printf("WATCHPOINT %s:%d\n", __FILE__, __LINE__);fflush(stdout);
 #define WATCHPOINT_ERROR(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
+#define WATCHPOINT_STRING(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, A);fflush(stdout);
+#define WATCHPOINT_NUMBER(A) printf("WATCHPOINT %s:%d %d\n", __FILE__, __LINE__, A);fflush(stdout);
 
 
 #ifdef __cplusplus
index 1c6885334fcc78441ff212d2c857b40bdc7ff889..b474f28fadaf948ec353f8e2467943451e56f5fa 100644 (file)
@@ -37,6 +37,7 @@ libmemcached_la_SOURCES = memcached.c \
                          memcached_quit.c \
                          memcached_flush.c \
                          memcached_string.c \
+                         memcached_hosts.c \
                          memcached_stats.c
 libmemcached_la_LIBADD =
 
index 6cf81c734664d65da4dfa7dd0cd4911658cd39f3..68c5d3c5ffbbaa3f081fb5b6736d71d603b5dca1 100644 (file)
@@ -25,19 +25,10 @@ memcached_st *memcached_init(memcached_st *ptr)
 
 void memcached_deinit(memcached_st *ptr)
 {
-  unsigned int x;
-
   if (ptr->hosts)
   {
-    for (x= 0; x < ptr->number_of_hosts; x++)
-    {
-      if (ptr->hosts[x].fd > 0)
-        close(ptr->hosts[x].fd);
-
-      free(ptr->hosts[x].hostname);
-    }
-
-    free(ptr->hosts);
+    memcached_server_list_free(ptr->hosts);
+    ptr->hosts= NULL;
   }
 
   if (ptr->is_allocated == MEMCACHED_ALLOCATED)
index 516b5db346482e24500854deef4fbc0c4267aef5..381053950b16540dbc5d05925ebcc87031d369ab 100644 (file)
@@ -1,45 +1,5 @@
 #include "common.h"
 
-memcached_return memcached_server_add(memcached_st *ptr, char *hostname, unsigned int port)
-{
-  memcached_server_st *new_host_list;
-  char *new_hostname;
-  LIBMEMCACHED_MEMCACHED_SERVER_ADD_START();
-
-  if (!port)
-    port= MEMCACHED_DEFAULT_PORT; 
-
-  if (!hostname)
-    hostname= "localhost"; 
-
-
-  new_host_list= (memcached_server_st *)realloc(ptr->hosts, sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
-  if (!new_host_list)
-    return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
-  memset(&new_host_list[ptr->number_of_hosts], 0, sizeof(memcached_server_st));
-  
-  if (!new_host_list)
-    return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
-
-  ptr->hosts= new_host_list;
-
-  new_hostname=
-    (char *)malloc(sizeof(char) * (strlen(hostname)+1));
-  if (!new_hostname)
-    return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
-
-  memset(new_hostname, 0, strlen(hostname)+1);
-  memcpy(new_hostname, hostname, strlen(hostname));
-  ptr->hosts[ptr->number_of_hosts].hostname= new_hostname;
-  ptr->hosts[ptr->number_of_hosts].port= port;
-  ptr->hosts[ptr->number_of_hosts].fd= -1;
-  ptr->number_of_hosts++;
-
-  LIBMEMCACHED_MEMCACHED_SERVER_ADD_END();
-
-  return MEMCACHED_SUCCESS;
-}
-
 memcached_return memcached_connect(memcached_st *ptr)
 {
   unsigned int x;
@@ -52,9 +12,7 @@ memcached_return memcached_connect(memcached_st *ptr)
     return MEMCACHED_SUCCESS;
 
   if (!ptr->hosts)
-  {
     return MEMCACHED_NO_SERVERS;
-  }
 
   for (x= 0; x < ptr->number_of_hosts; x++)
   {
index 5ec5d1b4c11d353a52857ae25a9037193cd5260c..64036c7998bbb84a0034765986853b799b099292 100644 (file)
@@ -36,5 +36,8 @@ memcached_return memcached_delete(memcached_st *ptr, char *key, size_t key_lengt
   rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
   LIBMEMCACHED_MEMCACHED_DELETE_END();
 
+  if (rc == MEMCACHED_DELETED)
+    rc= MEMCACHED_SUCCESS;
+
   return rc;
 }
index aa50d70bcf0423e7e6d6bc7e68b109b2c7649d8e..77bc68aa60df63a1f78a213263121cb3d2e714c6 100644 (file)
@@ -113,6 +113,8 @@ static char *memcached_value_fetch(memcached_st *ptr, char *key, size_t *key_len
       return value;
     }
   }
+  else if (*error == MEMCACHED_END)
+    *error= MEMCACHED_NOTFOUND;
 
   return NULL;
 read_error:
@@ -129,41 +131,57 @@ char *memcached_get(memcached_st *ptr, char *key, size_t key_length,
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   unsigned int server_key;
   char *value;
-  memcached_return rc;
   LIBMEMCACHED_MEMCACHED_GET_START();
 
   *value_length= 0;
   *error= memcached_connect(ptr);
 
   if (*error != MEMCACHED_SUCCESS)
-    return NULL;
+    goto error;
 
   server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts;
 
   send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "get %.*s\r\n", 
                         (int)key_length, key);
-  if (*error != MEMCACHED_SUCCESS)
-    return NULL;
-
   if ((send(ptr->hosts[server_key].fd, buffer, send_length, 0) == -1))
   {
     *error= MEMCACHED_WRITE_FAILURE;
-    return NULL;
+    goto error;
   }
 
   value= memcached_value_fetch(ptr, key, &key_length, value_length, flags,
                                error, 0, server_key);
-  /* We need to read END */
-  rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
-  if (rc != MEMCACHED_NOTFOUND)
+  if (*error == MEMCACHED_END && *value_length == 0)
+  {
+    *error= MEMCACHED_NOTFOUND;
+    goto error;
+  }
+  else if (*error == MEMCACHED_SUCCESS)
   {
-    free(value);
-    *value_length= 0;
-    *error= MEMCACHED_PROTOCOL_ERROR;
+    memcached_return rc;
+    /* We need to read END */
+    rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
+
+    if (rc != MEMCACHED_END)
+    {
+      *error= MEMCACHED_PROTOCOL_ERROR;
+      goto error;
+    }
   }
+  else 
+      goto error;
+
   LIBMEMCACHED_MEMCACHED_GET_END();
 
   return value;
+
+error:
+  free(value);
+  *value_length= 0;
+
+  LIBMEMCACHED_MEMCACHED_GET_END();
+
+    return NULL;
 }
 
 memcached_return memcached_mget(memcached_st *ptr, 
index 1276351332d829d851eab826fe4ce61f540465ed..cb74be0a911c6f1fe9ba383299de8c5f668ca199 100644 (file)
@@ -48,7 +48,7 @@ memcached_return memcached_response(memcached_st *ptr,
         return MEMCACHED_UNKNOWN_READ_FAILURE;
     }
   case 'D': /* DELETED */
-    return MEMCACHED_SUCCESS;
+    return MEMCACHED_DELETED;
   case 'N': /* NOT_FOUND */
     {
       if (buffer[4] == 'F')
@@ -61,7 +61,7 @@ memcached_return memcached_response(memcached_st *ptr,
   case 'E': /* PROTOCOL ERROR or END */
     {
       if (buffer[1] == 'N')
-        return MEMCACHED_NOTFOUND;
+        return MEMCACHED_END;
       else if (buffer[1] == 'R')
         return MEMCACHED_PROTOCOL_ERROR;
       else
index f987b3cc52b55e6f55001c36f990c21203601f0e..c917e96e7e3c8dd58c71df96a8c81751761c0238 100644 (file)
@@ -44,6 +44,12 @@ char *memcached_strerror(memcached_st *ptr, memcached_return rc)
     return "SOME ERRORS WERE REPORTED";
   case MEMCACHED_NO_SERVERS:
     return "NO SERVERS DEFINED";
+  case MEMCACHED_END:
+    return "SERVER END";
+  case MEMCACHED_DELETED:
+    return "SERVER DELETE";
+  case MEMCACHED_VALUE:
+    return "SERVER VALUE";
   case MEMCACHED_MAXIMUM_RETURN:
     return "Gibberish returned!";
   default:
index 216506d88b26625bdc38521a34a64f77a7d0e63e..6945b7d9caf78ad902f87dab50a359d43dfa0f81 100644 (file)
@@ -7,6 +7,7 @@
 #include <fcntl.h>
 #include <sys/time.h>
 #include <getopt.h>
+#include <pthread.h>
 
 #include <memcached.h>
 
 #include "utilities.h"
 #include "generator.h"
 
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
 
 /* Types */
 typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+  AC_SET,
+  AC_GET,
+} run_action;
+
+struct thread_context_st {
+  unsigned int x;
+  pairs_st *pairs;
+  run_action action;
+};
 
 struct conclusions_st {
   long int load_time;
@@ -28,9 +49,11 @@ struct conclusions_st {
 /* Prototypes */
 void options_parse(int argc, char *argv[]);
 void conclusions_print(conclusions_st *conclusion);
+void scheduler(conclusions_st *conclusion);
 
 static int opt_verbose= 0;
 static unsigned int opt_default_pairs= 100;
+static unsigned int opt_concurrency= 1;
 static int opt_displayflag= 0;
 static char *opt_servers= NULL;
 
@@ -39,7 +62,6 @@ int main(int argc, char *argv[])
   unsigned int x;
   memcached_return rc;
   memcached_st *memc;
-  struct timeval start_time, end_time;
   pairs_st *pairs;
   conclusions_st conclusion;
 
@@ -56,40 +78,12 @@ int main(int argc, char *argv[])
 
   pairs= pairs_generate(opt_default_pairs);
 
+  pthread_mutex_init(&counter_mutex, NULL);
+  pthread_cond_init(&count_threshhold, NULL);
+  pthread_mutex_init(&sleeper_mutex, NULL);
+  pthread_cond_init(&sleep_threshhold, NULL);
 
-  gettimeofday(&start_time, NULL);
-  for (x= 0; x < opt_default_pairs; x++)
-  {
-    rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
-                      pairs[x].value, pairs[x].value_length,
-                      0, 0);
-    if (rc != MEMCACHED_SUCCESS)
-      fprintf(stderr, "Failured on insert of %.*s\n", 
-              (unsigned int)pairs[x].key_length, pairs[x].key);
-    conclusion.rows_loaded++;
-  }
-  gettimeofday(&end_time, NULL);
-  conclusion.load_time= timedif(end_time, start_time);
-
-  gettimeofday(&start_time, NULL);
-  for (x= 0; x < opt_default_pairs; x++)
-  {
-    char *value;
-    size_t value_length;
-    uint16_t flags;
-
-    value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
-                         &value_length,
-                         &flags, &rc);
-
-    if (rc != MEMCACHED_SUCCESS)
-      fprintf(stderr, "Failured on read of %.*s\n", 
-              (unsigned int)pairs[x].key_length, pairs[x].key);
-    conclusion.rows_read++;
-    free(value);
-  }
-  gettimeofday(&end_time, NULL);
-  conclusion.read_time= timedif(end_time, start_time);
+  scheduler(&conclusion);
 
   pairs_free(pairs);
 
@@ -97,11 +91,78 @@ int main(int argc, char *argv[])
 
   memcached_deinit(memc);
 
+  (void)pthread_mutex_init(&counter_mutex, NULL);
+  (void)pthread_cond_init(&count_threshhold, NULL);
+  (void)pthread_mutex_init(&sleeper_mutex, NULL);
+  (void)pthread_cond_init(&sleep_threshhold, NULL);
   conclusions_print(&conclusion);
 
   return 0;
 }
 
+void scheduler(conclusions_st *conclusion)
+{
+  unsigned int x;
+  struct timeval start_time, end_time;
+  pthread_t mainthread;            /* Thread descriptor */
+  pthread_attr_t attr;          /* Thread attributes */
+
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr,
+                              PTHREAD_CREATE_DETACHED);
+
+  pthread_mutex_lock(&counter_mutex);
+  thread_counter= 0;
+
+  pthread_mutex_lock(&sleeper_mutex);
+  master_wakeup= 1;
+  pthread_mutex_unlock(&sleeper_mutex);
+
+  for (x= 0; x < opt_concurrency; x++)
+  {
+    thread_context_st *context;
+    context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+    /* now you create the thread */
+    if (pthread_create(&mainthread, &attr, run_task,
+                       (void *)context) != 0)
+    {
+      fprintf(stderr,"Could not create thread\n");
+      exit(1);
+    }
+    thread_counter++;
+  }
+
+  pthread_mutex_unlock(&counter_mutex);
+  pthread_attr_destroy(&attr);
+
+  pthread_mutex_lock(&sleeper_mutex);
+  master_wakeup= 0;
+  pthread_mutex_unlock(&sleeper_mutex);
+  pthread_cond_broadcast(&sleep_threshhold);
+
+  gettimeofday(&start_time, NULL);
+  /*
+    We loop until we know that all children have cleaned up.
+  */
+  pthread_mutex_lock(&counter_mutex);
+  while (thread_counter)
+  {
+    struct timespec abstime;
+
+    memset(&abstime, 0, sizeof(struct timespec));
+    abstime.tv_sec= 1;
+
+    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+  }
+  pthread_mutex_unlock(&counter_mutex);
+
+  gettimeofday(&end_time, NULL);
+
+  conclusion->load_time= timedif(end_time, start_time);
+  conclusion->read_time= timedif(end_time, start_time);
+}
+
 void options_parse(int argc, char *argv[])
 {
   static struct option long_options[]=
@@ -165,3 +226,59 @@ void conclusions_print(conclusions_st *conclusion)
   printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, 
          conclusion->read_time % 1000);
 }
+
+void *run_task(void *p)
+{
+  unsigned int x;
+  thread_context_st *context= (thread_context_st *)p;
+
+  pthread_mutex_lock(&sleeper_mutex);
+  while (master_wakeup)
+  {
+    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+  } 
+  pthread_mutex_unlock(&sleeper_mutex);
+
+  /* Do Stuff */
+
+  switch (context->action)
+  {
+  case AC_SET:
+    for (x= 0; x < opt_default_pairs; x++)
+    {
+      rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
+                        pairs[x].value, pairs[x].value_length,
+                        0, 0);
+      if (rc != MEMCACHED_SUCCESS)
+        fprintf(stderr, "Failured on insert of %.*s\n", 
+                (unsigned int)pairs[x].key_length, pairs[x].key);
+      conclusion->rows_loaded++;
+  }
+    break;
+  case AC_GET:
+    for (x= 0; x < opt_default_pairs; x++)
+    {
+      char *value;
+      size_t value_length;
+      uint16_t flags;
+
+      value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
+                           &value_length,
+                           &flags, &rc);
+
+      if (rc != MEMCACHED_SUCCESS)
+        fprintf(stderr, "Failured on read of %.*s\n", 
+                (unsigned int)pairs[x].key_length, pairs[x].key);
+      conclusion->rows_read++;
+      free(value);
+    }
+    break;
+  }
+
+  pthread_mutex_lock(&counter_mutex);
+  thread_counter--;
+  pthread_cond_signal(&count_threshhold);
+  pthread_mutex_unlock(&counter_mutex);
+
+  free(context);
+}
index 50cb7861556eba56e5e3bef951aaf074efbce8e5..50dd00ba53efe567bd34a79ee48ba367271457c8 100644 (file)
@@ -18,6 +18,9 @@ Error 16 -> MEMORY ALLOCATION FAILURE
 Error 17 -> PARTIAL READ
 Error 18 -> SOME ERRORS WERE REPORTED
 Error 19 -> NO SERVERS DEFINED
+Error 20 -> SERVER END
+Error 21 -> SERVER DELETE
+Error 22 -> SERVER VALUE
 Found key pid
 Found key uptime
 Found key time
index c60b48104738f1981ed34dee712aea0b2ed124f9..3ebb24297a6f32759d1070c5d0dfeab17ab96353 100644 (file)
@@ -536,6 +536,78 @@ void get_stats_multiple(void)
  memcached_deinit(memc);
 }
 
+void add_host_test(void)
+{
+  unsigned int x;
+  memcached_st *memc;
+  memcached_server_st *servers;
+  memcached_return rc;
+  char servername[]= "0.example.com";
+
+  memc= memcached_init(NULL);
+  assert(memc);
+  rc= memcached_server_add(memc, "localhost", 0);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  servers= memcached_server_list_append(NULL, servername, 400, &rc);
+  assert(servers);
+  assert(1 == memcached_server_list_count(servers));
+
+  for (x= 2; x < 20; x++)
+  {
+    char buffer[SMALL_STRING_LEN];
+
+    snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
+    servers= memcached_server_list_append(servers, buffer, 401, 
+                                     &rc);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(x == memcached_server_list_count(servers));
+  }
+
+  rc= memcached_server_push(memc, servers);
+  assert(rc == MEMCACHED_SUCCESS);
+  rc= memcached_server_push(memc, servers);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  memcached_server_list_free(servers);
+  memcached_deinit(memc);
+}
+
+void add_host_test1(void)
+{
+  unsigned int x;
+  memcached_st *memc;
+  memcached_server_st *servers;
+  memcached_return rc;
+  char servername[]= "0.example.com";
+
+  memc= memcached_init(NULL);
+  assert(memc);
+
+  servers= memcached_server_list_append(NULL, servername, 400, &rc);
+  assert(servers);
+  assert(1 == memcached_server_list_count(servers));
+
+  for (x= 2; x < 20; x++)
+  {
+    char buffer[SMALL_STRING_LEN];
+
+    snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
+    servers= memcached_server_list_append(servers, buffer, 401, 
+                                     &rc);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(x == memcached_server_list_count(servers));
+  }
+
+  rc= memcached_server_push(memc, servers);
+  assert(rc == MEMCACHED_SUCCESS);
+  rc= memcached_server_push(memc, servers);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  memcached_server_list_free(servers);
+  memcached_deinit(memc);
+}
+
 
 int main(int argc, char *argv[])
 {
@@ -562,6 +634,7 @@ int main(int argc, char *argv[])
   quit_test();
   mget_test();
   get_stats();
+  add_host_test();
 
   /* The multiple tests */
   if (argc == 2)