Schooner memslap changes
[awesomized/libmemcached] / clients / ms_task.c
1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include "ms_thread.h"
12 #include "ms_setting.h"
13
14 /* command distribution adjustment cycle */
15 #define CMD_DISTR_ADJUST_CYCLE 1000
16 #define DISADJUST_FACTOR 0.03 /**
17 * In one adjustment cycle, if undo set or get
18 * operations proportion is more than 3% , means
19 * there are too many new item or need more new
20 * item in the window. This factor shows it.
21 */
22
23 extern __thread ms_thread_t ms_thread;
24
25 /* get item from task window */
26 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
27 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
28 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
29 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
30
31 /* select next operation to do */
32 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
33
34 /* set and get speed estimate for controlling and adjustment */
35 static bool ms_is_set_too_fast(ms_task_t *task);
36 static bool ms_is_get_too_fast(ms_task_t *task);
37 static void ms_kick_out_item(ms_task_item_t *item);
38
39 /* miss rate adjustment */
40 static bool ms_need_overwirte_item(ms_task_t *task);
41 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
42
43 /* deal with data verification initialization */
44 static void ms_task_data_verify_init(ms_task_t *task);
45 static void ms_task_expire_verify_init(ms_task_t *task);
46
47 /* select a new task to do */
48 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
49
50 /* run the selected task */
51 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
52 static void ms_update_stat_result(ms_conn_t *c);
53 static void ms_update_multi_get_result(ms_conn_t *c);
54 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
55 static void ms_update_task_result(ms_conn_t *c);
56 static void ms_single_getset_task_sch(ms_conn_t *c);
57 static void ms_multi_getset_task_sch(ms_conn_t *c);
58 static void ms_send_signal(ms_sync_lock_t *sync_lock);
59 static void ms_warmup_server(ms_conn_t *c);
60 static int ms_run_getset_task(ms_conn_t *c);
61
62 /**
63 * used to get the current operation item(object)
64 *
65 * @param c, pointer of the concurrency
66 *
67 * @return ms_task_item_t*, current operating item
68 */
69 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
70 {
71 return(c->curr_task.item);
72 }
73
74 /**
75 * used to get the next item to do get operation
76 *
77 * @param c, pointer of the concurrency
78 *
79 * @return ms_task_item_t*, the pointer of the next item to do
80 * get operation
81 */
82 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
83 {
84 ms_task_item_t *item = NULL;
85
86 if (c->set_cursor <= 0) {
87 /* the first item in the window */
88 item = &c->item_win[0];
89 } else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size) {
90 /* random get one item set before */
91 item = &c->item_win[random() % (int64_t)c->set_cursor];
92 } else {
93 /* random get one item from the window */
94 item = &c->item_win[random() % c->win_size];
95 }
96
97 return item;
98 }
99
100 /**
101 * used to get the next item to do set operation
102 *
103 * @param c, pointer of the concurrency
104 *
105 * @return ms_task_item_t*, the pointer of the next item to do
106 * set operation
107 */
108 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
109 {
110 /**
111 * when a set command successes, the cursor will plus 1. If set
112 * fails, the cursor doesn't change. it isn't necessary to
113 * increase the cursor here.
114 */
115 return(&c->item_win[(int64_t)c->set_cursor % c->win_size]);
116 }
117
118 /**
119 * If we need do overwrite, we could select a item set before.
120 * This function is used to get a item set before to do
121 * overwrite.
122 *
123 * @param c, pointer of the concurrency
124 *
125 * @return ms_task_item_t*, the pointer of the previous item of
126 * set operation
127 */
128 static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
129 {
130 if (c->set_cursor <= 0) {
131 return (&c->item_win[0]);
132 } else {
133 return(&c->item_win[(int64_t)--c->set_cursor % c->win_size]);
134 }
135 }
136
137 /**
138 * According to the proportion of operations(get or set), select
139 * an operation to do.
140 *
141 * @param c, pointer of the concurrency
142 * @param task, pointer of current task in the concurrency
143 */
144 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
145 {
146 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
147 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
148
149 /* update cycle operation number if necessary */
150 if (task->cycle_undo_get == 0 || task->cycle_undo_set == 0) {
151
152 task->cycle_undo_get += (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
153 task->cycle_undo_set += (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
154 }
155
156 /**
157 * According to operation distribution to choose doing which
158 * operation. If it can't set new object to sever, just change
159 * to do get operation.
160 */
161 if (set_prop > PROP_ERROR
162 && (double)task->get_opt * set_prop >= (double)task->set_opt * get_prop) {
163 task->cmd = CMD_SET;
164 task->item = ms_get_next_set_item(c);
165 } else {
166 task->cmd = CMD_GET;
167 task->item = ms_get_next_get_item(c);
168 }
169 }
170
171 /**
172 * used to judge whether the number of get operations done is
173 * more than expected number of get operations to do right now.
174 *
175 * @param task, pointer of current task in the concurrency
176 *
177 * @return bool, if get too fast, return true, else return false
178 */
179 static bool ms_is_get_too_fast(ms_task_t *task)
180 {
181 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
182 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
183
184 /* no get operation */
185 if (get_prop < PROP_ERROR) {
186 return false;
187 }
188
189 int max_undo_set = (int)(set_prop / get_prop *(1.0 + DISADJUST_FACTOR))
190 * task->cycle_undo_get;
191
192 if ((double)task->get_opt * set_prop > (double)task->set_opt * get_prop
193 && task->cycle_undo_set > max_undo_set) {
194
195 return true;
196 }
197
198 return false;
199 }
200
201 /**
202 * used to judge whether the number of set operations done is
203 * more than expected number of set operations to do right now.
204 *
205 * @param task, pointer of current task in the concurrency
206 *
207 * @return bool, if set too fast, return true, else return false
208 */
209 static bool ms_is_set_too_fast(ms_task_t *task)
210 {
211 double get_prop = ms_setting.cmd_distr[CMD_GET].cmd_prop;
212 double set_prop = ms_setting.cmd_distr[CMD_SET].cmd_prop;
213
214 /* no set operation */
215 if (set_prop < PROP_ERROR) {
216 return false;
217 }
218
219 /* If it does set operation too fast, skip some */
220 int max_undo_get = (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
221 * (double)task->cycle_undo_set);
222
223 if ((double)task->get_opt * set_prop < (double)task->set_opt * get_prop
224 && task->cycle_undo_get > max_undo_get) {
225 return true;
226 }
227
228 return false;
229 }
230
231 /**
232 * kick out the old item in the window, and add a new item to
233 * overwrite the old item. When we don't want to do overwrite
234 * object, and the current item to do set operation is an old
235 * item, we could kick out the old item and add a new item. Then
236 * we can ensure we set new object every time.
237 *
238 * @param item, pointer of task item which includes the object
239 * information
240 */
241 static void ms_kick_out_item(ms_task_item_t *item)
242 {
243 /* allocate a new item */
244 item->key_prefix = ms_get_key_prefix();
245
246 item->key_suffix_offset++;
247 item->value_offset = INVALID_OFFSET; /* new item use invalid value offset */
248 item->client_time = 0;
249 }
250
251 /**
252 * used to judge whether we need overwrite object based on the
253 * options user specified
254 *
255 * @param task, pointer of current task in the concurrency
256 *
257 * @return bool, if need overwrite, return true, else return
258 * false
259 */
260 static bool ms_need_overwirte_item(ms_task_t *task)
261 {
262 ms_task_item_t *item = task->item;
263
264 assert(item != NULL);
265 assert(task->cmd == CMD_SET);
266
267 /**
268 * according to data overwrite percent to determine if do data
269 * overwrite.
270 */
271 if (task->overwrite_set < (double)task->set_opt * ms_setting.overwrite_percent) {
272
273 return true;
274 }
275
276 return false;
277 }
278
279 /**
280 * used to adjust operation. the function must be called after
281 * select operation. the function change get operation to set
282 * operation, or set operation to get operation based on the
283 * current case.
284 *
285 * @param c, pointer of the concurrency
286 * @param task, pointer of current task in the concurrency
287 *
288 * @return bool, if success, return true, else return false
289 */
290 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
291 {
292 ms_task_item_t *item = task->item;
293 assert(item != NULL);
294
295 if (task->cmd == CMD_SET) {
296 /* If did set operation too fast, skip some */
297 if (ms_is_set_too_fast(task)) {
298
299 /* get the item instead */
300 if (item->value_offset != INVALID_OFFSET) {
301 task->cmd = CMD_GET;
302 return true;
303 }
304 }
305
306 /* If the current item is not a new item, kick it out */
307 if (item->value_offset != INVALID_OFFSET) {
308 if(ms_need_overwirte_item(task)) {
309 /* overwrite */
310 task->overwrite_set++;
311 } else {
312 /* kick out the current item to do set operation */
313 ms_kick_out_item(item);
314 }
315 } else { /* it's a new item */
316 /* need overwrite */
317 if (ms_need_overwirte_item(task)) {
318 item = ms_get_pre_set_item(c);
319 if (item->value_offset != INVALID_OFFSET) {
320 task->item = item;
321 task->overwrite_set++;
322 } else { /* previous set item is a new item */
323 /* select the previous item to run, and cancel overwrite */
324 task->item = item;
325 }
326 }
327 }
328 task->cmd = CMD_SET;
329 return true;
330 } else {
331
332 if (item->value_offset == INVALID_OFFSET) {
333 task->cmd = CMD_SET;
334 return true;
335 }
336
337 /**
338 * If It does get operation too fast, it will change the
339 * operation to set.
340 */
341 if (ms_is_get_too_fast(task)) {
342 /* don't kick out the first item in the window */
343 if (!ms_is_set_too_fast(task)) {
344 ms_kick_out_item(item);
345 task->cmd = CMD_SET;
346 return true;
347 } else {
348 return false;
349 }
350 }
351
352 assert(item->value_offset != INVALID_OFFSET);
353
354 task->cmd = CMD_GET;
355 return true;
356 }
357 }
358
359 /**
360 * used to initialize the task which need verify data.
361 *
362 * @param task, pointer of current task in the concurrency
363 */
364 static void ms_task_data_verify_init(ms_task_t *task)
365 {
366 ms_task_item_t *item = task->item;
367
368 assert(item != NULL);
369 assert(task->cmd == CMD_GET);
370
371 /**
372 * according to data verification percent to determine if do
373 * data verification.
374 */
375 if (task->verified_get < (double)task->get_opt * ms_setting.verify_percent) {
376 /**
377 * currently it doesn't do verify, just increase the counter,
378 * and do verification next proper get command
379 */
380 if (task->item->value_offset != INVALID_OFFSET && item->exp_time == 0) {
381 task->verify = true;
382 task->finish_verify = false;
383 task->verified_get++;
384 }
385 }
386 }
387
388 /**
389 * used to initialize the task which need verify expire time.
390 *
391 * @param task, pointer of current task in the concurrency
392 */
393 static void ms_task_expire_verify_init(ms_task_t *task)
394 {
395 ms_task_item_t *item = task->item;
396
397 assert(item != NULL);
398 assert(task->cmd == CMD_GET);
399 assert(item->exp_time > 0);
400
401 task->verify = true;
402 task->finish_verify = false;
403 }
404
405 /**
406 * used to get one task, the function initializes the task
407 * structure.
408 *
409 * @param c, pointer of the concurrency
410 * @param warmup, whether it need warmup
411 *
412 * @return ms_task_t*, pointer of current task in the
413 * concurrency
414 */
415 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
416 {
417 ms_task_t *task = &c->curr_task;
418
419 while (1) {
420 task->verify = false;
421 task->finish_verify = true;
422 task->get_miss = true;
423
424 if (warmup) {
425 task->cmd = CMD_SET;
426 task->item = ms_get_next_set_item(c);
427
428 return task;
429 }
430
431 /* according to operation distribution to choose doing which operation */
432 ms_select_opt(c, task);
433
434 if (!ms_adjust_opt(c, task)) {
435 continue;
436 }
437
438 if (ms_setting.verify_percent > 0 && task->cmd == CMD_GET) {
439 ms_task_data_verify_init(task);
440 }
441
442 if (ms_setting.exp_ver_per > 0 && task->cmd == CMD_GET
443 && task->item->exp_time > 0) {
444 ms_task_expire_verify_init(task);
445 }
446
447 break;
448 }
449
450 /**
451 * Only update get and delete counter, set counter will be
452 * updated after set operation successes.
453 */
454 if (task->cmd == CMD_GET) {
455 task->get_opt++;
456 task->cycle_undo_get--;
457 }
458
459 return task;
460 }
461
462 /**
463 * send a signal to the main monitor thread
464 *
465 * @param sync_lock, pointer of the lock
466 */
467 static void ms_send_signal(ms_sync_lock_t *sync_lock)
468 {
469 pthread_mutex_lock(&sync_lock->lock);
470 sync_lock->count++;
471 pthread_cond_signal(&sync_lock->cond);
472 pthread_mutex_unlock(&sync_lock->lock);
473 }
474
475 /**
476 * If user only want to do get operation, but there is no object
477 * in server , so we use this function to warmup the server, and
478 * set some objects to server. It runs at the beginning of task.
479 *
480 * @param c, pointer of the concurrency
481 */
482 static void ms_warmup_server(ms_conn_t *c)
483 {
484 ms_task_t *task;
485 ms_task_item_t *item;
486
487 /**
488 * Extra one loop to get the last command returned state.
489 * Normally it gets the previous command returned state.
490 */
491 if (c->remain_warmup_num >= 0 &&
492 c->remain_warmup_num != c->warmup_num) {
493 item = ms_get_cur_opt_item(c);
494 /* only update the set command result state for data verification */
495 if (c->precmd.cmd == CMD_SET && c->precmd.retstat == MCD_STORED) {
496 item->value_offset = item->key_suffix_offset;
497 /* set success, update counter */
498 c->set_cursor++;
499 } else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED) {
500 printf("key: %lx didn't set success\n", item->key_prefix);
501 }
502 }
503
504 /* the last time don't run a task */
505 if (c->remain_warmup_num-- > 0) {
506 /* operate next task item */
507 task = ms_get_task(c, true);
508 item = task->item;
509 ms_mcd_set(c, item);
510 }
511
512 /**
513 * finish warming up server, wait all connects initialize
514 * complete. Then all connects can start do task at the same
515 * time.
516 */
517 if (c->remain_warmup_num == -1) {
518 ms_send_signal(&ms_global.init_lock);
519 c->remain_warmup_num--; /* never run the if branch */
520 }
521 }
522
523 /**
524 * dispatch single get and set task
525 *
526 * @param c, pointer of the concurrency
527 */
528 static void ms_single_getset_task_sch(ms_conn_t *c)
529 {
530 ms_task_t *task;
531 ms_task_item_t *item;
532
533 /* the last time don't run a task */
534 if (c->remain_exec_num-- > 0) {
535 task = ms_get_task(c, false);
536 item = task->item;
537 if (task->cmd == CMD_SET) {
538 ms_mcd_set(c, item);
539 } else if (task->cmd == CMD_GET) {
540 assert(task->cmd == CMD_GET);
541 ms_mcd_get(c, item, task->verify);
542 }
543 }
544 }
545
546 /**
547 * dispatch multi-get and set task
548 *
549 * @param c, pointer of the concurrency
550 */
551 static void ms_multi_getset_task_sch(ms_conn_t *c)
552 {
553 ms_task_t *task;
554 ms_mlget_task_item_t *mlget_item;
555
556 while (1) {
557 if (c->remain_exec_num-- > 0) {
558 task = ms_get_task(c, false);
559 if (task->cmd == CMD_SET) { /* just do it */
560 ms_mcd_set(c, task->item);
561 break;
562 } else {
563 assert(task->cmd == CMD_GET);
564 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
565 mlget_item->item = task->item;
566 mlget_item->verify = task->verify;
567 mlget_item->finish_verify = task->finish_verify;
568 mlget_item->get_miss = task->get_miss;
569 c->mlget_task.mlget_num++;
570
571 /* enough multi-get task items can be done */
572 if (c->mlget_task.mlget_num >= ms_setting.mult_key_num ||
573 (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
574 ms_mcd_mlget(c);
575 break;
576 }
577 }
578 } else {
579 if (c->remain_exec_num <= 0 && c->mlget_task.mlget_num > 0) {
580 ms_mcd_mlget(c);
581 }
582 break;
583 }
584 }
585 }
586
587 /**
588 * calculate the difference value of two time points
589 *
590 * @param start_time, the start time
591 * @param end_time, the end time
592 *
593 * @return uint64_t, the difference value between start_time and end_time in us
594 */
595 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
596 {
597 int64_t endtime = end_time->tv_sec * 1000000 + end_time->tv_usec;
598 int64_t starttime = start_time->tv_sec * 1000000 + start_time->tv_usec;
599
600 assert(endtime >= starttime);
601
602 return(endtime - starttime);
603 }
604
605 /**
606 * after get the response from server for multi-get, the
607 * function update the state of the task and do data verify if
608 * necessary.
609 *
610 * @param c, pointer of the concurrency
611 */
612 static void ms_update_multi_get_result(ms_conn_t *c)
613 {
614 ms_mlget_task_item_t *mlget_item;
615 ms_task_item_t *item;
616 char *orignval = NULL;
617 char *orignkey = NULL;
618
619 if (c == NULL) {
620 return;
621 }
622 assert(c != NULL);
623
624 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
625 mlget_item = &c->mlget_task.mlget_item[i];
626 item = mlget_item->item;
627 orignval = &ms_setting.char_block[item->value_offset];
628 orignkey = &ms_setting.char_block[item->key_suffix_offset];
629
630 /* update get miss counter */
631 if (mlget_item->get_miss) {
632 __sync_fetch_and_add(&ms_stats.get_misses, 1);
633 }
634
635 /* get nothing from server for this task item */
636 if (mlget_item->verify && !mlget_item->finish_verify) {
637 /* verify expire time if necessary */
638 if (item->exp_time > 0) {
639 struct timeval curr_time;
640 gettimeofday(&curr_time, NULL);
641
642 /* object doesn't expire but can't get it now */
643 if (curr_time.tv_sec - item->client_time
644 < item->exp_time - EXPIRE_TIME_ERROR) {
645 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
646
647 if (ms_setting.verbose) {
648 char set_time[64];
649 char cur_time[64];
650 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
651 localtime(&item->client_time));
652 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
653 localtime(&curr_time.tv_sec));
654 fprintf(stderr, "\n\t<%d expire time verification failed, object "
655 "doesn't expire but can't get it now\n"
656 "\tkey len: %d\n"
657 "\tkey: %lx %.*s\n"
658 "\tset time: %s current time: %s "
659 "diff time: %d expire time: %d\n"
660 "\texpected data len: %d\n"
661 "\texpected data: %.*s\n"
662 "\treceived data: \n",
663 c->sfd, item->key_size, item->key_prefix,
664 item->key_size - (int)KEY_PREFIX_SIZE,
665 orignkey, set_time, cur_time,
666 (int)(curr_time.tv_sec - item->client_time), item->exp_time,
667 item->value_size, item->value_size, orignval);
668 fflush(stderr);
669 }
670 }
671 } else {
672 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
673
674 if (ms_setting.verbose) {
675 fprintf(stderr, "\n<%d data verification failed\n"
676 "\tkey len: %d\n"
677 "\tkey: %lx %.*s\n"
678 "\texpected data len: %d\n"
679 "\texpected data: %.*s\n"
680 "\treceived data: \n",
681 c->sfd, item->key_size, item->key_prefix,
682 item->key_size - (int)KEY_PREFIX_SIZE,
683 orignkey, item->value_size, item->value_size, orignval);
684 fflush(stderr);
685 }
686 }
687 }
688 }
689 c->mlget_task.mlget_num = 0;
690 c->mlget_task.value_index = INVALID_OFFSET;
691 }
692
693 /**
694 * after get the response from server for single get, the
695 * function update the state of the task and do data verify if
696 * necessary.
697 *
698 * @param c, pointer of the concurrency
699 * @param item, pointer of task item which includes the object
700 * information
701 */
702 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
703 {
704 char *orignval = NULL;
705 char *orignkey = NULL;
706
707 if (c == NULL || item == NULL) {
708 return;
709 }
710 assert(c != NULL);
711 assert(item != NULL);
712
713 orignval = &ms_setting.char_block[item->value_offset];
714 orignkey = &ms_setting.char_block[item->key_suffix_offset];
715
716 /* update get miss counter */
717 if (c->precmd.cmd == CMD_GET && c->curr_task.get_miss) {
718 __sync_fetch_and_add(&ms_stats.get_misses, 1);
719 }
720
721 /* get nothing from server for this task item */
722 if (c->precmd.cmd == CMD_GET && c->curr_task.verify
723 && !c->curr_task.finish_verify) {
724
725 /* verify expire time if necessary */
726 if (item->exp_time > 0) {
727 struct timeval curr_time;
728 gettimeofday(&curr_time, NULL);
729
730 /* object doesn't expire but can't get it now */
731 if (curr_time.tv_sec - item->client_time
732 < item->exp_time - EXPIRE_TIME_ERROR) {
733 __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
734
735 if (ms_setting.verbose) {
736 char set_time[64];
737 char cur_time[64];
738 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
739 localtime(&item->client_time));
740 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
741 localtime(&curr_time.tv_sec));
742 fprintf(stderr, "\n\t<%d expire time verification failed, object "
743 "doesn't expire but can't get it now\n"
744 "\tkey len: %d\n"
745 "\tkey: %lx %.*s\n"
746 "\tset time: %s current time: %s "
747 "diff time: %d expire time: %d\n"
748 "\texpected data len: %d\n"
749 "\texpected data: %.*s\n"
750 "\treceived data: \n",
751 c->sfd, item->key_size, item->key_prefix,
752 item->key_size - (int)KEY_PREFIX_SIZE,
753 orignkey, set_time, cur_time,
754 (int)(curr_time.tv_sec - item->client_time), item->exp_time,
755 item->value_size, item->value_size, orignval);
756 fflush(stderr);
757 }
758 }
759 } else {
760 __sync_fetch_and_add(&ms_stats.vef_miss, 1);
761
762 if (ms_setting.verbose) {
763 fprintf(stderr, "\n<%d data verification failed\n"
764 "\tkey len: %d\n"
765 "\tkey: %lx %.*s\n"
766 "\texpected data len: %d\n"
767 "\texpected data: %.*s\n"
768 "\treceived data: \n",
769 c->sfd, item->key_size, item->key_prefix,
770 item->key_size - (int)KEY_PREFIX_SIZE,
771 orignkey, item->value_size, item->value_size, orignval);
772 fflush(stderr);
773 }
774 }
775 }
776 }
777
778 /**
779 * after get the response from server for set the function
780 * update the state of the task and do data verify if necessary.
781 *
782 * @param c, pointer of the concurrency
783 * @param item, pointer of task item which includes the object
784 * information
785 */
786 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
787 {
788 if (c == NULL || item == NULL) {
789 return;
790 }
791 assert(c != NULL);
792 assert(item != NULL);
793
794 if (c->precmd.cmd == CMD_SET) {
795 switch (c->precmd.retstat) {
796 case MCD_STORED:
797 if (item->value_offset == INVALID_OFFSET) {
798 /* first set with the same offset of key suffix */
799 item->value_offset = item->key_suffix_offset;
800 } else {
801 /* not first set, just increase the value offset */
802 item->value_offset += 1;
803 }
804
805 /* set successes, update counter */
806 c->set_cursor++;
807 c->curr_task.set_opt++;
808 c->curr_task.cycle_undo_set--;
809 break;
810
811 case MCD_SERVER_ERROR:
812 default:
813 break;
814 }
815 }
816 }
817
818 /**
819 * update the response time result
820 *
821 * @param c, pointer of the concurrency
822 */
823 static void ms_update_stat_result(ms_conn_t *c)
824 {
825 bool get_miss = false;
826
827 if (c == NULL) {
828 return;
829 }
830 assert(c != NULL);
831
832 gettimeofday(&c->end_time, NULL);
833 uint64_t time_diff = (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
834
835 pthread_mutex_lock(&ms_statistic.stat_mutex);
836 switch (c->precmd.cmd) {
837 case CMD_SET:
838 ms_record_event(&ms_statistic.set_stat, time_diff, false);
839 break;
840 case CMD_GET:
841 if (c->curr_task.get_miss) {
842 get_miss = true;
843 }
844 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
845 break;
846 default:
847 break;
848 }
849 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
850 pthread_mutex_unlock(&ms_statistic.stat_mutex);
851 }
852
853 /**
854 * after get response from server for the current operation, and
855 * before doing the next operation, update the state of the
856 * current operation.
857 *
858 * @param c, pointer of the concurrency
859 */
860 static void ms_update_task_result(ms_conn_t *c)
861 {
862 ms_task_item_t *item;
863 if (c == NULL) {
864 return;
865 }
866 assert(c != NULL);
867
868 item = ms_get_cur_opt_item(c);
869 if (item == NULL) {
870 return;
871 }
872 assert(item != NULL);
873
874 ms_update_set_result(c, item);
875
876 if (ms_setting.stat_freq > 0
877 && (c->precmd.cmd == CMD_SET || c->precmd.cmd == CMD_GET)) {
878 ms_update_stat_result(c);
879 }
880
881 /* update multi-get task item */
882 if ((ms_setting.mult_key_num > 1 &&
883 c->mlget_task.mlget_num >= ms_setting.mult_key_num) ||
884 (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
885 ms_update_multi_get_result(c);
886 } else {
887 ms_update_single_get_result(c, item);
888 }
889 }
890
891 /**
892 * run get and set operation
893 *
894 * @param c, pointer of the concurrency
895 *
896 * @return int, if success, return 0, else return -1
897 */
898 static int ms_run_getset_task(ms_conn_t *c)
899 {
900 /**
901 * extra one loop to get the last command return state. get the
902 * last command return state.
903 */
904 if (c->remain_exec_num >= 0 &&
905 c->remain_exec_num != c->exec_num) {
906 ms_update_task_result(c);
907 }
908
909 /* multi-get */
910 if (ms_setting.mult_key_num > 1) {
911 /* operate next task item */
912 ms_multi_getset_task_sch(c);
913 } else {
914 /* operate next task item */
915 ms_single_getset_task_sch(c);
916 }
917
918 /* no task to do, exit */
919 if (c->remain_exec_num == -1 || ms_global.time_out) {
920 return -1;
921 }
922
923 return 0;
924 }
925
926 /**
927 * the state machine call the function to execute task.
928 *
929 * @param c, pointer of the concurrency
930 *
931 * @return int, if success, return 0, else return -1
932 */
933 int ms_exec_task(struct conn *c)
934 {
935 if (!ms_global.finish_warmup) {
936 ms_warmup_server(c);
937 } else {
938 if (ms_run_getset_task(c) != 0) {
939 return -1;
940 }
941 }
942
943 return 0;
944 }