Implemented support for noreply in the binary protocol
authorTrond Norbye <trond.norbye@sun.com>
Tue, 27 Jan 2009 11:34:32 +0000 (12:34 +0100)
committerTrond Norbye <trond.norbye@sun.com>
Tue, 27 Jan 2009 11:34:32 +0000 (12:34 +0100)
Please note that the quiet commands in the binary protocol was added
recently, so you need a recent build of the memcached 1.3.x server.
(at least http://github.com/dustin/memcached/commit/0e8a58a8afbb8f15e42b001f2442858cfa3dcbb6 )

libmemcached/memcached/protocol_binary.h
libmemcached/memcached_response.c
libmemcached/memcached_storage.c
tests/function.c

index ee5dd0b9a011cce5575b07a2f7912dc14736f258..08df72e8b10fe0c505086443c8409f49ecd6a118 100644 (file)
@@ -93,7 +93,17 @@ extern "C"
     PROTOCOL_BINARY_CMD_GETKQ = 0x0d,
     PROTOCOL_BINARY_CMD_APPEND = 0x0e,
     PROTOCOL_BINARY_CMD_PREPEND = 0x0f,
-    PROTOCOL_BINARY_CMD_STAT = 0x10
+    PROTOCOL_BINARY_CMD_STAT = 0x10,
+    PROTOCOL_BINARY_CMD_SETQ = 0x11,
+    PROTOCOL_BINARY_CMD_ADDQ = 0x12,
+    PROTOCOL_BINARY_CMD_REPLACEQ = 0x13,
+    PROTOCOL_BINARY_CMD_DELETEQ = 0x14,
+    PROTOCOL_BINARY_CMD_INCREMENTQ = 0x15,
+    PROTOCOL_BINARY_CMD_DECREMENTQ = 0x16,
+    PROTOCOL_BINARY_CMD_QUITQ = 0x17,
+    PROTOCOL_BINARY_CMD_FLUSHQ = 0x18,
+    PROTOCOL_BINARY_CMD_APPENDQ = 0x19,
+    PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a
   } protocol_binary_command;
 
   /**
index e9ff94ac399dbe731a94daee1639d35fb980ce5b..940c9f8844e26632181e09b404a9b8c277a75585 100644 (file)
@@ -300,14 +300,30 @@ static memcached_return binary_read_one_response(memcached_server_st *ptr,
   else if (header.response.bodylen) 
   {
      /* What should I do with the error message??? just discard it for now */
-    char buffer[SMALL_STRING_LEN];
+    char hole[SMALL_STRING_LEN];
     while (bodylen > 0) 
     {
       size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
-      if (memcached_safe_read(ptr, buffer, nr) != MEMCACHED_SUCCESS)
+      if (memcached_safe_read(ptr, hole, nr) != MEMCACHED_SUCCESS)
         return MEMCACHED_UNKNOWN_READ_FAILURE;
       bodylen -= nr;
     }
+
+    /* This might be an error from one of the quiet commands.. if
+     * so, just throw it away and get the next one. What about creating
+     * a callback to the user with the error information?
+     */
+    switch (header.response.opcode)
+    {
+    case PROTOCOL_BINARY_CMD_SETQ:
+    case PROTOCOL_BINARY_CMD_ADDQ:
+    case PROTOCOL_BINARY_CMD_REPLACEQ:
+    case PROTOCOL_BINARY_CMD_APPENDQ:
+    case PROTOCOL_BINARY_CMD_PREPENDQ:
+      return binary_read_one_response(ptr, buffer, buffer_length, result);
+    default:
+      break;
+    }
   }
 
   memcached_return rc= MEMCACHED_SUCCESS;
index 111d0ea7718b71754ee7f93840da1ed78fcf104d..4b62606832cb1462e21468f5cd5f291be116d2b8 100644 (file)
@@ -333,6 +333,55 @@ memcached_return memcached_cas_by_key(memcached_st *ptr,
   return rc;
 }
 
