Added support for storing replicas on multiple servers
authorTrond Norbye <trond.norbye@sun.com>
Tue, 9 Jun 2009 19:36:39 +0000 (21:36 +0200)
committerTrond Norbye <trond.norbye@sun.com>
Tue, 9 Jun 2009 19:36:39 +0000 (21:36 +0200)
docs/memcached_behavior.pod
libmemcached/memcached.c
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_delete.c
libmemcached/memcached_get.c
libmemcached/memcached_storage.c
tests/function.c

index 71a46134b892cd985d952ca63a761236edec22af..00d762825d2da1f4757e2000b7843820843ba11a 100644 (file)
@@ -194,6 +194,14 @@ sent to the server.
 Set this value to specify that you really don't care about the result
 from your storage commands (set, add, replace, append, prepend). 
 
+=item MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
+
+If you just want "a poor mans HA", you may specify the numbers of
+replicas libmemcached should store of each item (on different servers).
+This replication does not dedicate certain memcached servers to store the
+replicas in, but instead it will store the replicas together with all of the
+other objects (on the 'n' next servers specified in your server list).
+
 =back
 
 =head1 RETURN
index a93bef7d651512ac7d49c4cf92e260fb200fb866..b880d83377c493e9faeeea411218d5007507311e 100644 (file)
@@ -111,6 +111,7 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source)
   new_clone->io_msg_watermark= source->io_msg_watermark;
   new_clone->io_bytes_watermark= source->io_bytes_watermark;
   new_clone->io_key_prefetch= source->io_key_prefetch;
+  new_clone->number_of_replicas= source->number_of_replicas;
 
   if (source->hosts)
     rc= memcached_server_push(new_clone, source->hosts);
index 7f2b9a3449ab9f532342d6ca77bdda8d1115b97c..47cc4c564d069f42c112a30421d985c365c96121 100644 (file)
@@ -119,6 +119,7 @@ struct memcached_st {
   memcached_trigger_key get_key_failure;
   memcached_trigger_delete_key delete_trigger;
   char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
+  uint32_t number_of_replicas;
 };
 
 
index fb201d4ed82c4f1f31233f1d8bd7ba279f91a3c8..974bda56db46eb8d0cb3711c5a768a2fc5e8f816 100644 (file)
@@ -24,6 +24,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
 {
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+    ptr->number_of_replicas= (uint32_t)data;
+    break;
   case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
     ptr->io_msg_watermark= (int32_t)data;
     break;
@@ -169,6 +172,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
 
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+    return ptr->number_of_replicas;
   case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
     return ptr->io_msg_watermark;
   case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
index 6d8d00f16c80de383b8854de74c202d41252474e..5fdede9a765923ddc6b428f7732078ca1ced2c24 100644 (file)
@@ -105,7 +105,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
   MEMCACHED_BEHAVIOR_NOREPLY,
   MEMCACHED_BEHAVIOR_USE_UDP,
-  MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS
+  MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS,
+  MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
 } memcached_behavior;
 
 typedef enum {
index f2fab3221a53cca25203b936d100d86460c5a259..5eb7e39c5ddd604750bcde94ac14f5db89c57aae 100644 (file)
@@ -1,4 +1,5 @@
 #include "common.h"
+#include "memcached/protocol_binary.h"
 
 memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key_length,
                                   time_t expiration)
@@ -117,14 +118,34 @@ static inline memcached_return binary_delete(memcached_st *ptr,
       memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
   }
   
