Made the memcached_string stuff private, but still testable.
[m6w6/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <inttypes.h>
15
16 #include "ms_thread.h"
17 #include "ms_setting.h"
18 #include "ms_atomic.h"
19
/* Number of operations in one command-distribution adjustment cycle. */
#define CMD_DISTR_ADJUST_CYCLE 1000

/**
 * Tolerance used when comparing the pending ("undo") set and get
 * operations of one adjustment cycle. When the pending share of one
 * kind exceeds the expected proportion by more than 3%, the window
 * either holds too many new items or needs more new items.
 */
#define DISADJUST_FACTOR 0.03
28
29 /* get item from task window */
30 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
31 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
32 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
33 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
34
35
36 /* select next operation to do */
37 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
38
39
40 /* set and get speed estimate for controlling and adjustment */
41 static bool ms_is_set_too_fast(ms_task_t *task);
42 static bool ms_is_get_too_fast(ms_task_t *task);
43 static void ms_kick_out_item(ms_task_item_t *item);
44
45
46 /* miss rate adjustment */
47 static bool ms_need_overwirte_item(ms_task_t *task);
48 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
49
50
51 /* deal with data verification initialization */
52 static void ms_task_data_verify_init(ms_task_t *task);
53 static void ms_task_expire_verify_init(ms_task_t *task);
54
55
56 /* select a new task to do */
57 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
58
59
60 /* run the selected task */
61 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
62 static void ms_update_stat_result(ms_conn_t *c);
63 static void ms_update_multi_get_result(ms_conn_t *c);
64 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
65 static void ms_update_task_result(ms_conn_t *c);
66 static void ms_single_getset_task_sch(ms_conn_t *c);
67 static void ms_multi_getset_task_sch(ms_conn_t *c);
68 static void ms_send_signal(ms_sync_lock_t *sync_lock);
69 static void ms_warmup_server(ms_conn_t *c);
70 static int ms_run_getset_task(ms_conn_t *c);
71
72
73 /**
74 * used to get the current operation item(object)
75 *
76 * @param c, pointer of the concurrency
77 *
78 * @return ms_task_item_t*, current operating item
79 */
80 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
81 {
82 return c->curr_task.item;
83 }
84
85
86 /**
87 * used to get the next item to do get operation
88 *
89 * @param c, pointer of the concurrency
90 *
91 * @return ms_task_item_t*, the pointer of the next item to do
92 * get operation
93 */
94 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
95 {
96 ms_task_item_t *item= NULL;
97
98 if (c->set_cursor <= 0)
99 {
100 /* the first item in the window */
101 item= &c->item_win[0];
102 }
103 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
104 {
105 /* random get one item set before */
106 item= &c->item_win[random() % (int64_t)c->set_cursor];
107 }
108 else
109 {
110 /* random get one item from the window */
111 item= &c->item_win[random() % c->win_size];
112 }
113
114 return item;
115 } /* ms_get_next_get_item */
116
117
118 /**
119 * used to get the next item to do set operation
120 *
121 * @param c, pointer of the concurrency
122 *
123 * @return ms_task_item_t*, the pointer of the next item to do
124 * set operation
125 */
126 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
127 {
128 /**
129 * when a set command successes, the cursor will plus 1. If set
130 * fails, the cursor doesn't change. it isn't necessary to
131 * increase the cursor here.
132 */
133 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
134 }
135
136
137 /**
138 * If we need do overwrite, we could select a item set before.
139 * This function is used to get a item set before to do
140 * overwrite.
141 *
142 * @param c, pointer of the concurrency
143 *
144 * @return ms_task_item_t*, the pointer of the previous item of
145 * set operation
146 */
147 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
148 {
149 if (c->set_cursor <= 0)
150 {
151 return &c->item_win[0];
152 }
153 else
154 {
155 return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
156 }
157 } /* ms_get_pre_set_item */
158
159
160 /**
161 * According to the proportion of operations(get or set), select
162 * an operation to do.
163 *
164 * @param c, pointer of the concurrency
165 * @param task, pointer of current task in the concurrency
166 */
167 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
168 {
169 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
170 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
171
172 /* update cycle operation number if necessary */
173 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
174 {
175 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
176 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
177 }
178
179 /**
180 * According to operation distribution to choose doing which
181 * operation. If it can't set new object to sever, just change
182 * to do get operation.
183 */
184 if ((set_prop > PROP_ERROR)
185 && ((double)task->get_opt * set_prop >= (double)task->set_opt
186 * get_prop))
187 {
188 task->cmd= CMD_SET;
189 task->item= ms_get_next_set_item(c);
190 }
191 else
192 {
193 task->cmd= CMD_GET;
194 task->item= ms_get_next_get_item(c);
195 }
196 } /* ms_select_opt */
197
198
199 /**
200 * used to judge whether the number of get operations done is
201 * more than expected number of get operations to do right now.
202 *
203 * @param task, pointer of current task in the concurrency
204 *
205 * @return bool, if get too fast, return true, else return false
206 */
207 static bool ms_is_get_too_fast(ms_task_t *task)
208 {
209 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
210 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
211
212 /* no get operation */
213 if (get_prop < PROP_ERROR)
214 {
215 return false;
216 }
217
218 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
219 * task->cycle_undo_get;
220
221 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
222 && (task->cycle_undo_set > max_undo_set))
223 {
224 return true;
225 }
226
227 return false;
228 } /* ms_is_get_too_fast */
229
230
231 /**
232 * used to judge whether the number of set operations done is
233 * more than expected number of set operations to do right now.
234 *
235 * @param task, pointer of current task in the concurrency
236 *
237 * @return bool, if set too fast, return true, else return false
238 */
239 static bool ms_is_set_too_fast(ms_task_t *task)
240 {
241 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
242 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
243
244 /* no set operation */
245 if (set_prop < PROP_ERROR)
246 {
247 return false;
248 }
249
250 /* If it does set operation too fast, skip some */
251 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
252 * (double)task->cycle_undo_set);
253
254 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
255 && (task->cycle_undo_get > max_undo_get))
256 {
257 return true;
258 }
259
260 return false;
261 } /* ms_is_set_too_fast */
262
263
264 /**
265 * kick out the old item in the window, and add a new item to
266 * overwrite the old item. When we don't want to do overwrite
267 * object, and the current item to do set operation is an old
268 * item, we could kick out the old item and add a new item. Then
269 * we can ensure we set new object every time.
270 *
271 * @param item, pointer of task item which includes the object
272 * information
273 */
274 static void ms_kick_out_item(ms_task_item_t *item)
275 {
276 /* allocate a new item */
277 item->key_prefix= ms_get_key_prefix();
278
279 item->key_suffix_offset++;
280 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
281 item->client_time= 0;
282 } /* ms_kick_out_item */
283
284
285 /**
286 * used to judge whether we need overwrite object based on the
287 * options user specified
288 *
289 * @param task, pointer of current task in the concurrency
290 *
291 * @return bool, if need overwrite, return true, else return
292 * false
293 */
294 static bool ms_need_overwirte_item(ms_task_t *task)
295 {
296 ms_task_item_t *item= task->item;
297
298 assert(item != NULL);
299 assert(task->cmd == CMD_SET);
300
301 /**
302 * according to data overwrite percent to determine if do data
303 * overwrite.
304 */
305 if (task->overwrite_set < (double)task->set_opt
306 * ms_setting.overwrite_percent)
307 {
308 return true;
309 }
310
311 return false;
312 } /* ms_need_overwirte_item */
313
314
315 /**
316 * used to adjust operation. the function must be called after
317 * select operation. the function change get operation to set
318 * operation, or set operation to get operation based on the
319 * current case.
320 *
321 * @param c, pointer of the concurrency
322 * @param task, pointer of current task in the concurrency
323 *
324 * @return bool, if success, return true, else return false
325 */
326 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
327 {
328 ms_task_item_t *item= task->item;
329
330 assert(item != NULL);
331
332 if (task->cmd == CMD_SET)
333 {
334 /* If did set operation too fast, skip some */
335 if (ms_is_set_too_fast(task))
336 {
337 /* get the item instead */
338 if (item->value_offset != INVALID_OFFSET)
339 {
340 task->cmd= CMD_GET;
341 return true;
342 }
343 }
344
345 /* If the current item is not a new item, kick it out */
346 if (item->value_offset != INVALID_OFFSET)
347 {
348 if (ms_need_overwirte_item(task))
349 {
350 /* overwrite */
351 task->overwrite_set++;
352 }
353 else
354 {
355 /* kick out the current item to do set operation */
356 ms_kick_out_item(item);
357 }
358 }
359 else /* it's a new item */
360 {
361 /* need overwrite */
362 if (ms_need_overwirte_item(task))
363 {
364 item= ms_get_pre_set_item(c);
365 if (item->value_offset != INVALID_OFFSET)
366 {
367 task->item= item;
368 task->overwrite_set++;
369 }
370 else /* previous set item is a new item */
371 {
372 /* select the previous item to run, and cancel overwrite */
373 task->item= item;
374 }
375 }
376 }
377 task->cmd= CMD_SET;
378 return true;
379 }
380 else
381 {
382 if (item->value_offset == INVALID_OFFSET)
383 {
384 task->cmd= CMD_SET;
385 return true;
386 }
387
388 /**
389 * If It does get operation too fast, it will change the
390 * operation to set.
391 */
392 if (ms_is_get_too_fast(task))
393 {
394 /* don't kick out the first item in the window */
395 if (! ms_is_set_too_fast(task))
396 {
397 ms_kick_out_item(item);
398 task->cmd= CMD_SET;
399 return true;
400 }
401 else
402 {
403 return false;
404 }
405 }
406
407 assert(item->value_offset != INVALID_OFFSET);
408
409 task->cmd= CMD_GET;
410 return true;
411 }
412 } /* ms_adjust_opt */
413
414
415 /**
416 * used to initialize the task which need verify data.
417 *
418 * @param task, pointer of current task in the concurrency
419 */
420 static void ms_task_data_verify_init(ms_task_t *task)
421 {
422 ms_task_item_t *item= task->item;
423
424 assert(item != NULL);
425 assert(task->cmd == CMD_GET);
426
427 /**
428 * according to data verification percent to determine if do
429 * data verification.
430 */
431 if (task->verified_get < (double)task->get_opt
432 * ms_setting.verify_percent)
433 {
434 /**
435 * currently it doesn't do verify, just increase the counter,
436 * and do verification next proper get command
437 */
438 if ((task->item->value_offset != INVALID_OFFSET)
439 && (item->exp_time == 0))
440 {
441 task->verify= true;
442 task->finish_verify= false;
443 task->verified_get++;
444 }
445 }
446 } /* ms_task_data_verify_init */
447
448
449 /**
450 * used to initialize the task which need verify expire time.
451 *
452 * @param task, pointer of current task in the concurrency
453 */
454 static void ms_task_expire_verify_init(ms_task_t *task)
455 {
456 ms_task_item_t *item= task->item;
457
458 assert(item != NULL);
459 assert(task->cmd == CMD_GET);
460 assert(item->exp_time > 0);
461
462 task->verify= true;
463 task->finish_verify= false;
464 } /* ms_task_expire_verify_init */
465
466
467 /**
468 * used to get one task, the function initializes the task
469 * structure.
470 *
471 * @param c, pointer of the concurrency
472 * @param warmup, whether it need warmup
473 *
474 * @return ms_task_t*, pointer of current task in the
475 * concurrency
476 */
477 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
478 {
479 ms_task_t *task= &c->curr_task;
480
481 while (1)
482 {
483 task->verify= false;
484 task->finish_verify= true;
485 task->get_miss= true;
486
487 if (warmup)
488 {
489 task->cmd= CMD_SET;
490 task->item= ms_get_next_set_item(c);
491
492 return task;
493 }
494
495 /* according to operation distribution to choose doing which operation */
496 ms_select_opt(c, task);
497
498 if (! ms_adjust_opt(c, task))
499 {
500 continue;
501 }
502
503 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
504 {
505 ms_task_data_verify_init(task);
506 }
507
508 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
509 && (task->item->exp_time > 0))
510 {
511 ms_task_expire_verify_init(task);
512 }
513
514 break;
515 }
516
517 /**
518 * Only update get and delete counter, set counter will be
519 * updated after set operation successes.
520 */
521 if (task->cmd == CMD_GET)
522 {
523 task->get_opt++;
524 task->cycle_undo_get--;
525 }
526
527 return task;
528 } /* ms_get_task */
529
530
531 /**
532 * send a signal to the main monitor thread
533 *
534 * @param sync_lock, pointer of the lock
535 */
536 static void ms_send_signal(ms_sync_lock_t *sync_lock)
537 {
538 pthread_mutex_lock(&sync_lock->lock);
539 sync_lock->count++;
540 pthread_cond_signal(&sync_lock->cond);
541 pthread_mutex_unlock(&sync_lock->lock);
542 } /* ms_send_signal */
543
544
545 /**
546 * If user only want to do get operation, but there is no object
547 * in server , so we use this function to warmup the server, and
548 * set some objects to server. It runs at the beginning of task.
549 *
550 * @param c, pointer of the concurrency
551 */
552 static void ms_warmup_server(ms_conn_t *c)
553 {
554 ms_task_t *task;
555 ms_task_item_t *item;
556
557 /**
558 * Extra one loop to get the last command returned state.
559 * Normally it gets the previous command returned state.
560 */
561 if ((c->remain_warmup_num >= 0)
562 && (c->remain_warmup_num != c->warmup_num))
563 {
564 item= ms_get_cur_opt_item(c);
565 /* only update the set command result state for data verification */
566 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
567 {
568 item->value_offset= item->key_suffix_offset;
569 /* set success, update counter */
570 c->set_cursor++;
571 }
572 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
573 {
574 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
575 }
576 }
577
578 /* the last time don't run a task */
579 if (c->remain_warmup_num-- > 0)
580 {
581 /* operate next task item */
582 task= ms_get_task(c, true);
583 item= task->item;
584 ms_mcd_set(c, item);
585 }
586
587 /**
588 * finish warming up server, wait all connects initialize
589 * complete. Then all connects can start do task at the same
590 * time.
591 */
592 if (c->remain_warmup_num == -1)
593 {
594 ms_send_signal(&ms_global.init_lock);
595 c->remain_warmup_num--; /* never run the if branch */
596 }
597 } /* ms_warmup_server */
598
599
600 /**
601 * dispatch single get and set task
602 *
603 * @param c, pointer of the concurrency
604 */
605 static void ms_single_getset_task_sch(ms_conn_t *c)
606 {
607 ms_task_t *task;
608 ms_task_item_t *item;
609
610 /* the last time don't run a task */
611 if (c->remain_exec_num-- > 0)
612 {
613 task= ms_get_task(c, false);
614 item= task->item;
615 if (task->cmd == CMD_SET)
616 {
617 ms_mcd_set(c, item);
618 }
619 else if (task->cmd == CMD_GET)
620 {
621 assert(task->cmd == CMD_GET);
622 ms_mcd_get(c, item, task->verify);
623 }
624 }
625 } /* ms_single_getset_task_sch */
626
627
628 /**
629 * dispatch multi-get and set task
630 *
631 * @param c, pointer of the concurrency
632 */
633 static void ms_multi_getset_task_sch(ms_conn_t *c)
634 {
635 ms_task_t *task;
636 ms_mlget_task_item_t *mlget_item;
637
638 while (1)
639 {
640 if (c->remain_exec_num-- > 0)
641 {
642 task= ms_get_task(c, false);
643 if (task->cmd == CMD_SET) /* just do it */
644 {
645 ms_mcd_set(c, task->item);
646 break;
647 }
648 else
649 {
650 assert(task->cmd == CMD_GET);
651 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
652 mlget_item->item= task->item;
653 mlget_item->verify= task->verify;
654 mlget_item->finish_verify= task->finish_verify;
655 mlget_item->get_miss= task->get_miss;
656 c->mlget_task.mlget_num++;
657
658 /* enough multi-get task items can be done */
659 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
660 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
661 {
662 ms_mcd_mlget(c);
663 break;
664 }
665 }
666 }
667 else
668 {
669 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
670 {
671 ms_mcd_mlget(c);
672 }
673 break;
674 }
675 }
676 } /* ms_multi_getset_task_sch */
677
678
/**
 * Calculate the difference between two time points.
 *
 * Seconds are widened to int64_t before being scaled to microseconds,
 * so the multiplication cannot overflow on platforms where tv_sec is
 * only 32 bits wide.
 *
 * @param start_time, the start time
 * @param end_time, the end time; must not be earlier than start_time
 *        (checked with assert)
 *
 * @return int64_t, end_time minus start_time in microseconds
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  int64_t endtime = (int64_t)end_time->tv_sec * 1000000
                    + (int64_t)end_time->tv_usec;
  int64_t starttime = (int64_t)start_time->tv_sec * 1000000
                      + (int64_t)start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */
