From 324548f9250e1c0ad5814f1436e401ad989c84f3 Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Sun, 1 Jan 2012 18:32:34 -0800 Subject: [PATCH] Fixes UDP protocol to just use sendmsg(). --- libmemcached/do.cc | 17 ++- libmemcached/error.cc | 15 ++- libmemcached/flush.cc | 153 ++++++++++++------------ libmemcached/response.cc | 3 +- libmemcached/verbosity.cc | 8 +- tests/libmemcached-1.0/mem_functions.cc | 5 +- tests/mem_udp.cc | 13 +- 7 files changed, 120 insertions(+), 94 deletions(-) diff --git a/libmemcached/do.cc b/libmemcached/do.cc index d289c479..3c3519c2 100644 --- a/libmemcached/do.cc +++ b/libmemcached/do.cc @@ -40,7 +40,22 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st instance, memcached_literal_param("UDP messages was attempted, but vector was not setup for it")); } - return MEMCACHED_NOT_SUPPORTED; + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + + increment_udp_message_id(instance); + vector[0].buffer= instance->write_buffer; + vector[0].length= UDP_DATAGRAM_HEADER_LENGTH; + + msg.msg_iov= (struct iovec*)vector; + msg.msg_iovlen= count; + + if (::sendmsg(instance->fd, &msg, 0) < 1) + { + return memcached_set_error(*instance, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + } + + return MEMCACHED_SUCCESS; } ssize_t sent_length= memcached_io_writev(instance, vector, count, with_flush); diff --git a/libmemcached/error.cc b/libmemcached/error.cc index f7764d0f..becffd23 100644 --- a/libmemcached/error.cc +++ b/libmemcached/error.cc @@ -395,11 +395,11 @@ static void _error_print(const memcached_error_t *error) if (error->size == 0) { - fprintf(stderr, "%s\n", memcached_strerror(NULL, error->rc) ); + fprintf(stderr, "\t%s\n", memcached_strerror(NULL, error->rc) ); } else { - fprintf(stderr, "%s %s\n", memcached_strerror(NULL, error->rc), error->message); + fprintf(stderr, "\t%s %s\n", memcached_strerror(NULL, error->rc), error->message); } _error_print(error->next); @@ -407,10 +407,19 @@ static void _error_print(const memcached_error_t *error) void memcached_error_print(const memcached_st *self) { - if (not self) + if (self == NULL) + { return; + } _error_print(self->error_messages); + + for (uint32_t x= 0; x < memcached_server_count(self); x++) + { + memcached_server_instance_st instance= memcached_server_instance_by_position(self, x); + + _error_print(instance->error_messages); + } } static void _error_free(memcached_error_t *error) diff --git a/libmemcached/flush.cc b/libmemcached/flush.cc index 44a1429a..6b8ea4b0 100644 --- a/libmemcached/flush.cc +++ b/libmemcached/flush.cc @@ -37,66 +37,97 @@ #include static memcached_return_t memcached_flush_binary(memcached_st *ptr, - time_t expiration); -static memcached_return_t memcached_flush_textual(memcached_st *ptr, - time_t expiration); - -memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration) + time_t expiration, + const bool reply) { - memcached_return_t rc; - if (memcached_failed(rc= initialize_query(ptr, true))) - { - return rc; - } + protocol_binary_request_flush request= {}; - LIBMEMCACHED_MEMCACHED_FLUSH_START(); - if (ptr->flags.binary_protocol) + request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; + request.message.header.request.extlen= 4; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(request.message.header.request.extlen); + request.message.body.expiration= htonl((uint32_t) expiration); + + memcached_return_t rc= MEMCACHED_SUCCESS; + + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - rc= memcached_flush_binary(ptr, expiration); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); + + if (reply) + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; + } + else + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ; + } + + libmemcached_io_vector_st vector[]= + { + { NULL, 0 }, + { request.bytes, sizeof(request.bytes) } + }; + + memcached_return_t rrc; + if (memcached_failed(rrc= memcached_vdo(instance, vector, 2, true))) + { + if (instance->error_messages == NULL or instance->root->error_messages == NULL) + { + memcached_set_error(*instance, rrc, MEMCACHED_AT); + } + memcached_io_reset(instance); + rc= MEMCACHED_SOME_ERRORS; + } } - else + + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - rc= memcached_flush_textual(ptr, expiration); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance) > 0) + { + (void)memcached_response(instance, NULL, 0, NULL); + } } - LIBMEMCACHED_MEMCACHED_FLUSH_END(); return rc; } static memcached_return_t memcached_flush_textual(memcached_st *ptr, - time_t expiration) + time_t expiration, + const bool reply) { - // Invert the logic to make it simpler to read the code - bool reply= memcached_is_replying(ptr); - - char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + char buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1]; int send_length= 0; if (expiration) { send_length= snprintf(buffer, sizeof(buffer), "%llu", (unsigned long long)expiration); } - if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or send_length < 0) + if (size_t(send_length) >= sizeof(buffer) or send_length < 0) { return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); } - libmemcached_io_vector_st vector[]= - { - { memcached_literal_param("flush_all ") }, - { buffer, send_length }, - { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") }, - { memcached_literal_param("\r\n") } - }; - memcached_return_t rc= MEMCACHED_SUCCESS; for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); - memcached_return_t rrc= memcached_vdo(instance, vector, 4, true); - if (rrc == MEMCACHED_SUCCESS and reply == true) + libmemcached_io_vector_st vector[]= + { + { NULL, 0 }, + { memcached_literal_param("flush_all ") }, + { buffer, send_length }, + { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") }, + { memcached_literal_param("\r\n") } + }; + + memcached_return_t rrc= memcached_vdo(instance, vector, 5, true); + if (memcached_success(rrc) and reply == true) { char response_buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; rrc= memcached_response(instance, response_buffer, sizeof(response_buffer), NULL); @@ -105,7 +136,7 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr, if (memcached_failed(rrc)) { // If an error has already been reported, then don't add to it - if (instance->error_messages == NULL) + if (instance->error_messages == NULL or instance->root->error_messages == NULL) { memcached_set_error(*instance, rrc, MEMCACHED_AT); } @@ -116,56 +147,26 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr, return rc; } -static memcached_return_t memcached_flush_binary(memcached_st *ptr, - time_t expiration) +memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration) { - protocol_binary_request_flush request= {}; - - request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ; - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; - request.message.header.request.extlen= 4; - request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - request.message.header.request.bodylen= htonl(request.message.header.request.extlen); - request.message.body.expiration= htonl((uint32_t) expiration); - - memcached_return_t rc= MEMCACHED_SUCCESS; - - for (uint32_t x= 0; x < memcached_server_count(ptr); x++) + memcached_return_t rc; + if (memcached_failed(rc= initialize_query(ptr, true))) { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); - - if (memcached_is_replying(ptr)) - { - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; - } - else - { - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ; - } + return rc; + } - libmemcached_io_vector_st vector[]= - { - { request.bytes, sizeof(request.bytes) } - }; + bool reply= memcached_is_replying(ptr); - memcached_return_t rrc; - if ((rrc= memcached_vdo(instance, vector, 1, true))) - { - memcached_set_error(*instance, rrc, MEMCACHED_AT); - memcached_io_reset(instance); - rc= MEMCACHED_SOME_ERRORS; - } + LIBMEMCACHED_MEMCACHED_FLUSH_START(); + if (memcached_is_binary(ptr)) + { + rc= memcached_flush_binary(ptr, expiration, reply); } - - for (uint32_t x= 0; x < memcached_server_count(ptr); x++) + else { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); - - if (memcached_server_response_count(instance) > 0) - { - (void)memcached_response(instance, NULL, 0, NULL); - } + rc= memcached_flush_textual(ptr, expiration, reply); } + LIBMEMCACHED_MEMCACHED_FLUSH_END(); return rc; } diff --git a/libmemcached/response.cc b/libmemcached/response.cc index 445b9233..435cafe7 100644 --- a/libmemcached/response.cc +++ b/libmemcached/response.cc @@ -359,8 +359,9 @@ static memcached_return_t textual_read_one_response(memcached_server_write_insta break; } + buffer[total_read]= 0; return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, - memcached_literal_param("Could not determine response")); + buffer, total_read); } static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr, diff --git a/libmemcached/verbosity.cc b/libmemcached/verbosity.cc index 557fb005..334a3316 100644 --- a/libmemcached/verbosity.cc +++ b/libmemcached/verbosity.cc @@ -59,7 +59,7 @@ static memcached_return_t _set_verbosity(const memcached_st *, memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc_ptr, 0); - rc= memcached_vdo(instance, vector, 3, true); + rc= memcached_vdo(instance, vector, 4, true); if (rc == MEMCACHED_SUCCESS) { @@ -81,11 +81,6 @@ memcached_return_t memcached_verbosity(memcached_st *ptr, uint32_t verbosity) return rc; } - if (memcached_is_udp(ptr)) - { - return MEMCACHED_NOT_SUPPORTED; - } - memcached_server_fn callbacks[1]; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; @@ -99,6 +94,7 @@ memcached_return_t memcached_verbosity(memcached_st *ptr, uint32_t verbosity) libmemcached_io_vector_st vector[]= { + { NULL, 0 }, { memcached_literal_param("verbosity ") }, { buffer, send_length }, { memcached_literal_param("\r\n") } diff --git a/tests/libmemcached-1.0/mem_functions.cc b/tests/libmemcached-1.0/mem_functions.cc index b2b807b6..dd2abef2 100644 --- a/tests/libmemcached-1.0/mem_functions.cc +++ b/tests/libmemcached-1.0/mem_functions.cc @@ -2119,12 +2119,13 @@ static test_return_t fetch_all_results(memcached_st *memc, unsigned int &keys_re { keys_returned= 0; - memcached_result_st* result; - while ((result= memcached_fetch_result(memc, NULL, &rc))) + memcached_result_st* result= NULL; + while ((result= memcached_fetch_result(memc, result, &rc))) { test_compare(MEMCACHED_SUCCESS, rc); keys_returned+= 1; } + memcached_result_free(result); return TEST_SUCCESS; } diff --git a/tests/mem_udp.cc b/tests/mem_udp.cc index 577dabe8..1591bdcf 100644 --- a/tests/mem_udp.cc +++ b/tests/mem_udp.cc @@ -208,7 +208,7 @@ static test_return_t version_TEST(memcached_st *memc) static test_return_t verbosity_TEST(memcached_st *memc) { - test_compare(MEMCACHED_NOT_SUPPORTED, memcached_verbosity(memc, 0)); + test_compare(MEMCACHED_SUCCESS, memcached_verbosity(memc, 0)); return TEST_SUCCESS; } @@ -320,17 +320,19 @@ static test_return_t udp_buffered_set_test(memcached_st *memc) static test_return_t udp_set_too_big_test(memcached_st *memc) { test_true(memc); - char value[MAX_UDP_DATAGRAM_LENGTH]; Expected expected_ids; get_udp_request_ids(memc, expected_ids); - memset(value, int('f'), sizeof(value)); + std::vector value; + value.resize(1024 * 1024 * 10); test_compare_hint(MEMCACHED_WRITE_FAILURE, - memcached_set(memc, test_literal_param("bar"), - test_literal_param(value), + memcached_set(memc, + test_literal_param(__func__), + &value[0], value.size(), time_t(0), uint32_t(0)), memcached_last_error_message(memc)); + memcached_quit(memc); return post_udp_op_check(memc, expected_ids); } @@ -401,6 +403,7 @@ static test_return_t udp_flush_test(memcached_st *memc) { increment_request_id(&expected_ids[x]); } + memcached_error_print(memc); test_compare_hint(MEMCACHED_SUCCESS, memcached_flush(memc, 0), memcached_last_error_message(memc)); return post_udp_op_check(memc, expected_ids); -- 2.30.2