+static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply) {
+   uint8_t ret;
+
+  if (noreply)
+    switch (verb)
+    {
+    case SET_OP:
+      ret=PROTOCOL_BINARY_CMD_SETQ;
+      break;
+    case ADD_OP:
+      ret=PROTOCOL_BINARY_CMD_ADDQ;
+      break;
+    case CAS_OP: /* FALLTHROUGH */
+    case REPLACE_OP:
+      ret=PROTOCOL_BINARY_CMD_REPLACEQ;
+      break;
+    case APPEND_OP:
+      ret=PROTOCOL_BINARY_CMD_APPENDQ;
+      break;
+    case PREPEND_OP:
+      ret=PROTOCOL_BINARY_CMD_PREPENDQ;
+      break;
+    }
+  else
+    switch (verb)
+    {
+    case SET_OP:
+      ret=PROTOCOL_BINARY_CMD_SET;
+      break;
+    case ADD_OP:
+      ret=PROTOCOL_BINARY_CMD_ADD;
+      break;
+    case CAS_OP: /* FALLTHROUGH */
+    case REPLACE_OP:
+      ret=PROTOCOL_BINARY_CMD_REPLACE;
+      break;
+    case APPEND_OP:
+      ret=PROTOCOL_BINARY_CMD_APPEND;
+      break;
+    case PREPEND_OP:
+      ret=PROTOCOL_BINARY_CMD_PREPEND;
+      break;
+    }
+
+   return ret;
+}
+
+
+
 static memcached_return memcached_send_binary(memcached_server_st* server, 
                                               const char *key, 
                                               size_t key_length, 
@@ -346,30 +395,10 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
   char flush;
   protocol_binary_request_set request= {.bytes= {0}};
   size_t send_length= sizeof(request.bytes);
+  bool noreply = server->root->flags & MEM_NOREPLY;
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-  switch (verb) 
-  {
-  case SET_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET;
-    break;
-  case ADD_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD;
-    break;
-  case REPLACE_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
-    break;
-  case APPEND_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND;
-    break;
-  case PREPEND_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND;
-    break;
-  case CAS_OP:
-    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
-      break;
-  }
-
+  request.message.header.request.opcode= get_com_code(verb, noreply);
   request.message.header.request.keylen= htons((uint16_t)key_length);
   request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
   if (verb == APPEND_OP || verb == PREPEND_OP)
@@ -397,10 +426,16 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
     memcached_io_reset(server);
     return MEMCACHED_WRITE_FAILURE;
   }
+  
+  if (noreply)
+    memcached_server_response_decrement(server);
 
   if (flush == 0)
     return MEMCACHED_BUFFERED;
 
+  if (noreply)
+    return MEMCACHED_SUCCESS;
+
   return memcached_response(server, NULL, 0, NULL);   
 }
 
index 35d1c5a6251f214dd3bc8288e5bba9f4016c8bb4..bb2d399310390781f62da698d0923e00f419a4bf 100644 (file)
@@ -3040,51 +3040,115 @@ static test_return noreply_test(memcached_st *memc)
   assert(ret == MEMCACHED_SUCCESS);
   ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
   assert(ret == MEMCACHED_SUCCESS);
+  ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, 1);
+  assert(ret == MEMCACHED_SUCCESS);
   assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NOREPLY) == 1);
   assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS) == 1);
