Merging replication
authorBrian Aker <brian@gir-3.local>
Sat, 20 Jun 2009 19:18:01 +0000 (12:18 -0700)
committerBrian Aker <brian@gir-3.local>
Sat, 20 Jun 2009 19:18:01 +0000 (12:18 -0700)
docs/memcached_behavior.pod
libmemcached/memcached.c
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_delete.c
libmemcached/memcached_get.c
libmemcached/memcached_storage.c
tests/function.c

index 71a46134b892cd985d952ca63a761236edec22af..00d762825d2da1f4757e2000b7843820843ba11a 100644 (file)
@@ -194,6 +194,14 @@ sent to the server.
 Set this value to specify that you really don't care about the result
 from your storage commands (set, add, replace, append, prepend). 
 
+=item MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
+
+If you just want "a poor mans HA", you may specify the numbers of
+replicas libmemcached should store of each item (on different servers).
+This replication does not dedicate certain memcached servers to store the
+replicas in, but instead it will store the replicas together with all of the
+other objects (on the 'n' next servers specified in your server list).
+
 =back
 
 =head1 RETURN
index 5991aabb46ced3c685ed7a26fcf0f9a4ce40786e..4e8ec062bcf4f979f92d7aa86404baa640240ee6 100644 (file)
@@ -105,6 +105,7 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source)
   new_clone->io_msg_watermark= source->io_msg_watermark;
   new_clone->io_bytes_watermark= source->io_bytes_watermark;
   new_clone->io_key_prefetch= source->io_key_prefetch;
+  new_clone->number_of_replicas= source->number_of_replicas;
 
   if (source->hosts)
     rc= memcached_server_push(new_clone, source->hosts);
index 3dae65ee5b2cfa4fa06ecb6be42fc2616742a45c..9ac0df70e42b585ba4a3425b5ce578128c560004 100644 (file)
@@ -121,6 +121,7 @@ struct memcached_st {
   memcached_trigger_key get_key_failure;
   memcached_trigger_delete_key delete_trigger;
   char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
+  uint32_t number_of_replicas;
 };
 
 
index fb201d4ed82c4f1f31233f1d8bd7ba279f91a3c8..974bda56db46eb8d0cb3711c5a768a2fc5e8f816 100644 (file)
@@ -24,6 +24,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
 {
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+    ptr->number_of_replicas= (uint32_t)data;
+    break;
   case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
     ptr->io_msg_watermark= (int32_t)data;
     break;
@@ -169,6 +172,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
 
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+    return ptr->number_of_replicas;
   case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
     return ptr->io_msg_watermark;
   case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
index 6d8d00f16c80de383b8854de74c202d41252474e..5fdede9a765923ddc6b428f7732078ca1ced2c24 100644 (file)
@@ -105,7 +105,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
   MEMCACHED_BEHAVIOR_NOREPLY,
   MEMCACHED_BEHAVIOR_USE_UDP,
-  MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS
+  MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS,
+  MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
 } memcached_behavior;
 
 typedef enum {
index f2fab3221a53cca25203b936d100d86460c5a259..93b1eb7a1a6f7e95415e20976881b7f598046a29 100644 (file)
@@ -1,4 +1,5 @@
 #include "common.h"
+#include "memcached/protocol_binary.h"
 
 memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key_length,
                                   time_t expiration)
@@ -117,14 +118,34 @@ static inline memcached_return binary_delete(memcached_st *ptr,
       memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
   }
   
