more cleanup
[m6w6/libmemcached] / contrib / bin / memaslap / ms_conn.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #include <stdio.h>
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <sys/uio.h>
22 #include <event.h>
23 #include <fcntl.h>
24 #include <netinet/tcp.h>
25 #include <netinet/in.h>
26
27 #if defined(HAVE_ARPA_INET_H)
28 # include <arpa/inet.h>
29 #endif
30
31 #if defined(HAVE_SYS_TIME_H)
32 # include <sys/time.h>
33 #endif
34 #include <time.h>
35
36 #include "ms_setting.h"
37 #include "ms_thread.h"
38 #include "ms_atomic.h"
39
40 #ifdef linux
41 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
42 * optimize the conversion functions, but the prototypes generate warnings
43 * from gcc. The conversion methods isn't the bottleneck for my app, so
44 * just remove the warnings by undef'ing the optimization ..
45 */
46 # undef ntohs
47 # undef ntohl
48 # undef htons
49 # undef htonl
50 #endif
51
52 /* for network write */
53 #define TRANSMIT_COMPLETE 0
54 #define TRANSMIT_INCOMPLETE 1
55 #define TRANSMIT_SOFT_ERROR 2
56 #define TRANSMIT_HARD_ERROR 3
57
58 /* for generating key */
59 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
60 #define KEY_PREFIX_MASK 0x1010101010101010
61
62 /* For parse the value length return by server */
63 #define KEY_TOKEN 1
64 #define VALUELEN_TOKEN 3
65
66 /* global increasing counter, to ensure the key prefix unique */
67 static uint64_t key_prefix_seq = KEY_PREFIX_BASE;
68
69 /* global increasing counter, generating request id for UDP */
70 static ATOMIC uint32_t udp_request_id = 0;
71
72 extern pthread_key_t ms_thread_key;
73
74 /* generate upd request id */
75 static uint32_t ms_get_udp_request_id(void);
76
77 /* connect initialize */
78 static void ms_task_init(ms_conn_t *c);
79 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
80 static int ms_conn_sock_init(ms_conn_t *c);
81 static int ms_conn_event_init(ms_conn_t *c);
82 static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size,
83 const bool is_udp);
84 static void ms_warmup_num_init(ms_conn_t *c);
85 static int ms_item_win_init(ms_conn_t *c);
86
87 /* connection close */
88 void ms_conn_free(ms_conn_t *c);
89 static void ms_conn_close(ms_conn_t *c);
90
91 /* create network connection */
92 static int ms_new_socket(struct addrinfo *ai);
93 static void ms_maximize_sndbuf(const int sfd);
94 static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port,
95 const bool is_udp, int *ret_sfd);
96 static int ms_reconn(ms_conn_t *c);
97
98 /* read and parse */
99 static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens);
100 static int ms_ascii_process_line(ms_conn_t *c, char *command);
101 static int ms_try_read_line(ms_conn_t *c);
102 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
103 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
104 static int ms_try_read_network(ms_conn_t *c);
105 static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen);
106 static void ms_ascii_complete_nread(ms_conn_t *c);
107 static void ms_bin_complete_nread(ms_conn_t *c);
108 static void ms_complete_nread(ms_conn_t *c);
109
110 /* send functions */
111 static int ms_add_msghdr(ms_conn_t *c);
112 static int ms_ensure_iov_space(ms_conn_t *c);
113 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
114 static int ms_build_udp_headers(ms_conn_t *c);
115 static int ms_transmit(ms_conn_t *c);
116
117 /* status adjustment */
118 static void ms_conn_shrink(ms_conn_t *c);
119 static void ms_conn_set_state(ms_conn_t *c, int state);
120 static bool ms_update_event(ms_conn_t *c, const int new_flags);
121 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
122 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
123 static int ms_update_conn_sock_event(ms_conn_t *c);
124 static bool ms_need_yield(ms_conn_t *c);
125 static void ms_update_start_time(ms_conn_t *c);
126
127 /* main loop */
128 static void ms_drive_machine(ms_conn_t *c);
129 void ms_event_handler(const int fd, const short which, void *arg);
130
131 /* ascii protocol */
132 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
133 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
134 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
135
136 /* binary protocol */
137 static int ms_bin_process_response(ms_conn_t *c);
138 static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len,
139 uint32_t body_len);
140 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
141 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
142 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
143 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
144
145 /**
146 * each key has two parts, prefix and suffix. The suffix is a
147 * string random get form the character table. The prefix is a
148 * uint64_t variable. And the prefix must be unique. we use the
149 * prefix to identify a key. And the prefix can't include
150 * character ' ' '\r' '\n' '\0'.
151 *
152 * @return uint64_t
153 */
154 uint64_t ms_get_key_prefix(void) {
155 uint64_t key_prefix;
156
157 pthread_mutex_lock(&ms_global.seq_mutex);
158 key_prefix_seq |= KEY_PREFIX_MASK;
159 key_prefix = key_prefix_seq;
160 key_prefix_seq++;
161 pthread_mutex_unlock(&ms_global.seq_mutex);
162
163 return key_prefix;
164 } /* ms_get_key_prefix */
165
166 /**
167 * get an unique udp request id
168 *
169 * @return an unique UDP request id
170 */
171 static uint32_t ms_get_udp_request_id(void) {
172 return atomic_add_32_nv(&udp_request_id, 1);
173 }
174
175 /**
176 * initialize current task structure
177 *
178 * @param c, pointer of the concurrency
179 */
180 static void ms_task_init(ms_conn_t *c) {
181 c->curr_task.cmd = CMD_NULL;
182 c->curr_task.item = 0;
183 c->curr_task.verify = false;
184 c->curr_task.finish_verify = true;
185 c->curr_task.get_miss = true;
186
187 c->curr_task.get_opt = 0;
188 c->curr_task.set_opt = 0;
189 c->curr_task.cycle_undo_get = 0;
190 c->curr_task.cycle_undo_set = 0;
191 c->curr_task.verified_get = 0;
192 c->curr_task.overwrite_set = 0;
193 } /* ms_task_init */
194
195 /**
196 * initialize udp for the connection structure
197 *
198 * @param c, pointer of the concurrency
199 * @param is_udp, whether it's udp
200 *
201 * @return int, if success, return EXIT_SUCCESS, else return -1
202 */
203 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) {
204 c->hdrbuf = 0;
205 c->rudpbuf = 0;
206 c->udppkt = 0;
207
208 c->rudpsize = UDP_DATA_BUFFER_SIZE;
209 c->hdrsize = 0;
210
211 c->rudpbytes = 0;
212 c->packets = 0;
213 c->recvpkt = 0;
214 c->pktcurr = 0;
215 c->ordcurr = 0;
216
217 c->udp = is_udp;
218
219 if (c->udp || (!c->udp && ms_setting.facebook_test)) {
220 c->rudpbuf = (char *) malloc((size_t) c->rudpsize);
221 c->udppkt = (ms_udppkt_t *) malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
222
223 if ((c->rudpbuf == NULL) || (c->udppkt == NULL)) {
224 if (c->rudpbuf)
225 free(c->rudpbuf);
226 if (c->udppkt)
227 free(c->udppkt);
228 fprintf(stderr, "malloc()\n");
229 return -1;
230 }
231 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
232 }
233
234 return EXIT_SUCCESS;
235 } /* ms_conn_udp_init */
236
237 /**
238 * initialize the connection structure
239 *
240 * @param c, pointer of the concurrency
241 * @param init_state, (conn_read, conn_write, conn_closing)
242 * @param read_buffer_size
243 * @param is_udp, whether it's udp
244 *
245 * @return int, if success, return EXIT_SUCCESS, else return -1
246 */
247 static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size,
248 const bool is_udp) {
249 assert(c);
250
251 c->rbuf = c->wbuf = 0;
252 c->iov = 0;
253 c->msglist = 0;
254
255 c->rsize = read_buffer_size;
256 c->wsize = WRITE_BUFFER_SIZE;
257 c->iovsize = IOV_LIST_INITIAL;
258 c->msgsize = MSG_LIST_INITIAL;
259
260 /* for replication, each connection need connect all the server */
261 if (ms_setting.rep_write_srv > 0) {
262 c->total_sfds = ms_setting.srv_cnt * ms_setting.sock_per_conn;
263 } else {
264 c->total_sfds = ms_setting.sock_per_conn;
265 }
266 c->alive_sfds = 0;
267
268 c->rbuf = (char *) malloc((size_t) c->rsize);
269 c->wbuf = (char *) malloc((size_t) c->wsize);
270 c->iov = (struct iovec *) malloc(sizeof(struct iovec) * (size_t) c->iovsize);
271 c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * (size_t) c->msgsize);
272 if (ms_setting.mult_key_num > 1) {
273 c->mlget_task.mlget_item = (ms_mlget_task_item_t *) malloc(sizeof(ms_mlget_task_item_t)
274 * (size_t) ms_setting.mult_key_num);
275 }
276 c->tcpsfd = (int *) malloc((size_t) c->total_sfds * sizeof(int));
277
278 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL) || (c->msglist == NULL)
279 || (c->tcpsfd == NULL)
280 || ((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_item == NULL)))
281 {
282 if (c->rbuf)
283 free(c->rbuf);
284 if (c->wbuf)
285 free(c->wbuf);
286 if (c->iov)
287 free(c->iov);
288 if (c->msglist)
289 free(c->msglist);
290 if (c->mlget_task.mlget_item)
291 free(c->mlget_task.mlget_item);
292 if (c->tcpsfd)
293 free(c->tcpsfd);
294 fprintf(stderr, "malloc()\n");
295 return -1;
296 }
297
298 c->state = init_state;
299 c->rvbytes = 0;
300 c->rbytes = 0;
301 c->rcurr = c->rbuf;
302 c->wcurr = c->wbuf;
303 c->iovused = 0;
304 c->msgcurr = 0;
305 c->msgused = 0;
306 c->cur_idx = c->total_sfds; /* default index is a invalid value */
307
308 c->ctnwrite = false;
309 c->readval = false;
310 c->change_sfd = false;
311
312 c->precmd.cmd = c->currcmd.cmd = CMD_NULL;
313 c->precmd.isfinish = true; /* default the previous command finished */
314 c->currcmd.isfinish = false;
315 c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE;
316 c->precmd.key_prefix = c->currcmd.key_prefix = 0;
317
318 c->mlget_task.mlget_num = 0;
319 c->mlget_task.value_index = -1; /* default invalid value */
320
321 if (ms_setting.binary_prot_) {
322 c->protocol = binary_prot;
323 } else {
324 c->protocol = ascii_prot;
325 }
326
327 /* initialize udp */
328 if (ms_conn_udp_init(c, is_udp)) {
329 return -1;
330 }
331
332 /* initialize task */
333 ms_task_init(c);
334
335 if (!(ms_setting.facebook_test && is_udp)) {
336 atomic_add_32(&ms_stats.active_conns, 1);
337 }
338
339 return EXIT_SUCCESS;
340 } /* ms_conn_init */
341
342 /**
343 * when doing 100% get operation, it could preset some objects
344 * to warmup the server. this function is used to initialize the
345 * number of the objects to preset.
346 *
347 * @param c, pointer of the concurrency
348 */
349 static void ms_warmup_num_init(ms_conn_t *c) {
350 /* no set operation, preset all the items in the window */
351 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) {
352 c->warmup_num = c->win_size;
353 c->remain_warmup_num = c->warmup_num;
354 } else {
355 c->warmup_num = 0;
356 c->remain_warmup_num = c->warmup_num;
357 }
358 } /* ms_warmup_num_init */
359
360 /**
361 * each connection has an item window, this function initialize
362 * the window. The window is used to generate task.
363 *
364 * @param c, pointer of the concurrency
365 *
366 * @return int, if success, return EXIT_SUCCESS, else return -1
367 */
368 static int ms_item_win_init(ms_conn_t *c) {
369 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
370 int exp_cnt = 0;
371
372 c->win_size = (int) ms_setting.win_size;
373 c->set_cursor = 0;
374 c->exec_num = ms_thread->thread_ctx->exec_num_perconn;
375 c->remain_exec_num = c->exec_num;
376
377 c->item_win = (ms_task_item_t *) malloc(sizeof(ms_task_item_t) * (size_t) c->win_size);
378 if (c->item_win == NULL) {
379 fprintf(stderr, "Can't allocate task item array for conn.\n");
380 return -1;
381 }
382 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t) c->win_size);
383
384 for (int i = 0; i < c->win_size; i++) {
385 c->item_win[i].key_size = (int) ms_setting.distr[i].key_size;
386 c->item_win[i].key_prefix = ms_get_key_prefix();
387 c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset;
388 c->item_win[i].value_size = (int) ms_setting.distr[i].value_size;
389 c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */
390 c->item_win[i].client_time = 0;
391
392 /* set expire time base on the proportion */
393 if (exp_cnt < ms_setting.exp_ver_per * i) {
394 c->item_win[i].exp_time = FIXED_EXPIRE_TIME;
395 exp_cnt++;
396 } else {
397 c->item_win[i].exp_time = 0;
398 }
399 }
400
401 ms_warmup_num_init(c);
402
403 return EXIT_SUCCESS;
404 } /* ms_item_win_init */
405
406 /**
407 * each connection structure can include one or more sock
408 * handlers. this function create these socks and connect the
409 * server(s).
410 *
411 * @param c, pointer of the concurrency
412 *
413 * @return int, if success, return EXIT_SUCCESS, else return -1
414 */
415 static int ms_conn_sock_init(ms_conn_t *c) {
416 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
417 uint32_t i;
418 int ret_sfd;
419 uint32_t srv_idx = 0;
420
421 assert(c);
422 assert(c->tcpsfd);
423
424 for (i = 0; i < c->total_sfds; i++) {
425 ret_sfd = 0;
426 if (ms_setting.rep_write_srv > 0) {
427 /* for replication, each connection need connect all the server */
428 srv_idx = i % ms_setting.srv_cnt;
429 } else {
430 /* all the connections in a thread connects the same server */
431 srv_idx = ms_thread->thread_ctx->srv_idx;
432 }
433
434 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
435 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd)
436 )
437 {
438 break;
439 }
440
441 if (i == 0) {
442 c->sfd = ret_sfd;
443 }
444
445 if (!ms_setting.udp) {
446 c->tcpsfd[i] = ret_sfd;
447 }
448
449 c->alive_sfds++;
450 }
451
452 /* initialize udp sock handler if necessary */
453 if (ms_setting.facebook_test) {
454 ret_sfd = 0;
455 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
456 ms_setting.servers[srv_idx].srv_port, true, &ret_sfd)
457 )
458 {
459 c->udpsfd = 0;
460 } else {
461 c->udpsfd = ret_sfd;
462 }
463 }
464
465 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0))) {
466 if (ms_setting.udp) {
467 close(c->sfd);
468 } else {
469 for (uint32_t j = 0; j < i; j++) {
470 close(c->tcpsfd[j]);
471 }
472 }
473
474 if (c->udpsfd) {
475 close(c->udpsfd);
476 }
477
478 return -1;
479 }
480
481 return EXIT_SUCCESS;
482 } /* ms_conn_sock_init */
483
484 /**
485 * each connection is managed by libevent, this function
486 * initialize the event of the connection structure.
487 *
488 * @param c, pointer of the concurrency
489 *
490 * @return int, if success, return EXIT_SUCCESS, else return -1
491 */
492 static int ms_conn_event_init(ms_conn_t *c) {
493 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
494 short event_flags = EV_WRITE | EV_PERSIST;
495
496 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *) c);
497 event_base_set(ms_thread->base, &c->event);
498 c->ev_flags = event_flags;
499
500 if (event_add(&c->event, NULL) == -1) {
501 return -1;
502 }
503
504 return EXIT_SUCCESS;
505 } /* ms_conn_event_init */
506
507 /**
508 * setup a connection, each connection structure of each
509 * thread must call this function to initialize.
510 *
511 * @param c, pointer of the concurrency
512 *
513 * @return int, if success, return EXIT_SUCCESS, else return -1
514 */
515 int ms_setup_conn(ms_conn_t *c) {
516 if (ms_item_win_init(c)) {
517 return -1;
518 }
519
520 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp)) {
521 return -1;
522 }
523
524 if (ms_conn_sock_init(c)) {
525 return -1;
526 }
527
528 if (ms_conn_event_init(c)) {
529 return -1;
530 }
531
532 return EXIT_SUCCESS;
533 } /* ms_setup_conn */
534
535 /**
536 * Frees a connection.
537 *
538 * @param c, pointer of the concurrency
539 */
540 void ms_conn_free(ms_conn_t *c) {
541 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
542 if (c) {
543 if (c->hdrbuf)
544 free(c->hdrbuf);
545 if (c->msglist)
546 free(c->msglist);
547 if (c->rbuf)
548 free(c->rbuf);
549 if (c->wbuf)
550 free(c->wbuf);
551 if (c->iov)
552 free(c->iov);
553 if (c->mlget_task.mlget_item)
554 free(c->mlget_task.mlget_item);
555 if (c->rudpbuf)
556 free(c->rudpbuf);
557 if (c->udppkt)
558 free(c->udppkt);
559 if (c->item_win)
560 free(c->item_win);
561 if (c->tcpsfd)
562 free(c->tcpsfd);
563
564 if (--ms_thread->nactive_conn == 0) {
565 free(ms_thread->conn);
566 }
567 }
568 } /* ms_conn_free */
569
570 /**
571 * close a connection
572 *
573 * @param c, pointer of the concurrency
574 */
575 static void ms_conn_close(ms_conn_t *c) {
576 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
577 assert(c);
578
579 /* delete the event, the socket and the connection */
580 event_del(&c->event);
581
582 for (uint32_t i = 0; i < c->total_sfds; i++) {
583 if (c->tcpsfd[i] > 0) {
584 close(c->tcpsfd[i]);
585 }
586 }
587 c->sfd = 0;
588
589 if (ms_setting.facebook_test) {
590 close(c->udpsfd);
591 }
592
593 atomic_dec_32(&ms_stats.active_conns);
594
595 ms_conn_free(c);
596
597 if (ms_setting.run_time == 0) {
598 pthread_mutex_lock(&ms_global.run_lock.lock);
599 ms_global.run_lock.count++;
600 pthread_cond_signal(&ms_global.run_lock.cond);
601 pthread_mutex_unlock(&ms_global.run_lock.lock);
602 }
603
604 if (ms_thread->nactive_conn == 0) {
605 event_base_loopbreak(ms_thread->base);
606 }
607 } /* ms_conn_close */
608
609 /**
610 * create a new sock
611 *
612 * @param ai, server address information
613 *
614 * @return int, if success, return EXIT_SUCCESS, else return -1
615 */
616 static int ms_new_socket(struct addrinfo *ai) {
617 int sfd;
618
619 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
620 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
621 return -1;
622 }
623
624 return sfd;
625 } /* ms_new_socket */
626
627 /**
628 * Sets a socket's send buffer size to the maximum allowed by the system.
629 *
630 * @param sfd, file descriptor of socket
631 */
632 static void ms_maximize_sndbuf(const int sfd) {
633 socklen_t intsize = sizeof(int);
634 unsigned int last_good = 0;
635 unsigned int min, max, avg;
636 unsigned int old_size;
637
638 /* Start with the default size. */
639 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
640 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
641 return;
642 }
643
644 /* Binary-search for the real maximum. */
645 min = old_size;
646 max = MAX_SENDBUF_SIZE;
647
648 while (min <= max) {
649 avg = ((unsigned int) (min + max)) / 2;
650 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *) &avg, intsize) == 0) {
651 last_good = avg;
652 min = avg + 1;
653 } else {
654 max = avg - 1;
655 }
656 }
657 (void) last_good;
658 } /* ms_maximize_sndbuf */
659
660 /**
661 * socket connects the server
662 *
663 * @param c, pointer of the concurrency
664 * @param srv_host_name, the host name of the server
665 * @param srv_port, port of server
666 * @param is_udp, whether it's udp
667 * @param ret_sfd, the connected socket file descriptor
668 *
669 * @return int, if success, return EXIT_SUCCESS, else return -1
670 */
671 static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port,
672 const bool is_udp, int *ret_sfd) {
673 int sfd;
674 struct linger ling = {0, 0};
675 struct addrinfo *ai;
676 struct addrinfo *next;
677 struct addrinfo hints;
678 char port_buf[NI_MAXSERV];
679 int error;
680 int success = 0;
681
682 int flags = 1;
683
684 /*
685 * the memset call clears nonstandard fields in some impementations
686 * that otherwise mess things up.
687 */
688 memset(&hints, 0, sizeof(hints));
689 #ifdef AI_ADDRCONFIG
690 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
691 #else
692 hints.ai_flags = AI_PASSIVE;
693 #endif /* AI_ADDRCONFIG */
694 if (is_udp) {
695 hints.ai_protocol = IPPROTO_UDP;
696 hints.ai_socktype = SOCK_DGRAM;
697 hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
698 } else {
699 hints.ai_family = AF_UNSPEC;
700 hints.ai_protocol = IPPROTO_TCP;
701 hints.ai_socktype = SOCK_STREAM;
702 }
703
704 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
705 error = getaddrinfo(srv_host_name, port_buf, &hints, &ai);
706 if (error) {
707 if (error != EAI_SYSTEM)
708 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
709 else
710 perror("getaddrinfo()");
711
712 return -1;
713 }
714
715 for (next = ai; next; next = next->ai_next) {
716 if ((sfd = ms_new_socket(next)) == -1) {
717 freeaddrinfo(ai);
718 return -1;
719 }
720
721 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *) &flags, sizeof(flags));
722 if (is_udp) {
723 ms_maximize_sndbuf(sfd);
724 } else {
725 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags, sizeof(flags));
726 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *) &ling, sizeof(ling));
727 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags, sizeof(flags));
728 }
729
730 if (is_udp) {
731 c->srv_recv_addr_size = sizeof(struct sockaddr);
732 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
733 } else {
734 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) {
735 close(sfd);
736 freeaddrinfo(ai);
737 return -1;
738 }
739 }
740
741 if (((flags = fcntl(sfd, F_GETFL, 0)) < 0) || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)) {
742 fprintf(stderr, "setting O_NONBLOCK\n");
743 close(sfd);
744 freeaddrinfo(ai);
745 return -1;
746 }
747
748 if (ret_sfd) {
749 *ret_sfd = sfd;
750 }
751
752 success++;
753 }
754
755 freeaddrinfo(ai);
756
757 /* Return zero if we detected no errors in starting up connections */
758 return success == 0;
759 } /* ms_network_connect */
760
761 /**
762 * reconnect a disconnected sock
763 *
764 * @param c, pointer of the concurrency
765 *
766 * @return int, if success, return EXIT_SUCCESS, else return -1
767 */
768 static int ms_reconn(ms_conn_t *c) {
769 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
770 uint32_t srv_idx = 0;
771 uint32_t srv_conn_cnt = 0;
772
773 if (ms_setting.rep_write_srv > 0) {
774 srv_idx = c->cur_idx % ms_setting.srv_cnt;
775 srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns;
776 } else {
777 srv_idx = ms_thread->thread_ctx->srv_idx;
778 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
779 }
780
781 /* close the old socket handler */
782 close(c->sfd);
783 c->tcpsfd[c->cur_idx] = 0;
784
785 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) % srv_conn_cnt == 0) {
786 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
787 fprintf(stderr, "Server %s:%d disconnect\n", ms_setting.servers[srv_idx].srv_host_name,
788 ms_setting.servers[srv_idx].srv_port);
789 }
790
791 if (ms_setting.rep_write_srv > 0) {
792 uint32_t i = 0;
793
794 for (i = 0; i < c->total_sfds; i++) {
795 if (c->tcpsfd[i]) {
796 break;
797 }
798 }
799
800 /* all socks disconnect */
801 if (i == c->total_sfds) {
802 return -1;
803 }
804 } else {
805 do {
806 /* reconnect success, break the loop */
807 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
808 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &c->sfd)
809 == 0)
810 {
811 c->tcpsfd[c->cur_idx] = c->sfd;
812 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt
813 == 0) {
814 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
815 int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec
816 - ms_setting.servers[srv_idx].disconn_time.tv_sec);
817 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
818 ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port,
819 reconn_time);
820 }
821 break;
822 }
823
824 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) {
825 /* wait a second and reconnect */
826 sleep(1);
827 }
828 } while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
829 }
830
831 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) {
832 c->sfd = 0;
833 c->alive_sfds--;
834 }
835
836 return EXIT_SUCCESS;
837 } /* ms_reconn */
838
839 /**
840 * reconnect several disconnected socks in the connection
841 * structure, the ever-1-second timer of the thread will check
842 * whether some socks in the connections disconnect. if
843 * disconnect, reconnect the sock.
844 *
845 * @param c, pointer of the concurrency
846 *
847 * @return int, if success, return EXIT_SUCCESS, else return -1
848 */
849 int ms_reconn_socks(ms_conn_t *c) {
850 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
851 uint32_t srv_idx = 0;
852 int ret_sfd = 0;
853 uint32_t srv_conn_cnt = 0;
854 struct timeval cur_time;
855
856 assert(c);
857
858 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) {
859 return EXIT_SUCCESS;
860 }
861
862 for (uint32_t i = 0; i < c->total_sfds; i++) {
863 if (c->tcpsfd[i] == 0) {
864 gettimeofday(&cur_time, NULL);
865
866 /**
867 * For failover test of replication, reconnect the socks after
868 * it disconnects more than 5 seconds, Otherwise memslap will
869 * block at connect() function and the work threads can't work
870 * in this interval.
871 */
872 if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) {
873 break;
874 }
875
876 if (ms_setting.rep_write_srv > 0) {
877 srv_idx = i % ms_setting.srv_cnt;
878 srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns;
879 } else {
880 srv_idx = ms_thread->thread_ctx->srv_idx;
881 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
882 }
883
884 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
885 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd)
886 == 0)
887 {
888 c->tcpsfd[i] = ret_sfd;
889 c->alive_sfds++;
890
891 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt
892 == 0) {
893 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
894 int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec
895 - ms_setting.servers[srv_idx].disconn_time.tv_sec);
896 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
897 ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port,
898 reconn_time);
899 }
900 }
901 }
902 }
903
904 return EXIT_SUCCESS;
905 } /* ms_reconn_socks */
906
907 /**
908 * Tokenize the command string by replacing whitespace with '\0' and update
909 * the token array tokens with pointer to start of each token and length.
910 * Returns total number of tokens. The last valid token is the terminal
911 * token (value points to the first unprocessed character of the string and
912 * length zero).
913 *
914 * Usage example:
915 *
916 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
917 * for(int ix = 0; tokens[ix].length; ix++) {
918 * ...
919 * }
920 * ncommand = tokens[ix].value - command;
921 * command = tokens[ix].value;
922 * }
923 *
924 * @param command, the command string to token
925 * @param tokens, array to store tokens
926 * @param max_tokens, maximum tokens number
927 *
928 * @return int, the number of tokens
929 */
930 static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens) {
931 char *s, *e;
932 int ntokens = 0;
933
934 assert(command && tokens && max_tokens > 1);
935
936 for (s = e = command; ntokens < max_tokens - 1; ++e) {
937 if (*e == ' ') {
938 if (s != e) {
939 tokens[ntokens].value = s;
940 tokens[ntokens].length = (size_t)(e - s);
941 ntokens++;
942 *e = '\0';
943 }
944 s = e + 1;
945 } else if (*e == '\0') {
946 if (s != e) {
947 tokens[ntokens].value = s;
948 tokens[ntokens].length = (size_t)(e - s);
949 ntokens++;
950 }
951
952 break; /* string end */
953 }
954 }
955
956 return ntokens;
957 } /* ms_tokenize_command */
958
959 /**
960 * parse the response of server.
961 *
962 * @param c, pointer of the concurrency
963 * @param command, the string responded by server
964 *
965 * @return int, if the command completed return EXIT_SUCCESS, else return
966 * -1
967 */
968 static int ms_ascii_process_line(ms_conn_t *c, char *command) {
969 int ret = 0;
970 int64_t value_len;
971 char *buffer = command;
972
973 assert(c);
974
975 /**
976 * for command get, we store the returned value into local buffer
977 * then continue in ms_complete_nread().
978 */
979
980 switch (buffer[0]) {
981 case 'V': /* VALUE || VERSION */
982 if (buffer[1] == 'A') /* VALUE */ {
983 token_t tokens[MAX_TOKENS];
984 ms_tokenize_command(command, tokens, MAX_TOKENS);
985 errno = 0;
986 value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
987 if (errno) {
988 printf("<%d ERROR %s\n", c->sfd, strerror(errno));
989 }
990 memcpy(&c->currcmd.key_prefix, tokens[KEY_TOKEN].value, sizeof(c->currcmd.key_prefix));
991
992 /*
993 * We read the \r\n into the string since not doing so is more
994 * cycles then the waster of memory to do so.
995 *
996 * We are null terminating through, which will most likely make
997 * some people lazy about using the return length.
998 */
999 c->rvbytes = (int) (value_len + 2);
1000 c->readval = true;
1001 ret = -1;
1002 }
1003
1004 break;
1005
1006 case 'O': /* OK */
1007 c->currcmd.retstat = MCD_SUCCESS;
1008 break;
1009
1010 case 'S': /* STORED STATS SERVER_ERROR */
1011 if (buffer[2] == 'A') /* STORED STATS */ { /* STATS*/
1012 c->currcmd.retstat = MCD_STAT;
1013 } else if (buffer[1] == 'E') {
1014 /* SERVER_ERROR */
1015 printf("<%d %s\n", c->sfd, buffer);
1016
1017 c->currcmd.retstat = MCD_SERVER_ERROR;
1018 } else if (buffer[1] == 'T') {
1019 /* STORED */
1020 c->currcmd.retstat = MCD_STORED;
1021 } else {
1022 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1023 }
1024 break;
1025
1026 case 'D': /* DELETED DATA */
1027 if (buffer[1] == 'E') {
1028 c->currcmd.retstat = MCD_DELETED;
1029 } else {
1030 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1031 }
1032
1033 break;
1034
1035 case 'N': /* NOT_FOUND NOT_STORED*/
1036 if (buffer[4] == 'F') {
1037 c->currcmd.retstat = MCD_NOTFOUND;
1038 } else if (buffer[4] == 'S') {
1039 printf("<%d %s\n", c->sfd, buffer);
1040 c->currcmd.retstat = MCD_NOTSTORED;
1041 } else {
1042 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1043 }
1044 break;
1045
1046 case 'E': /* PROTOCOL ERROR or END */
1047 if (buffer[1] == 'N') {
1048 /* END */
1049 c->currcmd.retstat = MCD_END;
1050 } else if (buffer[1] == 'R') {
1051 printf("<%d ERROR\n", c->sfd);
1052 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
1053 } else if (buffer[1] == 'X') {
1054 c->currcmd.retstat = MCD_DATA_EXISTS;
1055 printf("<%d %s\n", c->sfd, buffer);
1056 } else {
1057 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1058 }
1059 break;
1060
1061 case 'C': /* CLIENT ERROR */
1062 printf("<%d %s\n", c->sfd, buffer);
1063 c->currcmd.retstat = MCD_CLIENT_ERROR;
1064 break;
1065
1066 default:
1067 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1068 break;
1069 } /* switch */
1070
1071 return ret;
1072 } /* ms_ascii_process_line */
1073
1074 /**
1075 * after one operation completes, reset the concurrency
1076 *
1077 * @param c, pointer of the concurrency
1078 * @param timeout, whether it's timeout
1079 */
1080 void ms_reset_conn(ms_conn_t *c, bool timeout) {
1081 assert(c);
1082
1083 if (c->udp) {
1084 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) {
1085 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t) c->packets);
1086 }
1087
1088 c->packets = 0;
1089 c->recvpkt = 0;
1090 c->pktcurr = 0;
1091 c->ordcurr = 0;
1092 c->rudpbytes = 0;
1093 }
1094 c->currcmd.isfinish = true;
1095 c->ctnwrite = false;
1096 c->rbytes = 0;
1097 c->rcurr = c->rbuf;
1098 c->msgcurr = 0;
1099 c->msgused = 0;
1100 c->iovused = 0;
1101 ms_conn_set_state(c, conn_write);
1102 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1103
1104 if (timeout) {
1105 ms_drive_machine(c);
1106 }
1107 } /* ms_reset_conn */
1108
1109 /**
1110 * if we have a complete line in the buffer, process it.
1111 *
1112 * @param c, pointer of the concurrency
1113 *
1114 * @return int, if success, return EXIT_SUCCESS, else return -1
1115 */
1116 static int ms_try_read_line(ms_conn_t *c) {
1117 if (c->protocol == binary_prot) {
1118 /* Do we have the complete packet header? */
1119 if ((uint64_t) c->rbytes < sizeof(c->binary_header)) {
1120 /* need more data! */
1121 return EXIT_SUCCESS;
1122 } else {
1123 #ifdef NEED_ALIGN
1124 if (((long) (c->rcurr)) % 8) {
1125 /* must realign input buffer */
1126 memmove(c->rbuf, c->rcurr, c->rbytes);
1127 c->rcurr = c->rbuf;
1128 if (settings.verbose) {
1129 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1130 }
1131 }
1132 #endif
1133 protocol_binary_response_header *rsp;
1134 rsp = (protocol_binary_response_header *) c->rcurr;
1135
1136 c->binary_header = *rsp;
1137 c->binary_header.response.extlen = rsp->response.extlen;
1138 c->binary_header.response.keylen = ntohs(rsp->response.keylen);
1139 c->binary_header.response.bodylen = ntohl(rsp->response.bodylen);
1140 c->binary_header.response.status = ntohs(rsp->response.status);
1141
1142 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) {
1143 fprintf(stderr, "Invalid magic: %x\n", c->binary_header.response.magic);
1144 ms_conn_set_state(c, conn_closing);
1145 return EXIT_SUCCESS;
1146 }
1147
1148 /* process this complete response */
1149 if (ms_bin_process_response(c) == 0) {
1150 /* current operation completed */
1151 ms_reset_conn(c, false);
1152 return -1;
1153 } else {
1154 c->rbytes -= (int32_t) sizeof(c->binary_header);
1155 c->rcurr += sizeof(c->binary_header);
1156 }
1157 }
1158 } else {
1159 char *el, *cont;
1160
1161 assert(c);
1162 assert(c->rcurr <= (c->rbuf + c->rsize));
1163
1164 if (c->rbytes == 0)
1165 return EXIT_SUCCESS;
1166
1167 el = memchr(c->rcurr, '\n', (size_t) c->rbytes);
1168 if (!el)
1169 return EXIT_SUCCESS;
1170
1171 cont = el + 1;
1172 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) {
1173 el--;
1174 }
1175 *el = '\0';
1176
1177 assert(cont <= (c->rcurr + c->rbytes));
1178
1179 /* process this complete line */
1180 if (ms_ascii_process_line(c, c->rcurr) == 0) {
1181 /* current operation completed */
1182 ms_reset_conn(c, false);
1183 return -1;
1184 } else {
1185 /* current operation didn't complete */
1186 c->rbytes -= (int32_t)(cont - c->rcurr);
1187 c->rcurr = cont;
1188 }
1189
1190 assert(c->rcurr <= (c->rbuf + c->rsize));
1191 }
1192
1193 return -1;
1194 } /* ms_try_read_line */
1195
/**
 * UDP does not guarantee delivery order, so this function reorders the
 * datagrams accumulated in c->rudpbuf before their payload is handed to the
 * caller.
 *
 * Datagrams are stored in c->rudpbuf at a fixed stride of
 * UDP_MAX_PAYLOAD_SIZE bytes, each slot starting with a UDP_HEADER_SIZE
 * header. The first loop indexes every slot from c->pktcurr onward into
 * c->udppkt[] by the sequence number found in its header. The second loop
 * copies payload out of c->udppkt[] in sequence order, starting at
 * c->ordcurr, until buf is full or the next in-order packet is missing or
 * not completely received.
 *
 * @param c, pointer of the concurrency
 * @param buf, the buffer to store the ordered packet data
 * @param rbytes, the maximum capacity of buf
 *
 * @return int, the number of bytes copied into buf, or -1 if nothing could
 *         be copied (for example the next in-order packet has not arrived)
 */
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) {
  int len = 0;
  int wbytes = 0;
  uint16_t req_id = 0;
  uint16_t seq_num = 0;
  uint16_t packets = 0;
  unsigned char *header = NULL;

  /* the caller must have buffered at least one complete UDP header */
  assert(c);
  assert(buf);
  assert(c->rudpbytes >= UDP_HEADER_SIZE);

  /* calculate received packets count */
  if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) {
    /* the last, partially filled slot holds at least a full header */
    c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
  } else {
    c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
  }

  /* get the total packets count from the first header if not known yet */
  if (c->packets == 0) {
    c->packets = HEADER_TO_PACKETS((unsigned char *) c->rudpbuf);
  }

  /* build the ordered packet array */
  for (int i = c->pktcurr; i < c->recvpkt; i++) {
    header = (unsigned char *) c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
    req_id = (uint16_t) HEADER_TO_REQID(header);
    /* every datagram must answer the request id we sent (low 16 bits) */
    assert(req_id == c->request_id % (1 << 16));

    packets = (uint16_t) HEADER_TO_PACKETS(header);
    assert(c->packets == HEADER_TO_PACKETS(header));

    /* NOTE(review): seq_num comes straight from the wire and indexes
     * c->udppkt[] without a bounds check; confirm the array capacity covers
     * every possible value or add a check. */
    seq_num = (uint16_t) HEADER_TO_SEQNUM(header);
    c->udppkt[seq_num].header = header;
    c->udppkt[seq_num].data = (char *) header + UDP_HEADER_SIZE;

    if (i == c->recvpkt - 1) {
      /* last received packet */
      if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) {
        /* the slot is full: it will not change, so don't revisit it */
        c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
        c->pktcurr++;
      } else {
        /* partial slot: pktcurr is not advanced, so it is re-indexed on
         * the next call once more bytes have been read */
        c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
      }
    } else {
      c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
      c->pktcurr++;
    }
  }

  /* copy payload in sequence order, resuming at the first unfinished packet */
  for (int i = c->ordcurr; i < c->recvpkt; i++) {
    /* there is some data to copy */
    if ((c->udppkt[i].data) && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) {
      header = c->udppkt[i].header;
      len = c->udppkt[i].rbytes - c->udppkt[i].copybytes;
      if (len > rbytes - wbytes) {
        /* clamp to the space left in buf */
        len = rbytes - wbytes;
      }

      assert(len <= rbytes - wbytes);
      assert(i == HEADER_TO_SEQNUM(header));

      memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t) len);
      wbytes += len;
      c->udppkt[i].copybytes += len;

      if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
          && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        /* finish copying all the data of this full-size packet, next */
        c->ordcurr++;
      }

      /* last received packet, and finish copying all the data */
      if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
          && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
      {
        break;
      }

      /* no space to copy data */
      if (wbytes >= rbytes) {
        break;
      }

      /* it doesn't finish reading all the data of the packet from network */
      if ((i != c->recvpkt - 1) && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) {
        break;
      }
    } else {
      /* no data to copy: the next in-order packet is missing or drained */
      break;
    }
  }
  /* packets is otherwise unused; this silences the set-but-unused warning */
  (void) packets;

  return wbytes == 0 ? -1 : wbytes;
} /* ms_sort_udp_packet */
1308
1309 /**
1310 * encapsulate upd read like tcp read
1311 *
1312 * @param c, pointer of the concurrency
1313 * @param buf, read buffer
1314 * @param len, length to read
1315 *
1316 * @return int, if success, return the read bytes, else return
1317 * -1
1318 */
1319 static int ms_udp_read(ms_conn_t *c, char *buf, int len) {
1320 int res = 0;
1321 int avail = 0;
1322 int rbytes = 0;
1323 int copybytes = 0;
1324
1325 assert(c->udp);
1326
1327 while (1) {
1328 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) {
1329 char *new_rbuf = realloc(c->rudpbuf, (size_t) c->rudpsize * 2);
1330 if (!new_rbuf) {
1331 fprintf(stderr, "Couldn't realloc input buffer.\n");
1332 c->rudpbytes = 0; /* ignore what we read */
1333 return -1;
1334 }
1335 c->rudpbuf = new_rbuf;
1336 c->rudpsize *= 2;
1337 }
1338
1339 avail = c->rudpsize - c->rudpbytes;
1340 /* UDP each time read a packet, 1400 bytes */
1341 res = (int) read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t) avail);
1342
1343 if (res > 0) {
1344 atomic_add_size(&ms_stats.bytes_read, res);
1345 c->rudpbytes += res;
1346 rbytes += res;
1347 if (res == avail) {
1348 continue;
1349 } else {
1350 break;
1351 }
1352 }
1353
1354 if (res == 0) {
1355 /* "connection" closed */
1356 return res;
1357 }
1358
1359 if (res == -1) {
1360 /* no data to read */
1361 return res;
1362 }
1363 }
1364
1365 /* copy data to read buffer */
1366 if (rbytes > 0) {
1367 copybytes = ms_sort_udp_packet(c, buf, len);
1368 }
1369
1370 if (copybytes == -1) {
1371 atomic_add_size(&ms_stats.pkt_disorder, 1);
1372 }
1373
1374 return copybytes;
1375 } /* ms_udp_read */
1376
1377 /*
1378 * read from network as much as we can, handle buffer overflow and connection
1379 * close.
1380 * before reading, move the remaining incomplete fragment of a command
1381 * (if any) to the beginning of the buffer.
1382 * return EXIT_SUCCESS if there's nothing to read on the first read.
1383 */
1384
1385 /**
1386 * read from network as much as we can, handle buffer overflow and connection
1387 * close. before reading, move the remaining incomplete fragment of a command
1388 * (if any) to the beginning of the buffer.
1389 *
1390 * @param c, pointer of the concurrency
1391 *
1392 * @return int,
1393 * return EXIT_SUCCESS if there's nothing to read on the first read.
1394 * return EXIT_FAILURE if get data
1395 * return -1 if error happens
1396 */
1397 static int ms_try_read_network(ms_conn_t *c) {
1398 int gotdata = 0;
1399 int res;
1400 int64_t avail;
1401
1402 assert(c);
1403
1404 if ((c->rcurr != c->rbuf)
1405 && (!c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1406 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1407 {
1408 if (c->rbytes) /* otherwise there's nothing to copy */
1409 memmove(c->rbuf, c->rcurr, (size_t) c->rbytes);
1410 c->rcurr = c->rbuf;
1411 }
1412
1413 while (1) {
1414 if (c->rbytes >= c->rsize) {
1415 char *new_rbuf = realloc(c->rbuf, (size_t) c->rsize * 2);
1416 if (!new_rbuf) {
1417 fprintf(stderr, "Couldn't realloc input buffer.\n");
1418 c->rbytes = 0; /* ignore what we read */
1419 return -1;
1420 }
1421 c->rcurr = c->rbuf = new_rbuf;
1422 c->rsize *= 2;
1423 }
1424
1425 avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1426 if (avail == 0) {
1427 break;
1428 }
1429
1430 if (c->udp) {
1431 res = (int32_t) ms_udp_read(c, c->rcurr + c->rbytes, (int32_t) avail);
1432 } else {
1433 res = (int) read(c->sfd, c->rcurr + c->rbytes, (size_t) avail);
1434 }
1435
1436 if (res > 0) {
1437 if (!c->udp) {
1438 atomic_add_size(&ms_stats.bytes_read, res);
1439 }
1440 gotdata = 1;
1441 c->rbytes += res;
1442 if (res == avail) {
1443 continue;
1444 } else {
1445 break;
1446 }
1447 }
1448 if (res == 0) {
1449 /* connection closed */
1450 ms_conn_set_state(c, conn_closing);
1451 return -1;
1452 }
1453 if (res == -1) {
1454 if ((errno == EAGAIN) || (EAGAIN != EWOULDBLOCK && errno == EWOULDBLOCK))
1455 break;
1456 /* Should close on unhandled errors. */
1457 ms_conn_set_state(c, conn_closing);
1458 return -1;
1459 }
1460 }
1461
1462 return gotdata;
1463 } /* ms_try_read_network */
1464
1465 /**
1466 * after get the object from server, verify the value if
1467 * necessary.
1468 *
1469 * @param c, pointer of the concurrency
1470 * @param mlget_item, pointer of mulit-get task item structure
1471 * @param value, received value string
1472 * @param vlen, received value string length
1473 */
1474 static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen) {
1475 if (c->curr_task.verify) {
1476 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1477 char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset];
1478 char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1479
1480 /* verify expire time if necessary */
1481 if (c->curr_task.item->exp_time > 0) {
1482 struct timeval curr_time;
1483 gettimeofday(&curr_time, NULL);
1484
1485 /* object expired but get it now */
1486 if (curr_time.tv_sec - c->curr_task.item->client_time
1487 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1488 {
1489 atomic_add_size(&ms_stats.exp_get, 1);
1490
1491 if (ms_setting.verbose) {
1492 char set_time[64];
1493 char cur_time[64];
1494 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&c->curr_task.item->client_time));
1495 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
1496 fprintf(stderr,
1497 "\n<%d expire time verification failed, "
1498 "object expired but get it now\n"
1499 "\tkey len: %d\n"
1500 "\tkey: %" PRIx64 " %.*s\n"
1501 "\tset time: %s current time: %s "
1502 "diff time: %d expire time: %d\n"
1503 "\texpected data: \n"
1504 "\treceived data len: %d\n"
1505 "\treceived data: %.*s\n",
1506 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1507 c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey, set_time, cur_time,
1508 (int) (curr_time.tv_sec - c->curr_task.item->client_time),
1509 c->curr_task.item->exp_time, vlen, vlen, value);
1510 fflush(stderr);
1511 }
1512 }
1513 } else {
1514 if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t) vlen)))
1515 {
1516 atomic_add_size(&ms_stats.vef_failed, 1);
1517
1518 if (ms_setting.verbose) {
1519 fprintf(stderr,
1520 "\n<%d data verification failed\n"
1521 "\tkey len: %d\n"
1522 "\tkey: %" PRIx64 " %.*s\n"
1523 "\texpected data len: %d\n"
1524 "\texpected data: %.*s\n"
1525 "\treceived data len: %d\n"
1526 "\treceived data: %.*s\n",
1527 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1528 c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey,
1529 c->curr_task.item->value_size, c->curr_task.item->value_size, orignval, vlen,
1530 vlen, value);
1531 fflush(stderr);
1532 }
1533 }
1534 }
1535
1536 c->curr_task.finish_verify = true;
1537
1538 if (mlget_item) {
1539 mlget_item->finish_verify = true;
1540 }
1541 }
1542 } /* ms_verify_value */
1543
1544 /**
1545 * For ASCII protocol, after store the data into the local
1546 * buffer, run this function to handle the data.
1547 *
1548 * @param c, pointer of the concurrency
1549 */
1550 static void ms_ascii_complete_nread(ms_conn_t *c) {
1551 assert(c);
1552 assert(c->rbytes >= c->rvbytes);
1553 assert(c->protocol == ascii_prot);
1554 if (c->rvbytes > 2) {
1555 assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1556 }
1557
1558 /* multi-get */
1559 ms_mlget_task_item_t *mlget_item = NULL;
1560 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1561 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1562 {
1563 c->mlget_task.value_index++;
1564 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1565
1566 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1567 c->curr_task.item = mlget_item->item;
1568 c->curr_task.verify = mlget_item->verify;
1569 c->curr_task.finish_verify = mlget_item->finish_verify;
1570 mlget_item->get_miss = false;
1571 } else {
1572 /* Try to find the task item in multi-get task array */
1573 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
1574 mlget_item = &c->mlget_task.mlget_item[i];
1575 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1576 c->curr_task.item = mlget_item->item;
1577 c->curr_task.verify = mlget_item->verify;
1578 c->curr_task.finish_verify = mlget_item->finish_verify;
1579 mlget_item->get_miss = false;
1580
1581 break;
1582 }
1583 }
1584 }
1585 }
1586
1587 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1588
1589 c->curr_task.get_miss = false;
1590 c->rbytes -= c->rvbytes;
1591 c->rcurr = c->rcurr + c->rvbytes;
1592 assert(c->rcurr <= (c->rbuf + c->rsize));
1593 c->readval = false;
1594 c->rvbytes = 0;
1595 } /* ms_ascii_complete_nread */
1596
1597 /**
1598 * For binary protocol, after store the data into the local
1599 * buffer, run this function to handle the data.
1600 *
1601 * @param c, pointer of the concurrency
1602 */
1603 static void ms_bin_complete_nread(ms_conn_t *c) {
1604 assert(c);
1605 assert(c->rbytes >= c->rvbytes);
1606 assert(c->protocol == binary_prot);
1607
1608 int extlen = c->binary_header.response.extlen;
1609 int keylen = c->binary_header.response.keylen;
1610 uint8_t opcode = c->binary_header.response.opcode;
1611
1612 /* not get command or not include value, just return */
1613 if (((opcode != PROTOCOL_BINARY_CMD_GET) && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1614 || (c->rvbytes <= extlen + keylen))
1615 {
1616 /* get miss */
1617 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) {
1618 c->currcmd.retstat = MCD_END;
1619 c->curr_task.get_miss = true;
1620 }
1621
1622 c->readval = false;
1623 c->rvbytes = 0;
1624 ms_reset_conn(c, false);
1625 return;
1626 }
1627
1628 /* multi-get */
1629 ms_mlget_task_item_t *mlget_item = NULL;
1630 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1631 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1632 {
1633 c->mlget_task.value_index++;
1634 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1635
1636 c->curr_task.item = mlget_item->item;
1637 c->curr_task.verify = mlget_item->verify;
1638 c->curr_task.finish_verify = mlget_item->finish_verify;
1639 mlget_item->get_miss = false;
1640 }
1641
1642 ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen);
1643
1644 c->currcmd.retstat = MCD_END;
1645 c->curr_task.get_miss = false;
1646 c->rbytes -= c->rvbytes;
1647 c->rcurr = c->rcurr + c->rvbytes;
1648 assert(c->rcurr <= (c->rbuf + c->rsize));
1649 c->readval = false;
1650 c->rvbytes = 0;
1651
1652 if (ms_setting.mult_key_num > 1) {
1653 /* multi-get have check all the item */
1654 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) {
1655 ms_reset_conn(c, false);
1656 }
1657 } else {
1658 /* single get */
1659 ms_reset_conn(c, false);
1660 }
1661 } /* ms_bin_complete_nread */
1662
1663 /**
1664 * we get here after reading the value of get commands.
1665 *
1666 * @param c, pointer of the concurrency
1667 */
1668 static void ms_complete_nread(ms_conn_t *c) {
1669 assert(c);
1670 assert(c->rbytes >= c->rvbytes);
1671 assert(c->protocol == ascii_prot || c->protocol == binary_prot);
1672
1673 if (c->protocol == binary_prot) {
1674 ms_bin_complete_nread(c);
1675 } else {
1676 ms_ascii_complete_nread(c);
1677 }
1678 } /* ms_complete_nread */
1679
1680 /**
1681 * Adds a message header to a connection.
1682 *
1683 * @param c, pointer of the concurrency
1684 *
1685 * @return int, if success, return EXIT_SUCCESS, else return -1
1686 */
1687 static int ms_add_msghdr(ms_conn_t *c) {
1688 struct msghdr *msg;
1689
1690 assert(c);
1691
1692 if (c->msgsize == c->msgused) {
1693 msg = realloc(c->msglist, (size_t) c->msgsize * 2 * sizeof(struct msghdr));
1694 if (!msg)
1695 return -1;
1696
1697 c->msglist = msg;
1698 c->msgsize *= 2;
1699 }
1700
1701 msg = c->msglist + c->msgused;
1702
1703 /**
1704 * this wipes msg_iovlen, msg_control, msg_controllen, and
1705 * msg_flags, the last 3 of which aren't defined on solaris:
1706 */
1707 memset(msg, 0, sizeof(struct msghdr));
1708
1709 msg->msg_iov = &c->iov[c->iovused];
1710
1711 if (c->udp && (c->srv_recv_addr_size > 0)) {
1712 msg->msg_name = &c->srv_recv_addr;
1713 msg->msg_namelen = c->srv_recv_addr_size;
1714 }
1715
1716 c->msgbytes = 0;
1717 c->msgused++;
1718
1719 if (c->udp) {
1720 /* Leave room for the UDP header, which we'll fill in later. */
1721 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
1722 }
1723
1724 return EXIT_SUCCESS;
1725 } /* ms_add_msghdr */
1726
1727 /**
1728 * Ensures that there is room for another structure iovec in a connection's
1729 * iov list.
1730 *
1731 * @param c, pointer of the concurrency
1732 *
1733 * @return int, if success, return EXIT_SUCCESS, else return -1
1734 */
1735 static int ms_ensure_iov_space(ms_conn_t *c) {
1736 assert(c);
1737
1738 if (c->iovused >= c->iovsize) {
1739 int i, iovnum;
1740 struct iovec *new_iov =
1741 (struct iovec *) realloc(c->iov, ((size_t) c->iovsize * 2) * sizeof(struct iovec));
1742 if (!new_iov)
1743 return -1;
1744
1745 c->iov = new_iov;
1746 c->iovsize *= 2;
1747
1748 /* Point all the msghdr structures at the new list. */
1749 for (i = 0, iovnum = 0; i < c->msgused; i++) {
1750 c->msglist[i].msg_iov = &c->iov[iovnum];
1751 iovnum += (int) c->msglist[i].msg_iovlen;
1752 }
1753 }
1754
1755 return EXIT_SUCCESS;
1756 } /* ms_ensure_iov_space */
1757
1758 /**
1759 * Adds data to the list of pending data that will be written out to a
1760 * connection.
1761 *
1762 * @param c, pointer of the concurrency
1763 * @param buf, the buffer includes data to send
1764 * @param len, the data length in the buffer
1765 *
1766 * @return int, if success, return EXIT_SUCCESS, else return -1
1767 */
1768 static int ms_add_iov(ms_conn_t *c, const void *buf, int len) {
1769 struct msghdr *m;
1770 int leftover;
1771 bool limit_to_mtu;
1772
1773 assert(c);
1774
1775 do {
1776 m = &c->msglist[c->msgused - 1];
1777
1778 /*
1779 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
1780 */
1781 limit_to_mtu = c->udp;
1782
1783 #ifdef IOV_MAX
1784 /* We may need to start a new msghdr if this one is full. */
1785 if ((m->msg_iovlen == IOV_MAX) || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
1786 {
1787 ms_add_msghdr(c);
1788 m = &c->msglist[c->msgused - 1];
1789 }
1790 #endif
1791
1792 if (ms_ensure_iov_space(c))
1793 return -1;
1794
1795 /* If the fragment is too big to fit in the datagram, split it up */
1796 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) {
1797 leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
1798 len -= leftover;
1799 } else {
1800 leftover = 0;
1801 }
1802
1803 m = &c->msglist[c->msgused - 1];
1804 m->msg_iov[m->msg_iovlen].iov_base = (void *) buf;
1805 m->msg_iov[m->msg_iovlen].iov_len = (size_t) len;
1806
1807 c->msgbytes += len;
1808 c->iovused++;
1809 m->msg_iovlen++;
1810
1811 buf = ((char *) buf) + len;
1812 len = leftover;
1813 } while (leftover > 0);
1814
1815 return EXIT_SUCCESS;
1816 } /* ms_add_iov */
1817
1818 /**
1819 * Constructs a set of UDP headers and attaches them to the outgoing messages.
1820 *
1821 * @param c, pointer of the concurrency
1822 *
1823 * @return int, if success, return EXIT_SUCCESS, else return -1
1824 */
1825 static int ms_build_udp_headers(ms_conn_t *c) {
1826 int i;
1827 unsigned char *hdr;
1828
1829 assert(c);
1830
1831 c->request_id = ms_get_udp_request_id();
1832
1833 if (c->msgused > c->hdrsize) {
1834 void *new_hdrbuf;
1835 if (c->hdrbuf)
1836 new_hdrbuf = realloc(c->hdrbuf, (size_t) c->msgused * 2 * UDP_HEADER_SIZE);
1837 else
1838 new_hdrbuf = malloc((size_t) c->msgused * 2 * UDP_HEADER_SIZE);
1839 if (!new_hdrbuf)
1840 return -1;
1841
1842 c->hdrbuf = (unsigned char *) new_hdrbuf;
1843 c->hdrsize = c->msgused * 2;
1844 }
1845
1846 /* If this is a multi-packet request, drop it. */
1847 if (c->udp && (c->msgused > 1)) {
1848 fprintf(stderr, "multi-packet request for UDP not supported.\n");
1849 return -1;
1850 }
1851
1852 hdr = c->hdrbuf;
1853 for (i = 0; i < c->msgused; i++) {
1854 c->msglist[i].msg_iov[0].iov_base = (void *) hdr;
1855 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
1856 *hdr++ = (unsigned char) (c->request_id / 256);
1857 *hdr++ = (unsigned char) (c->request_id % 256);
1858 *hdr++ = (unsigned char) (i / 256);
1859 *hdr++ = (unsigned char) (i % 256);
1860 *hdr++ = (unsigned char) (c->msgused / 256);
1861 *hdr++ = (unsigned char) (c->msgused % 256);
1862 *hdr++ = (unsigned char) 1; /* support facebook memcached */
1863 *hdr++ = (unsigned char) 0;
1864 assert(hdr == ((unsigned char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE));
1865 }
1866
1867 return EXIT_SUCCESS;
1868 } /* ms_build_udp_headers */
1869
1870 /**
1871 * Transmit the next chunk of data from our list of msgbuf structures.
1872 *
1873 * @param c, pointer of the concurrency
1874 *
1875 * @return TRANSMIT_COMPLETE All done writing.
1876 * TRANSMIT_INCOMPLETE More data remaining to write.
1877 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1878 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1879 */
1880 static int ms_transmit(ms_conn_t *c) {
1881 assert(c);
1882
1883 if ((c->msgcurr < c->msgused) && (c->msglist[c->msgcurr].msg_iovlen == 0)) {
1884 /* Finished writing the current msg; advance to the next. */
1885 c->msgcurr++;
1886 }
1887
1888 if (c->msgcurr < c->msgused) {
1889 ssize_t res;
1890 struct msghdr *m = &c->msglist[c->msgcurr];
1891
1892 res = sendmsg(c->sfd, m, 0);
1893 if (res > 0) {
1894 atomic_add_size(&ms_stats.bytes_written, res);
1895
1896 /* We've written some of the data. Remove the completed
1897 * iovec entries from the list of pending writes. */
1898 while (m->msg_iovlen > 0 && res >= (ssize_t) m->msg_iov->iov_len) {
1899 res -= (ssize_t) m->msg_iov->iov_len;
1900 m->msg_iovlen--;
1901 m->msg_iov++;
1902 }
1903
1904 /* Might have written just part of the last iovec entry;
1905 * adjust it so the next write will do the rest. */
1906 if (res > 0) {
1907 m->msg_iov->iov_base = (void *) ((unsigned char *) m->msg_iov->iov_base + res);
1908 m->msg_iov->iov_len -= (size_t) res;
1909 }
1910 return TRANSMIT_INCOMPLETE;
1911 }
1912 if ((res == -1) && ((errno == EAGAIN) || (EAGAIN != EWOULDBLOCK && errno == EWOULDBLOCK))) {
1913 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
1914 fprintf(stderr, "Couldn't update event.\n");
1915 ms_conn_set_state(c, conn_closing);
1916 return TRANSMIT_HARD_ERROR;
1917 }
1918 return TRANSMIT_SOFT_ERROR;
1919 }
1920
1921 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1922 * we have a real error, on which we close the connection */
1923 fprintf(stderr, "Failed to write, and not due to blocking.\n");
1924
1925 ms_conn_set_state(c, conn_closing);
1926 return TRANSMIT_HARD_ERROR;
1927 } else {
1928 return TRANSMIT_COMPLETE;
1929 }
1930 } /* ms_transmit */
1931
1932 /**
1933 * Shrinks a connection's buffers if they're too big. This prevents
1934 * periodic large "mget" response from server chewing lots of client
1935 * memory.
1936 *
1937 * This should only be called in between requests since it can wipe output
1938 * buffers!
1939 *
1940 * @param c, pointer of the concurrency
1941 */
1942 static void ms_conn_shrink(ms_conn_t *c) {
1943 assert(c);
1944
1945 if (c->udp)
1946 return;
1947
1948 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) {
1949 char *newbuf;
1950
1951 if (c->rcurr != c->rbuf)
1952 memmove(c->rbuf, c->rcurr, (size_t) c->rbytes);
1953
1954 newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE);
1955
1956 if (newbuf) {
1957 c->rbuf = newbuf;
1958 c->rsize = DATA_BUFFER_SIZE;
1959 }
1960 c->rcurr = c->rbuf;
1961 }
1962
1963 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
1964 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
1965 {
1966 char *new_rbuf = (char *) realloc(c->rudpbuf, (size_t) c->rudpsize * 2);
1967 if (new_rbuf) {
1968 c->rudpbuf = new_rbuf;
1969 c->rudpsize = UDP_DATA_BUFFER_SIZE;
1970 }
1971 /* TODO check error condition? */
1972 }
1973
1974 if (c->msgsize > MSG_LIST_HIGHWAT) {
1975 struct msghdr *newbuf =
1976 (struct msghdr *) realloc((void *) c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1977 if (newbuf) {
1978 c->msglist = newbuf;
1979 c->msgsize = MSG_LIST_INITIAL;
1980 }
1981 /* TODO check error condition? */
1982 }
1983
1984 if (c->iovsize > IOV_LIST_HIGHWAT) {
1985 struct iovec *newbuf =
1986 (struct iovec *) realloc((void *) c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1987 if (newbuf) {
1988 c->iov = newbuf;
1989 c->iovsize = IOV_LIST_INITIAL;
1990 }
1991 /* TODO check return value */
1992 }
1993 } /* ms_conn_shrink */
1994
1995 /**
1996 * Sets a connection's current state in the state machine. Any special
1997 * processing that needs to happen on certain state transitions can
1998 * happen here.
1999 *
2000 * @param c, pointer of the concurrency
2001 * @param state, connection state
2002 */
2003 static void ms_conn_set_state(ms_conn_t *c, int state) {
2004 assert(c);
2005
2006 if (state != c->state) {
2007 if (state == conn_read) {
2008 ms_conn_shrink(c);
2009 }
2010 c->state = state;
2011 }
2012 } /* ms_conn_set_state */
2013
2014 /**
2015 * update the event if socks change state. for example: when
2016 * change the listen scoket read event to sock write event, or
2017 * change socket handler, we could call this function.
2018 *
2019 * @param c, pointer of the concurrency
2020 * @param new_flags, new event flags
2021 *
2022 * @return bool, if success, return true, else return false
2023 */
2024 static bool ms_update_event(ms_conn_t *c, const int new_flags) {
2025 assert(c);
2026
2027 struct event_base *base = c->event.ev_base;
2028 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2029 && (!ms_setting.facebook_test || (c->total_sfds == 1)))
2030 {
2031 return true;
2032 }
2033
2034 if (event_del(&c->event) == -1) {
2035 /* try to delete the event again */
2036 if (event_del(&c->event) == -1) {
2037 return false;
2038 }
2039 }
2040
2041 event_set(&c->event, c->sfd, (short) new_flags, ms_event_handler, (void *) c);
2042 event_base_set(base, &c->event);
2043 c->ev_flags = (short) new_flags;
2044
2045 if (event_add(&c->event, NULL) == -1) {
2046 return false;
2047 }
2048
2049 return true;
2050 } /* ms_update_event */
2051
2052 /**
2053 * If user want to get the expected throughput, we could limit
2054 * the performance of memslap. we could give up some work and
2055 * just wait a short time. The function is used to check this
2056 * case.
2057 *
2058 * @param c, pointer of the concurrency
2059 *
2060 * @return bool, if success, return true, else return false
2061 */
2062 static bool ms_need_yield(ms_conn_t *c) {
2063 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
2064 int64_t tps = 0;
2065 int64_t time_diff = 0;
2066 struct timeval curr_time;
2067 ms_task_t *task = &c->curr_task;
2068
2069 if (ms_setting.expected_tps > 0) {
2070 gettimeofday(&curr_time, NULL);
2071 time_diff = ms_time_diff(&ms_thread->startup_time, &curr_time);
2072 tps = (int64_t)(((task->get_opt + task->set_opt) / (uint64_t) time_diff) * 1000000);
2073
2074 /* current throughput is greater than expected throughput */
2075 if (tps > ms_thread->thread_ctx->tps_perconn) {
2076 return true;
2077 }
2078 }
2079
2080 return false;
2081 } /* ms_need_yield */
2082
2083 /**
2084 * used to update the start time of each operation
2085 *
2086 * @param c, pointer of the concurrency
2087 */
2088 static void ms_update_start_time(ms_conn_t *c) {
2089 ms_task_item_t *item = c->curr_task.item;
2090
2091 if ((ms_setting.stat_freq > 0) || c->udp || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2092 {
2093 gettimeofday(&c->start_time, NULL);
2094 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) {
2095 /* record the current time */
2096 item->client_time = c->start_time.tv_sec;
2097 }
2098 }
2099 } /* ms_update_start_time */
2100
2101 /**
2102 * run the state machine
2103 *
2104 * @param c, pointer of the concurrency
2105 */
2106 static void ms_drive_machine(ms_conn_t *c) {
2107 bool stop = false;
2108
2109 assert(c);
2110
2111 while (!stop) {
2112 switch (c->state) {
2113 case conn_read:
2114 if (c->readval) {
2115 if (c->rbytes >= c->rvbytes) {
2116 ms_complete_nread(c);
2117 break;
2118 }
2119 } else {
2120 if (ms_try_read_line(c)) {
2121 break;
2122 }
2123 }
2124
2125 if (ms_try_read_network(c)) {
2126 break;
2127 }
2128
2129 /* doesn't read all the response data, wait event wake up */
2130 if (!c->currcmd.isfinish) {
2131 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2132 fprintf(stderr, "Couldn't update event.\n");
2133 ms_conn_set_state(c, conn_closing);
2134 break;
2135 }
2136 stop = true;
2137 break;
2138 }
2139
2140 /* we have no command line and no data to read from network, next write */
2141 ms_conn_set_state(c, conn_write);
2142 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2143
2144 break;
2145
2146 case conn_write:
2147 if (!c->ctnwrite && ms_need_yield(c)) {
2148 usleep(10);
2149
2150 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2151 fprintf(stderr, "Couldn't update event.\n");
2152 ms_conn_set_state(c, conn_closing);
2153 break;
2154 }
2155 stop = true;
2156 break;
2157 }
2158
2159 if (!c->ctnwrite && (ms_exec_task(c))) {
2160 ms_conn_set_state(c, conn_closing);
2161 break;
2162 }
2163
2164 /* record the start time before starting to send data if necessary */
2165 if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) {
2166 if (c->change_sfd) {
2167 c->change_sfd = false;
2168 }
2169 ms_update_start_time(c);
2170 }
2171
2172 /* change sfd if necessary */
2173 if (c->change_sfd) {
2174 c->ctnwrite = true;
2175 stop = true;
2176 break;
2177 }
2178
2179 /* execute task until nothing need be written to network */
2180 if (!c->ctnwrite && (c->msgcurr == c->msgused)) {
2181 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2182 fprintf(stderr, "Couldn't update event.\n");
2183 ms_conn_set_state(c, conn_closing);
2184 break;
2185 }
2186 stop = true;
2187 break;
2188 }
2189
2190 switch (ms_transmit(c)) {
2191 case TRANSMIT_COMPLETE:
2192 /* we have no data to write to network, next wait repose */
2193 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2194 fprintf(stderr, "Couldn't update event.\n");
2195 ms_conn_set_state(c, conn_closing);
2196 c->ctnwrite = false;
2197 break;
2198 }
2199 ms_conn_set_state(c, conn_read);
2200 c->ctnwrite = false;
2201 stop = true;
2202 break;
2203
2204 case TRANSMIT_INCOMPLETE:
2205 c->ctnwrite = true;
2206 break; /* Continue in state machine. */
2207
2208 case TRANSMIT_HARD_ERROR:
2209 c->ctnwrite = false;
2210 break;
2211
2212 case TRANSMIT_SOFT_ERROR:
2213 c->ctnwrite = true;
2214 stop = true;
2215 break;
2216
2217 default:
2218 break;
2219 } /* switch */
2220
2221 break;
2222
2223 case conn_closing:
2224 /* recovery mode, need reconnect if connection close */
2225 if (ms_setting.reconnect
2226 && (!ms_global.time_out || ((ms_setting.run_time == 0) && (c->remain_exec_num > 0))))
2227 {
2228 if (ms_reconn(c)) {
2229 ms_conn_close(c);
2230 stop = true;
2231 break;
2232 }
2233
2234 ms_reset_conn(c, false);
2235
2236 if (c->total_sfds == 1) {
2237 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2238 fprintf(stderr, "Couldn't update event.\n");
2239 ms_conn_set_state(c, conn_closing);
2240 break;
2241 }
2242 }
2243
2244 break;
2245 } else {
2246 ms_conn_close(c);
2247 stop = true;
2248 break;
2249 }
2250
2251 default:
2252 assert(0);
2253 } /* switch */
2254 }
2255 } /* ms_drive_machine */
2256
2257 /**
2258 * the event handler of each thread
2259 *
2260 * @param fd, the file descriptor of socket
2261 * @param which, event flag
2262 * @param arg, argument
2263 */
2264 void ms_event_handler(const int fd, const short which, void *arg) {
2265 ms_conn_t *c = (ms_conn_t *) arg;
2266
2267 assert(c);
2268
2269 c->which = which;
2270
2271 /* sanity */
2272 if (fd != c->sfd) {
2273 fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n", fd, c->sfd);
2274 ms_conn_close(c);
2275 exit(1);
2276 }
2277 assert(fd == c->sfd);
2278
2279 ms_drive_machine(c);
2280
2281 /* wait for next event */
2282 } /* ms_event_handler */
2283
2284 /**
2285 * get the next socket descriptor index to run for replication
2286 *
2287 * @param c, pointer of the concurrency
2288 * @param cmd, command(get or set )
2289 *
2290 * @return int, if success, return the index, else return EXIT_SUCCESS
2291 */
2292 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) {
2293 uint32_t sock_index = 0;
2294 uint32_t i = 0;
2295
2296 if (c->total_sfds == 1) {
2297 return EXIT_SUCCESS;
2298 }
2299
2300 if (ms_setting.rep_write_srv == 0) {
2301 return sock_index;
2302 }
2303
2304 do {
2305 if (cmd == CMD_SET) {
2306 for (i = 0; i < ms_setting.rep_write_srv; i++) {
2307 if (c->tcpsfd[i] > 0) {
2308 break;
2309 }
2310 }
2311
2312 if (i == ms_setting.rep_write_srv) {
2313 /* random get one replication server to read */
2314 sock_index = (uint32_t) random() % c->total_sfds;
2315 } else {
2316 /* random get one replication writing server to write */
2317 sock_index = (uint32_t) random() % ms_setting.rep_write_srv;
2318 }
2319 } else if (cmd == CMD_GET) {
2320 /* random get one replication server to read */
2321 sock_index = (uint32_t) random() % c->total_sfds;
2322 }
2323 } while (c->tcpsfd[sock_index] == 0);
2324
2325 return sock_index;
2326 } /* ms_get_rep_sock_index */
2327
2328 /**
2329 * get the next socket descriptor index to run
2330 *
2331 * @param c, pointer of the concurrency
2332 *
2333 * @return int, return the index
2334 */
2335 static uint32_t ms_get_next_sock_index(ms_conn_t *c) {
2336 uint32_t sock_index = 0;
2337
2338 do {
2339 sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2340 } while (c->tcpsfd[sock_index] == 0);
2341
2342 return sock_index;
2343 } /* ms_get_next_sock_index */
2344
2345 /**
2346 * update socket event of the connections
2347 *
2348 * @param c, pointer of the concurrency
2349 *
2350 * @return int, if success, return EXIT_SUCCESS, else return -1
2351 */
2352 static int ms_update_conn_sock_event(ms_conn_t *c) {
2353 assert(c);
2354
2355 switch (c->currcmd.cmd) {
2356 case CMD_SET:
2357 if (ms_setting.facebook_test && c->udp) {
2358 c->sfd = c->tcpsfd[0];
2359 c->udp = false;
2360 c->change_sfd = true;
2361 }
2362 break;
2363
2364 case CMD_GET:
2365 if (ms_setting.facebook_test && !c->udp) {
2366 c->sfd = c->udpsfd;
2367 c->udp = true;
2368 c->change_sfd = true;
2369 }
2370 break;
2371
2372 default:
2373 break;
2374 } /* switch */
2375
2376 if (!c->udp && (c->total_sfds > 1)) {
2377 if (c->cur_idx != c->total_sfds) {
2378 if (ms_setting.rep_write_srv == 0) {
2379 c->cur_idx = ms_get_next_sock_index(c);
2380 } else {
2381 c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd);
2382 }
2383 } else {
2384 /* must select the first sock of the connection at the beginning */
2385 c->cur_idx = 0;
2386 }
2387
2388 c->sfd = c->tcpsfd[c->cur_idx];
2389 assert(c->sfd);
2390 c->change_sfd = true;
2391 }
2392
2393 if (c->change_sfd) {
2394 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2395 fprintf(stderr, "Couldn't update event.\n");
2396 ms_conn_set_state(c, conn_closing);
2397 return -1;
2398 }
2399 }
2400
2401 return EXIT_SUCCESS;
2402 } /* ms_update_conn_sock_event */
2403
2404 /**
2405 * for ASCII protocol, this function build the set command
2406 * string and send the command.
2407 *
2408 * @param c, pointer of the concurrency
2409 * @param item, pointer of task item which includes the object
2410 * information
2411 *
2412 * @return int, if success, return EXIT_SUCCESS, else return -1
2413 */
2414 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) {
2415 int value_offset;
2416 int write_len;
2417 char *buffer = c->wbuf;
2418
2419 write_len = snprintf(buffer, c->wsize, " %u %d %d\r\n", 0, item->exp_time, item->value_size);
2420
2421 if (write_len > c->wsize || write_len < 0) {
2422 /* ought to be always enough. just fail for simplicity */
2423 fprintf(stderr, "output command line too long.\n");
2424 return -1;
2425 }
2426
2427 if (item->value_offset == INVALID_OFFSET) {
2428 value_offset = item->key_suffix_offset;
2429 } else {
2430 value_offset = item->value_offset;
2431 }
2432
2433 if ((ms_add_iov(c, "set ", 4))
2434 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE))
2435 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2436 item->key_size - (int) KEY_PREFIX_SIZE)
2437 )
2438 || (ms_add_iov(c, buffer, write_len))
2439 || (ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size))
2440 || (ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c))))
2441 {
2442 return -1;
2443 }
2444
2445 return EXIT_SUCCESS;
2446 } /* ms_build_ascii_write_buf_set */
2447
2448 /**
2449 * used to send set command to server
2450 *
2451 * @param c, pointer of the concurrency
2452 * @param item, pointer of task item which includes the object
2453 * information
2454 *
2455 * @return int, if success, return EXIT_SUCCESS, else return -1
2456 */
2457 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) {
2458 assert(c);
2459
2460 c->currcmd.cmd = CMD_SET;
2461 c->currcmd.isfinish = false;
2462 c->currcmd.retstat = MCD_FAILURE;
2463
2464 if (ms_update_conn_sock_event(c)) {
2465 return -1;
2466 }
2467
2468 c->msgcurr = 0;
2469 c->msgused = 0;
2470 c->iovused = 0;
2471 if (ms_add_msghdr(c)) {
2472 fprintf(stderr, "Out of memory preparing request.");
2473 return -1;
2474 }
2475
2476 /* binary protocol */
2477 if (c->protocol == binary_prot) {
2478 if (ms_build_bin_write_buf_set(c, item)) {
2479 return -1;
2480 }
2481 } else {
2482 if (ms_build_ascii_write_buf_set(c, item)) {
2483 return -1;
2484 }
2485 }
2486
2487 atomic_add_size(&ms_stats.obj_bytes, item->key_size + item->value_size);
2488 atomic_add_size(&ms_stats.cmd_set, 1);
2489
2490 return EXIT_SUCCESS;
2491 } /* ms_mcd_set */
2492
2493 /**
2494 * for ASCII protocol, this function build the get command
2495 * string and send the command.
2496 *
2497 * @param c, pointer of the concurrency
2498 * @param item, pointer of task item which includes the object
2499 * information
2500 *
2501 * @return int, if success, return EXIT_SUCCESS, else return -1
2502 */
2503 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) {
2504 if ((ms_add_iov(c, "get ", 4))
2505 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE))
2506 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2507 item->key_size - (int) KEY_PREFIX_SIZE)
2508 )
2509 || (ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c))))
2510 {
2511 return -1;
2512 }
2513
2514 return EXIT_SUCCESS;
2515 } /* ms_build_ascii_write_buf_get */
2516
2517 /**
2518 * used to send the get command to server
2519 *
2520 * @param c, pointer of the concurrency
2521 * @param item, pointer of task item which includes the object
2522 * information
2523 *
2524 * @return int, if success, return EXIT_SUCCESS, else return -1
2525 */
2526 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) {
2527 assert(c);
2528
2529 c->currcmd.cmd = CMD_GET;
2530 c->currcmd.isfinish = false;
2531 c->currcmd.retstat = MCD_FAILURE;
2532
2533 if (ms_update_conn_sock_event(c)) {
2534 return -1;
2535 }
2536
2537 c->msgcurr = 0;
2538 c->msgused = 0;
2539 c->iovused = 0;
2540 if (ms_add_msghdr(c)) {
2541 fprintf(stderr, "Out of memory preparing request.");
2542 return -1;
2543 }
2544
2545 /* binary protocol */
2546 if (c->protocol == binary_prot) {
2547 if (ms_build_bin_write_buf_get(c, item)) {
2548 return -1;
2549 }
2550 } else {
2551 if (ms_build_ascii_write_buf_get(c, item)) {
2552 return -1;
2553 }
2554 }
2555
2556 atomic_add_size(&ms_stats.cmd_get, 1);
2557
2558 return EXIT_SUCCESS;
2559 } /* ms_mcd_get */
2560
2561 /**
2562 * for ASCII protocol, this function build the multi-get command
2563 * string and send the command.
2564 *
2565 * @param c, pointer of the concurrency
2566 *
2567 * @return int, if success, return EXIT_SUCCESS, else return -1
2568 */
2569 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) {
2570 ms_task_item_t *item;
2571
2572 if (ms_add_iov(c, "get", 3)) {
2573 return -1;
2574 }
2575
2576 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2577 item = c->mlget_task.mlget_item[i].item;
2578 assert(item);
2579 if ((ms_add_iov(c, " ", 1))
2580 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE))
2581 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2582 item->key_size - (int) KEY_PREFIX_SIZE)
2583 ))
2584 {
2585 return -1;
2586 }
2587 }
2588
2589 if ((ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c)))) {
2590 return -1;
2591 }
2592
2593 return EXIT_SUCCESS;
2594 } /* ms_build_ascii_write_buf_mlget */
2595
2596 /**
2597 * used to send the multi-get command to server
2598 *
2599 * @param c, pointer of the concurrency
2600 *
2601 * @return int, if success, return EXIT_SUCCESS, else return -1
2602 */
2603 int ms_mcd_mlget(ms_conn_t *c) {
2604 ms_task_item_t *item;
2605
2606 assert(c);
2607 assert(c->mlget_task.mlget_num >= 1);
2608
2609 c->currcmd.cmd = CMD_GET;
2610 c->currcmd.isfinish = false;
2611 c->currcmd.retstat = MCD_FAILURE;
2612
2613 if (ms_update_conn_sock_event(c)) {
2614 return -1;
2615 }
2616
2617 c->msgcurr = 0;
2618 c->msgused = 0;
2619 c->iovused = 0;
2620 if (ms_add_msghdr(c)) {
2621 fprintf(stderr, "Out of memory preparing request.");
2622 return -1;
2623 }
2624
2625 /* binary protocol */
2626 if (c->protocol == binary_prot) {
2627 if (ms_build_bin_write_buf_mlget(c)) {
2628 return -1;
2629 }
2630 } else {
2631 if (ms_build_ascii_write_buf_mlget(c)) {
2632 return -1;
2633 }
2634 }
2635
2636 /* decrease operation time of each item */
2637 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2638 item = c->mlget_task.mlget_item[i].item;
2639 atomic_add_size(&ms_stats.cmd_get, 1);
2640 }
2641
2642 (void) item;
2643
2644 return EXIT_SUCCESS;
2645 } /* ms_mcd_mlget */
2646
2647 /**
2648 * binary protocol support
2649 */
2650
2651 /**
2652 * for binary protocol, parse the response of server
2653 *
2654 * @param c, pointer of the concurrency
2655 *
2656 * @return int, if success, return EXIT_SUCCESS, else return -1
2657 */
2658 static int ms_bin_process_response(ms_conn_t *c) {
2659 const char *errstr = NULL;
2660
2661 assert(c);
2662
2663 uint32_t bodylen = c->binary_header.response.bodylen;
2664 uint8_t opcode = c->binary_header.response.opcode;
2665 uint16_t status = c->binary_header.response.status;
2666
2667 if (bodylen > 0) {
2668 c->rvbytes = (int32_t) bodylen;
2669 c->readval = true;
2670 return EXIT_FAILURE;
2671 } else {
2672 switch (status) {
2673 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
2674 if (opcode == PROTOCOL_BINARY_CMD_SET) {
2675 c->currcmd.retstat = MCD_STORED;
2676 } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) {
2677 c->currcmd.retstat = MCD_DELETED;
2678 } else if (opcode == PROTOCOL_BINARY_CMD_GET) {
2679 c->currcmd.retstat = MCD_END;
2680 }
2681 break;
2682
2683 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
2684 errstr = "Out of memory";
2685 c->currcmd.retstat = MCD_SERVER_ERROR;
2686 break;
2687
2688 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
2689 errstr = "Unknown command";
2690 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2691 break;
2692
2693 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
2694 errstr = "Not found";
2695 c->currcmd.retstat = MCD_NOTFOUND;
2696 break;
2697
2698 case PROTOCOL_BINARY_RESPONSE_EINVAL:
2699 errstr = "Invalid arguments";
2700 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
2701 break;
2702
2703 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
2704 errstr = "Data exists for key.";
2705 break;
2706
2707 case PROTOCOL_BINARY_RESPONSE_E2BIG:
2708 errstr = "Too large.";
2709 c->currcmd.retstat = MCD_SERVER_ERROR;
2710 break;
2711
2712 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
2713 errstr = "Not stored.";
2714 c->currcmd.retstat = MCD_NOTSTORED;
2715 break;
2716
2717 default:
2718 errstr = "Unknown error";
2719 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2720 break;
2721 } /* switch */
2722
2723 if (errstr) {
2724 fprintf(stderr, "%s\n", errstr);
2725 }
2726 }
2727
2728 return EXIT_SUCCESS;
2729 } /* ms_bin_process_response */
2730
2731 /* build binary header and add the header to the buffer to send */
2732
2733 /**
2734 * build binary header and add the header to the buffer to send
2735 *
2736 * @param c, pointer of the concurrency
2737 * @param opcode, operation code
2738 * @param hdr_len, length of header
2739 * @param key_len, length of key
2740 * @param body_len. length of body
2741 */
2742 static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len,
2743 uint32_t body_len) {
2744 protocol_binary_request_header *header;
2745
2746 assert(c);
2747
2748 header = (protocol_binary_request_header *) c->wcurr;
2749
2750 header->request.magic = (uint8_t) PROTOCOL_BINARY_REQ;
2751 header->request.opcode = (uint8_t) opcode;
2752 header->request.keylen = htons(key_len);
2753
2754 header->request.extlen = (uint8_t) hdr_len;
2755 header->request.datatype = (uint8_t) PROTOCOL_BINARY_RAW_BYTES;
2756 header->request.vbucket = 0;
2757
2758 header->request.bodylen = htonl(body_len);
2759 header->request.opaque = 0;
2760 header->request.cas = 0;
2761
2762 ms_add_iov(c, c->wcurr, sizeof(header->request));
2763 } /* ms_add_bin_header */
2764
2765 /**
2766 * add the key to the socket write buffer array
2767 *
2768 * @param c, pointer of the concurrency
2769 * @param item, pointer of task item which includes the object
2770 * information
2771 */
2772 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) {
2773 ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE);
2774 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2775 item->key_size - (int) KEY_PREFIX_SIZE);
2776 }
2777
2778 /**
2779 * for binary protocol, this function build the set command
2780 * and add the command to send buffer array.
2781 *
2782 * @param c, pointer of the concurrency
2783 * @param item, pointer of task item which includes the object
2784 * information
2785 *
2786 * @return int, if success, return EXIT_SUCCESS, else return -1
2787 */
2788 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) {
2789 assert(c->wbuf == c->wcurr);
2790
2791 int value_offset;
2792 protocol_binary_request_set *rep = (protocol_binary_request_set *) c->wcurr;
2793 uint16_t keylen = (uint16_t) item->key_size;
2794 uint32_t bodylen =
2795 (uint32_t) sizeof(rep->message.body) + (uint32_t) keylen + (uint32_t) item->value_size;
2796
2797 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen);
2798 rep->message.body.flags = 0;
2799 rep->message.body.expiration = htonl((uint32_t) item->exp_time);
2800 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
2801 ms_add_key_to_iov(c, item);
2802
2803 if (item->value_offset == INVALID_OFFSET) {
2804 value_offset = item->key_suffix_offset;
2805 } else {
2806 value_offset = item->value_offset;
2807 }
2808 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
2809
2810 return EXIT_SUCCESS;
2811 } /* ms_build_bin_write_buf_set */
2812
2813 /**
2814 * for binary protocol, this function build the get command and
2815 * add the command to send buffer array.
2816 *
2817 * @param c, pointer of the concurrency
2818 * @param item, pointer of task item which includes the object
2819 * information
2820 *
2821 * @return int, if success, return EXIT_SUCCESS, else return -1
2822 */
2823 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) {
2824 assert(c->wbuf == c->wcurr);
2825
2826 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size,
2827 (uint32_t) item->key_size);
2828 ms_add_key_to_iov(c, item);
2829
2830 return EXIT_SUCCESS;
2831 } /* ms_build_bin_write_buf_get */
2832
2833 /**
2834 * for binary protocol, this function build the multi-get
2835 * command and add the command to send buffer array.
2836 *
2837 * @param c, pointer of the concurrency
2838 * @param item, pointer of task item which includes the object
2839 * information
2840 *
2841 * @return int, if success, return EXIT_SUCCESS, else return -1
2842 */
2843 static int ms_build_bin_write_buf_mlget(ms_conn_t *c) {
2844 ms_task_item_t *item;
2845
2846 assert(c->wbuf == c->wcurr);
2847
2848 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2849 item = c->mlget_task.mlget_item[i].item;
2850 assert(item);
2851
2852 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size,
2853 (uint32_t) item->key_size);
2854 ms_add_key_to_iov(c, item);
2855 c->wcurr += sizeof(protocol_binary_request_get);
2856 }
2857
2858 c->wcurr = c->wbuf;
2859
2860 return EXIT_SUCCESS;
2861 } /* ms_build_bin_write_buf_mlget */