Update for storage to now use vector
authorBrian Aker <brian@tangent.org>
Wed, 28 Dec 2011 19:57:48 +0000 (11:57 -0800)
committerBrian Aker <brian@tangent.org>
Wed, 28 Dec 2011 19:57:48 +0000 (11:57 -0800)
23 files changed:
libmemcached-1.0/struct/memcached.h
libmemcached/auto.cc
libmemcached/behavior.cc
libmemcached/common.h
libmemcached/delete.cc
libmemcached/do.cc
libmemcached/do.hpp
libmemcached/exist.cc
libmemcached/flush.cc
libmemcached/include.am
libmemcached/io.cc
libmemcached/io.h
libmemcached/io.hpp
libmemcached/is.h
libmemcached/key.cc
libmemcached/memcached.cc
libmemcached/response.cc
libmemcached/storage.cc
libmemcached/udp.cc [new file with mode: 0644]
libmemcached/udp.hpp [new file with mode: 0644]
libmemcached/verbosity.cc
tests/libmemcached-1.0/mem_functions.cc
tests/mem_udp.cc

index 152f310330c8402428df3500a5a39731372c2269..564eb74e55b9c825252a2cf16dd6eed16c5db8cb 100644 (file)
@@ -54,7 +54,7 @@ struct memcached_st {
     bool buffer_requests:1;
     bool hash_with_namespace:1;
     bool no_block:1; // Don't block
-    bool no_reply:1;
+    bool reply:1;
     bool randomize_replica_read:1;
     bool support_cas:1;
     bool tcp_nodelay:1;
index 01a0132c7ae65f41038ad08522636e155696a7b0..174f2623001dd47df60b82383ac7f755b2e2ddf7 100644 (file)
 
 #include <libmemcached/common.h>
 
-static memcached_return_t text_incr_decr(memcached_st *ptr,
+static memcached_return_t text_incr_decr(memcached_server_write_instance_st instance,
                                          const bool is_incr,
-                                         const char *group_key, size_t group_key_length,
                                          const char *key, size_t key_length,
-                                         uint64_t offset,
+                                         const uint64_t offset,
+                                         const bool reply,
                                          uint64_t& numeric_value)
 {
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-  uint32_t server_key;
-  memcached_server_write_instance_st instance;
-  
-  // Invert the logic to make it simpler to read the code
-  bool reply= (ptr->flags.no_reply) ? false : true;
-
-  if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
-  {
-    return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT);
-  }
-
-  server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
-  instance= memcached_server_instance_fetch(ptr, server_key);
 
   int send_length= snprintf(buffer, sizeof(buffer), " %" PRIu64, offset);
   if (size_t(send_length) >= sizeof(buffer) or send_length < 0)
   {
-    return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
+    return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
   }
 
   struct libmemcached_io_vector_st vector[]=
   {
     { memcached_literal_param("incr ") },
-    { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
+    { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) },
     { key, key_length },
     { buffer, send_length },
     { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
@@ -82,7 +69,13 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
   }
 
   memcached_return_t rc= memcached_vdo(instance, vector, 6, true);
-  if (reply == false or memcached_failed(rc))
+
+  if (reply == false)
+  {
+    return MEMCACHED_SUCCESS;
+  }
+
+  if (memcached_failed(rc))
   {
     numeric_value= UINT64_MAX;
     return rc;
@@ -93,19 +86,16 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
   return memcached_set_error(*instance, rc, MEMCACHED_AT);
 }
 
-static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
-                                           const char *group_key, size_t group_key_length,
-                                           const char *key, size_t key_length,
-                                           uint64_t offset, uint64_t initial,
-                                           uint32_t expiration,
+static memcached_return_t binary_incr_decr(memcached_server_write_instance_st instance,
+                                           protocol_binary_command cmd,
+                                           const char *key, const size_t key_length,
+                                           const uint64_t offset,
+                                           const uint64_t initial,
+                                           const uint32_t expiration,
+                                           const bool reply,
                                            uint64_t *value)
 {
-  bool no_reply= ptr->flags.no_reply;
-
-  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
-  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
-
-  if (no_reply)
+  if (reply == false)
   {
     if(cmd == PROTOCOL_BINARY_CMD_DECREMENT)
     {
@@ -121,10 +111,10 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
   request.message.header.request.opcode= cmd;
-  request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
+  request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(instance->root->_namespace)));
   request.message.header.request.extlen= 20;
   request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-  request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) +request.message.header.request.extlen));
+  request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(instance->root->_namespace) +request.message.header.request.extlen));
   request.message.body.delta= memcached_htonll(offset);
   request.message.body.initial= memcached_htonll(initial);
   request.message.body.expiration= htonl((uint32_t) expiration);
@@ -132,7 +122,7 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
   struct libmemcached_io_vector_st vector[]=
   {
     { request.bytes, sizeof(request.bytes) },
-    { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
+    { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) },
     { key, key_length }
   };
 
@@ -140,10 +130,10 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
   if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true)))
   {
     memcached_io_reset(instance);
-    return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
+    return MEMCACHED_WRITE_FAILURE;
   }
 
-  if (no_reply)
+  if (reply == false)
   {
     return MEMCACHED_SUCCESS;
   }
@@ -190,17 +180,28 @@ memcached_return_t memcached_increment_by_key(memcached_st *ptr,
     return rc;
   }
 
+  if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
+  {
+    return memcached_set_error(*ptr, rc, MEMCACHED_AT);
+  }
+
+  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
+  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
+
+  bool reply= memcached_is_replying(instance->root);
+
   LIBMEMCACHED_MEMCACHED_INCREMENT_START();
-  if (ptr->flags.binary_protocol)
+  if (memcached_is_binary(ptr))
   {
-    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT,
-                         group_key, group_key_length, key, key_length,
-                         (uint64_t)offset, 0, MEMCACHED_EXPIRATION_NOT_ADD,
+    rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_INCREMENT,
+                         key, key_length,
+                         uint64_t(offset), 0, MEMCACHED_EXPIRATION_NOT_ADD,
+                         reply,
                          value);
   }
   else
   {
-     rc= text_incr_decr(ptr, true, group_key, group_key_length, key, key_length, offset, *value);
+    rc= text_incr_decr(instance, true, key, key_length, offset, reply, *value);
   }
 
   LIBMEMCACHED_MEMCACHED_INCREMENT_END();
@@ -226,23 +227,29 @@ memcached_return_t memcached_decrement_by_key(memcached_st *ptr,
     return rc;
   }
 
-  if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
+  if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
   {
-    return rc;
+    return memcached_set_error(*ptr, rc, MEMCACHED_AT);
   }
 
 
+  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
+  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
+
+  bool reply= memcached_is_replying(instance->root);
+
   LIBMEMCACHED_MEMCACHED_DECREMENT_START();
-  if (ptr->flags.binary_protocol)
+  if (memcached_is_binary(ptr))
   {
-    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT,
-                         group_key, group_key_length, key, key_length,
-                         (uint64_t)offset, 0, MEMCACHED_EXPIRATION_NOT_ADD,
+    rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_DECREMENT,
+                         key, key_length,
+                         offset, 0, MEMCACHED_EXPIRATION_NOT_ADD,
+                         reply,
                          value);
   }
   else
   {
-    rc= text_incr_decr(ptr, false, group_key, group_key_length, key, key_length, offset, *value);
+    rc= text_incr_decr(instance, false, key, key_length, offset, reply, *value);
   }
 
   LIBMEMCACHED_MEMCACHED_DECREMENT_END();
@@ -285,17 +292,23 @@ memcached_return_t memcached_increment_with_initial_by_key(memcached_st *ptr,
     return rc;
   }
 
-  if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
+  if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
   {
-    return rc;
+    return memcached_set_error(*ptr, rc, MEMCACHED_AT);
   }
 
+  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
+  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
+
+  bool reply= memcached_is_replying(instance->root);
+
   LIBMEMCACHED_MEMCACHED_INCREMENT_WITH_INITIAL_START();
-  if (ptr->flags.binary_protocol)
+  if (memcached_is_binary(ptr))
   {
-    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT,
-                         group_key, group_key_length, key, key_length,
-                         offset, initial, (uint32_t)expiration,
+    rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_INCREMENT,
+                         key, key_length,
+                         offset, initial, uint32_t(expiration),
+                         reply,
                          value);
   }
   else
@@ -343,17 +356,24 @@ memcached_return_t memcached_decrement_with_initial_by_key(memcached_st *ptr,
     return rc;
   }
 
-  if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
+  if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
   {
-    return rc;
+    return memcached_set_error(*ptr, rc, MEMCACHED_AT);
   }
 
+  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
+  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
+
+  bool reply= memcached_is_replying(instance->root);
+
+
   LIBMEMCACHED_MEMCACHED_INCREMENT_WITH_INITIAL_START();
-  if (ptr->flags.binary_protocol)
+  if (memcached_is_binary(ptr))
   {
-    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT,
-                         group_key, group_key_length, key, key_length,
-                         offset, initial, (uint32_t)expiration,
+    rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_DECREMENT,
+                         key, key_length,
+                         offset, initial, uint32_t(expiration),
+                         reply,
                          value);
   }
   else
