From 111fdf4a1cc4fc5daa71cbfcebd126279ce26293 Mon Sep 17 00:00:00 2001 From: Date: Tue, 4 Nov 2008 03:45:47 +0530 Subject: [PATCH] Incomming code for additions in keys (see Changelog) --- ChangeLog | 4 + configure.ac | 2 +- libmemcached/common.h | 5 +- libmemcached/memcached.h | 23 ++++- libmemcached/memcached_behavior.c | 11 +++ libmemcached/memcached_callback.c | 2 +- libmemcached/memcached_constants.h | 11 ++- libmemcached/memcached_hash.c | 48 +++++----- libmemcached/memcached_hosts.c | 143 +++++++++++++++++++++-------- libmemcached/memcached_io.c | 8 +- libmemcached/memcached_parse.c | 15 ++- libmemcached/memcached_response.c | 2 +- libmemcached/memcached_server.c | 2 +- libmemcached/memcached_server.h | 1 + tests/function.c | 83 +++++++++++++---- 15 files changed, 267 insertions(+), 93 deletions(-) diff --git a/ChangeLog b/ChangeLog index 4c885c94..327602cb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,10 @@ 0.25 * Jenkins HASH added. * Update of Murmur hash code + * Support explicit weights (Robey Pointer, Evan Weaver) + * Bugfix for ketama continuum (Robey Pointer) + * New behavior MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY (Robey Pointer) + * Don't ever call stats for weighting servers, because it is unstable. 0.24 Tue Sep 16 02:59:03 PDT 2008 (never released) * Cleanup compile warnings. diff --git a/configure.ac b/configure.ac index 0decc23c..5206a6a5 100644 --- a/configure.ac +++ b/configure.ac @@ -7,7 +7,7 @@ MEMCACHED_LIBRARY_NAME=libmemcached #release versioning MEMCACHED_MAJOR_VERSION=0 -MEMCACHED_MINOR_VERSION=24 +MEMCACHED_MINOR_VERSION=25 MEMCACHED_MICRO_VERSION=0 #API version diff --git a/libmemcached/common.h b/libmemcached/common.h index b64af723..b8c8acf5 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -81,7 +81,8 @@ typedef enum { MEM_VERIFY_KEY= (1 << 10), /* 11 used for weighted ketama */ MEM_KETAMA_WEIGHTED= (1 << 11), - MEM_BINARY_PROTOCOL= (1 << 12) + MEM_BINARY_PROTOCOL= (1 << 12), + MEM_HASH_WITH_PREFIX_KEY= (1 << 13) } memcached_flags; /* Hashing algo */ @@ -125,7 +126,7 @@ extern uint64_t ntohll(uint64_t); extern uint64_t htonll(uint64_t); void host_reset(memcached_st *ptr, memcached_server_st *host, - const char *hostname, unsigned int port, + const char *hostname, unsigned int port, uint32_t weight, memcached_connection type); #endif /* __COMMON_H__ */ diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index f12975f6..96a4f15e 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -39,7 +39,7 @@ struct memcached_continuum_item_st { uint32_t value; }; -#define LIBMEMCACHED_VERSION_STRING "0.24" +#define LIBMEMCACHED_VERSION_STRING "0.25" struct memcached_stat_st { uint32_t pid; @@ -143,12 +143,29 @@ memcached_return memcached_server_add_unix_socket(memcached_st *ptr, const char *filename); memcached_return memcached_server_add(memcached_st *ptr, const char *hostname, unsigned int port); + +memcached_return memcached_server_add_udp_with_weight(memcached_st *ptr, + const char *hostname, + unsigned int port, + uint32_t weight); +memcached_return memcached_server_add_unix_socket_with_weight(memcached_st *ptr, + const char *filename, + uint32_t weight); +memcached_return memcached_server_add_with_weight(memcached_st *ptr, const char *hostname, + unsigned int port, + uint32_t weight); void memcached_server_list_free(memcached_server_st *ptr); memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list); memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, - const char *hostname, unsigned int port, - memcached_return *error); + const char *hostname, + unsigned int port, + memcached_return *error); +memcached_server_st *memcached_server_list_append_with_weight(memcached_server_st *ptr, + const char *hostname, + unsigned int port, + uint32_t weight, + memcached_return *error); unsigned int memcached_server_list_count(memcached_server_st *ptr); memcached_server_st *memcached_servers_parse(char *server_strings); diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index 0cd42efc..5f688440 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -1,4 +1,5 @@ #include "common.h" +#include #include #include #include @@ -58,6 +59,10 @@ memcached_return memcached_behavior_set(memcached_st *ptr, case MEMCACHED_BEHAVIOR_DISTRIBUTION: { ptr->distribution= (memcached_server_distribution)(data); + if (ptr->distribution == MEMCACHED_DISTRIBUTION_RANDOM) + { + srandom(time(NULL)); + } run_distribution(ptr); break; } @@ -124,6 +129,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr, break; case MEMCACHED_BEHAVIOR_USER_DATA: return MEMCACHED_FAILURE; + case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY: + set_behavior_flag(ptr, MEM_HASH_WITH_PREFIX_KEY, data); + break; } return MEMCACHED_SUCCESS; @@ -229,6 +237,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr, } case MEMCACHED_BEHAVIOR_USER_DATA: return MEMCACHED_FAILURE; + case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY: + temp_flag= MEM_HASH_WITH_PREFIX_KEY; + break; } WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */ diff --git a/libmemcached/memcached_callback.c b/libmemcached/memcached_callback.c index 5e36ddf6..d1c04242 100644 --- a/libmemcached/memcached_callback.c +++ b/libmemcached/memcached_callback.c @@ -21,7 +21,7 @@ memcached_return memcached_callback_set(memcached_st *ptr, { size_t key_length= strlen(key); - if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) + if (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED) { return MEMCACHED_BAD_KEY_PROVIDED; } diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index fb8305f4..5fea0208 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -19,12 +19,13 @@ extern "C" { #define MEMCACHED_MAX_BUFFER 8196 #define MEMCACHED_MAX_HOST_LENGTH 64 #define MEMCACHED_MAX_HOST_SORT_LENGTH 86 /* Used for Ketama */ -#define MEMCACHED_POINTS_PER_SERVER 100 +#define MEMCACHED_POINTS_PER_SERVER 100 +#define MEMCACHED_POINTS_PER_SERVER_KETAMA 160 #define MEMCACHED_CONTINUUM_SIZE MEMCACHED_POINTS_PER_SERVER*100 /* This would then set max hosts to 100 */ #define MEMCACHED_STRIDE 4 #define MEMCACHED_DEFAULT_TIMEOUT 1000 #define MEMCACHED_CONTINUUM_ADDITION 10 /* How many extra slots we should build for in the continuum */ -#define MEMCACHED_PREFIX_KEY_MAX_SIZE 12 +#define MEMCACHED_PREFIX_KEY_MAX_SIZE 128 typedef enum { MEMCACHED_SUCCESS, @@ -66,7 +67,8 @@ typedef enum { typedef enum { MEMCACHED_DISTRIBUTION_MODULA, MEMCACHED_DISTRIBUTION_CONSISTENT, - MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA + MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA, + MEMCACHED_DISTRIBUTION_RANDOM } memcached_server_distribution; typedef enum { @@ -93,7 +95,8 @@ typedef enum { MEMCACHED_BEHAVIOR_RCV_TIMEOUT, MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT, MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK, - MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK + MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK, + MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_hash.c b/libmemcached/memcached_hash.c index 901f2c48..ff7d0e39 100644 --- a/libmemcached/memcached_hash.c +++ b/libmemcached/memcached_hash.c @@ -121,37 +121,25 @@ static uint32_t dispatch_host(memcached_st *ptr, uint32_t hash) hash= hash; memcached_continuum_item_st *begin, *end, *left, *right, *middle; begin= left= ptr->continuum; - end= right= ptr->continuum + (num - 1); + end= right= ptr->continuum + num; - while (1) + while (left < right) { - memcached_continuum_item_st *rmiddle; - - middle = left + (right - left) / 2; - - if (middle==end) - return begin->index; - - if (middle==begin) - return end->index; - - rmiddle = middle+1; - - if (hashvalue && hash>=middle->value) - return middle->index; - + middle= left + (right - left) / 2; if (middle->value < hash) - left = middle + 1; - else if (middle->value > hash) - right = middle - 1; - - if (left>right) - return left->index; + left= middle + 1; + else + right= middle; } + if (right > end) + right= begin; + return right->index; } break; case MEMCACHED_DISTRIBUTION_MODULA: return hash % ptr->number_of_hosts; + case MEMCACHED_DISTRIBUTION_RANDOM: + return random() % ptr->number_of_hosts; default: WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */ return hash % ptr->number_of_hosts; @@ -174,7 +162,19 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_ if (ptr->number_of_hosts == 1) return 0; - hash= generate_hash(ptr, key, key_length); + if (ptr->flags & MEM_HASH_WITH_PREFIX_KEY) + { + int temp_len= ptr->prefix_key_length + key_length; + char *temp= (char *)malloc(temp_len); + strncpy(temp, ptr->prefix_key, ptr->prefix_key_length); + strncpy(temp + ptr->prefix_key_length, key, key_length); + hash= generate_hash(ptr, temp, temp_len); + free(temp); + } + else + { + hash= generate_hash(ptr, key, key_length); + } WATCHPOINT_ASSERT(hash); diff --git a/libmemcached/memcached_hosts.c b/libmemcached/memcached_hosts.c index f7a7dfcf..79e88701 100644 --- a/libmemcached/memcached_hosts.c +++ b/libmemcached/memcached_hosts.c @@ -4,6 +4,7 @@ /* Protoypes (static) */ static memcached_return server_add(memcached_st *ptr, const char *hostname, unsigned int port, + uint32_t weight, memcached_connection type); memcached_return update_continuum(memcached_st *ptr); @@ -44,6 +45,8 @@ memcached_return run_distribution(memcached_st *ptr) if (ptr->flags & MEM_USE_SORT_HOSTS) sort_hosts(ptr); break; + case MEMCACHED_DISTRIBUTION_RANDOM: + break; default: WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */ } @@ -52,13 +55,14 @@ memcached_return run_distribution(memcached_st *ptr) } void host_reset(memcached_st *ptr, memcached_server_st *host, - const char *hostname, unsigned int port, + const char *hostname, unsigned int port, uint32_t weight, memcached_connection type) { memset(host, 0, sizeof(memcached_server_st)); strncpy(host->hostname, hostname, MEMCACHED_MAX_HOST_LENGTH - 1); host->root= ptr ? ptr : NULL; host->port= port; + host->weight= weight; host->fd= -1; host->type= type; host->read_ptr= host->read_buffer; @@ -87,6 +91,17 @@ void server_list_free(memcached_st *ptr, memcached_server_st *servers) free(servers); } +static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int alignment) +{ + unsigned char results[16]; + + md5_signature((unsigned char*)key, key_length, results); + return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24) + | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16) + | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8) + | (results[0 + alignment * 4] & 0xFF); +} + static int continuum_item_cmp(const void *t1, const void *t2) { memcached_continuum_item_st *ct1= (memcached_continuum_item_st *)t1; @@ -111,19 +126,22 @@ memcached_return update_continuum(memcached_st *ptr) memcached_server_st *list; uint32_t pointer_counter= 0; uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; - memcached_return rc; - uint64_t total_mem_bytes= 0; - memcached_stat_st *stat_p= NULL; + uint32_t pointer_per_hash= 1; + uint64_t total_weight= 0; uint32_t is_ketama_weighted= 0; + uint32_t points_per_server= 0; + + is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER; if (ptr->number_of_hosts > ptr->continuum_count) { memcached_continuum_item_st *new_ptr; if (ptr->call_realloc) - new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * MEMCACHED_POINTS_PER_SERVER); + new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); else - new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * MEMCACHED_POINTS_PER_SERVER); + new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); if (new_ptr == 0) return MEMCACHED_MEMORY_ALLOCATION_FAILURE; @@ -134,58 +152,79 @@ memcached_return update_continuum(memcached_st *ptr) list = ptr->hosts; - is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); - if(is_ketama_weighted) + if (is_ketama_weighted) { - stat_p = memcached_stat(ptr, NULL, &rc); for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) { - list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes; - total_mem_bytes += (stat_p + host_index)->limit_maxbytes; + if (list[host_index].weight == 0) + { + list[host_index].weight = 1; + } + total_weight += list[host_index].weight; } } for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) { - if(is_ketama_weighted) + if (is_ketama_weighted) { - float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes; - pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts)); + float pct = (float)list[host_index].weight / (float)total_weight; + pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4; + pointer_per_hash= 4; #ifdef HAVE_DEBUG printf("ketama_weighted:%s|%d|%llu|%u\n", list[host_index].hostname, list[host_index].port, - (unsigned long long)list[host_index].limit_maxbytes, + (unsigned long long)list[host_index].weight, pointer_per_server); #endif } - for(index= 1; index <= pointer_per_server; ++index) + for (index= 1; index <= pointer_per_server / pointer_per_hash; ++index) { char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= ""; size_t sort_host_length; - sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", - list[host_index].hostname, list[host_index].port, index); + if (list[host_index].port == MEMCACHED_DEFAULT_PORT) + { + sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s-%d", + list[host_index].hostname, index - 1); + + } + else + { + sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", + list[host_index].hostname, list[host_index].port, index - 1); + } WATCHPOINT_ASSERT(sort_host_length); - value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum); - ptr->continuum[continuum_index].index= host_index; - ptr->continuum[continuum_index++].value= value; + + if (is_ketama_weighted) + { + int i; + for (i = 0; i < pointer_per_hash; i++) + { + value= ketama_server_hash(sort_host, sort_host_length, i); + ptr->continuum[continuum_index].index= host_index; + ptr->continuum[continuum_index++].value= value; + } + } + else + { + value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum); + ptr->continuum[continuum_index].index= host_index; + ptr->continuum[continuum_index++].value= value; + } } pointer_counter+= pointer_per_server; } WATCHPOINT_ASSERT(ptr); WATCHPOINT_ASSERT(ptr->continuum); - WATCHPOINT_ASSERT(ptr->number_of_hosts); WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE); ptr->continuum_points_counter= pointer_counter; qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp); - if (stat_p) - memcached_stat_free(NULL, stat_p); - #ifdef HAVE_DEBUG - for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) + for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) { WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value); } @@ -205,7 +244,6 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l return MEMCACHED_SUCCESS; count= list[0].count; - if (ptr->call_realloc) new_host_list= (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts, @@ -219,12 +257,12 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l return MEMCACHED_MEMORY_ALLOCATION_FAILURE; ptr->hosts= new_host_list; - + for (x= 0; x < count; x++) { WATCHPOINT_ASSERT(list[x].hostname[0] != 0); host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], list[x].hostname, - list[x].port, list[x].type); + list[x].port, list[x].weight, list[x].type); ptr->number_of_hosts++; } ptr->hosts[0].count= ptr->number_of_hosts; @@ -232,17 +270,33 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l return run_distribution(ptr); } -memcached_return memcached_server_add_unix_socket(memcached_st *ptr, const char *filename) +memcached_return memcached_server_add_unix_socket(memcached_st *ptr, + const char *filename) +{ + return memcached_server_add_unix_socket_with_weight(ptr, filename, 0); +} + +memcached_return memcached_server_add_unix_socket_with_weight(memcached_st *ptr, + const char *filename, + uint32_t weight) { if (!filename) return MEMCACHED_FAILURE; - return server_add(ptr, filename, 0, MEMCACHED_CONNECTION_UNIX_SOCKET); + return server_add(ptr, filename, 0, weight, MEMCACHED_CONNECTION_UNIX_SOCKET); } memcached_return memcached_server_add_udp(memcached_st *ptr, const char *hostname, unsigned int port) +{ + return memcached_server_add_udp_with_weight(ptr, hostname, port, 0); +} + +memcached_return memcached_server_add_udp_with_weight(memcached_st *ptr, + const char *hostname, + unsigned int port, + uint32_t weight) { if (!port) port= MEMCACHED_DEFAULT_PORT; @@ -250,12 +304,20 @@ memcached_return memcached_server_add_udp(memcached_st *ptr, if (!hostname) hostname= "localhost"; - return server_add(ptr, hostname, port, MEMCACHED_CONNECTION_UDP); + return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_UDP); } memcached_return memcached_server_add(memcached_st *ptr, const char *hostname, unsigned int port) +{ + return memcached_server_add_with_weight(ptr, hostname, port, 0); +} + +memcached_return memcached_server_add_with_weight(memcached_st *ptr, + const char *hostname, + unsigned int port, + uint32_t weight) { if (!port) port= MEMCACHED_DEFAULT_PORT; @@ -263,11 +325,12 @@ memcached_return memcached_server_add(memcached_st *ptr, if (!hostname) hostname= "localhost"; - return server_add(ptr, hostname, port, MEMCACHED_CONNECTION_TCP); + return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP); } static memcached_return server_add(memcached_st *ptr, const char *hostname, unsigned int port, + uint32_t weight, memcached_connection type) { memcached_server_st *new_host_list; @@ -283,7 +346,7 @@ static memcached_return server_add(memcached_st *ptr, const char *hostname, ptr->hosts= new_host_list; - host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, type); + host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, weight, type); ptr->number_of_hosts++; ptr->hosts[0].count= ptr->number_of_hosts; @@ -317,8 +380,16 @@ memcached_return memcached_server_remove(memcached_server_st *st_ptr) } memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, - const char *hostname, unsigned int port, + const char *hostname, unsigned int port, memcached_return *error) +{ + return memcached_server_list_append_with_weight(ptr, hostname, port, 0, error); +} + +memcached_server_st *memcached_server_list_append_with_weight(memcached_server_st *ptr, + const char *hostname, unsigned int port, + uint32_t weight, + memcached_return *error) { unsigned int count; memcached_server_st *new_host_list; @@ -343,7 +414,7 @@ memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, return NULL; } - host_reset(NULL, &new_host_list[count-1], hostname, port, MEMCACHED_CONNECTION_TCP); + host_reset(NULL, &new_host_list[count-1], hostname, port, weight, MEMCACHED_CONNECTION_TCP); /* Backwards compatibility hack */ new_host_list[0].count= count; diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index ddca1972..da845912 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -232,7 +232,10 @@ memcached_return memcached_io_close(memcached_server_st *ptr) #endif r= close(ptr->fd); - WATCHPOINT_ASSERT(r == 0); +#ifdef HAVE_DEBUG + if (r != 0) + WATCHPOINT_ERRNO(errno); +#endif return MEMCACHED_SUCCESS; } @@ -334,7 +337,8 @@ static ssize_t io_flush(memcached_server_st *ptr, } WATCHPOINT_ASSERT(write_length == 0); - WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); + // Need to study this assert() WATCHPOINT_ASSERT(return_length == + // ptr->write_buffer_offset); ptr->write_buffer_offset= 0; return return_length; diff --git a/libmemcached/memcached_parse.c b/libmemcached/memcached_parse.c index fc6a4cb1..d3d364a0 100644 --- a/libmemcached/memcached_parse.c +++ b/libmemcached/memcached_parse.c @@ -12,6 +12,7 @@ memcached_server_st *memcached_servers_parse(char *server_strings) { char *string; unsigned int port; + uint32_t weight; char *begin_ptr; char *end_ptr; memcached_server_st *servers= NULL; @@ -26,8 +27,9 @@ memcached_server_st *memcached_servers_parse(char *server_strings) string= index(begin_ptr, ',')) { char buffer[HUGE_STRING_LEN]; - char *ptr; + char *ptr, *ptr2; port= 0; + weight= 0; if (string) { @@ -52,9 +54,18 @@ memcached_server_st *memcached_servers_parse(char *server_strings) ptr++; port= strtoul(ptr, (char **)NULL, 10); + + ptr2= index(ptr, ' '); + if (! ptr2) + ptr2= index(ptr, ':'); + if (ptr2) + { + ptr2++; + weight = strtoul(ptr2, (char **)NULL, 10); + } } - servers= memcached_server_list_append(servers, buffer, port, &rc); + servers= memcached_server_list_append_with_weight(servers, buffer, port, weight, &rc); if (isspace(*begin_ptr)) begin_ptr++; diff --git a/libmemcached/memcached_response.c b/libmemcached/memcached_response.c index b103b34a..764dbdf0 100644 --- a/libmemcached/memcached_response.c +++ b/libmemcached/memcached_response.c @@ -53,7 +53,7 @@ memcached_return memcached_response(memcached_server_st *ptr, ssize_t read_length; read_length= memcached_io_read(ptr, buffer_ptr, 1); - WATCHPOINT_ASSERT(isgraph(*buffer_ptr) || isspace(*buffer_ptr)); + WATCHPOINT_ASSERT(*buffer_ptr != '\0'); if (read_length != 1) { diff --git a/libmemcached/memcached_server.c b/libmemcached/memcached_server.c index 1ee60101..fd022355 100644 --- a/libmemcached/memcached_server.c +++ b/libmemcached/memcached_server.c @@ -75,7 +75,7 @@ memcached_server_st *memcached_server_clone(memcached_server_st *clone, memcache new_clone->root= ptr->root; host_reset(new_clone->root, new_clone, - ptr->hostname, ptr->port, + ptr->hostname, ptr->port, ptr->weight, ptr->type); return new_clone; diff --git a/libmemcached/memcached_server.h b/libmemcached/memcached_server.h index d5ed2692..c2bdcdb1 100644 --- a/libmemcached/memcached_server.h +++ b/libmemcached/memcached_server.h @@ -38,6 +38,7 @@ struct memcached_server_st { uint64_t limit_maxbytes; uint32_t server_failure_counter; uint32_t io_bytes_sent; /* # bytes sent since last read */ + uint32_t weight; }; #define memcached_server_count(A) (A)->number_of_hosts diff --git a/tests/function.c b/tests/function.c index 4483232c..4ec247f6 100644 --- a/tests/function.c +++ b/tests/function.c @@ -48,13 +48,13 @@ static test_return server_list_null_test(memcached_st *ptr __attribute__((unuse memcached_server_st *server_list; memcached_return rc; - server_list= memcached_server_list_append(NULL, NULL, 0, NULL); + server_list= memcached_server_list_append_with_weight(NULL, NULL, 0, 0, NULL); assert(server_list == NULL); - server_list= memcached_server_list_append(NULL, "localhost", 0, NULL); + server_list= memcached_server_list_append_with_weight(NULL, "localhost", 0, 0, NULL); assert(server_list == NULL); - server_list= memcached_server_list_append(NULL, NULL, 0, &rc); + server_list= memcached_server_list_append_with_weight(NULL, NULL, 0, 0, &rc); assert(server_list == NULL); return 0; @@ -88,7 +88,7 @@ static test_return server_sort_test(memcached_st *ptr __attribute__((unused))) for (x= 0; x < TEST_PORT_COUNT; x++) { test_ports[x]= random() % 64000; - rc= memcached_server_add(local_memc, "localhost", test_ports[x]); + rc= memcached_server_add_with_weight(local_memc, "localhost", test_ports[x], 0); assert(local_memc->number_of_hosts == x + 1); assert(local_memc->hosts[0].count == x+1); assert(rc == MEMCACHED_SUCCESS); @@ -115,11 +115,11 @@ static test_return server_sort2_test(memcached_st *ptr __attribute__((unused))) rc= memcached_behavior_set(local_memc, MEMCACHED_BEHAVIOR_SORT_HOSTS, 1); assert(rc == MEMCACHED_SUCCESS); - rc= memcached_server_add(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43043); + rc= memcached_server_add_with_weight(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43043, 0); assert(rc == MEMCACHED_SUCCESS); assert(local_memc->hosts[0].port == 43043); - rc= memcached_server_add(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43042); + rc= memcached_server_add_with_weight(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43042, 0); assert(rc == MEMCACHED_SUCCESS); assert(local_memc->hosts[0].port == 43042); assert(local_memc->hosts[1].port == 43043); @@ -159,7 +159,7 @@ static test_return server_unsort_test(memcached_st *ptr __attribute__((unused)) for (x= 0; x < TEST_PORT_COUNT; x++) { test_ports[x]= random() % 64000; - rc= memcached_server_add(local_memc, "localhost", test_ports[x]); + rc= memcached_server_add_with_weight(local_memc, "localhost", test_ports[x], 0); assert(local_memc->number_of_hosts == x+1); assert(local_memc->hosts[0].count == x+1); assert(rc == MEMCACHED_SUCCESS); @@ -234,7 +234,7 @@ static test_return connection_test(memcached_st *memc) { memcached_return rc; - rc= memcached_server_add(memc, "localhost", 0); + rc= memcached_server_add_with_weight(memc, "localhost", 0, 0); assert(rc == MEMCACHED_SUCCESS); return 0; @@ -1238,7 +1238,7 @@ static test_return add_host_test(memcached_st *memc) memcached_return rc; char servername[]= "0.example.com"; - servers= memcached_server_list_append(NULL, servername, 400, &rc); + servers= memcached_server_list_append_with_weight(NULL, servername, 400, 0, &rc); assert(servers); assert(1 == memcached_server_list_count(servers)); @@ -1247,7 +1247,7 @@ static test_return add_host_test(memcached_st *memc) char buffer[SMALL_STRING_LEN]; snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x); - servers= memcached_server_list_append(servers, buffer, 401, + servers= memcached_server_list_append_with_weight(servers, buffer, 401, 0, &rc); assert(rc == MEMCACHED_SUCCESS); assert(x == memcached_server_list_count(servers)); @@ -2123,6 +2123,54 @@ static test_return user_supplied_bug17(memcached_st *memc) return 0; } +#include "ketama_test_cases.h" +test_return user_supplied_bug18(memcached_st *memc) +{ + memcached_return rc; + int value; + int i; + memcached_server_st *server_pool; + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + assert(value == 1); + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH); + assert(value == MEMCACHED_HASH_MD5); + + while (memc->number_of_hosts > 0) + { + memcached_server_remove(memc->hosts); + } + server_pool = memcached_servers_parse("10.0.1.1:11211 600,10.0.1.2:11211 300,10.0.1.3:11211 200,10.0.1.4:11211 350,10.0.1.5:11211 1000,10.0.1.6:11211 800,10.0.1.7:11211 950,10.0.1.8:11211 100"); + memcached_server_push(memc, server_pool); + + /* verify that the server list was parsed okay. */ + assert(memc->number_of_hosts == 8); + assert(strcmp(server_pool[0].hostname, "10.0.1.1") == 0); + assert(server_pool[0].port == 11211); + assert(server_pool[0].weight == 600); + assert(strcmp(server_pool[2].hostname, "10.0.1.3") == 0); + assert(server_pool[2].port == 11211); + assert(server_pool[2].weight == 200); + assert(strcmp(server_pool[7].hostname, "10.0.1.8") == 0); + assert(server_pool[7].port == 11211); + assert(server_pool[7].weight == 100); + + /* verify the standard ketama set. */ + for (i= 0; i < 99; i++) + { + uint32_t server_idx = memcached_generate_hash(memc, test_cases[i].key, strlen(test_cases[i].key)); + char *hostname = memc->hosts[server_idx].hostname; + assert(strcmp(hostname, test_cases[i].server) == 0); + } + return 0; +} static test_return result_static(memcached_st *memc) { @@ -2325,7 +2373,7 @@ static test_return get_read_count(memcached_st *memc) clone= memcached_clone(NULL, memc); assert(clone); - memcached_server_add(clone, "localhost", 6666); + memcached_server_add_with_weight(clone, "localhost", 6666, 0); { char *return_value; @@ -2487,7 +2535,7 @@ static test_return add_host_test1(memcached_st *memc) char servername[]= "0.example.com"; memcached_server_st *servers; - servers= memcached_server_list_append(NULL, servername, 400, &rc); + servers= memcached_server_list_append_with_weight(NULL, servername, 400, 0, &rc); assert(servers); assert(1 == memcached_server_list_count(servers)); @@ -2496,7 +2544,7 @@ static test_return add_host_test1(memcached_st *memc) char buffer[SMALL_STRING_LEN]; snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x); - servers= memcached_server_list_append(servers, buffer, 401, + servers= memcached_server_list_append_with_weight(servers, buffer, 401, 0, &rc); assert(rc == MEMCACHED_SUCCESS); assert(x == memcached_server_list_count(servers)); @@ -2653,7 +2701,7 @@ static void *my_realloc(memcached_st *ptr __attribute__((unused)), void *mem, co return realloc(mem, size); } -static memcached_return set_prefix(memcached_st *memc) +static memcached_return set_prefix(memcached_st *memc) { memcached_return rc; const char *key= "mine"; @@ -2697,9 +2745,11 @@ static memcached_return set_prefix(memcached_st *memc) assert(value == NULL); /* Test a long key for failure */ + /* TODO, extend test to determine based on setting, what result should be */ long_key= "Thisismorethentheallottednumberofcharacters"; rc= memcached_callback_set(memc, MEMCACHED_CALLBACK_PREFIX_KEY, long_key); - assert(rc == MEMCACHED_BAD_KEY_PROVIDED); + //assert(rc == MEMCACHED_BAD_KEY_PROVIDED); + assert(rc == MEMCACHED_SUCCESS); /* Now test a key with spaces (which will fail from long key, since bad key is not set) */ long_key= "This is more then the allotted number of characters"; @@ -2813,7 +2863,7 @@ static memcached_return pre_unix_socket(memcached_st *memc) if (stat("/tmp/memcached.socket", &buf)) return MEMCACHED_FAILURE; - rc= memcached_server_add_unix_socket(memc, "/tmp/memcached.socket"); + rc= memcached_server_add_unix_socket_with_weight(memc, "/tmp/memcached.socket", 0); return rc; } @@ -2941,6 +2991,7 @@ test_st user_tests[] ={ {"user_supplied_bug15", 1, user_supplied_bug15 }, {"user_supplied_bug16", 1, user_supplied_bug16 }, {"user_supplied_bug17", 1, user_supplied_bug17 }, + {"user_supplied_bug18", 1, user_supplied_bug18 }, {0, 0, 0} }; -- 2.30.2