Branch merge.
author <brian@gir-2.local> <>
Fri, 29 Feb 2008 07:19:55 +0000 (23:19 -0800)
committer <brian@gir-2.local> <>
Fri, 29 Feb 2008 07:19:55 +0000 (23:19 -0800)
1  2 
ChangeLog
docs/memcached_behavior.pod
include/memcached.h
lib/memcached_auto.c
lib/memcached_behavior.c
lib/memcached_delete.c
lib/memcached_mget.c
lib/memcached_storage.c
tests/function.c

diff --combined ChangeLog
index 55c1e8517649746c4777c0b83ee330f086c87370,f85969829e1ddb6480c54ee9d496ece0be5d709b..f3cd040bfd5dc30d01b217a02b8f0bd600bfa32d
+++ b/ChangeLog
@@@ -1,8 -1,8 +1,10 @@@
 +  * Replication support has been added to the library.
 +
  0.17
    * MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT added for connect timeout in
      non-block mode.
+   * Incompatible change in memcached_behavior_set() api. We now use a
+     uint64_t, instead of a pointer.
  
  0.16 Mon Feb 18 00:30:25 PST 2008
    * Work on the UDP protocol
index 428b26e2e05a969ba2432246d215551144f00295,2268e845976f77905a387fd44e66b5e7f63ea508..8da9183dd486fc1736a5681543a59e239c1d7f75
@@@ -10,14 -10,14 +10,14 @@@ C Client Library for memcached (libmemc
  
    #include <memcached.h>
  
-   unsigned long long
+   uint64_t
      memcached_behavior_get (memcached_st *ptr,
                              memcached_behavior flag);
  
    memcached_return
      memcached_behavior_set (memcached_st *ptr,
                              memcached_behavior flag,
-                             void *data);
+                             uint64_t data);
  
  =head1 DESCRIPTION
  
@@@ -37,12 -37,6 +37,12 @@@ memcached_behavior_set() will flush an
  
  =over 4
  
 +=item MEMCACHED_BEHAVIOR_REPLICAS
 +
 +By default libmemcached(3) stores data in just one node of the cluster. Setting this value
 +to a number will cause that number of copies to be kept. The value must be greater then 
 +zero, and must be at most the same value as the number of hosts in the cluster.
 +
  =item MEMCACHED_BEHAVIOR_NO_BLOCK
  
  Causes libmemcached(3) to use asychronous IO. This is the fastest transport
@@@ -117,6 -111,11 +117,11 @@@ memcached_behavior_get() returns eithe
  or 1 on simple flag behaviors (1 being enabled). memcached_behavior_set()
  returns whether or not the behavior was enabled.
  
+ =head1 NOTES
+ memcached_behavior_set() in version .17 was changed from taking a pointer
+ to data value, to taking a uin64_t. 
  =head1 HOME
  
  To find out more information please check:
diff --combined include/memcached.h
index ccfc930233727fa7c7719c9939c53d88b250db8b,2e3632954d5d98ce18f42698e514fa2963336d79..4a283ec6c83c8937bf22860054263cfad94216a2
@@@ -29,7 -29,6 +29,7 @@@ extern "C" 
  #define MEMCACHED_MAX_HOST_LENGTH 64
  #define MEMCACHED_WHEEL_SIZE 1024
  #define MEMCACHED_STRIDE 4
 +#define MEMCACHED_MAX_REPLICAS 4
  #define MEMCACHED_DEFAULT_TIMEOUT INT32_MAX
  
  /* string value */
@@@ -102,11 -101,9 +102,10 @@@ typedef enum 
    MEMCACHED_BEHAVIOR_POLL_TIMEOUT,
    MEMCACHED_BEHAVIOR_DISTRIBUTION,
    MEMCACHED_BEHAVIOR_BUFFER_REQUESTS,
