X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=d2196f67d1e6524a0585f9999e2dca2facfb35e4;hb=32eebdc2e1ae14f41bb15e212036b772760a3fd1;hp=ba4b347494a6e5261cafb68f7d9825a3e5ec0343;hpb=8c96d045f23d3f787ea6fc84237eb9e30c1c4fce;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index ba4b3474..d2196f67 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -70,7 +70,6 @@ static bool repack_input_buffer(memcached_instance_st* instance) */ memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length); instance->read_ptr= instance->read_buffer; - instance->read_data_length= instance->read_buffer_length; } /* There is room in the buffer, try to fill it! */ @@ -80,8 +79,8 @@ static bool repack_input_buffer(memcached_instance_st* instance) /* Just try a single read to grab what's available */ ssize_t nr; if ((nr= ::recv(instance->fd, - instance->read_ptr + instance->read_data_length, - MEMCACHED_MAX_BUFFER - instance->read_data_length, + instance->read_ptr + instance->read_buffer_length, + MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL)) <= 0) { if (nr == 0) @@ -113,7 +112,6 @@ static bool repack_input_buffer(memcached_instance_st* instance) } else // We read data, append to our read buffer { - instance->read_data_length+= size_t(nr); instance->read_buffer_length+= size_t(nr); return true; @@ -177,7 +175,7 @@ static bool process_input_buffer(memcached_instance_st* instance) } static memcached_return_t io_wait(memcached_instance_st* instance, - const memc_read_or_write read_or_write) + const short events) { /* ** We are going to block on write, but at least on Solaris we might block @@ -187,7 +185,7 @@ static memcached_return_t io_wait(memcached_instance_st* instance, ** The test is moved down in the purge function to avoid duplication of ** the test. */ - if (read_or_write == MEM_WRITE) + if (events & POLLOUT) { if (memcached_purge(instance) == false) { @@ -197,12 +195,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance, struct pollfd fds; fds.fd= instance->fd; - fds.events= POLLIN; + fds.events= events; fds.revents= 0; - if (read_or_write == MEM_WRITE) /* write */ + if (fds.events & POLLOUT) /* write */ { - fds.events= POLLOUT; instance->io_wait_count.write++; } else @@ -274,9 +271,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance, case EFAULT: case ENOMEM: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + break; case EINVAL: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + break; default: memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); @@ -343,6 +342,7 @@ static bool io_flush(memcached_instance_st* instance, } ssize_t sent_length= ::send(instance->fd, local_write_ptr, write_length, flags); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno if (sent_length == SOCKET_ERROR) { @@ -371,7 +371,7 @@ static bool io_flush(memcached_instance_st* instance, continue; } - memcached_return_t rc= io_wait(instance, MEM_WRITE); + memcached_return_t rc= io_wait(instance, POLLOUT); if (memcached_success(rc)) { continue; @@ -382,14 +382,14 @@ static bool io_flush(memcached_instance_st* instance, } memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET); return false; } @@ -409,12 +409,12 @@ static bool io_flush(memcached_instance_st* instance, memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance) { - return io_wait(instance, MEM_WRITE); + return io_wait(instance, POLLOUT); } memcached_return_t memcached_io_wait_for_read(memcached_instance_st* instance) { - return io_wait(instance, MEM_READ); + return io_wait(instance, POLLIN); } static memcached_return_t _io_fill(memcached_instance_st* instance) @@ -423,6 +423,8 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) do { data_read= ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -440,7 +442,7 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) #endif { memcached_return_t io_wait_ret; - if (memcached_success(io_wait_ret= io_wait(instance, MEM_READ))) + if (memcached_success(io_wait_ret= io_wait(instance, POLLIN))) { continue; } @@ -456,12 +458,13 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) WATCHPOINT_ASSERT(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: default: memcached_quit_server(instance, true); - memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + memcached_set_errno(*instance, local_errno, MEMCACHED_AT); break; } @@ -486,7 +489,6 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) } while (data_read <= 0); instance->io_bytes_sent= 0; - instance->read_data_length= (size_t) data_read; instance->read_buffer_length= (size_t) data_read; instance->read_ptr= instance->read_buffer; @@ -576,7 +578,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance) #ifdef __linux case ERESTART: #endif - if (memcached_success(io_wait(instance, MEM_READ))) + if (memcached_success(io_wait(instance, POLLIN))) { continue; } @@ -590,6 +592,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance) assert(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: