Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-170
[awesomized/libmemcached] / libmemcached / io.cc
index bafa28c818a45c16f7b8da226c638e08da793c07..8cee0178711eeff6587cc8ded3ae226bfa79c2fa 100644 (file)
 
 #include <libmemcached/common.h>
 
+#ifdef HAVE_SYS_SOCKET_H
+#  include <sys/socket.h>
+#endif
+
+void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header)
+{
+  server->request_id++;
+  header.request.magic= PROTOCOL_BINARY_REQ;
+  header.request.opaque= htons(server->request_id);
+}
+
 enum memc_read_or_write {
   MEM_READ,
   MEM_WRITE
@@ -50,7 +61,7 @@ enum memc_read_or_write {
  *
  * @param ptr the server to pack
  */
-static bool repack_input_buffer(memcached_server_write_instance_st ptr)
+static bool repack_input_buffer(org::libmemcached::Instance* ptr)
 {
   if (ptr->read_ptr != ptr->read_buffer)
   {
@@ -68,10 +79,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
     do {
       /* Just try a single read to grab what's available */
       ssize_t nr;
-      if ((nr= recv(ptr->fd,
+      if ((nr= ::recv(ptr->fd,
                     ptr->read_ptr + ptr->read_data_length,
                     MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                    MSG_DONTWAIT)) <= 0)
+                    MSG_DONTWAIT|MSG_NOSIGNAL)) <= 0)
       {
         if (nr == 0)
         {
@@ -123,7 +134,7 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
  * @param ptr the server to star processing iput messages for
  * @return true if we processed anything, false otherwise
  */
-static bool process_input_buffer(memcached_server_write_instance_st ptr)
+static bool process_input_buffer(org::libmemcached::Instance* ptr)
 {
   /*
    ** We might be able to process some of the response messages if we
@@ -165,7 +176,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
   return false;
 }
 
-static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
+static memcached_return_t io_wait(org::libmemcached::Instance* ptr,
                                   const memc_read_or_write read_or_write)
 {
   /*
@@ -205,7 +216,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
     return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
   }
 
-  int local_errno;
   size_t loop_max= 5;
   while (--loop_max) // While loop is for ERESTART or EINTR
   {
@@ -213,17 +223,47 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
 
     if (active_fd >= 1)
     {
-      assert_msg(active_fd == 1 , "poll() returned an unexpected value");
-      return MEMCACHED_SUCCESS;
+      assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors");
+      if (fds.revents & POLLIN or fds.revents & POLLOUT)
+      {
+        return MEMCACHED_SUCCESS;
+      }
+
+      if (fds.revents & POLLHUP)
+      {
+        return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
+                                   memcached_literal_param("poll() detected hang up"));
+      }
+
+      if (fds.revents & POLLERR)
+      {
+        int local_errno= EINVAL;
+        int err;
+        socklen_t len= sizeof (err);
+        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0)
+        {
+          if (err == 0) // treat this as EINTR
+          {
+            continue;
+          }
+          local_errno= err;
+        }
+        memcached_quit_server(ptr, true);
+        return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT,
+                                   memcached_literal_param("poll() returned POLLHUP"));
+      }
+      
+      return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with"));
     }
-    else if (active_fd == 0)
+
+    if (active_fd == 0)
     {
       ptr->io_wait_count.timeouts++;
       return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
     }
 
     // Only an error should result in this code being called.
-    local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+    int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
     assert_msg(active_fd == -1 , "poll() returned an unexpected value");
     switch (local_errno)
     {
@@ -235,37 +275,30 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
 
     case EFAULT:
     case ENOMEM:
-      return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+      memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
 
     case EINVAL:
-      return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+      memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
 
     default:
-      if (fds.revents & POLLERR)
-      {
-        int err;
-        socklen_t len= sizeof (err);
-        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
-        {
-          if (err == 0) // treat this as EINTR
-          {
-            continue;
-          }
-          local_errno= err;
-        }
-      }
-      break;
+      memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
     }
 
-    break; // should only occur from poll error
+    break;
   }
 
   memcached_quit_server(ptr, true);
 
-  return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
+  if (memcached_has_error(ptr))
+  {
+    return memcached_instance_error_return(ptr);
+  }
+
+  return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
+                             memcached_literal_param("number of attempts to call io_wait() failed"));
 }
 
-static bool io_flush(memcached_server_write_instance_st ptr,
+static bool io_flush(org::libmemcached::Instance* ptr,
                      const bool with_flush,
                      memcached_return_t& error)
 {
@@ -301,7 +334,16 @@ static bool io_flush(memcached_server_write_instance_st ptr,
     WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
     WATCHPOINT_ASSERT(write_length > 0);
 
-    int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+    int flags;
+    if (with_flush)
+    {
+      flags= MSG_NOSIGNAL|MSG_DONTWAIT;
+    }
+    else
+    {
+      flags= MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+    }
+
     ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags);
 
     if (sent_length == SOCKET_ERROR)
@@ -367,17 +409,17 @@ static bool io_flush(memcached_server_write_instance_st ptr,
   return true;
 }
 
-memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr)
 {
   return io_wait(ptr, MEM_WRITE);
 }
 
-static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize_t& nread)
+static memcached_return_t _io_fill(org::libmemcached::Instance* ptr)
 {
   ssize_t data_read;
   do
   {
-    data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+    data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT|MSG_NOSIGNAL);
     if (data_read == SOCKET_ERROR)
     {
       switch (get_socket_errno())
@@ -415,12 +457,12 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize
       case EFAULT:
       case ECONNREFUSED:
       default:
-        {
-          memcached_quit_server(ptr, true);
-          nread= -1;
-          return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-        }
+        memcached_quit_server(ptr, true);
+        memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+        break;
       }
+
+      return memcached_instance_error_return(ptr);
     }
     else if (data_read == 0)
     {
@@ -434,7 +476,6 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize
         it will return EGAIN if data is not immediatly available.
       */
       memcached_quit_server(ptr, true);
-      nread= -1;
       return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
                                  memcached_literal_param("::rec() returned zero, server has disconnected"));
     }
@@ -449,11 +490,11 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr,
                                      void *buffer, size_t length, ssize_t& nread)
 {
   assert(memcached_is_udp(ptr->root) == false);
-  assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
+  assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error
   char *buffer_ptr= static_cast<char *>(buffer);
 
   if (ptr->fd == INVALID_SOCKET)
@@ -469,8 +510,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
     if (ptr->read_buffer_length == 0)
     {
       memcached_return_t io_fill_ret;
-      if (memcached_fatal(io_fill_ret= _io_fill(ptr, nread)))
+      if (memcached_fatal(io_fill_ret= _io_fill(ptr)))
       {
+        nread= -1;
         return io_fill_ret;
       }
     }
@@ -500,9 +542,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
+memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr)
 {
-  assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
+  assert_msg(ptr, "Programmer error, invalid Instance");
   assert(memcached_is_udp(ptr->root) == false);
 
   if (ptr->fd == INVALID_SOCKET)
@@ -515,7 +557,7 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
   char buffer[MEMCACHED_MAX_BUFFER];
   do
   {
-    data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+    data_read= ::recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT|MSG_NOSIGNAL);
     if (data_read == SOCKET_ERROR)
     {
       switch (get_socket_errno())
@@ -557,7 +599,7 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
   return MEMCACHED_CONNECTION_FAILURE;
 }
 
-static bool _io_write(memcached_server_write_instance_st ptr,
+static bool _io_write(org::libmemcached::Instance* ptr,
                       const void *buffer, size_t length, bool with_flush,
                       size_t& written)
 {
@@ -610,13 +652,13 @@ static bool _io_write(memcached_server_write_instance_st ptr,
   return true;
 }
 
-bool memcached_io_write(memcached_server_write_instance_st ptr)
+bool memcached_io_write(org::libmemcached::Instance* ptr)
 {
   size_t written;
   return _io_write(ptr, NULL, 0, true, written);
 }
 
-ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
+ssize_t memcached_io_write(org::libmemcached::Instance* ptr,
                            const void *buffer, const size_t length, const bool with_flush)
 {
   size_t written;
@@ -629,9 +671,9 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
   return ssize_t(written);
 }
 
-bool memcached_io_writev(memcached_server_write_instance_st ptr,
-                            libmemcached_io_vector_st vector[],
-                            const size_t number_of, const bool with_flush)
+bool memcached_io_writev(org::libmemcached::Instance* ptr,
+                         libmemcached_io_vector_st vector[],
+                         const size_t number_of, const bool with_flush)
 {
   ssize_t complete_total= 0;
   ssize_t total= 0;
@@ -662,7 +704,7 @@ bool memcached_io_writev(memcached_server_write_instance_st ptr,
 }
 
 
-void memcached_io_close(memcached_server_write_instance_st ptr)
+void memcached_io_close(org::libmemcached::Instance* ptr)
 {
   if (ptr->fd == INVALID_SOCKET)
   {
@@ -670,7 +712,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr)
   }
 
   /* in case of death shutdown to avoid blocking at close() */
-  if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
+  if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN)
   {
     WATCHPOINT_NUMBER(ptr->fd);
     WATCHPOINT_ERRNO(get_socket_errno());
@@ -685,7 +727,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr)
   ptr->fd= INVALID_SOCKET;
 }
 
-memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc)
 {
 #define MAX_SERVERS_TO_POLL 100
   struct pollfd fds[MAX_SERVERS_TO_POLL];
@@ -693,18 +735,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
 
   for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
   {
-    memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
+    org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
 
     if (instance->read_buffer_length > 0) /* I have data in the buffer */
     {
       return instance;
     }
 
-    if (memcached_server_response_count(instance) > 0)
+    if (instance->response_count() > 0)
     {
-      fds[host_index].events = POLLIN;
-      fds[host_index].revents = 0;
-      fds[host_index].fd = instance->fd;
+      fds[host_index].events= POLLIN;
+      fds[host_index].revents= 0;
+      fds[host_index].fd= instance->fd;
       ++host_index;
     }
   }
@@ -714,10 +756,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
     /* We have 0 or 1 server with pending events.. */
     for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
     {
-      memcached_server_write_instance_st instance=
-        memcached_server_instance_fetch(memc, x);
+      org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
 
-      if (memcached_server_response_count(instance) > 0)
+      if (instance->response_count() > 0)
       {
         return instance;
       }
@@ -742,7 +783,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
       {
         for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
         {
-          memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
+          org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y);
 
           if (instance->fd == fds[x].fd)
           {
@@ -759,7 +800,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
 /*
   Eventually we will just kill off the server with the problem.
 */
-void memcached_io_reset(memcached_server_write_instance_st ptr)
+void memcached_io_reset(org::libmemcached::Instance* ptr)
 {
   memcached_quit_server(ptr, true);
 }
@@ -768,7 +809,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr)
  * Read a given number of bytes from the server and place it into a specific
  * buffer. Reset the IO channel on this server if an error occurs.
  */
-memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr,
                                        void *dta,
                                        const size_t size)
 {
@@ -793,7 +834,7 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr,
                                          char *buffer_ptr,
                                          size_t size,
                                          size_t& total_nr)