+  memcached_return rc= MEMCACHED_SUCCESS;
+
   if ((memcached_do(&ptr->hosts[server_key], request.bytes, 
                     sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
       (memcached_io_write(&ptr->hosts[server_key], key, 
                           key_length, flush) == -1)) 
   {
     memcached_io_reset(&ptr->hosts[server_key]);
-    return MEMCACHED_WRITE_FAILURE;
+    rc= MEMCACHED_WRITE_FAILURE;
+  }
+
+  unlikely (ptr->number_of_replicas > 0) 
+  {
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
+
+    for (int x= 0; x < ptr->number_of_replicas; ++x)
+    {
+      ++server_key;
+      if (server_key == ptr->number_of_hosts)
+        server_key= 0;
+  
+      memcached_server_st* server= &ptr->hosts[server_key];
+      if ((memcached_do(server, (const char*)request.bytes, 
+                        sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+          (memcached_io_write(server, key, key_length, flush) == -1))
+         memcached_io_reset(server);
+    }
   }
 
-  return MEMCACHED_SUCCESS;
+  return rc;
 }
index 05b317a6268b79d19b5a711d49dfb05cf4dc3cb1..0209b283ab1129ec5b53347e91d1420ca2366e36 100644 (file)
@@ -143,12 +143,6 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
     return MEMCACHED_BAD_KEY_PROVIDED;
 
-  if (ptr->flags & MEM_SUPPORT_CAS)
-  {
-    get_command= "gets ";
-    get_command_length= 5;
-  }
-
   if (master_key && master_key_length)
   {
     if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
@@ -181,6 +175,12 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
     return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, 
                               key_length, number_of_keys);
 
+  if (ptr->flags & MEM_SUPPORT_CAS)
+  {
+    get_command= "gets ";
+    get_command_length= 5;
+  }
+
   /* 
     If a server fails we warn about errors and start all over with sending keys
     to the server.
@@ -256,7 +256,7 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   return rc;
 }
 
-static memcached_return binary_mget_by_key(memcached_st *ptr, 
+static memcached_return simple_binary_mget(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
                                            char **keys, size_t *key_length, 
@@ -357,3 +357,148 @@ static memcached_return binary_mget_by_key(memcached_st *ptr,
 
   return rc;
 }
+
+static memcached_return replication_binary_mget(memcached_st *ptr,
+                                             uint32_t* hash, bool* dead_servers,
+                                             char **keys, size_t *key_length,
+                                             unsigned int number_of_keys)
+{
+  memcached_return rc= MEMCACHED_NOTFOUND;
+  uint32_t x;
+
+  int flush= number_of_keys == 1;
+
+  for (int replica = 0; replica <= ptr->number_of_replicas; ++replica)
+  {
+    bool success= true;    
+    
+    for (uint32_t x= 0; x < number_of_keys; ++x)
+    {
+      if (hash[x] == ptr->number_of_hosts)
+        continue; /* Already successfully sent */
+
+      uint32_t server= hash[x] + replica;
+      while (server >= ptr->number_of_hosts)
+        server -= ptr->number_of_hosts;
+
+      if (dead_servers[server])
+        continue;
+
+      if (memcached_server_response_count(&ptr->hosts[server]) == 0)
+      {
+        rc= memcached_connect(&ptr->hosts[server]);
+        if (rc != MEMCACHED_SUCCESS)
+        {
+          memcached_io_reset(&ptr->hosts[server]);
+          dead_servers[server]= true;
+          success= false;
+          continue;
+        }
+      }
+
+      protocol_binary_request_getk request= {.bytes= {0}};
+      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+      if (number_of_keys == 1)
+        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+      else
+        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+
+      request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+      request.message.header.request.bodylen= htonl(key_length[x]);
+
+      if ((memcached_io_write(&ptr->hosts[server], request.bytes,
+                              sizeof(request.bytes), 0) == -1) ||
+          (memcached_io_write(&ptr->hosts[server], keys[x],
+                              key_length[x], flush) == -1))
+      {
+        memcached_io_reset(&ptr->hosts[server]);
+        dead_servers[server]= true;
+        success= false;
+        continue;
+      }
+      memcached_server_response_increment(&ptr->hosts[server]);
+    }
+
+    if (number_of_keys > 1)
+    {
+      /*
+       * Send a noop command to flush the buffers
+       */
+      protocol_binary_request_noop request= {.bytes= {0}};
+      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+      for (x= 0; x < ptr->number_of_hosts; x++)
+        if (memcached_server_response_count(&ptr->hosts[x]))
+        {
+          if (memcached_io_write(&ptr->hosts[x], request.bytes,
+                                 sizeof(request.bytes), 1) == -1)
+          {
+            memcached_io_reset(&ptr->hosts[x]);
+            dead_servers[x]= true;
+            success= false;
+          }
+          memcached_server_response_increment(&ptr->hosts[x]);
+
+          /* mark all of the messages bound for this server as sent! */
+          for (uint32_t x= 0; x < number_of_keys; ++x)
+            if (hash[x] == x)
+              hash[x]= ptr->number_of_hosts;
+        }
+    }
+
+    if (success)
+      break;
+  }
+
+  return rc;
+}
+
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+                                           unsigned int master_server_key,
+                                           bool is_master_key_set,
+                                           char **keys, size_t *key_length,
+                                           unsigned int number_of_keys)
+{
+  memcached_return rc;
+
+  if (ptr->number_of_replicas == 0) 
+  {
+    rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
+                           keys, key_length, number_of_keys);
+  } 
+  else 
+  {
+    uint32_t* hash;
+    bool* dead_servers;
+
+    hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys);
+    dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool));
+
+    if (hash == NULL || dead_servers == NULL)
+    {
+      ptr->call_free(ptr, hash);
+      ptr->call_free(ptr, dead_servers);
+      return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+    }
+
+    if (is_master_key_set)
+      for (unsigned int x= 0; x < number_of_keys; x++)
+        hash[x]= master_server_key;
+    else
+      for (unsigned int x= 0; x < number_of_keys; x++)
+        hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+    rc= replication_binary_mget(ptr, hash, dead_servers, keys, 
+                                key_length, number_of_keys);
+
+    ptr->call_free(ptr, hash);
+    ptr->call_free(ptr, dead_servers);
+
+    return MEMCACHED_SUCCESS;
+  }
+
+  return rc;
+}
index 97d345bdfd3d791e210d20d69928556cc5bd2e45..713f831d2c4fe399cfddd8443e4b453a5a2ea5ca 100644 (file)
@@ -42,7 +42,9 @@ static char *storage_op_string(memcached_storage_action verb)
   /* NOTREACHED */
 }
 
