return memcached_behavior_set_distribution(ptr, (memcached_server_distribution_t)data);
case MEMCACHED_BEHAVIOR_KETAMA:
{
- if (data)
- {
- (void)memcached_behavior_set_key_hash(ptr, MEMCACHED_HASH_MD5);
- (void)memcached_behavior_set_distribution_hash(ptr, MEMCACHED_HASH_MD5);
- (void)memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA);
- }
- else
- {
- (void)memcached_behavior_set_key_hash(ptr, MEMCACHED_HASH_DEFAULT);
- (void)memcached_behavior_set_distribution_hash(ptr, MEMCACHED_HASH_DEFAULT);
- (void)memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_MODULA);
- }
+ if (data) // Turn on
+ return memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA);
- break;
+ return memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_MODULA);
}
case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
{
case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY:
{
- uint32_t num= ptr->continuum_points_counter;
+ uint32_t num= ptr->ketama.continuum_points_counter;
WATCHPOINT_ASSERT(ptr->continuum);
hash= hash;
memcached_continuum_item_st *begin, *end, *left, *right, *middle;
- begin= left= ptr->continuum;
- end= right= ptr->continuum + num;
+ begin= left= ptr->ketama.continuum;
+ end= right= ptr->ketama.continuum + num;
while (left < right)
{
static inline void _regen_for_auto_eject(memcached_st *ptr)
{
- if (_is_auto_eject_host(ptr) && ptr->next_distribution_rebuild)
+ if (_is_auto_eject_host(ptr) && ptr->ketama.next_distribution_rebuild)
{
struct timeval now;
if (gettimeofday(&now, NULL) == 0 &&
- now.tv_sec > ptr->next_distribution_rebuild)
+ now.tv_sec > ptr->ketama.next_distribution_rebuild)
{
run_distribution(ptr);
}
if (is_auto_ejecting)
{
live_servers= 0;
- ptr->next_distribution_rebuild= 0;
+ ptr->ketama.next_distribution_rebuild= 0;
for (host_index= 0; host_index < memcached_server_count(ptr); ++host_index)
{
if (list[host_index].next_retry <= now.tv_sec)
live_servers++;
else
{
- if (ptr->next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->next_distribution_rebuild)
- ptr->next_distribution_rebuild= list[host_index].next_retry;
+ if (ptr->ketama.next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->ketama.next_distribution_rebuild)
+ ptr->ketama.next_distribution_rebuild= list[host_index].next_retry;
}
}
}
if (live_servers == 0)
return MEMCACHED_SUCCESS;
- if (live_servers > ptr->continuum_count)
+ if (live_servers > ptr->ketama.continuum_count)
{
memcached_continuum_item_st *new_ptr;
- new_ptr= libmemcached_realloc(ptr, ptr->continuum,
+ new_ptr= libmemcached_realloc(ptr, ptr->ketama.continuum,
sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
if (new_ptr == 0)
return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
- ptr->continuum= new_ptr;
- ptr->continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
+ ptr->ketama.continuum= new_ptr;
+ ptr->ketama.continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
}
if (is_ketama_weighted)
for (uint32_t x= 0; x < pointer_per_hash; x++)
{
value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
- ptr->continuum[continuum_index].index= host_index;
- ptr->continuum[continuum_index++].value= value;
+ ptr->ketama.continuum[continuum_index].index= host_index;
+ ptr->ketama.continuum[continuum_index++].value= value;
}
}
else
{
value= hashkit_digest(&ptr->distribution_hashkit, sort_host, (size_t)sort_host_length);
- ptr->continuum[continuum_index].index= host_index;
- ptr->continuum[continuum_index++].value= value;
+ ptr->ketama.continuum[continuum_index].index= host_index;
+ ptr->ketama.continuum[continuum_index++].value= value;
}
}
}
for (uint32_t x = 0; x < pointer_per_hash; x++)
{
value= ketama_server_hash(sort_host, (size_t)sort_host_length, x);
- ptr->continuum[continuum_index].index= host_index;
- ptr->continuum[continuum_index++].value= value;
+ ptr->ketama.continuum[continuum_index].index= host_index;
+ ptr->ketama.continuum[continuum_index++].value= value;
}
}
else
{
value= hashkit_digest(&ptr->distribution_hashkit, sort_host, (size_t)sort_host_length);
- ptr->continuum[continuum_index].index= host_index;
- ptr->continuum[continuum_index++].value= value;
+ ptr->ketama.continuum[continuum_index].index= host_index;
+ ptr->ketama.continuum[continuum_index++].value= value;
}
}
}
WATCHPOINT_ASSERT(ptr);
WATCHPOINT_ASSERT(ptr->continuum);
WATCHPOINT_ASSERT(memcached_server_count(ptr) * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
- ptr->continuum_points_counter= pointer_counter;
- qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
+ ptr->ketama.continuum_points_counter= pointer_counter;
+ qsort(ptr->ketama.continuum, ptr->ketama.continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
#ifdef DEBUG
for (pointer_index= 0; memcached_server_count(ptr) && pointer_index < ((live_servers * MEMCACHED_POINTS_PER_SERVER) - 1); pointer_index++)
if (! hash_ptr)
return false;
- self->continuum_points_counter= 0;
+ self->ketama.continuum_points_counter= 0;
self->number_of_hosts= 0;
self->servers= NULL;
self->poll_timeout= MEMCACHED_DEFAULT_TIMEOUT;
self->connect_timeout= MEMCACHED_DEFAULT_CONNECT_TIMEOUT;
self->retry_timeout= 0;
- self->continuum_count= 0;
+ self->ketama.continuum_count= 0;
self->send_size= -1;
self->recv_size= -1;
self->user_data= NULL;
- self->next_distribution_rebuild= 0;
+ self->ketama.next_distribution_rebuild= 0;
self->number_of_replicas= 0;
hash_ptr= hashkit_create(&self->distribution_hashkit);
if (! hash_ptr)
return false;
- self->continuum= NULL;
+ self->ketama.continuum= NULL;
self->allocators= memcached_allocators_return_default();
if (ptr->on_cleanup)
ptr->on_cleanup(ptr);
- if (ptr->continuum)
- libmemcached_free(ptr, ptr->continuum);
+ if (ptr->ketama.continuum)
+ libmemcached_free(ptr, ptr->ketama.continuum);
memcached_array_free(ptr->prefix_key);
ptr->prefix_key= NULL;
} flags;
memcached_server_distribution_t distribution;
hashkit_st hashkit;
- uint32_t continuum_points_counter; // Ketama
uint32_t number_of_hosts;
memcached_server_st *servers;
memcached_server_st *last_disconnected_server;
int32_t poll_timeout;
int32_t connect_timeout;
int32_t retry_timeout;
- uint32_t continuum_count; // Ketama
int send_size;
int recv_size;
void *user_data;
- time_t next_distribution_rebuild; // Ketama
uint32_t number_of_replicas;
hashkit_st distribution_hashkit;
memcached_result_st result;
- memcached_continuum_item_st *continuum; // Ketama
+
+ struct {
+ uint32_t continuum_count; // Ketama
+ uint32_t continuum_points_counter; // Ketama
+ time_t next_distribution_rebuild; // Ketama
+ memcached_continuum_item_st *continuum; // Ketama
+ } ketama;
struct _allocators_st {
memcached_calloc_fn calloc;
/* VDEAAAAA hashes to fffcd1b5, after the last continuum point, and lets
* us test the boundary wraparound.
*/
- test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->continuum[0].index);
+ test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->ketama.continuum[0].index);
/* verify the standard ketama set. */
for (x= 0; x < 99; x++)
instance= memcached_server_instance_by_position(memc, 2);
((memcached_server_write_instance_st)instance)->next_retry = time(NULL) + 15;
- memc->next_distribution_rebuild= time(NULL) - 1;
+ memc->ketama.next_distribution_rebuild= time(NULL) - 1;
/*
This would not work if there were only two hosts.
/* and re-added when it's back. */
((memcached_server_write_instance_st)instance)->next_retry = time(NULL) - 1;
- memc->next_distribution_rebuild= time(NULL) - 1;
+ memc->ketama.next_distribution_rebuild= time(NULL) - 1;
memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION,
memc->distribution);
for (size_t x= 0; x < 99; x++)
/* VDEAAAAA hashes to fffcd1b5, after the last continuum point, and lets
* us test the boundary wraparound.
*/
- test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->continuum[0].index);
+ test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->ketama.continuum[0].index);
/* verify the standard ketama set. */
for (x= 0; x < 99; x++)
{
memcached_return_t rc;
uint64_t value;
- int x;
memcached_server_st *server_pool;
memcached_st *memc;
/* VDEAAAAA hashes to fffcd1b5, after the last continuum point, and lets
* us test the boundary wraparound.
*/
- test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->continuum[0].index);
+ test_true(memcached_generate_hash(memc, (char *)"VDEAAAAA", 8) == memc->ketama.continuum[0].index);
/* verify the standard ketama set. */
- for (x= 0; x < 99; x++)
+ for (uint32_t x= 0; x < 99; x++)
{
uint32_t server_idx= memcached_generate_hash(memc, ketama_test_cases_spy[x].key, strlen(ketama_test_cases_spy[x].key));