Schooner memslap changes
[m6w6/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include <stdio.h>
12 #include <limits.h>
13 #include <sys/uio.h>
14 #include <event.h>
15 #include <fcntl.h>
16 #include <netinet/tcp.h>
17 #include <arpa/inet.h>
18 #include "ms_setting.h"
19 #include "ms_thread.h"
20
/* result codes of a network write attempt */
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * key prefix generation: every byte of these constants is 0x10, so none of
 * them is ' ', '\r', '\n' or '\0'
 */
#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
#define KEY_PREFIX_MASK 0x1010101010101010

/* token positions used when parsing the value length returned by the server */
#define KEY_TOKEN 1
#define VALUELEN_TOKEN 3

/* global increasing counter, keeps the key prefixes unique */
static uint64_t key_prefix_seq = KEY_PREFIX_BASE;

/* global increasing counter, used to generate request ids for UDP */
static int udp_request_id = 0;

/* per-thread context; only declared here, the definition is not in this file */
extern __thread ms_thread_t ms_thread;
42
/* generate udp request id */
static int ms_get_udp_request_id(void);

/* connection initialization */
static void ms_task_init(ms_conn_t *c);
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
static int ms_conn_sock_init(ms_conn_t *c);
static int ms_conn_event_init(ms_conn_t *c);
static int ms_conn_init(ms_conn_t *c, const int init_state,
                        const int read_buffer_size, const bool is_udp);
static void ms_warmup_num_init(ms_conn_t *c);
static int ms_item_win_init(ms_conn_t *c);

/* connection close */
void ms_conn_free(ms_conn_t *c);
static void ms_conn_close(ms_conn_t *c);

/* create network connection */
static int ms_new_socket(struct addrinfo *ai);
static void ms_maximize_sndbuf(const int sfd);
static int ms_network_connect(ms_conn_t *c, char * srv_host_name,
                              const int srv_port, const bool is_udp, int *ret_sfd);
static int ms_reconn(ms_conn_t *c);

/* read and parse */
static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens);
static int ms_ascii_process_line(ms_conn_t *c, char *command);
static int ms_try_read_line(ms_conn_t *c);
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
static int ms_udp_read(ms_conn_t *c, char *buf, int len);
static int ms_try_read_network(ms_conn_t *c);
static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item,
                            char *value, int vlen);
static void ms_ascii_complete_nread(ms_conn_t *c);
static void ms_bin_complete_nread(ms_conn_t *c);
static void ms_complete_nread(ms_conn_t *c);

/* send functions */
static int ms_add_msghdr(ms_conn_t *c);
static int ms_ensure_iov_space(ms_conn_t *c);
static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
static int ms_build_udp_headers(ms_conn_t *c);
static int ms_transmit(ms_conn_t *c);

/* status adjustment */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
static int ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);

/* main loop */
static void ms_drive_machine(ms_conn_t *c);
void ms_event_handler(const int fd, const short which, void *arg);

/* ascii protocol: build the write buffer for each command type */
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);