-   MEMCACHED_BEHAVIOR_USER_DATA,
    MEMCACHED_BEHAVIOR_SORT_HOSTS,
    MEMCACHED_BEHAVIOR_VERIFY_KEY,
    MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
 +  MEMCACHED_BEHAVIOR_REPLICAS,
  } memcached_behavior;
  
  typedef enum {
@@@ -233,8 -230,8 +232,8 @@@ struct memcached_st 
    memcached_free_function call_free;
    memcached_malloc_function call_malloc;
    memcached_realloc_function call_realloc;
 +  uint8_t number_of_replicas;
  #ifdef NOT_USED /* Future Use */
 -  uint8_t replicas;
    memcached_return warning;
  #endif
  };
@@@ -264,7 -261,7 +263,7 @@@ memcached_return memcached_flush(memcac
  memcached_return memcached_verbosity(memcached_st *ptr, unsigned int verbosity);
  void memcached_quit(memcached_st *ptr);
  char *memcached_strerror(memcached_st *ptr, memcached_return rc);
- memcached_return memcached_behavior_set(memcached_st *ptr, memcached_behavior flag, void *data);
+ memcached_return memcached_behavior_set(memcached_st *ptr, memcached_behavior flag, uint64_t data);
  unsigned long long memcached_behavior_get(memcached_st *ptr, memcached_behavior flag);
  
  /* All of the functions for adding data to the server */
diff --combined lib/memcached_auto.c
index 1f61a18ce0fe05c318fee3d0f7ebafa469f65eca,a607ed66e23298881c2ee5bfd6d3ea429aafbb62..3aa63eedc4a8a7af09240250cebf49851bbeea73
@@@ -7,15 -7,14 +7,15 @@@ static memcached_return memcached_auto(
                                         uint64_t *value)
  {
    size_t send_length;
 -  memcached_return rc;
    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
    unsigned int server_key;
 +  uint8_t replicas= 0;
 +  memcached_return rc[MEMCACHED_MAX_REPLICAS];
  
-   if (key_length == 0)
+   unlikely (key_length == 0)
      return MEMCACHED_NO_KEY_PROVIDED;
  
-   if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
+   unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
      return MEMCACHED_NO_SERVERS;
  
    if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
                          "%s %.*s %u\r\n", verb, 
                          (int)key_length, key,
                          offset);
-   if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+   unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
      return MEMCACHED_WRITE_FAILURE;
  
 -  rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
 -  if (rc != MEMCACHED_SUCCESS)
 -    return rc;
 -
 -  rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 -
 -  /* 
 -    So why recheck responce? Because the protocol is brain dead :)
 -    The number returned might end up equaling one of the string 
 -    values. Less chance of a mistake with strncmp() so we will 
 -    use it. We still called memcached_response() though since it
 -    worked its magic for non-blocking IO.
 -  */
 -  if (!strncmp(buffer, "ERROR\r\n", 7))
 +  do 
    {
 -    *value= 0;
 -    rc= MEMCACHED_PROTOCOL_ERROR;
 -  }
 -  else if (!strncmp(buffer, "NOT_FOUND\r\n", 11))
 +    rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
 +    if (rc[replicas] != MEMCACHED_SUCCESS)
 +      goto error;
 +
 +    rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 +
 +    /* 
 +      So why recheck responce? Because the protocol is brain dead :)
 +      The number returned might end up equaling one of the string 
 +      values. Less chance of a mistake with strncmp() so we will 
 +      use it. We still called memcached_response() though since it
 +      worked its magic for non-blocking IO.
 +    */
 +    if (!strncmp(buffer, "ERROR\r\n", 7))
 +    {
 +      *value= 0;
 +      rc[replicas]= MEMCACHED_PROTOCOL_ERROR;
 +    }
 +    else if (!strncmp(buffer, "NOT_FOUND\r\n", 11))
 +    {
 +      *value= 0;
 +      rc[replicas]= MEMCACHED_NOTFOUND;
 +    }
 +    else
 +    {
 +      *value= (uint64_t)strtoll(buffer, (char **)NULL, 10);
 +      rc[replicas]= MEMCACHED_SUCCESS;
 +    }
 +    /* On error we just jump to the next potential server */
 +error:
 +    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
 +    {
 +      if (server_key == (ptr->number_of_hosts - 1))
 +        server_key= 0;
 +      else
 +        server_key++;
 +    }
 +  } while ((++replicas) < ptr->number_of_replicas);
 +
 +  /* As long as one object gets stored, we count this as a success */
 +  while (replicas--)
    {
 -    *value= 0;
 -    rc= MEMCACHED_NOTFOUND;
 -  }
 -  else
 -  {
 -    *value= (uint64_t)strtoll(buffer, (char **)NULL, 10);
 -    rc= MEMCACHED_SUCCESS;
 +    if (rc[replicas] == MEMCACHED_STORED)
 +      return MEMCACHED_SUCCESS;
    }
  
 -  return rc;
 +  return rc[0];
  }
  
  memcached_return memcached_increment(memcached_st *ptr, 
diff --combined lib/memcached_behavior.c
index 2a2ef5cd9bfc4eea4302cf9115ab1da2a53bdb10,7c5db61d9b63e63234959100ec6bc51bd68f2f49..910a99e6c22b3c0857db89477b5cb41e1baf8241
@@@ -9,37 -9,20 +9,30 @@@
    We quit all connections so we can reset the sockets.
  */
  
- void set_behavior_flag(memcached_st *ptr, memcached_flags temp_flag, void *data)
+ void set_behavior_flag(memcached_st *ptr, memcached_flags temp_flag, uint64_t data)
  {
-   uint8_t truefalse;
    if (data)
-     truefalse= *(unsigned int *)data;
-   else
-     truefalse= 0;
-   if (truefalse)
      ptr->flags|= temp_flag;
    else
-     ptr->flags+= temp_flag;
+     ptr->flags&= ~temp_flag;
  }
  
  memcached_return memcached_behavior_set(memcached_st *ptr, 
                                          memcached_behavior flag, 
-                                         void *data)
+                                         uint64_t data)
  {
    switch (flag)
    {
 +  case MEMCACHED_BEHAVIOR_REPLICAS:
 +    {
 +      uint8_t number_of_replicas= (uint8_t)data;
 +
 +      if (number_of_replicas > ptr->number_of_hosts || number_of_replicas == 0 || number_of_replicas > MEMCACHED_MAX_REPLICAS)
 +        return MEMCACHED_FAILURE;
 +      else
 +        ptr->number_of_replicas= number_of_replicas;
 +      break;
 +    }
    case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
      set_behavior_flag(ptr, MEM_SUPPORT_CAS, data);
      break;
      memcached_quit(ptr);
      break;
    case MEMCACHED_BEHAVIOR_DISTRIBUTION:
-     ptr->distribution= *(memcached_server_distribution *)(data);
+     ptr->distribution= (memcached_server_distribution)data;
      break;
    case MEMCACHED_BEHAVIOR_HASH:
-     ptr->hash= *(memcached_hash *)(data);
+     ptr->hash= (memcached_hash)(data);
      break;
    case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS:
      set_behavior_flag(ptr, MEM_USE_CACHE_LOOKUPS, data);
    case MEMCACHED_BEHAVIOR_SORT_HOSTS:
      set_behavior_flag(ptr, MEM_USE_SORT_HOSTS, data);
      break;
-   case MEMCACHED_BEHAVIOR_USER_DATA:
-     ptr->user_data= data;
-     break;
    case MEMCACHED_BEHAVIOR_POLL_TIMEOUT:
      {
-       int32_t timeout= (*((int32_t *)data));
+       int32_t timeout= (int32_t)data;
  
        ptr->poll_timeout= timeout;
        break;
      }
    case MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT:
      {
-       int32_t timeout= (*((int32_t *)data));
+       int32_t timeout= (int32_t)data;
  
        ptr->connect_timeout= timeout;
        break;
      }
    case MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE:
      {
-       ptr->send_size= (*((int *)data));
+       ptr->send_size= data;
        memcached_quit(ptr);
        break;
      }
    case MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE:
      {
-       ptr->recv_size= (*((int *)data));
+       ptr->recv_size= data;
        memcached_quit(ptr);
        break;
      }
    return MEMCACHED_SUCCESS;
  }
  
- unsigned long long memcached_behavior_get(memcached_st *ptr, 
-                                           memcached_behavior flag)
+ uint64_t memcached_behavior_get(memcached_st *ptr, 
+                                 memcached_behavior flag)
  {
    memcached_flags temp_flag= 0;
  
    switch (flag)
    {
 +  case MEMCACHED_BEHAVIOR_REPLICAS:
 +    return (unsigned long long)ptr->number_of_replicas;
    case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
      temp_flag= MEM_SUPPORT_CAS;
      break;
    case MEMCACHED_BEHAVIOR_SORT_HOSTS:
      temp_flag= MEM_USE_SORT_HOSTS;
      break;
-   case MEMCACHED_BEHAVIOR_USER_DATA:
-     return 0;
    case MEMCACHED_BEHAVIOR_POLL_TIMEOUT:
      {
        return (unsigned long long)ptr->poll_timeout;
diff --combined lib/memcached_delete.c
index 96db7b4efabe2f3bcf5247ab685f7197033bc236,60e3bed4406e3ce89c1b6c64868559af8f77b7b8..24dc16601c178dc30b498658701bdae332948a31
@@@ -14,15 -14,16 +14,15 @@@ memcached_return memcached_delete_by_ke
  {
    char to_write;
    size_t send_length;
 -  memcached_return rc;
    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
    unsigned int server_key;
 -
 -  LIBMEMCACHED_MEMCACHED_DELETE_START();
 +  uint8_t replicas= 0;
 +  memcached_return rc[MEMCACHED_MAX_REPLICAS];
  
-   if (key_length == 0)
+   unlikely (key_length == 0)
      return MEMCACHED_NO_KEY_PROVIDED;
  
-   if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
+   unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
      return MEMCACHED_NO_SERVERS;
  
    server_key= memcached_generate_hash(ptr, master_key, master_key_length);
  
    if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
    {
 -    rc= MEMCACHED_WRITE_FAILURE;
 +    rc[replicas]= MEMCACHED_WRITE_FAILURE;
      goto error;
    }
  
    to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
  
 -  rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
 -  if (rc != MEMCACHED_SUCCESS)
 -    goto error;
 -
 -  if ((ptr->flags & MEM_BUFFER_REQUESTS))
 +  do
    {
 -    rc= MEMCACHED_BUFFERED;
 -  }
 -  else
 +    rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
 +    if (rc[replicas] != MEMCACHED_SUCCESS)
 +      goto error;
 +
 +    if ((ptr->flags & MEM_BUFFER_REQUESTS))
 +    {
 +      rc[replicas]= MEMCACHED_BUFFERED;
 +    }
 +    else
 +    {
 +      rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 +      if (rc[replicas] == MEMCACHED_DELETED)
 +        rc[replicas]= MEMCACHED_SUCCESS;
 +    }
 +
 +    /* On error we just jump to the next potential server */
 +error:
 +    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
 +    {
 +      if (server_key == (ptr->number_of_hosts - 1))
 +        server_key= 0;
 +      else
 +        server_key++;
 +    }
 +  } while ((++replicas) < ptr->number_of_replicas);
 +
 +  /* As long as one object gets stored, we count this as a success */
 +  while (replicas--)
    {
 -    rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 -    if (rc == MEMCACHED_DELETED)
 -      rc= MEMCACHED_SUCCESS;
 +    if (rc[replicas] == MEMCACHED_DELETED)
 +      return MEMCACHED_SUCCESS;
 +    else if (rc[replicas] == MEMCACHED_DELETED)
 +      rc[replicas]= MEMCACHED_BUFFERED;
    }
  
 -error:
 -  LIBMEMCACHED_MEMCACHED_DELETE_END();
 -  return rc;
 +  return rc[0];
  }
diff --combined lib/memcached_mget.c
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..a25fa8e09f30a6d5bc0f3d9210a6319d8672e4d0
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,125 @@@
++#include "common.h"
++#include "memcached_io.h"
++
++memcached_return memcached_mget(memcached_st *ptr, 
++                                char **keys, size_t *key_length, 
++                                unsigned int number_of_keys)
++{
++  return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
++}
++
++memcached_return memcached_mget_by_key(memcached_st *ptr, 
++                                       char *master_key, size_t master_key_length,
++                                       char **keys, size_t *key_length, 
++                                       unsigned int number_of_keys)
++{
++  unsigned int x;
++  memcached_return rc= MEMCACHED_NOTFOUND;
++  char *get_command= "get ";
++  uint8_t get_command_length= 4;
++  unsigned int master_server_key= 0;
++
++  LIBMEMCACHED_MEMCACHED_MGET_START();
++  ptr->cursor_server= 0;
++
++  if (number_of_keys == 0)
++    return MEMCACHED_NOTFOUND;
++
++  if (ptr->number_of_hosts == 0)
++    return MEMCACHED_NO_SERVERS;
++
++  if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
++    return MEMCACHED_BAD_KEY_PROVIDED;
++
++  if (ptr->flags & MEM_SUPPORT_CAS)
++  {
++    get_command= "gets ";
++    get_command_length= 5;
++  }
++
++  if (master_key && master_key_length)
++    master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
++
++  /* 
++    Here is where we pay for the non-block API. We need to remove any data sitting
++    in the queue before we start our get.
++
++    It might be optimum to bounce the connection if count > some number.
++  */
++  for (x= 0; x < ptr->number_of_hosts; x++)
++  {
++    if (memcached_server_response_count(&ptr->hosts[x]))
++    {
++      char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
++
++      if (ptr->flags & MEM_NO_BLOCK)
++        (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1);
++
++      while(memcached_server_response_count(&ptr->hosts[x]))
++        (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
++    }
++  }
++
++  /* 
++    If a server fails we warn about errors and start all over with sending keys
++    to the server.
++  */
++  for (x= 0; x < number_of_keys; x++)
++  {
++    unsigned int server_key;
++
++    if (master_server_key)
++      server_key= master_server_key;
++    else
++      server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
++
++    if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
++    {
++      rc= memcached_connect(&ptr->hosts[server_key]);
++
++      if (rc != MEMCACHED_SUCCESS)
++        continue;
++
++      if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1)
++      {
++        rc= MEMCACHED_SOME_ERRORS;
++        continue;
++      }
++      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
++      memcached_server_response_increment(&ptr->hosts[server_key]);
++      WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1);
++    }
++
++    if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1)
++    {
++      memcached_server_response_reset(&ptr->hosts[server_key]);
++      rc= MEMCACHED_SOME_ERRORS;
++      continue;
++    }
++
++    if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1)
++    {
++      memcached_server_response_reset(&ptr->hosts[server_key]);
++      rc= MEMCACHED_SOME_ERRORS;
++      continue;
++    }
++  }
++
++  /*
++    Should we muddle on if some servers are dead?
++  */
++  for (x= 0; x < ptr->number_of_hosts; x++)
++  {
++    if (memcached_server_response_count(&ptr->hosts[x]))
++    {
++      /* We need to do something about non-connnected hosts in the future */
++      if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1)
++      {
++        rc= MEMCACHED_SOME_ERRORS;
++      }
++    }
++  }
++
++  LIBMEMCACHED_MEMCACHED_MGET_END();
++  return rc;
++}
diff --combined lib/memcached_storage.c
index 8dc381c0b77ec4a19477be68992841e1aff9f752,fb3e6b2bc8027a64658c337e50ec807fe7c36af3..3b1ee242ab087747a5faad00b74281054c4a44ac
@@@ -53,18 -53,17 +53,18 @@@ static inline memcached_return memcache
    char to_write;
    size_t write_length;
    ssize_t sent_length;
 -  memcached_return rc;
 +  memcached_return rc[MEMCACHED_MAX_REPLICAS];
    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
    unsigned int server_key;
 +  uint8_t replicas= 0;
  
    WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
    WATCHPOINT_ASSERT(!(value && value_length == 0));
  
