Merge Lee
[awesomized/libmemcached] / libmemcached / io.c
index 6d19c672da79f5414562847111716b3bfb512b98..bca5ea4bb2f0d1a86b900634182789bf5c092705 100644 (file)
@@ -19,10 +19,10 @@ typedef enum {
   MEM_WRITE
 } memc_read_or_write;
 
-static ssize_t io_flush(memcached_server_st *ptr, memcached_return_t *error);
-static void increment_udp_message_id(memcached_server_st *ptr);
+static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error);
+static void increment_udp_message_id(memcached_server_instance_st *ptr);
 
-static memcached_return_t io_wait(memcached_server_st *ptr,
+static memcached_return_t io_wait(memcached_server_instance_st *ptr,
                                   memc_read_or_write read_or_write)
 {
   struct pollfd fds= {
@@ -73,7 +73,7 @@ static memcached_return_t io_wait(memcached_server_st *ptr,
  *
  * @param ptr the server to pack
  */
-static bool repack_input_buffer(memcached_server_st *ptr)
+static bool repack_input_buffer(memcached_server_instance_st *ptr)
 {
   if (ptr->read_ptr != ptr->read_buffer)
   {
@@ -113,7 +113,7 @@ static bool repack_input_buffer(memcached_server_st *ptr)
  * @param ptr the server to star processing iput messages for
  * @return true if we processed anything, false otherwise
  */
-static bool process_input_buffer(memcached_server_st *ptr)
+static bool process_input_buffer(memcached_server_instance_st *ptr)
 {
   /*
    ** We might be able to process some of the response messages if we
@@ -127,15 +127,21 @@ static bool process_input_buffer(memcached_server_st *ptr)
    */
     memcached_callback_st cb= *ptr->root->callbacks;
 
+    memcached_set_processing_input((memcached_st *)ptr->root, true);
+
     char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
     memcached_return_t error;
+    memcached_st *root= (memcached_st *)ptr->root;
     error= memcached_response(ptr, buffer, sizeof(buffer),
-                              &ptr->root->result);
+                              &root->result);
+
+    memcached_set_processing_input(root, false);
+
     if (error == MEMCACHED_SUCCESS)
     {
       for (unsigned int x= 0; x < cb.number_of_callback; x++)
       {
-        error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context);
+        error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
         if (error != MEMCACHED_SUCCESS)
           break;
       }
@@ -149,6 +155,42 @@ static bool process_input_buffer(memcached_server_st *ptr)
   return false;
 }
 
+static inline void memcached_io_cork_push(memcached_server_st *ptr)
+{
+#ifdef CORK
+  if (ptr->root->flags.cork == false || ptr->state.is_corked)
+    return;
+
+  int enable= 1;
+  int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
+                      &enable, (socklen_t)sizeof(int));
+  if (! err)
+    ptr->state.is_corked= true;
+
+  WATCHPOINT_ASSERT(ptr->state.is_corked == true);
+#else
+  (void)ptr;
+#endif
+}
+
+static inline void memcached_io_cork_pop(memcached_server_st *ptr)
+{
+#ifdef CORK
+  if (ptr->root->flags.cork == false || ptr->state.is_corked == false)
+    return;
+
+  int enable= 0;
+  int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
+                      &enable, (socklen_t)sizeof(int));
+  if (! err)
+    ptr->state.is_corked= false;
+
+  WATCHPOINT_ASSERT(ptr->state.is_corked == false);
+#else
+  (void)ptr;
+#endif
+}
+
 #ifdef UNUSED
 void memcached_io_preread(memcached_st *ptr)
 {
@@ -176,7 +218,7 @@ void memcached_io_preread(memcached_st *ptr)
 }
 #endif
 
-memcached_return_t memcached_io_read(memcached_server_st *ptr,
+memcached_return_t memcached_io_read(memcached_server_instance_st *ptr,
                                      void *buffer, size_t length, ssize_t *nread)
 {
   char *buffer_ptr;
@@ -264,7 +306,7 @@ memcached_return_t memcached_io_read(memcached_server_st *ptr,
   return MEMCACHED_SUCCESS;
 }
 
-ssize_t memcached_io_write(memcached_server_st *ptr,
+ssize_t memcached_io_write(memcached_server_instance_st *ptr,
                            const void *buffer, size_t length, char with_flush)
 {
   size_t original_length;
@@ -275,6 +317,12 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   original_length= length;
   buffer_ptr= buffer;
 
+  /* more writable data is coming if a flush isn't required, so delay send */
+  if (! with_flush)
+  {
+    memcached_io_cork_push(ptr);
+  }
+
   while (length)
   {
     char *write_ptr;
@@ -325,13 +373,17 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
     memcached_return_t rc;
     WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
+    {
       return -1;
+    }
+
+    memcached_io_cork_pop(ptr);
   }
 
   return (ssize_t) original_length;
 }
 
-memcached_return_t memcached_io_close(memcached_server_st *ptr)
+memcached_return_t memcached_io_close(memcached_server_instance_st *ptr)
 {
   if (ptr->fd == -1)
   {
@@ -354,24 +406,27 @@ memcached_return_t memcached_io_close(memcached_server_st *ptr)
   return MEMCACHED_SUCCESS;
 }
 
-memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
+memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *memc)
 {
 #define MAX_SERVERS_TO_POLL 100
   struct pollfd fds[MAX_SERVERS_TO_POLL];
   unsigned int host_index= 0;
 
-  for (unsigned int x= 0;
+  for (uint32_t x= 0;
        x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
        ++x)
   {
-    if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */
-      return &memc->hosts[x];
+    memcached_server_instance_st *instance=
+      memcached_server_instance_fetch(memc, x);
+
+    if (instance->read_buffer_length > 0) /* I have data in the buffer */
+      return instance;
 
-    if (memcached_server_response_count(&memc->hosts[x]) > 0)
+    if (memcached_server_response_count(instance) > 0)
     {
       fds[host_index].events = POLLIN;
       fds[host_index].revents = 0;
-      fds[host_index].fd = memc->hosts[x].fd;
+      fds[host_index].fd = instance->fd;
       ++host_index;
     }
   }
@@ -379,9 +434,16 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
   if (host_index < 2)
   {
     /* We have 0 or 1 server with pending events.. */
-    for (unsigned int x= 0; x< memcached_server_count(memc); ++x)
-      if (memcached_server_response_count(&memc->hosts[x]) > 0)
-        return &memc->hosts[x];
+    for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
+    {
+      memcached_server_instance_st *instance=
+        memcached_server_instance_fetch(memc, x);
+
+      if (memcached_server_response_count(instance) > 0)
+      {
+        return instance;
+      }
+    }
 
     return NULL;
   }
@@ -398,10 +460,13 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
     {
       if (fds[x].revents & POLLIN)
       {
-        for (unsigned int y= 0; y < memcached_server_count(memc); ++y)
+        for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
         {
-          if (memc->hosts[y].fd == fds[x].fd)
-            return &memc->hosts[y];
+          memcached_server_instance_st *instance=
+            memcached_server_instance_fetch(memc, y);
+
+          if (instance->fd == fds[x].fd)
+            return instance;
         }
       }
     }
@@ -410,7 +475,7 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
   return NULL;
 }
 
-static ssize_t io_flush(memcached_server_st *ptr,
+static ssize_t io_flush(memcached_server_instance_st *ptr,
                         memcached_return_t *error)
 {
   /*
@@ -526,7 +591,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
 /*
   Eventually we will just kill off the server with the problem.
 */
-void memcached_io_reset(memcached_server_st *ptr)
+void memcached_io_reset(memcached_server_instance_st *ptr)
 {
   memcached_quit_server(ptr, 1);
 }
@@ -535,7 +600,7 @@ void memcached_io_reset(memcached_server_st *ptr)
  * Read a given number of bytes from the server and place it into a specific
  * buffer. Reset the IO channel on this server if an error occurs.
  */
-memcached_return_t memcached_safe_read(memcached_server_st *ptr,
+memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr,
                                        void *dta,
                                        size_t size)
 {
@@ -556,7 +621,7 @@ memcached_return_t memcached_safe_read(memcached_server_st *ptr,
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return_t memcached_io_readline(memcached_server_st *ptr,
+memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr,
                                          char *buffer_ptr,
                                          size_t size)
 {
@@ -614,7 +679,7 @@ memcached_return_t memcached_io_readline(memcached_server_st *ptr,
  * extracts the message number from message_id, increments it and then
  * writes the new value back into the header
  */
-static void increment_udp_message_id(memcached_server_st *ptr)
+static void increment_udp_message_id(memcached_server_instance_st *ptr)
 {
   struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
   uint16_t cur_req= get_udp_datagram_request_id(header);
@@ -627,7 +692,7 @@ static void increment_udp_message_id(memcached_server_st *ptr)
   header->request_id= htons((uint16_t) (thread_id | msg_num));
 }
 
-memcached_return_t memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id)
+memcached_return_t memcached_io_init_udp_header(memcached_server_instance_st *ptr, uint16_t thread_id)
 {
   if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
     return MEMCACHED_FAILURE;