3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
12 #include "mem_config.h"
14 #if defined(HAVE_SYS_TIME_H)
15 # include <sys/time.h>
18 #if defined(HAVE_TIME_H)
22 #include "ms_thread.h"
23 #include "ms_setting.h"
24 #include "ms_atomic.h"
/* number of operations in one command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE 1000

/**
 * Tolerance used when comparing outstanding set and get operations.
 * In one adjustment cycle, if the proportion of undone set or get
 * operations is more than 3%, it means there are too many new items,
 * or more new items are needed, in the window. This factor shows it.
 */
#define DISADJUST_FACTOR 0.03
35 /* get item from task window */
36 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
37 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
38 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
39 static ms_task_item_t
*ms_get_random_overwrite_item(ms_conn_t
*c
);
42 /* select next operation to do */
43 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
46 /* set and get speed estimate for controlling and adjustment */
47 static bool ms_is_set_too_fast(ms_task_t
*task
);
48 static bool ms_is_get_too_fast(ms_task_t
*task
);
49 static void ms_kick_out_item(ms_task_item_t
*item
);
52 /* miss rate adjustment */
53 static bool ms_need_overwrite_item(ms_task_t
*task
);
54 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
57 /* deal with data verification initialization */
58 static void ms_task_data_verify_init(ms_task_t
*task
);
59 static void ms_task_expire_verify_init(ms_task_t
*task
);
62 /* select a new task to do */
63 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
66 /* run the selected task */
67 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
68 static void ms_update_stat_result(ms_conn_t
*c
);
69 static void ms_update_multi_get_result(ms_conn_t
*c
);
70 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
71 static void ms_update_task_result(ms_conn_t
*c
);
72 static void ms_single_getset_task_sch(ms_conn_t
*c
);
73 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
74 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
75 static void ms_warmup_server(ms_conn_t
*c
);
76 static int ms_run_getset_task(ms_conn_t
*c
);
80 * used to get the current operation item(object)
82 * @param c, pointer of the concurrency
84 * @return ms_task_item_t*, current operating item
86 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
88 return c
->curr_task
.item
;
93 * used to get the next item to do get operation
95 * @param c, pointer of the concurrency
97 * @return ms_task_item_t*, the pointer of the next item to do
100 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
102 ms_task_item_t
*item
= NULL
;
104 if (c
->set_cursor
<= 0)
106 /* the first item in the window */
107 item
= &c
->item_win
[0];
109 else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
)
111 /* random get one item set before */
112 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
116 /* random get one item from the window */
117 item
= &c
->item_win
[random() % c
->win_size
];
121 } /* ms_get_next_get_item */
125 * used to get the next item to do set operation
127 * @param c, pointer of the concurrency
129 * @return ms_task_item_t*, the pointer of the next item to do
132 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
135 * when a set command successes, the cursor will plus 1. If set
136 * fails, the cursor doesn't change. it isn't necessary to
137 * increase the cursor here.
139 return &c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
];
144 * If we need do overwrite, we could select a item set before.
145 * This function is used to get a item set before to do
148 * @param c, pointer of the concurrency
150 * @return ms_task_item_t*, the pointer of the previous item of
153 static ms_task_item_t
*ms_get_random_overwrite_item(ms_conn_t
*c
)
155 return ms_get_next_get_item(c
);
156 } /* ms_get_random_overwrite_item */
159 * According to the proportion of operations(get or set), select
160 * an operation to do.
162 * @param c, pointer of the concurrency
163 * @param task, pointer of current task in the concurrency
165 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
167 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
168 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
170 /* update cycle operation number if necessary */
171 if ((task
->cycle_undo_get
== 0) || (task
->cycle_undo_set
== 0))
173 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
174 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
178 * According to operation distribution to choose doing which
179 * operation. If it can't set new object to sever, just change
180 * to do get operation.
182 if ((set_prop
> PROP_ERROR
)
183 && ((double)task
->get_opt
* set_prop
>= (double)task
->set_opt
187 task
->item
= ms_get_next_set_item(c
);
192 task
->item
= ms_get_next_get_item(c
);
194 } /* ms_select_opt */
198 * used to judge whether the number of get operations done is
199 * more than expected number of get operations to do right now.
201 * @param task, pointer of current task in the concurrency
203 * @return bool, if get too fast, return true, else return false
205 static bool ms_is_get_too_fast(ms_task_t
*task
)
207 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
208 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
210 /* no get operation */
211 if (get_prop
< PROP_ERROR
)
216 int max_undo_set
= (int)(set_prop
/ get_prop
* (1.0 + DISADJUST_FACTOR
))
217 * task
->cycle_undo_get
;
219 if (((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
)
220 && (task
->cycle_undo_set
> max_undo_set
))
226 } /* ms_is_get_too_fast */
230 * used to judge whether the number of set operations done is
231 * more than expected number of set operations to do right now.
233 * @param task, pointer of current task in the concurrency
235 * @return bool, if set too fast, return true, else return false
237 static bool ms_is_set_too_fast(ms_task_t
*task
)
239 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
240 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
242 /* no set operation */
243 if (set_prop
< PROP_ERROR
)
248 /* If it does set operation too fast, skip some */
249 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
250 * (double)task
->cycle_undo_set
);
252 if (((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
)
253 && (task
->cycle_undo_get
> max_undo_get
))
259 } /* ms_is_set_too_fast */
263 * kick out the old item in the window, and add a new item to
264 * overwrite the old item. When we don't want to do overwrite
265 * object, and the current item to do set operation is an old
266 * item, we could kick out the old item and add a new item. Then
267 * we can ensure we set new object every time.
269 * @param item, pointer of task item which includes the object
272 static void ms_kick_out_item(ms_task_item_t
*item
)
274 /* allocate a new item */
275 item
->key_prefix
= ms_get_key_prefix();
277 item
->key_suffix_offset
++;
278 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
279 item
->client_time
= 0;
280 } /* ms_kick_out_item */
284 * used to judge whether we need overwrite object based on the
285 * options user specified
287 * @param task, pointer of current task in the concurrency
289 * @return bool, if need overwrite, return true, else return
292 static bool ms_need_overwrite_item(ms_task_t
*task
)
294 ms_task_item_t
*item
= task
->item
;
296 assert(item
!= NULL
);
297 assert(task
->cmd
== CMD_SET
);
300 * according to data overwrite percent to determine if do data
303 if (task
->overwrite_set
< (double)task
->set_opt
304 * ms_setting
.overwrite_percent
)
310 } /* ms_need_overwirte_item */
314 * used to adjust operation. the function must be called after
315 * select operation. the function change get operation to set
316 * operation, or set operation to get operation based on the
319 * @param c, pointer of the concurrency
320 * @param task, pointer of current task in the concurrency
322 * @return bool, if success, return true, else return false
324 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
326 ms_task_item_t
*item
= task
->item
;
328 assert(item
!= NULL
);
330 if (task
->cmd
== CMD_SET
)
332 /* If did set operation too fast, skip some */
333 if (ms_is_set_too_fast(task
))
335 /* get the item instead */
336 if (item
->value_offset
!= INVALID_OFFSET
)
343 /* If the current item is not a new item, kick it out */
344 if (item
->value_offset
!= INVALID_OFFSET
)
346 if (ms_need_overwrite_item(task
))
349 task
->overwrite_set
++;
353 /* kick out the current item to do set operation */
354 ms_kick_out_item(item
);
357 else /* it's a new item */
360 if (ms_need_overwrite_item(task
))
363 * overwrite not use the item with current set cursor, revert
368 item
= ms_get_random_overwrite_item(c
);
369 if (item
->value_offset
!= INVALID_OFFSET
)
372 task
->overwrite_set
++;
374 else /* item is a new item */
376 /* select the item to run, and cancel overwrite */
386 if (item
->value_offset
== INVALID_OFFSET
)
393 * If It does get operation too fast, it will change the
396 if (ms_is_get_too_fast(task
))
398 /* don't kick out the first item in the window */
399 if (! ms_is_set_too_fast(task
))
401 ms_kick_out_item(item
);
411 assert(item
->value_offset
!= INVALID_OFFSET
);
416 } /* ms_adjust_opt */
420 * used to initialize the task which need verify data.
422 * @param task, pointer of current task in the concurrency
424 static void ms_task_data_verify_init(ms_task_t
*task
)
426 ms_task_item_t
*item
= task
->item
;
428 assert(item
!= NULL
);
429 assert(task
->cmd
== CMD_GET
);
432 * according to data verification percent to determine if do
435 if (task
->verified_get
< (double)task
->get_opt
436 * ms_setting
.verify_percent
)
439 * currently it doesn't do verify, just increase the counter,
440 * and do verification next proper get command
442 if ((task
->item
->value_offset
!= INVALID_OFFSET
)
443 && (item
->exp_time
== 0))
446 task
->finish_verify
= false;
447 task
->verified_get
++;
450 } /* ms_task_data_verify_init */
454 * used to initialize the task which need verify expire time.
456 * @param task, pointer of current task in the concurrency
458 static void ms_task_expire_verify_init(ms_task_t
*task
)
460 ms_task_item_t
*item
= task
->item
;
462 assert(item
!= NULL
);
463 assert(task
->cmd
== CMD_GET
);
464 assert(item
->exp_time
> 0);
467 task
->finish_verify
= false;
468 } /* ms_task_expire_verify_init */
472 * used to get one task, the function initializes the task
475 * @param c, pointer of the concurrency
476 * @param warmup, whether it need warmup
478 * @return ms_task_t*, pointer of current task in the
481 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
483 ms_task_t
*task
= &c
->curr_task
;
488 task
->finish_verify
= true;
489 task
->get_miss
= true;
494 task
->item
= ms_get_next_set_item(c
);
499 /* according to operation distribution to choose doing which operation */
500 ms_select_opt(c
, task
);
502 if (! ms_adjust_opt(c
, task
))
507 if ((ms_setting
.verify_percent
> 0) && (task
->cmd
== CMD_GET
))
509 ms_task_data_verify_init(task
);
512 if ((ms_setting
.exp_ver_per
> 0) && (task
->cmd
== CMD_GET
)
513 && (task
->item
->exp_time
> 0))
515 ms_task_expire_verify_init(task
);
522 * Only update get and delete counter, set counter will be
523 * updated after set operation successes.
525 if (task
->cmd
== CMD_GET
)
528 task
->cycle_undo_get
--;
536 * send a signal to the main monitor thread
538 * @param sync_lock, pointer of the lock
540 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
542 pthread_mutex_lock(&sync_lock
->lock
);
544 pthread_cond_signal(&sync_lock
->cond
);
545 pthread_mutex_unlock(&sync_lock
->lock
);
546 } /* ms_send_signal */
550 * If user only want to do get operation, but there is no object
551 * in server , so we use this function to warmup the server, and
552 * set some objects to server. It runs at the beginning of task.
554 * @param c, pointer of the concurrency
556 static void ms_warmup_server(ms_conn_t
*c
)
559 ms_task_item_t
*item
;
562 * Extra one loop to get the last command returned state.
563 * Normally it gets the previous command returned state.
565 if ((c
->remain_warmup_num
>= 0)
566 && (c
->remain_warmup_num
!= c
->warmup_num
))
568 item
= ms_get_cur_opt_item(c
);
569 /* only update the set command result state for data verification */
570 if ((c
->precmd
.cmd
== CMD_SET
) && (c
->precmd
.retstat
== MCD_STORED
))
572 item
->value_offset
= item
->key_suffix_offset
;
573 /* set success, update counter */
576 else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
)
578 printf("key: %" PRIx64
" didn't set success\n", item
->key_prefix
);
582 /* the last time don't run a task */
583 if (c
->remain_warmup_num
-- > 0)
585 /* operate next task item */
586 task
= ms_get_task(c
, true);
592 * finish warming up server, wait all connects initialize
593 * complete. Then all connects can start do task at the same
596 if (c
->remain_warmup_num
== -1)
598 ms_send_signal(&ms_global
.warmup_lock
);
599 c
->remain_warmup_num
--; /* never run the if branch */
601 } /* ms_warmup_server */
605 * dispatch single get and set task
607 * @param c, pointer of the concurrency
609 static void ms_single_getset_task_sch(ms_conn_t
*c
)
612 ms_task_item_t
*item
;
614 /* the last time don't run a task */
615 if (c
->remain_exec_num
-- > 0)
617 task
= ms_get_task(c
, false);
619 if (task
->cmd
== CMD_SET
)
623 else if (task
->cmd
== CMD_GET
)
625 assert(task
->cmd
== CMD_GET
);
629 } /* ms_single_getset_task_sch */
633 * dispatch multi-get and set task
635 * @param c, pointer of the concurrency
637 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
640 ms_mlget_task_item_t
*mlget_item
;
644 if (c
->remain_exec_num
-- > 0)
646 task
= ms_get_task(c
, false);
647 if (task
->cmd
== CMD_SET
) /* just do it */
649 ms_mcd_set(c
, task
->item
);
654 assert(task
->cmd
== CMD_GET
);
655 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
656 mlget_item
->item
= task
->item
;
657 mlget_item
->verify
= task
->verify
;
658 mlget_item
->finish_verify
= task
->finish_verify
;
659 mlget_item
->get_miss
= task
->get_miss
;
660 c
->mlget_task
.mlget_num
++;
662 /* enough multi-get task items can be done */
663 if ((c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
)
664 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
673 if ((c
->remain_exec_num
<= 0) && (c
->mlget_task
.mlget_num
> 0))
680 } /* ms_multi_getset_task_sch */
/**
 * calculate the difference value of two time points
 *
 * @param start_time, the start time
 * @param end_time, the end time
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  /*
   * Widen the seconds to 64 bits before scaling to microseconds, so the
   * multiplication cannot overflow where time_t is a 32-bit type.
   */
  int64_t endtime = (int64_t)end_time->tv_sec * 1000000
                    + (int64_t)end_time->tv_usec;
  int64_t starttime = (int64_t)start_time->tv_sec * 1000000
                      + (int64_t)start_time->tv_usec;

  /* callers are expected to pass the time points in chronological order */
  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */
