Merging support for by_key operations.
[awesomized/libmemcached] / clients / memslap.c
1 #include "libmemcached/common.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <sys/types.h>
6 #include <sys/stat.h>
7 #include <sys/types.h>
8 #include <sys/mman.h>
9 #include <fcntl.h>
10 #include <sys/time.h>
11 #include <getopt.h>
12 #include <pthread.h>
13 #include <assert.h>
14
15 #include <libmemcached/memcached.h>
16
17 #include "client_options.h"
18 #include "utilities.h"
19 #include "generator.h"
20 #include "execute.h"
21
22 #define DEFAULT_INITIAL_LOAD 10000
23 #define DEFAULT_EXECUTE_NUMBER 10000
24 #define DEFAULT_CONCURRENCY 1
25
26 #define PROGRAM_NAME "memslap"
27 #define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
28
29 /* Global Thread counter */
30 volatile unsigned int thread_counter;
31 pthread_mutex_t counter_mutex;
32 pthread_cond_t count_threshhold;
33 volatile unsigned int master_wakeup;
34 pthread_mutex_t sleeper_mutex;
35 pthread_cond_t sleep_threshhold;
36
37 void *run_task(void *p);
38
39 /* Types */
40 typedef struct conclusions_st conclusions_st;
41 typedef struct thread_context_st thread_context_st;
42 typedef enum {
43 SET_TEST,
44 GET_TEST,
45 MGET_TEST
46 } test_type;
47
48 struct thread_context_st {
49 unsigned int key_count;
50 pairs_st *initial_pairs;
51 unsigned int initial_number;
52 pairs_st *execute_pairs;
53 unsigned int execute_number;
54 char **keys;
55 size_t *key_lengths;
56 test_type test;
57 memcached_st *memc;
58 };
59
60 struct conclusions_st {
61 long int load_time;
62 long int read_time;
63 unsigned int rows_loaded;
64 unsigned int rows_read;
65 };
66
67 /* Prototypes */
68 void options_parse(int argc, char *argv[]);
69 void conclusions_print(conclusions_st *conclusion);
70 void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
71 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
72 unsigned int *actual_loaded);
73 void flush_all(memcached_st *memc);
74
75 static int opt_binary= 0;
76 static int opt_verbose= 0;
77 static int opt_flush= 0;
78 static int opt_non_blocking_io= 0;
79 static int opt_tcp_nodelay= 0;
80 static unsigned int opt_execute_number= 0;
81 static unsigned int opt_createial_load= 0;
82 static unsigned int opt_concurrency= 0;
83 static int opt_displayflag= 0;
84 static char *opt_servers= NULL;
85 static int opt_udp_io= 0;
86 test_type opt_test= SET_TEST;
87
88 int main(int argc, char *argv[])
89 {
90 conclusions_st conclusion;
91 memcached_server_st *servers;
92
93 memset(&conclusion, 0, sizeof(conclusions_st));
94
95 srandom((unsigned int)time(NULL));
96 options_parse(argc, argv);
97
98 if (!opt_servers)
99 {
100 char *temp;
101
102 if ((temp= getenv("MEMCACHED_SERVERS")))
103 opt_servers= strdup(temp);
104 else
105 {
106 fprintf(stderr, "No Servers provided\n");
107 exit(1);
108 }
109 }
110
111 servers= memcached_servers_parse(opt_servers);
112
113 pthread_mutex_init(&counter_mutex, NULL);
114 pthread_cond_init(&count_threshhold, NULL);
115 pthread_mutex_init(&sleeper_mutex, NULL);
116 pthread_cond_init(&sleep_threshhold, NULL);
117
118 scheduler(servers, &conclusion);
119
120 free(opt_servers);
121
122 (void)pthread_mutex_destroy(&counter_mutex);
123 (void)pthread_cond_destroy(&count_threshhold);
124 (void)pthread_mutex_destroy(&sleeper_mutex);
125 (void)pthread_cond_destroy(&sleep_threshhold);
126 conclusions_print(&conclusion);
127 memcached_server_list_free(servers);
128
129 return 0;
130 }
131
132 void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
133 {
134 unsigned int x;
135 unsigned int actual_loaded= 0; /* Fix warning */
136 memcached_st *memc;
137
138 struct timeval start_time, end_time;
139 pthread_t mainthread; /* Thread descriptor */
140 pthread_attr_t attr; /* Thread attributes */
141 pairs_st *pairs= NULL;
142
143 pthread_attr_init(&attr);
144 pthread_attr_setdetachstate(&attr,
145 PTHREAD_CREATE_DETACHED);
146
147 memc= memcached_create(NULL);
148
149 /* We need to set udp behavior before adding servers to the client */
150 if (opt_udp_io)
151 {
152 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,
153 (uint64_t)opt_udp_io);
154 for(x= 0; x < servers[0].count; x++ )
155 servers[x].type= MEMCACHED_CONNECTION_UDP;
156 }
157 memcached_server_push(memc, servers);
158
159 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
160 (uint64_t)opt_binary);
161
162 if (opt_flush)
163 flush_all(memc);
164 if (opt_createial_load)
165 pairs= load_create_data(memc, opt_createial_load, &actual_loaded);
166
167 char **keys= calloc(actual_loaded, sizeof(char*));
168 size_t *key_lengths= calloc(actual_loaded, sizeof(size_t));
169
170 if (keys == NULL || key_lengths == NULL)
171 {
172 free(keys);
173 free(key_lengths);
174 keys= NULL;
175 key_lengths= NULL;
176 } else {
177 for (x= 0; x < actual_loaded; ++x)
178 {
179 keys[x]= pairs[x].key;
180 key_lengths[x]= pairs[x].key_length;
181 }
182 }
183
184 /* We set this after we have loaded */
185 {
186 if (opt_non_blocking_io)
187 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
188 if (opt_tcp_nodelay)
189 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
190 }
191
192 pthread_mutex_lock(&counter_mutex);
193 thread_counter= 0;
194
195 pthread_mutex_lock(&sleeper_mutex);
196 master_wakeup= 1;
197 pthread_mutex_unlock(&sleeper_mutex);
198
199 for (x= 0; x < opt_concurrency; x++)
200 {
201 thread_context_st *context;
202 context= (thread_context_st *)calloc(1, sizeof(thread_context_st));
203
204 context->memc= memcached_clone(NULL, memc);
205 context->test= opt_test;
206
207 context->initial_pairs= pairs;
208 context->initial_number= actual_loaded;
209 context->keys= keys;
210 context->key_lengths= key_lengths;
211
212 if (opt_test == SET_TEST)
213 {
214 context->execute_pairs= pairs_generate(opt_execute_number, 400);
215 context->execute_number= opt_execute_number;
216 }
217
218 /* now you create the thread */
219 if (pthread_create(&mainthread, &attr, run_task,
220 (void *)context) != 0)
221 {
222 fprintf(stderr,"Could not create thread\n");
223 exit(1);
224 }
225 thread_counter++;
226 }
227
228 pthread_mutex_unlock(&counter_mutex);
229 pthread_attr_destroy(&attr);
230
231 pthread_mutex_lock(&sleeper_mutex);
232 master_wakeup= 0;
233 pthread_mutex_unlock(&sleeper_mutex);
234 pthread_cond_broadcast(&sleep_threshhold);
235
236 gettimeofday(&start_time, NULL);
237 /*
238 We loop until we know that all children have cleaned up.
239 */
240 pthread_mutex_lock(&counter_mutex);
241 while (thread_counter)
242 pthread_cond_wait(&count_threshhold, &counter_mutex);
243 pthread_mutex_unlock(&counter_mutex);
244
245 gettimeofday(&end_time, NULL);
246
247 conclusion->load_time= timedif(end_time, start_time);
248 conclusion->read_time= timedif(end_time, start_time);
249 free(keys);
250 free(key_lengths);
251 pairs_free(pairs);
252 memcached_free(memc);
253 }
254
255 void options_parse(int argc, char *argv[])
256 {
257 memcached_programs_help_st help_options[]=
258 {
259 {0},
260 };
261
262 static struct option long_options[]=
263 {
264 {(OPTIONSTRING)"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
265 {(OPTIONSTRING)"debug", no_argument, &opt_verbose, OPT_DEBUG},
266 {(OPTIONSTRING)"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
267 {(OPTIONSTRING)"flag", no_argument, &opt_displayflag, OPT_FLAG},
268 {(OPTIONSTRING)"flush", no_argument, &opt_flush, OPT_FLUSH},
269 {(OPTIONSTRING)"help", no_argument, NULL, OPT_HELP},
270 {(OPTIONSTRING)"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
271 {(OPTIONSTRING)"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
272 {(OPTIONSTRING)"servers", required_argument, NULL, OPT_SERVERS},
273 {(OPTIONSTRING)"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
274 {(OPTIONSTRING)"test", required_argument, NULL, OPT_SLAP_TEST},
275 {(OPTIONSTRING)"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
276 {(OPTIONSTRING)"version", no_argument, NULL, OPT_VERSION},
277 {(OPTIONSTRING)"binary", no_argument, NULL, OPT_BINARY},
278 {(OPTIONSTRING)"udp", no_argument, NULL, OPT_UDP},
279 {0, 0, 0, 0},
280 };
281
282 int option_index= 0;
283 int option_rv;
284
285 while (1)
286 {
287 option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
288 if (option_rv == -1) break;
289 switch (option_rv)
290 {
291 case 0:
292 break;
293 case OPT_UDP:
294 if (opt_test == GET_TEST)
295 {
296 fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
297 "does not currently support get ops.\n");
298 exit(1);
299 }
300 opt_udp_io= 1;
301 break;
302 case OPT_BINARY:
303 opt_binary = 1;
304 break;
305 case OPT_VERBOSE: /* --verbose or -v */
306 opt_verbose = OPT_VERBOSE;
307 break;
308 case OPT_DEBUG: /* --debug or -d */
309 opt_verbose = OPT_DEBUG;
310 break;
311 case OPT_VERSION: /* --version or -V */
312 version_command(PROGRAM_NAME);
313 break;
314 case OPT_HELP: /* --help or -h */
315 help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
316 break;
317 case OPT_SERVERS: /* --servers or -s */
318 opt_servers= strdup(optarg);
319 break;
320 case OPT_SLAP_TEST:
321 if (!strcmp(optarg, "get"))
322 {
323 if (opt_udp_io == 1)
324 {
325 fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
326 "does not currently support get ops.\n");
327 exit(1);
328 }
329 opt_test= GET_TEST ;
330 }
331 else if (!strcmp(optarg, "set"))
332 opt_test= SET_TEST;
333 else if (!strcmp(optarg, "mget"))
334 {
335 opt_test= MGET_TEST;
336 }
337 else
338 {
339 fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
340 exit(1);
341 }
342 break;
343 case OPT_SLAP_CONCURRENCY:
344 opt_concurrency= (unsigned int)strtoul(optarg, (char **)NULL, 10);
345 break;
346 case OPT_SLAP_EXECUTE_NUMBER:
347 opt_execute_number= (unsigned int)strtoul(optarg, (char **)NULL, 10);
348 break;
349 case OPT_SLAP_INITIAL_LOAD:
350 opt_createial_load= (unsigned int)strtoul(optarg, (char **)NULL, 10);
351 break;
352 case '?':
353 /* getopt_long already printed an error message. */
354 exit(1);
355 default:
356 abort();
357 }
358 }
359
360 if ((opt_test == GET_TEST || opt_test == MGET_TEST) && opt_createial_load == 0)
361 opt_createial_load= DEFAULT_INITIAL_LOAD;
362
363 if (opt_execute_number == 0)
364 opt_execute_number= DEFAULT_EXECUTE_NUMBER;
365
366 if (opt_concurrency == 0)
367 opt_concurrency= DEFAULT_CONCURRENCY;
368 }
369
370 void conclusions_print(conclusions_st *conclusion)
371 {
372 printf("\tThreads connecting to servers %u\n", opt_concurrency);
373 #ifdef NOT_FINISHED
374 printf("\tLoaded %u rows\n", conclusion->rows_loaded);
375 printf("\tRead %u rows\n", conclusion->rows_read);
376 #endif
377 if (opt_test == SET_TEST)
378 printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
379 conclusion->load_time % 1000);
380 else
381 printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
382 conclusion->read_time % 1000);
383 }
384
385 void *run_task(void *p)
386 {
387 thread_context_st *context= (thread_context_st *)p;
388 memcached_st *memc;
389
390 memc= context->memc;
391
392 pthread_mutex_lock(&sleeper_mutex);
393 while (master_wakeup)
394 {
395 pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
396 }
397 pthread_mutex_unlock(&sleeper_mutex);
398
399 /* Do Stuff */
400 switch (context->test)
401 {
402 case SET_TEST:
403 assert(context->execute_pairs);
404 execute_set(memc, context->execute_pairs, context->execute_number);
405 break;
406 case GET_TEST:
407 execute_get(memc, context->initial_pairs, context->initial_number);
408 break;
409 case MGET_TEST:
410 execute_mget(memc, (const char*const*)context->keys, context->key_lengths,
411 context->initial_number);
412 break;
413 default:
414 WATCHPOINT_ASSERT(context->test);
415 break;
416 }
417
418 memcached_free(memc);
419
420 if (context->execute_pairs)
421 pairs_free(context->execute_pairs);
422
423 free(context);
424
425 pthread_mutex_lock(&counter_mutex);
426 thread_counter--;
427 pthread_cond_signal(&count_threshhold);
428 pthread_mutex_unlock(&counter_mutex);
429
430 return NULL;
431 }
432
433 void flush_all(memcached_st *memc)
434 {
435 memcached_flush(memc, 0);
436 }
437
438 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
439 unsigned int *actual_loaded)
440 {
441 memcached_st *memc_clone;
442 pairs_st *pairs;
443
444 memc_clone= memcached_clone(NULL, memc);
445 /* We always used non-blocking IO for load since it is faster */
446 memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
447
448 pairs= pairs_generate(number_of, 400);
449 *actual_loaded= execute_set(memc_clone, pairs, number_of);
450
451 memcached_free(memc_clone);
452
453 return pairs;
454 }