696
697
698 /**
699 * after get the response from server for multi-get, the
700 * function update the state of the task and do data verify if
701 * necessary.
702 *
703 * @param c, pointer of the concurrency
704 */
705 static void ms_update_multi_get_result(ms_conn_t *c)
706 {
707 ms_mlget_task_item_t *mlget_item;
708 ms_task_item_t *item;
709 char *orignval= NULL;
710 char *orignkey= NULL;
711
712 if (c == NULL)
713 {
714 return;
715 }
716 assert(c != NULL);
717
718 for (int i= 0; i < c->mlget_task.mlget_num; i++)
719 {
720 mlget_item= &c->mlget_task.mlget_item[i];
721 item= mlget_item->item;
722 orignval= &ms_setting.char_block[item->value_offset];
723 orignkey= &ms_setting.char_block[item->key_suffix_offset];
724
725 /* update get miss counter */
726 if (mlget_item->get_miss)
727 {
728 atomic_add_size(&ms_stats.get_misses, 1);
729 }
730
731 /* get nothing from server for this task item */
732 if (mlget_item->verify && ! mlget_item->finish_verify)
733 {
734 /* verify expire time if necessary */
735 if (item->exp_time > 0)
736 {
737 struct timeval curr_time;
738 gettimeofday(&curr_time, NULL);
739
740 /* object doesn't expire but can't get it now */
741 if (curr_time.tv_sec - item->client_time
742 < item->exp_time - EXPIRE_TIME_ERROR)
743 {
744 atomic_add_size(&ms_stats.unexp_unget, 1);
745
746 if (ms_setting.verbose)
747 {
748 char set_time[64];
749 char cur_time[64];
750 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
751 localtime(&item->client_time));
752 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
753 localtime(&curr_time.tv_sec));
754 fprintf(stderr,
755 "\n\t<%d expire time verification failed, object "
756 "doesn't expire but can't get it now\n"
757 "\tkey len: %d\n"
758 "\tkey: %" PRIx64 " %.*s\n"
759 "\tset time: %s current time: %s "
760 "diff time: %d expire time: %d\n"
761 "\texpected data len: %d\n"
762 "\texpected data: %.*s\n"
763 "\treceived data: \n",
764 c->sfd,
765 item->key_size,
766 item->key_prefix,
767 item->key_size - (int)KEY_PREFIX_SIZE,
768 orignkey,
769 set_time,
770 cur_time,
771 (int)(curr_time.tv_sec - item->client_time),
772 item->exp_time,
773 item->value_size,
774 item->value_size,
775 orignval);
776 fflush(stderr);
777 }
778 }
779 }
780 else
781 {
782 atomic_add_size(&ms_stats.vef_miss, 1);
783
784 if (ms_setting.verbose)
785 {
786 fprintf(stderr, "\n<%d data verification failed\n"
787 "\tkey len: %d\n"
788 "\tkey: %" PRIx64 " %.*s\n"
789 "\texpected data len: %d\n"
790 "\texpected data: %.*s\n"
791 "\treceived data: \n",
792 c->sfd, item->key_size, item->key_prefix,
793 item->key_size - (int)KEY_PREFIX_SIZE,
794 orignkey, item->value_size, item->value_size, orignval);
795 fflush(stderr);
796 }
797 }
798 }
799 }
800 c->mlget_task.mlget_num= 0;
801 c->mlget_task.value_index= INVALID_OFFSET;
802 } /* ms_update_multi_get_result */
803
804
805 /**
806 * after get the response from server for single get, the
807 * function update the state of the task and do data verify if
808 * necessary.
809 *
810 * @param c, pointer of the concurrency
811 * @param item, pointer of task item which includes the object
812 * information
813 */
814 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
815 {
816 char *orignval= NULL;
817 char *orignkey= NULL;
818
819 if ((c == NULL) || (item == NULL))
820 {
821 return;
822 }
823 assert(c != NULL);
824 assert(item != NULL);
825
826 orignval= &ms_setting.char_block[item->value_offset];
827 orignkey= &ms_setting.char_block[item->key_suffix_offset];
828
829 /* update get miss counter */
830 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
831 {
832 atomic_add_size(&ms_stats.get_misses, 1);
833 }
834
835 /* get nothing from server for this task item */
836 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
837 && ! c->curr_task.finish_verify)
838 {
839 /* verify expire time if necessary */
840 if (item->exp_time > 0)
841 {
842 struct timeval curr_time;
843 gettimeofday(&curr_time, NULL);
844
845 /* object doesn't expire but can't get it now */
846 if (curr_time.tv_sec - item->client_time
847 < item->exp_time - EXPIRE_TIME_ERROR)
848 {
849 atomic_add_size(&ms_stats.unexp_unget, 1);
850
851 if (ms_setting.verbose)
852 {
853 char set_time[64];
854 char cur_time[64];
855 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
856 localtime(&item->client_time));
857 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
858 localtime(&curr_time.tv_sec));
859 fprintf(stderr,
860 "\n\t<%d expire time verification failed, object "
861 "doesn't expire but can't get it now\n"
862 "\tkey len: %d\n"
863 "\tkey: %" PRIx64 " %.*s\n"
864 "\tset time: %s current time: %s "
865 "diff time: %d expire time: %d\n"
866 "\texpected data len: %d\n"
867 "\texpected data: %.*s\n"
868 "\treceived data: \n",
869 c->sfd,
870 item->key_size,
871 item->key_prefix,
872 item->key_size - (int)KEY_PREFIX_SIZE,
873 orignkey,
874 set_time,
875 cur_time,
876 (int)(curr_time.tv_sec - item->client_time),
877 item->exp_time,
878 item->value_size,
879 item->value_size,
880 orignval);
881 fflush(stderr);
882 }
883 }
884 }
885 else
886 {
887 atomic_add_size(&ms_stats.vef_miss, 1);
888
889 if (ms_setting.verbose)
890 {
891 fprintf(stderr, "\n<%d data verification failed\n"
892 "\tkey len: %d\n"
893 "\tkey: %" PRIx64 " %.*s\n"
894 "\texpected data len: %d\n"
895 "\texpected data: %.*s\n"
896 "\treceived data: \n",
897 c->sfd, item->key_size, item->key_prefix,
898 item->key_size - (int)KEY_PREFIX_SIZE,
899 orignkey, item->value_size, item->value_size, orignval);
900 fflush(stderr);
901 }
902 }
903 }
904 } /* ms_update_single_get_result */
905
906
907 /**
908 * after get the response from server for set the function
909 * update the state of the task and do data verify if necessary.
910 *
911 * @param c, pointer of the concurrency
912 * @param item, pointer of task item which includes the object
913 * information
914 */
915 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
916 {
917 if ((c == NULL) || (item == NULL))
918 {
919 return;
920 }
921 assert(c != NULL);
922 assert(item != NULL);
923
924 if (c->precmd.cmd == CMD_SET)
925 {
926 switch (c->precmd.retstat)
927 {
928 case MCD_STORED:
929 if (item->value_offset == INVALID_OFFSET)
930 {
931 /* first set with the same offset of key suffix */
932 item->value_offset= item->key_suffix_offset;
933 }
934 else
935 {
936 /* not first set, just increase the value offset */
937 item->value_offset+= 1;
938 }
939
940 /* set successes, update counter */
941 c->set_cursor++;
942 c->curr_task.set_opt++;
943 c->curr_task.cycle_undo_set--;
944 break;
945
946 case MCD_SERVER_ERROR:
947 default:
948 break;
949 } /* switch */
950 }
951 } /* ms_update_set_result */
952
953
954 /**
955 * update the response time result
956 *
957 * @param c, pointer of the concurrency
958 */
959 static void ms_update_stat_result(ms_conn_t *c)
960 {
961 bool get_miss= false;
962
963 if (c == NULL)
964 {
965 return;
966 }
967 assert(c != NULL);
968
969 gettimeofday(&c->end_time, NULL);
970 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
971
972 pthread_mutex_lock(&ms_statistic.stat_mutex);
973
974 switch (c->precmd.cmd)
975 {
976 case CMD_SET:
977 ms_record_event(&ms_statistic.set_stat, time_diff, false);
978 break;
979
980 case CMD_GET:
981 if (c->curr_task.get_miss)
982 {
983 get_miss= true;
984 }
985 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
986 break;
987
988 default:
989 break;
990 } /* switch */
991
992 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
993 pthread_mutex_unlock(&ms_statistic.stat_mutex);
994 } /* ms_update_stat_result */
995
996
997 /**
998 * after get response from server for the current operation, and
999 * before doing the next operation, update the state of the
1000 * current operation.
1001 *
1002 * @param c, pointer of the concurrency
1003 */
1004 static void ms_update_task_result(ms_conn_t *c)
1005 {
1006 ms_task_item_t *item;
1007
1008 if (c == NULL)
1009 {
1010 return;
1011 }
1012 assert(c != NULL);
1013
1014 item= ms_get_cur_opt_item(c);
1015 if (item == NULL)
1016 {
1017 return;
1018 }
1019 assert(item != NULL);
1020
1021 ms_update_set_result(c, item);
1022
1023 if ((ms_setting.stat_freq > 0)
1024 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1025 {
1026 ms_update_stat_result(c);
1027 }
1028
1029 /* update multi-get task item */
1030 if (((ms_setting.mult_key_num > 1)
1031 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1032 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1033 {
1034 ms_update_multi_get_result(c);
1035 }
1036 else
1037 {
1038 ms_update_single_get_result(c, item);
1039 }
1040 } /* ms_update_task_result */
1041
1042
1043 /**
1044 * run get and set operation
1045 *
1046 * @param c, pointer of the concurrency
1047 *
1048 * @return int, if success, return 0, else return -1
1049 */
1050 static int ms_run_getset_task(ms_conn_t *c)
1051 {
1052 /**
1053 * extra one loop to get the last command return state. get the
1054 * last command return state.
1055 */
1056 if ((c->remain_exec_num >= 0)
1057 && (c->remain_exec_num != c->exec_num))
1058 {
1059 ms_update_task_result(c);
1060 }
1061
1062 /* multi-get */
1063 if (ms_setting.mult_key_num > 1)
1064 {
1065 /* operate next task item */
1066 ms_multi_getset_task_sch(c);
1067 }
1068 else
1069 {
1070 /* operate next task item */
1071 ms_single_getset_task_sch(c);
1072 }
1073
1074 /* no task to do, exit */
1075 if ((c->remain_exec_num == -1) || ms_global.time_out)
1076 {
1077 return -1;
1078 }
1079
1080 return 0;
1081 } /* ms_run_getset_task */
1082
1083
1084 /**
1085 * the state machine call the function to execute task.
1086 *
1087 * @param c, pointer of the concurrency
1088 *
1089 * @return int, if success, return 0, else return -1
1090 */
1091 int ms_exec_task(struct conn *c)
1092 {
1093 if (! ms_global.finish_warmup)
1094 {
1095 ms_warmup_server(c);
1096 }
1097 else
1098 {
1099 if (ms_run_getset_task(c) != 0)
1100 {
1101 return -1;
1102 }
1103 }
1104
1105 return 0;
1106 } /* ms_exec_task */