Branch merge for fixes in reconnect.
author <brian@gir-2.local> <>
Fri, 29 Feb 2008 07:22:14 +0000 (23:22 -0800)
committer <brian@gir-2.local> <>
Fri, 29 Feb 2008 07:22:14 +0000 (23:22 -0800)
13 files changed:
ChangeLog
docs/libmemcached.pod
docs/memcached_behavior.pod
include/memcached.h
lib/Makefile.am
lib/memcached.c
lib/memcached_auto.c
lib/memcached_behavior.c
lib/memcached_delete.c
lib/memcached_get.c
lib/memcached_mget.c [new file with mode: 0644]
lib/memcached_storage.c
tests/function.c

index 199536b56e77d4b6f925aa692602ed50c0739dc2..3a9e3afa546f70d09323267cb462d89b464ac58a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,5 @@
+  * Replication support has been added to the library.
+
 0.17 Wed Feb 27 03:33:29 PST 2008
   * MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT added for connect timeout in
     non-block mode.
index 1700cf5b386b9350fcd47b7ff9dc7b35d556038d..a4fbb876ea0a837d0392d3419e3317667bf687ac 100755 (executable)
@@ -38,6 +38,11 @@ This value can be translated to a printable string with memcached_strerr(3).
 Partitioning based on keys is supported in the library. Using the key partioning 
 functions it is possible to group sets of object onto servers.
 
+Replication can be done across multiple nodes. libmemcached(3) will spread
+data out when consistent hashing has been enabled and replication has been
+specified to send data to more then one node. This feature is considered
+very experimental at this stage.
+
 C<memcached_st> structures are thread-safe, but each thread must
 contain its own structure (that is, if you want to share these among
 threads you must provide your own locking). No global variables are
index 2268e845976f77905a387fd44e66b5e7f63ea508..8da9183dd486fc1736a5681543a59e239c1d7f75 100755 (executable)
@@ -37,6 +37,12 @@ memcached_behavior_set() will flush and reset all connections.
 
 =over 4
 
+=item MEMCACHED_BEHAVIOR_REPLICAS
+
+By default libmemcached(3) stores data in just one node of the cluster. Setting this value
+to a number will cause that number of copies to be kept. The value must be greater then 
+zero, and must be at most the same value as the number of hosts in the cluster.
+
 =item MEMCACHED_BEHAVIOR_NO_BLOCK
 
 Causes libmemcached(3) to use asychronous IO. This is the fastest transport
