fix #105 EINTR handled too defensively when polling
[awesomized/libmemcached] / src / libmemcached / io.cc
index 8e5d93b7f84b4c5527f5198e120247692821d68c..e41891a9b60e37768368eea87f316a7f25e626f4 100644 (file)
 */
 
 #include "libmemcached/common.h"
-
-#ifdef HAVE_SYS_SOCKET_H
-#  include <sys/socket.h>
-#endif
+#include "p9y/poll.hpp"
+#include "p9y/clock_gettime.hpp"
 
 void initialize_binary_request(memcached_instance_st *server,
                                protocol_binary_request_header &header) {
@@ -63,7 +61,7 @@ static bool repack_input_buffer(memcached_instance_st *instance) {
           case EWOULDBLOCK:
 #endif
           case EAGAIN:
-#ifdef __linux
+#ifdef HAVE_ERESTART
           case ERESTART:
 #endif
             break; // No IO is fine, we can just move on
@@ -101,7 +99,7 @@ static bool process_input_buffer(memcached_instance_st *instance) {
    ** We might be able to process some of the response messages if we
    ** have a callback set up
    */
-  if (instance->root->callbacks != NULL) {
+  if (instance->root->callbacks) {
     /*
      * We might have responses... try to read them out and fire
      * callbacks
@@ -133,118 +131,135 @@ static bool process_input_buffer(memcached_instance_st *instance) {
   return false;
 }
 
-static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
-  /*
-   ** We are going to block on write, but at least on Solaris we might block
-   ** on write if we haven't read anything from our input buffer..
-   ** Try to purge the input buffer if we don't do any flow control in the
-   ** application layer (just sending a lot of data etc)
-   ** The test is moved down in the purge function to avoid duplication of
-   ** the test.
-   */
-  if (events & POLLOUT) {
-    if (memcached_purge(instance) == false) {
-      return MEMCACHED_FAILURE;
-    }
+static memcached_return_t io_sock_err(memcached_instance_st *inst,
+                                      const char *reason_str, size_t reason_len) {
+  int err;
+  socklen_t len = sizeof(err);
+
+  if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
+    return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+                               memcached_literal_param("getsockopt()"));
+  }
+
+  if (err) {
+    return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len);
   }
+  return MEMCACHED_SUCCESS;
+}
 
-  struct pollfd fds;
-  fds.fd = instance->fd;
-  fds.events = events;
-  fds.revents = 0;
+memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) {
+  int32_t timeout;
+  pollfd pfd{};
+  pfd.fd = inst->fd;
+  pfd.events = events ? events : inst->events();
 
-  if (fds.events & POLLOUT) /* write */ {
-    instance->io_wait_count.write++;
+  if (events) {
+    timeout = inst->root->poll_timeout;
   } else {
-    instance->io_wait_count.read++;
+    timeout = inst->root->connect_timeout;
   }
 
-  if (instance->root->poll_timeout
-      == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
-  {
-    return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-                               memcached_literal_param("poll_timeout() was set to zero"));
+  if (!timeout) {
+    return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                               memcached_literal_param("timeout was set to zero"));
   }
 
-  size_t loop_max = 5;
-  while (--loop_max) // While loop is for ERESTART or EINTR
-  {
-    int active_fd = poll(&fds, 1, instance->root->poll_timeout);
+  timespec tspec{}; // for clock_gettime()
+  int64_t start, elapsed; // ns
+  int32_t poll_timeout = timeout; // ms
 
-    if (active_fd >= 1) {
-      assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
-      if (fds.revents & POLLIN or fds.revents & POLLOUT) {
-        return MEMCACHED_SUCCESS;
-      }
+  if (clock_gettime(CLOCK_MONOTONIC, &tspec)) {
+    return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+                               memcached_literal_param("clock_gettime()"));
+  }
+  start = tspec.tv_sec * 1000000000 + tspec.tv_nsec;
+  while (true) {
+    int active = poll(&pfd, 1, poll_timeout);
 
-      if (fds.revents & POLLHUP) {
-        return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
-                                   memcached_literal_param("poll() detected hang up"));
-      }
+    if (active == SOCKET_ERROR) {
+      int local_errno = get_socket_errno();
 
-      if (fds.revents & POLLERR) {
-        int local_errno = EINVAL;
-        int err;
-        socklen_t len = sizeof(err);
-        if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
-          if (err == 0) // treat this as EINTR
-          {
-            continue;
-          }
-          local_errno = err;
+      switch (local_errno) {
+#ifdef HAVE_ERESTART
+        case ERESTART:
+#endif
+      case EINTR:
+        clock_gettime(CLOCK_MONOTONIC, &tspec);
+        elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start;
+        if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) {
+          return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                                     memcached_literal_param("timeout on interrupt or restart"));
         }
-        memcached_quit_server(instance, true);
-        return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
-                                   memcached_literal_param("poll() returned POLLHUP"));
-      }
+        poll_timeout -= elapsed / 1000000;
+        continue;
 
-      return memcached_set_error(
-          *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
-          memcached_literal_param("poll() returned a value that was not dealt with"));
-    }
+      case EFAULT:
+      case ENOMEM:
+        return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
 
-    if (active_fd == 0) {
-      return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-                                 memcached_literal_param("No active_fd were found"));
+      case EINVAL:
+        return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
+                            memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout"));
+      default:
+        if (events == IO_POLL_CONNECT) {
+          inst->reset_socket();
+          inst->state = MEMCACHED_SERVER_STATE_NEW;
+        }
+        return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()"));
+      }
     }
 
-    // Only an error should result in this code being called.
-    int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
-    assert_msg(active_fd == -1, "poll() returned an unexpected value");
-    switch (local_errno) {
-#ifdef __linux
-    case ERESTART:
-#endif
-    case EINTR:
-      continue;
-
-    case EFAULT:
-    case ENOMEM:
-      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
-      break;
+    if (active == 0) {
+      /* do not test SO_ERROR on EALREADY */
+      if (prev_errno != EALREADY) {
+        memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out"));
+        if (MEMCACHED_SUCCESS != rc) {
+          return rc;
+        }
+      }
+      return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                                 memcached_literal_param("time out"));
+    }
 
-    case EINVAL:
-      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
-                          memcached_literal_param(
-                              "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
-      break;
+    assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors");
 
-    default:
-      memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
+    if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) {
+      memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)"));
+      if (MEMCACHED_SUCCESS != rc) {
+        if (events != IO_POLL_CONNECT) {
+          memcached_quit_server(inst, true);
+        }
+        return rc;
+      }
     }
