#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
+#include <assert.h>
#include <sys/time.h>
#include <getopt.h>
+#include <pthread.h>
#include <memcached.h>
#include "client_options.h"
#include "utilities.h"
+#include "generator.h"
+#include "execute.h"
+#define DEFAULT_INITIAL_LOAD 10000
+#define DEFAULT_EXECUTE_NUMBER 10000
+#define DEFAULT_CONCURRENCY 1
-/* Use this for string generation */
-static const char ALPHANUMERICS[]=
- "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
+#define PROGRAM_NAME "memslap"
+#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
-#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
/* Types */
-typedef struct pairs_st pairs_st;
typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+ SET_TEST,
+ GET_TEST,
+} test_type;
+
+struct thread_context_st {
+ unsigned int key_count;
+ pairs_st *initial_pairs;
+ unsigned int initial_number;
+ pairs_st *execute_pairs;
+ unsigned int execute_number;
+ test_type test;
+ memcached_st *memc;
+};
struct conclusions_st {
long int load_time;
unsigned int rows_read;
};
-struct pairs_st {
- char *key;
- size_t key_length;
- char *value;
- size_t value_length;
-};
-
/* Prototypes */
void options_parse(int argc, char *argv[]);
-static pairs_st *pairs_generate(void);
-static void pairs_free(pairs_st *pairs);
-static void get_random_string(char *buffer, size_t size);
void conclusions_print(conclusions_st *conclusion);
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
+pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
+ unsigned int *actual_loaded);
+void flush_all(memcached_st *memc);
static int opt_verbose= 0;
-static unsigned int opt_default_pairs= 100;
+static int opt_flush= 0;
+static int opt_non_blocking_io= 0;
+static int opt_tcp_nodelay= 0;
+static unsigned int opt_execute_number= 0;
+static unsigned int opt_createial_load= 0;
+static unsigned int opt_concurrency= 0;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
+test_type opt_test= SET_TEST;
int main(int argc, char *argv[])
{
- unsigned int x;
- memcached_return rc;
- memcached_st *memc;
- struct timeval start_time, end_time;
- pairs_st *pairs;
conclusions_st conclusion;
+ memcached_server_st *servers;
memset(&conclusion, 0, sizeof(conclusions_st));
srandom(time(NULL));
- memc= memcached_init(NULL);
options_parse(argc, argv);
if (!opt_servers)
- exit(0);
+ {
+ char *temp;
- parse_opt_servers(memc, opt_servers);
+ if ((temp= getenv("MEMCACHED_SERVERS")))
+ opt_servers= strdup(temp);
+ else
+ {
+ fprintf(stderr, "No Servers provided\n");
+ exit(1);
+ }
+ }
- pairs= pairs_generate();
+ servers= memcached_servers_parse(opt_servers);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_cond_init(&count_threshhold, NULL);
+ pthread_mutex_init(&sleeper_mutex, NULL);
+ pthread_cond_init(&sleep_threshhold, NULL);
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
+ scheduler(servers, &conclusion);
+
+ free(opt_servers);
+
+ (void)pthread_mutex_destroy(&counter_mutex);
+ (void)pthread_cond_destroy(&count_threshhold);
+ (void)pthread_mutex_destroy(&sleeper_mutex);
+ (void)pthread_cond_destroy(&sleep_threshhold);
+ conclusions_print(&conclusion);
+ memcached_server_list_free(servers);
+
+ return 0;
+}
+
+void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
+{
+ unsigned int x;
+ unsigned int actual_loaded= 0; /* Fix warning */
+ memcached_st *memc;
+
+ struct timeval start_time, end_time;
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+ pairs_st *pairs= NULL;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ memc= memcached_create(NULL);
+ memcached_server_push(memc, servers);
+
+ if (opt_flush)
+ flush_all(memc);
+ if (opt_createial_load)
+ pairs= load_create_data(memc, opt_createial_load, &actual_loaded);
+
+ /* We set this after we have loaded */
{
- rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
- pairs[x].value, pairs[x].value_length,
- 0, 0);
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on insert of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_loaded++;
+ unsigned int value= 1;
+ if (opt_non_blocking_io)
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &value);
+ if (opt_tcp_nodelay)
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &value);
}
- gettimeofday(&end_time, NULL);
- conclusion.load_time= timedif(end_time, start_time);
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter= 0;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 1;
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ for (x= 0; x < opt_concurrency; x++)
{
- char *value;
- size_t value_length;
- uint16_t flags;
-
- value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
- &value_length,
- &flags, &rc);
-
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on read of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_read++;
- free(value);
+ thread_context_st *context;
+ context= (thread_context_st *)malloc(sizeof(thread_context_st));
+ memset(context, 0, sizeof(thread_context_st));
+
+ context->memc= memcached_clone(NULL, memc);
+ context->test= opt_test;
+
+ context->initial_pairs= pairs;
+ context->initial_number= actual_loaded;
+
+ if (opt_test == SET_TEST)
+ {
+ context->execute_pairs= pairs_generate(opt_execute_number, 400);
+ context->execute_number= opt_execute_number;
+ }
+
+ /* now you create the thread */
+ if (pthread_create(&mainthread, &attr, run_task,
+ (void *)context) != 0)
+ {
+ fprintf(stderr,"Could not create thread\n");
+ exit(1);
+ }
+ thread_counter++;
}
- gettimeofday(&end_time, NULL);
- conclusion.read_time= timedif(end_time, start_time);
- pairs_free(pairs);
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_destroy(&attr);
- free(opt_servers);
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 0;
+ pthread_mutex_unlock(&sleeper_mutex);
+ pthread_cond_broadcast(&sleep_threshhold);
- memcached_deinit(memc);
+ gettimeofday(&start_time, NULL);
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+ pthread_mutex_lock(&counter_mutex);
+ while (thread_counter)
+ {
+ struct timespec abstime;
- conclusions_print(&conclusion);
+ memset(&abstime, 0, sizeof(struct timespec));
+ abstime.tv_sec= 1;
- return 0;
+ pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&counter_mutex);
+
+ gettimeofday(&end_time, NULL);
+
+ conclusion->load_time= timedif(end_time, start_time);
+ conclusion->read_time= timedif(end_time, start_time);
+ pairs_free(pairs);
}
void options_parse(int argc, char *argv[])
{
+ memcached_programs_help_st help_options[]=
+ {
+ {0},
+ };
+
static struct option long_options[]=
{
- {"version", no_argument, NULL, OPT_VERSION},
- {"help", no_argument, NULL, OPT_HELP},
- {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+ {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
{"debug", no_argument, &opt_verbose, OPT_DEBUG},
- {"servers", required_argument, NULL, OPT_SERVERS},
+ {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
{"flag", no_argument, &opt_displayflag, OPT_FLAG},
- {"default-pairs", required_argument, NULL, OPT_SLAP_DEFAULT_PAIRS},
+ {"flush", no_argument, &opt_flush, OPT_FLUSH},
+ {"help", no_argument, NULL, OPT_HELP},
+ {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
+ {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
+ {"servers", required_argument, NULL, OPT_SERVERS},
+ {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
+ {"test", required_argument, NULL, OPT_SLAP_TEST},
+ {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
+ {"version", no_argument, NULL, OPT_VERSION},
{0, 0, 0, 0},
};
opt_verbose = OPT_DEBUG;
break;
case OPT_VERSION: /* --version or -V */
- printf("memcache tools, memcat, v1.0\n");
- exit(0);
+ version_command(PROGRAM_NAME);
break;
case OPT_HELP: /* --help or -h */
- printf("useful help messages go here\n");
- exit(0);
+ help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
break;
case OPT_SERVERS: /* --servers or -s */
opt_servers= strdup(optarg);
break;
- case OPT_SLAP_DEFAULT_PAIRS:
- opt_default_pairs= strtol(optarg, (char **)NULL, 10);
+ case OPT_SLAP_TEST:
+ if (!strcmp(optarg, "get"))
+ opt_test= GET_TEST ;
+ else if (!strcmp(optarg, "set"))
+ opt_test= SET_TEST;
+ else
+ {
+ fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
+ exit(1);
+ }
+ break;
+ case OPT_SLAP_CONCURRENCY:
+ opt_concurrency= strtol(optarg, (char **)NULL, 10);
+ case OPT_SLAP_EXECUTE_NUMBER:
+ opt_execute_number= strtol(optarg, (char **)NULL, 10);
+ break;
+ case OPT_SLAP_INITIAL_LOAD:
+ opt_createial_load= strtol(optarg, (char **)NULL, 10);
break;
case '?':
/* getopt_long already printed an error message. */
abort();
}
}
-}
-static void pairs_free(pairs_st *pairs)
-{
- unsigned int x;
+ if (opt_test == GET_TEST && opt_createial_load == 0)
+ opt_createial_load= DEFAULT_INITIAL_LOAD;
- for (x= 0; x < opt_default_pairs; x++)
- {
- free(pairs[x].key);
- free(pairs[x].value);
- }
+ if (opt_execute_number == 0)
+ opt_execute_number= DEFAULT_EXECUTE_NUMBER;
- free(pairs);
+ if (opt_concurrency == 0)
+ opt_concurrency= DEFAULT_CONCURRENCY;
}
-static pairs_st *pairs_generate(void)
+void conclusions_print(conclusions_st *conclusion)
{
- unsigned int x;
- pairs_st *pairs;
+ printf("\tThreads connecting to servers %u\n", opt_concurrency);
+#ifdef NOT_FINISHED
+ printf("\tLoaded %u rows\n", conclusion->rows_loaded);
+ printf("\tRead %u rows\n", conclusion->rows_read);
+#endif
+ if (opt_test == SET_TEST)
+ printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
+ conclusion->load_time % 1000);
+ else
+ printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
+ conclusion->read_time % 1000);
+}
+
+void *run_task(void *p)
+{
+ thread_context_st *context= (thread_context_st *)p;
+ memcached_st *memc;
- pairs= (pairs_st*)malloc(sizeof(pairs_st) * opt_default_pairs);
+ memc= context->memc;
- if (!pairs)
- goto error;
+ pthread_mutex_lock(&sleeper_mutex);
+ while (master_wakeup)
+ {
+ pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+ }
+ pthread_mutex_unlock(&sleeper_mutex);
- for (x= 0; x < opt_default_pairs; x++)
+ /* Do Stuff */
+ switch (context->test)
{
- pairs[x].key= (char *)malloc(sizeof(char) * 100);
- if (!pairs[x].key)
- goto error;
- get_random_string(pairs[x].key, 100);
- pairs[x].key_length= 100;
-
- pairs[x].value= (char *)malloc(sizeof(char) * 400);
- if (!pairs[x].value)
- goto error;
- get_random_string(pairs[x].value, 400);
- pairs[x].value_length= 400;
+ case SET_TEST:
+ execute_set(memc, context->execute_pairs, context->execute_number);
+ break;
+ case GET_TEST:
+ execute_get(memc, context->initial_pairs, context->initial_number);
+ break;
}
- return pairs;
-error:
- fprintf(stderr, "Memory Allocation failure in pairs_generate.\n");
- exit(0);
+ memcached_free(memc);
+
+ if (context->execute_pairs)
+ pairs_free(context->execute_pairs);
+ free(context);
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter--;
+ pthread_cond_signal(&count_threshhold);
+ pthread_mutex_unlock(&counter_mutex);
+
+ return NULL;
}
-void conclusions_print(conclusions_st *conclusion)
+void flush_all(memcached_st *memc)
{
- printf("\tLoaded %u rows\n", conclusion->rows_loaded);
- printf("\tRead %u rows\n", conclusion->rows_read);
- printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
- conclusion->load_time % 1000);
- printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
- conclusion->read_time % 1000);
+ memcached_flush(memc, 0);
}
-static void get_random_string(char *buffer, size_t size)
+pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
+ unsigned int *actual_loaded)
{
- char *buffer_ptr= buffer;
+ memcached_st *clone;
+ pairs_st *pairs;
+
+ clone= memcached_clone(NULL, memc);
+ /* We always used non-blocking IO for load since it is faster */
+ memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
- while (--size)
- *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
- *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
+ pairs= pairs_generate(number_of, 400);
+ *actual_loaded= execute_set(clone, pairs, number_of);
+
+ memcached_free(clone);
+
+ return pairs;
}