index 570404e831f1cd1091a4d2803b268934abdb7026..1c80878dec9b2e2fdfe2f6e75c6067870af92d70 100644 (file)
@@ -29,6 +29,7 @@ extern "C" {
 #define MEMCACHED_MAX_HOST_LENGTH 64
 #define MEMCACHED_WHEEL_SIZE 1024
 #define MEMCACHED_STRIDE 4
+#define MEMCACHED_MAX_REPLICAS 4
 #define MEMCACHED_DEFAULT_TIMEOUT INT32_MAX
 
 /* string value */
@@ -104,6 +105,7 @@ typedef enum {
   MEMCACHED_BEHAVIOR_SORT_HOSTS,
   MEMCACHED_BEHAVIOR_VERIFY_KEY,
   MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
+  MEMCACHED_BEHAVIOR_REPLICAS,
 } memcached_behavior;
 
 typedef enum {
@@ -230,8 +232,8 @@ struct memcached_st {
   memcached_free_function call_free;
   memcached_malloc_function call_malloc;
   memcached_realloc_function call_realloc;
+  uint8_t number_of_replicas;
 #ifdef NOT_USED /* Future Use */
-  uint8_t replicas;
   memcached_return warning;
 #endif
 };
index 5257b7bafdbd29929d05f772221017d2b9f44221..dbbffa79d105e0ddf308e838e1028ca85d91821b 100644 (file)
@@ -44,6 +44,7 @@ libmemcached_la_SOURCES = crc.c \
                          memcached_io.c \
                          md5.c \
                          memcached_key.c \
+                         memcached_mget.c \
                          memcached_quit.c \
                          memcached_parse.c \
                          memcached_response.c \
index 31c12ab3158bc52af7665376171816157f27e3c5..c0510619f8db392b16cd9ac3a83110e41eeb4587 100644 (file)
@@ -25,6 +25,7 @@ memcached_st *memcached_create(memcached_st *ptr)
   WATCHPOINT_ASSERT(result_ptr);
   ptr->poll_timeout= MEMCACHED_DEFAULT_TIMEOUT;
   ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA;
+  ptr->number_of_replicas= 1;
 
   return ptr;
 }
index a607ed66e23298881c2ee5bfd6d3ea429aafbb62..3aa63eedc4a8a7af09240250cebf49851bbeea73 100644 (file)
@@ -7,9 +7,10 @@ static memcached_return memcached_auto(memcached_st *ptr,
                                        uint64_t *value)
 {
   size_t send_length;
-  memcached_return rc;
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   unsigned int server_key;
+  uint8_t replicas= 0;
+  memcached_return rc[MEMCACHED_MAX_REPLICAS];
 
   unlikely (key_length == 0)
     return MEMCACHED_NO_KEY_PROVIDED;
@@ -29,36 +30,55 @@ static memcached_return memcached_auto(memcached_st *ptr,
   unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
     return MEMCACHED_WRITE_FAILURE;
 
-  rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
-  if (rc != MEMCACHED_SUCCESS)
-    return rc;
-
-  rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
-
-  /* 
-    So why recheck responce? Because the protocol is brain dead :)
-    The number returned might end up equaling one of the string 
-    values. Less chance of a mistake with strncmp() so we will 
-    use it. We still called memcached_response() though since it
-    worked its magic for non-blocking IO.
-  */
-  if (!strncmp(buffer, "ERROR\r\n", 7))
+  do 
   {
-    *value= 0;
-    rc= MEMCACHED_PROTOCOL_ERROR;
-  }
-  else if (!strncmp(buffer, "NOT_FOUND\r\n", 11))
+    rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
+    if (rc[replicas] != MEMCACHED_SUCCESS)
+      goto error;
+
+    rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+
+    /* 
+      So why recheck responce? Because the protocol is brain dead :)
+      The number returned might end up equaling one of the string 
+      values. Less chance of a mistake with strncmp() so we will 
+      use it. We still called memcached_response() though since it
+      worked its magic for non-blocking IO.
+    */
+    if (!strncmp(buffer, "ERROR\r\n", 7))
+    {
+      *value= 0;
+      rc[replicas]= MEMCACHED_PROTOCOL_ERROR;
+    }
+    else if (!strncmp(buffer, "NOT_FOUND\r\n", 11))
+    {
+      *value= 0;
+      rc[replicas]= MEMCACHED_NOTFOUND;
+    }
+    else
+    {
+      *value= (uint64_t)strtoll(buffer, (char **)NULL, 10);
+      rc[replicas]= MEMCACHED_SUCCESS;
+    }
+    /* On error we just jump to the next potential server */
+error:
+    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
+    {
+      if (server_key == (ptr->number_of_hosts - 1))
+        server_key= 0;
+      else
+        server_key++;
+    }
+  } while ((++replicas) < ptr->number_of_replicas);
+
+  /* As long as one object gets stored, we count this as a success */
+  while (replicas--)
   {
-    *value= 0;
-    rc= MEMCACHED_NOTFOUND;
-  }
-  else
-  {
-    *value= (uint64_t)strtoll(buffer, (char **)NULL, 10);
-    rc= MEMCACHED_SUCCESS;
+    if (rc[replicas] == MEMCACHED_STORED)
+      return MEMCACHED_SUCCESS;
   }
 
-  return rc;
+  return rc[0];
 }
 
 memcached_return memcached_increment(memcached_st *ptr, 
index 7c5db61d9b63e63234959100ec6bc51bd68f2f49..910a99e6c22b3c0857db89477b5cb41e1baf8241 100644 (file)
@@ -23,6 +23,16 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
 {
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_REPLICAS:
+    {
+      uint8_t number_of_replicas= (uint8_t)data;
+
+      if (number_of_replicas > ptr->number_of_hosts || number_of_replicas == 0 || number_of_replicas > MEMCACHED_MAX_REPLICAS)
+        return MEMCACHED_FAILURE;
+      else
+        ptr->number_of_replicas= number_of_replicas;
+      break;
+    }
   case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
     set_behavior_flag(ptr, MEM_SUPPORT_CAS, data);
     break;
@@ -96,6 +106,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
 
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_REPLICAS:
+    return (unsigned long long)ptr->number_of_replicas;
   case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
     temp_flag= MEM_SUPPORT_CAS;
     break;
index 60e3bed4406e3ce89c1b6c64868559af8f77b7b8..24dc16601c178dc30b498658701bdae332948a31 100644 (file)
@@ -14,11 +14,10 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
 {
   char to_write;
   size_t send_length;
-  memcached_return rc;
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   unsigned int server_key;
-
-  LIBMEMCACHED_MEMCACHED_DELETE_START();
+  uint8_t replicas= 0;
+  memcached_return rc[MEMCACHED_MAX_REPLICAS];
 
   unlikely (key_length == 0)
     return MEMCACHED_NO_KEY_PROVIDED;
@@ -38,28 +37,48 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
 
   if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
   {
-    rc= MEMCACHED_WRITE_FAILURE;
+    rc[replicas]= MEMCACHED_WRITE_FAILURE;
     goto error;
   }
 
   to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
 
-  rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
-  if (rc != MEMCACHED_SUCCESS)
-    goto error;
-
-  if ((ptr->flags & MEM_BUFFER_REQUESTS))
+  do
   {
-    rc= MEMCACHED_BUFFERED;
-  }
-  else
+    rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
+    if (rc[replicas] != MEMCACHED_SUCCESS)
+      goto error;
+
+    if ((ptr->flags & MEM_BUFFER_REQUESTS))
+    {
+      rc[replicas]= MEMCACHED_BUFFERED;
+    }
+    else
+    {
+      rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+      if (rc[replicas] == MEMCACHED_DELETED)
+        rc[replicas]= MEMCACHED_SUCCESS;
+    }
+
+    /* On error we just jump to the next potential server */
+error:
+    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
+    {
+      if (server_key == (ptr->number_of_hosts - 1))
+        server_key= 0;
+      else
+        server_key++;
+    }
+  } while ((++replicas) < ptr->number_of_replicas);
+
+  /* As long as one object gets stored, we count this as a success */
+  while (replicas--)
   {
-    rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
-    if (rc == MEMCACHED_DELETED)
-      rc= MEMCACHED_SUCCESS;
+    if (rc[replicas] == MEMCACHED_DELETED)
+      return MEMCACHED_SUCCESS;
+    else if (rc[replicas] == MEMCACHED_DELETED)
+      rc[replicas]= MEMCACHED_BUFFERED;
   }
 
-error:
-  LIBMEMCACHED_MEMCACHED_DELETE_END();
-  return rc;
+  return rc[0];
 }
index 8cc7ce30b04adee6eb7d62555ed9fd90b0c8ecd4..374793f5346e485718c9436ca7d679006178cabe 100644 (file)
@@ -20,153 +20,87 @@ char *memcached_get_by_key(memcached_st *ptr,
                            uint32_t *flags,
                            memcached_return *error)
 {
-  char *value;
-  size_t dummy_length;
-  uint32_t dummy_flags;
-  memcached_return dummy_error;
-
-  /* Request the key */
-  *error= memcached_mget_by_key(ptr, 
-                                master_key, 
-                                master_key_length, 
-                                &key, &key_length, 1);
-
-  value= memcached_fetch(ptr, NULL, NULL, 
-                         value_length, flags, error);
-  /* This is for historical reasons */
-  if (*error == MEMCACHED_END)
-    *error= MEMCACHED_NOTFOUND;
-
-  if (value == NULL)
-    return NULL;
-
-  (void)memcached_fetch(ptr, NULL, NULL, 
-                        &dummy_length, &dummy_flags, 
-                        &dummy_error);
-  WATCHPOINT_ASSERT(dummy_length == 0);
-
-  return value;
-}
-
-memcached_return memcached_mget(memcached_st *ptr, 
-                                char **keys, size_t *key_length, 
-                                unsigned int number_of_keys)
-{
-  return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
-}
-
-memcached_return memcached_mget_by_key(memcached_st *ptr, 
-                                       char *master_key, size_t master_key_length,
-                                       char **keys, size_t *key_length, 
-                                       unsigned int number_of_keys)
-{
-  unsigned int x;
-  memcached_return rc= MEMCACHED_NOTFOUND;
-  char *get_command= "get ";
-  uint8_t get_command_length= 4;
-  unsigned int master_server_key= 0;
-
-  LIBMEMCACHED_MEMCACHED_MGET_START();
-  ptr->cursor_server= 0;
-
-  if (number_of_keys == 0)
-    return MEMCACHED_NOTFOUND;
+  unsigned int server_key;
+  size_t send_length;
+  char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+  memcached_result_st *result_buffer= &ptr->result;
+  memcached_return rc[MEMCACHED_MAX_REPLICAS];
+  uint8_t replicas= 0;
 
   if (ptr->number_of_hosts == 0)
-    return MEMCACHED_NO_SERVERS;
-
-  if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
-    return MEMCACHED_BAD_KEY_PROVIDED;
+  {
+    *error= MEMCACHED_NO_SERVERS;
+    return NULL;
+  }
 
-  if (ptr->flags & MEM_SUPPORT_CAS)
+  if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
   {
-    get_command= "gets ";
-    get_command_length= 5;
+    *value_length= 0;
+    *error= MEMCACHED_BAD_KEY_PROVIDED;
+    return NULL;
   }
 
-  if (master_key && master_key_length)
-    master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+  if (master_key)
+    server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+  else
+    server_key= memcached_generate_hash(ptr, key, key_length);
 
-  /* 
-    Here is where we pay for the non-block API. We need to remove any data sitting
-    in the queue before we start our get.
+  send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
+                        "get %.*s\r\n", (int)key_length, key);
 
-    It might be optimum to bounce the connection if count > some number.
-  */
-  for (x= 0; x < ptr->number_of_hosts; x++)
+  do
   {
-    if (memcached_server_response_count(&ptr->hosts[x]))
+    if (memcached_server_response_count(&ptr->hosts[server_key]))
     {
       char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
 
       if (ptr->flags & MEM_NO_BLOCK)
-        (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1);
+        (void)memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
 
-      while(memcached_server_response_count(&ptr->hosts[x]))
-        (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
+      while(memcached_server_response_count(&ptr->hosts[server_key]))
+        (void)memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer);
     }
-  }
 
-  /* 
-    If a server fails we warn about errors and start all over with sending keys
-    to the server.
-  */
-  for (x= 0; x < number_of_keys; x++)
-  {
-    unsigned int server_key;
+    rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
+    if (rc[replicas] != MEMCACHED_SUCCESS)
+      goto error;
 
-    if (master_server_key)
-      server_key= master_server_key;
-    else
-      server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
+    rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer);
 
-    if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
-    {
-      rc= memcached_connect(&ptr->hosts[server_key]);
-
-      if (rc != MEMCACHED_SUCCESS)
-        continue;
-
-      if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1)
-      {
-        rc= MEMCACHED_SOME_ERRORS;
-        continue;
-      }
-      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
-      memcached_server_response_increment(&ptr->hosts[server_key]);
-      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1);
-    }
-
-    if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1)
+    /* On no key found, we check the replica */
+    if (rc[replicas] == MEMCACHED_END) /* END means that we move on to the next */
     {
       memcached_server_response_reset(&ptr->hosts[server_key]);
-      rc= MEMCACHED_SOME_ERRORS;
-      continue;
     }
