63d7fed865957356840b7db0e9a3782239940938
[m6w6/libmemcached] / contrib / bin / memaslap / ms_task.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #if defined(HAVE_SYS_TIME_H)
19 # include <sys/time.h>
20 #endif
21
22 #if defined(HAVE_TIME_H)
23 # include <time.h>
24 #endif
25
26 #include "ms_thread.h"
27 #include "ms_setting.h"
28 #include "ms_atomic.h"
29
30 /* command distribution adjustment cycle */
31 #define CMD_DISTR_ADJUST_CYCLE 1000
32 #define DISADJUST_FACTOR \
33 0.03 /** \
34 * In one adjustment cycle, if undo set or get \
35 * operations proportion is more than 3% , means \
36 * there are too many new item or need more new \
37 * item in the window. This factor shows it. \
38 */
39
40 /* get item from task window */
41 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
42 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
43 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
44 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
45
46 /* select next operation to do */
47 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
48
49 /* set and get speed estimate for controlling and adjustment */
50 static bool ms_is_set_too_fast(ms_task_t *task);
51 static bool ms_is_get_too_fast(ms_task_t *task);
52 static void ms_kick_out_item(ms_task_item_t *item);
53
54 /* miss rate adjustment */
55 static bool ms_need_overwrite_item(ms_task_t *task);
56 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
57
58 /* deal with data verification initialization */
59 static void ms_task_data_verify_init(ms_task_t *task);
60 static void ms_task_expire_verify_init(ms_task_t *task);
61
62 /* select a new task to do */
63 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
64
65 /* run the selected task */
66 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
67 static void ms_update_stat_result(ms_conn_t *c);
68 static void ms_update_multi_get_result(ms_conn_t *c);
69 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
70 static void ms_update_task_result(ms_conn_t *c);
71 static void ms_single_getset_task_sch(ms_conn_t *c);
72 static void ms_multi_getset_task_sch(ms_conn_t *c);
73 static void ms_send_signal(ms_sync_lock_t *sync_lock);
74 static void ms_warmup_server(ms_conn_t *c);
75 static int ms_run_getset_task(ms_conn_t *c);
76
77 /**
78 * used to get the current operation item(object)
79 *
80 * @param c, pointer of the concurrency
81 *
82 * @return ms_task_item_t*, current operating item
83 */
84 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c) {
85 return c->curr_task.item;
86 }
87
88 /**
89 * used to get the next item to do get operation
90 *
91 * @param c, pointer of the concurrency
92 *
93 * @return ms_task_item_t*, the pointer of the next item to do
94 * get operation
95 */
96 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c) {
97 ms_task_item_t *item = NULL;
98
99 if (c->set_cursor <= 0) {
100 /* the first item in the window */
101 item = &c->item_win[0];
102 } else if (c->set_cursor > 0 && c->set_cursor < (uint32_t) c->win_size) {
103 /* random get one item set before */
104 item = &c->item_win[random() % (int64_t) c->set_cursor];
105 } else {
106 /* random get one item from the window */
107 item = &c->item_win[random() % c->win_size];
108 }
109
110 return item;
111 } /* ms_get_next_get_item */
112
113 /**
114 * used to get the next item to do set operation
115 *
116 * @param c, pointer of the concurrency
117 *
118 * @return ms_task_item_t*, the pointer of the next item to do
119 * set operation
120 */
121 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c) {
122 /**
123 * when a set command successes, the cursor will plus 1. If set
124 * fails, the cursor doesn't change. it isn't necessary to
125 * increase the cursor here.
126 */
127 return &c->item_win[(int64_t) c->set_cursor % c->win_size];
128 }
129
130 /**
131 * If we need do overwrite, we could select a item set before.
132 * This function is used to get a item set before to do
133 * overwrite.
134 *
135 * @param c, pointer of the concurrency
136 *
137 * @return ms_task_item_t*, the pointer of the previous item of
138 * set operation
139 */
140 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c) {
141 return ms_get_next_get_item(c);
142 } /* ms_get_random_overwrite_item */
143
144 /**
145 * According to the proportion of operations(get or set), select
146 * an operation to do.
147 *
148 * @param c, pointer of the concurrency
149 * @param task, pointer of current task in the concurrency
150 */
151 static void ms_select_opt(ms_conn_t *c, ms_task_t *task) {
152 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
153 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
154
155 /* update cycle operation number if necessary */
156 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0)) {
157 task->cycle_undo_get += (int) (CMD_DISTR_ADJUST_CYCLE * get_prop);
158 task->cycle_undo_set += (int) (CMD_DISTR_ADJUST_CYCLE * set_prop);
159 }
160
161 /**
162 * According to operation distribution to choose doing which
163 * operation. If it can't set new object to sever, just change
164 * to do get operation.
165 */
166 if ((set_prop > PROP_ERROR)
167 && ((double) task->get_opt * set_prop >= (double) task->set_opt * get_prop))
168 {
169 task->cmd = CMD_SET;
170 task->item = ms_get_next_set_item(c);
171 } else {
172 task->cmd = CMD_GET;
173 task->item = ms_get_next_get_item(c);
174 }
175 } /* ms_select_opt */
176
177 /**
178 * used to judge whether the number of get operations done is
179 * more than expected number of get operations to do right now.
180 *
181 * @param task, pointer of current task in the concurrency
182 *
183 * @return bool, if get too fast, return true, else return false
184 */
185 static bool ms_is_get_too_fast(ms_task_t *task) {
186 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
187 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
188
189 /* no get operation */
190 if (get_prop < PROP_ERROR) {
191 return false;
192 }
193
194 int max_undo_set = (int) (set_prop / get_prop * (1.0 + DISADJUST_FACTOR)) * task->cycle_undo_get;
195
196 if (((double) task->get_opt * set_prop > (double) task->set_opt * get_prop)
197 && (task->cycle_undo_set > max_undo_set))
198 {
199 return true;
200 }
201
202 return false;
203 } /* ms_is_get_too_fast */
204
205 /**
206 * used to judge whether the number of set operations done is
207 * more than expected number of set operations to do right now.
208 *
209 * @param task, pointer of current task in the concurrency
210 *
211 * @return bool, if set too fast, return true, else return false
212 */
213 static bool ms_is_set_too_fast(ms_task_t *task) {
214 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
215 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
216
217 /* no set operation */
218 if (set_prop < PROP_ERROR) {
219 return false;
220 }
221
222 /* If it does set operation too fast, skip some */
223 int max_undo_get =
224 (int) ((get_prop / set_prop * (1.0 + DISADJUST_FACTOR)) * (double) task->cycle_undo_set);
225
226 if (((double) task->get_opt * set_prop < (double) task->set_opt * get_prop)
227 && (task->cycle_undo_get > max_undo_get))
228 {
229 return true;
230 }
231
232 return false;
233 } /* ms_is_set_too_fast */
234
235 /**
236 * kick out the old item in the window, and add a new item to
237 * overwrite the old item. When we don't want to do overwrite
238 * object, and the current item to do set operation is an old
239 * item, we could kick out the old item and add a new item. Then
240 * we can ensure we set new object every time.
241 *
242 * @param item, pointer of task item which includes the object
243 * information
244 */
245 static void ms_kick_out_item(ms_task_item_t *item) {
246 /* allocate a new item */
247 item->key_prefix = ms_get_key_prefix();
248
249 item->key_suffix_offset++;
250 item->value_offset = INVALID_OFFSET; /* new item use invalid value offset */
251 item->client_time = 0;
252 } /* ms_kick_out_item */
253
254 /**
255 * used to judge whether we need overwrite object based on the
256 * options user specified
257 *
258 * @param task, pointer of current task in the concurrency
259 *
260 * @return bool, if need overwrite, return true, else return
261 * false
262 */
263 static bool ms_need_overwrite_item(ms_task_t *task) {
264 ms_task_item_t *item = task->item;
265
266 assert(item);
267 assert(task->cmd == CMD_SET);
268
269 /**
270 * according to data overwrite percent to determine if do data
271 * overwrite.
272 */
273 if (task->overwrite_set < (double) task->set_opt * ms_setting.overwrite_percent) {
274 return true;
275 }
276
277 return false;
278 } /* ms_need_overwirte_item */
279
280 /**
281 * used to adjust operation. the function must be called after
282 * select operation. the function change get operation to set
283 * operation, or set operation to get operation based on the
284 * current case.
285 *
286 * @param c, pointer of the concurrency
287 * @param task, pointer of current task in the concurrency
288 *
289 * @return bool, if success, return true, else return false
290 */
291 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) {
292 ms_task_item_t *item = task->item;
293
294 assert(item);
295
296 if (task->cmd == CMD_SET) {
297 /* If did set operation too fast, skip some */
298 if (ms_is_set_too_fast(task)) {
299 /* get the item instead */
300 if (item->value_offset != INVALID_OFFSET) {
301 task->cmd = CMD_GET;
302 return true;
303 }
304 }
305
306 /* If the current item is not a new item, kick it out */
307 if (item->value_offset != INVALID_OFFSET) {
308 if (ms_need_overwrite_item(task)) {
309 /* overwrite */
310 task->overwrite_set++;
311 } else {
312 /* kick out the current item to do set operation */
313 ms_kick_out_item(item);
314 }
315 } else /* it's a new item */ {
316 /* need overwrite */
317 if (ms_need_overwrite_item(task)) {
318 /**
319 * overwrite not use the item with current set cursor, revert
320 * set cursor.
321 */
322 c->set_cursor--;
323
324 item = ms_get_random_overwrite_item(c);
325 if (item->value_offset != INVALID_OFFSET) {
326 task->item = item;
327 task->overwrite_set++;
328 } else /* item is a new item */ {
329 /* select the item to run, and cancel overwrite */
330 task->item = item;
331 }
332 }
333 }
334 task->cmd = CMD_SET;
335 return true;
336 } else {
337 if (item->value_offset == INVALID_OFFSET) {
338 task->cmd = CMD_SET;
339 return true;
340 }
341
342 /**
343 * If It does get operation too fast, it will change the
344 * operation to set.
345 */
346 if (ms_is_get_too_fast(task)) {
347 /* don't kick out the first item in the window */
348 if (!ms_is_set_too_fast(task)) {
349 ms_kick_out_item(item);
350 task->cmd = CMD_SET;
351 return true;
352 } else {
353 return false;
354 }
355 }
356
357 assert(item->value_offset != INVALID_OFFSET);
358
359 task->cmd = CMD_GET;
360 return true;
361 }
362 } /* ms_adjust_opt */
363
364 /**
365 * used to initialize the task which need verify data.
366 *
367 * @param task, pointer of current task in the concurrency
368 */
369 static void ms_task_data_verify_init(ms_task_t *task) {
370 ms_task_item_t *item = task->item;
371
372 assert(item);
373 assert(task->cmd == CMD_GET);
374
375 /**
376 * according to data verification percent to determine if do
377 * data verification.
378 */
379 if (task->verified_get < (double) task->get_opt * ms_setting.verify_percent) {
380 /**
381 * currently it doesn't do verify, just increase the counter,
382 * and do verification next proper get command
383 */
384 if ((task->item->value_offset != INVALID_OFFSET) && (item->exp_time == 0)) {
385 task->verify = true;
386 task->finish_verify = false;
387 task->verified_get++;
388 }
389 }
390 } /* ms_task_data_verify_init */
391
392 /**
393 * used to initialize the task which need verify expire time.
394 *
395 * @param task, pointer of current task in the concurrency
396 */
397 static void ms_task_expire_verify_init(ms_task_t *task) {
398 ms_task_item_t *item = task->item;
399
400 assert(item);
401 assert(task->cmd == CMD_GET);
402 assert(item->exp_time > 0);
403
404 task->verify = true;
405 task->finish_verify = false;
406 } /* ms_task_expire_verify_init */
407
408 /**
409 * used to get one task, the function initializes the task
410 * structure.
411 *
412 * @param c, pointer of the concurrency
413 * @param warmup, whether it need warmup
414 *
415 * @return ms_task_t*, pointer of current task in the
416 * concurrency
417 */
418 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup) {
419 ms_task_t *task = &c->curr_task;
420
421 while (1) {
422 task->verify = false;
423 task->finish_verify = true;
424 task->get_miss = true;
425
426 if (warmup) {
427 task->cmd = CMD_SET;
428 task->item = ms_get_next_set_item(c);
429
430 return task;
431 }
432
433 /* according to operation distribution to choose doing which operation */
434 ms_select_opt(c, task);
435
436 if (!ms_adjust_opt(c, task)) {
437 continue;
438 }
439
440 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET)) {
441 ms_task_data_verify_init(task);
442 }
443
444 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET) && (task->item->exp_time > 0)) {
445 ms_task_expire_verify_init(task);
446 }
447
448 break;
449 }
450
451 /**
452 * Only update get and delete counter, set counter will be
453 * updated after set operation successes.
454 */
455 if (task->cmd == CMD_GET) {
456 task->get_opt++;
457 task->cycle_undo_get--;
458 }
459
460 return task;
461 } /* ms_get_task */
462
463 /**
464 * send a signal to the main monitor thread
465 *
466 * @param sync_lock, pointer of the lock
467 */
468 static void ms_send_signal(ms_sync_lock_t *sync_lock) {
469 pthread_mutex_lock(&sync_lock->lock);
470 sync_lock->count++;
471 pthread_cond_signal(&sync_lock->cond);
472 pthread_mutex_unlock(&sync_lock->lock);
473 } /* ms_send_signal */
474
475 /**
476 * If user only want to do get operation, but there is no object
477 * in server , so we use this function to warmup the server, and
478 * set some objects to server. It runs at the beginning of task.
479 *
480 * @param c, pointer of the concurrency
481 */
482 static void ms_warmup_server(ms_conn_t *c) {
483 ms_task_t *task;
484 ms_task_item_t *item;
485
486 /**
487 * Extra one loop to get the last command returned state.
488 * Normally it gets the previous command returned state.
489 */
490 if ((c->remain_warmup_num >= 0) && (c->remain_warmup_num != c->warmup_num)) {
491 item = ms_get_cur_opt_item(c);
492 /* only update the set command result state for data verification */
493 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED)) {
494 item->value_offset = item->key_suffix_offset;
495 /* set success, update counter */
496 c->set_cursor++;
497 } else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED) {
498 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
499 }
500 }
501
502 /* the last time don't run a task */
503 if (c->remain_warmup_num-- > 0) {
504 /* operate next task item */
505 task = ms_get_task(c, true);
506 item = task->item;
507 ms_mcd_set(c, item);
508 }
509
510 /**
511 * finish warming up server, wait all connects initialize
512 * complete. Then all connects can start do task at the same
513 * time.
514 */
515 if (c->remain_warmup_num == -1) {
516 ms_send_signal(&ms_global.warmup_lock);
517 c->remain_warmup_num--; /* never run the if branch */
518 }
519 } /* ms_warmup_server */
520
521 /**
522 * dispatch single get and set task
523 *
524 * @param c, pointer of the concurrency
525 */
526 static void ms_single_getset_task_sch(ms_conn_t *c) {
527 ms_task_t *task;
528 ms_task_item_t *item;
529
530 /* the last time don't run a task */
531 if (c->remain_exec_num-- > 0) {
532 task = ms_get_task(c, false);
533 item = task->item;
534 if (task->cmd == CMD_SET) {
535 ms_mcd_set(c, item);
536 } else if (task->cmd == CMD_GET) {
537 assert(task->cmd == CMD_GET);
538 ms_mcd_get(c, item);
539 }
540 }
541 } /* ms_single_getset_task_sch */
542
543 /**
544 * dispatch multi-get and set task
545 *
546 * @param c, pointer of the concurrency
547 */
548 static void ms_multi_getset_task_sch(ms_conn_t *c) {
549 ms_task_t *task;
550 ms_mlget_task_item_t *mlget_item;
551
552 while (1) {
553 if (c->remain_exec_num-- > 0) {
554 task = ms_get_task(c, false);
555 if (task->cmd == CMD_SET) /* just do it */ {
556 ms_mcd_set(c, task->item);
557 break;
558 } else {
559 assert(task->cmd == CMD_GET);
560 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
561 mlget_item->item = task->item;
562 mlget_item->verify = task->verify;
563 mlget_item->finish_verify = task->finish_verify;
564 mlget_item->get_miss = task->get_miss;
565 c->mlget_task.mlget_num++;
566
567 /* enough multi-get task items can be done */
568 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
569 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
570 {
571 ms_mcd_mlget(c);
572 break;
573 }
574 }
575 } else {
576 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0)) {
577 ms_mcd_mlget(c);
578 }
579 break;
580 }
581 }
582 } /* ms_multi_getset_task_sch */
583
584 /**
585 * calculate the difference value of two time points
586 *
587 * @param start_time, the start time
588 * @param end_time, the end time
589 *
590 * @return uint64_t, the difference value between start_time and end_time in us
591 */
592 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time) {
593 int64_t endtime = end_time->tv_sec * 1000000 + end_time->tv_usec;
594 int64_t starttime = start_time->tv_sec * 1000000 + start_time->tv_usec;
595
596 assert(endtime >= starttime);
597
598 return endtime - starttime;
599 } /* ms_time_diff */
600
601 /**
602 * after get the response from server for multi-get, the
603 * function update the state of the task and do data verify if
604 * necessary.
605 *
606 * @param c, pointer of the concurrency
607 */
608 static void ms_update_multi_get_result(ms_conn_t *c) {
609 ms_mlget_task_item_t *mlget_item;
610 ms_task_item_t *item;
611 char *orignval = NULL;
612 char *orignkey = NULL;
613
614 if (c == NULL) {
615 return;
616 }
617 assert(c);
618
619 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
620 mlget_item = &c->mlget_task.mlget_item[i];
621 item = mlget_item->item;
622 orignval = &ms_setting.char_block[item->value_offset];
623 orignkey = &ms_setting.char_block[item->key_suffix_offset];
624
625 /* update get miss counter */
626 if (mlget_item->get_miss) {
627 atomic_add_size(&ms_stats.get_misses, 1);
628 }
629
630 /* get nothing from server for this task item */
631 if (mlget_item->verify && !mlget_item->finish_verify) {
632 /* verify expire time if necessary */
633 if (item->exp_time > 0) {
634 struct timeval curr_time;
635 gettimeofday(&curr_time, NULL);
636
637 /* object doesn't expire but can't get it now */
638 if (curr_time.tv_sec - item->client_time < item->exp_time - EXPIRE_TIME_ERROR) {
639 atomic_add_size(&ms_stats.unexp_unget, 1);
640
641 if (ms_setting.verbose) {
642 char set_time[64];
643 char cur_time[64];
644 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&item->client_time));
645 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
646 fprintf(stderr,
647 "\n\t<%d expire time verification failed, object "
648 "doesn't expire but can't get it now\n"
649 "\tkey len: %d\n"
650 "\tkey: %" PRIx64 " %.*s\n"
651 "\tset time: %s current time: %s "
652 "diff time: %d expire time: %d\n"
653 "\texpected data len: %d\n"
654 "\texpected data: %.*s\n"
655 "\treceived data: \n",
656 c->sfd, item->key_size, item->key_prefix,
657 item->key_size - (int) KEY_PREFIX_SIZE, orignkey, set_time, cur_time,
658 (int) (curr_time.tv_sec - item->client_time), item->exp_time, item->value_size,
659 item->value_size, orignval);
660 fflush(stderr);
661 }
662 }
663 } else {
664 atomic_add_size(&ms_stats.vef_miss, 1);
665
666 if (ms_setting.verbose) {
667 fprintf(stderr,
668 "\n<%d data verification failed\n"
669 "\tkey len: %d\n"
670 "\tkey: %" PRIx64 " %.*s\n"
671 "\texpected data len: %d\n"
672 "\texpected data: %.*s\n"
673 "\treceived data: \n",
674 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
675 orignkey, item->value_size, item->value_size, orignval);
676 fflush(stderr);
677 }
678 }
679 }
680 }
681 c->mlget_task.mlget_num = 0;
682 c->mlget_task.value_index = INVALID_OFFSET;
683 } /* ms_update_multi_get_result */
684
685 /**
686 * after get the response from server for single get, the
687 * function update the state of the task and do data verify if
688 * necessary.
689 *
690 * @param c, pointer of the concurrency
691 * @param item, pointer of task item which includes the object
692 * information
693 */
694 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item) {
695 char *orignval = NULL;
696 char *orignkey = NULL;
697
698 if ((c == NULL) || (item == NULL)) {
699 return;
700 }
701 assert(c);
702 assert(item);
703
704 orignval = &ms_setting.char_block[item->value_offset];
705 orignkey = &ms_setting.char_block[item->key_suffix_offset];
706
707 /* update get miss counter */
708 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss) {
709 atomic_add_size(&ms_stats.get_misses, 1);
710 }
711
712 /* get nothing from server for this task item */
713 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify && !c->curr_task.finish_verify) {
714 /* verify expire time if necessary */
715 if (item->exp_time > 0) {
716 struct timeval curr_time;
717 gettimeofday(&curr_time, NULL);
718
719 /* object doesn't expire but can't get it now */
720 if (curr_time.tv_sec - item->client_time < item->exp_time - EXPIRE_TIME_ERROR) {
721 atomic_add_size(&ms_stats.unexp_unget, 1);
722
723 if (ms_setting.verbose) {
724 char set_time[64];
725 char cur_time[64];
726 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&item->client_time));
727 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
728 fprintf(stderr,
729 "\n\t<%d expire time verification failed, object "
730 "doesn't expire but can't get it now\n"
731 "\tkey len: %d\n"
732 "\tkey: %" PRIx64 " %.*s\n"
733 "\tset time: %s current time: %s "
734 "diff time: %d expire time: %d\n"
735 "\texpected data len: %d\n"
736 "\texpected data: %.*s\n"
737 "\treceived data: \n",
738 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
739 orignkey, set_time, cur_time, (int) (curr_time.tv_sec - item->client_time),
740 item->exp_time, item->value_size, item->value_size, orignval);
741 fflush(stderr);
742 }
743 }
744 } else {
745 atomic_add_size(&ms_stats.vef_miss, 1);
746
747 if (ms_setting.verbose) {
748 fprintf(stderr,
749 "\n<%d data verification failed\n"
750 "\tkey len: %d\n"
751 "\tkey: %" PRIx64 " %.*s\n"
752 "\texpected data len: %d\n"
753 "\texpected data: %.*s\n"
754 "\treceived data: \n",
755 c->sfd, item->key_size, item->key_prefix, item->key_size - (int) KEY_PREFIX_SIZE,
756 orignkey, item->value_size, item->value_size, orignval);
757 fflush(stderr);
758 }
759 }
760 }
761 } /* ms_update_single_get_result */
762
763 /**
764 * after get the response from server for set the function
765 * update the state of the task and do data verify if necessary.
766 *
767 * @param c, pointer of the concurrency
768 * @param item, pointer of task item which includes the object
769 * information
770 */
771 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item) {
772 if ((c == NULL) || (item == NULL)) {
773 return;
774 }
775 assert(c);
776 assert(item);
777
778 if (c->precmd.cmd == CMD_SET) {
779 switch (c->precmd.retstat) {
780 case MCD_STORED:
781 if (item->value_offset == INVALID_OFFSET) {
782 /* first set with the same offset of key suffix */
783 item->value_offset = item->key_suffix_offset;
784 } else {
785 /* not first set, just increase the value offset */
786 item->value_offset += 1;
787 }
788
789 /* set successes, update counter */
790 c->set_cursor++;
791 c->curr_task.set_opt++;
792 c->curr_task.cycle_undo_set--;
793 break;
794
795 case MCD_SERVER_ERROR:
796 default:
797 break;
798 } /* switch */
799 }
800 } /* ms_update_set_result */
801
802 /**
803 * update the response time result
804 *
805 * @param c, pointer of the concurrency
806 */
807 static void ms_update_stat_result(ms_conn_t *c) {
808 bool get_miss = false;
809
810 if (c == NULL) {
811 return;
812 }
813 assert(c);
814
815 gettimeofday(&c->end_time, NULL);
816 uint64_t time_diff = (uint64_t) ms_time_diff(&c->start_time, &c->end_time);
817
818 pthread_mutex_lock(&ms_statistic.stat_mutex);
819
820 switch (c->precmd.cmd) {
821 case CMD_SET:
822 ms_record_event(&ms_statistic.set_stat, time_diff, false);
823 break;
824
825 case CMD_GET:
826 if (c->curr_task.get_miss) {
827 get_miss = true;
828 }
829 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
830 break;
831
832 default:
833 break;
834 } /* switch */
835
836 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
837 pthread_mutex_unlock(&ms_statistic.stat_mutex);
838 } /* ms_update_stat_result */
839
840 /**
841 * after get response from server for the current operation, and
842 * before doing the next operation, update the state of the
843 * current operation.
844 *
845 * @param c, pointer of the concurrency
846 */
847 static void ms_update_task_result(ms_conn_t *c) {
848 ms_task_item_t *item;
849
850 if (c == NULL) {
851 return;
852 }
853 assert(c);
854
855 item = ms_get_cur_opt_item(c);
856 if (item == NULL) {
857 return;
858 }
859 assert(item);
860
861 ms_update_set_result(c, item);
862
863 if ((ms_setting.stat_freq > 0) && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET))) {
864 ms_update_stat_result(c);
865 }
866
867 /* update multi-get task item */
868 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
869 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
870 {
871 ms_update_multi_get_result(c);
872 } else {
873 ms_update_single_get_result(c, item);
874 }
875 } /* ms_update_task_result */
876
877 /**
878 * run get and set operation
879 *
880 * @param c, pointer of the concurrency
881 *
882 * @return int, if success, return EXIT_SUCCESS, else return -1
883 */
884 static int ms_run_getset_task(ms_conn_t *c) {
885 /**
886 * extra one loop to get the last command return state. get the
887 * last command return state.
888 */
889 if ((c->remain_exec_num >= 0) && (c->remain_exec_num != c->exec_num)) {
890 ms_update_task_result(c);
891 }
892
893 /* multi-get */
894 if (ms_setting.mult_key_num > 1) {
895 /* operate next task item */
896 ms_multi_getset_task_sch(c);
897 } else {
898 /* operate next task item */
899 ms_single_getset_task_sch(c);
900 }
901
902 /* no task to do, exit */
903 if ((c->remain_exec_num == -1) || ms_global.time_out) {
904 return -1;
905 }
906
907 return EXIT_SUCCESS;
908 } /* ms_run_getset_task */
909
910 /**
911 * the state machine call the function to execute task.
912 *
913 * @param c, pointer of the concurrency
914 *
915 * @return int, if success, return EXIT_SUCCESS, else return -1
916 */
917 int ms_exec_task(struct conn *c) {
918 if (!ms_global.finish_warmup) {
919 ms_warmup_server(c);
920 } else {
921 if (ms_run_getset_task(c)) {
922 return -1;
923 }
924 }
925
926 return EXIT_SUCCESS;
927 } /* ms_exec_task */