Uncrustify
[awesomized/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include "ms_thread.h"
12 #include "ms_setting.h"
13
/* length, in operations, of one command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE    1000

/**
 * Tolerance used when comparing the set and get operations still
 * outstanding ("undo") in one adjustment cycle. Once their proportion
 * is off by more than 3%, the window either holds too many new items
 * or needs more new items.
 */
#define DISADJUST_FACTOR          0.03
22
23 extern __thread ms_thread_t ms_thread;
24
25 /* get item from task window */
26 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
27 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
28 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
29 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
30
31
32 /* select next operation to do */
33 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
34
35
36 /* set and get speed estimate for controlling and adjustment */
37 static bool ms_is_set_too_fast(ms_task_t *task);
38 static bool ms_is_get_too_fast(ms_task_t *task);
39 static void ms_kick_out_item(ms_task_item_t *item);
40
41
42 /* miss rate adjustment */
43 static bool ms_need_overwirte_item(ms_task_t *task);
44 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
45
46
47 /* deal with data verification initialization */
48 static void ms_task_data_verify_init(ms_task_t *task);
49 static void ms_task_expire_verify_init(ms_task_t *task);
50
51
52 /* select a new task to do */
53 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
54
55
56 /* run the selected task */
57 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
58 static void ms_update_stat_result(ms_conn_t *c);
59 static void ms_update_multi_get_result(ms_conn_t *c);
60 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
61 static void ms_update_task_result(ms_conn_t *c);
62 static void ms_single_getset_task_sch(ms_conn_t *c);
63 static void ms_multi_getset_task_sch(ms_conn_t *c);
64 static void ms_send_signal(ms_sync_lock_t *sync_lock);
65 static void ms_warmup_server(ms_conn_t *c);
66 static int ms_run_getset_task(ms_conn_t *c);
67
68
69 /**
70 * used to get the current operation item(object)
71 *
72 * @param c, pointer of the concurrency
73 *
74 * @return ms_task_item_t*, current operating item
75 */
76 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
77 {
78 return c->curr_task.item;
79 }
80
81
82 /**
83 * used to get the next item to do get operation
84 *
85 * @param c, pointer of the concurrency
86 *
87 * @return ms_task_item_t*, the pointer of the next item to do
88 * get operation
89 */
90 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
91 {
92 ms_task_item_t *item= NULL;
93
94 if (c->set_cursor <= 0)
95 {
96 /* the first item in the window */
97 item= &c->item_win[0];
98 }
99 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
100 {
101 /* random get one item set before */
102 item= &c->item_win[random() % (int64_t)c->set_cursor];
103 }
104 else
105 {
106 /* random get one item from the window */
107 item= &c->item_win[random() % c->win_size];
108 }
109
110 return item;
111 } /* ms_get_next_get_item */
112
113
114 /**
115 * used to get the next item to do set operation
116 *
117 * @param c, pointer of the concurrency
118 *
119 * @return ms_task_item_t*, the pointer of the next item to do
120 * set operation
121 */
122 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
123 {
124 /**
125 * when a set command successes, the cursor will plus 1. If set
126 * fails, the cursor doesn't change. it isn't necessary to
127 * increase the cursor here.
128 */
129 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
130 }
131
132
133 /**
134 * If we need do overwrite, we could select a item set before.
135 * This function is used to get a item set before to do
136 * overwrite.
137 *
138 * @param c, pointer of the concurrency
139 *
140 * @return ms_task_item_t*, the pointer of the previous item of
141 * set operation
142 */
143 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
144 {
145 if (c->set_cursor <= 0)
146 {
147 return &c->item_win[0];
148 }
149 else
150 {
151 return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
152 }
153 } /* ms_get_pre_set_item */
154
155
156 /**
157 * According to the proportion of operations(get or set), select
158 * an operation to do.
159 *
160 * @param c, pointer of the concurrency
161 * @param task, pointer of current task in the concurrency
162 */
163 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
164 {
165 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
166 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
167
168 /* update cycle operation number if necessary */
169 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
170 {
171 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
172 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
173 }
174
175 /**
176 * According to operation distribution to choose doing which
177 * operation. If it can't set new object to sever, just change
178 * to do get operation.
179 */
180 if ((set_prop > PROP_ERROR)
181 && ((double)task->get_opt * set_prop >= (double)task->set_opt
182 * get_prop))
183 {
184 task->cmd= CMD_SET;
185 task->item= ms_get_next_set_item(c);
186 }
187 else
188 {
189 task->cmd= CMD_GET;
190 task->item= ms_get_next_get_item(c);
191 }
192 } /* ms_select_opt */
193
194
195 /**
196 * used to judge whether the number of get operations done is
197 * more than expected number of get operations to do right now.
198 *
199 * @param task, pointer of current task in the concurrency
200 *
201 * @return bool, if get too fast, return true, else return false
202 */
203 static bool ms_is_get_too_fast(ms_task_t *task)
204 {
205 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
206 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
207
208 /* no get operation */
209 if (get_prop < PROP_ERROR)
210 {
211 return false;
212 }
213
214 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
215 * task->cycle_undo_get;
216
217 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
218 && (task->cycle_undo_set > max_undo_set))
219 {
220 return true;
221 }
222
223 return false;
224 } /* ms_is_get_too_fast */
225
226
227 /**
228 * used to judge whether the number of set operations done is
229 * more than expected number of set operations to do right now.
230 *
231 * @param task, pointer of current task in the concurrency
232 *
233 * @return bool, if set too fast, return true, else return false
234 */
235 static bool ms_is_set_too_fast(ms_task_t *task)
236 {
237 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
238 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
239
240 /* no set operation */
241 if (set_prop < PROP_ERROR)
242 {
243 return false;
244 }
245
246 /* If it does set operation too fast, skip some */
247 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
248 * (double)task->cycle_undo_set);
249
250 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
251 && (task->cycle_undo_get > max_undo_get))
252 {
253 return true;
254 }
255
256 return false;
257 } /* ms_is_set_too_fast */
258
259
260 /**
261 * kick out the old item in the window, and add a new item to
262 * overwrite the old item. When we don't want to do overwrite
263 * object, and the current item to do set operation is an old
264 * item, we could kick out the old item and add a new item. Then
265 * we can ensure we set new object every time.
266 *
267 * @param item, pointer of task item which includes the object
268 * information
269 */
270 static void ms_kick_out_item(ms_task_item_t *item)
271 {
272 /* allocate a new item */
273 item->key_prefix= ms_get_key_prefix();
274
275 item->key_suffix_offset++;
276 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
277 item->client_time= 0;
278 } /* ms_kick_out_item */
279
280
281 /**
282 * used to judge whether we need overwrite object based on the
283 * options user specified
284 *
285 * @param task, pointer of current task in the concurrency
286 *
287 * @return bool, if need overwrite, return true, else return
288 * false
289 */
290 static bool ms_need_overwirte_item(ms_task_t *task)
291 {
292 ms_task_item_t *item= task->item;
293
294 assert(item != NULL);
295 assert(task->cmd == CMD_SET);
296
297 /**
298 * according to data overwrite percent to determine if do data
299 * overwrite.
300 */
301 if (task->overwrite_set < (double)task->set_opt
302 * ms_setting.overwrite_percent)
303 {
304 return true;
305 }
306
307 return false;
308 } /* ms_need_overwirte_item */
309
310
311 /**
312 * used to adjust operation. the function must be called after
313 * select operation. the function change get operation to set
314 * operation, or set operation to get operation based on the
315 * current case.
316 *
317 * @param c, pointer of the concurrency
318 * @param task, pointer of current task in the concurrency
319 *
320 * @return bool, if success, return true, else return false
321 */
322 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
323 {
324 ms_task_item_t *item= task->item;
325
326 assert(item != NULL);
327
328 if (task->cmd == CMD_SET)
329 {
330 /* If did set operation too fast, skip some */
331 if (ms_is_set_too_fast(task))
332 {
333 /* get the item instead */
334 if (item->value_offset != INVALID_OFFSET)
335 {
336 task->cmd= CMD_GET;
337 return true;
338 }
339 }
340
341 /* If the current item is not a new item, kick it out */
342 if (item->value_offset != INVALID_OFFSET)
343 {
344 if (ms_need_overwirte_item(task))
345 {
346 /* overwrite */
347 task->overwrite_set++;
348 }
349 else
350 {
351 /* kick out the current item to do set operation */
352 ms_kick_out_item(item);
353 }
354 }
355 else /* it's a new item */
356 {
357 /* need overwrite */
358 if (ms_need_overwirte_item(task))
359 {
360 item= ms_get_pre_set_item(c);
361 if (item->value_offset != INVALID_OFFSET)
362 {
363 task->item= item;
364 task->overwrite_set++;
365 }
366 else /* previous set item is a new item */
367 {
368 /* select the previous item to run, and cancel overwrite */
369 task->item= item;
370 }
371 }
372 }
373 task->cmd= CMD_SET;
374 return true;
375 }
376 else
377 {
378 if (item->value_offset == INVALID_OFFSET)
379 {
380 task->cmd= CMD_SET;
381 return true;
382 }
383
384 /**
385 * If It does get operation too fast, it will change the
386 * operation to set.
387 */
388 if (ms_is_get_too_fast(task))
389 {
390 /* don't kick out the first item in the window */
391 if (! ms_is_set_too_fast(task))
392 {
393 ms_kick_out_item(item);
394 task->cmd= CMD_SET;
395 return true;
396 }
397 else
398 {
399 return false;
400 }
401 }
402
403 assert(item->value_offset != INVALID_OFFSET);
404
405 task->cmd= CMD_GET;
406 return true;
407 }
408 } /* ms_adjust_opt */
409
410
411 /**
412 * used to initialize the task which need verify data.
413 *
414 * @param task, pointer of current task in the concurrency
415 */
416 static void ms_task_data_verify_init(ms_task_t *task)
417 {
418 ms_task_item_t *item= task->item;
419
420 assert(item != NULL);
421 assert(task->cmd == CMD_GET);
422
423 /**
424 * according to data verification percent to determine if do
425 * data verification.
426 */
427 if (task->verified_get < (double)task->get_opt
428 * ms_setting.verify_percent)
429 {
430 /**
431 * currently it doesn't do verify, just increase the counter,
432 * and do verification next proper get command
433 */
434 if ((task->item->value_offset != INVALID_OFFSET)
435 && (item->exp_time == 0))
436 {
437 task->verify= true;
438 task->finish_verify= false;
439 task->verified_get++;
440 }
441 }
442 } /* ms_task_data_verify_init */
443
444
445 /**
446 * used to initialize the task which need verify expire time.
447 *
448 * @param task, pointer of current task in the concurrency
449 */
450 static void ms_task_expire_verify_init(ms_task_t *task)
451 {
452 ms_task_item_t *item= task->item;
453
454 assert(item != NULL);
455 assert(task->cmd == CMD_GET);
456 assert(item->exp_time > 0);
457
458 task->verify= true;
459 task->finish_verify= false;
460 } /* ms_task_expire_verify_init */
461
462
463 /**
464 * used to get one task, the function initializes the task
465 * structure.
466 *
467 * @param c, pointer of the concurrency
468 * @param warmup, whether it need warmup
469 *
470 * @return ms_task_t*, pointer of current task in the
471 * concurrency
472 */
473 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
474 {
475 ms_task_t *task= &c->curr_task;
476
477 while (1)
478 {
479 task->verify= false;
480 task->finish_verify= true;
481 task->get_miss= true;
482
483 if (warmup)
484 {
485 task->cmd= CMD_SET;
486 task->item= ms_get_next_set_item(c);
487
488 return task;
489 }
490
491 /* according to operation distribution to choose doing which operation */
492 ms_select_opt(c, task);
493
494 if (! ms_adjust_opt(c, task))
495 {
496 continue;
497 }
498
499 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
500 {
501 ms_task_data_verify_init(task);
502 }
503
504 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
505 && (task->item->exp_time > 0))
506 {
507 ms_task_expire_verify_init(task);
508 }
509
510 break;
511 }
512
513 /**
514 * Only update get and delete counter, set counter will be
515 * updated after set operation successes.
516 */
517 if (task->cmd == CMD_GET)
518 {
519 task->get_opt++;
520 task->cycle_undo_get--;
521 }
522
523 return task;
524 } /* ms_get_task */
525
526
527 /**
528 * send a signal to the main monitor thread
529 *
530 * @param sync_lock, pointer of the lock
531 */
532 static void ms_send_signal(ms_sync_lock_t *sync_lock)
533 {
534 pthread_mutex_lock(&sync_lock->lock);
535 sync_lock->count++;
536 pthread_cond_signal(&sync_lock->cond);
537 pthread_mutex_unlock(&sync_lock->lock);
538 } /* ms_send_signal */
539
540
541 /**
542 * If user only want to do get operation, but there is no object
543 * in server , so we use this function to warmup the server, and
544 * set some objects to server. It runs at the beginning of task.
545 *
546 * @param c, pointer of the concurrency
547 */
548 static void ms_warmup_server(ms_conn_t *c)
549 {
550 ms_task_t *task;
551 ms_task_item_t *item;
552
553 /**
554 * Extra one loop to get the last command returned state.
555 * Normally it gets the previous command returned state.
556 */
557 if ((c->remain_warmup_num >= 0)
558 && (c->remain_warmup_num != c->warmup_num))
559 {
560 item= ms_get_cur_opt_item(c);
561 /* only update the set command result state for data verification */
562 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
563 {
564 item->value_offset= item->key_suffix_offset;
565 /* set success, update counter */
566 c->set_cursor++;
567 }
568 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
569 {
570 printf("key: %lx didn't set success\n", item->key_prefix);
571 }
572 }
573
574 /* the last time don't run a task */
575 if (c->remain_warmup_num-- > 0)
576 {
577 /* operate next task item */
578 task= ms_get_task(c, true);
579 item= task->item;
580 ms_mcd_set(c, item);
581 }
582
583 /**
584 * finish warming up server, wait all connects initialize
585 * complete. Then all connects can start do task at the same
586 * time.
587 */
588 if (c->remain_warmup_num == -1)
589 {
590 ms_send_signal(&ms_global.init_lock);
591 c->remain_warmup_num--; /* never run the if branch */
592 }
593 } /* ms_warmup_server */
594
595
596 /**
597 * dispatch single get and set task
598 *
599 * @param c, pointer of the concurrency
600 */
601 static void ms_single_getset_task_sch(ms_conn_t *c)
602 {
603 ms_task_t *task;
604 ms_task_item_t *item;
605
606 /* the last time don't run a task */
607 if (c->remain_exec_num-- > 0)
608 {
609 task= ms_get_task(c, false);
610 item= task->item;
611 if (task->cmd == CMD_SET)
612 {
613 ms_mcd_set(c, item);
614 }
615 else if (task->cmd == CMD_GET)
616 {
617 assert(task->cmd == CMD_GET);
618 ms_mcd_get(c, item, task->verify);
619 }
620 }
621 } /* ms_single_getset_task_sch */
622
623
624 /**
625 * dispatch multi-get and set task
626 *
627 * @param c, pointer of the concurrency
628 */
629 static void ms_multi_getset_task_sch(ms_conn_t *c)
630 {
631 ms_task_t *task;
632 ms_mlget_task_item_t *mlget_item;
633
634 while (1)
635 {
636 if (c->remain_exec_num-- > 0)
637 {
638 task= ms_get_task(c, false);
639 if (task->cmd == CMD_SET) /* just do it */
640 {
641 ms_mcd_set(c, task->item);
642 break;
643 }
644 else
645 {
646 assert(task->cmd == CMD_GET);
647 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
648 mlget_item->item= task->item;
649 mlget_item->verify= task->verify;
650 mlget_item->finish_verify= task->finish_verify;
651 mlget_item->get_miss= task->get_miss;
652 c->mlget_task.mlget_num++;
653
654 /* enough multi-get task items can be done */
655 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
656 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
657 {
658 ms_mcd_mlget(c);
659 break;
660 }
661 }
662 }
663 else
664 {
665 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
666 {
667 ms_mcd_mlget(c);
668 }
669 break;
670 }
671 }
672 } /* ms_multi_getset_task_sch */
673
674
/**
 * calculate the difference value of two time points
 *
 * The seconds are widened to 64 bits before being scaled to
 * microseconds, so the multiplication cannot overflow where
 * time_t is only 32 bits wide.
 *
 * @param start_time, the start time
 * @param end_time, the end time; must not be earlier than
 *        start_time (checked with assert)
 *
 * @return int64_t, the difference value between start_time and
 *         end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  int64_t endtime= (int64_t)end_time->tv_sec * 1000000 + end_time->tv_usec;
  int64_t starttime= (int64_t)start_time->tv_sec * 1000000
                     + start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */
