From: Date: Fri, 29 Feb 2008 07:22:14 +0000 (-0800) Subject: Branch merge for fixes in reconnect. X-Git-Tag: _20~1^2~55^2~1^2~2 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=2f58ddc89656a2367c20ab50cef8e2fc5f476319;hp=f80333febbaae93bf7f5171724873821c41f04e9;p=m6w6%2Flibmemcached Branch merge for fixes in reconnect. --- diff --git a/ChangeLog b/ChangeLog index 199536b5..3a9e3afa 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,5 @@ + * Replication support has been added to the library. + 0.17 Wed Feb 27 03:33:29 PST 2008 * MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT added for connect timeout in non-block mode. diff --git a/docs/libmemcached.pod b/docs/libmemcached.pod index 1700cf5b..a4fbb876 100755 --- a/docs/libmemcached.pod +++ b/docs/libmemcached.pod @@ -38,6 +38,11 @@ This value can be translated to a printable string with memcached_strerr(3). Partitioning based on keys is supported in the library. Using the key partioning functions it is possible to group sets of object onto servers. +Replication can be done across multiple nodes. libmemcached(3) will spread +data out when consistent hashing has been enabled and replication has been +specified to send data to more then one node. This feature is considered +very experimental at this stage. + C structures are thread-safe, but each thread must contain its own structure (that is, if you want to share these among threads you must provide your own locking). No global variables are diff --git a/docs/memcached_behavior.pod b/docs/memcached_behavior.pod index 2268e845..8da9183d 100755 --- a/docs/memcached_behavior.pod +++ b/docs/memcached_behavior.pod @@ -37,6 +37,12 @@ memcached_behavior_set() will flush and reset all connections. =over 4 +=item MEMCACHED_BEHAVIOR_REPLICAS + +By default libmemcached(3) stores data in just one node of the cluster. Setting this value +to a number will cause that number of copies to be kept. The value must be greater then +zero, and must be at most the same value as the number of hosts in the cluster. + =item MEMCACHED_BEHAVIOR_NO_BLOCK Causes libmemcached(3) to use asychronous IO. This is the fastest transport diff --git a/include/memcached.h b/include/memcached.h index 570404e8..1c80878d 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -29,6 +29,7 @@ extern "C" { #define MEMCACHED_MAX_HOST_LENGTH 64 #define MEMCACHED_WHEEL_SIZE 1024 #define MEMCACHED_STRIDE 4 +#define MEMCACHED_MAX_REPLICAS 4 #define MEMCACHED_DEFAULT_TIMEOUT INT32_MAX /* string value */ @@ -104,6 +105,7 @@ typedef enum { MEMCACHED_BEHAVIOR_SORT_HOSTS, MEMCACHED_BEHAVIOR_VERIFY_KEY, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT, + MEMCACHED_BEHAVIOR_REPLICAS, } memcached_behavior; typedef enum { @@ -230,8 +232,8 @@ struct memcached_st { memcached_free_function call_free; memcached_malloc_function call_malloc; memcached_realloc_function call_realloc; + uint8_t number_of_replicas; #ifdef NOT_USED /* Future Use */ - uint8_t replicas; memcached_return warning; #endif }; diff --git a/lib/Makefile.am b/lib/Makefile.am index 5257b7ba..dbbffa79 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -44,6 +44,7 @@ libmemcached_la_SOURCES = crc.c \ memcached_io.c \ md5.c \ memcached_key.c \ + memcached_mget.c \ memcached_quit.c \ memcached_parse.c \ memcached_response.c \ diff --git a/lib/memcached.c b/lib/memcached.c index 31c12ab3..c0510619 100644 --- a/lib/memcached.c +++ b/lib/memcached.c @@ -25,6 +25,7 @@ memcached_st *memcached_create(memcached_st *ptr) WATCHPOINT_ASSERT(result_ptr); ptr->poll_timeout= MEMCACHED_DEFAULT_TIMEOUT; ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA; + ptr->number_of_replicas= 1; return ptr; } diff --git a/lib/memcached_auto.c b/lib/memcached_auto.c index a607ed66..3aa63eed 100644 --- a/lib/memcached_auto.c +++ b/lib/memcached_auto.c @@ -7,9 +7,10 @@ static memcached_return memcached_auto(memcached_st *ptr, uint64_t *value) { size_t send_length; - memcached_return rc; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; unsigned int server_key; + uint8_t replicas= 0; + memcached_return rc[MEMCACHED_MAX_REPLICAS]; unlikely (key_length == 0) return MEMCACHED_NO_KEY_PROVIDED; @@ -29,36 +30,55 @@ static memcached_return memcached_auto(memcached_st *ptr, unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) return MEMCACHED_WRITE_FAILURE; - rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1); - if (rc != MEMCACHED_SUCCESS) - return rc; - - rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - - /* - So why recheck responce? Because the protocol is brain dead :) - The number returned might end up equaling one of the string - values. Less chance of a mistake with strncmp() so we will - use it. We still called memcached_response() though since it - worked its magic for non-blocking IO. - */ - if (!strncmp(buffer, "ERROR\r\n", 7)) + do { - *value= 0; - rc= MEMCACHED_PROTOCOL_ERROR; - } - else if (!strncmp(buffer, "NOT_FOUND\r\n", 11)) + rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1); + if (rc[replicas] != MEMCACHED_SUCCESS) + goto error; + + rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + /* + So why recheck responce? Because the protocol is brain dead :) + The number returned might end up equaling one of the string + values. Less chance of a mistake with strncmp() so we will + use it. We still called memcached_response() though since it + worked its magic for non-blocking IO. + */ + if (!strncmp(buffer, "ERROR\r\n", 7)) + { + *value= 0; + rc[replicas]= MEMCACHED_PROTOCOL_ERROR; + } + else if (!strncmp(buffer, "NOT_FOUND\r\n", 11)) + { + *value= 0; + rc[replicas]= MEMCACHED_NOTFOUND; + } + else + { + *value= (uint64_t)strtoll(buffer, (char **)NULL, 10); + rc[replicas]= MEMCACHED_SUCCESS; + } + /* On error we just jump to the next potential server */ +error: + if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT) + { + if (server_key == (ptr->number_of_hosts - 1)) + server_key= 0; + else + server_key++; + } + } while ((++replicas) < ptr->number_of_replicas); + + /* As long as one object gets stored, we count this as a success */ + while (replicas--) { - *value= 0; - rc= MEMCACHED_NOTFOUND; - } - else - { - *value= (uint64_t)strtoll(buffer, (char **)NULL, 10); - rc= MEMCACHED_SUCCESS; + if (rc[replicas] == MEMCACHED_STORED) + return MEMCACHED_SUCCESS; } - return rc; + return rc[0]; } memcached_return memcached_increment(memcached_st *ptr, diff --git a/lib/memcached_behavior.c b/lib/memcached_behavior.c index 7c5db61d..910a99e6 100644 --- a/lib/memcached_behavior.c +++ b/lib/memcached_behavior.c @@ -23,6 +23,16 @@ memcached_return memcached_behavior_set(memcached_st *ptr, { switch (flag) { + case MEMCACHED_BEHAVIOR_REPLICAS: + { + uint8_t number_of_replicas= (uint8_t)data; + + if (number_of_replicas > ptr->number_of_hosts || number_of_replicas == 0 || number_of_replicas > MEMCACHED_MAX_REPLICAS) + return MEMCACHED_FAILURE; + else + ptr->number_of_replicas= number_of_replicas; + break; + } case MEMCACHED_BEHAVIOR_SUPPORT_CAS: set_behavior_flag(ptr, MEM_SUPPORT_CAS, data); break; @@ -96,6 +106,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr, switch (flag) { + case MEMCACHED_BEHAVIOR_REPLICAS: + return (unsigned long long)ptr->number_of_replicas; case MEMCACHED_BEHAVIOR_SUPPORT_CAS: temp_flag= MEM_SUPPORT_CAS; break; diff --git a/lib/memcached_delete.c b/lib/memcached_delete.c index 60e3bed4..24dc1660 100644 --- a/lib/memcached_delete.c +++ b/lib/memcached_delete.c @@ -14,11 +14,10 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, { char to_write; size_t send_length; - memcached_return rc; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; unsigned int server_key; - - LIBMEMCACHED_MEMCACHED_DELETE_START(); + uint8_t replicas= 0; + memcached_return rc[MEMCACHED_MAX_REPLICAS]; unlikely (key_length == 0) return MEMCACHED_NO_KEY_PROVIDED; @@ -38,28 +37,48 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) { - rc= MEMCACHED_WRITE_FAILURE; + rc[replicas]= MEMCACHED_WRITE_FAILURE; goto error; } to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1; - rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write); - if (rc != MEMCACHED_SUCCESS) - goto error; - - if ((ptr->flags & MEM_BUFFER_REQUESTS)) + do { - rc= MEMCACHED_BUFFERED; - } - else + rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write); + if (rc[replicas] != MEMCACHED_SUCCESS) + goto error; + + if ((ptr->flags & MEM_BUFFER_REQUESTS)) + { + rc[replicas]= MEMCACHED_BUFFERED; + } + else + { + rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + if (rc[replicas] == MEMCACHED_DELETED) + rc[replicas]= MEMCACHED_SUCCESS; + } + + /* On error we just jump to the next potential server */ +error: + if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT) + { + if (server_key == (ptr->number_of_hosts - 1)) + server_key= 0; + else + server_key++; + } + } while ((++replicas) < ptr->number_of_replicas); + + /* As long as one object gets stored, we count this as a success */ + while (replicas--) { - rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - if (rc == MEMCACHED_DELETED) - rc= MEMCACHED_SUCCESS; + if (rc[replicas] == MEMCACHED_DELETED) + return MEMCACHED_SUCCESS; + else if (rc[replicas] == MEMCACHED_DELETED) + rc[replicas]= MEMCACHED_BUFFERED; } -error: - LIBMEMCACHED_MEMCACHED_DELETE_END(); - return rc; + return rc[0]; } diff --git a/lib/memcached_get.c b/lib/memcached_get.c index 8cc7ce30..374793f5 100644 --- a/lib/memcached_get.c +++ b/lib/memcached_get.c @@ -20,153 +20,87 @@ char *memcached_get_by_key(memcached_st *ptr, uint32_t *flags, memcached_return *error) { - char *value; - size_t dummy_length; - uint32_t dummy_flags; - memcached_return dummy_error; - - /* Request the key */ - *error= memcached_mget_by_key(ptr, - master_key, - master_key_length, - &key, &key_length, 1); - - value= memcached_fetch(ptr, NULL, NULL, - value_length, flags, error); - /* This is for historical reasons */ - if (*error == MEMCACHED_END) - *error= MEMCACHED_NOTFOUND; - - if (value == NULL) - return NULL; - - (void)memcached_fetch(ptr, NULL, NULL, - &dummy_length, &dummy_flags, - &dummy_error); - WATCHPOINT_ASSERT(dummy_length == 0); - - return value; -} - -memcached_return memcached_mget(memcached_st *ptr, - char **keys, size_t *key_length, - unsigned int number_of_keys) -{ - return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys); -} - -memcached_return memcached_mget_by_key(memcached_st *ptr, - char *master_key, size_t master_key_length, - char **keys, size_t *key_length, - unsigned int number_of_keys) -{ - unsigned int x; - memcached_return rc= MEMCACHED_NOTFOUND; - char *get_command= "get "; - uint8_t get_command_length= 4; - unsigned int master_server_key= 0; - - LIBMEMCACHED_MEMCACHED_MGET_START(); - ptr->cursor_server= 0; - - if (number_of_keys == 0) - return MEMCACHED_NOTFOUND; + unsigned int server_key; + size_t send_length; + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_result_st *result_buffer= &ptr->result; + memcached_return rc[MEMCACHED_MAX_REPLICAS]; + uint8_t replicas= 0; if (ptr->number_of_hosts == 0) - return MEMCACHED_NO_SERVERS; - - if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) - return MEMCACHED_BAD_KEY_PROVIDED; + { + *error= MEMCACHED_NO_SERVERS; + return NULL; + } - if (ptr->flags & MEM_SUPPORT_CAS) + if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) { - get_command= "gets "; - get_command_length= 5; + *value_length= 0; + *error= MEMCACHED_BAD_KEY_PROVIDED; + return NULL; } - if (master_key && master_key_length) - master_server_key= memcached_generate_hash(ptr, master_key, master_key_length); + if (master_key) + server_key= memcached_generate_hash(ptr, master_key, master_key_length); + else + server_key= memcached_generate_hash(ptr, key, key_length); - /* - Here is where we pay for the non-block API. We need to remove any data sitting - in the queue before we start our get. + send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "get %.*s\r\n", (int)key_length, key); - It might be optimum to bounce the connection if count > some number. - */ - for (x= 0; x < ptr->number_of_hosts; x++) + do { - if (memcached_server_response_count(&ptr->hosts[x])) + if (memcached_server_response_count(&ptr->hosts[server_key])) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; if (ptr->flags & MEM_NO_BLOCK) - (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1); + (void)memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1); - while(memcached_server_response_count(&ptr->hosts[x])) - (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); + while(memcached_server_response_count(&ptr->hosts[server_key])) + (void)memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer); } - } - /* - If a server fails we warn about errors and start all over with sending keys - to the server. - */ - for (x= 0; x < number_of_keys; x++) - { - unsigned int server_key; + rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1); + if (rc[replicas] != MEMCACHED_SUCCESS) + goto error; - if (master_server_key) - server_key= master_server_key; - else - server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); + rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer); - if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) - { - rc= memcached_connect(&ptr->hosts[server_key]); - - if (rc != MEMCACHED_SUCCESS) - continue; - - if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1) - { - rc= MEMCACHED_SOME_ERRORS; - continue; - } - WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0); - memcached_server_response_increment(&ptr->hosts[server_key]); - WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1); - } - - if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1) + /* On no key found, we check the replica */ + if (rc[replicas] == MEMCACHED_END) /* END means that we move on to the next */ { memcached_server_response_reset(&ptr->hosts[server_key]); - rc= MEMCACHED_SOME_ERRORS; - continue; } - - if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1) + else if (rc[replicas] == MEMCACHED_SUCCESS) { - memcached_server_response_reset(&ptr->hosts[server_key]); - rc= MEMCACHED_SOME_ERRORS; - continue; + *value_length= memcached_string_length(&result_buffer->value); + + if (result_buffer->flags) + *flags= result_buffer->flags; + + return memcached_string_c_copy(&result_buffer->value); } - } - /* - Should we muddle on if some servers are dead? - */ - for (x= 0; x < ptr->number_of_hosts; x++) - { - if (memcached_server_response_count(&ptr->hosts[x])) + /* On error we just jump to the next potential server */ +error: + if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT) { - /* We need to do something about non-connnected hosts in the future */ - if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1) - { - rc= MEMCACHED_SOME_ERRORS; - } + if (server_key == (ptr->number_of_hosts - 1)) + server_key= 0; + else + server_key++; } - } + } while ((++replicas) < ptr->number_of_replicas); + + /* TODO: An error on replica 1 of host down, but not found on 2, will give wrong error */ + /* This is for historical reasons */ + if (rc[0] == MEMCACHED_END) + *error= MEMCACHED_NOTFOUND; + else + *error= rc[0]; + + *value_length= 0; - LIBMEMCACHED_MEMCACHED_MGET_END(); - return rc; + return NULL; } diff --git a/lib/memcached_mget.c b/lib/memcached_mget.c new file mode 100644 index 00000000..a25fa8e0 --- /dev/null +++ b/lib/memcached_mget.c @@ -0,0 +1,125 @@ +#include "common.h" +#include "memcached_io.h" + +memcached_return memcached_mget(memcached_st *ptr, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys); +} + +memcached_return memcached_mget_by_key(memcached_st *ptr, + char *master_key, size_t master_key_length, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + unsigned int x; + memcached_return rc= MEMCACHED_NOTFOUND; + char *get_command= "get "; + uint8_t get_command_length= 4; + unsigned int master_server_key= 0; + + LIBMEMCACHED_MEMCACHED_MGET_START(); + ptr->cursor_server= 0; + + if (number_of_keys == 0) + return MEMCACHED_NOTFOUND; + + if (ptr->number_of_hosts == 0) + return MEMCACHED_NO_SERVERS; + + if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) + return MEMCACHED_BAD_KEY_PROVIDED; + + if (ptr->flags & MEM_SUPPORT_CAS) + { + get_command= "gets "; + get_command_length= 5; + } + + if (master_key && master_key_length) + master_server_key= memcached_generate_hash(ptr, master_key, master_key_length); + + /* + Here is where we pay for the non-block API. We need to remove any data sitting + in the queue before we start our get. + + It might be optimum to bounce the connection if count > some number. + */ + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(&ptr->hosts[x])) + { + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + + if (ptr->flags & MEM_NO_BLOCK) + (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1); + + while(memcached_server_response_count(&ptr->hosts[x])) + (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); + } + } + + /* + If a server fails we warn about errors and start all over with sending keys + to the server. + */ + for (x= 0; x < number_of_keys; x++) + { + unsigned int server_key; + + if (master_server_key) + server_key= master_server_key; + else + server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); + + if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) + { + rc= memcached_connect(&ptr->hosts[server_key]); + + if (rc != MEMCACHED_SUCCESS) + continue; + + if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1) + { + rc= MEMCACHED_SOME_ERRORS; + continue; + } + WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0); + memcached_server_response_increment(&ptr->hosts[server_key]); + WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1); + } + + if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1) + { + memcached_server_response_reset(&ptr->hosts[server_key]); + rc= MEMCACHED_SOME_ERRORS; + continue; + } + + if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1) + { + memcached_server_response_reset(&ptr->hosts[server_key]); + rc= MEMCACHED_SOME_ERRORS; + continue; + } + } + + /* + Should we muddle on if some servers are dead? + */ + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(&ptr->hosts[x])) + { + /* We need to do something about non-connnected hosts in the future */ + if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1) + { + rc= MEMCACHED_SOME_ERRORS; + } + } + } + + LIBMEMCACHED_MEMCACHED_MGET_END(); + return rc; +} diff --git a/lib/memcached_storage.c b/lib/memcached_storage.c index 0b5a73e1..2a0f88d4 100644 --- a/lib/memcached_storage.c +++ b/lib/memcached_storage.c @@ -53,9 +53,10 @@ static inline memcached_return memcached_send(memcached_st *ptr, char to_write; size_t write_length; ssize_t sent_length; - memcached_return rc; + memcached_return rc[MEMCACHED_MAX_REPLICAS]; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; unsigned int server_key; + uint8_t replicas= 0; unlikely (key_length == 0) return MEMCACHED_NO_KEY_PROVIDED; @@ -81,46 +82,56 @@ static inline memcached_return memcached_send(memcached_st *ptr, (unsigned long long)expiration, value_length); if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) - { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } - - rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0); - if (rc != MEMCACHED_SUCCESS) - goto error; - - if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1) - { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } + return MEMCACHED_WRITE_FAILURE; if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) to_write= 0; else to_write= 1; - if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1) + do { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } + rc[replicas]= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0); + + if (rc[replicas] != MEMCACHED_SUCCESS) + goto error; - if (to_write == 0) - return MEMCACHED_BUFFERED; + if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1) + { + rc[replicas]= MEMCACHED_WRITE_FAILURE; + goto error; + } - rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1) + { + rc[replicas]= MEMCACHED_WRITE_FAILURE; + goto error; + } - if (rc == MEMCACHED_STORED) - return MEMCACHED_SUCCESS; - else - return rc; + if (to_write == 0) + return MEMCACHED_BUFFERED; + else + rc[replicas]= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + /* On error we just jump to the next potential server */ error: - memcached_io_reset(&ptr->hosts[server_key]); + if (replicas > 1 && ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT) + { + if (server_key == (ptr->number_of_hosts - 1)) + server_key= 0; + else + server_key++; + } + } while ((++replicas) < ptr->number_of_replicas); + + /* As long as one object gets stored, we count this as a success */ + while (replicas--) + { + if (rc[replicas] == MEMCACHED_STORED) + return MEMCACHED_SUCCESS; + } - return rc; + return rc[0]; } memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, diff --git a/tests/function.c b/tests/function.c index 2258757d..b10ead9b 100644 --- a/tests/function.c +++ b/tests/function.c @@ -2308,6 +2308,16 @@ memcached_return enable_consistent(memcached_st *memc) return MEMCACHED_SUCCESS; } +memcached_return enable_replication(memcached_st *memc) +{ + uint64_t value; + value= 2; + enable_consistent(memc); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_REPLICAS, &value); + + return MEMCACHED_SUCCESS; +} + memcached_return enable_cas(memcached_st *memc) { unsigned int set= 1; @@ -2518,6 +2528,7 @@ collection_st collection[] ={ {"poll_timeout", poll_timeout, 0, tests}, {"gets", enable_cas, 0, tests}, {"consistent", enable_consistent, 0, tests}, + {"replication", enable_consistent, 0, tests}, {"memory_allocators", set_memory_alloc, 0, tests}, // {"udp", pre_udp, 0, tests}, {"version_1_2_3", check_for_1_2_3, 0, version_1_2_3},