Purge the input buffer if I discover a lot of commands being sent and none read ...
authorTrond Norbye <trond.norbye@sun.com>
Wed, 22 Oct 2008 20:08:15 +0000 (22:08 +0200)
committerTrond Norbye <trond.norbye@sun.com>
Wed, 22 Oct 2008 20:08:15 +0000 (22:08 +0200)
docs/memcached_behavior.pod
libmemcached/Makefile.am
libmemcached/memcached.c
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_io.c
libmemcached/memcached_io.h
libmemcached/memcached_purge.c [new file with mode: 0644]
libmemcached/memcached_server.h

index 51e09e71436236ddede4ece9abc6b09b02cdd0d4..2dc214ae910c1340bf2bab23f8290571214c94b3 100755 (executable)
@@ -144,6 +144,21 @@ this flag on an open connection.
 Set this value to enable the server be removed after continuous MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT
 times connection failure.
 
+=item MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK
+
+Set this value to tune the number of messages that may be sent before 
+libmemcached should start to automatically drain the input queue. Setting
+this value to high, may cause libmemcached to deadlock (trying to send data,
+but the send will block because the input buffer in the kernel is full).
+
+=item MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK
+
+Set this value to tune the number of bytes that may be sent before
+libmemcached should start to automatically drain the input queue (need
+at least 10 IO requests sent without reading the input buffer). Setting
+this value to high, may cause libmemcached to deadlock (trying to send 
+data, but the send will block because the input buffer in the kernel is full).
+
 =back
 
 =head1 RETURN
index fc272120f2bdb986d478997297216e81451fa143..bb6cfa0bd22d9c6145b62c8a1df01f4745060d48 100644 (file)
@@ -54,6 +54,7 @@ libmemcached_la_SOURCES = crc.c \
                          memcached_hash.c \
                          memcached_hosts.c \
                          memcached_io.c \
+                         memcached_purge.c \
                          md5.c \
                          memcached_key.c \
                          memcached_quit.c \
index d5bfd19060986eb50703c20ccd9d246e9b3f6d37..accf24b717dc073dc530263445c784ea0896b1ba 100644 (file)
@@ -28,6 +28,9 @@ memcached_st *memcached_create(memcached_st *ptr)
   ptr->retry_timeout= 0;
   ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA;
 
+  ptr->io_msg_watermark = 500;
+  ptr->io_bytes_watermark = 65 * 1024;
+
   return ptr;
 }
 
index 3b549ebceb1974c9ff69ab000bb28c35efb2d4f1..f12975f6e8d41face40dc792ea0151d61019f2f6 100644 (file)
@@ -100,6 +100,9 @@ struct memcached_st {
   int32_t snd_timeout;
   int32_t rcv_timeout;
   uint32_t server_failure_limit;
+  uint32_t io_msg_watermark;
+  uint32_t io_bytes_watermark;
+  char purging;
 };
 
 
