X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fmemslap.c;h=6945b7d9caf78ad902f87dab50a359d43dfa0f81;hb=e69bb33d8da40ded7f7a58a321b9f220b6651c8c;hp=0ea9cca34d581cdcf0e05b18ff07859ade99fa76;hpb=c93c9ddbc37c21f33ab152b5121bb69baf437ad1;p=awesomized%2Flibmemcached diff --git a/src/memslap.c b/src/memslap.c index 0ea9cca3..6945b7d9 100644 --- a/src/memslap.c +++ b/src/memslap.c @@ -5,38 +5,55 @@ #include #include #include +#include #include +#include #include #include "client_options.h" #include "utilities.h" +#include "generator.h" +/* Global Thread counter */ +unsigned int thread_counter; +pthread_mutex_t counter_mutex; +pthread_cond_t count_threshhold; +unsigned int master_wakeup; +pthread_mutex_t sleeper_mutex; +pthread_cond_t sleep_threshhold; -/* Use this for string generation */ -static const char ALPHANUMERICS[]= - "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz"; - -#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1) +void *run_task(void *p); /* Types */ -typedef struct pairs_st pairs_st; +typedef struct conclusions_st conclusions_st; +typedef struct thread_context_st thread_context_st; +typedef enum { + AC_SET, + AC_GET, +} run_action; + +struct thread_context_st { + unsigned int x; + pairs_st *pairs; + run_action action; +}; -struct pairs_st { - char *key; - size_t key_length; - char *value; - size_t value_length; +struct conclusions_st { + long int load_time; + long int read_time; + unsigned int rows_loaded; + unsigned int rows_read; }; /* Prototypes */ void options_parse(int argc, char *argv[]); -static pairs_st *pairs_generate(void); -static void pairs_free(pairs_st *pairs); -static void get_random_string(char *buffer, size_t size); +void conclusions_print(conclusions_st *conclusion); +void scheduler(conclusions_st *conclusion); static int opt_verbose= 0; -static int opt_default_pairs= 100; +static unsigned int opt_default_pairs= 100; +static unsigned int opt_concurrency= 1; static int opt_displayflag= 0; static char *opt_servers= NULL; @@ -46,6 +63,9 @@ int main(int argc, char *argv[]) memcached_return rc; memcached_st *memc; pairs_st *pairs; + conclusions_st conclusion; + + memset(&conclusion, 0, sizeof(conclusions_st)); srandom(time(NULL)); memc= memcached_init(NULL); @@ -56,19 +76,14 @@ int main(int argc, char *argv[]) parse_opt_servers(memc, opt_servers); - pairs= pairs_generate(); + pairs= pairs_generate(opt_default_pairs); + pthread_mutex_init(&counter_mutex, NULL); + pthread_cond_init(&count_threshhold, NULL); + pthread_mutex_init(&sleeper_mutex, NULL); + pthread_cond_init(&sleep_threshhold, NULL); - for (x= 0; x < opt_default_pairs; x++) - { - printf("Key(%u) %.10s \t%.10s\n", x, pairs[x].key, pairs[x].value); - rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, - pairs[x].value, pairs[x].value_length, - 0, 0); - if (rc != MEMCACHED_SUCCESS) - fprintf(stderr, "Failured on insert of %.*s\n", - (unsigned int)pairs[x].key_length, pairs[x].key); - } + scheduler(&conclusion); pairs_free(pairs); @@ -76,9 +91,78 @@ int main(int argc, char *argv[]) memcached_deinit(memc); + (void)pthread_mutex_init(&counter_mutex, NULL); + (void)pthread_cond_init(&count_threshhold, NULL); + (void)pthread_mutex_init(&sleeper_mutex, NULL); + (void)pthread_cond_init(&sleep_threshhold, NULL); + conclusions_print(&conclusion); + return 0; } +void scheduler(conclusions_st *conclusion) +{ + unsigned int x; + struct timeval start_time, end_time; + pthread_t mainthread; /* Thread descriptor */ + pthread_attr_t attr; /* Thread attributes */ + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + + pthread_mutex_lock(&counter_mutex); + thread_counter= 0; + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 1; + pthread_mutex_unlock(&sleeper_mutex); + + for (x= 0; x < opt_concurrency; x++) + { + thread_context_st *context; + context= (thread_context_st *)malloc(sizeof(thread_context_st)); + + /* now you create the thread */ + if (pthread_create(&mainthread, &attr, run_task, + (void *)context) != 0) + { + fprintf(stderr,"Could not create thread\n"); + exit(1); + } + thread_counter++; + } + + pthread_mutex_unlock(&counter_mutex); + pthread_attr_destroy(&attr); + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 0; + pthread_mutex_unlock(&sleeper_mutex); + pthread_cond_broadcast(&sleep_threshhold); + + gettimeofday(&start_time, NULL); + /* + We loop until we know that all children have cleaned up. + */ + pthread_mutex_lock(&counter_mutex); + while (thread_counter) + { + struct timespec abstime; + + memset(&abstime, 0, sizeof(struct timespec)); + abstime.tv_sec= 1; + + pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime); + } + pthread_mutex_unlock(&counter_mutex); + + gettimeofday(&end_time, NULL); + + conclusion->load_time= timedif(end_time, start_time); + conclusion->read_time= timedif(end_time, start_time); +} + void options_parse(int argc, char *argv[]) { static struct option long_options[]= @@ -89,6 +173,7 @@ void options_parse(int argc, char *argv[]) {"debug", no_argument, &opt_verbose, OPT_DEBUG}, {"servers", required_argument, NULL, OPT_SERVERS}, {"flag", no_argument, &opt_displayflag, OPT_FLAG}, + {"default-pairs", required_argument, NULL, OPT_SLAP_DEFAULT_PAIRS}, {0, 0, 0, 0}, }; @@ -120,6 +205,9 @@ void options_parse(int argc, char *argv[]) case OPT_SERVERS: /* --servers or -s */ opt_servers= strdup(optarg); break; + case OPT_SLAP_DEFAULT_PAIRS: + opt_default_pairs= strtol(optarg, (char **)NULL, 10); + break; case '?': /* getopt_long already printed an error message. */ exit(1); @@ -129,55 +217,68 @@ void options_parse(int argc, char *argv[]) } } -static void pairs_free(pairs_st *pairs) +void conclusions_print(conclusions_st *conclusion) { - unsigned int x; - - for (x= 0; x < opt_default_pairs; x++) - { - free(pairs[x].key); - free(pairs[x].value); - } - - free(pairs); + printf("\tLoaded %u rows\n", conclusion->rows_loaded); + printf("\tRead %u rows\n", conclusion->rows_read); + printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, + conclusion->load_time % 1000); + printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, + conclusion->read_time % 1000); } -static pairs_st *pairs_generate(void) +void *run_task(void *p) { unsigned int x; - pairs_st *pairs; + thread_context_st *context= (thread_context_st *)p; - pairs= (pairs_st*)malloc(sizeof(pairs_st) * opt_default_pairs); + pthread_mutex_lock(&sleeper_mutex); + while (master_wakeup) + { + pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); + } + pthread_mutex_unlock(&sleeper_mutex); - if (!pairs) - goto error; + /* Do Stuff */ - for (x= 0; x < opt_default_pairs; x++) + switch (context->action) { - pairs[x].key= (char *)malloc(sizeof(char) * 100); - if (!pairs[x].key) - goto error; - get_random_string(pairs[x].key, 100); - pairs[x].key_length= 100; - - pairs[x].value= (char *)malloc(sizeof(char) * 400); - if (!pairs[x].value) - goto error; - get_random_string(pairs[x].value, 400); - pairs[x].value_length= 400; + case AC_SET: + for (x= 0; x < opt_default_pairs; x++) + { + rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, + pairs[x].value, pairs[x].value_length, + 0, 0); + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on insert of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_loaded++; + } + break; + case AC_GET: + for (x= 0; x < opt_default_pairs; x++) + { + char *value; + size_t value_length; + uint16_t flags; + + value= memcached_get(memc, pairs[x].key, pairs[x].key_length, + &value_length, + &flags, &rc); + + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on read of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_read++; + free(value); + } + break; } - return pairs; -error: - fprintf(stderr, "Memory Allocation failure in pairs_generate.\n"); - exit(0); -} - -static void get_random_string(char *buffer, size_t size) -{ - char *buffer_ptr= buffer; + pthread_mutex_lock(&counter_mutex); + thread_counter--; + pthread_cond_signal(&count_threshhold); + pthread_mutex_unlock(&counter_mutex); - while (--size) - *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE]; - *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE]; + free(context); }