Merge in pid/ping status.
[m6w6/libmemcached] / libmemcached / io.cc
index aa7c0ccb4db5f93ec2c944ee09469ba480c49775..f77a097325c84d0c166c5f4149c8d793c54427d5 100644 (file)
 
 
 #include <libmemcached/common.h>
+#include <cassert>
 
-typedef enum {
+enum memc_read_or_write {
   MEM_READ,
   MEM_WRITE
-} memc_read_or_write;
+};
 
 static ssize_t io_flush(memcached_server_write_instance_st ptr,
                         const bool with_flush,
@@ -56,8 +57,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   fds.fd= ptr->fd;
   fds.events= POLLIN;
 
-  int error;
-
   if (read_or_write == MEM_WRITE) /* write */
   {
     fds.events= POLLOUT;
@@ -85,18 +84,16 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
     }
   }
 
+  if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
+  {
+    return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+  }
+
   size_t loop_max= 5;
   while (--loop_max) // While loop is for ERESTART or EINTR
   {
-    if (ptr->root->poll_timeout) // Mimic 0 causes timeout behavior (not all platforms do this)
-    {
-      error= poll(&fds, 1, ptr->root->poll_timeout);
-    }
-    else
-    {
-      error= 0;
-    }
 
+    int error= poll(&fds, 1, ptr->root->poll_timeout);
     switch (error)
     {
     case 1: // Success!
@@ -131,11 +128,11 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
           int err;
           socklen_t len= sizeof (err);
           (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
-          ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
+          memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
         }
         else
         {
-          ptr->cached_errno= get_socket_errno();
+          memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
         }
         memcached_quit_server(ptr, true);
 
@@ -144,12 +141,9 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
     }
   }
 
-  /* Imposssible for anything other then -1 */
-  WATCHPOINT_ASSERT(error == -1);
-  ptr->cached_errno= get_socket_errno();
   memcached_quit_server(ptr, true);
 
-  return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT);
+  return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
 }
 
 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
@@ -178,18 +172,52 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
   /* There is room in the buffer, try to fill it! */
   if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
   {
-    /* Just try a single read to grab what's available */
-    ssize_t nr= recv(ptr->fd,
-                     ptr->read_ptr + ptr->read_data_length,
-                     MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                     0);
+    do {
+      /* Just try a single read to grab what's available */
+      ssize_t nr= recv(ptr->fd,
+                       ptr->read_ptr + ptr->read_data_length,
+                       MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+                       MSG_DONTWAIT);
+
+      switch (nr)
+      {
+      case SOCKET_ERROR:
+        {
+          switch (get_socket_errno())
+          {
+          case EINTR:
+            continue;
 
-    if (nr > 0)
-    {
-      ptr->read_data_length+= (size_t)nr;
-      ptr->read_buffer_length+= (size_t)nr;
-      return true;
-    }
+          case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+          case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+          case ERESTART:
+#endif
+            break; // No IO is fine, we can just move on
+
+          default:
+            memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+          }
+        }
+        break;
+
+      case 0: // Shutdown on the socket has occurred
+        {
+          memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+        }
+        break;
+
+      default:
+        {
+          ptr->read_data_length+= size_t(nr);
+          ptr->read_buffer_length+= size_t(nr);
+          return true;
+        }
+        break;
+      }
+    } while (0);
   }
   return false;
 }