-   if (key_length == 0)
+   unlikely (key_length == 0)
      return MEMCACHED_NO_KEY_PROVIDED;
  
-   if (ptr->number_of_hosts == 0)
+   unlikely (ptr->number_of_hosts == 0)
      return MEMCACHED_NO_SERVERS;
  
    if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
                             (unsigned long long)expiration, value_length);
  
    if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
 -  {
 -    rc= MEMCACHED_WRITE_FAILURE;
 -    goto error;
 -  }
 -
 -  rc=  memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
 -  if (rc != MEMCACHED_SUCCESS)
 -    goto error;
 -
 -  if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
 -  {
 -    rc= MEMCACHED_WRITE_FAILURE;
 -    goto error;
 -  }
 +    return MEMCACHED_WRITE_FAILURE;
  
    if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
      to_write= 0;
    else
      to_write= 1;
  
 -  if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
 +  do
    {
 -    rc= MEMCACHED_WRITE_FAILURE;
 -    goto error;
 -  }
 +    rc[replicas]=  memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
 +
 +    if (rc[replicas] != MEMCACHED_SUCCESS)
 +      goto error;
  
 -  if (to_write == 0)
 -    return MEMCACHED_BUFFERED;
 +    if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
 +    {
 +      rc[replicas]= MEMCACHED_WRITE_FAILURE;
 +      goto error;
 +    }
  
 -  rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 +    if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
 +    {
 +      rc[replicas]= MEMCACHED_WRITE_FAILURE;
 +      goto error;
 +    }
  
 -  if (rc == MEMCACHED_STORED)
 -    return MEMCACHED_SUCCESS;
 -  else 
 -    return rc;
 +    if (to_write == 0)
 +      return MEMCACHED_BUFFERED;
 +    else
 +      rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
  
 +    /* On error we just jump to the next potential server */
  error:
 -  memcached_io_reset(&ptr->hosts[server_key]);
 +    if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
 +    {
 +      if (server_key == (ptr->number_of_hosts - 1))
 +        server_key= 0;
 +      else
 +        server_key++;
 +    }
 +  } while ((++replicas) < ptr->number_of_replicas);
 +
 +  /* As long as one object gets stored, we count this as a success */
 +  while (replicas--)
 +  {
 +    if (rc[replicas] == MEMCACHED_STORED)
 +      return MEMCACHED_SUCCESS;
 +  }
  
 -  return rc;
 +  return rc[0];
  }
  
  memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, 
