From e69bb33d8da40ded7f7a58a321b9f220b6651c8c Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Sun, 30 Sep 2007 10:18:42 -0700 Subject: [PATCH] Added/restructured all additional hostname information --- include/memcached.h | 12 +++ lib/Makefile.am | 1 + lib/memcached.c | 13 +-- lib/memcached_connect.c | 42 --------- lib/memcached_delete.c | 3 + lib/memcached_get.c | 42 ++++++--- lib/memcached_response.c | 4 +- lib/memcached_strerror.c | 6 ++ src/memslap.c | 185 ++++++++++++++++++++++++++++++++------- tests/output.res | 3 + tests/test.c | 73 +++++++++++++++ 11 files changed, 283 insertions(+), 101 deletions(-) diff --git a/include/memcached.h b/include/memcached.h index 634f8884..23688d55 100644 --- a/include/memcached.h +++ b/include/memcached.h @@ -60,6 +60,9 @@ typedef enum { MEMCACHED_PARTIAL_READ, MEMCACHED_SOME_ERRORS, MEMCACHED_NO_SERVERS, + MEMCACHED_END, + MEMCACHED_DELETED, + MEMCACHED_VALUE, MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */ } memcached_return; @@ -175,6 +178,13 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length, memcached_return memcached_server_add(memcached_st *ptr, char *hostname, unsigned int port); +memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, + char *hostname, unsigned int port, + memcached_return *error); +void memcached_server_list_free(memcached_server_st *ptr); +memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list); +unsigned int memcached_server_list_count(memcached_server_st *ptr); + /* These are all private, do not use. */ memcached_return memcached_connect(memcached_st *ptr); memcached_return memcached_response(memcached_st *ptr, @@ -205,6 +215,8 @@ void memcached_string_free(memcached_st *ptr, memcached_string_st *string); /* Some personal debugging functions */ #define WATCHPOINT printf("WATCHPOINT %s:%d\n", __FILE__, __LINE__);fflush(stdout); #define WATCHPOINT_ERROR(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout); +#define WATCHPOINT_STRING(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, A);fflush(stdout); +#define WATCHPOINT_NUMBER(A) printf("WATCHPOINT %s:%d %d\n", __FILE__, __LINE__, A);fflush(stdout); #ifdef __cplusplus diff --git a/lib/Makefile.am b/lib/Makefile.am index 1c688533..b474f28f 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -37,6 +37,7 @@ libmemcached_la_SOURCES = memcached.c \ memcached_quit.c \ memcached_flush.c \ memcached_string.c \ + memcached_hosts.c \ memcached_stats.c libmemcached_la_LIBADD = diff --git a/lib/memcached.c b/lib/memcached.c index 6cf81c73..68c5d3c5 100644 --- a/lib/memcached.c +++ b/lib/memcached.c @@ -25,19 +25,10 @@ memcached_st *memcached_init(memcached_st *ptr) void memcached_deinit(memcached_st *ptr) { - unsigned int x; - if (ptr->hosts) { - for (x= 0; x < ptr->number_of_hosts; x++) - { - if (ptr->hosts[x].fd > 0) - close(ptr->hosts[x].fd); - - free(ptr->hosts[x].hostname); - } - - free(ptr->hosts); + memcached_server_list_free(ptr->hosts); + ptr->hosts= NULL; } if (ptr->is_allocated == MEMCACHED_ALLOCATED) diff --git a/lib/memcached_connect.c b/lib/memcached_connect.c index 516b5db3..38105395 100644 --- a/lib/memcached_connect.c +++ b/lib/memcached_connect.c @@ -1,45 +1,5 @@ #include "common.h" -memcached_return memcached_server_add(memcached_st *ptr, char *hostname, unsigned int port) -{ - memcached_server_st *new_host_list; - char *new_hostname; - LIBMEMCACHED_MEMCACHED_SERVER_ADD_START(); - - if (!port) - port= MEMCACHED_DEFAULT_PORT; - - if (!hostname) - hostname= "localhost"; - - - new_host_list= (memcached_server_st *)realloc(ptr->hosts, sizeof(memcached_server_st) * (ptr->number_of_hosts+1)); - if (!new_host_list) - return MEMCACHED_MEMORY_ALLOCATION_FAILURE; - memset(&new_host_list[ptr->number_of_hosts], 0, sizeof(memcached_server_st)); - - if (!new_host_list) - return MEMCACHED_MEMORY_ALLOCATION_FAILURE; - - ptr->hosts= new_host_list; - - new_hostname= - (char *)malloc(sizeof(char) * (strlen(hostname)+1)); - if (!new_hostname) - return MEMCACHED_MEMORY_ALLOCATION_FAILURE; - - memset(new_hostname, 0, strlen(hostname)+1); - memcpy(new_hostname, hostname, strlen(hostname)); - ptr->hosts[ptr->number_of_hosts].hostname= new_hostname; - ptr->hosts[ptr->number_of_hosts].port= port; - ptr->hosts[ptr->number_of_hosts].fd= -1; - ptr->number_of_hosts++; - - LIBMEMCACHED_MEMCACHED_SERVER_ADD_END(); - - return MEMCACHED_SUCCESS; -} - memcached_return memcached_connect(memcached_st *ptr) { unsigned int x; @@ -52,9 +12,7 @@ memcached_return memcached_connect(memcached_st *ptr) return MEMCACHED_SUCCESS; if (!ptr->hosts) - { return MEMCACHED_NO_SERVERS; - } for (x= 0; x < ptr->number_of_hosts; x++) { diff --git a/lib/memcached_delete.c b/lib/memcached_delete.c index 5ec5d1b4..64036c79 100644 --- a/lib/memcached_delete.c +++ b/lib/memcached_delete.c @@ -36,5 +36,8 @@ memcached_return memcached_delete(memcached_st *ptr, char *key, size_t key_lengt rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key); LIBMEMCACHED_MEMCACHED_DELETE_END(); + if (rc == MEMCACHED_DELETED) + rc= MEMCACHED_SUCCESS; + return rc; } diff --git a/lib/memcached_get.c b/lib/memcached_get.c index aa50d70b..77bc68aa 100644 --- a/lib/memcached_get.c +++ b/lib/memcached_get.c @@ -113,6 +113,8 @@ static char *memcached_value_fetch(memcached_st *ptr, char *key, size_t *key_len return value; } } + else if (*error == MEMCACHED_END) + *error= MEMCACHED_NOTFOUND; return NULL; read_error: @@ -129,41 +131,57 @@ char *memcached_get(memcached_st *ptr, char *key, size_t key_length, char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; unsigned int server_key; char *value; - memcached_return rc; LIBMEMCACHED_MEMCACHED_GET_START(); *value_length= 0; *error= memcached_connect(ptr); if (*error != MEMCACHED_SUCCESS) - return NULL; + goto error; server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts; send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "get %.*s\r\n", (int)key_length, key); - if (*error != MEMCACHED_SUCCESS) - return NULL; - if ((send(ptr->hosts[server_key].fd, buffer, send_length, 0) == -1)) { *error= MEMCACHED_WRITE_FAILURE; - return NULL; + goto error; } value= memcached_value_fetch(ptr, key, &key_length, value_length, flags, error, 0, server_key); - /* We need to read END */ - rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key); - if (rc != MEMCACHED_NOTFOUND) + if (*error == MEMCACHED_END && *value_length == 0) + { + *error= MEMCACHED_NOTFOUND; + goto error; + } + else if (*error == MEMCACHED_SUCCESS) { - free(value); - *value_length= 0; - *error= MEMCACHED_PROTOCOL_ERROR; + memcached_return rc; + /* We need to read END */ + rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key); + + if (rc != MEMCACHED_END) + { + *error= MEMCACHED_PROTOCOL_ERROR; + goto error; + } } + else + goto error; + LIBMEMCACHED_MEMCACHED_GET_END(); return value; + +error: + free(value); + *value_length= 0; + + LIBMEMCACHED_MEMCACHED_GET_END(); + + return NULL; } memcached_return memcached_mget(memcached_st *ptr, diff --git a/lib/memcached_response.c b/lib/memcached_response.c index 12763513..cb74be0a 100644 --- a/lib/memcached_response.c +++ b/lib/memcached_response.c @@ -48,7 +48,7 @@ memcached_return memcached_response(memcached_st *ptr, return MEMCACHED_UNKNOWN_READ_FAILURE; } case 'D': /* DELETED */ - return MEMCACHED_SUCCESS; + return MEMCACHED_DELETED; case 'N': /* NOT_FOUND */ { if (buffer[4] == 'F') @@ -61,7 +61,7 @@ memcached_return memcached_response(memcached_st *ptr, case 'E': /* PROTOCOL ERROR or END */ { if (buffer[1] == 'N') - return MEMCACHED_NOTFOUND; + return MEMCACHED_END; else if (buffer[1] == 'R') return MEMCACHED_PROTOCOL_ERROR; else diff --git a/lib/memcached_strerror.c b/lib/memcached_strerror.c index f987b3cc..c917e96e 100644 --- a/lib/memcached_strerror.c +++ b/lib/memcached_strerror.c @@ -44,6 +44,12 @@ char *memcached_strerror(memcached_st *ptr, memcached_return rc) return "SOME ERRORS WERE REPORTED"; case MEMCACHED_NO_SERVERS: return "NO SERVERS DEFINED"; + case MEMCACHED_END: + return "SERVER END"; + case MEMCACHED_DELETED: + return "SERVER DELETE"; + case MEMCACHED_VALUE: + return "SERVER VALUE"; case MEMCACHED_MAXIMUM_RETURN: return "Gibberish returned!"; default: diff --git a/src/memslap.c b/src/memslap.c index 216506d8..6945b7d9 100644 --- a/src/memslap.c +++ b/src/memslap.c @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -14,9 +15,29 @@ #include "utilities.h" #include "generator.h" +/* Global Thread counter */ +unsigned int thread_counter; +pthread_mutex_t counter_mutex; +pthread_cond_t count_threshhold; +unsigned int master_wakeup; +pthread_mutex_t sleeper_mutex; +pthread_cond_t sleep_threshhold; + +void *run_task(void *p); /* Types */ typedef struct conclusions_st conclusions_st; +typedef struct thread_context_st thread_context_st; +typedef enum { + AC_SET, + AC_GET, +} run_action; + +struct thread_context_st { + unsigned int x; + pairs_st *pairs; + run_action action; +}; struct conclusions_st { long int load_time; @@ -28,9 +49,11 @@ struct conclusions_st { /* Prototypes */ void options_parse(int argc, char *argv[]); void conclusions_print(conclusions_st *conclusion); +void scheduler(conclusions_st *conclusion); static int opt_verbose= 0; static unsigned int opt_default_pairs= 100; +static unsigned int opt_concurrency= 1; static int opt_displayflag= 0; static char *opt_servers= NULL; @@ -39,7 +62,6 @@ int main(int argc, char *argv[]) unsigned int x; memcached_return rc; memcached_st *memc; - struct timeval start_time, end_time; pairs_st *pairs; conclusions_st conclusion; @@ -56,40 +78,12 @@ int main(int argc, char *argv[]) pairs= pairs_generate(opt_default_pairs); + pthread_mutex_init(&counter_mutex, NULL); + pthread_cond_init(&count_threshhold, NULL); + pthread_mutex_init(&sleeper_mutex, NULL); + pthread_cond_init(&sleep_threshhold, NULL); - gettimeofday(&start_time, NULL); - for (x= 0; x < opt_default_pairs; x++) - { - rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, - pairs[x].value, pairs[x].value_length, - 0, 0); - if (rc != MEMCACHED_SUCCESS) - fprintf(stderr, "Failured on insert of %.*s\n", - (unsigned int)pairs[x].key_length, pairs[x].key); - conclusion.rows_loaded++; - } - gettimeofday(&end_time, NULL); - conclusion.load_time= timedif(end_time, start_time); - - gettimeofday(&start_time, NULL); - for (x= 0; x < opt_default_pairs; x++) - { - char *value; - size_t value_length; - uint16_t flags; - - value= memcached_get(memc, pairs[x].key, pairs[x].key_length, - &value_length, - &flags, &rc); - - if (rc != MEMCACHED_SUCCESS) - fprintf(stderr, "Failured on read of %.*s\n", - (unsigned int)pairs[x].key_length, pairs[x].key); - conclusion.rows_read++; - free(value); - } - gettimeofday(&end_time, NULL); - conclusion.read_time= timedif(end_time, start_time); + scheduler(&conclusion); pairs_free(pairs); @@ -97,11 +91,78 @@ int main(int argc, char *argv[]) memcached_deinit(memc); + (void)pthread_mutex_init(&counter_mutex, NULL); + (void)pthread_cond_init(&count_threshhold, NULL); + (void)pthread_mutex_init(&sleeper_mutex, NULL); + (void)pthread_cond_init(&sleep_threshhold, NULL); conclusions_print(&conclusion); return 0; } +void scheduler(conclusions_st *conclusion) +{ + unsigned int x; + struct timeval start_time, end_time; + pthread_t mainthread; /* Thread descriptor */ + pthread_attr_t attr; /* Thread attributes */ + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + + pthread_mutex_lock(&counter_mutex); + thread_counter= 0; + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 1; + pthread_mutex_unlock(&sleeper_mutex); + + for (x= 0; x < opt_concurrency; x++) + { + thread_context_st *context; + context= (thread_context_st *)malloc(sizeof(thread_context_st)); + + /* now you create the thread */ + if (pthread_create(&mainthread, &attr, run_task, + (void *)context) != 0) + { + fprintf(stderr,"Could not create thread\n"); + exit(1); + } + thread_counter++; + } + + pthread_mutex_unlock(&counter_mutex); + pthread_attr_destroy(&attr); + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 0; + pthread_mutex_unlock(&sleeper_mutex); + pthread_cond_broadcast(&sleep_threshhold); + + gettimeofday(&start_time, NULL); + /* + We loop until we know that all children have cleaned up. + */ + pthread_mutex_lock(&counter_mutex); + while (thread_counter) + { + struct timespec abstime; + + memset(&abstime, 0, sizeof(struct timespec)); + abstime.tv_sec= 1; + + pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime); + } + pthread_mutex_unlock(&counter_mutex); + + gettimeofday(&end_time, NULL); + + conclusion->load_time= timedif(end_time, start_time); + conclusion->read_time= timedif(end_time, start_time); +} + void options_parse(int argc, char *argv[]) { static struct option long_options[]= @@ -165,3 +226,59 @@ void conclusions_print(conclusions_st *conclusion) printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, conclusion->read_time % 1000); } + +void *run_task(void *p) +{ + unsigned int x; + thread_context_st *context= (thread_context_st *)p; + + pthread_mutex_lock(&sleeper_mutex); + while (master_wakeup) + { + pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); + } + pthread_mutex_unlock(&sleeper_mutex); + + /* Do Stuff */ + + switch (context->action) + { + case AC_SET: + for (x= 0; x < opt_default_pairs; x++) + { + rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, + pairs[x].value, pairs[x].value_length, + 0, 0); + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on insert of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_loaded++; + } + break; + case AC_GET: + for (x= 0; x < opt_default_pairs; x++) + { + char *value; + size_t value_length; + uint16_t flags; + + value= memcached_get(memc, pairs[x].key, pairs[x].key_length, + &value_length, + &flags, &rc); + + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on read of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_read++; + free(value); + } + break; + } + + pthread_mutex_lock(&counter_mutex); + thread_counter--; + pthread_cond_signal(&count_threshhold); + pthread_mutex_unlock(&counter_mutex); + + free(context); +} diff --git a/tests/output.res b/tests/output.res index 50cb7861..50dd00ba 100644 --- a/tests/output.res +++ b/tests/output.res @@ -18,6 +18,9 @@ Error 16 -> MEMORY ALLOCATION FAILURE Error 17 -> PARTIAL READ Error 18 -> SOME ERRORS WERE REPORTED Error 19 -> NO SERVERS DEFINED +Error 20 -> SERVER END +Error 21 -> SERVER DELETE +Error 22 -> SERVER VALUE Found key pid Found key uptime Found key time diff --git a/tests/test.c b/tests/test.c index c60b4810..3ebb2429 100644 --- a/tests/test.c +++ b/tests/test.c @@ -536,6 +536,78 @@ void get_stats_multiple(void) memcached_deinit(memc); } +void add_host_test(void) +{ + unsigned int x; + memcached_st *memc; + memcached_server_st *servers; + memcached_return rc; + char servername[]= "0.example.com"; + + memc= memcached_init(NULL); + assert(memc); + rc= memcached_server_add(memc, "localhost", 0); + assert(rc == MEMCACHED_SUCCESS); + + servers= memcached_server_list_append(NULL, servername, 400, &rc); + assert(servers); + assert(1 == memcached_server_list_count(servers)); + + for (x= 2; x < 20; x++) + { + char buffer[SMALL_STRING_LEN]; + + snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x); + servers= memcached_server_list_append(servers, buffer, 401, + &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(x == memcached_server_list_count(servers)); + } + + rc= memcached_server_push(memc, servers); + assert(rc == MEMCACHED_SUCCESS); + rc= memcached_server_push(memc, servers); + assert(rc == MEMCACHED_SUCCESS); + + memcached_server_list_free(servers); + memcached_deinit(memc); +} + +void add_host_test1(void) +{ + unsigned int x; + memcached_st *memc; + memcached_server_st *servers; + memcached_return rc; + char servername[]= "0.example.com"; + + memc= memcached_init(NULL); + assert(memc); + + servers= memcached_server_list_append(NULL, servername, 400, &rc); + assert(servers); + assert(1 == memcached_server_list_count(servers)); + + for (x= 2; x < 20; x++) + { + char buffer[SMALL_STRING_LEN]; + + snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x); + servers= memcached_server_list_append(servers, buffer, 401, + &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(x == memcached_server_list_count(servers)); + } + + rc= memcached_server_push(memc, servers); + assert(rc == MEMCACHED_SUCCESS); + rc= memcached_server_push(memc, servers); + assert(rc == MEMCACHED_SUCCESS); + + memcached_server_list_free(servers); + memcached_deinit(memc); +} + int main(int argc, char *argv[]) { @@ -562,6 +634,7 @@ int main(int argc, char *argv[]) quit_test(); mget_test(); get_stats(); + add_host_test(); /* The multiple tests */ if (argc == 2) -- 2.30.2