Big fix for async mode to make sure all data has been pushed through socket
author <brian@gir.tangent.org> <>
Sat, 1 Dec 2007 03:48:03 +0000 (19:48 -0800)
committer <brian@gir.tangent.org> <>
Sat, 1 Dec 2007 03:48:03 +0000 (19:48 -0800)
before close.

Refactor of memcached_get() to use common code.

ChangeLog
include/memcached.h
lib/common.h
lib/memcached_connect.c
lib/memcached_get.c
lib/memcached_io.c
lib/memcached_io.h
lib/memcached_quit.c
lib/memcached_storage.c
tests/function.c

index ffb487e229564add2a825b8c893d982d548ab14e..42273139a53e3b679d7fe77f26d04ef7c280d118 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,10 @@
   * Updates for consistent hashing
   * IPV6 support
   * Static allocation for hostname (performance)
+  * Fixed bug where in non-block mode all data might not have been sent on
+    close().
+  * Refactor of memcached_get() to use common code.
+
 
 0.11 Mon Nov 26 01:05:52 PST 2007
   * Added option to memcache_behavior_set() so that poll() can be timed out.
index 8b883534e50d29ad919f33078ae47539c21371e8..c0b5a7cb324f8e0b299621e76c257979355aba63 100644 (file)
@@ -194,6 +194,7 @@ struct memcached_st {
   memcached_hash hash;
   memcached_server_distribution distribution;
   unsigned int wheel[MEMCACHED_WHEEL_SIZE];
+  uint8_t replicas;
   memcached_return warning; /* Future Use */
 };
 
@@ -322,6 +323,7 @@ size_t memcached_result_length(memcached_result_st *ptr);
 #define WATCHPOINT_ERROR(A) fprintf(stderr, "\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
 #endif
 #define WATCHPOINT_STRING(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout);
+#define WATCHPOINT_STRING_LENGTH(A,B) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout);
 #define WATCHPOINT_NUMBER(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %zu\n", __FILE__, __LINE__,__func__,(size_t)(A));fflush(stdout);
 #define WATCHPOINT_ERRNO(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));A= 0;fflush(stdout);
 #define WATCHPOINT_ASSERT(A) assert((A));
@@ -330,6 +332,7 @@ size_t memcached_result_length(memcached_result_st *ptr);
 #define WATCHPOINT { 1; };
 #define WATCHPOINT_ERROR(A) { 1; };
 #define WATCHPOINT_STRING(A) { 1; };
+#define WATCHPOINT_STRING_LENGTH(A,B) { 1; };
 #define WATCHPOINT_NUMBER(A) { 1; };
 #define WATCHPOINT_ERRNO(A) { 1; };
 #define WATCHPOINT_ASSERT(A) { 1; };
index d4a7baca7a002676e0607ca3287626122cca49b8..5c2d2682e3a59e01995167f4f322e52b24d847ef 100644 (file)
 #include <assert.h>
 #include <time.h>
 #include <errno.h>
+#include <fcntl.h>
+#include <sys/un.h>
+#include <netinet/tcp.h>
+
 
 
 #include <memcached.h>
 
 #define MEMCACHED_BLOCK_SIZE 1024
 
+typedef enum {
+  MEM_NO_FLUSH,
+  MEM_FLUSH,
+} memcached_flush_action;
+
 typedef enum {
   MEM_NO_BLOCK= (1 << 0),
   MEM_TCP_NODELAY= (1 << 1),
index 672b1c23b65e5ce4f1d0c29c9502352186425d42..82a08ab7327ea513908d4fac32066e8d8743952f 100644 (file)
@@ -1,13 +1,5 @@
 #include "common.h"
 
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <netinet/tcp.h>
-#include <netdb.h>
-#include <netinet/in.h>
-
 static memcached_return set_hostinfo(memcached_server_st *server)
 {
   struct addrinfo *ai;
@@ -140,10 +132,11 @@ static memcached_return tcp_connect(memcached_st *ptr, unsigned int server_key)
                                            use->ai_protocol)) < 0)
     {
       ptr->cached_errno= errno;
+      WATCHPOINT_ERRNO(errno);
       return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
     }
 