diff --combined tests/function.c
index 69ae84b14fcfce8b4af94f7b11e9ed456891ec6f,2258757d646981c1d9dc537b29ef0f37f09ac447..b10ead9b6b225258fbb4f12f84d09787a1b8ec08
@@@ -63,12 -63,10 +63,10 @@@ uint8_t server_list_null_test(memcached
  
  uint8_t server_sort_test(memcached_st *ptr)
  {
-   unsigned int setting;
    memcached_server_st *server_list;
    memcached_return rc;
  
-   setting= 1;
-   memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_SORT_HOSTS, &setting);
+   memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_SORT_HOSTS, 1);
  
    server_list= memcached_server_list_append(NULL, "arg", 0, &rc);
    assert(server_list);
@@@ -269,7 -267,7 +267,7 @@@ uint8_t cas2_test(memcached_st *memc
    rc= memcached_flush(memc, 0);
    assert(rc == MEMCACHED_SUCCESS);
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
  
    for (x= 0; x < 3; x++)
    {
@@@ -312,7 -310,7 +310,7 @@@ uint8_t cas_test(memcached_st *memc
    rc= memcached_flush(memc, 0);
    assert(rc == MEMCACHED_SUCCESS);
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
  
    rc= memcached_set(memc, key, strlen(key), 
                      value, strlen(value),
@@@ -505,7 -503,7 +503,7 @@@ uint8_t bad_key_test(memcached_st *memc
    clone= memcached_clone(NULL, memc);
    assert(clone);
  
-   (void)memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_VERIFY_KEY, &set);
+   (void)memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_VERIFY_KEY, set);
  
    string= memcached_get(clone, key, strlen(key),
                          &string_length, &flags, &rc);
    assert(!string);
  
    set= 0;
-   (void)memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_VERIFY_KEY, &set);
+   (void)memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_VERIFY_KEY, set);
    string= memcached_get(clone, key, strlen(key),
                          &string_length, &flags, &rc);
    assert(rc == MEMCACHED_NOTFOUND);
@@@ -1124,36 -1122,36 +1122,36 @@@ uint8_t behavior_test(memcached_st *mem
    unsigned long long value;
    unsigned int set= 1;
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NO_BLOCK);
    assert(value == 1);
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY);
    assert(value == 1);
  
    set= MEMCACHED_HASH_MD5;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_HASH);
    assert(value == MEMCACHED_HASH_MD5);
  
    set= 0;
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NO_BLOCK);
    assert(value == 0);
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY);
    assert(value == 0);
  
    set= MEMCACHED_HASH_DEFAULT;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_HASH);
    assert(value == MEMCACHED_HASH_DEFAULT);
  
    set= MEMCACHED_HASH_CRC;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &set);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, set);
    value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_HASH);
    assert(value == MEMCACHED_HASH_CRC);
  