+  memcached_return rc= MEMCACHED_SUCCESS;
+
   if ((memcached_do(&ptr->hosts[server_key], request.bytes, 
                     sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
       (memcached_io_write(&ptr->hosts[server_key], key, 
                           key_length, flush) == -1)) 
   {
     memcached_io_reset(&ptr->hosts[server_key]);
-    return MEMCACHED_WRITE_FAILURE;
+    rc= MEMCACHED_WRITE_FAILURE;
+  }
+
+  if (ptr->number_of_replicas > 0) 
+  {
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
+
+    for (int x= 0; x < ptr->number_of_replicas; ++x)
+    {
+      ++server_key;
+      if (server_key == ptr->number_of_hosts)
+        server_key= 0;
+  
+      memcached_server_st* server= &ptr->hosts[server_key];
+      if ((memcached_do(server, (const char*)request.bytes, 
+                        sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+          (memcached_io_write(server, key, key_length, flush) == -1))
+         memcached_io_reset(server);
+    }
   }
 
-  return MEMCACHED_SUCCESS;
+  return rc;
 }
index 05b317a6268b79d19b5a711d49dfb05cf4dc3cb1..cdfb5d29082c56366d253c826d948fa6c969015a 100644 (file)
@@ -143,12 +143,6 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
     return MEMCACHED_BAD_KEY_PROVIDED;
 
-  if (ptr->flags & MEM_SUPPORT_CAS)
-  {
-    get_command= "gets ";
-    get_command_length= 5;
-  }
-
   if (master_key && master_key_length)
   {
     if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
@@ -181,6 +175,12 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
     return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, 
                               key_length, number_of_keys);
 
+  if (ptr->flags & MEM_SUPPORT_CAS)
+  {
+    get_command= "gets ";
+    get_command_length= 5;
+  }
+
   /* 
     If a server fails we warn about errors and start all over with sending keys
     to the server.
@@ -256,7 +256,7 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   return rc;
 }
 
-static memcached_return binary_mget_by_key(memcached_st *ptr, 
+static memcached_return simple_binary_mget(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
                                            char **keys, size_t *key_length, 
@@ -357,3 +357,171 @@ static memcached_return binary_mget_by_key(memcached_st *ptr,
 
   return rc;
 }
+
+static memcached_return replication_binary_mget(memcached_st *ptr,
+                                             uint32_t* hash, bool* dead_servers,
+                                             char **keys, size_t *key_length,
+                                             unsigned int number_of_keys)
+{
+  memcached_return rc= MEMCACHED_NOTFOUND;
+  uint32_t x;
+
+  int flush= number_of_keys == 1;
+
+  for (int replica = 0; replica <= ptr->number_of_replicas; ++replica)
+  {
+    bool success= true;    
+    
+    for (uint32_t x= 0; x < number_of_keys; ++x)
+    {
+      if (hash[x] == ptr->number_of_hosts)
+        continue; /* Already successfully sent */
+
+      uint32_t server= hash[x] + replica;
+      while (server >= ptr->number_of_hosts)
+        server -= ptr->number_of_hosts;
+
+      if (dead_servers[server])
+        continue;
+
+      if (memcached_server_response_count(&ptr->hosts[server]) == 0)
+      {
+        rc= memcached_connect(&ptr->hosts[server]);
+        if (rc != MEMCACHED_SUCCESS)
+        {
+          memcached_io_reset(&ptr->hosts[server]);
+          dead_servers[server]= true;
+          success= false;
+          continue;
+        }
+      }
+
+      protocol_binary_request_getk request= {.bytes= {0}};
+      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+      if (number_of_keys == 1)
+        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+      else
+        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+
+      request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+      request.message.header.request.bodylen= htonl(key_length[x]);
+
+      if ((memcached_io_write(&ptr->hosts[server], request.bytes,
+                              sizeof(request.bytes), 0) == -1) ||
+          (memcached_io_write(&ptr->hosts[server], keys[x],
+                              key_length[x], flush) == -1))
+      {
+        memcached_io_reset(&ptr->hosts[server]);
+        dead_servers[server]= true;
+        success= false;
+        continue;
+      }
+      memcached_server_response_increment(&ptr->hosts[server]);
+    }
+
+    if (number_of_keys > 1)
+    {
+      /*
+       * Send a noop command to flush the buffers
+       */
+      protocol_binary_request_noop request= {.bytes= {0}};
+      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+      for (x= 0; x < ptr->number_of_hosts; x++)
+        if (memcached_server_response_count(&ptr->hosts[x]))
+        {
+          if (memcached_io_write(&ptr->hosts[x], request.bytes,
+                                 sizeof(request.bytes), 1) == -1)
+          {
+            memcached_io_reset(&ptr->hosts[x]);
+            dead_servers[x]= true;
+            success= false;
+          }
+          memcached_server_response_increment(&ptr->hosts[x]);
+
+          /* mark all of the messages bound for this server as sent! */
+          for (uint32_t x= 0; x < number_of_keys; ++x)
+            if (hash[x] == x)
+              hash[x]= ptr->number_of_hosts;
+        }
+    }
+
+    if (success) {
+      break;
+    }
+  }
+
+  return rc;
+}
+
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+                                           unsigned int master_server_key,
+                                           bool is_master_key_set,
+                                           char **keys, size_t *key_length,
+                                           unsigned int number_of_keys)
+{
+  memcached_return rc;
+
+  if (ptr->number_of_replicas == 0) {
+    rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
+                           keys, key_length, number_of_keys);
+  } else {
+    uint32_t* hash;
+    bool* dead_servers;
+
+    if (ptr->call_malloc)
+    {
+      hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys);
+      dead_servers= ptr->call_malloc(ptr, sizeof(bool) * ptr->number_of_hosts);
+    }
+    else
+    {
+      hash = malloc(sizeof(uint32_t) * number_of_keys);
+      dead_servers= malloc(sizeof(bool) * ptr->number_of_hosts);
+    }
+
+    if (hash == NULL || dead_servers == NULL)
+    {
+      if (ptr->call_free)
+      {
+        if (hash != NULL) ptr->call_free(ptr, hash);
+        if (dead_servers != NULL) ptr->call_free(ptr, dead_servers);
+      }
+      else
+      {
+        free(hash); /* No need to check for NULL (just look in the C spec) */
+        free(dead_servers);
+      }
+      return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+    }
+
+    memset(dead_servers, 0, sizeof(bool) * ptr->number_of_hosts);
+
+    if (is_master_key_set)
+      for (unsigned int x= 0; x < number_of_keys; ++x)
+        hash[x]= master_server_key;
+    else
+      for (unsigned int x= 0; x < number_of_keys; ++x)
+        hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+    rc= replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
+
+    if (ptr->call_free)
+    {
+      ptr->call_free(ptr, hash);
+      ptr->call_free(ptr, dead_servers);
+    }
+    else
+    {
+      free(hash);
+      free(dead_servers);
+    }
+
+    return MEMCACHED_SUCCESS;
+  }
+
+  return rc;
+}
index 97d345bdfd3d791e210d20d69928556cc5bd2e45..fe8024d305e0e72bf9b3b9f0c334e2ebe2b8a890 100644 (file)
@@ -42,7 +42,9 @@ static char *storage_op_string(memcached_storage_action verb)
   /* NOTREACHED */
 }
 
