From: Michael Wallner Date: Thu, 7 Jan 2021 13:03:17 +0000 (+0100) Subject: fix #105 EINTR handled too defensively when polling X-Git-Tag: 1.1.0-beta3~18 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=011ea033d902722b9744ea9f28e646734ffdcaf2;p=m6w6%2Flibmemcached fix #105 EINTR handled too defensively when polling --- diff --git a/CMake/_Include.cmake b/CMake/_Include.cmake index 39442dc2..a5a98710 100644 --- a/CMake/_Include.cmake +++ b/CMake/_Include.cmake @@ -134,8 +134,11 @@ check_type(in_port_t netinet/in.h) check_type(pid_t sys/types.h) check_type(ssize_t sys/types.h) check_type("struct msghdr" sys/socket.h) +check_type("struct timespec" time.h) check_cxx_symbol(abi::__cxa_demangle cxxabi.h) +check_symbol(CLOCK_MONOTONIC time.h) +check_symbol(clock_gettime time.h) check_symbol(ERESTART errno.h) check_symbol(fcntl fcntl.h) check_symbol(gettimeofday sys/time.h) diff --git a/ChangeLog-1.1.md b/ChangeLog-1.1.md index 53c42ff0..d9e8bf5f 100644 --- a/ChangeLog-1.1.md +++ b/ChangeLog-1.1.md @@ -1,5 +1,14 @@ # ChangeLog v1.1 +## v 1.1.0-beta3 + +> TBR + +**Changes from beta2:** + +* Fix [gh #105](https://github.com/m6w6/libmemcached/issues/105): + EINTR handled too defensively when polling. + ## v 1.1.0-beta2 > released 2020-12-28 diff --git a/docs/source/ChangeLog-1.1.rst b/docs/source/ChangeLog-1.1.rst index f08874e0..9bcdf781 100644 --- a/docs/source/ChangeLog-1.1.rst +++ b/docs/source/ChangeLog-1.1.rst @@ -5,6 +5,20 @@ ChangeLog v1.1 ============== +v 1.1.0-beta3 +------------- + +.. + + TBR + + +**Changes from beta2:** + + +* Fix `gh #105 `_\ : + EINTR handled too defensively when polling. + v 1.1.0-beta2 ------------- diff --git a/src/libmemcached/connect.cc b/src/libmemcached/connect.cc index 84617133..90f1b217 100644 --- a/src/libmemcached/connect.cc +++ b/src/libmemcached/connect.cc @@ -19,128 +19,6 @@ #include -static memcached_return_t connect_poll(memcached_instance_st *server, const int connection_error) { - struct pollfd fds[1]; - fds[0].fd = server->fd; - fds[0].events = server->events(); - fds[0].revents = 0; - - size_t loop_max = 5; - - if (server->root->connect_timeout == 0) { - return memcached_set_error( - *server, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("The time to wait for a connection to be established was set to " - "zero which produces a timeout to every call to poll().")); - } - - while (--loop_max) // Should only loop on cases of ERESTART or EINTR - { - int number_of; - if ((number_of = poll(fds, 1, server->root->connect_timeout)) == SOCKET_ERROR) { - int local_errno = get_socket_errno(); // We cache in case closesocket() modifies errno - switch (local_errno) { -#ifdef HAVE_ERESTART - case ERESTART: -#endif - case EINTR: - continue; - - case EFAULT: - case ENOMEM: - return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - - case EINVAL: - return memcached_set_error( - *server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, - memcached_literal_param( - "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - - default: // This should not happen - break; - } - - assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor"); - server->reset_socket(); - server->state = MEMCACHED_SERVER_STATE_NEW; - - return memcached_set_errno(*server, local_errno, MEMCACHED_AT); - } - - if (number_of == 0) { - if (connection_error != EALREADY) { - int err; - socklen_t len = sizeof(err); - if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) { - return memcached_set_errno( - *server, errno, MEMCACHED_AT, - memcached_literal_param( - "getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)")); - } - - // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error - if (err) { - return memcached_set_errno( - *server, err, MEMCACHED_AT, - memcached_literal_param("getsockopt() found the error from poll() after connect() " - "returned EINPROGRESS.")); - } - } - - return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("(number_of == 0)")); - } - - assert(number_of == 1); - - if (fds[0].revents & POLLERR or fds[0].revents & POLLHUP or fds[0].revents & POLLNVAL) { - int err; - socklen_t len = sizeof(err); - if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) { - return memcached_set_errno( - *server, errno, MEMCACHED_AT, - memcached_literal_param( - "getsockopt() errored while looking up error state from poll()")); - } - - // We check the value to see what happened with the socket. - if (err == 0) // Should not happen - { - return MEMCACHED_SUCCESS; - } - errno = err; - - return memcached_set_errno( - *server, err, MEMCACHED_AT, - memcached_literal_param("getsockopt() found the error from poll() during connect.")); - } - assert(fds[0].revents & POLLOUT); - - if (fds[0].revents & POLLOUT and connection_error != EALREADY) { - int err; - socklen_t len = sizeof(err); - if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) { - return memcached_set_errno(*server, errno, MEMCACHED_AT); - } - - if (err == 0) { - return MEMCACHED_SUCCESS; - } - - return memcached_set_errno( - *server, err, MEMCACHED_AT, - memcached_literal_param( - "getsockopt() found the error from poll() after connect() returned EINPROGRESS.")); - } - - break; // We only have the loop setup for errno types that require restart - } - - // This should only be possible from ERESTART or EINTR; - return memcached_set_errno(*server, connection_error, MEMCACHED_AT, - memcached_literal_param("connect_poll() was exhausted")); -} - static memcached_return_t set_hostinfo(memcached_instance_st *server) { assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET); server->clear_addrinfo(); @@ -505,7 +383,7 @@ static memcached_return_t network_connect(memcached_instance_st *server) { { server->events(POLLOUT); server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS; - memcached_return_t rc = connect_poll(server, local_error); + memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error); if (memcached_success(rc)) { server->state = MEMCACHED_SERVER_STATE_CONNECTED; diff --git a/src/libmemcached/io.cc b/src/libmemcached/io.cc index 1cc62e47..e41891a9 100644 --- a/src/libmemcached/io.cc +++ b/src/libmemcached/io.cc @@ -15,6 +15,7 @@ #include "libmemcached/common.h" #include "p9y/poll.hpp" +#include "p9y/clock_gettime.hpp" void initialize_binary_request(memcached_instance_st *server, protocol_binary_request_header &header) { @@ -130,118 +131,135 @@ static bool process_input_buffer(memcached_instance_st *instance) { return false; } -static memcached_return_t io_wait(memcached_instance_st *instance, const short events) { - /* - ** We are going to block on write, but at least on Solaris we might block - ** on write if we haven't read anything from our input buffer.. - ** Try to purge the input buffer if we don't do any flow control in the - ** application layer (just sending a lot of data etc) - ** The test is moved down in the purge function to avoid duplication of - ** the test. - */ - if (events & POLLOUT) { - if (memcached_purge(instance) == false) { - return MEMCACHED_FAILURE; - } +static memcached_return_t io_sock_err(memcached_instance_st *inst, + const char *reason_str, size_t reason_len) { + int err; + socklen_t len = sizeof(err); + + if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) { + return memcached_set_errno(*inst, errno, MEMCACHED_AT, + memcached_literal_param("getsockopt()")); } - struct pollfd fds; - fds.fd = instance->fd; - fds.events = events; - fds.revents = 0; + if (err) { + return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len); + } + return MEMCACHED_SUCCESS; +} - if (fds.events & POLLOUT) /* write */ { - instance->io_wait_count.write++; +memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) { + int32_t timeout; + pollfd pfd{}; + pfd.fd = inst->fd; + pfd.events = events ? events : inst->events(); + + if (events) { + timeout = inst->root->poll_timeout; } else { - instance->io_wait_count.read++; + timeout = inst->root->connect_timeout; } - if (instance->root->poll_timeout - == 0) // Mimic 0 causes timeout behavior (not all platforms do this) - { - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("poll_timeout() was set to zero")); + if (!timeout) { + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("timeout was set to zero")); } - size_t loop_max = 5; - while (--loop_max) // While loop is for ERESTART or EINTR - { - int active_fd = poll(&fds, 1, instance->root->poll_timeout); + timespec tspec{}; // for clock_gettime() + int64_t start, elapsed; // ns + int32_t poll_timeout = timeout; // ms - if (active_fd >= 1) { - assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors"); - if (fds.revents & POLLIN or fds.revents & POLLOUT) { - return MEMCACHED_SUCCESS; - } + if (clock_gettime(CLOCK_MONOTONIC, &tspec)) { + return memcached_set_errno(*inst, errno, MEMCACHED_AT, + memcached_literal_param("clock_gettime()")); + } + start = tspec.tv_sec * 1000000000 + tspec.tv_nsec; + while (true) { + int active = poll(&pfd, 1, poll_timeout); - if (fds.revents & POLLHUP) { - return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, - memcached_literal_param("poll() detected hang up")); - } + if (active == SOCKET_ERROR) { + int local_errno = get_socket_errno(); - if (fds.revents & POLLERR) { - int local_errno = EINVAL; - int err; - socklen_t len = sizeof(err); - if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) { - if (err == 0) // treat this as EINTR - { - continue; - } - local_errno = err; + switch (local_errno) { +#ifdef HAVE_ERESTART + case ERESTART: +#endif + case EINTR: + clock_gettime(CLOCK_MONOTONIC, &tspec); + elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start; + if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) { + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("timeout on interrupt or restart")); } - memcached_quit_server(instance, true); - return memcached_set_errno(*instance, local_errno, MEMCACHED_AT, - memcached_literal_param("poll() returned POLLHUP")); - } + poll_timeout -= elapsed / 1000000; + continue; - return memcached_set_error( - *instance, MEMCACHED_FAILURE, MEMCACHED_AT, - memcached_literal_param("poll() returned a value that was not dealt with")); - } + case EFAULT: + case ENOMEM: + return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - if (active_fd == 0) { - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, - memcached_literal_param("No active_fd were found")); + case EINVAL: + return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout")); + default: + if (events == IO_POLL_CONNECT) { + inst->reset_socket(); + inst->state = MEMCACHED_SERVER_STATE_NEW; + } + return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()")); + } } - // Only an error should result in this code being called. - int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno - assert_msg(active_fd == -1, "poll() returned an unexpected value"); - switch (local_errno) { -#ifdef ERESTART - case ERESTART: -#endif - case EINTR: - continue; - - case EFAULT: - case ENOMEM: - memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - break; + if (active == 0) { + /* do not test SO_ERROR on EALREADY */ + if (prev_errno != EALREADY) { + memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out")); + if (MEMCACHED_SUCCESS != rc) { + return rc; + } + } + return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT, + memcached_literal_param("time out")); + } - case EINVAL: - memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, - memcached_literal_param( - "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - break; + assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors"); - default: - memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); + if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) { + memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)")); + if (MEMCACHED_SUCCESS != rc) { + if (events != IO_POLL_CONNECT) { + memcached_quit_server(inst, true); + } + return rc; + } } - - break; + if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) { + return MEMCACHED_SUCCESS; + } +#if DEBUG + dprintf(STDERR_FILENO, "io_poll() looped!\n"); +#endif } +} - memcached_quit_server(instance, true); - - if (memcached_has_error(instance)) { - return memcached_instance_error_return(instance); +static memcached_return_t io_wait(memcached_instance_st *instance, const short events) { + if (events & POLLOUT) { + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (memcached_purge(instance) == false) { + return MEMCACHED_FAILURE; + } + instance->io_wait_count.write++; + } else { + instance->io_wait_count.read++; } - return memcached_set_error( - *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, - memcached_literal_param("number of attempts to call io_wait() failed")); + return memcached_io_poll(instance, events); } static bool io_flush(memcached_instance_st *instance, const bool with_flush, diff --git a/src/libmemcached/io.hpp b/src/libmemcached/io.hpp index bd7635c9..625a446a 100644 --- a/src/libmemcached/io.hpp +++ b/src/libmemcached/io.hpp @@ -43,3 +43,6 @@ memcached_return_t memcached_safe_read(memcached_instance_st *ptr, void *dta, co memcached_instance_st *memcached_io_get_readable_server(memcached_st *memc, memcached_return_t &); memcached_return_t memcached_io_slurp(memcached_instance_st *ptr); + +#define IO_POLL_CONNECT 0 +memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events = IO_POLL_CONNECT, int prev_errno = 0); diff --git a/src/p9y/CMakeLists.txt b/src/p9y/CMakeLists.txt index ec7ce8d1..6906b8f0 100644 --- a/src/p9y/CMakeLists.txt +++ b/src/p9y/CMakeLists.txt @@ -6,6 +6,7 @@ add_library(p9y STATIC poll.hpp random.hpp index.hpp + clock_gettime.hpp p9y.cpp ) diff --git a/src/p9y/clock_gettime.hpp b/src/p9y/clock_gettime.hpp new file mode 100644 index 00000000..b0955610 --- /dev/null +++ b/src/p9y/clock_gettime.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "mem_config.h" + +#if defined __cplusplus +# include +#else +# include +#endif + +#ifndef HAVE_CLOCK_MONOTONIC +# define CLOCK_MONOTONIC 1 +#endif + +#ifndef HAVE_STRUCT_TIMESPEC +struct timespec { + long tv_sec; + long tv_nsec; +}; +#endif + +#ifndef HAVE_CLOCK_GETTIME +# define P9Y_NEED_CLOCK_GETTIME +int clock_gettime(int clock_type, struct timespec *spec); +# ifdef _WIN32 +# endif // _WIN32 +#endif // HAVE_CLOCK_GETTIME diff --git a/src/p9y/p9y.cpp b/src/p9y/p9y.cpp index c9927495..4716bfbd 100644 --- a/src/p9y/p9y.cpp +++ b/src/p9y/p9y.cpp @@ -147,3 +147,27 @@ int poll(struct pollfd fds[], nfds_t nfds, int tmo) { return ret; } #endif // P9Y_NEED_POLL + +#include "clock_gettime.hpp" +#ifdef P9Y_NEED_CLOCK_GETTIME +# ifdef _WIN32 + +static inline __int64 wintime2unixtime(__int64 wintime) { + return wintime - 116444736000000000i64; +} + +int clock_gettime(int, struct timespec *spec) +{ + __int64 wintime, unixtime; + + GetSystemTimeAsFileTime((FILETIME*) &wintime); + unixtime = wintime2unixtime(wintime); + + + spec->tv_sec = unixtime / 10000000i64; + spec->tv_nsec = unixtime % 10000000i64 * 100; + + return 0; +} +# endif // _WIN32 +#endif // P9Y_NEED_CLOCK_GETTIME