From: Eric Lambert Date: Mon, 9 Mar 2009 21:59:06 +0000 (+0100) Subject: udp support in fire and forget mode for all ops but get/gets, stat and version X-Git-Tag: 0.27~23 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=70d4781acf107377fa194a04d05e8cf31fd211c8;p=m6w6%2Flibmemcached udp support in fire and forget mode for all ops but get/gets, stat and version --- diff --git a/clients/client_options.h b/clients/client_options.h index a3431336..a4f0f47f 100644 --- a/clients/client_options.h +++ b/clients/client_options.h @@ -24,6 +24,7 @@ typedef enum { OPT_FLUSH, OPT_HASH, OPT_BINARY, + OPT_UDP } memcached_options; #endif /* CLIENT_OPTIONS */ diff --git a/clients/memslap.c b/clients/memslap.c index 7aa69ead..cd170eb1 100644 --- a/clients/memslap.c +++ b/clients/memslap.c @@ -77,6 +77,7 @@ static unsigned int opt_createial_load= 0; static unsigned int opt_concurrency= 0; static int opt_displayflag= 0; static char *opt_servers= NULL; +static int opt_udp_io= 0; test_type opt_test= SET_TEST; int main(int argc, char *argv[]) @@ -139,6 +140,15 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) PTHREAD_CREATE_DETACHED); memc= memcached_create(NULL); + + /* We need to set udp behavior before adding servers to the client */ + if (opt_udp_io) + { + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io); + unsigned int i= 0; + for(i= 0; i < servers[0].count; i++ ) + servers[i].type= MEMCACHED_CONNECTION_UDP; + } memcached_server_push(memc, servers); memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); @@ -240,6 +250,7 @@ void options_parse(int argc, char *argv[]) {"verbose", no_argument, &opt_verbose, OPT_VERBOSE}, {"version", no_argument, NULL, OPT_VERSION}, {"binary", no_argument, NULL, OPT_BINARY}, + {"udp", no_argument, NULL, OPT_UDP}, {0, 0, 0, 0}, }; @@ -254,6 +265,15 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_UDP: + if (opt_test == GET_TEST) + { + fprintf(stderr, "You can not run a get test in UDP mode. UDP mode " + "does not currently support get ops.\n"); + exit(1); + } + opt_udp_io= 1; + break; case OPT_BINARY: opt_binary = 1; break; @@ -274,7 +294,15 @@ void options_parse(int argc, char *argv[]) break; case OPT_SLAP_TEST: if (!strcmp(optarg, "get")) + { + if (opt_udp_io == 1) + { + fprintf(stderr, "You can not run a get test in UDP mode. UDP mode " + "does not currently support get ops.\n"); + exit(1); + } opt_test= GET_TEST ; + } else if (!strcmp(optarg, "set")) opt_test= SET_TEST; else diff --git a/clients/utilities.c b/clients/utilities.c index adab23fa..529955ce 100644 --- a/clients/utilities.c +++ b/clients/utilities.c @@ -45,6 +45,7 @@ static char *lookup_help(memcached_options option) case OPT_HASH: return("Select hash type."); case OPT_BINARY: return("Switch to binary protocol."); case OPT_ANALYZE: return("Analyze the provided servers."); + case OPT_UDP: return("Use UDP protocol when communicating with server."); }; WATCHPOINT_ASSERT(0); diff --git a/libmemcached/common.h b/libmemcached/common.h index 54b33a20..d91c6b30 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -76,7 +76,8 @@ typedef enum { MEM_KETAMA_WEIGHTED= (1 << 11), MEM_BINARY_PROTOCOL= (1 << 12), MEM_HASH_WITH_PREFIX_KEY= (1 << 13), - MEM_NOREPLY= (1 << 14) + MEM_NOREPLY= (1 << 14), + MEM_USE_UDP= (1 << 15) } memcached_flags; /* Hashing algo */ diff --git a/libmemcached/memcached.c b/libmemcached/memcached.c index 1144115f..a15cd2f2 100644 --- a/libmemcached/memcached.c +++ b/libmemcached/memcached.c @@ -87,17 +87,6 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source) if (new_clone == NULL) return NULL; - if (source->hosts) - rc= memcached_server_push(new_clone, source->hosts); - - if (rc != MEMCACHED_SUCCESS) - { - memcached_free(new_clone); - - return NULL; - } - - new_clone->flags= source->flags; new_clone->send_size= source->send_size; new_clone->recv_size= source->recv_size; @@ -120,6 +109,17 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source) new_clone->get_key_failure= source->get_key_failure; new_clone->delete_trigger= source->delete_trigger; + if (source->hosts) + rc= memcached_server_push(new_clone, source->hosts); + + if (rc != MEMCACHED_SUCCESS) + { + memcached_free(new_clone); + + return NULL; + } + + if (source->prefix_key[0] != 0) { strcpy(new_clone->prefix_key, source->prefix_key); diff --git a/libmemcached/memcached_auto.c b/libmemcached/memcached_auto.c index 9808ab65..787ef687 100644 --- a/libmemcached/memcached_auto.c +++ b/libmemcached/memcached_auto.c @@ -10,6 +10,7 @@ static memcached_return memcached_auto(memcached_st *ptr, memcached_return rc; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; unsigned int server_key; + bool no_reply= (ptr->flags & MEM_NOREPLY); unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0) return MEMCACHED_NO_SERVERS; @@ -20,15 +21,15 @@ static memcached_return memcached_auto(memcached_st *ptr, server_key= memcached_generate_hash(ptr, key, key_length); send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "%s %s%.*s %u\r\n", verb, + "%s %s%.*s %u%s\r\n", verb, ptr->prefix_key, (int)key_length, key, - offset); + offset, no_reply ? " noreply" : ""); unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) return MEMCACHED_WRITE_FAILURE; rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1); - if (rc != MEMCACHED_SUCCESS) + if (no_reply || rc != MEMCACHED_SUCCESS) return rc; rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); @@ -64,12 +65,20 @@ static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd, uint32_t offset, uint64_t *value) { unsigned int server_key; + bool no_reply= (ptr->flags & MEM_NOREPLY); unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0) return MEMCACHED_NO_SERVERS; server_key= memcached_generate_hash(ptr, key, key_length); + if (no_reply) + { + if(cmd == PROTOCOL_BINARY_CMD_DECREMENT) + cmd= PROTOCOL_BINARY_CMD_DECREMENTQ; + if(cmd == PROTOCOL_BINARY_CMD_INCREMENT) + cmd= PROTOCOL_BINARY_CMD_INCREMENTQ; + } protocol_binary_request_incr request= {.bytes= {0}}; request.message.header.request.magic= PROTOCOL_BINARY_REQ; @@ -88,7 +97,9 @@ static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd, memcached_io_reset(&ptr->hosts[server_key]); return MEMCACHED_WRITE_FAILURE; } - + + if (no_reply) + return MEMCACHED_SUCCESS; return memcached_response(&ptr->hosts[server_key], (char*)value, sizeof(*value), NULL); } diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index f2341550..e8f65ddf 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -5,7 +5,7 @@ #include /* - This function is used to modify the behabior of running client. + This function is used to modify the behavior of running client. We quit all connections so we can reset the sockets. */ @@ -54,6 +54,13 @@ memcached_return memcached_behavior_set(memcached_st *ptr, set_behavior_flag(ptr, MEM_BUFFER_REQUESTS, data); memcached_quit(ptr); break; + case MEMCACHED_BEHAVIOR_USE_UDP: + if (ptr->number_of_hosts) + return MEMCACHED_FAILURE; + set_behavior_flag(ptr, MEM_USE_UDP, data); + if (data) + set_behavior_flag(ptr,MEM_NOREPLY,data); + break; case MEMCACHED_BEHAVIOR_TCP_NODELAY: set_behavior_flag(ptr, MEM_TCP_NODELAY, data); memcached_quit(ptr); @@ -170,6 +177,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr, case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS: temp_flag= MEM_BUFFER_REQUESTS; break; + case MEMCACHED_BEHAVIOR_USE_UDP: + temp_flag= MEM_USE_UDP; + break; case MEMCACHED_BEHAVIOR_TCP_NODELAY: temp_flag= MEM_TCP_NODELAY; break; diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index fc47a3b6..1af6aa4a 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -61,6 +61,7 @@ typedef enum { MEMCACHED_TIMEOUT, MEMCACHED_BUFFERED, MEMCACHED_BAD_KEY_PROVIDED, + MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */ } memcached_return; @@ -97,7 +98,8 @@ typedef enum { MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK, MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK, MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, - MEMCACHED_BEHAVIOR_NOREPLY + MEMCACHED_BEHAVIOR_NOREPLY, + MEMCACHED_BEHAVIOR_USE_UDP } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_delete.c b/libmemcached/memcached_delete.c index 84b6160c..f2fab322 100644 --- a/libmemcached/memcached_delete.c +++ b/libmemcached/memcached_delete.c @@ -36,6 +36,7 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, server_key= memcached_generate_hash(ptr, master_key, master_key_length); to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1; + bool no_reply= (ptr->flags & MEM_NOREPLY); if (ptr->flags & MEM_BINARY_PROTOCOL) rc= binary_delete(ptr, server_key, key, key_length, to_write); @@ -43,22 +44,30 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, { if (expiration) send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "delete %s%.*s %u\r\n", + "delete %s%.*s %u%s\r\n", ptr->prefix_key, (int)key_length, key, - (uint32_t)expiration); + (uint32_t)expiration, no_reply ? " noreply" :"" ); else send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "delete %s%.*s\r\n", + "delete %s%.*s%s\r\n", ptr->prefix_key, - (int)key_length, key); + (int)key_length, key, no_reply ? " noreply" :""); if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) { rc= MEMCACHED_WRITE_FAILURE; goto error; } - + + if (ptr->flags & MEM_USE_UDP && !to_write) + { + if (send_length > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + return MEMCACHED_WRITE_FAILURE; + if (send_length + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1); + } + rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write); } @@ -66,10 +75,8 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, goto error; if ((ptr->flags & MEM_BUFFER_REQUESTS)) - { rc= MEMCACHED_BUFFERED; - } - else + else if (!no_reply) { rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); if (rc == MEMCACHED_DELETED) @@ -93,10 +100,22 @@ static inline memcached_return binary_delete(memcached_st *ptr, protocol_binary_request_delete request= {.bytes= {0}}; request.message.header.request.magic= PROTOCOL_BINARY_REQ; - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE; + if (ptr->flags & MEM_NOREPLY) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE; request.message.header.request.keylen= htons((uint16_t)key_length); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; request.message.header.request.bodylen= htonl(key_length); + + if (ptr->flags & MEM_USE_UDP && !flush) + { + size_t cmd_size= sizeof(request.bytes) + key_length; + if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + return MEMCACHED_WRITE_FAILURE; + if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1); + } if ((memcached_do(&ptr->hosts[server_key], request.bytes, sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || diff --git a/libmemcached/memcached_do.c b/libmemcached/memcached_do.c index c364efe8..b46805a0 100644 --- a/libmemcached/memcached_do.c +++ b/libmemcached/memcached_do.c @@ -15,6 +15,14 @@ memcached_return memcached_do(memcached_server_st *ptr, const void *command, return rc; } + /* + ** Since non buffering ops in UDP mode dont check to make sure they will fit + ** before they start writing, if there is any data in buffer, clear it out, + ** otherwise we might get a partial write. + **/ + if (ptr->type == MEMCACHED_CONNECTION_UDP && with_flush && ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) + memcached_io_write(ptr, NULL, 0, 1); + sent_length= memcached_io_write(ptr, command, command_length, with_flush); if (sent_length == -1 || (size_t)sent_length != command_length) diff --git a/libmemcached/memcached_fetch.c b/libmemcached/memcached_fetch.c index f728bebc..e810b64a 100644 --- a/libmemcached/memcached_fetch.c +++ b/libmemcached/memcached_fetch.c @@ -14,6 +14,9 @@ memcached_return value_fetch(memcached_server_st *ptr, size_t to_read; char *value_ptr; + if (ptr->root->flags & MEM_USE_UDP) + return MEMCACHED_NOT_SUPPORTED; + WATCHPOINT_ASSERT(ptr->root); end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE; @@ -133,6 +136,12 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length, { memcached_result_st *result_buffer= &ptr->result; + if (ptr->flags & MEM_USE_UDP) + { + *error= MEMCACHED_NOT_SUPPORTED; + return NULL; + } + while (ptr->cursor_server < ptr->number_of_hosts) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; @@ -184,6 +193,13 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr, memcached_result_st *result, memcached_return *error) { + + if (ptr->flags & MEM_USE_UDP) + { + *error= MEMCACHED_NOT_SUPPORTED; + return NULL; + } + if (result == NULL) result= memcached_result_create(ptr, NULL); diff --git a/libmemcached/memcached_flush.c b/libmemcached/memcached_flush.c index 4e048d00..55a6ca71 100644 --- a/libmemcached/memcached_flush.c +++ b/libmemcached/memcached_flush.c @@ -31,16 +31,18 @@ static memcached_return memcached_flush_textual(memcached_st *ptr, for (x= 0; x < ptr->number_of_hosts; x++) { + bool no_reply= (ptr->flags & MEM_NOREPLY); if (expiration) send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "flush_all %llu\r\n", (unsigned long long)expiration); + "flush_all %llu%s\r\n", + (unsigned long long)expiration, no_reply ? " noreply" : ""); else send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "flush_all\r\n"); + "flush_all%s\r\n", no_reply ? " noreply" : ""); rc= memcached_do(&ptr->hosts[x], buffer, send_length, 1); - if (rc == MEMCACHED_SUCCESS) + if (rc == MEMCACHED_SUCCESS && !no_reply) (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); } @@ -65,6 +67,10 @@ static memcached_return memcached_flush_binary(memcached_st *ptr, for (x= 0; x < ptr->number_of_hosts; x++) { + if (ptr->flags & MEM_NOREPLY) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; if (memcached_do(&ptr->hosts[x], request.bytes, sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) { diff --git a/libmemcached/memcached_get.c b/libmemcached/memcached_get.c index 2cc40151..3305a772 100644 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@ -27,6 +27,12 @@ char *memcached_get_by_key(memcached_st *ptr, uint32_t dummy_flags; memcached_return dummy_error; + if (ptr->flags & MEM_USE_UDP) + { + *error= MEMCACHED_NOT_SUPPORTED; + return NULL; + } + /* Request the key */ *error= memcached_mget_by_key(ptr, master_key, @@ -120,6 +126,9 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, uint8_t get_command_length= 4; unsigned int master_server_key= 0; + if (ptr->flags & MEM_USE_UDP) + return MEMCACHED_NOT_SUPPORTED; + LIBMEMCACHED_MEMCACHED_MGET_START(); ptr->cursor_server= 0; diff --git a/libmemcached/memcached_hosts.c b/libmemcached/memcached_hosts.c index c0d0f036..2bcd189c 100644 --- a/libmemcached/memcached_hosts.c +++ b/libmemcached/memcached_hosts.c @@ -248,6 +248,11 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l for (x= 0; x < count; x++) { + if ((ptr->flags & MEM_USE_UDP && list[x].type != MEMCACHED_CONNECTION_UDP) + || ((list[x].type == MEMCACHED_CONNECTION_UDP) + && ! (ptr->flags & MEM_USE_UDP)) ) + return MEMCACHED_INVALID_HOST_PROTOCOL; + WATCHPOINT_ASSERT(list[x].hostname[0] != 0); memcached_server_create(ptr, &ptr->hosts[ptr->number_of_hosts]); /* TODO check return type */ @@ -313,7 +318,7 @@ memcached_return memcached_server_add_with_weight(memcached_st *ptr, port= MEMCACHED_DEFAULT_PORT; if (!hostname) - hostname= "localhost"; + hostname= "localhost"; return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP); } @@ -325,6 +330,10 @@ static memcached_return server_add(memcached_st *ptr, const char *hostname, { memcached_server_st *new_host_list; + if ( (ptr->flags & MEM_USE_UDP && type != MEMCACHED_CONNECTION_UDP) + || ( (type == MEMCACHED_CONNECTION_UDP) && !(ptr->flags & MEM_USE_UDP) ) ) + return MEMCACHED_INVALID_HOST_PROTOCOL; + if (ptr->call_realloc) new_host_list= (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts, sizeof(memcached_server_st) * (ptr->number_of_hosts+1)); diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 72a02a0f..dc12ebe7 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -13,6 +13,7 @@ typedef enum { } memc_read_or_write; static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); +static void increment_udp_message_id(memcached_server_st *ptr); static memcached_return io_wait(memcached_server_st *ptr, memc_read_or_write read_or_write) @@ -194,18 +195,30 @@ ssize_t memcached_io_write(memcached_server_st *ptr, { char *write_ptr; size_t should_write; + size_t buffer_end; - should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset; - write_ptr= ptr->write_buffer + ptr->write_buffer_offset; - - should_write= (should_write < length) ? should_write : length; + if (ptr->type == MEMCACHED_CONNECTION_UDP) + { + //UDP does not support partial writes + buffer_end= MAX_UDP_DATAGRAM_LENGTH; + should_write= length; + if (ptr->write_buffer_offset + should_write > buffer_end) + return -1; + } + else + { + buffer_end= MEMCACHED_MAX_BUFFER; + should_write= buffer_end - ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; + } + write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); ptr->write_buffer_offset+= should_write; buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) + if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) { memcached_return rc; ssize_t sent_length; @@ -217,7 +230,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr, /* If io_flush calls memcached_purge, sent_length may be 0 */ if (sent_length != 0) - WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + WATCHPOINT_ASSERT(sent_length == buffer_end); } } @@ -288,7 +301,12 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(ptr->fd != -1); - if (ptr->write_buffer_offset == 0) + // UDP Sanity check, make sure that we are not sending somthing too big + if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + return -1; + + if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP + && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) return 0; /* Looking for memory overflows */ @@ -305,63 +323,40 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) + increment_udp_message_id(ptr); + sent_length= write(ptr->fd, local_write_ptr, write_length); + + if (sent_length == -1) { - struct addrinfo *ai; - - ai= ptr->address_info; - - /* Crappy test code */ - char buffer[HUGE_STRING_LEN + 8]; - memset(buffer, 0, HUGE_STRING_LEN + 8); - memcpy (buffer+8, local_write_ptr, write_length); - buffer[0]= 0; - buffer[1]= 0; - buffer[2]= 0; - buffer[3]= 0; - buffer[4]= 0; - buffer[5]= 1; - buffer[6]= 0; - buffer[7]= 0; - sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, - (struct sockaddr *)ai->ai_addr, - ai->ai_addrlen); - if (sent_length == -1) + ptr->cached_errno= errno; + switch (errno) { - WATCHPOINT_ERRNO(errno); - WATCHPOINT_ASSERT(0); - } - sent_length-= 8; /* We remove the header */ - } - else - { - WATCHPOINT_ASSERT(ptr->fd != -1); - if ((sent_length= write(ptr->fd, local_write_ptr, - write_length)) == -1) + case ENOBUFS: + continue; + case EAGAIN: { - ptr->cached_errno= errno; - switch (errno) - { - case ENOBUFS: - continue; - case EAGAIN: - { - memcached_return rc; - rc= io_wait(ptr, MEM_WRITE); + memcached_return rc; + rc= io_wait(ptr, MEM_WRITE); - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) - continue; + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) + continue; - memcached_quit_server(ptr, 1); - return -1; - } - default: - memcached_quit_server(ptr, 1); - *error= MEMCACHED_ERRNO; - return -1; - } + memcached_quit_server(ptr, 1); + return -1; + } + default: + memcached_quit_server(ptr, 1); + *error= MEMCACHED_ERRNO; + return -1; } } + if (ptr->type == MEMCACHED_CONNECTION_UDP && sent_length != write_length) + { + memcached_quit_server(ptr, 1); + return -1; + } + ptr->io_bytes_sent += sent_length; local_write_ptr+= sent_length; @@ -372,7 +367,13 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(write_length == 0); // Need to study this assert() WATCHPOINT_ASSERT(return_length == // ptr->write_buffer_offset); - ptr->write_buffer_offset= 0; + + // if we are a udp server, the begining of the buffer is reserverd for + // the upd frame header + if (ptr->type == MEMCACHED_CONNECTION_UDP) + ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + else + ptr->write_buffer_offset= 0; return return_length; } @@ -454,3 +455,40 @@ memcached_return memcached_io_readline(memcached_server_st *ptr, return MEMCACHED_SUCCESS; } + +/* + * The udp request id consists of two seperate sections + * 1) The thread id + * 2) The message number + * The thread id should only be set when the memcached_st struct is created + * and should not be changed. + * + * The message num is incremented for each new message we send, this function + * extracts the message number from message_id, increments it and then + * writes the new value back into the header + */ +static void increment_udp_message_id(memcached_server_st *ptr) +{ + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + uint16_t cur_req= get_udp_datagram_request_id(header); + uint16_t msg_num= get_msg_num_from_request_id(cur_req); + uint16_t thread_id= get_thread_id_from_request_id(cur_req); + + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) + msg_num= 0; + + header->request_id= htons(thread_id | msg_num); +} + +memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id) +{ + if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) + return MEMCACHED_FAILURE; + + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + header->request_id= htons(generate_udp_request_thread_id(thread_id)); + header->num_datagrams= htons(1); + header->sequence_number= htons(0); + + return MEMCACHED_SUCCESS; +} diff --git a/libmemcached/memcached_io.h b/libmemcached/memcached_io.h index 1bed7959..f1a622c0 100644 --- a/libmemcached/memcached_io.h +++ b/libmemcached/memcached_io.h @@ -4,6 +4,25 @@ #ifndef __MEMCACHED_IO_H__ #define __MEMCACHED_IO_H__ +#define MAX_UDP_DATAGRAM_LENGTH 1400 +#define UDP_DATAGRAM_HEADER_LENGTH 8 +#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10 +#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS +#define get_udp_datagram_request_id(A) ntohs((A)->request_id) +#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number) +#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams) +#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) ) +#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS +#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS +#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF) + +struct udp_datagram_header_st { + uint16_t request_id; + uint16_t sequence_number; + uint16_t num_datagrams; + uint16_t reserved; +}; + ssize_t memcached_io_write(memcached_server_st *ptr, const void *buffer, size_t length, char with_flush); void memcached_io_reset(memcached_server_st *ptr); @@ -22,4 +41,6 @@ memcached_return memcached_safe_read(memcached_server_st *ptr, memcached_return memcached_read_one_response(memcached_server_st *ptr, char *buffer, size_t buffer_length, memcached_result_st *result); +memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, + uint16_t thread_id); #endif /* __MEMCACHED_IO_H__ */ diff --git a/libmemcached/memcached_quit.c b/libmemcached/memcached_quit.c index 14eca73d..592bde9f 100644 --- a/libmemcached/memcached_quit.c +++ b/libmemcached/memcached_quit.c @@ -13,7 +13,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death) { if (ptr->fd != -1) { - if (io_death == 0) + if (io_death == 0 && ptr->type != MEMCACHED_CONNECTION_UDP) { memcached_return rc; ssize_t read_length; @@ -43,7 +43,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death) memcached_io_close(ptr); ptr->fd= -1; - ptr->write_buffer_offset= 0; + ptr->write_buffer_offset= (ptr->type == MEMCACHED_CONNECTION_UDP) ? UDP_DATAGRAM_HEADER_LENGTH : 0 ; ptr->read_buffer_length= 0; ptr->read_ptr= ptr->read_buffer; memcached_server_response_reset(ptr); diff --git a/libmemcached/memcached_server.c b/libmemcached/memcached_server.c index b7dd8a94..38d81048 100644 --- a/libmemcached/memcached_server.c +++ b/libmemcached/memcached_server.c @@ -41,6 +41,11 @@ memcached_server_st *memcached_server_create_with(memcached_st *memc, memcached_ host->read_ptr= host->read_buffer; if (memc) host->next_retry= memc->retry_timeout; + if (type == MEMCACHED_CONNECTION_UDP) + { + host->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + memcached_io_init_udp_header(host, 0); + } return host; } diff --git a/libmemcached/memcached_stats.c b/libmemcached/memcached_stats.c index 9069bccb..3ad236d2 100644 --- a/libmemcached/memcached_stats.c +++ b/libmemcached/memcached_stats.c @@ -340,6 +340,12 @@ memcached_stat_st *memcached_stat(memcached_st *ptr, char *args, memcached_retur memcached_return rc; memcached_stat_st *stats; + if (ptr->flags & MEM_USE_UDP) + { + *error= MEMCACHED_NOT_SUPPORTED; + return NULL; + } + if (ptr->call_malloc) stats= (memcached_stat_st *)ptr->call_malloc(ptr, sizeof(memcached_stat_st)*(ptr->number_of_hosts)); else diff --git a/libmemcached/memcached_storage.c b/libmemcached/memcached_storage.c index 1f551b87..9acc7c3b 100644 --- a/libmemcached/memcached_storage.c +++ b/libmemcached/memcached_storage.c @@ -105,6 +105,14 @@ static inline memcached_return memcached_send(memcached_st *ptr, (unsigned long long)expiration, value_length, (ptr->flags & MEM_NOREPLY) ? " noreply" : ""); + if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS) { + size_t cmd_size= write_length + value_length + 2; + if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + return MEMCACHED_WRITE_FAILURE; + if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1); + } + if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) { rc= MEMCACHED_WRITE_FAILURE; @@ -135,9 +143,7 @@ static inline memcached_return memcached_send(memcached_st *ptr, } if (ptr->flags & MEM_NOREPLY) - { return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; - } if (to_write == 0) return MEMCACHED_BUFFERED; @@ -394,7 +400,7 @@ static memcached_return memcached_send_binary(memcached_server_st* server, char flush; protocol_binary_request_set request= {.bytes= {0}}; size_t send_length= sizeof(request.bytes); - bool noreply = server->root->flags & MEM_NOREPLY; + bool noreply= server->root->flags & MEM_NOREPLY; request.message.header.request.magic= PROTOCOL_BINARY_REQ; request.message.header.request.opcode= get_com_code(verb, noreply); @@ -417,6 +423,15 @@ static memcached_return memcached_send_binary(memcached_server_st* server, flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1; + if ((server->root->flags & MEM_USE_UDP) && !flush) + { + size_t cmd_size= send_length + key_length + value_length; + if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + return MEMCACHED_WRITE_FAILURE; + if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + memcached_io_write(server,NULL,0, 1); + } + /* write the header */ if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) || (memcached_io_write(server, key, key_length, 0) == -1) || diff --git a/libmemcached/memcached_verbosity.c b/libmemcached/memcached_verbosity.c index 7cfb073f..8dc9df31 100644 --- a/libmemcached/memcached_verbosity.c +++ b/libmemcached/memcached_verbosity.c @@ -24,6 +24,9 @@ memcached_return memcached_verbosity(memcached_st *ptr, unsigned int verbosity) continue; } + if (ptr->flags & MEM_USE_UDP) + continue; + rrc= memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); if (rrc != MEMCACHED_SUCCESS) rc= MEMCACHED_SOME_ERRORS; diff --git a/libmemcached/memcached_version.c b/libmemcached/memcached_version.c index 5b2386f2..30bcb1bd 100644 --- a/libmemcached/memcached_version.c +++ b/libmemcached/memcached_version.c @@ -10,6 +10,9 @@ static inline memcached_return memcached_version_textual(memcached_st *ptr); memcached_return memcached_version(memcached_st *ptr) { + if (ptr->flags & MEM_USE_UDP) + return MEMCACHED_NOT_SUPPORTED; + if (ptr->flags & MEM_BINARY_PROTOCOL) return memcached_version_binary(ptr); else diff --git a/tests/function.c b/tests/function.c index bb2d3993..09d2673e 100644 --- a/tests/function.c +++ b/tests/function.c @@ -3173,6 +3173,349 @@ static test_return analyzer_test(memcached_st *memc) return TEST_SUCCESS; } +static void increment_request_id(uint16_t *id) { + (*id)++; + if ((*id & UDP_REQUEST_ID_THREAD_MASK) != 0) + *id= 0; +} + +static uint16_t *get_udp_request_ids(memcached_st *memc) +{ + uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts); + assert(ids != NULL); + unsigned int i; + for (i= 0; i < memc->number_of_hosts; i++) + ids[i]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[i].write_buffer); + + return ids; +} + +static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_ids) { + unsigned int i; + memcached_server_st *cur_server = memc->hosts; + uint16_t *cur_req_ids = get_udp_request_ids(memc); + for (i= 0; i < memc->number_of_hosts; i++) + { + assert(cur_server[i].cursor_active == 0); + assert(cur_req_ids[i] == expected_req_ids[i]); + } + free(expected_req_ids); + free(cur_req_ids); + return TEST_SUCCESS; +} + +/* +** There is a little bit of a hack here, instead of removing +** the servers, I just set num host to 0 and them add then new udp servers +**/ +static memcached_return init_udp(memcached_st *memc) +{ + memcached_version(memc); + if (memc->hosts[0].major_version != 1 || memc->hosts[0].minor_version != 2) + return MEMCACHED_FAILURE; + + uint32_t num_hosts= memc->number_of_hosts; + unsigned int i= 0; + memcached_server_st servers[num_hosts]; + memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts); + memc->number_of_hosts= 0; + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1); + for (i= 0; i < num_hosts; i++) + { + assert(memcached_server_add_udp(memc, servers[i].hostname, servers[i].port) == MEMCACHED_SUCCESS); + assert(memc->hosts[i].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); + } + return MEMCACHED_SUCCESS; +} + +static memcached_return binary_init_udp(memcached_st *memc) +{ + pre_binary(memc); + return init_udp(memc); +} + +/* Make sure that I cant add a tcp server to a udp client */ +static test_return add_tcp_server_udp_client_test(memcached_st *memc) +{ + memcached_server_st server; + memcached_server_clone(&server, &memc->hosts[0]); + assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS); + assert(memcached_server_add(memc, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL); + return TEST_SUCCESS; +} + +/* Make sure that I cant add a udp server to a tcp client */ +static test_return add_udp_server_tcp_client_test(memcached_st *memc) +{ + memcached_server_st server; + memcached_server_clone(&server, &memc->hosts[0]); + assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS); + + memcached_st tcp_client; + memcached_create(&tcp_client); + assert(memcached_server_add_udp(&tcp_client, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL); + return TEST_SUCCESS; +} + +static test_return set_udp_behavior_test(memcached_st *memc) +{ + + memcached_quit(memc); + memc->number_of_hosts= 0; + run_distribution(memc); + assert(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1) == MEMCACHED_SUCCESS); + assert(memc->flags & MEM_USE_UDP); + assert(memc->flags & MEM_NOREPLY);; + + assert(memc->number_of_hosts == 0); + + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,0); + assert(!(memc->flags & MEM_USE_UDP)); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY,0); + assert(!(memc->flags & MEM_NOREPLY)); + return TEST_SUCCESS; +} + +static test_return udp_set_test(memcached_st *memc) +{ + unsigned int i= 0; + unsigned int num_iters= 1025; //request id rolls over at 1024 + for (i= 0; i < num_iters;i++) + { + memcached_return rc; + char *key= "foo"; + char *value= "when we sanitize"; + uint16_t *expected_ids= get_udp_request_ids(memc); + unsigned int server_key= memcached_generate_hash(memc,key,strlen(key)); + size_t init_offset= memc->hosts[server_key].write_buffer_offset; + rc= memcached_set(memc, key, strlen(key), + value, strlen(value), + (time_t)0, (uint32_t)0); + assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + /** NB, the check below assumes that if new write_ptr is less than + * the original write_ptr that we have flushed. For large payloads, this + * maybe an invalid assumption, but for the small payload we have it is OK + */ + if (rc == MEMCACHED_SUCCESS || + memc->hosts[server_key].write_buffer_offset < init_offset) + increment_request_id(&expected_ids[server_key]); + + if (rc == MEMCACHED_SUCCESS) { + assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); + } else { + assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH); + assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH); + } + assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS); + } + return TEST_SUCCESS; +} + +static test_return udp_buffered_set_test(memcached_st *memc) +{ + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1); + return udp_set_test(memc); +} + +static test_return udp_set_too_big_test(memcached_st *memc) +{ + memcached_return rc; + char *key= "bar"; + char value[MAX_UDP_DATAGRAM_LENGTH]; + uint16_t *expected_ids= get_udp_request_ids(memc); + rc= memcached_set(memc, key, strlen(key), + value, MAX_UDP_DATAGRAM_LENGTH, + (time_t)0, (uint32_t)0); + assert(rc == MEMCACHED_WRITE_FAILURE); + return post_udp_op_check(memc,expected_ids); +} + +test_return udp_delete_test(memcached_st *memc) +{ + unsigned int i= 0; + unsigned int num_iters= 1025; //request id rolls over at 1024 + for (i= 0; i < num_iters;i++) + { + memcached_return rc; + char *key= "foo"; + uint16_t *expected_ids=get_udp_request_ids(memc); + unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); + size_t init_offset= memc->hosts[server_key].write_buffer_offset; + rc= memcached_delete(memc, key, strlen(key), 0); + assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + if (rc == MEMCACHED_SUCCESS || memc->hosts[server_key].write_buffer_offset < init_offset) + increment_request_id(&expected_ids[server_key]); + if (rc == MEMCACHED_SUCCESS) + assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH); + else + { + assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH); + assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH); + } + assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS); + } + return TEST_SUCCESS; +} + +static test_return udp_buffered_delete_test(memcached_st *memc) +{ + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1); + return udp_delete_test(memc); +} + +test_return udp_verbosity_test(memcached_st *memc) +{ + memcached_return rc; + uint16_t *expected_ids= get_udp_request_ids(memc); + unsigned int x; + for (x= 0; x < memc->number_of_hosts;x++) + increment_request_id(&expected_ids[x]); + + rc= memcached_verbosity(memc,3); + assert(rc == MEMCACHED_SUCCESS); + return post_udp_op_check(memc,expected_ids); +} + +test_return udp_quit_test(memcached_st *memc) +{ + uint16_t *expected_ids= get_udp_request_ids(memc); + memcached_quit(memc); + return post_udp_op_check(memc, expected_ids); +} + +test_return udp_flush_test(memcached_st *memc) +{ + memcached_return rc; + uint16_t *expected_ids= get_udp_request_ids(memc); + unsigned int x; + for (x= 0; x < memc->number_of_hosts;x++) + increment_request_id(&expected_ids[x]); + + rc= memcached_flush(memc,0); + assert(rc == MEMCACHED_SUCCESS); + return post_udp_op_check(memc,expected_ids); +} + +test_return udp_incr_test(memcached_st *memc) +{ + memcached_return rc; + char *key= "incr"; + char *value= "1"; + rc= memcached_set(memc, key, strlen(key), + value, strlen(value), + (time_t)0, (uint32_t)0); + + assert(rc == MEMCACHED_SUCCESS); + uint16_t *expected_ids= get_udp_request_ids(memc); + unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); + increment_request_id(&expected_ids[server_key]); + uint64_t newvalue; + rc= memcached_increment(memc, key, strlen(key), 1, &newvalue); + assert(rc == MEMCACHED_SUCCESS); + return post_udp_op_check(memc, expected_ids); +} + +test_return udp_decr_test(memcached_st *memc) +{ + memcached_return rc; + char *key= "decr"; + char *value= "1"; + rc= memcached_set(memc, key, strlen(key), + value, strlen(value), + (time_t)0, (uint32_t)0); + + assert(rc == MEMCACHED_SUCCESS); + uint16_t *expected_ids= get_udp_request_ids(memc); + unsigned int server_key= memcached_generate_hash(memc, key, strlen(key)); + increment_request_id(&expected_ids[server_key]); + uint64_t newvalue; + rc= memcached_decrement(memc, key, strlen(key), 1, &newvalue); + assert(rc == MEMCACHED_SUCCESS); + return post_udp_op_check(memc, expected_ids); +} + + +test_return udp_stat_test(memcached_st *memc) +{ + memcached_stat_st * rv= NULL; + memcached_return rc; + char args[]= ""; + uint16_t *expected_ids = get_udp_request_ids(memc); + rv = memcached_stat(memc, args, &rc); + free(rv); + assert(rc == MEMCACHED_NOT_SUPPORTED); + return post_udp_op_check(memc, expected_ids); +} + +test_return udp_version_test(memcached_st *memc) +{ + memcached_return rc; + uint16_t *expected_ids = get_udp_request_ids(memc); + rc = memcached_version(memc); + assert(rc == MEMCACHED_NOT_SUPPORTED); + return post_udp_op_check(memc, expected_ids); +} + +test_return udp_get_test(memcached_st *memc) +{ + memcached_return rc; + char *key= "foo"; + size_t vlen; + uint16_t *expected_ids = get_udp_request_ids(memc); + char *val= memcached_get(memc, key, strlen(key), &vlen, (uint32_t)0, &rc); + assert(rc == MEMCACHED_NOT_SUPPORTED); + assert(val == NULL); + return post_udp_op_check(memc, expected_ids); +} + +test_return udp_mixed_io_test(memcached_st *memc) +{ + test_st current_op; + test_st mixed_io_ops [] ={ + {"udp_set_test", 0, udp_set_test}, + {"udp_set_too_big_test", 0, udp_set_too_big_test}, + {"udp_delete_test", 0, udp_delete_test}, + {"udp_verbosity_test", 0, udp_verbosity_test}, + {"udp_quit_test", 0, udp_quit_test}, + {"udp_flush_test", 0, udp_flush_test}, + {"udp_incr_test", 0, udp_incr_test}, + {"udp_decr_test", 0, udp_decr_test}, + {"udp_version_test", 0, udp_version_test} + }; + unsigned int i= 0; + for (i= 0; i < 500; i++) + { + current_op= mixed_io_ops[random() % 9]; + assert(current_op.function(memc) == TEST_SUCCESS); + } + return TEST_SUCCESS; +} + +test_st udp_setup_server_tests[] ={ + {"set_udp_behavior_test", 0, set_udp_behavior_test}, + {"add_tcp_server_udp_client_test", 0, add_tcp_server_udp_client_test}, + {"add_udp_server_tcp_client_test", 0, add_udp_server_tcp_client_test}, + {0, 0, 0} +}; + +test_st upd_io_tests[] ={ + {"udp_set_test", 0, udp_set_test}, + {"udp_buffered_set_test", 0, udp_buffered_set_test}, + {"udp_set_too_big_test", 0, udp_set_too_big_test}, + {"udp_delete_test", 0, udp_delete_test}, + {"udp_buffered_delete_test", 0, udp_buffered_delete_test}, + {"udp_verbosity_test", 0, udp_verbosity_test}, + {"udp_quit_test", 0, udp_quit_test}, + {"udp_flush_test", 0, udp_flush_test}, + {"udp_incr_test", 0, udp_incr_test}, + {"udp_decr_test", 0, udp_decr_test}, + {"udp_stat_test", 0, udp_stat_test}, + {"udp_version_test", 0, udp_version_test}, + {"udp_get_test", 0, udp_get_test}, + {"udp_mixed_io_test", 0, udp_mixed_io_test}, + {0, 0, 0} +}; + /* Clean the server before beginning testing */ test_st tests[] ={ {"flush", 0, flush_test }, @@ -3317,6 +3660,9 @@ test_st consistent_weighted_tests[] ={ }; collection_st collection[] ={ + {"udp_setup", init_udp, 0, udp_setup_server_tests}, + {"udp_io", init_udp, 0, upd_io_tests}, + {"udp_binary_io", binary_init_udp, 0, upd_io_tests}, {"block", 0, 0, tests}, {"binary", pre_binary, 0, tests}, {"nonblock", pre_nonblock, 0, tests}, diff --git a/tests/server.c b/tests/server.c index 5023188c..8a7970a0 100644 --- a/tests/server.c +++ b/tests/server.c @@ -38,24 +38,14 @@ void server_startup(server_startup_st *construct) int count; int status; - if (construct->udp){ - if(x == 0) { - sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u -m 128", - MEMCACHED_BINARY, x, x+ TEST_PORT_BASE); - } else { - sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u", - MEMCACHED_BINARY, x, x+ TEST_PORT_BASE); - } - } - else{ - if(x == 0) { - sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -m 128", - MEMCACHED_BINARY, x, x+ TEST_PORT_BASE); - } else { - sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u", - MEMCACHED_BINARY, x, x+ TEST_PORT_BASE); - } + if(x == 0) { + sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u -m 128", + MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE); + } else { + sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u", + MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE); } + fprintf(stderr, "STARTING SERVER: %s\n", buffer); status= system(buffer); count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE); end_ptr+= count;