-static memcached_return memcached_send_binary(memcached_server_st* server, 
+static memcached_return memcached_send_binary(memcached_st *ptr,
+                                              const char *master_key, 
+                                              size_t master_key_length,
                                               const char *key, 
                                               size_t key_length, 
                                               const char *value, 
@@ -80,13 +82,14 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
     return MEMCACHED_BAD_KEY_PROVIDED;
 
-  server_key= memcached_generate_hash(ptr, master_key, master_key_length);
-
   if (ptr->flags & MEM_BINARY_PROTOCOL)
-    return memcached_send_binary(&ptr->hosts[server_key], key, key_length, 
-                                 value, value_length, expiration, 
+    return memcached_send_binary(ptr, master_key, master_key_length,
+                                 key, key_length,
+                                 value, value_length, expiration,
                                  flags, cas, verb);
 
+  server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+
   if (cas)
     write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
                            "%s %s%.*s %u %llu %zu %llu%s\r\n", 
@@ -408,8 +411,10 @@ static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
 
 
 
-static memcached_return memcached_send_binary(memcached_server_st* server, 
-                                              const char *key, 
+static memcached_return memcached_send_binary(memcached_st *ptr,
+                                              const char *master_key, 
+                                              size_t master_key_length,
+                                              const char *key,
                                               size_t key_length, 
                                               const char *value, 
                                               size_t value_length, 
@@ -421,6 +426,9 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
   char flush;
   protocol_binary_request_set request= {.bytes= {0}};
   size_t send_length= sizeof(request.bytes);
+  uint32_t server_key= memcached_generate_hash(ptr, master_key, 
+                                               master_key_length);
+  memcached_server_st *server= &ptr->hosts[server_key];
   bool noreply= server->root->flags & MEM_NOREPLY;
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
@@ -461,7 +469,26 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
     memcached_io_reset(server);
     return MEMCACHED_WRITE_FAILURE;
   }
-  
+
+  if (verb == SET_OP && ptr->number_of_replicas > 0) 
+  {
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
+
+    for (int x= 0; x < ptr->number_of_replicas; ++x)
+    {
+      ++server_key;
+      if (server_key == ptr->number_of_hosts)
+        server_key= 0;
+
+      memcached_server_st *srv= &ptr->hosts[server_key];
+      if ((memcached_do(srv, (const char*)request.bytes, 
+                        send_length, 0) != MEMCACHED_SUCCESS) ||
+          (memcached_io_write(srv, key, key_length, 0) == -1) ||
+          (memcached_io_write(srv, value, value_length, flush) == -1))
+        memcached_io_reset(server);
+    }
+  }
+
   if (flush == 0)
     return MEMCACHED_BUFFERED;
 
index d15c1a0ce16b5b4155a2d3621cee0d048947a341..3113fb1f5943aa98f186b4d5751611e2e99547df 100644 (file)
@@ -235,6 +235,7 @@ static test_return  clone_test(memcached_st *memc)
     assert(clone->server_failure_limit == memc->server_failure_limit);
     assert(clone->snd_timeout == memc->snd_timeout);
     assert(clone->user_data == memc->user_data);
+    assert(clone->number_of_replicas == memc->number_of_replicas);
 
     memcached_free(clone);
   }
@@ -1541,6 +1542,9 @@ static test_return  behavior_test(memcached_st *memc)
   value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE);
   assert(value > 0);
 
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, value + 1);
+  assert((value + 1) == memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS));
   return 0;
 }
 
