Merge Thomason's cork patch.
authorBrian Aker <brian@gaz>
Fri, 15 Jan 2010 00:35:02 +0000 (16:35 -0800)
committerBrian Aker <brian@gaz>
Fri, 15 Jan 2010 00:35:02 +0000 (16:35 -0800)
12 files changed:
ChangeLog
docs/memcached_behavior.pod
libmemcached/behavior.c
libmemcached/common.h
libmemcached/connect.c
libmemcached/constants.h
libmemcached/io.c
libmemcached/memcached.c
libmemcached/memcached.h
libmemcached/server.c
libmemcached/server.h
tests/mem_functions.c

index 308aac834bcf737af58fb8e6ca9418a9d465a788..c324e485306256c1bc6ef5fde45794e4d9a11fde 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,7 @@
 0.38
 
+ * Added MEMCACHED_BEHAVIOR_CORK.
  * memslap now creates a configuration file at ~/.memslap.cnf
 
  * memcached_purge() now calls any callbacks registered during get
index f59150f1751f5ef7265614a65eafc9a6a44f6a3e..2e0240afbe4c0c88a9463e22267739dc79f3543e 100644 (file)
@@ -218,6 +218,28 @@ on the starting point of the replica reads is randomized between the servers.
 This allows distributing read load to multiple servers with the expense of 
 more write traffic.
 
+=item MEMCACHED_BEHAVIOR_CORK
+
+Enable TCP_CORK behavior. This is only available as an option Linux.
+MEMCACHED_NO_SERVERS is returned if no servers are available to test with.
+MEMCACHED_NOT_SUPPORTED is returned if we were not able to determine
+if support was available. All other responses then MEMCACHED_SUCCESS
+report an error of some sort. This behavior also enables 
+MEMCACHED_BEHAVIOR_TCP_NODELAY when set.
+
+
+=item MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE
+
+Find the current size of SO_SNDBUF. A value of 0 means either an error
+occured or no hosts were available. It is safe to assume system default
+if this occurs.
+
+=item MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE
+
+Find the current size of SO_RCVBUF. A value of 0 means either an error
+occured or no hosts were available. It is safe to assume system default
+if this occurs.
+
 =back
 
 =head1 RETURN
index 6d1688601ce6f552463c139b05329008f1cdb9d7..01f5bd303c87bfccca57cd66ca5c6d3e9c8112ec 100644 (file)
@@ -185,6 +185,57 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr,
       srandom((uint32_t) time(NULL));
       ptr->flags.randomize_replica_read= set_flag(data);
       break;
+  case MEMCACHED_BEHAVIOR_CORK:
+      {
+        memcached_server_instance_st *instance;
+        bool action= set_flag(data);
+
+        if (action == false)
+        {
+          ptr->flags.cork= set_flag(false);
+          return MEMCACHED_SUCCESS;
+        }
+
+        instance= memcached_server_instance_fetch(ptr, 0);
+        if (! instance)
+          return MEMCACHED_NO_SERVERS;
+
+
+        /* We just try the first host, and if it is down we return zero */
+        memcached_return_t rc;
+        rc= memcached_connect(instance);
+        if (rc != MEMCACHED_SUCCESS)
+        {
+          return rc;
+        }
+
+        /* Now we test! */
+        memcached_ternary_t enabled;
+        enabled= cork_switch(instance, true);
+
+        switch (enabled)
+        {
+        case MEM_FALSE:
+          return ptr->cached_errno ? MEMCACHED_ERRNO : MEMCACHED_FAILURE ;
+        case MEM_TRUE:
+          {
+            enabled= cork_switch(instance, false);
+
+            if (enabled == false) // Possible bug in OS?
+            {
+              memcached_quit_server(instance, false); // We should reset everything on this error.
+              return MEMCACHED_ERRNO;  // Errno will be true because we will have already set it.
+            }
+            ptr->flags.cork= true;
+            ptr->flags.tcp_nodelay= true;
+          }
+          break;
+        case MEM_NOT:
+        default:
+          return MEMCACHED_NOT_SUPPORTED;
+        }
+      }
+      break;
   case MEMCACHED_BEHAVIOR_MAX:
   default:
     /* Shouldn't get here */