@@ -365,4 +385,3 @@ memcached_return_t memcached_decrement_with_initial_by_key(memcached_st *ptr,
 
   return rc;
 }
-
index 71b3f0c02f10b836ae4370ec971e17cccdace450..abf69996c825029023eb3158f3c9cc8a3f1808fc 100644 (file)
@@ -128,9 +128,13 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr,
     ptr->flags.use_udp= bool(data);
     if (bool(data))
     {
-      ptr->flags.no_reply= true;
+      ptr->flags.reply= false;
       ptr->flags.buffer_requests= false;
     }
+    else
+    {
+      ptr->flags.reply= true;
+    }
     break;
 
   case MEMCACHED_BEHAVIOR_TCP_NODELAY:
@@ -236,7 +240,9 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr,
       return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
                                  memcached_literal_param("MEMCACHED_BEHAVIOR_NOREPLY cannot be disabled while MEMCACHED_BEHAVIOR_USE_UDP is enabled."));
     }
-    ptr->flags.no_reply= bool(data);
+    // We reverse the logic here to make it easier to understand throughout the
+    // code.
+    ptr->flags.reply= bool(data) ? false : true;
     break;
 
   case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
@@ -439,7 +445,7 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
     return ptr->flags.hash_with_namespace;
 
   case MEMCACHED_BEHAVIOR_NOREPLY:
-    return ptr->flags.no_reply;
+    return ptr->flags.reply ? false : true;
 
   case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
     return ptr->flags.auto_eject_hosts;
index 570735045823237e60442750dec3eebc129bc0b0..0f33a784c42b0b8f87d23e59613cb53676dfff3b 100644 (file)
@@ -105,6 +105,7 @@ memcached_return_t memcached_server_execute(memcached_st *ptr,
 #ifdef __cplusplus
 #include <libmemcached/string.hpp>
 #include <libmemcached/io.hpp>
+#include <libmemcached/udp.hpp>
 #include <libmemcached/do.hpp>
 #include <libmemcached/close_socket.hpp>
 #include <libmemcached/connect.hpp>
index 544ef49e058cd10217d628f12bae8a2bbb3900c9..b37f26881ef0acb0b14a89e0ba244ed0c3f7caaa 100644 (file)
@@ -44,18 +44,17 @@ memcached_return_t memcached_delete(memcached_st *ptr, const char *key, size_t k
   return memcached_delete_by_key(ptr, key, key_length, key, key_length, expiration);
 }
 
-static inline memcached_return_t ascii_delete(memcached_st *ptr,
-                                              memcached_server_write_instance_st instance,
+static inline memcached_return_t ascii_delete(memcached_server_write_instance_st instance,
                                               uint32_t ,
                                               const char *key,
-                                              size_t key_length,
-                                              bool& reply,
-                                              bool& flush)
+                                              const size_t key_length,
+                                              const bool reply,
+                                              const bool flush)
 {
   struct libmemcached_io_vector_st vector[]=
   {
     { memcached_literal_param("delete ") },
-    { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
+    { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) },
     { key, key_length },
     { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
     { memcached_literal_param("\r\n") }
@@ -80,13 +79,12 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr,
   return memcached_vdo(instance, vector, 5, flush);
 }
 
-static inline memcached_return_t binary_delete(memcached_st *ptr,
-                                               memcached_server_write_instance_st instance,
+static inline memcached_return_t binary_delete(memcached_server_write_instance_st instance,
                                                uint32_t server_key,
                                                const char *key,
-                                               size_t key_length,
-                                               bool& reply,
-                                               bool& flush)
+                                               const size_t key_length,
+                                               const bool reply,
+                                               const bool flush)
 {
   protocol_binary_request_delete request= {};
 
@@ -99,11 +97,11 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
   {
     request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
   }
-  request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
+  request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(instance->root->_namespace)));
   request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-  request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace)));
+  request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(instance->root->_namespace)));
 
-  if (ptr->flags.use_udp and flush == false)
+  if (memcached_is_udp(instance->root))
   {
     size_t cmd_size= sizeof(request.bytes) + key_length;
     if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
@@ -120,7 +118,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
   struct libmemcached_io_vector_st vector[]=
   {
     { request.bytes, sizeof(request.bytes) },
-    { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
+    { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) },
     { key, key_length }
   };
 
@@ -131,19 +129,19 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
     memcached_io_reset(instance);
   }
 
-  if (ptr->number_of_replicas > 0)
+  if (instance->root->number_of_replicas > 0)
   {
     request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
 
-    for (uint32_t x= 0; x < ptr->number_of_replicas; ++x)
+    for (uint32_t x= 0; x < instance->root->number_of_replicas; ++x)
     {
       memcached_server_write_instance_st replica;
 
       ++server_key;
-      if (server_key == memcached_server_count(ptr))
+      if (server_key == memcached_server_count(instance->root))
         server_key= 0;
 
-      replica= memcached_server_instance_fetch(ptr, server_key);
+      replica= memcached_server_instance_fetch(instance->root, server_key);
 
       if (memcached_vdo(replica, vector, 3, flush) != MEMCACHED_SUCCESS)
       {
@@ -183,63 +181,63 @@ memcached_return_t memcached_delete_by_key(memcached_st *ptr,
     return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, 
                                memcached_literal_param("Memcached server version does not allow expiration of deleted items"));
   }
+
+  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
+  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
   
+  bool buffering= memcached_is_buffering(instance->root);
+  bool reply= memcached_is_replying(instance->root);
+
   // If a delete trigger exists, we need a response, so no buffering/noreply
   if (ptr->delete_trigger)
   {
-    if (ptr->flags.buffer_requests)
+    if (buffering)
     {
       return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, 
                                  memcached_literal_param("Delete triggers cannot be used if buffering is enabled"));
     }
 
-    if (ptr->flags.no_reply)
+    if (reply == false)
     {
       return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, 
                                  memcached_literal_param("Delete triggers cannot be used if MEMCACHED_BEHAVIOR_NOREPLY is set"));
     }
   }
 
-
-  uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
-  memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
-
-  bool to_write= (ptr->flags.buffer_requests) ? false : true;
-
-  // Invert the logic to make it simpler to read the code
-  bool reply= (ptr->flags.no_reply) ? false : true;
-
-  if (ptr->flags.binary_protocol)
+  if (memcached_is_binary(ptr))
   {
-    rc= binary_delete(ptr, instance, server_key, key, key_length, reply, to_write);
+    rc= binary_delete(instance, server_key, key, key_length, reply, buffering ? false : true);
   }
   else
   {
-    rc= ascii_delete(ptr, instance, server_key, key, key_length, reply, to_write);
+    rc= ascii_delete(instance, server_key, key, key_length, reply, buffering ? false : true);
   }
 
   if (rc == MEMCACHED_SUCCESS)
   {
-    if (to_write == false)
+    if (buffering == true)
     {
       rc= MEMCACHED_BUFFERED;
     }
-    else if (reply)
+    else if (reply == false)
+    {
+      rc= MEMCACHED_SUCCESS;
+    }
+    else
     {
       char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
       rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
       if (rc == MEMCACHED_DELETED)
       {
         rc= MEMCACHED_SUCCESS;
+        if (ptr->delete_trigger)
+        {
+          ptr->delete_trigger(ptr, key, key_length);
+        }
       }
     }
-
-    if (rc == MEMCACHED_SUCCESS and ptr->delete_trigger)
-    {
-      ptr->delete_trigger(ptr, key, key_length);
-    }
   }
 
   LIBMEMCACHED_MEMCACHED_DELETE_END();
-  return rc;
+  return memcached_set_error(*ptr, rc, MEMCACHED_AT );
 }
index 928e327292d374de487e8e75845734380e77651e..a793268dba1dc7c0781a89f91bec8bafcbbbde16 100644 (file)
@@ -11,8 +11,8 @@
 
 #include <libmemcached/common.h>
 
-memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
-                                 const struct libmemcached_io_vector_st *vector,
+memcached_return_t memcached_vdo(memcached_server_write_instance_st instance,
+                                 libmemcached_io_vector_st *vector,
                                  const size_t count,
                                  const bool with_flush)
 {
@@ -21,10 +21,10 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
   WATCHPOINT_ASSERT(count);
   WATCHPOINT_ASSERT(vector);
 
-  if (memcached_failed(rc= memcached_connect(ptr)))
+  if (memcached_failed(rc= memcached_connect(instance)))
   {
     WATCHPOINT_ERROR(rc);
-    assert_msg(ptr->error_messages, "memcached_connect() returned an error but the memcached_server_write_instance_st showed none.");
+    assert_msg(instance->error_messages, "memcached_connect() returned an error but the memcached_server_write_instance_st showed none.");
     return rc;
   }
 
@@ -33,17 +33,19 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
   ** before they start writing, if there is any data in buffer, clear it out,
   ** otherwise we might get a partial write.
   **/
-  if (memcached_is_udp(ptr->root) and with_flush and ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH)
+  if (memcached_is_udp(instance->root))
   {
-    if (memcached_io_write(ptr) == false)
+    size_t write_length= io_vector_total_size(vector, 11) +UDP_DATAGRAM_HEADER_LENGTH;
+
+    if (write_length > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
     {
-      memcached_io_reset(ptr);
-      return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+      return MEMCACHED_WRITE_FAILURE;
     }
-  }
 
-  ssize_t sent_length= memcached_io_writev(ptr, vector, count, with_flush);
+    return MEMCACHED_NOT_SUPPORTED;
+  }
 
+  ssize_t sent_length= memcached_io_writev(instance, vector, count, with_flush);
   size_t command_length= 0;
   for (uint32_t x= 0; x < count; ++x, vector++)
   {
@@ -56,9 +58,9 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
     WATCHPOINT_ERROR(rc);
     WATCHPOINT_ERRNO(errno);
   }
-  else if ((ptr->root->flags.no_reply) == 0)
+  else if (memcached_is_replying(instance->root))
   {
-    memcached_server_response_increment(ptr);
+    memcached_server_response_increment(instance);
   }
 
   return rc;
index aac94912cf5d592f28a3bcaaa3cfeb6e35e9fae4..a42d8678976b523686e15736dab8f3911fbe3960 100644 (file)
@@ -38,6 +38,6 @@
 #pragma once
 
 memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
-                                 const struct libmemcached_io_vector_st *vector,
+                                 libmemcached_io_vector_st *vector,
                                  const size_t count,
                                  const bool with_flush);
index b68ba466db760fab7683c66d2c61c5a884575a69..ff0c1b42bfb05171fdb7eb7a239d0c3c30da942f 100644 (file)
@@ -58,14 +58,20 @@ static memcached_return_t ascii_exist(memcached_st *memc, memcached_server_write
     rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 
     if (rc == MEMCACHED_NOTSTORED)
+    {
       rc= MEMCACHED_SUCCESS;
+    }
 
     if (rc == MEMCACHED_STORED)
+    {
       rc= MEMCACHED_NOTFOUND;
+    }
   }
 
   if (rc == MEMCACHED_WRITE_FAILURE)
+  {
     memcached_io_reset(instance);
+  }
 
   return rc;
 }
index 45cb458648de5233081745a1cfa0c16c90c825d9..99da07f4f471ce4057f726e0017c7179180ec44d 100644 (file)
@@ -67,7 +67,7 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr,
                                                   time_t expiration)
 {
   // Invert the logic to make it simpler to read the code
-  bool reply= (ptr->flags.no_reply) ? false : true;
+  bool reply= memcached_is_replying(ptr);
 
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   int send_length= 0;
@@ -134,13 +134,13 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr,
   {
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
 
-    if (ptr->flags.no_reply)
+    if (memcached_is_replying(ptr))
     {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
     }
     else
     {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
     }
 
     struct libmemcached_io_vector_st vector[]=
index af49d2d54c1481b2f4898312cd21ba50427cca4c..a1e47ab9fac483795afc8cd4846e2446ee2d9cdf 100644 (file)
@@ -42,6 +42,7 @@ noinst_HEADERS+= \
                 libmemcached/server.hpp \
                 libmemcached/server_instance.h \
                 libmemcached/string.hpp \
+                libmemcached/udp.hpp \
                 libmemcached/virtual_bucket.h \
                 libmemcached/watchpoint.h
 
@@ -98,6 +99,7 @@ libmemcached_libmemcached_la_SOURCES+= \
                                       libmemcached/touch.cc \
                                       libmemcached/verbosity.cc \
                                       libmemcached/version.cc \
+                                      libmemcached/udp.cc \
                                       libmemcached/virtual_bucket.c
 
 libmemcached/options.cc: libmemcached/csl/parser.h
index 5e626b6a420ae456c7c5c62470735fa013c778b8..2b0866dc7ac9d86c6fd1626008272f3458fb5fe4 100644 (file)
@@ -44,30 +44,6 @@ enum memc_read_or_write {
   MEM_WRITE
 };
 
-/*
- * The udp request id consists of two seperate sections
- *   1) The thread id
- *   2) The message number
- * The thread id should only be set when the memcached_st struct is created
- * and should not be changed.
- *
- * The message num is incremented for each new message we send, this function
- * extracts the message number from message_id, increments it and then
- * writes the new value back into the header
- */
-static void increment_udp_message_id(memcached_server_write_instance_st ptr)
-{
-  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
-  uint16_t cur_req= get_udp_datagram_request_id(header);
-  int msg_num= get_msg_num_from_request_id(cur_req);
-  int thread_id= get_thread_id_from_request_id(cur_req);
-
-  if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
-    msg_num= 0;
-
-  header->request_id= htons((uint16_t) (thread_id | msg_num));
-}
-
 /**
  * Try to fill the input buffer for a server with as much
  * data as possible.
@@ -192,7 +168,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
 }
 
 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
-                                  memc_read_or_write read_or_write)
+                                  const memc_read_or_write read_or_write)
 {
   struct pollfd fds;
   fds.fd= ptr->fd;
@@ -835,7 +811,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr)
  */
 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
                                        void *dta,
-                                       size_t size)
+                                       const size_t size)
 {
   size_t offset= 0;
   char *data= static_cast<char *>(dta);
@@ -918,18 +894,3 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
 
   return MEMCACHED_SUCCESS;
 }
-
-memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
-{
-  if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
-  {
-    return MEMCACHED_FAILURE;
-  }
-
-  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
-  header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
-  header->num_datagrams= htons(1);
-  header->sequence_number= htons(0);
-
-  return MEMCACHED_SUCCESS;
-}
index 00bd0c119f5f6f90e0bceb1a222d8cd44764f817..c2711e28928ba8f71363ad0b3decf446dd62fd51 100644 (file)
 
 #pragma once
 
-#define MAX_UDP_DATAGRAM_LENGTH 1400
-#define UDP_DATAGRAM_HEADER_LENGTH 8
-#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10
-#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS
-#define get_udp_datagram_request_id(A) ntohs((A)->request_id)
-#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number)
-#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams)
-#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) )
-#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS
-#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS
-#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF)
-
-struct udp_datagram_header_st
-{
-  uint16_t request_id;
-  uint16_t sequence_number;
-  uint16_t num_datagrams;
-  uint16_t reserved;
-};
-
 struct libmemcached_io_vector_st
 {
   const void *buffer;
index af18fe3ad04bdfa0b2c1e9a56c0c0426c850f7b1..d91150f8cd70e3cfdae5e9aa578b0a7c2ef1370b 100644 (file)
@@ -67,10 +67,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr);
 /* Read n bytes of data from the server and store them in dta */
 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
                                        void *dta,
-                                       size_t size);
-
-memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr,
-                                                uint16_t thread_id);
+                                       const size_t size);
 
 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc);
 
index 9f8783d8df4bee1097f3a03f720e486683aea6b8..6f8241d9deb5cdfafb4c30392bc1d7e00a345b47 100644 (file)
 /* These are private */ 
 #define memcached_is_allocated(__object) ((__object)->options.is_allocated)
 #define memcached_is_udp(__object) ((__object)->flags.use_udp)
+#define memcached_is_verify_key(__object) ((__object)->flags.verify_key)
+#define memcached_is_binary(__object) ((__object)->flags.binary_protocol)
 #define memcached_is_initialized(__object) ((__object)->options.is_initialized)
 #define memcached_is_purging(__object) ((__object)->state.is_purging)
 #define memcached_is_processing_input(__object) ((__object)->state.is_processing_input)
+
+#define memcached_is_buffering(__object) ((__object)->flags.buffer_requests)
+#define memcached_is_replying(__object) ((__object)->flags.reply)
+
+#define memcached_has_error(__object) ((__object)->error_messages)
+
 #define memcached_set_purging(__object, __value) ((__object)->state.is_purging= (__value))
 #define memcached_set_processing_input(__object, __value) ((__object)->state.is_processing_input= (__value))
 #define memcached_set_initialized(__object, __value) ((__object)->options.is_initialized(= (__value))
index ea98c729f02b92f5fdf187623dba9ec4a37b0ad0..e92468811ccf8b80ca3338eb7fffad7c9bb22b9c 100644 (file)
@@ -47,25 +47,15 @@ memcached_return_t memcached_key_test(memcached_st &memc,
     return memcached_set_error(memc, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT);
   }
 
