Implemented support for noreply in the binary protocol
[awesomized/libmemcached] / libmemcached / memcached_io.c
index b9261f3d9bc4ef27540f5799e702a538032aca31..ffbd680648814690ac5bc683511bf0b7c22758d8 100644 (file)
@@ -30,6 +30,21 @@ static memcached_return io_wait(memcached_server_st *ptr,
   fds[0].fd= ptr->fd;
   fds[0].events= flags;
 
+  /*
+  ** We are going to block on write, but at least on Solaris we might block
+  ** on write if we haven't read anything from our input buffer..
+  ** Try to purge the input buffer if we don't do any flow control in the
+  ** application layer (just sending a lot of data etc)
+  ** The test is moved down in the purge function to avoid duplication of
+  ** the test.
+  */
+  if (read_or_write == MEM_WRITE)
+  {
+    memcached_return rc=memcached_purge(ptr);
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return MEMCACHED_FAILURE;
+  }
+
   error= poll(fds, 1, ptr->root->poll_timeout);
 
   if (error == 1)
@@ -75,7 +90,7 @@ void memcached_io_preread(memcached_st *ptr)
 #endif
 
 ssize_t memcached_io_read(memcached_server_st *ptr,
-                          char *buffer, size_t length)
+                          void *buffer, size_t length)
 {
   char *buffer_ptr;
 
@@ -83,7 +98,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
 
   while (length)
   {
-    uint8_t found_eof= 0;
     if (!ptr->read_buffer_length)
     {
       ssize_t data_read;
@@ -101,6 +115,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
           switch (errno)
           {
           case EAGAIN:
+          case EINTR: 
             {
               memcached_return rc;
 
@@ -119,11 +134,21 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
         }
         else
         {
-          found_eof= 1;
-          break;
+          /*
+            EOF. Any data received so far is incomplete
+            so discard it. This always reads by byte in case of TCP
+            and protocol enforcement happens at memcached_response()
+            looking for '\n'. We do not care for UDB which requests 8 bytes
+            at once. Generally, this means that connection went away. Since
+            for blocking I/O we do not return 0 and for non-blocking case
+            it will return EGAIN if data is not immediatly available.
+          */
+          memcached_quit_server(ptr, 1);
+          return -1;
         }
       }
 
+      ptr->io_bytes_sent = 0;
       ptr->read_data_length= data_read;
       ptr->read_buffer_length= data_read;
       ptr->read_ptr= ptr->read_buffer;
@@ -149,19 +174,18 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
       buffer_ptr++;
       break;
     }
-
-    if (found_eof)
-      break;
   }
 
-  return (size_t)(buffer_ptr - buffer);
+  return (size_t)(buffer_ptr - (char*)buffer);
 }
 
 ssize_t memcached_io_write(memcached_server_st *ptr,
-                           char *buffer, size_t length, char with_flush)
+                           const void *buffer, size_t length, char with_flush)
 {
   size_t original_length;
-  char* buffer_ptr;
+  const char* buffer_ptr;
+
+  WATCHPOINT_ASSERT(ptr->fd != -1);
 
   original_length= length;
   buffer_ptr= buffer;
@@ -186,17 +210,21 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
       memcached_return rc;
       ssize_t sent_length;
 
+      WATCHPOINT_ASSERT(ptr->fd != -1);
       sent_length= io_flush(ptr, &rc);
       if (sent_length == -1)
         return -1;
 
-      WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
+      /* If io_flush calls memcached_purge, sent_length may be 0 */
+      if (sent_length != 0)
+        WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
     }
   }
 
   if (with_flush)
   {
     memcached_return rc;
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
       return -1;
   }
@@ -206,7 +234,31 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 
 memcached_return memcached_io_close(memcached_server_st *ptr)
 {
-  close(ptr->fd);
+  int r;
+
+  if (ptr->fd == -1)
+    return MEMCACHED_SUCCESS;
+
+  /* in case of death shutdown to avoid blocking at close() */
+  if (1)
+  {
+    r= shutdown(ptr->fd, SHUT_RDWR);
+
+#ifdef HAVE_DEBUG
+    if (r && errno != ENOTCONN)
+    {
+      WATCHPOINT_NUMBER(ptr->fd);
+      WATCHPOINT_ERRNO(errno);
+      WATCHPOINT_ASSERT(errno);
+    }
+#endif
+  }
+
+  r= close(ptr->fd);
+#ifdef HAVE_DEBUG
+  if (r != 0)
+    WATCHPOINT_ERRNO(errno);
+#endif
 
   return MEMCACHED_SUCCESS;
 }
