From: Date: Wed, 23 Jul 2008 19:16:31 +0000 (-0700) Subject: Pushing weighted ketama code. X-Git-Tag: _23~24 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=957f0cc7c87115224087f2c0de83a4c4aaa2c36d;p=m6w6%2Flibmemcached Pushing weighted ketama code. --- diff --git a/ChangeLog b/ChangeLog index c8e04e6f..6500f2a2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,7 @@ 0.23 * Added strings.h header for Solaris 9 * Solaris 64bit fix. + * Support for weighted Ketama from Yin Chen. 0.22 Mon Jul 14 09:24:11 PDT 2008 * Fix where master key was no being checked for "bad key" diff --git a/THANKS b/THANKS index bf6975fa..9221e3a6 100644 --- a/THANKS +++ b/THANKS @@ -9,4 +9,4 @@ Kevin Dalley - Bug Fixes Patrick Galbraith - work on C++ interface Ross McFarland - Idea for sorting servers. Marcelo Fernandez - TCP/IP timeout pieces -Yin Chen - Ketama support +Yin Chen - Ketama support/weighted support diff --git a/docs/memcached_behavior.pod b/docs/memcached_behavior.pod index c33b2af5..f717984d 100755 --- a/docs/memcached_behavior.pod +++ b/docs/memcached_behavior.pod @@ -77,6 +77,16 @@ Support CAS operations (this is not enabled by default at this point in the serv Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA and the hash to MEMCACHED_HASH_MD5. +=item MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED + +Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA with the weighted support. +and the hash to MEMCACHED_HASH_MD5. + +=item MEMCACHED_BEHAVIOR_KETAMA_HASH + +Sets the hashing algorithm for host mapping on continuum. The value can be set +to either MEMCACHED_HASH_DEFAULT, MEMCACHED_HASH_MD5, MEMCACHED_HASH_CRC, MEMCACHED_HASH_FNV1_64, MEMCACHED_HASH_FNV1A_64, MEMCACHED_HASH_FNV1_32, and MEMCACHED_HASH_FNV1A_32. + =item MEMCACHED_BEHAVIOR_POLL_TIMEOUT Modify the timeout value that is used by poll(). The default value is -1. An signed int pointer must be passed to memcached_behavior_set() to change this value. For memcached_behavior_get() a signed int value will be cast and returned as the unsigned long long. diff --git a/libmemcached/common.h b/libmemcached/common.h index 8cef723a..0c9b87ce 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -78,6 +78,8 @@ typedef enum { MEM_BUFFER_REQUESTS= (1 << 8), MEM_USE_SORT_HOSTS= (1 << 9), MEM_VERIFY_KEY= (1 << 10), + /* 11 used for weighted ketama */ + MEM_KETAMA_WEIGHTED= (1 << 11), } memcached_flags; /* Hashing algo */ diff --git a/libmemcached/memcached.c b/libmemcached/memcached.c index 2efcc7d5..dcf1a4a0 100644 --- a/libmemcached/memcached.c +++ b/libmemcached/memcached.c @@ -103,6 +103,7 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *ptr) new_clone->retry_timeout= ptr->retry_timeout; new_clone->distribution= ptr->distribution; new_clone->hash= ptr->hash; + new_clone->hash_continuum= ptr->hash_continuum; new_clone->user_data= ptr->user_data; new_clone->on_clone= ptr->on_clone; diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index d1daf775..911b0444 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -93,6 +93,8 @@ struct memcached_st { memcached_trigger_delete_key delete_trigger; char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE]; size_t prefix_key_length; + memcached_hash hash_continuum; + uint32_t continuum_points_counter; }; diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index e89473cf..c001c461 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -58,9 +58,21 @@ memcached_return memcached_behavior_set(memcached_st *ptr, run_distribution(ptr); break; } + case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED: + { + ptr->hash= MEMCACHED_HASH_MD5; + ptr->distribution= MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA; + set_behavior_flag(ptr, MEM_KETAMA_WEIGHTED, data); + run_distribution(ptr); + break; + } case MEMCACHED_BEHAVIOR_HASH: ptr->hash= (memcached_hash)(data); break; + case MEMCACHED_BEHAVIOR_KETAMA_HASH: + ptr->hash_continuum= (memcached_hash)(data); + run_distribution(ptr); + break; case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS: set_behavior_flag(ptr, MEM_USE_CACHE_LOOKUPS, data); memcached_quit(ptr); @@ -124,12 +136,17 @@ uint64_t memcached_behavior_get(memcached_st *ptr, case MEMCACHED_BEHAVIOR_VERIFY_KEY: temp_flag= MEM_VERIFY_KEY; break; + case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED: + temp_flag= MEM_KETAMA_WEIGHTED; + break; case MEMCACHED_BEHAVIOR_DISTRIBUTION: return ptr->distribution; case MEMCACHED_BEHAVIOR_KETAMA: - return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA && ptr->hash == MEMCACHED_HASH_MD5 ) ? 1 : 0; + return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA) ? 1 : 0; case MEMCACHED_BEHAVIOR_HASH: return ptr->hash; + case MEMCACHED_BEHAVIOR_KETAMA_HASH: + return ptr->hash_continuum; case MEMCACHED_BEHAVIOR_SORT_HOSTS: temp_flag= MEM_USE_SORT_HOSTS; break; diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index fd2f2b4d..69c7666b 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -86,6 +86,8 @@ typedef enum { MEMCACHED_BEHAVIOR_VERIFY_KEY, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, + MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, + MEMCACHED_BEHAVIOR_KETAMA_HASH, } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_hash.c b/libmemcached/memcached_hash.c index 5bb0fcee..75a1537a 100644 --- a/libmemcached/memcached_hash.c +++ b/libmemcached/memcached_hash.c @@ -11,18 +11,12 @@ static uint32_t FNV_32_PRIME= 16777619; static uint32_t internal_generate_hash(const char *key, size_t key_length); static uint32_t internal_generate_md5(const char *key, size_t key_length); -uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length) +uint32_t generate_hash_value(const char *key, size_t key_length, memcached_hash hash_algorithm) { uint32_t hash= 1; /* Just here to remove compile warning */ uint32_t x= 0; - - WATCHPOINT_ASSERT(ptr->number_of_hosts); - - if (ptr->number_of_hosts == 1) - return 0; - - switch (ptr->hash) + switch (hash_algorithm) { case MEMCACHED_HASH_DEFAULT: hash= internal_generate_hash(key, key_length); @@ -91,7 +85,20 @@ uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length) break; } } + return hash; +} + +uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length) +{ + uint32_t hash= 1; /* Just here to remove compile warning */ + + + WATCHPOINT_ASSERT(ptr->number_of_hosts); + if (ptr->number_of_hosts == 1) + return 0; + + hash= generate_hash_value(key, key_length, ptr->hash); WATCHPOINT_ASSERT(hash); return hash; } @@ -103,7 +110,7 @@ unsigned int dispatch_host(memcached_st *ptr, uint32_t hash) case MEMCACHED_DISTRIBUTION_CONSISTENT: case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA: { - uint32_t num= ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER; + uint32_t num= ptr->continuum_points_counter; WATCHPOINT_ASSERT(ptr->continuum); hash= hash; @@ -156,6 +163,7 @@ unsigned int dispatch_host(memcached_st *ptr, uint32_t hash) uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_length) { uint32_t hash= 1; /* Just here to remove compile warning */ + uint32_t result= 1; WATCHPOINT_ASSERT(ptr->number_of_hosts); @@ -165,6 +173,7 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_ hash = generate_hash(ptr, key, key_length); WATCHPOINT_ASSERT(hash); + return dispatch_host(ptr, hash); } diff --git a/libmemcached/memcached_hosts.c b/libmemcached/memcached_hosts.c index d75da5d2..55d7a49d 100644 --- a/libmemcached/memcached_hosts.c +++ b/libmemcached/memcached_hosts.c @@ -1,4 +1,5 @@ #include "common.h" +#include /* Protoypes (static) */ static memcached_return server_add(memcached_st *ptr, char *hostname, @@ -107,6 +108,12 @@ memcached_return update_continuum(memcached_st *ptr) uint32_t continuum_index= 0; uint32_t value; memcached_server_st *list; + uint32_t pointer_counter= 0; + uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; + memcached_return rc; + uint64_t total_mem_bytes= 0; + memcached_stat_st *stat_p= NULL; + uint32_t is_ketama_weighted= 0; if (ptr->number_of_hosts > ptr->continuum_count) { @@ -125,9 +132,29 @@ memcached_return update_continuum(memcached_st *ptr) } list = ptr->hosts; + + is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + if(is_ketama_weighted) + { + stat_p = memcached_stat(ptr, NULL, &rc); + for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) + { + list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes; + total_mem_bytes += (stat_p + host_index)->limit_maxbytes; + } + } + for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) { - for(index= 1; index <= MEMCACHED_POINTS_PER_SERVER; ++index) + if(is_ketama_weighted) + { + float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes; + pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts)); +#ifdef HAVE_DEBUG + printf("ketama_weighted:%s|%d|%llu|%u\n", list[host_index].hostname, list[host_index].port, list[host_index].limit_maxbytes, pointer_per_server); +#endif + } + for(index= 1; index <= pointer_per_server; ++index) { char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= ""; size_t sort_host_length; @@ -135,14 +162,19 @@ memcached_return update_continuum(memcached_st *ptr) sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", list[host_index].hostname, list[host_index].port, index); WATCHPOINT_ASSERT(sort_host_length); - value= generate_hash(ptr, sort_host, sort_host_length); + value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum); ptr->continuum[continuum_index].index= host_index; ptr->continuum[continuum_index++].value= value; } + pointer_counter+= pointer_per_server; } WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE); qsort(ptr->continuum, ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER, sizeof(memcached_continuum_item_st), continuum_item_cmp); + + ptr->continuum_points_counter= pointer_counter; + memcached_stat_free(NULL, stat_p); + #ifdef HAVE_DEBUG for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) { diff --git a/libmemcached/memcached_server.h b/libmemcached/memcached_server.h index f0eff961..c566ddbe 100644 --- a/libmemcached/memcached_server.h +++ b/libmemcached/memcached_server.h @@ -34,6 +34,7 @@ struct memcached_server_st { uint16_t count; time_t next_retry; memcached_st *root; + uint64_t limit_maxbytes; }; #define memcached_server_count(A) (A)->number_of_hosts diff --git a/tests/function.c b/tests/function.c index 59e1f2b9..d83b9729 100644 --- a/tests/function.c +++ b/tests/function.c @@ -24,8 +24,9 @@ #include "test.h" -#define GLOBAL_COUNT 100000 -#define GLOBAL2_COUNT 1000 +#define GLOBAL_COUNT 10000 +#define GLOBAL2_COUNT 100 +#define SERVERS_TO_CREATE 5 static uint32_t global_count; static pairs_st *global_pairs; @@ -2237,6 +2238,23 @@ test_return generate_data(memcached_st *memc) return 0; } +test_return generate_data_with_stats(memcached_st *memc) +{ + memcached_stat_st *stat_p= NULL; + memcached_return rc; + int host_index= 0; + execute_set(memc, global_pairs, global_count); + + //TODO: hosts used size stats + stat_p = memcached_stat(memc, NULL, &rc); + for (host_index = 0; host_index < SERVERS_TO_CREATE; ++host_index) + { + printf("\nserver %d|%s|%d bytes: %lld\n", host_index, (memc->hosts)[host_index].hostname, (memc->hosts)[host_index].port, (stat_p + host_index)->bytes); + } + + + return 0; +} test_return generate_buffer_data(memcached_st *memc) { int latch= 0; @@ -2521,6 +2539,24 @@ memcached_return pre_behavior_ketama(memcached_st *memc) return MEMCACHED_SUCCESS; } +memcached_return pre_behavior_ketama_weighted(memcached_st *memc) +{ + memcached_return rc; + uint64_t value; + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + assert(value == 1); + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH); + assert(value == MEMCACHED_HASH_MD5); + return MEMCACHED_SUCCESS; +} void my_free(memcached_st *ptr, void *mem) { free(mem); @@ -2862,6 +2898,14 @@ test_st consistent_tests[] ={ {0, 0, 0} }; +test_st consistent_weighted_tests[] ={ + {"generate_pairs", 1, generate_pairs }, + {"generate_data", 1, generate_data_with_stats }, + {"get_read", 0, get_read_count }, + {"cleanup", 1, cleanup_pairs }, + {0, 0, 0} +}; + collection_st collection[] ={ {"block", 0, 0, tests}, {"nonblock", pre_nonblock, 0, tests}, @@ -2896,6 +2940,7 @@ collection_st collection[] ={ {"generate_nonblock", pre_nonblock, 0, generate_tests}, {"consistent_not", 0, 0, consistent_tests}, {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests}, + {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests}, {0, 0, 0, 0} }; diff --git a/tests/server.c b/tests/server.c index 1248981b..29f6699d 100644 --- a/tests/server.c +++ b/tests/server.c @@ -37,10 +37,20 @@ void server_startup(server_startup_st *construct) int count; int status; - if (construct->udp) - sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE); - else - sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE); + if (construct->udp){ + if(x == 0) { + sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u -m 128", x, x+ TEST_PORT_BASE); + } else { + sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE); + } + } + else{ + if(x == 0) { + sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u -m 128", x, x+ TEST_PORT_BASE); + } else { + sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE); + } + } status= system(buffer); count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE); end_ptr+= count;