-static memcached_return memcached_send_binary(memcached_server_st* server, 
+static memcached_return memcached_send_binary(memcached_st *ptr,
+                                              const char *master_key, 
+                                              size_t master_key_length,
                                               const char *key, 
                                               size_t key_length, 
                                               const char *value, 
@@ -80,13 +82,14 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
     return MEMCACHED_BAD_KEY_PROVIDED;
 
-  server_key= memcached_generate_hash(ptr, master_key, master_key_length);
-
   if (ptr->flags & MEM_BINARY_PROTOCOL)
-    return memcached_send_binary(&ptr->hosts[server_key], key, key_length, 
-                                 value, value_length, expiration, 
+    return memcached_send_binary(ptr, master_key, master_key_length,
+                                 key, key_length,
+                                 value, value_length, expiration,
                                  flags, cas, verb);
 
+  server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+
   if (cas)
     write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
                            "%s %s%.*s %u %llu %zu %llu%s\r\n", 
@@ -408,8 +411,10 @@ static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
 
 
 
-static memcached_return memcached_send_binary(memcached_server_st* server, 
-                                              const char *key, 
+static memcached_return memcached_send_binary(memcached_st *ptr,
+                                              const char *master_key, 
+                                              size_t master_key_length,
+                                              const char *key,
                                               size_t key_length, 
                                               const char *value, 
                                               size_t value_length, 
@@ -421,6 +426,9 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
   char flush;
   protocol_binary_request_set request= {.bytes= {0}};
   size_t send_length= sizeof(request.bytes);
+  uint32_t server_key= memcached_generate_hash(ptr, master_key, 
+                                               master_key_length);
+  memcached_server_st *server= &ptr->hosts[server_key];
   bool noreply= server->root->flags & MEM_NOREPLY;
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
@@ -461,7 +469,26 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
     memcached_io_reset(server);
     return MEMCACHED_WRITE_FAILURE;
   }
-  
+
+  unlikely (verb == SET_OP && ptr->number_of_replicas > 0) 
+  {
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
+
+    for (int x= 0; x < ptr->number_of_replicas; ++x)
+    {
+      ++server_key;
+      if (server_key == ptr->number_of_hosts)
+        server_key= 0;
+
+      memcached_server_st *srv= &ptr->hosts[server_key];
+      if ((memcached_do(srv, (const char*)request.bytes, 
+                        send_length, 0) != MEMCACHED_SUCCESS) ||
+          (memcached_io_write(srv, key, key_length, 0) == -1) ||
+          (memcached_io_write(srv, value, value_length, flush) == -1))
+        memcached_io_reset(server);
+    }
+  }
+
   if (flush == 0)
     return MEMCACHED_BUFFERED;
 
index 99ad776aecc1bc8154c4dc58cca229b885fd925c..5f83e1ea2a16a05d8f5133c189aff8a212fbe172 100644 (file)
@@ -236,6 +236,7 @@ static test_return  clone_test(memcached_st *memc)
     assert(clone->server_failure_limit == memc->server_failure_limit);
     assert(clone->snd_timeout == memc->snd_timeout);
     assert(clone->user_data == memc->user_data);
+    assert(clone->number_of_replicas == memc->number_of_replicas);
 
     memcached_free(clone);
   }
@@ -1552,6 +1553,9 @@ static test_return  behavior_test(memcached_st *memc)
   value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE);
   assert(value > 0);
 
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, value + 1);
+  assert((value + 1) == memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS));
   return 0;
 }
 