@@ -214,24 +266,42 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
-  size_t sent_length;
+  /*
+  ** We might want to purge the input buffer if we haven't consumed
+  ** any output yet... The test for the limits is the purge is inline
+  ** in the purge function to avoid duplicating the logic..
+  */
+  {
+     memcached_return rc;
+     WATCHPOINT_ASSERT(ptr->fd != -1);
+     rc= memcached_purge(ptr);
+
+     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+       return -1;
+  }
+  ssize_t sent_length;
   size_t return_length;
   char *local_write_ptr= ptr->write_buffer;
   size_t write_length= ptr->write_buffer_offset;
 
   *error= MEMCACHED_SUCCESS;
 
+  WATCHPOINT_ASSERT(ptr->fd != -1);
+
   if (ptr->write_buffer_offset == 0)
     return 0;
 
   /* Looking for memory overflows */
+#if defined(HAVE_DEBUG)
   if (write_length == MEMCACHED_MAX_BUFFER)
     WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
   WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
 
   return_length= 0;
   while (write_length)
   {
+    WATCHPOINT_ASSERT(ptr->fd != -1);
     WATCHPOINT_ASSERT(write_length > 0);
     sent_length= 0;
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
@@ -264,8 +334,9 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
-      if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr, 
-                                       write_length)) == -1)
+      WATCHPOINT_ASSERT(ptr->fd != -1);
+      if ((sent_length= write(ptr->fd, local_write_ptr, 
+                              write_length)) == -1)
       {
         switch (errno)
         {
@@ -276,7 +347,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
             memcached_return rc;
             rc= io_wait(ptr, MEM_WRITE);
 
-            if (rc == MEMCACHED_SUCCESS)
+            if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) 
               continue;
 
             memcached_quit_server(ptr, 1);
@@ -291,13 +362,16 @@ static ssize_t io_flush(memcached_server_st *ptr,
       }
     }
 
+    ptr->io_bytes_sent += sent_length;
+
     local_write_ptr+= sent_length;
     write_length-= sent_length;
     return_length+= sent_length;
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
-  WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
+  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+  // ptr->write_buffer_offset);
   ptr->write_buffer_offset= 0;
 
   return return_length;
@@ -308,5 +382,75 @@ static ssize_t io_flush(memcached_server_st *ptr,
 */
 void memcached_io_reset(memcached_server_st *ptr)
 {
-  memcached_quit_server(ptr, 0);
+  memcached_quit_server(ptr, 1);
+}
+
+/**
+ * Read a given number of bytes from the server and place it into a specific
+ * buffer. Reset the IO channel on this server if an error occurs. 
+ */
+memcached_return memcached_safe_read(memcached_server_st *ptr,
+                                     void *dta,
+                                     size_t size)
+{
+  size_t offset= 0;
+  char *data= dta;
+
+  while (offset < size)
+  {
+    ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
+    if (nread <= 0)
+    {
+      memcached_io_reset(ptr);
+      return MEMCACHED_UNKNOWN_READ_FAILURE;
+    }
+    offset+= nread;
+  }
+
+  return MEMCACHED_SUCCESS;
+}
+
+memcached_return memcached_io_readline(memcached_server_st *ptr,
+                                       char *buffer_ptr,
+                                       size_t size)
+{
+  bool line_complete= false;
+  int total_nr= 0;
+
+  while (!line_complete)
+  {
+    if (ptr->read_buffer_length == 0)
+    {
+      /*
+       * We don't have any data in the buffer, so let's fill the read
+       * buffer. Call the standard read function to avoid duplicating
+       * the logic.
+       */
+      if (memcached_io_read(ptr, buffer_ptr, 1) != 1)
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+
+      if (*buffer_ptr == '\n')
+        line_complete= true;
+
+      ++buffer_ptr;
+      ++total_nr;
+    }
+
+    /* Now let's look in the buffer and copy as we go! */
+    while (ptr->read_buffer_length && total_nr < size && !line_complete)
+    {
+      *buffer_ptr = *ptr->read_ptr;
+      if (*buffer_ptr == '\n')
+        line_complete = true;
+      --ptr->read_buffer_length;
+      ++ptr->read_ptr;
+      ++total_nr;
+      ++buffer_ptr;
+    }
+
+    if (total_nr == size)
+      return MEMCACHED_PROTOCOL_ERROR;
+  }
+
+  return MEMCACHED_SUCCESS;
 }