X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Flibmemcached%2Fio.cc;h=e41891a9b60e37768368eea87f316a7f25e626f4;hb=011ea033d902722b9744ea9f28e646734ffdcaf2;hp=8e5d93b7f84b4c5527f5198e120247692821d68c;hpb=0f27cca46a4a529459ca988268972f480a9730c6;p=awesomized%2Flibmemcached diff --git a/src/libmemcached/io.cc b/src/libmemcached/io.cc index 8e5d93b7..e41891a9 100644 --- a/src/libmemcached/io.cc +++ b/src/libmemcached/io.cc @@ -14,10 +14,8 @@ */ #include "libmemcached/common.h" - -#ifdef HAVE_SYS_SOCKET_H -# include -#endif +#include "p9y/poll.hpp" +#include "p9y/clock_gettime.hpp" void initialize_binary_request(memcached_instance_st *server, protocol_binary_request_header &header) { @@ -63,7 +61,7 @@ static bool repack_input_buffer(memcached_instance_st *instance) { case EWOULDBLOCK: #endif case EAGAIN: -#ifdef __linux +#ifdef HAVE_ERESTART case ERESTART: #endif break; // No IO is fine, we can just move on @@ -101,7 +99,7 @@ static bool process_input_buffer(memcached_instance_st *instance) { ** We might be able to process some of the response messages if we ** have a callback set up */ - if (instance->root->callbacks != NULL) { + if (instance->root->callbacks) { /* * We might have responses... try to read them out and fire * callbacks @@ -133,118 +131,135 @@ static bool process_input_buffer(memcached_instance_st *instance) { return false; } -static memcached_return_t io_wait(memcached_instance_st *instance, const short events) { - /* - ** We are going to block on write, but at least on Solaris we might block - ** on write if we haven't read anything from our input buffer.. - ** Try to purge the input buffer if we don't do any flow control in the - ** application layer (just sending a lot of data etc) - ** The test is moved down in the purge function to avoid duplication of - ** the test. - */ - if (events & POLLOUT) { - if (memcached_purge(instance) == false) { - return MEMCACHED_FAILURE; - } +static memcached_return_t io_sock_err(memcached_instance_st *inst, + const char *reason_str, size_t reason_len) { + int err; + socklen_t len = sizeof(err); + + if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) { + return memcached_set_errno(*inst, errno, MEMCACHED_AT, + memcached_literal_param("getsockopt()")); + } + + if (err) { + return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len); } + return MEMCACHED_SUCCESS; +} - struct pollfd fds; - fds.fd = instance->fd; - fds.events = events; - fds.revents = 0; +memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) { + int32_t timeout; + pollfd pfd{}; + pfd.fd = inst->fd; + pfd.events = events ? events : inst->events(); - if (fds.events & POLLOUT) /* write */ { - instance->io_wait_count.write++; + if (events) { + timeout = inst->root->poll_timeout; } else { - instance->io_wait_count.read++; + timeout = inst->root->connect_timeout; } - if (instance->root->poll_timeout - == 0) // Mimic 0 causes timeout behavior (not all platforms do this) - { - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("poll_timeout() was set to zero")); + if (!timeout) { + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("timeout was set to zero")); } - size_t loop_max = 5; - while (--loop_max) // While loop is for ERESTART or EINTR - { - int active_fd = poll(&fds, 1, instance->root->poll_timeout); + timespec tspec{}; // for clock_gettime() + int64_t start, elapsed; // ns + int32_t poll_timeout = timeout; // ms - if (active_fd >= 1) { - assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors"); - if (fds.revents & POLLIN or fds.revents & POLLOUT) { - return MEMCACHED_SUCCESS; - } + if (clock_gettime(CLOCK_MONOTONIC, &tspec)) { + return memcached_set_errno(*inst, errno, MEMCACHED_AT, + memcached_literal_param("clock_gettime()")); + } + start = tspec.tv_sec * 1000000000 + tspec.tv_nsec; + while (true) { + int active = poll(&pfd, 1, poll_timeout); - if (fds.revents & POLLHUP) { - return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, - memcached_literal_param("poll() detected hang up")); - } + if (active == SOCKET_ERROR) { + int local_errno = get_socket_errno(); - if (fds.revents & POLLERR) { - int local_errno = EINVAL; - int err; - socklen_t len = sizeof(err); - if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) { - if (err == 0) // treat this as EINTR - { - continue; - } - local_errno = err; + switch (local_errno) { +#ifdef HAVE_ERESTART + case ERESTART: +#endif + case EINTR: + clock_gettime(CLOCK_MONOTONIC, &tspec); + elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start; + if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) { + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("timeout on interrupt or restart")); } - memcached_quit_server(instance, true); - return memcached_set_errno(*instance, local_errno, MEMCACHED_AT, - memcached_literal_param("poll() returned POLLHUP")); - } + poll_timeout -= elapsed / 1000000; + continue; - return memcached_set_error( - *instance, MEMCACHED_FAILURE, MEMCACHED_AT, - memcached_literal_param("poll() returned a value that was not dealt with")); - } + case EFAULT: + case ENOMEM: + return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - if (active_fd == 0) { - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("No active_fd were found")); + case EINVAL: + return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout")); + default: + if (events == IO_POLL_CONNECT) { + inst->reset_socket(); + inst->state = MEMCACHED_SERVER_STATE_NEW; + } + return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()")); + } } - // Only an error should result in this code being called. - int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno - assert_msg(active_fd == -1, "poll() returned an unexpected value"); - switch (local_errno) { -#ifdef __linux - case ERESTART: -#endif - case EINTR: - continue; - - case EFAULT: - case ENOMEM: - memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - break; + if (active == 0) { + /* do not test SO_ERROR on EALREADY */ + if (prev_errno != EALREADY) { + memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out")); + if (MEMCACHED_SUCCESS != rc) { + return rc; + } + } + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("time out")); + } - case EINVAL: - memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, - memcached_literal_param( - "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - break; + assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors"); - default: - memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); + if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) { + memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)")); + if (MEMCACHED_SUCCESS != rc) { + if (events != IO_POLL_CONNECT) { + memcached_quit_server(inst, true); + } + return rc; + } } - - break; + if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) { + return MEMCACHED_SUCCESS; + } +#if DEBUG + dprintf(STDERR_FILENO, "io_poll() looped!\n"); +#endif } +} - memcached_quit_server(instance, true); - - if (memcached_has_error(instance)) { - return memcached_instance_error_return(instance); +static memcached_return_t io_wait(memcached_instance_st *instance, const short events) { + if (events & POLLOUT) { + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (memcached_purge(instance) == false) { + return MEMCACHED_FAILURE; + } + instance->io_wait_count.write++; + } else { + instance->io_wait_count.read++; } - return memcached_set_error( - *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, - memcached_literal_param("number of attempts to call io_wait() failed")); + return memcached_io_poll(instance, events); } static bool io_flush(memcached_instance_st *instance, const bool with_flush, @@ -370,7 +385,7 @@ static memcached_return_t _io_fill(memcached_instance_st *instance) { case EWOULDBLOCK: #endif case EAGAIN: -#ifdef __linux +#ifdef HAVE_ERESTART case ERESTART: #endif { @@ -499,7 +514,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st *instance) { case EWOULDBLOCK: #endif case EAGAIN: -#ifdef __linux +#ifdef ERESTART case ERESTART: #endif if (memcached_success(io_wait(instance, POLLIN))) {