#include <cassert>
-static memcached_return_t connect_poll(memcached_instance_st *server, const int connection_error) {
- struct pollfd fds[1];
- fds[0].fd = server->fd;
- fds[0].events = server->events();
- fds[0].revents = 0;
-
- size_t loop_max = 5;
-
- if (server->root->connect_timeout == 0) {
- return memcached_set_error(
- *server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
- memcached_literal_param("The time to wait for a connection to be established was set to "
- "zero which produces a timeout to every call to poll()."));
- }
-
- while (--loop_max) // Should only loop on cases of ERESTART or EINTR
- {
- int number_of;
- if ((number_of = poll(fds, 1, server->root->connect_timeout)) == SOCKET_ERROR) {
- int local_errno = get_socket_errno(); // We cache in case closesocket() modifies errno
- switch (local_errno) {
-#ifdef HAVE_ERESTART
- case ERESTART:
-#endif
- case EINTR:
- continue;
-
- case EFAULT:
- case ENOMEM:
- return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
-
- case EINVAL:
- return memcached_set_error(
- *server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
- memcached_literal_param(
- "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
-
- default: // This should not happen
- break;
- }
-
- assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
- server->reset_socket();
- server->state = MEMCACHED_SERVER_STATE_NEW;
-
- return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
- }
-
- if (number_of == 0) {
- if (connection_error != EALREADY) {
- int err;
- socklen_t len = sizeof(err);
- if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
- return memcached_set_errno(
- *server, errno, MEMCACHED_AT,
- memcached_literal_param(
- "getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
- }
-
- // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
- if (err) {
- return memcached_set_errno(
- *server, err, MEMCACHED_AT,
- memcached_literal_param("getsockopt() found the error from poll() after connect() "
- "returned EINPROGRESS."));
- }
- }
-
- return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
- memcached_literal_param("(number_of == 0)"));
- }
-
- assert(number_of == 1);
-
- if (fds[0].revents & POLLERR or fds[0].revents & POLLHUP or fds[0].revents & POLLNVAL) {
- int err;
- socklen_t len = sizeof(err);
- if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
- return memcached_set_errno(
- *server, errno, MEMCACHED_AT,
- memcached_literal_param(
- "getsockopt() errored while looking up error state from poll()"));
- }
-
- // We check the value to see what happened with the socket.
- if (err == 0) // Should not happen
- {
- return MEMCACHED_SUCCESS;
- }
- errno = err;
-
- return memcached_set_errno(
- *server, err, MEMCACHED_AT,
- memcached_literal_param("getsockopt() found the error from poll() during connect."));
- }
- assert(fds[0].revents & POLLOUT);
-
- if (fds[0].revents & POLLOUT and connection_error != EALREADY) {
- int err;
- socklen_t len = sizeof(err);
- if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
- return memcached_set_errno(*server, errno, MEMCACHED_AT);
- }
-
- if (err == 0) {
- return MEMCACHED_SUCCESS;
- }
-
- return memcached_set_errno(
- *server, err, MEMCACHED_AT,
- memcached_literal_param(
- "getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
- }
-
- break; // We only have the loop setup for errno types that require restart
- }
-
- // This should only be possible from ERESTART or EINTR;
- return memcached_set_errno(*server, connection_error, MEMCACHED_AT,
- memcached_literal_param("connect_poll() was exhausted"));
-}
-
static memcached_return_t set_hostinfo(memcached_instance_st *server) {
assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
server->clear_addrinfo();
{
server->events(POLLOUT);
server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS;
- memcached_return_t rc = connect_poll(server, local_error);
+ memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error);
if (memcached_success(rc)) {
server->state = MEMCACHED_SERVER_STATE_CONNECTED;
#include "libmemcached/common.h"
#include "p9y/poll.hpp"
+#include "p9y/clock_gettime.hpp"
void initialize_binary_request(memcached_instance_st *server,
protocol_binary_request_header &header) {
return false;
}
-static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
- /*
- ** We are going to block on write, but at least on Solaris we might block
- ** on write if we haven't read anything from our input buffer..
- ** Try to purge the input buffer if we don't do any flow control in the
- ** application layer (just sending a lot of data etc)
- ** The test is moved down in the purge function to avoid duplication of
- ** the test.
- */
- if (events & POLLOUT) {
- if (memcached_purge(instance) == false) {
- return MEMCACHED_FAILURE;
- }
+static memcached_return_t io_sock_err(memcached_instance_st *inst,
+ const char *reason_str, size_t reason_len) {
+ int err;
+ socklen_t len = sizeof(err);
+
+ if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
+ return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+ memcached_literal_param("getsockopt()"));
}
- struct pollfd fds;
- fds.fd = instance->fd;
- fds.events = events;
- fds.revents = 0;
+ if (err) {
+ return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len);
+ }
+ return MEMCACHED_SUCCESS;
+}
- if (fds.events & POLLOUT) /* write */ {
- instance->io_wait_count.write++;
+memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) {
+ int32_t timeout;
+ pollfd pfd{};
+ pfd.fd = inst->fd;
+ pfd.events = events ? events : inst->events();
+
+ if (events) {
+ timeout = inst->root->poll_timeout;
} else {
- instance->io_wait_count.read++;
+ timeout = inst->root->connect_timeout;
}
- if (instance->root->poll_timeout
- == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
- {
- return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
- memcached_literal_param("poll_timeout() was set to zero"));
+ if (!timeout) {
+ return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+ memcached_literal_param("timeout was set to zero"));
}
- size_t loop_max = 5;
- while (--loop_max) // While loop is for ERESTART or EINTR
- {
- int active_fd = poll(&fds, 1, instance->root->poll_timeout);
+ timespec tspec{}; // for clock_gettime()
+ int64_t start, elapsed; // ns
+ int32_t poll_timeout = timeout; // ms
- if (active_fd >= 1) {
- assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
- if (fds.revents & POLLIN or fds.revents & POLLOUT) {
- return MEMCACHED_SUCCESS;
- }
+ if (clock_gettime(CLOCK_MONOTONIC, &tspec)) {
+ return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+ memcached_literal_param("clock_gettime()"));
+ }
+ start = tspec.tv_sec * 1000000000 + tspec.tv_nsec;
+ while (true) {
+ int active = poll(&pfd, 1, poll_timeout);
- if (fds.revents & POLLHUP) {
- return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
- memcached_literal_param("poll() detected hang up"));
- }
+ if (active == SOCKET_ERROR) {
+ int local_errno = get_socket_errno();
- if (fds.revents & POLLERR) {
- int local_errno = EINVAL;
- int err;
- socklen_t len = sizeof(err);
- if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
- if (err == 0) // treat this as EINTR
- {
- continue;
- }
- local_errno = err;
+ switch (local_errno) {
+#ifdef HAVE_ERESTART
+ case ERESTART:
+#endif
+ case EINTR:
+ clock_gettime(CLOCK_MONOTONIC, &tspec);
+ elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start;
+ if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) {
+ return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+ memcached_literal_param("timeout on interrupt or restart"));
}
- memcached_quit_server(instance, true);
- return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
- memcached_literal_param("poll() returned POLLHUP"));
- }
+ poll_timeout -= elapsed / 1000000;
+ continue;
- return memcached_set_error(
- *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
- memcached_literal_param("poll() returned a value that was not dealt with"));
- }
+ case EFAULT:
+ case ENOMEM:
+ return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
- if (active_fd == 0) {
- return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
- memcached_literal_param("No active_fd were found"));
+ case EINVAL:
+ return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
+ memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout"));
+ default:
+ if (events == IO_POLL_CONNECT) {
+ inst->reset_socket();
+ inst->state = MEMCACHED_SERVER_STATE_NEW;
+ }
+ return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()"));
+ }
}
- // Only an error should result in this code being called.
- int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
- assert_msg(active_fd == -1, "poll() returned an unexpected value");
- switch (local_errno) {
-#ifdef ERESTART
- case ERESTART:
-#endif
- case EINTR:
- continue;
-
- case EFAULT:
- case ENOMEM:
- memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
- break;
+ if (active == 0) {
+ /* do not test SO_ERROR on EALREADY */
+ if (prev_errno != EALREADY) {
+ memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out"));
+ if (MEMCACHED_SUCCESS != rc) {
+ return rc;
+ }
+ }
+ return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+ memcached_literal_param("time out"));
+ }
- case EINVAL:
- memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
- memcached_literal_param(
- "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
- break;
+ assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors");
- default:
- memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
+ if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) {
+ memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)"));
+ if (MEMCACHED_SUCCESS != rc) {
+ if (events != IO_POLL_CONNECT) {
+ memcached_quit_server(inst, true);
+ }
+ return rc;
+ }
}
-
- break;
+ if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) {
+ return MEMCACHED_SUCCESS;
+ }
+#if DEBUG
+ dprintf(STDERR_FILENO, "io_poll() looped!\n");
+#endif
}
+}
- memcached_quit_server(instance, true);
-
- if (memcached_has_error(instance)) {
- return memcached_instance_error_return(instance);
+static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
+ if (events & POLLOUT) {
+ /*
+ ** We are going to block on write, but at least on Solaris we might block
+ ** on write if we haven't read anything from our input buffer..
+ ** Try to purge the input buffer if we don't do any flow control in the
+ ** application layer (just sending a lot of data etc)
+ ** The test is moved down in the purge function to avoid duplication of
+ ** the test.
+ */
+ if (memcached_purge(instance) == false) {
+ return MEMCACHED_FAILURE;
+ }
+ instance->io_wait_count.write++;
+ } else {
+ instance->io_wait_count.read++;
}
- return memcached_set_error(
- *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
- memcached_literal_param("number of attempts to call io_wait() failed"));
+ return memcached_io_poll(instance, events);
}
static bool io_flush(memcached_instance_st *instance, const bool with_flush,