Fix connect with timeouts
[awesomized/libmemcached] / libmemcached / memcached_io.c
index 7ed2a403f58563f375c8cd04424308a4de86e0b3..72a02a0f97f52b030b3c1ab45cefa84b742c4bf4 100644 (file)
@@ -39,7 +39,11 @@ static memcached_return io_wait(memcached_server_st *ptr,
   ** the test.
   */
   if (read_or_write == MEM_WRITE)
-    memcached_purge(ptr);
+  {
+    memcached_return rc=memcached_purge(ptr);
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return MEMCACHED_FAILURE;
+  }
 
   error= poll(fds, 1, ptr->root->poll_timeout);
 
@@ -94,7 +98,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
 
   while (length)
   {
-    uint8_t found_eof= 0;
     if (!ptr->read_buffer_length)
     {
       ssize_t data_read;
@@ -131,8 +134,17 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
         }
         else
         {
-          found_eof= 1;
-          break;
+          /*
+            EOF. Any data received so far is incomplete
+            so discard it. This always reads by byte in case of TCP
+            and protocol enforcement happens at memcached_response()
+            looking for '\n'. We do not care for UDB which requests 8 bytes
+            at once. Generally, this means that connection went away. Since
+            for blocking I/O we do not return 0 and for non-blocking case
+            it will return EGAIN if data is not immediatly available.
+          */
+          memcached_quit_server(ptr, 1);
+          return -1;
         }
       }
 
@@ -162,9 +174,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
       buffer_ptr++;
       break;
     }
-
-    if (found_eof)
-      break;
   }
 
   return (size_t)(buffer_ptr - (char*)buffer);
@@ -176,6 +185,8 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   size_t original_length;
   const char* buffer_ptr;
 
+  WATCHPOINT_ASSERT(ptr->fd != -1);
+
   original_length= length;
   buffer_ptr= buffer;
 
@@ -199,17 +210,21 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
       memcached_return rc;
       ssize_t sent_length;
 
+      WATCHPOINT_ASSERT(ptr->fd != -1);
       sent_length= io_flush(ptr, &rc);
       if (sent_length == -1)
         return -1;
 
-      WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
+      /* If io_flush calls memcached_purge, sent_length may be 0 */
+      if (sent_length != 0)
+        WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
     }
   }
 
   if (with_flush)
   {
     memcached_return rc;
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
       return -1;
   }
@@ -220,17 +235,24 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 memcached_return memcached_io_close(memcached_server_st *ptr)
 {
   int r;
-  /* in case of death shutdown to avoid blocking at close() */
 
-  r= shutdown(ptr->fd, SHUT_RDWR);
+  if (ptr->fd == -1)
+    return MEMCACHED_SUCCESS;
 
-#ifdef HAVE_DEBUG
-  if (r && errno != ENOTCONN)
+  /* in case of death shutdown to avoid blocking at close() */
+  if (1)
   {
-    WATCHPOINT_ERRNO(errno);
-    WATCHPOINT_ASSERT(errno);
-  }
+    r= shutdown(ptr->fd, SHUT_RDWR);
+
+#ifdef HAVE_DEBUG
+    if (r && errno != ENOTCONN)
+    {
+      WATCHPOINT_NUMBER(ptr->fd);
+      WATCHPOINT_ERRNO(errno);
+      WATCHPOINT_ASSERT(errno);
+    }
 #endif
+  }
 
   r= close(ptr->fd);
 #ifdef HAVE_DEBUG
@@ -244,6 +266,19 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
+  /*
+  ** We might want to purge the input buffer if we haven't consumed
+  ** any output yet... The test for the limits is the purge is inline
+  ** in the purge function to avoid duplicating the logic..
+  */
+  {
+     memcached_return rc;
+     WATCHPOINT_ASSERT(ptr->fd != -1);
+     rc= memcached_purge(ptr);
+
+     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return -1;
+  }
   ssize_t sent_length;
   size_t return_length;
   char *local_write_ptr= ptr->write_buffer;
