Merge from Trond
[m6w6/libmemcached] / clients / memslap.c
1 #include "libmemcached/common.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <sys/types.h>
6 #include <sys/stat.h>
7 #include <sys/types.h>
8 #include <sys/mman.h>
9 #include <fcntl.h>
10 #include <sys/time.h>
11 #include <getopt.h>
12 #include <pthread.h>
13 #include <assert.h>
14
15 #include <libmemcached/memcached.h>
16
17 #include "client_options.h"
18 #include "utilities.h"
19 #include "generator.h"
20 #include "execute.h"
21
/* Fallback values that options_parse() substitutes for options left at zero. */
#define DEFAULT_INITIAL_LOAD 10000
#define DEFAULT_EXECUTE_NUMBER 10000
#define DEFAULT_CONCURRENCY 1

/* Program name and summary handed to version_command() / help_command(). */
#define PROGRAM_NAME "memslap"
#define PROGRAM_DESCRIPTION "Generates a load against a memcached cluster of servers."
28
/*
  Completion tracking shared by scheduler() and run_task().

  thread_counter is incremented by scheduler() for every worker it starts
  and decremented by each worker when it finishes.  It is modified with
  counter_mutex held, and count_threshhold is signalled on every decrement.
*/
volatile unsigned int thread_counter;
pthread_mutex_t counter_mutex;
pthread_cond_t count_threshhold;

/*
  Start gate.  Workers wait on sleep_threshhold (with sleeper_mutex held)
  for as long as master_wakeup is non-zero.
*/
volatile unsigned int master_wakeup;
pthread_mutex_t sleeper_mutex;
pthread_cond_t sleep_threshhold;

/* Worker thread entry point; p is the thread_context_st built by scheduler(). */
void *run_task(void *p);
38
/* Types */
typedef struct conclusions_st conclusions_st;
typedef struct thread_context_st thread_context_st;

