Merged trunk.
[m6w6/libmemcached] / libmemcached / io.c
index bc74af7d985962f920be22543a758a6b457c664c..dedcdaf9e9deff939b1d599255263dae8baa4d30 100644 (file)
@@ -55,14 +55,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
       return MEMCACHED_FAILURE;
   }
 
-  int timeout= ptr->root->poll_timeout;
-  if (ptr->root->flags.no_block == false)
-    timeout= -1;
-
   size_t loop_max= 5;
   while (--loop_max) // While loop is for ERESTART or EINTR
   {
-    error= poll(&fds, 1, timeout);
+    error= poll(&fds, 1, ptr->root->poll_timeout);
 
     switch (error)
     {
@@ -109,6 +105,11 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   return MEMCACHED_FAILURE;
 }
 
+memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+{
+  return io_wait(ptr, MEM_WRITE);
+}
+
 /**
  * Try to fill the input buffer for a server with as much
  * data as possible.
@@ -385,7 +386,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
       buffer_end= MAX_UDP_DATAGRAM_LENGTH;
       should_write= length;
       if (ptr->write_buffer_offset + should_write > buffer_end)
+      {
         return -1;
+      }
     }
     else
     {
@@ -408,7 +411,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
       WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
       sent_length= io_flush(ptr, &rc);
       if (sent_length == -1)
+      {
         return -1;
+      }
 
       /* If io_flush calls memcached_purge, sent_length may be 0 */
       unlikely (sent_length != 0)
@@ -574,7 +579,9 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
     rc= memcached_purge(ptr);
 
     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+    {
       return -1;
+    }
   }
   ssize_t sent_length;
   size_t return_length;
@@ -587,7 +594,9 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
 
   // UDP Sanity check, make sure that we are not sending somthing too big
   if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
+  {
     return -1;
+  }
 
   if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
                                         && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
@@ -609,7 +618,8 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
       increment_udp_message_id(ptr);
 
-    sent_length= send(ptr->fd, local_write_ptr, write_length, 0);
+    WATCHPOINT_ASSERT(ptr->fd != -1);
+    sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
     if (sent_length == SOCKET_ERROR)
     {
       ptr->cached_errno= get_socket_errno();
@@ -645,9 +655,12 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr,
           memcached_quit_server(ptr, true);
           return -1;
         }
+      case ENOTCONN:
+      case EPIPE:
       default:
         memcached_quit_server(ptr, true);
         *error= MEMCACHED_ERRNO;
+        WATCHPOINT_ASSERT(ptr->fd == -1);
         return -1;
       }
     }