MEM_BINARY_PROTOCOL= (1 << 12),
MEM_HASH_WITH_PREFIX_KEY= (1 << 13),
MEM_NOREPLY= (1 << 14),
- MEM_USE_UDP= (1 << 15)
+ MEM_USE_UDP= (1 << 15),
+ MEM_AUTO_EJECT_HOSTS= (1 << 16)
} memcached_flags;
/* Hashing algo */
uint32_t server_failure_limit;
uint32_t io_msg_watermark;
uint32_t io_bytes_watermark;
+ time_t next_distribution_rebuild;
};
break;
case MEMCACHED_BEHAVIOR_NOREPLY:
set_behavior_flag(ptr, MEM_NOREPLY, data);
- break;
+ break;
+ case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
+ set_behavior_flag(ptr, MEM_AUTO_EJECT_HOSTS, data);
+ break;
}
return MEMCACHED_SUCCESS;
case MEMCACHED_BEHAVIOR_NOREPLY:
temp_flag= MEM_NOREPLY;
break;
+ case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
+ temp_flag= MEM_AUTO_EJECT_HOSTS;
+ break;
}
WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */
}
/* For the moment, not getting a nonblocking mode will not be fatal */
- if (ptr->root->flags & MEM_NO_BLOCK)
+ if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
{
int flags;
{
struct addrinfo *use;
- if (ptr->root->server_failure_limit != 0)
- {
- if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
- {
- memcached_server_remove(ptr);
- return MEMCACHED_FAILURE;
- }
- }
-
if (!ptr->sockaddr_inited ||
(!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
{
int err;
int len = sizeof (err);
(void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
- ptr->cached_errno= errno;
+ ptr->cached_errno= (err == 0) ? errno : err;
}
(void)close(ptr->fd);
if (ptr->fd != -1)
{
/* restore flags */
- if (ptr->root->connect_timeout && (flags & O_NONBLOCK) == 0)
+ if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0)
(void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
WATCHPOINT_ASSERT(ptr->cursor_active == 0);
ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
}
ptr->server_failure_counter+= 1;
+ if (ptr->cached_errno == 0)
+ return MEMCACHED_TIMEOUT;
return MEMCACHED_ERRNO; /* The last error should be from connect() */
}
memcached_return rc= MEMCACHED_NO_SERVERS;
LIBMEMCACHED_MEMCACHED_CONNECT_START();
- if (ptr->root->retry_timeout)
+ /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
+ if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
{
struct timeval next_time;
gettimeofday(&next_time, NULL);
+
+ /* if we've had too many consecutive errors on this server, mark it dead. */
+ if (ptr->server_failure_counter > ptr->root->server_failure_limit)
+ {
+ ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
+ ptr->server_failure_counter= 0;
+ }
+
if (next_time.tv_sec < ptr->next_retry)
- return MEMCACHED_TIMEOUT;
+ {
+ if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
+ run_distribution(ptr->root);
+
+ return MEMCACHED_SERVER_MARKED_DEAD;
+ }
}
+
/* We need to clean up the multi startup piece */
switch (ptr->type)
{
MEMCACHED_BUFFERED,
MEMCACHED_BAD_KEY_PROVIDED,
MEMCACHED_INVALID_HOST_PROTOCOL,
+ MEMCACHED_SERVER_MARKED_DEAD,
MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */
} memcached_return;
MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
MEMCACHED_BEHAVIOR_NOREPLY,
- MEMCACHED_BEHAVIOR_USE_UDP
+ MEMCACHED_BEHAVIOR_USE_UDP,
+ MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS
} memcached_behavior;
typedef enum {
WATCHPOINT_ASSERT(hash);
+ if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS) && ptr->next_distribution_rebuild) {
+ struct timeval now;
+
+ if (gettimeofday(&now, NULL) == 0 &&
+ now.tv_sec > ptr->next_distribution_rebuild)
+ run_distribution(ptr);
+ }
+
return dispatch_host(ptr, hash);
}
uint32_t pointer_per_hash= 1;
uint64_t total_weight= 0;
uint32_t is_ketama_weighted= 0;
+ uint32_t is_auto_ejecting= 0;
uint32_t points_per_server= 0;
+ uint32_t live_servers= 0;
+ struct timeval now;
+
+ if (gettimeofday(&now, NULL) != 0)
+ {
+ ptr->cached_errno = errno;
+ return MEMCACHED_ERRNO;
+ }
+
+ list = ptr->hosts;
+
+ /* count live servers (those without a retry delay set) */
+ is_auto_ejecting= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS);
+ if (is_auto_ejecting)
+ {
+ live_servers= 0;
+ ptr->next_distribution_rebuild= 0;
+ for (host_index= 0; host_index < ptr->number_of_hosts; ++host_index)
+ {
+ if (list[host_index].next_retry <= now.tv_sec)
+ live_servers++;
+ else
+ {
+ if (ptr->next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->next_distribution_rebuild)
+ ptr->next_distribution_rebuild= list[host_index].next_retry;
+ }
+ }
+ }
+ else
+ live_servers= ptr->number_of_hosts;
is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER;
- if (ptr->number_of_hosts == 0)
+ if (live_servers == 0)
return MEMCACHED_SUCCESS;
- if (ptr->number_of_hosts > ptr->continuum_count)
+ if (live_servers > ptr->continuum_count)
{
memcached_continuum_item_st *new_ptr;
if (ptr->call_realloc)
new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum,
- sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
+ sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
else
new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum,
- sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
+ sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
if (new_ptr == 0)
return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
ptr->continuum= new_ptr;
- ptr->continuum_count= ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION;
+ ptr->continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
}
- list = ptr->hosts;
-
if (is_ketama_weighted)
{
for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
{
list[host_index].weight = 1;
}
- total_weight += list[host_index].weight;
+ if (!is_auto_ejecting || list[host_index].next_retry <= now.tv_sec)
+ total_weight += list[host_index].weight;
}
}
for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
{
+ if (is_auto_ejecting && list[host_index].next_retry > now.tv_sec)
+ continue;
+
if (is_ketama_weighted)
{
float pct = (float)list[host_index].weight / (float)total_weight;
- pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4;
+ pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)live_servers + 0.0000000001) * 4;
pointer_per_hash= 4;
#ifdef HAVE_DEBUG
printf("ketama_weighted:%s|%d|%llu|%u\n",
qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
#ifdef HAVE_DEBUG
- for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++)
+ for (index= 0; ptr->number_of_hosts && index < ((live_servers * MEMCACHED_POINTS_PER_SERVER) - 1); index++)
{
WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value);
}
switch (errno)
{
case EAGAIN:
- case EINTR:
- {
- memcached_return rc;
-
- rc= io_wait(ptr, MEM_READ);
+ case EINTR:
+ if (io_wait(ptr, MEM_READ) == MEMCACHED_SUCCESS)
+ continue;
+ /* fall through */
- if (rc == MEMCACHED_SUCCESS)
- continue;
- }
- /* fall trough */
default:
{
memcached_quit_server(ptr, 1);
}
}
+ ptr->server_failure_counter= 0;
return (size_t)(buffer_ptr - (char*)buffer);
}
ptr->read_ptr= ptr->read_buffer;
memcached_server_response_reset(ptr);
}
+
+ ptr->server_failure_counter++;
}
void memcached_quit(memcached_st *ptr)
return "A BAD KEY WAS PROVIDED/CHARACTERS OUT OF RANGE";
case MEMCACHED_INVALID_HOST_PROTOCOL:
return "THE HOST TRANSPORT PROTOCOL DOES NOT MATCH THAT OF THE CLIENT";
+ case MEMCACHED_SERVER_MARKED_DEAD:
+ return "SERVER IS MARKED DEAD";
case MEMCACHED_MAXIMUM_RETURN:
return "Gibberish returned!";
default:
return 0;
}
+/*
+ * Exercise MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS on a weighted ketama (MD5)
+ * continuum built from eight hosts:
+ *   1. a host whose next_retry lies in the future must not be selected for
+ *      any of the test keys;
+ *   2. once its next_retry is in the past and the distribution is rebuilt,
+ *      every key must map to the server named in test_cases[] again.
+ *
+ * trash: unused; the test builds and frees its own memcached_st.
+ * Returns TEST_SUCCESS; any failed expectation aborts through assert().
+ */
+test_return auto_eject_hosts(memcached_st *trash)
+{
+  (void)trash; /* this test never touches the handle supplied by the runner */
+
+  memcached_return rc;
+  memcached_st *memc= memcached_create(NULL);
+  assert(memc);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  uint64_t value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  assert(value == 1);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH);
+  assert(value == MEMCACHED_HASH_MD5);
+
+  /* server should be removed when in delay */
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS);
+  assert(value == 1);
+
+  memcached_server_st *server_pool;
+  server_pool = memcached_servers_parse("10.0.1.1:11211 600,10.0.1.2:11211 300,10.0.1.3:11211 200,10.0.1.4:11211 350,10.0.1.5:11211 1000,10.0.1.6:11211 800,10.0.1.7:11211 950,10.0.1.8:11211 100");
+  /* fail right here if parsing produced no list, rather than at a later dereference */
+  assert(server_pool);
+
+  /* a failed push should be reported as such, not as a host-count mismatch */
+  rc= memcached_server_push(memc, server_pool);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  /* verify that the server list was parsed okay. */
+  assert(memc->number_of_hosts == 8);
+  assert(strcmp(server_pool[0].hostname, "10.0.1.1") == 0);
+  assert(server_pool[0].port == 11211);
+  assert(server_pool[0].weight == 600);
+  assert(strcmp(server_pool[2].hostname, "10.0.1.3") == 0);
+  assert(server_pool[2].port == 11211);
+  assert(server_pool[2].weight == 200);
+  assert(strcmp(server_pool[7].hostname, "10.0.1.8") == 0);
+  assert(server_pool[7].port == 11211);
+  assert(server_pool[7].weight == 100);
+
+  /*
+   * Put host 2 into a retry delay and back-date next_distribution_rebuild.
+   * NOTE(review): this relies on memcached_generate_hash() rebuilding the
+   * continuum when the rebuild time has passed -- confirm against the
+   * hashing code.
+   */
+  memc->hosts[2].next_retry = time(NULL) + 15;
+  memc->next_distribution_rebuild= time(NULL) - 1;
+
+  /* 99 keys are taken from test_cases[], which is defined outside this function */
+  for (int x= 0; x < 99; x++)
+  {
+    uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key));
+    assert(server_idx != 2);
+  }
+
+  /* and re-added when it's back. */
+  memc->hosts[2].next_retry = time(NULL) - 1;
+  memc->next_distribution_rebuild= time(NULL) - 1;
+  run_distribution(memc);
+  for (int x= 0; x < 99; x++)
+  {
+    uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key));
+    char *hostname = memc->hosts[server_idx].hostname;
+    assert(strcmp(hostname, test_cases[x].server) == 0);
+  }
+
+  memcached_server_list_free(server_pool);
+  memcached_free(memc);
+
+  return TEST_SUCCESS;
+}
+
+
static test_return result_static(memcached_st *memc)
{
memcached_result_st result;
{0, 0, 0}
};
+test_st ketama_auto_eject_hosts[] ={ /* test list referenced by the "ketama_auto_eject_hosts" entry in collection[] */
+ {"auto_eject_hosts", 1, auto_eject_hosts },
+ {0, 0, 0} /* all-zero entry; presumably the end-of-list sentinel -- confirm against the test runner */
+};
+
collection_st collection[] ={
{"hsieh_availability",0,0,hsieh_availability},
{"udp_setup", init_udp, 0, udp_setup_server_tests},
{"fnv1_32", pre_hash_fnv1_32, 0, tests},
{"fnv1a_32", pre_hash_fnv1a_32, 0, tests},
{"ketama", pre_behavior_ketama, 0, tests},
+ {"ketama_auto_eject_hosts", pre_behavior_ketama, 0, ketama_auto_eject_hosts},
{"unix_socket", pre_unix_socket, 0, tests},
{"unix_socket_nodelay", pre_nodelay, 0, tests},
{"poll_timeout", poll_timeout, 0, tests},