Implement memcached_behavior_noreply
authorTrond Norbye <trond.norbye@sun.com>
Tue, 20 Jan 2009 11:34:40 +0000 (12:34 +0100)
committerTrond Norbye <trond.norbye@sun.com>
Tue, 20 Jan 2009 11:34:40 +0000 (12:34 +0100)
docs/memcached_behavior.pod
libmemcached/common.h
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_storage.c
tests/function.c

index 2dc214ae910c1340bf2bab23f8290571214c94b3..f4246556f7a43c701c8adee8d6eec7348da17af1 100755 (executable)
@@ -159,6 +159,13 @@ at least 10 IO requests sent without reading the input buffer). Setting
 this value to high, may cause libmemcached to deadlock (trying to send 
 data, but the send will block because the input buffer in the kernel is full).
 
+=item MEMCACHED_BEHAVIOR_NOREPLY
+
+Set this value to specify that you really don't care about the result
+from your storage commands (set, add, replace, append, prepend). With
+this flag enabled, each storage request will be sent immediately to the
+server overriding any setting of MEMCACHED_BEHAVIOR_BUFFER_REQUESTS.
+
 =back
 
 =head1 RETURN
index 760fb04241c5a3ac77a28eee0ba97fdc89bb2e65..40787d3863ea1ab4ce7f466f199ba361dbb66cb2 100644 (file)
@@ -75,7 +75,8 @@ typedef enum {
   /* 11 used for weighted ketama */
   MEM_KETAMA_WEIGHTED= (1 << 11),
   MEM_BINARY_PROTOCOL= (1 << 12),
-  MEM_HASH_WITH_PREFIX_KEY= (1 << 13)
+  MEM_HASH_WITH_PREFIX_KEY= (1 << 13),
+  MEM_NOREPLY= (1 << 14)
 } memcached_flags;
 
 /* Hashing algo */
index fc785e25c58562bcae365e0ac02d7631f08fe8fe..3eca4eac30233214d7b1973485b78c088da608c3 100644 (file)
@@ -136,6 +136,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
     set_behavior_flag(ptr, MEM_HASH_WITH_PREFIX_KEY, data);
     break;
+  case MEMCACHED_BEHAVIOR_NOREPLY:
+    set_behavior_flag(ptr, MEM_NOREPLY, data);
+    break;     
   }
 
   return MEMCACHED_SUCCESS;
@@ -244,6 +247,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
     temp_flag= MEM_HASH_WITH_PREFIX_KEY;
     break;
+  case MEMCACHED_BEHAVIOR_NOREPLY:
+    temp_flag= MEM_NOREPLY;
+    break;
   }
 
   WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */
index 7f5bbdabff66f4a690799c0a511a93da5ce5a319..fc47a3b6e96835f2937c6d0f7f8be6f10962b10d 100644 (file)
@@ -96,7 +96,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT,
   MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
   MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
-  MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY
+  MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
+  MEMCACHED_BEHAVIOR_NOREPLY
 } memcached_behavior;
 
 typedef enum {
index 1ac70d146c9971c839fa6e0a59694dee81937686..0f817c476524811e136b589dac90d6b1a7035d6e 100644 (file)
@@ -124,6 +124,19 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   else
     to_write= 1;
 
+  if (ptr->flags & MEM_NOREPLY)
+  {
+    if (memcached_io_write(&ptr->hosts[server_key], " noreply\r\n", 
+                           10, 1) == -1)
+    {
+      rc= MEMCACHED_WRITE_FAILURE;
+      goto error;
+    }
+
+    memcached_server_response_decrement(&ptr->hosts[server_key]);
+    return MEMCACHED_SUCCESS;
+  }
+
   if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
   {
     rc= MEMCACHED_WRITE_FAILURE;
@@ -378,6 +391,18 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
     request.message.header.request.cas= htonll(cas);
   
   flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
+
+  /* The binary protocol does not implement NOREPLY right now, so I need to
+   * parse the return message from the server. The problem we tried to solve
+   * is the fact that the async mode will buffer commands, and if we issue a
+   * lot of sets followed by a get we have to flush the send buffer before we
+   * can send the get command (and all those commands have to be executed).
+   * The workaround for the binary is that we send the command, but we await
+   * parsing of the result message until we really need to do it..
+   */
+  if (server->root->flags & MEM_NOREPLY)
+    flush=1;
+
   /* write the header */
   if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
       (memcached_io_write(server, key, key_length, 0) == -1) ||
@@ -387,9 +412,9 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
     return MEMCACHED_WRITE_FAILURE;
   }
 
-  if (flush == 0)
+  if (flush == 0 || server->root->flags & MEM_NOREPLY)
     return MEMCACHED_BUFFERED;
-  
+
   return memcached_response(server, NULL, 0, NULL);   
 }
 
index 4bc0be4ef9392c7c0f0e2531bd7a417541a5ec77..e72dc7eed64de3f8998f88a86d013ccc997f0727 100644 (file)
@@ -3031,6 +3031,44 @@ static memcached_return  poll_timeout(memcached_st *memc)
   return MEMCACHED_SUCCESS;
 }
 
+static memcached_return noreply_test(memcached_st *memc)
+{
+  memcached_return ret;
+  ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, 1);
+  assert(ret == MEMCACHED_SUCCESS);
+  ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+  assert(ret == MEMCACHED_SUCCESS);
+  assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NOREPLY) == 1);
+  assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS) == 1);
+  
+  for (int x= 0; x < 100; ++x) {
+    char key[10];
+    size_t len= sprintf(key, "%d", x);
+    ret= memcached_set(memc, key, len, key, len, 0, 0);
+    assert(ret == MEMCACHED_SUCCESS || ret == MEMCACHED_BUFFERED);
+  }
+  
+  /*
+  ** NOTE: Don't ever do this in your code! this is not a supported use of the
+  ** API and is _ONLY_ done this way to verify that the library works the
+  ** way it is supposed to do!!!!
+  */
+  int no_msg= 0;
+  for (int x= 0; x < memc->number_of_hosts; ++x) {
+     no_msg+= memc->hosts[x].cursor_active;
+  }
+  
+  /*
+  ** The binary protocol does not implement quiet commands yet. Fix this
+  ** test they are implemented!
+  */
+  if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1)
+     assert(no_msg == 100);
+  else
+     assert(no_msg == 0);
+
+  return MEMCACHED_SUCCESS;
+}
 
 /* Clean the server before beginning testing */
 test_st tests[] ={
@@ -3072,6 +3110,7 @@ test_st tests[] ={
   {"memcached_server_cursor", 1, memcached_server_cursor_test },
   {"read_through", 1, read_through },
   {"delete_through", 1, delete_through },
+  {"noreply", 1, noreply_test},
   {0, 0, 0}
 };