From: Brian Aker Date: Fri, 12 Oct 2007 08:46:27 +0000 (-0700) Subject: This is a rewrite of some of the IO code to handle larger loads of set data X-Git-Tag: 0.7~44 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=d6505b16fb85f77527934510d0395adfcbb66377;p=awesomized%2Flibmemcached This is a rewrite of some of the IO code to handle larger loads of set data on concurrent insert with non-blocking IO. A bug was also uncovered where the read bufffers for multiple hosts were not being read (all code has been refactored for that now). One user contributed test case has been added. --- diff --git a/include/memcached.h b/include/memcached.h index da051070..558fbb09 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -67,6 +67,7 @@ typedef enum { MEMCACHED_DELETED, MEMCACHED_VALUE, MEMCACHED_STAT, + MEMCACHED_ERRNO, MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */ } memcached_return; @@ -86,6 +87,7 @@ struct memcached_server_st { char *hostname; unsigned int port; int fd; + unsigned int stack_responses; }; struct memcached_stat_st { @@ -136,7 +138,6 @@ struct memcached_st { size_t write_buffer_offset; char connected; int my_errno; - unsigned int stack_responses; unsigned long long flags; memcached_return warning; /* Future Use */ }; @@ -196,6 +197,9 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length, #define memcached_server_name(A,B) B.hostname #define memcached_server_port(A,B) B.port #define memcached_server_list(A) A->hosts +#define memcached_server_response_increment(A,B) A->hosts[B].stack_responses++ +#define memcached_server_response_decrement(A,B) A->hosts[B].stack_responses-- +#define memcached_server_response_count(A,B) A->hosts[B].stack_responses memcached_return memcached_server_add(memcached_st *ptr, char *hostname, unsigned int port); diff --git a/lib/common.h b/lib/common.h index f922fee1..8440f412 100644 --- a/lib/common.h +++ b/lib/common.h @@ -32,5 +32,6 @@ memcached_return memcached_response(memcached_st *ptr, char *buffer, size_t buffer_length, unsigned int server_key); unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length); +void memcached_quit_server(memcached_st *ptr, unsigned int server_key); #endif /* __COMMON_H__ */ diff --git a/lib/memcached_connect.c b/lib/memcached_connect.c index f95b8e7d..116423ce 100644 --- a/lib/memcached_connect.c +++ b/lib/memcached_connect.c @@ -4,6 +4,7 @@ #include #include #include +#include memcached_return memcached_real_connect(memcached_st *ptr, unsigned int server_key) { @@ -13,7 +14,10 @@ memcached_return memcached_real_connect(memcached_st *ptr, unsigned int server_k if (ptr->hosts[server_key].fd == -1) { if ((h= gethostbyname(ptr->hosts[server_key].hostname)) == NULL) + { + ptr->my_errno= h_errno; return MEMCACHED_HOST_LOCKUP_FAILURE; + } servAddr.sin_family= h->h_addrtype; memcpy((char *) &servAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length); @@ -56,6 +60,7 @@ test_connect: { switch (errno) { /* We are spinning waiting on connect */ + case EALREADY: case EINPROGRESS: case EINTR: goto test_connect; @@ -63,7 +68,7 @@ test_connect: break; default: ptr->my_errno= errno; - return MEMCACHED_HOST_LOCKUP_FAILURE; + return MEMCACHED_ERRNO; } ptr->connected++; } diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 9277b6ba..2f86930e 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -107,7 +107,10 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, } if (with_flush) - memcached_io_flush(ptr, server_key); + { + if (memcached_io_flush(ptr, server_key) == -1) + return -1; + } return length; } @@ -115,30 +118,67 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; + char *write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; + unsigned int loop= 1; if (ptr->write_buffer_offset == 0) return 0; - if (ptr->flags & MEM_NO_BLOCK) + while (write_length) { - struct timeval local_tv; - fd_set set; + if (ptr->flags & MEM_NO_BLOCK) + { - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; + while (1) + { + struct timeval local_tv; + fd_set set; + int select_return; - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); + local_tv.tv_sec= 0; + local_tv.tv_usec= 300 * loop; - select(1, NULL, &set, NULL, &local_tv); - } - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - ptr->write_buffer_offset, 0)) == -1) - { - return -1; - } + FD_ZERO(&set); + FD_SET(ptr->hosts[server_key].fd, &set); + + select_return= select(1, NULL, &set, NULL, &local_tv); + + if (select_return == -1) + { + ptr->my_errno= errno; + return -1; + } + else if (!select_return) + break; + } + } - assert(sent_length == ptr->write_buffer_offset); + sent_length= 0; + if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr, + write_length, 0)) == -1) + { + switch (errno) + { + case ENOBUFS: + case EAGAIN: + if (loop < 10) + { + loop++; + break; + } + /* Yes, we want to fall through */ + default: + ptr->my_errno= errno; + return -1; + } + } + else + { + write_ptr+= sent_length; + write_length-= sent_length; + } + } ptr->write_buffer_offset= 0; diff --git a/lib/memcached_quit.c b/lib/memcached_quit.c index 00b454c0..d7a42315 100644 --- a/lib/memcached_quit.c +++ b/lib/memcached_quit.c @@ -8,6 +8,21 @@ The reason we send "quit" is that in case we have buffered IO, this will force data to be completed. */ + +void memcached_quit_server(memcached_st *ptr, unsigned int server_key) +{ + if (ptr->hosts[server_key].fd != -1) + { + if (ptr->flags & MEM_NO_BLOCK) + memcached_io_write(ptr, server_key, "quit\r\n", 6, 1); + close(ptr->hosts[server_key].fd); + ptr->hosts[server_key].fd= -1; + ptr->hosts[server_key].stack_responses= 0; + } + + ptr->connected--; +} + void memcached_quit(memcached_st *ptr) { unsigned int x; @@ -15,15 +30,7 @@ void memcached_quit(memcached_st *ptr) if (ptr->hosts) { for (x= 0; x < ptr->number_of_hosts; x++) - { - if (ptr->hosts[x].fd != -1) - { - if (ptr->flags & MEM_NO_BLOCK) - memcached_io_write(ptr, x, "quit\r\n", 6, 1); - close(ptr->hosts[x].fd); - ptr->hosts[x].fd= -1; - } - } + memcached_quit_server(ptr, x); } ptr->connected= 0; diff --git a/lib/memcached_response.c b/lib/memcached_response.c index 5b0b7831..10547504 100644 --- a/lib/memcached_response.c +++ b/lib/memcached_response.c @@ -15,13 +15,17 @@ memcached_return memcached_response(memcached_st *ptr, unsigned int x; size_t send_length; char *buffer_ptr; + unsigned int max_messages; + memset(buffer, 0, buffer_length); send_length= 0; - for (x= 0; x <= ptr->stack_responses; x++) + max_messages= memcached_server_response_count(ptr, server_key); + for (x= 0; x <= max_messages; x++) { buffer_ptr= buffer; + while (1) { unsigned int read_length; @@ -37,8 +41,10 @@ memcached_return memcached_response(memcached_st *ptr, else buffer_ptr++; } + + if (memcached_server_response_count(ptr, server_key)) + memcached_server_response_decrement(ptr, server_key); } - ptr->stack_responses= 0; switch(buffer[0]) { diff --git a/lib/memcached_storage.c b/lib/memcached_storage.c index 6b76375c..353615fe 100644 --- a/lib/memcached_storage.c +++ b/lib/memcached_storage.c @@ -37,7 +37,12 @@ static memcached_return memcached_send(memcached_st *ptr, memset(buffer, 0, MEMCACHED_DEFAULT_COMMAND_SIZE); - /* Leaveing this assert in since only a library fubar could blow this */ + /* Leaving this assert in since only a library fubar could blow this */ + if (ptr->write_buffer_offset != 0) + { + WATCHPOINT_NUMBER(ptr->write_buffer_offset); + } + assert(ptr->write_buffer_offset == 0); server_key= memcached_generate_hash(ptr, key, key_length); @@ -74,6 +79,7 @@ static memcached_return memcached_send(memcached_st *ptr, if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, 1)) == -1) { + memcached_quit_server(ptr, server_key); rc= MEMCACHED_WRITE_FAILURE; goto error; } @@ -81,7 +87,7 @@ static memcached_return memcached_send(memcached_st *ptr, if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP) { rc= MEMCACHED_SUCCESS; - ptr->stack_responses++; + memcached_server_response_increment(ptr, server_key); } else { diff --git a/lib/memcached_strerror.c b/lib/memcached_strerror.c index 372cea23..587ac862 100644 --- a/lib/memcached_strerror.c +++ b/lib/memcached_strerror.c @@ -54,6 +54,8 @@ char *memcached_strerror(memcached_st *ptr, memcached_return rc) return "SERVER VALUE"; case MEMCACHED_STAT: return "STAT VALUE"; + case MEMCACHED_ERRNO: + return "UNKOWN ERROR SEE MY_ERRNO"; case MEMCACHED_MAXIMUM_RETURN: return "Gibberish returned!"; default: diff --git a/tests/test.c b/tests/test.c index 935e1400..714cd891 100644 --- a/tests/test.c +++ b/tests/test.c @@ -391,6 +391,7 @@ void get_stats_keys(memcached_st *memc) assert(rc == MEMCACHED_SUCCESS); for (ptr= list; *ptr; ptr++) printf("Found key %s\n", *ptr); + fflush(stdout); free(list); } @@ -485,6 +486,48 @@ void behavior_test(memcached_st *memc) assert(value == 0); } +/* Test case provided by Cal Haldenbrand */ +void user_supplied_bug1(memcached_st *memc) +{ + unsigned int setter= 1; + unsigned int x; + + long total= 0; + int size= 0; + srand(time(NULL)); + char key[10]; + char *randomstuff = (char *)malloc(6 * 1024); + memset(randomstuff, 0, 6 * 1024); + + memcached_return rc; + + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter); + + + /* add key */ + for (x= 0 ; total < 20 * 1024576 ; x++ ) + { + unsigned int j= 0; + + size= (rand() % ( 5 * 1024 ) ) + 400; + memset(randomstuff, 0, 6 * 1024); + assert(size < 6 * 1024); /* Being safe here */ + + for (j= 0 ; j < size ;j++) + randomstuff[j] = (char) (rand() % 26) + 97; + + total += size; + sprintf(key, "%d", x); + rc = memcached_set(memc, key, strlen(key), + randomstuff, strlen(randomstuff), 10, 0); + /* If we fail, lets try again */ + if (rc != MEMCACHED_SUCCESS) + rc = memcached_set(memc, key, strlen(key), + randomstuff, strlen(randomstuff), 10, 0); + assert(rc == MEMCACHED_SUCCESS); + } +} void add_host_test1(memcached_st *memc) { unsigned int x; @@ -533,6 +576,7 @@ int main(int argc, char *argv[]) server_list= "localhost"; printf("servers %s\n", server_list); + srandom(time(NULL)); servers= memcached_servers_parse(server_list); assert(servers); @@ -565,6 +609,11 @@ int main(int argc, char *argv[]) {0, 0, 0} }; + test_st user_tests[] ={ + {"user_supplied_bug1", 0, user_supplied_bug1 }, + {0, 0, 0} + }; + fprintf(stderr, "\nBlock tests\n\n"); for (x= 0; tests[x].function_name; x++) { @@ -641,6 +690,24 @@ int main(int argc, char *argv[]) memcached_free(memc); } + fprintf(stderr, "\nUser Supplied tests\n\n"); + for (x= 0; user_tests[x].function_name; x++) + { + memcached_st *memc; + memcached_return rc; + memc= memcached_create(NULL); + assert(memc); + + rc= memcached_server_push(memc, servers); + assert(rc == MEMCACHED_SUCCESS); + + fprintf(stderr, "Testing %s", user_tests[x].function_name); + user_tests[x].function(memc); + fprintf(stderr, "\t\t\t\t\t[ ok ]\n"); + assert(memc); + memcached_free(memc); + } + /* Clean up whatever we might have left */ { memcached_st *memc; @@ -649,5 +716,8 @@ int main(int argc, char *argv[]) flush_test(memc); memcached_free(memc); } + + fprintf(stderr, "All tests completed successfully\n\n"); + return 0; }