OPT_FLUSH,
OPT_HASH,
OPT_BINARY,
+ OPT_UDP
} memcached_options;
#endif /* CLIENT_OPTIONS */
static unsigned int opt_concurrency= 0;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
+static int opt_udp_io= 0;
test_type opt_test= SET_TEST;
int main(int argc, char *argv[])
PTHREAD_CREATE_DETACHED);
memc= memcached_create(NULL);
+
+ /* We need to set udp behavior before adding servers to the client */
+ if (opt_udp_io)
+ {
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io);
+ unsigned int i= 0;
+ for(i= 0; i < servers[0].count; i++ )
+ servers[i].type= MEMCACHED_CONNECTION_UDP;
+ }
memcached_server_push(memc, servers);
memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
{"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
{"version", no_argument, NULL, OPT_VERSION},
{"binary", no_argument, NULL, OPT_BINARY},
+ {"udp", no_argument, NULL, OPT_UDP},
{0, 0, 0, 0},
};
{
case 0:
break;
+ case OPT_UDP:
+ if (opt_test == GET_TEST)
+ {
+ fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
+ "does not currently support get ops.\n");
+ exit(1);
+ }
+ opt_udp_io= 1;
+ break;
case OPT_BINARY:
opt_binary = 1;
break;
break;
case OPT_SLAP_TEST:
if (!strcmp(optarg, "get"))
+ {
+ if (opt_udp_io == 1)
+ {
+ fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
+ "does not currently support get ops.\n");
+ exit(1);
+ }
opt_test= GET_TEST ;
+ }
else if (!strcmp(optarg, "set"))
opt_test= SET_TEST;
else
case OPT_HASH: return("Select hash type.");
case OPT_BINARY: return("Switch to binary protocol.");
case OPT_ANALYZE: return("Analyze the provided servers.");
+ case OPT_UDP: return("Use UDP protocol when communicating with server.");
};
WATCHPOINT_ASSERT(0);
MEM_KETAMA_WEIGHTED= (1 << 11),
MEM_BINARY_PROTOCOL= (1 << 12),
MEM_HASH_WITH_PREFIX_KEY= (1 << 13),
- MEM_NOREPLY= (1 << 14)
+ MEM_NOREPLY= (1 << 14),
+ MEM_USE_UDP= (1 << 15)
} memcached_flags;
/* Hashing algo */
if (new_clone == NULL)
return NULL;
- if (source->hosts)
- rc= memcached_server_push(new_clone, source->hosts);
-
- if (rc != MEMCACHED_SUCCESS)
- {
- memcached_free(new_clone);
-
- return NULL;
- }
-
-
new_clone->flags= source->flags;
new_clone->send_size= source->send_size;
new_clone->recv_size= source->recv_size;
new_clone->get_key_failure= source->get_key_failure;
new_clone->delete_trigger= source->delete_trigger;
+ if (source->hosts)
+ rc= memcached_server_push(new_clone, source->hosts);
+
+ if (rc != MEMCACHED_SUCCESS)
+ {
+ memcached_free(new_clone);
+
+ return NULL;
+ }
+
+
if (source->prefix_key[0] != 0)
{
strcpy(new_clone->prefix_key, source->prefix_key);
memcached_return rc;
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
unsigned int server_key;
+ bool no_reply= (ptr->flags & MEM_NOREPLY);
unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
return MEMCACHED_NO_SERVERS;
server_key= memcached_generate_hash(ptr, key, key_length);
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "%s %s%.*s %u\r\n", verb,
+ "%s %s%.*s %u%s\r\n", verb,
ptr->prefix_key,
(int)key_length, key,
- offset);
+ offset, no_reply ? " noreply" : "");
unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
return MEMCACHED_WRITE_FAILURE;
rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
- if (rc != MEMCACHED_SUCCESS)
+ if (no_reply || rc != MEMCACHED_SUCCESS)
return rc;
rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
uint32_t offset, uint64_t *value)
{
unsigned int server_key;
+ bool no_reply= (ptr->flags & MEM_NOREPLY);
unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
return MEMCACHED_NO_SERVERS;
server_key= memcached_generate_hash(ptr, key, key_length);
+ if (no_reply)
+ {
+ if(cmd == PROTOCOL_BINARY_CMD_DECREMENT)
+ cmd= PROTOCOL_BINARY_CMD_DECREMENTQ;
+ if(cmd == PROTOCOL_BINARY_CMD_INCREMENT)
+ cmd= PROTOCOL_BINARY_CMD_INCREMENTQ;
+ }
protocol_binary_request_incr request= {.bytes= {0}};
request.message.header.request.magic= PROTOCOL_BINARY_REQ;
memcached_io_reset(&ptr->hosts[server_key]);
return MEMCACHED_WRITE_FAILURE;
}
-
+
+ if (no_reply)
+ return MEMCACHED_SUCCESS;
return memcached_response(&ptr->hosts[server_key], (char*)value, sizeof(*value), NULL);
}
#include <netinet/tcp.h>
/*
- This function is used to modify the behabior of running client.
+ This function is used to modify the behavior of running client.
We quit all connections so we can reset the sockets.
*/
set_behavior_flag(ptr, MEM_BUFFER_REQUESTS, data);
memcached_quit(ptr);
break;
+ case MEMCACHED_BEHAVIOR_USE_UDP:
+ if (ptr->number_of_hosts)
+ return MEMCACHED_FAILURE;
+ set_behavior_flag(ptr, MEM_USE_UDP, data);
+ if (data)
+ set_behavior_flag(ptr,MEM_NOREPLY,data);
+ break;
case MEMCACHED_BEHAVIOR_TCP_NODELAY:
set_behavior_flag(ptr, MEM_TCP_NODELAY, data);
memcached_quit(ptr);
case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS:
temp_flag= MEM_BUFFER_REQUESTS;
break;
+ case MEMCACHED_BEHAVIOR_USE_UDP:
+ temp_flag= MEM_USE_UDP;
+ break;
case MEMCACHED_BEHAVIOR_TCP_NODELAY:
temp_flag= MEM_TCP_NODELAY;
break;
MEMCACHED_TIMEOUT,
MEMCACHED_BUFFERED,
MEMCACHED_BAD_KEY_PROVIDED,
+ MEMCACHED_INVALID_HOST_PROTOCOL,
MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */
} memcached_return;
MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
- MEMCACHED_BEHAVIOR_NOREPLY
+ MEMCACHED_BEHAVIOR_NOREPLY,
+ MEMCACHED_BEHAVIOR_USE_UDP
} memcached_behavior;
typedef enum {
server_key= memcached_generate_hash(ptr, master_key, master_key_length);
to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
+ bool no_reply= (ptr->flags & MEM_NOREPLY);
if (ptr->flags & MEM_BINARY_PROTOCOL)
rc= binary_delete(ptr, server_key, key, key_length, to_write);
{
if (expiration)
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "delete %s%.*s %u\r\n",
+ "delete %s%.*s %u%s\r\n",
ptr->prefix_key,
(int)key_length, key,
- (uint32_t)expiration);
+ (uint32_t)expiration, no_reply ? " noreply" :"" );
else
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "delete %s%.*s\r\n",
+ "delete %s%.*s%s\r\n",
ptr->prefix_key,
- (int)key_length, key);
+ (int)key_length, key, no_reply ? " noreply" :"");
if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
{
rc= MEMCACHED_WRITE_FAILURE;
goto error;
}
-
+
+ if (ptr->flags & MEM_USE_UDP && !to_write)
+ {
+ if (send_length > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+ return MEMCACHED_WRITE_FAILURE;
+ if (send_length + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+ memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+ }
+
rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
}
goto error;
if ((ptr->flags & MEM_BUFFER_REQUESTS))
- {
rc= MEMCACHED_BUFFERED;
- }
- else
+ else if (!no_reply)
{
rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rc == MEMCACHED_DELETED)
protocol_binary_request_delete request= {.bytes= {0}};
request.message.header.request.magic= PROTOCOL_BINARY_REQ;
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
+ if (ptr->flags & MEM_NOREPLY)
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
+ else
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
request.message.header.request.keylen= htons((uint16_t)key_length);
request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
request.message.header.request.bodylen= htonl(key_length);
+
+ if (ptr->flags & MEM_USE_UDP && !flush)
+ {
+ size_t cmd_size= sizeof(request.bytes) + key_length;
+ if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+ return MEMCACHED_WRITE_FAILURE;
+ if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+ memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+ }
if ((memcached_do(&ptr->hosts[server_key], request.bytes,
sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
return rc;
}
+ /*
+ ** Since non buffering ops in UDP mode dont check to make sure they will fit
+ ** before they start writing, if there is any data in buffer, clear it out,
+ ** otherwise we might get a partial write.
+ **/
+ if (ptr->type == MEMCACHED_CONNECTION_UDP && with_flush && ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH)
+ memcached_io_write(ptr, NULL, 0, 1);
+
sent_length= memcached_io_write(ptr, command, command_length, with_flush);
if (sent_length == -1 || (size_t)sent_length != command_length)
size_t to_read;
char *value_ptr;
+ if (ptr->root->flags & MEM_USE_UDP)
+ return MEMCACHED_NOT_SUPPORTED;
+
WATCHPOINT_ASSERT(ptr->root);
end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
{
memcached_result_st *result_buffer= &ptr->result;
+ if (ptr->flags & MEM_USE_UDP)
+ {
+ *error= MEMCACHED_NOT_SUPPORTED;
+ return NULL;
+ }
+
while (ptr->cursor_server < ptr->number_of_hosts)
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
memcached_result_st *result,
memcached_return *error)
{
+
+ if (ptr->flags & MEM_USE_UDP)
+ {
+ *error= MEMCACHED_NOT_SUPPORTED;
+ return NULL;
+ }
+
if (result == NULL)
result= memcached_result_create(ptr, NULL);
for (x= 0; x < ptr->number_of_hosts; x++)
{
+ bool no_reply= (ptr->flags & MEM_NOREPLY);
if (expiration)
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "flush_all %llu\r\n", (unsigned long long)expiration);
+ "flush_all %llu%s\r\n",
+ (unsigned long long)expiration, no_reply ? " noreply" : "");
else
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "flush_all\r\n");
+ "flush_all%s\r\n", no_reply ? " noreply" : "");
rc= memcached_do(&ptr->hosts[x], buffer, send_length, 1);
- if (rc == MEMCACHED_SUCCESS)
+ if (rc == MEMCACHED_SUCCESS && !no_reply)
(void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
for (x= 0; x < ptr->number_of_hosts; x++)
{
+ if (ptr->flags & MEM_NOREPLY)
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
+ else
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
if (memcached_do(&ptr->hosts[x], request.bytes,
sizeof(request.bytes), 1) != MEMCACHED_SUCCESS)
{
uint32_t dummy_flags;
memcached_return dummy_error;
+ if (ptr->flags & MEM_USE_UDP)
+ {
+ *error= MEMCACHED_NOT_SUPPORTED;
+ return NULL;
+ }
+
/* Request the key */
*error= memcached_mget_by_key(ptr,
master_key,
uint8_t get_command_length= 4;
unsigned int master_server_key= 0;
+ if (ptr->flags & MEM_USE_UDP)
+ return MEMCACHED_NOT_SUPPORTED;
+
LIBMEMCACHED_MEMCACHED_MGET_START();
ptr->cursor_server= 0;
for (x= 0; x < count; x++)
{
+ if ((ptr->flags & MEM_USE_UDP && list[x].type != MEMCACHED_CONNECTION_UDP)
+ || ((list[x].type == MEMCACHED_CONNECTION_UDP)
+ && ! (ptr->flags & MEM_USE_UDP)) )
+ return MEMCACHED_INVALID_HOST_PROTOCOL;
+
WATCHPOINT_ASSERT(list[x].hostname[0] != 0);
memcached_server_create(ptr, &ptr->hosts[ptr->number_of_hosts]);
/* TODO check return type */
port= MEMCACHED_DEFAULT_PORT;
if (!hostname)
- hostname= "localhost";
+ hostname= "localhost";
return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP);
}
{
memcached_server_st *new_host_list;
+ if ( (ptr->flags & MEM_USE_UDP && type != MEMCACHED_CONNECTION_UDP)
+ || ( (type == MEMCACHED_CONNECTION_UDP) && !(ptr->flags & MEM_USE_UDP) ) )
+ return MEMCACHED_INVALID_HOST_PROTOCOL;
+
if (ptr->call_realloc)
new_host_list= (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts,
sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
} memc_read_or_write;
static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
+static void increment_udp_message_id(memcached_server_st *ptr);
static memcached_return io_wait(memcached_server_st *ptr,
memc_read_or_write read_or_write)
{
char *write_ptr;
size_t should_write;
+ size_t buffer_end;
- should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
- write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
-
- should_write= (should_write < length) ? should_write : length;
+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ {
+ //UDP does not support partial writes
+ buffer_end= MAX_UDP_DATAGRAM_LENGTH;
+ should_write= length;
+ if (ptr->write_buffer_offset + should_write > buffer_end)
+ return -1;
+ }
+ else
+ {
+ buffer_end= MEMCACHED_MAX_BUFFER;
+ should_write= buffer_end - ptr->write_buffer_offset;
+ should_write= (should_write < length) ? should_write : length;
+ }
+ write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
memcpy(write_ptr, buffer_ptr, should_write);
ptr->write_buffer_offset+= should_write;
buffer_ptr+= should_write;
length-= should_write;
- if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
+ if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
{
memcached_return rc;
ssize_t sent_length;
/* If io_flush calls memcached_purge, sent_length may be 0 */
if (sent_length != 0)
- WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
+ WATCHPOINT_ASSERT(sent_length == buffer_end);
}
}
WATCHPOINT_ASSERT(ptr->fd != -1);
- if (ptr->write_buffer_offset == 0)
+ // UDP Sanity check, make sure that we are not sending somthing too big
+ if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
+ return -1;
+
+ if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
+ && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
return 0;
/* Looking for memory overflows */
WATCHPOINT_ASSERT(write_length > 0);
sent_length= 0;
if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ increment_udp_message_id(ptr);
+ sent_length= write(ptr->fd, local_write_ptr, write_length);
+
+ if (sent_length == -1)
{
- struct addrinfo *ai;
-
- ai= ptr->address_info;
-
- /* Crappy test code */
- char buffer[HUGE_STRING_LEN + 8];
- memset(buffer, 0, HUGE_STRING_LEN + 8);
- memcpy (buffer+8, local_write_ptr, write_length);
- buffer[0]= 0;
- buffer[1]= 0;
- buffer[2]= 0;
- buffer[3]= 0;
- buffer[4]= 0;
- buffer[5]= 1;
- buffer[6]= 0;
- buffer[7]= 0;
- sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
- (struct sockaddr *)ai->ai_addr,
- ai->ai_addrlen);
- if (sent_length == -1)
+ ptr->cached_errno= errno;
+ switch (errno)
{
- WATCHPOINT_ERRNO(errno);
- WATCHPOINT_ASSERT(0);
- }
- sent_length-= 8; /* We remove the header */
- }
- else
- {
- WATCHPOINT_ASSERT(ptr->fd != -1);
- if ((sent_length= write(ptr->fd, local_write_ptr,
- write_length)) == -1)
+ case ENOBUFS:
+ continue;
+ case EAGAIN:
{
- ptr->cached_errno= errno;
- switch (errno)
- {
- case ENOBUFS:
- continue;
- case EAGAIN:
- {
- memcached_return rc;
- rc= io_wait(ptr, MEM_WRITE);
+ memcached_return rc;
+ rc= io_wait(ptr, MEM_WRITE);
- if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
- continue;
+ if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
+ continue;
- memcached_quit_server(ptr, 1);
- return -1;
- }
- default:
- memcached_quit_server(ptr, 1);
- *error= MEMCACHED_ERRNO;
- return -1;
- }
+ memcached_quit_server(ptr, 1);
+ return -1;
+ }
+ default:
+ memcached_quit_server(ptr, 1);
+ *error= MEMCACHED_ERRNO;
+ return -1;
}
}
+ if (ptr->type == MEMCACHED_CONNECTION_UDP && sent_length != write_length)
+ {
+ memcached_quit_server(ptr, 1);
+ return -1;
+ }
+
ptr->io_bytes_sent += sent_length;
local_write_ptr+= sent_length;
WATCHPOINT_ASSERT(write_length == 0);
// Need to study this assert() WATCHPOINT_ASSERT(return_length ==
// ptr->write_buffer_offset);
- ptr->write_buffer_offset= 0;
+
+ // if we are a udp server, the begining of the buffer is reserverd for
+ // the upd frame header
+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+ else
+ ptr->write_buffer_offset= 0;
return return_length;
}
return MEMCACHED_SUCCESS;
}
+
+/*
+ * The udp request id consists of two seperate sections
+ * 1) The thread id
+ * 2) The message number
+ * The thread id should only be set when the memcached_st struct is created
+ * and should not be changed.
+ *
+ * The message num is incremented for each new message we send, this function
+ * extracts the message number from message_id, increments it and then
+ * writes the new value back into the header
+ */
+static void increment_udp_message_id(memcached_server_st *ptr)
+{
+ struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+ uint16_t cur_req= get_udp_datagram_request_id(header);
+ uint16_t msg_num= get_msg_num_from_request_id(cur_req);
+ uint16_t thread_id= get_thread_id_from_request_id(cur_req);
+
+ if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
+ msg_num= 0;
+
+ header->request_id= htons(thread_id | msg_num);
+}
+
+memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id)
+{
+ if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
+ return MEMCACHED_FAILURE;
+
+ struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+ header->request_id= htons(generate_udp_request_thread_id(thread_id));
+ header->num_datagrams= htons(1);
+ header->sequence_number= htons(0);
+
+ return MEMCACHED_SUCCESS;
+}
#ifndef __MEMCACHED_IO_H__
#define __MEMCACHED_IO_H__
+#define MAX_UDP_DATAGRAM_LENGTH 1400
+#define UDP_DATAGRAM_HEADER_LENGTH 8
+#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10
+#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define get_udp_datagram_request_id(A) ntohs((A)->request_id)
+#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number)
+#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams)
+#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) )
+#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF)
+
+struct udp_datagram_header_st {
+ uint16_t request_id;
+ uint16_t sequence_number;
+ uint16_t num_datagrams;
+ uint16_t reserved;
+};
+
ssize_t memcached_io_write(memcached_server_st *ptr,
const void *buffer, size_t length, char with_flush);
void memcached_io_reset(memcached_server_st *ptr);
memcached_return memcached_read_one_response(memcached_server_st *ptr,
char *buffer, size_t buffer_length,
memcached_result_st *result);
+memcached_return memcached_io_init_udp_header(memcached_server_st *ptr,
+ uint16_t thread_id);
#endif /* __MEMCACHED_IO_H__ */
{
if (ptr->fd != -1)
{
- if (io_death == 0)
+ if (io_death == 0 && ptr->type != MEMCACHED_CONNECTION_UDP)
{
memcached_return rc;
ssize_t read_length;
memcached_io_close(ptr);
ptr->fd= -1;
- ptr->write_buffer_offset= 0;
+ ptr->write_buffer_offset= (ptr->type == MEMCACHED_CONNECTION_UDP) ? UDP_DATAGRAM_HEADER_LENGTH : 0 ;
ptr->read_buffer_length= 0;
ptr->read_ptr= ptr->read_buffer;
memcached_server_response_reset(ptr);
host->read_ptr= host->read_buffer;
if (memc)
host->next_retry= memc->retry_timeout;
+ if (type == MEMCACHED_CONNECTION_UDP)
+ {
+ host->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+ memcached_io_init_udp_header(host, 0);
+ }
return host;
}
memcached_return rc;
memcached_stat_st *stats;
+ if (ptr->flags & MEM_USE_UDP)
+ {
+ *error= MEMCACHED_NOT_SUPPORTED;
+ return NULL;
+ }
+
if (ptr->call_malloc)
stats= (memcached_stat_st *)ptr->call_malloc(ptr, sizeof(memcached_stat_st)*(ptr->number_of_hosts));
else
(unsigned long long)expiration, value_length,
(ptr->flags & MEM_NOREPLY) ? " noreply" : "");
+ if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS) {
+ size_t cmd_size= write_length + value_length + 2;
+ if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+ return MEMCACHED_WRITE_FAILURE;
+ if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+ memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+ }
+
if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
{
rc= MEMCACHED_WRITE_FAILURE;
}
if (ptr->flags & MEM_NOREPLY)
- {
return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
- }
if (to_write == 0)
return MEMCACHED_BUFFERED;
char flush;
protocol_binary_request_set request= {.bytes= {0}};
size_t send_length= sizeof(request.bytes);
- bool noreply = server->root->flags & MEM_NOREPLY;
+ bool noreply= server->root->flags & MEM_NOREPLY;
request.message.header.request.magic= PROTOCOL_BINARY_REQ;
request.message.header.request.opcode= get_com_code(verb, noreply);
flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
+ if ((server->root->flags & MEM_USE_UDP) && !flush)
+ {
+ size_t cmd_size= send_length + key_length + value_length;
+ if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+ return MEMCACHED_WRITE_FAILURE;
+ if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+ memcached_io_write(server,NULL,0, 1);
+ }
+
/* write the header */
if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
(memcached_io_write(server, key, key_length, 0) == -1) ||
continue;
}
+ if (ptr->flags & MEM_USE_UDP)
+ continue;
+
rrc= memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rrc != MEMCACHED_SUCCESS)
rc= MEMCACHED_SOME_ERRORS;
memcached_return memcached_version(memcached_st *ptr)
{
+ if (ptr->flags & MEM_USE_UDP)
+ return MEMCACHED_NOT_SUPPORTED;
+
if (ptr->flags & MEM_BINARY_PROTOCOL)
return memcached_version_binary(ptr);
else
return TEST_SUCCESS;
}
+static void increment_request_id(uint16_t *id) {
+ (*id)++;
+ if ((*id & UDP_REQUEST_ID_THREAD_MASK) != 0)
+ *id= 0;
+}
+
+static uint16_t *get_udp_request_ids(memcached_st *memc)
+{
+ uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts);
+ assert(ids != NULL);
+ unsigned int i;
+ for (i= 0; i < memc->number_of_hosts; i++)
+ ids[i]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[i].write_buffer);
+
+ return ids;
+}
+
+static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_ids) {
+ unsigned int i;
+ memcached_server_st *cur_server = memc->hosts;
+ uint16_t *cur_req_ids = get_udp_request_ids(memc);
+ for (i= 0; i < memc->number_of_hosts; i++)
+ {
+ assert(cur_server[i].cursor_active == 0);
+ assert(cur_req_ids[i] == expected_req_ids[i]);
+ }
+ free(expected_req_ids);
+ free(cur_req_ids);
+ return TEST_SUCCESS;
+}
+
+/*
+** There is a little bit of a hack here, instead of removing
+** the servers, I just set num host to 0 and them add then new udp servers
+**/
+static memcached_return init_udp(memcached_st *memc)
+{
+ memcached_version(memc);
+ if (memc->hosts[0].major_version != 1 || memc->hosts[0].minor_version != 2)
+ return MEMCACHED_FAILURE;
+
+ uint32_t num_hosts= memc->number_of_hosts;
+ unsigned int i= 0;
+ memcached_server_st servers[num_hosts];
+ memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts);
+ memc->number_of_hosts= 0;
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1);
+ for (i= 0; i < num_hosts; i++)
+ {
+ assert(memcached_server_add_udp(memc, servers[i].hostname, servers[i].port) == MEMCACHED_SUCCESS);
+ assert(memc->hosts[i].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ }
+ return MEMCACHED_SUCCESS;
+}
+
+static memcached_return binary_init_udp(memcached_st *memc)
+{
+ pre_binary(memc);
+ return init_udp(memc);
+}
+
+/* Make sure that I cant add a tcp server to a udp client */
+static test_return add_tcp_server_udp_client_test(memcached_st *memc)
+{
+ memcached_server_st server;
+ memcached_server_clone(&server, &memc->hosts[0]);
+ assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+ assert(memcached_server_add(memc, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+ return TEST_SUCCESS;
+}
+
+/* Make sure that I cant add a udp server to a tcp client */
+static test_return add_udp_server_tcp_client_test(memcached_st *memc)
+{
+ memcached_server_st server;
+ memcached_server_clone(&server, &memc->hosts[0]);
+ assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+
+ memcached_st tcp_client;
+ memcached_create(&tcp_client);
+ assert(memcached_server_add_udp(&tcp_client, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+ return TEST_SUCCESS;
+}
+
+static test_return set_udp_behavior_test(memcached_st *memc)
+{
+
+ memcached_quit(memc);
+ memc->number_of_hosts= 0;
+ run_distribution(memc);
+ assert(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1) == MEMCACHED_SUCCESS);
+ assert(memc->flags & MEM_USE_UDP);
+ assert(memc->flags & MEM_NOREPLY);;
+
+ assert(memc->number_of_hosts == 0);
+
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,0);
+ assert(!(memc->flags & MEM_USE_UDP));
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY,0);
+ assert(!(memc->flags & MEM_NOREPLY));
+ return TEST_SUCCESS;
+}
+
+static test_return udp_set_test(memcached_st *memc)
+{
+ unsigned int i= 0;
+ unsigned int num_iters= 1025; //request id rolls over at 1024
+ for (i= 0; i < num_iters;i++)
+ {
+ memcached_return rc;
+ char *key= "foo";
+ char *value= "when we sanitize";
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc,key,strlen(key));
+ size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+ /** NB, the check below assumes that if new write_ptr is less than
+ * the original write_ptr that we have flushed. For large payloads, this
+ * maybe an invalid assumption, but for the small payload we have it is OK
+ */
+ if (rc == MEMCACHED_SUCCESS ||
+ memc->hosts[server_key].write_buffer_offset < init_offset)
+ increment_request_id(&expected_ids[server_key]);
+
+ if (rc == MEMCACHED_SUCCESS) {
+ assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ } else {
+ assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+ assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+ }
+ assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_set_test(memcached_st *memc)
+{
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+ return udp_set_test(memc);
+}
+
+static test_return udp_set_too_big_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "bar";
+ char value[MAX_UDP_DATAGRAM_LENGTH];
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ rc= memcached_set(memc, key, strlen(key),
+ value, MAX_UDP_DATAGRAM_LENGTH,
+ (time_t)0, (uint32_t)0);
+ assert(rc == MEMCACHED_WRITE_FAILURE);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_delete_test(memcached_st *memc)
+{
+ unsigned int i= 0;
+ unsigned int num_iters= 1025; //request id rolls over at 1024
+ for (i= 0; i < num_iters;i++)
+ {
+ memcached_return rc;
+ char *key= "foo";
+ uint16_t *expected_ids=get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+ rc= memcached_delete(memc, key, strlen(key), 0);
+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+ if (rc == MEMCACHED_SUCCESS || memc->hosts[server_key].write_buffer_offset < init_offset)
+ increment_request_id(&expected_ids[server_key]);
+ if (rc == MEMCACHED_SUCCESS)
+ assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ else
+ {
+ assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+ assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+ }
+ assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_delete_test(memcached_st *memc)
+{
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+ return udp_delete_test(memc);
+}
+
+test_return udp_verbosity_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int x;
+ for (x= 0; x < memc->number_of_hosts;x++)
+ increment_request_id(&expected_ids[x]);
+
+ rc= memcached_verbosity(memc,3);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_quit_test(memcached_st *memc)
+{
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ memcached_quit(memc);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_flush_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int x;
+ for (x= 0; x < memc->number_of_hosts;x++)
+ increment_request_id(&expected_ids[x]);
+
+ rc= memcached_flush(memc,0);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_incr_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "incr";
+ char *value= "1";
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+
+ assert(rc == MEMCACHED_SUCCESS);
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ increment_request_id(&expected_ids[server_key]);
+ uint64_t newvalue;
+ rc= memcached_increment(memc, key, strlen(key), 1, &newvalue);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_decr_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "decr";
+ char *value= "1";
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+
+ assert(rc == MEMCACHED_SUCCESS);
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ increment_request_id(&expected_ids[server_key]);
+ uint64_t newvalue;
+ rc= memcached_decrement(memc, key, strlen(key), 1, &newvalue);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+
+test_return udp_stat_test(memcached_st *memc)
+{
+ memcached_stat_st * rv= NULL;
+ memcached_return rc;
+ char args[]= "";
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ rv = memcached_stat(memc, args, &rc);
+ free(rv);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_version_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ rc = memcached_version(memc);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_get_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "foo";
+ size_t vlen;
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ char *val= memcached_get(memc, key, strlen(key), &vlen, (uint32_t)0, &rc);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ assert(val == NULL);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_mixed_io_test(memcached_st *memc)
+{
+ test_st current_op;
+ test_st mixed_io_ops [] ={
+ {"udp_set_test", 0, udp_set_test},
+ {"udp_set_too_big_test", 0, udp_set_too_big_test},
+ {"udp_delete_test", 0, udp_delete_test},
+ {"udp_verbosity_test", 0, udp_verbosity_test},
+ {"udp_quit_test", 0, udp_quit_test},
+ {"udp_flush_test", 0, udp_flush_test},
+ {"udp_incr_test", 0, udp_incr_test},
+ {"udp_decr_test", 0, udp_decr_test},
+ {"udp_version_test", 0, udp_version_test}
+ };
+ unsigned int i= 0;
+ for (i= 0; i < 500; i++)
+ {
+ current_op= mixed_io_ops[random() % 9];
+ assert(current_op.function(memc) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+test_st udp_setup_server_tests[] ={
+ {"set_udp_behavior_test", 0, set_udp_behavior_test},
+ {"add_tcp_server_udp_client_test", 0, add_tcp_server_udp_client_test},
+ {"add_udp_server_tcp_client_test", 0, add_udp_server_tcp_client_test},
+ {0, 0, 0}
+};
+
+test_st upd_io_tests[] ={
+ {"udp_set_test", 0, udp_set_test},
+ {"udp_buffered_set_test", 0, udp_buffered_set_test},
+ {"udp_set_too_big_test", 0, udp_set_too_big_test},
+ {"udp_delete_test", 0, udp_delete_test},
+ {"udp_buffered_delete_test", 0, udp_buffered_delete_test},
+ {"udp_verbosity_test", 0, udp_verbosity_test},
+ {"udp_quit_test", 0, udp_quit_test},
+ {"udp_flush_test", 0, udp_flush_test},
+ {"udp_incr_test", 0, udp_incr_test},
+ {"udp_decr_test", 0, udp_decr_test},
+ {"udp_stat_test", 0, udp_stat_test},
+ {"udp_version_test", 0, udp_version_test},
+ {"udp_get_test", 0, udp_get_test},
+ {"udp_mixed_io_test", 0, udp_mixed_io_test},
+ {0, 0, 0}
+};
+
/* Clean the server before beginning testing */
test_st tests[] ={
{"flush", 0, flush_test },
};
collection_st collection[] ={
+ {"udp_setup", init_udp, 0, udp_setup_server_tests},
+ {"udp_io", init_udp, 0, upd_io_tests},
+ {"udp_binary_io", binary_init_udp, 0, upd_io_tests},
{"block", 0, 0, tests},
{"binary", pre_binary, 0, tests},
{"nonblock", pre_nonblock, 0, tests},
int count;
int status;
- if (construct->udp){
- if(x == 0) {
- sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u -m 128",
- MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
- } else {
- sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u",
- MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
- }
- }
- else{
- if(x == 0) {
- sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -m 128",
- MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
- } else {
- sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u",
- MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
- }
+ if(x == 0) {
+ sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u -m 128",
+ MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE);
+ } else {
+ sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u",
+ MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE);
}
+ fprintf(stderr, "STARTING SERVER: %s\n", buffer);
status= system(buffer);
count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE);
end_ptr+= count;