index ef4907dd24e02502cd96b69b534421cf3c2e7729..0cd42efc64fee5fc41054316a4a50a53f7eaaf4a 100644 (file)
@@ -23,6 +23,12 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
 {
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
+    ptr->io_msg_watermark= (int32_t)data;
+    break;
+  case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
+    ptr->io_bytes_watermark= (int32_t)data;
+    break;
   case MEMCACHED_BEHAVIOR_SND_TIMEOUT:
     ptr->snd_timeout= (int32_t)data;
     break;     
@@ -130,6 +136,12 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
 
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
+    temp_flag = ptr->io_msg_watermark;
+    break;
+  case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
+    temp_flag = ptr->io_bytes_watermark;
+    break;
   case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
     temp_flag= MEM_BINARY_PROTOCOL;
     break;     
index 47774d37e641adab8138ea5eac3716fd5ea6a626..fb8305f46b2583c1dab8551babfe567ebb1013e3 100644 (file)
@@ -91,7 +91,9 @@ typedef enum {
   MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
   MEMCACHED_BEHAVIOR_SND_TIMEOUT,
   MEMCACHED_BEHAVIOR_RCV_TIMEOUT,
-  MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT
+  MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT,
+  MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
+  MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK
 } memcached_behavior;
 
 typedef enum {
index 47486d3095b799872a6943004c23821af525c7b9..ddca197260f818de70c5e04e6e60a553479f1b8e 100644 (file)
@@ -30,6 +30,17 @@ static memcached_return io_wait(memcached_server_st *ptr,
   fds[0].fd= ptr->fd;
   fds[0].events= flags;
 
+  /*
+  ** We are going to block on write, but at least on Solaris we might block
+  ** on write if we haven't read anything from our input buffer..
+  ** Try to purge the input buffer if we don't do any flow control in the
+  ** application layer (just sending a lot of data etc)
+  ** The test is moved down in the purge function to avoid duplication of
+  ** the test.
+  */
+  if (read_or_write == MEM_WRITE)
+    memcached_purge(ptr);
+
   error= poll(fds, 1, ptr->root->poll_timeout);
 
   if (error == 1)
@@ -124,6 +135,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
         }
       }
 
+      ptr->io_bytes_sent = 0;
       ptr->read_data_length= data_read;
       ptr->read_buffer_length= data_read;
       ptr->read_ptr= ptr->read_buffer;
@@ -280,6 +292,13 @@ static ssize_t io_flush(memcached_server_st *ptr,
     }
     else
     {
+      /*
+      ** We might want to purge the input buffer if we haven't consumed
+      ** any output yet... The test for the limits is the purge is inline
+      ** in the purge function to avoid duplicating the logic..
+      */
+      memcached_purge(ptr);
+
       if ((sent_length= write(ptr->fd, local_write_ptr, 
                                        write_length)) == -1)
       {
@@ -307,6 +326,8 @@ static ssize_t io_flush(memcached_server_st *ptr,
       }
     }
 
+    ptr->io_bytes_sent += sent_length;
+
     local_write_ptr+= sent_length;
     write_length-= sent_length;
     return_length+= sent_length;
index ac0ffdf874df4b44126c7fe88763ef14a4ebf6e3..1da530ee42b34b67e90caff3d6527b8ad0b7c542 100644 (file)
@@ -10,5 +10,5 @@ void memcached_io_reset(memcached_server_st *ptr);
 ssize_t memcached_io_read(memcached_server_st *ptr,
                           void *buffer, size_t length);
 memcached_return memcached_io_close(memcached_server_st *ptr);
-
+void memcached_purge(memcached_server_st *ptr);
 #endif /* __MEMCACHED_IO_H__ */
diff --git a/libmemcached/memcached_purge.c b/libmemcached/memcached_purge.c
new file mode 100644 (file)
index 0000000..d9d4305
--- /dev/null
@@ -0,0 +1,42 @@
+#include <assert.h>
+
+#include "common.h"
+#include "memcached_io.h"
+
+void memcached_purge(memcached_server_st *ptr) 
+{
+  char buffer[2048];
+  size_t buffer_length = sizeof(buffer);
+  memcached_result_st result;
+
+  if (ptr->root->purging || /* already purging */
+      (memcached_server_response_count(ptr) < ptr->root->io_msg_watermark && 
+       ptr->io_bytes_sent < ptr->root->io_bytes_watermark) ||
+      (ptr->io_bytes_sent > ptr->root->io_bytes_watermark && 
+       memcached_server_response_count(ptr) < 10)) {
+    return;
+  }
+
+  /* memcached_io_write and memcached_response may call memcached_purge
+     so we need to be able stop any recursion.. */
+  ptr->root->purging = 1;
+
+  /* Force a flush of the buffer to ensure that we don't have the n-1 pending
+     requests buffered up.. */
+  memcached_io_write(ptr, NULL, 0, 1);
+
+  /* we have already incremented the response counter, and memcached_response
+     will read out all messages.. To avoid memcached_response to wait forever
+     for a response to a command I have in my buffer, let's decrement the 
+     response counter :) */
+  memcached_server_response_decrement(ptr);
+  
+  /* memcached_response may call memcached_io_read, but let's use a short
+     timeout if there is no data yet */
+  int32_t timeout = ptr->root->poll_timeout;
+  ptr->root->poll_timeout = 1;
+  memcached_response(ptr, buffer, sizeof(buffer), &result);
+  ptr->root->poll_timeout = timeout;
+  memcached_server_response_increment(ptr);
+  ptr->root->purging = 0;
+}
index 891c9c73ab596cfd8ded2d60c91cdbe7b822a52a..d5ed2692528530737497ffbd29cdf200b668beb6 100644 (file)
@@ -37,6 +37,7 @@ struct memcached_server_st {
   memcached_st *root;
   uint64_t limit_maxbytes;
   uint32_t server_failure_counter;
+  uint32_t io_bytes_sent; /* # bytes sent since last read */
 };
 
 #define memcached_server_count(A) (A)->number_of_hosts