First merge of Trond's patches (cherry picking).
[m6w6/libmemcached] / libmemcached / memcached_io.c
index 69771d71f172c7f421c2aea792b3af1e9d2e7cdf..fd86437fd670153e15cb67b75ca4ebc32f13aea2 100644 (file)
@@ -1,6 +1,14 @@
-/*
-  Basic socket buffered IO
-*/
+/* LibMemcached
+ * Copyright (C) 2006-2009 Brian Aker
+ * All rights reserved.
+ *
+ * Use and distribution licensed under the BSD license.  See
+ * the COPYING file in the parent directory for full text.
+ *
+ * Summary: Server IO, Not public!
+ *
+ */
+
 
 #include "common.h"
 #include "memcached_io.h"
 
 typedef enum {
   MEM_READ,
-  MEM_WRITE,
+  MEM_WRITE
 } memc_read_or_write;
 
-static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
+static ssize_t io_flush(memcached_server_st *ptr, memcached_return_t *error);
 static void increment_udp_message_id(memcached_server_st *ptr);
 
-static memcached_return io_wait(memcached_server_st *ptr,
-                                memc_read_or_write read_or_write)
+static memcached_return_t io_wait(memcached_server_st *ptr,
+                                  memc_read_or_write read_or_write)
 {
-  struct pollfd fds[1];
-  short flags= 0;
+  struct pollfd fds= {
+    .fd= ptr->fd,
+    .events = POLLIN
+  };
   int error;
 
-  if (read_or_write == MEM_WRITE) /* write */
-    flags= POLLOUT;
-  else
-    flags= POLLIN;
-
-  memset(&fds, 0, sizeof(struct pollfd));
-  fds[0].fd= ptr->fd;
-  fds[0].events= flags;
+  unlikely (read_or_write == MEM_WRITE) /* write */
+    fds.events= POLLOUT;
 
   /*
-  ** We are going to block on write, but at least on Solaris we might block
-  ** on write if we haven't read anything from our input buffer..
-  ** Try to purge the input buffer if we don't do any flow control in the
-  ** application layer (just sending a lot of data etc)
-  ** The test is moved down in the purge function to avoid duplication of
-  ** the test.
 */
+   ** We are going to block on write, but at least on Solaris we might block
+   ** on write if we haven't read anything from our input buffer..
+   ** Try to purge the input buffer if we don't do any flow control in the
+   ** application layer (just sending a lot of data etc)
+   ** The test is moved down in the purge function to avoid duplication of
+   ** the test.
+ */
   if (read_or_write == MEM_WRITE)
   {
-    memcached_return rc=memcached_purge(ptr);
+    memcached_return_t rc= memcached_purge(ptr);
     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
-       return MEMCACHED_FAILURE;
+      return MEMCACHED_FAILURE;
   }
 
-  error= poll(fds, 1, ptr->root->poll_timeout);
+  int timeout= ptr->root->poll_timeout;
+  if (ptr->root->flags.no_block == false)
+    timeout= -1;
+
+  error= poll(&fds, 1, timeout);
 
   if (error == 1)
     return MEMCACHED_SUCCESS;
   else if (error == 0)
-  {
     return MEMCACHED_TIMEOUT;
-  }
 
   /* Imposssible for anything other then -1 */
   WATCHPOINT_ASSERT(error == -1);
   memcached_quit_server(ptr, 1);
 
   return MEMCACHED_FAILURE;
+}
+
+/**
+ * Try to fill the input buffer for a server with as much
+ * data as possible.
+ *
+ * @param ptr the server to pack
+ */
+static bool repack_input_buffer(memcached_server_st *ptr)
+{
+  if (ptr->read_ptr != ptr->read_buffer)
+  {
+    /* Move all of the data to the beginning of the buffer so
+     ** that we can fit more data into the buffer...
+   */
+    memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
+    ptr->read_ptr= ptr->read_buffer;
+    ptr->read_data_length= ptr->read_buffer_length;
+  }
+
+  /* There is room in the buffer, try to fill it! */
+  if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+  {
+    /* Just try a single read to grab what's available */
+    ssize_t nr= read(ptr->fd,
+                     ptr->read_ptr + ptr->read_data_length,
+                     MEMCACHED_MAX_BUFFER - ptr->read_data_length);
+
+    if (nr > 0)
+    {
+      ptr->read_data_length+= (size_t)nr;
+      ptr->read_buffer_length+= (size_t)nr;
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * If the we have callbacks connected to this server structure
+ * we may start process the input queue and fire the callbacks
+ * for the incomming messages. This function is _only_ called
+ * when the input buffer is full, so that we _know_ that we have
+ * at least _one_ message to process.
+ *
+ * @param ptr the server to star processing iput messages for
+ * @return true if we processed anything, false otherwise
+ */
+static bool process_input_buffer(memcached_server_st *ptr)
+{
+  /*
+   ** We might be able to process some of the response messages if we
+   ** have a callback set up
+ */
+  if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+  {
+    /*
+     * We might have responses... try to read them out and fire
+     * callbacks
+   */
+    memcached_callback_st cb= *ptr->root->callbacks;
+
+    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    memcached_return_t error;
+    error= memcached_response(ptr, buffer, sizeof(buffer),
+                              &ptr->root->result);
+    if (error == MEMCACHED_SUCCESS)
+    {
+      for (unsigned int x= 0; x < cb.number_of_callback; x++)
+      {
+        error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context);
+        if (error != MEMCACHED_SUCCESS)
+          break;
+      }
 
+      /* @todo what should I do with the error message??? */
+    }
+    /* @todo what should I do with other error messages?? */
+    return true;
+  }
+
+  return false;
 }
 
 #ifdef UNUSED
@@ -90,8 +177,8 @@ void memcached_io_preread(memcached_st *ptr)
 }
 #endif
 
-memcached_return memcached_io_read(memcached_server_st *ptr,
-                                   void *buffer, size_t length, ssize_t *nread)
+memcached_return_t memcached_io_read(memcached_server_st *ptr,
+                                     void *buffer, size_t length, ssize_t *nread)
 {
   char *buffer_ptr;
 
@@ -111,14 +198,14 @@ memcached_return memcached_io_read(memcached_server_st *ptr,
         else if (data_read == -1)
         {
           ptr->cached_errno= errno;
-          memcached_return rc= MEMCACHED_UNKNOWN_READ_FAILURE;
+          memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE;
           switch (errno)
           {
           case EAGAIN:
           case EINTR:
             if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
               continue;
-          /* fall through */
+            /* fall through */
 
           default:
             {
@@ -146,8 +233,8 @@ memcached_return memcached_io_read(memcached_server_st *ptr,
       }
 
       ptr->io_bytes_sent = 0;
-      ptr->read_data_length= data_read;
-      ptr->read_buffer_length= data_read;
+      ptr->read_data_length= (size_t) data_read;
+      ptr->read_buffer_length= (size_t) data_read;
       ptr->read_ptr= ptr->read_buffer;
     }
 
@@ -174,7 +261,7 @@ memcached_return memcached_io_read(memcached_server_st *ptr,
   }
 
   ptr->server_failure_counter= 0;
-  *nread = (size_t)(buffer_ptr - (char*)buffer);
+  *nread = (ssize_t)(buffer_ptr - (char*)buffer);
   return MEMCACHED_SUCCESS;
 }
 
@@ -218,7 +305,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 
     if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
     {
-      memcached_return rc;
+      memcached_return_t rc;
       ssize_t sent_length;
 
       WATCHPOINT_ASSERT(ptr->fd != -1);
@@ -236,16 +323,16 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 
   if (with_flush)
   {
-    memcached_return rc;
+    memcached_return_t rc;
     WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
       return -1;
   }
 
-  return original_length;
+  return (ssize_t) original_length;
 }
 
-memcached_return memcached_io_close(memcached_server_st *ptr)
+memcached_return_t memcached_io_close(memcached_server_st *ptr)
 {
   int r;
 
@@ -327,20 +414,20 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
 }
 
 static ssize_t io_flush(memcached_server_st *ptr,
-                        memcached_return *error)
+                        memcached_return_t *error)
 {
   /*
-  ** We might want to purge the input buffer if we haven't consumed
-  ** any output yet... The test for the limits is the purge is inline
-  ** in the purge function to avoid duplicating the logic..
 */
+   ** We might want to purge the input buffer if we haven't consumed
+   ** any output yet... The test for the limits is the purge is inline
+   ** in the purge function to avoid duplicating the logic..
+ */
   {
-     memcached_return rc;
-     WATCHPOINT_ASSERT(ptr->fd != -1);
-     rc= memcached_purge(ptr);
+    memcached_return_t rc;
+    WATCHPOINT_ASSERT(ptr->fd != -1);
+    rc= memcached_purge(ptr);
 
-     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
-       return -1;
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+      return -1;
   }
   ssize_t sent_length;
   size_t return_length;
@@ -356,7 +443,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
     return -1;
 
   if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
-          && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
+                                        && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
     return 0;
 
   /* Looking for memory overflows */
@@ -384,16 +471,26 @@ static ssize_t io_flush(memcached_server_st *ptr,
       case ENOBUFS:
         continue;
       case EAGAIN:
-      {
-        memcached_return rc;
-        rc= io_wait(ptr, MEM_WRITE);
+        {
+          /*
+           * We may be blocked on write because the input buffer
+           * is full. Let's check if we have room in our input
+           * buffer for more data and retry the write before
+           * waiting..
+         */
+          if (repack_input_buffer(ptr) ||
+              process_input_buffer(ptr))
+            continue;
 
-        if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
-          continue;
+          memcached_return_t rc;
+          rc= io_wait(ptr, MEM_WRITE);
 
-        memcached_quit_server(ptr, 1);
-        return -1;
-      }
+          if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
+            continue;
+
+          memcached_quit_server(ptr, 1);
+          return -1;
+        }
       default:
         memcached_quit_server(ptr, 1);
         *error= MEMCACHED_ERRNO;
@@ -408,11 +505,11 @@ static ssize_t io_flush(memcached_server_st *ptr,
       return -1;
     }
 
-    ptr->io_bytes_sent += sent_length;
+    ptr->io_bytes_sent += (uint32_t) sent_length;
 
     local_write_ptr+= sent_length;
-    write_length-= sent_length;
-    return_length+= sent_length;
+    write_length-= (uint32_t) sent_length;
+    return_length+= (uint32_t) sent_length;
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
@@ -426,10 +523,10 @@ static ssize_t io_flush(memcached_server_st *ptr,
   else
     ptr->write_buffer_offset= 0;
 
-  return return_length;
+  return (ssize_t) return_length;
 }
 
-/* 
+/*
   Eventually we will just kill off the server with the problem.
 */
 void memcached_io_reset(memcached_server_st *ptr)
@@ -439,11 +536,11 @@ void memcached_io_reset(memcached_server_st *ptr)
 
 /**
  * Read a given number of bytes from the server and place it into a specific
- * buffer. Reset the IO channel on this server if an error occurs. 
+ * buffer. Reset the IO channel on this server if an error occurs.
  */
-memcached_return memcached_safe_read(memcached_server_st *ptr,
-                                     void *dta,
-                                     size_t size)
+memcached_return_t memcached_safe_read(memcached_server_st *ptr,
+                                       void *dta,
+                                       size_t size)
 {
   size_t offset= 0;
   char *data= dta;
@@ -451,20 +548,20 @@ memcached_return memcached_safe_read(memcached_server_st *ptr,
   while (offset < size)
   {
     ssize_t nread;
-    memcached_return rc= memcached_io_read(ptr, data + offset, size - offset,
-                                           &nread);
+    memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
+                                             &nread);
     if (rc != MEMCACHED_SUCCESS)
       return rc;
 
-    offset+= nread;
+    offset+= (size_t) nread;
   }
 
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return memcached_io_readline(memcached_server_st *ptr,
-                                       char *buffer_ptr,
-                                       size_t size)
+memcached_return_t memcached_io_readline(memcached_server_st *ptr,
+                                         char *buffer_ptr,
+                                         size_t size)
 {
   bool line_complete= false;
   size_t total_nr= 0;
@@ -477,9 +574,9 @@ memcached_return memcached_io_readline(memcached_server_st *ptr,
        * We don't have any data in the buffer, so let's fill the read
        * buffer. Call the standard read function to avoid duplicating
        * the logic.
-       */
+     */
       ssize_t nread;
-      memcached_return rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
+      memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
       if (rc != MEMCACHED_SUCCESS)
         return rc;
 
@@ -524,22 +621,22 @@ static void increment_udp_message_id(memcached_server_st *ptr)
 {
   struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
   uint16_t cur_req= get_udp_datagram_request_id(header);
-  uint16_t msg_num= get_msg_num_from_request_id(cur_req);
-  uint16_t thread_id= get_thread_id_from_request_id(cur_req);
-  
+  int msg_num= get_msg_num_from_request_id(cur_req);
+  int thread_id= get_thread_id_from_request_id(cur_req);
+
   if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
     msg_num= 0;
 
-  header->request_id= htons(thread_id | msg_num);
+  header->request_id= htons((uint16_t) (thread_id | msg_num));
 }
 
-memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id)
+memcached_return_t memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id)
 {
   if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
     return MEMCACHED_FAILURE;
 
   struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
-  header->request_id= htons(generate_udp_request_thread_id(thread_id));
+  header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
   header->num_datagrams= htons(1);
   header->sequence_number= htons(0);