From 9265fa2a7ff654250faa9eef568dce3f9ec5dddf Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Sat, 8 Oct 2011 16:34:32 -0700 Subject: [PATCH] Merge in fixes for UDP --- libmemcached/behavior.cc | 20 +++- libmemcached/connect.cc | 11 +- libmemcached/do.cc | 10 +- libmemcached/flush.cc | 89 ++++++++------- libmemcached/hosts.cc | 46 ++------ libmemcached/io.cc | 31 +++--- libmemcached/is.h | 1 + libmemcached/memcached.cc | 45 ++++---- libmemcached/quit.cc | 4 +- libmemcached/server.cc | 2 +- libmemcached/storage.cc | 184 +++++++++++++++---------------- tests/include.am | 4 +- tests/mem_udp.cc | 223 ++++++++++++++------------------------ 13 files changed, 307 insertions(+), 363 deletions(-) diff --git a/libmemcached/behavior.cc b/libmemcached/behavior.cc index 2b3b389c..64456a4b 100644 --- a/libmemcached/behavior.cc +++ b/libmemcached/behavior.cc @@ -114,19 +114,22 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr, break; case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS: + if (ptr->flags.use_udp) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("MEMCACHED_BEHAVIOR_BUFFER_REQUESTS cannot be set while MEMCACHED_BEHAVIOR_USE_UDP is enabled.")); + } ptr->flags.buffer_requests= bool(data); send_quit(ptr); break; case MEMCACHED_BEHAVIOR_USE_UDP: - if (memcached_server_count(ptr)) - { - return MEMCACHED_FAILURE; - } + send_quit(ptr); // We need t shutdown all of the connections to make sure we do the correct protocol ptr->flags.use_udp= bool(data); - if (data) + if (bool(data)) { - ptr->flags.no_reply= bool(data); + ptr->flags.no_reply= true; + ptr->flags.buffer_requests= false; } break; @@ -228,6 +231,11 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr, break; case MEMCACHED_BEHAVIOR_NOREPLY: + if (ptr->flags.use_udp and bool(data) == false) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("MEMCACHED_BEHAVIOR_NOREPLY cannot be disabled while MEMCACHED_BEHAVIOR_USE_UDP is enabled.")); + } ptr->flags.no_reply= bool(data); break; diff --git a/libmemcached/connect.cc b/libmemcached/connect.cc index 09cac309..3cc8e245 100644 --- a/libmemcached/connect.cc +++ b/libmemcached/connect.cc @@ -144,7 +144,7 @@ static memcached_return_t set_hostinfo(memcached_server_st *server) #if 0 hints.ai_family= AF_INET; #endif - if (server->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(server->root)) { hints.ai_protocol= IPPROTO_UDP; hints.ai_socktype= SOCK_DGRAM; @@ -226,7 +226,7 @@ static void set_socket_options(memcached_server_st *server) { assert_msg(server->fd != -1, "invalid socket was passed to set_socket_options()"); - if (server->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(server->root)) { return; } @@ -442,7 +442,7 @@ static memcached_return_t network_connect(memcached_server_st *server) while (server->address_info_next and server->fd == INVALID_SOCKET) { /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */ - if (server->type == MEMCACHED_CONNECTION_UDP && server->address_info_next->ai_family != AF_INET) + if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET) { server->address_info_next= server->address_info_next->ai_next; continue; @@ -615,6 +615,11 @@ memcached_return_t memcached_connect(memcached_server_write_instance_st server) return rc; } + if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root)) + { + return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections")); + } + /* We need to clean up the multi startup piece */ switch (server->type) { diff --git a/libmemcached/do.cc b/libmemcached/do.cc index 551b28ea..c3cfab93 100644 --- a/libmemcached/do.cc +++ b/libmemcached/do.cc @@ -30,7 +30,7 @@ memcached_return_t memcached_do(memcached_server_write_instance_st ptr, const vo ** before they start writing, if there is any data in buffer, clear it out, ** otherwise we might get a partial write. **/ - if (ptr->type == MEMCACHED_CONNECTION_UDP && with_flush && ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) + if (memcached_is_udp(ptr->root) and with_flush and ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) { memcached_io_write(ptr, NULL, 0, true); } @@ -70,9 +70,13 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, ** before they start writing, if there is any data in buffer, clear it out, ** otherwise we might get a partial write. **/ - if (ptr->type == MEMCACHED_CONNECTION_UDP && with_flush && ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) + if (memcached_is_udp(ptr->root) and with_flush and ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) { - memcached_io_write(ptr, NULL, 0, true); + if (memcached_io_write(ptr, NULL, 0, true) == -1) + { + memcached_io_reset(ptr); + return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + } } ssize_t sent_length= memcached_io_writev(ptr, vector, count, with_flush); diff --git a/libmemcached/flush.cc b/libmemcached/flush.cc index 5895f030..f2c490ba 100644 --- a/libmemcached/flush.cc +++ b/libmemcached/flush.cc @@ -51,54 +51,68 @@ memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration) LIBMEMCACHED_MEMCACHED_FLUSH_START(); if (ptr->flags.binary_protocol) + { rc= memcached_flush_binary(ptr, expiration); + } else + { rc= memcached_flush_textual(ptr, expiration); + } LIBMEMCACHED_MEMCACHED_FLUSH_END(); + return rc; } static memcached_return_t memcached_flush_textual(memcached_st *ptr, time_t expiration) { - unlikely (memcached_server_count(ptr) == 0) - return MEMCACHED_NO_SERVERS; + bool reply= ptr->flags.no_reply ? false : true; - for (unsigned int x= 0; x < memcached_server_count(ptr); x++) + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + int send_length; + if (expiration) + { + send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "flush_all %llu%s\r\n", + (unsigned long long)expiration, reply ? "" : " noreply"); + } + else { - memcached_return_t rc; - char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "flush_all%s\r\n", reply ? "" : " noreply"); + } - bool no_reply= ptr->flags.no_reply; - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(ptr, x); + if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or send_length < 0) + { + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); + } - int send_length; - if (expiration) - { - send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "flush_all %llu%s\r\n", - (unsigned long long)expiration, no_reply ? " noreply" : ""); - } - else + + memcached_return_t rc= MEMCACHED_SUCCESS; + for (unsigned int x= 0; x < memcached_server_count(ptr); x++) + { + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); + + memcached_return_t rrc= memcached_do(instance, buffer, (size_t)send_length, true); + if (rrc == MEMCACHED_SUCCESS and reply == true) { - send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "flush_all%s\r\n", no_reply ? " noreply" : ""); + char response_buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rrc= memcached_response(instance, response_buffer, sizeof(response_buffer), NULL); } - if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE || send_length < 0) + if (memcached_failed(rrc)) { - return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, - memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); + // If an error has already been reported, then don't add to it + if (instance->error_messages == NULL) + { + memcached_set_error(*instance, rrc, MEMCACHED_AT); + } + rc= MEMCACHED_SOME_ERRORS; } - - rc= memcached_do(instance, buffer, (size_t)send_length, true); - - if (rc == MEMCACHED_SUCCESS && !no_reply) - (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); } - return MEMCACHED_SUCCESS; + return rc; } static memcached_return_t memcached_flush_binary(memcached_st *ptr, @@ -106,9 +120,6 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr, { protocol_binary_request_flush request= {}; - unlikely (memcached_server_count(ptr) == 0) - return MEMCACHED_NO_SERVERS; - request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ; request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; request.message.header.request.extlen= 4; @@ -116,10 +127,11 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr, request.message.header.request.bodylen= htonl(request.message.header.request.extlen); request.message.body.expiration= htonl((uint32_t) expiration); + memcached_return_t rc= MEMCACHED_SUCCESS; + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(ptr, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); if (ptr->flags.no_reply) { @@ -130,21 +142,24 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr, request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; } - if (memcached_do(instance, request.bytes, sizeof(request.bytes), true) != MEMCACHED_SUCCESS) + memcached_return_t rrc; + if ((rrc= memcached_do(instance, request.bytes, sizeof(request.bytes), true))) { + memcached_set_error(*instance, rrc, MEMCACHED_AT); memcached_io_reset(instance); - return MEMCACHED_WRITE_FAILURE; + rc= MEMCACHED_SOME_ERRORS; } } for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(ptr, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); if (memcached_server_response_count(instance) > 0) + { (void)memcached_response(instance, NULL, 0, NULL); + } } - return MEMCACHED_SUCCESS; + return rc; } diff --git a/libmemcached/hosts.cc b/libmemcached/hosts.cc index 30456559..fe3c977a 100644 --- a/libmemcached/hosts.cc +++ b/libmemcached/hosts.cc @@ -45,11 +45,10 @@ static memcached_return_t update_continuum(memcached_st *ptr); static int compare_servers(const void *p1, const void *p2) { - int return_value; memcached_server_instance_st a= (memcached_server_instance_st)p1; memcached_server_instance_st b= (memcached_server_instance_st)p2; - return_value= strcmp(a->hostname, b->hostname); + int return_value= strcmp(a->hostname, b->hostname); if (return_value == 0) { @@ -353,11 +352,6 @@ static memcached_return_t server_add(memcached_st *ptr, memcached_connection_t type) { assert_msg(ptr, "Programmer mistake, somehow server_add() was passed a NULL memcached_st"); - if ( (ptr->flags.use_udp and type != MEMCACHED_CONNECTION_UDP) - or ( (type == MEMCACHED_CONNECTION_UDP) and (not ptr->flags.use_udp) ) ) - { - return memcached_set_error(*ptr, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT); - } memcached_server_st *new_host_list= static_cast(libmemcached_realloc(ptr, memcached_server_list(ptr), sizeof(memcached_server_st) * (ptr->number_of_hosts + 1))); @@ -394,7 +388,7 @@ static memcached_return_t server_add(memcached_st *ptr, memcached_return_t memcached_server_push(memcached_st *ptr, const memcached_server_list_st list) { - if (not list) + if (list == NULL) { return MEMCACHED_SUCCESS; } @@ -405,8 +399,10 @@ memcached_return_t memcached_server_push(memcached_st *ptr, const memcached_serv new_host_list= static_cast(libmemcached_realloc(ptr, memcached_server_list(ptr), sizeof(memcached_server_st) * (count + memcached_server_count(ptr)))); - if (not new_host_list) + if (new_host_list == NULL) + { return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } memcached_server_list_set(ptr, new_host_list); @@ -414,12 +410,6 @@ memcached_return_t memcached_server_push(memcached_st *ptr, const memcached_serv { memcached_server_write_instance_st instance; - if ((ptr->flags.use_udp && list[x].type != MEMCACHED_CONNECTION_UDP) - or ((list[x].type == MEMCACHED_CONNECTION_UDP) and not (ptr->flags.use_udp)) ) - { - return MEMCACHED_INVALID_HOST_PROTOCOL; - } - WATCHPOINT_ASSERT(list[x].hostname[0] != 0); // We have extended the array, and now we will find it, and use it. @@ -484,32 +474,16 @@ memcached_return_t memcached_server_add_udp(memcached_st *ptr, } memcached_return_t memcached_server_add_udp_with_weight(memcached_st *ptr, - const char *hostname, - in_port_t port, - uint32_t weight) + const char *, + in_port_t, + uint32_t) { if (ptr == NULL) { return MEMCACHED_INVALID_ARGUMENTS; } - if (not port) - { - port= MEMCACHED_DEFAULT_PORT; - } - - if (not hostname) - { - hostname= "localhost"; - } - - memcached_string_t _hostname= { memcached_string_make_from_cstr(hostname) }; - if (memcached_is_valid_servername(_hostname) == false) - { - memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Invalid hostname provided")); - } - - return server_add(ptr, _hostname, port, weight, MEMCACHED_CONNECTION_UDP); + return memcached_set_error(*ptr, MEMCACHED_DEPRECATED, MEMCACHED_AT); } memcached_return_t memcached_server_add(memcached_st *ptr, @@ -538,7 +512,7 @@ memcached_return_t memcached_server_add_with_weight(memcached_st *ptr, if (hostname_length == 0) { hostname= "localhost"; - hostname_length= sizeof("localhost") -1; + hostname_length= memcached_literal_param_size("localhost"); } memcached_string_t _hostname= { hostname, hostname_length }; diff --git a/libmemcached/io.cc b/libmemcached/io.cc index c6d647be..8642c187 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -315,14 +315,13 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big - if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) { *error= MEMCACHED_WRITE_FAILURE; return -1; } - if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) { return 0; } @@ -339,8 +338,10 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) + { increment_udp_message_id(ptr); + } ssize_t sent_length= 0; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -375,8 +376,7 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, * buffer for more data and retry the write before * waiting.. */ - if (repack_input_buffer(ptr) or - process_input_buffer(ptr)) + if (repack_input_buffer(ptr) or process_input_buffer(ptr)) { continue; } @@ -406,19 +406,18 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, } } - if (ptr->type == MEMCACHED_CONNECTION_UDP and - size_t(sent_length) != write_length) + if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) { memcached_quit_server(ptr, true); *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); return -1; } - ptr->io_bytes_sent += (uint32_t) sent_length; + ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; - write_length-= (uint32_t) sent_length; - return_length+= (uint32_t) sent_length; + write_length-= uint32_t(sent_length); + return_length+= uint32_t(sent_length); } WATCHPOINT_ASSERT(write_length == 0); @@ -427,10 +426,14 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, // if we are a udp server, the begining of the buffer is reserverd for // the upd frame header - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) + { ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + } else + { ptr->write_buffer_offset= 0; + } return (ssize_t) return_length; } @@ -621,7 +624,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, size_t should_write; size_t buffer_end; - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) { //UDP does not support partial writes buffer_end= MAX_UDP_DATAGRAM_LENGTH; @@ -644,7 +647,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) + if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); diff --git a/libmemcached/is.h b/libmemcached/is.h index f155334b..9f8783d8 100644 --- a/libmemcached/is.h +++ b/libmemcached/is.h @@ -39,6 +39,7 @@ /* These are private */ #define memcached_is_allocated(__object) ((__object)->options.is_allocated) +#define memcached_is_udp(__object) ((__object)->flags.use_udp) #define memcached_is_initialized(__object) ((__object)->options.is_initialized) #define memcached_is_purging(__object) ((__object)->state.is_purging) #define memcached_is_processing_input(__object) ((__object)->state.is_processing_input) diff --git a/libmemcached/memcached.cc b/libmemcached/memcached.cc index e81f19ee..163658ef 100644 --- a/libmemcached/memcached.cc +++ b/libmemcached/memcached.cc @@ -89,7 +89,7 @@ static inline bool _memcached_init(memcached_st *self) self->distribution= MEMCACHED_DISTRIBUTION_MODULA; - if (not hashkit_create(&self->hashkit)) + if (hashkit_create(&self->hashkit) == NULL) { return false; } @@ -190,21 +190,21 @@ static void _free(memcached_st *ptr, bool release_st) memcached_st *memcached_create(memcached_st *ptr) { - if (ptr == NULL) + if (ptr) + { + ptr->options.is_allocated= false; + } + else { ptr= (memcached_st *)malloc(sizeof(memcached_st)); - if (not ptr) + if (ptr == NULL) { return NULL; /* MEMCACHED_MEMORY_ALLOCATION_FAILURE */ } ptr->options.is_allocated= true; } - else - { - ptr->options.is_allocated= false; - } #if 0 memcached_set_purging(ptr, false); @@ -231,19 +231,18 @@ memcached_st *memcached_create(memcached_st *ptr) memcached_st *memcached(const char *string, size_t length) { memcached_st *self= memcached_create(NULL); - if (not self) + if (self == NULL) { errno= ENOMEM; return NULL; } - if (not length) + if (length == 0) { return self; } memcached_return_t rc= memcached_parse_configuration(self, string, length); - if (memcached_success(rc) and memcached_parse_filename(self)) { rc= memcached_parse_configure_file(*self, memcached_parse_filename(self), memcached_parse_filename_length(self)); @@ -319,20 +318,22 @@ void memcached_free(memcached_st *ptr) */ memcached_st *memcached_clone(memcached_st *clone, const memcached_st *source) { - memcached_return_t rc= MEMCACHED_SUCCESS; - - if (not source) + if (source == NULL) + { return memcached_create(clone); + } - if (clone && memcached_is_allocated(clone)) + if (clone and memcached_is_allocated(clone)) { return NULL; } memcached_st *new_clone= memcached_create(clone); - if (not new_clone) + if (new_clone == NULL) + { return NULL; + } new_clone->flags= source->flags; new_clone->send_size= source->send_size; @@ -342,7 +343,7 @@ memcached_st *memcached_clone(memcached_st *clone, const memcached_st *source) new_clone->retry_timeout= source->retry_timeout; new_clone->distribution= source->distribution; - if (not hashkit_clone(&new_clone->hashkit, &source->hashkit)) + if (hashkit_clone(&new_clone->hashkit, &source->hashkit) == NULL) { memcached_free(new_clone); return NULL; @@ -369,14 +370,10 @@ memcached_st *memcached_clone(memcached_st *clone, const memcached_st *source) if (memcached_server_count(source)) { - rc= memcached_push(new_clone, source); - } - - if (memcached_failed(rc)) - { - memcached_free(new_clone); - - return NULL; + if (memcached_failed(memcached_push(new_clone, source))) + { + return NULL; + } } diff --git a/libmemcached/quit.cc b/libmemcached/quit.cc index 0162700d..ffd4cd45 100644 --- a/libmemcached/quit.cc +++ b/libmemcached/quit.cc @@ -50,7 +50,7 @@ void memcached_quit_server(memcached_server_st *ptr, bool io_death) { if (ptr->fd != INVALID_SOCKET) { - if (io_death == false && ptr->type != MEMCACHED_CONNECTION_UDP && ptr->options.is_shutting_down == false) + if (io_death == false and memcached_is_udp(ptr->root) == false and ptr->options.is_shutting_down == false) { ptr->options.is_shutting_down= true; @@ -100,7 +100,7 @@ void memcached_quit_server(memcached_server_st *ptr, bool io_death) ptr->state= MEMCACHED_SERVER_STATE_NEW; ptr->cursor_active= 0; ptr->io_bytes_sent= 0; - ptr->write_buffer_offset= (size_t) ((ptr->type == MEMCACHED_CONNECTION_UDP) ? UDP_DATAGRAM_HEADER_LENGTH : 0); + ptr->write_buffer_offset= size_t(ptr->root and memcached_is_udp(ptr->root) ? UDP_DATAGRAM_HEADER_LENGTH : 0); ptr->read_buffer_length= 0; ptr->read_ptr= ptr->read_buffer; ptr->options.is_shutting_down= false; diff --git a/libmemcached/server.cc b/libmemcached/server.cc index c9c5ffca..cf63f4ca 100644 --- a/libmemcached/server.cc +++ b/libmemcached/server.cc @@ -132,7 +132,7 @@ memcached_server_st *__server_create_with(memcached_st *memc, _server_init(self, const_cast(memc), hostname, port, weight, type); - if (type == MEMCACHED_CONNECTION_UDP) + if (memc and memcached_is_udp(memc)) { self->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; memcached_io_init_udp_header(self, 0); diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index 12c29275..f3616251 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -49,58 +49,48 @@ static inline const char *storage_op_string(memcached_storage_action_t verb) static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply) { - /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise - * be used uninitialized in this function. FAIL */ - uint8_t ret= 0; - if (noreply) + { switch (verb) { case SET_OP: - ret=PROTOCOL_BINARY_CMD_SETQ; - break; - case ADD_OP: - ret=PROTOCOL_BINARY_CMD_ADDQ; - break; - case CAS_OP: /* FALLTHROUGH */ - case REPLACE_OP: - ret=PROTOCOL_BINARY_CMD_REPLACEQ; - break; - case APPEND_OP: - ret=PROTOCOL_BINARY_CMD_APPENDQ; - break; - case PREPEND_OP: - ret=PROTOCOL_BINARY_CMD_PREPENDQ; - break; - default: - WATCHPOINT_ASSERT(verb); - break; - } - else - switch (verb) - { - case SET_OP: - ret=PROTOCOL_BINARY_CMD_SET; - break; + return PROTOCOL_BINARY_CMD_SETQ; + case ADD_OP: - ret=PROTOCOL_BINARY_CMD_ADD; - break; + return PROTOCOL_BINARY_CMD_ADDQ; + case CAS_OP: /* FALLTHROUGH */ case REPLACE_OP: - ret=PROTOCOL_BINARY_CMD_REPLACE; - break; + return PROTOCOL_BINARY_CMD_REPLACEQ; + case APPEND_OP: - ret=PROTOCOL_BINARY_CMD_APPEND; - break; + return PROTOCOL_BINARY_CMD_APPENDQ; + case PREPEND_OP: - ret=PROTOCOL_BINARY_CMD_PREPEND; - break; - default: - WATCHPOINT_ASSERT(verb); - break; + return PROTOCOL_BINARY_CMD_PREPENDQ; } + } - return ret; + switch (verb) + { + case SET_OP: + break; + + case ADD_OP: + return PROTOCOL_BINARY_CMD_ADD; + + case CAS_OP: /* FALLTHROUGH */ + case REPLACE_OP: + return PROTOCOL_BINARY_CMD_REPLACE; + + case APPEND_OP: + return PROTOCOL_BINARY_CMD_APPEND; + + case PREPEND_OP: + return PROTOCOL_BINARY_CMD_PREPEND; + } + + return PROTOCOL_BINARY_CMD_SET; } static memcached_return_t memcached_send_binary(memcached_st *ptr, @@ -113,9 +103,9 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, time_t expiration, uint32_t flags, uint64_t cas, + bool flush, memcached_storage_action_t verb) { - bool flush; protocol_binary_request_set request= {}; size_t send_length= sizeof(request.bytes); @@ -125,7 +115,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, request.message.header.request.opcode= get_com_code(verb, noreply); request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace))); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - if (verb == APPEND_OP || verb == PREPEND_OP) + if (verb == APPEND_OP or verb == PREPEND_OP) { send_length -= 8; /* append & prepend does not contain extras! */ } @@ -144,9 +134,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, request.message.header.request.cas= memcached_htonll(cas); } - flush= (bool) ((server->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1); - - if (server->root->flags.use_udp && ! flush) + if (server->root->flags.use_udp and flush == false) { size_t cmd_size= send_length + key_length + value_length; @@ -173,7 +161,13 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS) { memcached_io_reset(server); - return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; + + if (ptr->error_messages == NULL) + { + memcached_set_error(*server, rc, MEMCACHED_AT); + } + + return MEMCACHED_WRITE_FAILURE; } if (verb == SET_OP && ptr->number_of_replicas > 0) @@ -183,13 +177,13 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, for (uint32_t x= 0; x < ptr->number_of_replicas; x++) { - memcached_server_write_instance_st instance; - ++server_key; if (server_key == memcached_server_count(ptr)) + { server_key= 0; + } - instance= memcached_server_instance_fetch(ptr, server_key); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS) { @@ -224,12 +218,11 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, time_t expiration, uint32_t flags, uint64_t cas, + bool flush, memcached_storage_action_t verb) { - bool to_write; size_t write_length; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t rc; if (cas) { @@ -242,13 +235,10 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, (unsigned long long)expiration, (unsigned long)value_length, (unsigned long long)cas, (ptr->flags.no_reply) ? " noreply" : ""); - if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE || check_length < 0) + if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or check_length < 0) { - rc= memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, - memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); - memcached_io_reset(instance); - - return rc; + return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); } write_length= check_length; } @@ -276,76 +266,79 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, flags, (unsigned long long)expiration, (unsigned long)value_length, ptr->flags.no_reply ? " noreply" : ""); - if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -size_t(buffer_ptr - buffer) || check_length < 0) + if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -size_t(buffer_ptr - buffer) or check_length < 0) { - rc= memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)")); - memcached_io_reset(instance); - - return rc; } write_length+= (size_t)check_length; WATCHPOINT_ASSERT(write_length < MEMCACHED_DEFAULT_COMMAND_SIZE); } - if (ptr->flags.use_udp && ptr->flags.buffer_requests) + if (ptr->flags.use_udp and ptr->flags.buffer_requests) { size_t cmd_size= write_length + value_length +2; if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + { return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + } if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + { memcached_io_write(instance, NULL, 0, true); + } } if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) { - rc= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); } - else + + struct libmemcached_io_vector_st vector[]= { - struct libmemcached_io_vector_st vector[]= - { - { buffer, write_length }, - { value, value_length }, - { memcached_literal_param("\r\n") } - }; + { buffer, write_length }, + { value, value_length }, + { memcached_literal_param("\r\n") } + }; - if (ptr->flags.buffer_requests && verb == SET_OP) + if (memcached_is_udp(instance->root) and (write_length +value_length +memcached_literal_param_size("\r\n") +UDP_DATAGRAM_HEADER_LENGTH > MAX_UDP_DATAGRAM_LENGTH)) + { + return memcached_set_error(*instance, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT, memcached_literal_param("UDP packet is too large")); + } + + /* Send command header */ + memcached_return_t rc= memcached_vdo(instance, vector, 3, flush); + if (rc == MEMCACHED_SUCCESS) + { + if (ptr->flags.no_reply and flush) { - to_write= false; + rc= MEMCACHED_SUCCESS; } - else + else if (flush == false) { - to_write= true; + rc= MEMCACHED_BUFFERED; } - - /* Send command header */ - rc= memcached_vdo(instance, vector, 3, to_write); - if (rc == MEMCACHED_SUCCESS) + else { + rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - if (ptr->flags.no_reply) - { - rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; - } - else if (to_write == false) + if (rc == MEMCACHED_STORED) { - rc= MEMCACHED_BUFFERED; - } - else - { - rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - - if (rc == MEMCACHED_STORED) - rc= MEMCACHED_SUCCESS; + rc= MEMCACHED_SUCCESS; } } } if (rc == MEMCACHED_WRITE_FAILURE) + { memcached_io_reset(instance); + } + + if (memcached_failed(rc) and ptr->error_messages == NULL) + { + memcached_set_error(*ptr, rc, MEMCACHED_AT); + } return rc; } @@ -381,19 +374,20 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, WATCHPOINT_SET(instance->io_wait_count.read= 0); WATCHPOINT_SET(instance->io_wait_count.write= 0); + bool flush= (bool) ((instance->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1); if (ptr->flags.binary_protocol) { rc= memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length, expiration, - flags, cas, verb); + flags, cas, flush, verb); } else { rc= memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration, - flags, cas, verb); + flags, cas, flush, verb); } return rc; diff --git a/tests/include.am b/tests/include.am index b51dc2c3..e21afbb7 100644 --- a/tests/include.am +++ b/tests/include.am @@ -364,10 +364,10 @@ gdb-sasl: tests/sasl @$(DEBUG_COMMAND) tests/sasl gdb-atom: tests/atomsmasher - @$(DEBUG_COMMAND) tests/testudp + @$(DEBUG_COMMAND) tests/atomsmasher gdb-udp: tests/testudp - @$(DEBUG_COMMAND) tests/atomsmasher + @$(DEBUG_COMMAND) tests/testudp gdb-plus: tests/testplus $(DEBUG_COMMAND) tests/testplus diff --git a/tests/mem_udp.cc b/tests/mem_udp.cc index 077f8c99..8f6caad5 100644 --- a/tests/mem_udp.cc +++ b/tests/mem_udp.cc @@ -15,15 +15,15 @@ using namespace libtest; -#include +#include #include #include +#include -#include -#include -#include -#include -#include +#include +#include +#include +#include #include #include #include @@ -42,35 +42,30 @@ using namespace libtest; */ static test_return_t pre_binary(memcached_st *memc) { - memcached_return_t rc= MEMCACHED_FAILURE; - memcached_st *memc_clone; - memcached_server_instance_st instance; - - memc_clone= memcached_clone(NULL, memc); + memcached_st *memc_clone= memcached_clone(NULL, memc); test_true(memc_clone); + // The memcached_version needs to be done on a clone, because the server // will not toggle protocol on an connection. memcached_version(memc_clone); - instance= memcached_server_instance_by_position(memc_clone, 0); - - if (instance->major_version >= 1 && instance->minor_version > 2) - { - rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1); - test_true(rc == MEMCACHED_SUCCESS); - test_true(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1); - } + test_compare(MEMCACHED_SUCCESS, memcached_version(memc)); + test_compare(true, libmemcached_util_version_check(memc, 1, 2, 1)); + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, true)); + test_compare(true, memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL)); memcached_free(memc_clone); - return rc == MEMCACHED_SUCCESS ? TEST_SUCCESS : TEST_SKIPPED; + return TEST_SUCCESS; } static void increment_request_id(uint16_t *id) { (*id)++; if ((*id & UDP_REQUEST_ID_THREAD_MASK) != 0) + { *id= 0; + } } static uint16_t *get_udp_request_ids(memcached_st *memc) @@ -80,8 +75,7 @@ static uint16_t *get_udp_request_ids(memcached_st *memc) for (uint32_t x= 0; x < memcached_server_count(memc); x++) { - memcached_server_instance_st instance= - memcached_server_instance_by_position(memc, x); + memcached_server_instance_st instance= memcached_server_instance_by_position(memc, x); ids[x]= get_udp_datagram_request_id((struct udp_datagram_header_st *) ((memcached_server_instance_st )instance)->write_buffer); } @@ -115,49 +109,14 @@ static test_return_t post_udp_op_check(memcached_st *memc, uint16_t *expected_re **/ static test_return_t init_udp(memcached_st *memc) { - memcached_version(memc); -#if 0 - memcached_server_instance_st instance= - memcached_server_instance_by_position(memc, 0); - - /* For the time being, only support udp test for >= 1.2.6 && < 1.3 */ - if (instance->major_version != 1 || instance->minor_version != 2 - || instance->micro_version < 6) - return TEST_SKIPPED; - - uint32_t num_hosts= memcached_server_count(memc); - memcached_server_st servers[num_hosts]; - memcpy(servers, memcached_server_list(memc), sizeof(memcached_server_st) * num_hosts); - for (uint32_t x= 0; x < num_hosts; x++) - { - memcached_server_instance_st set_instance= - memcached_server_instance_by_position(memc, x); + test_skip(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, true)); - memcached_server_free(((memcached_server_write_instance_st)set_instance)); - } - - memc->number_of_hosts= 0; - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1); - for (uint32_t x= 0; x < num_hosts; x++) - { - memcached_server_instance_st set_instance= - memcached_server_instance_by_position(memc, x); - - test_true(memcached_server_add_udp(memc, servers[x].hostname, servers[x].port) == MEMCACHED_SUCCESS); - test_true(set_instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); - } -#endif - - return TEST_SKIPPED; + return TEST_SUCCESS; } static test_return_t binary_init_udp(memcached_st *memc) { - test_return_t test_rc; - test_rc= pre_binary(memc); - - if (test_rc != TEST_SUCCESS) - return test_rc; + test_skip(TEST_SUCCESS, pre_binary(memc)); return init_udp(memc); } @@ -198,20 +157,17 @@ static test_return_t add_udp_server_tcp_client_test(memcached_st *memc) static test_return_t set_udp_behavior_test(memcached_st *memc) { - memcached_quit(memc); - memc->number_of_hosts= 0; - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, memc->distribution); - test_true(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1) == MEMCACHED_SUCCESS); + + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_DISTRIBUTION, memc->distribution)); + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, true)); test_true(memc->flags.use_udp); test_true(memc->flags.no_reply); - test_true(memcached_server_count(memc) == 0); - - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,0); - test_true(! (memc->flags.use_udp)); - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY,0); - test_true(! (memc->flags.no_reply)); + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, false)); + test_false(memc->flags.use_udp); + test_compare(MEMCACHED_SUCCESS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, false)); + test_false(memc->flags.no_reply); return TEST_SUCCESS; } @@ -222,24 +178,20 @@ static test_return_t udp_set_test(memcached_st *memc) for (size_t x= 0; x < num_iters;x++) { - memcached_return_t rc; - const char *key= "foo"; - const char *value= "when we sanitize"; uint16_t *expected_ids= get_udp_request_ids(memc); - unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); - memcached_server_instance_st instance= - memcached_server_instance_by_position(memc, server_key); + unsigned int server_key= memcached_generate_hash(memc, test_literal_param("foo")); + memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key); size_t init_offset= instance->write_buffer_offset; - rc= memcached_set(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + memcached_return_t rc= memcached_set(memc, test_literal_param("foo"), + test_literal_param("when we sanitize"), + (time_t)0, (uint32_t)0); + test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); /** NB, the check below assumes that if new write_ptr is less than * the original write_ptr that we have flushed. For large payloads, this * maybe an invalid assumption, but for the small payload we have it is OK */ - if (rc == MEMCACHED_SUCCESS || + if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset) increment_request_id(&expected_ids[server_key]); @@ -254,48 +206,47 @@ static test_return_t udp_set_test(memcached_st *memc) } test_true(post_udp_op_check(memc, expected_ids) == TEST_SUCCESS); } + return TEST_SUCCESS; } static test_return_t udp_buffered_set_test(memcached_st *memc) { - memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1); - return udp_set_test(memc); + test_compare(MEMCACHED_INVALID_ARGUMENTS, memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, true)); + return TEST_SUCCESS; } static test_return_t udp_set_too_big_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "bar"; char value[MAX_UDP_DATAGRAM_LENGTH]; uint16_t *expected_ids= get_udp_request_ids(memc); - rc= memcached_set(memc, key, strlen(key), - value, MAX_UDP_DATAGRAM_LENGTH, - (time_t)0, (uint32_t)0); - test_true(rc == MEMCACHED_WRITE_FAILURE); + + memset(value, int('f'), sizeof(value)); + + test_compare_hint(MEMCACHED_WRITE_FAILURE, memcached_set(memc, test_literal_param("bar"), value, sizeof(value), time_t(0), uint32_t(0)), + memcached_last_error_message(memc)); return post_udp_op_check(memc,expected_ids); } static test_return_t udp_delete_test(memcached_st *memc) { - unsigned int num_iters= 1025; //request id rolls over at 1024 - - for (size_t x= 0; x < num_iters;x++) + //request id rolls over at 1024 + for (size_t x= 0; x < 1025; x++) { - memcached_return_t rc; - const char *key= "foo"; uint16_t *expected_ids=get_udp_request_ids(memc); - unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); - memcached_server_instance_st instance= - memcached_server_instance_by_position(memc, server_key); + unsigned int server_key= memcached_generate_hash(memc, test_literal_param("foo")); + memcached_server_instance_st instance= memcached_server_instance_by_position(memc, server_key); size_t init_offset= instance->write_buffer_offset; - rc= memcached_delete(memc, key, strlen(key), 0); - test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + memcached_return_t rc= memcached_delete(memc, test_literal_param("foo"), 0); + test_true(rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED); - if (rc == MEMCACHED_SUCCESS || instance->write_buffer_offset < init_offset) + if (rc == MEMCACHED_SUCCESS or instance->write_buffer_offset < init_offset) + { increment_request_id(&expected_ids[server_key]); + } + if (rc == MEMCACHED_SUCCESS) { test_true(instance->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); @@ -318,7 +269,6 @@ static test_return_t udp_buffered_delete_test(memcached_st *memc) static test_return_t udp_verbosity_test(memcached_st *memc) { - memcached_return_t rc; uint16_t *expected_ids= get_udp_request_ids(memc); for (size_t x= 0; x < memcached_server_count(memc); x++) @@ -326,8 +276,8 @@ static test_return_t udp_verbosity_test(memcached_st *memc) increment_request_id(&expected_ids[x]); } - rc= memcached_verbosity(memc,3); - test_true(rc == MEMCACHED_SUCCESS); + test_compare(MEMCACHED_SUCCESS, memcached_verbosity(memc, 3)); + return post_udp_op_check(memc,expected_ids); } @@ -340,94 +290,85 @@ static test_return_t udp_quit_test(memcached_st *memc) static test_return_t udp_flush_test(memcached_st *memc) { - memcached_return_t rc; uint16_t *expected_ids= get_udp_request_ids(memc); for (size_t x= 0; x < memcached_server_count(memc); x++) { increment_request_id(&expected_ids[x]); } + test_compare_hint(MEMCACHED_SUCCESS, memcached_flush(memc, 0), memcached_last_error_message(memc)); - rc= memcached_flush(memc,0); - test_true(rc == MEMCACHED_SUCCESS); - return post_udp_op_check(memc,expected_ids); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_incr_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "incr"; - const char *value= "1"; - rc= memcached_set(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); + test_compare(MEMCACHED_SUCCESS, memcached_set(memc, test_literal_param("incr"), + test_literal_param("1"), + (time_t)0, (uint32_t)0)); - test_true(rc == MEMCACHED_SUCCESS); uint16_t *expected_ids= get_udp_request_ids(memc); - unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); + unsigned int server_key= memcached_generate_hash(memc, test_literal_param("incr")); increment_request_id(&expected_ids[server_key]); + uint64_t newvalue; - rc= memcached_increment(memc, key, strlen(key), 1, &newvalue); - test_true(rc == MEMCACHED_SUCCESS); + test_compare(MEMCACHED_SUCCESS, memcached_increment(memc, test_literal_param("incr"), 1, &newvalue)); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_decr_test(memcached_st *memc) { - memcached_return_t rc; - const char *key= "decr"; - const char *value= "1"; - rc= memcached_set(memc, key, strlen(key), - value, strlen(value), - (time_t)0, (uint32_t)0); + test_compare(MEMCACHED_SUCCESS, memcached_set(memc, + test_literal_param("decr"), + test_literal_param("1"), + (time_t)0, (uint32_t)0)); - test_true(rc == MEMCACHED_SUCCESS); uint16_t *expected_ids= get_udp_request_ids(memc); - unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); + unsigned int server_key= memcached_generate_hash(memc, test_literal_param("decr")); increment_request_id(&expected_ids[server_key]); + uint64_t newvalue; - rc= memcached_decrement(memc, key, strlen(key), 1, &newvalue); - test_true(rc == MEMCACHED_SUCCESS); + test_compare(MEMCACHED_SUCCESS, memcached_decrement(memc, test_literal_param("decr"), 1, &newvalue)); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_stat_test(memcached_st *memc) { - memcached_stat_st * rv= NULL; memcached_return_t rc; char args[]= ""; uint16_t *expected_ids = get_udp_request_ids(memc); - rv = memcached_stat(memc, args, &rc); + memcached_stat_st *rv = memcached_stat(memc, args, &rc); free(rv); - test_true(rc == MEMCACHED_NOT_SUPPORTED); + test_compare(MEMCACHED_NOT_SUPPORTED, rc); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_version_test(memcached_st *memc) { - memcached_return_t rc; uint16_t *expected_ids = get_udp_request_ids(memc); - rc = memcached_version(memc); - test_true(rc == MEMCACHED_NOT_SUPPORTED); + + test_compare(MEMCACHED_NOT_SUPPORTED, memcached_version(memc)); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_get_test(memcached_st *memc) { memcached_return_t rc; - const char *key= "foo"; size_t vlen; uint16_t *expected_ids = get_udp_request_ids(memc); - char *val= memcached_get(memc, key, strlen(key), &vlen, (uint32_t)0, &rc); - test_true(rc == MEMCACHED_NOT_SUPPORTED); - test_true(val == NULL); + test_null(memcached_get(memc, test_literal_param("foo"), &vlen, (uint32_t)0, &rc)); + test_compare(MEMCACHED_NOT_SUPPORTED, rc); + return post_udp_op_check(memc, expected_ids); } static test_return_t udp_mixed_io_test(memcached_st *memc) { - test_st current_op; test_st mixed_io_ops [] ={ {"udp_set_test", 0, (test_callback_fn*)udp_set_test}, @@ -439,8 +380,10 @@ static test_return_t udp_mixed_io_test(memcached_st *memc) (test_callback_fn*)udp_verbosity_test}, {"udp_quit_test", 0, (test_callback_fn*)udp_quit_test}, +#if 0 {"udp_flush_test", 0, (test_callback_fn*)udp_flush_test}, +#endif {"udp_incr_test", 0, (test_callback_fn*)udp_incr_test}, {"udp_decr_test", 0, @@ -451,8 +394,8 @@ static test_return_t udp_mixed_io_test(memcached_st *memc) for (size_t x= 0; x < 500; x++) { - current_op= mixed_io_ops[random() % 9]; - test_true(current_op.test_fn(memc) == TEST_SUCCESS); + test_st current_op= mixed_io_ops[(random() % 8)]; + test_compare(TEST_SUCCESS, current_op.test_fn(memc)); } return TEST_SUCCESS; } -- 2.30.2