From: Brian Aker Date: Sat, 20 Jun 2009 19:18:01 +0000 (-0700) Subject: Merging replication X-Git-Tag: 0.31~15 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;ds=inline;h=29db6c77e126d9ae0c416ad76402fdbef7372cdb;hp=-c;p=m6w6%2Flibmemcached Merging replication --- 29db6c77e126d9ae0c416ad76402fdbef7372cdb diff --combined libmemcached/memcached.h index 3dae65ee,23f5d99a..9ac0df70 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@@ -21,7 -21,6 +21,7 @@@ #ifdef MEMCACHED_INTERNAL #include #endif +#include #include #include #include @@@ -121,6 -120,7 +121,7 @@@ struct memcached_st memcached_trigger_key get_key_failure; memcached_trigger_delete_key delete_trigger; char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE]; + uint32_t number_of_replicas; }; diff --combined libmemcached/memcached_get.c index 05b317a6,cdae9922..0209b283 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@@ -143,12 -143,6 +143,6 @@@ memcached_return memcached_mget_by_key( if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - if (ptr->flags & MEM_SUPPORT_CAS) - { - get_command= "gets "; - get_command_length= 5; - } - if (master_key && master_key_length) { if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) @@@ -181,6 -175,12 +175,12 @@@ return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, key_length, number_of_keys); + if (ptr->flags & MEM_SUPPORT_CAS) + { + get_command= "gets "; + get_command_length= 5; + } + /* If a server fails we warn about errors and start all over with sending keys to the server. @@@ -256,7 -256,7 +256,7 @@@ return rc; } - static memcached_return binary_mget_by_key(memcached_st *ptr, + static memcached_return simple_binary_mget(memcached_st *ptr, unsigned int master_server_key, bool is_master_key_set, char **keys, size_t *key_length, @@@ -357,3 -357,147 +357,148 @@@ return rc; } + + static memcached_return replication_binary_mget(memcached_st *ptr, + uint32_t* hash, bool* dead_servers, + char **keys, size_t *key_length, + unsigned int number_of_keys) + { + memcached_return rc= MEMCACHED_NOTFOUND; + uint32_t x; + + int flush= number_of_keys == 1; + + for (int replica = 0; replica <= ptr->number_of_replicas; ++replica) + { + bool success= true; + + for (uint32_t x= 0; x < number_of_keys; ++x) + { + if (hash[x] == ptr->number_of_hosts) + continue; /* Already successfully sent */ + + uint32_t server= hash[x] + replica; + while (server >= ptr->number_of_hosts) + server -= ptr->number_of_hosts; + + if (dead_servers[server]) + continue; + + if (memcached_server_response_count(&ptr->hosts[server]) == 0) + { + rc= memcached_connect(&ptr->hosts[server]); + if (rc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + } + + protocol_binary_request_getk request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + if (number_of_keys == 1) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + + request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length[x]); + + if ((memcached_io_write(&ptr->hosts[server], request.bytes, + sizeof(request.bytes), 0) == -1) || + (memcached_io_write(&ptr->hosts[server], keys[x], + key_length[x], flush) == -1)) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + memcached_server_response_increment(&ptr->hosts[server]); + } + + if (number_of_keys > 1) + { + /* + * Send a noop command to flush the buffers + */ + protocol_binary_request_noop request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + for (x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x])) + { + if (memcached_io_write(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) == -1) + { + memcached_io_reset(&ptr->hosts[x]); + dead_servers[x]= true; + success= false; + } + memcached_server_response_increment(&ptr->hosts[x]); + + /* mark all of the messages bound for this server as sent! */ + for (uint32_t x= 0; x < number_of_keys; ++x) + if (hash[x] == x) + hash[x]= ptr->number_of_hosts; + } + } + - if (success) { ++ if (success) + break; - } + } + + return rc; + } + + static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + bool is_master_key_set, + char **keys, size_t *key_length, + unsigned int number_of_keys) + { + memcached_return rc; + + if (ptr->number_of_replicas == 0) + { + rc= simple_binary_mget(ptr, master_server_key, is_master_key_set, + keys, key_length, number_of_keys); - } else { ++ } ++ else ++ { + uint32_t* hash; + bool* dead_servers; + + hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys); + dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool)); + + if (hash == NULL || dead_servers == NULL) + { + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + + if (is_master_key_set) - for (unsigned int x= 0; x < number_of_keys; ++x) ++ for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= master_server_key; + else - for (unsigned int x= 0; x < number_of_keys; ++x) ++ for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]); + + rc= replication_binary_mget(ptr, hash, dead_servers, keys, + key_length, number_of_keys); + + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + + return MEMCACHED_SUCCESS; + } + + return rc; + } diff --combined tests/function.c index 99ad776a,7b13b1b6..5f83e1ea --- a/tests/function.c +++ b/tests/function.c @@@ -236,6 -236,7 +236,7 @@@ static test_return clone_test(memcache assert(clone->server_failure_limit == memc->server_failure_limit); assert(clone->snd_timeout == memc->snd_timeout); assert(clone->user_data == memc->user_data); + assert(clone->number_of_replicas == memc->number_of_replicas); memcached_free(clone); } @@@ -1552,6 -1553,9 +1553,9 @@@ static test_return behavior_test(memca value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE); assert(value > 0); + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, value + 1); + assert((value + 1) == memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS)); return 0; } @@@ -3047,6 -3051,24 +3051,24 @@@ static memcached_return pre_binary(mem return rc; } + static memcached_return pre_replication(memcached_st *memc) + { + memcached_return rc= MEMCACHED_FAILURE; + if (pre_binary(memc) == MEMCACHED_SUCCESS) + { + /* + * Make sure that we store the item on all servers + * (master + replicas == number of servers) + */ + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, + memc->number_of_hosts - 1); + assert(rc == MEMCACHED_SUCCESS); + assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS) == memc->number_of_hosts - 1); + } + + return rc; + } + static void my_free(memcached_st *ptr __attribute__((unused)), void *mem) { free(mem); @@@ -3542,6 -3564,175 +3564,177 @@@ static test_return connection_pool_test } #endif + static test_return replication_set_test(memcached_st *memc) + { + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + + /* + ** "bubba" should now be stored on all of our servers. We don't have an + ** easy to use API to address each individual server, so I'll just iterate + ** through a bunch of "master keys" and I should most likely hit all of the + ** servers... + */ + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + + return TEST_SUCCESS; + } + + static test_return replication_get_test(memcached_st *memc) + { + memcached_return rc; + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + for (int host= 0; host < memc->number_of_hosts; ++host) { + memcached_st *clone= memcached_clone(NULL, memc); + clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + } + + return TEST_SUCCESS; + } + + static test_return replication_mget_test(memcached_st *memc) + { + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + memcached_result_st result_obj; - for (int host= 0; host < clone->number_of_hosts; ++host) { ++ for (int host= 0; host < clone->number_of_hosts; ++host) ++ { + memcached_st *clone= memcached_clone(NULL, memc); + clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + + rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + + memcached_result_st *results= memcached_result_create(clone, &result_obj); + assert(results); + + int hits= 0; + while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) + { - ++hits; ++ hits++; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + + memcached_free(clone); + } + + return TEST_SUCCESS; + } + + static test_return replication_delete_test(memcached_st *memc) + { + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + /* Delete the items from all of the servers except 1 */ + uint64_t repl= memcached_behavior_get(memc, + MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + int hash= memcached_generate_hash(memc, keys[0], len[0]); + for (int x= 0; x < (repl + 1); ++x) { + clone->hosts[hash].port= 0; + if (++hash == clone->number_of_hosts) + hash= 0; + } + + memcached_result_st result_obj; - for (int host= 0; host < clone->number_of_hosts; ++host) { ++ for (int host= 0; host < clone->number_of_hosts; ++host) ++ { + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + + rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + + memcached_result_st *results= memcached_result_create(clone, &result_obj); + assert(results); + + int hits= 0; + while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) + { + ++hits; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + } + memcached_free(clone); + + return TEST_SUCCESS; + } + static void increment_request_id(uint16_t *id) { (*id)++; @@@ -3554,6 -3745,6 +3747,7 @@@ static uint16_t *get_udp_request_ids(me uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts); assert(ids != NULL); unsigned int x; ++ for (x= 0; x < memc->number_of_hosts; x++) ids[x]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[x].write_buffer); @@@ -3565,6 -3756,6 +3759,7 @@@ static test_return post_udp_op_check(me unsigned int x; memcached_server_st *cur_server = memc->hosts; uint16_t *cur_req_ids = get_udp_request_ids(memc); ++ for (x= 0; x < memc->number_of_hosts; x++) { assert(cur_server[x].cursor_active == 0); @@@ -3572,6 -3763,6 +3767,7 @@@ } free(expected_req_ids); free(cur_req_ids); ++ return TEST_SUCCESS; } @@@ -3593,6 -3784,6 +3789,7 @@@ static memcached_return init_udp(memcac memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts); for (x= 0; x < num_hosts; x++) memcached_server_free(&memc->hosts[x]); ++ memc->number_of_hosts= 0; memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1); for (x= 0; x < num_hosts; x++) @@@ -3600,6 -3791,6 +3797,7 @@@ assert(memcached_server_add_udp(memc, servers[x].hostname, servers[x].port) == MEMCACHED_SUCCESS); assert(memc->hosts[x].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); } ++ return MEMCACHED_SUCCESS; } @@@ -4247,6 -4438,14 +4445,14 @@@ test_st user_tests[] = {0, 0, 0} }; + test_st replication_tests[]= { + {"set", 1, replication_set_test }, + {"get", 0, replication_get_test }, + {"mget", 0, replication_mget_test }, + {"delete", 0, replication_delete_test }, + {0, 0, 0} + }; + test_st generate_tests[] ={ {"generate_pairs", 1, generate_pairs }, {"generate_data", 1, generate_data }, @@@ -4351,6 -4550,7 +4557,7 @@@ collection_st collection[] = {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests}, {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests}, {"test_hashes", 0, 0, hash_tests}, + {"replication", pre_replication, 0, replication_tests}, {0, 0, 0, 0} };