-  
-  for (int x= 0; x < 100; ++x) {
-    char key[10];
-    size_t len= sprintf(key, "%d", x);
-    ret= memcached_set(memc, key, len, key, len, 0, 0);
-    assert(ret == MEMCACHED_SUCCESS || ret == MEMCACHED_BUFFERED);
-  }
-  
-  /*
-  ** NOTE: Don't ever do this in your code! this is not a supported use of the
-  ** API and is _ONLY_ done this way to verify that the library works the
-  ** way it is supposed to do!!!!
-  */
-  int no_msg= 0;
-  for (int x= 0; x < memc->number_of_hosts; ++x) {
-     no_msg+= memc->hosts[x].cursor_active;
+  assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS) == 1);
+
+  for (int count=0; count < 5; ++count)
+  {
+    for (int x=0; x < 100; ++x)
+    {
+      char key[10];
+      size_t len=sprintf(key, "%d", x);
+      switch (count)
+      {
+      case 0:
+        ret=memcached_add(memc, key, len, key, len, 0, 0);
+        break;
+      case 1:
+        ret=memcached_replace(memc, key, len, key, len, 0, 0);
+        break;
+      case 2:
+        ret=memcached_set(memc, key, len, key, len, 0, 0);
+        break;
+      case 3:
+        ret=memcached_append(memc, key, len, key, len, 0, 0);
+        break;
+      case 4:
+        ret=memcached_prepend(memc, key, len, key, len, 0, 0);
+        break;
+      }
+      assert(ret == MEMCACHED_SUCCESS || ret == MEMCACHED_BUFFERED);
+    }
+
+    /*
+    ** NOTE: Don't ever do this in your code! this is not a supported use of the
+    ** API and is _ONLY_ done this way to verify that the library works the
+    ** way it is supposed to do!!!!
+    */
+    int no_msg=0;
+    for (int x=0; x < memc->number_of_hosts; ++x)
+      no_msg+=memc->hosts[x].cursor_active;
+
+    assert(no_msg == 0);
+    assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS);
+
+    /*
+     ** Now validate that all items was set properly!
+     */
+    for (int x=0; x < 100; ++x)
+    {
+      char key[10];
+      size_t len=sprintf(key, "%d", x);
+      size_t length;
+      uint32_t flags;
+      char* value=memcached_get(memc, key, strlen(key),
+                                &length, &flags, &ret);
+      assert(ret == MEMCACHED_SUCCESS && value != NULL);
+      switch (count)
+      {
+      case 0: /* FALLTHROUGH */
+      case 1: /* FALLTHROUGH */
+      case 2:
+        assert(strncmp(value, key, len) == 0);
+        assert(len == length);
+        break;
+      case 3:
+        assert(length == len * 2);
+        break;
+      case 4:
+        assert(length == len * 3);
+        break;
+      }
+      free(value);
+    }
   }
-  
-  /*
-  ** The binary protocol does not implement quiet commands yet. Fix this
-  ** test they are implemented!
-  */
-  if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1)
-     assert(no_msg == 100);
-  else
-     assert(no_msg == 0);
 
-  assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS);
+  /* Try setting an illegal cas value (should not return an error to
+   * the caller (because we don't expect a return message from the server)
+   */
+  char* keys[]= {"0"};
+  size_t lengths[]= {1};
+  size_t length;
+  uint32_t flags;
+  memcached_result_st results_obj;
+  memcached_result_st *results;
+  ret=memcached_mget(memc, keys, lengths, 1);
+  assert(ret == MEMCACHED_SUCCESS);
+
+  results=memcached_result_create(memc, &results_obj);
+  assert(results);
+  results=memcached_fetch_result(memc, &results_obj, &ret);
+  assert(results);
+  assert(ret == MEMCACHED_SUCCESS);
+  uint64_t cas= memcached_result_cas(results);
+  memcached_result_free(&results_obj);
+
+  ret= memcached_cas(memc, keys[0], lengths[0], keys[0], lengths[0], 0, 0, cas);
+  assert(ret == MEMCACHED_SUCCESS);
 
   /*
-  ** Now validate that all items was set properly!
-  */
-  for (int x= 0; x < 100; ++x) {
-    char key[10];
-    size_t len= sprintf(key, "%d", x);
-    size_t length;
-    uint32_t flags;
-    char* value= memcached_get(memc, key, strlen(key), 
-                               &length, &flags, &ret);    
-    assert(ret == MEMCACHED_SUCCESS && value != NULL);
-    assert(strncmp(value, key, len) == 0);
-    free(value);
-  }
+   * The item will have a new cas value, so try to set it again with the old
+   * value. This should fail!
+   */
+  ret= memcached_cas(memc, keys[0], lengths[0], keys[0], lengths[0], 0, 0, cas);
+  assert(ret == MEMCACHED_SUCCESS);
+  assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS);
+  char* value=memcached_get(memc, keys[0], lengths[0], &length, &flags, &ret);
+  assert(ret == MEMCACHED_SUCCESS && value != NULL);
+  free(value);
 
   return TEST_SUCCESS;
 }