From 46c20d43fbb4f84f30e06a55702c58c17f498d0e Mon Sep 17 00:00:00 2001 From: Date: Sun, 9 Dec 2007 08:29:50 +0900 Subject: [PATCH] Refactor of async code. poll() is now only called when needed. --- include/memcached.h | 12 ++--- lib/common.h | 5 -- lib/memcached_io.c | 101 +++++++++++++++++++-------------------- lib/memcached_io.h | 1 - lib/memcached_strerror.c | 2 + tests/function.c | 3 +- 6 files changed, 57 insertions(+), 67 deletions(-) diff --git a/include/memcached.h b/include/memcached.h index bc74c18d..afc19641 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -68,6 +68,7 @@ typedef enum { MEMCACHED_NOT_SUPPORTED, MEMCACHED_NO_KEY_PROVIDED, MEMCACHED_FETCH_NOTFINISHED, + MEMCACHED_TIMEOUT, MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */ } memcached_return; @@ -382,6 +383,7 @@ size_t memcached_result_length(memcached_result_st *ptr); #define WATCHPOINT fprintf(stderr, "\nWATCHPOINT %s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout); #ifdef __MEMCACHED_H__ #define WATCHPOINT_ERROR(A) fprintf(stderr, "\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout); +#define WATCHPOINT_IFERROR(A) if(A != MEMCACHED_SUCCESS)fprintf(stderr, "\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout); #endif #define WATCHPOINT_STRING(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout); #define WATCHPOINT_STRING_LENGTH(A,B) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout); @@ -389,18 +391,10 @@ size_t memcached_result_length(memcached_result_st *ptr); #define WATCHPOINT_ERRNO(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));A= 0;fflush(stdout); #define WATCHPOINT_ASSERT(A) assert((A)); #else -/* -#define WATCHPOINT { 1; }; -#define WATCHPOINT_ERROR(A) { 1; }; -#define WATCHPOINT_STRING(A) { 1; }; -#define WATCHPOINT_STRING_LENGTH(A,B) { 1; }; -#define WATCHPOINT_NUMBER(A) { 1; }; -#define WATCHPOINT_ERRNO(A) { 1; }; -#define WATCHPOINT_ASSERT(A) { 1; }; -*/ #define WATCHPOINT #ifdef __MEMCACHED_H__ #define WATCHPOINT_ERROR(A) +#define WATCHPOINT_IFERROR(A) #endif #define WATCHPOINT_STRING(A) #define WATCHPOINT_NUMBER(A) diff --git a/lib/common.h b/lib/common.h index 96fc3981..c49b3b96 100644 --- a/lib/common.h +++ b/lib/common.h @@ -40,11 +40,6 @@ #define MEMCACHED_BLOCK_SIZE 1024 -typedef enum { - MEM_NO_FLUSH, - MEM_FLUSH, -} memcached_flush_action; - typedef enum { MEM_NO_BLOCK= (1 << 0), MEM_TCP_NODELAY= (1 << 1), diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 8f72f54d..daf64755 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -7,14 +7,22 @@ #include #include -static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error); + +static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, + memc_read_or_write read_or_write) { struct pollfd fds[1]; short flags= 0; int error; - int latch= 0; - if (read_or_write) + if (read_or_write == MEM_WRITE) /* write */ flags= POLLOUT | POLLERR; else flags= POLLIN | POLLERR; @@ -23,30 +31,22 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - while (latch == 0) - { - error= poll(fds, 1, ptr->poll_timeout); + error= poll(fds, 1, ptr->poll_timeout); - if (error == 1) - return MEMCACHED_SUCCESS; - else if (error == -1) - { - memcached_quit_server(ptr, server_key, 1); - return MEMCACHED_FAILURE; - } - else if (error) - { - /* This is impossible */ - WATCHPOINT_ASSERT(0); - return MEMCACHED_FAILURE; - } - else - latch++; + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) + { + WATCHPOINT_NUMBER(read_or_write); + return MEMCACHED_TIMEOUT; } + WATCHPOINT; + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; - return MEMCACHED_FAILURE; /* Timeout occurred */ } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -64,15 +64,6 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (1) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 0); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - data_read= read(ptr->hosts[server_key].fd, ptr->hosts[server_key].read_buffer, MEMCACHED_MAX_BUFFER); @@ -81,7 +72,16 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, switch (errno) { case EAGAIN: - break; + { + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } default: { memcached_quit_server(ptr, server_key, 1); @@ -125,9 +125,10 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) { + memcached_return rc; size_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, server_key, &rc); if (sent_length == -1) return -1; @@ -139,7 +140,8 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (with_flush) { - if (memcached_io_flush(ptr, server_key) == -1) + memcached_return rc; + if (io_flush(ptr, server_key, &rc) == -1) return -1; } @@ -180,13 +182,15 @@ memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) return rc; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error) { size_t sent_length; size_t return_length; char *write_ptr= ptr->hosts[server_key].write_buffer; size_t write_length= ptr->hosts[server_key].write_buffer_offset; - unsigned int loop= 1; + + *error= MEMCACHED_SUCCESS; if (ptr->hosts[server_key].write_buffer_offset == 0) return 0; @@ -194,15 +198,6 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) return_length= 0; while (write_length) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 1); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - sent_length= 0; if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) { @@ -219,18 +214,22 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) switch (errno) { case ENOBUFS: - case EAGAIN: - WATCHPOINT; continue; - if (loop < 100) + case EAGAIN: { - loop++; - break; + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; } - /* Yes, we want to fall through */ default: memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; return -1; } } diff --git a/lib/memcached_io.h b/lib/memcached_io.h index 8e2ddbc9..c6ddd974 100644 --- a/lib/memcached_io.h +++ b/lib/memcached_io.h @@ -1,7 +1,6 @@ /* Server IO, Not public! */ #include -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key); ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, char *buffer, size_t length, char with_flush); void memcached_io_reset(memcached_st *ptr, unsigned int server_key); diff --git a/lib/memcached_strerror.c b/lib/memcached_strerror.c index 2deb4d5f..f04f2289 100644 --- a/lib/memcached_strerror.c +++ b/lib/memcached_strerror.c @@ -64,6 +64,8 @@ char *memcached_strerror(memcached_st *ptr, memcached_return rc) return "FETCH WAS NOT COMPLETED"; case MEMCACHED_NO_KEY_PROVIDED: return "A KEY LENGTH OF ZERO WAS PROVIDED"; + case MEMCACHED_TIMEOUT: + return "A TIMEOUT OCCURRED"; case MEMCACHED_MAXIMUM_RETURN: return "Gibberish returned!"; default: diff --git a/tests/function.c b/tests/function.c index b7455c2d..9c57ec4d 100644 --- a/tests/function.c +++ b/tests/function.c @@ -1459,7 +1459,8 @@ uint8_t user_supplied_bug11(memcached_st *memc) { rc= memcached_set(mclone, key, key_len,value, value_length, 0, 0); - assert(rc == MEMCACHED_SUCCESS); + WATCHPOINT_IFERROR(rc); + //assert(rc == MEMCACHED_SUCCESS); } free(value); -- 2.30.2