From: Date: Sat, 1 Dec 2007 03:48:03 +0000 (-0800) Subject: Big fix for async mode to make sure all data has been pushed through socket X-Git-Tag: 0.13~60 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=97ccdd0fbb452e094e1fcfde80376f805370ff70;p=m6w6%2Flibmemcached Big fix for async mode to make sure all data has been pushed through socket before close. Refactor of memcached_get() to use common code. --- diff --git a/ChangeLog b/ChangeLog index ffb487e2..42273139 100644 --- a/ChangeLog +++ b/ChangeLog @@ -2,6 +2,10 @@ * Updates for consistent hashing * IPV6 support * Static allocation for hostname (performance) + * Fixed bug where in non-block mode all data might not have been sent on + close(). + * Refactor of memcached_get() to use common code. + 0.11 Mon Nov 26 01:05:52 PST 2007 * Added option to memcache_behavior_set() so that poll() can be timed out. diff --git a/include/memcached.h b/include/memcached.h index 8b883534..c0b5a7cb 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -194,6 +194,7 @@ struct memcached_st { memcached_hash hash; memcached_server_distribution distribution; unsigned int wheel[MEMCACHED_WHEEL_SIZE]; + uint8_t replicas; memcached_return warning; /* Future Use */ }; @@ -322,6 +323,7 @@ size_t memcached_result_length(memcached_result_st *ptr); #define WATCHPOINT_ERROR(A) fprintf(stderr, "\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout); #endif #define WATCHPOINT_STRING(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout); +#define WATCHPOINT_STRING_LENGTH(A,B) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout); #define WATCHPOINT_NUMBER(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %zu\n", __FILE__, __LINE__,__func__,(size_t)(A));fflush(stdout); #define WATCHPOINT_ERRNO(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));A= 0;fflush(stdout); #define WATCHPOINT_ASSERT(A) assert((A)); @@ -330,6 +332,7 @@ size_t memcached_result_length(memcached_result_st *ptr); #define WATCHPOINT { 1; }; #define WATCHPOINT_ERROR(A) { 1; }; #define WATCHPOINT_STRING(A) { 1; }; +#define WATCHPOINT_STRING_LENGTH(A,B) { 1; }; #define WATCHPOINT_NUMBER(A) { 1; }; #define WATCHPOINT_ERRNO(A) { 1; }; #define WATCHPOINT_ASSERT(A) { 1; }; diff --git a/lib/common.h b/lib/common.h index d4a7baca..5c2d2682 100644 --- a/lib/common.h +++ b/lib/common.h @@ -19,6 +19,10 @@ #include #include #include +#include +#include +#include + #include @@ -36,6 +40,11 @@ #define MEMCACHED_BLOCK_SIZE 1024 +typedef enum { + MEM_NO_FLUSH, + MEM_FLUSH, +} memcached_flush_action; + typedef enum { MEM_NO_BLOCK= (1 << 0), MEM_TCP_NODELAY= (1 << 1), diff --git a/lib/memcached_connect.c b/lib/memcached_connect.c index 672b1c23..82a08ab7 100644 --- a/lib/memcached_connect.c +++ b/lib/memcached_connect.c @@ -1,13 +1,5 @@ #include "common.h" -#include -#include -#include -#include -#include -#include -#include - static memcached_return set_hostinfo(memcached_server_st *server) { struct addrinfo *ai; @@ -140,10 +132,11 @@ static memcached_return tcp_connect(memcached_st *ptr, unsigned int server_key) use->ai_protocol)) < 0) { ptr->cached_errno= errno; + WATCHPOINT_ERRNO(errno); return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE; } - /* For the moment, not getting a nonblocking mode will note be fatal */ + /* For the moment, not getting a nonblocking mode will not be fatal */ if (ptr->flags & MEM_NO_BLOCK) { int flags; diff --git a/lib/memcached_get.c b/lib/memcached_get.c index 63db8d5d..0ca40523 100644 --- a/lib/memcached_get.c +++ b/lib/memcached_get.c @@ -146,76 +146,32 @@ char *memcached_get(memcached_st *ptr, char *key, size_t key_length, uint16_t *flags, memcached_return *error) { - char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - char *buf_ptr= buffer; - unsigned int server_key; - memcached_string_st *result_buffer; - LIBMEMCACHED_MEMCACHED_GET_START(); + char *value; + char *dummy_value; + size_t dummy_length; + uint16_t dummy_flags; + memcached_return dummy_error; - if (key_length == 0) - { - *error= MEMCACHED_NO_KEY_PROVIDED; - return NULL; - } + /* Request the key */ + *error= memcached_mget(ptr, &key, &key_length, 1); - if (ptr->hosts == NULL || ptr->number_of_hosts == 0) - { - *error= MEMCACHED_NO_SERVERS; - return NULL; - } - - server_key= memcached_generate_hash(ptr, key, key_length); - result_buffer= &ptr->result_buffer; + value= memcached_fetch(ptr, NULL, NULL, + value_length, flags, error); - *value_length= 0; - memcpy(buf_ptr, "get ", 4); - buf_ptr+= 4; - memcpy(buf_ptr, key, key_length); - buf_ptr+= key_length; - memcpy(buf_ptr, "\r\n", 2); - buf_ptr+= 2; - - *error= memcached_do(ptr, server_key, buffer, (size_t)(buf_ptr - buffer), 1); - if (*error != MEMCACHED_SUCCESS) - goto error; - - *error= memcached_value_fetch(ptr, NULL, NULL, result_buffer, - flags, NULL, server_key); - *value_length= memcached_string_length(result_buffer); - if (*error == MEMCACHED_END && *value_length == 0) - { - *error= MEMCACHED_NOTFOUND; - goto error; - } - else if (*error == MEMCACHED_END) - { - WATCHPOINT_ASSERT(0); /* If this happens we have somehow messed up the fetch */ - } - else if (*error == MEMCACHED_SUCCESS) - { - memcached_return rc; - /* We need to read END */ - rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key); - - if (rc != MEMCACHED_END) - { - *error= MEMCACHED_PROTOCOL_ERROR; - goto error; - } - } - else - goto error; - - LIBMEMCACHED_MEMCACHED_GET_END(); - - return memcached_string_c_copy(result_buffer); + if (value == NULL) + return NULL; -error: - *value_length= 0; + /* We do a second read to clean the cursor */ + dummy_value= memcached_fetch(ptr, NULL, NULL, + &dummy_length, &dummy_flags, + &dummy_error); - LIBMEMCACHED_MEMCACHED_GET_END(); + /* Something is really wrong if this happens */ + WATCHPOINT_ASSERT(dummy_value == NULL); + if (dummy_value) + free(dummy_value); - return NULL; + return value; } memcached_return memcached_mget(memcached_st *ptr, diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 8a05ddb8..0657dfd6 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -124,6 +124,32 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, return length; } +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) +{ + struct pollfd fds[1]; + short flags= 0; + struct timespec timer; + memcached_return rc; + + timer.tv_sec= 1; + timer.tv_nsec= 0; + flags= POLLHUP | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; + fds[0].revents= flags; + + if (poll(fds, 1, ptr->poll_timeout) < 0) + rc= MEMCACHED_FAILURE; + else + rc= MEMCACHED_SUCCESS; + + close(ptr->hosts[server_key].fd); + + return rc; +} + ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; diff --git a/lib/memcached_io.h b/lib/memcached_io.h index bb5b8fc5..8e2ddbc9 100644 --- a/lib/memcached_io.h +++ b/lib/memcached_io.h @@ -7,3 +7,4 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, void memcached_io_reset(memcached_st *ptr, unsigned int server_key); ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, char *buffer, size_t length); +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key); diff --git a/lib/memcached_quit.c b/lib/memcached_quit.c index a0cc6172..483b1772 100644 --- a/lib/memcached_quit.c +++ b/lib/memcached_quit.c @@ -18,12 +18,11 @@ void memcached_quit_server(memcached_st *ptr, unsigned int server_key) if (ptr->hosts[server_key].fd != -1) { - if (ptr->flags & MEM_NO_BLOCK && ptr->hosts[server_key].stack_responses) - memcached_io_flush(ptr, server_key); + memcached_return rc; + rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1); + WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS); - memcached_io_write(ptr, server_key, "quit\r\n", 6, 1); - - close(ptr->hosts[server_key].fd); + memcached_io_close(ptr, server_key); ptr->hosts[server_key].fd= -1; ptr->hosts[server_key].stack_responses= 0; ptr->hosts[server_key].cursor_active= 0; diff --git a/lib/memcached_storage.c b/lib/memcached_storage.c index 93968ad9..6672a86a 100644 --- a/lib/memcached_storage.c +++ b/lib/memcached_storage.c @@ -62,16 +62,8 @@ static inline memcached_return memcached_send(memcached_st *ptr, if (key_length == 0) return MEMCACHED_NO_KEY_PROVIDED; - if (ptr->hosts == NULL || ptr->number_of_hosts == 0) - return MEMCACHED_NO_SERVERS; - server_key= memcached_generate_hash(ptr, key, key_length); - rc= memcached_connect(ptr, server_key); - if (rc != MEMCACHED_SUCCESS) - return rc; - - if (cas) write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "%s %.*s %u %llu %zu %llu\r\n", storage_op_string(verb), @@ -90,15 +82,9 @@ static inline memcached_return memcached_send(memcached_st *ptr, goto error; } - /* - We have to flush after sending the command. Memcached is not smart enough - to just keep reading from the socket :( - */ - if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length, 0)) == -1) - { - rc= MEMCACHED_WRITE_FAILURE; + rc= memcached_do(ptr, server_key, buffer, write_length, 0); + if (rc != MEMCACHED_SUCCESS) goto error; - } if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1) { @@ -118,7 +104,7 @@ static inline memcached_return memcached_send(memcached_st *ptr, goto error; } - if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP) + if (to_write == 0) { rc= MEMCACHED_SUCCESS; memcached_server_response_increment(ptr, server_key); diff --git a/tests/function.c b/tests/function.c index 3a0bd648..9cd10aff 100644 --- a/tests/function.c +++ b/tests/function.c @@ -319,12 +319,21 @@ uint8_t prepend_test(memcached_st *memc) return 0; } +/* + Set the value, then quit to make sure it is flushed. + Come back in and test that add fails. +*/ uint8_t add_test(memcached_st *memc) { memcached_return rc; char *key= "foo"; char *value= "when we sanitize"; + rc= memcached_set(memc, key, strlen(key), + value, strlen(value), + (time_t)0, (uint16_t)0); + assert(rc == MEMCACHED_SUCCESS); + memcached_quit(memc); rc= memcached_add(memc, key, strlen(key), value, strlen(value), (time_t)0, (uint16_t)0); @@ -1760,7 +1769,7 @@ test_st tests[] ={ {"set", 0, set_test }, {"set2", 0, set_test2 }, {"set3", 0, set_test3 }, - {"add", 0, add_test }, + {"add", 1, add_test }, {"replace", 0, replace_test }, {"delete", 1, delete_test }, {"get", 1, get_test },