3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
14 #include "ms_thread.h"
15 #include "ms_setting.h"
17 /* command distribution adjustment cycle */
18 #define CMD_DISTR_ADJUST_CYCLE 1000
19 #define DISADJUST_FACTOR 0.03 /**
20 * In one adjustment cycle, if undo set or get
21 * operations proportion is more than 3% , means
22 * there are too many new item or need more new
23 * item in the window. This factor shows it.
26 extern __thread ms_thread_t ms_thread
;
28 /* get item from task window */
29 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
30 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
31 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
32 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
);
35 /* select next operation to do */
36 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
39 /* set and get speed estimate for controlling and adjustment */
40 static bool ms_is_set_too_fast(ms_task_t
*task
);
41 static bool ms_is_get_too_fast(ms_task_t
*task
);
42 static void ms_kick_out_item(ms_task_item_t
*item
);
45 /* miss rate adjustment */
46 static bool ms_need_overwirte_item(ms_task_t
*task
);
47 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
50 /* deal with data verification initialization */
51 static void ms_task_data_verify_init(ms_task_t
*task
);
52 static void ms_task_expire_verify_init(ms_task_t
*task
);
55 /* select a new task to do */
56 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
59 /* run the selected task */
60 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
61 static void ms_update_stat_result(ms_conn_t
*c
);
62 static void ms_update_multi_get_result(ms_conn_t
*c
);
63 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
64 static void ms_update_task_result(ms_conn_t
*c
);
65 static void ms_single_getset_task_sch(ms_conn_t
*c
);
66 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
67 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
68 static void ms_warmup_server(ms_conn_t
*c
);
69 static int ms_run_getset_task(ms_conn_t
*c
);
73 * used to get the current operation item(object)
75 * @param c, pointer of the concurrency
77 * @return ms_task_item_t*, current operating item
79 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
81 return c
->curr_task
.item
;
86 * used to get the next item to do get operation
88 * @param c, pointer of the concurrency
90 * @return ms_task_item_t*, the pointer of the next item to do
93 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
95 ms_task_item_t
*item
= NULL
;
97 if (c
->set_cursor
<= 0)
99 /* the first item in the window */
100 item
= &c
->item_win
[0];
102 else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
)
104 /* random get one item set before */
105 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
109 /* random get one item from the window */
110 item
= &c
->item_win
[random() % c
->win_size
];
114 } /* ms_get_next_get_item */
118 * used to get the next item to do set operation
120 * @param c, pointer of the concurrency
122 * @return ms_task_item_t*, the pointer of the next item to do
125 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
128 * when a set command successes, the cursor will plus 1. If set
129 * fails, the cursor doesn't change. it isn't necessary to
130 * increase the cursor here.
132 return &c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
];
137 * If we need do overwrite, we could select a item set before.
138 * This function is used to get a item set before to do
141 * @param c, pointer of the concurrency
143 * @return ms_task_item_t*, the pointer of the previous item of
146 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
)
148 if (c
->set_cursor
<= 0)
150 return &c
->item_win
[0];
154 return &c
->item_win
[(int64_t)-- c
->set_cursor
% c
->win_size
];
156 } /* ms_get_pre_set_item */
160 * According to the proportion of operations(get or set), select
161 * an operation to do.
163 * @param c, pointer of the concurrency
164 * @param task, pointer of current task in the concurrency
166 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
168 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
169 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
171 /* update cycle operation number if necessary */
172 if ((task
->cycle_undo_get
== 0) || (task
->cycle_undo_set
== 0))
174 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
175 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
179 * According to operation distribution to choose doing which
180 * operation. If it can't set new object to sever, just change
181 * to do get operation.
183 if ((set_prop
> PROP_ERROR
)
184 && ((double)task
->get_opt
* set_prop
>= (double)task
->set_opt
188 task
->item
= ms_get_next_set_item(c
);
193 task
->item
= ms_get_next_get_item(c
);
195 } /* ms_select_opt */
199 * used to judge whether the number of get operations done is
200 * more than expected number of get operations to do right now.
202 * @param task, pointer of current task in the concurrency
204 * @return bool, if get too fast, return true, else return false
206 static bool ms_is_get_too_fast(ms_task_t
*task
)
208 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
209 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
211 /* no get operation */
212 if (get_prop
< PROP_ERROR
)
217 int max_undo_set
= (int)(set_prop
/ get_prop
* (1.0 + DISADJUST_FACTOR
))
218 * task
->cycle_undo_get
;
220 if (((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
)
221 && (task
->cycle_undo_set
> max_undo_set
))
227 } /* ms_is_get_too_fast */
231 * used to judge whether the number of set operations done is
232 * more than expected number of set operations to do right now.
234 * @param task, pointer of current task in the concurrency
236 * @return bool, if set too fast, return true, else return false
238 static bool ms_is_set_too_fast(ms_task_t
*task
)
240 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
241 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
243 /* no set operation */
244 if (set_prop
< PROP_ERROR
)
249 /* If it does set operation too fast, skip some */
250 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
251 * (double)task
->cycle_undo_set
);
253 if (((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
)
254 && (task
->cycle_undo_get
> max_undo_get
))
260 } /* ms_is_set_too_fast */
264 * kick out the old item in the window, and add a new item to
265 * overwrite the old item. When we don't want to do overwrite
266 * object, and the current item to do set operation is an old
267 * item, we could kick out the old item and add a new item. Then
268 * we can ensure we set new object every time.
270 * @param item, pointer of task item which includes the object
273 static void ms_kick_out_item(ms_task_item_t
*item
)
275 /* allocate a new item */
276 item
->key_prefix
= ms_get_key_prefix();
278 item
->key_suffix_offset
++;
279 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
280 item
->client_time
= 0;
281 } /* ms_kick_out_item */
285 * used to judge whether we need overwrite object based on the
286 * options user specified
288 * @param task, pointer of current task in the concurrency
290 * @return bool, if need overwrite, return true, else return
293 static bool ms_need_overwirte_item(ms_task_t
*task
)
295 ms_task_item_t
*item
= task
->item
;
297 assert(item
!= NULL
);
298 assert(task
->cmd
== CMD_SET
);
301 * according to data overwrite percent to determine if do data
304 if (task
->overwrite_set
< (double)task
->set_opt
305 * ms_setting
.overwrite_percent
)
311 } /* ms_need_overwirte_item */
315 * used to adjust operation. the function must be called after
316 * select operation. the function change get operation to set
317 * operation, or set operation to get operation based on the
320 * @param c, pointer of the concurrency
321 * @param task, pointer of current task in the concurrency
323 * @return bool, if success, return true, else return false
325 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
327 ms_task_item_t
*item
= task
->item
;
329 assert(item
!= NULL
);
331 if (task
->cmd
== CMD_SET
)
333 /* If did set operation too fast, skip some */
334 if (ms_is_set_too_fast(task
))
336 /* get the item instead */
337 if (item
->value_offset
!= INVALID_OFFSET
)
344 /* If the current item is not a new item, kick it out */
345 if (item
->value_offset
!= INVALID_OFFSET
)
347 if (ms_need_overwirte_item(task
))
350 task
->overwrite_set
++;
354 /* kick out the current item to do set operation */
355 ms_kick_out_item(item
);
358 else /* it's a new item */
361 if (ms_need_overwirte_item(task
))
363 item
= ms_get_pre_set_item(c
);
364 if (item
->value_offset
!= INVALID_OFFSET
)
367 task
->overwrite_set
++;
369 else /* previous set item is a new item */
371 /* select the previous item to run, and cancel overwrite */
381 if (item
->value_offset
== INVALID_OFFSET
)
388 * If It does get operation too fast, it will change the
391 if (ms_is_get_too_fast(task
))
393 /* don't kick out the first item in the window */
394 if (! ms_is_set_too_fast(task
))
396 ms_kick_out_item(item
);
406 assert(item
->value_offset
!= INVALID_OFFSET
);
411 } /* ms_adjust_opt */
415 * used to initialize the task which need verify data.
417 * @param task, pointer of current task in the concurrency
419 static void ms_task_data_verify_init(ms_task_t
*task
)
421 ms_task_item_t
*item
= task
->item
;
423 assert(item
!= NULL
);
424 assert(task
->cmd
== CMD_GET
);
427 * according to data verification percent to determine if do
430 if (task
->verified_get
< (double)task
->get_opt
431 * ms_setting
.verify_percent
)
434 * currently it doesn't do verify, just increase the counter,
435 * and do verification next proper get command
437 if ((task
->item
->value_offset
!= INVALID_OFFSET
)
438 && (item
->exp_time
== 0))
441 task
->finish_verify
= false;
442 task
->verified_get
++;
445 } /* ms_task_data_verify_init */
449 * used to initialize the task which need verify expire time.
451 * @param task, pointer of current task in the concurrency
453 static void ms_task_expire_verify_init(ms_task_t
*task
)
455 ms_task_item_t
*item
= task
->item
;
457 assert(item
!= NULL
);
458 assert(task
->cmd
== CMD_GET
);
459 assert(item
->exp_time
> 0);
462 task
->finish_verify
= false;
463 } /* ms_task_expire_verify_init */
467 * used to get one task, the function initializes the task
470 * @param c, pointer of the concurrency
471 * @param warmup, whether it need warmup
473 * @return ms_task_t*, pointer of current task in the
476 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
478 ms_task_t
*task
= &c
->curr_task
;
483 task
->finish_verify
= true;
484 task
->get_miss
= true;
489 task
->item
= ms_get_next_set_item(c
);
494 /* according to operation distribution to choose doing which operation */
495 ms_select_opt(c
, task
);
497 if (! ms_adjust_opt(c
, task
))
502 if ((ms_setting
.verify_percent
> 0) && (task
->cmd
== CMD_GET
))
504 ms_task_data_verify_init(task
);
507 if ((ms_setting
.exp_ver_per
> 0) && (task
->cmd
== CMD_GET
)
508 && (task
->item
->exp_time
> 0))
510 ms_task_expire_verify_init(task
);
517 * Only update get and delete counter, set counter will be
518 * updated after set operation successes.
520 if (task
->cmd
== CMD_GET
)
523 task
->cycle_undo_get
--;
531 * send a signal to the main monitor thread
533 * @param sync_lock, pointer of the lock
535 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
537 pthread_mutex_lock(&sync_lock
->lock
);
539 pthread_cond_signal(&sync_lock
->cond
);
540 pthread_mutex_unlock(&sync_lock
->lock
);
541 } /* ms_send_signal */
545 * If user only want to do get operation, but there is no object
546 * in server , so we use this function to warmup the server, and
547 * set some objects to server. It runs at the beginning of task.
549 * @param c, pointer of the concurrency
551 static void ms_warmup_server(ms_conn_t
*c
)
554 ms_task_item_t
*item
;
557 * Extra one loop to get the last command returned state.
558 * Normally it gets the previous command returned state.
560 if ((c
->remain_warmup_num
>= 0)
561 && (c
->remain_warmup_num
!= c
->warmup_num
))
563 item
= ms_get_cur_opt_item(c
);
564 /* only update the set command result state for data verification */
565 if ((c
->precmd
.cmd
== CMD_SET
) && (c
->precmd
.retstat
== MCD_STORED
))
567 item
->value_offset
= item
->key_suffix_offset
;
568 /* set success, update counter */
571 else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
)
573 printf("key: %lx didn't set success\n", item
->key_prefix
);
577 /* the last time don't run a task */
578 if (c
->remain_warmup_num
-- > 0)
580 /* operate next task item */
581 task
= ms_get_task(c
, true);
587 * finish warming up server, wait all connects initialize
588 * complete. Then all connects can start do task at the same
591 if (c
->remain_warmup_num
== -1)
593 ms_send_signal(&ms_global
.init_lock
);
594 c
->remain_warmup_num
--; /* never run the if branch */
596 } /* ms_warmup_server */
600 * dispatch single get and set task
602 * @param c, pointer of the concurrency
604 static void ms_single_getset_task_sch(ms_conn_t
*c
)
607 ms_task_item_t
*item
;
609 /* the last time don't run a task */
610 if (c
->remain_exec_num
-- > 0)
612 task
= ms_get_task(c
, false);
614 if (task
->cmd
== CMD_SET
)
618 else if (task
->cmd
== CMD_GET
)
620 assert(task
->cmd
== CMD_GET
);
621 ms_mcd_get(c
, item
, task
->verify
);
624 } /* ms_single_getset_task_sch */
628 * dispatch multi-get and set task
630 * @param c, pointer of the concurrency
632 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
635 ms_mlget_task_item_t
*mlget_item
;
639 if (c
->remain_exec_num
-- > 0)
641 task
= ms_get_task(c
, false);
642 if (task
->cmd
== CMD_SET
) /* just do it */
644 ms_mcd_set(c
, task
->item
);
649 assert(task
->cmd
== CMD_GET
);
650 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
651 mlget_item
->item
= task
->item
;
652 mlget_item
->verify
= task
->verify
;
653 mlget_item
->finish_verify
= task
->finish_verify
;
654 mlget_item
->get_miss
= task
->get_miss
;
655 c
->mlget_task
.mlget_num
++;
657 /* enough multi-get task items can be done */
658 if ((c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
)
659 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
668 if ((c
->remain_exec_num
<= 0) && (c
->mlget_task
.mlget_num
> 0))
675 } /* ms_multi_getset_task_sch */
679 * calculate the difference value of two time points
681 * @param start_time, the start time
682 * @param end_time, the end time
684 * @return uint64_t, the difference value between start_time and end_time in us
686 int64_t ms_time_diff(struct timeval
*start_time
, struct timeval
*end_time
)
688 int64_t endtime
= end_time
->tv_sec
* 1000000 + end_time
->tv_usec
;
689 int64_t starttime
= start_time
->tv_sec
* 1000000 + start_time
->tv_usec
;
691 assert(endtime
>= starttime
);
693 return endtime
- starttime
;
698 * after get the response from server for multi-get, the
699 * function update the state of the task and do data verify if
702 * @param c, pointer of the concurrency
704 static void ms_update_multi_get_result(ms_conn_t
*c
)
706 ms_mlget_task_item_t
*mlget_item
;
707 ms_task_item_t
*item
;
708 char *orignval
= NULL
;
709 char *orignkey
= NULL
;
717 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
719 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
720 item
= mlget_item
->item
;
721 orignval
= &ms_setting
.char_block
[item
->value_offset
];
722 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
724 /* update get miss counter */
725 if (mlget_item
->get_miss
)
727 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
730 /* get nothing from server for this task item */
731 if (mlget_item
->verify
&& ! mlget_item
->finish_verify
)
733 /* verify expire time if necessary */
734 if (item
->exp_time
> 0)
736 struct timeval curr_time
;
737 gettimeofday(&curr_time
, NULL
);
739 /* object doesn't expire but can't get it now */
740 if (curr_time
.tv_sec
- item
->client_time
741 < item
->exp_time
- EXPIRE_TIME_ERROR
)
743 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
745 if (ms_setting
.verbose
)
749 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
750 localtime(&item
->client_time
));
751 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
752 localtime(&curr_time
.tv_sec
));
754 "\n\t<%d expire time verification failed, object "
755 "doesn't expire but can't get it now\n"
758 "\tset time: %s current time: %s "
759 "diff time: %d expire time: %d\n"
760 "\texpected data len: %d\n"
761 "\texpected data: %.*s\n"
762 "\treceived data: \n",
766 item
->key_size
- (int)KEY_PREFIX_SIZE
,
770 (int)(curr_time
.tv_sec
- item
->client_time
),
781 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
783 if (ms_setting
.verbose
)
785 fprintf(stderr
, "\n<%d data verification failed\n"
788 "\texpected data len: %d\n"
789 "\texpected data: %.*s\n"
790 "\treceived data: \n",
791 c
->sfd
, item
->key_size
, item
->key_prefix
,
792 item
->key_size
- (int)KEY_PREFIX_SIZE
,
793 orignkey
, item
->value_size
, item
->value_size
, orignval
);
799 c
->mlget_task
.mlget_num
= 0;
800 c
->mlget_task
.value_index
= INVALID_OFFSET
;
801 } /* ms_update_multi_get_result */
805 * after get the response from server for single get, the
806 * function update the state of the task and do data verify if
809 * @param c, pointer of the concurrency
810 * @param item, pointer of task item which includes the object
813 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
815 char *orignval
= NULL
;
816 char *orignkey
= NULL
;
818 if ((c
== NULL
) || (item
== NULL
))
823 assert(item
!= NULL
);
825 orignval
= &ms_setting
.char_block
[item
->value_offset
];
826 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
828 /* update get miss counter */
829 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.get_miss
)
831 __sync_fetch_and_add(&ms_stats
.get_misses
, 1);
834 /* get nothing from server for this task item */
835 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.verify
836 && ! c
->curr_task
.finish_verify
)
838 /* verify expire time if necessary */
839 if (item
->exp_time
> 0)
841 struct timeval curr_time
;
842 gettimeofday(&curr_time
, NULL
);
844 /* object doesn't expire but can't get it now */
845 if (curr_time
.tv_sec
- item
->client_time
846 < item
->exp_time
- EXPIRE_TIME_ERROR
)
848 __sync_fetch_and_add(&ms_stats
.unexp_unget
, 1);
850 if (ms_setting
.verbose
)
854 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
855 localtime(&item
->client_time
));
856 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
857 localtime(&curr_time
.tv_sec
));
859 "\n\t<%d expire time verification failed, object "
860 "doesn't expire but can't get it now\n"
863 "\tset time: %s current time: %s "
864 "diff time: %d expire time: %d\n"
865 "\texpected data len: %d\n"
866 "\texpected data: %.*s\n"
867 "\treceived data: \n",
871 item
->key_size
- (int)KEY_PREFIX_SIZE
,
875 (int)(curr_time
.tv_sec
- item
->client_time
),
886 __sync_fetch_and_add(&ms_stats
.vef_miss
, 1);
888 if (ms_setting
.verbose
)
890 fprintf(stderr
, "\n<%d data verification failed\n"
893 "\texpected data len: %d\n"
894 "\texpected data: %.*s\n"
895 "\treceived data: \n",
896 c
->sfd
, item
->key_size
, item
->key_prefix
,
897 item
->key_size
- (int)KEY_PREFIX_SIZE
,
898 orignkey
, item
->value_size
, item
->value_size
, orignval
);
903 } /* ms_update_single_get_result */
907 * after get the response from server for set the function
908 * update the state of the task and do data verify if necessary.
910 * @param c, pointer of the concurrency
911 * @param item, pointer of task item which includes the object
914 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
916 if ((c
== NULL
) || (item
== NULL
))
921 assert(item
!= NULL
);
923 if (c
->precmd
.cmd
== CMD_SET
)
925 switch (c
->precmd
.retstat
)
928 if (item
->value_offset
== INVALID_OFFSET
)
930 /* first set with the same offset of key suffix */
931 item
->value_offset
= item
->key_suffix_offset
;
935 /* not first set, just increase the value offset */
936 item
->value_offset
+= 1;
939 /* set successes, update counter */
941 c
->curr_task
.set_opt
++;
942 c
->curr_task
.cycle_undo_set
--;
945 case MCD_SERVER_ERROR
:
950 } /* ms_update_set_result */
954 * update the response time result
956 * @param c, pointer of the concurrency
958 static void ms_update_stat_result(ms_conn_t
*c
)
960 bool get_miss
= false;
968 gettimeofday(&c
->end_time
, NULL
);
969 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
971 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
973 switch (c
->precmd
.cmd
)
976 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
980 if (c
->curr_task
.get_miss
)
984 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
991 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
992 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
993 } /* ms_update_stat_result */
997 * after get response from server for the current operation, and
998 * before doing the next operation, update the state of the
1001 * @param c, pointer of the concurrency
1003 static void ms_update_task_result(ms_conn_t
*c
)
1005 ms_task_item_t
*item
;
1013 item
= ms_get_cur_opt_item(c
);
1018 assert(item
!= NULL
);
1020 ms_update_set_result(c
, item
);
1022 if ((ms_setting
.stat_freq
> 0)
1023 && ((c
->precmd
.cmd
== CMD_SET
) || (c
->precmd
.cmd
== CMD_GET
)))
1025 ms_update_stat_result(c
);
1028 /* update multi-get task item */
1029 if (((ms_setting
.mult_key_num
> 1)
1030 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1031 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1033 ms_update_multi_get_result(c
);
1037 ms_update_single_get_result(c
, item
);
1039 } /* ms_update_task_result */
1043 * run get and set operation
1045 * @param c, pointer of the concurrency
1047 * @return int, if success, return 0, else return -1
1049 static int ms_run_getset_task(ms_conn_t
*c
)
1052 * extra one loop to get the last command return state. get the
1053 * last command return state.
1055 if ((c
->remain_exec_num
>= 0)
1056 && (c
->remain_exec_num
!= c
->exec_num
))
1058 ms_update_task_result(c
);
1062 if (ms_setting
.mult_key_num
> 1)
1064 /* operate next task item */
1065 ms_multi_getset_task_sch(c
);
1069 /* operate next task item */
1070 ms_single_getset_task_sch(c
);
1073 /* no task to do, exit */
1074 if ((c
->remain_exec_num
== -1) || ms_global
.time_out
)
1080 } /* ms_run_getset_task */
1084 * the state machine call the function to execute task.
1086 * @param c, pointer of the concurrency
1088 * @return int, if success, return 0, else return -1
1090 int ms_exec_task(struct conn
*c
)
1092 if (! ms_global
.finish_warmup
)
1094 ms_warmup_server(c
);
1098 if (ms_run_getset_task(c
) != 0)
1105 } /* ms_exec_task */