@@ -250,41 +301,54 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
     return (uint64_t)ptr->rcv_timeout;
   case MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE:
     {
-      int sock_size;
+      int sock_size= 0;
       socklen_t sock_length= sizeof(int);
       memcached_server_instance_st *instance;
 
+      if (ptr->send_size != -1) // If value is -1 then we are using the default
+        return (uint64_t) ptr->send_size;
+
       instance= memcached_server_instance_fetch(ptr, 0);
 
-      /* REFACTOR */
-      /* We just try the first host, and if it is down we return zero */
-      if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
-        return 0;
+      if (instance) // If we have an instance we test, otherwise we just set and pray
+      {
+        /* REFACTOR */
+        /* We just try the first host, and if it is down we return zero */
+        if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
+          return 0;
 
-      if (getsockopt(instance->fd, SOL_SOCKET,
-                     SO_SNDBUF, &sock_size, &sock_length))
-        return 0; /* Zero means error */
+        if (getsockopt(instance->fd, SOL_SOCKET,
+                       SO_SNDBUF, &sock_size, &sock_length))
+          return 0; /* Zero means error */
+      }
 
       return (uint64_t) sock_size;
     }
   case MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE:
     {
-      int sock_size;
+      int sock_size= 0;
       socklen_t sock_length= sizeof(int);
       memcached_server_instance_st *instance;
 
+      if (ptr->recv_size != -1) // If value is -1 then we are using the default
+        return (uint64_t) ptr->recv_size;
+
       instance= memcached_server_instance_fetch(ptr, 0);
 
       /** 
         @note REFACTOR 
       */
-      /* We just try the first host, and if it is down we return zero */
-      if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
-        return 0;
+      if (instance)
+      {
+        /* We just try the first host, and if it is down we return zero */
+        if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
+          return 0;
 
-      if (getsockopt(instance->fd, SOL_SOCKET,
-                     SO_RCVBUF, &sock_size, &sock_length))
-        return 0; /* Zero means error */
+        if (getsockopt(instance->fd, SOL_SOCKET,
+                       SO_RCVBUF, &sock_size, &sock_length))
+          return 0; /* Zero means error */
+
+      }
 
       return (uint64_t) sock_size;
     }
@@ -298,6 +362,8 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
     return ptr->flags.auto_eject_hosts;
   case MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ:
     return ptr->flags.randomize_replica_read;
+  case MEMCACHED_BEHAVIOR_CORK:
+    return ptr->flags.cork;
   case MEMCACHED_BEHAVIOR_MAX:
   default:
     WATCHPOINT_ASSERT(0); /* Programming mistake if it gets this far */
index e78d5143b69ff4eace11c8816881f49d7a21d464..1d77ba1674891c01a94a42af89fbda3fc66b0a0d 100644 (file)
@@ -71,6 +71,13 @@ struct memcached_continuum_item_st
   uint32_t value;
 };
 
+/* Yum, Fortran.... can you make the reference? */
+typedef enum {
+  MEM_NOT= -1,
+  MEM_FALSE= false,
+  MEM_TRUE= true,
+} memcached_ternary_t;
+
 
 #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
 
@@ -83,7 +90,6 @@ struct memcached_continuum_item_st
 #define unlikely(x)     if(__builtin_expect((x) != 0, 0))
 #endif
 
-
 #define MEMCACHED_BLOCK_SIZE 1024
 #define MEMCACHED_DEFAULT_COMMAND_SIZE 350
 #define SMALL_STRING_LEN 1024
@@ -155,4 +161,42 @@ static inline memcached_return_t memcached_validate_key_length(size_t key_length
   return MEMCACHED_SUCCESS;
 }
 
+#ifdef TCP_CORK
+  #define CORK TCP_CORK
+#elif defined TCP_NOPUSH
+  #define CORK TCP_NOPUSH
+#endif
+
+/*
+  cork_switch() tries to enable TCP_CORK. IF TCP_CORK is not an option
+  on the system it returns false but sets errno to 0. Otherwise on
+  failure errno is set.
+*/
+static inline memcached_ternary_t cork_switch(memcached_server_st *ptr, bool enable)
+{
+#ifdef CORK
+  if (ptr->type != MEMCACHED_CONNECTION_TCP)
+    return MEM_FALSE;
+
+  int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
+                      &enable, (socklen_t)sizeof(int));
+  if (! err)
+  {
+    return MEM_TRUE;
+  }
+  else
+  {
+    ptr->cached_errno= errno;
+    return MEM_FALSE;
+  }
+#else
+  (void)ptr;
+  (void)enable;
+
+  ptr->cached_errno= 0;
+
+  return MEM_NOT;
+#endif
+}
+
 #endif /* LIBMEMCACHED_COMMON_H */
index 36971f60aa9c86e4a19d4bf8e42bc85a47ea1aa3..6d08eb709065f35b02303e209ac666316a8e7950 100644 (file)
@@ -111,7 +111,7 @@ static memcached_return_t set_socket_options(memcached_server_st *ptr)
       return MEMCACHED_FAILURE;
   }
 
-  if (ptr->root->send_size)
+  if (ptr->root->send_size > 0)
   {
     int error;
 
@@ -122,7 +122,7 @@ static memcached_return_t set_socket_options(memcached_server_st *ptr)
       return MEMCACHED_FAILURE;
   }
 
-  if (ptr->root->recv_size)
+  if (ptr->root->recv_size > 0)
   {
     int error;
 
index 5c704027b925eac8a45dccabdaf791dd27b02d15..11b2f677e2702c2e984d7778f38f676c308ba094 100644 (file)
@@ -114,6 +114,7 @@ typedef enum {
   MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS,
   MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS,
   MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ,
+  MEMCACHED_BEHAVIOR_CORK,
   MEMCACHED_BEHAVIOR_MAX
 } memcached_behavior_t;
 
index 3b8943c8dc31c1c00a0f0bc0b1705cb8f150f6e8..5102d95b7b118cda6aae0e8086748395ddcb37d5 100644 (file)
@@ -154,6 +154,36 @@ static bool process_input_buffer(memcached_server_instance_st *ptr)
   return false;
 }
 
