#include "common.h"
+#include <math.h>
/* Protoypes (static) */
static memcached_return server_add(memcached_st *ptr, char *hostname,
uint32_t continuum_index= 0;
uint32_t value;
memcached_server_st *list;
+ uint32_t pointer_counter= 0;
+ uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER;
+ memcached_return rc;
+ uint64_t total_mem_bytes= 0;
+ memcached_stat_st *stat_p= NULL;
+ uint32_t is_ketama_weighted= 0;
if (ptr->number_of_hosts > ptr->continuum_count)
{
}
list = ptr->hosts;
+
+ is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+ if(is_ketama_weighted)
+ {
+ stat_p = memcached_stat(ptr, NULL, &rc);
+ for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
+ {
+ list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes;
+ total_mem_bytes += (stat_p + host_index)->limit_maxbytes;
+ }
+ }
+
for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
{
- for(index= 1; index <= MEMCACHED_POINTS_PER_SERVER; ++index)
+ if(is_ketama_weighted)
+ {
+ float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes;
+ pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts));
+#ifdef HAVE_DEBUG
+ printf("ketama_weighted:%s|%d|%llu|%u\n", list[host_index].hostname, list[host_index].port, list[host_index].limit_maxbytes, pointer_per_server);
+#endif
+ }
+ for(index= 1; index <= pointer_per_server; ++index)
{
char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
size_t sort_host_length;
sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d",
list[host_index].hostname, list[host_index].port, index);
WATCHPOINT_ASSERT(sort_host_length);
- value= generate_hash(ptr, sort_host, sort_host_length);
+ value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
ptr->continuum[continuum_index].index= host_index;
ptr->continuum[continuum_index++].value= value;
}
+ pointer_counter+= pointer_per_server;
}
WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
qsort(ptr->continuum, ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER, sizeof(memcached_continuum_item_st), continuum_item_cmp);
+
+ ptr->continuum_points_counter= pointer_counter;
+ memcached_stat_free(NULL, stat_p);
+
#ifdef HAVE_DEBUG
for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++)
{