@@ -3047,6 +3051,24 @@ static memcached_return  pre_binary(memcached_st *memc)
   return rc;
 }
 
+static memcached_return pre_replication(memcached_st *memc)
+{
+  memcached_return rc= MEMCACHED_FAILURE;
+  if (pre_binary(memc) == MEMCACHED_SUCCESS) 
+  {
+    /*
+     * Make sure that we store the item on all servers 
+     * (master + replicas == number of servers) 
+     */
+    rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 
+                               memc->number_of_hosts - 1);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS) == memc->number_of_hosts - 1);
+  }
+
+  return rc;
+}
+
 static void my_free(memcached_st *ptr __attribute__((unused)), void *mem)
 {
   free(mem);
@@ -3542,6 +3564,177 @@ static test_return connection_pool_test(memcached_st *memc)
 }
 #endif
 
+static test_return replication_set_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+  rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  /*
+  ** "bubba" should now be stored on all of our servers. We don't have an
+  ** easy to use API to address each individual server, so I'll just iterate
+  ** through a bunch of "master keys" and I should most likely hit all of the
+  ** servers...
+  */
+  for (int x= 'a'; x <= 'z'; ++x)
+  {
+    char key[2]= { [0]= (char)x };
+    size_t len;
+    uint32_t flags;
+    char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                    &len, &flags, &rc);
+    assert(rc == MEMCACHED_SUCCESS);
+    assert(val != NULL);
+    free(val);
+  }
+
+  memcached_free(clone);
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_get_test(memcached_st *memc)
+{
+  memcached_return rc;
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  for (int host= 0; host < memc->number_of_hosts; ++host) {
+    memcached_st *clone= memcached_clone(NULL, memc);
+    clone->hosts[host].port= 0;
+
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+      size_t len;
+      uint32_t flags;
+      char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                      &len, &flags, &rc);
+      assert(rc == MEMCACHED_SUCCESS);
+      assert(val != NULL);
+      free(val);
+    }
+
+    memcached_free(clone);
+  }
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_mget_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+  char *keys[]= { "bubba", "key1", "key2", "key3" };
+  size_t len[]= { 5, 4, 4, 4 };
+
+  for (int x=0; x< 4; ++x)
+  {
+    rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  memcached_result_st result_obj;
+  for (int host= 0; host < clone->number_of_hosts; ++host) 
+  {
+    memcached_st *clone= memcached_clone(NULL, memc);
+    clone->hosts[host].port= 0;
+
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+
+      rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+      assert(rc == MEMCACHED_SUCCESS);
+
+      memcached_result_st *results= memcached_result_create(clone, &result_obj);
+      assert(results);
+
+      int hits= 0;
+      while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+      {
+        hits++;
+      }
+      assert(hits == 4);
+      memcached_result_free(&result_obj);
+    }
+
+    memcached_free(clone);
+  }
+
+  return TEST_SUCCESS;
+}
+
+static test_return replication_delete_test(memcached_st *memc)
+{
+  memcached_return rc;
+  memcached_st *clone= memcached_clone(NULL, memc);
+  /* Delete the items from all of the servers except 1 */
+  uint64_t repl= memcached_behavior_get(memc,
+                                        MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl);
+
+  char *keys[]= { "bubba", "key1", "key2", "key3" };
+  size_t len[]= { 5, 4, 4, 4 };
+
+  for (int x=0; x< 4; ++x)
+  {
+    rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library
+   */
+  int hash= memcached_generate_hash(memc, keys[0], len[0]);
+  for (int x= 0; x < (repl + 1); ++x) {
+    clone->hosts[hash].port= 0;
+    if (++hash == clone->number_of_hosts)
+      hash= 0;
+  }
+
+  memcached_result_st result_obj;
+  for (int host= 0; host < clone->number_of_hosts; ++host) 
+  {
+    for (int x= 'a'; x <= 'z'; ++x)
+    {
+      char key[2]= { [0]= (char)x };
+
+      rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+      assert(rc == MEMCACHED_SUCCESS);
+
+      memcached_result_st *results= memcached_result_create(clone, &result_obj);
+      assert(results);
+
+      int hits= 0;
+      while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+      {
+        ++hits;
+      }
+      assert(hits == 4);
+      memcached_result_free(&result_obj);
+    }
+  }
+  memcached_free(clone);
+
+  return TEST_SUCCESS;
+}
+
 static void increment_request_id(uint16_t *id)
 {
   (*id)++;
@@ -3554,6 +3747,7 @@ static uint16_t *get_udp_request_ids(memcached_st *memc)
   uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts);
   assert(ids != NULL);
   unsigned int x;
+
   for (x= 0; x < memc->number_of_hosts; x++)
     ids[x]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[x].write_buffer);
 
