From cf34e0d38a58e81ec3993af0db6c221585d712e5 Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Mon, 1 Oct 2007 17:04:49 -0700 Subject: [PATCH] Added buffered IO to write calls --- ChangeLog | 1 + include/memcached.h | 5 +++ lib/Makefile.am | 20 ++++++------ lib/memcached_delete.c | 14 +++++++-- lib/memcached_io.c | 67 +++++++++++++++++++++++++++++++++++++++++ lib/memcached_io.h | 7 +++++ lib/memcached_storage.c | 51 +++++++++++++++++++++++++++---- tests/test.c | 62 +++++++++++++++++++++++++++++++++++++- 8 files changed, 208 insertions(+), 19 deletions(-) create mode 100644 lib/memcached_io.c create mode 100644 lib/memcached_io.h diff --git a/ChangeLog b/ChangeLog index 8230194b..f295a9dd 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,5 @@ 0.4 + * Added buffered IO to write calls for keys * memstat was broken (bad if/else on connect) 0.3 Mon Oct 1 06:37:52 PDT 2007 diff --git a/include/memcached.h b/include/memcached.h index 9c2e2f84..f4d0b25a 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -38,6 +38,8 @@ typedef struct memcached_server_st memcached_server_st; #define SMALL_STRING_LEN 1024 #define HUGE_STRING_LEN 8196 #define MEMCACHED_MAX_KEY 251 /* We add one to have it null terminated */ +//#define MEMCACHED_MAX_BUFFER 8196 +#define MEMCACHED_MAX_BUFFER 30 typedef enum { MEMCACHED_SUCCESS, @@ -118,6 +120,9 @@ struct memcached_st { memcached_server_st *hosts; unsigned int number_of_hosts; unsigned int cursor_server; + char write_buffer[MEMCACHED_MAX_BUFFER]; + size_t write_buffer_offset; + size_t write_between_flush; char connected; }; diff --git a/lib/Makefile.am b/lib/Makefile.am index b474f28f..4ba350fb 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -21,24 +21,26 @@ DTRACEFILES = memcached.o \ memcached_stats.o noinst_HEADERS = libmemcached_probes.h \ + memcached_io.h \ common.h lib_LTLIBRARIES = libmemcached.la libmemcached_la_SOURCES = memcached.c \ - memcached_strerror.c \ + memcached_auto.c \ memcached_connect.c \ - memcached_response.c \ - memcached_get.c \ - memcached_storage.c \ memcached_delete.c \ + memcached_flush.c \ + memcached_get.c \ memcached_hash.c \ - memcached_auto.c \ - memcached_verbosity.c \ + memcached_hosts.c \ + memcached_io.c \ memcached_quit.c \ - memcached_flush.c \ + memcached_response.c \ + memcached_storage.c \ memcached_string.c \ - memcached_hosts.c \ - memcached_stats.c + memcached_stats.c \ + memcached_strerror.c \ + memcached_verbosity.c libmemcached_la_LIBADD = if HAVE_DTRACE diff --git a/lib/memcached_delete.c b/lib/memcached_delete.c index 64036c79..ecd7d8dc 100644 --- a/lib/memcached_delete.c +++ b/lib/memcached_delete.c @@ -26,18 +26,26 @@ memcached_return memcached_delete(memcached_st *ptr, char *key, size_t key_lengt "delete %.*s\r\n", (int)key_length, key); if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) - return MEMCACHED_WRITE_FAILURE; + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0); if (sent_length == -1 || sent_length != send_length) - return MEMCACHED_WRITE_FAILURE; + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key); - LIBMEMCACHED_MEMCACHED_DELETE_END(); if (rc == MEMCACHED_DELETED) rc= MEMCACHED_SUCCESS; + LIBMEMCACHED_MEMCACHED_DELETE_END(); + +error: return rc; } diff --git a/lib/memcached_io.c b/lib/memcached_io.c new file mode 100644 index 00000000..7067237f --- /dev/null +++ b/lib/memcached_io.c @@ -0,0 +1,67 @@ +/* + Basic socket buffered IO +*/ + +#include + +ssize_t memcached_io_read(memcached_st *ptr, char *buf, size_t length) +{ + return -1; +} + +ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, + char *buffer, size_t length) +{ + unsigned long long x; + + for (x= 0; x < length; x++) + { + ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; + ptr->write_buffer_offset++; + if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) + { + size_t sent_length; + + if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, + MEMCACHED_MAX_BUFFER, 0)) == -1) + return -1; + + assert(sent_length == MEMCACHED_MAX_BUFFER); + ptr->write_between_flush+= MEMCACHED_MAX_BUFFER; + + ptr->write_buffer_offset= 0; + } + } + + return length; +} + +ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +{ + size_t sent_length; + + if (ptr->write_buffer_offset == 0) + return 0; + + if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, + ptr->write_buffer_offset, 0)) == -1) + return -1; + + assert(sent_length == ptr->write_buffer_offset); + + sent_length+= ptr->write_between_flush; + + ptr->write_buffer_offset= 0; + ptr->write_between_flush= 0; + + return sent_length; +} + +/* + Eventually we will just kill off the server with the problem. +*/ +void memcached_io_reset(memcached_st *ptr, unsigned int server_key) +{ + ptr->write_buffer_offset= 0; + memcached_quit(ptr); +} diff --git a/lib/memcached_io.h b/lib/memcached_io.h new file mode 100644 index 00000000..d9abd625 --- /dev/null +++ b/lib/memcached_io.h @@ -0,0 +1,7 @@ +/* Server IO, Not public! */ +#include + +ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key); +ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, + char *buffer, size_t length); +void memcached_io_reset(memcached_st *ptr, unsigned int server_key); diff --git a/lib/memcached_storage.c b/lib/memcached_storage.c index c1ce2b83..0c27c766 100644 --- a/lib/memcached_storage.c +++ b/lib/memcached_storage.c @@ -8,6 +8,7 @@ */ #include "common.h" +#include "memcached_io.h" static memcached_return memcached_send(memcached_st *ptr, char *key, size_t key_length, @@ -31,6 +32,8 @@ static memcached_return memcached_send(memcached_st *ptr, if (rc != MEMCACHED_SUCCESS) return rc; + assert(ptr->write_buffer_offset == 0); + server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts; write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, @@ -38,27 +41,63 @@ static memcached_return memcached_send(memcached_st *ptr, (int)key_length, key, flags, (unsigned long long)expiration, value_length); if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) - return MEMCACHED_WRITE_FAILURE; - if ((sent_length= send(ptr->hosts[server_key].fd, buffer, write_length, 0)) == -1) - return MEMCACHED_WRITE_FAILURE; + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } + + if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length)) == -1) + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } assert(write_length == sent_length); - if ((sent_length= send(ptr->hosts[server_key].fd, value, value_length, 0)) == -1) + /* + We have to flush after sending the command. Memcached is not smart enough + to just keep reading from the socket :( + */ + if ((sent_length= memcached_io_flush(ptr, server_key)) == -1) return MEMCACHED_WRITE_FAILURE; + + if ((sent_length= memcached_io_write(ptr, server_key, value, value_length)) == -1) + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } assert(value_length == sent_length); - if ((sent_length= send(ptr->hosts[server_key].fd, "\r\n", 2, 0)) == -1) - return MEMCACHED_WRITE_FAILURE; + if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2)) == -1) + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } + assert(2 == sent_length); + if ((sent_length= memcached_io_flush(ptr, server_key)) == -1) + return MEMCACHED_WRITE_FAILURE; + + //assert(sent_length == write_length + value_length + 2); + sent_length= recv(ptr->hosts[server_key].fd, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 0); if (sent_length && buffer[0] == 'S') /* STORED */ return MEMCACHED_SUCCESS; else if (write_length && buffer[0] == 'N') /* NOT_STORED */ return MEMCACHED_NOTSTORED; + else if (write_length && buffer[0] == 'E') /* ERROR */ + { + printf("BUFFER :%s:\n", buffer); + return MEMCACHED_PROTOCOL_ERROR; + } else return MEMCACHED_READ_FAILURE; + +error: + memcached_io_reset(ptr, server_key); + + return rc; } memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, diff --git a/tests/test.c b/tests/test.c index 37ed800b..7a3b8629 100644 --- a/tests/test.c +++ b/tests/test.c @@ -201,6 +201,64 @@ void get_test2(void) memcached_free(memc); } +void set_test2(void) +{ + memcached_st *memc; + memcached_return rc; + char *key= "foo"; + char *value= "train in the brain"; + size_t value_length= strlen(value); + unsigned int x; + + memc= memcached_create(NULL); + assert(memc); + rc= memcached_server_add(memc, "localhost", 0); + assert(rc == MEMCACHED_SUCCESS); + + for (x= 0; x < 10; x++) + { + rc= memcached_set(memc, key, strlen(key), + value, value_length, + (time_t)0, (uint16_t)0); + assert(rc == MEMCACHED_SUCCESS); + } + + memcached_free(memc); +} + +void set_test3(void) +{ + memcached_st *memc; + memcached_return rc; + char *key= "foo"; + char *value; + size_t value_length= 8191; + unsigned int x; + + value = (char*)malloc(value_length); + assert(value); + + for (x= 0; x < value_length; x++) + value[x] = (char) (x % 127); + + memc= memcached_create(NULL); + assert(memc); + rc= memcached_server_add(memc, "localhost", 0); + assert(rc == MEMCACHED_SUCCESS); + + for (x= 0; x < 1; x++) + { + rc= memcached_set(memc, key, strlen(key), + value, value_length, + (time_t)0, (uint16_t)0); + assert(rc == MEMCACHED_SUCCESS); + } + + free(value); + + memcached_free(memc); +} + void get_test3(void) { memcached_st *memc; @@ -618,6 +676,8 @@ int main(int argc, char *argv[]) connection_test(); error_test(); set_test(); + set_test2(); + set_test3(); add_test(); replace_test(); flush_test(); @@ -625,7 +685,7 @@ int main(int argc, char *argv[]) flush_test(); get_test(); get_test2(); - get_test3(); + get_test3(); get_test4(); stats_servername_test(); -- 2.30.2