void sort_hosts(memcached_st *ptr);
+uint32_t generate_hash(memcached_st *ptr, char *key, size_t key_length);
#endif /* __COMMON_H__ */
/* string value */
#define LIBMEMCACHED_VERSION_STRING "0.19"
+struct continuum_item{
+ uint32_t index;
+ uint32_t value;
+};
+
struct memcached_stat_st {
uint32_t pid;
uint32_t uptime;
memcached_server_distribution distribution;
void *user_data;
unsigned int wheel[MEMCACHED_WHEEL_SIZE];
+ struct continuum_item continuum[MEMCACHED_CONTINUUM_SIZE];
memcached_clone_func on_clone;
memcached_cleanup_func on_cleanup;
memcached_free_function call_free;
#endif
};
+
/* Public API */
const char * memcached_lib_version(void);
#define MEMCACHED_MAX_KEY 251 /* We add one to have it null terminated */
#define MEMCACHED_MAX_BUFFER 8196
#define MEMCACHED_MAX_HOST_LENGTH 64
+#define MEMCACHED_MAX_HOST_SORT_LENGTH 86 /* Used for Ketama */
#define MEMCACHED_WHEEL_SIZE 1024
+#define MEMCACHED_CONTINUUM_SIZE 2048
+#define MEMCACHED_POINTS_PER_SERVER 100
#define MEMCACHED_STRIDE 4
#define MEMCACHED_DEFAULT_TIMEOUT INT32_MAX
typedef enum {
MEMCACHED_DISTRIBUTION_MODULA,
MEMCACHED_DISTRIBUTION_CONSISTENT,
+ MEMCACHED_DISTRIBUTION_CONSISTENT_WHEEL,
+ MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA,
} memcached_server_distribution;
typedef enum {
static uint32_t internal_generate_md5(char *key, size_t key_length);
static uint32_t internal_generate_ketama_md5(char *key, size_t key_length);
-unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length)
+uint32_t generate_hash(memcached_st *ptr, char *key, size_t key_length)
{
uint32_t hash= 1; /* Just here to remove compile warning */
- unsigned int x;
+ uint32_t x= 0;
+
WATCHPOINT_ASSERT(ptr->number_of_hosts);
}
WATCHPOINT_ASSERT(hash);
+ return hash;
+}
+
+unsigned int dispatch_host(memcached_st *ptr, uint32_t hash)
+{
+ switch (ptr->distribution)
+ {
+ case MEMCACHED_DISTRIBUTION_CONSISTENT:
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
+ {
+ int num= ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER;
+
+ hash= hash;
+ struct continuum_item *begin, *end, *left, *right, *middle;
+ begin= left= ptr->continuum;
+ end= right= ptr->continuum + (num - 1);
+
+ while (1)
+ {
+ struct continuum_item *rmiddle;
+
+ middle = left + (right - left) / 2;
+
+ if (middle==end)
+ return begin->index;
+
+ if (middle==begin)
+ return end->index;
+
+ rmiddle = middle+1;
+
+ if (hash<rmiddle->value && hash>=middle->value)
+ return middle->index;
+
+ if (middle->value < hash)
+ left = middle + 1;
+ else if (middle->value > hash)
+ right = middle - 1;
+
+ if (left>right)
+ return left->index;
+ }
+ }
+ break;
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_WHEEL:
+ {
+ unsigned int server_key;
+
+ server_key= hash % MEMCACHED_WHEEL_SIZE;
+
+ return ptr->wheel[server_key];
+ }
+ case MEMCACHED_DISTRIBUTION_MODULA:
+ return hash % ptr->number_of_hosts;
+ default:
+ WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
+ return hash % ptr->number_of_hosts;
+ }
if (ptr->distribution == MEMCACHED_DISTRIBUTION_MODULA)
{
return hash % ptr->number_of_hosts;
}
- else
+ else if (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT)
{
unsigned int server_key;
return ptr->wheel[server_key];
}
+ else if (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA)
+ {
+ int num = ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER;
+
+ hash = hash;
+ struct continuum_item *begin, *end, *left, *right, *middle;
+ begin = left = ptr->continuum;
+ end = right = ptr->continuum + (num - 1);
+ while(1)
+ {
+ middle = left + (right - left) / 2;
+ if(middle==end)
+ {
+ return begin->index;
+ }
+ if(middle==begin)
+ {
+ return end->index;
+ }
+ struct continuum_item *rmiddle = middle+1;
+ if(hash<rmiddle->value && hash>=middle->value)
+ return middle->index;
+
+ if(middle->value < hash) {
+ left = middle + 1;
+ }else if(middle->value > hash) {
+ right = middle - 1;
+ }
+
+ if (left>right)
+ return left->index;
+ }
+ }
+ else
+ {
+ WATCHPOINT_ASSERT(0);
+ }
+}
+
+unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length)
+{
+ uint32_t hash= 1; /* Just here to remove compile warning */
+
+ WATCHPOINT_ASSERT(ptr->number_of_hosts);
+
+ if (ptr->number_of_hosts == 1)
+ return 0;
+
+ hash = generate_hash(ptr, key, key_length);
+
+ WATCHPOINT_ASSERT(hash);
+ return dispatch_host(ptr, hash);
}
static uint32_t internal_generate_hash(char *key, size_t key_length)
free(servers);
}
+static int continuum_item_cmp(const void *t1, const void *t2)
+{
+ struct continuum_item *ct1 = (struct continuum_item *)t1;
+ struct continuum_item *ct2 = (struct continuum_item *)t2;
+ if(ct1->value == ct2->value)
+ return 0;
+ else if(ct1->value > ct2->value)
+ return 1;
+ else
+ return -1;
+}
+
+static uint32_t internal_generate_ketama_md5(char *key, size_t key_length)
+{
+ unsigned char results[16];
+
+ md5_signature((unsigned char*)key, (unsigned int)key_length, results);
+
+ return ( (results[3] ) << 24)
+ | ( (results[2] ) << 16)
+ | ( (results[1] ) << 8)
+ | ( results[0] );
+}
+
+void update_continuum(memcached_st *ptr)
+{
+ int index;
+ int host_index;
+ int continuum_index= 0;
+ int value;
+ memcached_server_st *list = ptr->hosts;
+
+ for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
+ {
+ for(index= 1; index <= MEMCACHED_POINTS_PER_SERVER; ++index)
+ {
+ char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
+ size_t sort_host_length;
+
+ sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d",
+ list[host_index].hostname, list[host_index].port, index);
+ WATCHPOINT_ASSERT(sort_host_length);
+ value= internal_generate_ketama_md5(sort_host, sort_host_length);
+ ptr->continuum[continuum_index].index= host_index;
+ ptr->continuum[continuum_index++].value= value;
+ }
+ }
+
+ qsort(ptr->continuum, ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER, sizeof(struct continuum_item), continuum_item_cmp);
+#ifdef HAVE_DEBUG
+ for(index= 0; index < ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER - 1; ++index)
+ {
+ WATCHPOINT_ASSERT(ptr->continuum[index].value < ptr->continuum[index + 1].value);
+ }
+#endif
+}
+
+
memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list)
{
unsigned int x;
if (ptr->flags & MEM_USE_SORT_HOSTS)
sort_hosts(ptr);
- rebalance_wheel(ptr);
+ switch (ptr->distribution)
+ {
+ case MEMCACHED_DISTRIBUTION_CONSISTENT:
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
+ update_continuum(ptr);
+ break;
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_WHEEL:
+ rebalance_wheel(ptr);
+ break;
+ case MEMCACHED_DISTRIBUTION_MODULA:
+ break;
+ default:
+ WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
+ }
return MEMCACHED_SUCCESS;
}
ptr->hosts[0].count= ptr->number_of_hosts;
- rebalance_wheel(ptr);
+ switch (ptr->distribution)
+ {
+ case MEMCACHED_DISTRIBUTION_CONSISTENT:
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
+ update_continuum(ptr);
+ break;
+ case MEMCACHED_DISTRIBUTION_CONSISTENT_WHEEL:
+ rebalance_wheel(ptr);
+ break;
+ case MEMCACHED_DISTRIBUTION_MODULA:
+ break;
+ default:
+ WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
+ }
LIBMEMCACHED_MEMCACHED_SERVER_ADD_END();