gitignore [ci skip]
[awesomized/libmemcached] / libmemcached / io.cc
index ba4b347494a6e5261cafb68f7d9825a3e5ec0343..d2196f67d1e6524a0585f9999e2dca2facfb35e4 100644 (file)
@@ -70,7 +70,6 @@ static bool repack_input_buffer(memcached_instance_st* instance)
    */
     memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length);
     instance->read_ptr= instance->read_buffer;
-    instance->read_data_length= instance->read_buffer_length;
   }
 
   /* There is room in the buffer, try to fill it! */
@@ -80,8 +79,8 @@ static bool repack_input_buffer(memcached_instance_st* instance)
       /* Just try a single read to grab what's available */
       ssize_t nr;
       if ((nr= ::recv(instance->fd,
-                      instance->read_ptr + instance->read_data_length,
-                      MEMCACHED_MAX_BUFFER - instance->read_data_length,
+                      instance->read_ptr + instance->read_buffer_length,
+                      MEMCACHED_MAX_BUFFER - instance->read_buffer_length,
                       MSG_NOSIGNAL)) <= 0)
       {
         if (nr == 0)
@@ -113,7 +112,6 @@ static bool repack_input_buffer(memcached_instance_st* instance)
       }
       else // We read data, append to our read buffer
       {
-        instance->read_data_length+= size_t(nr);
         instance->read_buffer_length+= size_t(nr);
 
         return true;
@@ -177,7 +175,7 @@ static bool process_input_buffer(memcached_instance_st* instance)
 }
 
 static memcached_return_t io_wait(memcached_instance_st* instance,
-                                  const memc_read_or_write read_or_write)
+                                  const short events)
 {
   /*
    ** We are going to block on write, but at least on Solaris we might block
@@ -187,7 +185,7 @@ static memcached_return_t io_wait(memcached_instance_st* instance,
    ** The test is moved down in the purge function to avoid duplication of
    ** the test.
  */
-  if (read_or_write == MEM_WRITE)
+  if (events & POLLOUT)
   {
     if (memcached_purge(instance) == false)
     {
@@ -197,12 +195,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance,
 
   struct pollfd fds;
   fds.fd= instance->fd;
-  fds.events= POLLIN;
+  fds.events= events;
   fds.revents= 0;
 
-  if (read_or_write == MEM_WRITE) /* write */
+  if (fds.events & POLLOUT) /* write */
   {
-    fds.events= POLLOUT;
     instance->io_wait_count.write++;
   }
   else
@@ -274,9 +271,11 @@ static memcached_return_t io_wait(memcached_instance_st* instance,
     case EFAULT:
     case ENOMEM:
       memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+      break;
 
     case EINVAL:
       memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+      break;
 
     default:
       memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
@@ -343,6 +342,7 @@ static bool io_flush(memcached_instance_st* instance,
     }
 
     ssize_t sent_length= ::send(instance->fd, local_write_ptr, write_length, flags);
+    int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
 
     if (sent_length == SOCKET_ERROR)
     {
@@ -371,7 +371,7 @@ static bool io_flush(memcached_instance_st* instance,
             continue;
           }
 
-          memcached_return_t rc= io_wait(instance, MEM_WRITE);
+          memcached_return_t rc= io_wait(instance, POLLOUT);
           if (memcached_success(rc))
           {
             continue;
@@ -382,14 +382,14 @@ static bool io_flush(memcached_instance_st* instance,
           }
 
           memcached_quit_server(instance, true);
-          error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
+          error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
           return false;
         }
       case ENOTCONN:
       case EPIPE:
       default:
         memcached_quit_server(instance, true);
-        error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
+        error= memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
         WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET);
         return false;
       }
@@ -409,12 +409,12 @@ static bool io_flush(memcached_instance_st* instance,
 
 memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance)
 {
-  return io_wait(instance, MEM_WRITE);
+  return io_wait(instance, POLLOUT);
 }
 
 memcached_return_t memcached_io_wait_for_read(memcached_instance_st* instance)
 {
-  return io_wait(instance, MEM_READ);
+  return io_wait(instance, POLLIN);
 }
 
 static memcached_return_t _io_fill(memcached_instance_st* instance)
@@ -423,6 +423,8 @@ static memcached_return_t _io_fill(memcached_instance_st* instance)
   do
   {
     data_read= ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
+    int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+
     if (data_read == SOCKET_ERROR)
     {
       switch (get_socket_errno())
@@ -440,7 +442,7 @@ static memcached_return_t _io_fill(memcached_instance_st* instance)
 #endif
         {
           memcached_return_t io_wait_ret;
-          if (memcached_success(io_wait_ret= io_wait(instance, MEM_READ)))
+          if (memcached_success(io_wait_ret= io_wait(instance, POLLIN)))
           {
             continue;
           }
@@ -456,12 +458,13 @@ static memcached_return_t _io_fill(memcached_instance_st* instance)
         WATCHPOINT_ASSERT(0);
       case EBADF:
         assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket");
+        /* fall through */
       case EINVAL:
       case EFAULT:
       case ECONNREFUSED:
       default:
         memcached_quit_server(instance, true);
-        memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
+        memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
         break;
       }
 
@@ -486,7 +489,6 @@ static memcached_return_t _io_fill(memcached_instance_st* instance)
   } while (data_read <= 0);
 
   instance->io_bytes_sent= 0;
-  instance->read_data_length= (size_t) data_read;
   instance->read_buffer_length= (size_t) data_read;
   instance->read_ptr= instance->read_buffer;
 
@@ -576,7 +578,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance)
 #ifdef __linux
       case ERESTART:
 #endif
-        if (memcached_success(io_wait(instance, MEM_READ)))
+        if (memcached_success(io_wait(instance, POLLIN)))
         {
           continue;
         }
@@ -590,6 +592,7 @@ memcached_return_t memcached_io_slurp(memcached_instance_st* instance)
         assert(0);
       case EBADF:
         assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state");
+        /* fall through */
       case EINVAL:
       case EFAULT:
       case ECONNREFUSED: