/*
 * Author: Mingqiang Zhuang
 *
 * Created on February 10, 2009
 *
 * (c) Copyright 2009, Schooner Information Technology, Inc.
 * http://www.schoonerinfotech.com/
 */
15 #if TIME_WITH_SYS_TIME
16 # include <sys/time.h>
20 # include <sys/time.h>
26 #include "ms_thread.h"
27 #include "ms_setting.h"
28 #include "ms_atomic.h"
/* command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE 1000

/**
 * In one adjustment cycle, if the proportion of undone set or get
 * operations is more than 3%, there are too many new items, or more
 * new items are needed, in the window. This factor expresses that
 * tolerance.
 */
#define DISADJUST_FACTOR 0.03
39 /* get item from task window */
40 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
);
41 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
);
42 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
);
43 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
);
46 /* select next operation to do */
47 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
);
50 /* set and get speed estimate for controlling and adjustment */
51 static bool ms_is_set_too_fast(ms_task_t
*task
);
52 static bool ms_is_get_too_fast(ms_task_t
*task
);
53 static void ms_kick_out_item(ms_task_item_t
*item
);
56 /* miss rate adjustment */
57 static bool ms_need_overwirte_item(ms_task_t
*task
);
58 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
);
61 /* deal with data verification initialization */
62 static void ms_task_data_verify_init(ms_task_t
*task
);
63 static void ms_task_expire_verify_init(ms_task_t
*task
);
66 /* select a new task to do */
67 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
);
70 /* run the selected task */
71 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
);
72 static void ms_update_stat_result(ms_conn_t
*c
);
73 static void ms_update_multi_get_result(ms_conn_t
*c
);
74 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
);
75 static void ms_update_task_result(ms_conn_t
*c
);
76 static void ms_single_getset_task_sch(ms_conn_t
*c
);
77 static void ms_multi_getset_task_sch(ms_conn_t
*c
);
78 static void ms_send_signal(ms_sync_lock_t
*sync_lock
);
79 static void ms_warmup_server(ms_conn_t
*c
);
80 static int ms_run_getset_task(ms_conn_t
*c
);
84 * used to get the current operation item(object)
86 * @param c, pointer of the concurrency
88 * @return ms_task_item_t*, current operating item
90 static ms_task_item_t
*ms_get_cur_opt_item(ms_conn_t
*c
)
92 return c
->curr_task
.item
;
97 * used to get the next item to do get operation
99 * @param c, pointer of the concurrency
101 * @return ms_task_item_t*, the pointer of the next item to do
104 static ms_task_item_t
*ms_get_next_get_item(ms_conn_t
*c
)
106 ms_task_item_t
*item
= NULL
;
108 if (c
->set_cursor
<= 0)
110 /* the first item in the window */
111 item
= &c
->item_win
[0];
113 else if (c
->set_cursor
> 0 && c
->set_cursor
< (uint32_t)c
->win_size
)
115 /* random get one item set before */
116 item
= &c
->item_win
[random() % (int64_t)c
->set_cursor
];
120 /* random get one item from the window */
121 item
= &c
->item_win
[random() % c
->win_size
];
125 } /* ms_get_next_get_item */
129 * used to get the next item to do set operation
131 * @param c, pointer of the concurrency
133 * @return ms_task_item_t*, the pointer of the next item to do
136 static ms_task_item_t
*ms_get_next_set_item(ms_conn_t
*c
)
139 * when a set command successes, the cursor will plus 1. If set
140 * fails, the cursor doesn't change. it isn't necessary to
141 * increase the cursor here.
143 return &c
->item_win
[(int64_t)c
->set_cursor
% c
->win_size
];
148 * If we need do overwrite, we could select a item set before.
149 * This function is used to get a item set before to do
152 * @param c, pointer of the concurrency
154 * @return ms_task_item_t*, the pointer of the previous item of
157 static ms_task_item_t
*ms_get_pre_set_item(ms_conn_t
*c
)
159 if (c
->set_cursor
<= 0)
161 return &c
->item_win
[0];
165 return &c
->item_win
[(int64_t)-- c
->set_cursor
% c
->win_size
];
167 } /* ms_get_pre_set_item */
171 * According to the proportion of operations(get or set), select
172 * an operation to do.
174 * @param c, pointer of the concurrency
175 * @param task, pointer of current task in the concurrency
177 static void ms_select_opt(ms_conn_t
*c
, ms_task_t
*task
)
179 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
180 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
182 /* update cycle operation number if necessary */
183 if ((task
->cycle_undo_get
== 0) || (task
->cycle_undo_set
== 0))
185 task
->cycle_undo_get
+= (int)(CMD_DISTR_ADJUST_CYCLE
* get_prop
);
186 task
->cycle_undo_set
+= (int)(CMD_DISTR_ADJUST_CYCLE
* set_prop
);
190 * According to operation distribution to choose doing which
191 * operation. If it can't set new object to sever, just change
192 * to do get operation.
194 if ((set_prop
> PROP_ERROR
)
195 && ((double)task
->get_opt
* set_prop
>= (double)task
->set_opt
199 task
->item
= ms_get_next_set_item(c
);
204 task
->item
= ms_get_next_get_item(c
);
206 } /* ms_select_opt */
210 * used to judge whether the number of get operations done is
211 * more than expected number of get operations to do right now.
213 * @param task, pointer of current task in the concurrency
215 * @return bool, if get too fast, return true, else return false
217 static bool ms_is_get_too_fast(ms_task_t
*task
)
219 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
220 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
222 /* no get operation */
223 if (get_prop
< PROP_ERROR
)
228 int max_undo_set
= (int)(set_prop
/ get_prop
* (1.0 + DISADJUST_FACTOR
))
229 * task
->cycle_undo_get
;
231 if (((double)task
->get_opt
* set_prop
> (double)task
->set_opt
* get_prop
)
232 && (task
->cycle_undo_set
> max_undo_set
))
238 } /* ms_is_get_too_fast */
242 * used to judge whether the number of set operations done is
243 * more than expected number of set operations to do right now.
245 * @param task, pointer of current task in the concurrency
247 * @return bool, if set too fast, return true, else return false
249 static bool ms_is_set_too_fast(ms_task_t
*task
)
251 double get_prop
= ms_setting
.cmd_distr
[CMD_GET
].cmd_prop
;
252 double set_prop
= ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
;
254 /* no set operation */
255 if (set_prop
< PROP_ERROR
)
260 /* If it does set operation too fast, skip some */
261 int max_undo_get
= (int)((get_prop
/ set_prop
* (1.0 + DISADJUST_FACTOR
))
262 * (double)task
->cycle_undo_set
);
264 if (((double)task
->get_opt
* set_prop
< (double)task
->set_opt
* get_prop
)
265 && (task
->cycle_undo_get
> max_undo_get
))
271 } /* ms_is_set_too_fast */
275 * kick out the old item in the window, and add a new item to
276 * overwrite the old item. When we don't want to do overwrite
277 * object, and the current item to do set operation is an old
278 * item, we could kick out the old item and add a new item. Then
279 * we can ensure we set new object every time.
281 * @param item, pointer of task item which includes the object
284 static void ms_kick_out_item(ms_task_item_t
*item
)
286 /* allocate a new item */
287 item
->key_prefix
= ms_get_key_prefix();
289 item
->key_suffix_offset
++;
290 item
->value_offset
= INVALID_OFFSET
; /* new item use invalid value offset */
291 item
->client_time
= 0;
292 } /* ms_kick_out_item */
296 * used to judge whether we need overwrite object based on the
297 * options user specified
299 * @param task, pointer of current task in the concurrency
301 * @return bool, if need overwrite, return true, else return
304 static bool ms_need_overwirte_item(ms_task_t
*task
)
306 ms_task_item_t
*item
= task
->item
;
308 assert(item
!= NULL
);
309 assert(task
->cmd
== CMD_SET
);
312 * according to data overwrite percent to determine if do data
315 if (task
->overwrite_set
< (double)task
->set_opt
316 * ms_setting
.overwrite_percent
)
322 } /* ms_need_overwirte_item */
326 * used to adjust operation. the function must be called after
327 * select operation. the function change get operation to set
328 * operation, or set operation to get operation based on the
331 * @param c, pointer of the concurrency
332 * @param task, pointer of current task in the concurrency
334 * @return bool, if success, return true, else return false
336 static bool ms_adjust_opt(ms_conn_t
*c
, ms_task_t
*task
)
338 ms_task_item_t
*item
= task
->item
;
340 assert(item
!= NULL
);
342 if (task
->cmd
== CMD_SET
)
344 /* If did set operation too fast, skip some */
345 if (ms_is_set_too_fast(task
))
347 /* get the item instead */
348 if (item
->value_offset
!= INVALID_OFFSET
)
355 /* If the current item is not a new item, kick it out */
356 if (item
->value_offset
!= INVALID_OFFSET
)
358 if (ms_need_overwirte_item(task
))
361 task
->overwrite_set
++;
365 /* kick out the current item to do set operation */
366 ms_kick_out_item(item
);
369 else /* it's a new item */
372 if (ms_need_overwirte_item(task
))
374 item
= ms_get_pre_set_item(c
);
375 if (item
->value_offset
!= INVALID_OFFSET
)
378 task
->overwrite_set
++;
380 else /* previous set item is a new item */
382 /* select the previous item to run, and cancel overwrite */
392 if (item
->value_offset
== INVALID_OFFSET
)
399 * If It does get operation too fast, it will change the
402 if (ms_is_get_too_fast(task
))
404 /* don't kick out the first item in the window */
405 if (! ms_is_set_too_fast(task
))
407 ms_kick_out_item(item
);
417 assert(item
->value_offset
!= INVALID_OFFSET
);
422 } /* ms_adjust_opt */
426 * used to initialize the task which need verify data.
428 * @param task, pointer of current task in the concurrency
430 static void ms_task_data_verify_init(ms_task_t
*task
)
432 ms_task_item_t
*item
= task
->item
;
434 assert(item
!= NULL
);
435 assert(task
->cmd
== CMD_GET
);
438 * according to data verification percent to determine if do
441 if (task
->verified_get
< (double)task
->get_opt
442 * ms_setting
.verify_percent
)
445 * currently it doesn't do verify, just increase the counter,
446 * and do verification next proper get command
448 if ((task
->item
->value_offset
!= INVALID_OFFSET
)
449 && (item
->exp_time
== 0))
452 task
->finish_verify
= false;
453 task
->verified_get
++;
456 } /* ms_task_data_verify_init */
460 * used to initialize the task which need verify expire time.
462 * @param task, pointer of current task in the concurrency
464 static void ms_task_expire_verify_init(ms_task_t
*task
)
466 ms_task_item_t
*item
= task
->item
;
468 assert(item
!= NULL
);
469 assert(task
->cmd
== CMD_GET
);
470 assert(item
->exp_time
> 0);
473 task
->finish_verify
= false;
474 } /* ms_task_expire_verify_init */
478 * used to get one task, the function initializes the task
481 * @param c, pointer of the concurrency
482 * @param warmup, whether it need warmup
484 * @return ms_task_t*, pointer of current task in the
487 static ms_task_t
*ms_get_task(ms_conn_t
*c
, bool warmup
)
489 ms_task_t
*task
= &c
->curr_task
;
494 task
->finish_verify
= true;
495 task
->get_miss
= true;
500 task
->item
= ms_get_next_set_item(c
);
505 /* according to operation distribution to choose doing which operation */
506 ms_select_opt(c
, task
);
508 if (! ms_adjust_opt(c
, task
))
513 if ((ms_setting
.verify_percent
> 0) && (task
->cmd
== CMD_GET
))
515 ms_task_data_verify_init(task
);
518 if ((ms_setting
.exp_ver_per
> 0) && (task
->cmd
== CMD_GET
)
519 && (task
->item
->exp_time
> 0))
521 ms_task_expire_verify_init(task
);
528 * Only update get and delete counter, set counter will be
529 * updated after set operation successes.
531 if (task
->cmd
== CMD_GET
)
534 task
->cycle_undo_get
--;
542 * send a signal to the main monitor thread
544 * @param sync_lock, pointer of the lock
546 static void ms_send_signal(ms_sync_lock_t
*sync_lock
)
548 pthread_mutex_lock(&sync_lock
->lock
);
550 pthread_cond_signal(&sync_lock
->cond
);
551 pthread_mutex_unlock(&sync_lock
->lock
);
552 } /* ms_send_signal */
556 * If user only want to do get operation, but there is no object
557 * in server , so we use this function to warmup the server, and
558 * set some objects to server. It runs at the beginning of task.
560 * @param c, pointer of the concurrency
562 static void ms_warmup_server(ms_conn_t
*c
)
565 ms_task_item_t
*item
;
568 * Extra one loop to get the last command returned state.
569 * Normally it gets the previous command returned state.
571 if ((c
->remain_warmup_num
>= 0)
572 && (c
->remain_warmup_num
!= c
->warmup_num
))
574 item
= ms_get_cur_opt_item(c
);
575 /* only update the set command result state for data verification */
576 if ((c
->precmd
.cmd
== CMD_SET
) && (c
->precmd
.retstat
== MCD_STORED
))
578 item
->value_offset
= item
->key_suffix_offset
;
579 /* set success, update counter */
582 else if (c
->precmd
.cmd
== CMD_SET
&& c
->precmd
.retstat
!= MCD_STORED
)
584 printf("key: %" PRIx64
" didn't set success\n", item
->key_prefix
);
588 /* the last time don't run a task */
589 if (c
->remain_warmup_num
-- > 0)
591 /* operate next task item */
592 task
= ms_get_task(c
, true);
598 * finish warming up server, wait all connects initialize
599 * complete. Then all connects can start do task at the same
602 if (c
->remain_warmup_num
== -1)
604 ms_send_signal(&ms_global
.init_lock
);
605 c
->remain_warmup_num
--; /* never run the if branch */
607 } /* ms_warmup_server */
611 * dispatch single get and set task
613 * @param c, pointer of the concurrency
615 static void ms_single_getset_task_sch(ms_conn_t
*c
)
618 ms_task_item_t
*item
;
620 /* the last time don't run a task */
621 if (c
->remain_exec_num
-- > 0)
623 task
= ms_get_task(c
, false);
625 if (task
->cmd
== CMD_SET
)
629 else if (task
->cmd
== CMD_GET
)
631 assert(task
->cmd
== CMD_GET
);
632 ms_mcd_get(c
, item
, task
->verify
);
635 } /* ms_single_getset_task_sch */
639 * dispatch multi-get and set task
641 * @param c, pointer of the concurrency
643 static void ms_multi_getset_task_sch(ms_conn_t
*c
)
646 ms_mlget_task_item_t
*mlget_item
;
650 if (c
->remain_exec_num
-- > 0)
652 task
= ms_get_task(c
, false);
653 if (task
->cmd
== CMD_SET
) /* just do it */
655 ms_mcd_set(c
, task
->item
);
660 assert(task
->cmd
== CMD_GET
);
661 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.mlget_num
];
662 mlget_item
->item
= task
->item
;
663 mlget_item
->verify
= task
->verify
;
664 mlget_item
->finish_verify
= task
->finish_verify
;
665 mlget_item
->get_miss
= task
->get_miss
;
666 c
->mlget_task
.mlget_num
++;
668 /* enough multi-get task items can be done */
669 if ((c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
)
670 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
679 if ((c
->remain_exec_num
<= 0) && (c
->mlget_task
.mlget_num
> 0))
686 } /* ms_multi_getset_task_sch */
/**
 * calculate the difference value of two time points
 *
 * The seconds field is widened to int64_t before it is scaled to
 * microseconds, so the multiplication cannot overflow on platforms
 * where time_t is narrower than 64 bits.
 *
 * @param start_time, the start time
 * @param end_time, the end time (asserted to be not earlier than
 *        start_time)
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  int64_t endtime = (int64_t)end_time->tv_sec * 1000000
                    + (int64_t)end_time->tv_usec;
  int64_t starttime = (int64_t)start_time->tv_sec * 1000000
                      + (int64_t)start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
}
709 * after get the response from server for multi-get, the
710 * function update the state of the task and do data verify if
713 * @param c, pointer of the concurrency
715 static void ms_update_multi_get_result(ms_conn_t
*c
)
717 ms_mlget_task_item_t
*mlget_item
;
718 ms_task_item_t
*item
;
719 char *orignval
= NULL
;
720 char *orignkey
= NULL
;
728 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
730 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
731 item
= mlget_item
->item
;
732 orignval
= &ms_setting
.char_block
[item
->value_offset
];
733 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
735 /* update get miss counter */
736 if (mlget_item
->get_miss
)
738 atomic_add_size(&ms_stats
.get_misses
, 1);
741 /* get nothing from server for this task item */
742 if (mlget_item
->verify
&& ! mlget_item
->finish_verify
)
744 /* verify expire time if necessary */
745 if (item
->exp_time
> 0)
747 struct timeval curr_time
;
748 gettimeofday(&curr_time
, NULL
);
750 /* object doesn't expire but can't get it now */
751 if (curr_time
.tv_sec
- item
->client_time
752 < item
->exp_time
- EXPIRE_TIME_ERROR
)
754 atomic_add_size(&ms_stats
.unexp_unget
, 1);
756 if (ms_setting
.verbose
)
760 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
761 localtime(&item
->client_time
));
762 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
763 localtime(&curr_time
.tv_sec
));
765 "\n\t<%d expire time verification failed, object "
766 "doesn't expire but can't get it now\n"
768 "\tkey: %" PRIx64
" %.*s\n"
769 "\tset time: %s current time: %s "
770 "diff time: %d expire time: %d\n"
771 "\texpected data len: %d\n"
772 "\texpected data: %.*s\n"
773 "\treceived data: \n",
777 item
->key_size
- (int)KEY_PREFIX_SIZE
,
781 (int)(curr_time
.tv_sec
- item
->client_time
),
792 atomic_add_size(&ms_stats
.vef_miss
, 1);
794 if (ms_setting
.verbose
)
796 fprintf(stderr
, "\n<%d data verification failed\n"
798 "\tkey: %" PRIx64
" %.*s\n"
799 "\texpected data len: %d\n"
800 "\texpected data: %.*s\n"
801 "\treceived data: \n",
802 c
->sfd
, item
->key_size
, item
->key_prefix
,
803 item
->key_size
- (int)KEY_PREFIX_SIZE
,
804 orignkey
, item
->value_size
, item
->value_size
, orignval
);
810 c
->mlget_task
.mlget_num
= 0;
811 c
->mlget_task
.value_index
= INVALID_OFFSET
;
812 } /* ms_update_multi_get_result */
816 * after get the response from server for single get, the
817 * function update the state of the task and do data verify if
820 * @param c, pointer of the concurrency
821 * @param item, pointer of task item which includes the object
824 static void ms_update_single_get_result(ms_conn_t
*c
, ms_task_item_t
*item
)
826 char *orignval
= NULL
;
827 char *orignkey
= NULL
;
829 if ((c
== NULL
) || (item
== NULL
))
834 assert(item
!= NULL
);
836 orignval
= &ms_setting
.char_block
[item
->value_offset
];
837 orignkey
= &ms_setting
.char_block
[item
->key_suffix_offset
];
839 /* update get miss counter */
840 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.get_miss
)
842 atomic_add_size(&ms_stats
.get_misses
, 1);
845 /* get nothing from server for this task item */
846 if ((c
->precmd
.cmd
== CMD_GET
) && c
->curr_task
.verify
847 && ! c
->curr_task
.finish_verify
)
849 /* verify expire time if necessary */
850 if (item
->exp_time
> 0)
852 struct timeval curr_time
;
853 gettimeofday(&curr_time
, NULL
);
855 /* object doesn't expire but can't get it now */
856 if (curr_time
.tv_sec
- item
->client_time
857 < item
->exp_time
- EXPIRE_TIME_ERROR
)
859 atomic_add_size(&ms_stats
.unexp_unget
, 1);
861 if (ms_setting
.verbose
)
865 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
866 localtime(&item
->client_time
));
867 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
868 localtime(&curr_time
.tv_sec
));
870 "\n\t<%d expire time verification failed, object "
871 "doesn't expire but can't get it now\n"
873 "\tkey: %" PRIx64
" %.*s\n"
874 "\tset time: %s current time: %s "
875 "diff time: %d expire time: %d\n"
876 "\texpected data len: %d\n"
877 "\texpected data: %.*s\n"
878 "\treceived data: \n",
882 item
->key_size
- (int)KEY_PREFIX_SIZE
,
886 (int)(curr_time
.tv_sec
- item
->client_time
),
897 atomic_add_size(&ms_stats
.vef_miss
, 1);
899 if (ms_setting
.verbose
)
901 fprintf(stderr
, "\n<%d data verification failed\n"
903 "\tkey: %" PRIx64
" %.*s\n"
904 "\texpected data len: %d\n"
905 "\texpected data: %.*s\n"
906 "\treceived data: \n",
907 c
->sfd
, item
->key_size
, item
->key_prefix
,
908 item
->key_size
- (int)KEY_PREFIX_SIZE
,
909 orignkey
, item
->value_size
, item
->value_size
, orignval
);
914 } /* ms_update_single_get_result */
918 * after get the response from server for set the function
919 * update the state of the task and do data verify if necessary.
921 * @param c, pointer of the concurrency
922 * @param item, pointer of task item which includes the object
925 static void ms_update_set_result(ms_conn_t
*c
, ms_task_item_t
*item
)
927 if ((c
== NULL
) || (item
== NULL
))
932 assert(item
!= NULL
);
934 if (c
->precmd
.cmd
== CMD_SET
)
936 switch (c
->precmd
.retstat
)
939 if (item
->value_offset
== INVALID_OFFSET
)
941 /* first set with the same offset of key suffix */
942 item
->value_offset
= item
->key_suffix_offset
;
946 /* not first set, just increase the value offset */
947 item
->value_offset
+= 1;
950 /* set successes, update counter */
952 c
->curr_task
.set_opt
++;
953 c
->curr_task
.cycle_undo_set
--;
956 case MCD_SERVER_ERROR
:
961 } /* ms_update_set_result */
965 * update the response time result
967 * @param c, pointer of the concurrency
969 static void ms_update_stat_result(ms_conn_t
*c
)
971 bool get_miss
= false;
979 gettimeofday(&c
->end_time
, NULL
);
980 uint64_t time_diff
= (uint64_t)ms_time_diff(&c
->start_time
, &c
->end_time
);
982 pthread_mutex_lock(&ms_statistic
.stat_mutex
);
984 switch (c
->precmd
.cmd
)
987 ms_record_event(&ms_statistic
.set_stat
, time_diff
, false);
991 if (c
->curr_task
.get_miss
)
995 ms_record_event(&ms_statistic
.get_stat
, time_diff
, get_miss
);
1002 ms_record_event(&ms_statistic
.total_stat
, time_diff
, get_miss
);
1003 pthread_mutex_unlock(&ms_statistic
.stat_mutex
);
1004 } /* ms_update_stat_result */
1008 * after get response from server for the current operation, and
1009 * before doing the next operation, update the state of the
1010 * current operation.
1012 * @param c, pointer of the concurrency
1014 static void ms_update_task_result(ms_conn_t
*c
)
1016 ms_task_item_t
*item
;
1024 item
= ms_get_cur_opt_item(c
);
1029 assert(item
!= NULL
);
1031 ms_update_set_result(c
, item
);
1033 if ((ms_setting
.stat_freq
> 0)
1034 && ((c
->precmd
.cmd
== CMD_SET
) || (c
->precmd
.cmd
== CMD_GET
)))
1036 ms_update_stat_result(c
);
1039 /* update multi-get task item */
1040 if (((ms_setting
.mult_key_num
> 1)
1041 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1042 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1044 ms_update_multi_get_result(c
);
1048 ms_update_single_get_result(c
, item
);
1050 } /* ms_update_task_result */
1054 * run get and set operation
1056 * @param c, pointer of the concurrency
1058 * @return int, if success, return 0, else return -1
1060 static int ms_run_getset_task(ms_conn_t
*c
)
1063 * extra one loop to get the last command return state. get the
1064 * last command return state.
1066 if ((c
->remain_exec_num
>= 0)
1067 && (c
->remain_exec_num
!= c
->exec_num
))
1069 ms_update_task_result(c
);
1073 if (ms_setting
.mult_key_num
> 1)
1075 /* operate next task item */
1076 ms_multi_getset_task_sch(c
);
1080 /* operate next task item */
1081 ms_single_getset_task_sch(c
);
1084 /* no task to do, exit */
1085 if ((c
->remain_exec_num
== -1) || ms_global
.time_out
)
1091 } /* ms_run_getset_task */
1095 * the state machine call the function to execute task.
1097 * @param c, pointer of the concurrency
1099 * @return int, if success, return 0, else return -1
1101 int ms_exec_task(struct conn
*c
)
1103 if (! ms_global
.finish_warmup
)
1105 ms_warmup_server(c
);
1109 if (ms_run_getset_task(c
) != 0)
1116 } /* ms_exec_task */