WIP
[m6w6/libmemcached] / contrib / bin / memaslap / ms_task.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #if defined(HAVE_SYS_TIME_H)
19 # include <sys/time.h>
20 #endif
21 #include <time.h>
22 #include <inttypes.h>
23
24 #include "ms_thread.h"
25 #include "ms_setting.h"
26 #include "ms_atomic.h"
27
28 /* command distribution adjustment cycle */
29 #define CMD_DISTR_ADJUST_CYCLE 1000
30 #define DISADJUST_FACTOR \
31 0.03 /** \
32 * In one adjustment cycle, if undo set or get \
33 * operations proportion is more than 3% , means \
34 * there are too many new item or need more new \
35 * item in the window. This factor shows it. \
36 */
37
38 /* get item from task window */
39 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
40 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
41 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
42 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
43
44 /* select next operation to do */
45 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
46
47 /* set and get speed estimate for controlling and adjustment */
48 static bool ms_is_set_too_fast(ms_task_t *task);
49 static bool ms_is_get_too_fast(ms_task_t *task);
50 static void ms_kick_out_item(ms_task_item_t *item);
51
52 /* miss rate adjustment */
53 static bool ms_need_overwrite_item(ms_task_t *task);
54 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
55
56 /* deal with data verification initialization */
57 static void ms_task_data_verify_init(ms_task_t *task);
58 static void ms_task_expire_verify_init(ms_task_t *task);
59
60 /* select a new task to do */
61 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
62
63 /* run the selected task */
64 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
65 static void ms_update_stat_result(ms_conn_t *c);
66 static void ms_update_multi_get_result(ms_conn_t *c);
67 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
68 static void ms_update_task_result(ms_conn_t *c);
69 static void ms_single_getset_task_sch(ms_conn_t *c);
70 static void ms_multi_getset_task_sch(ms_conn_t *c);
71 static void ms_send_signal(ms_sync_lock_t *sync_lock);
72 static void ms_warmup_server(ms_conn_t *c);
73 static int ms_run_getset_task(ms_conn_t *c);
74
75 /**
76 * used to get the current operation item(object)
77 *
78 * @param c, pointer of the concurrency
79 *
80 * @return ms_task_item_t*, current operating item
81 */
82 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c) {
83 return c->curr_task.item;
84 }
85
86 /**
87 * used to get the next item to do get operation
88 *
89 * @param c, pointer of the concurrency
90 *
91 * @return ms_task_item_t*, the pointer of the next item to do
92 * get operation
93 */
94 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c) {
95 ms_task_item_t *item = NULL;
96
97 if (c->set_cursor <= 0) {
98 /* the first item in the window */
99 item = &c->item_win[0];
100 } else if (c->set_cursor > 0 && c->set_cursor < (uint32_t) c->win_size) {
101 /* random get one item set before */
102 item = &c->item_win[random() % (int64_t) c->set_cursor];
103 } else {
104 /* random get one item from the window */
105 item = &c->item_win[random() % c->win_size];
106 }
107
108 return item;
109 } /* ms_get_next_get_item */
110
111 /**
112 * used to get the next item to do set operation
113 *
114 * @param c, pointer of the concurrency
115 *
116 * @return ms_task_item_t*, the pointer of the next item to do
117 * set operation
118 */
119 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c) {
120 /**
121 * when a set command successes, the cursor will plus 1. If set
122 * fails, the cursor doesn't change. it isn't necessary to
123 * increase the cursor here.
124 */
125 return &c->item_win[(int64_t) c->set_cursor % c->win_size];
126 }
127
128 /**
129 * If we need do overwrite, we could select a item set before.
130 * This function is used to get a item set before to do
131 * overwrite.
132 *
133 * @param c, pointer of the concurrency
134 *
135 * @return ms_task_item_t*, the pointer of the previous item of
136 * set operation
137 */
138 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c) {
139 return ms_get_next_get_item(c);
140 } /* ms_get_random_overwrite_item */
141
142 /**
143 * According to the proportion of operations(get or set), select
144 * an operation to do.
145 *
146 * @param c, pointer of the concurrency
147 * @param task, pointer of current task in the concurrency
148 */
149 static void ms_select_opt(ms_conn_t *c, ms_task_t *task) {
150 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
151 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
152
153 /* update cycle operation number if necessary */
154 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0)) {
155 task->cycle_undo_get += (int) (CMD_DISTR_ADJUST_CYCLE * get_prop);
156 task->cycle_undo_set += (int) (CMD_DISTR_ADJUST_CYCLE * set_prop);
157 }
158
159 /**
160 * According to operation distribution to choose doing which
161 * operation. If it can't set new object to sever, just change
162 * to do get operation.
163 */
164 if ((set_prop > PROP_ERROR)
165 && ((double) task->get_opt * set_prop >= (double) task->set_opt * get_prop))
166 {
167 task->cmd = CMD_SET;
168 task->item = ms_get_next_set_item(c);
169 } else {
170 task->cmd = CMD_GET;
171 task->item = ms_get_next_get_item(c);
172 }
173 } /* ms_select_opt */
174
175 /**
176 * used to judge whether the number of get operations done is
177 * more than expected number of get operations to do right now.
178 *
179 * @param task, pointer of current task in the concurrency
180 *
181 * @return bool, if get too fast, return true, else return false
182 */
183 static bool ms_is_get_too_fast(ms_task_t *task) {
184 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
185 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
186
187 /* no get operation */
188 if (get_prop < PROP_ERROR) {
189 return false;
190 }
191
192 int max_undo_set = (int) (set_prop / get_prop * (1.0 + DISADJUST_FACTOR)) * task->cycle_undo_get;
193
194 if (((double) task->get_opt * set_prop > (double) task->set_opt * get_prop)
195 && (task->cycle_undo_set > max_undo_set))
196 {
197 return true;
198 }
199
200 return false;
201 } /* ms_is_get_too_fast */
202
203 /**
204 * used to judge whether the number of set operations done is
205 * more than expected number of set operations to do right now.
206 *
207 * @param task, pointer of current task in the concurrency
208 *
209 * @return bool, if set too fast, return true, else return false
210 */
211 static bool ms_is_set_too_fast(ms_task_t *task) {
212 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
213 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
214
215 /* no set operation */
216 if (set_prop < PROP_ERROR) {
217 return false;
218 }
219
220 /* If it does set operation too fast, skip some */
221 int max_undo_get =
222 (int) ((get_prop / set_prop * (1.0 + DISADJUST_FACTOR)) * (double) task->cycle_undo_set);
223
224 if (((double) task->get_opt * set_prop < (double) task->set_opt * get_prop)
225 && (task->cycle_undo_get > max_undo_get))
226 {
227 return true;
228 }
229
230 return false;
231 } /* ms_is_set_too_fast */
232
233 /**
234 * kick out the old item in the window, and add a new item to
235 * overwrite the old item. When we don't want to do overwrite
236 * object, and the current item to do set operation is an old
237 * item, we could kick out the old item and add a new item. Then
238 * we can ensure we set new object every time.
239 *
240 * @param item, pointer of task item which includes the object
241 * information
242 */
243 static void ms_kick_out_item(ms_task_item_t *item) {
244 /* allocate a new item */
245 item->key_prefix = ms_get_key_prefix();
246
247 item->key_suffix_offset++;
248 item->value_offset = INVALID_OFFSET; /* new item use invalid value offset */
249 item->client_time = 0;
250 } /* ms_kick_out_item */
251
252 /**
253 * used to judge whether we need overwrite object based on the
254 * options user specified
255 *
256 * @param task, pointer of current task in the concurrency
257 *
258 * @return bool, if need overwrite, return true, else return
259 * false
260 */
261 static bool ms_need_overwrite_item(ms_task_t *task) {
262 ms_task_item_t *item = task->item;
263
264 assert(item);
265 assert(task->cmd == CMD_SET);
266
267 /**
268 * according to data overwrite percent to determine if do data
269 * overwrite.
270 */
271 if (task->overwrite_set < (double) task->set_opt * ms_setting.overwrite_percent) {
272 return true;
273 }
274
275 return false;
276 } /* ms_need_overwirte_item */
277
278 /**
279 * used to adjust operation. the function must be called after
280 * select operation. the function change get operation to set
281 * operation, or set operation to get operation based on the
282 * current case.
283 *
284 * @param c, pointer of the concurrency
285 * @param task, pointer of current task in the concurrency
286 *
287 * @return bool, if success, return true, else return false
288 */
289 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) {
290 ms_task_item_t *item = task->item;
291
292 assert(item);
293
294 if (task->cmd == CMD_SET) {
295 /* If did set operation too fast, skip some */
296 if (ms_is_set_too_fast(task)) {
297 /* get the item instead */
298 if (item->value_offset != INVALID_OFFSET) {
299 task->cmd = CMD_GET;
300 return true;
301 }
302 }
303
304 /* If the current item is not a new item, kick it out */
305 if (item->value_offset != INVALID_OFFSET) {
306 if (ms_need_overwrite_item(task)) {
307 /* overwrite */
308 task->overwrite_set++;
309 } else {
310 /* kick out the current item to do set operation */
311 ms_kick_out_item(item);
312 }
313 } else /* it's a new item */ {
314 /* need overwrite */
315 if (ms_need_overwrite_item(task)) {
316 /**
317 * overwrite not use the item with current set cursor, revert
318 * set cursor.
319 */
320 c->set_cursor--;
321
322 item = ms_get_random_overwrite_item(c);
323 if (item->value_offset != INVALID_OFFSET) {
324 task->item = item;
325 task->overwrite_set++;
326 } else /* item is a new item */ {
327 /* select the item to run, and cancel overwrite */
328 task->item = item;
329 }
330 }
331 }
332 task->cmd = CMD_SET;
333 return true;
334 } else {
335 if (item->value_offset == INVALID_OFFSET) {
336 task->cmd = CMD_SET;
337 return true;
338 }
339
340 /**
341 * If It does get operation too fast, it will change the
342 * operation to set.
343 */
344 if (ms_is_get_too_fast(task)) {
345 /* don't kick out the first item in the window */
346 if (!ms_is_set_too_fast(task)) {
347 ms_kick_out_item(item);
348 task->cmd = CMD_SET;
349 return true;
350 } else {
351 return false;
352 }
353 }
354
355 assert(item->value_offset != INVALID_OFFSET);
356
357 task->cmd = CMD_GET;
358 return true;
359 }
360 } /* ms_adjust_opt */
361
362 /**
363 * used to initialize the task which need verify data.
364 *
365 * @param task, pointer of current task in the concurrency
366 */
367 static void ms_task_data_verify_init(ms_task_t *task) {
368 ms_task_item_t *item = task->item;
369
370 assert(item);
371 assert(task->cmd == CMD_GET);
372
373 /**
374 * according to data verification percent to determine if do
375 * data verification.
376 */
377 if (task->verified_get < (double) task->get_opt * ms_setting.verify_percent) {
378 /**
379 * currently it doesn't do verify, just increase the counter,
380 * and do verification next proper get command
381 */
382 if ((task->item->value_offset != INVALID_OFFSET) && (item->exp_time == 0)) {
383 task->verify = true;
384 task->finish_verify = false;
385 task->verified_get++;
386 }
387 }
388 } /* ms_task_data_verify_init */
389
390 /**
391 * used to initialize the task which need verify expire time.
392 *
393 * @param task, pointer of current task in the concurrency
394 */
395 static void ms_task_expire_verify_init(ms_task_t *task) {
396 ms_task_item_t *item = task->item;
397
398 assert(item);
399 assert(task->cmd == CMD_GET);
400 assert(item->exp_time > 0);
401
402 task->verify = true;
403 task->finish_verify = false;
404 } /* ms_task_expire_verify_init */
405
406 /**
407 * used to get one task, the function initializes the task
408 * structure.
409 *
410 * @param c, pointer of the concurrency
411 * @param warmup, whether it need warmup
412 *
413 * @return ms_task_t*, pointer of current task in the
414 * concurrency
415 */
416 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup) {
417 ms_task_t *task = &c->curr_task;
418
419 while (1) {
420 task->verify = false;
421 task->finish_verify = true;
422 task->get_miss = true;
423
424 if (warmup) {
425 task->cmd = CMD_SET;
426 task->item = ms_get_next_set_item(c);
427
428 return task;
429 }
430
431 /* according to operation distribution to choose doing which operation */
432 ms_select_opt(c, task);
433
434 if (!ms_adjust_opt(c, task)) {
435 continue;
436 }
437
438 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET)) {
439 ms_task_data_verify_init(task);
440 }
441
442 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET) && (task->item->exp_time > 0)) {
443 ms_task_expire_verify_init(task);
444 }
445
446 break;
447 }
448
449 /**
450 * Only update get and delete counter, set counter will be
451 * updated after set operation successes.
452 */
453 if (task->cmd == CMD_GET) {
454 task->get_opt++;
455 task->cycle_undo_get--;
456 }
457
458 return task;
459 } /* ms_get_task */
460
461 /**
462 * send a signal to the main monitor thread
463 *
464 * @param sync_lock, pointer of the lock
465 */
466 static void ms_send_signal(ms_sync_lock_t *sync_lock) {
467 pthread_mutex_lock(&sync_lock->lock);
468 sync_lock->count++;
469 pthread_cond_signal(&sync_lock->cond);
470 pthread_mutex_unlock(&sync_lock->lock);
471 } /* ms_send_signal */
472
473 /**
474 * If user only want to do get operation, but there is no object
475 * in server , so we use this function to warmup the server, and
476 * set some objects to server. It runs at the beginning of task.
477 *
478 * @param c, pointer of the concurrency
479 */
480 static void ms_warmup_server(ms_conn_t *c) {
481 ms_task_t *task;
482 ms_task_item_t *item;
483
484 /**
485 * Extra one loop to get the last command returned state.
486 * Normally it gets the previous command returned state.
487 */
488 if ((c->remain_warmup_num >= 0) && (c->remain_warmup_num != c->warmup_num)) {
489 item = ms_get_cur_opt_item(c);
490 /* only update the set command result state for data verification */
491 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED)) {
492 item->value_offset = item->key_suffix_offset;
493 /* set success, update counter */
494 c->set_cursor++;
495 } else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED) {
496 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
497 }
498 }
499
500 /* the last time don't run a task */
501 if (c->remain_warmup_num-- > 0) {
502 /* operate next task item */
503 task = ms_get_task(c, true);
504 item = task->item;
505 ms_mcd_set(c, item);
506 }
507
508 /**
509 * finish warming up server, wait all connects initialize
510 * complete. Then all connects can start do task at the same
511 * time.
512 */
513 if (c->remain_warmup_num == -1) {
514 ms_send_signal(&ms_global.warmup_lock);
515 c->remain_warmup_num--; /* never run the if branch */
516 }
517 } /* ms_warmup_server */
518
519 /**
520 * dispatch single get and set task
521 *
522 * @param c, pointer of the concurrency
523 */
524 static void ms_single_getset_task_sch(ms_conn_t *c) {
525 ms_task_t *task;
526 ms_task_item_t *item;
527
528 /* the last time don't run a task */
529 if (c->remain_exec_num-- > 0) {
530 task = ms_get_task(c, false);
531 item = task->item;
532 if (task->cmd == CMD_SET) {
533 ms_mcd_set(c, item);
534 } else if (task->cmd == CMD_GET) {
535 assert(task->cmd == CMD_GET);
536 ms_mcd_get(c, item);
537 }
538 }
539 } /* ms_single_getset_task_sch */
540
541 /**
542 * dispatch multi-get and set task
543 *
544 * @param c, pointer of the concurrency
545 */
546 static void ms_multi_getset_task_sch(ms_conn_t *c) {
547 ms_task_t *task;
548 ms_mlget_task_item_t *mlget_item;
549
550 while (1) {
551 if (c->remain_exec_num-- > 0) {
552 task = ms_get_task(c, false);
553 if (task->cmd == CMD_SET) /* just do it */ {
554 ms_mcd_set(c, task->item);
555 break;
556 } else {
557 assert(task->cmd == CMD_GET);
558 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
559 mlget_item->item = task->item;
560 mlget_item->verify = task->verify;
561 mlget_item->finish_verify = task->finish_verify;
562 mlget_item->get_miss = task->get_miss;
563 c->mlget_task.mlget_num++;
564
565 /* enough multi-get task items can be done */
566 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
567 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
568 {
569 ms_mcd_mlget(c);
570 break;
571 }
572 }
573 } else {
574 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0)) {
575 ms_mcd_mlget(c);
576 }
577 break;
578 }
579 }
580 } /* ms_multi_getset_task_sch */
581
582 /**
583 * calculate the difference value of two time points
584 *
585 * @param start_time, the start time
586 * @param end_time, the end time
587 *
588 * @return uint64_t, the difference value between start_time and end_time in us
589 */
590 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time) {
591 int64_t endtime = end_time->tv_sec * 1000000 + end_time->tv_usec;
592 int64_t starttime = start_time->tv_sec * 1000000 + start_time->tv_usec;
593
594 assert(endtime >= starttime);
595
596 return endtime - starttime;
597 } /* ms_time_diff */
598
599 /**
600 * after get the response from server for multi-get, the
601 * function update the state of the task and do data verify if
602 * necessary.
603 *
604 * @param c, pointer of the concurrency
605 */
606 static void ms_update_multi_get_result(ms_conn_t *c) {
607 ms_mlget_task_item_t *mlget_item;
608 ms_task_item_t *item;
609 char *orignval = NULL;
610 char *orignkey = NULL;
611
612 if (c == NULL) {
613 return;
614 }
615 assert(c);
616
617 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
618 mlget_item = &c->mlget_task.mlget_item[i];
619 item = mlget_item->item;
620 orignval = &ms_setting.char_block[item->value_offset];
621 orignkey = &ms_setting.char_block[item->key_suffix_offset];
622
623 /* update get miss counter */
624 if (mlget_item->get_miss) {
625 atomic_add_size(&ms_stats.get_misses, 1);
626 }
627
628 /* get nothing from server for this task item */
629 if (mlget_item->verify && !mlget_item->finish_verify) {
630 /* verify expire time if necessary */
631 if (item->exp_time > 0) {
632 struct timeval curr_time;
633 gettimeofday(&curr_time, NULL);
634
635 /* object doesn't expire but can't get it now */
636 if (curr_time.tv_sec - item->client_time < item->exp_time - EXPIRE_TIME_ERROR) {
637 atomic_add_size(&ms_stats.unexp_unget, 1);
638
639 if (ms_setting.verbose) {
640 char set_time[64];
641 char cur_time[64];
642 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&item->client_time));
643 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
644 fprintf(stderr,
645 "\n\t<%d expire time verification failed, object "
646 "doesn't expire but can't get it now\n"
647 "\tkey len: %d\n"
648 "\tkey: %" PRIx64 " %.*s\n"
649 "\tset time: %s current time: %s "
650 "diff time: %d expire time: %d\n"
651 "\texpected data len: %d\n"
652 "\texpected data: %.*s\n"
653 "\treceived data: \n",
654 c->sfd, item->key_size, item->key_prefix,
655 item->key_size - (int) KEY_PREFIX_SIZE, orignkey, set_time, cur_time,
656 (int) (curr_time.tv_sec - item->client_time), item->exp_time, item->value_size,
657 item->value_size, orignval);
658 fflush(stderr);
659 }
660 }
661 } else {
662 atomic_add_size(&ms_stats.vef_miss, 1);
663
664 if (ms_setting.verbose) {
665 fprintf(stderr,
666 "\n<%d data verification failed\n"
667 "\tkey len: %d\n"
668 "\tkey: %" PRIx64 " %.*s\n"
669 "\texpected data len: %d\n"
670 "\texpected data: %.*s\n"
671 "\treceived data: \n",
672 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
673 orignkey, item->value_size, item->value_size, orignval);
674 fflush(stderr);
675 }
676 }
677 }
678 }
679 c->mlget_task.mlget_num = 0;
680 c->mlget_task.value_index = INVALID_OFFSET;
681 } /* ms_update_multi_get_result */
682
683 /**
684 * after get the response from server for single get, the
685 * function update the state of the task and do data verify if
686 * necessary.
687 *
688 * @param c, pointer of the concurrency
689 * @param item, pointer of task item which includes the object
690 * information
691 */
692 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item) {
693 char *orignval = NULL;
694 char *orignkey = NULL;
695
696 if ((c == NULL) || (item == NULL)) {
697 return;
698 }
699 assert(c);
700 assert(item);
701
702 orignval = &ms_setting.char_block[item->value_offset];
703 orignkey = &ms_setting.char_block[item->key_suffix_offset];
704
705 /* update get miss counter */
706 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss) {
707 atomic_add_size(&ms_stats.get_misses, 1);
708 }
709
710 /* get nothing from server for this task item */
711 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify && !c->curr_task.finish_verify) {
712 /* verify expire time if necessary */
713 if (item->exp_time > 0) {
714 struct timeval curr_time;
715 gettimeofday(&curr_time, NULL);
716
717 /* object doesn't expire but can't get it now */
718 if (curr_time.tv_sec - item->client_time < item->exp_time - EXPIRE_TIME_ERROR) {
719 atomic_add_size(&ms_stats.unexp_unget, 1);
720
721 if (ms_setting.verbose) {
722 char set_time[64];
723 char cur_time[64];
724 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&item->client_time));
725 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
726 fprintf(stderr,
727 "\n\t<%d expire time verification failed, object "
728 "doesn't expire but can't get it now\n"
729 "\tkey len: %d\n"
730 "\tkey: %" PRIx64 " %.*s\n"
731 "\tset time: %s current time: %s "
732 "diff time: %d expire time: %d\n"
733 "\texpected data len: %d\n"
734 "\texpected data: %.*s\n"
735 "\treceived data: \n",
736 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
737 orignkey, set_time, cur_time, (int) (curr_time.tv_sec - item->client_time),
738 item->exp_time, item->value_size, item->value_size, orignval);
739 fflush(stderr);
740 }
741 }
742 } else {
743 atomic_add_size(&ms_stats.vef_miss, 1);
744
745 if (ms_setting.verbose) {
746 fprintf(stderr,
747 "\n<%d data verification failed\n"
748 "\tkey len: %d\n"
749 "\tkey: %" PRIx64 " %.*s\n"
750 "\texpected data len: %d\n"
751 "\texpected data: %.*s\n"
752 "\treceived data: \n",
753 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
754 orignkey, item->value_size, item->value_size, orignval);
755 fflush(stderr);
756 }
757 }
758 }
759 } /* ms_update_single_get_result */
760
761 /**
762 * after get the response from server for set the function
763 * update the state of the task and do data verify if necessary.
764 *
765 * @param c, pointer of the concurrency
766 * @param item, pointer of task item which includes the object
767 * information
768 */
769 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item) {
770 if ((c == NULL) || (item == NULL)) {
771 return;
772 }
773 assert(c);
774 assert(item);
775
776 if (c->precmd.cmd == CMD_SET) {
777 switch (c->precmd.retstat) {
778 case MCD_STORED:
779 if (item->value_offset == INVALID_OFFSET) {
780 /* first set with the same offset of key suffix */
781 item->value_offset = item->key_suffix_offset;
782 } else {
783 /* not first set, just increase the value offset */
784 item->value_offset += 1;
785 }
786
787 /* set successes, update counter */
788 c->set_cursor++;
789 c->curr_task.set_opt++;
790 c->curr_task.cycle_undo_set--;
791 break;
792
793 case MCD_SERVER_ERROR:
794 default:
795 break;
796 } /* switch */
797 }
798 } /* ms_update_set_result */
799
800 /**
801 * update the response time result
802 *
803 * @param c, pointer of the concurrency
804 */
805 static void ms_update_stat_result(ms_conn_t *c) {
806 bool get_miss = false;
807
808 if (c == NULL) {
809 return;
810 }
811 assert(c);
812
813 gettimeofday(&c->end_time, NULL);
814 uint64_t time_diff = (uint64_t) ms_time_diff(&c->start_time, &c->end_time);
815
816 pthread_mutex_lock(&ms_statistic.stat_mutex);
817
818 switch (c->precmd.cmd) {
819 case CMD_SET:
820 ms_record_event(&ms_statistic.set_stat, time_diff, false);
821 break;
822
823 case CMD_GET:
824 if (c->curr_task.get_miss) {
825 get_miss = true;
826 }
827 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
828 break;
829
830 default:
831 break;
832 } /* switch */
833
834 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
835 pthread_mutex_unlock(&ms_statistic.stat_mutex);
836 } /* ms_update_stat_result */
837
838 /**
839 * after get response from server for the current operation, and
840 * before doing the next operation, update the state of the
841 * current operation.
842 *
843 * @param c, pointer of the concurrency
844 */
845 static void ms_update_task_result(ms_conn_t *c) {
846 ms_task_item_t *item;
847
848 if (c == NULL) {
849 return;
850 }
851 assert(c);
852
853 item = ms_get_cur_opt_item(c);
854 if (item == NULL) {
855 return;
856 }
857 assert(item);
858
859 ms_update_set_result(c, item);
860
861 if ((ms_setting.stat_freq > 0) && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET))) {
862 ms_update_stat_result(c);
863 }
864
865 /* update multi-get task item */
866 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
867 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
868 {
869 ms_update_multi_get_result(c);
870 } else {
871 ms_update_single_get_result(c, item);
872 }
873 } /* ms_update_task_result */
874
875 /**
876 * run get and set operation
877 *
878 * @param c, pointer of the concurrency
879 *
880 * @return int, if success, return EXIT_SUCCESS, else return -1
881 */
882 static int ms_run_getset_task(ms_conn_t *c) {
883 /**
884 * extra one loop to get the last command return state. get the
885 * last command return state.
886 */
887 if ((c->remain_exec_num >= 0) && (c->remain_exec_num != c->exec_num)) {
888 ms_update_task_result(c);
889 }
890
891 /* multi-get */
892 if (ms_setting.mult_key_num > 1) {
893 /* operate next task item */
894 ms_multi_getset_task_sch(c);
895 } else {
896 /* operate next task item */
897 ms_single_getset_task_sch(c);
898 }
899
900 /* no task to do, exit */
901 if ((c->remain_exec_num == -1) || ms_global.time_out) {
902 return -1;
903 }
904
905 return EXIT_SUCCESS;
906 } /* ms_run_getset_task */
907
908 /**
909 * the state machine call the function to execute task.
910 *
911 * @param c, pointer of the concurrency
912 *
913 * @return int, if success, return EXIT_SUCCESS, else return -1
914 */
915 int ms_exec_task(struct conn *c) {
916 if (!ms_global.finish_warmup) {
917 ms_warmup_server(c);
918 } else {
919 if (ms_run_getset_task(c)) {
920 return -1;
921 }
922 }
923
924 return EXIT_SUCCESS;
925 } /* ms_exec_task */