-    /* For the moment, not getting a nonblocking mode will note be fatal */
+    /* For the moment, not getting a nonblocking mode will not be fatal */
     if (ptr->flags & MEM_NO_BLOCK)
     {
       int flags;
index 63db8d5dec14af264e09eec6173d0d9ae3b30295..0ca40523ad28f80067e548f72b37a204f925fcbd 100644 (file)
@@ -146,76 +146,32 @@ char *memcached_get(memcached_st *ptr, char *key, size_t key_length,
                     uint16_t *flags,
                     memcached_return *error)
 {
-  char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-  char *buf_ptr= buffer;
-  unsigned int server_key;
-  memcached_string_st *result_buffer;
-  LIBMEMCACHED_MEMCACHED_GET_START();
+  char *value;
+  char *dummy_value;
+  size_t dummy_length;
+  uint16_t dummy_flags;
+  memcached_return dummy_error;
 
-  if (key_length == 0)
-  {
-    *error= MEMCACHED_NO_KEY_PROVIDED;
-    return NULL;
-  }
+  /* Request the key */
+  *error= memcached_mget(ptr, &key, &key_length, 1);
 
-  if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
-  {
-    *error= MEMCACHED_NO_SERVERS;
-    return NULL;
-  }
-
-  server_key= memcached_generate_hash(ptr, key, key_length);
-  result_buffer= &ptr->result_buffer;
+  value= memcached_fetch(ptr, NULL, NULL, 
+                         value_length, flags, error);
 
-  *value_length= 0;
-  memcpy(buf_ptr, "get ", 4);
-  buf_ptr+= 4;
-  memcpy(buf_ptr, key, key_length);
-  buf_ptr+= key_length;
-  memcpy(buf_ptr, "\r\n", 2);
-  buf_ptr+= 2;
-
-  *error= memcached_do(ptr, server_key, buffer, (size_t)(buf_ptr - buffer), 1);
-  if (*error != MEMCACHED_SUCCESS)
-    goto error;
-
-  *error= memcached_value_fetch(ptr, NULL, NULL, result_buffer, 
-                                flags, NULL, server_key);
-  *value_length= memcached_string_length(result_buffer);
-  if (*error == MEMCACHED_END && *value_length == 0)
-  {
-    *error= MEMCACHED_NOTFOUND;
-    goto error;
-  }
-  else if (*error == MEMCACHED_END)
-  {
-    WATCHPOINT_ASSERT(0); /* If this happens we have somehow messed up the fetch */
-  }
-  else if (*error == MEMCACHED_SUCCESS)
-  {
-    memcached_return rc;
-    /* We need to read END */
-    rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
-
-    if (rc != MEMCACHED_END)
-    {
-      *error= MEMCACHED_PROTOCOL_ERROR;
-      goto error;
-    }
-  }
-  else 
-      goto error;
-
-  LIBMEMCACHED_MEMCACHED_GET_END();
-
-  return memcached_string_c_copy(result_buffer);
+  if (value == NULL)
+    return NULL;
 
-error:
-  *value_length= 0;
+  /* We do a second read to clean the cursor */
+  dummy_value= memcached_fetch(ptr, NULL, NULL, 
+                               &dummy_length, &dummy_flags, 
+                               &dummy_error);
 
-  LIBMEMCACHED_MEMCACHED_GET_END();
+  /* Something is really wrong if this happens */
+  WATCHPOINT_ASSERT(dummy_value == NULL);
+  if (dummy_value)
+    free(dummy_value);
 
-  return NULL;
+  return value;
 }
 
 memcached_return memcached_mget(memcached_st *ptr, 
index 8a05ddb8361e3efc03d40f85a40cd947f0665585..0657dfd65da7f2e2a3e5bfb2df116596178627e3 100644 (file)
@@ -124,6 +124,32 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
   return length;
 }
 
+memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
+{
+  struct pollfd fds[1];
+  short flags= 0;
+  struct timespec timer;
+  memcached_return rc;
+
+  timer.tv_sec= 1;
+  timer.tv_nsec= 0;
+  flags= POLLHUP |  POLLERR;
+
+  memset(&fds, 0, sizeof(struct pollfd));
+  fds[0].fd= ptr->hosts[server_key].fd;
+  fds[0].events= flags;
+  fds[0].revents= flags;
+
+  if (poll(fds, 1, ptr->poll_timeout) < 0)
+    rc= MEMCACHED_FAILURE;
+  else
+    rc= MEMCACHED_SUCCESS;
+
+  close(ptr->hosts[server_key].fd);
+
+  return rc;
+}
+
 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
 {
   size_t sent_length;
index bb5b8fc539b3203ed6146287d38b89f998a7f17f..8e2ddbc952245c3526a01971f23f8724399b867b 100644 (file)
@@ -7,3 +7,4 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
 void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
 ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
                           char *buffer, size_t length);
+memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key);
index a0cc617299962c4eacaeda28609c5d5e04d342cb..483b177249ae2e956f4a0f56872e0f146f3ae5e2 100644 (file)
@@ -18,12 +18,11 @@ void memcached_quit_server(memcached_st *ptr, unsigned int server_key)
 
   if (ptr->hosts[server_key].fd != -1)
   {
-    if (ptr->flags & MEM_NO_BLOCK && ptr->hosts[server_key].stack_responses)
-      memcached_io_flush(ptr, server_key);
+    memcached_return rc;
+    rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1);
+    WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
 
-    memcached_io_write(ptr, server_key, "quit\r\n", 6, 1);
-
-    close(ptr->hosts[server_key].fd);
+    memcached_io_close(ptr, server_key);
     ptr->hosts[server_key].fd= -1;
     ptr->hosts[server_key].stack_responses= 0;
     ptr->hosts[server_key].cursor_active= 0;
index 93968ad982dc87d17924f5835b7f9a61bbb0b53d..6672a86ad2939be30e1d221256132a8e904f9555 100644 (file)
@@ -62,16 +62,8 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   if (key_length == 0)
     return MEMCACHED_NO_KEY_PROVIDED;
 
-  if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
-    return MEMCACHED_NO_SERVERS;
-    
   server_key= memcached_generate_hash(ptr, key, key_length);
 
-  rc= memcached_connect(ptr, server_key);
-  if (rc != MEMCACHED_SUCCESS)
-    return rc;
-
-
   if (cas)
     write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
                            "%s %.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
@@ -90,15 +82,9 @@ static inline memcached_return memcached_send(memcached_st *ptr,
     goto error;
   }
 
-  /* 
-    We have to flush after sending the command. Memcached is not smart enough
-    to just keep reading from the socket :(
-  */
-  if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length, 0)) == -1)
-  {
-    rc= MEMCACHED_WRITE_FAILURE;
+  rc=  memcached_do(ptr, server_key, buffer, write_length, 0);
+  if (rc != MEMCACHED_SUCCESS)
     goto error;
-  }
 
   if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1)
   {
@@ -118,7 +104,7 @@ static inline memcached_return memcached_send(memcached_st *ptr,
     goto error;
   }
 
-  if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
+  if (to_write == 0)
   {
     rc= MEMCACHED_SUCCESS;
     memcached_server_response_increment(ptr, server_key);
index 3a0bd648b5c72a7f4157de61738c5ab94fab27a1..9cd10aff18a343c8373eafa8c3ba12cb14e75b17 100644 (file)
@@ -319,12 +319,21 @@ uint8_t prepend_test(memcached_st *memc)
   return 0;
 }
 
+/* 
+  Set the value, then quit to make sure it is flushed.
+  Come back in and test that add fails.
+*/
 uint8_t add_test(memcached_st *memc)
 {
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
 
+  rc= memcached_set(memc, key, strlen(key), 
+                    value, strlen(value),
+                    (time_t)0, (uint16_t)0);
+  assert(rc == MEMCACHED_SUCCESS);
+  memcached_quit(memc);
   rc= memcached_add(memc, key, strlen(key), 
                     value, strlen(value),
                     (time_t)0, (uint16_t)0);
@@ -1760,7 +1769,7 @@ test_st tests[] ={
   {"set", 0, set_test },
   {"set2", 0, set_test2 },
   {"set3", 0, set_test3 },
-  {"add", 0, add_test },
+  {"add", 1, add_test },
   {"replace", 0, replace_test },
   {"delete", 1, delete_test },
   {"get", 1, get_test },