This is a rewrite of some of the IO code to handle larger loads of set data
authorBrian Aker <brian@tangent.org>
Fri, 12 Oct 2007 08:46:27 +0000 (01:46 -0700)
committerBrian Aker <brian@tangent.org>
Fri, 12 Oct 2007 08:46:27 +0000 (01:46 -0700)
on concurrent insert with non-blocking IO.

A bug was also uncovered where the read bufffers for multiple hosts were not
being read (all code has been refactored for that now).

One user contributed test case has been added.

include/memcached.h
lib/common.h
lib/memcached_connect.c
lib/memcached_io.c
lib/memcached_quit.c
lib/memcached_response.c
lib/memcached_storage.c
lib/memcached_strerror.c
tests/test.c

index da0510701d3859d8f0654d141379f6784584800c..558fbb09e22273d8764fa8dbeaab63e1722d49c1 100644 (file)
@@ -67,6 +67,7 @@ typedef enum {
   MEMCACHED_DELETED,
   MEMCACHED_VALUE,
   MEMCACHED_STAT,
+  MEMCACHED_ERRNO,
   MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */
 } memcached_return;
 
@@ -86,6 +87,7 @@ struct memcached_server_st {
   char *hostname;
   unsigned int port;
   int fd;
+  unsigned int stack_responses;
 };
 
 struct memcached_stat_st {
@@ -136,7 +138,6 @@ struct memcached_st {
   size_t write_buffer_offset;
   char connected;
   int my_errno;
-  unsigned int stack_responses;
   unsigned long long flags;
   memcached_return warning; /* Future Use */
 };
@@ -196,6 +197,9 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length,
 #define memcached_server_name(A,B) B.hostname
 #define memcached_server_port(A,B) B.port
 #define memcached_server_list(A) A->hosts
+#define memcached_server_response_increment(A,B) A->hosts[B].stack_responses++
+#define memcached_server_response_decrement(A,B) A->hosts[B].stack_responses--
+#define memcached_server_response_count(A,B) A->hosts[B].stack_responses
 
 memcached_return memcached_server_add(memcached_st *ptr, char *hostname, 
                                       unsigned int port);
index f922fee15680dabe42c1acde41d6e239feb5c481..8440f4121e000b686d3c7659f9631a7b77c1ffae 100644 (file)
@@ -32,5 +32,6 @@ memcached_return memcached_response(memcached_st *ptr,
                                     char *buffer, size_t buffer_length,
                                     unsigned int server_key);
 unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length);
+void memcached_quit_server(memcached_st *ptr, unsigned int server_key);
 
 #endif /* __COMMON_H__ */
index f95b8e7d752f3115298cf85310b63c1d93a9bb69..116423cea9d98ce926f94684fb84e758a24f7aba 100644 (file)
@@ -4,6 +4,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/tcp.h>
+#include <netdb.h>
 
 memcached_return memcached_real_connect(memcached_st *ptr, unsigned int server_key)
 {
@@ -13,7 +14,10 @@ memcached_return memcached_real_connect(memcached_st *ptr, unsigned int server_k
   if (ptr->hosts[server_key].fd == -1)
   {
     if ((h= gethostbyname(ptr->hosts[server_key].hostname)) == NULL)
+    {
+      ptr->my_errno= h_errno;
       return MEMCACHED_HOST_LOCKUP_FAILURE;
+    }
 
     servAddr.sin_family= h->h_addrtype;
     memcpy((char *) &servAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length);
@@ -56,6 +60,7 @@ test_connect:
     {
       switch (errno) {
         /* We are spinning waiting on connect */
+      case EALREADY:
       case EINPROGRESS:
       case EINTR:
         goto test_connect;
@@ -63,7 +68,7 @@ test_connect:
         break;
       default:
         ptr->my_errno= errno;
-        return MEMCACHED_HOST_LOCKUP_FAILURE;
+        return MEMCACHED_ERRNO;
       }
       ptr->connected++;
     }
index 9277b6ba3a883a387829acc9130046843f3756f4..2f86930e3690f482c34a2fd7c523ba4d2c316677 100644 (file)
@@ -107,7 +107,10 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
   }
 
   if (with_flush)
-    memcached_io_flush(ptr, server_key);
+  {
+    if (memcached_io_flush(ptr, server_key) == -1)
+      return -1;
+  }
 
   return length;
 }
@@ -115,30 +118,67 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
 {
   size_t sent_length;
+  char *write_ptr= ptr->write_buffer;
+  size_t write_length= ptr->write_buffer_offset;
+  unsigned int loop= 1;
 
   if (ptr->write_buffer_offset == 0)
     return 0;
 
-  if (ptr->flags & MEM_NO_BLOCK)
+  while (write_length)
   {
-    struct timeval local_tv;
-    fd_set set;
+    if (ptr->flags & MEM_NO_BLOCK)
+    {
 
-    local_tv.tv_sec= 0;
-    local_tv.tv_usec= 300;
+      while (1)
+      {
+        struct timeval local_tv;
+        fd_set set;
+        int select_return;
 
-    FD_ZERO(&set);
-    FD_SET(ptr->hosts[server_key].fd, &set);
+        local_tv.tv_sec= 0;
+        local_tv.tv_usec= 300 * loop;
 
-    select(1, NULL, &set, NULL, &local_tv);
-  }
-  if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
-                         ptr->write_buffer_offset, 0)) == -1)
-  {
-    return -1;
-  }
+        FD_ZERO(&set);
+        FD_SET(ptr->hosts[server_key].fd, &set);
+
+        select_return= select(1, NULL, &set, NULL, &local_tv);
+
+        if (select_return == -1)
+        {
+          ptr->my_errno= errno;
+          return -1;
+        }
+        else if (!select_return)
+          break;
+      }
+    }
 
