See changes in changelog, but...
[awesomized/libmemcached] / src / memslap.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/time.h>
#include <getopt.h>
#include <pthread.h>

#include <memcached.h>

#include "client_options.h"
#include "utilities.h"
#include "generator.h"
#include "execute.h"
18
/*
  Defaults applied by options_parse() when the matching value is still 0
  after the command line has been read (the initial load default is only
  applied for the "get" test).
*/
#define DEFAULT_INITIAL_LOAD 10000
#define DEFAULT_EXECUTE_NUMBER 10000
#define DEFAULT_CONCURRENCY 1

/* Program identity handed to version_command() and help_command(). */
#define PROGRAM_NAME "memslap"
#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
25
/*
  Global thread counter: scheduler() resets it to 0 and increments it once
  per worker thread it creates; run_task() decrements it when a worker is
  done. Both sides touch it with counter_mutex held.
*/
unsigned int thread_counter;
pthread_mutex_t counter_mutex;
/* Signalled by run_task() after it decrements thread_counter. */
pthread_cond_t count_threshhold;
/*
  Start gate for the workers: while it is non-zero run_task() blocks on
  sleep_threshhold. scheduler() sets it to 1 before creating the threads and
  back to 0 once all of them exist. Accessed with sleeper_mutex held.
*/
unsigned int master_wakeup;
pthread_mutex_t sleeper_mutex;
/* Broadcast by scheduler() after it clears master_wakeup. */
pthread_cond_t sleep_threshhold;
33
34 void *run_task(void *p);
35
36 /* Types */
37 typedef struct conclusions_st conclusions_st;
38 typedef struct thread_context_st thread_context_st;
39 typedef enum {
40 SET_TEST,
41 GET_TEST,
42 } test_type;
43
44 struct thread_context_st {
45 unsigned int key_count;
46 pairs_st *initial_pairs;
47 unsigned int initial_number;
48 pairs_st *execute_pairs;
49 unsigned int execute_number;
50 test_type test;
51 memcached_server_st *servers;
52 };
53
54 struct conclusions_st {
55 long int load_time;
56 long int read_time;
57 unsigned int rows_loaded;
58 unsigned int rows_read;
59 };
60
/* Prototypes for the functions defined further down in this file. */
void options_parse(int argc, char *argv[]);
void conclusions_print(conclusions_st *conclusion);
void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
/* Stores the value returned by execute_set() through actual_loaded. */
pairs_st *load_create_data(memcached_server_st *servers, unsigned int number_of,
                           unsigned int *actual_loaded);