/* binary protocol: response handling and write buffer construction */
static int ms_bin_process_response(ms_conn_t *c);
static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len,
                              uint16_t key_len, uint32_t body_len);
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
114
115 /**
116 * each key has two parts, prefix and suffix. The suffix is a
117 * string random get form the character table. The prefix is a
118 * uint64_t variable. And the prefix must be unique. we use the
119 * prefix to identify a key. And the prefix can't include
120 * character ' ' '\r' '\n' '\0'.
121 *
122 * @return uint64_t
123 */
124 uint64_t ms_get_key_prefix(void)
125 {
126 uint64_t key_prefix;
127
128 pthread_mutex_lock(&ms_global.seq_mutex);
129 key_prefix_seq |= KEY_PREFIX_MASK;
130 key_prefix = key_prefix_seq;
131 key_prefix_seq++;
132 pthread_mutex_unlock(&ms_global.seq_mutex);
133
134 return key_prefix;
135 }
136
137 /**
138 * get an unique udp request id
139 *
140 * @return an unique UDP request id
141 */
142 static int ms_get_udp_request_id(void)
143 {
144 return(__sync_fetch_and_add(&udp_request_id, 1));
145 }
146
147 /**
148 * initialize current task structure
149 *
150 * @param c, pointer of the concurrency
151 */
152 static void ms_task_init(ms_conn_t *c)
153 {
154 c->curr_task.cmd = CMD_NULL;
155 c->curr_task.item = 0;
156 c->curr_task.verify = false;
157 c->curr_task.finish_verify = true;
158 c->curr_task.get_miss = true;
159
160 c->curr_task.get_opt = 0;
161 c->curr_task.set_opt = 0;
162 c->curr_task.cycle_undo_get = 0;
163 c->curr_task.cycle_undo_set = 0;
164 c->curr_task.verified_get = 0;
165 c->curr_task.overwrite_set = 0;
166 }
167
168 /**
169 * initialize udp for the connection structure
170 *
171 * @param c, pointer of the concurrency
172 * @param is_udp, whether it's udp
173 *
174 * @return int, if success, return 0, else return -1
175 */
176 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
177 {
178 c->hdrbuf = 0;
179 c->rudpbuf = 0;
180 c->udppkt = 0;
181
182 c->rudpsize = UDP_DATA_BUFFER_SIZE;
183 c->hdrsize = 0;
184
185 c->rudpbytes = 0;
186 c->packets = 0;
187 c->recvpkt = 0;
188 c->pktcurr = 0;
189 c->ordcurr = 0;
190
191 c->udp = is_udp;
192
193 if (c->udp || (!c->udp && ms_setting.facebook_test)) {
194 c->rudpbuf = (char *)malloc((size_t)c->rudpsize);
195 c->udppkt = (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
196
197 if (c->rudpbuf == NULL || c->udppkt == NULL) {
198 if (c->rudpbuf != NULL) free(c->rudpbuf);
199 if (c->udppkt != NULL) free(c->udppkt);
200 fprintf(stderr, "malloc()\n");
201 return -1;
202 }
203 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
204 }
205
206 return 0;
207 }
208
209 /**
210 * initialize the connection structure
211 *
212 * @param c, pointer of the concurrency
213 * @param init_state, (conn_read, conn_write, conn_closing)
214 * @param read_buffer_size
215 * @param is_udp, whether it's udp
216 *
217 * @return int, if success, return 0, else return -1
218 */
219 static int ms_conn_init(ms_conn_t *c, const int init_state,
220 const int read_buffer_size, const bool is_udp)
221 {
222 assert(c != NULL);
223
224 c->rbuf = c->wbuf = 0;
225 c->iov = 0;
226 c->msglist = 0;
227
228 c->rsize = read_buffer_size;
229 c->wsize = WRITE_BUFFER_SIZE;
230 c->iovsize = IOV_LIST_INITIAL;
231 c->msgsize = MSG_LIST_INITIAL;
232
233 /* for replication, each connection need connect all the server */
234 if (ms_setting.rep_write_srv > 0) {
235 c->total_sfds = ms_setting.srv_cnt;
236 } else {
237 c->total_sfds = ms_setting.sock_per_conn;
238 }
239 c->alive_sfds = 0;
240
241 c->rbuf = (char *)malloc((size_t)c->rsize);
242 c->wbuf = (char *)malloc((size_t)c->wsize);
243 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
244 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * (size_t)c->msgsize);
245 if (ms_setting.mult_key_num > 1) {
246 c->mlget_task.mlget_item = (ms_mlget_task_item_t *)
247 malloc(sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
248 }
249 c->tcpsfd = (int *)malloc((size_t)c->total_sfds * sizeof(int));
250
251 if (c->rbuf == NULL || c->wbuf == NULL || c->iov == NULL
252 || c->msglist == NULL || c->tcpsfd == NULL
253 || (ms_setting.mult_key_num > 1 && c->mlget_task.mlget_item == NULL)) {
254
255 if (c->rbuf != NULL) free(c->rbuf);
256 if (c->wbuf != NULL) free(c->wbuf);
257 if (c->iov != NULL) free(c->iov);
258 if (c->msglist != NULL) free(c->msglist);
259 if (c->mlget_task.mlget_item != NULL) free(c->mlget_task.mlget_item);
260 if (c->tcpsfd != NULL) free(c->tcpsfd);
261 fprintf(stderr, "malloc()\n");
262 return -1;
263 }
264
265 c->state = init_state;
266 c->rvbytes = 0;
267 c->rbytes = 0;
268 c->rcurr = c->rbuf;
269 c->wcurr = c->wbuf;
270 c->iovused = 0;
271 c->msgcurr = 0;
272 c->msgused = 0;
273 c->cur_idx = c->total_sfds; /* default index is a invalid value */
274
275 c->ctnwrite = false;
276 c->readval = false;
277 c->change_sfd = false;
278
279 c->precmd.cmd = c->currcmd.cmd = CMD_NULL;
280 c->precmd.isfinish = true; /* default the previous command finished */
281 c->currcmd.isfinish = false;
282 c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE;
283 c->precmd.key_prefix = c->currcmd.key_prefix = 0;
284
285 c->mlget_task.mlget_num = 0;
286 c->mlget_task.value_index = -1; /* default invalid value */
287
288 if (ms_setting.binary_prot) {
289 c->protocol = binary_prot;
290 } else if (is_udp) {
291 c->protocol = ascii_udp_prot;
292 } else {
293 c->protocol = ascii_prot;
294 }
295
296 /* initialize udp */
297 if (ms_conn_udp_init(c, is_udp) != 0) {
298 return -1;
299 }
300
301 /* initialize task */
302 ms_task_init(c);
303
304 if (!(ms_setting.facebook_test && is_udp)) {
305 __sync_fetch_and_add(&ms_stats.active_conns, 1);
306 }
307
308 return 0;
309 }
310
311 /**
312 * when doing 100% get operation, it could preset some objects
313 * to warmup the server. this function is used to initialize the
314 * number of the objects to preset.
315 *
316 * @param c, pointer of the concurrency
317 */
318 static void ms_warmup_num_init(ms_conn_t *c)
319 {
320 /* no set operation, preset all the items in the window */
321 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) {
322 c->warmup_num = c->win_size;
323 c->remain_warmup_num = c->warmup_num;
324 } else {
325 c->warmup_num = 0;
326 c->remain_warmup_num = c->warmup_num;
327 }
328 }
329
330 /**
331 * each connection has an item window, this function initialize
332 * the window. The window is used to generate task.
333 *
334 * @param c, pointer of the concurrency
335 *
336 * @return int, if success, return 0, else return -1
337 */
338 static int ms_item_win_init(ms_conn_t *c)
339 {
340 int exp_cnt = 0;
341
342 c->win_size = (int)ms_setting.win_size;
343 c->set_cursor = 0;
344 c->exec_num = ms_thread.thread_ctx->exec_num_perconn;
345 c->remain_exec_num = c->exec_num;
346
347 c->item_win = (ms_task_item_t *)malloc(sizeof(ms_task_item_t) * (size_t)c->win_size);
348 if (c->item_win == NULL) {
349 fprintf(stderr, "Can't allocate task item array for conn.\n");
350 return -1;
351 }
352 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
353
354 for (int i = 0; i < c->win_size; i++) {
355 c->item_win[i].key_size = (int)ms_setting.distr[i].key_size;
356 c->item_win[i].key_prefix = ms_get_key_prefix();
357 c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset;
358 c->item_win[i].value_size = (int)ms_setting.distr[i].value_size;
359 c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */
360 c->item_win[i].client_time = 0;
361
362 /* set expire time base on the proportion */
363 if (exp_cnt < ms_setting.exp_ver_per * i) {
364 c->item_win[i].exp_time = FIXED_EXPIRE_TIME;
365 exp_cnt++;
366 } else {
367 c->item_win[i].exp_time = 0;
368 }
369 }
370
371 ms_warmup_num_init(c);
372
373 return 0;
374 }
375
376 /**
377 * each connection structure can include one or more sock
378 * handlers. this function create these socks and connect the
379 * server(s).
380 *
381 * @param c, pointer of the concurrency
382 *
383 * @return int, if success, return 0, else return -1
384 */
385 static int ms_conn_sock_init(ms_conn_t *c)
386 {
387 int i;
388 int ret_sfd;
389 int srv_idx = 0;
390
391 assert(c != NULL);
392 assert(c->tcpsfd != NULL);
393
394 for (i = 0; i < c->total_sfds; i++) {
395 ret_sfd = 0;
396 if (ms_setting.rep_write_srv > 0) {
397 /* for replication, each connection need connect all the server */
398 srv_idx = i;
399 } else {
400 /* all the connections in a thread connects the same server */
401 srv_idx = ms_thread.thread_ctx->srv_idx;
402 }
403
404 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
405 ms_setting.servers[srv_idx].srv_port,
406 ms_setting.udp, &ret_sfd) != 0) {
407 break;
408 }
409
410 if (i == 0) {
411 c->sfd = ret_sfd;
412 }
413
414 if (!ms_setting.udp) {
415 c->tcpsfd[i] = ret_sfd;
416 }
417
418 c->alive_sfds++;
419 }
420
421 /* initialize udp sock handler if necessary */
422 if (ms_setting.facebook_test) {
423 ret_sfd = 0;
424 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
425 ms_setting.servers[srv_idx].srv_port,
426 true, &ret_sfd) != 0) {
427 c->udpsfd = 0;
428 } else {
429 c->udpsfd = ret_sfd;
430 }
431 }
432
433 if (i != c->total_sfds || (ms_setting.facebook_test && c->udpsfd == 0)) {
434 if (ms_setting.udp) {
435 close(c->sfd);
436 } else {
437 for (int j = 0; j < i; j++) {
438 close(c->tcpsfd[j]);
439 }
440 }
441
442 if (c->udpsfd != 0) {
443 close(c->udpsfd);
444 }
445
446 return -1;
447 }
448
449 return 0;
450 }
451
452 /**
453 * each connection is managed by libevent, this function
454 * initialize the event of the connection structure.
455 *
456 * @param c, pointer of the concurrency
457 *
458 * @return int, if success, return 0, else return -1
459 */
460 static int ms_conn_event_init(ms_conn_t *c)
461 {
462 /* default event timeout 10 seconds */
463 struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0};
464 short event_flags = EV_WRITE | EV_PERSIST;
465
466 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
467 event_base_set(ms_thread.base, &c->event);
468 c->ev_flags = event_flags;
469
470 if (c->total_sfds == 1) {
471 if (event_add(&c->event, NULL) == -1) {
472 return -1;
473 }
474 } else {
475 if (event_add(&c->event, &t) == -1) {
476 return -1;
477 }
478 }
479
480 return 0;
481 }
482
483 /**
484 * setup a connection, each connection structure of each
485 * thread must call this function to initialize.
486 *
487 * @param c, pointer of the concurrency
488 *
489 * @return int, if success, return 0, else return -1
490 */
491 int ms_setup_conn(ms_conn_t *c)
492 {
493 if (ms_item_win_init(c) != 0) {
494 return -1;
495 }
496
497 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) {
498 return -1;
499 }
500
501 if (ms_conn_sock_init(c) != 0) {
502 return -1;
503 }
504
505 if (ms_conn_event_init(c) != 0) {
506 return -1;
507 }
508
509 return 0;
510 }
511
512 /**
513 * Frees a connection.
514 *
515 * @param c, pointer of the concurrency
516 */
517 void ms_conn_free(ms_conn_t *c)
518 {
519 if (c != NULL) {
520 if (c->hdrbuf != NULL)
521 free(c->hdrbuf);
522 if (c->msglist != NULL)
523 free(c->msglist);
524 if (c->rbuf != NULL)
525 free(c->rbuf);
526 if (c->wbuf != NULL)
527 free(c->wbuf);
528 if (c->iov != NULL)
529 free(c->iov);
530 if (c->mlget_task.mlget_item != NULL)
531 free(c->mlget_task.mlget_item);
532 if (c->rudpbuf != NULL)
533 free(c->rudpbuf);
534 if (c->udppkt != NULL)
535 free(c->udppkt);
536 if (c->item_win != NULL)
537 free(c->item_win);
538 if (c->tcpsfd != NULL)
539 free(c->tcpsfd);
540
541 if (--ms_thread.nactive_conn == 0) {
542 free(ms_thread.conn);
543 }
544 }
545 }
546
547 /**
548 * close a connection
549 *
550 * @param c, pointer of the concurrency
551 */
552 static void ms_conn_close(ms_conn_t *c)
553 {
554 assert(c != NULL);
555
556 /* delete the event, the socket and the connection */
557 event_del(&c->event);
558
559 for (int i = 0; i < c->total_sfds; i++) {
560 if (c->tcpsfd[i] > 0) {
561 close(c->tcpsfd[i]);
562 }
563 }
564 c->sfd = 0;
565
566 if (ms_setting.facebook_test) {
567 close(c->udpsfd);
568 }
569
570 __sync_fetch_and_sub(&ms_stats.active_conns, 1);
571
572 ms_conn_free(c);
573
574 if (ms_setting.run_time == 0) {
575 pthread_mutex_lock(&ms_global.run_lock.lock);
576 ms_global.run_lock.count++;
577 pthread_cond_signal(&ms_global.run_lock.cond);
578 pthread_mutex_unlock(&ms_global.run_lock.lock);
579 }
580
581 if (ms_thread.nactive_conn == 0) {
582 pthread_exit(NULL);
583 }
584 }
585
586 /**
587 * create a new sock
588 *
589 * @param ai, server address information
590 *
591 * @return int, if success, return 0, else return -1
592 */
593 static int ms_new_socket(struct addrinfo *ai)
594 {
595 int sfd;
596
597 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
598 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
599 return -1;
600 }
601
602 return sfd;
603 }
604
605 /**
606 * Sets a socket's send buffer size to the maximum allowed by the system.
607 *
608 * @param sfd, file descriptor of socket
609 */
610 static void ms_maximize_sndbuf(const int sfd)
611 {
612 socklen_t intsize = sizeof(int);
613 unsigned int last_good = 0;
614 unsigned int min, max, avg;
615 unsigned int old_size;
616
617 /* Start with the default size. */
618 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
619 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
620 return;
621 }
622
623 /* Binary-search for the real maximum. */
624 min = old_size;
625 max = MAX_SENDBUF_SIZE;
626
627 while (min <= max) {
628 avg = ((unsigned int)(min + max)) / 2;
629 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
630 last_good = avg;
631 min = avg + 1;
632 } else {
633 max = avg - 1;
634 }
635 }
636 }
637
638 /**
639 * socket connects the server
640 *
641 * @param c, pointer of the concurrency
642 * @param srv_host_name, the host name of the server
643 * @param srv_port, port of server
644 * @param is_udp, whether it's udp
645 * @param ret_sfd, the connected socket file descriptor
646 *
647 * @return int, if success, return 0, else return -1
648 */
649 static int ms_network_connect(ms_conn_t *c, char * srv_host_name,
650 const int srv_port, const bool is_udp, int *ret_sfd)
651 {
652 int sfd;
653 struct linger ling = {0, 0};
654 struct addrinfo *ai;
655 struct addrinfo *next;
656 struct addrinfo hints;
657 char port_buf[NI_MAXSERV];
658 int error;
659 int success = 0;
660
661 int flags = 1;
662
663 /*
664 * the memset call clears nonstandard fields in some impementations
665 * that otherwise mess things up.
666 */
667 memset(&hints, 0, sizeof (hints));
668 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
669 if (is_udp) {
670 hints.ai_protocol = IPPROTO_UDP;
671 hints.ai_socktype = SOCK_DGRAM;
672 hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
673 } else {
674 hints.ai_family = AF_UNSPEC;
675 hints.ai_protocol = IPPROTO_TCP;
676 hints.ai_socktype = SOCK_STREAM;
677 }
678
679 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
680 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
681 if (error != 0) {
682 if (error != EAI_SYSTEM)
683 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
684 else
685 perror("getaddrinfo()\n");
686
687 return -1;
688 }
689
690 for (next = ai; next; next = next->ai_next) {
691 if ((sfd = ms_new_socket(next)) == -1) {
692 freeaddrinfo(ai);
693 return -1;
694 }
695
696 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
697 if (is_udp) {
698 ms_maximize_sndbuf(sfd);
699 } else {
700 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
701 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
702 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
703 }
704
705 if (is_udp) {
706 c->srv_recv_addr_size = sizeof(struct sockaddr);
707 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
708 } else {
709 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) {
710 close(sfd);
711 freeaddrinfo(ai);
712 return -1;
713 }
714 }
715
716 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
717 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
718 fprintf(stderr, "setting O_NONBLOCK\n");
719 close(sfd);
720 freeaddrinfo(ai);
721 return -1;
722 }
723
724 if (ret_sfd != NULL) {
725 *ret_sfd = sfd;
726 }
727
728 success++;
729 }
730
731 freeaddrinfo(ai);
732
733 /* Return zero if we detected no errors in starting up connections */
734 return success == 0;
735 }
736
737 /**
738 * reconnect a disconnected sock
739 *
740 * @param c, pointer of the concurrency
741 *
742 * @return int, if success, return 0, else return -1
743 */
744 static int ms_reconn(ms_conn_t *c)
745 {
746 int srv_idx = 0;
747 int srv_conn_cnt = 0;
748
749 if (ms_setting.rep_write_srv > 0) {
750 srv_idx = c->cur_idx;
751 srv_conn_cnt = ms_setting.nconns;
752 } else {
753 srv_idx = ms_thread.thread_ctx->srv_idx;
754 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
755 }
756
757 /* close the old socket handler */
758 close(c->sfd);
759 c->tcpsfd[c->cur_idx] = 0;
760
761 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
762 % srv_conn_cnt == 0) {
763
764 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
765 fprintf(stderr, "Server %s:%d disconnect\n",
766 ms_setting.servers[srv_idx].srv_host_name,
767 ms_setting.servers[srv_idx].srv_port);
768 }
769
770 if (ms_setting.rep_write_srv > 0) {
771 int i = 0;
772 for (i = 0; i < c->total_sfds; i++) {
773 if (c->tcpsfd[i] != 0) {
774 break;
775 }
776 }
777
778 /* all socks disconnect */
779 if (i == c->total_sfds) {
780 return -1;
781 }
782 } else {
783 do {
784 /* reconnect success, break the loop */
785 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
786 ms_setting.servers[srv_idx].srv_port,
787 ms_setting.udp, &c->sfd) == 0) {
788
789 c->tcpsfd[c->cur_idx] = c->sfd;
790 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
791 % srv_conn_cnt == 0) {
792
793 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
794 int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec -
795 ms_setting.servers[srv_idx].disconn_time.tv_sec);
796 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
797 ms_setting.servers[srv_idx].srv_host_name,
798 ms_setting.servers[srv_idx].srv_port, reconn_time);
799 }
800 break;
801 }
802
803 if (c->total_sfds == 1) {
804 /* wait a second and reconnect */
805 sleep(1);
806 }
807 } while (c->total_sfds == 1);
808 }
809
810 if (c->total_sfds > 1 && c->tcpsfd[c->cur_idx] == 0) {
811 c->sfd = 0;
812 c->alive_sfds--;
813 }
814
815 return 0;
816 }
817
818 /**
819 * reconnect several disconnected socks in the connection
820 * structure, the ever-1-second timer of the thread will check
821 * whether some socks in the connections disconnect. if
822 * disconnect, reconnect the sock.
823 *
824 * @param c, pointer of the concurrency
825 *
826 * @return int, if success, return 0, else return -1
827 */
828 int ms_reconn_socks(ms_conn_t *c)
829 {
830 int srv_idx = 0;
831 int ret_sfd = 0;
832 int srv_conn_cnt = 0;
833 struct timeval cur_time;
834
835 assert(c != NULL);
836
837 if (c->total_sfds == 1 || c->total_sfds == c->alive_sfds) {
838 return 0;
839 }
840
841 for (int i = 0; i < c->total_sfds; i++) {
842 if (c->tcpsfd[i] == 0) {
843 gettimeofday(&cur_time, NULL);
844
845 /**
846 * For failover test of replication, reconnect the socks after
847 * it disconnects more than 5 seconds, Otherwise memslap will
848 * block at connect() function and the work threads can't work
849 * in this interval.
850 */
851 if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) {
852 break;
853 }
854
855 if (ms_setting.rep_write_srv > 0) {
856 srv_idx = i;
857 srv_conn_cnt = ms_setting.nconns;
858 } else {
859 srv_idx = ms_thread.thread_ctx->srv_idx;
860 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
861 }
862
863 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
864 ms_setting.servers[srv_idx].srv_port,
865 ms_setting.udp, &ret_sfd) == 0) {
866
867 c->tcpsfd[i] = ret_sfd;
868 c->alive_sfds++;
869
870 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
871 % srv_conn_cnt == 0) {
872
873 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
874 int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec -
875 ms_setting.servers[srv_idx].disconn_time.tv_sec);
876 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
877 ms_setting.servers[srv_idx].srv_host_name,
878 ms_setting.servers[srv_idx].srv_port, reconn_time);
879 }
880 }
881 }
882 }
883
884 return 0;
885 }
886
887 /**
888 * Tokenize the command string by replacing whitespace with '\0' and update
889 * the token array tokens with pointer to start of each token and length.
890 * Returns total number of tokens. The last valid token is the terminal
891 * token (value points to the first unprocessed character of the string and
892 * length zero).
893 *
894 * Usage example:
895 *
896 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
897 * for(int ix = 0; tokens[ix].length != 0; ix++) {
898 * ...
899 * }
900 * ncommand = tokens[ix].value - command;
901 * command = tokens[ix].value;
902 * }
903 *
904 * @param command, the command string to token
905 * @param tokens, array to store tokens
906 * @param max_tokens, maximum tokens number
907 *
908 * @return int, the number of tokens
909 */
910 static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens)
911 {
912 char *s, *e;
913 int ntokens = 0;
914
915 assert(command != NULL && tokens != NULL && max_tokens > 1);
916
917 for (s = e = command; ntokens < max_tokens - 1; ++e) {
918 if (*e == ' ') {
919 if (s != e) {
920 tokens[ntokens].value = s;
921 tokens[ntokens].length = (size_t)(e - s);
922 ntokens++;
923 *e = '\0';
924 }
925 s = e + 1;
926 } else if (*e == '\0') {
927 if (s != e) {
928 tokens[ntokens].value = s;
929 tokens[ntokens].length = (size_t)(e - s);
930 ntokens++;
931 }
932
933 break; /* string end */
934 }
935 }
936
937 return ntokens;
938 }
939
940 /**
941 * parse the response of server.
942 *
943 * @param c, pointer of the concurrency
944 * @param command, the string responded by server
945 *
946 * @return int, if the command completed return 0, else return
947 * -1
948 */
949 static int ms_ascii_process_line(ms_conn_t *c, char *command)
950 {
951 int ret = 0;
952 int64_t value_len;
953 char *buffer = command;
954
955 assert(c != NULL);
956
957 /**
958 * for command get, we store the returned value into local buffer
959 * then continue in ms_complete_nread().
960 */
961
962 switch (buffer[0]) {
963 case 'V': /* VALUE || VERSION */
964 if (buffer[1] == 'A') { /* VALUE */
965 token_t tokens[MAX_TOKENS];
966 ms_tokenize_command(command, tokens, MAX_TOKENS);
967 value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
968 c->currcmd.key_prefix = *(uint64_t *)tokens[KEY_TOKEN].value;
969
970 /*
971 We read the \r\n into the string since not doing so is more
972 cycles then the waster of memory to do so.
973
974 We are null terminating through, which will most likely make
975 some people lazy about using the return length.
976 */
977 c->rvbytes = (int)(value_len + 2);
978 c->readval = true;
979 ret = -1;
980 }
981
982 break;
983
984 case 'O': /* OK */
985 c->currcmd.retstat = MCD_SUCCESS;
986
987 case 'S': /* STORED STATS SERVER_ERROR */
988 if (buffer[2] == 'A') {/* STORED STATS */
989 /* STATS*/
990 c->currcmd.retstat = MCD_STAT;
991 } else if (buffer[1] == 'E') {
992 /* SERVER_ERROR */
993 printf("<%d %s\n", c->sfd, buffer);
994
995 c->currcmd.retstat = MCD_SERVER_ERROR;
996 } else if (buffer[1] == 'T') {
997 /* STORED */
998 c->currcmd.retstat = MCD_STORED;
999 } else {
1000 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1001 }
1002 break;
1003
1004 case 'D': /* DELETED DATA */
1005 if (buffer[1] == 'E') {
1006 c->currcmd.retstat = MCD_DELETED;
1007 } else {
1008 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1009 }
1010
1011 break;
1012
1013 case 'N': /* NOT_FOUND NOT_STORED*/
1014 if (buffer[4] == 'F') {
1015 c->currcmd.retstat = MCD_NOTFOUND;
1016 } else if (buffer[4] == 'S') {
1017 printf("<%d %s\n", c->sfd, buffer);
1018 c->currcmd.retstat = MCD_NOTSTORED;
1019 } else {
1020 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1021 }
1022 break;
1023
1024 case 'E': /* PROTOCOL ERROR or END */
1025 if (buffer[1] == 'N') {
1026 /* END */
1027 c->currcmd.retstat = MCD_END;
1028 } else if (buffer[1] == 'R') {
1029 printf("<%d ERROR\n", c->sfd);
1030 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
1031 } else if (buffer[1] == 'X') {
1032 c->currcmd.retstat = MCD_DATA_EXISTS;
1033 printf("<%d %s\n", c->sfd, buffer);
1034 } else {
1035 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1036 }
1037 break;
1038
1039 case 'C': /* CLIENT ERROR */
1040 printf("<%d %s\n", c->sfd, buffer);
1041 c->currcmd.retstat = MCD_CLIENT_ERROR;
1042 break;
1043
1044 default:
1045 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1046 break;
1047 }
1048
1049 return ret;
1050 }
1051
1052 /**
1053 * after one operation completes, reset the concurrency
1054 *
1055 * @param c, pointer of the concurrency
1056 * @param timeout, whether it's timeout
1057 */
1058 void ms_reset_conn(ms_conn_t *c, bool timeout)
1059 {
1060 assert(c != NULL);
1061
1062 if (c->udp) {
1063 if (c->packets > 0 && c->packets < MAX_UDP_PACKET) {
1064 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
1065 }
1066
1067 c->packets = 0;
1068 c->recvpkt = 0;
1069 c->pktcurr = 0;
1070 c->ordcurr = 0;
1071 c->rudpbytes = 0;
1072 }
1073 c->currcmd.isfinish = true;
1074 c->ctnwrite = false;
1075 c->rbytes = 0;
1076 c->rcurr = c->rbuf;
1077 ms_conn_set_state(c, conn_write);
1078 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1079
1080 if (timeout) {
1081 ms_drive_machine(c);
1082 }
1083 }
1084
1085 /**
1086 * if we have a complete line in the buffer, process it.
1087 *
1088 * @param c, pointer of the concurrency
1089 *
1090 * @return int, if success, return 0, else return -1
1091 */
1092 static int ms_try_read_line(ms_conn_t *c)
1093 {
1094 if (c->protocol == binary_prot) {
1095 /* Do we have the complete packet header? */
1096 if ((uint64_t)c->rbytes < sizeof(c->binary_header)) {
1097 /* need more data! */
1098 return 0;
1099 } else {
1100 #ifdef NEED_ALIGN
1101 if (((long)(c->rcurr)) % 8 != 0) {
1102 /* must realign input buffer */
1103 memmove(c->rbuf, c->rcurr, c->rbytes);
1104 c->rcurr = c->rbuf;
1105 if (settings.verbose) {
1106 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1107 }
1108 }
1109 #endif
1110 protocol_binary_response_header* rsp;
1111 rsp = (protocol_binary_response_header*)c->rcurr;
1112
1113 c->binary_header = *rsp;
1114 c->binary_header.response.extlen = rsp->response.extlen;
1115 c->binary_header.response.keylen = ntohl(rsp->response.keylen);
1116 c->binary_header.response.bodylen = ntohl(rsp->response.bodylen);
1117 c->binary_header.response.status = ntohl(rsp->response.status);
1118
1119 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) {
1120 fprintf(stderr, "Invalid magic: %x\n",
1121 c->binary_header.response.magic);
1122 ms_conn_set_state(c, conn_closing);
1123 return 0;
1124 }
1125
1126 /* process this complete response */
1127 if (ms_bin_process_response(c) == 0) {
1128 /* current operation completed */
1129 ms_reset_conn(c, false);
1130 return -1;
1131 } else {
1132 c->rbytes -= (int32_t)sizeof(c->binary_header);
1133 c->rcurr += sizeof(c->binary_header);
1134 }
1135 }
1136 } else {
1137 char *el, *cont;
1138
1139 assert(c != NULL);
1140 assert(c->rcurr <= (c->rbuf + c->rsize));
1141
1142 if (c->rbytes == 0)
1143 return 0;
1144 el = memchr(c->rcurr, '\n', (size_t)c->rbytes);
1145 if (!el)
1146 return 0;
1147 cont = el + 1;
1148 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1149 el--;
1150 }
1151 *el = '\0';
1152
1153 assert(cont <= (c->rcurr + c->rbytes));
1154
1155 /* process this complete line */
1156 if (ms_ascii_process_line(c, c->rcurr) == 0) {
1157 /* current operation completed */
1158 ms_reset_conn(c, false);
1159 return -1;
1160 } else {
1161 /* current operation didn't complete */
1162 c->rbytes -= (int32_t)(cont - c->rcurr);
1163 c->rcurr = cont;
1164 }
1165
1166 assert(c->rcurr <= (c->rbuf + c->rsize));
1167 }
1168
1169 return -1;
1170 }
1171
1172 /**
1173 * because the packet of UDP can't ensure the order, the
1174 * function is used to sort the received udp packet.
1175 *
1176 * @param c, pointer of the concurrency
1177 * @param buf, the buffer to store the ordered packages data
1178 * @param rbytes, the maximum capacity of the buffer
1179 *
1180 * @return int, if success, return the copy bytes, else return
1181 * -1
1182 */
1183 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1184 {
1185 int len = 0;
1186 int wbytes = 0;
1187 uint16_t req_id = 0;
1188 uint16_t seq_num = 0;
1189 uint16_t packets = 0;
1190 unsigned char *header = NULL;
1191
1192 /* no enough data */
1193 assert(c != NULL);
1194 assert(buf != NULL);
1195 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1196
1197 /* calculate received packets count */
1198 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) {
1199 /* the last packet has some data */
1200 c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1201 } else {
1202 c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1203 }
1204
1205 /* get the total packets count if necessary */
1206 if (c->packets == 0) {
1207 c->packets = HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1208 }
1209
1210 /* build the ordered packet array */
1211 for (int i = c->pktcurr; i < c->recvpkt; i++) {
1212 header = (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1213 req_id = (uint16_t)HEADER_TO_REQID(header);
1214 assert(req_id == c->request_id % (1 << 16));
1215
1216 packets = (uint16_t)HEADER_TO_PACKETS(header);
1217 assert(c->packets == HEADER_TO_PACKETS(header));
1218
1219 seq_num = (uint16_t)HEADER_TO_SEQNUM(header);
1220 c->udppkt[seq_num].header = header;
1221 c->udppkt[seq_num].data = (char *)header + UDP_HEADER_SIZE;
1222
1223 if (i == c->recvpkt - 1) {
1224 /* last received packet */
1225 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) {
1226 c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1227 c->pktcurr++;
1228 } else {
1229 c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1230 }
1231 } else {
1232 c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1233 c->pktcurr++;
1234 }
1235 }
1236
1237 for (int i = c->ordcurr; i < c->recvpkt; i++) {
1238 /* there is some data to copy */
1239 if (c->udppkt[i].data != NULL && c->udppkt[i].copybytes < c->udppkt[i].rbytes) {
1240 header = c->udppkt[i].header;
1241 len = c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1242 if (len > rbytes - wbytes) {
1243 len = rbytes - wbytes;
1244 }
1245
1246 assert(len <= rbytes - wbytes);
1247 assert(i == HEADER_TO_SEQNUM(header));
1248
1249 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t)len);
1250 wbytes += len;
1251 c->udppkt[i].copybytes += len;
1252
1253 if (c->udppkt[i].copybytes == c->udppkt[i].rbytes
1254 && c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) {
1255 /* finish copying all the data of this packet, next */
1256 c->ordcurr++;
1257 }
1258
1259 /* last received packet, and finish copying all the data */
1260 if (c->recvpkt == c->packets && i == c->recvpkt - 1
1261 && c->udppkt[i].copybytes == c->udppkt[i].rbytes) {
1262 break;
1263 }
1264
1265 /* no space to copy data */
1266 if (wbytes >= rbytes) {
1267 break;
1268 }
1269
1270 /* it doesn't finish reading all the data of the packet from network */
1271 if (i != c->recvpkt - 1
1272 && c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) {
1273 break;
1274 }
1275 } else {
1276 /* no data to copy */
1277 break;
1278 }
1279 }
1280
1281 return(wbytes == 0 ? -1 : wbytes);
1282 }
1283
1284 /**
1285 * encapsulate upd read like tcp read
1286 *
1287 * @param c, pointer of the concurrency
1288 * @param buf, read buffer
1289 * @param len, length to read
1290 *
1291 * @return int, if success, return the read bytes, else return
1292 * -1
1293 */
1294 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1295 {
1296 int res = 0;
1297 int avail = 0;
1298 int rbytes = 0;
1299 int copybytes = 0;
1300
1301 assert(c->udp);
1302
1303 while (1) {
1304 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) {
1305 char *new_rbuf = realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1306 if (!new_rbuf) {
1307 fprintf(stderr, "Couldn't realloc input buffer.\n");
1308 c->rudpbytes = 0; /* ignore what we read */
1309 return -1;
1310 }
1311 c->rudpbuf = new_rbuf;
1312 c->rudpsize *= 2;
1313 }
1314
1315 avail = c->rudpsize - c->rudpbytes;
1316 /* UDP each time read a packet, 1400 bytes */
1317 res = (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1318
1319 if (res > 0) {
1320 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1321 c->rudpbytes += res;
1322 rbytes += res;
1323 if (res == avail) {
1324 continue;
1325 } else {
1326 break;
1327 }
1328 }
1329
1330 if (res == 0) {
1331 /* "connection" closed */
1332 return res;
1333 }
1334
1335 if (res == -1) {
1336 /* no data to read */
1337 return res;
1338 }
1339 }
1340
1341 /* copy data to read buffer */
1342 if (rbytes > 0) {
1343 copybytes = ms_sort_udp_packet(c, buf, len);
1344 }
1345
1346 if (copybytes == -1) {
1347 __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
1348 }
1349
1350 return copybytes;
1351 }
1352
1353 /*
1354 * read from network as much as we can, handle buffer overflow and connection
1355 * close.
1356 * before reading, move the remaining incomplete fragment of a command
1357 * (if any) to the beginning of the buffer.
1358 * return 0 if there's nothing to read on the first read.
1359 */
1360 /**
1361 * read from network as much as we can, handle buffer overflow and connection
1362 * close. before reading, move the remaining incomplete fragment of a command
1363 * (if any) to the beginning of the buffer.
1364 *
1365 * @param c, pointer of the concurrency
1366 *
1367 * @return int,
1368 * return 0 if there's nothing to read on the first read.
1369 * return 1 if get data
1370 * return -1 if error happens
1371 */
1372 static int ms_try_read_network(ms_conn_t *c)
1373 {
1374 int gotdata = 0;
1375 int res;
1376 int64_t avail;
1377
1378 assert(c != NULL);
1379
1380 if (c->rcurr != c->rbuf &&
1381 (!c->readval || c->rvbytes > c->rsize - (c->rcurr - c->rbuf)
1382 || (c->readval && c->rcurr - c->rbuf > c->rbytes))) {
1383 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1384 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1385 c->rcurr = c->rbuf;
1386 }
1387
1388 while (1) {
1389 if (c->rbytes >= c->rsize) {
1390 char *new_rbuf = realloc(c->rbuf, (size_t)c->rsize * 2);
1391 if (!new_rbuf) {
1392 fprintf(stderr, "Couldn't realloc input buffer.\n");
1393 c->rbytes = 0; /* ignore what we read */
1394 return -1;
1395 }
1396 c->rcurr = c->rbuf = new_rbuf;
1397 c->rsize *= 2;
1398 }
1399
1400 avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1401 if (avail == 0) {
1402 break;
1403 }
1404
1405 if (c->udp) {
1406 res = (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1407 } else {
1408 res = (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1409 }
1410
1411 if (res > 0) {
1412 if (!c->udp) {
1413 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1414 }
1415 gotdata = 1;
1416 c->rbytes += res;
1417 if (res == avail) {
1418 continue;
1419 } else {
1420 break;
1421 }
1422 }
1423 if (res == 0) {
1424 /* connection closed */
1425 ms_conn_set_state(c, conn_closing);
1426 return -1;
1427 }
1428 if (res == -1) {
1429 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1430 /* Should close on unhandled errors. */
1431 ms_conn_set_state(c, conn_closing);
1432 return -1;
1433 }
1434 }
1435
1436 return gotdata;
1437 }
1438
1439 /**
1440 * after get the object from server, verify the value if
1441 * necessary.
1442 *
1443 * @param c, pointer of the concurrency
1444 * @param mlget_item, pointer of mulit-get task item structure
1445 * @param value, received value string
1446 * @param vlen, received value string length
1447 */
1448 static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item,
1449 char *value, int vlen)
1450 {
1451 if (c->curr_task.verify) {
1452 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1453 char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset];
1454 char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1455
1456 /* verify expire time if necessary */
1457 if (c->curr_task.item->exp_time > 0) {
1458 struct timeval curr_time;
1459 gettimeofday(&curr_time, NULL);
1460
1461 /* object expired but get it now */
1462 if (curr_time.tv_sec - c->curr_task.item->client_time
1463 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) {
1464 __sync_fetch_and_add(&ms_stats.exp_get, 1);
1465
1466 if (ms_setting.verbose) {
1467 char set_time[64];
1468 char cur_time[64];
1469 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1470 localtime(&c->curr_task.item->client_time));
1471 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1472 localtime(&curr_time.tv_sec));
1473 fprintf(stderr, "\n<%d expire time verification failed, "
1474 "object expired but get it now\n"
1475 "\tkey len: %d\n"
1476 "\tkey: %lx %.*s\n"
1477 "\tset time: %s current time: %s "
1478 "diff time: %d expire time: %d\n"
1479 "\texpected data: \n"
1480 "\treceived data len: %d\n"
1481 "\treceived data: %.*s\n",
1482 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1483 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1484 orignkey, set_time, cur_time,
1485 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1486 c->curr_task.item->exp_time,
1487 vlen, vlen, value);
1488 fflush(stderr);
1489 }
1490 }
1491 } else {
1492 if (c->curr_task.item->value_size != vlen
1493 || memcmp(orignval, value, (size_t)vlen) != 0) {
1494 __sync_fetch_and_add(&ms_stats.vef_failed, 1);
1495
1496 if (ms_setting.verbose) {
1497 fprintf(stderr, "\n<%d data verification failed\n"
1498 "\tkey len: %d\n"
1499 "\tkey: %lx %.*s\n"
1500 "\texpected data len: %d\n"
1501 "\texpected data: %.*s\n"
1502 "\treceived data len: %d\n"
1503 "\treceived data: %.*s\n",
1504 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1505 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1506 orignkey, c->curr_task.item->value_size,
1507 c->curr_task.item->value_size,
1508 orignval, vlen, vlen, value);
1509 fflush(stderr);
1510 }
1511 }
1512 }
1513
1514 c->curr_task.finish_verify = true;
1515
1516 if (mlget_item != NULL) {
1517 mlget_item->finish_verify = true;
1518 }
1519 }
1520 }
1521
1522 /**
1523 * For ASCII protocol, after store the data into the local
1524 * buffer, run this function to handle the data.
1525 *
1526 * @param c, pointer of the concurrency
1527 */
1528 static void ms_ascii_complete_nread(ms_conn_t *c)
1529 {
1530 assert(c != NULL);
1531 assert(c->rbytes >= c->rvbytes);
1532 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1533 if (c->rvbytes > 2) {
1534 assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1535 }
1536
1537 /* multi-get */
1538 ms_mlget_task_item_t *mlget_item = NULL;
1539 if ((ms_setting.mult_key_num > 1 &&
1540 c->mlget_task.mlget_num >= ms_setting.mult_key_num) ||
1541 (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
1542
1543 c->mlget_task.value_index++;
1544 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1545
1546 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1547 c->curr_task.item = mlget_item->item;
1548 c->curr_task.verify = mlget_item->verify;
1549 c->curr_task.finish_verify = mlget_item->finish_verify;
1550 mlget_item->get_miss = false;
1551 } else {
1552 /* Try to find the task item in multi-get task array */
1553 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
1554 mlget_item = &c->mlget_task.mlget_item[i];
1555 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1556
1557 c->curr_task.item = mlget_item->item;
1558 c->curr_task.verify = mlget_item->verify;
1559 c->curr_task.finish_verify = mlget_item->finish_verify;
1560 mlget_item->get_miss = false;
1561
1562 break;
1563 }
1564 }
1565 }
1566 }
1567
1568 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1569
1570 c->curr_task.get_miss = false;
1571 c->rbytes -= c->rvbytes;
1572 c->rcurr = c->rcurr + c->rvbytes;
1573 assert(c->rcurr <= (c->rbuf + c->rsize));
1574 c->readval = false;
1575 c->rvbytes = 0;
1576 }
1577
1578 /**
1579 * For binary protocol, after store the data into the local
1580 * buffer, run this function to handle the data.
1581 *
1582 * @param c, pointer of the concurrency
1583 */
1584 static void ms_bin_complete_nread(ms_conn_t *c)
1585 {
1586 assert(c != NULL);
1587 assert(c->rbytes >= c->rvbytes);
1588 assert(c->protocol == binary_prot);
1589
1590 int extlen = c->binary_header.response.extlen;
1591 int keylen = c->binary_header.response.keylen;
1592 uint8_t opcode = c->binary_header.response.opcode;
1593
1594 /* not get command or not include value, just return */
1595 if ((opcode != PROTOCOL_BINARY_CMD_GET && opcode != PROTOCOL_BINARY_CMD_GETQ) ||
1596 c->rvbytes <= extlen + keylen) {
1597 /* get miss */
1598 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) {
1599 c->currcmd.retstat = MCD_END;
1600 c->curr_task.get_miss = true;
1601 }
1602
1603 c->readval = false;
1604 c->rvbytes = 0;
1605 ms_reset_conn(c, false);
1606 return;
1607 }
1608
1609 /* multi-get */
1610 ms_mlget_task_item_t *mlget_item = NULL;
1611 if ((ms_setting.mult_key_num > 1 &&
1612 c->mlget_task.mlget_num >= ms_setting.mult_key_num) ||
1613 (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
1614
1615 c->mlget_task.value_index++;
1616 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1617
1618 c->curr_task.item = mlget_item->item;
1619 c->curr_task.verify = mlget_item->verify;
1620 c->curr_task.finish_verify = mlget_item->finish_verify;
1621 mlget_item->get_miss = false;
1622 }
1623
1624 ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen);
1625
1626 c->currcmd.retstat = MCD_END;
1627 c->curr_task.get_miss = false;
1628 c->rbytes -= c->rvbytes;
1629 c->rcurr = c->rcurr + c->rvbytes;
1630 assert(c->rcurr <= (c->rbuf + c->rsize));
1631 c->readval = false;
1632 c->rvbytes = 0;
1633
1634 if (ms_setting.mult_key_num > 1) {
1635 /* multi-get have check all the item */
1636 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) {
1637 ms_reset_conn(c, false);
1638 }
1639 } else {
1640 /* single get */
1641 ms_reset_conn(c, false);
1642 }
1643 }
1644
1645 /**
1646 * we get here after reading the value of get commands.
1647 *
1648 * @param c, pointer of the concurrency
1649 */
1650 static void ms_complete_nread(ms_conn_t *c)
1651 {
1652 assert(c != NULL);
1653 assert(c->rbytes >= c->rvbytes);
1654 assert(c->protocol == ascii_udp_prot
1655 || c->protocol == ascii_prot
1656 || c->protocol == binary_prot);
1657
1658 if (c->protocol == binary_prot) {
1659 ms_bin_complete_nread(c);
1660 } else {
1661 ms_ascii_complete_nread(c);
1662 }
1663 }
1664
1665 /**
1666 * Adds a message header to a connection.
1667 *
1668 * @param c, pointer of the concurrency
1669 *
1670 * @return int, if success, return 0, else return -1
1671 */
1672 static int ms_add_msghdr(ms_conn_t *c)
1673 {
1674 struct msghdr *msg;
1675
1676 assert(c != NULL);
1677
1678 if (c->msgsize == c->msgused) {
1679 msg = realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
1680 if (! msg)
1681 return -1;
1682 c->msglist = msg;
1683 c->msgsize *= 2;
1684 }
1685
1686 msg = c->msglist + c->msgused;
1687
1688 /**
1689 * this wipes msg_iovlen, msg_control, msg_controllen, and
1690 * msg_flags, the last 3 of which aren't defined on solaris:
1691 */
1692 memset(msg, 0, sizeof(struct msghdr));
1693
1694 msg->msg_iov = &c->iov[c->iovused];
1695
1696 if (c->udp && c->srv_recv_addr_size > 0) {
1697 msg->msg_name = &c->srv_recv_addr;
1698 msg->msg_namelen = c->srv_recv_addr_size;
1699 }
1700
1701 c->msgbytes = 0;
1702 c->msgused++;
1703
1704 if (c->udp) {
1705 /* Leave room for the UDP header, which we'll fill in later. */
1706 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
1707 }
1708
1709 return 0;
1710 }
1711
1712 /**
1713 * Ensures that there is room for another structure iovec in a connection's
1714 * iov list.
1715 *
1716 * @param c, pointer of the concurrency
1717 *
1718 * @return int, if success, return 0, else return -1
1719 */
1720 static int ms_ensure_iov_space(ms_conn_t *c)
1721 {
1722 assert(c != NULL);
1723
1724 if (c->iovused >= c->iovsize) {
1725 int i, iovnum;
1726 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1727 ((uint64_t)c->iovsize * 2) * sizeof(struct iovec));
1728 if (! new_iov)
1729 return -1;
1730 c->iov = new_iov;
1731 c->iovsize *= 2;
1732
1733 /* Point all the msghdr structures at the new list. */
1734 for (i = 0, iovnum = 0; i < c->msgused; i++) {
1735 c->msglist[i].msg_iov = &c->iov[iovnum];
1736 iovnum += (int)c->msglist[i].msg_iovlen;
1737 }
1738 }
1739
1740 return 0;
1741 }
1742
1743 /**
1744 * Adds data to the list of pending data that will be written out to a
1745 * connection.
1746 *
1747 * @param c, pointer of the concurrency
1748 * @param buf, the buffer includes data to send
1749 * @param len, the data length in the buffer
1750 *
1751 * @return int, if success, return 0, else return -1
1752 */
1753 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
1754 {
1755 struct msghdr *m;
1756 int leftover;
1757 bool limit_to_mtu;
1758
1759 assert(c != NULL);
1760
1761 do {
1762 m = &c->msglist[c->msgused - 1];
1763
1764 /*
1765 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
1766 */
1767 limit_to_mtu = c->udp;
1768
1769 /* We may need to start a new msghdr if this one is full. */
1770 if (m->msg_iovlen == IOV_MAX ||
1771 (limit_to_mtu && c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)) {
1772 ms_add_msghdr(c);
1773 m = &c->msglist[c->msgused - 1];
1774 }
1775
1776 if (ms_ensure_iov_space(c) != 0)
1777 return -1;
1778
1779 /* If the fragment is too big to fit in the datagram, split it up */
1780 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE) {
1781 leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
1782 len -= leftover;
1783 } else {
1784 leftover = 0;
1785 }
1786
1787 m = &c->msglist[c->msgused - 1];
1788 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1789 m->msg_iov[m->msg_iovlen].iov_len = (size_t)len;
1790
1791 c->msgbytes += len;
1792 c->iovused++;
1793 m->msg_iovlen++;
1794
1795 buf = ((char *)buf) + len;
1796 len = leftover;
1797 } while (leftover > 0);
1798
1799 return 0;
1800 }
1801
1802 /**
1803 * Constructs a set of UDP headers and attaches them to the outgoing messages.
1804 *
1805 * @param c, pointer of the concurrency
1806 *
1807 * @return int, if success, return 0, else return -1
1808 */
1809 static int ms_build_udp_headers(ms_conn_t *c)
1810 {
1811 int i;
1812 unsigned char *hdr;
1813
1814 assert(c != NULL);
1815
1816 c->request_id = ms_get_udp_request_id();
1817
1818 if (c->msgused > c->hdrsize) {
1819 void *new_hdrbuf;
1820 if (c->hdrbuf)
1821 new_hdrbuf = realloc(c->hdrbuf, (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
1822 else
1823 new_hdrbuf = malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
1824 if (! new_hdrbuf)
1825 return -1;
1826 c->hdrbuf = (unsigned char *)new_hdrbuf;
1827 c->hdrsize = c->msgused * 2;
1828 }
1829
1830 /* If this is a multi-packet request, drop it. */
1831 if (c->udp && c->msgused > 1) {
1832 fprintf(stderr, "multi-packet request for UDP not supported.\n");
1833 return -1;
1834 }
1835
1836 hdr = c->hdrbuf;
1837 for (i = 0; i < c->msgused; i++) {
1838 c->msglist[i].msg_iov[0].iov_base = hdr;
1839 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
1840 *hdr++ = (unsigned char)(c->request_id / 256);
1841 *hdr++ = (unsigned char)(c->request_id % 256);
1842 *hdr++ = (unsigned char)(i / 256);
1843 *hdr++ = (unsigned char)(i % 256);
1844 *hdr++ = (unsigned char)(c->msgused / 256);
1845 *hdr++ = (unsigned char)(c->msgused % 256);
1846 *hdr++ = (unsigned char)1; /* support facebook memcached */
1847 *hdr++ = (unsigned char)0;
1848 assert(hdr == ((unsigned char *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE));
1849 }
1850
1851 return 0;
1852 }
1853
1854 /**
1855 * Transmit the next chunk of data from our list of msgbuf structures.
1856 *
1857 * @param c, pointer of the concurrency
1858 *
1859 * @return TRANSMIT_COMPLETE All done writing.
1860 * TRANSMIT_INCOMPLETE More data remaining to write.
1861 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1862 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1863 */
1864 static int ms_transmit(ms_conn_t *c)
1865 {
1866 assert(c != NULL);
1867
1868 if (c->msgcurr < c->msgused &&
1869 c->msglist[c->msgcurr].msg_iovlen == 0) {
1870 /* Finished writing the current msg; advance to the next. */
1871 c->msgcurr++;
1872 }
1873
1874 if (c->msgcurr < c->msgused) {
1875 ssize_t res;
1876 struct msghdr *m = &c->msglist[c->msgcurr];
1877
1878 res = sendmsg(c->sfd, m, 0);
1879 if (res > 0) {
1880 __sync_fetch_and_add(&ms_stats.bytes_written, res);
1881
1882 /* We've written some of the data. Remove the completed
1883 iovec entries from the list of pending writes. */
1884 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) {
1885 res -= (ssize_t)m->msg_iov->iov_len;
1886 m->msg_iovlen--;
1887 m->msg_iov++;
1888 }
1889
1890 /* Might have written just part of the last iovec entry;
1891 adjust it so the next write will do the rest. */
1892 if (res > 0) {
1893 m->msg_iov->iov_base = (unsigned char *)m->msg_iov->iov_base + res;
1894 m->msg_iov->iov_len -= (uint64_t)res;
1895 }
1896 return TRANSMIT_INCOMPLETE;
1897 }
1898 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1899 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
1900 fprintf(stderr, "Couldn't update event.\n");
1901 ms_conn_set_state(c, conn_closing);
1902 return TRANSMIT_HARD_ERROR;
1903 }
1904 return TRANSMIT_SOFT_ERROR;
1905 }
1906
1907 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1908 we have a real error, on which we close the connection */
1909 fprintf(stderr, "Failed to write, and not due to blocking.\n");
1910
1911 ms_conn_set_state(c, conn_closing);
1912 return TRANSMIT_HARD_ERROR;
1913 } else {
1914 return TRANSMIT_COMPLETE;
1915 }
1916 }
1917
1918 /**
1919 * Shrinks a connection's buffers if they're too big. This prevents
1920 * periodic large "mget" response from server chewing lots of client
1921 * memory.
1922 *
1923 * This should only be called in between requests since it can wipe output
1924 * buffers!
1925 *
1926 * @param c, pointer of the concurrency
1927 */
1928 static void ms_conn_shrink(ms_conn_t *c)
1929 {
1930 assert(c != NULL);
1931
1932 if (c->udp)
1933 return;
1934
1935 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
1936 char *newbuf;
1937
1938 if (c->rcurr != c->rbuf)
1939 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1940
1941 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
1942
1943 if (newbuf) {
1944 c->rbuf = newbuf;
1945 c->rsize = DATA_BUFFER_SIZE;
1946 }
1947 c->rcurr = c->rbuf;
1948 }
1949
1950 if (c->udp && c->rudpsize > UDP_DATA_BUFFER_HIGHWAT
1951 && c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE) {
1952 char *new_rbuf = (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1953 if (!new_rbuf) {
1954 c->rudpbuf = new_rbuf;
1955 c->rudpsize = UDP_DATA_BUFFER_SIZE;
1956 }
1957 /* TODO check error condition? */
1958 }
1959
1960 if (c->msgsize > MSG_LIST_HIGHWAT) {
1961 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist,
1962 MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1963 if (newbuf) {
1964 c->msglist = newbuf;
1965 c->msgsize = MSG_LIST_INITIAL;
1966 }
1967 /* TODO check error condition? */
1968 }
1969
1970 if (c->iovsize > IOV_LIST_HIGHWAT) {
1971 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov,
1972 IOV_LIST_INITIAL * sizeof(c->iov[0]));
1973 if (newbuf) {
1974 c->iov = newbuf;
1975 c->iovsize = IOV_LIST_INITIAL;
1976 }
1977 /* TODO check return value */
1978 }
1979 }
1980
1981 /**
1982 * Sets a connection's current state in the state machine. Any special
1983 * processing that needs to happen on certain state transitions can
1984 * happen here.
1985 *
1986 * @param c, pointer of the concurrency
1987 * @param state, connection state
1988 */
1989 static void ms_conn_set_state(ms_conn_t *c, int state)
1990 {
1991 assert(c != NULL);
1992
1993 if (state != c->state) {
1994 if (state == conn_read) {
1995 ms_conn_shrink(c);
1996 }
1997 c->state = state;
1998 }
1999 }
2000
2001 /**
2002 * update the event if socks change state. for example: when
2003 * change the listen scoket read event to sock write event, or
2004 * change socket handler, we could call this function.
2005 *
2006 * @param c, pointer of the concurrency
2007 * @param new_flags, new event flags
2008 *
2009 * @return bool, if success, return true, else return false
2010 */
2011 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2012 {
2013 /* default event timeout 10 seconds */
2014 struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0};
2015
2016 assert(c != NULL);
2017
2018 struct event_base *base = c->event.ev_base;
2019 if (c->ev_flags == new_flags && ms_setting.rep_write_srv == 0
2020 && (!ms_setting.facebook_test || c->total_sfds == 1)) {
2021 return true;
2022 }
2023
2024 if (event_del(&c->event) == -1) {
2025 /* try to delete the event again */
2026 if (event_del(&c->event) == -1) {
2027 return false;
2028 }
2029 }
2030
2031 event_set(&c->event, c->sfd, (short)new_flags, ms_event_handler, (void *)c);
2032 event_base_set(base, &c->event);
2033 c->ev_flags = (short)new_flags;
2034
2035 if (c->total_sfds == 1) {
2036 if (event_add(&c->event, NULL) == -1) {
2037 return false;
2038 }
2039 } else {
2040 if (event_add(&c->event, &t) == -1) {
2041 return false;
2042 }
2043 }
2044
2045 return true;
2046 }
2047
2048 /**
2049 * If user want to get the expected throughput, we could limit
2050 * the performance of memslap. we could give up some work and
2051 * just wait a short time. The function is used to check this
2052 * case.
2053 *
2054 * @param c, pointer of the concurrency
2055 *
2056 * @return bool, if success, return true, else return false
2057 */
2058 static bool ms_need_yield(ms_conn_t *c)
2059 {
2060 int64_t tps = 0;
2061 int64_t time_diff = 0;
2062 struct timeval curr_time;
2063 ms_task_t *task = &c->curr_task;
2064
2065 if (ms_setting.expected_tps > 0) {
2066 gettimeofday(&curr_time, NULL);
2067 time_diff = ms_time_diff(&ms_thread.startup_time, &curr_time);
2068 tps = (int64_t)((task->get_opt + task->set_opt) / ((uint64_t)time_diff / 1000000));
2069
2070 /* current throughput is greater than expected throughput */
2071 if (tps > ms_thread.thread_ctx->tps_perconn) {
2072 return true;
2073 }
2074 }
2075
2076 return false;
2077 }
2078
2079 /**
2080 * used to update the start time of each operation
2081 *
2082 * @param c, pointer of the concurrency
2083 */
2084 static void ms_update_start_time(ms_conn_t *c)
2085 {
2086 ms_task_item_t *item = c->curr_task.item;
2087
2088 if (ms_setting.stat_freq > 0 || c->udp
2089 || (c->currcmd.cmd == CMD_SET && item->exp_time > 0)) {
2090
2091 gettimeofday(&c->start_time, NULL);
2092 if (c->currcmd.cmd == CMD_SET && item->exp_time > 0) {
2093 /* record the current time */
2094 item->client_time = c->start_time.tv_sec;
2095 }
2096 }
2097 }
2098
2099 /**
2100 * run the state machine
2101 *
2102 * @param c, pointer of the concurrency
2103 */
2104 static void ms_drive_machine(ms_conn_t *c)
2105 {
2106 bool stop = false;
2107
2108 assert(c != NULL);
2109
2110 while (!stop) {
2111 switch (c->state) {
2112 case conn_read:
2113 if (c->readval) {
2114 if (c->rbytes >= c->rvbytes) {
2115 ms_complete_nread(c);
2116 break;
2117 }
2118 } else {
2119 if (ms_try_read_line(c) != 0) {
2120 break;
2121 }
2122 }
2123
2124 if (ms_try_read_network(c) != 0) {
2125 break;
2126 }
2127
2128 /* doesn't read all the response data, wait event wake up */
2129 if (!c->currcmd.isfinish) {
2130 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2131 fprintf(stderr, "Couldn't update event.\n");
2132 ms_conn_set_state(c, conn_closing);
2133 break;
2134 }
2135 stop = true;
2136 break;
2137 }
2138
2139 /* we have no command line and no data to read from network, next write */
2140 ms_conn_set_state(c, conn_write);
2141 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2142
2143 break;
2144
2145 case conn_write:
2146 if (!c->ctnwrite && ms_need_yield(c)) {
2147 usleep(10);
2148
2149 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2150 fprintf(stderr, "Couldn't update event.\n");
2151 ms_conn_set_state(c, conn_closing);
2152 break;
2153 }
2154 stop = true;
2155 break;
2156 }
2157
2158 if (!c->ctnwrite && ms_exec_task(c) != 0) {
2159 ms_conn_set_state(c, conn_closing);
2160 break;
2161 }
2162
2163 /* record the start time before starting to send data if necessary */
2164 if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) {
2165 if (c->change_sfd) {
2166 c->change_sfd = false;
2167 }
2168 ms_update_start_time(c);
2169 }
2170
2171 /* change sfd if necessary */
2172 if (c->change_sfd) {
2173 c->ctnwrite = true;
2174 stop = true;
2175 break;
2176 }
2177
2178 /* execute task until nothing need be written to network */
2179 if (!c->ctnwrite && c->msgcurr == c->msgused) {
2180 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2181 fprintf(stderr, "Couldn't update event.\n");
2182 ms_conn_set_state(c, conn_closing);
2183 break;
2184 }
2185 stop = true;
2186 break;
2187 }
2188
2189 switch (ms_transmit(c)) {
2190 case TRANSMIT_COMPLETE:
2191 /* we have no data to write to network, next wait repose */
2192 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2193 fprintf(stderr, "Couldn't update event.\n");
2194 ms_conn_set_state(c, conn_closing);
2195 c->ctnwrite = false;
2196 break;
2197 }
2198 ms_conn_set_state(c, conn_read);
2199 c->ctnwrite = false;
2200 stop = true;
2201 break;
2202
2203 case TRANSMIT_INCOMPLETE:
2204 c->ctnwrite = true;
2205 break; /* Continue in state machine. */
2206
2207 case TRANSMIT_HARD_ERROR:
2208 c->ctnwrite = false;
2209 break;
2210
2211 case TRANSMIT_SOFT_ERROR:
2212 c->ctnwrite = true;
2213 stop = true;
2214 break;
2215 default:
2216 break;
2217 }
2218 break;
2219
2220 case conn_closing:
2221 /* recovery mode, need reconnect if connection close */
2222 if (ms_setting.reconnect && (!ms_global.time_out
2223 || (ms_setting.run_time == 0 && c->remain_exec_num > 0))) {
2224
2225 if (ms_reconn(c) != 0) {
2226 ms_conn_close(c);
2227 stop = true;
2228 break;
2229 }
2230
2231 ms_reset_conn(c, false);
2232
2233 if (c->total_sfds == 1) {
2234 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2235 fprintf(stderr, "Couldn't update event.\n");
2236 ms_conn_set_state(c, conn_closing);
2237 break;
2238 }
2239 }
2240
2241 break;
2242 } else {
2243 ms_conn_close(c);
2244 stop = true;
2245 break;
2246 }
2247 default:
2248 assert(0);
2249 }
2250 }
2251
2252 return;
2253 }
2254
2255 /**
2256 * the event handler of each thread
2257 *
2258 * @param fd, the file descriptor of socket
2259 * @param which, event flag
2260 * @param arg, argument
2261 */
2262 void ms_event_handler(const int fd, const short which, void *arg)
2263 {
2264 ms_conn_t *c = (ms_conn_t *)arg;
2265 assert(c != NULL);
2266
2267 c->which = which;
2268
2269 /* sanity */
2270 if (fd != c->sfd) {
2271 fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2272 fd, c->sfd);
2273 ms_conn_close(c);
2274 exit(1);
2275 }
2276 assert(fd == c->sfd);
2277
2278 /* event timeout, close the current connection */
2279 if (c->which == EV_TIMEOUT) {
2280 ms_conn_set_state(c, conn_closing);
2281 }
2282
2283 ms_drive_machine(c);
2284
2285 /* wait for next event */
2286 return;
2287 }
2288
2289 /**
2290 * get the next socket descriptor index to run for replication
2291 *
2292 * @param c, pointer of the concurrency
2293 * @param cmd, command(get or set )
2294 *
2295 * @return int, if success, return the index, else return 0
2296 */
2297 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2298 {
2299 int sock_index = -1;
2300 int i = 0;
2301
2302 if (c->total_sfds == 1) {
2303 return 0;
2304 }
2305
2306 if (ms_setting.rep_write_srv == 0) {
2307 return sock_index;
2308 }
2309
2310 do {
2311 if (cmd == CMD_SET) {
2312 for (i = 0; i < ms_setting.rep_write_srv; i++) {
2313 if (c->tcpsfd[i] > 0) {
2314 break;
2315 }
2316 }
2317
2318 if (i == ms_setting.rep_write_srv) {
2319 /* random get one replication server to read */
2320 sock_index = (int)(random() % c->total_sfds);
2321 } else {
2322 /* random get one replication writing server to write */
2323 sock_index = (int)(random() % ms_setting.rep_write_srv);
2324 }
2325 } else if (cmd == CMD_GET) {
2326 /* random get one replication server to read */
2327 sock_index = (int)(random() % c->total_sfds);
2328 }
2329 } while (c->tcpsfd[sock_index] == 0);
2330
2331 return sock_index;
2332 }
2333
2334 /**
2335 * get the next socket descriptor index to run
2336 *
2337 * @param c, pointer of the concurrency
2338 *
2339 * @return int, return the index
2340 */
2341 static int ms_get_next_sock_index(ms_conn_t *c)
2342 {
2343 int sock_index = 0;
2344
2345 do {
2346 sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2347 } while (c->tcpsfd[sock_index] == 0);
2348
2349 return sock_index;
2350 }
2351
2352 /**
2353 * update socket event of the connections
2354 *
2355 * @param c, pointer of the concurrency
2356 *
2357 * @return int, if success, return 0, else return -1
2358 */
2359 static int ms_update_conn_sock_event(ms_conn_t *c)
2360 {
2361 assert(c != NULL);
2362
2363 switch (c->currcmd.cmd) {
2364 case CMD_SET:
2365 if (ms_setting.facebook_test && c->udp) {
2366 c->sfd = c->tcpsfd[0];
2367 c->udp = false;
2368 c->change_sfd = true;
2369 }
2370 break;
2371 case CMD_GET:
2372 if (ms_setting.facebook_test && !c->udp) {
2373 c->sfd = c->udpsfd;
2374 c->udp = true;
2375 c->change_sfd = true;
2376 }
2377 break;
2378 default:
2379 break;
2380 }
2381
2382 if (!c->udp && c->total_sfds > 1) {
2383 if (c->cur_idx != c->total_sfds) {
2384 if (ms_setting.rep_write_srv == 0) {
2385 c->cur_idx = ms_get_next_sock_index(c);
2386 } else {
2387 c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd);
2388 }
2389 } else {
2390 /* must select the first sock of the connection at the beginning */
2391 c->cur_idx = 0;
2392 }
2393
2394 c->sfd = c->tcpsfd[c->cur_idx];
2395 assert(c->sfd != 0);
2396 c->change_sfd = true;
2397 }
2398
2399 if (c->change_sfd) {
2400 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2401 fprintf(stderr, "Couldn't update event.\n");
2402 ms_conn_set_state(c, conn_closing);
2403 return -1;
2404 }
2405 }
2406
2407 return 0;
2408 }
2409
2410 /**
2411 * for ASCII protocol, this function build the set command
2412 * string and send the command.
2413 *
2414 * @param c, pointer of the concurrency
2415 * @param item, pointer of task item which includes the object
2416 * information
2417 *
2418 * @return int, if success, return 0, else return -1
2419 */
2420 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2421 {
2422 int value_offset;
2423 int write_len;
2424 char *buffer = c->wbuf;
2425
2426 write_len= sprintf(buffer, " %u %d %d\r\n", 0, item->exp_time, item->value_size);
2427
2428 if (write_len > c->wsize) {
2429 /* ought to be always enough. just fail for simplicity */
2430 fprintf(stderr, "output command line too long.\n");
2431 return -1;
2432 }
2433
2434 if (item->value_offset == INVALID_OFFSET) {
2435 value_offset = item->key_suffix_offset;
2436 } else {
2437 value_offset = item->value_offset;
2438 }
2439
2440 if (ms_add_iov(c, "set ", 4) != 0 ||
2441 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
2442 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2443 item->key_size - (int)KEY_PREFIX_SIZE) != 0 ||
2444 ms_add_iov(c, buffer, write_len) != 0 ||
2445 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size) != 0 ||
2446 ms_add_iov(c, "\r\n", 2) != 0 ||
2447 (c->udp && ms_build_udp_headers(c) != 0)) {
2448
2449 return -1;
2450 }
2451
2452 return 0;
2453 }
2454
2455 /**
2456 * used to send set command to server
2457 *
2458 * @param c, pointer of the concurrency
2459 * @param item, pointer of task item which includes the object
2460 * information
2461 *
2462 * @return int, if success, return 0, else return -1
2463 */
2464 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2465 {
2466 assert(c != NULL);
2467
2468 c->currcmd.cmd = CMD_SET;
2469 c->currcmd.isfinish = false;
2470 c->currcmd.retstat = MCD_FAILURE;
2471
2472 if (ms_update_conn_sock_event(c) != 0) {
2473 return -1;
2474 }
2475
2476 c->msgcurr = 0;
2477 c->msgused = 0;
2478 c->iovused = 0;
2479 if (ms_add_msghdr(c) != 0) {
2480 fprintf(stderr, "Out of memory preparing request.");
2481 return -1;
2482 }
2483
2484 /* binary protocol */
2485 if (c->protocol == binary_prot) {
2486 if (ms_build_bin_write_buf_set(c, item) != 0) {
2487 return -1;
2488 }
2489 } else {
2490 if (ms_build_ascii_write_buf_set(c, item) != 0) {
2491 return -1;
2492 }
2493 }
2494
2495 __sync_fetch_and_add(&ms_stats.obj_bytes, item->key_size + item->value_size);
2496 __sync_fetch_and_add(&ms_stats.cmd_set, 1);
2497
2498 return 0;
2499 }
2500
2501 /**
2502 * for ASCII protocol, this function build the get command
2503 * string and send the command.
2504 *
2505 * @param c, pointer of the concurrency
2506 * @param item, pointer of task item which includes the object
2507 * information
2508 *
2509 * @return int, if success, return 0, else return -1
2510 */
2511 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2512 {
2513 if (ms_add_iov(c, "get ", 4) != 0 ||
2514 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
2515 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2516 item->key_size - (int)KEY_PREFIX_SIZE) != 0 ||
2517 ms_add_iov(c, "\r\n", 2) != 0 ||
2518 (c->udp && ms_build_udp_headers(c) != 0)) {
2519
2520 return -1;
2521 }
2522
2523 return 0;
2524 }
2525
2526 /**
2527 * used to send the get command to server
2528 *
2529 * @param c, pointer of the concurrency
2530 * @param item, pointer of task item which includes the object
2531 * information
2532 * @param verify, whether do verification
2533 *
2534 * @return int, if success, return 0, else return -1
2535 */
2536 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
2537 {
2538 /* verify not supported yet */
2539 UNUSED_ARGUMENT(verify);
2540
2541 assert(c != NULL);
2542
2543 c->currcmd.cmd = CMD_GET;
2544 c->currcmd.isfinish = false;
2545 c->currcmd.retstat = MCD_FAILURE;
2546
2547 if (ms_update_conn_sock_event(c) != 0) {
2548 return -1;
2549 }
2550
2551 c->msgcurr = 0;
2552 c->msgused = 0;
2553 c->iovused = 0;
2554 if (ms_add_msghdr(c) != 0) {
2555 fprintf(stderr, "Out of memory preparing request.");
2556 return -1;
2557 }
2558
2559 /* binary protocol */
2560 if (c->protocol == binary_prot) {
2561 if (ms_build_bin_write_buf_get(c, item) != 0) {
2562 return -1;
2563 }
2564 } else {
2565 if (ms_build_ascii_write_buf_get(c, item) != 0) {
2566 return -1;
2567 }
2568 }
2569
2570 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
2571
2572 return 0;
2573 }
2574
2575 /**
2576 * for ASCII protocol, this function build the multi-get command
2577 * string and send the command.
2578 *
2579 * @param c, pointer of the concurrency
2580 *
2581 * @return int, if success, return 0, else return -1
2582 */
2583 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
2584 {
2585 ms_task_item_t *item;
2586
2587 if (ms_add_iov(c, "get", 3) != 0) {
2588 return -1;
2589 }
2590
2591 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2592 item = c->mlget_task.mlget_item[i].item;
2593 assert(item != NULL);
2594 if (ms_add_iov(c, " ", 1) != 0 ||
2595 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
2596 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2597 item->key_size - (int)KEY_PREFIX_SIZE) != 0) {
2598 return -1;
2599 }
2600 }
2601
2602 if (ms_add_iov(c, "\r\n", 2) != 0 ||
2603 (c->udp && ms_build_udp_headers(c) != 0)) {
2604 return -1;
2605 }
2606
2607 return 0;
2608 }
2609
2610 /**
2611 * used to send the multi-get command to server
2612 *
2613 * @param c, pointer of the concurrency
2614 *
2615 * @return int, if success, return 0, else return -1
2616 */
2617 int ms_mcd_mlget(ms_conn_t *c)
2618 {
2619 ms_task_item_t *item;
2620
2621 assert(c != NULL);
2622 assert(c->mlget_task.mlget_num >= 1);
2623
2624 c->currcmd.cmd = CMD_GET;
2625 c->currcmd.isfinish = false;
2626 c->currcmd.retstat = MCD_FAILURE;
2627
2628 if (ms_update_conn_sock_event(c) != 0) {
2629 return -1;
2630 }
2631
2632 c->msgcurr = 0;
2633 c->msgused = 0;
2634 c->iovused = 0;
2635 if (ms_add_msghdr(c) != 0) {
2636 fprintf(stderr, "Out of memory preparing request.");
2637 return -1;
2638 }
2639
2640 /* binary protocol */
2641 if (c->protocol == binary_prot) {
2642 if (ms_build_bin_write_buf_mlget(c) != 0) {
2643 return -1;
2644 }
2645 } else {
2646 if (ms_build_ascii_write_buf_mlget(c) != 0) {
2647 return -1;
2648 }
2649 }
2650
2651 /* decrease operation time of each item */
2652 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2653 item = c->mlget_task.mlget_item[i].item;
2654 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
2655 }
2656
2657 return 0;
2658 }
2659
2660 /**
2661 * binary protocol support
2662 */
2663
2664 /**
2665 * for binary protocol, parse the response of server
2666 *
2667 * @param c, pointer of the concurrency
2668 *
2669 * @return int, if success, return 0, else return -1
2670 */
2671 static int ms_bin_process_response(ms_conn_t *c)
2672 {
2673 const char *errstr = NULL;
2674
2675 assert(c != NULL);
2676
2677 uint32_t bodylen = c->binary_header.response.bodylen;
2678 uint8_t opcode = c->binary_header.response.opcode;
2679 uint16_t status = c->binary_header.response.status;
2680
2681 if (bodylen > 0) {
2682 c->rvbytes = (int32_t)bodylen;
2683 c->readval = true;
2684 return 1;
2685 } else {
2686 switch (status) {
2687 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
2688 if (opcode == PROTOCOL_BINARY_CMD_SET) {
2689 c->currcmd.retstat = MCD_STORED;
2690 } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) {
2691 c->currcmd.retstat = MCD_DELETED;
2692 } else if (opcode == PROTOCOL_BINARY_CMD_GET) {
2693 c->currcmd.retstat = MCD_END;
2694 }
2695 break;
2696 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
2697 errstr = "Out of memory";
2698 c->currcmd.retstat = MCD_SERVER_ERROR;
2699 break;
2700 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
2701 errstr = "Unknown command";
2702 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2703 break;
2704 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
2705 errstr = "Not found";
2706 c->currcmd.retstat = MCD_NOTFOUND;
2707 break;
2708 case PROTOCOL_BINARY_RESPONSE_EINVAL:
2709 errstr = "Invalid arguments";
2710 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
2711 break;
2712 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
2713 errstr = "Data exists for key.";
2714 break;
2715 case PROTOCOL_BINARY_RESPONSE_E2BIG:
2716 errstr = "Too large.";
2717 c->currcmd.retstat = MCD_SERVER_ERROR;
2718 break;
2719 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
2720 errstr = "Not stored.";
2721 c->currcmd.retstat = MCD_NOTSTORED;
2722 break;
2723 default:
2724 errstr = "Unknown error";
2725 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2726 break;
2727 }
2728
2729 if (errstr != NULL) {
2730 fprintf(stderr, "%s\n", errstr);
2731 }
2732 }
2733
2734 return 0;
2735 }
2736
2737 /* build binary header and add the header to the buffer to send */
2738 /**
2739 * build binary header and add the header to the buffer to send
2740 *
2741 * @param c, pointer of the concurrency
2742 * @param opcode, operation code
2743 * @param hdr_len, length of header
2744 * @param key_len, length of key
2745 * @param body_len. length of body
2746 */
2747 static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len,
2748 uint16_t key_len, uint32_t body_len) {
2749 protocol_binary_request_header* header;
2750
2751 assert(c != NULL);
2752
2753 header = (protocol_binary_request_header *)c->wcurr;
2754
2755 header->request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
2756 header->request.opcode = (uint8_t)opcode;
2757 header->request.keylen = htonl(key_len);
2758
2759 header->request.extlen = (uint8_t)hdr_len;
2760 header->request.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
2761 header->request.reserved = 0;
2762
2763 header->request.bodylen = htonl(body_len);
2764 header->request.opaque = 0;
2765 header->request.cas = 0;
2766
2767 ms_add_iov(c, c->wcurr, sizeof(header->request));
2768 }
2769
2770 /**
2771 * add the key to the socket write buffer array
2772 *
2773 * @param c, pointer of the concurrency
2774 * @param item, pointer of task item which includes the object
2775 * information
2776 */
2777 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
2778 {
2779 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
2780 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2781 item->key_size - (int)KEY_PREFIX_SIZE);
2782 }
2783
2784 /**
2785 * for binary protocol, this function build the set command
2786 * and add the command to send buffer array.
2787 *
2788 * @param c, pointer of the concurrency
2789 * @param item, pointer of task item which includes the object
2790 * information
2791 *
2792 * @return int, if success, return 0, else return -1
2793 */
2794 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2795 {
2796 assert(c->wbuf == c->wcurr);
2797
2798 int value_offset;
2799 protocol_binary_request_set* rep = (protocol_binary_request_set*)c->wcurr;
2800 uint16_t keylen = (uint16_t)item->key_size;
2801 uint32_t bodylen = (uint32_t)sizeof(rep->message.body) + (uint32_t)keylen + (uint32_t)item->value_size;
2802
2803 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen);
2804 rep->message.body.flags = 0;
2805 rep->message.body.expiration = htonl((uint32_t)item->exp_time);
2806 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
2807 ms_add_key_to_iov(c, item);
2808
2809 if (item->value_offset == INVALID_OFFSET) {
2810 value_offset = item->key_suffix_offset;
2811 } else {
2812 value_offset = item->value_offset;
2813 }
2814 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
2815
2816 return 0;
2817 }
2818
2819 /**
2820 * for binary protocol, this function build the get command and
2821 * add the command to send buffer array.
2822 *
2823 * @param c, pointer of the concurrency
2824 * @param item, pointer of task item which includes the object
2825 * information
2826 *
2827 * @return int, if success, return 0, else return -1
2828 */
2829 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2830 {
2831 assert(c->wbuf == c->wcurr);
2832
2833 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size);
2834 ms_add_key_to_iov(c, item);
2835
2836 return 0;
2837 }
2838
2839 /**
2840 * for binary protocol, this function build the multi-get
2841 * command and add the command to send buffer array.
2842 *
2843 * @param c, pointer of the concurrency
2844 * @param item, pointer of task item which includes the object
2845 * information
2846 *
2847 * @return int, if success, return 0, else return -1
2848 */
2849 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
2850 {
2851 ms_task_item_t *item;
2852
2853 assert(c->wbuf == c->wcurr);
2854
2855 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2856 item = c->mlget_task.mlget_item[i].item;
2857 assert(item != NULL);
2858
2859 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size);
2860 ms_add_key_to_iov(c, item);
2861 c->wcurr += sizeof(protocol_binary_request_get);
2862 }
2863
2864 c->wcurr = c->wbuf;
2865
2866 return 0;
2867 }
2868