From: Date: Mon, 3 Dec 2007 03:29:59 +0000 (+0900) Subject: Fixing failure of socket issue. X-Git-Tag: 0.13~53 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=e1944ccd4d1e65f4192783fa9c564c9b747bb618;p=awesomized%2Flibmemcached Fixing failure of socket issue. --- diff --git a/include/memcached.h b/include/memcached.h index 11c760d2..c4871027 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -35,6 +35,7 @@ typedef struct memcached_server_st memcached_server_st; #define MEMCACHED_MAX_HOST_LENGTH 64 #define MEMCACHED_WHEEL_SIZE 1024 #define MEMCACHED_STRIDE 4 +#define MEMCACHED_DEFAILT_TIMEOUT 100 typedef enum { MEMCACHED_SUCCESS, diff --git a/lib/common.h b/lib/common.h index 2edd83e5..96fc3981 100644 --- a/lib/common.h +++ b/lib/common.h @@ -67,7 +67,7 @@ memcached_return memcached_response(memcached_st *ptr, char *buffer, size_t buffer_length, unsigned int server_key); unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length); -void memcached_quit_server(memcached_st *ptr, unsigned int server_key); +void memcached_quit_server(memcached_st *ptr, unsigned int server_key, uint8_t io_death); #define memcached_server_response_increment(A,B) A->hosts[B].stack_responses++ #define memcached_server_response_decrement(A,B) A->hosts[B].stack_responses-- diff --git a/lib/memcached.c b/lib/memcached.c index 591bf1f6..5e189cda 100644 --- a/lib/memcached.c +++ b/lib/memcached.c @@ -22,7 +22,7 @@ memcached_st *memcached_create(memcached_st *ptr) } string_ptr= memcached_string_create(ptr, &ptr->result_buffer, 0); WATCHPOINT_ASSERT(string_ptr); - ptr->poll_timeout= -1; + ptr->poll_timeout= MEMCACHED_DEFAILT_TIMEOUT; ptr->distribution= MEMCACHED_DISTRIBUTION_MODULO; return ptr; diff --git a/lib/memcached_do.c b/lib/memcached_do.c index ab229e17..ce9cceb6 100644 --- a/lib/memcached_do.c +++ b/lib/memcached_do.c @@ -18,10 +18,7 @@ memcached_return memcached_do(memcached_st *ptr, unsigned int server_key, char * sent_length= memcached_io_write(ptr, server_key, command, command_length, with_flush); if (sent_length == -1 || sent_length != command_length) - { - memcached_quit_server(ptr, server_key); rc= MEMCACHED_WRITE_FAILURE; - } return rc; } diff --git a/lib/memcached_get.c b/lib/memcached_get.c index aab22538..598767f8 100644 --- a/lib/memcached_get.c +++ b/lib/memcached_get.c @@ -55,6 +55,10 @@ memcached_return memcached_mget(memcached_st *ptr, memcached_finish(ptr); + /* + If a server fails we warn about errors and start all over with sending keys + to the server. + */ for (x= 0; x < number_of_keys; x++) { unsigned int server_key; @@ -67,7 +71,6 @@ memcached_return memcached_mget(memcached_st *ptr, if ((memcached_io_write(ptr, server_key, get_command, get_command_length, 0)) == -1) { - memcached_quit_server(ptr, server_key); rc= MEMCACHED_SOME_ERRORS; continue; } @@ -77,7 +80,6 @@ memcached_return memcached_mget(memcached_st *ptr, if ((memcached_io_write(ptr, server_key, keys[x], key_length[x], 0)) == -1) { ptr->hosts[server_key].cursor_active= 0; - memcached_quit_server(ptr, server_key); rc= MEMCACHED_SOME_ERRORS; continue; } @@ -85,7 +87,6 @@ memcached_return memcached_mget(memcached_st *ptr, if ((memcached_io_write(ptr, server_key, " ", 1, 0)) == -1) { ptr->hosts[server_key].cursor_active= 0; - memcached_quit_server(ptr, server_key); rc= MEMCACHED_SOME_ERRORS; continue; } @@ -101,7 +102,6 @@ memcached_return memcached_mget(memcached_st *ptr, /* We need to doo something about non-connnected hosts in the future */ if ((memcached_io_write(ptr, x, "\r\n", 2, 1)) == -1) { - memcached_quit_server(ptr, x); rc= MEMCACHED_SOME_ERRORS; } } diff --git a/lib/memcached_io.c b/lib/memcached_io.c index f9eae4db..8f72f54d 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -11,6 +11,8 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ { struct pollfd fds[1]; short flags= 0; + int error; + int latch= 0; if (read_or_write) flags= POLLOUT | POLLERR; @@ -21,10 +23,30 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - if (poll(fds, 1, ptr->poll_timeout) < 0) - return MEMCACHED_FAILURE; + while (latch == 0) + { + error= poll(fds, 1, ptr->poll_timeout); - return MEMCACHED_SUCCESS; + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error) + { + /* This is impossible */ + WATCHPOINT_ASSERT(0); + return MEMCACHED_FAILURE; + } + else + latch++; + } + + memcached_quit_server(ptr, server_key, 1); + + return MEMCACHED_FAILURE; /* Timeout occurred */ } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -62,6 +84,7 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, break; default: { + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } @@ -105,6 +128,8 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, size_t sent_length; sent_length= memcached_io_flush(ptr, server_key); + if (sent_length == -1) + return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; @@ -128,6 +153,7 @@ memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) rc= MEMCACHED_SUCCESS; if (ptr->flags & MEM_NO_BLOCK) { + int error; struct pollfd fds[1]; short flags= 0; @@ -138,8 +164,15 @@ memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) fds[0].events= flags; fds[0].revents= 0; - if (poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout) < 0) - rc= MEMCACHED_FAILURE; + error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout); + + if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error == 0) + return MEMCACHED_FAILURE; /* Timeout occurred */ } close(ptr->hosts[server_key].fd); @@ -196,6 +229,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) } /* Yes, we want to fall through */ default: + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } diff --git a/lib/memcached_quit.c b/lib/memcached_quit.c index 85a52861..3125abad 100644 --- a/lib/memcached_quit.c +++ b/lib/memcached_quit.c @@ -9,20 +9,24 @@ will force data to be completed. */ -void memcached_quit_server(memcached_st *ptr, unsigned int server_key) +void memcached_quit_server(memcached_st *ptr, unsigned int server_key, uint8_t io_death) { - if (ptr->hosts == NULL || - ptr->number_of_hosts == 0 || - server_key > ptr->number_of_hosts) + if (server_key > ptr->number_of_hosts) + { + WATCHPOINT_ASSERT(0); return; + } if (ptr->hosts[server_key].fd != -1) { - memcached_return rc; - rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1); - WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED); - - memcached_io_close(ptr, server_key); + if (io_death == 0) + { + memcached_return rc; + rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1); + WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED); + + memcached_io_close(ptr, server_key); + } ptr->hosts[server_key].fd= -1; ptr->hosts[server_key].stack_responses= 0; ptr->hosts[server_key].cursor_active= 0; @@ -38,11 +42,14 @@ void memcached_quit_server(memcached_st *ptr, unsigned int server_key) void memcached_quit(memcached_st *ptr) { unsigned int x; + if (ptr->hosts == NULL || + ptr->number_of_hosts == 0) + return; if (ptr->hosts && ptr->number_of_hosts) { for (x= 0; x < ptr->number_of_hosts; x++) - memcached_quit_server(ptr, x); + memcached_quit_server(ptr, x, 0); } ptr->connected= 0; diff --git a/lib/memcached_storage.c b/lib/memcached_storage.c index ee48cad3..ba8835cb 100644 --- a/lib/memcached_storage.c +++ b/lib/memcached_storage.c @@ -102,7 +102,6 @@ static inline memcached_return memcached_send(memcached_st *ptr, if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, to_write)) == -1) { - memcached_quit_server(ptr, server_key); rc= MEMCACHED_WRITE_FAILURE; goto error; } diff --git a/tests/function.c b/tests/function.c index 34082966..77117826 100644 --- a/tests/function.c +++ b/tests/function.c @@ -1386,6 +1386,41 @@ uint8_t user_supplied_bug9(memcached_st *memc) return 0; } +uint8_t user_supplied_bug10(memcached_st *memc) +{ + char *key= "foo"; + char *value; + size_t value_length= 512; + unsigned int x; + int key_len= 3; + memcached_return rc; + unsigned int set= 1; + memcached_st *mclone= memcached_clone(NULL, memc); + + memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_NO_BLOCK, &set); + memcached_behavior_set(mclone, MEMCACHED_BEHAVIOR_TCP_NODELAY, &set); + + value = (char*)malloc(value_length * sizeof(char)); + + for (x= 0; x < value_length; x++) + value[x]= (char) (x % 127); + + for (x= 1; x <= 100000; ++x) + { + rc= memcached_set(mclone, key, key_len,value, value_length, 0, 0); + + assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_WRITE_FAILURE); + + if (rc == MEMCACHED_WRITE_FAILURE) + x--; + } + + free(value); + memcached_free(mclone); + + return 0; +} + uint8_t result_static(memcached_st *memc) { memcached_result_st result; @@ -1882,6 +1917,7 @@ test_st user_tests[] ={ {"user_supplied_bug7", 1, user_supplied_bug7 }, {"user_supplied_bug8", 1, user_supplied_bug8 }, {"user_supplied_bug9", 1, user_supplied_bug9 }, + {"user_supplied_bug10", 1, user_supplied_bug10 }, {0, 0, 0} };