692
693
694 /**
695 * after get the response from server for multi-get, the
696 * function update the state of the task and do data verify if
697 * necessary.
698 *
699 * @param c, pointer of the concurrency
700 */
701 static void ms_update_multi_get_result(ms_conn_t *c)
702 {
703 ms_mlget_task_item_t *mlget_item;
704 ms_task_item_t *item;
705 char *orignval= NULL;
706 char *orignkey= NULL;
707
708 if (c == NULL)
709 {
710 return;
711 }
712 assert(c != NULL);
713
714 for (int i= 0; i < c->mlget_task.mlget_num; i++)
715 {
716 mlget_item= &c->mlget_task.mlget_item[i];
717 item= mlget_item->item;
718 orignval= &ms_setting.char_block[item->value_offset];
719 orignkey= &ms_setting.char_block[item->key_suffix_offset];
720
721 /* update get miss counter */
722 if (mlget_item->get_miss)
723 {
724 __sync_fetch_and_add(&ms_stats.get_misses, 1);
725 }
726
727 /* get nothing from server for this task item */
728 if (mlget_item->verify && ! mlget_item->finish_verify)
729 {
730 /* verify expire time if necessary */
731 if (item->exp_time > 0)
732 {
733 struct timeval curr_time;
734 gettimeofday(&curr_time, NULL);
735
736 /* object doesn't expire but can't get it now */
737 if (curr_time.tv_sec - item->client_time
738 < item->exp_time - EXPIRE_TIME_ERROR)
739 {
740 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
741
742 if (ms_setting.verbose)
743 {
744 char set_time[64];
745 char cur_time[64];
746 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
747 localtime(&item->client_time));
748 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
749 localtime(&curr_time.tv_sec));
750 fprintf(stderr,
751 "\n\t<%d expire time verification failed, object "
752 "doesn't expire but can't get it now\n"
753 "\tkey len: %d\n"
754 "\tkey: %lx %.*s\n"
755 "\tset time: %s current time: %s "
756 "diff time: %d expire time: %d\n"
757 "\texpected data len: %d\n"
758 "\texpected data: %.*s\n"
759 "\treceived data: \n",
760 c->sfd,
761 item->key_size,
762 item->key_prefix,
763 item->key_size - (int)KEY_PREFIX_SIZE,
764 orignkey,
765 set_time,
766 cur_time,
767 (int)(curr_time.tv_sec - item->client_time),
768 item->exp_time,
769 item->value_size,
770 item->value_size,
771 orignval);
772 fflush(stderr);
773 }
774 }
775 }
776 else
777 {
778 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
779
780 if (ms_setting.verbose)
781 {
782 fprintf(stderr, "\n<%d data verification failed\n"
783 "\tkey len: %d\n"
784 "\tkey: %lx %.*s\n"
785 "\texpected data len: %d\n"
786 "\texpected data: %.*s\n"
787 "\treceived data: \n",
788 c->sfd, item->key_size, item->key_prefix,
789 item->key_size - (int)KEY_PREFIX_SIZE,
790 orignkey, item->value_size, item->value_size, orignval);
791 fflush(stderr);
792 }
793 }
794 }
795 }
796 c->mlget_task.mlget_num= 0;
797 c->mlget_task.value_index= INVALID_OFFSET;
798 } /* ms_update_multi_get_result */
799
800
801 /**
802 * after get the response from server for single get, the
803 * function update the state of the task and do data verify if
804 * necessary.
805 *
806 * @param c, pointer of the concurrency
807 * @param item, pointer of task item which includes the object
808 * information
809 */
810 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
811 {
812 char *orignval= NULL;
813 char *orignkey= NULL;
814
815 if ((c == NULL) || (item == NULL))
816 {
817 return;
818 }
819 assert(c != NULL);
820 assert(item != NULL);
821
822 orignval= &ms_setting.char_block[item->value_offset];
823 orignkey= &ms_setting.char_block[item->key_suffix_offset];
824
825 /* update get miss counter */
826 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
827 {
828 __sync_fetch_and_add(&ms_stats.get_misses, 1);
829 }
830
831 /* get nothing from server for this task item */
832 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
833 && ! c->curr_task.finish_verify)
834 {
835 /* verify expire time if necessary */
836 if (item->exp_time > 0)
837 {
838 struct timeval curr_time;
839 gettimeofday(&curr_time, NULL);
840
841 /* object doesn't expire but can't get it now */
842 if (curr_time.tv_sec - item->client_time
843 < item->exp_time - EXPIRE_TIME_ERROR)
844 {
845 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
846
847 if (ms_setting.verbose)
848 {
849 char set_time[64];
850 char cur_time[64];
851 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
852 localtime(&item->client_time));
853 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
854 localtime(&curr_time.tv_sec));
855 fprintf(stderr,
856 "\n\t<%d expire time verification failed, object "
857 "doesn't expire but can't get it now\n"
858 "\tkey len: %d\n"
859 "\tkey: %lx %.*s\n"
860 "\tset time: %s current time: %s "
861 "diff time: %d expire time: %d\n"
862 "\texpected data len: %d\n"
863 "\texpected data: %.*s\n"
864 "\treceived data: \n",
865 c->sfd,
866 item->key_size,
867 item->key_prefix,
868 item->key_size - (int)KEY_PREFIX_SIZE,
869 orignkey,
870 set_time,
871 cur_time,
872 (int)(curr_time.tv_sec - item->client_time),
873 item->exp_time,
874 item->value_size,
875 item->value_size,
876 orignval);
877 fflush(stderr);
878 }
879 }
880 }
881 else
882 {
883 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
884
885 if (ms_setting.verbose)
886 {
887 fprintf(stderr, "\n<%d data verification failed\n"
888 "\tkey len: %d\n"
889 "\tkey: %lx %.*s\n"
890 "\texpected data len: %d\n"
891 "\texpected data: %.*s\n"
892 "\treceived data: \n",
893 c->sfd, item->key_size, item->key_prefix,
894 item->key_size - (int)KEY_PREFIX_SIZE,
895 orignkey, item->value_size, item->value_size, orignval);
896 fflush(stderr);
897 }
898 }
899 }
900 } /* ms_update_single_get_result */
901
902
903 /**
904 * after get the response from server for set the function
905 * update the state of the task and do data verify if necessary.
906 *
907 * @param c, pointer of the concurrency
908 * @param item, pointer of task item which includes the object
909 * information
910 */
911 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
912 {
913 if ((c == NULL) || (item == NULL))
914 {
915 return;
916 }
917 assert(c != NULL);
918 assert(item != NULL);
919
920 if (c->precmd.cmd == CMD_SET)
921 {
922 switch (c->precmd.retstat)
923 {
924 case MCD_STORED:
925 if (item->value_offset == INVALID_OFFSET)
926 {
927 /* first set with the same offset of key suffix */
928 item->value_offset= item->key_suffix_offset;
929 }
930 else
931 {
932 /* not first set, just increase the value offset */
933 item->value_offset+= 1;
934 }
935
936 /* set successes, update counter */
937 c->set_cursor++;
938 c->curr_task.set_opt++;
939 c->curr_task.cycle_undo_set--;
940 break;
941
942 case MCD_SERVER_ERROR:
943 default:
944 break;
945 } /* switch */
946 }
947 } /* ms_update_set_result */
948
949
950 /**
951 * update the response time result
952 *
953 * @param c, pointer of the concurrency
954 */
955 static void ms_update_stat_result(ms_conn_t *c)
956 {
957 bool get_miss= false;
958
959 if (c == NULL)
960 {
961 return;
962 }
963 assert(c != NULL);
964
965 gettimeofday(&c->end_time, NULL);
966 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
967
968 pthread_mutex_lock(&ms_statistic.stat_mutex);
969
970 switch (c->precmd.cmd)
971 {
972 case CMD_SET:
973 ms_record_event(&ms_statistic.set_stat, time_diff, false);
974 break;
975
976 case CMD_GET:
977 if (c->curr_task.get_miss)
978 {
979 get_miss= true;
980 }
981 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
982 break;
983
984 default:
985 break;
986 } /* switch */
987
988 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
989 pthread_mutex_unlock(&ms_statistic.stat_mutex);
990 } /* ms_update_stat_result */
991
992
993 /**
994 * after get response from server for the current operation, and
995 * before doing the next operation, update the state of the
996 * current operation.
997 *
998 * @param c, pointer of the concurrency
999 */
1000 static void ms_update_task_result(ms_conn_t *c)
1001 {
1002 ms_task_item_t *item;
1003
1004 if (c == NULL)
1005 {
1006 return;
1007 }
1008 assert(c != NULL);
1009
1010 item= ms_get_cur_opt_item(c);
1011 if (item == NULL)
1012 {
1013 return;
1014 }
1015 assert(item != NULL);
1016
1017 ms_update_set_result(c, item);
1018
1019 if ((ms_setting.stat_freq > 0)
1020 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1021 {
1022 ms_update_stat_result(c);
1023 }
1024
1025 /* update multi-get task item */
1026 if (((ms_setting.mult_key_num > 1)
1027 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1028 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1029 {
1030 ms_update_multi_get_result(c);
1031 }
1032 else
1033 {
1034 ms_update_single_get_result(c, item);
1035 }
1036 } /* ms_update_task_result */
1037
1038
1039 /**
1040 * run get and set operation
1041 *
1042 * @param c, pointer of the concurrency
1043 *
1044 * @return int, if success, return 0, else return -1
1045 */
1046 static int ms_run_getset_task(ms_conn_t *c)
1047 {
1048 /**
1049 * extra one loop to get the last command return state. get the
1050 * last command return state.
1051 */
1052 if ((c->remain_exec_num >= 0)
1053 && (c->remain_exec_num != c->exec_num))
1054 {
1055 ms_update_task_result(c);
1056 }
1057
1058 /* multi-get */
1059 if (ms_setting.mult_key_num > 1)
1060 {
1061 /* operate next task item */
1062 ms_multi_getset_task_sch(c);
1063 }
1064 else
1065 {
1066 /* operate next task item */
1067 ms_single_getset_task_sch(c);
1068 }
1069
1070 /* no task to do, exit */
1071 if ((c->remain_exec_num == -1) || ms_global.time_out)
1072 {
1073 return -1;
1074 }
1075
1076 return 0;
1077 } /* ms_run_getset_task */
1078
1079
1080 /**
1081 * the state machine call the function to execute task.
1082 *
1083 * @param c, pointer of the concurrency
1084 *
1085 * @return int, if success, return 0, else return -1
1086 */
1087 int ms_exec_task(struct conn *c)
1088 {
1089 if (! ms_global.finish_warmup)
1090 {
1091 ms_warmup_server(c);
1092 }
1093 else
1094 {
1095 if (ms_run_getset_task(c) != 0)
1096 {
1097 return -1;
1098 }
1099 }
1100
1101 return 0;
1102 } /* ms_exec_task */