#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
+#include <sys/time.h>
#include <getopt.h>
+#include <pthread.h>
#include <memcached.h>
#include "client_options.h"
#include "utilities.h"
+#include "generator.h"
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
-/* Use this for string generation */
-static const char ALPHANUMERICS[]=
- "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
-
-#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
+void *run_task(void *p);
/* Types */
-typedef struct pairs_st pairs_st;
+typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+ AC_SET,
+ AC_GET,
+} run_action;
+
+struct thread_context_st {
+ unsigned int x;
+ pairs_st *pairs;
+ run_action action;
+};
-struct pairs_st {
- char *key;
- size_t key_length;
- char *value;
- size_t value_length;
+struct conclusions_st {
+ long int load_time;
+ long int read_time;
+ unsigned int rows_loaded;
+ unsigned int rows_read;
};
/* Prototypes */
void options_parse(int argc, char *argv[]);
-static pairs_st *pairs_generate(void);
-static void pairs_free(pairs_st *pairs);
-static void get_random_string(char *buffer, size_t size);
+void conclusions_print(conclusions_st *conclusion);
+void scheduler(conclusions_st *conclusion);
static int opt_verbose= 0;
-static int opt_default_pairs= 100;
+static unsigned int opt_default_pairs= 100;
+static unsigned int opt_concurrency= 1;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
memcached_return rc;
memcached_st *memc;
pairs_st *pairs;
+ conclusions_st conclusion;
+
+ memset(&conclusion, 0, sizeof(conclusions_st));
srandom(time(NULL));
memc= memcached_init(NULL);
parse_opt_servers(memc, opt_servers);
- pairs= pairs_generate();
+ pairs= pairs_generate(opt_default_pairs);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_cond_init(&count_threshhold, NULL);
+ pthread_mutex_init(&sleeper_mutex, NULL);
+ pthread_cond_init(&sleep_threshhold, NULL);
- for (x= 0; x < opt_default_pairs; x++)
- {
- printf("Key(%u) %.10s \t%.10s\n", x, pairs[x].key, pairs[x].value);
- rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
- pairs[x].value, pairs[x].value_length,
- 0, 0);
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on insert of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- }
+ scheduler(&conclusion);
pairs_free(pairs);
memcached_deinit(memc);
+ (void)pthread_mutex_init(&counter_mutex, NULL);
+ (void)pthread_cond_init(&count_threshhold, NULL);
+ (void)pthread_mutex_init(&sleeper_mutex, NULL);
+ (void)pthread_cond_init(&sleep_threshhold, NULL);
+ conclusions_print(&conclusion);
+
return 0;
}
+void scheduler(conclusions_st *conclusion)
+{
+ unsigned int x;
+ struct timeval start_time, end_time;
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter= 0;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 1;
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ for (x= 0; x < opt_concurrency; x++)
+ {
+ thread_context_st *context;
+ context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+ /* now you create the thread */
+ if (pthread_create(&mainthread, &attr, run_task,
+ (void *)context) != 0)
+ {
+ fprintf(stderr,"Could not create thread\n");
+ exit(1);
+ }
+ thread_counter++;
+ }
+
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_destroy(&attr);
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 0;
+ pthread_mutex_unlock(&sleeper_mutex);
+ pthread_cond_broadcast(&sleep_threshhold);
+
+ gettimeofday(&start_time, NULL);
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+ pthread_mutex_lock(&counter_mutex);
+ while (thread_counter)
+ {
+ struct timespec abstime;
+
+ memset(&abstime, 0, sizeof(struct timespec));
+ abstime.tv_sec= 1;
+
+ pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&counter_mutex);
+
+ gettimeofday(&end_time, NULL);
+
+ conclusion->load_time= timedif(end_time, start_time);
+ conclusion->read_time= timedif(end_time, start_time);
+}
+
void options_parse(int argc, char *argv[])
{
static struct option long_options[]=
}
}
-static void pairs_free(pairs_st *pairs)
+void conclusions_print(conclusions_st *conclusion)
{
- unsigned int x;
-
- for (x= 0; x < opt_default_pairs; x++)
- {
- free(pairs[x].key);
- free(pairs[x].value);
- }
-
- free(pairs);
+ printf("\tLoaded %u rows\n", conclusion->rows_loaded);
+ printf("\tRead %u rows\n", conclusion->rows_read);
+ printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
+ conclusion->load_time % 1000);
+ printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
+ conclusion->read_time % 1000);
}
-static pairs_st *pairs_generate(void)
+void *run_task(void *p)
{
unsigned int x;
- pairs_st *pairs;
+ thread_context_st *context= (thread_context_st *)p;
- pairs= (pairs_st*)malloc(sizeof(pairs_st) * opt_default_pairs);
+ pthread_mutex_lock(&sleeper_mutex);
+ while (master_wakeup)
+ {
+ pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+ }
+ pthread_mutex_unlock(&sleeper_mutex);
- if (!pairs)
- goto error;
+ /* Do Stuff */
- for (x= 0; x < opt_default_pairs; x++)
+ switch (context->action)
{
- pairs[x].key= (char *)malloc(sizeof(char) * 100);
- if (!pairs[x].key)
- goto error;
- get_random_string(pairs[x].key, 100);
- pairs[x].key_length= 100;
-
- pairs[x].value= (char *)malloc(sizeof(char) * 400);
- if (!pairs[x].value)
- goto error;
- get_random_string(pairs[x].value, 400);
- pairs[x].value_length= 400;
+ case AC_SET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
+ pairs[x].value, pairs[x].value_length,
+ 0, 0);
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on insert of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_loaded++;
+ }
+ break;
+ case AC_GET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ char *value;
+ size_t value_length;
+ uint16_t flags;
+
+ value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
+ &value_length,
+ &flags, &rc);
+
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on read of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_read++;
+ free(value);
+ }
+ break;
}
- return pairs;
-error:
- fprintf(stderr, "Memory Allocation failure in pairs_generate.\n");
- exit(0);
-}
-
-static void get_random_string(char *buffer, size_t size)
-{
- char *buffer_ptr= buffer;
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter--;
+ pthread_cond_signal(&count_threshhold);
+ pthread_mutex_unlock(&counter_mutex);
- while (--size)
- *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
- *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
+ free(context);
}