-
-    if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1)
+    else if (rc[replicas] == MEMCACHED_SUCCESS)
     {
-      memcached_server_response_reset(&ptr->hosts[server_key]);
-      rc= MEMCACHED_SOME_ERRORS;
-      continue;
+      *value_length= memcached_string_length(&result_buffer->value);
+    
+      if (result_buffer->flags)
+        *flags= result_buffer->flags;
+
+      return  memcached_string_c_copy(&result_buffer->value);
     }
-  }
 
-  /*
-    Should we muddle on if some servers are dead?
-  */
-  for (x= 0; x < ptr->number_of_hosts; x++)
-  {
-    if (memcached_server_response_count(&ptr->hosts[x]))
+    /* On error we just jump to the next potential server */
+error:
+    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
     {
-      /* We need to do something about non-connnected hosts in the future */
-      if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1)
-      {
-        rc= MEMCACHED_SOME_ERRORS;
-      }
+      if (server_key == (ptr->number_of_hosts - 1))
+        server_key= 0;
+      else
+        server_key++;
     }
-  }
+  } while ((++replicas) < ptr->number_of_replicas);
+
+  /* TODO: An error on replica 1 of host down, but not found on 2, will give wrong error */
+  /* This is for historical reasons */
+  if (rc[0] == MEMCACHED_END)
+    *error= MEMCACHED_NOTFOUND;
+  else
+    *error= rc[0];
+
+  *value_length= 0;
 