@@ -249,29 +277,35 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
                                      void *buffer, size_t length, ssize_t *nread)
 {
-  char *buffer_ptr;
+  assert(ptr); // Programmer error
+  char *buffer_ptr= static_cast<char *>(buffer);
 
-  buffer_ptr= static_cast<char *>(buffer);
+  if (ptr->fd == INVALID_SOCKET)
+  {
+    assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+    return MEMCACHED_CONNECTION_FAILURE;
+  }
 
   while (length)
   {
     if (not ptr->read_buffer_length)
     {
       ssize_t data_read;
-
       do
       {
-        data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0);
-
+        data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
         if (data_read == SOCKET_ERROR)
         {
           switch (get_socket_errno())
           {
+          case EINTR: // We just retry
+            continue;
+
+          case ETIMEDOUT: // OSX
           case EWOULDBLOCK:
 #ifdef USE_EAGAIN
           case EAGAIN:
 #endif
-          case EINTR:
 #ifdef TARGET_OS_LINUX
           case ERESTART:
 #endif
@@ -279,9 +313,19 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
             {
               continue;
             }
+            return MEMCACHED_IN_PROGRESS;
 
             /* fall through */
 
+          case ENOTCONN: // Programmer Error
+            WATCHPOINT_ASSERT(0);
+          case ENOTSOCK:
+            WATCHPOINT_ASSERT(0);
+          case EBADF:
+            assert(ptr->fd != INVALID_SOCKET);
+          case EINVAL:
+          case EFAULT:
+          case ECONNREFUSED:
           default:
             {
               memcached_quit_server(ptr, true);
@@ -302,6 +346,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
             it will return EGAIN if data is not immediatly available.
           */
           WATCHPOINT_STRING("We had a zero length recv()");
+          assert(0);
           memcached_quit_server(ptr, true);
           *nread= -1;
           return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
@@ -341,6 +386,62 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
   return MEMCACHED_SUCCESS;
 }
 
+memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
+{
+  assert(ptr); // Programmer error
+
+  if (ptr->fd == INVALID_SOCKET)
+  {
+    assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+    return MEMCACHED_CONNECTION_FAILURE;
+  }
+
+  ssize_t data_read;
+  char buffer[MEMCACHED_MAX_BUFFER];
+  do
+  {
+    data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+    if (data_read == SOCKET_ERROR)
+    {
+      switch (get_socket_errno())
+      {
+      case EINTR: // We just retry
+        continue;
+
+      case ETIMEDOUT: // OSX
+      case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+      case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+      case ERESTART:
+#endif
+        if (memcached_success(io_wait(ptr, MEM_READ)))
+        {
+          continue;
+        }
+        return MEMCACHED_IN_PROGRESS;
+
+        /* fall through */
+
+      case ENOTCONN: // Programmer Error
+        WATCHPOINT_ASSERT(0);
+      case ENOTSOCK:
+        WATCHPOINT_ASSERT(0);
+      case EBADF:
+        assert(ptr->fd != INVALID_SOCKET);
+      case EINVAL:
+      case EFAULT:
+      case ECONNREFUSED:
+      default:
+        return MEMCACHED_CONNECTION_FAILURE; // We want this!
+      }
+    }
+  } while (data_read > 0);
+
+  return MEMCACHED_CONNECTION_FAILURE;
+}
+
 static ssize_t _io_write(memcached_server_write_instance_st ptr,
                          const void *buffer, size_t length, bool with_flush)
 {
@@ -477,9 +578,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
   struct pollfd fds[MAX_SERVERS_TO_POLL];
   unsigned int host_index= 0;
 
-  for (uint32_t x= 0;
-       x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
-       ++x)
+  for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
   {
     memcached_server_write_instance_st instance=
       memcached_server_instance_fetch(memc, x);
@@ -513,7 +612,8 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
     return NULL;
   }
 
-  switch (poll(fds, host_index, memc->poll_timeout))
+  int error= poll(fds, host_index, memc->poll_timeout);
+  switch (error)
   {
   case -1:
     memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
@@ -610,7 +710,7 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
 
     if (sent_length == SOCKET_ERROR)
     {
-      ptr->cached_errno= get_socket_errno();
+      memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
 #if 0 // @todo I should look at why we hit this bit of code hard frequently
       WATCHPOINT_ERRNO(get_socket_errno());
       WATCHPOINT_NUMBER(get_socket_errno());
@@ -630,9 +730,11 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
            * buffer for more data and retry the write before
            * waiting..
          */
-          if (repack_input_buffer(ptr) ||
+          if (repack_input_buffer(ptr) or
               process_input_buffer(ptr))
+          {
             continue;
+          }
 
           memcached_return_t rc= io_wait(ptr, MEM_WRITE);
           if (memcached_success(rc))
@@ -710,9 +812,11 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
   while (offset < size)
   {
     ssize_t nread;
-    memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
-                                             &nread);
-    if (rc != MEMCACHED_SUCCESS)
+    memcached_return_t rc;
+
+    while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
+
+    if (memcached_failed(rc))
     {
       return rc;
     }
@@ -730,7 +834,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
   bool line_complete= false;
   size_t total_nr= 0;
 
-  while (!line_complete)
+  while (not line_complete)
   {
     if (ptr->read_buffer_length == 0)
     {
@@ -741,7 +845,12 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
      */
       ssize_t nread;
       memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
-      if (rc != MEMCACHED_SUCCESS)
+      if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
+      {
+        memcached_quit_server(ptr, true);
+        return memcached_set_error(*ptr, rc, MEMCACHED_AT);
+      }
+      else if (memcached_failed(rc))
       {
         return rc;
       }