@@ -251,6 +286,8 @@ static ssize_t io_flush(memcached_server_st *ptr,
 
   *error= MEMCACHED_SUCCESS;
 
+  WATCHPOINT_ASSERT(ptr->fd != -1);
+
   if (ptr->write_buffer_offset == 0)
     return 0;
 
@@ -264,6 +301,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
   return_length= 0;
   while (write_length)
   {
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     WATCHPOINT_ASSERT(write_length > 0);
     sent_length= 0;
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
@@ -296,16 +334,11 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
-      /*
-      ** We might want to purge the input buffer if we haven't consumed
-      ** any output yet... The test for the limits is the purge is inline
-      ** in the purge function to avoid duplicating the logic..
-      */
-      memcached_purge(ptr);
-
+      WATCHPOINT_ASSERT(ptr->fd != -1);
       if ((sent_length= write(ptr->fd, local_write_ptr, 
-                                       write_length)) == -1)
+                              write_length)) == -1)
       {
+        ptr->cached_errno= errno;
         switch (errno)
         {
         case ENOBUFS:
@@ -315,7 +348,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
             memcached_return rc;
             rc= io_wait(ptr, MEM_WRITE);
 
-            if (rc == MEMCACHED_SUCCESS)
+            if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) 
               continue;
 
             memcached_quit_server(ptr, 1);
@@ -323,7 +356,6 @@ static ssize_t io_flush(memcached_server_st *ptr,
           }
         default:
           memcached_quit_server(ptr, 1);
-          ptr->cached_errno= errno;
           *error= MEMCACHED_ERRNO;
           return -1;
         }
@@ -350,5 +382,75 @@ static ssize_t io_flush(memcached_server_st *ptr,
 */
 void memcached_io_reset(memcached_server_st *ptr)
 {
-  memcached_quit_server(ptr, 0);
+  memcached_quit_server(ptr, 1);
+}
+
+/**
+ * Read a given number of bytes from the server and place it into a specific
+ * buffer. Reset the IO channel on this server if an error occurs. 
+ */
+memcached_return memcached_safe_read(memcached_server_st *ptr,
+                                     void *dta,
+                                     size_t size)
+{
+  size_t offset= 0;
+  char *data= dta;
+
+  while (offset < size)
+  {
+    ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
+    if (nread <= 0)
+    {
+      memcached_io_reset(ptr);
+      return MEMCACHED_UNKNOWN_READ_FAILURE;
+    }
+    offset+= nread;
+  }
+
+  return MEMCACHED_SUCCESS;
+}
+
+memcached_return memcached_io_readline(memcached_server_st *ptr,
+                                       char *buffer_ptr,
+                                       size_t size)
+{
+  bool line_complete= false;
+  int total_nr= 0;
+
+  while (!line_complete)
+  {
+    if (ptr->read_buffer_length == 0)
+    {
+      /*
+       * We don't have any data in the buffer, so let's fill the read
+       * buffer. Call the standard read function to avoid duplicating
+       * the logic.
+       */
+      if (memcached_io_read(ptr, buffer_ptr, 1) != 1)
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+
+      if (*buffer_ptr == '\n')
+        line_complete= true;
+
+      ++buffer_ptr;
+      ++total_nr;
+    }
+
+    /* Now let's look in the buffer and copy as we go! */
+    while (ptr->read_buffer_length && total_nr < size && !line_complete)
+    {
+      *buffer_ptr = *ptr->read_ptr;
+      if (*buffer_ptr == '\n')
+        line_complete = true;
+      --ptr->read_buffer_length;
+      ++ptr->read_ptr;
+      ++total_nr;
+      ++buffer_ptr;
+    }
+
+    if (total_nr == size)
+      return MEMCACHED_PROTOCOL_ERROR;
+  }
+
+  return MEMCACHED_SUCCESS;
 }