-  if (not memc.flags.verify_key)
+  // If we don't need to verify the key, or we are using the binary protoocol,
+  // we just check the size of the key
+  if (memc.flags.verify_key == false or memc.flags.binary_protocol == true)
   {
     for (uint32_t x= 0; x < number_of_keys; x++)
     {
-      memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false);
-      if (memcached_failed(rc))
-      {
-        return rc;
-      }
-    }
-
-    return MEMCACHED_SUCCESS;
-  }
-
-  if (memc.flags.binary_protocol)
-  {
-    for (uint32_t x= 0; x < number_of_keys; x++)
-    {
-      memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false);
+      // We should set binary key, but the memcached server is broken for
+      // longer keys at the moment.
+      memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false /* memc.flags.binary_protocol */);
       if (memcached_failed(rc))
       {
         return rc;
index 8c1e3acb12b1ac4a397dcf8e3a44aea25ee0f728..62052ada5ac25ec75bbd6759d6fb37567aa9cfab 100644 (file)
 #include <libmemcached/options.hpp>
 #include <libmemcached/virtual_bucket.h>
 
-#if 0
-static const memcached_st global_copy= {
-  .state= {
-    .is_purging= false, // .is_purging
-    .is_processing_input= false, // is_processing_input
-    .is_time_for_rebuild= false,
-  },
-  .flags= {
-    .auto_eject_hosts= false,
-    .binary_protocol= false,
-    .buffer_requests= false,
-    .hash_with_namespace= false,
-    .no_block= false,
-    .no_reply= false,
-    .randomize_replica_read= false,
-    .support_cas= false,
-    .tcp_nodelay= false,
-    .use_sort_hosts= false,
-    .use_udp= false,
-    .verify_key= false,
-    .tcp_keepalive= false,
-  },
-};
-#endif
-
 static inline bool _memcached_init(memcached_st *self)
 {
   self->state.is_purging= false;
@@ -76,7 +51,7 @@ static inline bool _memcached_init(memcached_st *self)
   self->flags.buffer_requests= false;
   self->flags.hash_with_namespace= false;
   self->flags.no_block= false;
-  self->flags.no_reply= false;
+  self->flags.reply= true;
   self->flags.randomize_replica_read= false;
   self->flags.support_cas= false;
   self->flags.tcp_nodelay= false;
index b996ee10bcdd14108ed96daa8bd8c8d8eeab1093..3312fb7c23642772e833d2425ce3a3f816b1937f 100644 (file)
@@ -705,7 +705,7 @@ memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
                                       uint64_t& numeric_value)
 {
   /* We may have old commands in the buffer not set, first purge */
-  if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false))
+  if ((ptr->root->flags.no_block) and (memcached_is_processing_input(ptr->root) == false))
   {
     (void)memcached_io_write(ptr);
   }
@@ -715,7 +715,7 @@ memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
    * returned the last one. Purge all pending messages to ensure backwards
    * compatibility.
  */
