fix #105 EINTR handled too defensively when polling
authorMichael Wallner <mike@php.net>
Thu, 7 Jan 2021 13:03:17 +0000 (14:03 +0100)
committerMichael Wallner <mike@php.net>
Thu, 7 Jan 2021 17:24:58 +0000 (18:24 +0100)
CMake/_Include.cmake
ChangeLog-1.1.md
docs/source/ChangeLog-1.1.rst
src/libmemcached/connect.cc
src/libmemcached/io.cc
src/libmemcached/io.hpp
src/p9y/CMakeLists.txt
src/p9y/clock_gettime.hpp [new file with mode: 0644]
src/p9y/p9y.cpp

index 39442dc2901cfc5cd8b6499d85a0de5f64e06088..a5a98710aaf31f3a3d21a71ef1015ce9e408ac6d 100644 (file)
@@ -134,8 +134,11 @@ check_type(in_port_t netinet/in.h)
 check_type(pid_t sys/types.h)
 check_type(ssize_t sys/types.h)
 check_type("struct msghdr" sys/socket.h)
+check_type("struct timespec" time.h)
 
 check_cxx_symbol(abi::__cxa_demangle cxxabi.h)
+check_symbol(CLOCK_MONOTONIC time.h)
+check_symbol(clock_gettime time.h)
 check_symbol(ERESTART errno.h)
 check_symbol(fcntl fcntl.h)
 check_symbol(gettimeofday sys/time.h)
index 53c42ff0f8a9d9a20044c239dfa7e8e11dcb6bc0..d9e8bf5f234ab77ed69fd727317b5a35c335888e 100644 (file)
@@ -1,5 +1,14 @@
 # ChangeLog v1.1
 
