Made the new memslap code build. Moved config.h include to the beginning of each...
[awesomized/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include "ms_thread.h"
15 #include "ms_setting.h"
16
17 /* command distribution adjustment cycle */
18 #define CMD_DISTR_ADJUST_CYCLE 1000
19 #define DISADJUST_FACTOR 0.03 /**
20 * In one adjustment cycle, if undo set or get
21 * operations proportion is more than 3% , means
22 * there are too many new item or need more new
23 * item in the window. This factor shows it.
24 */
25
26 extern __thread ms_thread_t ms_thread;
27
28 /* get item from task window */
29 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
30 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
31 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
32 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
33
34
35 /* select next operation to do */
36 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
37
38
39 /* set and get speed estimate for controlling and adjustment */
40 static bool ms_is_set_too_fast(ms_task_t *task);
41 static bool ms_is_get_too_fast(ms_task_t *task);
42 static void ms_kick_out_item(ms_task_item_t *item);
43
44
45 /* miss rate adjustment */
46 static bool ms_need_overwirte_item(ms_task_t *task);
47 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
48
49
50 /* deal with data verification initialization */
51 static void ms_task_data_verify_init(ms_task_t *task);
52 static void ms_task_expire_verify_init(ms_task_t *task);
53
54
55 /* select a new task to do */
56 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
57
58
59 /* run the selected task */
60 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
61 static void ms_update_stat_result(ms_conn_t *c);
62 static void ms_update_multi_get_result(ms_conn_t *c);
63 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
64 static void ms_update_task_result(ms_conn_t *c);
65 static void ms_single_getset_task_sch(ms_conn_t *c);
66 static void ms_multi_getset_task_sch(ms_conn_t *c);
67 static void ms_send_signal(ms_sync_lock_t *sync_lock);
68 static void ms_warmup_server(ms_conn_t *c);
69 static int ms_run_getset_task(ms_conn_t *c);
70
71
72 /**
73 * used to get the current operation item(object)
74 *
75 * @param c, pointer of the concurrency
76 *
77 * @return ms_task_item_t*, current operating item
78 */
79 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
80 {
81 return c->curr_task.item;
82 }
83
84
85 /**
86 * used to get the next item to do get operation
87 *
88 * @param c, pointer of the concurrency
89 *
90 * @return ms_task_item_t*, the pointer of the next item to do
91 * get operation
92 */
93 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
94 {
95 ms_task_item_t *item= NULL;
96
97 if (c->set_cursor <= 0)
98 {
99 /* the first item in the window */
100 item= &c->item_win[0];
101 }
102 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
103 {
104 /* random get one item set before */
105 item= &c->item_win[random() % (int64_t)c->set_cursor];
106 }
107 else
108 {
109 /* random get one item from the window */
110 item= &c->item_win[random() % c->win_size];
111 }
112
113 return item;
114 } /* ms_get_next_get_item */
115
116
117 /**
118 * used to get the next item to do set operation
119 *
120 * @param c, pointer of the concurrency
121 *
122 * @return ms_task_item_t*, the pointer of the next item to do
123 * set operation
124 */
125 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
126 {
127 /**
128 * when a set command successes, the cursor will plus 1. If set
129 * fails, the cursor doesn't change. it isn't necessary to
130 * increase the cursor here.
131 */
132 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
133 }
134
135
136 /**
137 * If we need do overwrite, we could select a item set before.
138 * This function is used to get a item set before to do
139 * overwrite.
140 *
141 * @param c, pointer of the concurrency
142 *
143 * @return ms_task_item_t*, the pointer of the previous item of
144 * set operation
145 */
146 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
147 {
148 if (c->set_cursor <= 0)
149 {
150 return &c->item_win[0];
151 }
152 else
153 {
154 return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
155 }
156 } /* ms_get_pre_set_item */
157
158
159 /**
160 * According to the proportion of operations(get or set), select
161 * an operation to do.
162 *
163 * @param c, pointer of the concurrency
164 * @param task, pointer of current task in the concurrency
165 */
166 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
167 {
168 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
169 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
170
171 /* update cycle operation number if necessary */
172 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
173 {
174 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
175 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
176 }
177
178 /**
179 * According to operation distribution to choose doing which
180 * operation. If it can't set new object to sever, just change
181 * to do get operation.
182 */
183 if ((set_prop > PROP_ERROR)
184 && ((double)task->get_opt * set_prop >= (double)task->set_opt
185 * get_prop))
186 {
187 task->cmd= CMD_SET;
188 task->item= ms_get_next_set_item(c);
189 }
190 else
191 {
192 task->cmd= CMD_GET;
193 task->item= ms_get_next_get_item(c);
194 }
195 } /* ms_select_opt */
196
197
198 /**
199 * used to judge whether the number of get operations done is
200 * more than expected number of get operations to do right now.
201 *
202 * @param task, pointer of current task in the concurrency
203 *
204 * @return bool, if get too fast, return true, else return false
205 */
206 static bool ms_is_get_too_fast(ms_task_t *task)
207 {
208 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
209 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
210
211 /* no get operation */
212 if (get_prop < PROP_ERROR)
213 {
214 return false;
215 }
216
217 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
218 * task->cycle_undo_get;
219
220 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
221 && (task->cycle_undo_set > max_undo_set))
222 {
223 return true;
224 }
225
226 return false;
227 } /* ms_is_get_too_fast */
228
229
230 /**
231 * used to judge whether the number of set operations done is
232 * more than expected number of set operations to do right now.
233 *
234 * @param task, pointer of current task in the concurrency
235 *
236 * @return bool, if set too fast, return true, else return false
237 */
238 static bool ms_is_set_too_fast(ms_task_t *task)
239 {
240 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
241 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
242
243 /* no set operation */
244 if (set_prop < PROP_ERROR)
245 {
246 return false;
247 }
248
249 /* If it does set operation too fast, skip some */
250 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
251 * (double)task->cycle_undo_set);
252
253 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
254 && (task->cycle_undo_get > max_undo_get))
255 {
256 return true;
257 }
258
259 return false;
260 } /* ms_is_set_too_fast */
261
262
263 /**
264 * kick out the old item in the window, and add a new item to
265 * overwrite the old item. When we don't want to do overwrite
266 * object, and the current item to do set operation is an old
267 * item, we could kick out the old item and add a new item. Then
268 * we can ensure we set new object every time.
269 *
270 * @param item, pointer of task item which includes the object
271 * information
272 */
273 static void ms_kick_out_item(ms_task_item_t *item)
274 {
275 /* allocate a new item */
276 item->key_prefix= ms_get_key_prefix();
277
278 item->key_suffix_offset++;
279 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
280 item->client_time= 0;
281 } /* ms_kick_out_item */
282
283
284 /**
285 * used to judge whether we need overwrite object based on the
286 * options user specified
287 *
288 * @param task, pointer of current task in the concurrency
289 *
290 * @return bool, if need overwrite, return true, else return
291 * false
292 */
293 static bool ms_need_overwirte_item(ms_task_t *task)
294 {
295 ms_task_item_t *item= task->item;
296
297 assert(item != NULL);
298 assert(task->cmd == CMD_SET);
299
300 /**
301 * according to data overwrite percent to determine if do data
302 * overwrite.
303 */
304 if (task->overwrite_set < (double)task->set_opt
305 * ms_setting.overwrite_percent)
306 {
307 return true;
308 }
309
310 return false;
311 } /* ms_need_overwirte_item */
312
313
314 /**
315 * used to adjust operation. the function must be called after
316 * select operation. the function change get operation to set
317 * operation, or set operation to get operation based on the
318 * current case.
319 *
320 * @param c, pointer of the concurrency
321 * @param task, pointer of current task in the concurrency
322 *
323 * @return bool, if success, return true, else return false
324 */
325 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
326 {
327 ms_task_item_t *item= task->item;
328
329 assert(item != NULL);
330
331 if (task->cmd == CMD_SET)
332 {
333 /* If did set operation too fast, skip some */
334 if (ms_is_set_too_fast(task))
335 {
336 /* get the item instead */
337 if (item->value_offset != INVALID_OFFSET)
338 {
339 task->cmd= CMD_GET;
340 return true;
341 }
342 }
343
344 /* If the current item is not a new item, kick it out */
345 if (item->value_offset != INVALID_OFFSET)
346 {
347 if (ms_need_overwirte_item(task))
348 {
349 /* overwrite */
350 task->overwrite_set++;
351 }
352 else
353 {
354 /* kick out the current item to do set operation */
355 ms_kick_out_item(item);
356 }
357 }
358 else /* it's a new item */
359 {
360 /* need overwrite */
361 if (ms_need_overwirte_item(task))
362 {
363 item= ms_get_pre_set_item(c);
364 if (item->value_offset != INVALID_OFFSET)
365 {
366 task->item= item;
367 task->overwrite_set++;
368 }
369 else /* previous set item is a new item */
370 {
371 /* select the previous item to run, and cancel overwrite */
372 task->item= item;
373 }
374 }
375 }
376 task->cmd= CMD_SET;
377 return true;
378 }
379 else
380 {
381 if (item->value_offset == INVALID_OFFSET)
382 {
383 task->cmd= CMD_SET;
384 return true;
385 }
386
387 /**
388 * If It does get operation too fast, it will change the
389 * operation to set.
390 */
391 if (ms_is_get_too_fast(task))
392 {
393 /* don't kick out the first item in the window */
394 if (! ms_is_set_too_fast(task))
395 {
396 ms_kick_out_item(item);
397 task->cmd= CMD_SET;
398 return true;
399 }
400 else
401 {
402 return false;
403 }
404 }
405
406 assert(item->value_offset != INVALID_OFFSET);
407
408 task->cmd= CMD_GET;
409 return true;
410 }
411 } /* ms_adjust_opt */
412
413
414 /**
415 * used to initialize the task which need verify data.
416 *
417 * @param task, pointer of current task in the concurrency
418 */
419 static void ms_task_data_verify_init(ms_task_t *task)
420 {
421 ms_task_item_t *item= task->item;
422
423 assert(item != NULL);
424 assert(task->cmd == CMD_GET);
425
426 /**
427 * according to data verification percent to determine if do
428 * data verification.
429 */
430 if (task->verified_get < (double)task->get_opt
431 * ms_setting.verify_percent)
432 {
433 /**
434 * currently it doesn't do verify, just increase the counter,
435 * and do verification next proper get command
436 */
437 if ((task->item->value_offset != INVALID_OFFSET)
438 && (item->exp_time == 0))
439 {
440 task->verify= true;
441 task->finish_verify= false;
442 task->verified_get++;
443 }
444 }
445 } /* ms_task_data_verify_init */
446
447
448 /**
449 * used to initialize the task which need verify expire time.
450 *
451 * @param task, pointer of current task in the concurrency
452 */
453 static void ms_task_expire_verify_init(ms_task_t *task)
454 {
455 ms_task_item_t *item= task->item;
456
457 assert(item != NULL);
458 assert(task->cmd == CMD_GET);
459 assert(item->exp_time > 0);
460
461 task->verify= true;
462 task->finish_verify= false;
463 } /* ms_task_expire_verify_init */
464
465
466 /**
467 * used to get one task, the function initializes the task
468 * structure.
469 *
470 * @param c, pointer of the concurrency
471 * @param warmup, whether it need warmup
472 *
473 * @return ms_task_t*, pointer of current task in the
474 * concurrency
475 */
476 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
477 {
478 ms_task_t *task= &c->curr_task;
479
480 while (1)
481 {
482 task->verify= false;
483 task->finish_verify= true;
484 task->get_miss= true;
485
486 if (warmup)
487 {
488 task->cmd= CMD_SET;
489 task->item= ms_get_next_set_item(c);
490
491 return task;
492 }
493
494 /* according to operation distribution to choose doing which operation */
495 ms_select_opt(c, task);
496
497 if (! ms_adjust_opt(c, task))
498 {
499 continue;
500 }
501
502 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
503 {
504 ms_task_data_verify_init(task);
505 }
506
507 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
508 && (task->item->exp_time > 0))
509 {
510 ms_task_expire_verify_init(task);
511 }
512
513 break;
514 }
515
516 /**
517 * Only update get and delete counter, set counter will be
518 * updated after set operation successes.
519 */
520 if (task->cmd == CMD_GET)
521 {
522 task->get_opt++;
523 task->cycle_undo_get--;
524 }
525
526 return task;
527 } /* ms_get_task */
528
529
530 /**
531 * send a signal to the main monitor thread
532 *
533 * @param sync_lock, pointer of the lock
534 */
535 static void ms_send_signal(ms_sync_lock_t *sync_lock)
536 {
537 pthread_mutex_lock(&sync_lock->lock);
538 sync_lock->count++;
539 pthread_cond_signal(&sync_lock->cond);
540 pthread_mutex_unlock(&sync_lock->lock);
541 } /* ms_send_signal */
542
543
544 /**
545 * If user only want to do get operation, but there is no object
546 * in server , so we use this function to warmup the server, and
547 * set some objects to server. It runs at the beginning of task.
548 *
549 * @param c, pointer of the concurrency
550 */
551 static void ms_warmup_server(ms_conn_t *c)
552 {
553 ms_task_t *task;
554 ms_task_item_t *item;
555
556 /**
557 * Extra one loop to get the last command returned state.
558 * Normally it gets the previous command returned state.
559 */
560 if ((c->remain_warmup_num >= 0)
561 && (c->remain_warmup_num != c->warmup_num))
562 {
563 item= ms_get_cur_opt_item(c);
564 /* only update the set command result state for data verification */
565 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
566 {
567 item->value_offset= item->key_suffix_offset;
568 /* set success, update counter */
569 c->set_cursor++;
570 }
571 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
572 {
573 printf("key: %lx didn't set success\n", item->key_prefix);
574 }
575 }
576
577 /* the last time don't run a task */
578 if (c->remain_warmup_num-- > 0)
579 {
580 /* operate next task item */
581 task= ms_get_task(c, true);
582 item= task->item;
583 ms_mcd_set(c, item);
584 }
585
586 /**
587 * finish warming up server, wait all connects initialize
588 * complete. Then all connects can start do task at the same
589 * time.
590 */
591 if (c->remain_warmup_num == -1)
592 {
593 ms_send_signal(&ms_global.init_lock);
594 c->remain_warmup_num--; /* never run the if branch */
595 }
596 } /* ms_warmup_server */
597
598
599 /**
600 * dispatch single get and set task
601 *
602 * @param c, pointer of the concurrency
603 */
604 static void ms_single_getset_task_sch(ms_conn_t *c)
605 {
606 ms_task_t *task;
607 ms_task_item_t *item;
608
609 /* the last time don't run a task */
610 if (c->remain_exec_num-- > 0)
611 {
612 task= ms_get_task(c, false);
613 item= task->item;
614 if (task->cmd == CMD_SET)
615 {
616 ms_mcd_set(c, item);
617 }
618 else if (task->cmd == CMD_GET)
619 {
620 assert(task->cmd == CMD_GET);
621 ms_mcd_get(c, item, task->verify);
622 }
623 }
624 } /* ms_single_getset_task_sch */
625
626
627 /**
628 * dispatch multi-get and set task
629 *
630 * @param c, pointer of the concurrency
631 */
632 static void ms_multi_getset_task_sch(ms_conn_t *c)
633 {
634 ms_task_t *task;
635 ms_mlget_task_item_t *mlget_item;
636
637 while (1)
638 {
639 if (c->remain_exec_num-- > 0)
640 {
641 task= ms_get_task(c, false);
642 if (task->cmd == CMD_SET) /* just do it */
643 {
644 ms_mcd_set(c, task->item);
645 break;
646 }
647 else
648 {
649 assert(task->cmd == CMD_GET);
650 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
651 mlget_item->item= task->item;
652 mlget_item->verify= task->verify;
653 mlget_item->finish_verify= task->finish_verify;
654 mlget_item->get_miss= task->get_miss;
655 c->mlget_task.mlget_num++;
656
657 /* enough multi-get task items can be done */
658 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
659 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
660 {
661 ms_mcd_mlget(c);
662 break;
663 }
664 }
665 }
666 else
667 {
668 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
669 {
670 ms_mcd_mlget(c);
671 }
672 break;
673 }
674 }
675 } /* ms_multi_getset_task_sch */
676
677
678 /**
679 * calculate the difference value of two time points
680 *
681 * @param start_time, the start time
682 * @param end_time, the end time
683 *
684 * @return uint64_t, the difference value between start_time and end_time in us
685 */
686 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
687 {
688 int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec;
689 int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec;
690
691 assert(endtime >= starttime);
692
693 return endtime - starttime;
694 } /* ms_time_diff */
695
696
697 /**
698 * after get the response from server for multi-get, the
699 * function update the state of the task and do data verify if
700 * necessary.
701 *
702 * @param c, pointer of the concurrency
703 */
704 static void ms_update_multi_get_result(ms_conn_t *c)
705 {
706 ms_mlget_task_item_t *mlget_item;
707 ms_task_item_t *item;
708 char *orignval= NULL;
709 char *orignkey= NULL;
710
711 if (c == NULL)
712 {
713 return;
714 }
715 assert(c != NULL);
716
717 for (int i= 0; i < c->mlget_task.mlget_num; i++)
718 {
719 mlget_item= &c->mlget_task.mlget_item[i];
720 item= mlget_item->item;
721 orignval= &ms_setting.char_block[item->value_offset];
722 orignkey= &ms_setting.char_block[item->key_suffix_offset];
723
724 /* update get miss counter */
725 if (mlget_item->get_miss)
726 {
727 __sync_fetch_and_add(&ms_stats.get_misses, 1);
728 }
729
730 /* get nothing from server for this task item */
731 if (mlget_item->verify && ! mlget_item->finish_verify)
732 {
733 /* verify expire time if necessary */
734 if (item->exp_time > 0)
735 {
736 struct timeval curr_time;
737 gettimeofday(&curr_time, NULL);
738
739 /* object doesn't expire but can't get it now */
740 if (curr_time.tv_sec - item->client_time
741 < item->exp_time - EXPIRE_TIME_ERROR)
742 {
743 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
744
745 if (ms_setting.verbose)
746 {
747 char set_time[64];
748 char cur_time[64];
749 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
750 localtime(&item->client_time));
751 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
752 localtime(&curr_time.tv_sec));
753 fprintf(stderr,
754 "\n\t<%d expire time verification failed, object "
755 "doesn't expire but can't get it now\n"
756 "\tkey len: %d\n"
757 "\tkey: %lx %.*s\n"
758 "\tset time: %s current time: %s "
759 "diff time: %d expire time: %d\n"
760 "\texpected data len: %d\n"
761 "\texpected data: %.*s\n"
762 "\treceived data: \n",
763 c->sfd,
764 item->key_size,
765 item->key_prefix,
766 item->key_size - (int)KEY_PREFIX_SIZE,
767 orignkey,
768 set_time,
769 cur_time,
770 (int)(curr_time.tv_sec - item->client_time),
771 item->exp_time,
772 item->value_size,
773 item->value_size,
774 orignval);
775 fflush(stderr);
776 }
777 }
778 }
779 else
780 {
781 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
782
783 if (ms_setting.verbose)
784 {
785 fprintf(stderr, "\n<%d data verification failed\n"
786 "\tkey len: %d\n"
787 "\tkey: %lx %.*s\n"
788 "\texpected data len: %d\n"
789 "\texpected data: %.*s\n"
790 "\treceived data: \n",
791 c->sfd, item->key_size, item->key_prefix,
792 item->key_size - (int)KEY_PREFIX_SIZE,
793 orignkey, item->value_size, item->value_size, orignval);
794 fflush(stderr);
795 }
796 }
797 }
798 }
799 c->mlget_task.mlget_num= 0;
800 c->mlget_task.value_index= INVALID_OFFSET;
801 } /* ms_update_multi_get_result */
802
803
804 /**
805 * after get the response from server for single get, the
806 * function update the state of the task and do data verify if
807 * necessary.
808 *
809 * @param c, pointer of the concurrency
810 * @param item, pointer of task item which includes the object
811 * information
812 */
813 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
814 {
815 char *orignval= NULL;
816 char *orignkey= NULL;
817
818 if ((c == NULL) || (item == NULL))
819 {
820 return;
821 }
822 assert(c != NULL);
823 assert(item != NULL);
824
825 orignval= &ms_setting.char_block[item->value_offset];
826 orignkey= &ms_setting.char_block[item->key_suffix_offset];
827
828 /* update get miss counter */
829 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
830 {
831 __sync_fetch_and_add(&ms_stats.get_misses, 1);
832 }
833
834 /* get nothing from server for this task item */
835 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
836 && ! c->curr_task.finish_verify)
837 {
838 /* verify expire time if necessary */
839 if (item->exp_time > 0)
840 {
841 struct timeval curr_time;
842 gettimeofday(&curr_time, NULL);
843
844 /* object doesn't expire but can't get it now */
845 if (curr_time.tv_sec - item->client_time
846 < item->exp_time - EXPIRE_TIME_ERROR)
847 {
848 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
849
850 if (ms_setting.verbose)
851 {
852 char set_time[64];
853 char cur_time[64];
854 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
855 localtime(&item->client_time));
856 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
857 localtime(&curr_time.tv_sec));
858 fprintf(stderr,
859 "\n\t<%d expire time verification failed, object "
860 "doesn't expire but can't get it now\n"
861 "\tkey len: %d\n"
862 "\tkey: %lx %.*s\n"
863 "\tset time: %s current time: %s "
864 "diff time: %d expire time: %d\n"
865 "\texpected data len: %d\n"
866 "\texpected data: %.*s\n"
867 "\treceived data: \n",
868 c->sfd,
869 item->key_size,
870 item->key_prefix,
871 item->key_size - (int)KEY_PREFIX_SIZE,
872 orignkey,
873 set_time,
874 cur_time,
875 (int)(curr_time.tv_sec - item->client_time),
876 item->exp_time,
877 item->value_size,
878 item->value_size,
879 orignval);
880 fflush(stderr);
881 }
882 }
883 }
884 else
885 {
886 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
887
888 if (ms_setting.verbose)
889 {
890 fprintf(stderr, "\n<%d data verification failed\n"
891 "\tkey len: %d\n"
892 "\tkey: %lx %.*s\n"
893 "\texpected data len: %d\n"
894 "\texpected data: %.*s\n"
895 "\treceived data: \n",
896 c->sfd, item->key_size, item->key_prefix,
897 item->key_size - (int)KEY_PREFIX_SIZE,
898 orignkey, item->value_size, item->value_size, orignval);
899 fflush(stderr);
900 }
901 }
902 }
903 } /* ms_update_single_get_result */
904
905
906 /**
907 * after get the response from server for set the function
908 * update the state of the task and do data verify if necessary.
909 *
910 * @param c, pointer of the concurrency
911 * @param item, pointer of task item which includes the object
912 * information
913 */
914 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
915 {
916 if ((c == NULL) || (item == NULL))
917 {
918 return;
919 }
920 assert(c != NULL);
921 assert(item != NULL);
922
923 if (c->precmd.cmd == CMD_SET)
924 {
925 switch (c->precmd.retstat)
926 {
927 case MCD_STORED:
928 if (item->value_offset == INVALID_OFFSET)
929 {
930 /* first set with the same offset of key suffix */
931 item->value_offset= item->key_suffix_offset;
932 }
933 else
934 {
935 /* not first set, just increase the value offset */
936 item->value_offset+= 1;
937 }
938
939 /* set successes, update counter */
940 c->set_cursor++;
941 c->curr_task.set_opt++;
942 c->curr_task.cycle_undo_set--;
943 break;
944
945 case MCD_SERVER_ERROR:
946 default:
947 break;
948 } /* switch */
949 }
950 } /* ms_update_set_result */
951
952
953 /**
954 * update the response time result
955 *
956 * @param c, pointer of the concurrency
957 */
958 static void ms_update_stat_result(ms_conn_t *c)
959 {
960 bool get_miss= false;
961
962 if (c == NULL)
963 {
964 return;
965 }
966 assert(c != NULL);
967
968 gettimeofday(&c->end_time, NULL);
969 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
970
971 pthread_mutex_lock(&ms_statistic.stat_mutex);
972
973 switch (c->precmd.cmd)
974 {
975 case CMD_SET:
976 ms_record_event(&ms_statistic.set_stat, time_diff, false);
977 break;
978
979 case CMD_GET:
980 if (c->curr_task.get_miss)
981 {
982 get_miss= true;
983 }
984 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
985 break;
986
987 default:
988 break;
989 } /* switch */
990
991 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
992 pthread_mutex_unlock(&ms_statistic.stat_mutex);
993 } /* ms_update_stat_result */
994
995
996 /**
997 * after get response from server for the current operation, and
998 * before doing the next operation, update the state of the
999 * current operation.
1000 *
1001 * @param c, pointer of the concurrency
1002 */
1003 static void ms_update_task_result(ms_conn_t *c)
1004 {
1005 ms_task_item_t *item;
1006
1007 if (c == NULL)
1008 {
1009 return;
1010 }
1011 assert(c != NULL);
1012
1013 item= ms_get_cur_opt_item(c);
1014 if (item == NULL)
1015 {
1016 return;
1017 }
1018 assert(item != NULL);
1019
1020 ms_update_set_result(c, item);
1021
1022 if ((ms_setting.stat_freq > 0)
1023 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1024 {
1025 ms_update_stat_result(c);
1026 }
1027
1028 /* update multi-get task item */
1029 if (((ms_setting.mult_key_num > 1)
1030 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1031 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1032 {
1033 ms_update_multi_get_result(c);
1034 }
1035 else
1036 {
1037 ms_update_single_get_result(c, item);
1038 }
1039 } /* ms_update_task_result */
1040
1041
1042 /**
1043 * run get and set operation
1044 *
1045 * @param c, pointer of the concurrency
1046 *
1047 * @return int, if success, return 0, else return -1
1048 */
1049 static int ms_run_getset_task(ms_conn_t *c)
1050 {
1051 /**
1052 * extra one loop to get the last command return state. get the
1053 * last command return state.
1054 */
1055 if ((c->remain_exec_num >= 0)
1056 && (c->remain_exec_num != c->exec_num))
1057 {
1058 ms_update_task_result(c);
1059 }
1060
1061 /* multi-get */
1062 if (ms_setting.mult_key_num > 1)
1063 {
1064 /* operate next task item */
1065 ms_multi_getset_task_sch(c);
1066 }
1067 else
1068 {
1069 /* operate next task item */
1070 ms_single_getset_task_sch(c);
1071 }
1072
1073 /* no task to do, exit */
1074 if ((c->remain_exec_num == -1) || ms_global.time_out)
1075 {
1076 return -1;
1077 }
1078
1079 return 0;
1080 } /* ms_run_getset_task */
1081
1082
1083 /**
1084 * the state machine call the function to execute task.
1085 *
1086 * @param c, pointer of the concurrency
1087 *
1088 * @return int, if success, return 0, else return -1
1089 */
1090 int ms_exec_task(struct conn *c)
1091 {
1092 if (! ms_global.finish_warmup)
1093 {
1094 ms_warmup_server(c);
1095 }
1096 else
1097 {
1098 if (ms_run_getset_task(c) != 0)
1099 {
1100 return -1;
1101 }
1102 }
1103
1104 return 0;
1105 } /* ms_exec_task */