-  if (ptr->root->flags.binary_protocol == false)
+  if (memcached_is_binary(ptr->root) == false)
   {
     while (memcached_server_response_count(ptr) > 1)
     {
index 041575b6cce7fd3cb29fec76e822bd4ef658834a..3e29323286d667d7f5c9d1ad10fb654826fdd31f 100644 (file)
@@ -74,9 +74,9 @@ static inline const char *storage_op_string(memcached_storage_action_t verb)
   return "set ";
 }
 
-static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
+static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
 {
-  if (noreply)
+  if (reply == false)
   {
     switch (verb)
     {
@@ -124,22 +124,21 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
                                                 memcached_server_write_instance_st server,
                                                 uint32_t server_key,
                                                 const char *key,
-                                                size_t key_length,
+                                                const size_t key_length,
                                                 const char *value,
-                                                size_t value_length,
-                                                time_t expiration,
-                                                uint32_t flags,
-                                                uint64_t cas,
-                                                bool flush,
+                                                const size_t value_length,
+                                                const time_t expiration,
+                                                const uint32_t flags,
+                                                const uint64_t cas,
+                                                const bool flush,
+                                                const bool reply,
                                                 memcached_storage_action_t verb)
 {
   protocol_binary_request_set request= {};
   size_t send_length= sizeof(request.bytes);
 
-  bool noreply= server->root->flags.no_reply;
-
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-  request.message.header.request.opcode= get_com_code(verb, noreply);
+  request.message.header.request.opcode= get_com_code(verb, reply);
   request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
   request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
   if (verb == APPEND_OP or verb == PREPEND_OP)
@@ -161,20 +160,6 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
     request.message.header.request.cas= memcached_htonll(cas);
   }
 
-  if (server->root->flags.use_udp and flush == false)
-  {
-    size_t cmd_size= send_length + key_length + value_length;
-
-    if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
-    {
-      return MEMCACHED_WRITE_FAILURE;
-    }
-    if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
-    {
-      memcached_io_write(server);
-    }
-  }
-
   struct libmemcached_io_vector_st vector[]=
   {
     { request.bytes, send_length },
@@ -189,7 +174,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
   {
     memcached_io_reset(server);
 
-    if (ptr->error_messages == NULL)
+    if (memcached_has_error(ptr))
     {
       memcached_set_error(*server, rc, MEMCACHED_AT);
     }
@@ -197,7 +182,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
     return MEMCACHED_WRITE_FAILURE;
   }
 
-  if (verb == SET_OP && ptr->number_of_replicas > 0)
+  if (verb == SET_OP and ptr->number_of_replicas > 0)
   {
     request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
     WATCHPOINT_STRING("replicating");
@@ -228,7 +213,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
     return MEMCACHED_BUFFERED;
   }
 
-  if (noreply)
+  // No reply always assumes success
+  if (reply == false)
   {
     return MEMCACHED_SUCCESS;
   }
@@ -246,11 +232,9 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,
                                                const uint32_t flags,
                                                const uint64_t cas,
                                                const bool flush,
+                                               const bool reply,
                                                const memcached_storage_action_t verb)
 {
-  // Invert the logic to make it simpler to read the code
-  bool reply= (ptr->flags.no_reply) ? false : true;
-
   char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
   int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
   if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
@@ -302,43 +286,26 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,
     { memcached_literal_param("\r\n") }
   };
 
-  if (memcached_is_udp(instance->root))
-  {
-    size_t write_length= io_vector_total_size(vector, 11);
-
-    size_t cmd_size= write_length + value_length +2;
-    if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
-    {
-      return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
-    }
-
-    if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
-    {
-      memcached_io_write(instance);
-    }
-  }
-
   /* Send command header */
   memcached_return_t rc=  memcached_vdo(instance, vector, 11, flush);
   if (rc == MEMCACHED_SUCCESS)
   {
-    if (ptr->flags.no_reply and flush)
+    if (flush == false)
     {
-      rc= MEMCACHED_SUCCESS;
+      return MEMCACHED_BUFFERED;
     }
-    else if (flush == false)
+
+    if (reply == false)
     {
-      rc= MEMCACHED_BUFFERED;
+      return MEMCACHED_SUCCESS;
     }
-    else
-    {
-      char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-      rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
 
-      if (rc == MEMCACHED_STORED)
-      {
-        rc= MEMCACHED_SUCCESS;
-      }
+    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+
+    if (rc == MEMCACHED_STORED)
+    {
+      return MEMCACHED_SUCCESS;
     }
   }
 
@@ -347,9 +314,10 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,
     memcached_io_reset(instance);
   }
 
-  if (memcached_failed(rc) and ptr->error_messages == NULL)
+  assert(memcached_failed(rc));
+  if (memcached_has_error(ptr) == false)
   {
-    memcached_set_error(*ptr, rc, MEMCACHED_AT);
+    return memcached_set_error(*ptr, rc, MEMCACHED_AT);
   }
 
   return rc;
@@ -370,7 +338,7 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
     return rc;
   }
 
-  if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
+  if (memcached_failed(rc= memcached_validate_key_length(key_length, memcached_is_binary(ptr))))
   {
     return rc;
   }
@@ -386,23 +354,27 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
   WATCHPOINT_SET(instance->io_wait_count.read= 0);
   WATCHPOINT_SET(instance->io_wait_count.write= 0);
 
-  bool flush= (bool) ((instance->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
-  if (ptr->flags.binary_protocol)
+
+  bool flush= true;
+  if (memcached_is_buffering(instance->root) and verb == SET_OP)
   {
-    rc= memcached_send_binary(ptr, instance, server_key,
-                              key, key_length,
-                              value, value_length, expiration,
-                              flags, cas, flush, verb);
+    flush= false;
   }
-  else
+
+  bool reply= memcached_is_replying(ptr);
+
+  if (memcached_is_binary(ptr))
   {
-    rc= memcached_send_ascii(ptr, instance,
-                             key, key_length,
-                             value, value_length, expiration,
-                             flags, cas, flush, verb);
+    return memcached_send_binary(ptr, instance, server_key,
+                                 key, key_length,
+                                 value, value_length, expiration,
+                                 flags, cas, flush, reply, verb);
   }
 
-  return rc;
+  return memcached_send_ascii(ptr, instance,
+                              key, key_length,
+                              value, value_length, expiration,
+                              flags, cas, flush, reply, verb);
 }
 
 
@@ -431,6 +403,11 @@ memcached_return_t memcached_add(memcached_st *ptr,
   rc= memcached_send(ptr, key, key_length,
                      key, key_length, value, value_length,
                      expiration, flags, 0, ADD_OP);
+
+  if (rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_DATA_EXISTS)
+  {
+    memcached_set_error(*ptr, rc, MEMCACHED_AT);
+  }
   LIBMEMCACHED_MEMCACHED_ADD_END();
   return rc;
 }
@@ -546,11 +523,9 @@ memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
                                             time_t expiration,
                                             uint32_t flags)
 {
-  memcached_return_t rc;
-  rc= memcached_send(ptr, group_key, group_key_length,
-                     key, key_length, value, value_length,
-                     expiration, flags, 0, PREPEND_OP);
-  return rc;
+  return memcached_send(ptr, group_key, group_key_length,
+                        key, key_length, value, value_length,
+                        expiration, flags, 0, PREPEND_OP);
 }
 
 memcached_return_t memcached_append_by_key(memcached_st *ptr,
@@ -560,11 +535,9 @@ memcached_return_t memcached_append_by_key(memcached_st *ptr,
                                            time_t expiration,
                                            uint32_t flags)
 {
-  memcached_return_t rc;
-  rc= memcached_send(ptr, group_key, group_key_length,
-                     key, key_length, value, value_length,
-                     expiration, flags, 0, APPEND_OP);
-  return rc;
+  return memcached_send(ptr, group_key, group_key_length,
+                        key, key_length, value, value_length,
+                        expiration, flags, 0, APPEND_OP);
 }
 
 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
@@ -575,10 +548,8 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
                                         uint32_t flags,
                                         uint64_t cas)
 {
-  memcached_return_t rc;
-  rc= memcached_send(ptr, group_key, group_key_length,
-                     key, key_length, value, value_length,
-                     expiration, flags, cas, CAS_OP);
-  return rc;
+  return  memcached_send(ptr, group_key, group_key_length,
+                         key, key_length, value, value_length,
+                         expiration, flags, cas, CAS_OP);
 }
 
diff --git a/libmemcached/udp.cc b/libmemcached/udp.cc
new file mode 100644 (file)
index 0000000..905eedd
--- /dev/null
@@ -0,0 +1,77 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  LibMemcached
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <libmemcached/common.h>
+
+/*
+ * The udp request id consists of two seperate sections
+ *   1) The thread id
+ *   2) The message number
+ * The thread id should only be set when the memcached_st struct is created
+ * and should not be changed.
+ *
+ * The message num is incremented for each new message we send, this function
+ * extracts the message number from message_id, increments it and then
+ * writes the new value back into the header
+ */
+void increment_udp_message_id(memcached_server_write_instance_st ptr)
+{
+  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+  uint16_t cur_req= get_udp_datagram_request_id(header);
+  int msg_num= get_msg_num_from_request_id(cur_req);
+  int thread_id= get_thread_id_from_request_id(cur_req);
+
+  if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
+    msg_num= 0;
+
+  header->request_id= htons((uint16_t) (thread_id | msg_num));
+}
+
+bool memcached_io_init_udp_header(memcached_server_write_instance_st ptr, const uint16_t thread_id)
+{
+  if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
+  {
+    return MEMCACHED_FAILURE;
+  }
+
+  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+  header->request_id= htons(uint16_t((generate_udp_request_thread_id(thread_id))));
+  header->num_datagrams= htons(1);
+  header->sequence_number= htons(0);
+
+  return MEMCACHED_SUCCESS;
+}
diff --git a/libmemcached/udp.hpp b/libmemcached/udp.hpp
new file mode 100644 (file)
index 0000000..9cc5388
--- /dev/null
@@ -0,0 +1,59 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  LibMemcached
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#define MAX_UDP_DATAGRAM_LENGTH 1400
+#define UDP_DATAGRAM_HEADER_LENGTH 8
+#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10
+#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define get_udp_datagram_request_id(A) ntohs((A)->request_id)
+#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number)
+#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams)
+#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) )
+#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF)
+
+struct udp_datagram_header_st
+{
+  uint16_t request_id;
+  uint16_t sequence_number;
+  uint16_t num_datagrams;
+  uint16_t reserved;
+};
+
+bool memcached_io_init_udp_header(memcached_server_write_instance_st ptr, const uint16_t thread_id);
+void increment_udp_message_id(memcached_server_write_instance_st ptr);
index e2aac3c74ba88c24bee5560781c2d8cb66bfa616..0a5b9bbedd5414cbddab3c5fdf58f4a75e2d3378 100644 (file)
@@ -47,7 +47,7 @@ static memcached_return_t _set_verbosity(const memcached_st *,
                                          const memcached_server_st *server,
                                          void *context)
 {
const libmemcached_io_vector_st *execute= (const libmemcached_io_vector_st *)context;
libmemcached_io_vector_st *vector= (libmemcached_io_vector_st *)context;
 
   memcached_st local_memc;
   memcached_st *memc_ptr= memcached_create(&local_memc);
@@ -59,7 +59,7 @@ static memcached_return_t _set_verbosity(const memcached_st *,
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc_ptr, 0);
 
 
-    rc= memcached_vdo(instance, execute, 3, true);
+    rc= memcached_vdo(instance, vector, 3, true);
 
     if (rc == MEMCACHED_SUCCESS)
     {
index a4d424f0b66f44bf1dd0c0bcac9b70b850cfb7c3..83ea4460e2fefd45aadfc9093ba3474f3a8077b0 100644 (file)
@@ -112,7 +112,7 @@ static test_return_t pre_binary(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
-static bool return_value_based_on_buffering(memcached_st *memc)
+static memcached_return_t return_value_based_on_buffering(memcached_st *memc)
 {
   if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS))
   {
@@ -394,7 +394,7 @@ static test_return_t clone_test(memcached_st *memc)
       test_true(memc_clone->ketama.weighted == memc->ketama.weighted);
       test_true(memc_clone->flags.binary_protocol == memc->flags.binary_protocol);
       test_true(memc_clone->flags.hash_with_namespace == memc->flags.hash_with_namespace);
-      test_true(memc_clone->flags.no_reply == memc->flags.no_reply);
+      test_true(memc_clone->flags.reply == memc->flags.reply);
       test_true(memc_clone->flags.use_udp == memc->flags.use_udp);
       test_true(memc_clone->flags.auto_eject_hosts == memc->flags.auto_eject_hosts);
       test_true(memc_clone->flags.randomize_replica_read == memc->flags.randomize_replica_read);
@@ -530,32 +530,34 @@ static test_return_t set_test(memcached_st *memc)
 static test_return_t append_test(memcached_st *memc)
 {
   memcached_return_t rc;
-  const char *key= "fig";
   const char *in_value= "we";
-  char *out_value= NULL;
   size_t value_length;
   uint32_t flags;
 
-  rc= memcached_flush(memc, 0);
-  test_compare(MEMCACHED_SUCCESS, rc);
-
-  rc= memcached_set(memc, key, strlen(key),
-                    in_value, strlen(in_value),
-                    (time_t)0, (uint32_t)0);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_flush(memc, 0));
 
-  rc= memcached_append(memc, key, strlen(key),
-                       " the", strlen(" the"),
-                       (time_t)0, (uint32_t)0);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_set(memc,
+                             test_literal_param(__func__),
+                             in_value, strlen(in_value),
+                             time_t(0), uint32_t(0)));
 
-  rc= memcached_append(memc, key, strlen(key),
-                       " people", strlen(" people"),
-                       (time_t)0, (uint32_t)0);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_append(memc,
+                                test_literal_param(__func__),
+                                " the", strlen(" the"),
+                                time_t(0), uint32_t(0)));
 
-  out_value= memcached_get(memc, key, strlen(key),
-                           &value_length, &flags, &rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_append(memc,
+                                test_literal_param(__func__),
+                                " people", strlen(" people"),
+                                time_t(0), uint32_t(0)));
+
+  char *out_value= memcached_get(memc,
+                                 test_literal_param(__func__),
+                                 &value_length, &flags, &rc);
   test_memcmp(out_value, "we the people", strlen("we the people"));
   test_compare(strlen("we the people"), value_length);
   test_compare(MEMCACHED_SUCCESS, rc);
@@ -566,40 +568,40 @@ static test_return_t append_test(memcached_st *memc)
 
 static test_return_t append_binary_test(memcached_st *memc)
 {
-  memcached_return_t rc;
-  const char *key= "numbers";
   uint32_t store_list[] = { 23, 56, 499, 98, 32847, 0 };
-  uint32_t *value;
-  size_t value_length;
-  uint32_t flags;
-  uint32_t x;
 
-  rc= memcached_flush(memc, 0);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_flush(memc, 0));
 
-  rc= memcached_set(memc,
-                    key, strlen(key),
-                    NULL, 0,
-                    (time_t)0, (uint32_t)0);
-  test_compare_got(MEMCACHED_SUCCESS, rc, memcached_strerror(NULL, rc));
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_set(memc,
+                             test_literal_param(__func__),
+                             NULL, 0,
+                             time_t(0), uint32_t(0)));
 
-  for (x= 0; store_list[x] ; x++)
+  size_t count= 0;
+  for (uint32_t x= 0; store_list[x] ; x++)
   {
-    rc= memcached_append(memc,
-                         key, strlen(key),
+    test_compare(MEMCACHED_SUCCESS,
+                 memcached_append(memc,
+                         test_literal_param(__func__),
                          (char *)&store_list[x], sizeof(uint32_t),
-                         (time_t)0, (uint32_t)0);
-    test_compare(MEMCACHED_SUCCESS, rc);
+                         time_t(0), uint32_t(0)));
+    count++;
   }
 
-  value= (uint32_t *)memcached_get(memc, key, strlen(key),
-                                   &value_length, &flags, &rc);
-  test_compare(value_length, sizeof(uint32_t) * x);
+  size_t value_length;
+  uint32_t flags;
+  memcached_return_t rc;
+  uint32_t *value= (uint32_t *)memcached_get(memc,
+                                             test_literal_param(__func__),
+                                             &value_length, &flags, &rc);
+  test_compare(value_length, sizeof(uint32_t) * count);
   test_compare(MEMCACHED_SUCCESS, rc);
 
-  for (uint32_t counter= x, *ptr= value; counter; counter--)
+  for (uint32_t counter= count, *ptr= value; counter; counter--)
   {
-    test_compare(*ptr, store_list[x - counter]);
+    test_compare(*ptr, store_list[count - counter]);
     ptr++;
   }
   free(value);
@@ -609,34 +611,31 @@ static test_return_t append_binary_test(memcached_st *memc)
 
 static test_return_t cas2_test(memcached_st *memc)
 {
-  memcached_return_t rc;
   const char *keys[]= {"fudge", "son", "food"};
   size_t key_length[]= {5, 3, 4};
   const char *value= "we the people";
   size_t value_length= strlen("we the people");
-  memcached_result_st results_obj;
-  memcached_result_st *results;
-  unsigned int set= 1;
 
   test_compare(MEMCACHED_SUCCESS, memcached_flush(memc, 0));
 
-  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
+  test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true));
 
   for (uint32_t x= 0; x < 3; x++)
   {
-    rc= memcached_set(memc, keys[x], key_length[x],
-                      keys[x], key_length[x],
-                      (time_t)50, (uint32_t)9);
-    test_compare(MEMCACHED_SUCCESS, rc);
+    test_compare(MEMCACHED_SUCCESS,
+                 memcached_set(memc, keys[x], key_length[x],
+                               keys[x], key_length[x],
+                               time_t(50), uint32_t(9)));
   }
 
   test_compare(MEMCACHED_SUCCESS, 
                memcached_mget(memc, keys, key_length, 3));
 
-  results= memcached_result_create(memc, &results_obj);
+  memcached_result_st *results= memcached_result_create(memc, NULL);
   test_true(results);
 
-  results= memcached_fetch_result(memc, &results_obj, &rc);
+  memcached_return_t rc;
+  results= memcached_fetch_result(memc, results, &rc);
   test_true(results);
   test_true(results->item_cas);
   test_compare(MEMCACHED_SUCCESS, rc);
@@ -646,38 +645,32 @@ static test_return_t cas2_test(memcached_st *memc)
   test_compare(strlen("we the people"), value_length);
   test_compare(MEMCACHED_SUCCESS, rc);
 
-  memcached_result_free(&results_obj);
+  memcached_result_free(results);
 
   return TEST_SUCCESS;
 }
 
 static test_return_t cas_test(memcached_st *memc)
 {
-  const char *key= "fun";
-  size_t key_length= strlen(key);
-  const char *value= "we the people";
-  const char* keys[2] = { key, NULL };
-  size_t keylengths[2] = { strlen(key), 0 };
-  size_t value_length= strlen(value);
-  const char *value2= "change the value";
-  size_t value2_length= strlen(value2);
+  const char* keys[2] = { __func__, NULL };
+  size_t keylengths[2] = { strlen(__func__), 0 };
 
   memcached_result_st results_obj;
-  memcached_result_st *results;
 
   test_compare(MEMCACHED_SUCCESS, memcached_flush(memc, 0));
 
   test_skip(true, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true));
 
   test_compare(MEMCACHED_SUCCESS, 
-               memcached_set(memc, key, strlen(key),
-                             value, strlen(value),
+               memcached_set(memc,
+                             test_literal_param(__func__),
+                             test_literal_param("we the people"),
                              (time_t)0, (uint32_t)0));
 
   test_compare(MEMCACHED_SUCCESS,
                memcached_mget(memc, keys, keylengths, 1));
 
-  results= memcached_result_create(memc, &results_obj);
+  memcached_result_st *results= memcached_result_create(memc, &results_obj);
   test_true(results);
 
   memcached_return_t rc;
@@ -685,10 +678,11 @@ static test_return_t cas_test(memcached_st *memc)
   test_true(results);
   test_compare(MEMCACHED_SUCCESS, rc);
   test_true(memcached_result_cas(results));
-  test_memcmp(value, memcached_result_value(results), value_length);
-  test_compare(strlen(memcached_result_value(results)), value_length);
-  test_compare(MEMCACHED_SUCCESS, rc);
-  uint64_t cas = memcached_result_cas(results);
+  test_memcmp("we the people", memcached_result_value(results), test_literal_param_size("we the people"));
+  test_compare(test_literal_param_size("we the people"),
+               strlen(memcached_result_value(results)));
+
+  uint64_t cas= memcached_result_cas(results);
 
 #if 0
   results= memcached_fetch_result(memc, &results_obj, &rc);
@@ -696,15 +690,21 @@ static test_return_t cas_test(memcached_st *memc)
   test_true(results == NULL);
 #endif
 
-  rc= memcached_cas(memc, key, key_length, value2, value2_length, 0, 0, cas);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_cas(memc,
+                             test_literal_param(__func__),
+                             test_literal_param("change the value"),
+                             0, 0, cas));
 
   /*
    * The item will have a new cas value, so try to set it again with the old
    * value. This should fail!
    */