+## v 1.1.0-beta3
+
+> TBR
+
+**Changes from beta2:**
+
+* Fix [gh #105](https://github.com/m6w6/libmemcached/issues/105):
+  EINTR handled too defensively when polling.
+
 ## v 1.1.0-beta2
 
 > released 2020-12-28
index f08874e0b1b4daed1986cabb2e512215fecf6023..9bcdf781890ed43b0c7fcf3f4ceff82571051875 100644 (file)
@@ -5,6 +5,20 @@
 ChangeLog v1.1
 ==============
 
+v 1.1.0-beta3
+-------------
+
+..
+
+   TBR
+
+
+**Changes from beta2:**
+
+
+* Fix `gh #105 <https://github.com/m6w6/libmemcached/issues/105>`_\ :
+  EINTR handled too defensively when polling.
+
 v 1.1.0-beta2
 -------------
 
index 84617133741a9e5332e212ac50f29af4f2a8b563..90f1b21756a5930299ba405a446af4d79c0d7498 100644 (file)
 #include <cassert>
 
 
-static memcached_return_t connect_poll(memcached_instance_st *server, const int connection_error) {
-  struct pollfd fds[1];
-  fds[0].fd = server->fd;
-  fds[0].events = server->events();
-  fds[0].revents = 0;
-
-  size_t loop_max = 5;
-
-  if (server->root->connect_timeout == 0) {
-    return memcached_set_error(
-        *server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-        memcached_literal_param("The time to wait for a connection to be established was set to "
-                                "zero which produces a timeout to every call to poll()."));
-  }
-
-  while (--loop_max) // Should only loop on cases of ERESTART or EINTR
-  {
-    int number_of;
-    if ((number_of = poll(fds, 1, server->root->connect_timeout)) == SOCKET_ERROR) {
-      int local_errno = get_socket_errno(); // We cache in case closesocket() modifies errno
-      switch (local_errno) {
-#ifdef HAVE_ERESTART
-      case ERESTART:
-#endif
-      case EINTR:
-        continue;
-
-      case EFAULT:
-      case ENOMEM:
-        return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
-
-      case EINVAL:
-        return memcached_set_error(
-            *server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
-            memcached_literal_param(
-                "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
-
-      default: // This should not happen
-        break;
-      }
-
-      assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
-      server->reset_socket();
-      server->state = MEMCACHED_SERVER_STATE_NEW;
-
-      return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
-    }
-
-    if (number_of == 0) {
-      if (connection_error != EALREADY) {
-        int err;
-        socklen_t len = sizeof(err);
-        if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
-          return memcached_set_errno(
-              *server, errno, MEMCACHED_AT,
-              memcached_literal_param(
-                  "getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
-        }
-
-        // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
-        if (err) {
-          return memcached_set_errno(
-              *server, err, MEMCACHED_AT,
-              memcached_literal_param("getsockopt() found the error from poll() after connect() "
-                                      "returned EINPROGRESS."));
-        }
-      }
-
-      return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-                                 memcached_literal_param("(number_of == 0)"));
-    }
-
-    assert(number_of == 1);
-
-    if (fds[0].revents & POLLERR or fds[0].revents & POLLHUP or fds[0].revents & POLLNVAL) {
-      int err;
-      socklen_t len = sizeof(err);
-      if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
-        return memcached_set_errno(
-            *server, errno, MEMCACHED_AT,
-            memcached_literal_param(
-                "getsockopt() errored while looking up error state from poll()"));
-      }
-
-      // We check the value to see what happened with the socket.
-      if (err == 0) // Should not happen
-      {
-        return MEMCACHED_SUCCESS;
-      }
-      errno = err;
-
-      return memcached_set_errno(
-          *server, err, MEMCACHED_AT,
-          memcached_literal_param("getsockopt() found the error from poll() during connect."));
-    }
-    assert(fds[0].revents & POLLOUT);
-
-    if (fds[0].revents & POLLOUT and connection_error != EALREADY) {
-      int err;
-      socklen_t len = sizeof(err);
-      if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
-        return memcached_set_errno(*server, errno, MEMCACHED_AT);
-      }
-
-      if (err == 0) {
-        return MEMCACHED_SUCCESS;
-      }
-
-      return memcached_set_errno(
-          *server, err, MEMCACHED_AT,
-          memcached_literal_param(
-              "getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
-    }
-
-    break; // We only have the loop setup for errno types that require restart
-  }
-
-  // This should only be possible from ERESTART or EINTR;
-  return memcached_set_errno(*server, connection_error, MEMCACHED_AT,
-                             memcached_literal_param("connect_poll() was exhausted"));
-}
-
 static memcached_return_t set_hostinfo(memcached_instance_st *server) {
   assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
   server->clear_addrinfo();
@@ -505,7 +383,7 @@ static memcached_return_t network_connect(memcached_instance_st *server) {
     {
       server->events(POLLOUT);
       server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS;
-      memcached_return_t rc = connect_poll(server, local_error);
+      memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error);
 
       if (memcached_success(rc)) {
         server->state = MEMCACHED_SERVER_STATE_CONNECTED;
index 1cc62e47d942d077f329d400761d5938f37db5c6..e41891a9b60e37768368eea87f316a7f25e626f4 100644 (file)
@@ -15,6 +15,7 @@
 
 #include "libmemcached/common.h"
 #include "p9y/poll.hpp"
+#include "p9y/clock_gettime.hpp"
 
 void initialize_binary_request(memcached_instance_st *server,
                                protocol_binary_request_header &header) {
@@ -130,118 +131,135 @@ static bool process_input_buffer(memcached_instance_st *instance) {
   return false;
 }
 
-static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
-  /*
-   ** We are going to block on write, but at least on Solaris we might block
-   ** on write if we haven't read anything from our input buffer..
-   ** Try to purge the input buffer if we don't do any flow control in the
-   ** application layer (just sending a lot of data etc)
-   ** The test is moved down in the purge function to avoid duplication of
-   ** the test.
-   */
-  if (events & POLLOUT) {
-    if (memcached_purge(instance) == false) {
-      return MEMCACHED_FAILURE;
-    }
+static memcached_return_t io_sock_err(memcached_instance_st *inst,
+                                      const char *reason_str, size_t reason_len) {
+  int err;
+  socklen_t len = sizeof(err);
+
+  if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
+    return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+                               memcached_literal_param("getsockopt()"));
   }
 
-  struct pollfd fds;
-  fds.fd = instance->fd;
-  fds.events = events;
-  fds.revents = 0;
+  if (err) {
+    return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len);
+  }
+  return MEMCACHED_SUCCESS;
+}
 
-  if (fds.events & POLLOUT) /* write */ {
-    instance->io_wait_count.write++;
+memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) {
+  int32_t timeout;
+  pollfd pfd{};
+  pfd.fd = inst->fd;
+  pfd.events = events ? events : inst->events();
+
+  if (events) {
+    timeout = inst->root->poll_timeout;
   } else {
-    instance->io_wait_count.read++;
+    timeout = inst->root->connect_timeout;
   }
 
-  if (instance->root->poll_timeout
-      == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
-  {
-    return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-                               memcached_literal_param("poll_timeout() was set to zero"));
+  if (!timeout) {
+    return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                               memcached_literal_param("timeout was set to zero"));
   }
 
-  size_t loop_max = 5;
-  while (--loop_max) // While loop is for ERESTART or EINTR
-  {
-    int active_fd = poll(&fds, 1, instance->root->poll_timeout);
+  timespec tspec{}; // for clock_gettime()
+  int64_t start, elapsed; // ns
+  int32_t poll_timeout = timeout; // ms
 
-    if (active_fd >= 1) {
-      assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
-      if (fds.revents & POLLIN or fds.revents & POLLOUT) {
-        return MEMCACHED_SUCCESS;
-      }
+  if (clock_gettime(CLOCK_MONOTONIC, &tspec)) {
+    return memcached_set_errno(*inst, errno, MEMCACHED_AT,
+                               memcached_literal_param("clock_gettime()"));
+  }
+  start = tspec.tv_sec * 1000000000 + tspec.tv_nsec;
+  while (true) {
+    int active = poll(&pfd, 1, poll_timeout);
 
-      if (fds.revents & POLLHUP) {
-        return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
-                                   memcached_literal_param("poll() detected hang up"));
-      }
+    if (active == SOCKET_ERROR) {
+      int local_errno = get_socket_errno();
 
-      if (fds.revents & POLLERR) {
-        int local_errno = EINVAL;
-        int err;
-        socklen_t len = sizeof(err);
-        if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
-          if (err == 0) // treat this as EINTR
-          {
-            continue;
-          }
-          local_errno = err;
+      switch (local_errno) {
+#ifdef HAVE_ERESTART
+        case ERESTART:
+#endif
+      case EINTR:
+        clock_gettime(CLOCK_MONOTONIC, &tspec);
+        elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start;
+        if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) {
+          return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                                     memcached_literal_param("timeout on interrupt or restart"));
         }
-        memcached_quit_server(instance, true);
-        return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
-                                   memcached_literal_param("poll() returned POLLHUP"));
-      }
+        poll_timeout -= elapsed / 1000000;
+        continue;
 
-      return memcached_set_error(
-          *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
-          memcached_literal_param("poll() returned a value that was not dealt with"));
-    }
+      case EFAULT:
+      case ENOMEM:
+        return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
 
-    if (active_fd == 0) {
-      return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
-                                 memcached_literal_param("No active_fd were found"));
+      case EINVAL:
+        return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
+                            memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout"));
+      default:
+        if (events == IO_POLL_CONNECT) {
+          inst->reset_socket();
+          inst->state = MEMCACHED_SERVER_STATE_NEW;
+        }
+        return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()"));
+      }
     }
 
-    // Only an error should result in this code being called.
-    int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
-    assert_msg(active_fd == -1, "poll() returned an unexpected value");
-    switch (local_errno) {
-#ifdef ERESTART
-    case ERESTART:
-#endif
-    case EINTR:
-      continue;
-
-    case EFAULT:
-    case ENOMEM:
-      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
-      break;
+    if (active == 0) {
+      /* do not test SO_ERROR on EALREADY */
+      if (prev_errno != EALREADY) {
+        memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out"));
+        if (MEMCACHED_SUCCESS != rc) {
+          return rc;
+        }
+      }
+      return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
+                                 memcached_literal_param("time out"));
+    }
 
-    case EINVAL:
-      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
-                          memcached_literal_param(
-                              "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
-      break;
+    assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors");
 
-    default:
-      memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
+    if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) {
+      memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)"));
+      if (MEMCACHED_SUCCESS != rc) {
+        if (events != IO_POLL_CONNECT) {
+          memcached_quit_server(inst, true);
+        }
+        return rc;
+      }
     }
-
-    break;
+    if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) {
+      return MEMCACHED_SUCCESS;
+    }
+#if DEBUG
+    dprintf(STDERR_FILENO, "io_poll() looped!\n");
+#endif
   }
+}
 