void flush_all(memcached_server_st *servers);
67 void flush_all(memcached_server_st *servers);
68
/* Run-time settings; all of them are filled in by options_parse(). */
static int opt_verbose= 0;                 /* --verbose / --debug */
static int opt_flush= 0;                   /* --flush: scheduler() calls flush_all() first */
static int opt_non_blocking_io= 0;         /* --non-blocking: checked in run_task() */
static int opt_tcp_nodelay= 0;             /* --tcp-nodelay: checked in run_task() */
static unsigned int opt_execute_number= 0; /* --execute-number; 0 is replaced by DEFAULT_EXECUTE_NUMBER */
static unsigned int opt_createial_load= 0; /* --initial-load; number of pairs loaded before the run */
static unsigned int opt_concurrency= 0;    /* --concurrency; number of threads scheduler() creates */
static int opt_displayflag= 0;             /* --flag; not read anywhere else in this file */
static char *opt_servers= NULL;            /* strdup() of the --servers argument; freed by main() */
test_type opt_test= SET_TEST;              /* --test get|set */
79
80 int main(int argc, char *argv[])
81 {
82 conclusions_st conclusion;
83 memcached_server_st *servers;
84
85 memset(&conclusion, 0, sizeof(conclusions_st));
86
87 srandom(time(NULL));
88 options_parse(argc, argv);
89
90 if (!opt_servers)
91 exit(0);
92
93 servers= parse_opt_servers(opt_servers);
94
95 pthread_mutex_init(&counter_mutex, NULL);
96 pthread_cond_init(&count_threshhold, NULL);
97 pthread_mutex_init(&sleeper_mutex, NULL);
98 pthread_cond_init(&sleep_threshhold, NULL);
99
100 scheduler(servers, &conclusion);
101
102 free(opt_servers);
103
104 (void)pthread_mutex_destroy(&counter_mutex);
105 (void)pthread_cond_destroy(&count_threshhold);
106 (void)pthread_mutex_destroy(&sleeper_mutex);
107 (void)pthread_cond_destroy(&sleep_threshhold);
108 conclusions_print(&conclusion);
109 memcached_server_list_free(servers);
110
111 return 0;
112 }
113
114 void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
115 {
116 unsigned int x;
117 unsigned int actual_loaded;
118
119 struct timeval start_time, end_time;
120 pthread_t mainthread; /* Thread descriptor */
121 pthread_attr_t attr; /* Thread attributes */
122 pairs_st *pairs= NULL;
123
124 pthread_attr_init(&attr);
125 pthread_attr_setdetachstate(&attr,
126 PTHREAD_CREATE_DETACHED);
127
128 if (opt_flush)
129 flush_all(servers);
130 if (opt_createial_load)
131 pairs= load_create_data(servers, opt_createial_load, &actual_loaded);
132
133 pthread_mutex_lock(&counter_mutex);
134 thread_counter= 0;
135
136 pthread_mutex_lock(&sleeper_mutex);
137 master_wakeup= 1;
138 pthread_mutex_unlock(&sleeper_mutex);
139
140 for (x= 0; x < opt_concurrency; x++)
141 {
142 thread_context_st *context;
143 context= (thread_context_st *)malloc(sizeof(thread_context_st));
144
145 context->servers= servers;
146 context->test= opt_test;
147
148 context->initial_pairs= pairs;
149 context->initial_number= actual_loaded;
150
151 if (opt_test == SET_TEST)
152 {
153 context->execute_pairs= pairs_generate(opt_execute_number);
154 context->execute_number= opt_execute_number;
155 }
156
157 /* now you create the thread */
158 if (pthread_create(&mainthread, &attr, run_task,
159 (void *)context) != 0)
160 {
161 fprintf(stderr,"Could not create thread\n");
162 exit(1);
163 }
164 thread_counter++;
165 }
166
167 pthread_mutex_unlock(&counter_mutex);
168 pthread_attr_destroy(&attr);
169
170 pthread_mutex_lock(&sleeper_mutex);
171 master_wakeup= 0;
172 pthread_mutex_unlock(&sleeper_mutex);
173 pthread_cond_broadcast(&sleep_threshhold);
174
175 gettimeofday(&start_time, NULL);
176 /*
177 We loop until we know that all children have cleaned up.
178 */
179 pthread_mutex_lock(&counter_mutex);
180 while (thread_counter)
181 {
182 struct timespec abstime;
183
184 memset(&abstime, 0, sizeof(struct timespec));
185 abstime.tv_sec= 1;
186
187 pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
188 }
189 pthread_mutex_unlock(&counter_mutex);
190
191 gettimeofday(&end_time, NULL);
192
193 conclusion->load_time= timedif(end_time, start_time);
194 conclusion->read_time= timedif(end_time, start_time);
195 pairs_free(pairs);
196 }
197
198 void options_parse(int argc, char *argv[])
199 {
200 memcached_programs_help_st help_options[]=
201 {
202 {0},
203 };
204
205 static struct option long_options[]=
206 {
207 {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
208 {"debug", no_argument, &opt_verbose, OPT_DEBUG},
209 {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
210 {"flag", no_argument, &opt_displayflag, OPT_FLAG},
211 {"flush", no_argument, &opt_flush, OPT_FLUSH},
212 {"help", no_argument, NULL, OPT_HELP},
213 {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
214 {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
215 {"servers", required_argument, NULL, OPT_SERVERS},
216 {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
217 {"test", required_argument, NULL, OPT_SLAP_TEST},
218 {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
219 {"version", no_argument, NULL, OPT_VERSION},
220 {0, 0, 0, 0},
221 };
222
223 int option_index= 0;
224 int option_rv;
225
226 while (1)
227 {
228 option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
229 if (option_rv == -1) break;
230 switch (option_rv)
231 {
232 case 0:
233 break;
234 case OPT_VERBOSE: /* --verbose or -v */
235 opt_verbose = OPT_VERBOSE;
236 break;
237 case OPT_DEBUG: /* --debug or -d */
238 opt_verbose = OPT_DEBUG;
239 break;
240 case OPT_VERSION: /* --version or -V */
241 version_command(PROGRAM_NAME);
242 break;
243 case OPT_HELP: /* --help or -h */
244 help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
245 break;
246 case OPT_SERVERS: /* --servers or -s */
247 opt_servers= strdup(optarg);
248 break;
249 case OPT_SLAP_TEST:
250 if (!strcmp(optarg, "get"))
251 opt_test= GET_TEST ;
252 else if (!strcmp(optarg, "set"))
253 opt_test= SET_TEST;
254 else
255 {
256 fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
257 exit(1);
258 }
259 break;
260 case OPT_SLAP_CONCURRENCY:
261 opt_concurrency= strtol(optarg, (char **)NULL, 10);
262 case OPT_SLAP_EXECUTE_NUMBER:
263 opt_execute_number= strtol(optarg, (char **)NULL, 10);
264 break;
265 case OPT_SLAP_INITIAL_LOAD:
266 opt_createial_load= strtol(optarg, (char **)NULL, 10);
267 break;
268 case '?':
269 /* getopt_long already printed an error message. */
270 exit(1);
271 default:
272 abort();
273 }
274 }
275
276 if (opt_test == GET_TEST && opt_createial_load == 0)
277 opt_createial_load= DEFAULT_INITIAL_LOAD;
278
279 if (opt_execute_number == 0)
280 opt_execute_number= DEFAULT_EXECUTE_NUMBER;
281
282 if (opt_concurrency == 0)
283 opt_concurrency= DEFAULT_CONCURRENCY;
284 }
285
286 void conclusions_print(conclusions_st *conclusion)
287 {
288 printf("\tThreads connecting to servers %u\n", opt_concurrency);
289 #ifdef NOT_FINISHED
290 printf("\tLoaded %u rows\n", conclusion->rows_loaded);
291 printf("\tRead %u rows\n", conclusion->rows_read);
292 #endif
293 if (opt_test == SET_TEST)
294 printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
295 conclusion->load_time % 1000);
296 else
297 printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
298 conclusion->read_time % 1000);
299 }
300
301 void *run_task(void *p)
302 {
303 thread_context_st *context= (thread_context_st *)p;
304 memcached_st *memc;
305
306 memc= memcached_create(NULL);
307 if (opt_non_blocking_io)
308 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
309 if (opt_tcp_nodelay)
310 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, NULL );
311
312 memcached_server_push(memc, context->servers);
313
314 pthread_mutex_lock(&sleeper_mutex);
315 while (master_wakeup)
316 {
317 pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
318 }
319 pthread_mutex_unlock(&sleeper_mutex);
320
321 /* Do Stuff */
322 switch (context->test)
323 {
324 case SET_TEST:
325 execute_set(memc, context->execute_pairs, context->execute_number);
326 break;
327 case GET_TEST:
328 execute_get(memc, context->initial_pairs, context->initial_number);
329 break;
330 }
331
332 pthread_mutex_lock(&counter_mutex);
333 thread_counter--;
334 pthread_cond_signal(&count_threshhold);
335 pthread_mutex_unlock(&counter_mutex);
336 memcached_free(memc);
337
338 if (context->execute_pairs)
339 pairs_free(context->execute_pairs);
340 free(context);
341
342 return NULL;
343 }
344
345 void flush_all(memcached_server_st *servers)
346 {
347 memcached_st *memc;
348
349 memc= memcached_create(NULL);
350
351 memcached_server_push(memc, servers);
352
353 memcached_flush(memc, 0);
354
355 memcached_free(memc);
356 }
357
358 pairs_st *load_create_data(memcached_server_st *servers, unsigned int number_of,
359 unsigned int *actual_loaded)
360 {
361 memcached_st *memc;
362 pairs_st *pairs;
363
364 memc= memcached_create(NULL);
365 /* We always used non-blocking IO for load since it is faster */
366 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
367 memcached_server_push(memc, servers);
368
369 pairs= pairs_generate(number_of);
370 *actual_loaded= execute_set(memc, pairs, number_of);
371
372 memcached_free(memc);
373
374 return pairs;
375 }