-  rc= memcached_cas(memc, key, key_length, value2, value2_length, 0, 0, cas);
-  test_compare(MEMCACHED_DATA_EXISTS, rc);
+  test_compare(MEMCACHED_DATA_EXISTS,
+               memcached_cas(memc,
+                             test_literal_param(__func__),
+                             test_literal_param("change the value"),
+                             0, 0, cas));
 
   memcached_result_free(&results_obj);
 
@@ -753,31 +753,21 @@ static test_return_t prepend_test(memcached_st *memc)
 */
 static test_return_t add_test(memcached_st *memc)
 {
-  memcached_return_t rc;
-  const char *key= "foo";
-  const char *value= "when we sanitize";
-  unsigned long long setting_value;
-
-  setting_value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NO_BLOCK);
+  test_compare_hint(return_value_based_on_buffering(memc),
+                    memcached_set(memc,
+                                  test_literal_param(__func__),
+                                  test_literal_param("when we sanitize"),
+                                  time_t(0), uint32_t(0)),
+                    memcached_last_error_message(memc));
 
-  rc= memcached_set(memc, key, strlen(key),
-                    value, strlen(value),
-                    (time_t)0, (uint32_t)0);
-  test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
   memcached_quit(memc);
-  rc= memcached_add(memc, key, strlen(key),
-                    value, strlen(value),
-                    (time_t)0, (uint32_t)0);
 
-  /* Too many broken OS'es have broken loopback in async, so we can't be sure of the result */
-  if (setting_value)
-  {
-    test_true(rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_STORED);
-  }
-  else
-  {
-    test_true(rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_DATA_EXISTS);
-  }
+  test_compare_hint(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) ? MEMCACHED_DATA_EXISTS : MEMCACHED_NOTSTORED,
+                    memcached_add(memc,
+                                  test_literal_param(__func__),
+                                  test_literal_param("try something else"),
+                                  time_t(0), uint32_t(0)),
+                    memcached_last_error_message(memc));
 
   return TEST_SUCCESS;
 }
@@ -807,20 +797,17 @@ static test_return_t add_wrapper(memcached_st *memc)
 
 static test_return_t replace_test(memcached_st *memc)
 {
-  memcached_return_t rc;
-  const char *key= "foo";
-  const char *value= "when we sanitize";
-  const char *original= "first we insert some data";
-
-  rc= memcached_set(memc, key, strlen(key),
-                    original, strlen(original),
-                    (time_t)0, (uint32_t)0);
-  test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+  test_compare(return_value_based_on_buffering(memc),
+               memcached_set(memc,
+                             test_literal_param(__func__),
+                             test_literal_param("when we sanitize"),
+                             time_t(0), uint32_t(0)));
 
   test_compare(MEMCACHED_SUCCESS,
-               memcached_replace(memc, key, strlen(key),
-                                 value, strlen(value),
-                                 (time_t)0, (uint32_t)0));
+               memcached_replace(memc,
+                                 test_literal_param(__func__),
+                                 test_literal_param("first we insert some data"),
+                                 time_t(0), uint32_t(0)));
 
   return TEST_SUCCESS;
 }
@@ -833,11 +820,10 @@ static test_return_t delete_test(memcached_st *memc)
                              test_literal_param("when we sanitize"),
                              time_t(0), uint32_t(0)));
 
