X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=d2196f67d1e6524a0585f9999e2dca2facfb35e4;hb=50532bea9eb770554e1e962d03b868987303acbe;hp=cb8ed5c23199eac47b5665ca6733384216b4ffbb;hpb=d9db3f534bc38ea22e83e5529ff93aea4150ab19;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index cb8ed5c2..d2196f67 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -70,7 +70,6 @@ static bool repack_input_buffer(memcached_instance_st* instance) */ memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length); instance->read_ptr= instance->read_buffer; - instance->read_data_length= instance->read_buffer_length; } /* There is room in the buffer, try to fill it! */ @@ -80,8 +79,8 @@ static bool repack_input_buffer(memcached_instance_st* instance) /* Just try a single read to grab what's available */ ssize_t nr; if ((nr= ::recv(instance->fd, - instance->read_ptr + instance->read_data_length, - MEMCACHED_MAX_BUFFER - instance->read_data_length, + instance->read_ptr + instance->read_buffer_length, + MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL)) <= 0) { if (nr == 0) @@ -99,7 +98,7 @@ static bool repack_input_buffer(memcached_instance_st* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif break; // No IO is fine, we can just move on @@ -113,7 +112,6 @@ static bool repack_input_buffer(memcached_instance_st* instance) } else // We read data, append to our read buffer { - instance->read_data_length+= size_t(nr); instance->read_buffer_length+= size_t(nr); return true; @@ -177,7 +175,7 @@ static bool process_input_buffer(memcached_instance_st* instance) } static memcached_return_t io_wait(memcached_instance_st* instance, - const memc_read_or_write read_or_write) + const short events) { /* ** We are going to block on write, but at least on Solaris we might block @@ -187,7 +185,7 @@ static memcached_return_t io_wait(memcached_instance_st* instance, ** The test is moved down in the purge function to avoid duplication of ** the test. */ - if (read_or_write == MEM_WRITE) + if (events & POLLOUT) { if (memcached_purge(instance) == false) { @@ -197,12 +195,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance, struct pollfd fds; fds.fd= instance->fd; - fds.events= POLLIN; + fds.events= events; fds.revents= 0; - if (read_or_write == MEM_WRITE) /* write */ + if (fds.events & POLLOUT) /* write */ { - fds.events= POLLOUT; instance->io_wait_count.write++; } else @@ -265,7 +262,7 @@ static memcached_return_t io_wait(memcached_instance_st* instance, assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif case EINTR: @@ -274,9 +271,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance, case EFAULT: case ENOMEM: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + break; case EINVAL: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + break; default: memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); @@ -343,6 +342,7 @@ static bool io_flush(memcached_instance_st* instance, } ssize_t sent_length= ::send(instance->fd, local_write_ptr, write_length, flags); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno if (sent_length == SOCKET_ERROR) { @@ -371,7 +371,7 @@ static bool io_flush(memcached_instance_st* instance, continue; } - memcached_return_t rc= io_wait(instance, MEM_WRITE); + memcached_return_t rc= io_wait(instance, POLLOUT); if (memcached_success(rc)) { continue; @@ -382,14 +382,14 @@ static bool io_flush(memcached_instance_st* instance, } memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET); return false; } @@ -409,7 +409,12 @@ static bool io_flush(memcached_instance_st* instance, memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance) { - return io_wait(instance, MEM_WRITE); + return io_wait(instance, POLLOUT); +} + +memcached_return_t memcached_io_wait_for_read(memcached_instance_st* instance) +{ + return io_wait(instance, POLLIN); } static memcached_return_t _io_fill(memcached_instance_st* instance) @@ -418,6 +423,8 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) do { data_read= ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -430,12 +437,12 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif { memcached_return_t io_wait_ret; - if (memcached_success(io_wait_ret= io_wait(instance, MEM_READ))) + if (memcached_success(io_wait_ret= io_wait(instance, POLLIN))) { continue; } @@ -451,12 +458,13 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) WATCHPOINT_ASSERT(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: default: memcached_quit_server(instance, true); - memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + memcached_set_errno(*instance, local_errno, MEMCACHED_AT); break; } @@ -481,7 +489,6 @@ static memcached_return_t _io_fill(memcached_instance_st* instance) } while (data_read <= 0); instance->io_bytes_sent= 0; - instance->read_data_length= (size_t) data_read; instance->read_buffer_length= (size_t) data_read; instance->read_ptr= instance->read_buffer; @@ -568,10 +575,10 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif - if (memcached_success(io_wait(instance, MEM_READ))) + if (memcached_success(io_wait(instance, POLLIN))) { continue; } @@ -585,6 +592,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance) assert(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: