Merge Monty
[awesomized/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <inttypes.h>
15 #if TIME_WITH_SYS_TIME
16 # include <sys/time.h>
17 # include <time.h>
18 #else
19 # if HAVE_SYS_TIME_H
20 # include <sys/time.h>
21 # else
22 # include <time.h>
23 # endif
24 #endif
25
26 #include "ms_thread.h"
27 #include "ms_setting.h"
28 #include "ms_atomic.h"
29
/**
 * Length of one command distribution adjustment cycle. The per-cycle
 * pending get/set counters are refilled with this value scaled by the
 * configured command proportions.
 */
#define CMD_DISTR_ADJUST_CYCLE 1000

/**
 * Tolerance used when comparing the pending (undone) set and get
 * operations of one adjustment cycle. If one kind of operation is more
 * than 3% behind its expected proportion, there are either too many new
 * items in the window or more new items are needed.
 */
#define DISADJUST_FACTOR 0.03
38
39 /* get item from task window */
40 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
41 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
42 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
43 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
44
45
46 /* select next operation to do */
47 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
48
49
50 /* set and get speed estimate for controlling and adjustment */
51 static bool ms_is_set_too_fast(ms_task_t *task);
52 static bool ms_is_get_too_fast(ms_task_t *task);
53 static void ms_kick_out_item(ms_task_item_t *item);
54
55
56 /* miss rate adjustment */
57 static bool ms_need_overwirte_item(ms_task_t *task);
58 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
59
60
61 /* deal with data verification initialization */
62 static void ms_task_data_verify_init(ms_task_t *task);
63 static void ms_task_expire_verify_init(ms_task_t *task);
64
65
66 /* select a new task to do */
67 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
68
69
70 /* run the selected task */
71 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
72 static void ms_update_stat_result(ms_conn_t *c);
73 static void ms_update_multi_get_result(ms_conn_t *c);
74 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
75 static void ms_update_task_result(ms_conn_t *c);
76 static void ms_single_getset_task_sch(ms_conn_t *c);
77 static void ms_multi_getset_task_sch(ms_conn_t *c);
78 static void ms_send_signal(ms_sync_lock_t *sync_lock);
79 static void ms_warmup_server(ms_conn_t *c);
80 static int ms_run_getset_task(ms_conn_t *c);
81
82
83 /**
84 * used to get the current operation item(object)
85 *
86 * @param c, pointer of the concurrency
87 *
88 * @return ms_task_item_t*, current operating item
89 */
90 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
91 {
92 return c->curr_task.item;
93 }
94
95
96 /**
97 * used to get the next item to do get operation
98 *
99 * @param c, pointer of the concurrency
100 *
101 * @return ms_task_item_t*, the pointer of the next item to do
102 * get operation
103 */
104 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
105 {
106 ms_task_item_t *item= NULL;
107
108 if (c->set_cursor <= 0)
109 {
110 /* the first item in the window */
111 item= &c->item_win[0];
112 }
113 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
114 {
115 /* random get one item set before */
116 item= &c->item_win[random() % (int64_t)c->set_cursor];
117 }
118 else
119 {
120 /* random get one item from the window */
121 item= &c->item_win[random() % c->win_size];
122 }
123
124 return item;
125 } /* ms_get_next_get_item */
126
127
128 /**
129 * used to get the next item to do set operation
130 *
131 * @param c, pointer of the concurrency
132 *
133 * @return ms_task_item_t*, the pointer of the next item to do
134 * set operation
135 */
136 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
137 {
138 /**
139 * when a set command successes, the cursor will plus 1. If set
140 * fails, the cursor doesn't change. it isn't necessary to
141 * increase the cursor here.
142 */
143 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
144 }
145
146
147 /**
148 * If we need do overwrite, we could select a item set before.
149 * This function is used to get a item set before to do
150 * overwrite.
151 *
152 * @param c, pointer of the concurrency
153 *
154 * @return ms_task_item_t*, the pointer of the previous item of
155 * set operation
156 */
157 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
158 {
159 if (c->set_cursor <= 0)
160 {
161 return &c->item_win[0];
162 }
163 else
164 {
165 return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
166 }
167 } /* ms_get_pre_set_item */
168
169
170 /**
171 * According to the proportion of operations(get or set), select
172 * an operation to do.
173 *
174 * @param c, pointer of the concurrency
175 * @param task, pointer of current task in the concurrency
176 */
177 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
178 {
179 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
180 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
181
182 /* update cycle operation number if necessary */
183 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
184 {
185 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
186 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
187 }
188
189 /**
190 * According to operation distribution to choose doing which
191 * operation. If it can't set new object to sever, just change
192 * to do get operation.
193 */
194 if ((set_prop > PROP_ERROR)
195 && ((double)task->get_opt * set_prop >= (double)task->set_opt
196 * get_prop))
197 {
198 task->cmd= CMD_SET;
199 task->item= ms_get_next_set_item(c);
200 }
201 else
202 {
203 task->cmd= CMD_GET;
204 task->item= ms_get_next_get_item(c);
205 }
206 } /* ms_select_opt */
207
208
209 /**
210 * used to judge whether the number of get operations done is
211 * more than expected number of get operations to do right now.
212 *
213 * @param task, pointer of current task in the concurrency
214 *
215 * @return bool, if get too fast, return true, else return false
216 */
217 static bool ms_is_get_too_fast(ms_task_t *task)
218 {
219 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
220 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
221
222 /* no get operation */
223 if (get_prop < PROP_ERROR)
224 {
225 return false;
226 }
227
228 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
229 * task->cycle_undo_get;
230
231 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
232 && (task->cycle_undo_set > max_undo_set))
233 {
234 return true;
235 }
236
237 return false;
238 } /* ms_is_get_too_fast */
239
240
241 /**
242 * used to judge whether the number of set operations done is
243 * more than expected number of set operations to do right now.
244 *
245 * @param task, pointer of current task in the concurrency
246 *
247 * @return bool, if set too fast, return true, else return false
248 */
249 static bool ms_is_set_too_fast(ms_task_t *task)
250 {
251 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
252 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
253
254 /* no set operation */
255 if (set_prop < PROP_ERROR)
256 {
257 return false;
258 }
259
260 /* If it does set operation too fast, skip some */
261 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
262 * (double)task->cycle_undo_set);
263
264 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
265 && (task->cycle_undo_get > max_undo_get))
266 {
267 return true;
268 }
269
270 return false;
271 } /* ms_is_set_too_fast */
272
273
274 /**
275 * kick out the old item in the window, and add a new item to
276 * overwrite the old item. When we don't want to do overwrite
277 * object, and the current item to do set operation is an old
278 * item, we could kick out the old item and add a new item. Then
279 * we can ensure we set new object every time.
280 *
281 * @param item, pointer of task item which includes the object
282 * information
283 */
284 static void ms_kick_out_item(ms_task_item_t *item)
285 {
286 /* allocate a new item */
287 item->key_prefix= ms_get_key_prefix();
288
289 item->key_suffix_offset++;
290 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
291 item->client_time= 0;
292 } /* ms_kick_out_item */
293
294
295 /**
296 * used to judge whether we need overwrite object based on the
297 * options user specified
298 *
299 * @param task, pointer of current task in the concurrency
300 *
301 * @return bool, if need overwrite, return true, else return
302 * false
303 */
304 static bool ms_need_overwirte_item(ms_task_t *task)
305 {
306 ms_task_item_t *item= task->item;
307
308 assert(item != NULL);
309 assert(task->cmd == CMD_SET);
310
311 /**
312 * according to data overwrite percent to determine if do data
313 * overwrite.
314 */
315 if (task->overwrite_set < (double)task->set_opt
316 * ms_setting.overwrite_percent)
317 {
318 return true;
319 }
320
321 return false;
322 } /* ms_need_overwirte_item */
323
324
325 /**
326 * used to adjust operation. the function must be called after
327 * select operation. the function change get operation to set
328 * operation, or set operation to get operation based on the
329 * current case.
330 *
331 * @param c, pointer of the concurrency
332 * @param task, pointer of current task in the concurrency
333 *
334 * @return bool, if success, return true, else return false
335 */
336 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
337 {
338 ms_task_item_t *item= task->item;
339
340 assert(item != NULL);
341
342 if (task->cmd == CMD_SET)
343 {
344 /* If did set operation too fast, skip some */
345 if (ms_is_set_too_fast(task))
346 {
347 /* get the item instead */
348 if (item->value_offset != INVALID_OFFSET)
349 {
350 task->cmd= CMD_GET;
351 return true;
352 }
353 }
354
355 /* If the current item is not a new item, kick it out */
356 if (item->value_offset != INVALID_OFFSET)
357 {
358 if (ms_need_overwirte_item(task))
359 {
360 /* overwrite */
361 task->overwrite_set++;
362 }
363 else
364 {
365 /* kick out the current item to do set operation */
366 ms_kick_out_item(item);
367 }
368 }
369 else /* it's a new item */
370 {
371 /* need overwrite */
372 if (ms_need_overwirte_item(task))
373 {
374 item= ms_get_pre_set_item(c);
375 if (item->value_offset != INVALID_OFFSET)
376 {
377 task->item= item;
378 task->overwrite_set++;
379 }
380 else /* previous set item is a new item */
381 {
382 /* select the previous item to run, and cancel overwrite */
383 task->item= item;
384 }
385 }
386 }
387 task->cmd= CMD_SET;
388 return true;
389 }
390 else
391 {
392 if (item->value_offset == INVALID_OFFSET)
393 {
394 task->cmd= CMD_SET;
395 return true;
396 }
397
398 /**
399 * If It does get operation too fast, it will change the
400 * operation to set.
401 */
402 if (ms_is_get_too_fast(task))
403 {
404 /* don't kick out the first item in the window */
405 if (! ms_is_set_too_fast(task))
406 {
407 ms_kick_out_item(item);
408 task->cmd= CMD_SET;
409 return true;
410 }
411 else
412 {
413 return false;
414 }
415 }
416
417 assert(item->value_offset != INVALID_OFFSET);
418
419 task->cmd= CMD_GET;
420 return true;
421 }
422 } /* ms_adjust_opt */
423
424
425 /**
426 * used to initialize the task which need verify data.
427 *
428 * @param task, pointer of current task in the concurrency
429 */
430 static void ms_task_data_verify_init(ms_task_t *task)
431 {
432 ms_task_item_t *item= task->item;
433
434 assert(item != NULL);
435 assert(task->cmd == CMD_GET);
436
437 /**
438 * according to data verification percent to determine if do
439 * data verification.
440 */
441 if (task->verified_get < (double)task->get_opt
442 * ms_setting.verify_percent)
443 {
444 /**
445 * currently it doesn't do verify, just increase the counter,
446 * and do verification next proper get command
447 */
448 if ((task->item->value_offset != INVALID_OFFSET)
449 && (item->exp_time == 0))
450 {
451 task->verify= true;
452 task->finish_verify= false;
453 task->verified_get++;
454 }
455 }
456 } /* ms_task_data_verify_init */
457
458
459 /**
460 * used to initialize the task which need verify expire time.
461 *
462 * @param task, pointer of current task in the concurrency
463 */
464 static void ms_task_expire_verify_init(ms_task_t *task)
465 {
466 ms_task_item_t *item= task->item;
467
468 assert(item != NULL);
469 assert(task->cmd == CMD_GET);
470 assert(item->exp_time > 0);
471
472 task->verify= true;
473 task->finish_verify= false;
474 } /* ms_task_expire_verify_init */
475
476
477 /**
478 * used to get one task, the function initializes the task
479 * structure.
480 *
481 * @param c, pointer of the concurrency
482 * @param warmup, whether it need warmup
483 *
484 * @return ms_task_t*, pointer of current task in the
485 * concurrency
486 */
487 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
488 {
489 ms_task_t *task= &c->curr_task;
490
491 while (1)
492 {
493 task->verify= false;
494 task->finish_verify= true;
495 task->get_miss= true;
496
497 if (warmup)
498 {
499 task->cmd= CMD_SET;
500 task->item= ms_get_next_set_item(c);
501
502 return task;
503 }
504
505 /* according to operation distribution to choose doing which operation */
506 ms_select_opt(c, task);
507
508 if (! ms_adjust_opt(c, task))
509 {
510 continue;
511 }
512
513 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
514 {
515 ms_task_data_verify_init(task);
516 }
517
518 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
519 && (task->item->exp_time > 0))
520 {
521 ms_task_expire_verify_init(task);
522 }
523
524 break;
525 }
526
527 /**
528 * Only update get and delete counter, set counter will be
529 * updated after set operation successes.
530 */
531 if (task->cmd == CMD_GET)
532 {
533 task->get_opt++;
534 task->cycle_undo_get--;
535 }
536
537 return task;
538 } /* ms_get_task */
539
540
541 /**
542 * send a signal to the main monitor thread
543 *
544 * @param sync_lock, pointer of the lock
545 */
546 static void ms_send_signal(ms_sync_lock_t *sync_lock)
547 {
548 pthread_mutex_lock(&sync_lock->lock);
549 sync_lock->count++;
550 pthread_cond_signal(&sync_lock->cond);
551 pthread_mutex_unlock(&sync_lock->lock);
552 } /* ms_send_signal */
553
554
555 /**
556 * If user only want to do get operation, but there is no object
557 * in server , so we use this function to warmup the server, and
558 * set some objects to server. It runs at the beginning of task.
559 *
560 * @param c, pointer of the concurrency
561 */
562 static void ms_warmup_server(ms_conn_t *c)
563 {
564 ms_task_t *task;
565 ms_task_item_t *item;
566
567 /**
568 * Extra one loop to get the last command returned state.
569 * Normally it gets the previous command returned state.
570 */
571 if ((c->remain_warmup_num >= 0)
572 && (c->remain_warmup_num != c->warmup_num))
573 {
574 item= ms_get_cur_opt_item(c);
575 /* only update the set command result state for data verification */
576 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
577 {
578 item->value_offset= item->key_suffix_offset;
579 /* set success, update counter */
580 c->set_cursor++;
581 }
582 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
583 {
584 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
585 }
586 }
587
588 /* the last time don't run a task */
589 if (c->remain_warmup_num-- > 0)
590 {
591 /* operate next task item */
592 task= ms_get_task(c, true);
593 item= task->item;
594 ms_mcd_set(c, item);
595 }
596
597 /**
598 * finish warming up server, wait all connects initialize
599 * complete. Then all connects can start do task at the same
600 * time.
601 */
602 if (c->remain_warmup_num == -1)
603 {
604 ms_send_signal(&ms_global.init_lock);
605 c->remain_warmup_num--; /* never run the if branch */
606 }
607 } /* ms_warmup_server */
608
609
610 /**
611 * dispatch single get and set task
612 *
613 * @param c, pointer of the concurrency
614 */
615 static void ms_single_getset_task_sch(ms_conn_t *c)
616 {
617 ms_task_t *task;
618 ms_task_item_t *item;
619
620 /* the last time don't run a task */
621 if (c->remain_exec_num-- > 0)
622 {
623 task= ms_get_task(c, false);
624 item= task->item;
625 if (task->cmd == CMD_SET)
626 {
627 ms_mcd_set(c, item);
628 }
629 else if (task->cmd == CMD_GET)
630 {
631 assert(task->cmd == CMD_GET);
632 ms_mcd_get(c, item, task->verify);
633 }
634 }
635 } /* ms_single_getset_task_sch */
636
637
638 /**
639 * dispatch multi-get and set task
640 *
641 * @param c, pointer of the concurrency
642 */
643 static void ms_multi_getset_task_sch(ms_conn_t *c)
644 {
645 ms_task_t *task;
646 ms_mlget_task_item_t *mlget_item;
647
648 while (1)
649 {
650 if (c->remain_exec_num-- > 0)
651 {
652 task= ms_get_task(c, false);
653 if (task->cmd == CMD_SET) /* just do it */
654 {
655 ms_mcd_set(c, task->item);
656 break;
657 }
658 else
659 {
660 assert(task->cmd == CMD_GET);
661 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
662 mlget_item->item= task->item;
663 mlget_item->verify= task->verify;
664 mlget_item->finish_verify= task->finish_verify;
665 mlget_item->get_miss= task->get_miss;
666 c->mlget_task.mlget_num++;
667
668 /* enough multi-get task items can be done */
669 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
670 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
671 {
672 ms_mcd_mlget(c);
673 break;
674 }
675 }
676 }
677 else
678 {
679 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
680 {
681 ms_mcd_mlget(c);
682 }
683 break;
684 }
685 }
686 } /* ms_multi_getset_task_sch */
687
688
/**
 * calculate the difference value of two time points
 *
 * The seconds are widened to int64_t before they are converted to
 * microseconds, so the multiplication can't overflow on platforms
 * where time_t is a 32-bit type.
 *
 * @param start_time, the start time
 * @param end_time, the end time, must not be earlier than start_time
 *        (checked with assert)
 *
 * @return int64_t, the difference value between start_time and end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  const int64_t usec_per_sec= 1000000;

  int64_t endtime= (int64_t)end_time->tv_sec * usec_per_sec
                   + (int64_t)end_time->tv_usec;
  int64_t starttime= (int64_t)start_time->tv_sec * usec_per_sec
                     + (int64_t)start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */
706
707
708 /**
709 * after get the response from server for multi-get, the
710 * function update the state of the task and do data verify if
711 * necessary.
712 *
713 * @param c, pointer of the concurrency
714 */
715 static void ms_update_multi_get_result(ms_conn_t *c)
716 {
717 ms_mlget_task_item_t *mlget_item;
718 ms_task_item_t *item;
719 char *orignval= NULL;
720 char *orignkey= NULL;
721
722 if (c == NULL)
723 {
724 return;
725 }
726 assert(c != NULL);
727
728 for (int i= 0; i < c->mlget_task.mlget_num; i++)
729 {
730 mlget_item= &c->mlget_task.mlget_item[i];
731 item= mlget_item->item;
732 orignval= &ms_setting.char_block[item->value_offset];
733 orignkey= &ms_setting.char_block[item->key_suffix_offset];
734
735 /* update get miss counter */
736 if (mlget_item->get_miss)
737 {
738 atomic_add_size(&ms_stats.get_misses, 1);
739 }
740
741 /* get nothing from server for this task item */
742 if (mlget_item->verify && ! mlget_item->finish_verify)
743 {
744 /* verify expire time if necessary */
745 if (item->exp_time > 0)
746 {
747 struct timeval curr_time;
748 gettimeofday(&curr_time, NULL);
749
750 /* object doesn't expire but can't get it now */
751 if (curr_time.tv_sec - item->client_time
752 < item->exp_time - EXPIRE_TIME_ERROR)
753 {
754 atomic_add_size(&ms_stats.unexp_unget, 1);
755
756 if (ms_setting.verbose)
757 {
758 char set_time[64];
759 char cur_time[64];
760 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
761 localtime(&item->client_time));
762 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
763 localtime(&curr_time.tv_sec));
764 fprintf(stderr,
765 "\n\t<%d expire time verification failed, object "
766 "doesn't expire but can't get it now\n"
767 "\tkey len: %d\n"
768 "\tkey: %" PRIx64 " %.*s\n"
769 "\tset time: %s current time: %s "
770 "diff time: %d expire time: %d\n"
771 "\texpected data len: %d\n"
772 "\texpected data: %.*s\n"
773 "\treceived data: \n",
774 c->sfd,
775 item->key_size,
776 item->key_prefix,
777 item->key_size - (int)KEY_PREFIX_SIZE,
778 orignkey,
779 set_time,
780 cur_time,
781 (int)(curr_time.tv_sec - item->client_time),
782 item->exp_time,
783 item->value_size,
784 item->value_size,
785 orignval);
786 fflush(stderr);
787 }
788 }
789 }
790 else
791 {
792 atomic_add_size(&ms_stats.vef_miss, 1);
793
794 if (ms_setting.verbose)
795 {
796 fprintf(stderr, "\n<%d data verification failed\n"
797 "\tkey len: %d\n"
798 "\tkey: %" PRIx64 " %.*s\n"
799 "\texpected data len: %d\n"
800 "\texpected data: %.*s\n"
801 "\treceived data: \n",
802 c->sfd, item->key_size, item->key_prefix,
803 item->key_size - (int)KEY_PREFIX_SIZE,
804 orignkey, item->value_size, item->value_size, orignval);
805 fflush(stderr);
806 }
807 }
808 }
809 }
810 c->mlget_task.mlget_num= 0;
811 c->mlget_task.value_index= INVALID_OFFSET;
812 } /* ms_update_multi_get_result */
813
814
815 /**
816 * after get the response from server for single get, the
817 * function update the state of the task and do data verify if
818 * necessary.
819 *
820 * @param c, pointer of the concurrency
821 * @param item, pointer of task item which includes the object
822 * information
823 */
824 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
825 {
826 char *orignval= NULL;
827 char *orignkey= NULL;
828
829 if ((c == NULL) || (item == NULL))
830 {
831 return;
832 }
833 assert(c != NULL);
834 assert(item != NULL);
835
836 orignval= &ms_setting.char_block[item->value_offset];
837 orignkey= &ms_setting.char_block[item->key_suffix_offset];
838
839 /* update get miss counter */
840 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
841 {
842 atomic_add_size(&ms_stats.get_misses, 1);
843 }
844
845 /* get nothing from server for this task item */
846 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
847 && ! c->curr_task.finish_verify)
848 {
849 /* verify expire time if necessary */
850 if (item->exp_time > 0)
851 {
852 struct timeval curr_time;
853 gettimeofday(&curr_time, NULL);
854
855 /* object doesn't expire but can't get it now */
856 if (curr_time.tv_sec - item->client_time
857 < item->exp_time - EXPIRE_TIME_ERROR)
858 {
859 atomic_add_size(&ms_stats.unexp_unget, 1);
860
861 if (ms_setting.verbose)
862 {
863 char set_time[64];
864 char cur_time[64];
865 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
866 localtime(&item->client_time));
867 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
868 localtime(&curr_time.tv_sec));
869 fprintf(stderr,
870 "\n\t<%d expire time verification failed, object "
871 "doesn't expire but can't get it now\n"
872 "\tkey len: %d\n"
873 "\tkey: %" PRIx64 " %.*s\n"
874 "\tset time: %s current time: %s "
875 "diff time: %d expire time: %d\n"
876 "\texpected data len: %d\n"
877 "\texpected data: %.*s\n"
878 "\treceived data: \n",
879 c->sfd,
880 item->key_size,
881 item->key_prefix,
882 item->key_size - (int)KEY_PREFIX_SIZE,
883 orignkey,
884 set_time,
885 cur_time,
886 (int)(curr_time.tv_sec - item->client_time),
887 item->exp_time,
888 item->value_size,
889 item->value_size,
890 orignval);
891 fflush(stderr);
892 }
893 }
894 }
895 else
896 {
897 atomic_add_size(&ms_stats.vef_miss, 1);
898
899 if (ms_setting.verbose)
900 {
901 fprintf(stderr, "\n<%d data verification failed\n"
902 "\tkey len: %d\n"
903 "\tkey: %" PRIx64 " %.*s\n"
904 "\texpected data len: %d\n"
905 "\texpected data: %.*s\n"
906 "\treceived data: \n",
907 c->sfd, item->key_size, item->key_prefix,
908 item->key_size - (int)KEY_PREFIX_SIZE,
909 orignkey, item->value_size, item->value_size, orignval);
910 fflush(stderr);
911 }
912 }
913 }
914 } /* ms_update_single_get_result */
915
916
917 /**
918 * after get the response from server for set the function
919 * update the state of the task and do data verify if necessary.
920 *
921 * @param c, pointer of the concurrency
922 * @param item, pointer of task item which includes the object
923 * information
924 */
925 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
926 {
927 if ((c == NULL) || (item == NULL))
928 {
929 return;
930 }
931 assert(c != NULL);
932 assert(item != NULL);
933
934 if (c->precmd.cmd == CMD_SET)
935 {
936 switch (c->precmd.retstat)
937 {
938 case MCD_STORED:
939 if (item->value_offset == INVALID_OFFSET)
940 {
941 /* first set with the same offset of key suffix */
942 item->value_offset= item->key_suffix_offset;
943 }
944 else
945 {
946 /* not first set, just increase the value offset */
947 item->value_offset+= 1;
948 }
949
950 /* set successes, update counter */
951 c->set_cursor++;
952 c->curr_task.set_opt++;
953 c->curr_task.cycle_undo_set--;
954 break;
955
956 case MCD_SERVER_ERROR:
957 default:
958 break;
959 } /* switch */
960 }
961 } /* ms_update_set_result */
962
963
964 /**
965 * update the response time result
966 *
967 * @param c, pointer of the concurrency
968 */
969 static void ms_update_stat_result(ms_conn_t *c)
970 {
971 bool get_miss= false;
972
973 if (c == NULL)
974 {
975 return;
976 }
977 assert(c != NULL);
978
979 gettimeofday(&c->end_time, NULL);
980 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
981
982 pthread_mutex_lock(&ms_statistic.stat_mutex);
983
984 switch (c->precmd.cmd)
985 {
986 case CMD_SET:
987 ms_record_event(&ms_statistic.set_stat, time_diff, false);
988 break;
989
990 case CMD_GET:
991 if (c->curr_task.get_miss)
992 {
993 get_miss= true;
994 }
995 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
996 break;
997
998 default:
999 break;
1000 } /* switch */
1001
1002 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
1003 pthread_mutex_unlock(&ms_statistic.stat_mutex);
1004 } /* ms_update_stat_result */
1005
1006
1007 /**
1008 * after get response from server for the current operation, and
1009 * before doing the next operation, update the state of the
1010 * current operation.
1011 *
1012 * @param c, pointer of the concurrency
1013 */
1014 static void ms_update_task_result(ms_conn_t *c)
1015 {
1016 ms_task_item_t *item;
1017
1018 if (c == NULL)
1019 {
1020 return;
1021 }
1022 assert(c != NULL);
1023
1024 item= ms_get_cur_opt_item(c);
1025 if (item == NULL)
1026 {
1027 return;
1028 }
1029 assert(item != NULL);
1030
1031 ms_update_set_result(c, item);
1032
1033 if ((ms_setting.stat_freq > 0)
1034 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1035 {
1036 ms_update_stat_result(c);
1037 }
1038
1039 /* update multi-get task item */
1040 if (((ms_setting.mult_key_num > 1)
1041 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1042 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1043 {
1044 ms_update_multi_get_result(c);
1045 }
1046 else
1047 {
1048 ms_update_single_get_result(c, item);
1049 }
1050 } /* ms_update_task_result */
1051
1052
1053 /**
1054 * run get and set operation
1055 *
1056 * @param c, pointer of the concurrency
1057 *
1058 * @return int, if success, return 0, else return -1
1059 */
1060 static int ms_run_getset_task(ms_conn_t *c)
1061 {
1062 /**
1063 * extra one loop to get the last command return state. get the
1064 * last command return state.
1065 */
1066 if ((c->remain_exec_num >= 0)
1067 && (c->remain_exec_num != c->exec_num))
1068 {
1069 ms_update_task_result(c);
1070 }
1071
1072 /* multi-get */
1073 if (ms_setting.mult_key_num > 1)
1074 {
1075 /* operate next task item */
1076 ms_multi_getset_task_sch(c);
1077 }
1078 else
1079 {
1080 /* operate next task item */
1081 ms_single_getset_task_sch(c);
1082 }
1083
1084 /* no task to do, exit */
1085 if ((c->remain_exec_num == -1) || ms_global.time_out)
1086 {
1087 return -1;
1088 }
1089
1090 return 0;
1091 } /* ms_run_getset_task */
1092
1093
1094 /**
1095 * the state machine call the function to execute task.
1096 *
1097 * @param c, pointer of the concurrency
1098 *
1099 * @return int, if success, return 0, else return -1
1100 */
1101 int ms_exec_task(struct conn *c)
1102 {
1103 if (! ms_global.finish_warmup)
1104 {
1105 ms_warmup_server(c);
1106 }
1107 else
1108 {
1109 if (ms_run_getset_task(c) != 0)
1110 {
1111 return -1;
1112 }
1113 }
1114
1115 return 0;
1116 } /* ms_exec_task */