Fix connect with timeouts
[awesomized/libmemcached] / libmemcached / memcached_io.c
index 6f8528933f27258179c96f7e8a2e47ee976ee035..72a02a0f97f52b030b3c1ab45cefa84b742c4bf4 100644 (file)
@@ -30,7 +30,6 @@ static memcached_return io_wait(memcached_server_st *ptr,
   fds[0].fd= ptr->fd;
   fds[0].events= flags;
 
-#ifdef NOT_DONE
   /*
   ** We are going to block on write, but at least on Solaris we might block
   ** on write if we haven't read anything from our input buffer..
@@ -41,10 +40,10 @@ static memcached_return io_wait(memcached_server_st *ptr,
   */
   if (read_or_write == MEM_WRITE)
   {
-    if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED)
-      return MEMCACHED_FAILURE;
+    memcached_return rc=memcached_purge(ptr);
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return MEMCACHED_FAILURE;
   }
-#endif
 
   error= poll(fds, 1, ptr->root->poll_timeout);
 
@@ -216,7 +215,9 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
       if (sent_length == -1)
         return -1;
 
-      WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
+      /* If io_flush calls memcached_purge, sent_length may be 0 */
+      if (sent_length != 0)
+        WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
     }
   }
 
@@ -265,6 +266,19 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
+  /*
+  ** We might want to purge the input buffer if we haven't consumed
+  ** any output yet... The test for the limits is the purge is inline
+  ** in the purge function to avoid duplicating the logic..
+  */
+  {
+     memcached_return rc;
+     WATCHPOINT_ASSERT(ptr->fd != -1);
+     rc= memcached_purge(ptr);
+
+     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return -1;
+  }
   ssize_t sent_length;
   size_t return_length;
   char *local_write_ptr= ptr->write_buffer;
@@ -320,26 +334,11 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
-#ifdef NOT_DONE
-      /*
-      ** We might want to purge the input buffer if we haven't consumed
-      ** any output yet... The test for the limits is the purge is inline
-      ** in the purge function to avoid duplicating the logic..
-      */
-      {
-        memcached_return rc;
-        WATCHPOINT_ASSERT(ptr->fd != -1);
-        rc= memcached_purge(ptr);
-
-        if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED)
-          return -1;
-      }
-#endif
-
       WATCHPOINT_ASSERT(ptr->fd != -1);
       if ((sent_length= write(ptr->fd, local_write_ptr, 
                               write_length)) == -1)
       {
+        ptr->cached_errno= errno;
         switch (errno)
         {
         case ENOBUFS:
@@ -349,7 +348,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
             memcached_return rc;
             rc= io_wait(ptr, MEM_WRITE);
 
-            if (rc == MEMCACHED_SUCCESS)
+            if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) 
               continue;
 
             memcached_quit_server(ptr, 1);
@@ -357,7 +356,6 @@ static ssize_t io_flush(memcached_server_st *ptr,
           }
         default:
           memcached_quit_server(ptr, 1);
-          ptr->cached_errno= errno;
           *error= MEMCACHED_ERRNO;
           return -1;
         }
@@ -386,3 +384,73 @@ void memcached_io_reset(memcached_server_st *ptr)
 {
   memcached_quit_server(ptr, 1);
 }
+
+/**
+ * Read a given number of bytes from the server and place it into a specific
+ * buffer. Reset the IO channel on this server if an error occurs. 
+ */
+memcached_return memcached_safe_read(memcached_server_st *ptr,
+                                     void *dta,
+                                     size_t size)
+{
+  size_t offset= 0;
+  char *data= dta;
+
+  while (offset < size)
+  {
+    ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
+    if (nread <= 0)
+    {
+      memcached_io_reset(ptr);
+      return MEMCACHED_UNKNOWN_READ_FAILURE;
+    }
+    offset+= nread;
+  }
+
+  return MEMCACHED_SUCCESS;
+}
+
+memcached_return memcached_io_readline(memcached_server_st *ptr,
+                                       char *buffer_ptr,
+                                       size_t size)
+{
+  bool line_complete= false;
+  int total_nr= 0;
+
+  while (!line_complete)
+  {
+    if (ptr->read_buffer_length == 0)
+    {
+      /*
+       * We don't have any data in the buffer, so let's fill the read
+       * buffer. Call the standard read function to avoid duplicating
+       * the logic.
+       */
+      if (memcached_io_read(ptr, buffer_ptr, 1) != 1)
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+
+      if (*buffer_ptr == '\n')
+        line_complete= true;
+
+      ++buffer_ptr;
+      ++total_nr;
+    }
+
+    /* Now let's look in the buffer and copy as we go! */
+    while (ptr->read_buffer_length && total_nr < size && !line_complete)
+    {
+      *buffer_ptr = *ptr->read_ptr;
+      if (*buffer_ptr == '\n')
+        line_complete = true;
+      --ptr->read_buffer_length;
+      ++ptr->read_ptr;
+      ++total_nr;
+      ++buffer_ptr;
+    }
+
+    if (total_nr == size)
+      return MEMCACHED_PROTOCOL_ERROR;
+  }
+
+  return MEMCACHED_SUCCESS;
+}