Add in MSG_DONTWAIT for recv().
[m6w6/libmemcached] / libmemcached / io.cc
index 74aa4a239d64bf3e9f894384860ce61694c19efe..fd719d6ee57410d52e2db8249cb8a0af48e8207b 100644 (file)
@@ -37,7 +37,7 @@
  */
 
 
-#include "libmemcached/common.h"
+#include <libmemcached/common.h>
 
 typedef enum {
   MEM_READ,
@@ -80,13 +80,22 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   {
     memcached_return_t rc= memcached_purge(ptr);
     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+    {
       return MEMCACHED_FAILURE;
+    }
   }
 
   size_t loop_max= 5;
   while (--loop_max) // While loop is for ERESTART or EINTR
   {
-    error= poll(&fds, 1, ptr->root->poll_timeout);
+    if (ptr->root->poll_timeout) // Mimic 0 causes timeout behavior (not all platforms do this)
+    {
+      error= poll(&fds, 1, ptr->root->poll_timeout);
+    }
+    else
+    {
+      error= 0;
+    }
 
     switch (error)
     {
@@ -95,8 +104,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
       WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
 
       return MEMCACHED_SUCCESS;
+
     case 0: // Timeout occured, we let the while() loop do its thing.
-      return MEMCACHED_TIMEOUT;
+      return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+
     default:
       WATCHPOINT_ERRNO(get_socket_errno());
       switch (get_socket_errno())
@@ -106,6 +117,14 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
 #endif
       case EINTR:
         break;
+
+      case EFAULT:
+      case ENOMEM:
+        return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+
+      case EINVAL:
+        return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+
       default:
         if (fds.revents & POLLERR)
         {
@@ -120,7 +139,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
         }
         memcached_quit_server(ptr, true);
 
-        return MEMCACHED_FAILURE;
+        return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
       }
     }
   }
@@ -130,7 +149,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   ptr->cached_errno= get_socket_errno();
   memcached_quit_server(ptr, true);
 
-  return MEMCACHED_FAILURE;
+  return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT);
 }
 
 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
@@ -163,7 +182,7 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
     ssize_t nr= recv(ptr->fd,
                      ptr->read_ptr + ptr->read_data_length,
                      MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                     0);
+                     MSG_DONTWAIT);
 
     if (nr > 0)
     {
@@ -227,33 +246,6 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
   return false;
 }
 
