X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=9a11556bdbaee21765d8fabbe5af3db70f74a4f6;hb=d455dd963105dc3ccc120887215f66dec8f55377;hp=0624487f5ce8bdcc3f1ed8fd42c95c543a5c7ae4;hpb=b7f043ada7b39b3e9e244a7f6864a31cd08ad650;p=awesomized%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 0624487f..9a11556b 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -30,7 +30,6 @@ static memcached_return io_wait(memcached_server_st *ptr, fds[0].fd= ptr->fd; fds[0].events= flags; -#ifdef NOT_DONE /* ** We are going to block on write, but at least on Solaris we might block ** on write if we haven't read anything from our input buffer.. @@ -41,10 +40,10 @@ static memcached_return io_wait(memcached_server_st *ptr, */ if (read_or_write == MEM_WRITE) { - if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED) - return MEMCACHED_FAILURE; + memcached_return rc=memcached_purge(ptr); + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return MEMCACHED_FAILURE; } -#endif error= poll(fds, 1, ptr->root->poll_timeout); @@ -99,7 +98,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr, while (length) { - uint8_t found_eof= 0; if (!ptr->read_buffer_length) { ssize_t data_read; @@ -136,8 +134,17 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } else { - found_eof= 1; - break; + /* + EOF. Any data received so far is incomplete + so discard it. This always reads by byte in case of TCP + and protocol enforcement happens at memcached_response() + looking for '\n'. We do not care for UDB which requests 8 bytes + at once. Generally, this means that connection went away. Since + for blocking I/O we do not return 0 and for non-blocking case + it will return EGAIN if data is not immediatly available. + */ + memcached_quit_server(ptr, 1); + return -1; } } @@ -167,9 +174,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr, buffer_ptr++; break; } - - if (found_eof) - break; } return (size_t)(buffer_ptr - (char*)buffer); @@ -211,7 +215,9 @@ ssize_t memcached_io_write(memcached_server_st *ptr, if (sent_length == -1) return -1; - WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + /* If io_flush calls memcached_purge, sent_length may be 0 */ + if (sent_length != 0) + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); } } @@ -234,16 +240,19 @@ memcached_return memcached_io_close(memcached_server_st *ptr) return MEMCACHED_SUCCESS; /* in case of death shutdown to avoid blocking at close() */ - r= shutdown(ptr->fd, SHUT_RDWR); + if (1) + { + r= shutdown(ptr->fd, SHUT_RDWR); #ifdef HAVE_DEBUG - if (r && errno != ENOTCONN) - { - WATCHPOINT_NUMBER(ptr->fd); - WATCHPOINT_ERRNO(errno); - WATCHPOINT_ASSERT(errno); - } + if (r && errno != ENOTCONN) + { + WATCHPOINT_NUMBER(ptr->fd); + WATCHPOINT_ERRNO(errno); + WATCHPOINT_ASSERT(errno); + } #endif + } r= close(ptr->fd); #ifdef HAVE_DEBUG @@ -257,6 +266,19 @@ memcached_return memcached_io_close(memcached_server_st *ptr) static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error) { + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + { + memcached_return rc; + WATCHPOINT_ASSERT(ptr->fd != -1); + rc= memcached_purge(ptr); + + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return -1; + } ssize_t sent_length; size_t return_length; char *local_write_ptr= ptr->write_buffer; @@ -312,22 +334,6 @@ static ssize_t io_flush(memcached_server_st *ptr, } else { -#ifdef NOT_DONE - /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ - { - memcached_return rc; - WATCHPOINT_ASSERT(ptr->fd != -1); - rc= memcached_purge(ptr); - - if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED) - return -1; - } -#endif - WATCHPOINT_ASSERT(ptr->fd != -1); if ((sent_length= write(ptr->fd, local_write_ptr, write_length)) == -1) @@ -341,7 +347,7 @@ static ssize_t io_flush(memcached_server_st *ptr, memcached_return rc; rc= io_wait(ptr, MEM_WRITE); - if (rc == MEMCACHED_SUCCESS) + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) continue; memcached_quit_server(ptr, 1); @@ -376,5 +382,30 @@ static ssize_t io_flush(memcached_server_st *ptr, */ void memcached_io_reset(memcached_server_st *ptr) { - memcached_quit_server(ptr, 0); + memcached_quit_server(ptr, 1); +} + +/** + * Read a given number of bytes from the server and place it into a specific + * buffer. Reset the IO channel on this server if an error occurs. + */ +memcached_return memcached_safe_read(memcached_server_st *ptr, + void *dta, + size_t size) +{ + size_t offset= 0; + char *data= dta; + + while (offset < size) + { + ssize_t nread= memcached_io_read(ptr, data + offset, size - offset); + if (nread <= 0) + { + memcached_io_reset(ptr); + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + offset+= nread; + } + + return MEMCACHED_SUCCESS; }