@@ -3565,6 +3759,7 @@ static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_
   unsigned int x;
   memcached_server_st *cur_server = memc->hosts;
   uint16_t *cur_req_ids = get_udp_request_ids(memc);
+
   for (x= 0; x < memc->number_of_hosts; x++)
   {
     assert(cur_server[x].cursor_active == 0);
@@ -3572,6 +3767,7 @@ static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_
   }
   free(expected_req_ids);
   free(cur_req_ids);
+
   return TEST_SUCCESS;
 }
 
@@ -3593,6 +3789,7 @@ static memcached_return init_udp(memcached_st *memc)
   memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts);
   for (x= 0; x < num_hosts; x++)
     memcached_server_free(&memc->hosts[x]);
+
   memc->number_of_hosts= 0;
   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1);
   for (x= 0; x < num_hosts; x++)
@@ -3600,6 +3797,7 @@ static memcached_return init_udp(memcached_st *memc)
     assert(memcached_server_add_udp(memc, servers[x].hostname, servers[x].port) == MEMCACHED_SUCCESS);
     assert(memc->hosts[x].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
   }
+
   return MEMCACHED_SUCCESS;
 }
 
@@ -4247,6 +4445,14 @@ test_st user_tests[] ={
   {0, 0, 0}
 };
 
+test_st replication_tests[]= {
+  {"set", 1, replication_set_test },
+  {"get", 0, replication_get_test },
+  {"mget", 0, replication_mget_test },
+  {"delete", 0, replication_delete_test },
+  {0, 0, 0}
+};
+
 test_st generate_tests[] ={
   {"generate_pairs", 1, generate_pairs },
   {"generate_data", 1, generate_data },
@@ -4351,6 +4557,7 @@ collection_st collection[] ={
   {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests},
   {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests},
   {"test_hashes", 0, 0, hash_tests},
+  {"replication", pre_replication, 0, replication_tests},
   {0, 0, 0, 0}
 };