-
-    break;
+    if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) {
+      return MEMCACHED_SUCCESS;
+    }
+#if DEBUG
+    dprintf(STDERR_FILENO, "io_poll() looped!\n");
+#endif
   }
+}
 
-  memcached_quit_server(instance, true);
-
-  if (memcached_has_error(instance)) {
-    return memcached_instance_error_return(instance);
+static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
+  if (events & POLLOUT) {
+    /*
+     ** We are going to block on write, but at least on Solaris we might block
+     ** on write if we haven't read anything from our input buffer..
+     ** Try to purge the input buffer if we don't do any flow control in the
+     ** application layer (just sending a lot of data etc)
+     ** The test is moved down in the purge function to avoid duplication of
+     ** the test.
+     */
+    if (memcached_purge(instance) == false) {
+      return MEMCACHED_FAILURE;
+    }
+    instance->io_wait_count.write++;
+  } else {
+    instance->io_wait_count.read++;
   }
 
-  return memcached_set_error(
-      *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
-      memcached_literal_param("number of attempts to call io_wait() failed"));
+  return memcached_io_poll(instance, events);
 }
 
 static bool io_flush(memcached_instance_st *instance, const bool with_flush,
@@ -370,7 +385,7 @@ static memcached_return_t _io_fill(memcached_instance_st *instance) {
       case EWOULDBLOCK:
 #endif
       case EAGAIN:
-#ifdef __linux
+#ifdef HAVE_ERESTART
       case ERESTART:
 #endif
       {
@@ -499,7 +514,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st *instance) {
       case EWOULDBLOCK:
 #endif
       case EAGAIN:
-#ifdef __linux
+#ifdef ERESTART
       case ERESTART:
 #endif
         if (memcached_success(io_wait(instance, POLLIN))) {