#include <fcntl.h>
#include <sys/time.h>
#include <getopt.h>
+#include <pthread.h>
#include <memcached.h>
#include "client_options.h"
#include "utilities.h"
#include "generator.h"
+#include "execute.h"
+#define DEFAULT_INITIAL_LOAD 10000
+#define DEFAULT_EXECUTE_NUMBER 10000
+#define DEFAULT_CONCURRENCY 1
+
+#define PROGRAM_NAME "memslap"
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
+
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
/* Types */
typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+ SET_TEST,
+ GET_TEST,
+} test_type;
+
+struct thread_context_st {
+ unsigned int key_count;
+ pairs_st *initial_pairs;
+ unsigned int initial_number;
+ pairs_st *execute_pairs;
+ unsigned int execute_number;
+ test_type test;
+ memcached_server_st *servers;
+};
struct conclusions_st {
long int load_time;
/* Prototypes */
void options_parse(int argc, char *argv[]);
void conclusions_print(conclusions_st *conclusion);
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
+pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_of,
+ unsigned int *actual_loaded);
static int opt_verbose= 0;
-static unsigned int opt_default_pairs= 100;
+static unsigned int opt_execute_number= 0;
+static unsigned int opt_createial_load= 0;
+static unsigned int opt_concurrency= 0;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
+test_type opt_test= SET_TEST;
int main(int argc, char *argv[])
{
- unsigned int x;
- memcached_return rc;
- memcached_st *memc;
- struct timeval start_time, end_time;
- pairs_st *pairs;
conclusions_st conclusion;
+ memcached_server_st *servers;
memset(&conclusion, 0, sizeof(conclusions_st));
srandom(time(NULL));
- memc= memcached_init(NULL);
options_parse(argc, argv);
if (!opt_servers)
exit(0);
- parse_opt_servers(memc, opt_servers);
+ servers= parse_opt_servers(opt_servers);
- pairs= pairs_generate(opt_default_pairs);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_cond_init(&count_threshhold, NULL);
+ pthread_mutex_init(&sleeper_mutex, NULL);
+ pthread_cond_init(&sleep_threshhold, NULL);
+ scheduler(servers, &conclusion);
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
+ free(opt_servers);
+
+ (void)pthread_mutex_destroy(&counter_mutex);
+ (void)pthread_cond_destroy(&count_threshhold);
+ (void)pthread_mutex_destroy(&sleeper_mutex);
+ (void)pthread_cond_destroy(&sleep_threshhold);
+ conclusions_print(&conclusion);
+ memcached_server_list_free(servers);
+
+ return 0;
+}
+
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
+{
+ unsigned int x;
+ unsigned int actual_loaded;
+
+ struct timeval start_time, end_time;
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+ pairs_st *pairs= NULL;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ if (opt_createial_load)
+ pairs= load_createial_data(servers, opt_createial_load, &actual_loaded);
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter= 0;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 1;
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ for (x= 0; x < opt_concurrency; x++)
{
- rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
- pairs[x].value, pairs[x].value_length,
- 0, 0);
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on insert of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_loaded++;
+ thread_context_st *context;
+ context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+ context->servers= servers;
+ context->test= opt_test;
+
+ context->initial_pairs= pairs;
+ context->initial_number= actual_loaded;
+
+ if (opt_test == SET_TEST)
+ {
+ context->execute_pairs= pairs_generate(opt_execute_number);
+ context->execute_number= opt_execute_number;
+ }
+
+ /* now you create the thread */
+ if (pthread_create(&mainthread, &attr, run_task,
+ (void *)context) != 0)
+ {
+ fprintf(stderr,"Could not create thread\n");
+ exit(1);
+ }
+ thread_counter++;
}
- gettimeofday(&end_time, NULL);
- conclusion.load_time= timedif(end_time, start_time);
+
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_destroy(&attr);
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 0;
+ pthread_mutex_unlock(&sleeper_mutex);
+ pthread_cond_broadcast(&sleep_threshhold);
gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+ pthread_mutex_lock(&counter_mutex);
+ while (thread_counter)
{
- char *value;
- size_t value_length;
- uint16_t flags;
-
- value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
- &value_length,
- &flags, &rc);
-
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on read of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_read++;
- free(value);
- }
- gettimeofday(&end_time, NULL);
- conclusion.read_time= timedif(end_time, start_time);
+ struct timespec abstime;
- pairs_free(pairs);
-
- free(opt_servers);
+ memset(&abstime, 0, sizeof(struct timespec));
+ abstime.tv_sec= 1;
- memcached_deinit(memc);
+ pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&counter_mutex);
- conclusions_print(&conclusion);
+ gettimeofday(&end_time, NULL);
- return 0;
+ conclusion->load_time= timedif(end_time, start_time);
+ conclusion->read_time= timedif(end_time, start_time);
+ pairs_free(pairs);
}
void options_parse(int argc, char *argv[])
{
+ memcached_programs_help_st help_options[]=
+ {
+ {0},
+ };
+
static struct option long_options[]=
{
- {"version", no_argument, NULL, OPT_VERSION},
- {"help", no_argument, NULL, OPT_HELP},
- {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+ {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
{"debug", no_argument, &opt_verbose, OPT_DEBUG},
- {"servers", required_argument, NULL, OPT_SERVERS},
+ {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
{"flag", no_argument, &opt_displayflag, OPT_FLAG},
- {"default-pairs", required_argument, NULL, OPT_SLAP_DEFAULT_PAIRS},
+ {"help", no_argument, NULL, OPT_HELP},
+ {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
+ {"servers", required_argument, NULL, OPT_SERVERS},
+ {"test", required_argument, NULL, OPT_SLAP_TEST},
+ {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+ {"version", no_argument, NULL, OPT_VERSION},
{0, 0, 0, 0},
};
opt_verbose = OPT_DEBUG;
break;
case OPT_VERSION: /* --version or -V */
- printf("memcache tools, memcat, v1.0\n");
- exit(0);
+ version_command(PROGRAM_NAME);
break;
case OPT_HELP: /* --help or -h */
- printf("useful help messages go here\n");
- exit(0);
+ help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
break;
case OPT_SERVERS: /* --servers or -s */
opt_servers= strdup(optarg);
break;
- case OPT_SLAP_DEFAULT_PAIRS:
- opt_default_pairs= strtol(optarg, (char **)NULL, 10);
+ case OPT_SLAP_TEST:
+ if (!strcmp(optarg, "get"))
+ opt_test= GET_TEST ;
+ else if (!strcmp(optarg, "set"))
+ opt_test= SET_TEST;
+ else
+ {
+ fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
+ exit(1);
+ }
+ break;
+ case OPT_SLAP_CONCURRENCY:
+ opt_concurrency= strtol(optarg, (char **)NULL, 10);
+ case OPT_SLAP_EXECUTE_NUMBER:
+ opt_execute_number= strtol(optarg, (char **)NULL, 10);
+ break;
+ case OPT_SLAP_INITIAL_LOAD:
+ opt_createial_load= strtol(optarg, (char **)NULL, 10);
break;
case '?':
/* getopt_long already printed an error message. */
abort();
}
}
+
+ if (opt_test == GET_TEST && opt_createial_load == 0)
+ opt_createial_load= DEFAULT_INITIAL_LOAD;
+
+ if (opt_execute_number == 0)
+ opt_execute_number= DEFAULT_EXECUTE_NUMBER;
+
+ if (opt_concurrency == 0)
+ opt_concurrency= DEFAULT_CONCURRENCY;
}
void conclusions_print(conclusions_st *conclusion)
{
+ printf("\tThreads connecting to servers %u\n", opt_concurrency);
+#ifdef NOT_FINISHED
printf("\tLoaded %u rows\n", conclusion->rows_loaded);
printf("\tRead %u rows\n", conclusion->rows_read);
- printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
- conclusion->load_time % 1000);
- printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
- conclusion->read_time % 1000);
+#endif
+ if (opt_test == SET_TEST)
+ printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
+ conclusion->load_time % 1000);
+ else
+ printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
+ conclusion->read_time % 1000);
+}
+
+void *run_task(void *p)
+{
+ thread_context_st *context= (thread_context_st *)p;
+ memcached_st *memc;
+
+ memc= memcached_create(NULL);
+
+ memcached_server_push(memc, context->servers);
+
+ pthread_mutex_lock(&sleeper_mutex);
+ while (master_wakeup)
+ {
+ pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+ }
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ /* Do Stuff */
+ switch (context->test)
+ {
+ case SET_TEST:
+ execute_set(memc, context->execute_pairs, context->execute_number);
+ break;
+ case GET_TEST:
+ execute_get(memc, context->initial_pairs, context->initial_number);
+ break;
+ }
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter--;
+ pthread_cond_signal(&count_threshhold);
+ pthread_mutex_unlock(&counter_mutex);
+ memcached_free(memc);
+
+ free(context);
+
+ return NULL;
+}
+
+pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_of,
+ unsigned int *actual_loaded)
+{
+ memcached_st *memc;
+ pairs_st *pairs;
+
+ memc= memcached_create(NULL);
+ memcached_server_push(memc, servers);
+
+ pairs= pairs_generate(number_of);
+ *actual_loaded= execute_set(memc, pairs, number_of);
+
+ memcached_free(memc);
+
+ return pairs;
}