Remove how use instance (keep API intact)
[m6w6/libmemcached] / libmemcached / io.cc
index 85474d3eae215b8b7e33564fc605cf2b6cb7c2b2..d2e40043dfaa1c958ae1e346ed8dc1dbacb6bd3a 100644 (file)
 
 #include <libmemcached/common.h>
 
+void initialize_binary_request(memcached_server_write_instance_st server, protocol_binary_request_header& header)
+{
+  server->request_id++;
+  header.request.magic= PROTOCOL_BINARY_REQ;
+  header.request.opaque= htons(server->request_id);
+}
+
 enum memc_read_or_write {
   MEM_READ,
   MEM_WRITE
@@ -205,7 +212,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
     return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
   }
 
-  int local_errno;
   size_t loop_max= 5;
   while (--loop_max) // While loop is for ERESTART or EINTR
   {
@@ -213,17 +219,47 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
 
     if (active_fd >= 1)
     {
-      assert_msg(active_fd == 1 , "poll() returned an unexpected value");
-      return MEMCACHED_SUCCESS;
+      assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors");
+      if (fds.revents & POLLIN or fds.revents & POLLOUT)
+      {
+        return MEMCACHED_SUCCESS;
+      }
+
+      if (fds.revents & POLLHUP)
+      {
+        return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
+                                   memcached_literal_param("poll() detected hang up"));
+      }
+
+      if (fds.revents & POLLERR)
+      {
+        int local_errno= EINVAL;
+        int err;
+        socklen_t len= sizeof (err);
+        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+        {
+          if (err == 0) // treat this as EINTR
+          {
+            continue;
+          }
+          local_errno= err;
+        }
+        memcached_quit_server(ptr, true);
+        return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT,
+                                   memcached_literal_param("poll() returned POLLHUP"));
+      }
+      
+      return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with"));
     }
-    else if (active_fd == 0)
+
+    if (active_fd == 0)
     {
       ptr->io_wait_count.timeouts++;
       return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
     }
 
     // Only an error should result in this code being called.
-    local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+    int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
     assert_msg(active_fd == -1 , "poll() returned an unexpected value");
     switch (local_errno)
     {
@@ -235,34 +271,27 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
 
     case EFAULT:
     case ENOMEM:
-      return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+      memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
 
     case EINVAL:
-      return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+      memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
 
     default:
-      if (fds.revents & POLLERR)
-      {
-        int err;
-        socklen_t len= sizeof (err);
-        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
-        {
-          if (err == 0) // treat this as EINTR
-          {
-            continue;
-          }
-          local_errno= err;
-        }
-      }
-      break;
+      memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
     }
 
-    break; // should only occur from poll error
+    break;
   }
 
   memcached_quit_server(ptr, true);
 
-  return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
+  if (memcached_has_error(ptr))
+  {
+    return memcached_instance_error_return(ptr);
+  }
+
+  return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
+                             memcached_literal_param("number of attempts to call io_wait() failed"));
 }
 
 static bool io_flush(memcached_server_write_instance_st ptr,
@@ -372,7 +401,7 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s
   return io_wait(ptr, MEM_WRITE);
 }
 
-static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize_t& nread)
+static memcached_return_t _io_fill(memcached_server_write_instance_st ptr)
 {
   ssize_t data_read;
   do
@@ -415,12 +444,12 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize
       case EFAULT:
       case ECONNREFUSED:
       default:
-        {
-          memcached_quit_server(ptr, true);
-          nread= -1;
-          return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-        }
+        memcached_quit_server(ptr, true);
+        memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+        break;
       }
+
+      return memcached_instance_error_return(ptr);
     }
     else if (data_read == 0)
     {
@@ -434,7 +463,6 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize
         it will return EGAIN if data is not immediatly available.
       */
       memcached_quit_server(ptr, true);
-      nread= -1;
       return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
                                  memcached_literal_param("::rec() returned zero, server has disconnected"));
     }
@@ -469,17 +497,16 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
     if (ptr->read_buffer_length == 0)
     {
       memcached_return_t io_fill_ret;
-      if (memcached_fatal(io_fill_ret= _io_fill(ptr, nread)))
+      if (memcached_fatal(io_fill_ret= _io_fill(ptr)))
       {
+        nread= -1;
         return io_fill_ret;
       }
     }
 
     if (length > 1)
     {
-      size_t difference;
-
-      difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
+      size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
 
       memcpy(buffer_ptr, ptr->read_ptr, difference);
       length -= difference;
@@ -702,11 +729,11 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
       return instance;
     }
 
-    if (memcached_server_response_count(instance) > 0)
+    if (memcached_instance_response_count(instance) > 0)
     {
-      fds[host_index].events = POLLIN;
-      fds[host_index].revents = 0;
-      fds[host_index].fd = instance->fd;
+      fds[host_index].events= POLLIN;
+      fds[host_index].revents= 0;
+      fds[host_index].fd= instance->fd;
       ++host_index;
     }
   }
@@ -716,10 +743,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
     /* We have 0 or 1 server with pending events.. */
     for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
     {
-      memcached_server_write_instance_st instance=
-        memcached_server_instance_fetch(memc, x);
+      memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
 
-      if (memcached_server_response_count(instance) > 0)
+      if (memcached_instance_response_count(instance) > 0)
       {
         return instance;
       }