/*
 * Author: Mingqiang Zhuang
 *
 * Created on February 10, 2009
 *
 * (c) Copyright 2009, Schooner Information Technology, Inc.
 * http://www.schoonerinfotech.com/
 *
 */
#include "ms_thread.h"
#include "ms_setting.h"
#include "ms_atomic.h"

#include <sys/time.h>
#include <time.h>
/* command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE    1000

/**
 * In one adjustment cycle, if the proportion of undone set or get
 * operations is more than 3%, there are too many new items, or
 * more new items are needed, in the window. This factor expresses
 * that 3% tolerance.
 */
#define DISADJUST_FACTOR          0.03
29 /* get item from task window */
30 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
31 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
32 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
33 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
);
36 /* select next operation to do */
37 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
40 /* set and get speed estimate for controlling and adjustment */
41 static bool ms_is_set_too_fast(ms_task_t
*task
);
42 static bool ms_is_get_too_fast(ms_task_t
*task
);
43 static void ms_kick_out_item(ms_task_item_t
*item
);
46 /* miss rate adjustment */
47 static bool ms_need_overwirte_item(ms_task_t
*task
);
48 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
51 /* deal with data verification initialization */
52 static void ms_task_data_verify_init(ms_task_t
*task
);
53 static void ms_task_expire_verify_init(ms_task_t
*task
);
56 /* select a new task to do */
57 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
60 /* run the selected task */
61 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
62 static void ms_update_stat_result(ms_conn_t
*c
);
63 static void ms_update_multi_get_result(ms_conn_t
*c
);
64 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
65 static void ms_update_task_result(ms_conn_t
*c
);
66 static void ms_single_getset_task_sch(ms_conn_t
*c
);
67 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
68 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
69 static void ms_warmup_server(ms_conn_t
*c
);
70 static int ms_run_getset_task(ms_conn_t
*c
);
74 * used to get the current operation item(object)
76 * @param c, pointer of the concurrency
78 * @return ms_task_item_t*, current operating item
80 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
82 return c
->curr_task
.item
;
87 * used to get the next item to do get operation
89 * @param c, pointer of the concurrency
91 * @return ms_task_item_t*, the pointer of the next item to do
94 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
96 ms_task_item_t
*item
= NULL
;
98 if (c
->set_cursor
<= 0)
100 /* the first item in the window */
101 item
= &c
->item_win
[0];
103 else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
)
105 /* random get one item set before */
106 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
110 /* random get one item from the window */
111 item
= &c
->item_win
[random() % c
->win_size
];
115 } /* ms_get_next_get_item */
119 * used to get the next item to do set operation
121 * @param c, pointer of the concurrency
123 * @return ms_task_item_t*, the pointer of the next item to do
126 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
129 * when a set command successes, the cursor will plus 1. If set
130 * fails, the cursor doesn't change. it isn't necessary to
131 * increase the cursor here.
133 return &c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
];
138 * If we need do overwrite, we could select a item set before.
139 * This function is used to get a item set before to do
142 * @param c, pointer of the concurrency
144 * @return ms_task_item_t*, the pointer of the previous item of
147 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
)
149 if (c
->set_cursor
<= 0)
151 return &c
->item_win
[0];
155 return &c
->item_win
[(int64_t)-- c
->set_cursor
% c
->win_size
];
157 } /* ms_get_pre_set_item */
161 * According to the proportion of operations(get or set), select
162 * an operation to do.
164 * @param c, pointer of the concurrency
165 * @param task, pointer of current task in the concurrency
167 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
169 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
170 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
172 /* update cycle operation number if necessary */
173 if ((task
->cycle_undo_get
== 0) || (task
->cycle_undo_set
== 0))
175 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
176 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
180 * According to operation distribution to choose doing which
181 * operation. If it can't set new object to sever, just change
182 * to do get operation.
184 if ((set_prop
> PROP_ERROR
)
185 && ((double)task
->get_opt
* set_prop
>= (double)task
->set_opt
189 task
->item
= ms_get_next_set_item(c
);
194 task
->item
= ms_get_next_get_item(c
);
196 } /* ms_select_opt */
200 * used to judge whether the number of get operations done is
201 * more than expected number of get operations to do right now.
203 * @param task, pointer of current task in the concurrency
205 * @return bool, if get too fast, return true, else return false
207 static bool ms_is_get_too_fast(ms_task_t
*task
)
209 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
210 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
212 /* no get operation */
213 if (get_prop
< PROP_ERROR
)
218 int max_undo_set
= (int)(set_prop
/ get_prop
* (1.0 + DISADJUST_FACTOR
))
219 * task
->cycle_undo_get
;
221 if (((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
)
222 && (task
->cycle_undo_set
> max_undo_set
))
228 } /* ms_is_get_too_fast */
232 * used to judge whether the number of set operations done is
233 * more than expected number of set operations to do right now.
235 * @param task, pointer of current task in the concurrency
237 * @return bool, if set too fast, return true, else return false
239 static bool ms_is_set_too_fast(ms_task_t
*task
)
241 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
242 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
244 /* no set operation */
245 if (set_prop
< PROP_ERROR
)
250 /* If it does set operation too fast, skip some */
251 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
252 * (double)task
->cycle_undo_set
);
254 if (((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
)
255 && (task
->cycle_undo_get
> max_undo_get
))
261 } /* ms_is_set_too_fast */
265 * kick out the old item in the window, and add a new item to
266 * overwrite the old item. When we don't want to do overwrite
267 * object, and the current item to do set operation is an old
268 * item, we could kick out the old item and add a new item. Then
269 * we can ensure we set new object every time.
271 * @param item, pointer of task item which includes the object
274 static void ms_kick_out_item(ms_task_item_t
*item
)
276 /* allocate a new item */
277 item
->key_prefix
= ms_get_key_prefix();
279 item
->key_suffix_offset
++;
280 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
281 item
->client_time
= 0;
282 } /* ms_kick_out_item */
286 * used to judge whether we need overwrite object based on the
287 * options user specified
289 * @param task, pointer of current task in the concurrency
291 * @return bool, if need overwrite, return true, else return
294 static bool ms_need_overwirte_item(ms_task_t
*task
)
296 ms_task_item_t
*item
= task
->item
;
298 assert(item
!= NULL
);
299 assert(task
->cmd
== CMD_SET
);
302 * according to data overwrite percent to determine if do data
305 if (task
->overwrite_set
< (double)task
->set_opt
306 * ms_setting
.overwrite_percent
)
312 } /* ms_need_overwirte_item */
316 * used to adjust operation. the function must be called after
317 * select operation. the function change get operation to set
318 * operation, or set operation to get operation based on the
321 * @param c, pointer of the concurrency
322 * @param task, pointer of current task in the concurrency
324 * @return bool, if success, return true, else return false
326 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
328 ms_task_item_t
*item
= task
->item
;
330 assert(item
!= NULL
);
332 if (task
->cmd
== CMD_SET
)
334 /* If did set operation too fast, skip some */
335 if (ms_is_set_too_fast(task
))
337 /* get the item instead */
338 if (item
->value_offset
!= INVALID_OFFSET
)
345 /* If the current item is not a new item, kick it out */
346 if (item
->value_offset
!= INVALID_OFFSET
)
348 if (ms_need_overwirte_item(task
))
351 task
->overwrite_set
++;
355 /* kick out the current item to do set operation */
356 ms_kick_out_item(item
);
359 else /* it's a new item */
362 if (ms_need_overwirte_item(task
))
364 item
= ms_get_pre_set_item(c
);
365 if (item
->value_offset
!= INVALID_OFFSET
)
368 task
->overwrite_set
++;
370 else /* previous set item is a new item */
372 /* select the previous item to run, and cancel overwrite */
382 if (item
->value_offset
== INVALID_OFFSET
)
389 * If It does get operation too fast, it will change the
392 if (ms_is_get_too_fast(task
))
394 /* don't kick out the first item in the window */
395 if (! ms_is_set_too_fast(task
))
397 ms_kick_out_item(item
);
407 assert(item
->value_offset
!= INVALID_OFFSET
);
412 } /* ms_adjust_opt */
416 * used to initialize the task which need verify data.
418 * @param task, pointer of current task in the concurrency
420 static void ms_task_data_verify_init(ms_task_t
*task
)
422 ms_task_item_t
*item
= task
->item
;
424 assert(item
!= NULL
);
425 assert(task
->cmd
== CMD_GET
);
428 * according to data verification percent to determine if do
431 if (task
->verified_get
< (double)task
->get_opt
432 * ms_setting
.verify_percent
)
435 * currently it doesn't do verify, just increase the counter,
436 * and do verification next proper get command
438 if ((task
->item
->value_offset
!= INVALID_OFFSET
)
439 && (item
->exp_time
== 0))
442 task
->finish_verify
= false;
443 task
->verified_get
++;
446 } /* ms_task_data_verify_init */
450 * used to initialize the task which need verify expire time.
452 * @param task, pointer of current task in the concurrency
454 static void ms_task_expire_verify_init(ms_task_t
*task
)
456 ms_task_item_t
*item
= task
->item
;
458 assert(item
!= NULL
);
459 assert(task
->cmd
== CMD_GET
);
460 assert(item
->exp_time
> 0);
463 task
->finish_verify
= false;
464 } /* ms_task_expire_verify_init */
468 * used to get one task, the function initializes the task
471 * @param c, pointer of the concurrency
472 * @param warmup, whether it need warmup
474 * @return ms_task_t*, pointer of current task in the
477 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
479 ms_task_t
*task
= &c
->curr_task
;
484 task
->finish_verify
= true;
485 task
->get_miss
= true;
490 task
->item
= ms_get_next_set_item(c
);
495 /* according to operation distribution to choose doing which operation */
496 ms_select_opt(c
, task
);
498 if (! ms_adjust_opt(c
, task
))
503 if ((ms_setting
.verify_percent
> 0) && (task
->cmd
== CMD_GET
))
505 ms_task_data_verify_init(task
);
508 if ((ms_setting
.exp_ver_per
> 0) && (task
->cmd
== CMD_GET
)
509 && (task
->item
->exp_time
> 0))
511 ms_task_expire_verify_init(task
);
518 * Only update get and delete counter, set counter will be
519 * updated after set operation successes.
521 if (task
->cmd
== CMD_GET
)
524 task
->cycle_undo_get
--;
532 * send a signal to the main monitor thread
534 * @param sync_lock, pointer of the lock
536 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
538 pthread_mutex_lock(&sync_lock
->lock
);
540 pthread_cond_signal(&sync_lock
->cond
);
541 pthread_mutex_unlock(&sync_lock
->lock
);
542 } /* ms_send_signal */
546 * If user only want to do get operation, but there is no object
547 * in server , so we use this function to warmup the server, and
548 * set some objects to server. It runs at the beginning of task.
550 * @param c, pointer of the concurrency
552 static void ms_warmup_server(ms_conn_t
*c
)
555 ms_task_item_t
*item
;
558 * Extra one loop to get the last command returned state.
559 * Normally it gets the previous command returned state.
561 if ((c
->remain_warmup_num
>= 0)
562 && (c
->remain_warmup_num
!= c
->warmup_num
))
564 item
= ms_get_cur_opt_item(c
);
565 /* only update the set command result state for data verification */
566 if ((c
->precmd
.cmd
== CMD_SET
) && (c
->precmd
.retstat
== MCD_STORED
))
568 item
->value_offset
= item
->key_suffix_offset
;
569 /* set success, update counter */
572 else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
)
574 printf("key: %" PRIx64
" didn't set success\n", item
->key_prefix
);
578 /* the last time don't run a task */
579 if (c
->remain_warmup_num
-- > 0)
581 /* operate next task item */
582 task
= ms_get_task(c
, true);
588 * finish warming up server, wait all connects initialize
589 * complete. Then all connects can start do task at the same
592 if (c
->remain_warmup_num
== -1)
594 ms_send_signal(&ms_global
.init_lock
);
595 c
->remain_warmup_num
--; /* never run the if branch */
597 } /* ms_warmup_server */
601 * dispatch single get and set task
603 * @param c, pointer of the concurrency
605 static void ms_single_getset_task_sch(ms_conn_t
*c
)
608 ms_task_item_t
*item
;
610 /* the last time don't run a task */
611 if (c
->remain_exec_num
-- > 0)
613 task
= ms_get_task(c
, false);
615 if (task
->cmd
== CMD_SET
)
619 else if (task
->cmd
== CMD_GET
)
621 assert(task
->cmd
== CMD_GET
);
622 ms_mcd_get(c
, item
, task
->verify
);
625 } /* ms_single_getset_task_sch */
629 * dispatch multi-get and set task
631 * @param c, pointer of the concurrency
633 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
636 ms_mlget_task_item_t
*mlget_item
;
640 if (c
->remain_exec_num
-- > 0)
642 task
= ms_get_task(c
, false);
643 if (task
->cmd
== CMD_SET
) /* just do it */
645 ms_mcd_set(c
, task
->item
);
650 assert(task
->cmd
== CMD_GET
);
651 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
652 mlget_item
->item
= task
->item
;
653 mlget_item
->verify
= task
->verify
;
654 mlget_item
->finish_verify
= task
->finish_verify
;
655 mlget_item
->get_miss
= task
->get_miss
;
656 c
->mlget_task
.mlget_num
++;
658 /* enough multi-get task items can be done */
659 if ((c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
)
660 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
669 if ((c
->remain_exec_num
<= 0) && (c
->mlget_task
.mlget_num
> 0))
676 } /* ms_multi_getset_task_sch */
/**
 * calculate the difference value of two time points
 *
 * The seconds field is widened to 64 bits before it is scaled to
 * microseconds, so the multiplication cannot overflow where
 * time_t is narrower than 64 bits.
 *
 * @param start_time, the start time
 * @param end_time, the end time; asserted to be no earlier than
 *            start_time
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  int64_t endtime = (int64_t)end_time->tv_sec * 1000000
                    + (int64_t)end_time->tv_usec;
  int64_t starttime = (int64_t)start_time->tv_sec * 1000000
                      + (int64_t)start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
}
699 * after get the response from server for multi-get, the
700 * function update the state of the task and do data verify if
703 * @param c, pointer of the concurrency
705 static void ms_update_multi_get_result(ms_conn_t
*c
)
707 ms_mlget_task_item_t
*mlget_item
;
708 ms_task_item_t
*item
;
709 char *orignval
= NULL
;
710 char *orignkey
= NULL
;
718 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
720 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
721 item
= mlget_item
->item
;
722 orignval
= &ms_setting
.char_block
[item
->value_offset
];
723 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
725 /* update get miss counter */
726 if (mlget_item
->get_miss
)
728 atomic_add_size(&ms_stats
.get_misses
, 1);
731 /* get nothing from server for this task item */
732 if (mlget_item
->verify
&& ! mlget_item
->finish_verify
)
734 /* verify expire time if necessary */
735 if (item
->exp_time
> 0)
737 struct timeval curr_time
;
738 gettimeofday(&curr_time
, NULL
);
740 /* object doesn't expire but can't get it now */
741 if (curr_time
.tv_sec
- item
->client_time
742 < item
->exp_time
- EXPIRE_TIME_ERROR
)
744 atomic_add_size(&ms_stats
.unexp_unget
, 1);
746 if (ms_setting
.verbose
)
750 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
751 localtime(&item
->client_time
));
752 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
753 localtime(&curr_time
.tv_sec
));
755 "\n\t<%d expire time verification failed, object "
756 "doesn't expire but can't get it now\n"
758 "\tkey: %" PRIx64
" %.*s\n"
759 "\tset time: %s current time: %s "
760 "diff time: %d expire time: %d\n"
761 "\texpected data len: %d\n"
762 "\texpected data: %.*s\n"
763 "\treceived data: \n",
767 item
->key_size
- (int)KEY_PREFIX_SIZE
,
771 (int)(curr_time
.tv_sec
- item
->client_time
),
782 atomic_add_size(&ms_stats
.vef_miss
, 1);
784 if (ms_setting
.verbose
)
786 fprintf(stderr
, "\n<%d data verification failed\n"
788 "\tkey: %" PRIx64
" %.*s\n"
789 "\texpected data len: %d\n"
790 "\texpected data: %.*s\n"
791 "\treceived data: \n",
792 c
->sfd
, item
->key_size
, item
->key_prefix
,
793 item
->key_size
- (int)KEY_PREFIX_SIZE
,
794 orignkey
, item
->value_size
, item
->value_size
, orignval
);
800 c
->mlget_task
.mlget_num
= 0;
801 c
->mlget_task
.value_index
= INVALID_OFFSET
;
802 } /* ms_update_multi_get_result */
806 * after get the response from server for single get, the
807 * function update the state of the task and do data verify if
810 * @param c, pointer of the concurrency
811 * @param item, pointer of task item which includes the object
814 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
816 char *orignval
= NULL
;
817 char *orignkey
= NULL
;
819 if ((c
== NULL
) || (item
== NULL
))
824 assert(item
!= NULL
);
826 orignval
= &ms_setting
.char_block
[item
->value_offset
];
827 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
829 /* update get miss counter */
830 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.get_miss
)
832 atomic_add_size(&ms_stats
.get_misses
, 1);
835 /* get nothing from server for this task item */
836 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.verify
837 && ! c
->curr_task
.finish_verify
)
839 /* verify expire time if necessary */
840 if (item
->exp_time
> 0)
842 struct timeval curr_time
;
843 gettimeofday(&curr_time
, NULL
);
845 /* object doesn't expire but can't get it now */
846 if (curr_time
.tv_sec
- item
->client_time
847 < item
->exp_time
- EXPIRE_TIME_ERROR
)
849 atomic_add_size(&ms_stats
.unexp_unget
, 1);
851 if (ms_setting
.verbose
)
855 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
856 localtime(&item
->client_time
));
857 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
858 localtime(&curr_time
.tv_sec
));
860 "\n\t<%d expire time verification failed, object "
861 "doesn't expire but can't get it now\n"
863 "\tkey: %" PRIx64
" %.*s\n"
864 "\tset time: %s current time: %s "
865 "diff time: %d expire time: %d\n"
866 "\texpected data len: %d\n"
867 "\texpected data: %.*s\n"
868 "\treceived data: \n",
872 item
->key_size
- (int)KEY_PREFIX_SIZE
,
876 (int)(curr_time
.tv_sec
- item
->client_time
),
887 atomic_add_size(&ms_stats
.vef_miss
, 1);
889 if (ms_setting
.verbose
)
891 fprintf(stderr
, "\n<%d data verification failed\n"
893 "\tkey: %" PRIx64
" %.*s\n"
894 "\texpected data len: %d\n"
895 "\texpected data: %.*s\n"
896 "\treceived data: \n",
897 c
->sfd
, item
->key_size
, item
->key_prefix
,
898 item
->key_size
- (int)KEY_PREFIX_SIZE
,
899 orignkey
, item
->value_size
, item
->value_size
, orignval
);
904 } /* ms_update_single_get_result */
908 * after get the response from server for set the function
909 * update the state of the task and do data verify if necessary.
911 * @param c, pointer of the concurrency
912 * @param item, pointer of task item which includes the object
915 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
917 if ((c
== NULL
) || (item
== NULL
))
922 assert(item
!= NULL
);
924 if (c
->precmd
.cmd
== CMD_SET
)
926 switch (c
->precmd
.retstat
)
929 if (item
->value_offset
== INVALID_OFFSET
)
931 /* first set with the same offset of key suffix */
932 item
->value_offset
= item
->key_suffix_offset
;
936 /* not first set, just increase the value offset */
937 item
->value_offset
+= 1;
940 /* set successes, update counter */
942 c
->curr_task
.set_opt
++;
943 c
->curr_task
.cycle_undo_set
--;
946 case MCD_SERVER_ERROR
:
951 } /* ms_update_set_result */
955 * update the response time result
957 * @param c, pointer of the concurrency
959 static void ms_update_stat_result(ms_conn_t
*c
)
961 bool get_miss
= false;
969 gettimeofday(&c
->end_time
, NULL
);
970 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
972 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
974 switch (c
->precmd
.cmd
)
977 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
981 if (c
->curr_task
.get_miss
)
985 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
992 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
993 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
994 } /* ms_update_stat_result */
998 * after get response from server for the current operation, and
999 * before doing the next operation, update the state of the
1000 * current operation.
1002 * @param c, pointer of the concurrency
1004 static void ms_update_task_result(ms_conn_t
*c
)
1006 ms_task_item_t
*item
;
1014 item
= ms_get_cur_opt_item(c
);
1019 assert(item
!= NULL
);
1021 ms_update_set_result(c
, item
);
1023 if ((ms_setting
.stat_freq
> 0)
1024 && ((c
->precmd
.cmd
== CMD_SET
) || (c
->precmd
.cmd
== CMD_GET
)))
1026 ms_update_stat_result(c
);
1029 /* update multi-get task item */
1030 if (((ms_setting
.mult_key_num
> 1)
1031 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1032 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1034 ms_update_multi_get_result(c
);
1038 ms_update_single_get_result(c
, item
);
1040 } /* ms_update_task_result */
1044 * run get and set operation
1046 * @param c, pointer of the concurrency
1048 * @return int, if success, return 0, else return -1
1050 static int ms_run_getset_task(ms_conn_t
*c
)
1053 * extra one loop to get the last command return state. get the
1054 * last command return state.
1056 if ((c
->remain_exec_num
>= 0)
1057 && (c
->remain_exec_num
!= c
->exec_num
))
1059 ms_update_task_result(c
);
1063 if (ms_setting
.mult_key_num
> 1)
1065 /* operate next task item */
1066 ms_multi_getset_task_sch(c
);
1070 /* operate next task item */
1071 ms_single_getset_task_sch(c
);
1074 /* no task to do, exit */
1075 if ((c
->remain_exec_num
== -1) || ms_global
.time_out
)
1081 } /* ms_run_getset_task */
1085 * the state machine call the function to execute task.
1087 * @param c, pointer of the concurrency
1089 * @return int, if success, return 0, else return -1
1091 int ms_exec_task(struct conn
*c
)
1093 if (! ms_global
.finish_warmup
)
1095 ms_warmup_server(c
);
1099 if (ms_run_getset_task(c
) != 0)
1106 } /* ms_exec_task */