-  memcached_return_t rc= memcached_delete(memc, 
-                                          test_literal_param(__func__),
-                                          time_t(0));
-  test_compare_hint(MEMCACHED_SUCCESS,
-                    rc,
+  test_compare_hint(return_value_based_on_buffering(memc),
+                    memcached_delete(memc, 
+                                     test_literal_param(__func__),
+                                     time_t(0)),
                     memcached_last_error_message(memc));
 
   return TEST_SUCCESS;
@@ -853,13 +839,11 @@ static test_return_t flush_test(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
-static memcached_return_t  server_function(const memcached_st *ptr,
-                                           const memcached_server_st *server,
-                                           void *context)
+static memcached_return_t  server_function(const memcached_st *,
+                                           const memcached_server_st *,
+                                           void *)
 {
-  (void)ptr; (void)server; (void)context;
   /* Do Nothing */
-
   return MEMCACHED_SUCCESS;
 }
 
@@ -879,7 +863,6 @@ static test_return_t bad_key_test(memcached_st *memc)
   memcached_return_t rc;
   const char *key= "foo bad";
   uint32_t flags;
-  memcached_st *memc_clone;
 
   uint64_t query_id= memcached_query_id(memc);
   
@@ -888,7 +871,7 @@ static test_return_t bad_key_test(memcached_st *memc)
 
   test_compare(query_id, memcached_query_id(memc)); // We should not increase the query_id for memcached_behavior_get()
 
-  memc_clone= memcached_clone(NULL, memc);
+  memcached_st *memc_clone= memcached_clone(NULL, memc);
   test_true(memc_clone);
 
   query_id= memcached_query_id(memc_clone);
@@ -1001,15 +984,14 @@ static memcached_return_t read_through_trigger(memcached_st *memc,
 
 static test_return_t read_through(memcached_st *memc)
 {
-  memcached_return_t rc;
-  const char *key= "foo";
-  char *string;
-  size_t string_length;
-  uint32_t flags;
   memcached_trigger_key_fn cb= (memcached_trigger_key_fn)read_through_trigger;
 
-  string= memcached_get(memc, key, strlen(key),
-                        &string_length, &flags, &rc);
+  size_t string_length;
+  uint32_t flags;
+  memcached_return_t rc;
+  char *string= memcached_get(memc,
+                              test_literal_param(__func__),
+                              &string_length, &flags, &rc);
 
   test_compare(MEMCACHED_NOTFOUND, rc);
   test_false(string_length);
@@ -1018,7 +1000,8 @@ static test_return_t read_through(memcached_st *memc)
   rc= memcached_callback_set(memc, MEMCACHED_CALLBACK_GET_FAILURE, *(void **)&cb);
   test_compare(MEMCACHED_SUCCESS, rc);
 
-  string= memcached_get(memc, key, strlen(key),
+  string= memcached_get(memc,
+                        test_literal_param(__func__),
                         &string_length, &flags, &rc);
 
   test_compare(MEMCACHED_SUCCESS, rc);
@@ -1027,7 +1010,8 @@ static test_return_t read_through(memcached_st *memc)
   test_strcmp(READ_THROUGH_VALUE, string);
   free(string);
 
-  string= memcached_get(memc, key, strlen(key),
+  string= memcached_get(memc,
+                        test_literal_param(__func__),
                         &string_length, &flags, &rc);
 
   test_compare(MEMCACHED_SUCCESS, rc);
@@ -1043,17 +1027,19 @@ static test_return_t read_through(memcached_st *memc)
 static test_return_t get_test(memcached_st *memc)
 {
   memcached_return_t rc;
-  const char *key= "foo";
   char *string;
   size_t string_length;
   uint32_t flags;
 
   uint64_t query_id= memcached_query_id(memc);
-  rc= memcached_delete(memc, key, strlen(key), (time_t)0);
+  rc= memcached_delete(memc,
+                       test_literal_param(__func__),
+                       time_t(0));
   test_true_got(rc == MEMCACHED_BUFFERED || rc == MEMCACHED_NOTFOUND, memcached_last_error_message(memc));
   test_compare(query_id +1, memcached_query_id(memc));
 
-  string= memcached_get(memc, key, strlen(key),
+  string= memcached_get(memc,
+                        test_literal_param(__func__),
                         &string_length, &flags, &rc);
 
   test_compare_got(MEMCACHED_NOTFOUND, rc, memcached_strerror(NULL, rc));
@@ -1065,14 +1051,14 @@ static test_return_t get_test(memcached_st *memc)
 
 static test_return_t get_test2(memcached_st *memc)
 {
-  const char *key= "foo";
   const char *value= "when we sanitize";
 
   uint64_t query_id= memcached_query_id(memc);
-  memcached_return_t rc= memcached_set(memc, key, strlen(key),
-                                       value, strlen(value),
-                                       (time_t)0, (uint32_t)0);
-  test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
+  test_compare(return_value_based_on_buffering(memc),
+               memcached_set(memc,
+                             test_literal_param(__func__),
+                             value, strlen(value),
+                             time_t(0), uint32_t(0)));
   test_compare(query_id +1, memcached_query_id(memc));
 
   query_id= memcached_query_id(memc);
@@ -1080,7 +1066,9 @@ static test_return_t get_test2(memcached_st *memc)
 
   uint32_t flags;
   size_t string_length;
-  char *string= memcached_get(memc, key, strlen(key),
+  memcached_return_t rc;
+  char *string= memcached_get(memc,
+                              test_literal_param(__func__),
                               &string_length, &flags, &rc);
   test_compare(query_id +1, memcached_query_id(memc));
 
@@ -1099,11 +1087,11 @@ static test_return_t set_test2(memcached_st *memc)
 {
   for (uint32_t x= 0; x < 10; x++)
   {
-    memcached_return_t rc= memcached_set(memc,
-                                         test_literal_param("foo"),
-                                         test_literal_param("train in the brain"),
-                                         time_t(0), uint32_t(0));
-    test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
+    test_compare(return_value_based_on_buffering(memc),
+                 memcached_set(memc,
+                               test_literal_param("foo"),
+                               test_literal_param("train in the brain"),
+                               time_t(0), uint32_t(0)));
   }
 
   return TEST_SUCCESS;
@@ -1128,10 +1116,11 @@ static test_return_t set_test3(memcached_st *memc)
     snprintf(key, sizeof(key), "foo%u", x);
 
     uint64_t query_id= memcached_query_id(memc);
-    memcached_return_t rc= memcached_set(memc, key, strlen(key),
-                                         &value[0], value.size(),
-                                         (time_t)0, (uint32_t)0);
-    test_true_got(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED, memcached_strerror(NULL, rc));
+    test_compare_hint(return_value_based_on_buffering(memc),
+                      memcached_set(memc, key, strlen(key),
+                                    &value[0], value.size(),
+                                    time_t(0), uint32_t(0)),
+                      memcached_last_error_message(memc));
     test_compare(query_id +1, memcached_query_id(memc));
   }
 
@@ -1140,7 +1129,6 @@ static test_return_t set_test3(memcached_st *memc)
 
 static test_return_t get_test3(memcached_st *memc)
 {
-  const char *key= "foo";
   size_t value_length= 8191;
 
   std::vector<char> value;
@@ -1150,20 +1138,23 @@ static test_return_t get_test3(memcached_st *memc)
     value.push_back(char(x % 127));
   }
 
-  memcached_return_t rc;
-  rc= memcached_set(memc, key, strlen(key),
-                    &value[0], value.size(),
-                    (time_t)0, (uint32_t)0);
-  test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
+  test_compare_hint(return_value_based_on_buffering(memc),
+                    memcached_set(memc,
+                                  test_literal_param(__func__),
+                                  &value[0], value.size(),
+                                  time_t(0), uint32_t(0)),
+                    memcached_last_error_message(memc));
 
   size_t string_length;
   uint32_t flags;
-  char *string= memcached_get(memc, key, strlen(key),
+  memcached_return_t rc;
+  char *string= memcached_get(memc,
+                              test_literal_param(__func__),
                               &string_length, &flags, &rc);
 
   test_compare(MEMCACHED_SUCCESS, rc);
   test_true(string);
-  test_compare(string_length, value_length);
+  test_compare(value.size(), string_length);
   test_memcmp(string, &value[0], string_length);
 
   free(string);
@@ -1173,7 +1164,6 @@ static test_return_t get_test3(memcached_st *memc)
 
 static test_return_t get_test4(memcached_st *memc)
 {
-  const char *key= "foo";
   size_t value_length= 8191;
 
   std::vector<char> value;
@@ -1183,21 +1173,25 @@ static test_return_t get_test4(memcached_st *memc)
     value.push_back(char(x % 127));
   }
 
-  memcached_return_t rc= memcached_set(memc, key, strlen(key),
-                                       &value[0], value.size(),
-                                       (time_t)0, (uint32_t)0);
-  test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
+  test_compare_hint(return_value_based_on_buffering(memc),
+                    memcached_set(memc,
+                                  test_literal_param(__func__),
+                                  &value[0], value.size(),
+                                  time_t(0), uint32_t(0)),
+                    memcached_last_error_message(memc));
 
   for (uint32_t x= 0; x < 10; x++)
   {
     uint32_t flags;
     size_t string_length;
-    char *string= memcached_get(memc, key, strlen(key),
+    memcached_return_t rc;
+    char *string= memcached_get(memc,
+                                test_literal_param(__func__),
                                 &string_length, &flags, &rc);
 
     test_compare(MEMCACHED_SUCCESS, rc);
     test_true(string);
-    test_compare(string_length, value_length);
+    test_compare(value.size(), string_length);
     test_memcmp(string, &value[0], string_length);
     free(string);
   }
@@ -1221,14 +1215,18 @@ static test_return_t get_test5(memcached_st *memc)
   uint32_t flags;
   size_t rlen;
 
-  memcached_return_t rc= memcached_set(memc, keys[0], lengths[0],
-                                     keys[0], lengths[0], 0, 0);
+  test_compare_hint(return_value_based_on_buffering(memc),
+                    memcached_set(memc, keys[0], lengths[0],
+                                  keys[0], lengths[0],
+                                  time_t(0), uint32_t(0)),
+                    memcached_last_error_message(memc));
   test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, lengths, test_array_length(keys)));
 
   memcached_result_st results_obj;
   memcached_result_st *results= memcached_result_create(memc, &results_obj);
   test_true(results);
 
+  memcached_return_t rc;
   results= memcached_fetch_result(memc, &results_obj, &rc);
   test_true(results);
 
@@ -1260,7 +1258,11 @@ static test_return_t mget_end(memcached_st *memc)
   // Set foo and foo2
   for (size_t x= 0; x < test_array_length(keys); x++)
   {
-    test_compare(MEMCACHED_SUCCESS, memcached_set(memc, keys[x], lengths[x], values[x], strlen(values[x]), (time_t)0, (uint32_t)0));
+    test_compare(MEMCACHED_SUCCESS,
+                 memcached_set(memc,
+                               keys[x], lengths[x],
+                               values[x], strlen(values[x]),
+                               time_t(0), uint32_t(0)));
   }
 
   char *string;
@@ -1268,7 +1270,10 @@ static test_return_t mget_end(memcached_st *memc)
   uint32_t flags;
 
   // retrieve both via mget
-  test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, lengths, test_array_length(keys)));
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_mget(memc,
+                              keys, lengths,
+                              test_array_length(keys)));
 
   char key[MEMCACHED_MAX_KEY];
   size_t key_length;
@@ -1296,8 +1301,8 @@ static test_return_t mget_end(memcached_st *memc)
   test_null(string);
 
   // now get just one
-  rc= memcached_mget(memc, keys, lengths, 1);
-  test_compare(MEMCACHED_SUCCESS, rc);
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_mget(memc, keys, lengths, 1));
 
   string= memcached_fetch(memc, key, &key_length, &string_length, &flags, &rc);
   test_compare(key_length, lengths[0]);
index 227cea633462933f481d9373d719e752a1095152..590ad7b9d816da5bc5f9c9aa70611e41ce1867af 100644 (file)
@@ -48,6 +48,7 @@ using namespace libtest;
 #include <libmemcached-1.0/memcached.h>
 #include <libmemcached/server_instance.h>
 #include <libmemcached/io.h>
+#include <libmemcached/udp.hpp>
 #include <libmemcachedutil-1.0/util.h>
 
 #include <cassert>
@@ -205,13 +206,13 @@ static test_return_t set_udp_behavior_test(memcached_st *memc)
 
   test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, memc->distribution));
   test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, true));
-  test_true(memc->flags.use_udp);
-  test_true(memc->flags.no_reply);
+  test_compare(true, memc->flags.use_udp);
+  test_compare(false, memc->flags.reply);
 
   test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, false));
