X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=d2196f67d1e6524a0585f9999e2dca2facfb35e4;hb=32eebdc2e1ae14f41bb15e212036b772760a3fd1;hp=0660fe9db25fd1bad4b425c814078df4004c3159;hpb=bf8213041709c75147393c8bd6b51b8f9e064f7c;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 0660fe9d..d2196f67 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -40,10 +40,10 @@ #include #ifdef HAVE_SYS_SOCKET_H -# include +# include #endif -void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +void initialize_binary_request(memcached_instance_st* server, protocol_binary_request_header& header) { server->request_id++; header.request.magic= PROTOCOL_BINARY_REQ; @@ -61,7 +61,7 @@ enum memc_read_or_write { * * @param instance the server to pack */ -static bool repack_input_buffer(org::libmemcached::Instance* instance) +static bool repack_input_buffer(memcached_instance_st* instance) { if (instance->read_ptr != instance->read_buffer) { @@ -70,7 +70,6 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) */ memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length); instance->read_ptr= instance->read_buffer; - instance->read_data_length= instance->read_buffer_length; } /* There is room in the buffer, try to fill it! */ @@ -80,8 +79,8 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) /* Just try a single read to grab what's available */ ssize_t nr; if ((nr= ::recv(instance->fd, - instance->read_ptr + instance->read_data_length, - MEMCACHED_MAX_BUFFER - instance->read_data_length, + instance->read_ptr + instance->read_buffer_length, + MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL)) <= 0) { if (nr == 0) @@ -99,7 +98,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif break; // No IO is fine, we can just move on @@ -113,7 +112,6 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) } else // We read data, append to our read buffer { - instance->read_data_length+= size_t(nr); instance->read_buffer_length+= size_t(nr); return true; @@ -134,7 +132,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) * @param instance the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(org::libmemcached::Instance* instance) +static bool process_input_buffer(memcached_instance_st* instance) { /* ** We might be able to process some of the response messages if we @@ -176,8 +174,8 @@ static bool process_input_buffer(org::libmemcached::Instance* instance) return false; } -static memcached_return_t io_wait(org::libmemcached::Instance* instance, - const memc_read_or_write read_or_write) +static memcached_return_t io_wait(memcached_instance_st* instance, + const short events) { /* ** We are going to block on write, but at least on Solaris we might block @@ -187,7 +185,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, ** The test is moved down in the purge function to avoid duplication of ** the test. */ - if (read_or_write == MEM_WRITE) + if (events & POLLOUT) { if (memcached_purge(instance) == false) { @@ -197,12 +195,11 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, struct pollfd fds; fds.fd= instance->fd; - fds.events= POLLIN; + fds.events= events; fds.revents= 0; - if (read_or_write == MEM_WRITE) /* write */ + if (fds.events & POLLOUT) /* write */ { - fds.events= POLLOUT; instance->io_wait_count.write++; } else @@ -212,8 +209,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (instance->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("poll_timeout() was set to zero")); } size_t loop_max= 5; @@ -258,8 +254,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (active_fd == 0) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("No active_fd were found")); } // Only an error should result in this code being called. @@ -267,7 +262,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif case EINTR: @@ -276,9 +271,11 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, case EFAULT: case ENOMEM: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + break; case EINVAL: memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + break; default: memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); @@ -298,7 +295,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(org::libmemcached::Instance* instance, +static bool io_flush(memcached_instance_st* instance, const bool with_flush, memcached_return_t& error) { @@ -345,6 +342,7 @@ static bool io_flush(org::libmemcached::Instance* instance, } ssize_t sent_length= ::send(instance->fd, local_write_ptr, write_length, flags); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno if (sent_length == SOCKET_ERROR) { @@ -373,7 +371,7 @@ static bool io_flush(org::libmemcached::Instance* instance, continue; } - memcached_return_t rc= io_wait(instance, MEM_WRITE); + memcached_return_t rc= io_wait(instance, POLLOUT); if (memcached_success(rc)) { continue; @@ -384,14 +382,14 @@ static bool io_flush(org::libmemcached::Instance* instance, } memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(instance, true); - error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT); WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET); return false; } @@ -409,17 +407,24 @@ static bool io_flush(org::libmemcached::Instance* instance, return true; } -memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance) +{ + return io_wait(instance, POLLOUT); +} + +memcached_return_t memcached_io_wait_for_read(memcached_instance_st* instance) { - return io_wait(instance, MEM_WRITE); + return io_wait(instance, POLLIN); } -static memcached_return_t _io_fill(org::libmemcached::Instance* instance) +static memcached_return_t _io_fill(memcached_instance_st* instance) { ssize_t data_read; do { data_read= ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL); + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -432,12 +437,12 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif { memcached_return_t io_wait_ret; - if (memcached_success(io_wait_ret= io_wait(instance, MEM_READ))) + if (memcached_success(io_wait_ret= io_wait(instance, POLLIN))) { continue; } @@ -453,12 +458,13 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) WATCHPOINT_ASSERT(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: default: memcached_quit_server(instance, true); - memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + memcached_set_errno(*instance, local_errno, MEMCACHED_AT); break; } @@ -483,14 +489,13 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) } while (data_read <= 0); instance->io_bytes_sent= 0; - instance->read_data_length= (size_t) data_read; instance->read_buffer_length= (size_t) data_read; instance->read_ptr= instance->read_buffer; return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_read(memcached_instance_st* instance, void *buffer, size_t length, ssize_t& nread) { assert(memcached_is_udp(instance->root) == false); @@ -542,7 +547,7 @@ memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_slurp(memcached_instance_st* instance) { assert_msg(instance, "Programmer error, invalid Instance"); assert(memcached_is_udp(instance->root) == false); @@ -570,10 +575,10 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif - if (memcached_success(io_wait(instance, MEM_READ))) + if (memcached_success(io_wait(instance, POLLIN))) { continue; } @@ -587,6 +592,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) assert(0); case EBADF: assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state"); + /* fall through */ case EINVAL: case EFAULT: case ECONNREFUSED: @@ -599,7 +605,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) return MEMCACHED_CONNECTION_FAILURE; } -static bool _io_write(org::libmemcached::Instance* instance, +static bool _io_write(memcached_instance_st* instance, const void *buffer, size_t length, bool with_flush, size_t& written) { @@ -652,13 +658,13 @@ static bool _io_write(org::libmemcached::Instance* instance, return true; } -bool memcached_io_write(org::libmemcached::Instance* instance) +bool memcached_io_write(memcached_instance_st* instance) { size_t written; return _io_write(instance, NULL, 0, true, written); } -ssize_t memcached_io_write(org::libmemcached::Instance* instance, +ssize_t memcached_io_write(memcached_instance_st* instance, const void *buffer, const size_t length, const bool with_flush) { size_t written; @@ -671,7 +677,7 @@ ssize_t memcached_io_write(org::libmemcached::Instance* instance, return ssize_t(written); } -bool memcached_io_writev(org::libmemcached::Instance* instance, +bool memcached_io_writev(memcached_instance_st* instance, libmemcached_io_vector_st vector[], const size_t number_of, const bool with_flush) { @@ -703,7 +709,7 @@ bool memcached_io_writev(org::libmemcached::Instance* instance, return (complete_total == total); } -void org::libmemcached::Instance::start_close_socket() +void memcached_instance_st::start_close_socket() { if (fd != INVALID_SOCKET) { @@ -712,7 +718,7 @@ void org::libmemcached::Instance::start_close_socket() } } -void org::libmemcached::Instance::reset_socket() +void memcached_instance_st::reset_socket() { if (fd != INVALID_SOCKET) { @@ -721,7 +727,7 @@ void org::libmemcached::Instance::reset_socket() } } -void org::libmemcached::Instance::close_socket() +void memcached_instance_st::close_socket() { if (fd != INVALID_SOCKET) { @@ -757,7 +763,7 @@ void org::libmemcached::Instance::close_socket() major_version= minor_version= micro_version= UINT8_MAX; } -org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, memcached_return_t&) +memcached_instance_st* memcached_io_get_readable_server(Memcached *memc, memcached_return_t&) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -765,7 +771,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ { @@ -786,7 +792,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->response_count() > 0) { @@ -813,7 +819,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); + memcached_instance_st* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) { @@ -830,7 +836,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(org::libmemcached::Instance* instance) +void memcached_io_reset(memcached_instance_st* instance) { memcached_quit_server(instance, true); } @@ -839,7 +845,7 @@ void memcached_io_reset(org::libmemcached::Instance* instance) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_safe_read(memcached_instance_st* instance, void *dta, const size_t size) { @@ -864,7 +870,7 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_readline(memcached_instance_st* instance, char *buffer_ptr, size_t size, size_t& total_nr)