-  LIBMEMCACHED_MEMCACHED_MGET_END();
-  return rc;
+  return NULL;
 }
diff --git a/lib/memcached_mget.c b/lib/memcached_mget.c
new file mode 100644 (file)
index 0000000..a25fa8e
--- /dev/null
@@ -0,0 +1,125 @@
+#include "common.h"
+#include "memcached_io.h"
+
+memcached_return memcached_mget(memcached_st *ptr, 
+                                char **keys, size_t *key_length, 
+                                unsigned int number_of_keys)
+{
+  return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
+}
+
+memcached_return memcached_mget_by_key(memcached_st *ptr, 
+                                       char *master_key, size_t master_key_length,
+                                       char **keys, size_t *key_length, 
+                                       unsigned int number_of_keys)
+{
+  unsigned int x;
+  memcached_return rc= MEMCACHED_NOTFOUND;
+  char *get_command= "get ";
+  uint8_t get_command_length= 4;
+  unsigned int master_server_key= 0;
+
+  LIBMEMCACHED_MEMCACHED_MGET_START();
+  ptr->cursor_server= 0;
+
+  if (number_of_keys == 0)
+    return MEMCACHED_NOTFOUND;
+
+  if (ptr->number_of_hosts == 0)
+    return MEMCACHED_NO_SERVERS;
+
+  if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
+    return MEMCACHED_BAD_KEY_PROVIDED;
+
+  if (ptr->flags & MEM_SUPPORT_CAS)
+  {
+    get_command= "gets ";
+    get_command_length= 5;
+  }
+
+  if (master_key && master_key_length)
+    master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+
+  /* 
+    Here is where we pay for the non-block API. We need to remove any data sitting
+    in the queue before we start our get.
+
+    It might be optimum to bounce the connection if count > some number.
+  */
+  for (x= 0; x < ptr->number_of_hosts; x++)
+  {
+    if (memcached_server_response_count(&ptr->hosts[x]))
+    {
+      char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+
+      if (ptr->flags & MEM_NO_BLOCK)
+        (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1);
+
+      while(memcached_server_response_count(&ptr->hosts[x]))
+        (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
+    }
+  }
+
+  /* 
+    If a server fails we warn about errors and start all over with sending keys
+    to the server.
+  */
+  for (x= 0; x < number_of_keys; x++)
+  {
+    unsigned int server_key;
+
+    if (master_server_key)
+      server_key= master_server_key;
+    else
+      server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+    if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
+    {
+      rc= memcached_connect(&ptr->hosts[server_key]);
+
+      if (rc != MEMCACHED_SUCCESS)
+        continue;
+
+      if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1)
+      {
+        rc= MEMCACHED_SOME_ERRORS;
+        continue;
+      }
+      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
+      memcached_server_response_increment(&ptr->hosts[server_key]);
+      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1);
+    }
+
+    if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1)
+    {
+      memcached_server_response_reset(&ptr->hosts[server_key]);
+      rc= MEMCACHED_SOME_ERRORS;
+      continue;
+    }
+
+    if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1)
+    {
+      memcached_server_response_reset(&ptr->hosts[server_key]);
+      rc= MEMCACHED_SOME_ERRORS;
+      continue;
+    }
+  }
+
+  /*
+    Should we muddle on if some servers are dead?
+  */
+  for (x= 0; x < ptr->number_of_hosts; x++)
+  {
+    if (memcached_server_response_count(&ptr->hosts[x]))
+    {
+      /* We need to do something about non-connnected hosts in the future */
+      if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1)
+      {
+        rc= MEMCACHED_SOME_ERRORS;
+      }
+    }
+  }
+
+  LIBMEMCACHED_MEMCACHED_MGET_END();
+  return rc;
+}
index 0b5a73e10b3d45a0912f13c52ca47babf5cfdae7..2a0f88d493f355df2e862af79dfd1c8d8b2b22af 100644 (file)
@@ -53,9 +53,10 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   char to_write;
   size_t write_length;
   ssize_t sent_length;
