3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
11 #include "ms_thread.h"
12 #include "ms_setting.h"
14 /* command distribution adjustment cycle */
15 #define CMD_DISTR_ADJUST_CYCLE 1000
16 #define DISADJUST_FACTOR 0.03 /**
17 * In one adjustment cycle, if undo set or get
18 * operations proportion is more than 3% , means
19 * there are too many new item or need more new
20 * item in the window. This factor shows it.
23 extern __thread ms_thread_t ms_thread
;
25 /* get item from task window */
26 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
27 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
28 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
29 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
);
31 /* select next operation to do */
32 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
34 /* set and get speed estimate for controlling and adjustment */
35 static bool ms_is_set_too_fast(ms_task_t
*task
);
36 static bool ms_is_get_too_fast(ms_task_t
*task
);
37 static void ms_kick_out_item(ms_task_item_t
*item
);
39 /* miss rate adjustment */
40 static bool ms_need_overwirte_item(ms_task_t
*task
);
41 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
43 /* deal with data verification initialization */
44 static void ms_task_data_verify_init(ms_task_t
*task
);
45 static void ms_task_expire_verify_init(ms_task_t
*task
);
47 /* select a new task to do */
48 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
50 /* run the selected task */
51 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
52 static void ms_update_stat_result(ms_conn_t
*c
);
53 static void ms_update_multi_get_result(ms_conn_t
*c
);
54 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
55 static void ms_update_task_result(ms_conn_t
*c
);
56 static void ms_single_getset_task_sch(ms_conn_t
*c
);
57 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
58 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
59 static void ms_warmup_server(ms_conn_t
*c
);
60 static int ms_run_getset_task(ms_conn_t
*c
);
63 * used to get the current operation item(object)
65 * @param c, pointer of the concurrency
67 * @return ms_task_item_t*, current operating item
69 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
71 return(c
->curr_task
.item
);
75 * used to get the next item to do get operation
77 * @param c, pointer of the concurrency
79 * @return ms_task_item_t*, the pointer of the next item to do
82 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
84 ms_task_item_t
*item
= NULL
;
86 if (c
->set_cursor
<= 0) {
87 /* the first item in the window */
88 item
= &c
->item_win
[0];
89 } else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
) {
90 /* random get one item set before */
91 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
93 /* random get one item from the window */
94 item
= &c
->item_win
[random() % c
->win_size
];
101 * used to get the next item to do set operation
103 * @param c, pointer of the concurrency
105 * @return ms_task_item_t*, the pointer of the next item to do
108 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
111 * when a set command successes, the cursor will plus 1. If set
112 * fails, the cursor doesn't change. it isn't necessary to
113 * increase the cursor here.
115 return(&c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
]);
119 * If we need do overwrite, we could select a item set before.
120 * This function is used to get a item set before to do
123 * @param c, pointer of the concurrency
125 * @return ms_task_item_t*, the pointer of the previous item of
128 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
)
130 if (c
->set_cursor
<= 0) {
131 return (&c
->item_win
[0]);
133 return(&c
->item_win
[(int64_t)--c
->set_cursor
% c
->win_size
]);
138 * According to the proportion of operations(get or set), select
139 * an operation to do.
141 * @param c, pointer of the concurrency
142 * @param task, pointer of current task in the concurrency
144 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
146 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
147 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
149 /* update cycle operation number if necessary */
150 if (task
->cycle_undo_get
== 0 || task
->cycle_undo_set
== 0) {
152 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
153 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
157 * According to operation distribution to choose doing which
158 * operation. If it can't set new object to sever, just change
159 * to do get operation.
161 if (set_prop
> PROP_ERROR
162 && (double)task
->get_opt
* set_prop
>= (double)task
->set_opt
* get_prop
) {
164 task
->item
= ms_get_next_set_item(c
);
167 task
->item
= ms_get_next_get_item(c
);
172 * used to judge whether the number of get operations done is
173 * more than expected number of get operations to do right now.
175 * @param task, pointer of current task in the concurrency
177 * @return bool, if get too fast, return true, else return false
179 static bool ms_is_get_too_fast(ms_task_t
*task
)
181 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
182 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
184 /* no get operation */
185 if (get_prop
< PROP_ERROR
) {
189 int max_undo_set
= (int)(set_prop
/ get_prop
*(1.0 + DISADJUST_FACTOR
))
190 * task
->cycle_undo_get
;
192 if ((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
193 && task
->cycle_undo_set
> max_undo_set
) {
202 * used to judge whether the number of set operations done is
203 * more than expected number of set operations to do right now.
205 * @param task, pointer of current task in the concurrency
207 * @return bool, if set too fast, return true, else return false
209 static bool ms_is_set_too_fast(ms_task_t
*task
)
211 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
212 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
214 /* no set operation */
215 if (set_prop
< PROP_ERROR
) {
219 /* If it does set operation too fast, skip some */
220 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
221 * (double)task
->cycle_undo_set
);
223 if ((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
224 && task
->cycle_undo_get
> max_undo_get
) {
232 * kick out the old item in the window, and add a new item to
233 * overwrite the old item. When we don't want to do overwrite
234 * object, and the current item to do set operation is an old
235 * item, we could kick out the old item and add a new item. Then
236 * we can ensure we set new object every time.
238 * @param item, pointer of task item which includes the object
241 static void ms_kick_out_item(ms_task_item_t
*item
)
243 /* allocate a new item */
244 item
->key_prefix
= ms_get_key_prefix();
246 item
->key_suffix_offset
++;
247 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
248 item
->client_time
= 0;
252 * used to judge whether we need overwrite object based on the
253 * options user specified
255 * @param task, pointer of current task in the concurrency
257 * @return bool, if need overwrite, return true, else return
260 static bool ms_need_overwirte_item(ms_task_t
*task
)
262 ms_task_item_t
*item
= task
->item
;
264 assert(item
!= NULL
);
265 assert(task
->cmd
== CMD_SET
);
268 * according to data overwrite percent to determine if do data
271 if (task
->overwrite_set
< (double)task
->set_opt
* ms_setting
.overwrite_percent
) {
280 * used to adjust operation. the function must be called after
281 * select operation. the function change get operation to set
282 * operation, or set operation to get operation based on the
285 * @param c, pointer of the concurrency
286 * @param task, pointer of current task in the concurrency
288 * @return bool, if success, return true, else return false
290 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
292 ms_task_item_t
*item
= task
->item
;
293 assert(item
!= NULL
);
295 if (task
->cmd
== CMD_SET
) {
296 /* If did set operation too fast, skip some */
297 if (ms_is_set_too_fast(task
)) {
299 /* get the item instead */
300 if (item
->value_offset
!= INVALID_OFFSET
) {
306 /* If the current item is not a new item, kick it out */
307 if (item
->value_offset
!= INVALID_OFFSET
) {
308 if(ms_need_overwirte_item(task
)) {
310 task
->overwrite_set
++;
312 /* kick out the current item to do set operation */
313 ms_kick_out_item(item
);
315 } else { /* it's a new item */
317 if (ms_need_overwirte_item(task
)) {
318 item
= ms_get_pre_set_item(c
);
319 if (item
->value_offset
!= INVALID_OFFSET
) {
321 task
->overwrite_set
++;
322 } else { /* previous set item is a new item */
323 /* select the previous item to run, and cancel overwrite */
332 if (item
->value_offset
== INVALID_OFFSET
) {
338 * If It does get operation too fast, it will change the
341 if (ms_is_get_too_fast(task
)) {
342 /* don't kick out the first item in the window */
343 if (!ms_is_set_too_fast(task
)) {
344 ms_kick_out_item(item
);
352 assert(item
->value_offset
!= INVALID_OFFSET
);
360 * used to initialize the task which need verify data.
362 * @param task, pointer of current task in the concurrency
364 static void ms_task_data_verify_init(ms_task_t
*task
)
366 ms_task_item_t
*item
= task
->item
;
368 assert(item
!= NULL
);
369 assert(task
->cmd
== CMD_GET
);
372 * according to data verification percent to determine if do
375 if (task
->verified_get
< (double)task
->get_opt
* ms_setting
.verify_percent
) {
377 * currently it doesn't do verify, just increase the counter,
378 * and do verification next proper get command
380 if (task
->item
->value_offset
!= INVALID_OFFSET
&& item
->exp_time
== 0) {
382 task
->finish_verify
= false;
383 task
->verified_get
++;
389 * used to initialize the task which need verify expire time.
391 * @param task, pointer of current task in the concurrency
393 static void ms_task_expire_verify_init(ms_task_t
*task
)
395 ms_task_item_t
*item
= task
->item
;
397 assert(item
!= NULL
);
398 assert(task
->cmd
== CMD_GET
);
399 assert(item
->exp_time
> 0);
402 task
->finish_verify
= false;
406 * used to get one task, the function initializes the task
409 * @param c, pointer of the concurrency
410 * @param warmup, whether it need warmup
412 * @return ms_task_t*, pointer of current task in the
415 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
417 ms_task_t
*task
= &c
->curr_task
;
420 task
->verify
= false;
421 task
->finish_verify
= true;
422 task
->get_miss
= true;
426 task
->item
= ms_get_next_set_item(c
);
431 /* according to operation distribution to choose doing which operation */
432 ms_select_opt(c
, task
);
434 if (!ms_adjust_opt(c
, task
)) {
438 if (ms_setting
.verify_percent
> 0 && task
->cmd
== CMD_GET
) {
439 ms_task_data_verify_init(task
);
442 if (ms_setting
.exp_ver_per
> 0 && task
->cmd
== CMD_GET
443 && task
->item
->exp_time
> 0) {
444 ms_task_expire_verify_init(task
);
451 * Only update get and delete counter, set counter will be
452 * updated after set operation successes.
454 if (task
->cmd
== CMD_GET
) {
456 task
->cycle_undo_get
--;
463 * send a signal to the main monitor thread
465 * @param sync_lock, pointer of the lock
467 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
469 pthread_mutex_lock(&sync_lock
->lock
);
471 pthread_cond_signal(&sync_lock
->cond
);
472 pthread_mutex_unlock(&sync_lock
->lock
);
476 * If user only want to do get operation, but there is no object
477 * in server , so we use this function to warmup the server, and
478 * set some objects to server. It runs at the beginning of task.
480 * @param c, pointer of the concurrency
482 static void ms_warmup_server(ms_conn_t
*c
)
485 ms_task_item_t
*item
;
488 * Extra one loop to get the last command returned state.
489 * Normally it gets the previous command returned state.
491 if (c
->remain_warmup_num
>= 0 &&
492 c
->remain_warmup_num
!= c
->warmup_num
) {
493 item
= ms_get_cur_opt_item(c
);
494 /* only update the set command result state for data verification */
495 if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
== MCD_STORED
) {
496 item
->value_offset
= item
->key_suffix_offset
;
497 /* set success, update counter */
499 } else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
) {
500 printf("key: %lx didn't set success\n", item
->key_prefix
);
504 /* the last time don't run a task */
505 if (c
->remain_warmup_num
-- > 0) {
506 /* operate next task item */
507 task
= ms_get_task(c
, true);
513 * finish warming up server, wait all connects initialize
514 * complete. Then all connects can start do task at the same
517 if (c
->remain_warmup_num
== -1) {
518 ms_send_signal(&ms_global
.init_lock
);
519 c
->remain_warmup_num
--; /* never run the if branch */
524 * dispatch single get and set task
526 * @param c, pointer of the concurrency
528 static void ms_single_getset_task_sch(ms_conn_t
*c
)
531 ms_task_item_t
*item
;
533 /* the last time don't run a task */
534 if (c
->remain_exec_num
-- > 0) {
535 task
= ms_get_task(c
, false);
537 if (task
->cmd
== CMD_SET
) {
539 } else if (task
->cmd
== CMD_GET
) {
540 assert(task
->cmd
== CMD_GET
);
541 ms_mcd_get(c
, item
, task
->verify
);
547 * dispatch multi-get and set task
549 * @param c, pointer of the concurrency
551 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
554 ms_mlget_task_item_t
*mlget_item
;
557 if (c
->remain_exec_num
-- > 0) {
558 task
= ms_get_task(c
, false);
559 if (task
->cmd
== CMD_SET
) { /* just do it */
560 ms_mcd_set(c
, task
->item
);
563 assert(task
->cmd
== CMD_GET
);
564 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
565 mlget_item
->item
= task
->item
;
566 mlget_item
->verify
= task
->verify
;
567 mlget_item
->finish_verify
= task
->finish_verify
;
568 mlget_item
->get_miss
= task
->get_miss
;
569 c
->mlget_task
.mlget_num
++;
571 /* enough multi-get task items can be done */
572 if (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
||
573 (c
->remain_exec_num
== 0 && c
->mlget_task
.mlget_num
> 0)) {
579 if (c
->remain_exec_num
<= 0 && c
->mlget_task
.mlget_num
> 0) {
588 * calculate the difference value of two time points
590 * @param start_time, the start time
591 * @param end_time, the end time
593 * @return uint64_t, the difference value between start_time and end_time in us
595 int64_t ms_time_diff(struct timeval
*start_time
, struct timeval
*end_time
)
597 int64_t endtime
= end_time
->tv_sec
* 1000000 + end_time
->tv_usec
;
598 int64_t starttime
= start_time
->tv_sec
* 1000000 + start_time
->tv_usec
;
600 assert(endtime
>= starttime
);
602 return(endtime
- starttime
);
606 * after get the response from server for multi-get, the
607 * function update the state of the task and do data verify if
610 * @param c, pointer of the concurrency
612 static void ms_update_multi_get_result(ms_conn_t
*c
)
614 ms_mlget_task_item_t
*mlget_item
;
615 ms_task_item_t
*item
;
616 char *orignval
= NULL
;
617 char *orignkey
= NULL
;
624 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
625 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
626 item
= mlget_item
->item
;
627 orignval
= &ms_setting
.char_block
[item
->value_offset
];
628 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
630 /* update get miss counter */
631 if (mlget_item
->get_miss
) {
632 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
635 /* get nothing from server for this task item */
636 if (mlget_item
->verify
&& !mlget_item
->finish_verify
) {
637 /* verify expire time if necessary */
638 if (item
->exp_time
> 0) {
639 struct timeval curr_time
;
640 gettimeofday(&curr_time
, NULL
);
642 /* object doesn't expire but can't get it now */
643 if (curr_time
.tv_sec
- item
->client_time
644 < item
->exp_time
- EXPIRE_TIME_ERROR
) {
645 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
647 if (ms_setting
.verbose
) {
650 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
651 localtime(&item
->client_time
));
652 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
653 localtime(&curr_time
.tv_sec
));
654 fprintf(stderr
, "\n\t<%d expire time verification failed, object "
655 "doesn't expire but can't get it now\n"
658 "\tset time: %s current time: %s "
659 "diff time: %d expire time: %d\n"
660 "\texpected data len: %d\n"
661 "\texpected data: %.*s\n"
662 "\treceived data: \n",
663 c
->sfd
, item
->key_size
, item
->key_prefix
,
664 item
->key_size
- (int)KEY_PREFIX_SIZE
,
665 orignkey
, set_time
, cur_time
,
666 (int)(curr_time
.tv_sec
- item
->client_time
), item
->exp_time
,
667 item
->value_size
, item
->value_size
, orignval
);
672 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
674 if (ms_setting
.verbose
) {
675 fprintf(stderr
, "\n<%d data verification failed\n"
678 "\texpected data len: %d\n"
679 "\texpected data: %.*s\n"
680 "\treceived data: \n",
681 c
->sfd
, item
->key_size
, item
->key_prefix
,
682 item
->key_size
- (int)KEY_PREFIX_SIZE
,
683 orignkey
, item
->value_size
, item
->value_size
, orignval
);
689 c
->mlget_task
.mlget_num
= 0;
690 c
->mlget_task
.value_index
= INVALID_OFFSET
;
694 * after get the response from server for single get, the
695 * function update the state of the task and do data verify if
698 * @param c, pointer of the concurrency
699 * @param item, pointer of task item which includes the object
702 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
704 char *orignval
= NULL
;
705 char *orignkey
= NULL
;
707 if (c
== NULL
|| item
== NULL
) {
711 assert(item
!= NULL
);
713 orignval
= &ms_setting
.char_block
[item
->value_offset
];
714 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
716 /* update get miss counter */
717 if (c
->precmd
.cmd
== CMD_GET
&& c
->curr_task
.get_miss
) {
718 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
721 /* get nothing from server for this task item */
722 if (c
->precmd
.cmd
== CMD_GET
&& c
->curr_task
.verify
723 && !c
->curr_task
.finish_verify
) {
725 /* verify expire time if necessary */
726 if (item
->exp_time
> 0) {
727 struct timeval curr_time
;
728 gettimeofday(&curr_time
, NULL
);
730 /* object doesn't expire but can't get it now */
731 if (curr_time
.tv_sec
- item
->client_time
732 < item
->exp_time
- EXPIRE_TIME_ERROR
) {
733 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
735 if (ms_setting
.verbose
) {
738 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
739 localtime(&item
->client_time
));
740 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
741 localtime(&curr_time
.tv_sec
));
742 fprintf(stderr
, "\n\t<%d expire time verification failed, object "
743 "doesn't expire but can't get it now\n"
746 "\tset time: %s current time: %s "
747 "diff time: %d expire time: %d\n"
748 "\texpected data len: %d\n"
749 "\texpected data: %.*s\n"
750 "\treceived data: \n",
751 c
->sfd
, item
->key_size
, item
->key_prefix
,
752 item
->key_size
- (int)KEY_PREFIX_SIZE
,
753 orignkey
, set_time
, cur_time
,
754 (int)(curr_time
.tv_sec
- item
->client_time
), item
->exp_time
,
755 item
->value_size
, item
->value_size
, orignval
);
760 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
762 if (ms_setting
.verbose
) {
763 fprintf(stderr
, "\n<%d data verification failed\n"
766 "\texpected data len: %d\n"
767 "\texpected data: %.*s\n"
768 "\treceived data: \n",
769 c
->sfd
, item
->key_size
, item
->key_prefix
,
770 item
->key_size
- (int)KEY_PREFIX_SIZE
,
771 orignkey
, item
->value_size
, item
->value_size
, orignval
);
779 * after get the response from server for set the function
780 * update the state of the task and do data verify if necessary.
782 * @param c, pointer of the concurrency
783 * @param item, pointer of task item which includes the object
786 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
788 if (c
== NULL
|| item
== NULL
) {
792 assert(item
!= NULL
);
794 if (c
->precmd
.cmd
== CMD_SET
) {
795 switch (c
->precmd
.retstat
) {
797 if (item
->value_offset
== INVALID_OFFSET
) {
798 /* first set with the same offset of key suffix */
799 item
->value_offset
= item
->key_suffix_offset
;
801 /* not first set, just increase the value offset */
802 item
->value_offset
+= 1;
805 /* set successes, update counter */
807 c
->curr_task
.set_opt
++;
808 c
->curr_task
.cycle_undo_set
--;
811 case MCD_SERVER_ERROR
:
819 * update the response time result
821 * @param c, pointer of the concurrency
823 static void ms_update_stat_result(ms_conn_t
*c
)
825 bool get_miss
= false;
832 gettimeofday(&c
->end_time
, NULL
);
833 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
835 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
836 switch (c
->precmd
.cmd
) {
838 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
841 if (c
->curr_task
.get_miss
) {
844 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
849 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
850 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
854 * after get response from server for the current operation, and
855 * before doing the next operation, update the state of the
858 * @param c, pointer of the concurrency
860 static void ms_update_task_result(ms_conn_t
*c
)
862 ms_task_item_t
*item
;
868 item
= ms_get_cur_opt_item(c
);
872 assert(item
!= NULL
);
874 ms_update_set_result(c
, item
);
876 if (ms_setting
.stat_freq
> 0
877 && (c
->precmd
.cmd
== CMD_SET
|| c
->precmd
.cmd
== CMD_GET
)) {
878 ms_update_stat_result(c
);
881 /* update multi-get task item */
882 if ((ms_setting
.mult_key_num
> 1 &&
883 c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
) ||
884 (c
->remain_exec_num
== 0 && c
->mlget_task
.mlget_num
> 0)) {
885 ms_update_multi_get_result(c
);
887 ms_update_single_get_result(c
, item
);
892 * run get and set operation
894 * @param c, pointer of the concurrency
896 * @return int, if success, return 0, else return -1
898 static int ms_run_getset_task(ms_conn_t
*c
)
901 * extra one loop to get the last command return state. get the
902 * last command return state.
904 if (c
->remain_exec_num
>= 0 &&
905 c
->remain_exec_num
!= c
->exec_num
) {
906 ms_update_task_result(c
);
910 if (ms_setting
.mult_key_num
> 1) {
911 /* operate next task item */
912 ms_multi_getset_task_sch(c
);
914 /* operate next task item */
915 ms_single_getset_task_sch(c
);
918 /* no task to do, exit */
919 if (c
->remain_exec_num
== -1 || ms_global
.time_out
) {
927 * the state machine call the function to execute task.
929 * @param c, pointer of the concurrency
931 * @return int, if success, return 0, else return -1
933 int ms_exec_task(struct conn
*c
)
935 if (!ms_global
.finish_warmup
) {
938 if (ms_run_getset_task(c
) != 0) {