Update for release
[m6w6/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <inttypes.h>
15 #if TIME_WITH_SYS_TIME
16 # include <sys/time.h>
17 # include <time.h>
18 #else
19 # if HAVE_SYS_TIME_H
20 # include <sys/time.h>
21 # else
22 # include <time.h>
23 # endif
24 #endif
25
26 #include "ms_thread.h"
27 #include "ms_setting.h"
28 #include "ms_atomic.h"
29
/* length of one command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE 1000

/**
 * Tolerance applied when the pending ("undo") set and get
 * operations of one adjustment cycle are compared. If their
 * proportion is off by more than 3%, there are too many new
 * items in the window, or the window needs more new items.
 */
#define DISADJUST_FACTOR 0.03
38
39 /* get item from task window */
40 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
41 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
42 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
43 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
44
45
46 /* select next operation to do */
47 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
48
49
50 /* set and get speed estimate for controlling and adjustment */
51 static bool ms_is_set_too_fast(ms_task_t *task);
52 static bool ms_is_get_too_fast(ms_task_t *task);
53 static void ms_kick_out_item(ms_task_item_t *item);
54
55
56 /* miss rate adjustment */
57 static bool ms_need_overwrite_item(ms_task_t *task);
58 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
59
60
61 /* deal with data verification initialization */
62 static void ms_task_data_verify_init(ms_task_t *task);
63 static void ms_task_expire_verify_init(ms_task_t *task);
64
65
66 /* select a new task to do */
67 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
68
69
70 /* run the selected task */
71 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
72 static void ms_update_stat_result(ms_conn_t *c);
73 static void ms_update_multi_get_result(ms_conn_t *c);
74 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
75 static void ms_update_task_result(ms_conn_t *c);
76 static void ms_single_getset_task_sch(ms_conn_t *c);
77 static void ms_multi_getset_task_sch(ms_conn_t *c);
78 static void ms_send_signal(ms_sync_lock_t *sync_lock);
79 static void ms_warmup_server(ms_conn_t *c);
80 static int ms_run_getset_task(ms_conn_t *c);
81
82
83 /**
84 * used to get the current operation item(object)
85 *
86 * @param c, pointer of the concurrency
87 *
88 * @return ms_task_item_t*, current operating item
89 */
90 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
91 {
92 return c->curr_task.item;
93 }
94
95
96 /**
97 * used to get the next item to do get operation
98 *
99 * @param c, pointer of the concurrency
100 *
101 * @return ms_task_item_t*, the pointer of the next item to do
102 * get operation
103 */
104 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
105 {
106 ms_task_item_t *item= NULL;
107
108 if (c->set_cursor <= 0)
109 {
110 /* the first item in the window */
111 item= &c->item_win[0];
112 }
113 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
114 {
115 /* random get one item set before */
116 item= &c->item_win[random() % (int64_t)c->set_cursor];
117 }
118 else
119 {
120 /* random get one item from the window */
121 item= &c->item_win[random() % c->win_size];
122 }
123
124 return item;
125 } /* ms_get_next_get_item */
126
127
128 /**
129 * used to get the next item to do set operation
130 *
131 * @param c, pointer of the concurrency
132 *
133 * @return ms_task_item_t*, the pointer of the next item to do
134 * set operation
135 */
136 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
137 {
138 /**
139 * when a set command successes, the cursor will plus 1. If set
140 * fails, the cursor doesn't change. it isn't necessary to
141 * increase the cursor here.
142 */
143 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
144 }
145
146
147 /**
148 * If we need do overwrite, we could select a item set before.
149 * This function is used to get a item set before to do
150 * overwrite.
151 *
152 * @param c, pointer of the concurrency
153 *
154 * @return ms_task_item_t*, the pointer of the previous item of
155 * set operation
156 */
157 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
158 {
159 return ms_get_next_get_item(c);
160 } /* ms_get_random_overwrite_item */
161
162 /**
163 * According to the proportion of operations(get or set), select
164 * an operation to do.
165 *
166 * @param c, pointer of the concurrency
167 * @param task, pointer of current task in the concurrency
168 */
169 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
170 {
171 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
172 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
173
174 /* update cycle operation number if necessary */
175 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
176 {
177 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
178 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
179 }
180
181 /**
182 * According to operation distribution to choose doing which
183 * operation. If it can't set new object to sever, just change
184 * to do get operation.
185 */
186 if ((set_prop > PROP_ERROR)
187 && ((double)task->get_opt * set_prop >= (double)task->set_opt
188 * get_prop))
189 {
190 task->cmd= CMD_SET;
191 task->item= ms_get_next_set_item(c);
192 }
193 else
194 {
195 task->cmd= CMD_GET;
196 task->item= ms_get_next_get_item(c);
197 }
198 } /* ms_select_opt */
199
200
201 /**
202 * used to judge whether the number of get operations done is
203 * more than expected number of get operations to do right now.
204 *
205 * @param task, pointer of current task in the concurrency
206 *
207 * @return bool, if get too fast, return true, else return false
208 */
209 static bool ms_is_get_too_fast(ms_task_t *task)
210 {
211 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
212 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
213
214 /* no get operation */
215 if (get_prop < PROP_ERROR)
216 {
217 return false;
218 }
219
220 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
221 * task->cycle_undo_get;
222
223 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
224 && (task->cycle_undo_set > max_undo_set))
225 {
226 return true;
227 }
228
229 return false;
230 } /* ms_is_get_too_fast */
231
232
233 /**
234 * used to judge whether the number of set operations done is
235 * more than expected number of set operations to do right now.
236 *
237 * @param task, pointer of current task in the concurrency
238 *
239 * @return bool, if set too fast, return true, else return false
240 */
241 static bool ms_is_set_too_fast(ms_task_t *task)
242 {
243 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
244 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
245
246 /* no set operation */
247 if (set_prop < PROP_ERROR)
248 {
249 return false;
250 }
251
252 /* If it does set operation too fast, skip some */
253 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
254 * (double)task->cycle_undo_set);
255
256 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
257 && (task->cycle_undo_get > max_undo_get))
258 {
259 return true;
260 }
261
262 return false;
263 } /* ms_is_set_too_fast */
264
265
266 /**
267 * kick out the old item in the window, and add a new item to
268 * overwrite the old item. When we don't want to do overwrite
269 * object, and the current item to do set operation is an old
270 * item, we could kick out the old item and add a new item. Then
271 * we can ensure we set new object every time.
272 *
273 * @param item, pointer of task item which includes the object
274 * information
275 */
276 static void ms_kick_out_item(ms_task_item_t *item)
277 {
278 /* allocate a new item */
279 item->key_prefix= ms_get_key_prefix();
280
281 item->key_suffix_offset++;
282 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
283 item->client_time= 0;
284 } /* ms_kick_out_item */
285
286
287 /**
288 * used to judge whether we need overwrite object based on the
289 * options user specified
290 *
291 * @param task, pointer of current task in the concurrency
292 *
293 * @return bool, if need overwrite, return true, else return
294 * false
295 */
296 static bool ms_need_overwrite_item(ms_task_t *task)
297 {
298 ms_task_item_t *item= task->item;
299
300 assert(item != NULL);
301 assert(task->cmd == CMD_SET);
302
303 /**
304 * according to data overwrite percent to determine if do data
305 * overwrite.
306 */
307 if (task->overwrite_set < (double)task->set_opt
308 * ms_setting.overwrite_percent)
309 {
310 return true;
311 }
312
313 return false;
314 } /* ms_need_overwirte_item */
315
316
317 /**
318 * used to adjust operation. the function must be called after
319 * select operation. the function change get operation to set
320 * operation, or set operation to get operation based on the
321 * current case.
322 *
323 * @param c, pointer of the concurrency
324 * @param task, pointer of current task in the concurrency
325 *
326 * @return bool, if success, return true, else return false
327 */
328 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
329 {
330 ms_task_item_t *item= task->item;
331
332 assert(item != NULL);
333
334 if (task->cmd == CMD_SET)
335 {
336 /* If did set operation too fast, skip some */
337 if (ms_is_set_too_fast(task))
338 {
339 /* get the item instead */
340 if (item->value_offset != INVALID_OFFSET)
341 {
342 task->cmd= CMD_GET;
343 return true;
344 }
345 }
346
347 /* If the current item is not a new item, kick it out */
348 if (item->value_offset != INVALID_OFFSET)
349 {
350 if (ms_need_overwrite_item(task))
351 {
352 /* overwrite */
353 task->overwrite_set++;
354 }
355 else
356 {
357 /* kick out the current item to do set operation */
358 ms_kick_out_item(item);
359 }
360 }
361 else /* it's a new item */
362 {
363 /* need overwrite */
364 if (ms_need_overwrite_item(task))
365 {
366 /**
367 * overwrite not use the item with current set cursor, revert
368 * set cursor.
369 */
370 c->set_cursor--;
371
372 item= ms_get_random_overwrite_item(c);
373 if (item->value_offset != INVALID_OFFSET)
374 {
375 task->item= item;
376 task->overwrite_set++;
377 }
378 else /* item is a new item */
379 {
380 /* select the item to run, and cancel overwrite */
381 task->item= item;
382 }
383 }
384 }
385 task->cmd= CMD_SET;
386 return true;
387 }
388 else
389 {
390 if (item->value_offset == INVALID_OFFSET)
391 {
392 task->cmd= CMD_SET;
393 return true;
394 }
395
396 /**
397 * If It does get operation too fast, it will change the
398 * operation to set.
399 */
400 if (ms_is_get_too_fast(task))
401 {
402 /* don't kick out the first item in the window */
403 if (! ms_is_set_too_fast(task))
404 {
405 ms_kick_out_item(item);
406 task->cmd= CMD_SET;
407 return true;
408 }
409 else
410 {
411 return false;
412 }
413 }
414
415 assert(item->value_offset != INVALID_OFFSET);
416
417 task->cmd= CMD_GET;
418 return true;
419 }
420 } /* ms_adjust_opt */
421
422
423 /**
424 * used to initialize the task which need verify data.
425 *
426 * @param task, pointer of current task in the concurrency
427 */
428 static void ms_task_data_verify_init(ms_task_t *task)
429 {
430 ms_task_item_t *item= task->item;
431
432 assert(item != NULL);
433 assert(task->cmd == CMD_GET);
434
435 /**
436 * according to data verification percent to determine if do
437 * data verification.
438 */
439 if (task->verified_get < (double)task->get_opt
440 * ms_setting.verify_percent)
441 {
442 /**
443 * currently it doesn't do verify, just increase the counter,
444 * and do verification next proper get command
445 */
446 if ((task->item->value_offset != INVALID_OFFSET)
447 && (item->exp_time == 0))
448 {
449 task->verify= true;
450 task->finish_verify= false;
451 task->verified_get++;
452 }
453 }
454 } /* ms_task_data_verify_init */
455
456
457 /**
458 * used to initialize the task which need verify expire time.
459 *
460 * @param task, pointer of current task in the concurrency
461 */
462 static void ms_task_expire_verify_init(ms_task_t *task)
463 {
464 ms_task_item_t *item= task->item;
465
466 assert(item != NULL);
467 assert(task->cmd == CMD_GET);
468 assert(item->exp_time > 0);
469
470 task->verify= true;
471 task->finish_verify= false;
472 } /* ms_task_expire_verify_init */
473
474
475 /**
476 * used to get one task, the function initializes the task
477 * structure.
478 *
479 * @param c, pointer of the concurrency
480 * @param warmup, whether it need warmup
481 *
482 * @return ms_task_t*, pointer of current task in the
483 * concurrency
484 */
485 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
486 {
487 ms_task_t *task= &c->curr_task;
488
489 while (1)
490 {
491 task->verify= false;
492 task->finish_verify= true;
493 task->get_miss= true;
494
495 if (warmup)
496 {
497 task->cmd= CMD_SET;
498 task->item= ms_get_next_set_item(c);
499
500 return task;
501 }
502
503 /* according to operation distribution to choose doing which operation */
504 ms_select_opt(c, task);
505
506 if (! ms_adjust_opt(c, task))
507 {
508 continue;
509 }
510
511 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
512 {
513 ms_task_data_verify_init(task);
514 }
515
516 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
517 && (task->item->exp_time > 0))
518 {
519 ms_task_expire_verify_init(task);
520 }
521
522 break;
523 }
524
525 /**
526 * Only update get and delete counter, set counter will be
527 * updated after set operation successes.
528 */
529 if (task->cmd == CMD_GET)
530 {
531 task->get_opt++;
532 task->cycle_undo_get--;
533 }
534
535 return task;
536 } /* ms_get_task */
537
538
539 /**
540 * send a signal to the main monitor thread
541 *
542 * @param sync_lock, pointer of the lock
543 */
544 static void ms_send_signal(ms_sync_lock_t *sync_lock)
545 {
546 pthread_mutex_lock(&sync_lock->lock);
547 sync_lock->count++;
548 pthread_cond_signal(&sync_lock->cond);
549 pthread_mutex_unlock(&sync_lock->lock);
550 } /* ms_send_signal */
551
552
553 /**
554 * If user only want to do get operation, but there is no object
555 * in server , so we use this function to warmup the server, and
556 * set some objects to server. It runs at the beginning of task.
557 *
558 * @param c, pointer of the concurrency
559 */
560 static void ms_warmup_server(ms_conn_t *c)
561 {
562 ms_task_t *task;
563 ms_task_item_t *item;
564
565 /**
566 * Extra one loop to get the last command returned state.
567 * Normally it gets the previous command returned state.
568 */
569 if ((c->remain_warmup_num >= 0)
570 && (c->remain_warmup_num != c->warmup_num))
571 {
572 item= ms_get_cur_opt_item(c);
573 /* only update the set command result state for data verification */
574 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
575 {
576 item->value_offset= item->key_suffix_offset;
577 /* set success, update counter */
578 c->set_cursor++;
579 }
580 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
581 {
582 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
583 }
584 }
585
586 /* the last time don't run a task */
587 if (c->remain_warmup_num-- > 0)
588 {
589 /* operate next task item */
590 task= ms_get_task(c, true);
591 item= task->item;
592 ms_mcd_set(c, item);
593 }
594
595 /**
596 * finish warming up server, wait all connects initialize
597 * complete. Then all connects can start do task at the same
598 * time.
599 */
600 if (c->remain_warmup_num == -1)
601 {
602 ms_send_signal(&ms_global.warmup_lock);
603 c->remain_warmup_num--; /* never run the if branch */
604 }
605 } /* ms_warmup_server */
606
607
608 /**
609 * dispatch single get and set task
610 *
611 * @param c, pointer of the concurrency
612 */
613 static void ms_single_getset_task_sch(ms_conn_t *c)
614 {
615 ms_task_t *task;
616 ms_task_item_t *item;
617
618 /* the last time don't run a task */
619 if (c->remain_exec_num-- > 0)
620 {
621 task= ms_get_task(c, false);
622 item= task->item;
623 if (task->cmd == CMD_SET)
624 {
625 ms_mcd_set(c, item);
626 }
627 else if (task->cmd == CMD_GET)
628 {
629 assert(task->cmd == CMD_GET);
630 ms_mcd_get(c, item);
631 }
632 }
633 } /* ms_single_getset_task_sch */
634
635
636 /**
637 * dispatch multi-get and set task
638 *
639 * @param c, pointer of the concurrency
640 */
641 static void ms_multi_getset_task_sch(ms_conn_t *c)
642 {
643 ms_task_t *task;
644 ms_mlget_task_item_t *mlget_item;
645
646 while (1)
647 {
648 if (c->remain_exec_num-- > 0)
649 {
650 task= ms_get_task(c, false);
651 if (task->cmd == CMD_SET) /* just do it */
652 {
653 ms_mcd_set(c, task->item);
654 break;
655 }
656 else
657 {
658 assert(task->cmd == CMD_GET);
659 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
660 mlget_item->item= task->item;
661 mlget_item->verify= task->verify;
662 mlget_item->finish_verify= task->finish_verify;
663 mlget_item->get_miss= task->get_miss;
664 c->mlget_task.mlget_num++;
665
666 /* enough multi-get task items can be done */
667 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
668 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
669 {
670 ms_mcd_mlget(c);
671 break;
672 }
673 }
674 }
675 else
676 {
677 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
678 {
679 ms_mcd_mlget(c);
680 }
681 break;
682 }
683 }
684 } /* ms_multi_getset_task_sch */
685
686
/**
 * calculate the difference value of two time points
 *
 * Both time points are converted to microseconds using 64-bit
 * arithmetic, so the conversion does not overflow on platforms
 * where tv_sec is narrower than 64 bits.
 *
 * @param start_time, the start time
 * @param end_time, the end time; it must not be earlier than
 *        start_time (checked by an assertion)
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  /* widen tv_sec before multiplying, not after */
  int64_t endtime= (int64_t)end_time->tv_sec * 1000000
                   + (int64_t)end_time->tv_usec;
  int64_t starttime= (int64_t)start_time->tv_sec * 1000000
                     + (int64_t)start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */
704
705
706 /**
707 * after get the response from server for multi-get, the
708 * function update the state of the task and do data verify if
709 * necessary.
710 *
711 * @param c, pointer of the concurrency
712 */
713 static void ms_update_multi_get_result(ms_conn_t *c)
714 {
715 ms_mlget_task_item_t *mlget_item;
716 ms_task_item_t *item;
717 char *orignval= NULL;
718 char *orignkey= NULL;
719
720 if (c == NULL)
721 {
722 return;
723 }
724 assert(c != NULL);
725
726 for (int i= 0; i < c->mlget_task.mlget_num; i++)
727 {
728 mlget_item= &c->mlget_task.mlget_item[i];
729 item= mlget_item->item;
730 orignval= &ms_setting.char_block[item->value_offset];
731 orignkey= &ms_setting.char_block[item->key_suffix_offset];
732
733 /* update get miss counter */
734 if (mlget_item->get_miss)
735 {
736 atomic_add_size(&ms_stats.get_misses, 1);
737 }
738
739 /* get nothing from server for this task item */
740 if (mlget_item->verify && ! mlget_item->finish_verify)
741 {
742 /* verify expire time if necessary */
743 if (item->exp_time > 0)
744 {
745 struct timeval curr_time;
746 gettimeofday(&curr_time, NULL);
747
748 /* object doesn't expire but can't get it now */
749 if (curr_time.tv_sec - item->client_time
750 < item->exp_time - EXPIRE_TIME_ERROR)
751 {
752 atomic_add_size(&ms_stats.unexp_unget, 1);
753
754 if (ms_setting.verbose)
755 {
756 char set_time[64];
757 char cur_time[64];
758 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
759 localtime(&item->client_time));
760 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
761 localtime(&curr_time.tv_sec));
762 fprintf(stderr,
763 "\n\t<%d expire time verification failed, object "
764 "doesn't expire but can't get it now\n"
765 "\tkey len: %d\n"
766 "\tkey: %" PRIx64 " %.*s\n"
767 "\tset time: %s current time: %s "
768 "diff time: %d expire time: %d\n"
769 "\texpected data len: %d\n"
770 "\texpected data: %.*s\n"
771 "\treceived data: \n",
772 c->sfd,
773 item->key_size,
774 item->key_prefix,
775 item->key_size - (int)KEY_PREFIX_SIZE,
776 orignkey,
777 set_time,
778 cur_time,
779 (int)(curr_time.tv_sec - item->client_time),
780 item->exp_time,
781 item->value_size,
782 item->value_size,
783 orignval);
784 fflush(stderr);
785 }
786 }
787 }
788 else
789 {
790 atomic_add_size(&ms_stats.vef_miss, 1);
791
792 if (ms_setting.verbose)
793 {
794 fprintf(stderr, "\n<%d data verification failed\n"
795 "\tkey len: %d\n"
796 "\tkey: %" PRIx64 " %.*s\n"
797 "\texpected data len: %d\n"
798 "\texpected data: %.*s\n"
799 "\treceived data: \n",
800 c->sfd, item->key_size, item->key_prefix,
801 item->key_size - (int)KEY_PREFIX_SIZE,
802 orignkey, item->value_size, item->value_size, orignval);
803 fflush(stderr);
804 }
805 }
806 }
807 }
808 c->mlget_task.mlget_num= 0;
809 c->mlget_task.value_index= INVALID_OFFSET;
810 } /* ms_update_multi_get_result */
811
812
813 /**
814 * after get the response from server for single get, the
815 * function update the state of the task and do data verify if
816 * necessary.
817 *
818 * @param c, pointer of the concurrency
819 * @param item, pointer of task item which includes the object
820 * information
821 */
822 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
823 {
824 char *orignval= NULL;
825 char *orignkey= NULL;
826
827 if ((c == NULL) || (item == NULL))
828 {
829 return;
830 }
831 assert(c != NULL);
832 assert(item != NULL);
833
834 orignval= &ms_setting.char_block[item->value_offset];
835 orignkey= &ms_setting.char_block[item->key_suffix_offset];
836
837 /* update get miss counter */
838 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
839 {
840 atomic_add_size(&ms_stats.get_misses, 1);
841 }
842
843 /* get nothing from server for this task item */
844 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
845 && ! c->curr_task.finish_verify)
846 {
847 /* verify expire time if necessary */
848 if (item->exp_time > 0)
849 {
850 struct timeval curr_time;
851 gettimeofday(&curr_time, NULL);
852
853 /* object doesn't expire but can't get it now */
854 if (curr_time.tv_sec - item->client_time
855 < item->exp_time - EXPIRE_TIME_ERROR)
856 {
857 atomic_add_size(&ms_stats.unexp_unget, 1);
858
859 if (ms_setting.verbose)
860 {
861 char set_time[64];
862 char cur_time[64];
863 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
864 localtime(&item->client_time));
865 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
866 localtime(&curr_time.tv_sec));
867 fprintf(stderr,
868 "\n\t<%d expire time verification failed, object "
869 "doesn't expire but can't get it now\n"
870 "\tkey len: %d\n"
871 "\tkey: %" PRIx64 " %.*s\n"
872 "\tset time: %s current time: %s "
873 "diff time: %d expire time: %d\n"
874 "\texpected data len: %d\n"
875 "\texpected data: %.*s\n"
876 "\treceived data: \n",
877 c->sfd,
878 item->key_size,
879 item->key_prefix,
880 item->key_size - (int)KEY_PREFIX_SIZE,
881 orignkey,
882 set_time,
883 cur_time,
884 (int)(curr_time.tv_sec - item->client_time),
885 item->exp_time,
886 item->value_size,
887 item->value_size,
888 orignval);
889 fflush(stderr);
890 }
891 }
892 }
893 else
894 {
895 atomic_add_size(&ms_stats.vef_miss, 1);
896
897 if (ms_setting.verbose)
898 {
899 fprintf(stderr, "\n<%d data verification failed\n"
900 "\tkey len: %d\n"
901 "\tkey: %" PRIx64 " %.*s\n"
902 "\texpected data len: %d\n"
903 "\texpected data: %.*s\n"
904 "\treceived data: \n",
905 c->sfd, item->key_size, item->key_prefix,
906 item->key_size - (int)KEY_PREFIX_SIZE,
907 orignkey, item->value_size, item->value_size, orignval);
908 fflush(stderr);
909 }
910 }
911 }
912 } /* ms_update_single_get_result */
913
914
915 /**
916 * after get the response from server for set the function
917 * update the state of the task and do data verify if necessary.
918 *
919 * @param c, pointer of the concurrency
920 * @param item, pointer of task item which includes the object
921 * information
922 */
923 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
924 {
925 if ((c == NULL) || (item == NULL))
926 {
927 return;
928 }
929 assert(c != NULL);
930 assert(item != NULL);
931
932 if (c->precmd.cmd == CMD_SET)
933 {
934 switch (c->precmd.retstat)
935 {
936 case MCD_STORED:
937 if (item->value_offset == INVALID_OFFSET)
938 {
939 /* first set with the same offset of key suffix */
940 item->value_offset= item->key_suffix_offset;
941 }
942 else
943 {
944 /* not first set, just increase the value offset */
945 item->value_offset+= 1;
946 }
947
948 /* set successes, update counter */
949 c->set_cursor++;
950 c->curr_task.set_opt++;
951 c->curr_task.cycle_undo_set--;
952 break;
953
954 case MCD_SERVER_ERROR:
955 default:
956 break;
957 } /* switch */
958 }
959 } /* ms_update_set_result */
960
961
962 /**
963 * update the response time result
964 *
965 * @param c, pointer of the concurrency
966 */
967 static void ms_update_stat_result(ms_conn_t *c)
968 {
969 bool get_miss= false;
970
971 if (c == NULL)
972 {
973 return;
974 }
975 assert(c != NULL);
976
977 gettimeofday(&c->end_time, NULL);
978 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
979
980 pthread_mutex_lock(&ms_statistic.stat_mutex);
981
982 switch (c->precmd.cmd)
983 {
984 case CMD_SET:
985 ms_record_event(&ms_statistic.set_stat, time_diff, false);
986 break;
987
988 case CMD_GET:
989 if (c->curr_task.get_miss)
990 {
991 get_miss= true;
992 }
993 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
994 break;
995
996 default:
997 break;
998 } /* switch */
999
1000 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
1001 pthread_mutex_unlock(&ms_statistic.stat_mutex);
1002 } /* ms_update_stat_result */
1003
1004
1005 /**
1006 * after get response from server for the current operation, and
1007 * before doing the next operation, update the state of the
1008 * current operation.
1009 *
1010 * @param c, pointer of the concurrency
1011 */
1012 static void ms_update_task_result(ms_conn_t *c)
1013 {
1014 ms_task_item_t *item;
1015
1016 if (c == NULL)
1017 {
1018 return;
1019 }
1020 assert(c != NULL);
1021
1022 item= ms_get_cur_opt_item(c);
1023 if (item == NULL)
1024 {
1025 return;
1026 }
1027 assert(item != NULL);
1028
1029 ms_update_set_result(c, item);
1030
1031 if ((ms_setting.stat_freq > 0)
1032 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1033 {
1034 ms_update_stat_result(c);
1035 }
1036
1037 /* update multi-get task item */
1038 if (((ms_setting.mult_key_num > 1)
1039 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1040 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1041 {
1042 ms_update_multi_get_result(c);
1043 }
1044 else
1045 {
1046 ms_update_single_get_result(c, item);
1047 }
1048 } /* ms_update_task_result */
1049
1050
1051 /**
1052 * run get and set operation
1053 *
1054 * @param c, pointer of the concurrency
1055 *
1056 * @return int, if success, return EXIT_SUCCESS, else return -1
1057 */
1058 static int ms_run_getset_task(ms_conn_t *c)
1059 {
1060 /**
1061 * extra one loop to get the last command return state. get the
1062 * last command return state.
1063 */
1064 if ((c->remain_exec_num >= 0)
1065 && (c->remain_exec_num != c->exec_num))
1066 {
1067 ms_update_task_result(c);
1068 }
1069
1070 /* multi-get */
1071 if (ms_setting.mult_key_num > 1)
1072 {
1073 /* operate next task item */
1074 ms_multi_getset_task_sch(c);
1075 }
1076 else
1077 {
1078 /* operate next task item */
1079 ms_single_getset_task_sch(c);
1080 }
1081
1082 /* no task to do, exit */
1083 if ((c->remain_exec_num == -1) || ms_global.time_out)
1084 {
1085 return -1;
1086 }
1087
1088 return EXIT_SUCCESS;
1089 } /* ms_run_getset_task */
1090
1091
1092 /**
1093 * the state machine call the function to execute task.
1094 *
1095 * @param c, pointer of the concurrency
1096 *
1097 * @return int, if success, return EXIT_SUCCESS, else return -1
1098 */
1099 int ms_exec_task(struct conn *c)
1100 {
1101 if (! ms_global.finish_warmup)
1102 {
1103 ms_warmup_server(c);
1104 }
1105 else
1106 {
1107 if (ms_run_getset_task(c) != 0)
1108 {
1109 return -1;
1110 }
1111 }
1112
1113 return EXIT_SUCCESS;
1114 } /* ms_exec_task */