-  memcached_return rc;
+  memcached_return rc[MEMCACHED_MAX_REPLICAS];
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   unsigned int server_key;
+  uint8_t replicas= 0;
 
   unlikely (key_length == 0)
     return MEMCACHED_NO_KEY_PROVIDED;
@@ -81,46 +82,56 @@ static inline memcached_return memcached_send(memcached_st *ptr,
                            (unsigned long long)expiration, value_length);
 
   if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
-  {
-    rc= MEMCACHED_WRITE_FAILURE;
-    goto error;
-  }
-
-  rc=  memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
-  if (rc != MEMCACHED_SUCCESS)
-    goto error;
-
-  if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
-  {
-    rc= MEMCACHED_WRITE_FAILURE;
-    goto error;
-  }
+    return MEMCACHED_WRITE_FAILURE;
 
   if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
     to_write= 0;
   else
     to_write= 1;
 
-  if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
+  do
   {
-    rc= MEMCACHED_WRITE_FAILURE;
-    goto error;
-  }
+    rc[replicas]=  memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
+
+    if (rc[replicas] != MEMCACHED_SUCCESS)
+      goto error;
 
-  if (to_write == 0)
-    return MEMCACHED_BUFFERED;
+    if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
+    {
+      rc[replicas]= MEMCACHED_WRITE_FAILURE;
+      goto error;
+    }
 
