Make sure we use the correct strerror() in case someone is using threads.
[awesomized/libmemcached] / libmemcached / io.cc
index fdb265acabf81bdf2482bd27581333da4c6d267a..6594aa75d13972e2a84d920c921ce4d0b225e326 100644 (file)
@@ -179,18 +179,52 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
   /* There is room in the buffer, try to fill it! */
   if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
   {
-    /* Just try a single read to grab what's available */
-    ssize_t nr= recv(ptr->fd,
-                     ptr->read_ptr + ptr->read_data_length,
-                     MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                     MSG_DONTWAIT);
+    do {
+      /* Just try a single read to grab what's available */
+      ssize_t nr= recv(ptr->fd,
+                       ptr->read_ptr + ptr->read_data_length,
+                       MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+                       MSG_DONTWAIT);
+
+      switch (nr)
+      {
+      case SOCKET_ERROR:
+        {
+          switch (get_socket_errno())
+          {
+          case EINTR:
+            continue;
 
-    if (nr > 0)
-    {
-      ptr->read_data_length+= (size_t)nr;
-      ptr->read_buffer_length+= (size_t)nr;
-      return true;
-    }
+          case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+          case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+          case ERESTART:
+#endif
+            break; // No IO is fine, we can just move on
+
+          default:
+            memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+          }
+        }
+        break;
+
+      case 0: // Shutdown on the socket has occurred
+        {
+          memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+        }
+        break;
+
+      default:
+        {
+          ptr->read_data_length+= size_t(nr);
+          ptr->read_buffer_length+= size_t(nr);
+          return true;
+        }
+        break;
+      }
+    } while (0);
   }
   return false;
 }
@@ -250,8 +284,15 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
                                      void *buffer, size_t length, ssize_t *nread)
 {
+  assert(ptr); // Programmer error
   char *buffer_ptr= static_cast<char *>(buffer);
 
+  if (ptr->fd == INVALID_SOCKET)
+  {
+    assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+    return MEMCACHED_CONNECTION_FAILURE;
+  }
+
   while (length)
   {
     if (not ptr->read_buffer_length)
@@ -264,11 +305,14 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
         {
           switch (get_socket_errno())
           {
+          case EINTR: // We just retry
+            continue;
+
+          case ETIMEDOUT: // OSX
           case EWOULDBLOCK:
 #ifdef USE_EAGAIN
           case EAGAIN:
 #endif
-          case EINTR:
 #ifdef TARGET_OS_LINUX
           case ERESTART:
 #endif
@@ -280,6 +324,15 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
 
             /* fall through */
 
+          case ENOTCONN: // Programmer Error
+            WATCHPOINT_ASSERT(0);
+          case ENOTSOCK:
+            WATCHPOINT_ASSERT(0);
+          case EBADF:
+            assert(ptr->fd != INVALID_SOCKET);
+          case EINVAL:
+          case EFAULT:
+          case ECONNREFUSED:
           default:
             {
               memcached_quit_server(ptr, true);
@@ -300,6 +353,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
             it will return EGAIN if data is not immediatly available.
           */
           WATCHPOINT_STRING("We had a zero length recv()");
+          assert(0);
           memcached_quit_server(ptr, true);
           *nread= -1;
           return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
@@ -339,6 +393,62 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
   return MEMCACHED_SUCCESS;
 }
 
+memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
+{
+  assert(ptr); // Programmer error
+
+  if (ptr->fd == INVALID_SOCKET)
+  {
+    assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+    return MEMCACHED_CONNECTION_FAILURE;
+  }
+
+  ssize_t data_read;
+  char buffer[MEMCACHED_MAX_BUFFER];
+  do
+  {
+    data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+    if (data_read == SOCKET_ERROR)
+    {
+      switch (get_socket_errno())
+      {
+      case EINTR: // We just retry
+        continue;
+
+      case ETIMEDOUT: // OSX
+      case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+      case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+      case ERESTART:
+#endif
+        if (memcached_success(io_wait(ptr, MEM_READ)))
+        {
+          continue;
+        }
+        return MEMCACHED_IN_PROGRESS;
+
+        /* fall through */
+
+      case ENOTCONN: // Programmer Error
+        WATCHPOINT_ASSERT(0);
+      case ENOTSOCK:
+        WATCHPOINT_ASSERT(0);
+      case EBADF:
+        assert(ptr->fd != INVALID_SOCKET);
+      case EINVAL:
+      case EFAULT:
+      case ECONNREFUSED:
+      default:
+        return MEMCACHED_CONNECTION_FAILURE; // We want this!
+      }
+    }
+  } while (data_read > 0);
+
+  return MEMCACHED_CONNECTION_FAILURE;
+}
+
 static ssize_t _io_write(memcached_server_write_instance_st ptr,
                          const void *buffer, size_t length, bool with_flush)
 {
@@ -475,9 +585,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
   struct pollfd fds[MAX_SERVERS_TO_POLL];
   unsigned int host_index= 0;
 
-  for (uint32_t x= 0;
-       x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
-       ++x)
+  for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
   {
     memcached_server_write_instance_st instance=
       memcached_server_instance_fetch(memc, x);
@@ -628,9 +736,11 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
            * buffer for more data and retry the write before
            * waiting..
          */
-          if (repack_input_buffer(ptr) ||
+          if (repack_input_buffer(ptr) or
               process_input_buffer(ptr))
+          {
             continue;
+          }
 
           memcached_return_t rc= io_wait(ptr, MEM_WRITE);
           if (memcached_success(rc))
@@ -708,13 +818,11 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
   while (offset < size)
   {
     ssize_t nread;
-    memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset, &nread);
-    if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
-    {
-      memcached_quit_server(ptr, true);
-      return memcached_set_error(*ptr, rc, MEMCACHED_AT);
-    }
-    else if (memcached_failed(rc))
+    memcached_return_t rc;
+
+    while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
+
+    if (memcached_failed(rc))
     {
       return rc;
     }