Added support for two part shutdown of socket.
authorBrian Aker <brian@tangent.org>
Thu, 10 Jan 2013 10:00:25 +0000 (05:00 -0500)
committerBrian Aker <brian@tangent.org>
Thu, 10 Jan 2013 10:00:25 +0000 (05:00 -0500)
ChangeLog
libmemcached/instance.cc
libmemcached/instance.hpp
libmemcached/io.cc
libmemcached/io.hpp
libmemcached/quit.cc

index e32851ffc29e9ac94f461bde04135fb50253b5a4..26a2dc29ed6b87d8fbb19163d328b2cd830d3c3e 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+
+* Added support to do two part shutdown of socket.
+
+
 1.0.15 Mon Dec 17 07:25:44 EST 2012
 * Added support for Murmur3 (HASHKIT_HASH_MURMUR3)
 * Portability fixes.
index 50d6388ef884587da588532f6df46ba3b8a6f17f..faad235dcfb3f9b66fc9d5fc49b84e30bc49f5e8 100644 (file)
@@ -347,3 +347,24 @@ void memcached_instance_next_retry(memcached_server_instance_st self, const time
     ((org::libmemcached::Instance*)self)->next_retry= absolute_time;
   }
 }
+
+namespace org {
+namespace libmemcached {
+
+  bool Instance::valid() const
+  {
+    if (fd == INVALID_SOCKET)
+    {
+      return false;
+    }
+
+    return true;
+  }
+
+  bool Instance::is_shutting_down() const
+  {
+    return options.is_shutting_down;
+  }
+
+} // namespace libmemcached
+} // namespace org
index 097d350097d96578beeff4be78d9298ce62e3d66..8f66e96d044b27e467226df0bc1f078e27444377 100644 (file)
@@ -85,6 +85,13 @@ struct Instance {
   {
   }
 
+  bool valid() const;
+
+  bool is_shutting_down() const;
+
+  void start_close_socket();
+  void close_socket();
+
   uint32_t response_count() const
   {
     return cursor_active_;
index 6d9cad9a51f3e815e56e959ee3814b609bcd4681..c686397f8993dcc876ce061235be5ee0779e8941 100644 (file)
@@ -703,28 +703,34 @@ bool memcached_io_writev(org::libmemcached::Instance* ptr,
   return (complete_total == total);
 }
 
-
-void memcached_io_close(org::libmemcached::Instance* ptr)
+void org::libmemcached::Instance::start_close_socket()
 {
-  if (ptr->fd == INVALID_SOCKET)
+  if (fd != INVALID_SOCKET)
   {
-    return;
+    shutdown(fd, SHUT_WR);
+    options.is_shutting_down= true;
   }
+}
 
-  /* in case of death shutdown to avoid blocking at close() */
-  if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN)
+void org::libmemcached::Instance::close_socket()
+{
+  if (fd != INVALID_SOCKET)
   {
-    WATCHPOINT_NUMBER(ptr->fd);
-    WATCHPOINT_ERRNO(get_socket_errno());
-    WATCHPOINT_ASSERT(get_socket_errno());
-  }
+    /* in case of death shutdown to avoid blocking at close() */
+    if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN)
+    {
+      WATCHPOINT_NUMBER(fd);
+      WATCHPOINT_ERRNO(get_socket_errno());
+      WATCHPOINT_ASSERT(get_socket_errno());
+    }
 
-  if (closesocket(ptr->fd) == SOCKET_ERROR)
-  {
-    WATCHPOINT_ERRNO(get_socket_errno());
+    if (closesocket(fd) == SOCKET_ERROR)
+    {
+      WATCHPOINT_ERRNO(get_socket_errno());
+    }
+    state= MEMCACHED_SERVER_STATE_NEW;
+    fd= INVALID_SOCKET;
   }
-  ptr->state= MEMCACHED_SERVER_STATE_NEW;
-  ptr->fd= INVALID_SOCKET;
 }
 
 org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&)
index ed0b82ad6714f2f50a21300d2143465c85e5c97f..bf2133e8af1926f504f603c570ee3cc6814f629b 100644 (file)
@@ -62,8 +62,6 @@ memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr,
                                          size_t size,
                                          size_t& total);
 
-void memcached_io_close(org::libmemcached::Instance* ptr);
-
 /* Read n bytes of data from the server and store them in dta */
 memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr,
                                        void *dta,
index 77924fb4930ee277bf4c3a506e8bd6d0ca78884c..c12a1d82b32ca5b9385c49c6fc0562cd86b093a6 100644 (file)
   will force data to be completed.
 */
 