@@@ -1183,8 -1181,8 +1181,8 @@@ uint8_t user_supplied_bug1(memcached_s
    /* We just keep looking at the same values over and over */
    srandom(10);
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter);
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, setter);
  
  
    /* add key */
@@@ -1222,13 -1220,13 +1220,13 @@@ uint8_t user_supplied_bug2(memcached_s
    unsigned long long total;
  
    setter= 1;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter);
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, setter);
  #ifdef NOT_YET
    setter = 20 * 1024576;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE, setter);
    setter = 20 * 1024576;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE, setter);
    getter = memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE);
    getter = memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE);
  
@@@ -1279,13 -1277,13 +1277,13 @@@ uint8_t user_supplied_bug3(memcached_s
    size_t key_lengths[KEY_COUNT];
  
    setter= 1;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter);
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, setter);
  #ifdef NOT_YET
    setter = 20 * 1024576;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE, setter);
    setter = 20 * 1024576;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE, setter);
    getter = memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE);
    getter = memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE);
  #endif
@@@ -1648,10 -1646,10 +1646,10 @@@ uint8_t user_supplied_bug10(memcached_s
    memcached_st *mclone= memcached_clone(NULL, memc);
    int32_t timeout;
  
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_NO_BLOCK, &set);
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_TCP_NODELAY, &set);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_NO_BLOCK, set);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_TCP_NODELAY, set);
    timeout= 2;
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, &timeout);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, timeout);
  
    value = (char*)malloc(value_length * sizeof(char));
  
