From d76b611a9173f8ea494b6287d768fdce39159958 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Fri, 1 May 2009 19:44:39 -0700 Subject: [PATCH] Add support for AUTO_EJECT_HOST --- libmemcached/common.h | 3 +- libmemcached/memcached.h | 1 + libmemcached/memcached_behavior.c | 8 +++- libmemcached/memcached_connect.c | 36 +++++++++------ libmemcached/memcached_constants.h | 4 +- libmemcached/memcached_hash.c | 8 ++++ libmemcached/memcached_hosts.c | 53 ++++++++++++++++++---- libmemcached/memcached_io.c | 14 ++---- libmemcached/memcached_quit.c | 2 + libmemcached/memcached_strerror.c | 2 + tests/function.c | 73 ++++++++++++++++++++++++++++++ 11 files changed, 168 insertions(+), 36 deletions(-) diff --git a/libmemcached/common.h b/libmemcached/common.h index b9560966..5dc77a08 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -77,7 +77,8 @@ typedef enum { MEM_BINARY_PROTOCOL= (1 << 12), MEM_HASH_WITH_PREFIX_KEY= (1 << 13), MEM_NOREPLY= (1 << 14), - MEM_USE_UDP= (1 << 15) + MEM_USE_UDP= (1 << 15), + MEM_AUTO_EJECT_HOSTS= (1 << 16) } memcached_flags; /* Hashing algo */ diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 62fa65c0..aceab59b 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -117,6 +117,7 @@ struct memcached_st { uint32_t server_failure_limit; uint32_t io_msg_watermark; uint32_t io_bytes_watermark; + time_t next_distribution_rebuild; }; diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index 6bc00064..77c9bd3b 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -150,7 +150,10 @@ memcached_return memcached_behavior_set(memcached_st *ptr, break; case MEMCACHED_BEHAVIOR_NOREPLY: set_behavior_flag(ptr, MEM_NOREPLY, data); - break; + break; + case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS: + set_behavior_flag(ptr, MEM_AUTO_EJECT_HOSTS, data); + break; } return MEMCACHED_SUCCESS; @@ -263,6 +266,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr, case MEMCACHED_BEHAVIOR_NOREPLY: temp_flag= MEM_NOREPLY; break; + case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS: + temp_flag= MEM_AUTO_EJECT_HOSTS; + break; } WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */ diff --git a/libmemcached/memcached_connect.c b/libmemcached/memcached_connect.c index 8e0e4208..fb79c80a 100644 --- a/libmemcached/memcached_connect.c +++ b/libmemcached/memcached_connect.c @@ -121,7 +121,7 @@ static memcached_return set_socket_options(memcached_server_st *ptr) } /* For the moment, not getting a nonblocking mode will not be fatal */ - if (ptr->root->flags & MEM_NO_BLOCK) + if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout) { int flags; @@ -185,15 +185,6 @@ static memcached_return network_connect(memcached_server_st *ptr) { struct addrinfo *use; - if (ptr->root->server_failure_limit != 0) - { - if (ptr->server_failure_counter >= ptr->root->server_failure_limit) - { - memcached_server_remove(ptr); - return MEMCACHED_FAILURE; - } - } - if (!ptr->sockaddr_inited || (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS))) { @@ -253,7 +244,7 @@ static memcached_return network_connect(memcached_server_st *ptr) int err; int len = sizeof (err); (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); - ptr->cached_errno= errno; + ptr->cached_errno= (err == 0) ? errno : err; } (void)close(ptr->fd); @@ -275,7 +266,7 @@ static memcached_return network_connect(memcached_server_st *ptr) if (ptr->fd != -1) { /* restore flags */ - if (ptr->root->connect_timeout && (flags & O_NONBLOCK) == 0) + if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0) (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK); WATCHPOINT_ASSERT(ptr->cursor_active == 0); @@ -297,6 +288,8 @@ static memcached_return network_connect(memcached_server_st *ptr) ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; } ptr->server_failure_counter+= 1; + if (ptr->cached_errno == 0) + return MEMCACHED_TIMEOUT; return MEMCACHED_ERRNO; /* The last error should be from connect() */ } @@ -310,14 +303,29 @@ memcached_return memcached_connect(memcached_server_st *ptr) memcached_return rc= MEMCACHED_NO_SERVERS; LIBMEMCACHED_MEMCACHED_CONNECT_START(); - if (ptr->root->retry_timeout) + /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */ + if (ptr->root->retry_timeout && ptr->root->server_failure_limit) { struct timeval next_time; gettimeofday(&next_time, NULL); + + /* if we've had too many consecutive errors on this server, mark it dead. */ + if (ptr->server_failure_counter > ptr->root->server_failure_limit) + { + ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; + ptr->server_failure_counter= 0; + } + if (next_time.tv_sec < ptr->next_retry) - return MEMCACHED_TIMEOUT; + { + if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS)) + run_distribution(ptr->root); + + return MEMCACHED_SERVER_MARKED_DEAD; + } } + /* We need to clean up the multi startup piece */ switch (ptr->type) { diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index 03c61871..9133a262 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -63,6 +63,7 @@ typedef enum { MEMCACHED_BUFFERED, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_INVALID_HOST_PROTOCOL, + MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */ } memcached_return; @@ -100,7 +101,8 @@ typedef enum { MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK, MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, MEMCACHED_BEHAVIOR_NOREPLY, - MEMCACHED_BEHAVIOR_USE_UDP + MEMCACHED_BEHAVIOR_USE_UDP, + MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_hash.c b/libmemcached/memcached_hash.c index d2af5165..0527733a 100644 --- a/libmemcached/memcached_hash.c +++ b/libmemcached/memcached_hash.c @@ -179,6 +179,14 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_ WATCHPOINT_ASSERT(hash); + if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS) && ptr->next_distribution_rebuild) { + struct timeval now; + + if (gettimeofday(&now, NULL) == 0 && + now.tv_sec > ptr->next_distribution_rebuild) + run_distribution(ptr); + } + return dispatch_host(ptr, hash); } diff --git a/libmemcached/memcached_hosts.c b/libmemcached/memcached_hosts.c index 2bcd189c..c8e15c4c 100644 --- a/libmemcached/memcached_hosts.c +++ b/libmemcached/memcached_hosts.c @@ -112,34 +112,63 @@ memcached_return update_continuum(memcached_st *ptr) uint32_t pointer_per_hash= 1; uint64_t total_weight= 0; uint32_t is_ketama_weighted= 0; + uint32_t is_auto_ejecting= 0; uint32_t points_per_server= 0; + uint32_t live_servers= 0; + struct timeval now; + + if (gettimeofday(&now, NULL) != 0) + { + ptr->cached_errno = errno; + return MEMCACHED_ERRNO; + } + + list = ptr->hosts; + + /* count live servers (those without a retry delay set) */ + is_auto_ejecting= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS); + if (is_auto_ejecting) + { + live_servers= 0; + ptr->next_distribution_rebuild= 0; + for (host_index= 0; host_index < ptr->number_of_hosts; ++host_index) + { + if (list[host_index].next_retry <= now.tv_sec) + live_servers++; + else + { + if (ptr->next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->next_distribution_rebuild) + ptr->next_distribution_rebuild= list[host_index].next_retry; + } + } + } + else + live_servers= ptr->number_of_hosts; is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER; - if (ptr->number_of_hosts == 0) + if (live_servers == 0) return MEMCACHED_SUCCESS; - if (ptr->number_of_hosts > ptr->continuum_count) + if (live_servers > ptr->continuum_count) { memcached_continuum_item_st *new_ptr; if (ptr->call_realloc) new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, - sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); + sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); else new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, - sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); + sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server); if (new_ptr == 0) return MEMCACHED_MEMORY_ALLOCATION_FAILURE; ptr->continuum= new_ptr; - ptr->continuum_count= ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION; + ptr->continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION; } - list = ptr->hosts; - if (is_ketama_weighted) { for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) @@ -148,16 +177,20 @@ memcached_return update_continuum(memcached_st *ptr) { list[host_index].weight = 1; } - total_weight += list[host_index].weight; + if (!is_auto_ejecting || list[host_index].next_retry <= now.tv_sec) + total_weight += list[host_index].weight; } } for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) { + if (is_auto_ejecting && list[host_index].next_retry > now.tv_sec) + continue; + if (is_ketama_weighted) { float pct = (float)list[host_index].weight / (float)total_weight; - pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4; + pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)live_servers + 0.0000000001) * 4; pointer_per_hash= 4; #ifdef HAVE_DEBUG printf("ketama_weighted:%s|%d|%llu|%u\n", @@ -212,7 +245,7 @@ memcached_return update_continuum(memcached_st *ptr) qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp); #ifdef HAVE_DEBUG - for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) + for (index= 0; ptr->number_of_hosts && index < ((live_servers * MEMCACHED_POINTS_PER_SERVER) - 1); index++) { WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value); } diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 43fb509e..3f184e06 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -116,16 +116,11 @@ ssize_t memcached_io_read(memcached_server_st *ptr, switch (errno) { case EAGAIN: - case EINTR: - { - memcached_return rc; - - rc= io_wait(ptr, MEM_READ); + case EINTR: + if (io_wait(ptr, MEM_READ) == MEMCACHED_SUCCESS) + continue; + /* fall through */ - if (rc == MEMCACHED_SUCCESS) - continue; - } - /* fall trough */ default: { memcached_quit_server(ptr, 1); @@ -177,6 +172,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } } + ptr->server_failure_counter= 0; return (size_t)(buffer_ptr - (char*)buffer); } diff --git a/libmemcached/memcached_quit.c b/libmemcached/memcached_quit.c index 80b9314a..c93cb188 100644 --- a/libmemcached/memcached_quit.c +++ b/libmemcached/memcached_quit.c @@ -49,6 +49,8 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death) ptr->read_ptr= ptr->read_buffer; memcached_server_response_reset(ptr); } + + ptr->server_failure_counter++; } void memcached_quit(memcached_st *ptr) diff --git a/libmemcached/memcached_strerror.c b/libmemcached/memcached_strerror.c index 72e51fda..4b626713 100644 --- a/libmemcached/memcached_strerror.c +++ b/libmemcached/memcached_strerror.c @@ -72,6 +72,8 @@ char *memcached_strerror(memcached_st *ptr __attribute__((unused)), memcached_re return "A BAD KEY WAS PROVIDED/CHARACTERS OUT OF RANGE"; case MEMCACHED_INVALID_HOST_PROTOCOL: return "THE HOST TRANSPORT PROTOCOL DOES NOT MATCH THAT OF THE CLIENT"; + case MEMCACHED_SERVER_MARKED_DEAD: + return "SERVER IS MARKED DEAD"; case MEMCACHED_MAXIMUM_RETURN: return "Gibberish returned!"; default: diff --git a/tests/function.c b/tests/function.c index 1f0c404b..182aa8c3 100644 --- a/tests/function.c +++ b/tests/function.c @@ -2407,6 +2407,73 @@ test_return user_supplied_bug18(memcached_st *trash) return 0; } +test_return auto_eject_hosts(memcached_st *trash) +{ + memcached_return rc; + memcached_st *memc= memcached_create(NULL); + assert(memc); + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1); + assert(rc == MEMCACHED_SUCCESS); + + uint64_t value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + assert(value == 1); + + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH); + assert(value == MEMCACHED_HASH_MD5); + + /* server should be removed when in delay */ + rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS, 1); + assert(rc == MEMCACHED_SUCCESS); + + value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS); + assert(value == 1); + + memcached_server_st *server_pool; + server_pool = memcached_servers_parse("10.0.1.1:11211 600,10.0.1.2:11211 300,10.0.1.3:11211 200,10.0.1.4:11211 350,10.0.1.5:11211 1000,10.0.1.6:11211 800,10.0.1.7:11211 950,10.0.1.8:11211 100"); + memcached_server_push(memc, server_pool); + + /* verify that the server list was parsed okay. */ + assert(memc->number_of_hosts == 8); + assert(strcmp(server_pool[0].hostname, "10.0.1.1") == 0); + assert(server_pool[0].port == 11211); + assert(server_pool[0].weight == 600); + assert(strcmp(server_pool[2].hostname, "10.0.1.3") == 0); + assert(server_pool[2].port == 11211); + assert(server_pool[2].weight == 200); + assert(strcmp(server_pool[7].hostname, "10.0.1.8") == 0); + assert(server_pool[7].port == 11211); + assert(server_pool[7].weight == 100); + + memc->hosts[2].next_retry = time(NULL) + 15; + memc->next_distribution_rebuild= time(NULL) - 1; + + for (int x= 0; x < 99; x++) + { + uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key)); + assert(server_idx != 2); + } + + /* and re-added when it's back. */ + memc->hosts[2].next_retry = time(NULL) - 1; + memc->next_distribution_rebuild= time(NULL) - 1; + run_distribution(memc); + for (int x= 0; x < 99; x++) + { + uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key)); + char *hostname = memc->hosts[server_idx].hostname; + assert(strcmp(hostname, test_cases[x].server) == 0); + } + + memcached_server_list_free(server_pool); + memcached_free(memc); + + return TEST_SUCCESS; +} + static test_return result_static(memcached_st *memc) { memcached_result_st result; @@ -3816,6 +3883,11 @@ test_st hsieh_availability[] ={ {0, 0, 0} }; +test_st ketama_auto_eject_hosts[] ={ + {"auto_eject_hosts", 1, auto_eject_hosts }, + {0, 0, 0} +}; + collection_st collection[] ={ {"hsieh_availability",0,0,hsieh_availability}, {"udp_setup", init_udp, 0, udp_setup_server_tests}, @@ -3835,6 +3907,7 @@ collection_st collection[] ={ {"fnv1_32", pre_hash_fnv1_32, 0, tests}, {"fnv1a_32", pre_hash_fnv1a_32, 0, tests}, {"ketama", pre_behavior_ketama, 0, tests}, + {"ketama_auto_eject_hosts", pre_behavior_ketama, 0, ketama_auto_eject_hosts}, {"unix_socket", pre_unix_socket, 0, tests}, {"unix_socket_nodelay", pre_nodelay, 0, tests}, {"poll_timeout", poll_timeout, 0, tests}, -- 2.30.2