0.23
* Added strings.h header for Solaris 9
* Solaris 64bit fix.
+ * Support for weighted Ketama from Yin Chen.
0.22 Mon Jul 14 09:24:11 PDT 2008
* Fix where master key was no being checked for "bad key"
Patrick Galbraith - work on C++ interface
Ross McFarland - Idea for sorting servers.
Marcelo Fernandez - TCP/IP timeout pieces
-Yin Chen - Ketama support
+Yin Chen - Ketama support/weighted support
Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA
and the hash to MEMCACHED_HASH_MD5.
+=item MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED
+
+Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA with the weighted support.
+and the hash to MEMCACHED_HASH_MD5.
+
+=item MEMCACHED_BEHAVIOR_KETAMA_HASH
+
+Sets the hashing algorithm for host mapping on continuum. The value can be set
+to either MEMCACHED_HASH_DEFAULT, MEMCACHED_HASH_MD5, MEMCACHED_HASH_CRC, MEMCACHED_HASH_FNV1_64, MEMCACHED_HASH_FNV1A_64, MEMCACHED_HASH_FNV1_32, and MEMCACHED_HASH_FNV1A_32.
+
=item MEMCACHED_BEHAVIOR_POLL_TIMEOUT
Modify the timeout value that is used by poll(). The default value is -1. An signed int pointer must be passed to memcached_behavior_set() to change this value. For memcached_behavior_get() a signed int value will be cast and returned as the unsigned long long.
MEM_BUFFER_REQUESTS= (1 << 8),
MEM_USE_SORT_HOSTS= (1 << 9),
MEM_VERIFY_KEY= (1 << 10),
+ /* 11 used for weighted ketama */
+ MEM_KETAMA_WEIGHTED= (1 << 11),
} memcached_flags;
/* Hashing algo */
new_clone->retry_timeout= ptr->retry_timeout;
new_clone->distribution= ptr->distribution;
new_clone->hash= ptr->hash;
+ new_clone->hash_continuum= ptr->hash_continuum;
new_clone->user_data= ptr->user_data;
new_clone->on_clone= ptr->on_clone;
memcached_trigger_delete_key delete_trigger;
char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
size_t prefix_key_length;
+ memcached_hash hash_continuum;
+ uint32_t continuum_points_counter;
};
run_distribution(ptr);
break;
}
+ case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
+ {
+ ptr->hash= MEMCACHED_HASH_MD5;
+ ptr->distribution= MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA;
+ set_behavior_flag(ptr, MEM_KETAMA_WEIGHTED, data);
+ run_distribution(ptr);
+ break;
+ }
case MEMCACHED_BEHAVIOR_HASH:
ptr->hash= (memcached_hash)(data);
break;
+ case MEMCACHED_BEHAVIOR_KETAMA_HASH:
+ ptr->hash_continuum= (memcached_hash)(data);
+ run_distribution(ptr);
+ break;
case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS:
set_behavior_flag(ptr, MEM_USE_CACHE_LOOKUPS, data);
memcached_quit(ptr);
case MEMCACHED_BEHAVIOR_VERIFY_KEY:
temp_flag= MEM_VERIFY_KEY;
break;
+ case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
+ temp_flag= MEM_KETAMA_WEIGHTED;
+ break;
case MEMCACHED_BEHAVIOR_DISTRIBUTION:
return ptr->distribution;
case MEMCACHED_BEHAVIOR_KETAMA:
- return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA && ptr->hash == MEMCACHED_HASH_MD5 ) ? 1 : 0;
+ return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA) ? 1 : 0;
case MEMCACHED_BEHAVIOR_HASH:
return ptr->hash;
+ case MEMCACHED_BEHAVIOR_KETAMA_HASH:
+ return ptr->hash_continuum;
case MEMCACHED_BEHAVIOR_SORT_HOSTS:
temp_flag= MEM_USE_SORT_HOSTS;
break;
MEMCACHED_BEHAVIOR_VERIFY_KEY,
MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
MEMCACHED_BEHAVIOR_RETRY_TIMEOUT,
+ MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED,
+ MEMCACHED_BEHAVIOR_KETAMA_HASH,
} memcached_behavior;
typedef enum {
static uint32_t internal_generate_hash(const char *key, size_t key_length);
static uint32_t internal_generate_md5(const char *key, size_t key_length);
-uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length)
+uint32_t generate_hash_value(const char *key, size_t key_length, memcached_hash hash_algorithm)
{
uint32_t hash= 1; /* Just here to remove compile warning */
uint32_t x= 0;
-
- WATCHPOINT_ASSERT(ptr->number_of_hosts);
-
- if (ptr->number_of_hosts == 1)
- return 0;
-
- switch (ptr->hash)
+ switch (hash_algorithm)
{
case MEMCACHED_HASH_DEFAULT:
hash= internal_generate_hash(key, key_length);
break;
}
}
+ return hash;
+}
+
+uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length)
+{
+ uint32_t hash= 1; /* Just here to remove compile warning */
+
+
+ WATCHPOINT_ASSERT(ptr->number_of_hosts);
+ if (ptr->number_of_hosts == 1)
+ return 0;
+
+ hash= generate_hash_value(key, key_length, ptr->hash);
WATCHPOINT_ASSERT(hash);
return hash;
}
case MEMCACHED_DISTRIBUTION_CONSISTENT:
case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
{
- uint32_t num= ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER;
+ uint32_t num= ptr->continuum_points_counter;
WATCHPOINT_ASSERT(ptr->continuum);
hash= hash;
uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_length)
{
uint32_t hash= 1; /* Just here to remove compile warning */
+ uint32_t result= 1;
WATCHPOINT_ASSERT(ptr->number_of_hosts);
hash = generate_hash(ptr, key, key_length);
WATCHPOINT_ASSERT(hash);
+
return dispatch_host(ptr, hash);
}
#include "common.h"
+#include <math.h>
/* Protoypes (static) */
static memcached_return server_add(memcached_st *ptr, char *hostname,
uint32_t continuum_index= 0;
uint32_t value;
memcached_server_st *list;
+ uint32_t pointer_counter= 0;
+ uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER;
+ memcached_return rc;
+ uint64_t total_mem_bytes= 0;
+ memcached_stat_st *stat_p= NULL;
+ uint32_t is_ketama_weighted= 0;
if (ptr->number_of_hosts > ptr->continuum_count)
{
}
list = ptr->hosts;
+
+ is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+ if(is_ketama_weighted)
+ {
+ stat_p = memcached_stat(ptr, NULL, &rc);
+ for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
+ {
+ list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes;
+ total_mem_bytes += (stat_p + host_index)->limit_maxbytes;
+ }
+ }
+
for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
{
- for(index= 1; index <= MEMCACHED_POINTS_PER_SERVER; ++index)
+ if(is_ketama_weighted)
+ {
+ float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes;
+ pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts));
+#ifdef HAVE_DEBUG
+ printf("ketama_weighted:%s|%d|%llu|%u\n", list[host_index].hostname, list[host_index].port, list[host_index].limit_maxbytes, pointer_per_server);
+#endif
+ }
+ for(index= 1; index <= pointer_per_server; ++index)
{
char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
size_t sort_host_length;
sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d",
list[host_index].hostname, list[host_index].port, index);
WATCHPOINT_ASSERT(sort_host_length);
- value= generate_hash(ptr, sort_host, sort_host_length);
+ value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
ptr->continuum[continuum_index].index= host_index;
ptr->continuum[continuum_index++].value= value;
}
+ pointer_counter+= pointer_per_server;
}
WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
qsort(ptr->continuum, ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER, sizeof(memcached_continuum_item_st), continuum_item_cmp);
+
+ ptr->continuum_points_counter= pointer_counter;
+ memcached_stat_free(NULL, stat_p);
+
#ifdef HAVE_DEBUG
for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++)
{
uint16_t count;
time_t next_retry;
memcached_st *root;
+ uint64_t limit_maxbytes;
};
#define memcached_server_count(A) (A)->number_of_hosts
#include "test.h"
-#define GLOBAL_COUNT 100000
-#define GLOBAL2_COUNT 1000
+#define GLOBAL_COUNT 10000
+#define GLOBAL2_COUNT 100
+#define SERVERS_TO_CREATE 5
static uint32_t global_count;
static pairs_st *global_pairs;
return 0;
}
+test_return generate_data_with_stats(memcached_st *memc)
+{
+ memcached_stat_st *stat_p= NULL;
+ memcached_return rc;
+ int host_index= 0;
+ execute_set(memc, global_pairs, global_count);
+
+ //TODO: hosts used size stats
+ stat_p = memcached_stat(memc, NULL, &rc);
+ for (host_index = 0; host_index < SERVERS_TO_CREATE; ++host_index)
+ {
+ printf("\nserver %d|%s|%d bytes: %lld\n", host_index, (memc->hosts)[host_index].hostname, (memc->hosts)[host_index].port, (stat_p + host_index)->bytes);
+ }
+
+
+ return 0;
+}
test_return generate_buffer_data(memcached_st *memc)
{
int latch= 0;
return MEMCACHED_SUCCESS;
}
+memcached_return pre_behavior_ketama_weighted(memcached_st *memc)
+{
+ memcached_return rc;
+ uint64_t value;
+
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+ assert(value == 1);
+
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH);
+ assert(value == MEMCACHED_HASH_MD5);
+ return MEMCACHED_SUCCESS;
+}
void my_free(memcached_st *ptr, void *mem)
{
free(mem);
{0, 0, 0}
};
+test_st consistent_weighted_tests[] ={
+ {"generate_pairs", 1, generate_pairs },
+ {"generate_data", 1, generate_data_with_stats },
+ {"get_read", 0, get_read_count },
+ {"cleanup", 1, cleanup_pairs },
+ {0, 0, 0}
+};
+
collection_st collection[] ={
{"block", 0, 0, tests},
{"nonblock", pre_nonblock, 0, tests},
{"generate_nonblock", pre_nonblock, 0, generate_tests},
{"consistent_not", 0, 0, consistent_tests},
{"consistent_ketama", pre_behavior_ketama, 0, consistent_tests},
+ {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests},
{0, 0, 0, 0}
};
int count;
int status;
- if (construct->udp)
- sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE);
- else
- sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE);
+ if (construct->udp){
+ if(x == 0) {
+ sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u -m 128", x, x+ TEST_PORT_BASE);
+ } else {
+ sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE);
+ }
+ }
+ else{
+ if(x == 0) {
+ sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u -m 128", x, x+ TEST_PORT_BASE);
+ } else {
+ sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE);
+ }
+ }
status= system(buffer);
count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE);
end_ptr+= count;