/* Workload a worker thread runs; selected with the --test option. */
typedef enum
{
  SET_TEST, /* worker calls execute_set() on its own generated pairs */
  GET_TEST  /* worker calls execute_get() on the initially loaded pairs */
} test_type;
46
47 struct thread_context_st {
48 unsigned int key_count;
49 pairs_st *initial_pairs;
50 unsigned int initial_number;
51 pairs_st *execute_pairs;
52 unsigned int execute_number;
53 test_type test;
54 memcached_st *memc;
55 };
56
/*
  Result summary of one run: written by scheduler(), reported by
  conclusions_print().
*/
struct conclusions_st
{
  long int load_time;       /* timedif() result; printed as value/1000 seconds for SET_TEST */
  long int read_time;       /* timedif() result; printed as value/1000 seconds for GET_TEST */
  unsigned int rows_loaded; /* only printed when NOT_FINISHED is defined */
  unsigned int rows_read;   /* only printed when NOT_FINISHED is defined */
};
63
64 /* Prototypes */
65 void options_parse(int argc, char *argv[]);
66 void conclusions_print(conclusions_st *conclusion);
67 void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
68 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
69 unsigned int *actual_loaded);
70 void flush_all(memcached_st *memc);
71
72 static int opt_binary= 0;
73 static int opt_verbose= 0;
74 static int opt_flush= 0;
75 static int opt_non_blocking_io= 0;
76 static int opt_tcp_nodelay= 0;
77 static unsigned int opt_execute_number= 0;
78 static unsigned int opt_createial_load= 0;
79 static unsigned int opt_concurrency= 0;
80 static int opt_displayflag= 0;
81 static char *opt_servers= NULL;
82 static int opt_udp_io= 0;
83 test_type opt_test= SET_TEST;
84
85 int main(int argc, char *argv[])
86 {
87 conclusions_st conclusion;
88 memcached_server_st *servers;
89
90 memset(&conclusion, 0, sizeof(conclusions_st));
91
92 srandom((unsigned int)time(NULL));
93 options_parse(argc, argv);
94
95 if (!opt_servers)
96 {
97 char *temp;
98
99 if ((temp= getenv("MEMCACHED_SERVERS")))
100 opt_servers= strdup(temp);
101 else
102 {
103 fprintf(stderr, "No Servers provided\n");
104 exit(1);
105 }
106 }
107
108 servers= memcached_servers_parse(opt_servers);
109
110 pthread_mutex_init(&counter_mutex, NULL);
111 pthread_cond_init(&count_threshhold, NULL);
112 pthread_mutex_init(&sleeper_mutex, NULL);
113 pthread_cond_init(&sleep_threshhold, NULL);
114
115 scheduler(servers, &conclusion);
116
117 free(opt_servers);
118
119 (void)pthread_mutex_destroy(&counter_mutex);
120 (void)pthread_cond_destroy(&count_threshhold);
121 (void)pthread_mutex_destroy(&sleeper_mutex);
122 (void)pthread_cond_destroy(&sleep_threshhold);
123 conclusions_print(&conclusion);
124 memcached_server_list_free(servers);
125
126 return 0;
127 }
128
129 void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
130 {
131 unsigned int x;
132 unsigned int actual_loaded= 0; /* Fix warning */
133 memcached_st *memc;
134
135 struct timeval start_time, end_time;
136 pthread_t mainthread; /* Thread descriptor */
137 pthread_attr_t attr; /* Thread attributes */
138 pairs_st *pairs= NULL;
139
140 pthread_attr_init(&attr);
141 pthread_attr_setdetachstate(&attr,
142 PTHREAD_CREATE_DETACHED);
143
144 memc= memcached_create(NULL);
145
146 /* We need to set udp behavior before adding servers to the client */
147 if (opt_udp_io)
148 {
149 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,
150 (uint64_t)opt_udp_io);
151 for(x= 0; x < servers[0].count; x++ )
152 servers[x].type= MEMCACHED_CONNECTION_UDP;
153 }
154 memcached_server_push(memc, servers);
155
156 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
157 (uint64_t)opt_binary);
158
159 if (opt_flush)
160 flush_all(memc);
161 if (opt_createial_load)
162 pairs= load_create_data(memc, opt_createial_load, &actual_loaded);
163
164 /* We set this after we have loaded */
165 {
166 if (opt_non_blocking_io)
167 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
168 if (opt_tcp_nodelay)
169 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
170 }
171
172
173 pthread_mutex_lock(&counter_mutex);
174 thread_counter= 0;
175
176 pthread_mutex_lock(&sleeper_mutex);
177 master_wakeup= 1;
178 pthread_mutex_unlock(&sleeper_mutex);
179
180 for (x= 0; x < opt_concurrency; x++)
181 {
182 thread_context_st *context;
183 context= (thread_context_st *)calloc(1, sizeof(thread_context_st));
184
185 context->memc= memcached_clone(NULL, memc);
186 context->test= opt_test;
187
188 context->initial_pairs= pairs;
189 context->initial_number= actual_loaded;
190
191 if (opt_test == SET_TEST)
192 {
193 context->execute_pairs= pairs_generate(opt_execute_number, 400);
194 context->execute_number= opt_execute_number;
195 }
196
197 /* now you create the thread */
198 if (pthread_create(&mainthread, &attr, run_task,
199 (void *)context) != 0)
200 {
201 fprintf(stderr,"Could not create thread\n");
202 exit(1);
203 }
204 thread_counter++;
205 }
206
207 pthread_mutex_unlock(&counter_mutex);
208 pthread_attr_destroy(&attr);
209
210 pthread_mutex_lock(&sleeper_mutex);
211 master_wakeup= 0;
212 pthread_mutex_unlock(&sleeper_mutex);
213 pthread_cond_broadcast(&sleep_threshhold);
214
215 gettimeofday(&start_time, NULL);
216 /*
217 We loop until we know that all children have cleaned up.
218 */
219 pthread_mutex_lock(&counter_mutex);
220 while (thread_counter)
221 pthread_cond_wait(&count_threshhold, &counter_mutex);
222 pthread_mutex_unlock(&counter_mutex);
223
224 gettimeofday(&end_time, NULL);
225
226 conclusion->load_time= timedif(end_time, start_time);
227 conclusion->read_time= timedif(end_time, start_time);
228 pairs_free(pairs);
229 memcached_free(memc);
230 }
231
232 void options_parse(int argc, char *argv[])
233 {
234 memcached_programs_help_st help_options[]=
235 {
236 {0},
237 };
238
239 static struct option long_options[]=
240 {
241 {(OPTIONSTRING)"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
242 {(OPTIONSTRING)"debug", no_argument, &opt_verbose, OPT_DEBUG},
243 {(OPTIONSTRING)"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
244 {(OPTIONSTRING)"flag", no_argument, &opt_displayflag, OPT_FLAG},
245 {(OPTIONSTRING)"flush", no_argument, &opt_flush, OPT_FLUSH},
246 {(OPTIONSTRING)"help", no_argument, NULL, OPT_HELP},
247 {(OPTIONSTRING)"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
248 {(OPTIONSTRING)"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
249 {(OPTIONSTRING)"servers", required_argument, NULL, OPT_SERVERS},
250 {(OPTIONSTRING)"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
251 {(OPTIONSTRING)"test", required_argument, NULL, OPT_SLAP_TEST},
252 {(OPTIONSTRING)"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
253 {(OPTIONSTRING)"version", no_argument, NULL, OPT_VERSION},
254 {(OPTIONSTRING)"binary", no_argument, NULL, OPT_BINARY},
255 {(OPTIONSTRING)"udp", no_argument, NULL, OPT_UDP},
256 {0, 0, 0, 0},
257 };
258
259 int option_index= 0;
260 int option_rv;
261
262 while (1)
263 {
264 option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
265 if (option_rv == -1) break;
266 switch (option_rv)
267 {
268 case 0:
269 break;
270 case OPT_UDP:
271 if (opt_test == GET_TEST)
272 {
273 fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
274 "does not currently support get ops.\n");
275 exit(1);
276 }
277 opt_udp_io= 1;
278 break;
279 case OPT_BINARY:
280 opt_binary = 1;
281 break;
282 case OPT_VERBOSE: /* --verbose or -v */
283 opt_verbose = OPT_VERBOSE;
284 break;
285 case OPT_DEBUG: /* --debug or -d */
286 opt_verbose = OPT_DEBUG;
287 break;
288 case OPT_VERSION: /* --version or -V */
289 version_command(PROGRAM_NAME);
290 break;
291 case OPT_HELP: /* --help or -h */
292 help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
293 break;
294 case OPT_SERVERS: /* --servers or -s */
295 opt_servers= strdup(optarg);
296 break;
297 case OPT_SLAP_TEST:
298 if (!strcmp(optarg, "get"))
299 {
300 if (opt_udp_io == 1)
301 {
302 fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
303 "does not currently support get ops.\n");
304 exit(1);
305 }
306 opt_test= GET_TEST ;
307 }
308 else if (!strcmp(optarg, "set"))
309 opt_test= SET_TEST;
310 else
311 {
312 fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
313 exit(1);
314 }
315 break;
316 case OPT_SLAP_CONCURRENCY:
317 opt_concurrency= (unsigned int)strtoul(optarg, (char **)NULL, 10);
318 break;
319 case OPT_SLAP_EXECUTE_NUMBER:
320 opt_execute_number= (unsigned int)strtoul(optarg, (char **)NULL, 10);
321 break;
322 case OPT_SLAP_INITIAL_LOAD:
323 opt_createial_load= (unsigned int)strtoul(optarg, (char **)NULL, 10);
324 break;
325 case '?':
326 /* getopt_long already printed an error message. */
327 exit(1);
328 default:
329 abort();
330 }
331 }
332
333 if (opt_test == GET_TEST && opt_createial_load == 0)
334 opt_createial_load= DEFAULT_INITIAL_LOAD;
335
336 if (opt_execute_number == 0)
337 opt_execute_number= DEFAULT_EXECUTE_NUMBER;
338
339 if (opt_concurrency == 0)
340 opt_concurrency= DEFAULT_CONCURRENCY;
341 }
342
343 void conclusions_print(conclusions_st *conclusion)
344 {
345 printf("\tThreads connecting to servers %u\n", opt_concurrency);
346 #ifdef NOT_FINISHED
347 printf("\tLoaded %u rows\n", conclusion->rows_loaded);
348 printf("\tRead %u rows\n", conclusion->rows_read);
349 #endif
350 if (opt_test == SET_TEST)
351 printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000,
352 conclusion->load_time % 1000);
353 else
354 printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
355 conclusion->read_time % 1000);
356 }
357
358 void *run_task(void *p)
359 {
360 thread_context_st *context= (thread_context_st *)p;
361 memcached_st *memc;
362
363 memc= context->memc;
364
365 pthread_mutex_lock(&sleeper_mutex);
366 while (master_wakeup)
367 {
368 pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
369 }
370 pthread_mutex_unlock(&sleeper_mutex);
371
372 /* Do Stuff */
373 switch (context->test)
374 {
375 case SET_TEST:
376 assert(context->execute_pairs);
377 execute_set(memc, context->execute_pairs, context->execute_number);
378 break;
379 case GET_TEST:
380 execute_get(memc, context->initial_pairs, context->initial_number);
381 break;
382 default:
383 WATCHPOINT_ASSERT(context->test);
384 break;
385 }
386
387 memcached_free(memc);
388
389 if (context->execute_pairs)
390 pairs_free(context->execute_pairs);
391
392 free(context);
393
394 pthread_mutex_lock(&counter_mutex);
395 thread_counter--;
396 pthread_cond_signal(&count_threshhold);
397 pthread_mutex_unlock(&counter_mutex);
398
399 return NULL;
400 }
401
402 void flush_all(memcached_st *memc)
403 {
404 memcached_flush(memc, 0);
405 }
406
407 pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
408 unsigned int *actual_loaded)
409 {
410 memcached_st *memc_clone;
411 pairs_st *pairs;
412
413 memc_clone= memcached_clone(NULL, memc);
414 /* We always used non-blocking IO for load since it is faster */
415 memcached_behavior_set(memc_clone, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
416
417 pairs= pairs_generate(number_of, 400);
418 *actual_loaded= execute_set(memc_clone, pairs, number_of);
419
420 memcached_free(memc_clone);
421
422 return pairs;
423 }