-#if 0 // Dead code, this should be removed.
-void memcached_io_preread(memcached_st *ptr)
-{
-  unsigned int x;
-
-  return;
-
-  for (x= 0; x < memcached_server_count(ptr); x++)
-  {
-    if (memcached_server_response_count(ptr, x) &&
-        ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
-    {
-      size_t data_read;
-
-      data_read= recv(ptr->hosts[x].fd,
-                      ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
-                      MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0);
-      if (data_read == SOCKET_ERROR)
-        continue;
-
-      ptr->hosts[x].read_buffer_length+= data_read;
-      ptr->hosts[x].read_data_length+= data_read;
-    }
-  }
-}
-#endif
-
 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
                                      void *buffer, size_t length, ssize_t *nread)
 {
@@ -267,17 +259,12 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
     {
       ssize_t data_read;
 
-      while (1)
+      do
       {
-        data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0);
-        if (data_read > 0)
-        {
-          break;
-        }
-        else if (data_read == SOCKET_ERROR)
+        data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+
+        if (data_read == SOCKET_ERROR)
         {
-          ptr->cached_errno= get_socket_errno();
-          memcached_return_t rc= MEMCACHED_ERRNO;
           switch (get_socket_errno())
           {
           case EWOULDBLOCK:
@@ -288,19 +275,22 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
 #ifdef TARGET_OS_LINUX
           case ERESTART:
 #endif
-            if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
+            if (memcached_success(io_wait(ptr, MEM_READ)))
+            {
               continue;
+            }
+
             /* fall through */
 
           default:
             {
               memcached_quit_server(ptr, true);
               *nread= -1;
-              return rc;
+              return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
             }
           }
         }
-        else
+        else if (data_read == 0)
         {
           /*
             EOF. Any data received so far is incomplete
@@ -314,9 +304,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
           WATCHPOINT_STRING("We had a zero length recv()");
           memcached_quit_server(ptr, true);
           *nread= -1;
-          return MEMCACHED_UNKNOWN_READ_FAILURE;
+          return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
         }
-      }
+      } while (data_read <= 0);
 
       ptr->io_bytes_sent = 0;
       ptr->read_data_length= (size_t) data_read;
@@ -393,11 +383,10 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
 
     if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
     {
-      memcached_return_t rc;
-      ssize_t sent_length;
-
       WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-      sent_length= io_flush(ptr, with_flush, &rc);
+
+      memcached_return_t rc;
+      ssize_t sent_length= io_flush(ptr, with_flush, &rc);
       if (sent_length == -1)
       {
         return -1;
@@ -459,11 +448,11 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
 }
 
 
-memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr)
+void memcached_io_close(memcached_server_write_instance_st ptr)
 {
   if (ptr->fd == INVALID_SOCKET)
   {
-    return MEMCACHED_SUCCESS;
+    return;
   }
 
   /* in case of death shutdown to avoid blocking at close() */
@@ -478,8 +467,8 @@ memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr)
   {
     WATCHPOINT_ERRNO(get_socket_errno());
   }
-
-  return MEMCACHED_SUCCESS;
+  ptr->state= MEMCACHED_SERVER_STATE_NEW;
+  ptr->fd= INVALID_SOCKET;
 }
 
 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
@@ -524,13 +513,14 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
     return NULL;
   }
 
-  int err= poll(fds, host_index, memc->poll_timeout);
-  switch (err) {
+  switch (poll(fds, host_index, memc->poll_timeout))
+  {
   case -1:
-    memcached_set_errno(memc, get_socket_errno(), NULL);
+    memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
     /* FALLTHROUGH */
   case 0:
     break;
+
   default:
     for (size_t x= 0; x < host_index; ++x)
     {
@@ -582,12 +572,15 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
   // UDP Sanity check, make sure that we are not sending somthing too big
   if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
   {
+    *error= MEMCACHED_WRITE_FAILURE;
     return -1;
   }
 
   if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
                                         && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
+  {
     return 0;
+  }
 
   /* Looking for memory overflows */
 #if defined(DEBUG)
@@ -641,29 +634,36 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
               process_input_buffer(ptr))
             continue;
 
-          memcached_return_t rc;
-          rc= io_wait(ptr, MEM_WRITE);
-
-          if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
+          memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+          if (memcached_success(rc))
+          {
             continue;
+          }
+          else if (rc == MEMCACHED_TIMEOUT)
+          {
+            *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+            return -1;
+          }
 
           memcached_quit_server(ptr, true);
+          *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
           return -1;
         }
       case ENOTCONN:
       case EPIPE:
       default:
         memcached_quit_server(ptr, true);
-        *error= MEMCACHED_ERRNO;
+        *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
         WATCHPOINT_ASSERT(ptr->fd == -1);
         return -1;
       }
     }
 
-    if (ptr->type == MEMCACHED_CONNECTION_UDP &&
+    if (ptr->type == MEMCACHED_CONNECTION_UDP and
         (size_t)sent_length != write_length)
     {
       memcached_quit_server(ptr, true);
+      *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
       return -1;
     }
 
@@ -713,7 +713,9 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
     memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
                                              &nread);
     if (rc != MEMCACHED_SUCCESS)
+    {
       return rc;
+    }
 
     offset+= (size_t) nread;
   }
@@ -740,7 +742,9 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
       ssize_t nread;
       memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
       if (rc != MEMCACHED_SUCCESS)
+      {
         return rc;
+      }
 
       if (*buffer_ptr == '\n')
         line_complete= true;