+static inline void memcached_io_cork_push(memcached_server_st *ptr)
+{
+#ifdef CORK
+  if (ptr->root->flags.cork == false || ptr->state.is_corked)
+    return;
+
+  ptr->state.is_corked=
+    cork_switch(ptr, true) == MEM_TRUE ? true : false;
+
+  WATCHPOINT_ASSERT(ptr->state.is_corked == true);
+#else
+  (void)ptr;
+#endif
+}
+
+static inline void memcached_io_cork_pop(memcached_server_st *ptr)
+{
+#ifdef CORK
+  if (ptr->root->flags.cork == false || ptr->state.is_corked == false)
+    return;
+
+  ptr->state.is_corked=
+    cork_switch(ptr, false) == MEM_FALSE ? false : true;
+
+  WATCHPOINT_ASSERT(ptr->state.is_corked == false);
+#else
+  (void)ptr;
+#endif
+}
+
 #ifdef UNUSED
 void memcached_io_preread(memcached_st *ptr)
 {
@@ -280,6 +310,12 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr,
   original_length= length;
   buffer_ptr= buffer;
 
+  /* more writable data is coming if a flush isn't required, so delay send */
+  if (! with_flush)
+  {
+    memcached_io_cork_push(ptr);
+  }
+
   while (length)
   {
     char *write_ptr;
@@ -330,7 +366,11 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr,
     memcached_return_t rc;
     WATCHPOINT_ASSERT(ptr->fd != -1);
     if (io_flush(ptr, &rc) == -1)
+    {
       return -1;
+    }
+
+    memcached_io_cork_pop(ptr);
   }
 
   return (ssize_t) original_length;
index 203f0984704bde38999aba6aee88498346f0a65a..027ef20acbacc3a5260dbecbffc76e5ae409aeb4 100644 (file)
@@ -37,6 +37,10 @@ memcached_st *memcached_create(memcached_st *ptr)
   ptr->retry_timeout= 0;
   ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA;
 
+
+  ptr->send_size= -1;
+  ptr->recv_size= -1;
+
   /* TODO, Document why we picked these defaults */
   ptr->io_msg_watermark= 500;
   ptr->io_bytes_watermark= 65 * 1024;
index e4510d6d9290f2f26f3c8e63c35c85b4b3b032d5..d86f9bc9f74cdfa5cddce68c33c2b6c71827bb50 100644 (file)
@@ -83,6 +83,7 @@ struct memcached_st {
     bool use_sort_hosts:1;
     bool use_udp:1;
     bool verify_key:1;
+    bool cork:1;
   } flags;
   int32_t poll_timeout;
   int32_t connect_timeout;
index 0d343f1d0c38460195d4c00f2fdcfbe97d118a26..f61f572e6b4d5dd439903e03a2deb3b58d221eb6 100644 (file)
@@ -51,6 +51,7 @@ memcached_server_st *memcached_server_create_with(memcached_st *memc, memcached_
   host->fd= -1;
   host->type= type;
   host->read_ptr= host->read_buffer;
+  host->state.is_corked= 0;
   if (memc)
     host->next_retry= memc->retry_timeout;
   if (type == MEMCACHED_CONNECTION_UDP)
index 1b99ecbb66cc94422453895a2af2d30717763e52..0051e0617a0c53f2192c1f8cb007ee423e3ab4e0 100644 (file)
@@ -29,6 +29,9 @@ struct memcached_server_st {
   uint32_t io_bytes_sent; /* # bytes sent since last read */
   uint32_t server_failure_counter;
   uint32_t weight;
+  struct { // Place any "state" sort variables in here.
+    bool is_corked;
+  } state;
   uint8_t major_version;
   uint8_t micro_version;
   uint8_t minor_version;
index 9ec6b751cda5090d4486c202c1093b94472da0e0..3b545c86266f3e7a4ea94334a3fecbe0fee5d4d1 100644 (file)
@@ -1858,6 +1858,29 @@ static test_return_t  behavior_test(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static test_return_t MEMCACHED_BEHAVIOR_CORK_test(memcached_st *memc)
+{
+  memcached_return_t rc;
+  bool set= true;
+  bool value;
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_CORK, set);
+#ifdef TCP_CORK
+  test_truth(rc == MEMCACHED_SUCCESS);
+#else
+  test_truth(rc == MEMCACHED_NOT_SUPPORTED);
+#endif
+
+  value= (bool)memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_CORK);
+#ifdef TCP_CORK
+  test_truth((bool)value == set);
+#else
+  test_false((bool)value == set);
+#endif
+
+  return TEST_SUCCESS;
+}
+
 static test_return_t fetch_all_results(memcached_st *memc)
 {
   memcached_return_t rc= MEMCACHED_SUCCESS;
@@ -3362,13 +3385,37 @@ static test_return_t  add_host_test1(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
-static test_return_t  pre_nonblock(memcached_st *memc)
+static test_return_t pre_nonblock(memcached_st *memc)
 {
   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
 
   return TEST_SUCCESS;
 }
 
+static test_return_t pre_cork(memcached_st *memc)
+{
+  memcached_return_t rc;
+  bool set= true;
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_CORK, set);
+
+  if (rc == MEMCACHED_SUCCESS)
+    return TEST_SUCCESS;
+
+  return TEST_SKIPPED;
+}
+
+static test_return_t pre_cork_and_nonblock(memcached_st *memc)
+{
+  test_return_t rc;
+  
+  rc= pre_cork(memc);
+
+  if (rc != TEST_SUCCESS)
+    return rc;
+
+  return pre_nonblock(memc);
+}
+
 static test_return_t pre_nonblock_binary(memcached_st *memc)
 {
   memcached_return_t rc= MEMCACHED_FAILURE;
@@ -5762,6 +5809,7 @@ test_st tests[] ={
 
 test_st behavior_tests[] ={
   {"behavior_test", 0, (test_callback_fn)behavior_test},
+  {"MEMCACHED_BEHAVIOR_CORK", 0, (test_callback_fn)MEMCACHED_BEHAVIOR_CORK_test},
   {0, 0, 0}
 };
 
@@ -5977,6 +6025,8 @@ collection_st collection[] ={
   {"generate_murmur", (test_callback_fn)pre_murmur, 0, generate_tests},
   {"generate_jenkins", (test_callback_fn)pre_jenkins, 0, generate_tests},
   {"generate_nonblock", (test_callback_fn)pre_nonblock, 0, generate_tests},
+  {"generate_corked", (test_callback_fn)pre_cork, 0, generate_tests},
+  {"generate_corked_and_nonblock", (test_callback_fn)pre_cork_and_nonblock, 0, generate_tests},
   {"consistent_not", 0, 0, consistent_tests},
   {"consistent_ketama", (test_callback_fn)pre_behavior_ketama, 0, consistent_tests},
   {"consistent_ketama_weighted", (test_callback_fn)pre_behavior_ketama_weighted, 0, consistent_weighted_tests},