-  memcached_quit_server(instance, true);
-
-  if (memcached_has_error(instance)) {
-    return memcached_instance_error_return(instance);
+static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
+  if (events & POLLOUT) {
+    /*
+     ** We are going to block on write, but at least on Solaris we might block
+     ** on write if we haven't read anything from our input buffer..
+     ** Try to purge the input buffer if we don't do any flow control in the
+     ** application layer (just sending a lot of data etc)
+     ** The test is moved down in the purge function to avoid duplication of
+     ** the test.
+     */
+    if (memcached_purge(instance) == false) {
+      return MEMCACHED_FAILURE;
+    }
+    instance->io_wait_count.write++;
+  } else {
+    instance->io_wait_count.read++;
   }
 
-  return memcached_set_error(
-      *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
-      memcached_literal_param("number of attempts to call io_wait() failed"));
+  return memcached_io_poll(instance, events);
 }
 
 static bool io_flush(memcached_instance_st *instance, const bool with_flush,
index bd7635c9abe5fdb66066897e4c7f1707b00534a0..625a446ab52abb78e4fc7d5b0acfb0e2145d6c5b 100644 (file)
@@ -43,3 +43,6 @@ memcached_return_t memcached_safe_read(memcached_instance_st *ptr, void *dta, co
 memcached_instance_st *memcached_io_get_readable_server(memcached_st *memc, memcached_return_t &);
 
 memcached_return_t memcached_io_slurp(memcached_instance_st *ptr);
+
+#define IO_POLL_CONNECT 0
+memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events = IO_POLL_CONNECT, int prev_errno = 0);
index ec7ce8d16b4cfb6473199920206a9b372be33ab5..6906b8f0154f0febcf41541ccbc35dd3210480fc 100644 (file)
@@ -6,6 +6,7 @@ add_library(p9y STATIC
     poll.hpp
     random.hpp
     index.hpp
+    clock_gettime.hpp
     p9y.cpp
     )
 
diff --git a/src/p9y/clock_gettime.hpp b/src/p9y/clock_gettime.hpp
new file mode 100644 (file)
index 0000000..b095561
--- /dev/null
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "mem_config.h"
+
+#if defined __cplusplus
+# include <ctime>
+#else
+# include <time.h>
+#endif
+
+#ifndef HAVE_CLOCK_MONOTONIC
+# define CLOCK_MONOTONIC 1
+#endif
+
+#ifndef HAVE_STRUCT_TIMESPEC
+struct timespec {
+  long tv_sec;
+  long tv_nsec;
+};
+#endif
+
+#ifndef HAVE_CLOCK_GETTIME
+# define P9Y_NEED_CLOCK_GETTIME
+int clock_gettime(int clock_type, struct timespec *spec);
+# ifdef _WIN32
+# endif // _WIN32
+#endif // HAVE_CLOCK_GETTIME
index c9927495ebe4b5aceeca3917389d47c18f5709a6..4716bfbd8760460a685dc578f93062006fad03cf 100644 (file)
@@ -147,3 +147,27 @@ int poll(struct pollfd fds[], nfds_t nfds, int tmo) {
   return ret;
 }
 #endif // P9Y_NEED_POLL
+
+#include "clock_gettime.hpp"
+#ifdef P9Y_NEED_CLOCK_GETTIME
+# ifdef _WIN32
+
+static inline __int64 wintime2unixtime(__int64 wintime) {
+  return wintime - 116444736000000000i64;
+}
+
+int clock_gettime(int, struct timespec *spec)
+{
+  __int64 wintime, unixtime;
+
+  GetSystemTimeAsFileTime((FILETIME*) &wintime);
+  unixtime = wintime2unixtime(wintime);
+
+
+  spec->tv_sec = unixtime / 10000000i64;
+  spec->tv_nsec = unixtime % 10000000i64 * 100;
+
+  return 0;
+}
+# endif // _WIN32
+#endif // P9Y_NEED_CLOCK_GETTIME