From: Trond Norbye Date: Tue, 9 Jun 2009 19:36:39 +0000 (+0200) Subject: Added support for storing replicas on multiple servers X-Git-Tag: 0.31~15^2~3 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=7afbf5c09228f6614b8831ec5f374971a38bbf9f;p=awesomized%2Flibmemcached Added support for storing replicas on multiple servers --- diff --git a/docs/memcached_behavior.pod b/docs/memcached_behavior.pod index 71a46134..00d76282 100644 --- a/docs/memcached_behavior.pod +++ b/docs/memcached_behavior.pod @@ -194,6 +194,14 @@ sent to the server. Set this value to specify that you really don't care about the result from your storage commands (set, add, replace, append, prepend). +=item MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS + +If you just want "a poor mans HA", you may specify the numbers of +replicas libmemcached should store of each item (on different servers). +This replication does not dedicate certain memcached servers to store the +replicas in, but instead it will store the replicas together with all of the +other objects (on the 'n' next servers specified in your server list). + =back =head1 RETURN diff --git a/libmemcached/memcached.c b/libmemcached/memcached.c index a93bef7d..b880d833 100644 --- a/libmemcached/memcached.c +++ b/libmemcached/memcached.c @@ -111,6 +111,7 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source) new_clone->io_msg_watermark= source->io_msg_watermark; new_clone->io_bytes_watermark= source->io_bytes_watermark; new_clone->io_key_prefetch= source->io_key_prefetch; + new_clone->number_of_replicas= source->number_of_replicas; if (source->hosts) rc= memcached_server_push(new_clone, source->hosts); diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 7f2b9a34..47cc4c56 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -119,6 +119,7 @@ struct memcached_st { memcached_trigger_key get_key_failure; memcached_trigger_delete_key delete_trigger; char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE]; + uint32_t number_of_replicas; }; diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index fb201d4e..974bda56 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -24,6 +24,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr, { switch (flag) { + case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS: + ptr->number_of_replicas= (uint32_t)data; + break; case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK: ptr->io_msg_watermark= (int32_t)data; break; @@ -169,6 +172,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr, switch (flag) { + case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS: + return ptr->number_of_replicas; case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK: return ptr->io_msg_watermark; case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK: diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index 6d8d00f1..5fdede9a 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -105,7 +105,8 @@ typedef enum { MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, MEMCACHED_BEHAVIOR_NOREPLY, MEMCACHED_BEHAVIOR_USE_UDP, - MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS + MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS, + MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_delete.c b/libmemcached/memcached_delete.c index f2fab322..5eb7e39c 100644 --- a/libmemcached/memcached_delete.c +++ b/libmemcached/memcached_delete.c @@ -1,4 +1,5 @@ #include "common.h" +#include "memcached/protocol_binary.h" memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key_length, time_t expiration) @@ -117,14 +118,34 @@ static inline memcached_return binary_delete(memcached_st *ptr, memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1); } + memcached_return rc= MEMCACHED_SUCCESS; + if ((memcached_do(&ptr->hosts[server_key], request.bytes, sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || (memcached_io_write(&ptr->hosts[server_key], key, key_length, flush) == -1)) { memcached_io_reset(&ptr->hosts[server_key]); - return MEMCACHED_WRITE_FAILURE; + rc= MEMCACHED_WRITE_FAILURE; + } + + if (ptr->number_of_replicas > 0) + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ; + + for (int x= 0; x < ptr->number_of_replicas; ++x) + { + ++server_key; + if (server_key == ptr->number_of_hosts) + server_key= 0; + + memcached_server_st* server= &ptr->hosts[server_key]; + if ((memcached_do(server, (const char*)request.bytes, + sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(server, key, key_length, flush) == -1)) + memcached_io_reset(server); + } } - return MEMCACHED_SUCCESS; + return rc; } diff --git a/libmemcached/memcached_get.c b/libmemcached/memcached_get.c index 05b317a6..cdfb5d29 100644 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@ -143,12 +143,6 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - if (ptr->flags & MEM_SUPPORT_CAS) - { - get_command= "gets "; - get_command_length= 5; - } - if (master_key && master_key_length) { if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) @@ -181,6 +175,12 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, key_length, number_of_keys); + if (ptr->flags & MEM_SUPPORT_CAS) + { + get_command= "gets "; + get_command_length= 5; + } + /* If a server fails we warn about errors and start all over with sending keys to the server. @@ -256,7 +256,7 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, return rc; } -static memcached_return binary_mget_by_key(memcached_st *ptr, +static memcached_return simple_binary_mget(memcached_st *ptr, unsigned int master_server_key, bool is_master_key_set, char **keys, size_t *key_length, @@ -357,3 +357,171 @@ static memcached_return binary_mget_by_key(memcached_st *ptr, return rc; } + +static memcached_return replication_binary_mget(memcached_st *ptr, + uint32_t* hash, bool* dead_servers, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + memcached_return rc= MEMCACHED_NOTFOUND; + uint32_t x; + + int flush= number_of_keys == 1; + + for (int replica = 0; replica <= ptr->number_of_replicas; ++replica) + { + bool success= true; + + for (uint32_t x= 0; x < number_of_keys; ++x) + { + if (hash[x] == ptr->number_of_hosts) + continue; /* Already successfully sent */ + + uint32_t server= hash[x] + replica; + while (server >= ptr->number_of_hosts) + server -= ptr->number_of_hosts; + + if (dead_servers[server]) + continue; + + if (memcached_server_response_count(&ptr->hosts[server]) == 0) + { + rc= memcached_connect(&ptr->hosts[server]); + if (rc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + } + + protocol_binary_request_getk request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + if (number_of_keys == 1) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + + request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length[x]); + + if ((memcached_io_write(&ptr->hosts[server], request.bytes, + sizeof(request.bytes), 0) == -1) || + (memcached_io_write(&ptr->hosts[server], keys[x], + key_length[x], flush) == -1)) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + memcached_server_response_increment(&ptr->hosts[server]); + } + + if (number_of_keys > 1) + { + /* + * Send a noop command to flush the buffers + */ + protocol_binary_request_noop request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + for (x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x])) + { + if (memcached_io_write(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) == -1) + { + memcached_io_reset(&ptr->hosts[x]); + dead_servers[x]= true; + success= false; + } + memcached_server_response_increment(&ptr->hosts[x]); + + /* mark all of the messages bound for this server as sent! */ + for (uint32_t x= 0; x < number_of_keys; ++x) + if (hash[x] == x) + hash[x]= ptr->number_of_hosts; + } + } + + if (success) { + break; + } + } + + return rc; +} + +static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + bool is_master_key_set, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + memcached_return rc; + + if (ptr->number_of_replicas == 0) { + rc= simple_binary_mget(ptr, master_server_key, is_master_key_set, + keys, key_length, number_of_keys); + } else { + uint32_t* hash; + bool* dead_servers; + + if (ptr->call_malloc) + { + hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys); + dead_servers= ptr->call_malloc(ptr, sizeof(bool) * ptr->number_of_hosts); + } + else + { + hash = malloc(sizeof(uint32_t) * number_of_keys); + dead_servers= malloc(sizeof(bool) * ptr->number_of_hosts); + } + + if (hash == NULL || dead_servers == NULL) + { + if (ptr->call_free) + { + if (hash != NULL) ptr->call_free(ptr, hash); + if (dead_servers != NULL) ptr->call_free(ptr, dead_servers); + } + else + { + free(hash); /* No need to check for NULL (just look in the C spec) */ + free(dead_servers); + } + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + + memset(dead_servers, 0, sizeof(bool) * ptr->number_of_hosts); + + if (is_master_key_set) + for (unsigned int x= 0; x < number_of_keys; ++x) + hash[x]= master_server_key; + else + for (unsigned int x= 0; x < number_of_keys; ++x) + hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]); + + rc= replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys); + + if (ptr->call_free) + { + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + } + else + { + free(hash); + free(dead_servers); + } + + return MEMCACHED_SUCCESS; + } + + return rc; +} diff --git a/libmemcached/memcached_storage.c b/libmemcached/memcached_storage.c index 97d345bd..fe8024d3 100644 --- a/libmemcached/memcached_storage.c +++ b/libmemcached/memcached_storage.c @@ -42,7 +42,9 @@ static char *storage_op_string(memcached_storage_action verb) /* NOTREACHED */ } -static memcached_return memcached_send_binary(memcached_server_st* server, +static memcached_return memcached_send_binary(memcached_st *ptr, + const char *master_key, + size_t master_key_length, const char *key, size_t key_length, const char *value, @@ -80,13 +82,14 @@ static inline memcached_return memcached_send(memcached_st *ptr, if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - server_key= memcached_generate_hash(ptr, master_key, master_key_length); - if (ptr->flags & MEM_BINARY_PROTOCOL) - return memcached_send_binary(&ptr->hosts[server_key], key, key_length, - value, value_length, expiration, + return memcached_send_binary(ptr, master_key, master_key_length, + key, key_length, + value, value_length, expiration, flags, cas, verb); + server_key= memcached_generate_hash(ptr, master_key, master_key_length); + if (cas) write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "%s %s%.*s %u %llu %zu %llu%s\r\n", @@ -408,8 +411,10 @@ static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply) -static memcached_return memcached_send_binary(memcached_server_st* server, - const char *key, +static memcached_return memcached_send_binary(memcached_st *ptr, + const char *master_key, + size_t master_key_length, + const char *key, size_t key_length, const char *value, size_t value_length, @@ -421,6 +426,9 @@ static memcached_return memcached_send_binary(memcached_server_st* server, char flush; protocol_binary_request_set request= {.bytes= {0}}; size_t send_length= sizeof(request.bytes); + uint32_t server_key= memcached_generate_hash(ptr, master_key, + master_key_length); + memcached_server_st *server= &ptr->hosts[server_key]; bool noreply= server->root->flags & MEM_NOREPLY; request.message.header.request.magic= PROTOCOL_BINARY_REQ; @@ -461,7 +469,26 @@ static memcached_return memcached_send_binary(memcached_server_st* server, memcached_io_reset(server); return MEMCACHED_WRITE_FAILURE; } - + + if (verb == SET_OP && ptr->number_of_replicas > 0) + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ; + + for (int x= 0; x < ptr->number_of_replicas; ++x) + { + ++server_key; + if (server_key == ptr->number_of_hosts) + server_key= 0; + + memcached_server_st *srv= &ptr->hosts[server_key]; + if ((memcached_do(srv, (const char*)request.bytes, + send_length, 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(srv, key, key_length, 0) == -1) || + (memcached_io_write(srv, value, value_length, flush) == -1)) + memcached_io_reset(server); + } + } + if (flush == 0) return MEMCACHED_BUFFERED; diff --git a/tests/function.c b/tests/function.c index d15c1a0c..3113fb1f 100644 --- a/tests/function.c +++ b/tests/function.c @@ -235,6 +235,7 @@ static test_return clone_test(memcached_st *memc) assert(clone->server_failure_limit == memc->server_failure_limit); assert(clone->snd_timeout == memc->snd_timeout); assert(clone->user_data == memc->user_data); + assert(clone->number_of_replicas == memc->number_of_replicas); memcached_free(clone); } @@ -1541,6 +1542,9 @@ static test_return behavior_test(memcached_st *memc) value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE); assert(value > 0); + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, value + 1); + assert((value + 1) == memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS)); return 0; } @@ -3036,6 +3040,24 @@ static memcached_return pre_binary(memcached_st *memc) return rc; } +static memcached_return pre_replication(memcached_st *memc) +{ + memcached_return rc= MEMCACHED_FAILURE; + if (pre_binary(memc) == MEMCACHED_SUCCESS) + { + /* + * Make sure that we store the item on all servers + * (master + replicas == number of servers) + */ + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, + memc->number_of_hosts - 1); + assert(rc == MEMCACHED_SUCCESS); + assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS) == memc->number_of_hosts - 1); + } + + return rc; +} + static void my_free(memcached_st *ptr __attribute__((unused)), void *mem) { free(mem); @@ -3498,6 +3520,175 @@ static test_return connection_pool_test(memcached_st *memc) } #endif +static test_return replication_set_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + + /* + ** "bubba" should now be stored on all of our servers. We don't have an + ** easy to use API to address each individual server, so I'll just iterate + ** through a bunch of "master keys" and I should most likely hit all of the + ** servers... + */ + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + + return TEST_SUCCESS; +} + +static test_return replication_get_test(memcached_st *memc) +{ + memcached_return rc; + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + for (int host= 0; host < memc->number_of_hosts; ++host) { + memcached_st *clone= memcached_clone(NULL, memc); + clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + } + + return TEST_SUCCESS; +} + +static test_return replication_mget_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + memcached_result_st result_obj; + for (int host= 0; host < clone->number_of_hosts; ++host) { + memcached_st *clone= memcached_clone(NULL, memc); + clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + + rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + + memcached_result_st *results= memcached_result_create(clone, &result_obj); + assert(results); + + int hits= 0; + while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) + { + ++hits; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + + memcached_free(clone); + } + + return TEST_SUCCESS; +} + +static test_return replication_delete_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + /* Delete the items from all of the servers except 1 */ + uint64_t repl= memcached_behavior_get(memc, + MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + int hash= memcached_generate_hash(memc, keys[0], len[0]); + for (int x= 0; x < (repl + 1); ++x) { + clone->hosts[hash].port= 0; + if (++hash == clone->number_of_hosts) + hash= 0; + } + + memcached_result_st result_obj; + for (int host= 0; host < clone->number_of_hosts; ++host) { + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + + rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + + memcached_result_st *results= memcached_result_create(clone, &result_obj); + assert(results); + + int hits= 0; + while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) + { + ++hits; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + } + memcached_free(clone); + + return TEST_SUCCESS; +} + static void increment_request_id(uint16_t *id) { (*id)++; @@ -4202,6 +4393,14 @@ test_st user_tests[] ={ {0, 0, 0} }; +test_st replication_tests[]= { + {"set", 1, replication_set_test }, + {"get", 0, replication_get_test }, + {"mget", 0, replication_mget_test }, + {"delete", 0, replication_delete_test }, + {0, 0, 0} +}; + test_st generate_tests[] ={ {"generate_pairs", 1, generate_pairs }, {"generate_data", 1, generate_data }, @@ -4305,6 +4504,7 @@ collection_st collection[] ={ {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests}, {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests}, {"test_hashes", 0, 0, hash_tests}, + {"replication", pre_replication, 0, replication_tests}, {0, 0, 0, 0} };