fix io_read bug
[awesomized/libmemcached] / libmemcached / memcached_io.c
index 93ca41732d4ca26741c2c905e10e9ea001795e15..7ed2a403f58563f375c8cd04424308a4de86e0b3 100644 (file)
@@ -30,6 +30,17 @@ static memcached_return io_wait(memcached_server_st *ptr,
   fds[0].fd= ptr->fd;
   fds[0].events= flags;
 
+  /*
+  ** We are going to block on write, but at least on Solaris we might block
+  ** on write if we haven't read anything from our input buffer..
+  ** Try to purge the input buffer if we don't do any flow control in the
+  ** application layer (just sending a lot of data etc)
+  ** The test is moved down in the purge function to avoid duplication of
+  ** the test.
+  */
+  if (read_or_write == MEM_WRITE)
+    memcached_purge(ptr);
+
   error= poll(fds, 1, ptr->root->poll_timeout);
 
   if (error == 1)
@@ -75,7 +86,7 @@ void memcached_io_preread(memcached_st *ptr)
 #endif
 
 ssize_t memcached_io_read(memcached_server_st *ptr,
-                          char *buffer, size_t length)
+                          void *buffer, size_t length)
 {
   char *buffer_ptr;
 
@@ -101,6 +112,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
           switch (errno)
           {
           case EAGAIN:
+          case EINTR: 
             {
               memcached_return rc;
 
@@ -124,6 +136,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
         }
       }
 
+      ptr->io_bytes_sent = 0;
       ptr->read_data_length= data_read;
       ptr->read_buffer_length= data_read;
       ptr->read_ptr= ptr->read_buffer;
@@ -154,11 +167,11 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
       break;
   }
 
-  return (size_t)(buffer_ptr - buffer);
+  return (size_t)(buffer_ptr - (char*)buffer);
 }
 
 ssize_t memcached_io_write(memcached_server_st *ptr,
-                           const char *buffer, size_t length, char with_flush)
+                           const void *buffer, size_t length, char with_flush)
 {
   size_t original_length;
   const char* buffer_ptr;
@@ -204,14 +217,26 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   return original_length;
 }
 
-memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death)
+memcached_return memcached_io_close(memcached_server_st *ptr)
 {
+  int r;
   /* in case of death shutdown to avoid blocking at close() */
 
-  if (io_death)
-    shutdown(ptr->fd, SHUT_RDWR);
-  else
-    close(ptr->fd);
+  r= shutdown(ptr->fd, SHUT_RDWR);
+
+#ifdef HAVE_DEBUG
+  if (r && errno != ENOTCONN)
+  {
+    WATCHPOINT_ERRNO(errno);
+    WATCHPOINT_ASSERT(errno);
+  }
+#endif
+
+  r= close(ptr->fd);
+#ifdef HAVE_DEBUG
+  if (r != 0)
+    WATCHPOINT_ERRNO(errno);
+#endif
 
   return MEMCACHED_SUCCESS;
 }
@@ -219,7 +244,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death)
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
-  size_t sent_length;
+  ssize_t sent_length;
   size_t return_length;
   char *local_write_ptr= ptr->write_buffer;
   size_t write_length= ptr->write_buffer_offset;
@@ -230,9 +255,11 @@ static ssize_t io_flush(memcached_server_st *ptr,
     return 0;
 
   /* Looking for memory overflows */
+#if defined(HAVE_DEBUG)
   if (write_length == MEMCACHED_MAX_BUFFER)
     WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
   WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
 
   return_length= 0;
   while (write_length)
@@ -269,7 +296,14 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
-      if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr, 
+      /*
+      ** We might want to purge the input buffer if we haven't consumed
+      ** any output yet... The test for the limits is the purge is inline
+      ** in the purge function to avoid duplicating the logic..
+      */
+      memcached_purge(ptr);
+
+      if ((sent_length= write(ptr->fd, local_write_ptr, 
                                        write_length)) == -1)
       {
         switch (errno)
@@ -296,13 +330,16 @@ static ssize_t io_flush(memcached_server_st *ptr,
       }
     }
 
+    ptr->io_bytes_sent += sent_length;
+
     local_write_ptr+= sent_length;
     write_length-= sent_length;
     return_length+= sent_length;
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
-  WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
+  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+  // ptr->write_buffer_offset);
   ptr->write_buffer_offset= 0;
 
   return return_length;