703 * after get the response from server for multi-get, the
704 * function update the state of the task and do data verify if
707 * @param c, pointer of the concurrency
709 static void ms_update_multi_get_result(ms_conn_t
*c
)
711 ms_mlget_task_item_t
*mlget_item
;
712 ms_task_item_t
*item
;
713 char *orignval
= NULL
;
714 char *orignkey
= NULL
;
722 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
724 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
725 item
= mlget_item
->item
;
726 orignval
= &ms_setting
.char_block
[item
->value_offset
];
727 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
729 /* update get miss counter */
730 if (mlget_item
->get_miss
)
732 atomic_add_size(&ms_stats
.get_misses
, 1);
735 /* get nothing from server for this task item */
736 if (mlget_item
->verify
&& ! mlget_item
->finish_verify
)
738 /* verify expire time if necessary */
739 if (item
->exp_time
> 0)
741 struct timeval curr_time
;
742 gettimeofday(&curr_time
, NULL
);
744 /* object doesn't expire but can't get it now */
745 if (curr_time
.tv_sec
- item
->client_time
746 < item
->exp_time
- EXPIRE_TIME_ERROR
)
748 atomic_add_size(&ms_stats
.unexp_unget
, 1);
750 if (ms_setting
.verbose
)
754 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
755 localtime(&item
->client_time
));
756 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
757 localtime(&curr_time
.tv_sec
));
759 "\n\t<%d expire time verification failed, object "
760 "doesn't expire but can't get it now\n"
762 "\tkey: %" PRIx64
" %.*s\n"
763 "\tset time: %s current time: %s "
764 "diff time: %d expire time: %d\n"
765 "\texpected data len: %d\n"
766 "\texpected data: %.*s\n"
767 "\treceived data: \n",
771 item
->key_size
- (int)KEY_PREFIX_SIZE
,
775 (int)(curr_time
.tv_sec
- item
->client_time
),
786 atomic_add_size(&ms_stats
.vef_miss
, 1);
788 if (ms_setting
.verbose
)
790 fprintf(stderr
, "\n<%d data verification failed\n"
792 "\tkey: %" PRIx64
" %.*s\n"
793 "\texpected data len: %d\n"
794 "\texpected data: %.*s\n"
795 "\treceived data: \n",
796 c
->sfd
, item
->key_size
, item
->key_prefix
,
797 item
->key_size
- (int)KEY_PREFIX_SIZE
,
798 orignkey
, item
->value_size
, item
->value_size
, orignval
);
804 c
->mlget_task
.mlget_num
= 0;
805 c
->mlget_task
.value_index
= INVALID_OFFSET
;
806 } /* ms_update_multi_get_result */
810 * after get the response from server for single get, the
811 * function update the state of the task and do data verify if
814 * @param c, pointer of the concurrency
815 * @param item, pointer of task item which includes the object
818 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
820 char *orignval
= NULL
;
821 char *orignkey
= NULL
;
823 if ((c
== NULL
) || (item
== NULL
))
828 assert(item
!= NULL
);
830 orignval
= &ms_setting
.char_block
[item
->value_offset
];
831 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
833 /* update get miss counter */
834 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.get_miss
)
836 atomic_add_size(&ms_stats
.get_misses
, 1);
839 /* get nothing from server for this task item */
840 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.verify
841 && ! c
->curr_task
.finish_verify
)
843 /* verify expire time if necessary */
844 if (item
->exp_time
> 0)
846 struct timeval curr_time
;
847 gettimeofday(&curr_time
, NULL
);
849 /* object doesn't expire but can't get it now */
850 if (curr_time
.tv_sec
- item
->client_time
851 < item
->exp_time
- EXPIRE_TIME_ERROR
)
853 atomic_add_size(&ms_stats
.unexp_unget
, 1);
855 if (ms_setting
.verbose
)
859 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
860 localtime(&item
->client_time
));
861 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
862 localtime(&curr_time
.tv_sec
));
864 "\n\t<%d expire time verification failed, object "
865 "doesn't expire but can't get it now\n"
867 "\tkey: %" PRIx64
" %.*s\n"
868 "\tset time: %s current time: %s "
869 "diff time: %d expire time: %d\n"
870 "\texpected data len: %d\n"
871 "\texpected data: %.*s\n"
872 "\treceived data: \n",
876 item
->key_size
- (int)KEY_PREFIX_SIZE
,
880 (int)(curr_time
.tv_sec
- item
->client_time
),
891 atomic_add_size(&ms_stats
.vef_miss
, 1);
893 if (ms_setting
.verbose
)
895 fprintf(stderr
, "\n<%d data verification failed\n"
897 "\tkey: %" PRIx64
" %.*s\n"
898 "\texpected data len: %d\n"
899 "\texpected data: %.*s\n"
900 "\treceived data: \n",
901 c
->sfd
, item
->key_size
, item
->key_prefix
,
902 item
->key_size
- (int)KEY_PREFIX_SIZE
,
903 orignkey
, item
->value_size
, item
->value_size
, orignval
);
908 } /* ms_update_single_get_result */
912 * after get the response from server for set the function
913 * update the state of the task and do data verify if necessary.
915 * @param c, pointer of the concurrency
916 * @param item, pointer of task item which includes the object
919 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
921 if ((c
== NULL
) || (item
== NULL
))
926 assert(item
!= NULL
);
928 if (c
->precmd
.cmd
== CMD_SET
)
930 switch (c
->precmd
.retstat
)
933 if (item
->value_offset
== INVALID_OFFSET
)
935 /* first set with the same offset of key suffix */
936 item
->value_offset
= item
->key_suffix_offset
;
940 /* not first set, just increase the value offset */
941 item
->value_offset
+= 1;
944 /* set successes, update counter */
946 c
->curr_task
.set_opt
++;
947 c
->curr_task
.cycle_undo_set
--;
950 case MCD_SERVER_ERROR
:
955 } /* ms_update_set_result */
959 * update the response time result
961 * @param c, pointer of the concurrency
963 static void ms_update_stat_result(ms_conn_t
*c
)
965 bool get_miss
= false;
973 gettimeofday(&c
->end_time
, NULL
);
974 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
976 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
978 switch (c
->precmd
.cmd
)
981 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
985 if (c
->curr_task
.get_miss
)
989 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
996 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
997 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
998 } /* ms_update_stat_result */
1002 * after get response from server for the current operation, and
1003 * before doing the next operation, update the state of the
1004 * current operation.
1006 * @param c, pointer of the concurrency
1008 static void ms_update_task_result(ms_conn_t
*c
)
1010 ms_task_item_t
*item
;
1018 item
= ms_get_cur_opt_item(c
);
1023 assert(item
!= NULL
);
1025 ms_update_set_result(c
, item
);
1027 if ((ms_setting
.stat_freq
> 0)
1028 && ((c
->precmd
.cmd
== CMD_SET
) || (c
->precmd
.cmd
== CMD_GET
)))
1030 ms_update_stat_result(c
);
1033 /* update multi-get task item */
1034 if (((ms_setting
.mult_key_num
> 1)
1035 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1036 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1038 ms_update_multi_get_result(c
);
1042 ms_update_single_get_result(c
, item
);
1044 } /* ms_update_task_result */
1048 * run get and set operation
1050 * @param c, pointer of the concurrency
1052 * @return int, if success, return EXIT_SUCCESS, else return -1
1054 static int ms_run_getset_task(ms_conn_t
*c
)
1057 * extra one loop to get the last command return state. get the
1058 * last command return state.
1060 if ((c
->remain_exec_num
>= 0)
1061 && (c
->remain_exec_num
!= c
->exec_num
))
1063 ms_update_task_result(c
);
1067 if (ms_setting
.mult_key_num
> 1)
1069 /* operate next task item */
1070 ms_multi_getset_task_sch(c
);
1074 /* operate next task item */
1075 ms_single_getset_task_sch(c
);
1078 /* no task to do, exit */
1079 if ((c
->remain_exec_num
== -1) || ms_global
.time_out
)
1084 return EXIT_SUCCESS
;
1085 } /* ms_run_getset_task */
1089 * the state machine call the function to execute task.
1091 * @param c, pointer of the concurrency
1093 * @return int, if success, return EXIT_SUCCESS, else return -1
1095 int ms_exec_task(struct conn
*c
)
1097 if (! ms_global
.finish_warmup
)
1099 ms_warmup_server(c
);
1103 if (ms_run_getset_task(c
) != 0)
1109 return EXIT_SUCCESS
;
1110 } /* ms_exec_task */