From: Michael Wallner Date: Tue, 20 Apr 2021 16:39:30 +0000 (+0200) Subject: libmemcached: add MEMCACHED_BEHAVIOR_META_PROTOCOL X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=5a6a6ab8eb7b4ad34b335e812fae6dbccc489f48;p=awesomized%2Flibmemcached libmemcached: add MEMCACHED_BEHAVIOR_META_PROTOCOL --- diff --git a/ChangeLog-1.2.md b/ChangeLog-1.2.md new file mode 100644 index 00000000..d9d685bb --- /dev/null +++ b/ChangeLog-1.2.md @@ -0,0 +1,9 @@ +# ChangeLog v1.2 + +## v 1.2.0-beta1 + +> released TBD + +* Initial transparent support for the memcached META protocol: + * Add `MEMCACHED_BEHAVIOR_META_PROTOCOL` enabling the META protocol. + * Add `-m` switch to client programs enabling the META protocol. diff --git a/ChangeLog.md b/ChangeLog.md index a78d08eb..69621785 120000 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1 +1 @@ -ChangeLog-1.1.md \ No newline at end of file +ChangeLog-1.2.md \ No newline at end of file diff --git a/docs/source/ChangeLog-1.1.rst b/docs/source/ChangeLog-1.1.rst index 635e0619..bd54515a 100644 --- a/docs/source/ChangeLog-1.1.rst +++ b/docs/source/ChangeLog-1.1.rst @@ -96,10 +96,6 @@ v 1.1.0-beta1 released 2020-12-21 -**NOTE:**\ :raw-html-m2r:`
` -This is a bug fix release, not a feature release. The minor version number -was incremented due to the following changes: - * Ported build system to CMake. * Ported test suite to Catch2. diff --git a/docs/source/ChangeLog-1.2.rst b/docs/source/ChangeLog-1.2.rst new file mode 100644 index 00000000..a9175a64 --- /dev/null +++ b/docs/source/ChangeLog-1.2.rst @@ -0,0 +1,17 @@ + +ChangeLog v1.2 +============== + +v 1.2.0-beta1 +------------- + +.. + + released TBD + + + +* Initial transparent support for the memcached META protocol: + + * Add ``MEMCACHED_BEHAVIOR_META_PROTOCOL`` enabling the META protocol. + * Add ``-m`` switch to client programs enabling the META protocol. diff --git a/docs/source/changelogs.rst b/docs/source/changelogs.rst index bc6d19a7..78e65aec 100644 --- a/docs/source/changelogs.rst +++ b/docs/source/changelogs.rst @@ -3,10 +3,11 @@ Change Logs .. toctree:: - ChangeLog-1.1 + ChangeLog-1.2 .. toctree:: :maxdepth: 1 + ChangeLog-1.1 ChangeLog-1.0 ChangeLog-0 diff --git a/docs/source/libmemcached/memcached_behavior.rst b/docs/source/libmemcached/memcached_behavior.rst index 99f3268c..5d3933c4 100644 --- a/docs/source/libmemcached/memcached_behavior.rst +++ b/docs/source/libmemcached/memcached_behavior.rst @@ -179,6 +179,12 @@ SYNOPSIS Enable the use of the binary protocol. Please note that you cannot toggle this flag on an open connection. + .. enumerator:: MEMCACHED_BEHAVIOR_META_PROTOCOL + + Enable the use of the META protocol. This setting can be switched on and + off at will when using an ASCII protocol connection, but causes + a reconnect when using the binary protocol. + .. enumerator:: MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK Set this value to tune the number of messages that may be sent before diff --git a/include/libmemcached-1.0/struct/memcached.h b/include/libmemcached-1.0/struct/memcached.h index a81b119b..1615bf08 100644 --- a/include/libmemcached-1.0/struct/memcached.h +++ b/include/libmemcached-1.0/struct/memcached.h @@ -43,7 +43,7 @@ struct memcached_st { bool tcp_keepalive : 1; bool is_aes : 1; bool is_fetching_version : 1; - bool not_used : 1; + bool meta_protocol : 1; } flags; memcached_server_distribution_t distribution; diff --git a/include/libmemcached-1.0/types/behavior.h b/include/libmemcached-1.0/types/behavior.h index 4e6b1ec1..43dd130e 100644 --- a/include/libmemcached-1.0/types/behavior.h +++ b/include/libmemcached-1.0/types/behavior.h @@ -54,6 +54,7 @@ enum memcached_behavior_t { MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS, MEMCACHED_BEHAVIOR_DEAD_TIMEOUT, MEMCACHED_BEHAVIOR_SERVER_TIMEOUT_LIMIT, + MEMCACHED_BEHAVIOR_META_PROTOCOL, MEMCACHED_BEHAVIOR_MAX }; diff --git a/src/bin/common/options.hpp b/src/bin/common/options.hpp index 93275b33..28c02e3b 100644 --- a/src/bin/common/options.hpp +++ b/src/bin/common/options.hpp @@ -126,6 +126,17 @@ public: } return true; }; + def("meta", 'm', no_argument, "Use the text based meta memcached protocol.") + .apply = [](const client_options &opt_, const extended_option &ext, memcached_st *memc) { + if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_META_PROTOCOL, ext.set)) { + if (!opt_.isset("quiet")) { + std::cerr << memcached_last_error_message(memc); + } + return false; + } + return true; + }; + def("buffer", 'B', no_argument, "Buffer requests.") .apply = [](const client_options &opt, const extended_option &ext, memcached_st *memc) { if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, ext.set)) { diff --git a/src/bin/memslap.cc b/src/bin/memslap.cc index 10e50b47..e041d035 100644 --- a/src/bin/memslap.cc +++ b/src/bin/memslap.cc @@ -242,7 +242,7 @@ int main(int argc, char *argv[]) { opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.") .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) { if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) { - if(!opt_.isset("quiet")) { + if (!opt_.isset("quiet")) { std::cerr << memcached_last_error_message(memc); } return false; diff --git a/src/libmemcached/auto.cc b/src/libmemcached/auto.cc index d511d3f3..e1bb4d3b 100644 --- a/src/libmemcached/auto.cc +++ b/src/libmemcached/auto.cc @@ -39,6 +39,47 @@ static void auto_response(memcached_instance_st *instance, const bool reply, mem } } +static memcached_return_t meta_incr_decr(memcached_instance_st *instance, bool is_incr, bool w_init, + const char *key, size_t key_len, + uint64_t offset, uint64_t initial, uint32_t expiration) { + char new_buf[32] = " N", inl_buf[32] = " J", dlt_buf[32] = " D", exp_buf[32] = " T"; + size_t new_len = strlen(new_buf), inl_len = strlen(inl_buf), dlt_len = strlen(dlt_buf), exp_len = strlen(exp_buf); + size_t io_num = 0; + libmemcached_io_vector_st io_vec[10] = {}; + + io_vec[io_num++] = {memcached_literal_param("ma ")}; + io_vec[io_num++] = {memcached_array_string(instance->root->_namespace), + memcached_array_size(instance->root->_namespace)}, + io_vec[io_num++] = {key, key_len}; + + if (!is_incr) { + io_vec[io_num++] = {memcached_literal_param(" MD")}; + } + if (w_init) { + new_len += snprintf(new_buf + new_len, sizeof(new_buf) - new_len, "%" PRIu32, expiration); + io_vec[io_num++] = {new_buf, new_len}; + inl_len += snprintf(inl_buf + inl_len, sizeof(inl_buf) - inl_len, "%" PRIu64, initial); + io_vec[io_num++] = {inl_buf, inl_len}; + } + if (offset != 1) { + dlt_len += snprintf(dlt_buf + dlt_len, sizeof(dlt_buf) - dlt_len, "%" PRIu64, offset); + io_vec[io_num++] = {dlt_buf, dlt_len}; + } + if (expiration) { + exp_len += snprintf(exp_buf + exp_len, sizeof(exp_buf) - exp_len, "%" PRIu32, expiration); + io_vec[io_num++] = {exp_buf, exp_len}; + } + + if (memcached_is_replying(instance->root)) { + io_vec[io_num++] = {memcached_literal_param(" v")}; + } else { + io_vec[io_num++] = {memcached_literal_param(" q")}; + } + io_vec[io_num++] = {memcached_literal_param(" O+\r\n")}; + + return memcached_vdo(instance, io_vec, io_num, true); +} + static memcached_return_t text_incr_decr(memcached_instance_st *instance, const bool is_incr, const char *key, size_t key_length, const uint64_t offset, const bool reply) { @@ -145,6 +186,8 @@ static memcached_return_t increment_decrement_by_key(const protocol_binary_comma if (memcached_is_binary(memc)) { rc = binary_incr_decr(instance, command, key, key_length, uint64_t(offset), 0, MEMCACHED_EXPIRATION_NOT_ADD, reply); + } else if (memcached_is_meta(memc)) { + rc = meta_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT, false, key, key_length, offset, 0, 0); } else { rc = text_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT ? true : false, key, key_length, offset, reply); @@ -189,7 +232,9 @@ increment_decrement_with_initial_by_key(const protocol_binary_command command, M if (memcached_is_binary(memc)) { rc = binary_incr_decr(instance, command, key, key_length, offset, initial, uint32_t(expiration), reply); - + } else if (memcached_is_meta(memc)) { + rc = meta_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT, true, + key, key_length, offset, initial, uint32_t(expiration)); } else { rc = memcached_set_error( *memc, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, diff --git a/src/libmemcached/behavior.cc b/src/libmemcached/behavior.cc index 859e56aa..ce8a662f 100644 --- a/src/libmemcached/behavior.cc +++ b/src/libmemcached/behavior.cc @@ -96,14 +96,22 @@ memcached_return_t memcached_behavior_set(memcached_st *shell, const memcached_b break; case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL: - send_quit( - ptr); // We need t shutdown all of the connections to make sure we do the correct protocol + // We need t shutdown all of the connections to make sure we do the correct protocol + send_quit(ptr); if (data) { ptr->flags.verify_key = false; } ptr->flags.binary_protocol = bool(data); break; + case MEMCACHED_BEHAVIOR_META_PROTOCOL: + if (data && ptr->flags.binary_protocol) { + send_quit(ptr); + ptr->flags.binary_protocol = false; + } + ptr->flags.meta_protocol = bool(data); + break; + case MEMCACHED_BEHAVIOR_SUPPORT_CAS: ptr->flags.support_cas = bool(data); break; @@ -623,6 +631,8 @@ const char *libmemcached_string_behavior(const memcached_behavior_t flag) { return "MEMCACHED_BEHAVIOR_KETAMA_HASH"; case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL: return "MEMCACHED_BEHAVIOR_BINARY_PROTOCOL"; + case MEMCACHED_BEHAVIOR_META_PROTOCOL: + return "MEMCACHED_BEHAVIOR_META_PROTOCOL"; case MEMCACHED_BEHAVIOR_SND_TIMEOUT: return "MEMCACHED_BEHAVIOR_SND_TIMEOUT"; case MEMCACHED_BEHAVIOR_RCV_TIMEOUT: diff --git a/src/libmemcached/delete.cc b/src/libmemcached/delete.cc index 6a69ae0d..fffa42e8 100644 --- a/src/libmemcached/delete.cc +++ b/src/libmemcached/delete.cc @@ -20,6 +20,30 @@ memcached_return_t memcached_delete(memcached_st *shell, const char *key, size_t return memcached_delete_by_key(shell, key, key_length, key, key_length, expiration); } +static inline memcached_return_t meta_delete(memcached_instance_st *instance, + const char *key, size_t key_length, + time_t expiration) { + + char ex_buf[32] = " I T"; + size_t io_num = 0, ex_len = strlen(ex_buf); + libmemcached_io_vector_st io_vec[6] = {}; + io_vec[io_num++] = {memcached_literal_param("md ")}; + io_vec[io_num++] = {memcached_array_string(instance->root->_namespace), + memcached_array_size(instance->root->_namespace)}; + io_vec[io_num++] = {key, key_length}; + if (!memcached_is_replying(instance->root)) { + io_vec[io_num++] = {" q", 2}; + } + if (expiration) { + ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%llu", (unsigned long long) expiration); + io_vec[io_num++] = {ex_buf, ex_len}; + } + io_vec[io_num++] = {memcached_literal_param("\r\n")}; + + /* Send command header, only flush if we are NOT buffering */ + return memcached_vdo(instance, io_vec, io_num, !memcached_is_buffering(instance->root)); +} + static inline memcached_return_t ascii_delete(memcached_instance_st *instance, uint32_t, const char *key, const size_t key_length, const bool reply, const bool is_buffering) { @@ -85,10 +109,9 @@ static inline memcached_return_t binary_delete(memcached_instance_st *instance, return rc; } -memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *group_key, +memcached_return_t memcached_delete_by_key(memcached_st *memc, const char *group_key, size_t group_key_length, const char *key, size_t key_length, time_t expiration) { - Memcached *memc = memcached2Memcached(shell); LIBMEMCACHED_MEMCACHED_DELETE_START(); memcached_return_t rc; @@ -100,16 +123,15 @@ memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *grou return memcached_last_error(memc); } - if (expiration) { + if (expiration && !memcached_is_meta(memc)) { return memcached_set_error( *memc, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param( "Memcached server version does not allow expiration of deleted items")); } - uint32_t server_key = - memcached_generate_hash_with_redistribution(memc, group_key, group_key_length); - memcached_instance_st *instance = memcached_instance_fetch(memc, server_key); + auto server_key = memcached_generate_hash_with_redistribution(memc, group_key, group_key_length); + auto *instance = memcached_instance_fetch(memc, server_key); bool is_buffering = memcached_is_buffering(instance->root); bool is_replying = memcached_is_replying(instance->root); @@ -132,6 +154,8 @@ memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *grou if (memcached_is_binary(memc)) { rc = binary_delete(instance, server_key, key, key_length, is_replying, is_buffering); + } else if (memcached_is_meta(memc)) { + rc = meta_delete(instance, key, key_length, expiration); } else { rc = ascii_delete(instance, server_key, key, key_length, is_replying, is_buffering); } diff --git a/src/libmemcached/fetch.cc b/src/libmemcached/fetch.cc index fee6a10d..5d4c349c 100644 --- a/src/libmemcached/fetch.cc +++ b/src/libmemcached/fetch.cc @@ -167,8 +167,8 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr, memcached_result_ *error = MEMCACHED_END; } else if (*error == MEMCACHED_MAXIMUM_RETURN and result->count) { *error = MEMCACHED_END; - } else if (*error == MEMCACHED_MAXIMUM_RETURN) // while() loop was never entered - { + } else if (*error == MEMCACHED_MAXIMUM_RETURN) { + // while() loop was never entered *error = MEMCACHED_NOTFOUND; } else if (connection_failures) { /* diff --git a/src/libmemcached/get.cc b/src/libmemcached/get.cc index 8801d31d..32da9b9a 100644 --- a/src/libmemcached/get.cc +++ b/src/libmemcached/get.cc @@ -137,7 +137,7 @@ static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_ size_t group_key_length, const char *const *keys, const size_t *key_length, size_t number_of_keys, const bool mget_mode) { - bool failures_occured_in_sending = false; + bool failures_occurred_in_sending = false, success_happened = false; const char *get_command = "get"; uint8_t get_command_length = 3; unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */ @@ -198,91 +198,143 @@ static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_ number_of_keys, mget_mode); } - if (ptr->flags.support_cas) { - get_command = "gets"; - get_command_length = 4; - } + if (memcached_is_meta(ptr)) { + size_t hosts_connected = 0; - /* - If a server fails we warn about errors and start all over with sending keys - to the server. - */ - WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS); - size_t hosts_connected = 0; - for (uint32_t x = 0; x < number_of_keys; x++) { - uint32_t server_key; - - if (is_group_key_set) { - server_key = master_server_key; - } else { - server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); - } - - memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key); + for (uint32_t x = 0; x < number_of_keys; ++x) { + auto io_num = 0; + libmemcached_io_vector_st io_vec[8] = {}; + + io_vec[io_num++] = {memcached_literal_param("mg ")}; + io_vec[io_num++] = {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)}; + io_vec[io_num++] = {keys[x], key_length[x]}; + if (memcached_is_cas(ptr)) { + io_vec[io_num++] = { memcached_literal_param(" c")}; + } + io_vec[io_num++] = {memcached_literal_param(" k t f v\r\n")}; - libmemcached_io_vector_st vector[] = { - {get_command, get_command_length}, - {memcached_literal_param(" ")}, - {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)}, - {keys[x], key_length[x]}}; + uint32_t server_key; + if (is_group_key_set) { + server_key = master_server_key; + } else { + server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); + } - if (instance->response_count() == 0) { - rc = memcached_connect(instance); + auto *instance = memcached_instance_fetch(ptr, server_key); + if (!instance->response_count()) { + rc = memcached_connect(instance); - if (memcached_failed(rc)) { - memcached_set_error(*instance, rc, MEMCACHED_AT); - continue; + if (memcached_failed(rc)) { + memcached_set_error(*instance, rc, MEMCACHED_AT); + continue; + } + hosts_connected++; } - hosts_connected++; - - if ((memcached_io_writev(instance, vector, 1, false)) == false) { - failures_occured_in_sending = true; + if (!memcached_io_writev(instance, io_vec, io_num, false)) { + failures_occurred_in_sending = true; continue; } - WATCHPOINT_ASSERT(instance->cursor_active_ == 0); memcached_instance_response_increment(instance); - WATCHPOINT_ASSERT(instance->cursor_active_ == 1); } - { - if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) { - memcached_instance_response_reset(instance); - failures_occured_in_sending = true; - continue; + for (uint32_t x = 0; x < memcached_server_count(ptr); x++) { + memcached_instance_st *instance = memcached_instance_fetch(ptr, x); + + if (instance->response_count()) { + /* We need to do something about non-connected hosts in the future */ + if (!memcached_io_write(instance)) { + failures_occurred_in_sending = true; + } else { + success_happened = true; + } } } - } + } else { + if (ptr->flags.support_cas) { + get_command = "gets"; + get_command_length = 4; + } + + /* + If a server fails we warn about errors and start all over with sending keys + to the server. + */ + WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS); + size_t hosts_connected = 0; + for (uint32_t x = 0; x < number_of_keys; x++) { + uint32_t server_key; + + if (is_group_key_set) { + server_key = master_server_key; + } else { + server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); + } + + memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key); - if (hosts_connected == 0) { - LIBMEMCACHED_MEMCACHED_MGET_END(); + libmemcached_io_vector_st vector[] = { + {get_command, get_command_length}, + {memcached_literal_param(" ")}, + {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)}, + {keys[x], key_length[x]}}; + + if (instance->response_count() == 0) { + rc = memcached_connect(instance); - if (memcached_failed(rc)) { - return rc; + if (memcached_failed(rc)) { + memcached_set_error(*instance, rc, MEMCACHED_AT); + continue; + } + hosts_connected++; + + if ((memcached_io_writev(instance, vector, 1, false)) == false) { + failures_occurred_in_sending = true; + continue; + } + WATCHPOINT_ASSERT(instance->cursor_active_ == 0); + memcached_instance_response_increment(instance); + WATCHPOINT_ASSERT(instance->cursor_active_ == 1); + } + + { + if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) { + memcached_instance_response_reset(instance); + failures_occurred_in_sending = true; + continue; + } + } } - return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT); - } + if (hosts_connected == 0) { + LIBMEMCACHED_MEMCACHED_MGET_END(); - /* - Should we muddle on if some servers are dead? - */ - bool success_happened = false; - for (uint32_t x = 0; x < memcached_server_count(ptr); x++) { - memcached_instance_st *instance = memcached_instance_fetch(ptr, x); + if (memcached_failed(rc)) { + return rc; + } - if (instance->response_count()) { - /* We need to do something about non-connnected hosts in the future */ - if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) { - failures_occured_in_sending = true; - } else { - success_happened = true; + return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT); + } + + /* + Should we muddle on if some servers are dead? + */ + for (uint32_t x = 0; x < memcached_server_count(ptr); x++) { + memcached_instance_st *instance = memcached_instance_fetch(ptr, x); + + if (instance->response_count()) { + /* We need to do something about non-connnected hosts in the future */ + if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) { + failures_occurred_in_sending = true; + } else { + success_happened = true; + } } } } LIBMEMCACHED_MEMCACHED_MGET_END(); - if (failures_occured_in_sending and success_happened) { + if (failures_occurred_in_sending and success_happened) { return MEMCACHED_SOME_ERRORS; } diff --git a/src/libmemcached/is.h b/src/libmemcached/is.h index 229fd9b0..d52d2103 100644 --- a/src/libmemcached/is.h +++ b/src/libmemcached/is.h @@ -26,6 +26,7 @@ #define memcached_is_udp(__object) ((__object)->flags.use_udp) #define memcached_is_verify_key(__object) ((__object)->flags.verify_key) #define memcached_is_binary(__object) ((__object)->flags.binary_protocol) +#define memcached_is_meta(__object) ((__object)->flags.meta_protocol) #define memcached_is_fetching_version(__object) ((__object)->flags.is_fetching_version) #define memcached_is_buffering(__object) ((__object)->flags.buffer_requests) #define memcached_is_replying(__object) ((__object)->flags.reply) @@ -47,6 +48,7 @@ #define memcached_set_udp(__object, __flag) ((__object).flags.use_udp = __flag) #define memcached_set_verify_key(__object, __flag) ((__object).flags.verify_key = __flag) #define memcached_set_binary(__object, __flag) ((__object).flags.binary_protocol = __flag) +#define memcached_set_meta(__object, __flag) ((__object).flags.meta_protocol = __flag) #define memcached_set_fetching_version(__object, __flag) \ ((__object).flags.is_fetching_version = __flag) #define memcached_set_buffering(__object, __flag) ((__object).flags.buffer_requests = __flag) diff --git a/src/libmemcached/memcached.cc b/src/libmemcached/memcached.cc index f789cb4f..beccab2e 100644 --- a/src/libmemcached/memcached.cc +++ b/src/libmemcached/memcached.cc @@ -26,6 +26,7 @@ static inline bool _memcached_init(Memcached *self) { self->flags.auto_eject_hosts = false; self->flags.binary_protocol = false; + self->flags.meta_protocol = false; self->flags.buffer_requests = false; self->flags.hash_with_namespace = false; self->flags.no_block = false; diff --git a/src/libmemcached/response.cc b/src/libmemcached/response.cc index 7e4610b5..fe01e388 100644 --- a/src/libmemcached/response.cc +++ b/src/libmemcached/response.cc @@ -16,11 +16,243 @@ #include "libmemcached/common.h" #include "libmemcached/string.hpp" +static inline memcached_return_t ascii_read_inc(memcached_instance_st *instance, const char *buffer, + memcached_result_st *result) { + { + errno = 0; + unsigned long long int auto_return_value = strtoull(buffer, (char **) NULL, 10); + + if (errno) { + result->numeric_value = UINT64_MAX; + return memcached_set_error(*instance, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, + memcached_literal_param("Numeric response was out of range")); + } + + result->numeric_value = uint64_t(auto_return_value); + return MEMCACHED_SUCCESS; + } +} + +static inline memcached_return_t ascii_read_ull(size_t &value, char **string_ptr, char *end_ptr) { + char *next_ptr = *string_ptr; + + errno = 0; + value = strtoull(next_ptr, string_ptr, 10); + + if (errno) { + return MEMCACHED_ERRNO; + } + if (*string_ptr >= end_ptr) { + return MEMCACHED_PARTIAL_READ; + } + return MEMCACHED_SUCCESS; +} + +static inline memcached_return_t ascii_read_key(char *buf, size_t len, char **str_ptr, char *end_ptr) { + char *tmp_ptr = buf; + while (**str_ptr != ' ' && **str_ptr != '\r') { + *(tmp_ptr++) = *((*str_ptr)++); + if (*str_ptr == end_ptr || size_t(tmp_ptr - buf) >= len) { + return MEMCACHED_PARTIAL_READ; + } + } + *tmp_ptr = 0; + return MEMCACHED_SUCCESS; +} + +static inline memcached_return_t ascii_read_val(memcached_instance_st *instance, memcached_result_st *result, + ssize_t &to_read) { + /* We add two bytes so that we can walk the \r\n */ + if (memcached_failed(memcached_string_check(&result->value, to_read))) { + return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + } + + { + char *value_ptr = memcached_string_value_mutable(&result->value); + /* + We read the \r\n into the string since not doing so is more + cycles then the waster of memory to do so. + + We are null terminating through, which will most likely make + some people lazy about using the return length. + */ + memcached_return_t read_rc = memcached_io_read(instance, value_ptr, to_read, to_read); + if (memcached_failed(read_rc)) { + if (read_rc == MEMCACHED_IN_PROGRESS) { + memcached_quit_server(instance, true); + return memcached_set_error(*instance, MEMCACHED_IN_PROGRESS, MEMCACHED_AT); + } + return read_rc; + } + + /* This next bit blows the API, but this is internal....*/ + { + char *char_ptr = memcached_string_value_mutable(&result->value); + char_ptr[--to_read] = 0; + char_ptr[--to_read] = 0; + memcached_string_set_length(&result->value, to_read); + } + + return MEMCACHED_SUCCESS; + } +} + +static memcached_return_t result_decrypt(memcached_instance_st *instance, memcached_result_st *result) { + memcached_return_t rc = MEMCACHED_SUCCESS; + + if (memcached_result_length(result)) { + hashkit_string_st *destination = hashkit_decrypt(&instance->root->hashkit, memcached_result_value(result), + memcached_result_length(result)); + if(!destination) { + return memcached_set_error(*instance->root, MEMCACHED_FAILURE, MEMCACHED_AT, + memcached_literal_param("hashkit_decrypt() failed")); + } + + memcached_result_reset_value(result); + + rc = memcached_result_set_value(result, hashkit_string_c_str(destination), hashkit_string_length(destination)); + if (memcached_failed(rc)) { + rc = memcached_set_error(*instance->root, MEMCACHED_FAILURE, MEMCACHED_AT, + memcached_literal_param("hashkit_decrypt() failed")); + } + + hashkit_string_free(destination); + } + + return rc; +} + +static memcached_return_t meta_fetch_flags(memcached_instance_st *instance, char *str_ptr, char *end_ptr, + memcached_result_st *result, char opaque[MEMCACHED_MAX_KEY]) { + size_t ull_val; + char *tmp_ptr; + + while (str_ptr < end_ptr && *str_ptr != '\r') { + switch (*str_ptr++) { + case ' ': + break; + + case 'c': // CAS + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + result->item_cas = ull_val; + break; + + case 'f': + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + result->item_flags = ull_val; + break; + + case 'k': + str_ptr += memcached_array_size(instance->root->_namespace); + if (str_ptr >= end_ptr) { + goto read_error; + } + tmp_ptr = str_ptr; + if (MEMCACHED_SUCCESS != ascii_read_key(result->item_key, sizeof(result->item_key), &str_ptr, end_ptr)) { + goto read_error; + } + result->key_length = str_ptr - tmp_ptr; + break; + + case 'l': + if (ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + /* legacy result does not support last_accessed */ + break; + + case 'O': + tmp_ptr = str_ptr; + if (MEMCACHED_SUCCESS != ascii_read_key(opaque, MEMCACHED_MAX_KEY, &str_ptr, end_ptr)) { + goto read_error; + } + /* legacy result does not support opaque */ + break; + + case 's': // size + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + /* legacy result does not support size */ + break; + + case 't': + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + result->item_expiration = time_t(ull_val); + break; + + case 'W': + case 'X': + case 'Z': + /* legacy result does not support re-cache semantics */ + break; + } + } + + return MEMCACHED_SUCCESS; +read_error: + memcached_io_reset(instance); + return MEMCACHED_PARTIAL_READ; +} + +static memcached_return_t meta_va_fetch(memcached_instance_st *instance, char *buffer, + memcached_result_st *result) { + char opaque[MEMCACHED_MAX_KEY] = ""; + char *str_ptr = buffer + sizeof("VA"); + char *end_ptr = buffer + MEMCACHED_DEFAULT_COMMAND_SIZE; + size_t ull_val, val_len; + memcached_return_t rc; + + while (isspace(*str_ptr) && str_ptr < end_ptr) { + ++str_ptr; + } + if (str_ptr == end_ptr) { + goto read_error; + } + + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &str_ptr, end_ptr)) { + goto read_error; + } + val_len = ull_val; + + rc = meta_fetch_flags(instance, str_ptr, end_ptr, result, opaque); + if (memcached_success(rc)) { + auto read_len = ssize_t(val_len + 2); + rc = ascii_read_val(instance, result, read_len); + if (memcached_success(rc)) { + if (read_len != ssize_t(val_len)) { + goto read_error; + } + + /* meta INC/DEC response */ + if ('+' == *opaque) { + rc = ascii_read_inc(instance, result->value.string, result); + } else if (memcached_is_encrypted(instance->root)) { + rc = result_decrypt(instance, result); + if (memcached_failed(rc)) { + memcached_result_reset(result); + } + } + } + } + + return rc; + +read_error: + memcached_io_reset(instance); + return MEMCACHED_PARTIAL_READ; +} + static memcached_return_t textual_value_fetch(memcached_instance_st *instance, char *buffer, memcached_result_st *result) { - char *next_ptr; ssize_t read_length = 0; - size_t value_length; + size_t value_length, ull_val; WATCHPOINT_ASSERT(instance->root); char *end_ptr = buffer + MEMCACHED_DEFAULT_COMMAND_SIZE; @@ -61,14 +293,10 @@ static memcached_return_t textual_value_fetch(memcached_instance_st *instance, c goto read_error; } - for (next_ptr = string_ptr; isdigit(*string_ptr); string_ptr++) { - }; - errno = 0; - result->item_flags = (uint32_t) strtoul(next_ptr, &string_ptr, 10); - - if (errno or end_ptr == string_ptr) { + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &string_ptr, end_ptr)) { goto read_error; } + result->item_flags = ull_val; /* Length fetch move past space*/ string_ptr++; @@ -76,14 +304,10 @@ static memcached_return_t textual_value_fetch(memcached_instance_st *instance, c goto read_error; } - for (next_ptr = string_ptr; isdigit(*string_ptr); string_ptr++) { - }; - errno = 0; - value_length = (size_t) strtoull(next_ptr, &string_ptr, 10); - - if (errno or end_ptr == string_ptr) { + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &string_ptr, end_ptr)) { goto read_error; } + value_length = ull_val; /* Skip spaces */ if (*string_ptr == '\r') { @@ -91,84 +315,37 @@ static memcached_return_t textual_value_fetch(memcached_instance_st *instance, c string_ptr += 2; } else { string_ptr++; - for (next_ptr = string_ptr; isdigit(*string_ptr); string_ptr++) { - }; - errno = 0; - result->item_cas = strtoull(next_ptr, &string_ptr, 10); + if (MEMCACHED_SUCCESS != ascii_read_ull(ull_val, &string_ptr, end_ptr)) { + goto read_error; + } + result->item_cas = ull_val; } - if (errno or end_ptr < string_ptr) { + if (end_ptr < string_ptr) { goto read_error; } - /* We add two bytes so that we can walk the \r\n */ - if (memcached_failed(memcached_string_check(&result->value, value_length + 2))) { - return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - } - - { - char *value_ptr = memcached_string_value_mutable(&result->value); - /* - We read the \r\n into the string since not doing so is more - cycles then the waster of memory to do so. - - We are null terminating through, which will most likely make - some people lazy about using the return length. - */ - size_t to_read = (value_length) + 2; - memcached_return_t rrc = memcached_io_read(instance, value_ptr, to_read, read_length); - if (memcached_failed(rrc) and rrc == MEMCACHED_IN_PROGRESS) { - memcached_quit_server(instance, true); - return memcached_set_error(*instance, MEMCACHED_IN_PROGRESS, MEMCACHED_AT); - } else if (memcached_failed(rrc)) { - return rrc; - } + read_length = ssize_t(value_length + 2); + rc = ascii_read_val(instance, result, read_length); + if (MEMCACHED_SUCCESS != rc) { + return rc; } - if (read_length != (ssize_t)(value_length + 2)) { + if (read_length != (ssize_t)(value_length)) { goto read_error; } - /* This next bit blows the API, but this is internal....*/ - { - char *char_ptr; - char_ptr = memcached_string_value_mutable(&result->value); - ; - char_ptr[value_length] = 0; - char_ptr[value_length + 1] = 0; - memcached_string_set_length(&result->value, value_length); - } - - if (memcached_is_encrypted(instance->root) and memcached_result_length(result)) { - hashkit_string_st *destination; - - if ((destination = hashkit_decrypt(&instance->root->hashkit, memcached_result_value(result), - memcached_result_length(result))) - == NULL) - { - rc = memcached_set_error(*instance->root, MEMCACHED_FAILURE, MEMCACHED_AT, - memcached_literal_param("hashkit_decrypt() failed")); - } else { - memcached_result_reset_value(result); - if (memcached_failed(memcached_result_set_value(result, hashkit_string_c_str(destination), - hashkit_string_length(destination)))) - { - rc = memcached_set_error(*instance->root, MEMCACHED_FAILURE, MEMCACHED_AT, - memcached_literal_param("hashkit_decrypt() failed")); - } - } - + if (memcached_is_encrypted(instance->root)) { + rc = result_decrypt(instance, result); if (memcached_failed(rc)) { memcached_result_reset(result); } - hashkit_string_free(destination); } return rc; read_error: memcached_io_reset(instance); - return MEMCACHED_PARTIAL_READ; } @@ -186,12 +363,16 @@ static memcached_return_t textual_read_one_response(memcached_instance_st *insta switch (buffer[0]) { case 'V': { // VALUE - if (buffer[1] == 'A' and buffer[2] == 'L' and buffer[3] == 'U' and buffer[4] == 'E') /* VALUE */ + if (buffer[1] == 'A' and buffer[2] == 'L' and buffer[3] == 'U' and buffer[4] == 'E') /* VALUE */ { /* We add back in one because we will need to search for END */ memcached_server_response_increment(instance); return textual_value_fetch(instance, buffer, result); } + // VA + else if (buffer[1] == 'A' && buffer[2] == ' ') { + return meta_va_fetch(instance, buffer, result); + } // VERSION else if (buffer[1] == 'E' and buffer[2] == 'R' and buffer[3] == 'S' and buffer[4] == 'I' and buffer[5] == 'O' and buffer[6] == 'N') /* VERSION */ @@ -304,12 +485,18 @@ static memcached_return_t textual_read_one_response(memcached_instance_st *insta case 'N': { // NOT_FOUND - if (buffer[1] == 'O' and buffer[2] == 'T' and buffer[3] == '_' and buffer[4] == 'F' + if (buffer[1] == 'F') { + return MEMCACHED_NOTFOUND; + } + else if (buffer[1] == 'O' and buffer[2] == 'T' and buffer[3] == '_' and buffer[4] == 'F' and buffer[5] == 'O' and buffer[6] == 'U' and buffer[7] == 'N' and buffer[8] == 'D') { return MEMCACHED_NOTFOUND; } // NOT_STORED + else if (buffer[1] == 'S') { + return MEMCACHED_NOTSTORED; + } else if (buffer[1] == 'O' and buffer[2] == 'T' and buffer[3] == '_' and buffer[4] == 'S' and buffer[5] == 'T' and buffer[6] == 'O' and buffer[7] == 'R' and buffer[8] == 'E' and buffer[9] == 'D') @@ -318,29 +505,28 @@ static memcached_return_t textual_read_one_response(memcached_instance_st *insta } } break; + case 'M': /* META NOOP */ + if (buffer[1] == 'N') { + return MEMCACHED_END; + } + break; + case 'E': /* PROTOCOL ERROR or END */ { // END - if (buffer[1] == 'N' and buffer[2] == 'D') { - return MEMCACHED_END; - } -#if 0 - // PROTOCOL_ERROR - else if (buffer[1] == 'R' and buffer[2] == 'O' and buffer[3] == 'T' and buffer[4] == 'O' and buffer[5] == 'C' and buffer[6] == 'O' and buffer[7] == 'L' - and buffer[8] == '_' - and buffer[9] == 'E' and buffer[10] == 'R' and buffer[11] == 'R' and buffer[12] == 'O' and buffer[13] == 'R') - { - return MEMCACHED_PROTOCOL_ERROR; + if (buffer[1] == 'N') { + if (buffer[2] == 'D') { + return MEMCACHED_END; } -#endif + return MEMCACHED_NOTFOUND; + } // ERROR else if (buffer[1] == 'R' and buffer[2] == 'R' and buffer[3] == 'O' and buffer[4] == 'R') { return MEMCACHED_ERROR; } // EXISTS - else if (buffer[1] == 'X' and buffer[2] == 'I' and buffer[3] == 'S' and buffer[4] == 'T' - and buffer[5] == 'S') + else if (buffer[1] == 'X') { return MEMCACHED_DATA_EXISTS; } @@ -397,29 +583,7 @@ static memcached_return_t textual_read_one_response(memcached_instance_st *insta case '7': /* INCR/DECR response */ case '8': /* INCR/DECR response */ case '9': /* INCR/DECR response */ - { - errno = 0; - unsigned long long int auto_return_value = strtoull(buffer, (char **) NULL, 10); - - if (auto_return_value == ULLONG_MAX and errno == ERANGE) { - result->numeric_value = UINT64_MAX; - return memcached_set_error(*instance, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, - memcached_literal_param("Numeric response was out of range")); - } else if (errno == EINVAL) { - result->numeric_value = UINT64_MAX; - return memcached_set_error(*instance, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, - memcached_literal_param("Numeric response was out of range")); - } else if (errno) { - result->numeric_value = UINT64_MAX; - return memcached_set_error(*instance, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, - memcached_literal_param("Numeric response was out of range")); - } - - result->numeric_value = uint64_t(auto_return_value); - - WATCHPOINT_STRING(buffer); - return MEMCACHED_SUCCESS; - } + return ascii_read_inc(instance, buffer, result); default: break; diff --git a/src/libmemcached/storage.cc b/src/libmemcached/storage.cc index 8e6d5377..3d82edf5 100644 --- a/src/libmemcached/storage.cc +++ b/src/libmemcached/storage.cc @@ -176,6 +176,75 @@ static memcached_return_t memcached_send_binary(Memcached *ptr, memcached_instan return memcached_response(server, NULL, 0, NULL); } +static memcached_return_t +memcached_send_meta(memcached_st *ptr, memcached_instance_st *instance, + const char *key, size_t key_len, + const char *val, size_t val_len, + time_t expiration, uint32_t flags, uint64_t cas, + bool flush, memcached_storage_action_t verb) { + static const char modes[] = "SREPAS"; + char fl_buf[32] = " F", cs_buf[32] = " C", ex_buf[32] = " T", sz_buf[32] = " S"; + size_t io_num = 0, fl_len = strlen(fl_buf), cs_len = strlen(cs_buf), ex_len = strlen(ex_buf), sz_len = strlen(sz_buf); + libmemcached_io_vector_st io_vec[16] = {}; + + io_vec[io_num++] = {memcached_literal_param("ms ")}; + io_vec[io_num++] = {memcached_array_string(ptr->_namespace), + memcached_array_size(ptr->_namespace)}; + io_vec[io_num++] = {key, key_len}; + + if (verb != SET_OP) { + io_vec[io_num++] = {memcached_literal_param(" M")}; + io_vec[io_num++] = {&modes[verb], 1}; + } + + if (!memcached_is_replying(ptr)) { + io_vec[io_num++] = { memcached_literal_param(" q")}; + } + + fl_len += snprintf(fl_buf + fl_len, sizeof(fl_buf) - fl_len, "%" PRIu32, flags); + io_vec[io_num++] = {fl_buf, fl_len}; + if (expiration) { + ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%" PRIi64, (int64_t) expiration); + io_vec[io_num++] = {ex_buf, ex_len}; + } + if (cas) { + cs_len += snprintf(cs_buf + cs_len, sizeof(cs_buf) - cs_len, "%" PRIu64, cas); + io_vec[io_num++] = {cs_buf, cs_len}; + } + + /* we have to send a data block even if it's empty, else memcached errors out with ITEM TOO BIG */ + sz_len += snprintf(sz_buf + sz_len, sizeof(sz_buf) - sz_len, "%" PRIu64, (uint64_t) val_len); + io_vec[io_num++] = {sz_buf, sz_len}; + io_vec[io_num++] = {memcached_literal_param("\r\n")}; + io_vec[io_num++] = {val, val_len}; + io_vec[io_num++] = {memcached_literal_param("\r\n")}; + + /* Send command header */ + memcached_return_t rc = memcached_vdo(instance, io_vec, io_num, flush); + + // If we should not reply, return with MEMCACHED_SUCCESS, unless error + if (!memcached_is_replying(ptr)) { + return memcached_success(rc) ? MEMCACHED_SUCCESS : rc; + } + + if (!flush) { + return memcached_success(rc) ? MEMCACHED_BUFFERED : rc; + } + + if (rc == MEMCACHED_SUCCESS) { + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc = memcached_response(instance, buffer, sizeof(buffer), NULL); + + if (rc == MEMCACHED_SUCCESS) { + return MEMCACHED_SUCCESS; + } + } + + assert(memcached_failed(rc)); + + return rc; +} + static memcached_return_t memcached_send_ascii(Memcached *ptr, memcached_instance_st *instance, const char *key, const size_t key_length, const char *value, const size_t value_length, @@ -312,6 +381,9 @@ memcached_send(memcached_st *shell, const char *group_key, size_t group_key_leng if (memcached_is_binary(ptr)) { rc = memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length, expiration, flags, cas, flush, reply, verb); + } else if (memcached_is_meta(ptr)) { + rc = memcached_send_meta(ptr, instance, key, key_length, value, value_length, expiration, + flags, cas, flush, verb); } else { rc = memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration, flags, cas, flush, reply, verb); diff --git a/src/libmemcached/touch.cc b/src/libmemcached/touch.cc index 6d8fdc4e..b8c8f6b6 100644 --- a/src/libmemcached/touch.cc +++ b/src/libmemcached/touch.cc @@ -15,6 +15,29 @@ #include "libmemcached/common.h" +static memcached_return_t meta_touch(memcached_instance_st *instance, const char *key, size_t key_len, time_t expiration) { + char ex_buf[32] = " T"; + size_t ex_len = strlen(ex_buf); + + ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%lu", (unsigned long) expiration); + + libmemcached_io_vector_st io_vec[] = { + {memcached_literal_param("mg ")}, + {memcached_array_string(instance->root->_namespace), + memcached_array_size(instance->root->_namespace)}, + {key, key_len}, + {ex_buf, ex_len}, + {memcached_literal_param("\r\n")} + }; + + memcached_return_t rc; + if (memcached_failed(rc = memcached_vdo(instance, io_vec, 5, true))) { + return memcached_set_error(*instance, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + } + + return rc; +} + static memcached_return_t ascii_touch(memcached_instance_st *instance, const char *key, size_t key_length, time_t expiration) { char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1]; @@ -98,8 +121,10 @@ memcached_return_t memcached_touch_by_key(memcached_st *shell, const char *group memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key); - if (ptr->flags.binary_protocol) { + if (memcached_is_binary(ptr)) { rc = binary_touch(instance, key, key_length, expiration); + } else if (memcached_is_meta(ptr)) { + rc = meta_touch(instance, key, key_length, expiration); } else { rc = ascii_touch(instance, key, key_length, expiration); } diff --git a/test/lib/MemcachedCluster.cpp b/test/lib/MemcachedCluster.cpp index 4b1aa2c5..ce348d65 100644 --- a/test/lib/MemcachedCluster.cpp +++ b/test/lib/MemcachedCluster.cpp @@ -131,6 +131,11 @@ void MemcachedCluster::enableBinaryProto(bool enable) { MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, enable)); } +void MemcachedCluster::enableMetaProto(bool enable) { + REQUIRE(MEMCACHED_SUCCESS == memcached_behavior_set(&memc, + MEMCACHED_BEHAVIOR_META_PROTOCOL, enable)); +} + void MemcachedCluster::enableBuffering(bool enable) { REQUIRE(MEMCACHED_SUCCESS == memcached_behavior_set(&memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, enable)); @@ -146,3 +151,15 @@ void MemcachedCluster::killOneServer() const { const auto &victim = servers[random_num((size_t)0, servers.size() - 1)]; ::kill(victim.getPid(), SIGKILL); } + +bool MemcachedCluster::isGEVersion(uint8_t major, uint8_t minor, uint8_t micro) { + REQUIRE(MEMCACHED_SUCCESS == memcached_version(&memc)); + auto inst = memcached_server_instance_by_position(&memc, 0); + auto maj = memcached_server_major_version(inst); + auto min = memcached_server_minor_version(inst); + auto mic = memcached_server_micro_version(inst); + + return (maj > major) + || (maj == major && min > minor) + || (maj == major && min == minor && mic >= micro); +} diff --git a/test/lib/MemcachedCluster.hpp b/test/lib/MemcachedCluster.hpp index dcda9ddd..7e04bb8f 100644 --- a/test/lib/MemcachedCluster.hpp +++ b/test/lib/MemcachedCluster.hpp @@ -39,6 +39,7 @@ public: MemcachedCluster &operator=(MemcachedCluster &&mc) noexcept; void enableBinaryProto(bool enable = true); + void enableMetaProto(bool enable = true); void enableBuffering(bool enable = true); void enableReplication(); void flush(); @@ -53,6 +54,7 @@ public: #endif void killOneServer() const; + bool isGEVersion(uint8_t major, uint8_t minor = 0, uint8_t micro = 0); private: behaviors_t to_set; diff --git a/test/lib/ReturnMatcher.cpp b/test/lib/ReturnMatcher.cpp index 926214e7..ab3b33f7 100644 --- a/test/lib/ReturnMatcher.cpp +++ b/test/lib/ReturnMatcher.cpp @@ -1,6 +1,8 @@ #include "ReturnMatcher.hpp" +#include "libmemcached/error.hpp" bool ReturnMatcher::match(const memcached_return_t &arg) const { + actual->v = arg; if (arg != expected) { if (expected == MEMCACHED_SUCCESS && arg == MEMCACHED_BUFFERED && memc) { return memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS); @@ -24,7 +26,11 @@ ReturnMatcher &ReturnMatcher::operator=(memcached_st *memc_) { } string ReturnMatcher::describe() const { - return "is " + to_string(expected) - + "\n actual: " + memcached_last_error_message(memc); + return "is " + to_string(expected) + "\n" + + "expected:\n" + + " " + memcached_strerror(memc, expected) + "\n" + + "actual:\n" + + " " + memcached_strerror(memc, actual->v) + "\n" + + " " + (memcached_has_current_error(*memc) ? memcached_last_error_message(memc) : ""); } diff --git a/test/lib/ReturnMatcher.hpp b/test/lib/ReturnMatcher.hpp index f95eb2f3..a0b84fb7 100644 --- a/test/lib/ReturnMatcher.hpp +++ b/test/lib/ReturnMatcher.hpp @@ -22,9 +22,20 @@ public: explicit ReturnMatcher(memcached_st *memc_, memcached_return_t expected_ = MEMCACHED_SUCCESS) : memc{memc_} - , expected{expected_} {} + , expected{expected_} + , actual{new actual_st} {} - ReturnMatcher(const ReturnMatcher &) = default; + ~ReturnMatcher() override { + if (actual) { + delete actual; + } + actual = nullptr; + } + ReturnMatcher(const ReturnMatcher &other) { + expected = other.expected; + memc = other.memc; + actual = new actual_st(other.actual->v); + } bool match(const memcached_return_t &arg) const override; ReturnMatcher success(); @@ -37,6 +48,13 @@ protected: private: memcached_st *memc; memcached_return_t expected{MEMCACHED_SUCCESS}; + + struct actual_st { + memcached_return_t v; + explicit actual_st(memcached_return_t _v = MEMCACHED_SUCCESS) + : v{_v} {} + }; + actual_st *actual; }; class LoneReturnMatcher { diff --git a/test/lib/common.hpp b/test/lib/common.hpp index 083efe0d..39845601 100644 --- a/test/lib/common.hpp +++ b/test/lib/common.hpp @@ -45,12 +45,10 @@ using socket_or_port_t = variant; for (auto &[name, test] : tests) DYNAMIC_SECTION("test " << name) #define REQUIRE_SUCCESS(rc) \ do { \ - INFO("expected: SUCCESS"); \ REQUIRE_THAT(rc, test.returns.success()); \ } while (0) #define REQUIRE_RC(rc, call) \ do { \ - INFO("expected: " << memcached_strerror(nullptr, rc)); \ REQUIRE_THAT(call, test.returns(rc)); \ } while (0) diff --git a/test/tests/memcached/append.cpp b/test/tests/memcached/append.cpp index fec1ea4b..5ff0805f 100644 --- a/test/tests/memcached/append.cpp +++ b/test/tests/memcached/append.cpp @@ -4,14 +4,20 @@ TEST_CASE("memcached_append") { pair tests[] = { {"bin_mixed", MemcachedCluster::mixed()}, - {"mixed", MemcachedCluster::mixed()} + {"meta_mixed", MemcachedCluster::mixed()}, + {"ascii_mixed", MemcachedCluster::mixed()} }; tests[0].second.enableBinaryProto(); + tests[1].second.enableMetaProto(); LOOPED_SECTION(tests) { auto memc = &test.memc; + if (name == "meta_mixed" && !test.isGEVersion(1, 6)) { + continue; + } + SECTION("text") { const char *values[] = { "one", "two", "three", "four" diff --git a/test/tests/memcached/cas.cpp b/test/tests/memcached/cas.cpp index 7ac72ae1..ebc93dd0 100644 --- a/test/tests/memcached/cas.cpp +++ b/test/tests/memcached/cas.cpp @@ -11,31 +11,41 @@ TEST_CASE("memcached_cas") { auto memc = &test.memc; const char *keys[2] = {__func__, NULL}; size_t keylengths[2] = {strlen(__func__), 0}; - - REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true)); - REQUIRE_SUCCESS(memcached_set(memc, S(__func__), S("we the people"), (time_t) 0, (uint32_t) 0)); - REQUIRE_SUCCESS(memcached_mget(memc, keys, keylengths, 1)); - - memcached_result_st *results = memcached_result_create(memc, nullptr); - REQUIRE(results); - - memcached_return_t rc; - results = memcached_fetch_result(memc, results, &rc); - REQUIRE(results); - REQUIRE_SUCCESS(rc); - - REQUIRE(memcached_result_cas(results)); - REQUIRE("we the people"s == string(memcached_result_value(results), memcached_result_length(results))); - - uint64_t cas = memcached_result_cas(results); - REQUIRE(memcached_success(memcached_cas(memc, S(__func__), S("change the value"), 0, 0, cas))); - - /* - * The item will have a new cas value, so try to set it again with the old - * value. This should fail! - */ - REQUIRE_RC(MEMCACHED_DATA_EXISTS, memcached_cas(memc, S(__func__), S("change the value"), 0, 0, cas)); - - memcached_result_free(results); + auto proto = GENERATE(as(), 0, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, MEMCACHED_BEHAVIOR_META_PROTOCOL); + + DYNAMIC_SECTION((proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL")) { + + if (proto != MEMCACHED_BEHAVIOR_META_PROTOCOL || test.isGEVersion(1, 6)) { + REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, true)); + REQUIRE_SUCCESS( + memcached_set(memc, S(__func__), S("we the people"), (time_t) 0, (uint32_t) 0)); + REQUIRE_SUCCESS(memcached_mget(memc, keys, keylengths, 1)); + + memcached_result_st *results = memcached_result_create(memc, nullptr); + REQUIRE(results); + + memcached_return_t rc; + results = memcached_fetch_result(memc, results, &rc); + REQUIRE(results); + REQUIRE_SUCCESS(rc); + + REQUIRE(memcached_result_cas(results)); + REQUIRE("we the people"s + == string(memcached_result_value(results), memcached_result_length(results))); + + uint64_t cas = memcached_result_cas(results); + REQUIRE( + memcached_success(memcached_cas(memc, S(__func__), S("change the value"), 0, 0, cas))); + + /* + * The item will have a new cas value, so try to set it again with the old + * value. This should fail! + */ + REQUIRE_RC(MEMCACHED_DATA_EXISTS, + memcached_cas(memc, S(__func__), S("change the value"), 0, 0, cas)); + + memcached_result_free(results); + } + } } } diff --git a/test/tests/memcached/dump.cpp b/test/tests/memcached/dump.cpp index bb516ae9..d45853c2 100644 --- a/test/tests/memcached/dump.cpp +++ b/test/tests/memcached/dump.cpp @@ -16,29 +16,36 @@ TEST_CASE("memcached_dump") { LOOPED_SECTION(tests) { auto memc = &test.memc; + auto meta = GENERATE(0, 1); - SECTION("prepared with 64 KVs") { - for (int i = 0; i < 64; ++i) { - char key[8]; - int len = snprintf(key, sizeof(key) - 1, "k_%d", i); + DYNAMIC_SECTION("meta=" << meta) { + if (!meta || test.isGEVersion(1, 6)) { + SECTION("prepared with 64 KVs") { + REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_META_PROTOCOL, meta)); - CHECKED_IF(len) { - REQUIRE_SUCCESS(memcached_set(memc, key, len, key, len, 0, 0)); - } - } + for (int i = 0; i < 64; ++i) { + char key[8]; + int len = snprintf(key, sizeof(key) - 1, "k_%d", i); - memcached_quit(memc); + CHECKED_IF(len) { + REQUIRE_SUCCESS(memcached_set(memc, key, len, key, len, 0, 0)); + } + } - // let memcached sort itself - using namespace chrono_literals; - this_thread::sleep_for(3s); + memcached_quit(memc); - SECTION("dumps 64 KVs") { - size_t counter = 0; - memcached_dump_fn fn[] = {dump_cb}; + // let memcached sort itself + using namespace chrono_literals; + this_thread::sleep_for(3s); - REQUIRE_SUCCESS(memcached_dump(memc, fn, &counter, 1)); - REQUIRE(counter == 64); + SECTION("dumps 64 KVs") { + size_t counter = 0; + memcached_dump_fn fn[] = {dump_cb}; + + REQUIRE_SUCCESS(memcached_dump(memc, fn, &counter, 1)); + REQUIRE(counter == 64); + } + } } } } diff --git a/test/tests/memcached/inc_dec.cpp b/test/tests/memcached/inc_dec.cpp index 00f7435e..40339479 100644 --- a/test/tests/memcached/inc_dec.cpp +++ b/test/tests/memcached/inc_dec.cpp @@ -8,26 +8,32 @@ TEST_CASE("memcached_inc_dec") { LOOPED_SECTION(tests) { auto memc = &test.memc; - uint64_t binary = GENERATE(0, 1); + auto proto = GENERATE(as{}, 0, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, MEMCACHED_BEHAVIOR_META_PROTOCOL); char *prefix = GENERATE(as{}, "", "namespace:"); - REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary)); + if (proto == MEMCACHED_BEHAVIOR_META_PROTOCOL && !test.isGEVersion(1, 6)) { + continue; + } + if (proto) { + REQUIRE_SUCCESS(memcached_behavior_set(memc, proto, 1)); + } if (*prefix) { REQUIRE_SUCCESS(memcached_callback_set(memc, MEMCACHED_CALLBACK_NAMESPACE, prefix)); } - DYNAMIC_SECTION("increment (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("increment (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; REQUIRE_SUCCESS(memcached_set(memc, S("number"), S("0"), 0, 0)); for (auto i = 1; i <= 10; ++i) { + INFO("iteration " << i); REQUIRE_SUCCESS(memcached_increment(memc, S("number"), 1, &number)); REQUIRE(number == static_cast(i)); } } - DYNAMIC_SECTION("increment by_key (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("increment by_key (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; REQUIRE_SUCCESS(memcached_set_by_key(memc, S("key"), S("number"), S("0"), 0, 0)); @@ -38,11 +44,11 @@ TEST_CASE("memcached_inc_dec") { } } - DYNAMIC_SECTION("increment with initial (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("increment with initial (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; uint64_t initial = GENERATE(0, 456); - if (!binary) { + if (!proto) { REQUIRE_RC(MEMCACHED_INVALID_ARGUMENTS, memcached_increment_with_initial(memc, S("number"), 1, initial, 0, &number)); } else { @@ -53,11 +59,11 @@ TEST_CASE("memcached_inc_dec") { } } - DYNAMIC_SECTION("increment with initial by_key (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("increment with initial by_key (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; uint64_t initial = GENERATE(0, 456); - if (!binary) { + if (!proto) { REQUIRE_RC(MEMCACHED_INVALID_ARGUMENTS, memcached_increment_with_initial_by_key(memc, S("key"), S("number"), 1, initial, 0, &number)); } else { @@ -68,7 +74,7 @@ TEST_CASE("memcached_inc_dec") { } } - DYNAMIC_SECTION("decrement (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("decrement (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; REQUIRE_SUCCESS(memcached_set(memc, S("number"), S("10"), 0, 0)); @@ -78,7 +84,7 @@ TEST_CASE("memcached_inc_dec") { REQUIRE(number == static_cast(i)); } } - DYNAMIC_SECTION("decrement by_key (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("decrement by_key (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; REQUIRE_SUCCESS(memcached_set_by_key(memc, S("key"), S("number"), S("10"), 0, 0)); @@ -88,11 +94,11 @@ TEST_CASE("memcached_inc_dec") { REQUIRE(number == static_cast(i)); } } - DYNAMIC_SECTION("decrement with initial (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("decrement with initial (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; uint64_t initial = GENERATE(987, 456); - if (!binary) { + if (!proto) { REQUIRE_RC(MEMCACHED_INVALID_ARGUMENTS, memcached_decrement_with_initial(memc, S("number"), 1, initial, 0, &number)); } else { @@ -102,11 +108,11 @@ TEST_CASE("memcached_inc_dec") { REQUIRE(number == initial - 123); } } - DYNAMIC_SECTION("decrement with initial by_key (binary=" << binary << ", prefix=" << prefix << ")") { + DYNAMIC_SECTION("decrement with initial by_key (" << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ", prefix=" << prefix << ")") { uint64_t number; uint64_t initial = GENERATE(987, 456); - if (!binary) { + if (!proto) { REQUIRE_RC(MEMCACHED_INVALID_ARGUMENTS, memcached_decrement_with_initial_by_key(memc, S("key"), S("number"), 1, initial, 0, &number)); } else { diff --git a/test/tests/memcached/simple.cpp b/test/tests/memcached/simple.cpp index 6856d63e..bf7a64ee 100644 --- a/test/tests/memcached/simple.cpp +++ b/test/tests/memcached/simple.cpp @@ -17,19 +17,25 @@ TEST_CASE("memcached_simple") { } uint64_t buffered = GENERATE(0, 1); - uint64_t binary = GENERATE(0, 1); + auto proto = GENERATE(as{}, 0, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, MEMCACHED_BEHAVIOR_META_PROTOCOL); REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, buffered)); - REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary)); + if (proto) { + REQUIRE_SUCCESS(memcached_behavior_set(memc, proto, 1)); + } + + if (proto == MEMCACHED_BEHAVIOR_META_PROTOCOL && !test.isGEVersion(1, 6)) { + continue; + } - DYNAMIC_SECTION("set (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("set (buffered=" << buffered << ",proto=" << proto << ")") { for (auto i = 0; i < 10; ++i) { REQUIRE_RC(buffered ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS, memcached_set(memc, S(__func__), S(__func__), 0, 0)); } } - DYNAMIC_SECTION("add (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("add (buffered=" << buffered << " " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ")") { memcached_return_t rc; Malloced empty(memcached_get(memc, S(__func__), nullptr, nullptr, &rc)); @@ -49,18 +55,18 @@ TEST_CASE("memcached_simple") { REQUIRE(*val); REQUIRE(string(__func__) == string(*val, len)); - REQUIRE_RC(binary ? MEMCACHED_DATA_EXISTS : MEMCACHED_NOTSTORED, + REQUIRE_RC(proto == MEMCACHED_BEHAVIOR_BINARY_PROTOCOL ? MEMCACHED_DATA_EXISTS : MEMCACHED_NOTSTORED, memcached_add(memc, S(__func__), S("update"), 0, 0)); } - DYNAMIC_SECTION("replace (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("replace (buffered=" << buffered << " " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ")") { REQUIRE_RC(buffered ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS, memcached_set(memc, S(__func__), S(__func__), 0, 0)); REQUIRE_SUCCESS(memcached_replace(memc, S(__func__), S("replaced"), 0, 0)); } - DYNAMIC_SECTION("not found (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("not found (buffered=" << buffered << " " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ")") { memcached_return_t rc; Malloced val(memcached_get(memc, S("not-found"), nullptr, nullptr, &rc)); REQUIRE_RC(MEMCACHED_NOTFOUND, rc); @@ -71,11 +77,11 @@ TEST_CASE("memcached_simple") { REQUIRE_FALSE(*val); } - DYNAMIC_SECTION("verbosity (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("verbosity (buffered=" << buffered << " " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ")") { REQUIRE_SUCCESS(memcached_verbosity(memc, 0)); } - DYNAMIC_SECTION("version (buffered=" << buffered << ",binary=" << binary << ")") { + DYNAMIC_SECTION("version (buffered=" << buffered << " " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL") << ")") { REQUIRE_SUCCESS(memcached_version(memc)); } } diff --git a/test/tests/memcached/touch.cpp b/test/tests/memcached/touch.cpp index 982236b5..fa865b78 100644 --- a/test/tests/memcached/touch.cpp +++ b/test/tests/memcached/touch.cpp @@ -5,11 +5,17 @@ TEST_CASE("memcached_touch") { auto test = MemcachedCluster::mixed(); auto memc = &test.memc; memcached_return_t rc; - auto binary = GENERATE(0, 1); + auto proto = GENERATE(as{}, 0, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, MEMCACHED_BEHAVIOR_META_PROTOCOL); - test.enableBinaryProto(binary); + if (proto) { + REQUIRE_SUCCESS(memcached_behavior_set(memc, proto, 1)); + } + + if (proto == MEMCACHED_BEHAVIOR_META_PROTOCOL && !test.isGEVersion(1, 6)) { + return; + } - DYNAMIC_SECTION("touch binary=" << binary) { + DYNAMIC_SECTION("touch " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL")) { REQUIRE_FALSE(memcached_get(memc, S(__func__), nullptr, nullptr, &rc)); REQUIRE_RC(MEMCACHED_NOTFOUND, rc); @@ -30,7 +36,7 @@ TEST_CASE("memcached_touch") { REQUIRE_FALSE(*val); } - DYNAMIC_SECTION("touch_by_key binary=" << binary) { + DYNAMIC_SECTION("touch_by_key " << (proto ? libmemcached_string_behavior(proto) + sizeof("MEMCACHED_BEHAVIOR") : "ASCII_PROTOCOL")) { REQUIRE_RC(MEMCACHED_NOTFOUND, memcached_touch_by_key(memc, S(__func__), S(__func__), 60)); REQUIRE_SUCCESS(memcached_set_by_key(memc, S(__func__), S(__func__), S(__func__), 2, 0));