Added support for two part shutdown of socket.
[m6w6/libmemcached] / libmemcached / io.cc
index 837959da5db6696765036ffe5787741c8235658c..c686397f8993dcc876ce061235be5ee0779e8941 100644 (file)
 
 #include <libmemcached/common.h>
 
+#ifdef HAVE_SYS_SOCKET_H
+#  include <sys/socket.h>
+#endif
+
 void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header)
 {
   server->request_id++;
@@ -75,10 +79,10 @@ static bool repack_input_buffer(org::libmemcached::Instance* ptr)
     do {
       /* Just try a single read to grab what's available */
       ssize_t nr;
-      if ((nr= recv(ptr->fd,
+      if ((nr= ::recv(ptr->fd,
                     ptr->read_ptr + ptr->read_data_length,
                     MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                    MSG_DONTWAIT)) <= 0)
+                    MSG_NOSIGNAL)) <= 0)
       {
         if (nr == 0)
         {
@@ -236,7 +240,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr,
         int local_errno= EINVAL;
         int err;
         socklen_t len= sizeof (err);
-        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+        if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0)
         {
           if (err == 0) // treat this as EINTR
           {
@@ -330,7 +334,16 @@ static bool io_flush(org::libmemcached::Instance* ptr,
     WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
     WATCHPOINT_ASSERT(write_length > 0);
 
-    int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+    int flags;
+    if (with_flush)
+    {
+      flags= MSG_NOSIGNAL;
+    }
+    else
+    {
+      flags= MSG_NOSIGNAL|MSG_MORE;
+    }
+
     ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags);
 
     if (sent_length == SOCKET_ERROR)
@@ -406,7 +419,7 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* ptr)
   ssize_t data_read;
   do
   {
-    data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+    data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
     if (data_read == SOCKET_ERROR)
     {
       switch (get_socket_errno())
@@ -544,7 +557,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr)
   char buffer[MEMCACHED_MAX_BUFFER];
   do
   {
-    data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+    data_read= ::recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_NOSIGNAL);
     if (data_read == SOCKET_ERROR)
     {
       switch (get_socket_errno())
@@ -690,31 +703,37 @@ bool memcached_io_writev(org::libmemcached::Instance* ptr,
   return (complete_total == total);
 }
 
-
-void memcached_io_close(org::libmemcached::Instance* ptr)
+void org::libmemcached::Instance::start_close_socket()
 {
-  if (ptr->fd == INVALID_SOCKET)
+  if (fd != INVALID_SOCKET)
   {
-    return;
+    shutdown(fd, SHUT_WR);
+    options.is_shutting_down= true;
   }
+}
 
-  /* in case of death shutdown to avoid blocking at close() */
-  if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
+void org::libmemcached::Instance::close_socket()
+{
+  if (fd != INVALID_SOCKET)
   {
-    WATCHPOINT_NUMBER(ptr->fd);
-    WATCHPOINT_ERRNO(get_socket_errno());
-    WATCHPOINT_ASSERT(get_socket_errno());
-  }
+    /* in case of death shutdown to avoid blocking at close() */
+    if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN)
+    {
+      WATCHPOINT_NUMBER(fd);
+      WATCHPOINT_ERRNO(get_socket_errno());
+      WATCHPOINT_ASSERT(get_socket_errno());
+    }
 
-  if (closesocket(ptr->fd) == SOCKET_ERROR)
-  {
-    WATCHPOINT_ERRNO(get_socket_errno());
+    if (closesocket(fd) == SOCKET_ERROR)
+    {
+      WATCHPOINT_ERRNO(get_socket_errno());
+    }
+    state= MEMCACHED_SERVER_STATE_NEW;
+    fd= INVALID_SOCKET;
   }
-  ptr->state= MEMCACHED_SERVER_STATE_NEW;
-  ptr->fd= INVALID_SOCKET;
 }
 
-org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc)
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&)
 {
 #define MAX_SERVERS_TO_POLL 100
   struct pollfd fds[MAX_SERVERS_TO_POLL];
@@ -729,7 +748,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc
       return instance;
     }
 
-    if (memcached_instance_response_count(instance) > 0)
+    if (instance->response_count() > 0)
     {
       fds[host_index].events= POLLIN;
       fds[host_index].revents= 0;
@@ -745,7 +764,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc
     {
       org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
 
-      if (memcached_instance_response_count(instance) > 0)
+      if (instance->response_count() > 0)
       {
         return instance;
       }