Fix for decrement
[awesomized/libmemcached] / libmemcached / memcached_io.c
index 47486d3095b799872a6943004c23821af525c7b9..6f8528933f27258179c96f7e8a2e47ee976ee035 100644 (file)
@@ -30,6 +30,22 @@ static memcached_return io_wait(memcached_server_st *ptr,
   fds[0].fd= ptr->fd;
   fds[0].events= flags;
 
+#ifdef NOT_DONE
+  /*
+  ** We are going to block on write, but at least on Solaris we might block
+  ** on write if we haven't read anything from our input buffer..
+  ** Try to purge the input buffer if we don't do any flow control in the
+  ** application layer (just sending a lot of data etc)
+  ** The test is moved down in the purge function to avoid duplication of
+  ** the test.
+  */
+  if (read_or_write == MEM_WRITE)
+  {
+    if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED)
+      return MEMCACHED_FAILURE;
+  }
+#endif
+
   error= poll(fds, 1, ptr->root->poll_timeout);
 
   if (error == 1)
@@ -83,7 +99,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
 
   while (length)
   {
-    uint8_t found_eof= 0;
     if (!ptr->read_buffer_length)
     {
       ssize_t data_read;
@@ -101,6 +116,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
           switch (errno)
           {
           case EAGAIN:
+          case EINTR: 
             {
               memcached_return rc;
 
@@ -119,11 +135,21 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
         }
         else
         {
-          found_eof= 1;
-          break;
+          /*
+            EOF. Any data received so far is incomplete
+            so discard it. This always reads by byte in case of TCP
+            and protocol enforcement happens at memcached_response()
+            looking for '\n'. We do not care for UDB which requests 8 bytes
+            at once. Generally, this means that connection went away. Since
+            for blocking I/O we do not return 0 and for non-blocking case
+            it will return EGAIN if data is not immediatly available.
+          */
+          memcached_quit_server(ptr, 1);
+          return -1;
         }
       }
 
+      ptr->io_bytes_sent = 0;
       ptr->read_data_length= data_read;
       ptr->read_buffer_length= data_read;
       ptr->read_ptr= ptr->read_buffer;
@@ -149,9 +175,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
       buffer_ptr++;
       break;
     }
-
-    if (found_eof)
-      break;
   }
 
   return (size_t)(buffer_ptr - (char*)buffer);
@@ -163,6 +186,8 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   size_t original_length;
   const char* buffer_ptr;
 
+  WATCHPOINT_ASSERT(ptr->fd != -1);
+
   original_length= length;
   buffer_ptr= buffer;
 
@@ -186,6 +211,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
       memcached_return rc;
       ssize_t sent_length;
 
+      WATCHPOINT_ASSERT(ptr->fd != -1);
       sent_length= io_flush(ptr, &rc);
       if (sent_length == -1)
         return -1;
@@ -197,6 +223,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   if (with_flush)
   {
     memcached_return rc;
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
       return -1;
   }
@@ -207,20 +234,30 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 memcached_return memcached_io_close(memcached_server_st *ptr)
 {
   int r;
-  /* in case of death shutdown to avoid blocking at close() */
 
-  r= shutdown(ptr->fd, SHUT_RDWR);
+  if (ptr->fd == -1)
+    return MEMCACHED_SUCCESS;
 
-#ifdef HAVE_DEBUG
-  if (r && errno != ENOTCONN)
+  /* in case of death shutdown to avoid blocking at close() */
+  if (1)
   {
-    WATCHPOINT_ERRNO(errno);
-    WATCHPOINT_ASSERT(errno);
-  }
+    r= shutdown(ptr->fd, SHUT_RDWR);
+
+#ifdef HAVE_DEBUG
+    if (r && errno != ENOTCONN)
+    {
+      WATCHPOINT_NUMBER(ptr->fd);
+      WATCHPOINT_ERRNO(errno);
+      WATCHPOINT_ASSERT(errno);
+    }
 #endif
+  }
 
   r= close(ptr->fd);
-  WATCHPOINT_ASSERT(r == 0);
+#ifdef HAVE_DEBUG
+  if (r != 0)
+    WATCHPOINT_ERRNO(errno);
+#endif
 
   return MEMCACHED_SUCCESS;
 }
@@ -235,6 +272,8 @@ static ssize_t io_flush(memcached_server_st *ptr,
 
   *error= MEMCACHED_SUCCESS;
 
+  WATCHPOINT_ASSERT(ptr->fd != -1);
+
   if (ptr->write_buffer_offset == 0)
     return 0;
 
@@ -248,6 +287,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
   return_length= 0;
   while (write_length)
   {
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     WATCHPOINT_ASSERT(write_length > 0);
     sent_length= 0;
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
@@ -280,8 +320,25 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
+#ifdef NOT_DONE
+      /*
+      ** We might want to purge the input buffer if we haven't consumed
+      ** any output yet... The test for the limits is the purge is inline
+      ** in the purge function to avoid duplicating the logic..
+      */
+      {
+        memcached_return rc;
+        WATCHPOINT_ASSERT(ptr->fd != -1);
+        rc= memcached_purge(ptr);
+
+        if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED)
+          return -1;
+      }
+#endif
+
+      WATCHPOINT_ASSERT(ptr->fd != -1);
       if ((sent_length= write(ptr->fd, local_write_ptr, 
-                                       write_length)) == -1)
+                              write_length)) == -1)
       {
         switch (errno)
         {
@@ -307,13 +364,16 @@ static ssize_t io_flush(memcached_server_st *ptr,
       }
     }
 
+    ptr->io_bytes_sent += sent_length;
+
     local_write_ptr+= sent_length;
     write_length-= sent_length;
     return_length+= sent_length;
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
-  WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
+  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+  // ptr->write_buffer_offset);
   ptr->write_buffer_offset= 0;
 
   return return_length;
@@ -324,5 +384,5 @@ static ssize_t io_flush(memcached_server_st *ptr,
 */
 void memcached_io_reset(memcached_server_st *ptr)
 {
-  memcached_quit_server(ptr, 0);
+  memcached_quit_server(ptr, 1);
 }