@@ -3036,6 +3040,24 @@ static memcached_return  pre_binary(memcached_st *memc)
   return rc;
 }
 
+static memcached_return pre_replication(memcached_st *memc)
+{
+  memcached_return rc= MEMCACHED_FAILURE;
+  if (pre_binary(memc) == MEMCACHED_SUCCESS) 
+  {
+    /*
+     * Make sure that we store the item on all servers 
+     * (master + replicas == number of servers) 
+     */
+    rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 
+                               memc->number_of_hosts - 1);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS) == memc->number_of_hosts - 1);
+  }
+
+  return rc;
+}
+
 static void my_free(memcached_st *ptr __attribute__((unused)), void *mem)
 {
   free(mem);
@@ -3498,6 +3520,175 @@ static test_return connection_pool_test(memcached_st *memc)
 }
 #endif
 
+static test_return replication_set_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+  rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  /*
+  ** "bubba" should now be stored on all of our servers. We don't have an
+  ** easy to use API to address each individual server, so I'll just iterate
+  ** through a bunch of "master keys" and I should most likely hit all of the
+  ** servers...
+  */
+  for (int x= 'a'; x <= 'z'; ++x)
+  {
+    char key[2]= { [0]= (char)x };
+    size_t len;
+    uint32_t flags;
+    char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                    &len, &flags, &rc);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(val != NULL);
+    free(val);
+  }
+
+  memcached_free(clone);
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_get_test(memcached_st *memc)
+{
+  memcached_return rc;
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  for (int host= 0; host < memc->number_of_hosts; ++host) {
+    memcached_st *clone= memcached_clone(NULL, memc);
+    clone->hosts[host].port= 0;
+
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+      size_t len;
+      uint32_t flags;
+      char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                      &len, &flags, &rc);
+      assert(rc == MEMCACHED_SUCCESS);
+      assert(val != NULL);
+      free(val);
+    }
+
+    memcached_free(clone);
+  }
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_mget_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+  char *keys[]= { "bubba", "key1", "key2", "key3" };
+  size_t len[]= { 5, 4, 4, 4 };
+
+  for (int x=0; x< 4; ++x)
+  {
+    rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  memcached_result_st result_obj;
+  for (int host= 0; host < clone->number_of_hosts; ++host) {
+    memcached_st *clone= memcached_clone(NULL, memc);
+    clone->hosts[host].port= 0;
+
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+
+      rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+      assert(rc == MEMCACHED_SUCCESS);
+
+      memcached_result_st *results= memcached_result_create(clone, &result_obj);
+      assert(results);
+
+      int hits= 0;
+      while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+      {
+        ++hits;
+      }
+      assert(hits == 4);
+      memcached_result_free(&result_obj);
+    }
+
+    memcached_free(clone);
+  }
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_delete_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  /* Delete the items from all of the servers except 1 */
+  uint64_t repl= memcached_behavior_get(memc,
+                                        MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl);
+
+  char *keys[]= { "bubba", "key1", "key2", "key3" };
+  size_t len[]= { 5, 4, 4, 4 };
+
+  for (int x=0; x< 4; ++x)
+  {
+    rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  int hash= memcached_generate_hash(memc, keys[0], len[0]);
+  for (int x= 0; x < (repl + 1); ++x) {
+    clone->hosts[hash].port= 0;
+    if (++hash == clone->number_of_hosts)
+      hash= 0;
+  }
+
+  memcached_result_st result_obj;
+  for (int host= 0; host < clone->number_of_hosts; ++host) {
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+
+      rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+      assert(rc == MEMCACHED_SUCCESS);
+
+      memcached_result_st *results= memcached_result_create(clone, &result_obj);
+      assert(results);
+
+      int hits= 0;
+      while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+      {
+        ++hits;
+      }
+      assert(hits == 4);
+      memcached_result_free(&result_obj);
+    }
+  }
+  memcached_free(clone);
+
+  return TEST_SUCCESS;
+}
+
 static void increment_request_id(uint16_t *id)
 {
   (*id)++;
@@ -4202,6 +4393,14 @@ test_st user_tests[] ={
   {0, 0, 0}
 };
 
+test_st replication_tests[]= {
+  {"set", 1, replication_set_test },
+  {"get", 0, replication_get_test },
+  {"mget", 0, replication_mget_test },
+  {"delete", 0, replication_delete_test },
+  {0, 0, 0}
+};
+
 test_st generate_tests[] ={
   {"generate_pairs", 1, generate_pairs },
   {"generate_data", 1, generate_data },
@@ -4305,6 +4504,7 @@ collection_st collection[] ={
   {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests},
   {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests},
   {"test_hashes", 0, 0, hash_tests},
+  {"replication", pre_replication, 0, replication_tests},
   {0, 0, 0, 0}
 };