-void memcached_quit_server(org::libmemcached::Instance* ptr, bool io_death)
+void memcached_quit_server(org::libmemcached::Instance* instance, bool io_death)
 {
-  if (ptr->fd != INVALID_SOCKET)
+  if (instance->valid())
   {
-    if (io_death == false and memcached_is_udp(ptr->root) == false and ptr->options.is_shutting_down == false)
+    if (io_death == false and memcached_is_udp(instance->root) == false and instance->is_shutting_down() == false)
     {
-      ptr->options.is_shutting_down= true;
-
       memcached_return_t rc;
-      if (ptr->root->flags.binary_protocol)
+      if (instance->root->flags.binary_protocol)
       {
         protocol_binary_request_quit request= {}; // = {.bytes= {0}};
 
-        initialize_binary_request(ptr, request.message.header);
+        initialize_binary_request(instance, request.message.header);
 
         request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT;
         request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
@@ -69,7 +67,7 @@ void memcached_quit_server(org::libmemcached::Instance* ptr, bool io_death)
           { request.bytes, sizeof(request.bytes) }
         };
 
-        rc= memcached_vdo(ptr, vector, 1, true);
+        rc= memcached_vdo(instance, vector, 1, true);
       }
       else
       {
@@ -78,9 +76,11 @@ void memcached_quit_server(org::libmemcached::Instance* ptr, bool io_death)
           { memcached_literal_param("quit\r\n") }
         };
 
-        rc= memcached_vdo(ptr, vector, 1, true);
+        rc= memcached_vdo(instance, vector, 1, true);
       }
 
+      instance->start_close_socket();
+
       /* read until socket is closed, or there is an error
        * closing the socket before all data is read
        * results in server throwing away all data which is
@@ -89,17 +89,17 @@ void memcached_quit_server(org::libmemcached::Instance* ptr, bool io_death)
        * In .40 we began to only do this if we had been doing buffered
        * requests of had replication enabled.
        */
-      if (memcached_success(rc) and (ptr->root->flags.buffer_requests or ptr->root->number_of_replicas))
+      if (memcached_success(rc) and (instance->root->flags.buffer_requests or instance->root->number_of_replicas))
       {
         if (0)
         {
           memcached_return_t rc_slurp;
-          while (memcached_continue(rc_slurp= memcached_io_slurp(ptr))) {} ;
+          while (memcached_continue(rc_slurp= memcached_io_slurp(instance))) {} ;
           WATCHPOINT_ASSERT(rc_slurp == MEMCACHED_CONNECTION_FAILURE);
         }
         else
         {
-          memcached_io_slurp(ptr);
+          memcached_io_slurp(instance);
         }
       }
 
@@ -110,47 +110,49 @@ void memcached_quit_server(org::libmemcached::Instance* ptr, bool io_death)
        * server to ensure that the server processed all of the data we
        * sent to the server.
        */
-      ptr->server_failure_counter= 0;
+      instance->server_failure_counter= 0;
     }
-    memcached_io_close(ptr);
+
   }
 
-  ptr->state= MEMCACHED_SERVER_STATE_NEW;
-  ptr->cursor_active_= 0;
-  ptr->io_bytes_sent= 0;
-  ptr->write_buffer_offset= size_t(ptr->root and memcached_is_udp(ptr->root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
-  ptr->read_buffer_length= 0;
-  ptr->read_ptr= ptr->read_buffer;
-  ptr->options.is_shutting_down= false;
-  memcached_server_response_reset(ptr);
+  instance->close_socket();
+
+  instance->state= MEMCACHED_SERVER_STATE_NEW;
+  instance->cursor_active_= 0;
+  instance->io_bytes_sent= 0;
+  instance->write_buffer_offset= size_t(instance->root and memcached_is_udp(instance->root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
+  instance->read_buffer_length= 0;
+  instance->read_ptr= instance->read_buffer;
+  instance->options.is_shutting_down= false;
+  memcached_server_response_reset(instance);
 
   // We reset the version so that if we end up talking to a different server
   // we don't have stale server version information.
-  ptr->major_version= ptr->minor_version= ptr->micro_version= UINT8_MAX;
+  instance->major_version= instance->minor_version= instance->micro_version= UINT8_MAX;
 
   if (io_death)
   {
-    memcached_mark_server_for_timeout(ptr);
+    memcached_mark_server_for_timeout(instance);
   }
 }
 
-void send_quit(memcached_st *ptr)
+void send_quit(memcached_st *memc)
 {
-  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
+  for (uint32_t x= 0; x < memcached_server_count(memc); x++)
   {
-    org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, x);
+    org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
 
     memcached_quit_server(instance, false);
   }
 }
 
-void memcached_quit(memcached_st *ptr)
+void memcached_quit(memcached_st *memc)
 {
   memcached_return_t rc;
-  if (memcached_failed(rc= initialize_query(ptr, true)))
+  if (memcached_failed(rc= initialize_query(memc, true)))
   {
     return;
   }
 
-  send_quit(ptr);
+  send_quit(memc);
 }