X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fmemslap.c;h=6945b7d9caf78ad902f87dab50a359d43dfa0f81;hb=e69bb33d8da40ded7f7a58a321b9f220b6651c8c;hp=216506d88b26625bdc38521a34a64f77a7d0e63e;hpb=320638630f5a35ddfc4fe0d571b24016a7a80de7;p=awesomized%2Flibmemcached diff --git a/src/memslap.c b/src/memslap.c index 216506d8..6945b7d9 100644 --- a/src/memslap.c +++ b/src/memslap.c @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -14,9 +15,29 @@ #include "utilities.h" #include "generator.h" +/* Global Thread counter */ +unsigned int thread_counter; +pthread_mutex_t counter_mutex; +pthread_cond_t count_threshhold; +unsigned int master_wakeup; +pthread_mutex_t sleeper_mutex; +pthread_cond_t sleep_threshhold; + +void *run_task(void *p); /* Types */ typedef struct conclusions_st conclusions_st; +typedef struct thread_context_st thread_context_st; +typedef enum { + AC_SET, + AC_GET, +} run_action; + +struct thread_context_st { + unsigned int x; + pairs_st *pairs; + run_action action; +}; struct conclusions_st { long int load_time; @@ -28,9 +49,11 @@ struct conclusions_st { /* Prototypes */ void options_parse(int argc, char *argv[]); void conclusions_print(conclusions_st *conclusion); +void scheduler(conclusions_st *conclusion); static int opt_verbose= 0; static unsigned int opt_default_pairs= 100; +static unsigned int opt_concurrency= 1; static int opt_displayflag= 0; static char *opt_servers= NULL; @@ -39,7 +62,6 @@ int main(int argc, char *argv[]) unsigned int x; memcached_return rc; memcached_st *memc; - struct timeval start_time, end_time; pairs_st *pairs; conclusions_st conclusion; @@ -56,40 +78,12 @@ int main(int argc, char *argv[]) pairs= pairs_generate(opt_default_pairs); + pthread_mutex_init(&counter_mutex, NULL); + pthread_cond_init(&count_threshhold, NULL); + pthread_mutex_init(&sleeper_mutex, NULL); + pthread_cond_init(&sleep_threshhold, NULL); - gettimeofday(&start_time, NULL); - for (x= 0; x < opt_default_pairs; x++) - { - rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, - pairs[x].value, pairs[x].value_length, - 0, 0); - if (rc != MEMCACHED_SUCCESS) - fprintf(stderr, "Failured on insert of %.*s\n", - (unsigned int)pairs[x].key_length, pairs[x].key); - conclusion.rows_loaded++; - } - gettimeofday(&end_time, NULL); - conclusion.load_time= timedif(end_time, start_time); - - gettimeofday(&start_time, NULL); - for (x= 0; x < opt_default_pairs; x++) - { - char *value; - size_t value_length; - uint16_t flags; - - value= memcached_get(memc, pairs[x].key, pairs[x].key_length, - &value_length, - &flags, &rc); - - if (rc != MEMCACHED_SUCCESS) - fprintf(stderr, "Failured on read of %.*s\n", - (unsigned int)pairs[x].key_length, pairs[x].key); - conclusion.rows_read++; - free(value); - } - gettimeofday(&end_time, NULL); - conclusion.read_time= timedif(end_time, start_time); + scheduler(&conclusion); pairs_free(pairs); @@ -97,11 +91,78 @@ int main(int argc, char *argv[]) memcached_deinit(memc); + (void)pthread_mutex_init(&counter_mutex, NULL); + (void)pthread_cond_init(&count_threshhold, NULL); + (void)pthread_mutex_init(&sleeper_mutex, NULL); + (void)pthread_cond_init(&sleep_threshhold, NULL); conclusions_print(&conclusion); return 0; } +void scheduler(conclusions_st *conclusion) +{ + unsigned int x; + struct timeval start_time, end_time; + pthread_t mainthread; /* Thread descriptor */ + pthread_attr_t attr; /* Thread attributes */ + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + + pthread_mutex_lock(&counter_mutex); + thread_counter= 0; + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 1; + pthread_mutex_unlock(&sleeper_mutex); + + for (x= 0; x < opt_concurrency; x++) + { + thread_context_st *context; + context= (thread_context_st *)malloc(sizeof(thread_context_st)); + + /* now you create the thread */ + if (pthread_create(&mainthread, &attr, run_task, + (void *)context) != 0) + { + fprintf(stderr,"Could not create thread\n"); + exit(1); + } + thread_counter++; + } + + pthread_mutex_unlock(&counter_mutex); + pthread_attr_destroy(&attr); + + pthread_mutex_lock(&sleeper_mutex); + master_wakeup= 0; + pthread_mutex_unlock(&sleeper_mutex); + pthread_cond_broadcast(&sleep_threshhold); + + gettimeofday(&start_time, NULL); + /* + We loop until we know that all children have cleaned up. + */ + pthread_mutex_lock(&counter_mutex); + while (thread_counter) + { + struct timespec abstime; + + memset(&abstime, 0, sizeof(struct timespec)); + abstime.tv_sec= 1; + + pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime); + } + pthread_mutex_unlock(&counter_mutex); + + gettimeofday(&end_time, NULL); + + conclusion->load_time= timedif(end_time, start_time); + conclusion->read_time= timedif(end_time, start_time); +} + void options_parse(int argc, char *argv[]) { static struct option long_options[]= @@ -165,3 +226,59 @@ void conclusions_print(conclusions_st *conclusion) printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, conclusion->read_time % 1000); } + +void *run_task(void *p) +{ + unsigned int x; + thread_context_st *context= (thread_context_st *)p; + + pthread_mutex_lock(&sleeper_mutex); + while (master_wakeup) + { + pthread_cond_wait(&sleep_threshhold, &sleeper_mutex); + } + pthread_mutex_unlock(&sleeper_mutex); + + /* Do Stuff */ + + switch (context->action) + { + case AC_SET: + for (x= 0; x < opt_default_pairs; x++) + { + rc= memcached_set(memc, pairs[x].key, pairs[x].key_length, + pairs[x].value, pairs[x].value_length, + 0, 0); + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on insert of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_loaded++; + } + break; + case AC_GET: + for (x= 0; x < opt_default_pairs; x++) + { + char *value; + size_t value_length; + uint16_t flags; + + value= memcached_get(memc, pairs[x].key, pairs[x].key_length, + &value_length, + &flags, &rc); + + if (rc != MEMCACHED_SUCCESS) + fprintf(stderr, "Failured on read of %.*s\n", + (unsigned int)pairs[x].key_length, pairs[x].key); + conclusion->rows_read++; + free(value); + } + break; + } + + pthread_mutex_lock(&counter_mutex); + thread_counter--; + pthread_cond_signal(&count_threshhold); + pthread_mutex_unlock(&counter_mutex); + + free(context); +}