From 51351e672a6a1626e09d5a9d41e3229df44fc3c8 Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Wed, 28 Dec 2011 11:57:48 -0800 Subject: [PATCH] Update for storage to now use vector --- libmemcached-1.0/struct/memcached.h | 2 +- libmemcached/auto.cc | 139 +++++----- libmemcached/behavior.cc | 12 +- libmemcached/common.h | 1 + libmemcached/delete.cc | 80 +++--- libmemcached/do.cc | 26 +- libmemcached/do.hpp | 2 +- libmemcached/exist.cc | 6 + libmemcached/flush.cc | 8 +- libmemcached/include.am | 2 + libmemcached/io.cc | 43 +-- libmemcached/io.h | 20 -- libmemcached/io.hpp | 5 +- libmemcached/is.h | 8 + libmemcached/key.cc | 22 +- libmemcached/memcached.cc | 27 +- libmemcached/response.cc | 4 +- libmemcached/storage.cc | 149 +++++----- libmemcached/udp.cc | 77 ++++++ libmemcached/udp.hpp | 59 ++++ libmemcached/verbosity.cc | 4 +- tests/libmemcached-1.0/mem_functions.cc | 345 ++++++++++++------------ tests/mem_udp.cc | 85 +++--- 23 files changed, 588 insertions(+), 538 deletions(-) create mode 100644 libmemcached/udp.cc create mode 100644 libmemcached/udp.hpp diff --git a/libmemcached-1.0/struct/memcached.h b/libmemcached-1.0/struct/memcached.h index 152f3103..564eb74e 100644 --- a/libmemcached-1.0/struct/memcached.h +++ b/libmemcached-1.0/struct/memcached.h @@ -54,7 +54,7 @@ struct memcached_st { bool buffer_requests:1; bool hash_with_namespace:1; bool no_block:1; // Don't block - bool no_reply:1; + bool reply:1; bool randomize_replica_read:1; bool support_cas:1; bool tcp_nodelay:1; diff --git a/libmemcached/auto.cc b/libmemcached/auto.cc index 01a0132c..174f2623 100644 --- a/libmemcached/auto.cc +++ b/libmemcached/auto.cc @@ -37,39 +37,26 @@ #include -static memcached_return_t text_incr_decr(memcached_st *ptr, +static memcached_return_t text_incr_decr(memcached_server_write_instance_st instance, const bool is_incr, - const char *group_key, size_t group_key_length, const char *key, size_t key_length, - uint64_t offset, + const uint64_t offset, + const bool reply, uint64_t& numeric_value) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - uint32_t server_key; - memcached_server_write_instance_st instance; - - // Invert the logic to make it simpler to read the code - bool reply= (ptr->flags.no_reply) ? false : true; - - if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) - { - return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT); - } - - server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); - instance= memcached_server_instance_fetch(ptr, server_key); int send_length= snprintf(buffer, sizeof(buffer), " %" PRIu64, offset); if (size_t(send_length) >= sizeof(buffer) or send_length < 0) { - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); } struct libmemcached_io_vector_st vector[]= { { memcached_literal_param("incr ") }, - { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) }, + { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) }, { key, key_length }, { buffer, send_length }, { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") }, @@ -82,7 +69,13 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, } memcached_return_t rc= memcached_vdo(instance, vector, 6, true); - if (reply == false or memcached_failed(rc)) + + if (reply == false) + { + return MEMCACHED_SUCCESS; + } + + if (memcached_failed(rc)) { numeric_value= UINT64_MAX; return rc; @@ -93,19 +86,16 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, return memcached_set_error(*instance, rc, MEMCACHED_AT); } -static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, - const char *group_key, size_t group_key_length, - const char *key, size_t key_length, - uint64_t offset, uint64_t initial, - uint32_t expiration, +static memcached_return_t binary_incr_decr(memcached_server_write_instance_st instance, + protocol_binary_command cmd, + const char *key, const size_t key_length, + const uint64_t offset, + const uint64_t initial, + const uint32_t expiration, + const bool reply, uint64_t *value) { - bool no_reply= ptr->flags.no_reply; - - uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); - memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); - - if (no_reply) + if (reply == false) { if(cmd == PROTOCOL_BINARY_CMD_DECREMENT) { @@ -121,10 +111,10 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, request.message.header.request.magic= PROTOCOL_BINARY_REQ; request.message.header.request.opcode= cmd; - request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace))); + request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(instance->root->_namespace))); request.message.header.request.extlen= 20; request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) +request.message.header.request.extlen)); + request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(instance->root->_namespace) +request.message.header.request.extlen)); request.message.body.delta= memcached_htonll(offset); request.message.body.initial= memcached_htonll(initial); request.message.body.expiration= htonl((uint32_t) expiration); @@ -132,7 +122,7 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, struct libmemcached_io_vector_st vector[]= { { request.bytes, sizeof(request.bytes) }, - { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) }, + { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) }, { key, key_length } }; @@ -140,10 +130,10 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true))) { memcached_io_reset(instance); - return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; + return MEMCACHED_WRITE_FAILURE; } - if (no_reply) + if (reply == false) { return MEMCACHED_SUCCESS; } @@ -190,17 +180,28 @@ memcached_return_t memcached_increment_by_key(memcached_st *ptr, return rc; } + if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) + { + return memcached_set_error(*ptr, rc, MEMCACHED_AT); + } + + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + bool reply= memcached_is_replying(instance->root); + LIBMEMCACHED_MEMCACHED_INCREMENT_START(); - if (ptr->flags.binary_protocol) + if (memcached_is_binary(ptr)) { - rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT, - group_key, group_key_length, key, key_length, - (uint64_t)offset, 0, MEMCACHED_EXPIRATION_NOT_ADD, + rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_INCREMENT, + key, key_length, + uint64_t(offset), 0, MEMCACHED_EXPIRATION_NOT_ADD, + reply, value); } else { - rc= text_incr_decr(ptr, true, group_key, group_key_length, key, key_length, offset, *value); + rc= text_incr_decr(instance, true, key, key_length, offset, reply, *value); } LIBMEMCACHED_MEMCACHED_INCREMENT_END(); @@ -226,23 +227,29 @@ memcached_return_t memcached_decrement_by_key(memcached_st *ptr, return rc; } - if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol))) + if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) { - return rc; + return memcached_set_error(*ptr, rc, MEMCACHED_AT); } + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + bool reply= memcached_is_replying(instance->root); + LIBMEMCACHED_MEMCACHED_DECREMENT_START(); - if (ptr->flags.binary_protocol) + if (memcached_is_binary(ptr)) { - rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT, - group_key, group_key_length, key, key_length, - (uint64_t)offset, 0, MEMCACHED_EXPIRATION_NOT_ADD, + rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_DECREMENT, + key, key_length, + offset, 0, MEMCACHED_EXPIRATION_NOT_ADD, + reply, value); } else { - rc= text_incr_decr(ptr, false, group_key, group_key_length, key, key_length, offset, *value); + rc= text_incr_decr(instance, false, key, key_length, offset, reply, *value); } LIBMEMCACHED_MEMCACHED_DECREMENT_END(); @@ -285,17 +292,23 @@ memcached_return_t memcached_increment_with_initial_by_key(memcached_st *ptr, return rc; } - if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol))) + if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) { - return rc; + return memcached_set_error(*ptr, rc, MEMCACHED_AT); } + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + bool reply= memcached_is_replying(instance->root); + LIBMEMCACHED_MEMCACHED_INCREMENT_WITH_INITIAL_START(); - if (ptr->flags.binary_protocol) + if (memcached_is_binary(ptr)) { - rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT, - group_key, group_key_length, key, key_length, - offset, initial, (uint32_t)expiration, + rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_INCREMENT, + key, key_length, + offset, initial, uint32_t(expiration), + reply, value); } else @@ -343,17 +356,24 @@ memcached_return_t memcached_decrement_with_initial_by_key(memcached_st *ptr, return rc; } - if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol))) + if (memcached_failed(rc= memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) { - return rc; + return memcached_set_error(*ptr, rc, MEMCACHED_AT); } + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + bool reply= memcached_is_replying(instance->root); + + LIBMEMCACHED_MEMCACHED_INCREMENT_WITH_INITIAL_START(); - if (ptr->flags.binary_protocol) + if (memcached_is_binary(ptr)) { - rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT, - group_key, group_key_length, key, key_length, - offset, initial, (uint32_t)expiration, + rc= binary_incr_decr(instance, PROTOCOL_BINARY_CMD_DECREMENT, + key, key_length, + offset, initial, uint32_t(expiration), + reply, value); } else @@ -365,4 +385,3 @@ memcached_return_t memcached_decrement_with_initial_by_key(memcached_st *ptr, return rc; } - diff --git a/libmemcached/behavior.cc b/libmemcached/behavior.cc index 71b3f0c0..abf69996 100644 --- a/libmemcached/behavior.cc +++ b/libmemcached/behavior.cc @@ -128,9 +128,13 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr, ptr->flags.use_udp= bool(data); if (bool(data)) { - ptr->flags.no_reply= true; + ptr->flags.reply= false; ptr->flags.buffer_requests= false; } + else + { + ptr->flags.reply= true; + } break; case MEMCACHED_BEHAVIOR_TCP_NODELAY: @@ -236,7 +240,9 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr, return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("MEMCACHED_BEHAVIOR_NOREPLY cannot be disabled while MEMCACHED_BEHAVIOR_USE_UDP is enabled.")); } - ptr->flags.no_reply= bool(data); + // We reverse the logic here to make it easier to understand throughout the + // code. + ptr->flags.reply= bool(data) ? false : true; break; case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS: @@ -439,7 +445,7 @@ uint64_t memcached_behavior_get(memcached_st *ptr, return ptr->flags.hash_with_namespace; case MEMCACHED_BEHAVIOR_NOREPLY: - return ptr->flags.no_reply; + return ptr->flags.reply ? false : true; case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS: return ptr->flags.auto_eject_hosts; diff --git a/libmemcached/common.h b/libmemcached/common.h index 57073504..0f33a784 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -105,6 +105,7 @@ memcached_return_t memcached_server_execute(memcached_st *ptr, #ifdef __cplusplus #include #include +#include #include #include #include diff --git a/libmemcached/delete.cc b/libmemcached/delete.cc index 544ef49e..b37f2688 100644 --- a/libmemcached/delete.cc +++ b/libmemcached/delete.cc @@ -44,18 +44,17 @@ memcached_return_t memcached_delete(memcached_st *ptr, const char *key, size_t k return memcached_delete_by_key(ptr, key, key_length, key, key_length, expiration); } -static inline memcached_return_t ascii_delete(memcached_st *ptr, - memcached_server_write_instance_st instance, +static inline memcached_return_t ascii_delete(memcached_server_write_instance_st instance, uint32_t , const char *key, - size_t key_length, - bool& reply, - bool& flush) + const size_t key_length, + const bool reply, + const bool flush) { struct libmemcached_io_vector_st vector[]= { { memcached_literal_param("delete ") }, - { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) }, + { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) }, { key, key_length }, { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") }, { memcached_literal_param("\r\n") } @@ -80,13 +79,12 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, return memcached_vdo(instance, vector, 5, flush); } -static inline memcached_return_t binary_delete(memcached_st *ptr, - memcached_server_write_instance_st instance, +static inline memcached_return_t binary_delete(memcached_server_write_instance_st instance, uint32_t server_key, const char *key, - size_t key_length, - bool& reply, - bool& flush) + const size_t key_length, + const bool reply, + const bool flush) { protocol_binary_request_delete request= {}; @@ -99,11 +97,11 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, { request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ; } - request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace))); + request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(instance->root->_namespace))); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace))); + request.message.header.request.bodylen= htonl((uint32_t)(key_length + memcached_array_size(instance->root->_namespace))); - if (ptr->flags.use_udp and flush == false) + if (memcached_is_udp(instance->root)) { size_t cmd_size= sizeof(request.bytes) + key_length; if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) @@ -120,7 +118,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, struct libmemcached_io_vector_st vector[]= { { request.bytes, sizeof(request.bytes) }, - { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) }, + { memcached_array_string(instance->root->_namespace), memcached_array_size(instance->root->_namespace) }, { key, key_length } }; @@ -131,19 +129,19 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, memcached_io_reset(instance); } - if (ptr->number_of_replicas > 0) + if (instance->root->number_of_replicas > 0) { request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ; - for (uint32_t x= 0; x < ptr->number_of_replicas; ++x) + for (uint32_t x= 0; x < instance->root->number_of_replicas; ++x) { memcached_server_write_instance_st replica; ++server_key; - if (server_key == memcached_server_count(ptr)) + if (server_key == memcached_server_count(instance->root)) server_key= 0; - replica= memcached_server_instance_fetch(ptr, server_key); + replica= memcached_server_instance_fetch(instance->root, server_key); if (memcached_vdo(replica, vector, 3, flush) != MEMCACHED_SUCCESS) { @@ -183,63 +181,63 @@ memcached_return_t memcached_delete_by_key(memcached_st *ptr, return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Memcached server version does not allow expiration of deleted items")); } + + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + bool buffering= memcached_is_buffering(instance->root); + bool reply= memcached_is_replying(instance->root); + // If a delete trigger exists, we need a response, so no buffering/noreply if (ptr->delete_trigger) { - if (ptr->flags.buffer_requests) + if (buffering) { return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Delete triggers cannot be used if buffering is enabled")); } - if (ptr->flags.no_reply) + if (reply == false) { return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Delete triggers cannot be used if MEMCACHED_BEHAVIOR_NOREPLY is set")); } } - - uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); - memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); - - bool to_write= (ptr->flags.buffer_requests) ? false : true; - - // Invert the logic to make it simpler to read the code - bool reply= (ptr->flags.no_reply) ? false : true; - - if (ptr->flags.binary_protocol) + if (memcached_is_binary(ptr)) { - rc= binary_delete(ptr, instance, server_key, key, key_length, reply, to_write); + rc= binary_delete(instance, server_key, key, key_length, reply, buffering ? false : true); } else { - rc= ascii_delete(ptr, instance, server_key, key, key_length, reply, to_write); + rc= ascii_delete(instance, server_key, key, key_length, reply, buffering ? false : true); } if (rc == MEMCACHED_SUCCESS) { - if (to_write == false) + if (buffering == true) { rc= MEMCACHED_BUFFERED; } - else if (reply) + else if (reply == false) + { + rc= MEMCACHED_SUCCESS; + } + else { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); if (rc == MEMCACHED_DELETED) { rc= MEMCACHED_SUCCESS; + if (ptr->delete_trigger) + { + ptr->delete_trigger(ptr, key, key_length); + } } } - - if (rc == MEMCACHED_SUCCESS and ptr->delete_trigger) - { - ptr->delete_trigger(ptr, key, key_length); - } } LIBMEMCACHED_MEMCACHED_DELETE_END(); - return rc; + return memcached_set_error(*ptr, rc, MEMCACHED_AT ); } diff --git a/libmemcached/do.cc b/libmemcached/do.cc index 928e3272..a793268d 100644 --- a/libmemcached/do.cc +++ b/libmemcached/do.cc @@ -11,8 +11,8 @@ #include -memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, +memcached_return_t memcached_vdo(memcached_server_write_instance_st instance, + libmemcached_io_vector_st *vector, const size_t count, const bool with_flush) { @@ -21,10 +21,10 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(count); WATCHPOINT_ASSERT(vector); - if (memcached_failed(rc= memcached_connect(ptr))) + if (memcached_failed(rc= memcached_connect(instance))) { WATCHPOINT_ERROR(rc); - assert_msg(ptr->error_messages, "memcached_connect() returned an error but the memcached_server_write_instance_st showed none."); + assert_msg(instance->error_messages, "memcached_connect() returned an error but the memcached_server_write_instance_st showed none."); return rc; } @@ -33,17 +33,19 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, ** before they start writing, if there is any data in buffer, clear it out, ** otherwise we might get a partial write. **/ - if (memcached_is_udp(ptr->root) and with_flush and ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) + if (memcached_is_udp(instance->root)) { - if (memcached_io_write(ptr) == false) + size_t write_length= io_vector_total_size(vector, 11) +UDP_DATAGRAM_HEADER_LENGTH; + + if (write_length > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) { - memcached_io_reset(ptr); - return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return MEMCACHED_WRITE_FAILURE; } - } - ssize_t sent_length= memcached_io_writev(ptr, vector, count, with_flush); + return MEMCACHED_NOT_SUPPORTED; + } + ssize_t sent_length= memcached_io_writev(instance, vector, count, with_flush); size_t command_length= 0; for (uint32_t x= 0; x < count; ++x, vector++) { @@ -56,9 +58,9 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, WATCHPOINT_ERROR(rc); WATCHPOINT_ERRNO(errno); } - else if ((ptr->root->flags.no_reply) == 0) + else if (memcached_is_replying(instance->root)) { - memcached_server_response_increment(ptr); + memcached_server_response_increment(instance); } return rc; diff --git a/libmemcached/do.hpp b/libmemcached/do.hpp index aac94912..a42d8678 100644 --- a/libmemcached/do.hpp +++ b/libmemcached/do.hpp @@ -38,6 +38,6 @@ #pragma once memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, + libmemcached_io_vector_st *vector, const size_t count, const bool with_flush); diff --git a/libmemcached/exist.cc b/libmemcached/exist.cc index b68ba466..ff0c1b42 100644 --- a/libmemcached/exist.cc +++ b/libmemcached/exist.cc @@ -58,14 +58,20 @@ static memcached_return_t ascii_exist(memcached_st *memc, memcached_server_write rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); if (rc == MEMCACHED_NOTSTORED) + { rc= MEMCACHED_SUCCESS; + } if (rc == MEMCACHED_STORED) + { rc= MEMCACHED_NOTFOUND; + } } if (rc == MEMCACHED_WRITE_FAILURE) + { memcached_io_reset(instance); + } return rc; } diff --git a/libmemcached/flush.cc b/libmemcached/flush.cc index 45cb4586..99da07f4 100644 --- a/libmemcached/flush.cc +++ b/libmemcached/flush.cc @@ -67,7 +67,7 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr, time_t expiration) { // Invert the logic to make it simpler to read the code - bool reply= (ptr->flags.no_reply) ? false : true; + bool reply= memcached_is_replying(ptr); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; int send_length= 0; @@ -134,13 +134,13 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr, { memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); - if (ptr->flags.no_reply) + if (memcached_is_replying(ptr)) { - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; } else { - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ; } struct libmemcached_io_vector_st vector[]= diff --git a/libmemcached/include.am b/libmemcached/include.am index af49d2d5..a1e47ab9 100644 --- a/libmemcached/include.am +++ b/libmemcached/include.am @@ -42,6 +42,7 @@ noinst_HEADERS+= \ libmemcached/server.hpp \ libmemcached/server_instance.h \ libmemcached/string.hpp \ + libmemcached/udp.hpp \ libmemcached/virtual_bucket.h \ libmemcached/watchpoint.h @@ -98,6 +99,7 @@ libmemcached_libmemcached_la_SOURCES+= \ libmemcached/touch.cc \ libmemcached/verbosity.cc \ libmemcached/version.cc \ + libmemcached/udp.cc \ libmemcached/virtual_bucket.c libmemcached/options.cc: libmemcached/csl/parser.h diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 5e626b6a..2b0866dc 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -44,30 +44,6 @@ enum memc_read_or_write { MEM_WRITE }; -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - /** * Try to fill the input buffer for a server with as much * data as possible. @@ -192,7 +168,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) } static memcached_return_t io_wait(memcached_server_write_instance_st ptr, - memc_read_or_write read_or_write) + const memc_read_or_write read_or_write) { struct pollfd fds; fds.fd= ptr->fd; @@ -835,7 +811,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) */ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, - size_t size) + const size_t size) { size_t offset= 0; char *data= static_cast(dta); @@ -918,18 +894,3 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } - -memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) -{ - if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) - { - return MEMCACHED_FAILURE; - } - - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id))); - header->num_datagrams= htons(1); - header->sequence_number= htons(0); - - return MEMCACHED_SUCCESS; -} diff --git a/libmemcached/io.h b/libmemcached/io.h index 00bd0c11..c2711e28 100644 --- a/libmemcached/io.h +++ b/libmemcached/io.h @@ -38,26 +38,6 @@ #pragma once -#define MAX_UDP_DATAGRAM_LENGTH 1400 -#define UDP_DATAGRAM_HEADER_LENGTH 8 -#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10 -#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS -#define get_udp_datagram_request_id(A) ntohs((A)->request_id) -#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number) -#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams) -#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) ) -#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS -#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS -#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF) - -struct udp_datagram_header_st -{ - uint16_t request_id; - uint16_t sequence_number; - uint16_t num_datagrams; - uint16_t reserved; -}; - struct libmemcached_io_vector_st { const void *buffer; diff --git a/libmemcached/io.hpp b/libmemcached/io.hpp index af18fe3a..d91150f8 100644 --- a/libmemcached/io.hpp +++ b/libmemcached/io.hpp @@ -67,10 +67,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr); /* Read n bytes of data from the server and store them in dta */ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, - size_t size); - -memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, - uint16_t thread_id); + const size_t size); memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc); diff --git a/libmemcached/is.h b/libmemcached/is.h index 9f8783d8..6f8241d9 100644 --- a/libmemcached/is.h +++ b/libmemcached/is.h @@ -40,9 +40,17 @@ /* These are private */ #define memcached_is_allocated(__object) ((__object)->options.is_allocated) #define memcached_is_udp(__object) ((__object)->flags.use_udp) +#define memcached_is_verify_key(__object) ((__object)->flags.verify_key) +#define memcached_is_binary(__object) ((__object)->flags.binary_protocol) #define memcached_is_initialized(__object) ((__object)->options.is_initialized) #define memcached_is_purging(__object) ((__object)->state.is_purging) #define memcached_is_processing_input(__object) ((__object)->state.is_processing_input) + +#define memcached_is_buffering(__object) ((__object)->flags.buffer_requests) +#define memcached_is_replying(__object) ((__object)->flags.reply) + +#define memcached_has_error(__object) ((__object)->error_messages) + #define memcached_set_purging(__object, __value) ((__object)->state.is_purging= (__value)) #define memcached_set_processing_input(__object, __value) ((__object)->state.is_processing_input= (__value)) #define memcached_set_initialized(__object, __value) ((__object)->options.is_initialized(= (__value)) diff --git a/libmemcached/key.cc b/libmemcached/key.cc index ea98c729..e9246881 100644 --- a/libmemcached/key.cc +++ b/libmemcached/key.cc @@ -47,25 +47,15 @@ memcached_return_t memcached_key_test(memcached_st &memc, return memcached_set_error(memc, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT); } - if (not memc.flags.verify_key) + // If we don't need to verify the key, or we are using the binary protoocol, + // we just check the size of the key + if (memc.flags.verify_key == false or memc.flags.binary_protocol == true) { for (uint32_t x= 0; x < number_of_keys; x++) { - memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false); - if (memcached_failed(rc)) - { - return rc; - } - } - - return MEMCACHED_SUCCESS; - } - - if (memc.flags.binary_protocol) - { - for (uint32_t x= 0; x < number_of_keys; x++) - { - memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false); + // We should set binary key, but the memcached server is broken for + // longer keys at the moment. + memcached_return_t rc= memcached_validate_key_length(*(key_length +x), false /* memc.flags.binary_protocol */); if (memcached_failed(rc)) { return rc; diff --git a/libmemcached/memcached.cc b/libmemcached/memcached.cc index 8c1e3acb..62052ada 100644 --- a/libmemcached/memcached.cc +++ b/libmemcached/memcached.cc @@ -40,31 +40,6 @@ #include #include -#if 0 -static const memcached_st global_copy= { - .state= { - .is_purging= false, // .is_purging - .is_processing_input= false, // is_processing_input - .is_time_for_rebuild= false, - }, - .flags= { - .auto_eject_hosts= false, - .binary_protocol= false, - .buffer_requests= false, - .hash_with_namespace= false, - .no_block= false, - .no_reply= false, - .randomize_replica_read= false, - .support_cas= false, - .tcp_nodelay= false, - .use_sort_hosts= false, - .use_udp= false, - .verify_key= false, - .tcp_keepalive= false, - }, -}; -#endif - static inline bool _memcached_init(memcached_st *self) { self->state.is_purging= false; @@ -76,7 +51,7 @@ static inline bool _memcached_init(memcached_st *self) self->flags.buffer_requests= false; self->flags.hash_with_namespace= false; self->flags.no_block= false; - self->flags.no_reply= false; + self->flags.reply= true; self->flags.randomize_replica_read= false; self->flags.support_cas= false; self->flags.tcp_nodelay= false; diff --git a/libmemcached/response.cc b/libmemcached/response.cc index b996ee10..3312fb7c 100644 --- a/libmemcached/response.cc +++ b/libmemcached/response.cc @@ -705,7 +705,7 @@ memcached_return_t memcached_response(memcached_server_write_instance_st ptr, uint64_t& numeric_value) { /* We may have old commands in the buffer not set, first purge */ - if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false)) + if ((ptr->root->flags.no_block) and (memcached_is_processing_input(ptr->root) == false)) { (void)memcached_io_write(ptr); } @@ -715,7 +715,7 @@ memcached_return_t memcached_response(memcached_server_write_instance_st ptr, * returned the last one. Purge all pending messages to ensure backwards * compatibility. */ - if (ptr->root->flags.binary_protocol == false) + if (memcached_is_binary(ptr->root) == false) { while (memcached_server_response_count(ptr) > 1) { diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index 041575b6..3e293232 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -74,9 +74,9 @@ static inline const char *storage_op_string(memcached_storage_action_t verb) return "set "; } -static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply) +static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply) { - if (noreply) + if (reply == false) { switch (verb) { @@ -124,22 +124,21 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, memcached_server_write_instance_st server, uint32_t server_key, const char *key, - size_t key_length, + const size_t key_length, const char *value, - size_t value_length, - time_t expiration, - uint32_t flags, - uint64_t cas, - bool flush, + const size_t value_length, + const time_t expiration, + const uint32_t flags, + const uint64_t cas, + const bool flush, + const bool reply, memcached_storage_action_t verb) { protocol_binary_request_set request= {}; size_t send_length= sizeof(request.bytes); - bool noreply= server->root->flags.no_reply; - request.message.header.request.magic= PROTOCOL_BINARY_REQ; - request.message.header.request.opcode= get_com_code(verb, noreply); + request.message.header.request.opcode= get_com_code(verb, reply); request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace))); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; if (verb == APPEND_OP or verb == PREPEND_OP) @@ -161,20 +160,6 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, request.message.header.request.cas= memcached_htonll(cas); } - if (server->root->flags.use_udp and flush == false) - { - size_t cmd_size= send_length + key_length + value_length; - - if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) - { - return MEMCACHED_WRITE_FAILURE; - } - if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) - { - memcached_io_write(server); - } - } - struct libmemcached_io_vector_st vector[]= { { request.bytes, send_length }, @@ -189,7 +174,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, { memcached_io_reset(server); - if (ptr->error_messages == NULL) + if (memcached_has_error(ptr)) { memcached_set_error(*server, rc, MEMCACHED_AT); } @@ -197,7 +182,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_WRITE_FAILURE; } - if (verb == SET_OP && ptr->number_of_replicas > 0) + if (verb == SET_OP and ptr->number_of_replicas > 0) { request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ; WATCHPOINT_STRING("replicating"); @@ -228,7 +213,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_BUFFERED; } - if (noreply) + // No reply always assumes success + if (reply == false) { return MEMCACHED_SUCCESS; } @@ -246,11 +232,9 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, const uint32_t flags, const uint64_t cas, const bool flush, + const bool reply, const memcached_storage_action_t verb) { - // Invert the logic to make it simpler to read the code - bool reply= (ptr->flags.no_reply) ? false : true; - char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1]; int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags); if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0) @@ -302,43 +286,26 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, { memcached_literal_param("\r\n") } }; - if (memcached_is_udp(instance->root)) - { - size_t write_length= io_vector_total_size(vector, 11); - - size_t cmd_size= write_length + value_length +2; - if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) - { - return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - } - - if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) - { - memcached_io_write(instance); - } - } - /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 11, flush); if (rc == MEMCACHED_SUCCESS) { - if (ptr->flags.no_reply and flush) + if (flush == false) { - rc= MEMCACHED_SUCCESS; + return MEMCACHED_BUFFERED; } - else if (flush == false) + + if (reply == false) { - rc= MEMCACHED_BUFFERED; + return MEMCACHED_SUCCESS; } - else - { - char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - if (rc == MEMCACHED_STORED) - { - rc= MEMCACHED_SUCCESS; - } + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_STORED) + { + return MEMCACHED_SUCCESS; } } @@ -347,9 +314,10 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, memcached_io_reset(instance); } - if (memcached_failed(rc) and ptr->error_messages == NULL) + assert(memcached_failed(rc)); + if (memcached_has_error(ptr) == false) { - memcached_set_error(*ptr, rc, MEMCACHED_AT); + return memcached_set_error(*ptr, rc, MEMCACHED_AT); } return rc; @@ -370,7 +338,7 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, return rc; } - if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol))) + if (memcached_failed(rc= memcached_validate_key_length(key_length, memcached_is_binary(ptr)))) { return rc; } @@ -386,23 +354,27 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, WATCHPOINT_SET(instance->io_wait_count.read= 0); WATCHPOINT_SET(instance->io_wait_count.write= 0); - bool flush= (bool) ((instance->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1); - if (ptr->flags.binary_protocol) + + bool flush= true; + if (memcached_is_buffering(instance->root) and verb == SET_OP) { - rc= memcached_send_binary(ptr, instance, server_key, - key, key_length, - value, value_length, expiration, - flags, cas, flush, verb); + flush= false; } - else + + bool reply= memcached_is_replying(ptr); + + if (memcached_is_binary(ptr)) { - rc= memcached_send_ascii(ptr, instance, - key, key_length, - value, value_length, expiration, - flags, cas, flush, verb); + return memcached_send_binary(ptr, instance, server_key, + key, key_length, + value, value_length, expiration, + flags, cas, flush, reply, verb); } - return rc; + return memcached_send_ascii(ptr, instance, + key, key_length, + value, value_length, expiration, + flags, cas, flush, reply, verb); } @@ -431,6 +403,11 @@ memcached_return_t memcached_add(memcached_st *ptr, rc= memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags, 0, ADD_OP); + + if (rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_DATA_EXISTS) + { + memcached_set_error(*ptr, rc, MEMCACHED_AT); + } LIBMEMCACHED_MEMCACHED_ADD_END(); return rc; } @@ -546,11 +523,9 @@ memcached_return_t memcached_prepend_by_key(memcached_st *ptr, time_t expiration, uint32_t flags) { - memcached_return_t rc; - rc= memcached_send(ptr, group_key, group_key_length, - key, key_length, value, value_length, - expiration, flags, 0, PREPEND_OP); - return rc; + return memcached_send(ptr, group_key, group_key_length, + key, key_length, value, value_length, + expiration, flags, 0, PREPEND_OP); } memcached_return_t memcached_append_by_key(memcached_st *ptr, @@ -560,11 +535,9 @@ memcached_return_t memcached_append_by_key(memcached_st *ptr, time_t expiration, uint32_t flags) { - memcached_return_t rc; - rc= memcached_send(ptr, group_key, group_key_length, - key, key_length, value, value_length, - expiration, flags, 0, APPEND_OP); - return rc; + return memcached_send(ptr, group_key, group_key_length, + key, key_length, value, value_length, + expiration, flags, 0, APPEND_OP); } memcached_return_t memcached_cas_by_key(memcached_st *ptr, @@ -575,10 +548,8 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, uint32_t flags, uint64_t cas) { - memcached_return_t rc; - rc= memcached_send(ptr, group_key, group_key_length, - key, key_length, value, value_length, - expiration, flags, cas, CAS_OP); - return rc; + return memcached_send(ptr, group_key, group_key_length, + key, key_length, value, value_length, + expiration, flags, cas, CAS_OP); } diff --git a/libmemcached/udp.cc b/libmemcached/udp.cc new file mode 100644 index 00000000..905eedd7 --- /dev/null +++ b/libmemcached/udp.cc @@ -0,0 +1,77 @@ +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * LibMemcached + * + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +/* + * The udp request id consists of two seperate sections + * 1) The thread id + * 2) The message number + * The thread id should only be set when the memcached_st struct is created + * and should not be changed. + * + * The message num is incremented for each new message we send, this function + * extracts the message number from message_id, increments it and then + * writes the new value back into the header + */ +void increment_udp_message_id(memcached_server_write_instance_st ptr) +{ + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + uint16_t cur_req= get_udp_datagram_request_id(header); + int msg_num= get_msg_num_from_request_id(cur_req); + int thread_id= get_thread_id_from_request_id(cur_req); + + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) + msg_num= 0; + + header->request_id= htons((uint16_t) (thread_id | msg_num)); +} + +bool memcached_io_init_udp_header(memcached_server_write_instance_st ptr, const uint16_t thread_id) +{ + if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) + { + return MEMCACHED_FAILURE; + } + + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + header->request_id= htons(uint16_t((generate_udp_request_thread_id(thread_id)))); + header->num_datagrams= htons(1); + header->sequence_number= htons(0); + + return MEMCACHED_SUCCESS; +} diff --git a/libmemcached/udp.hpp b/libmemcached/udp.hpp new file mode 100644 index 00000000..9cc53889 --- /dev/null +++ b/libmemcached/udp.hpp @@ -0,0 +1,59 @@ +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * LibMemcached + * + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define MAX_UDP_DATAGRAM_LENGTH 1400 +#define UDP_DATAGRAM_HEADER_LENGTH 8 +#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10 +#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS +#define get_udp_datagram_request_id(A) ntohs((A)->request_id) +#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number) +#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams) +#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) ) +#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS +#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS +#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF) + +struct udp_datagram_header_st +{ + uint16_t request_id; + uint16_t sequence_number; + uint16_t num_datagrams; + uint16_t reserved; +}; + +bool memcached_io_init_udp_header(memcached_server_write_instance_st ptr, const uint16_t thread_id); +void increment_udp_message_id(memcached_server_write_instance_st ptr); diff --git a/libmemcached/verbosity.cc b/libmemcached/verbosity.cc index e2aac3c7..0a5b9bbe 100644 --- a/libmemcached/verbosity.cc +++ b/libmemcached/verbosity.cc @@ -47,7 +47,7 @@ static memcached_return_t _set_verbosity(const memcached_st *, const memcached_server_st *server, void *context) { - const libmemcached_io_vector_st *execute= (const libmemcached_io_vector_st *)context; + libmemcached_io_vector_st *vector= (libmemcached_io_vector_st *)context; memcached_st local_memc; memcached_st *memc_ptr= memcached_create(&local_memc); @@ -59,7 +59,7 @@ static memcached_return_t _set_verbosity(const memcached_st *, memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc_ptr, 0); - rc= memcached_vdo(instance, execute, 3, true); + rc= memcached_vdo(instance, vector, 3, true); if (rc == MEMCACHED_SUCCESS) { diff --git a/tests/libmemcached-1.0/mem_functions.cc b/tests/libmemcached-1.0/mem_functions.cc index a4d424f0..83ea4460 100644 --- a/tests/libmemcached-1.0/mem_functions.cc +++ b/tests/libmemcached-1.0/mem_functions.cc @@ -112,7 +112,7 @@ static test_return_t pre_binary(memcached_st *memc) return TEST_SUCCESS; } -static bool return_value_based_on_buffering(memcached_st *memc) +static memcached_return_t return_value_based_on_buffering(memcached_st *memc) { if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS)) { @@ -394,7 +394,7 @@ static test_return_t clone_test(memcached_st *memc) test_true(memc_clone->ketama.weighted == memc->ketama.weighted); test_true(memc_clone->flags.binary_protocol == memc->flags.binary_protocol); test_true(memc_clone->flags.hash_with_namespace == memc->flags.hash_with_namespace); - test_true(memc_clone->flags.no_reply == memc->flags.no_reply); + test_true(memc_clone->flags.reply == memc->flags.reply); test_true(memc_clone->flags.use_udp == memc->flags.use_udp); test_true(memc_clone->flags.auto_eject_hosts == memc->flags.auto_eject_hosts); test_true(memc_clone->flags.randomize_replica_read == memc->flags.randomize_replica_read); @@ -530,32 +530,34 @@ static test_return_t set_test(memcached_st *memc) static test_return_t append_test(memcached_st *memc) { memcached_return_t rc; - const char *key= "fig"; const char *in_value= "we"; - char *out_value= NULL; size_t value_length; uint32_t flags; - rc= memcached_flush(memc, 0); - test_compare(MEMCACHED_SUCCESS, rc); - - rc= memcached_set(memc, key, strlen(key), - in_value, strlen(in_value), - (time_t)0, (uint32_t)0); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_flush(memc, 0)); - rc= memcached_append(memc, key, strlen(key), - " the", strlen(" the"), - (time_t)0, (uint32_t)0); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, + test_literal_param(__func__), + in_value, strlen(in_value), + time_t(0), uint32_t(0))); - rc= memcached_append(memc, key, strlen(key), - " people", strlen(" people"), - (time_t)0, (uint32_t)0); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_append(memc, + test_literal_param(__func__), + " the", strlen(" the"), + time_t(0), uint32_t(0))); - out_value= memcached_get(memc, key, strlen(key), - &value_length, &flags, &rc); + test_compare(MEMCACHED_SUCCESS, + memcached_append(memc, + test_literal_param(__func__), + " people", strlen(" people"), + time_t(0), uint32_t(0))); + + char *out_value= memcached_get(memc, + test_literal_param(__func__), + &value_length, &flags, &rc); test_memcmp(out_value, "we the people", strlen("we the people")); test_compare(strlen("we the people"), value_length); test_compare(MEMCACHED_SUCCESS, rc); @@ -566,40 +568,40 @@ static test_return_t append_test(memcached_st *memc) static test_return_t append_binary_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "numbers"; uint32_t store_list[] = { 23, 56, 499, 98, 32847, 0 }; - uint32_t *value; - size_t value_length; - uint32_t flags; - uint32_t x; - rc= memcached_flush(memc, 0); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_flush(memc, 0)); - rc= memcached_set(memc, - key, strlen(key), - NULL, 0, - (time_t)0, (uint32_t)0); - test_compare_got(MEMCACHED_SUCCESS, rc, memcached_strerror(NULL, rc)); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, + test_literal_param(__func__), + NULL, 0, + time_t(0), uint32_t(0))); - for (x= 0; store_list[x] ; x++) + size_t count= 0; + for (uint32_t x= 0; store_list[x] ; x++) { - rc= memcached_append(memc, - key, strlen(key), + test_compare(MEMCACHED_SUCCESS, + memcached_append(memc, + test_literal_param(__func__), (char *)&store_list[x], sizeof(uint32_t), - (time_t)0, (uint32_t)0); - test_compare(MEMCACHED_SUCCESS, rc); + time_t(0), uint32_t(0))); + count++; } - value= (uint32_t *)memcached_get(memc, key, strlen(key), - &value_length, &flags, &rc); - test_compare(value_length, sizeof(uint32_t) * x); + size_t value_length; + uint32_t flags; + memcached_return_t rc; + uint32_t *value= (uint32_t *)memcached_get(memc, + test_literal_param(__func__), + &value_length, &flags, &rc); + test_compare(value_length, sizeof(uint32_t) * count); test_compare(MEMCACHED_SUCCESS, rc); - for (uint32_t counter= x, *ptr= value; counter; counter--) + for (uint32_t counter= count, *ptr= value; counter; counter--) { - test_compare(*ptr, store_list[x - counter]); + test_compare(*ptr, store_list[count - counter]); ptr++; } free(value); @@ -609,34 +611,31 @@ static test_return_t append_binary_test(memcached_st *memc) static test_return_t cas2_test(memcached_st *memc) { - memcached_return_t rc; const char *keys[]= {"fudge", "son", "food"}; size_t key_length[]= {5, 3, 4}; const char *value= "we the people"; size_t value_length= strlen("we the people"); - memcached_result_st results_obj; - memcached_result_st *results; - unsigned int set= 1; test_compare(MEMCACHED_SUCCESS, memcached_flush(memc, 0)); - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set); + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true)); for (uint32_t x= 0; x < 3; x++) { - rc= memcached_set(memc, keys[x], key_length[x], - keys[x], key_length[x], - (time_t)50, (uint32_t)9); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, keys[x], key_length[x], + keys[x], key_length[x], + time_t(50), uint32_t(9))); } test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, key_length, 3)); - results= memcached_result_create(memc, &results_obj); + memcached_result_st *results= memcached_result_create(memc, NULL); test_true(results); - results= memcached_fetch_result(memc, &results_obj, &rc); + memcached_return_t rc; + results= memcached_fetch_result(memc, results, &rc); test_true(results); test_true(results->item_cas); test_compare(MEMCACHED_SUCCESS, rc); @@ -646,38 +645,32 @@ static test_return_t cas2_test(memcached_st *memc) test_compare(strlen("we the people"), value_length); test_compare(MEMCACHED_SUCCESS, rc); - memcached_result_free(&results_obj); + memcached_result_free(results); return TEST_SUCCESS; } static test_return_t cas_test(memcached_st *memc) { - const char *key= "fun"; - size_t key_length= strlen(key); - const char *value= "we the people"; - const char* keys[2] = { key, NULL }; - size_t keylengths[2] = { strlen(key), 0 }; - size_t value_length= strlen(value); - const char *value2= "change the value"; - size_t value2_length= strlen(value2); + const char* keys[2] = { __func__, NULL }; + size_t keylengths[2] = { strlen(__func__), 0 }; memcached_result_st results_obj; - memcached_result_st *results; test_compare(MEMCACHED_SUCCESS, memcached_flush(memc, 0)); test_skip(true, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true)); test_compare(MEMCACHED_SUCCESS, - memcached_set(memc, key, strlen(key), - value, strlen(value), + memcached_set(memc, + test_literal_param(__func__), + test_literal_param("we the people"), (time_t)0, (uint32_t)0)); test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, keylengths, 1)); - results= memcached_result_create(memc, &results_obj); + memcached_result_st *results= memcached_result_create(memc, &results_obj); test_true(results); memcached_return_t rc; @@ -685,10 +678,11 @@ static test_return_t cas_test(memcached_st *memc) test_true(results); test_compare(MEMCACHED_SUCCESS, rc); test_true(memcached_result_cas(results)); - test_memcmp(value, memcached_result_value(results), value_length); - test_compare(strlen(memcached_result_value(results)), value_length); - test_compare(MEMCACHED_SUCCESS, rc); - uint64_t cas = memcached_result_cas(results); + test_memcmp("we the people", memcached_result_value(results), test_literal_param_size("we the people")); + test_compare(test_literal_param_size("we the people"), + strlen(memcached_result_value(results))); + + uint64_t cas= memcached_result_cas(results); #if 0 results= memcached_fetch_result(memc, &results_obj, &rc); @@ -696,15 +690,21 @@ static test_return_t cas_test(memcached_st *memc) test_true(results == NULL); #endif - rc= memcached_cas(memc, key, key_length, value2, value2_length, 0, 0, cas); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_cas(memc, + test_literal_param(__func__), + test_literal_param("change the value"), + 0, 0, cas)); /* * The item will have a new cas value, so try to set it again with the old * value. This should fail! */ - rc= memcached_cas(memc, key, key_length, value2, value2_length, 0, 0, cas); - test_compare(MEMCACHED_DATA_EXISTS, rc); + test_compare(MEMCACHED_DATA_EXISTS, + memcached_cas(memc, + test_literal_param(__func__), + test_literal_param("change the value"), + 0, 0, cas)); memcached_result_free(&results_obj); @@ -753,31 +753,21 @@ static test_return_t prepend_test(memcached_st *memc) */ static test_return_t add_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "foo"; - const char *value= "when we sanitize"; - unsigned long long setting_value; - - setting_value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NO_BLOCK); + test_compare_hint(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param(__func__), + test_literal_param("when we sanitize"), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); - rc= memcached_set(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); memcached_quit(memc); - rc= memcached_add(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); - /* Too many broken OS'es have broken loopback in async, so we can't be sure of the result */ - if (setting_value) - { - test_true(rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_STORED); - } - else - { - test_true(rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_DATA_EXISTS); - } + test_compare_hint(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) ? MEMCACHED_DATA_EXISTS : MEMCACHED_NOTSTORED, + memcached_add(memc, + test_literal_param(__func__), + test_literal_param("try something else"), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); return TEST_SUCCESS; } @@ -807,20 +797,17 @@ static test_return_t add_wrapper(memcached_st *memc) static test_return_t replace_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "foo"; - const char *value= "when we sanitize"; - const char *original= "first we insert some data"; - - rc= memcached_set(memc, key, strlen(key), - original, strlen(original), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + test_compare(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param(__func__), + test_literal_param("when we sanitize"), + time_t(0), uint32_t(0))); test_compare(MEMCACHED_SUCCESS, - memcached_replace(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0)); + memcached_replace(memc, + test_literal_param(__func__), + test_literal_param("first we insert some data"), + time_t(0), uint32_t(0))); return TEST_SUCCESS; } @@ -833,11 +820,10 @@ static test_return_t delete_test(memcached_st *memc) test_literal_param("when we sanitize"), time_t(0), uint32_t(0))); - memcached_return_t rc= memcached_delete(memc, - test_literal_param(__func__), - time_t(0)); - test_compare_hint(MEMCACHED_SUCCESS, - rc, + test_compare_hint(return_value_based_on_buffering(memc), + memcached_delete(memc, + test_literal_param(__func__), + time_t(0)), memcached_last_error_message(memc)); return TEST_SUCCESS; @@ -853,13 +839,11 @@ static test_return_t flush_test(memcached_st *memc) return TEST_SUCCESS; } -static memcached_return_t server_function(const memcached_st *ptr, - const memcached_server_st *server, - void *context) +static memcached_return_t server_function(const memcached_st *, + const memcached_server_st *, + void *) { - (void)ptr; (void)server; (void)context; /* Do Nothing */ - return MEMCACHED_SUCCESS; } @@ -879,7 +863,6 @@ static test_return_t bad_key_test(memcached_st *memc) memcached_return_t rc; const char *key= "foo bad"; uint32_t flags; - memcached_st *memc_clone; uint64_t query_id= memcached_query_id(memc); @@ -888,7 +871,7 @@ static test_return_t bad_key_test(memcached_st *memc) test_compare(query_id, memcached_query_id(memc)); // We should not increase the query_id for memcached_behavior_get() - memc_clone= memcached_clone(NULL, memc); + memcached_st *memc_clone= memcached_clone(NULL, memc); test_true(memc_clone); query_id= memcached_query_id(memc_clone); @@ -1001,15 +984,14 @@ static memcached_return_t read_through_trigger(memcached_st *memc, static test_return_t read_through(memcached_st *memc) { - memcached_return_t rc; - const char *key= "foo"; - char *string; - size_t string_length; - uint32_t flags; memcached_trigger_key_fn cb= (memcached_trigger_key_fn)read_through_trigger; - string= memcached_get(memc, key, strlen(key), - &string_length, &flags, &rc); + size_t string_length; + uint32_t flags; + memcached_return_t rc; + char *string= memcached_get(memc, + test_literal_param(__func__), + &string_length, &flags, &rc); test_compare(MEMCACHED_NOTFOUND, rc); test_false(string_length); @@ -1018,7 +1000,8 @@ static test_return_t read_through(memcached_st *memc) rc= memcached_callback_set(memc, MEMCACHED_CALLBACK_GET_FAILURE, *(void **)&cb); test_compare(MEMCACHED_SUCCESS, rc); - string= memcached_get(memc, key, strlen(key), + string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare(MEMCACHED_SUCCESS, rc); @@ -1027,7 +1010,8 @@ static test_return_t read_through(memcached_st *memc) test_strcmp(READ_THROUGH_VALUE, string); free(string); - string= memcached_get(memc, key, strlen(key), + string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare(MEMCACHED_SUCCESS, rc); @@ -1043,17 +1027,19 @@ static test_return_t read_through(memcached_st *memc) static test_return_t get_test(memcached_st *memc) { memcached_return_t rc; - const char *key= "foo"; char *string; size_t string_length; uint32_t flags; uint64_t query_id= memcached_query_id(memc); - rc= memcached_delete(memc, key, strlen(key), (time_t)0); + rc= memcached_delete(memc, + test_literal_param(__func__), + time_t(0)); test_true_got(rc == MEMCACHED_BUFFERED || rc == MEMCACHED_NOTFOUND, memcached_last_error_message(memc)); test_compare(query_id +1, memcached_query_id(memc)); - string= memcached_get(memc, key, strlen(key), + string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare_got(MEMCACHED_NOTFOUND, rc, memcached_strerror(NULL, rc)); @@ -1065,14 +1051,14 @@ static test_return_t get_test(memcached_st *memc) static test_return_t get_test2(memcached_st *memc) { - const char *key= "foo"; const char *value= "when we sanitize"; uint64_t query_id= memcached_query_id(memc); - memcached_return_t rc= memcached_set(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); + test_compare(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param(__func__), + value, strlen(value), + time_t(0), uint32_t(0))); test_compare(query_id +1, memcached_query_id(memc)); query_id= memcached_query_id(memc); @@ -1080,7 +1066,9 @@ static test_return_t get_test2(memcached_st *memc) uint32_t flags; size_t string_length; - char *string= memcached_get(memc, key, strlen(key), + memcached_return_t rc; + char *string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare(query_id +1, memcached_query_id(memc)); @@ -1099,11 +1087,11 @@ static test_return_t set_test2(memcached_st *memc) { for (uint32_t x= 0; x < 10; x++) { - memcached_return_t rc= memcached_set(memc, - test_literal_param("foo"), - test_literal_param("train in the brain"), - time_t(0), uint32_t(0)); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); + test_compare(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param("foo"), + test_literal_param("train in the brain"), + time_t(0), uint32_t(0))); } return TEST_SUCCESS; @@ -1128,10 +1116,11 @@ static test_return_t set_test3(memcached_st *memc) snprintf(key, sizeof(key), "foo%u", x); uint64_t query_id= memcached_query_id(memc); - memcached_return_t rc= memcached_set(memc, key, strlen(key), - &value[0], value.size(), - (time_t)0, (uint32_t)0); - test_true_got(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED, memcached_strerror(NULL, rc)); + test_compare_hint(return_value_based_on_buffering(memc), + memcached_set(memc, key, strlen(key), + &value[0], value.size(), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); test_compare(query_id +1, memcached_query_id(memc)); } @@ -1140,7 +1129,6 @@ static test_return_t set_test3(memcached_st *memc) static test_return_t get_test3(memcached_st *memc) { - const char *key= "foo"; size_t value_length= 8191; std::vector value; @@ -1150,20 +1138,23 @@ static test_return_t get_test3(memcached_st *memc) value.push_back(char(x % 127)); } - memcached_return_t rc; - rc= memcached_set(memc, key, strlen(key), - &value[0], value.size(), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); + test_compare_hint(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param(__func__), + &value[0], value.size(), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); size_t string_length; uint32_t flags; - char *string= memcached_get(memc, key, strlen(key), + memcached_return_t rc; + char *string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare(MEMCACHED_SUCCESS, rc); test_true(string); - test_compare(string_length, value_length); + test_compare(value.size(), string_length); test_memcmp(string, &value[0], string_length); free(string); @@ -1173,7 +1164,6 @@ static test_return_t get_test3(memcached_st *memc) static test_return_t get_test4(memcached_st *memc) { - const char *key= "foo"; size_t value_length= 8191; std::vector value; @@ -1183,21 +1173,25 @@ static test_return_t get_test4(memcached_st *memc) value.push_back(char(x % 127)); } - memcached_return_t rc= memcached_set(memc, key, strlen(key), - &value[0], value.size(), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); + test_compare_hint(return_value_based_on_buffering(memc), + memcached_set(memc, + test_literal_param(__func__), + &value[0], value.size(), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); for (uint32_t x= 0; x < 10; x++) { uint32_t flags; size_t string_length; - char *string= memcached_get(memc, key, strlen(key), + memcached_return_t rc; + char *string= memcached_get(memc, + test_literal_param(__func__), &string_length, &flags, &rc); test_compare(MEMCACHED_SUCCESS, rc); test_true(string); - test_compare(string_length, value_length); + test_compare(value.size(), string_length); test_memcmp(string, &value[0], string_length); free(string); } @@ -1221,14 +1215,18 @@ static test_return_t get_test5(memcached_st *memc) uint32_t flags; size_t rlen; - memcached_return_t rc= memcached_set(memc, keys[0], lengths[0], - keys[0], lengths[0], 0, 0); + test_compare_hint(return_value_based_on_buffering(memc), + memcached_set(memc, keys[0], lengths[0], + keys[0], lengths[0], + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, lengths, test_array_length(keys))); memcached_result_st results_obj; memcached_result_st *results= memcached_result_create(memc, &results_obj); test_true(results); + memcached_return_t rc; results= memcached_fetch_result(memc, &results_obj, &rc); test_true(results); @@ -1260,7 +1258,11 @@ static test_return_t mget_end(memcached_st *memc) // Set foo and foo2 for (size_t x= 0; x < test_array_length(keys); x++) { - test_compare(MEMCACHED_SUCCESS, memcached_set(memc, keys[x], lengths[x], values[x], strlen(values[x]), (time_t)0, (uint32_t)0)); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, + keys[x], lengths[x], + values[x], strlen(values[x]), + time_t(0), uint32_t(0))); } char *string; @@ -1268,7 +1270,10 @@ static test_return_t mget_end(memcached_st *memc) uint32_t flags; // retrieve both via mget - test_compare(MEMCACHED_SUCCESS, memcached_mget(memc, keys, lengths, test_array_length(keys))); + test_compare(MEMCACHED_SUCCESS, + memcached_mget(memc, + keys, lengths, + test_array_length(keys))); char key[MEMCACHED_MAX_KEY]; size_t key_length; @@ -1296,8 +1301,8 @@ static test_return_t mget_end(memcached_st *memc) test_null(string); // now get just one - rc= memcached_mget(memc, keys, lengths, 1); - test_compare(MEMCACHED_SUCCESS, rc); + test_compare(MEMCACHED_SUCCESS, + memcached_mget(memc, keys, lengths, 1)); string= memcached_fetch(memc, key, &key_length, &string_length, &flags, &rc); test_compare(key_length, lengths[0]); diff --git a/tests/mem_udp.cc b/tests/mem_udp.cc index 227cea63..590ad7b9 100644 --- a/tests/mem_udp.cc +++ b/tests/mem_udp.cc @@ -48,6 +48,7 @@ using namespace libtest; #include #include #include +#include #include #include @@ -205,13 +206,13 @@ static test_return_t set_udp_behavior_test(memcached_st *memc) test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, memc->distribution)); test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, true)); - test_true(memc->flags.use_udp); - test_true(memc->flags.no_reply); + test_compare(true, memc->flags.use_udp); + test_compare(false, memc->flags.reply); test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, false)); - test_false(memc->flags.use_udp); + test_compare(false, memc->flags.use_udp); test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, false)); - test_false(memc->flags.no_reply); + test_compare(true, memc->flags.reply); return TEST_SUCCESS; } @@ -237,29 +238,23 @@ static test_return_t udp_set_test(memcached_st *memc) memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key); size_t init_offset= instance->write_buffer_offset; - memcached_return_t rc= memcached_set(memc, test_literal_param("foo"), - test_literal_param("when we sanitize"), - time_t(0), uint32_t(0)); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); - /** NB, the check below assumes that if new write_ptr is less than - * the original write_ptr that we have flushed. For large payloads, this - * maybe an invalid assumption, but for the small payload we have it is OK - */ - if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset) + test_compare_hint(MEMCACHED_SUCCESS, + memcached_set(memc, + test_literal_param("foo"), + test_literal_param("when we sanitize"), + time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); + + /* + NB, the check below assumes that if new write_ptr is less than + the original write_ptr that we have flushed. For large payloads, this + maybe an invalid assumption, but for the small payload we have it is OK + */ + if (instance->write_buffer_offset < init_offset) { increment_request_id(&expected_ids[server_key]); } - if (rc == MEMCACHED_SUCCESS) - { - test_true(instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); - } - else - { - test_true(instance->write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH); - test_true(instance->write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH); - } - test_compare(TEST_SUCCESS, post_udp_op_check(memc, expected_ids)); } @@ -269,7 +264,8 @@ static test_return_t udp_set_test(memcached_st *memc) static test_return_t udp_buffered_set_test(memcached_st *memc) { test_true(memc); - test_compare(MEMCACHED_INVALID_ARGUMENTS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, true)); + test_compare(MEMCACHED_INVALID_ARGUMENTS, + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, true)); return TEST_SUCCESS; } @@ -305,23 +301,14 @@ static test_return_t udp_delete_test(memcached_st *memc) memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key); size_t init_offset= instance->write_buffer_offset; - memcached_return_t rc= memcached_delete(memc, test_literal_param("foo"), 0); - test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); + test_compare(MEMCACHED_SUCCESS, + memcached_delete(memc, test_literal_param("foo"), 0)); - if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset) + if (instance->write_buffer_offset < init_offset) { increment_request_id(&expected_ids[server_key]); } - if (rc == MEMCACHED_SUCCESS) - { - test_true(instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); - } - else - { - test_true(instance->write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH); - test_true(instance->write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH); - } test_compare(TEST_SUCCESS, post_udp_op_check(memc, expected_ids)); } @@ -373,9 +360,10 @@ static test_return_t udp_flush_test(memcached_st *memc) static test_return_t udp_incr_test(memcached_st *memc) { - test_compare(MEMCACHED_SUCCESS, memcached_set(memc, test_literal_param("incr"), - test_literal_param("1"), - (time_t)0, (uint32_t)0)); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, test_literal_param("incr"), + test_literal_param("1"), + (time_t)0, (uint32_t)0)); Expected expected_ids; get_udp_request_ids(memc, expected_ids); @@ -391,19 +379,23 @@ static test_return_t udp_incr_test(memcached_st *memc) static test_return_t udp_decr_test(memcached_st *memc) { - test_compare(MEMCACHED_SUCCESS, memcached_set(memc, - test_literal_param("decr"), - test_literal_param("1"), - (time_t)0, (uint32_t)0)); + test_compare(MEMCACHED_SUCCESS, + memcached_set(memc, + test_literal_param(__func__), + test_literal_param("1"), + (time_t)0, (uint32_t)0)); Expected expected_ids; get_udp_request_ids(memc, expected_ids); - unsigned int server_key= memcached_generate_hash(memc, test_literal_param("decr")); + unsigned int server_key= memcached_generate_hash(memc, + test_literal_param(__func__)); increment_request_id(&expected_ids[server_key]); uint64_t newvalue; - test_compare(MEMCACHED_SUCCESS, memcached_decrement(memc, test_literal_param("decr"), 1, &newvalue)); + test_compare(MEMCACHED_SUCCESS, memcached_decrement(memc, + test_literal_param(__func__), + 1, &newvalue)); return post_udp_op_check(memc, expected_ids); } @@ -427,7 +419,8 @@ static test_return_t udp_version_test(memcached_st *memc) Expected expected_ids; get_udp_request_ids(memc, expected_ids); - test_compare(MEMCACHED_NOT_SUPPORTED, memcached_version(memc)); + test_compare(MEMCACHED_NOT_SUPPORTED, + memcached_version(memc)); return post_udp_op_check(memc, expected_ids); } -- 2.30.2