@@@ -1689,10 -1687,10 +1687,10 @@@ uint8_t user_supplied_bug11(memcached_s
    int32_t timeout;
    memcached_st *mclone= memcached_clone(NULL, memc);
  
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_NO_BLOCK, &set);
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_TCP_NODELAY, &set);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_NO_BLOCK, set);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_TCP_NODELAY, set);
    timeout= -1;
-   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, &timeout);
+   memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, timeout);
  
    timeout= (int32_t)memcached_behavior_get(mclone, MEMCACHED_BEHAVIOR_POLL_TIMEOUT);
  
@@@ -1797,7 -1795,7 +1795,7 @@@ uint8_t user_supplied_bug13(memcached_s
  uint8_t user_supplied_bug14(memcached_st *memc)
  {
    int setter= 1;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, setter);
    memcached_return rc;
    char *key= "foo";
    char *value;
@@@ -2003,7 -2001,7 +2001,7 @@@ uint8_t generate_buffer_data(memcached_
    int latch= 0;
  
    latch= 1;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, &latch);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, latch);
    generate_data(memc);
  
    return 0;
@@@ -2120,7 -2118,7 +2118,7 @@@ uint8_t delete_buffer_generate(memcache
    unsigned int x;
  
    latch= 1;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, &latch);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, latch);
  
    for (x= 0; x < global_count; x++)
    {
@@@ -2171,7 -2169,7 +2169,7 @@@ uint8_t add_host_test1(memcached_st *me
  
  memcached_return pre_nonblock(memcached_st *memc)
  {
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_md5(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_MD5;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_crc(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_CRC;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hsieh(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_HSIEH;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hash_fnv1_64(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_FNV1_64;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hash_fnv1a_64(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_FNV1A_64;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hash_fnv1_32(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_FNV1_32;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hash_fnv1a_32(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_FNV1A_32;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
  memcached_return pre_hash_ketama(memcached_st *memc)
  {
    memcached_hash value= MEMCACHED_HASH_KETAMA;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, value);
  
    return MEMCACHED_SUCCESS;
  }
@@@ -2297,7 -2295,7 +2295,7 @@@ memcached_return enable_consistent(memc
  {
    memcached_server_distribution value= MEMCACHED_DISTRIBUTION_CONSISTENT;
    memcached_hash hash;
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, &value);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, value);
    pre_hsieh(memc);
  
    value= (memcached_server_distribution)memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION);
    return MEMCACHED_SUCCESS;
  }
  
 +memcached_return enable_replication(memcached_st *memc)
 +{
 +  uint64_t value;
 +  value= 2;
 +  enable_consistent(memc);
 +  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_REPLICAS, &value);
++
++  return MEMCACHED_SUCCESS;
 +}
 +
  memcached_return enable_cas(memcached_st *memc)
  {
    unsigned int set= 1;
        memc->hosts[0].minor_version >= 2 &&
        memc->hosts[0].micro_version >= 4)
    {
-     memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, &set);
+     memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
  
      return MEMCACHED_SUCCESS;
    }
@@@ -2383,8 -2373,8 +2383,8 @@@ memcached_return pre_udp(memcached_st *
  
  memcached_return pre_nodelay(memcached_st *memc)
  {
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL);
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, NULL);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 0);
  
    return MEMCACHED_SUCCESS;
  }
@@@ -2395,7 -2385,7 +2395,7 @@@ memcached_return poll_timeout(memcached
  
    timeout= 100;
  
-   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, &timeout);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, timeout);
  
    timeout= (int32_t)memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_POLL_TIMEOUT);
  
@@@ -2528,7 -2518,6 +2528,7 @@@ collection_st collection[] =
    {"poll_timeout", poll_timeout, 0, tests},
    {"gets", enable_cas, 0, tests},
    {"consistent", enable_consistent, 0, tests},
 +  {"replication", enable_consistent, 0, tests},
    {"memory_allocators", set_memory_alloc, 0, tests},
  //  {"udp", pre_udp, 0, tests},
    {"version_1_2_3", check_for_1_2_3, 0, version_1_2_3},