-  test_false(memc->flags.use_udp);
+  test_compare(false, memc->flags.use_udp);
   test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, false));
-  test_false(memc->flags.no_reply);
+  test_compare(true, memc->flags.reply);
 
   return TEST_SUCCESS;
 }
@@ -237,29 +238,23 @@ static test_return_t udp_set_test(memcached_st *memc)
     memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key);
     size_t init_offset= instance->write_buffer_offset;
 
-    memcached_return_t rc= memcached_set(memc, test_literal_param("foo"),
-                                         test_literal_param("when we sanitize"),
-                                         time_t(0), uint32_t(0));
-    test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
-    /** NB, the check below assumes that if new write_ptr is less than
-     *  the original write_ptr that we have flushed. For large payloads, this
-     *  maybe an invalid assumption, but for the small payload we have it is OK
-     */
-    if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset)
+    test_compare_hint(MEMCACHED_SUCCESS, 
+                      memcached_set(memc,
+                                    test_literal_param("foo"),
+                                    test_literal_param("when we sanitize"),
+                                    time_t(0), uint32_t(0)),
+                      memcached_last_error_message(memc));
+
+    /*
+      NB, the check below assumes that if new write_ptr is less than
+      the original write_ptr that we have flushed. For large payloads, this
+      maybe an invalid assumption, but for the small payload we have it is OK
+    */
+    if (instance->write_buffer_offset < init_offset)
     {
       increment_request_id(&expected_ids[server_key]);
     }
 
-    if (rc == MEMCACHED_SUCCESS)
-    {
-      test_true(instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
-    }
-    else
-    {
-      test_true(instance->write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
-      test_true(instance->write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
-    }
-
     test_compare(TEST_SUCCESS, post_udp_op_check(memc, expected_ids));
   }
 
@@ -269,7 +264,8 @@ static test_return_t udp_set_test(memcached_st *memc)
 static test_return_t udp_buffered_set_test(memcached_st *memc)
 {
   test_true(memc);
-  test_compare(MEMCACHED_INVALID_ARGUMENTS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, true));
+  test_compare(MEMCACHED_INVALID_ARGUMENTS,
+               memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, true));
   return TEST_SUCCESS;
 }
 
@@ -305,23 +301,14 @@ static test_return_t udp_delete_test(memcached_st *memc)
     memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key);
     size_t init_offset= instance->write_buffer_offset;
 
-    memcached_return_t rc= memcached_delete(memc, test_literal_param("foo"), 0);
-    test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED);
+    test_compare(MEMCACHED_SUCCESS,
+                 memcached_delete(memc, test_literal_param("foo"), 0));
 
-    if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset)
+    if (instance->write_buffer_offset < init_offset)
     {
       increment_request_id(&expected_ids[server_key]);
     }
 
-    if (rc == MEMCACHED_SUCCESS)
-    {
-      test_true(instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
-    }
-    else
-    {
-      test_true(instance->write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
-      test_true(instance->write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
-    }
     test_compare(TEST_SUCCESS, post_udp_op_check(memc, expected_ids));
   }
 
@@ -373,9 +360,10 @@ static test_return_t udp_flush_test(memcached_st *memc)
 
 static test_return_t udp_incr_test(memcached_st *memc)
 {
-  test_compare(MEMCACHED_SUCCESS, memcached_set(memc, test_literal_param("incr"), 
-                                                test_literal_param("1"),
-                                                (time_t)0, (uint32_t)0));
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_set(memc, test_literal_param("incr"), 
+                             test_literal_param("1"),
+                             (time_t)0, (uint32_t)0));
 
   Expected expected_ids;
   get_udp_request_ids(memc, expected_ids);
@@ -391,19 +379,23 @@ static test_return_t udp_incr_test(memcached_st *memc)
 
 static test_return_t udp_decr_test(memcached_st *memc)
 {
-  test_compare(MEMCACHED_SUCCESS, memcached_set(memc, 
-                                                test_literal_param("decr"),
-                                                test_literal_param("1"),
-                                                (time_t)0, (uint32_t)0));
+  test_compare(MEMCACHED_SUCCESS,
+               memcached_set(memc, 
+                             test_literal_param(__func__),
+                             test_literal_param("1"),
+                             (time_t)0, (uint32_t)0));
 
   Expected expected_ids;
   get_udp_request_ids(memc, expected_ids);
 
-  unsigned int server_key= memcached_generate_hash(memc, test_literal_param("decr"));
+  unsigned int server_key= memcached_generate_hash(memc,
+                                                   test_literal_param(__func__));
   increment_request_id(&expected_ids[server_key]);
 
   uint64_t newvalue;
-  test_compare(MEMCACHED_SUCCESS, memcached_decrement(memc, test_literal_param("decr"), 1, &newvalue));
+  test_compare(MEMCACHED_SUCCESS, memcached_decrement(memc,
+                                                      test_literal_param(__func__),
+                                                      1, &newvalue));
 
   return post_udp_op_check(memc, expected_ids);
 }
@@ -427,7 +419,8 @@ static test_return_t udp_version_test(memcached_st *memc)
   Expected expected_ids;
   get_udp_request_ids(memc, expected_ids);
 
-  test_compare(MEMCACHED_NOT_SUPPORTED, memcached_version(memc));
+  test_compare(MEMCACHED_NOT_SUPPORTED,
+               memcached_version(memc));
 
   return post_udp_op_check(memc, expected_ids);
 }