-  rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+    if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
+    {
+      rc[replicas]= MEMCACHED_WRITE_FAILURE;
+      goto error;
+    }
 
-  if (rc == MEMCACHED_STORED)
-    return MEMCACHED_SUCCESS;
-  else 
-    return rc;
+    if (to_write == 0)
+      return MEMCACHED_BUFFERED;
+    else
+      rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 
+    /* On error we just jump to the next potential server */
 error:
-  memcached_io_reset(&ptr->hosts[server_key]);
+    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
+    {
+      if (server_key == (ptr->number_of_hosts - 1))
+        server_key= 0;
+      else
+        server_key++;
+    }
+  } while ((++replicas) < ptr->number_of_replicas);
+
+  /* As long as one object gets stored, we count this as a success */
+  while (replicas--)
+  {
+    if (rc[replicas] == MEMCACHED_STORED)
+      return MEMCACHED_SUCCESS;
+  }
 
-  return rc;
+  return rc[0];
 }
 
 memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, 
index 2258757d646981c1d9dc537b29ef0f37f09ac447..b10ead9b6b225258fbb4f12f84d09787a1b8ec08 100644 (file)
@@ -2308,6 +2308,16 @@ memcached_return enable_consistent(memcached_st *memc)
   return MEMCACHED_SUCCESS;
 }
 
+memcached_return enable_replication(memcached_st *memc)
+{
+  uint64_t value;
+  value= 2;
+  enable_consistent(memc);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_REPLICAS, &value);
+
+  return MEMCACHED_SUCCESS;
+}
+
 memcached_return enable_cas(memcached_st *memc)
 {
   unsigned int set= 1;
@@ -2518,6 +2528,7 @@ collection_st collection[] ={
   {"poll_timeout", poll_timeout, 0, tests},
   {"gets", enable_cas, 0, tests},
   {"consistent", enable_consistent, 0, tests},
+  {"replication", enable_consistent, 0, tests},
   {"memory_allocators", set_memory_alloc, 0, tests},
 //  {"udp", pre_udp, 0, tests},
   {"version_1_2_3", check_for_1_2_3, 0, version_1_2_3},