#include <fcntl.h>
#include <sys/time.h>
#include <getopt.h>
+#include <pthread.h>
#include <memcached.h>
#include "utilities.h"
#include "generator.h"
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
/* Types */
typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+ AC_SET,
+ AC_GET,
+} run_action;
+
+struct thread_context_st {
+ unsigned int x;
+ pairs_st *pairs;
+ run_action action;
+};
struct conclusions_st {
long int load_time;
/* Prototypes */
void options_parse(int argc, char *argv[]);
void conclusions_print(conclusions_st *conclusion);
+void scheduler(conclusions_st *conclusion);
static int opt_verbose= 0;
static unsigned int opt_default_pairs= 100;
+static unsigned int opt_concurrency= 1;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
unsigned int x;
memcached_return rc;
memcached_st *memc;
- struct timeval start_time, end_time;
pairs_st *pairs;
conclusions_st conclusion;
pairs= pairs_generate(opt_default_pairs);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_cond_init(&count_threshhold, NULL);
+ pthread_mutex_init(&sleeper_mutex, NULL);
+ pthread_cond_init(&sleep_threshhold, NULL);
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
- {
- rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
- pairs[x].value, pairs[x].value_length,
- 0, 0);
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on insert of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_loaded++;
- }
- gettimeofday(&end_time, NULL);
- conclusion.load_time= timedif(end_time, start_time);
-
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
- {
- char *value;
- size_t value_length;
- uint16_t flags;
-
- value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
- &value_length,
- &flags, &rc);
-
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on read of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_read++;
- free(value);
- }
- gettimeofday(&end_time, NULL);
- conclusion.read_time= timedif(end_time, start_time);
+ scheduler(&conclusion);
pairs_free(pairs);
memcached_deinit(memc);
+ (void)pthread_mutex_init(&counter_mutex, NULL);
+ (void)pthread_cond_init(&count_threshhold, NULL);
+ (void)pthread_mutex_init(&sleeper_mutex, NULL);
+ (void)pthread_cond_init(&sleep_threshhold, NULL);
conclusions_print(&conclusion);
return 0;
}
+void scheduler(conclusions_st *conclusion)
+{
+ unsigned int x;
+ struct timeval start_time, end_time;
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter= 0;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 1;
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ for (x= 0; x < opt_concurrency; x++)
+ {
+ thread_context_st *context;
+ context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+ /* now you create the thread */
+ if (pthread_create(&mainthread, &attr, run_task,
+ (void *)context) != 0)
+ {
+ fprintf(stderr,"Could not create thread\n");
+ exit(1);
+ }
+ thread_counter++;
+ }
+
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_destroy(&attr);
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 0;
+ pthread_mutex_unlock(&sleeper_mutex);
+ pthread_cond_broadcast(&sleep_threshhold);
+
+ gettimeofday(&start_time, NULL);
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+ pthread_mutex_lock(&counter_mutex);
+ while (thread_counter)
+ {
+ struct timespec abstime;
+
+ memset(&abstime, 0, sizeof(struct timespec));
+ abstime.tv_sec= 1;
+
+ pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&counter_mutex);
+
+ gettimeofday(&end_time, NULL);
+
+ conclusion->load_time= timedif(end_time, start_time);
+ conclusion->read_time= timedif(end_time, start_time);
+}
+
void options_parse(int argc, char *argv[])
{
static struct option long_options[]=
printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
conclusion->read_time % 1000);
}
+
+void *run_task(void *p)
+{
+ unsigned int x;
+ thread_context_st *context= (thread_context_st *)p;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ while (master_wakeup)
+ {
+ pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+ }
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ /* Do Stuff */
+
+ switch (context->action)
+ {
+ case AC_SET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
+ pairs[x].value, pairs[x].value_length,
+ 0, 0);
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on insert of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_loaded++;
+ }
+ break;
+ case AC_GET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ char *value;
+ size_t value_length;
+ uint16_t flags;
+
+ value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
+ &value_length,
+ &flags, &rc);
+
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on read of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_read++;
+ free(value);
+ }
+ break;
+ }
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter--;
+ pthread_cond_signal(&count_threshhold);
+ pthread_mutex_unlock(&counter_mutex);
+
+ free(context);
+}