X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=ef46a0d9e460b93f819e8c70d368ce3c2c107b72;hb=70c3fdd7d6d441a8be3e831f6a0aa3ae88166bc6;hp=3b8943c8dc31c1c00a0f0bc0b1705cb8f150f6e8;hpb=e42302e08fa4d04cb21eaf7493f5f92b11169c03;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index 3b8943c8..ef46a0d9 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -53,16 +53,34 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, if (ptr->root->flags.no_block == false) timeout= -1; - error= poll(&fds, 1, timeout); + size_t loop_max= 5; + while (--loop_max) + { + error= poll(&fds, 1, timeout); - if (error == 1) - return MEMCACHED_SUCCESS; - else if (error == 0) - return MEMCACHED_TIMEOUT; + switch (error) + { + case 1: + return MEMCACHED_SUCCESS; + case 0: + return MEMCACHED_TIMEOUT; +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + case EINTR: + continue; + default: + ptr->cached_errno= error; + memcached_quit_server(ptr, true); + + return MEMCACHED_FAILURE; + } + } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); - memcached_quit_server(ptr, 1); + ptr->cached_errno= error; + memcached_quit_server(ptr, true); return MEMCACHED_FAILURE; } @@ -127,20 +145,21 @@ static bool process_input_buffer(memcached_server_instance_st *ptr) */ memcached_callback_st cb= *ptr->root->callbacks; - ptr->root->options.is_processing_input= true; + memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; memcached_return_t error; + memcached_st *root= (memcached_st *)ptr->root; error= memcached_response(ptr, buffer, sizeof(buffer), - &ptr->root->result); + &root->result); - ptr->root->options.is_processing_input = false; + memcached_set_processing_input(root, false); if (error == MEMCACHED_SUCCESS) { for (unsigned int x= 0; x < cb.number_of_callback; x++) { - error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) break; } @@ -154,7 +173,41 @@ static bool process_input_buffer(memcached_server_instance_st *ptr) return false; } -#ifdef UNUSED +static inline void memcached_io_cork_push(memcached_server_st *ptr) +{ + (void)ptr; +#ifdef CORK + if (ptr->root->flags.cork == false || ptr->state.is_corked) + return; + + int enable= 1; + int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, + &enable, (socklen_t)sizeof(int)); + if (! err) + ptr->state.is_corked= true; + + WATCHPOINT_ASSERT(ptr->state.is_corked == true); +#endif +} + +static inline void memcached_io_cork_pop(memcached_server_st *ptr) +{ + (void)ptr; +#ifdef CORK + if (ptr->root->flags.cork == false || ptr->state.is_corked == false) + return; + + int enable= 0; + int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, + &enable, (socklen_t)sizeof(int)); + if (! err) + ptr->state.is_corked= false; + + WATCHPOINT_ASSERT(ptr->state.is_corked == false); +#endif +} + +#if 0 // Dead code, this should be removed. void memcached_io_preread(memcached_st *ptr) { unsigned int x; @@ -207,13 +260,16 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, { case EAGAIN: case EINTR: +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) continue; /* fall through */ default: { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); *nread= -1; return rc; } @@ -230,7 +286,7 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, for blocking I/O we do not return 0 and for non-blocking case it will return EGAIN if data is not immediatly available. */ - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); *nread= -1; return MEMCACHED_UNKNOWN_READ_FAILURE; } @@ -270,7 +326,7 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, } ssize_t memcached_io_write(memcached_server_instance_st *ptr, - const void *buffer, size_t length, char with_flush) + const void *buffer, size_t length, bool with_flush) { size_t original_length; const char* buffer_ptr; @@ -280,6 +336,12 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, original_length= length; buffer_ptr= buffer; + /* more writable data is coming if a flush isn't required, so delay send */ + if (! with_flush) + { + memcached_io_cork_push(ptr); + } + while (length) { char *write_ptr; @@ -330,7 +392,11 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != -1); if (io_flush(ptr, &rc) == -1) + { return -1; + } + + memcached_io_cork_pop(ptr); } return (ssize_t) original_length; @@ -503,11 +569,11 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) continue; - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); return -1; } default: - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); *error= MEMCACHED_ERRNO; return -1; } @@ -516,7 +582,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, if (ptr->type == MEMCACHED_CONNECTION_UDP && (size_t)sent_length != write_length) { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); return -1; } @@ -546,7 +612,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, */ void memcached_io_reset(memcached_server_instance_st *ptr) { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); } /**