-  assert(sent_length == ptr->write_buffer_offset);
+    sent_length= 0;
+    if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr, 
+                           write_length, 0)) == -1)
+    {
+      switch (errno)
+      {
+      case ENOBUFS:
+      case EAGAIN:
+        if (loop < 10)
+        {
+          loop++;
+          break;
+        }
+        /* Yes, we want to fall through */
+      default:
+        ptr->my_errno= errno;
+        return -1;
+      }
+    }
+    else
+    {
+      write_ptr+= sent_length;
+      write_length-= sent_length;
+    }
+  }
 
   ptr->write_buffer_offset= 0;
 
index 00b454c0cfe866161db87877cd727a0a3fb12b0e..d7a42315cc39ae6fe6365925814400f18eae7d0b 100644 (file)
@@ -8,6 +8,21 @@
   The reason we send "quit" is that in case we have buffered IO, this 
   will force data to be completed.
 */
+
+void memcached_quit_server(memcached_st *ptr, unsigned int server_key)
+{
+  if (ptr->hosts[server_key].fd != -1)
+  {
+    if (ptr->flags & MEM_NO_BLOCK)
+      memcached_io_write(ptr, server_key, "quit\r\n", 6, 1);
+    close(ptr->hosts[server_key].fd);
+    ptr->hosts[server_key].fd= -1;
+    ptr->hosts[server_key].stack_responses= 0;
+  }
+
+  ptr->connected--;
+}
+
 void memcached_quit(memcached_st *ptr)
 {
   unsigned int x;
@@ -15,15 +30,7 @@ void memcached_quit(memcached_st *ptr)
   if (ptr->hosts)
   {
     for (x= 0; x < ptr->number_of_hosts; x++)
-    {
-      if (ptr->hosts[x].fd != -1)
-      {
-        if (ptr->flags & MEM_NO_BLOCK)
-          memcached_io_write(ptr, x, "quit\r\n", 6, 1);
-        close(ptr->hosts[x].fd);
-        ptr->hosts[x].fd= -1;
-      }
-    }
+      memcached_quit_server(ptr, x);
   }
 
   ptr->connected= 0;
index 5b0b7831cd8b7fef75d43e806cfa0c30393bde90..10547504743c476956ade3a7a5ebfe664f44aa4c 100644 (file)
@@ -15,13 +15,17 @@ memcached_return memcached_response(memcached_st *ptr,
   unsigned int x;
   size_t send_length;
   char *buffer_ptr;
+  unsigned int max_messages;
+
 
   memset(buffer, 0, buffer_length);
   send_length= 0;
 
-  for (x= 0; x <= ptr->stack_responses; x++)
+  max_messages= memcached_server_response_count(ptr, server_key);
+  for (x= 0; x <=  max_messages; x++)
   {
     buffer_ptr= buffer;
+
     while (1)
     {
       unsigned int read_length;
@@ -37,8 +41,10 @@ memcached_return memcached_response(memcached_st *ptr,
       else
         buffer_ptr++;
     }
+
+    if (memcached_server_response_count(ptr, server_key))
+      memcached_server_response_decrement(ptr, server_key);
   }
-  ptr->stack_responses= 0;
 
   switch(buffer[0])
   {
index 6b76375cdb20bd62505f9ee5e5fcec5061af048e..353615fe8aaa55b5729c12cbdb3ea321956b53bc 100644 (file)
@@ -37,7 +37,12 @@ static memcached_return memcached_send(memcached_st *ptr,
 
   memset(buffer, 0, MEMCACHED_DEFAULT_COMMAND_SIZE);
 
-  /* Leaveing this assert in since only a library fubar could blow this */
+  /* Leaving this assert in since only a library fubar could blow this */
+  if (ptr->write_buffer_offset != 0)
+  {
+    WATCHPOINT_NUMBER(ptr->write_buffer_offset);
+  }
+    
   assert(ptr->write_buffer_offset == 0);
 
   server_key= memcached_generate_hash(ptr, key, key_length);
@@ -74,6 +79,7 @@ static memcached_return memcached_send(memcached_st *ptr,
 
   if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, 1)) == -1)
   {
+    memcached_quit_server(ptr, server_key);
     rc= MEMCACHED_WRITE_FAILURE;
     goto error;
   }
@@ -81,7 +87,7 @@ static memcached_return memcached_send(memcached_st *ptr,
   if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
   {
     rc= MEMCACHED_SUCCESS;
-    ptr->stack_responses++;
+    memcached_server_response_increment(ptr, server_key);
   }
   else
   {
index 372cea23b1ff1e4cd3a77ee5836eb7729af5f63c..587ac862aad37bd16a6ec5d6e0b1e5eb2f000265 100644 (file)
@@ -54,6 +54,8 @@ char *memcached_strerror(memcached_st *ptr, memcached_return rc)
     return "SERVER VALUE";
   case MEMCACHED_STAT:
     return "STAT VALUE";
+  case MEMCACHED_ERRNO:
+    return "UNKOWN ERROR SEE MY_ERRNO";
   case MEMCACHED_MAXIMUM_RETURN:
     return "Gibberish returned!";
   default:
index 935e14009f428c8735e71371692200fc9963427d..714cd89120223fe01883746486f1f7ab51361263 100644 (file)
@@ -391,6 +391,7 @@ void get_stats_keys(memcached_st *memc)
  assert(rc == MEMCACHED_SUCCESS);
  for (ptr= list; *ptr; ptr++)
    printf("Found key %s\n", *ptr);
+ fflush(stdout);
 
  free(list);
 }
@@ -485,6 +486,48 @@ void behavior_test(memcached_st *memc)
   assert(value == 0);
 }
 
+/* Test case provided by Cal Haldenbrand */
+void user_supplied_bug1(memcached_st *memc)
+{
+  unsigned int setter= 1;
+  unsigned int x;
+
+  long total= 0;
+  int size= 0;
+  srand(time(NULL));
+  char key[10];
+  char *randomstuff = (char *)malloc(6 * 1024); 
+  memset(randomstuff, 0, 6 * 1024);
+
+  memcached_return rc;
+
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+
+
+  /* add key */
+  for (x= 0 ; total < 20 * 1024576 ; x++ )
+  {
+    unsigned int j= 0;
+
+    size= (rand() % ( 5 * 1024 ) ) + 400;
+    memset(randomstuff, 0, 6 * 1024);
+    assert(size < 6 * 1024); /* Being safe here */
+
+    for (j= 0 ; j < size ;j++) 
+      randomstuff[j] = (char) (rand() % 26) + 97;
+
+    total += size;
+    sprintf(key, "%d", x);
+    rc = memcached_set(memc, key, strlen(key), 
+                       randomstuff, strlen(randomstuff), 10, 0);
+    /* If we fail, lets try again */
+    if (rc != MEMCACHED_SUCCESS)
+      rc = memcached_set(memc, key, strlen(key), 
+                         randomstuff, strlen(randomstuff), 10, 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+}
 void add_host_test1(memcached_st *memc)
 {
   unsigned int x;
@@ -533,6 +576,7 @@ int main(int argc, char *argv[])
     server_list= "localhost";
 
   printf("servers %s\n", server_list);
+  srandom(time(NULL));
 
   servers= memcached_servers_parse(server_list);
   assert(servers);
@@ -565,6 +609,11 @@ int main(int argc, char *argv[])
     {0, 0, 0}
   };
 
+  test_st user_tests[] ={
+    {"user_supplied_bug1", 0, user_supplied_bug1 },
+    {0, 0, 0}
+  };
+
   fprintf(stderr, "\nBlock tests\n\n");
   for (x= 0; tests[x].function_name; x++)
   {
@@ -641,6 +690,24 @@ int main(int argc, char *argv[])
     memcached_free(memc);
   }
 
+  fprintf(stderr, "\nUser Supplied tests\n\n");
+  for (x= 0; user_tests[x].function_name; x++)
+  {
+    memcached_st *memc;
+    memcached_return rc;
+    memc= memcached_create(NULL);
+    assert(memc);
+
+    rc= memcached_server_push(memc, servers);
+    assert(rc == MEMCACHED_SUCCESS);
+
+    fprintf(stderr, "Testing %s", user_tests[x].function_name);
+    user_tests[x].function(memc);
+    fprintf(stderr, "\t\t\t\t\t[ ok ]\n");
+    assert(memc);
+    memcached_free(memc);
+  }
+
   /* Clean up whatever we might have left */
   {
     memcached_st *memc;
@@ -649,5 +716,8 @@ int main(int argc, char *argv[])
     flush_test(memc);
     memcached_free(memc);
   }
+
+  fprintf(stderr, "All tests completed successfully\n\n");
+
   return 0;
 }