3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
11 #include "ms_thread.h"
12 #include "ms_setting.h"
/* command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE    1000

/**
 * In one adjustment cycle, if undo set or get
 * operations proportion is more than 3% , means
 * there are too many new item or need more new
 * item in the window. This factor shows it.
 */
#define DISADJUST_FACTOR          0.03
23 extern __thread ms_thread_t ms_thread
;
25 /* get item from task window */
26 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
27 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
28 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
29 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
);
32 /* select next operation to do */
33 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
36 /* set and get speed estimate for controlling and adjustment */
37 static bool ms_is_set_too_fast(ms_task_t
*task
);
38 static bool ms_is_get_too_fast(ms_task_t
*task
);
39 static void ms_kick_out_item(ms_task_item_t
*item
);
42 /* miss rate adjustment */
43 static bool ms_need_overwirte_item(ms_task_t
*task
);
44 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
47 /* deal with data verification initialization */
48 static void ms_task_data_verify_init(ms_task_t
*task
);
49 static void ms_task_expire_verify_init(ms_task_t
*task
);
52 /* select a new task to do */
53 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
56 /* run the selected task */
57 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
58 static void ms_update_stat_result(ms_conn_t
*c
);
59 static void ms_update_multi_get_result(ms_conn_t
*c
);
60 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
61 static void ms_update_task_result(ms_conn_t
*c
);
62 static void ms_single_getset_task_sch(ms_conn_t
*c
);
63 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
64 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
65 static void ms_warmup_server(ms_conn_t
*c
);
66 static int ms_run_getset_task(ms_conn_t
*c
);
70 * used to get the current operation item(object)
72 * @param c, pointer of the concurrency
74 * @return ms_task_item_t*, current operating item
76 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
78 return c
->curr_task
.item
;
83 * used to get the next item to do get operation
85 * @param c, pointer of the concurrency
87 * @return ms_task_item_t*, the pointer of the next item to do
90 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
92 ms_task_item_t
*item
= NULL
;
94 if (c
->set_cursor
<= 0)
96 /* the first item in the window */
97 item
= &c
->item_win
[0];
99 else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
)
101 /* random get one item set before */
102 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
106 /* random get one item from the window */
107 item
= &c
->item_win
[random() % c
->win_size
];
111 } /* ms_get_next_get_item */
115 * used to get the next item to do set operation
117 * @param c, pointer of the concurrency
119 * @return ms_task_item_t*, the pointer of the next item to do
122 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
125 * when a set command successes, the cursor will plus 1. If set
126 * fails, the cursor doesn't change. it isn't necessary to
127 * increase the cursor here.
129 return &c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
];
134 * If we need do overwrite, we could select a item set before.
135 * This function is used to get a item set before to do
138 * @param c, pointer of the concurrency
140 * @return ms_task_item_t*, the pointer of the previous item of
143 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
)
145 if (c
->set_cursor
<= 0)
147 return &c
->item_win
[0];
151 return &c
->item_win
[(int64_t)-- c
->set_cursor
% c
->win_size
];
153 } /* ms_get_pre_set_item */
157 * According to the proportion of operations(get or set), select
158 * an operation to do.
160 * @param c, pointer of the concurrency
161 * @param task, pointer of current task in the concurrency
163 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
165 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
166 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
168 /* update cycle operation number if necessary */
169 if ((task
->cycle_undo_get
== 0) || (task
->cycle_undo_set
== 0))
171 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
172 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
176 * According to operation distribution to choose doing which
177 * operation. If it can't set new object to sever, just change
178 * to do get operation.
180 if ((set_prop
> PROP_ERROR
)
181 && ((double)task
->get_opt
* set_prop
>= (double)task
->set_opt
185 task
->item
= ms_get_next_set_item(c
);
190 task
->item
= ms_get_next_get_item(c
);
192 } /* ms_select_opt */
196 * used to judge whether the number of get operations done is
197 * more than expected number of get operations to do right now.
199 * @param task, pointer of current task in the concurrency
201 * @return bool, if get too fast, return true, else return false
203 static bool ms_is_get_too_fast(ms_task_t
*task
)
205 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
206 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
208 /* no get operation */
209 if (get_prop
< PROP_ERROR
)
214 int max_undo_set
= (int)(set_prop
/ get_prop
* (1.0 + DISADJUST_FACTOR
))
215 * task
->cycle_undo_get
;
217 if (((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
)
218 && (task
->cycle_undo_set
> max_undo_set
))
224 } /* ms_is_get_too_fast */
228 * used to judge whether the number of set operations done is
229 * more than expected number of set operations to do right now.
231 * @param task, pointer of current task in the concurrency
233 * @return bool, if set too fast, return true, else return false
235 static bool ms_is_set_too_fast(ms_task_t
*task
)
237 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
238 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
240 /* no set operation */
241 if (set_prop
< PROP_ERROR
)
246 /* If it does set operation too fast, skip some */
247 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
248 * (double)task
->cycle_undo_set
);
250 if (((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
)
251 && (task
->cycle_undo_get
> max_undo_get
))
257 } /* ms_is_set_too_fast */
261 * kick out the old item in the window, and add a new item to
262 * overwrite the old item. When we don't want to do overwrite
263 * object, and the current item to do set operation is an old
264 * item, we could kick out the old item and add a new item. Then
265 * we can ensure we set new object every time.
267 * @param item, pointer of task item which includes the object
270 static void ms_kick_out_item(ms_task_item_t
*item
)
272 /* allocate a new item */
273 item
->key_prefix
= ms_get_key_prefix();
275 item
->key_suffix_offset
++;
276 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
277 item
->client_time
= 0;
278 } /* ms_kick_out_item */
282 * used to judge whether we need overwrite object based on the
283 * options user specified
285 * @param task, pointer of current task in the concurrency
287 * @return bool, if need overwrite, return true, else return
290 static bool ms_need_overwirte_item(ms_task_t
*task
)
292 ms_task_item_t
*item
= task
->item
;
294 assert(item
!= NULL
);
295 assert(task
->cmd
== CMD_SET
);
298 * according to data overwrite percent to determine if do data
301 if (task
->overwrite_set
< (double)task
->set_opt
302 * ms_setting
.overwrite_percent
)
308 } /* ms_need_overwirte_item */
312 * used to adjust operation. the function must be called after
313 * select operation. the function change get operation to set
314 * operation, or set operation to get operation based on the
317 * @param c, pointer of the concurrency
318 * @param task, pointer of current task in the concurrency
320 * @return bool, if success, return true, else return false
322 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
324 ms_task_item_t
*item
= task
->item
;
326 assert(item
!= NULL
);
328 if (task
->cmd
== CMD_SET
)
330 /* If did set operation too fast, skip some */
331 if (ms_is_set_too_fast(task
))
333 /* get the item instead */
334 if (item
->value_offset
!= INVALID_OFFSET
)
341 /* If the current item is not a new item, kick it out */
342 if (item
->value_offset
!= INVALID_OFFSET
)
344 if (ms_need_overwirte_item(task
))
347 task
->overwrite_set
++;
351 /* kick out the current item to do set operation */
352 ms_kick_out_item(item
);
355 else /* it's a new item */
358 if (ms_need_overwirte_item(task
))
360 item
= ms_get_pre_set_item(c
);
361 if (item
->value_offset
!= INVALID_OFFSET
)
364 task
->overwrite_set
++;
366 else /* previous set item is a new item */
368 /* select the previous item to run, and cancel overwrite */
378 if (item
->value_offset
== INVALID_OFFSET
)
385 * If It does get operation too fast, it will change the
388 if (ms_is_get_too_fast(task
))
390 /* don't kick out the first item in the window */
391 if (! ms_is_set_too_fast(task
))
393 ms_kick_out_item(item
);
403 assert(item
->value_offset
!= INVALID_OFFSET
);
408 } /* ms_adjust_opt */
412 * used to initialize the task which need verify data.
414 * @param task, pointer of current task in the concurrency
416 static void ms_task_data_verify_init(ms_task_t
*task
)
418 ms_task_item_t
*item
= task
->item
;
420 assert(item
!= NULL
);
421 assert(task
->cmd
== CMD_GET
);
424 * according to data verification percent to determine if do
427 if (task
->verified_get
< (double)task
->get_opt
428 * ms_setting
.verify_percent
)
431 * currently it doesn't do verify, just increase the counter,
432 * and do verification next proper get command
434 if ((task
->item
->value_offset
!= INVALID_OFFSET
)
435 && (item
->exp_time
== 0))
438 task
->finish_verify
= false;
439 task
->verified_get
++;
442 } /* ms_task_data_verify_init */
446 * used to initialize the task which need verify expire time.
448 * @param task, pointer of current task in the concurrency
450 static void ms_task_expire_verify_init(ms_task_t
*task
)
452 ms_task_item_t
*item
= task
->item
;
454 assert(item
!= NULL
);
455 assert(task
->cmd
== CMD_GET
);
456 assert(item
->exp_time
> 0);
459 task
->finish_verify
= false;
460 } /* ms_task_expire_verify_init */
464 * used to get one task, the function initializes the task
467 * @param c, pointer of the concurrency
468 * @param warmup, whether it need warmup
470 * @return ms_task_t*, pointer of current task in the
473 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
475 ms_task_t
*task
= &c
->curr_task
;
480 task
->finish_verify
= true;
481 task
->get_miss
= true;
486 task
->item
= ms_get_next_set_item(c
);
491 /* according to operation distribution to choose doing which operation */
492 ms_select_opt(c
, task
);
494 if (! ms_adjust_opt(c
, task
))
499 if ((ms_setting
.verify_percent
> 0) && (task
->cmd
== CMD_GET
))
501 ms_task_data_verify_init(task
);
504 if ((ms_setting
.exp_ver_per
> 0) && (task
->cmd
== CMD_GET
)
505 && (task
->item
->exp_time
> 0))
507 ms_task_expire_verify_init(task
);
514 * Only update get and delete counter, set counter will be
515 * updated after set operation successes.
517 if (task
->cmd
== CMD_GET
)
520 task
->cycle_undo_get
--;
528 * send a signal to the main monitor thread
530 * @param sync_lock, pointer of the lock
532 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
534 pthread_mutex_lock(&sync_lock
->lock
);
536 pthread_cond_signal(&sync_lock
->cond
);
537 pthread_mutex_unlock(&sync_lock
->lock
);
538 } /* ms_send_signal */
542 * If user only want to do get operation, but there is no object
543 * in server , so we use this function to warmup the server, and
544 * set some objects to server. It runs at the beginning of task.
546 * @param c, pointer of the concurrency
548 static void ms_warmup_server(ms_conn_t
*c
)
551 ms_task_item_t
*item
;
554 * Extra one loop to get the last command returned state.
555 * Normally it gets the previous command returned state.
557 if ((c
->remain_warmup_num
>= 0)
558 && (c
->remain_warmup_num
!= c
->warmup_num
))
560 item
= ms_get_cur_opt_item(c
);
561 /* only update the set command result state for data verification */
562 if ((c
->precmd
.cmd
== CMD_SET
) && (c
->precmd
.retstat
== MCD_STORED
))
564 item
->value_offset
= item
->key_suffix_offset
;
565 /* set success, update counter */
568 else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
)
570 printf("key: %lx didn't set success\n", item
->key_prefix
);
574 /* the last time don't run a task */
575 if (c
->remain_warmup_num
-- > 0)
577 /* operate next task item */
578 task
= ms_get_task(c
, true);
584 * finish warming up server, wait all connects initialize
585 * complete. Then all connects can start do task at the same
588 if (c
->remain_warmup_num
== -1)
590 ms_send_signal(&ms_global
.init_lock
);
591 c
->remain_warmup_num
--; /* never run the if branch */
593 } /* ms_warmup_server */
597 * dispatch single get and set task
599 * @param c, pointer of the concurrency
601 static void ms_single_getset_task_sch(ms_conn_t
*c
)
604 ms_task_item_t
*item
;
606 /* the last time don't run a task */
607 if (c
->remain_exec_num
-- > 0)
609 task
= ms_get_task(c
, false);
611 if (task
->cmd
== CMD_SET
)
615 else if (task
->cmd
== CMD_GET
)
617 assert(task
->cmd
== CMD_GET
);
618 ms_mcd_get(c
, item
, task
->verify
);
621 } /* ms_single_getset_task_sch */
625 * dispatch multi-get and set task
627 * @param c, pointer of the concurrency
629 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
632 ms_mlget_task_item_t
*mlget_item
;
636 if (c
->remain_exec_num
-- > 0)
638 task
= ms_get_task(c
, false);
639 if (task
->cmd
== CMD_SET
) /* just do it */
641 ms_mcd_set(c
, task
->item
);
646 assert(task
->cmd
== CMD_GET
);
647 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
648 mlget_item
->item
= task
->item
;
649 mlget_item
->verify
= task
->verify
;
650 mlget_item
->finish_verify
= task
->finish_verify
;
651 mlget_item
->get_miss
= task
->get_miss
;
652 c
->mlget_task
.mlget_num
++;
654 /* enough multi-get task items can be done */
655 if ((c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
)
656 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
665 if ((c
->remain_exec_num
<= 0) && (c
->mlget_task
.mlget_num
> 0))
672 } /* ms_multi_getset_task_sch */
/**
 * calculate the difference value of two time points
 *
 * @param start_time, the start time
 * @param end_time, the end time, must not be earlier than
 *                  start_time (checked with assert)
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  /*
   * Widen tv_sec to 64 bits before scaling to microseconds. Without
   * the cast the multiplication is done in time_t, which overflows
   * for present-day timestamps where time_t is only 32 bits wide.
   */
  int64_t endtime= (int64_t)end_time->tv_sec * 1000000 + end_time->tv_usec;
  int64_t starttime= (int64_t)start_time->tv_sec * 1000000
                     + start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
}
695 * after get the response from server for multi-get, the
696 * function update the state of the task and do data verify if
699 * @param c, pointer of the concurrency
701 static void ms_update_multi_get_result(ms_conn_t
*c
)
703 ms_mlget_task_item_t
*mlget_item
;
704 ms_task_item_t
*item
;
705 char *orignval
= NULL
;
706 char *orignkey
= NULL
;
714 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
716 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
717 item
= mlget_item
->item
;
718 orignval
= &ms_setting
.char_block
[item
->value_offset
];
719 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
721 /* update get miss counter */
722 if (mlget_item
->get_miss
)
724 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
727 /* get nothing from server for this task item */
728 if (mlget_item
->verify
&& ! mlget_item
->finish_verify
)
730 /* verify expire time if necessary */
731 if (item
->exp_time
> 0)
733 struct timeval curr_time
;
734 gettimeofday(&curr_time
, NULL
);
736 /* object doesn't expire but can't get it now */
737 if (curr_time
.tv_sec
- item
->client_time
738 < item
->exp_time
- EXPIRE_TIME_ERROR
)
740 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
742 if (ms_setting
.verbose
)
746 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
747 localtime(&item
->client_time
));
748 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
749 localtime(&curr_time
.tv_sec
));
751 "\n\t<%d expire time verification failed, object "
752 "doesn't expire but can't get it now\n"
755 "\tset time: %s current time: %s "
756 "diff time: %d expire time: %d\n"
757 "\texpected data len: %d\n"
758 "\texpected data: %.*s\n"
759 "\treceived data: \n",
763 item
->key_size
- (int)KEY_PREFIX_SIZE
,
767 (int)(curr_time
.tv_sec
- item
->client_time
),
778 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
780 if (ms_setting
.verbose
)
782 fprintf(stderr
, "\n<%d data verification failed\n"
785 "\texpected data len: %d\n"
786 "\texpected data: %.*s\n"
787 "\treceived data: \n",
788 c
->sfd
, item
->key_size
, item
->key_prefix
,
789 item
->key_size
- (int)KEY_PREFIX_SIZE
,
790 orignkey
, item
->value_size
, item
->value_size
, orignval
);
796 c
->mlget_task
.mlget_num
= 0;
797 c
->mlget_task
.value_index
= INVALID_OFFSET
;
798 } /* ms_update_multi_get_result */
802 * after get the response from server for single get, the
803 * function update the state of the task and do data verify if
806 * @param c, pointer of the concurrency
807 * @param item, pointer of task item which includes the object
810 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
812 char *orignval
= NULL
;
813 char *orignkey
= NULL
;
815 if ((c
== NULL
) || (item
== NULL
))
820 assert(item
!= NULL
);
822 orignval
= &ms_setting
.char_block
[item
->value_offset
];
823 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
825 /* update get miss counter */
826 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.get_miss
)
828 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
831 /* get nothing from server for this task item */
832 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.verify
833 && ! c
->curr_task
.finish_verify
)
835 /* verify expire time if necessary */
836 if (item
->exp_time
> 0)
838 struct timeval curr_time
;
839 gettimeofday(&curr_time
, NULL
);
841 /* object doesn't expire but can't get it now */
842 if (curr_time
.tv_sec
- item
->client_time
843 < item
->exp_time
- EXPIRE_TIME_ERROR
)
845 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
847 if (ms_setting
.verbose
)
851 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
852 localtime(&item
->client_time
));
853 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
854 localtime(&curr_time
.tv_sec
));
856 "\n\t<%d expire time verification failed, object "
857 "doesn't expire but can't get it now\n"
860 "\tset time: %s current time: %s "
861 "diff time: %d expire time: %d\n"
862 "\texpected data len: %d\n"
863 "\texpected data: %.*s\n"
864 "\treceived data: \n",
868 item
->key_size
- (int)KEY_PREFIX_SIZE
,
872 (int)(curr_time
.tv_sec
- item
->client_time
),
883 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
885 if (ms_setting
.verbose
)
887 fprintf(stderr
, "\n<%d data verification failed\n"
890 "\texpected data len: %d\n"
891 "\texpected data: %.*s\n"
892 "\treceived data: \n",
893 c
->sfd
, item
->key_size
, item
->key_prefix
,
894 item
->key_size
- (int)KEY_PREFIX_SIZE
,
895 orignkey
, item
->value_size
, item
->value_size
, orignval
);
900 } /* ms_update_single_get_result */
904 * after get the response from server for set the function
905 * update the state of the task and do data verify if necessary.
907 * @param c, pointer of the concurrency
908 * @param item, pointer of task item which includes the object
911 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
913 if ((c
== NULL
) || (item
== NULL
))
918 assert(item
!= NULL
);
920 if (c
->precmd
.cmd
== CMD_SET
)
922 switch (c
->precmd
.retstat
)
925 if (item
->value_offset
== INVALID_OFFSET
)
927 /* first set with the same offset of key suffix */
928 item
->value_offset
= item
->key_suffix_offset
;
932 /* not first set, just increase the value offset */
933 item
->value_offset
+= 1;
936 /* set successes, update counter */
938 c
->curr_task
.set_opt
++;
939 c
->curr_task
.cycle_undo_set
--;
942 case MCD_SERVER_ERROR
:
947 } /* ms_update_set_result */
951 * update the response time result
953 * @param c, pointer of the concurrency
955 static void ms_update_stat_result(ms_conn_t
*c
)
957 bool get_miss
= false;
965 gettimeofday(&c
->end_time
, NULL
);
966 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
968 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
970 switch (c
->precmd
.cmd
)
973 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
977 if (c
->curr_task
.get_miss
)
981 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
988 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
989 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
990 } /* ms_update_stat_result */
994 * after get response from server for the current operation, and
995 * before doing the next operation, update the state of the
998 * @param c, pointer of the concurrency
1000 static void ms_update_task_result(ms_conn_t
*c
)
1002 ms_task_item_t
*item
;
1010 item
= ms_get_cur_opt_item(c
);
1015 assert(item
!= NULL
);
1017 ms_update_set_result(c
, item
);
1019 if ((ms_setting
.stat_freq
> 0)
1020 && ((c
->precmd
.cmd
== CMD_SET
) || (c
->precmd
.cmd
== CMD_GET
)))
1022 ms_update_stat_result(c
);
1025 /* update multi-get task item */
1026 if (((ms_setting
.mult_key_num
> 1)
1027 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1028 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1030 ms_update_multi_get_result(c
);
1034 ms_update_single_get_result(c
, item
);
1036 } /* ms_update_task_result */
1040 * run get and set operation
1042 * @param c, pointer of the concurrency
1044 * @return int, if success, return 0, else return -1
1046 static int ms_run_getset_task(ms_conn_t
*c
)
1049 * extra one loop to get the last command return state. get the
1050 * last command return state.
1052 if ((c
->remain_exec_num
>= 0)
1053 && (c
->remain_exec_num
!= c
->exec_num
))
1055 ms_update_task_result(c
);
1059 if (ms_setting
.mult_key_num
> 1)
1061 /* operate next task item */
1062 ms_multi_getset_task_sch(c
);
1066 /* operate next task item */
1067 ms_single_getset_task_sch(c
);
1070 /* no task to do, exit */
1071 if ((c
->remain_exec_num
== -1) || ms_global
.time_out
)
1077 } /* ms_run_getset_task */
1081 * the state machine call the function to execute task.
1083 * @param c, pointer of the concurrency
1085 * @return int, if success, return 0, else return -1
1087 int ms_exec_task(struct conn
*c
)
1089 if (! ms_global
.finish_warmup
)
1091 ms_warmup_server(c
);
1095 if (ms_run_getset_task(c
) != 0)
1102 } /* ms_exec_task */