src/bin: apply clang-format
[m6w6/libmemcached] / src / bin / memaslap / ms_conn.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #include <stdio.h>
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <sys/uio.h>
22 #include <event.h>
23 #include <fcntl.h>
24 #include <netinet/tcp.h>
25 #include <netinet/in.h>
26
27 #if defined(HAVE_ARPA_INET_H)
28 # include <arpa/inet.h>
29 #endif
30
31 #if defined(HAVE_SYS_TIME_H)
32 # include <sys/time.h>
33 #endif
34
35 #if defined(HAVE_TIME_H)
36 # include <time.h>
37 #endif
38
39 #include "ms_setting.h"
40 #include "ms_thread.h"
41 #include "ms_atomic.h"
42
43 #ifdef linux
/* /usr/include/netinet/in.h defines ntohs() and friends as macros that map
 * to _bswap_nn in order to optimize the conversion functions, but the
 * prototypes generate warnings from gcc. The conversion functions aren't the
 * bottleneck for this application, so just silence the warnings by
 * undef'ing the optimized macros.
 */
49 # undef ntohs
50 # undef ntohl
51 # undef htons
52 # undef htonl
53 #endif
54
/* status codes used for network writes */
#define TRANSMIT_COMPLETE   0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * Constants for generating key prefixes. Bit 0x10 is set in every byte, so no
 * byte of these constants equals ' ' (0x20), '\r' (0x0d), '\n' (0x0a) or
 * '\0'. UINT64_C() gives them an explicit unsigned 64-bit type instead of
 * whatever signed type an unsuffixed literal happens to get on the platform.
 */
#define KEY_PREFIX_BASE UINT64_C(0x1010101010101010)
#define KEY_PREFIX_MASK UINT64_C(0x1010101010101010)

/* token positions used when parsing a "VALUE" line returned by the server */
#define KEY_TOKEN      1
#define VALUELEN_TOKEN 3
68
/*
 * Global increasing counter that keeps key prefixes unique. It is read and
 * advanced in ms_get_key_prefix() while ms_global.seq_mutex is held.
 */
static uint64_t key_prefix_seq = KEY_PREFIX_BASE;

/*
 * Global increasing counter that generates request ids for UDP. It is
 * incremented with atomic_add_32_nv() in ms_get_udp_request_id().
 */
static ATOMIC uint32_t udp_request_id = 0;

/*
 * Defined elsewhere; passed to pthread_getspecific() throughout this file to
 * fetch the ms_thread_t of the calling thread.
 */
extern pthread_key_t ms_thread_key;
76
/* generate a UDP request id */
static uint32_t ms_get_udp_request_id(void);

/* connection initialization */
static void ms_task_init(ms_conn_t *c);
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
static int ms_conn_sock_init(ms_conn_t *c);
static int ms_conn_event_init(ms_conn_t *c);
static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size,
                        const bool is_udp);
static void ms_warmup_num_init(ms_conn_t *c);
static int ms_item_win_init(ms_conn_t *c);

/* connection close and release */
void ms_conn_free(ms_conn_t *c);
static void ms_conn_close(ms_conn_t *c);

/* create network connections */
static int ms_new_socket(struct addrinfo *ai);
static void ms_maximize_sndbuf(const int sfd);
static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port,
                              const bool is_udp, int *ret_sfd);
static int ms_reconn(ms_conn_t *c);

/* read and parse server responses */
static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens);
static int ms_ascii_process_line(ms_conn_t *c, char *command);
static int ms_try_read_line(ms_conn_t *c);
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
static int ms_udp_read(ms_conn_t *c, char *buf, int len);
static int ms_try_read_network(ms_conn_t *c);
static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen);
static void ms_ascii_complete_nread(ms_conn_t *c);
static void ms_bin_complete_nread(ms_conn_t *c);
static void ms_complete_nread(ms_conn_t *c);

/* send functions */
static int ms_add_msghdr(ms_conn_t *c);
static int ms_ensure_iov_space(ms_conn_t *c);
static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
static int ms_build_udp_headers(ms_conn_t *c);
static int ms_transmit(ms_conn_t *c);

/* status adjustment */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);

/* main loop; ms_event_handler is the callback registered with libevent */
static void ms_drive_machine(ms_conn_t *c);
void ms_event_handler(const int fd, const short which, void *arg);

/* ascii protocol request builders */
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);

/* binary protocol response handling and request builders */
static int ms_bin_process_response(ms_conn_t *c);
static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len,
                              uint32_t body_len);
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
147
148 /**
149 * each key has two parts, prefix and suffix. The suffix is a
150 * string random get form the character table. The prefix is a
151 * uint64_t variable. And the prefix must be unique. we use the
152 * prefix to identify a key. And the prefix can't include
153 * character ' ' '\r' '\n' '\0'.
154 *
155 * @return uint64_t
156 */
157 uint64_t ms_get_key_prefix(void) {
158 uint64_t key_prefix;
159
160 pthread_mutex_lock(&ms_global.seq_mutex);
161 key_prefix_seq |= KEY_PREFIX_MASK;
162 key_prefix = key_prefix_seq;
163 key_prefix_seq++;
164 pthread_mutex_unlock(&ms_global.seq_mutex);
165
166 return key_prefix;
167 } /* ms_get_key_prefix */
168
169 /**
170 * get an unique udp request id
171 *
172 * @return an unique UDP request id
173 */
174 static uint32_t ms_get_udp_request_id(void) {
175 return atomic_add_32_nv(&udp_request_id, 1);
176 }
177
178 /**
179 * initialize current task structure
180 *
181 * @param c, pointer of the concurrency
182 */
183 static void ms_task_init(ms_conn_t *c) {
184 c->curr_task.cmd = CMD_NULL;
185 c->curr_task.item = 0;
186 c->curr_task.verify = false;
187 c->curr_task.finish_verify = true;
188 c->curr_task.get_miss = true;
189
190 c->curr_task.get_opt = 0;
191 c->curr_task.set_opt = 0;
192 c->curr_task.cycle_undo_get = 0;
193 c->curr_task.cycle_undo_set = 0;
194 c->curr_task.verified_get = 0;
195 c->curr_task.overwrite_set = 0;
196 } /* ms_task_init */
197
198 /**
199 * initialize udp for the connection structure
200 *
201 * @param c, pointer of the concurrency
202 * @param is_udp, whether it's udp
203 *
204 * @return int, if success, return EXIT_SUCCESS, else return -1
205 */
206 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) {
207 c->hdrbuf = 0;
208 c->rudpbuf = 0;
209 c->udppkt = 0;
210
211 c->rudpsize = UDP_DATA_BUFFER_SIZE;
212 c->hdrsize = 0;
213
214 c->rudpbytes = 0;
215 c->packets = 0;
216 c->recvpkt = 0;
217 c->pktcurr = 0;
218 c->ordcurr = 0;
219
220 c->udp = is_udp;
221
222 if (c->udp || (!c->udp && ms_setting.facebook_test)) {
223 c->rudpbuf = (char *) malloc((size_t) c->rudpsize);
224 c->udppkt = (ms_udppkt_t *) malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
225
226 if ((c->rudpbuf == NULL) || (c->udppkt == NULL)) {
227 if (c->rudpbuf != NULL)
228 free(c->rudpbuf);
229 if (c->udppkt != NULL)
230 free(c->udppkt);
231 fprintf(stderr, "malloc()\n");
232 return -1;
233 }
234 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
235 }
236
237 return EXIT_SUCCESS;
238 } /* ms_conn_udp_init */
239
240 /**
241 * initialize the connection structure
242 *
243 * @param c, pointer of the concurrency
244 * @param init_state, (conn_read, conn_write, conn_closing)
245 * @param read_buffer_size
246 * @param is_udp, whether it's udp
247 *
248 * @return int, if success, return EXIT_SUCCESS, else return -1
249 */
250 static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size,
251 const bool is_udp) {
252 assert(c != NULL);
253
254 c->rbuf = c->wbuf = 0;
255 c->iov = 0;
256 c->msglist = 0;
257
258 c->rsize = read_buffer_size;
259 c->wsize = WRITE_BUFFER_SIZE;
260 c->iovsize = IOV_LIST_INITIAL;
261 c->msgsize = MSG_LIST_INITIAL;
262
263 /* for replication, each connection need connect all the server */
264 if (ms_setting.rep_write_srv > 0) {
265 c->total_sfds = ms_setting.srv_cnt * ms_setting.sock_per_conn;
266 } else {
267 c->total_sfds = ms_setting.sock_per_conn;
268 }
269 c->alive_sfds = 0;
270
271 c->rbuf = (char *) malloc((size_t) c->rsize);
272 c->wbuf = (char *) malloc((size_t) c->wsize);
273 c->iov = (struct iovec *) malloc(sizeof(struct iovec) * (size_t) c->iovsize);
274 c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * (size_t) c->msgsize);
275 if (ms_setting.mult_key_num > 1) {
276 c->mlget_task.mlget_item = (ms_mlget_task_item_t *) malloc(sizeof(ms_mlget_task_item_t)
277 * (size_t) ms_setting.mult_key_num);
278 }
279 c->tcpsfd = (int *) malloc((size_t) c->total_sfds * sizeof(int));
280
281 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL) || (c->msglist == NULL)
282 || (c->tcpsfd == NULL)
283 || ((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_item == NULL)))
284 {
285 if (c->rbuf != NULL)
286 free(c->rbuf);
287 if (c->wbuf != NULL)
288 free(c->wbuf);
289 if (c->iov != NULL)
290 free(c->iov);
291 if (c->msglist != NULL)
292 free(c->msglist);
293 if (c->mlget_task.mlget_item != NULL)
294 free(c->mlget_task.mlget_item);
295 if (c->tcpsfd != NULL)
296 free(c->tcpsfd);
297 fprintf(stderr, "malloc()\n");
298 return -1;
299 }
300
301 c->state = init_state;
302 c->rvbytes = 0;
303 c->rbytes = 0;
304 c->rcurr = c->rbuf;
305 c->wcurr = c->wbuf;
306 c->iovused = 0;
307 c->msgcurr = 0;
308 c->msgused = 0;
309 c->cur_idx = c->total_sfds; /* default index is a invalid value */
310
311 c->ctnwrite = false;
312 c->readval = false;
313 c->change_sfd = false;
314
315 c->precmd.cmd = c->currcmd.cmd = CMD_NULL;
316 c->precmd.isfinish = true; /* default the previous command finished */
317 c->currcmd.isfinish = false;
318 c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE;
319 c->precmd.key_prefix = c->currcmd.key_prefix = 0;
320
321 c->mlget_task.mlget_num = 0;
322 c->mlget_task.value_index = -1; /* default invalid value */
323
324 if (ms_setting.binary_prot_) {
325 c->protocol = binary_prot;
326 } else {
327 c->protocol = ascii_prot;
328 }
329
330 /* initialize udp */
331 if (ms_conn_udp_init(c, is_udp) != 0) {
332 return -1;
333 }
334
335 /* initialize task */
336 ms_task_init(c);
337
338 if (!(ms_setting.facebook_test && is_udp)) {
339 atomic_add_32(&ms_stats.active_conns, 1);
340 }
341
342 return EXIT_SUCCESS;
343 } /* ms_conn_init */
344
345 /**
346 * when doing 100% get operation, it could preset some objects
347 * to warmup the server. this function is used to initialize the
348 * number of the objects to preset.
349 *
350 * @param c, pointer of the concurrency
351 */
352 static void ms_warmup_num_init(ms_conn_t *c) {
353 /* no set operation, preset all the items in the window */
354 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) {
355 c->warmup_num = c->win_size;
356 c->remain_warmup_num = c->warmup_num;
357 } else {
358 c->warmup_num = 0;
359 c->remain_warmup_num = c->warmup_num;
360 }
361 } /* ms_warmup_num_init */
362
363 /**
364 * each connection has an item window, this function initialize
365 * the window. The window is used to generate task.
366 *
367 * @param c, pointer of the concurrency
368 *
369 * @return int, if success, return EXIT_SUCCESS, else return -1
370 */
371 static int ms_item_win_init(ms_conn_t *c) {
372 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
373 int exp_cnt = 0;
374
375 c->win_size = (int) ms_setting.win_size;
376 c->set_cursor = 0;
377 c->exec_num = ms_thread->thread_ctx->exec_num_perconn;
378 c->remain_exec_num = c->exec_num;
379
380 c->item_win = (ms_task_item_t *) malloc(sizeof(ms_task_item_t) * (size_t) c->win_size);
381 if (c->item_win == NULL) {
382 fprintf(stderr, "Can't allocate task item array for conn.\n");
383 return -1;
384 }
385 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t) c->win_size);
386
387 for (int i = 0; i < c->win_size; i++) {
388 c->item_win[i].key_size = (int) ms_setting.distr[i].key_size;
389 c->item_win[i].key_prefix = ms_get_key_prefix();
390 c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset;
391 c->item_win[i].value_size = (int) ms_setting.distr[i].value_size;
392 c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */
393 c->item_win[i].client_time = 0;
394
395 /* set expire time base on the proportion */
396 if (exp_cnt < ms_setting.exp_ver_per * i) {
397 c->item_win[i].exp_time = FIXED_EXPIRE_TIME;
398 exp_cnt++;
399 } else {
400 c->item_win[i].exp_time = 0;
401 }
402 }
403
404 ms_warmup_num_init(c);
405
406 return EXIT_SUCCESS;
407 } /* ms_item_win_init */
408
409 /**
410 * each connection structure can include one or more sock
411 * handlers. this function create these socks and connect the
412 * server(s).
413 *
414 * @param c, pointer of the concurrency
415 *
416 * @return int, if success, return EXIT_SUCCESS, else return -1
417 */
418 static int ms_conn_sock_init(ms_conn_t *c) {
419 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
420 uint32_t i;
421 int ret_sfd;
422 uint32_t srv_idx = 0;
423
424 assert(c != NULL);
425 assert(c->tcpsfd != NULL);
426
427 for (i = 0; i < c->total_sfds; i++) {
428 ret_sfd = 0;
429 if (ms_setting.rep_write_srv > 0) {
430 /* for replication, each connection need connect all the server */
431 srv_idx = i % ms_setting.srv_cnt;
432 } else {
433 /* all the connections in a thread connects the same server */
434 srv_idx = ms_thread->thread_ctx->srv_idx;
435 }
436
437 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
438 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd)
439 != 0)
440 {
441 break;
442 }
443
444 if (i == 0) {
445 c->sfd = ret_sfd;
446 }
447
448 if (!ms_setting.udp) {
449 c->tcpsfd[i] = ret_sfd;
450 }
451
452 c->alive_sfds++;
453 }
454
455 /* initialize udp sock handler if necessary */
456 if (ms_setting.facebook_test) {
457 ret_sfd = 0;
458 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
459 ms_setting.servers[srv_idx].srv_port, true, &ret_sfd)
460 != 0)
461 {
462 c->udpsfd = 0;
463 } else {
464 c->udpsfd = ret_sfd;
465 }
466 }
467
468 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0))) {
469 if (ms_setting.udp) {
470 close(c->sfd);
471 } else {
472 for (uint32_t j = 0; j < i; j++) {
473 close(c->tcpsfd[j]);
474 }
475 }
476
477 if (c->udpsfd != 0) {
478 close(c->udpsfd);
479 }
480
481 return -1;
482 }
483
484 return EXIT_SUCCESS;
485 } /* ms_conn_sock_init */
486
487 /**
488 * each connection is managed by libevent, this function
489 * initialize the event of the connection structure.
490 *
491 * @param c, pointer of the concurrency
492 *
493 * @return int, if success, return EXIT_SUCCESS, else return -1
494 */
495 static int ms_conn_event_init(ms_conn_t *c) {
496 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
497 short event_flags = EV_WRITE | EV_PERSIST;
498
499 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *) c);
500 event_base_set(ms_thread->base, &c->event);
501 c->ev_flags = event_flags;
502
503 if (event_add(&c->event, NULL) == -1) {
504 return -1;
505 }
506
507 return EXIT_SUCCESS;
508 } /* ms_conn_event_init */
509
510 /**
511 * setup a connection, each connection structure of each
512 * thread must call this function to initialize.
513 *
514 * @param c, pointer of the concurrency
515 *
516 * @return int, if success, return EXIT_SUCCESS, else return -1
517 */
518 int ms_setup_conn(ms_conn_t *c) {
519 if (ms_item_win_init(c) != 0) {
520 return -1;
521 }
522
523 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) {
524 return -1;
525 }
526
527 if (ms_conn_sock_init(c) != 0) {
528 return -1;
529 }
530
531 if (ms_conn_event_init(c) != 0) {
532 return -1;
533 }
534
535 return EXIT_SUCCESS;
536 } /* ms_setup_conn */
537
538 /**
539 * Frees a connection.
540 *
541 * @param c, pointer of the concurrency
542 */
543 void ms_conn_free(ms_conn_t *c) {
544 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
545 if (c != NULL) {
546 if (c->hdrbuf != NULL)
547 free(c->hdrbuf);
548 if (c->msglist != NULL)
549 free(c->msglist);
550 if (c->rbuf != NULL)
551 free(c->rbuf);
552 if (c->wbuf != NULL)
553 free(c->wbuf);
554 if (c->iov != NULL)
555 free(c->iov);
556 if (c->mlget_task.mlget_item != NULL)
557 free(c->mlget_task.mlget_item);
558 if (c->rudpbuf != NULL)
559 free(c->rudpbuf);
560 if (c->udppkt != NULL)
561 free(c->udppkt);
562 if (c->item_win != NULL)
563 free(c->item_win);
564 if (c->tcpsfd != NULL)
565 free(c->tcpsfd);
566
567 if (--ms_thread->nactive_conn == 0) {
568 free(ms_thread->conn);
569 }
570 }
571 } /* ms_conn_free */
572
573 /**
574 * close a connection
575 *
576 * @param c, pointer of the concurrency
577 */
578 static void ms_conn_close(ms_conn_t *c) {
579 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
580 assert(c != NULL);
581
582 /* delete the event, the socket and the connection */
583 event_del(&c->event);
584
585 for (uint32_t i = 0; i < c->total_sfds; i++) {
586 if (c->tcpsfd[i] > 0) {
587 close(c->tcpsfd[i]);
588 }
589 }
590 c->sfd = 0;
591
592 if (ms_setting.facebook_test) {
593 close(c->udpsfd);
594 }
595
596 atomic_dec_32(&ms_stats.active_conns);
597
598 ms_conn_free(c);
599
600 if (ms_setting.run_time == 0) {
601 pthread_mutex_lock(&ms_global.run_lock.lock);
602 ms_global.run_lock.count++;
603 pthread_cond_signal(&ms_global.run_lock.cond);
604 pthread_mutex_unlock(&ms_global.run_lock.lock);
605 }
606
607 if (ms_thread->nactive_conn == 0) {
608 pthread_exit(NULL);
609 }
610 } /* ms_conn_close */
611
612 /**
613 * create a new sock
614 *
615 * @param ai, server address information
616 *
617 * @return int, if success, return EXIT_SUCCESS, else return -1
618 */
619 static int ms_new_socket(struct addrinfo *ai) {
620 int sfd;
621
622 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
623 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
624 return -1;
625 }
626
627 return sfd;
628 } /* ms_new_socket */
629
630 /**
631 * Sets a socket's send buffer size to the maximum allowed by the system.
632 *
633 * @param sfd, file descriptor of socket
634 */
635 static void ms_maximize_sndbuf(const int sfd) {
636 socklen_t intsize = sizeof(int);
637 unsigned int last_good = 0;
638 unsigned int min, max, avg;
639 unsigned int old_size;
640
641 /* Start with the default size. */
642 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
643 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
644 return;
645 }
646
647 /* Binary-search for the real maximum. */
648 min = old_size;
649 max = MAX_SENDBUF_SIZE;
650
651 while (min <= max) {
652 avg = ((unsigned int) (min + max)) / 2;
653 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *) &avg, intsize) == 0) {
654 last_good = avg;
655 min = avg + 1;
656 } else {
657 max = avg - 1;
658 }
659 }
660 (void) last_good;
661 } /* ms_maximize_sndbuf */
662
663 /**
664 * socket connects the server
665 *
666 * @param c, pointer of the concurrency
667 * @param srv_host_name, the host name of the server
668 * @param srv_port, port of server
669 * @param is_udp, whether it's udp
670 * @param ret_sfd, the connected socket file descriptor
671 *
672 * @return int, if success, return EXIT_SUCCESS, else return -1
673 */
674 static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port,
675 const bool is_udp, int *ret_sfd) {
676 int sfd;
677 struct linger ling = {0, 0};
678 struct addrinfo *ai;
679 struct addrinfo *next;
680 struct addrinfo hints;
681 char port_buf[NI_MAXSERV];
682 int error;
683 int success = 0;
684
685 int flags = 1;
686
687 /*
688 * the memset call clears nonstandard fields in some impementations
689 * that otherwise mess things up.
690 */
691 memset(&hints, 0, sizeof(hints));
692 #ifdef AI_ADDRCONFIG
693 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
694 #else
695 hints.ai_flags = AI_PASSIVE;
696 #endif /* AI_ADDRCONFIG */
697 if (is_udp) {
698 hints.ai_protocol = IPPROTO_UDP;
699 hints.ai_socktype = SOCK_DGRAM;
700 hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
701 } else {
702 hints.ai_family = AF_UNSPEC;
703 hints.ai_protocol = IPPROTO_TCP;
704 hints.ai_socktype = SOCK_STREAM;
705 }
706
707 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
708 error = getaddrinfo(srv_host_name, port_buf, &hints, &ai);
709 if (error != 0) {
710 if (error != EAI_SYSTEM)
711 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
712 else
713 perror("getaddrinfo()");
714
715 return -1;
716 }
717
718 for (next = ai; next; next = next->ai_next) {
719 if ((sfd = ms_new_socket(next)) == -1) {
720 freeaddrinfo(ai);
721 return -1;
722 }
723
724 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *) &flags, sizeof(flags));
725 if (is_udp) {
726 ms_maximize_sndbuf(sfd);
727 } else {
728 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags, sizeof(flags));
729 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *) &ling, sizeof(ling));
730 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags, sizeof(flags));
731 }
732
733 if (is_udp) {
734 c->srv_recv_addr_size = sizeof(struct sockaddr);
735 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
736 } else {
737 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) {
738 close(sfd);
739 freeaddrinfo(ai);
740 return -1;
741 }
742 }
743
744 if (((flags = fcntl(sfd, F_GETFL, 0)) < 0) || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)) {
745 fprintf(stderr, "setting O_NONBLOCK\n");
746 close(sfd);
747 freeaddrinfo(ai);
748 return -1;
749 }
750
751 if (ret_sfd != NULL) {
752 *ret_sfd = sfd;
753 }
754
755 success++;
756 }
757
758 freeaddrinfo(ai);
759
760 /* Return zero if we detected no errors in starting up connections */
761 return success == 0;
762 } /* ms_network_connect */
763
764 /**
765 * reconnect a disconnected sock
766 *
767 * @param c, pointer of the concurrency
768 *
769 * @return int, if success, return EXIT_SUCCESS, else return -1
770 */
771 static int ms_reconn(ms_conn_t *c) {
772 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
773 uint32_t srv_idx = 0;
774 uint32_t srv_conn_cnt = 0;
775
776 if (ms_setting.rep_write_srv > 0) {
777 srv_idx = c->cur_idx % ms_setting.srv_cnt;
778 srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns;
779 } else {
780 srv_idx = ms_thread->thread_ctx->srv_idx;
781 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
782 }
783
784 /* close the old socket handler */
785 close(c->sfd);
786 c->tcpsfd[c->cur_idx] = 0;
787
788 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) % srv_conn_cnt == 0) {
789 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
790 fprintf(stderr, "Server %s:%d disconnect\n", ms_setting.servers[srv_idx].srv_host_name,
791 ms_setting.servers[srv_idx].srv_port);
792 }
793
794 if (ms_setting.rep_write_srv > 0) {
795 uint32_t i = 0;
796
797 for (i = 0; i < c->total_sfds; i++) {
798 if (c->tcpsfd[i] != 0) {
799 break;
800 }
801 }
802
803 /* all socks disconnect */
804 if (i == c->total_sfds) {
805 return -1;
806 }
807 } else {
808 do {
809 /* reconnect success, break the loop */
810 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
811 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &c->sfd)
812 == 0)
813 {
814 c->tcpsfd[c->cur_idx] = c->sfd;
815 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt
816 == 0) {
817 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
818 int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec
819 - ms_setting.servers[srv_idx].disconn_time.tv_sec);
820 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
821 ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port,
822 reconn_time);
823 }
824 break;
825 }
826
827 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) {
828 /* wait a second and reconnect */
829 sleep(1);
830 }
831 } while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
832 }
833
834 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) {
835 c->sfd = 0;
836 c->alive_sfds--;
837 }
838
839 return EXIT_SUCCESS;
840 } /* ms_reconn */
841
842 /**
843 * reconnect several disconnected socks in the connection
844 * structure, the ever-1-second timer of the thread will check
845 * whether some socks in the connections disconnect. if
846 * disconnect, reconnect the sock.
847 *
848 * @param c, pointer of the concurrency
849 *
850 * @return int, if success, return EXIT_SUCCESS, else return -1
851 */
852 int ms_reconn_socks(ms_conn_t *c) {
853 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
854 uint32_t srv_idx = 0;
855 int ret_sfd = 0;
856 uint32_t srv_conn_cnt = 0;
857 struct timeval cur_time;
858
859 assert(c != NULL);
860
861 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) {
862 return EXIT_SUCCESS;
863 }
864
865 for (uint32_t i = 0; i < c->total_sfds; i++) {
866 if (c->tcpsfd[i] == 0) {
867 gettimeofday(&cur_time, NULL);
868
869 /**
870 * For failover test of replication, reconnect the socks after
871 * it disconnects more than 5 seconds, Otherwise memslap will
872 * block at connect() function and the work threads can't work
873 * in this interval.
874 */
875 if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) {
876 break;
877 }
878
879 if (ms_setting.rep_write_srv > 0) {
880 srv_idx = i % ms_setting.srv_cnt;
881 srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns;
882 } else {
883 srv_idx = ms_thread->thread_ctx->srv_idx;
884 srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
885 }
886
887 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
888 ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd)
889 == 0)
890 {
891 c->tcpsfd[i] = ret_sfd;
892 c->alive_sfds++;
893
894 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt
895 == 0) {
896 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
897 int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec
898 - ms_setting.servers[srv_idx].disconn_time.tv_sec);
899 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
900 ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port,
901 reconn_time);
902 }
903 }
904 }
905 }
906
907 return EXIT_SUCCESS;
908 } /* ms_reconn_socks */
909
910 /**
911 * Tokenize the command string by replacing whitespace with '\0' and update
912 * the token array tokens with pointer to start of each token and length.
913 * Returns total number of tokens. The last valid token is the terminal
914 * token (value points to the first unprocessed character of the string and
915 * length zero).
916 *
917 * Usage example:
918 *
919 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
920 * for(int ix = 0; tokens[ix].length != 0; ix++) {
921 * ...
922 * }
923 * ncommand = tokens[ix].value - command;
924 * command = tokens[ix].value;
925 * }
926 *
927 * @param command, the command string to token
928 * @param tokens, array to store tokens
929 * @param max_tokens, maximum tokens number
930 *
931 * @return int, the number of tokens
932 */
933 static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens) {
934 char *s, *e;
935 int ntokens = 0;
936
937 assert(command != NULL && tokens != NULL && max_tokens > 1);
938
939 for (s = e = command; ntokens < max_tokens - 1; ++e) {
940 if (*e == ' ') {
941 if (s != e) {
942 tokens[ntokens].value = s;
943 tokens[ntokens].length = (size_t)(e - s);
944 ntokens++;
945 *e = '\0';
946 }
947 s = e + 1;
948 } else if (*e == '\0') {
949 if (s != e) {
950 tokens[ntokens].value = s;
951 tokens[ntokens].length = (size_t)(e - s);
952 ntokens++;
953 }
954
955 break; /* string end */
956 }
957 }
958
959 return ntokens;
960 } /* ms_tokenize_command */
961
962 /**
963 * parse the response of server.
964 *
965 * @param c, pointer of the concurrency
966 * @param command, the string responded by server
967 *
968 * @return int, if the command completed return EXIT_SUCCESS, else return
969 * -1
970 */
971 static int ms_ascii_process_line(ms_conn_t *c, char *command) {
972 int ret = 0;
973 int64_t value_len;
974 char *buffer = command;
975
976 assert(c != NULL);
977
978 /**
979 * for command get, we store the returned value into local buffer
980 * then continue in ms_complete_nread().
981 */
982
983 switch (buffer[0]) {
984 case 'V': /* VALUE || VERSION */
985 if (buffer[1] == 'A') /* VALUE */ {
986 token_t tokens[MAX_TOKENS];
987 ms_tokenize_command(command, tokens, MAX_TOKENS);
988 errno = 0;
989 value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
990 if (errno != 0) {
991 printf("<%d ERROR %s\n", c->sfd, strerror(errno));
992 }
993 memcpy(&c->currcmd.key_prefix, tokens[KEY_TOKEN].value, sizeof(c->currcmd.key_prefix));
994
995 /*
996 * We read the \r\n into the string since not doing so is more
997 * cycles then the waster of memory to do so.
998 *
999 * We are null terminating through, which will most likely make
1000 * some people lazy about using the return length.
1001 */
1002 c->rvbytes = (int) (value_len + 2);
1003 c->readval = true;
1004 ret = -1;
1005 }
1006
1007 break;
1008
1009 case 'O': /* OK */ c->currcmd.retstat = MCD_SUCCESS; break;
1010
1011 case 'S': /* STORED STATS SERVER_ERROR */
1012 if (buffer[2] == 'A') /* STORED STATS */ { /* STATS*/
1013 c->currcmd.retstat = MCD_STAT;
1014 } else if (buffer[1] == 'E') {
1015 /* SERVER_ERROR */
1016 printf("<%d %s\n", c->sfd, buffer);
1017
1018 c->currcmd.retstat = MCD_SERVER_ERROR;
1019 } else if (buffer[1] == 'T') {
1020 /* STORED */
1021 c->currcmd.retstat = MCD_STORED;
1022 } else {
1023 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1024 }
1025 break;
1026
1027 case 'D': /* DELETED DATA */
1028 if (buffer[1] == 'E') {
1029 c->currcmd.retstat = MCD_DELETED;
1030 } else {
1031 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1032 }
1033
1034 break;
1035
1036 case 'N': /* NOT_FOUND NOT_STORED*/
1037 if (buffer[4] == 'F') {
1038 c->currcmd.retstat = MCD_NOTFOUND;
1039 } else if (buffer[4] == 'S') {
1040 printf("<%d %s\n", c->sfd, buffer);
1041 c->currcmd.retstat = MCD_NOTSTORED;
1042 } else {
1043 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1044 }
1045 break;
1046
1047 case 'E': /* PROTOCOL ERROR or END */
1048 if (buffer[1] == 'N') {
1049 /* END */
1050 c->currcmd.retstat = MCD_END;
1051 } else if (buffer[1] == 'R') {
1052 printf("<%d ERROR\n", c->sfd);
1053 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
1054 } else if (buffer[1] == 'X') {
1055 c->currcmd.retstat = MCD_DATA_EXISTS;
1056 printf("<%d %s\n", c->sfd, buffer);
1057 } else {
1058 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
1059 }
1060 break;
1061
1062 case 'C': /* CLIENT ERROR */
1063 printf("<%d %s\n", c->sfd, buffer);
1064 c->currcmd.retstat = MCD_CLIENT_ERROR;
1065 break;
1066
1067 default: c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; break;
1068 } /* switch */
1069
1070 return ret;
1071 } /* ms_ascii_process_line */
1072
1073 /**
1074 * after one operation completes, reset the concurrency
1075 *
1076 * @param c, pointer of the concurrency
1077 * @param timeout, whether it's timeout
1078 */
1079 void ms_reset_conn(ms_conn_t *c, bool timeout) {
1080 assert(c != NULL);
1081
1082 if (c->udp) {
1083 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) {
1084 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t) c->packets);
1085 }
1086
1087 c->packets = 0;
1088 c->recvpkt = 0;
1089 c->pktcurr = 0;
1090 c->ordcurr = 0;
1091 c->rudpbytes = 0;
1092 }
1093 c->currcmd.isfinish = true;
1094 c->ctnwrite = false;
1095 c->rbytes = 0;
1096 c->rcurr = c->rbuf;
1097 c->msgcurr = 0;
1098 c->msgused = 0;
1099 c->iovused = 0;
1100 ms_conn_set_state(c, conn_write);
1101 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1102
1103 if (timeout) {
1104 ms_drive_machine(c);
1105 }
1106 } /* ms_reset_conn */
1107
1108 /**
1109 * if we have a complete line in the buffer, process it.
1110 *
1111 * @param c, pointer of the concurrency
1112 *
1113 * @return int, if success, return EXIT_SUCCESS, else return -1
1114 */
1115 static int ms_try_read_line(ms_conn_t *c) {
1116 if (c->protocol == binary_prot) {
1117 /* Do we have the complete packet header? */
1118 if ((uint64_t) c->rbytes < sizeof(c->binary_header)) {
1119 /* need more data! */
1120 return EXIT_SUCCESS;
1121 } else {
1122 #ifdef NEED_ALIGN
1123 if (((long) (c->rcurr)) % 8 != 0) {
1124 /* must realign input buffer */
1125 memmove(c->rbuf, c->rcurr, c->rbytes);
1126 c->rcurr = c->rbuf;
1127 if (settings.verbose) {
1128 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1129 }
1130 }
1131 #endif
1132 protocol_binary_response_header *rsp;
1133 rsp = (protocol_binary_response_header *) c->rcurr;
1134
1135 c->binary_header = *rsp;
1136 c->binary_header.response.extlen = rsp->response.extlen;
1137 c->binary_header.response.keylen = ntohs(rsp->response.keylen);
1138 c->binary_header.response.bodylen = ntohl(rsp->response.bodylen);
1139 c->binary_header.response.status = ntohs(rsp->response.status);
1140
1141 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) {
1142 fprintf(stderr, "Invalid magic: %x\n", c->binary_header.response.magic);
1143 ms_conn_set_state(c, conn_closing);
1144 return EXIT_SUCCESS;
1145 }
1146
1147 /* process this complete response */
1148 if (ms_bin_process_response(c) == 0) {
1149 /* current operation completed */
1150 ms_reset_conn(c, false);
1151 return -1;
1152 } else {
1153 c->rbytes -= (int32_t) sizeof(c->binary_header);
1154 c->rcurr += sizeof(c->binary_header);
1155 }
1156 }
1157 } else {
1158 char *el, *cont;
1159
1160 assert(c != NULL);
1161 assert(c->rcurr <= (c->rbuf + c->rsize));
1162
1163 if (c->rbytes == 0)
1164 return EXIT_SUCCESS;
1165
1166 el = memchr(c->rcurr, '\n', (size_t) c->rbytes);
1167 if (!el)
1168 return EXIT_SUCCESS;
1169
1170 cont = el + 1;
1171 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) {
1172 el--;
1173 }
1174 *el = '\0';
1175
1176 assert(cont <= (c->rcurr + c->rbytes));
1177
1178 /* process this complete line */
1179 if (ms_ascii_process_line(c, c->rcurr) == 0) {
1180 /* current operation completed */
1181 ms_reset_conn(c, false);
1182 return -1;
1183 } else {
1184 /* current operation didn't complete */
1185 c->rbytes -= (int32_t)(cont - c->rcurr);
1186 c->rcurr = cont;
1187 }
1188
1189 assert(c->rcurr <= (c->rbuf + c->rsize));
1190 }
1191
1192 return -1;
1193 } /* ms_try_read_line */
1194
1195 /**
1196 * because the packet of UDP can't ensure the order, the
1197 * function is used to sort the received udp packet.
1198 *
1199 * @param c, pointer of the concurrency
1200 * @param buf, the buffer to store the ordered packages data
1201 * @param rbytes, the maximum capacity of the buffer
1202 *
1203 * @return int, if success, return the copy bytes, else return
1204 * -1
1205 */
1206 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) {
1207 int len = 0;
1208 int wbytes = 0;
1209 uint16_t req_id = 0;
1210 uint16_t seq_num = 0;
1211 uint16_t packets = 0;
1212 unsigned char *header = NULL;
1213
1214 /* no enough data */
1215 assert(c != NULL);
1216 assert(buf != NULL);
1217 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1218
1219 /* calculate received packets count */
1220 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) {
1221 /* the last packet has some data */
1222 c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1223 } else {
1224 c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1225 }
1226
1227 /* get the total packets count if necessary */
1228 if (c->packets == 0) {
1229 c->packets = HEADER_TO_PACKETS((unsigned char *) c->rudpbuf);
1230 }
1231
1232 /* build the ordered packet array */
1233 for (int i = c->pktcurr; i < c->recvpkt; i++) {
1234 header = (unsigned char *) c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1235 req_id = (uint16_t) HEADER_TO_REQID(header);
1236 assert(req_id == c->request_id % (1 << 16));
1237
1238 packets = (uint16_t) HEADER_TO_PACKETS(header);
1239 assert(c->packets == HEADER_TO_PACKETS(header));
1240
1241 seq_num = (uint16_t) HEADER_TO_SEQNUM(header);
1242 c->udppkt[seq_num].header = header;
1243 c->udppkt[seq_num].data = (char *) header + UDP_HEADER_SIZE;
1244
1245 if (i == c->recvpkt - 1) {
1246 /* last received packet */
1247 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) {
1248 c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1249 c->pktcurr++;
1250 } else {
1251 c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1252 }
1253 } else {
1254 c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1255 c->pktcurr++;
1256 }
1257 }
1258
1259 for (int i = c->ordcurr; i < c->recvpkt; i++) {
1260 /* there is some data to copy */
1261 if ((c->udppkt[i].data != NULL) && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) {
1262 header = c->udppkt[i].header;
1263 len = c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1264 if (len > rbytes - wbytes) {
1265 len = rbytes - wbytes;
1266 }
1267
1268 assert(len <= rbytes - wbytes);
1269 assert(i == HEADER_TO_SEQNUM(header));
1270
1271 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t) len);
1272 wbytes += len;
1273 c->udppkt[i].copybytes += len;
1274
1275 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1276 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1277 {
1278 /* finish copying all the data of this packet, next */
1279 c->ordcurr++;
1280 }
1281
1282 /* last received packet, and finish copying all the data */
1283 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1284 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1285 {
1286 break;
1287 }
1288
1289 /* no space to copy data */
1290 if (wbytes >= rbytes) {
1291 break;
1292 }
1293
1294 /* it doesn't finish reading all the data of the packet from network */
1295 if ((i != c->recvpkt - 1) && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) {
1296 break;
1297 }
1298 } else {
1299 /* no data to copy */
1300 break;
1301 }
1302 }
1303 (void) packets;
1304
1305 return wbytes == 0 ? -1 : wbytes;
1306 } /* ms_sort_udp_packet */
1307
1308 /**
1309 * encapsulate upd read like tcp read
1310 *
1311 * @param c, pointer of the concurrency
1312 * @param buf, read buffer
1313 * @param len, length to read
1314 *
1315 * @return int, if success, return the read bytes, else return
1316 * -1
1317 */
1318 static int ms_udp_read(ms_conn_t *c, char *buf, int len) {
1319 int res = 0;
1320 int avail = 0;
1321 int rbytes = 0;
1322 int copybytes = 0;
1323
1324 assert(c->udp);
1325
1326 while (1) {
1327 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) {
1328 char *new_rbuf = realloc(c->rudpbuf, (size_t) c->rudpsize * 2);
1329 if (!new_rbuf) {
1330 fprintf(stderr, "Couldn't realloc input buffer.\n");
1331 c->rudpbytes = 0; /* ignore what we read */
1332 return -1;
1333 }
1334 c->rudpbuf = new_rbuf;
1335 c->rudpsize *= 2;
1336 }
1337
1338 avail = c->rudpsize - c->rudpbytes;
1339 /* UDP each time read a packet, 1400 bytes */
1340 res = (int) read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t) avail);
1341
1342 if (res > 0) {
1343 atomic_add_size(&ms_stats.bytes_read, res);
1344 c->rudpbytes += res;
1345 rbytes += res;
1346 if (res == avail) {
1347 continue;
1348 } else {
1349 break;
1350 }
1351 }
1352
1353 if (res == 0) {
1354 /* "connection" closed */
1355 return res;
1356 }
1357
1358 if (res == -1) {
1359 /* no data to read */
1360 return res;
1361 }
1362 }
1363
1364 /* copy data to read buffer */
1365 if (rbytes > 0) {
1366 copybytes = ms_sort_udp_packet(c, buf, len);
1367 }
1368
1369 if (copybytes == -1) {
1370 atomic_add_size(&ms_stats.pkt_disorder, 1);
1371 }
1372
1373 return copybytes;
1374 } /* ms_udp_read */
1375
1376 /*
1377 * read from network as much as we can, handle buffer overflow and connection
1378 * close.
1379 * before reading, move the remaining incomplete fragment of a command
1380 * (if any) to the beginning of the buffer.
1381 * return EXIT_SUCCESS if there's nothing to read on the first read.
1382 */
1383
1384 /**
1385 * read from network as much as we can, handle buffer overflow and connection
1386 * close. before reading, move the remaining incomplete fragment of a command
1387 * (if any) to the beginning of the buffer.
1388 *
1389 * @param c, pointer of the concurrency
1390 *
1391 * @return int,
1392 * return EXIT_SUCCESS if there's nothing to read on the first read.
1393 * return EXIT_FAILURE if get data
1394 * return -1 if error happens
1395 */
1396 static int ms_try_read_network(ms_conn_t *c) {
1397 int gotdata = 0;
1398 int res;
1399 int64_t avail;
1400
1401 assert(c != NULL);
1402
1403 if ((c->rcurr != c->rbuf)
1404 && (!c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1405 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1406 {
1407 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1408 memmove(c->rbuf, c->rcurr, (size_t) c->rbytes);
1409 c->rcurr = c->rbuf;
1410 }
1411
1412 while (1) {
1413 if (c->rbytes >= c->rsize) {
1414 char *new_rbuf = realloc(c->rbuf, (size_t) c->rsize * 2);
1415 if (!new_rbuf) {
1416 fprintf(stderr, "Couldn't realloc input buffer.\n");
1417 c->rbytes = 0; /* ignore what we read */
1418 return -1;
1419 }
1420 c->rcurr = c->rbuf = new_rbuf;
1421 c->rsize *= 2;
1422 }
1423
1424 avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1425 if (avail == 0) {
1426 break;
1427 }
1428
1429 if (c->udp) {
1430 res = (int32_t) ms_udp_read(c, c->rcurr + c->rbytes, (int32_t) avail);
1431 } else {
1432 res = (int) read(c->sfd, c->rcurr + c->rbytes, (size_t) avail);
1433 }
1434
1435 if (res > 0) {
1436 if (!c->udp) {
1437 atomic_add_size(&ms_stats.bytes_read, res);
1438 }
1439 gotdata = 1;
1440 c->rbytes += res;
1441 if (res == avail) {
1442 continue;
1443 } else {
1444 break;
1445 }
1446 }
1447 if (res == 0) {
1448 /* connection closed */
1449 ms_conn_set_state(c, conn_closing);
1450 return -1;
1451 }
1452 if (res == -1) {
1453 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1454 break;
1455 /* Should close on unhandled errors. */
1456 ms_conn_set_state(c, conn_closing);
1457 return -1;
1458 }
1459 }
1460
1461 return gotdata;
1462 } /* ms_try_read_network */
1463
1464 /**
1465 * after get the object from server, verify the value if
1466 * necessary.
1467 *
1468 * @param c, pointer of the concurrency
1469 * @param mlget_item, pointer of mulit-get task item structure
1470 * @param value, received value string
1471 * @param vlen, received value string length
1472 */
1473 static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen) {
1474 if (c->curr_task.verify) {
1475 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1476 char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset];
1477 char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1478
1479 /* verify expire time if necessary */
1480 if (c->curr_task.item->exp_time > 0) {
1481 struct timeval curr_time;
1482 gettimeofday(&curr_time, NULL);
1483
1484 /* object expired but get it now */
1485 if (curr_time.tv_sec - c->curr_task.item->client_time
1486 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1487 {
1488 atomic_add_size(&ms_stats.exp_get, 1);
1489
1490 if (ms_setting.verbose) {
1491 char set_time[64];
1492 char cur_time[64];
1493 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&c->curr_task.item->client_time));
1494 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec));
1495 fprintf(stderr,
1496 "\n<%d expire time verification failed, "
1497 "object expired but get it now\n"
1498 "\tkey len: %d\n"
1499 "\tkey: %" PRIx64 " %.*s\n"
1500 "\tset time: %s current time: %s "
1501 "diff time: %d expire time: %d\n"
1502 "\texpected data: \n"
1503 "\treceived data len: %d\n"
1504 "\treceived data: %.*s\n",
1505 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1506 c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey, set_time, cur_time,
1507 (int) (curr_time.tv_sec - c->curr_task.item->client_time),
1508 c->curr_task.item->exp_time, vlen, vlen, value);
1509 fflush(stderr);
1510 }
1511 }
1512 } else {
1513 if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t) vlen) != 0))
1514 {
1515 atomic_add_size(&ms_stats.vef_failed, 1);
1516
1517 if (ms_setting.verbose) {
1518 fprintf(stderr,
1519 "\n<%d data verification failed\n"
1520 "\tkey len: %d\n"
1521 "\tkey: %" PRIx64 " %.*s\n"
1522 "\texpected data len: %d\n"
1523 "\texpected data: %.*s\n"
1524 "\treceived data len: %d\n"
1525 "\treceived data: %.*s\n",
1526 c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
1527 c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey,
1528 c->curr_task.item->value_size, c->curr_task.item->value_size, orignval, vlen,
1529 vlen, value);
1530 fflush(stderr);
1531 }
1532 }
1533 }
1534
1535 c->curr_task.finish_verify = true;
1536
1537 if (mlget_item != NULL) {
1538 mlget_item->finish_verify = true;
1539 }
1540 }
1541 } /* ms_verify_value */
1542
1543 /**
1544 * For ASCII protocol, after store the data into the local
1545 * buffer, run this function to handle the data.
1546 *
1547 * @param c, pointer of the concurrency
1548 */
1549 static void ms_ascii_complete_nread(ms_conn_t *c) {
1550 assert(c != NULL);
1551 assert(c->rbytes >= c->rvbytes);
1552 assert(c->protocol == ascii_prot);
1553 if (c->rvbytes > 2) {
1554 assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1555 }
1556
1557 /* multi-get */
1558 ms_mlget_task_item_t *mlget_item = NULL;
1559 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1560 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1561 {
1562 c->mlget_task.value_index++;
1563 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1564
1565 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1566 c->curr_task.item = mlget_item->item;
1567 c->curr_task.verify = mlget_item->verify;
1568 c->curr_task.finish_verify = mlget_item->finish_verify;
1569 mlget_item->get_miss = false;
1570 } else {
1571 /* Try to find the task item in multi-get task array */
1572 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
1573 mlget_item = &c->mlget_task.mlget_item[i];
1574 if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
1575 c->curr_task.item = mlget_item->item;
1576 c->curr_task.verify = mlget_item->verify;
1577 c->curr_task.finish_verify = mlget_item->finish_verify;
1578 mlget_item->get_miss = false;
1579
1580 break;
1581 }
1582 }
1583 }
1584 }
1585
1586 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1587
1588 c->curr_task.get_miss = false;
1589 c->rbytes -= c->rvbytes;
1590 c->rcurr = c->rcurr + c->rvbytes;
1591 assert(c->rcurr <= (c->rbuf + c->rsize));
1592 c->readval = false;
1593 c->rvbytes = 0;
1594 } /* ms_ascii_complete_nread */
1595
1596 /**
1597 * For binary protocol, after store the data into the local
1598 * buffer, run this function to handle the data.
1599 *
1600 * @param c, pointer of the concurrency
1601 */
1602 static void ms_bin_complete_nread(ms_conn_t *c) {
1603 assert(c != NULL);
1604 assert(c->rbytes >= c->rvbytes);
1605 assert(c->protocol == binary_prot);
1606
1607 int extlen = c->binary_header.response.extlen;
1608 int keylen = c->binary_header.response.keylen;
1609 uint8_t opcode = c->binary_header.response.opcode;
1610
1611 /* not get command or not include value, just return */
1612 if (((opcode != PROTOCOL_BINARY_CMD_GET) && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1613 || (c->rvbytes <= extlen + keylen))
1614 {
1615 /* get miss */
1616 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) {
1617 c->currcmd.retstat = MCD_END;
1618 c->curr_task.get_miss = true;
1619 }
1620
1621 c->readval = false;
1622 c->rvbytes = 0;
1623 ms_reset_conn(c, false);
1624 return;
1625 }
1626
1627 /* multi-get */
1628 ms_mlget_task_item_t *mlget_item = NULL;
1629 if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1630 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1631 {
1632 c->mlget_task.value_index++;
1633 mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
1634
1635 c->curr_task.item = mlget_item->item;
1636 c->curr_task.verify = mlget_item->verify;
1637 c->curr_task.finish_verify = mlget_item->finish_verify;
1638 mlget_item->get_miss = false;
1639 }
1640
1641 ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen);
1642
1643 c->currcmd.retstat = MCD_END;
1644 c->curr_task.get_miss = false;
1645 c->rbytes -= c->rvbytes;
1646 c->rcurr = c->rcurr + c->rvbytes;
1647 assert(c->rcurr <= (c->rbuf + c->rsize));
1648 c->readval = false;
1649 c->rvbytes = 0;
1650
1651 if (ms_setting.mult_key_num > 1) {
1652 /* multi-get have check all the item */
1653 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) {
1654 ms_reset_conn(c, false);
1655 }
1656 } else {
1657 /* single get */
1658 ms_reset_conn(c, false);
1659 }
1660 } /* ms_bin_complete_nread */
1661
1662 /**
1663 * we get here after reading the value of get commands.
1664 *
1665 * @param c, pointer of the concurrency
1666 */
1667 static void ms_complete_nread(ms_conn_t *c) {
1668 assert(c != NULL);
1669 assert(c->rbytes >= c->rvbytes);
1670 assert(c->protocol == ascii_prot || c->protocol == binary_prot);
1671
1672 if (c->protocol == binary_prot) {
1673 ms_bin_complete_nread(c);
1674 } else {
1675 ms_ascii_complete_nread(c);
1676 }
1677 } /* ms_complete_nread */
1678
1679 /**
1680 * Adds a message header to a connection.
1681 *
1682 * @param c, pointer of the concurrency
1683 *
1684 * @return int, if success, return EXIT_SUCCESS, else return -1
1685 */
1686 static int ms_add_msghdr(ms_conn_t *c) {
1687 struct msghdr *msg;
1688
1689 assert(c != NULL);
1690
1691 if (c->msgsize == c->msgused) {
1692 msg = realloc(c->msglist, (size_t) c->msgsize * 2 * sizeof(struct msghdr));
1693 if (!msg)
1694 return -1;
1695
1696 c->msglist = msg;
1697 c->msgsize *= 2;
1698 }
1699
1700 msg = c->msglist + c->msgused;
1701
1702 /**
1703 * this wipes msg_iovlen, msg_control, msg_controllen, and
1704 * msg_flags, the last 3 of which aren't defined on solaris:
1705 */
1706 memset(msg, 0, sizeof(struct msghdr));
1707
1708 msg->msg_iov = &c->iov[c->iovused];
1709
1710 if (c->udp && (c->srv_recv_addr_size > 0)) {
1711 msg->msg_name = &c->srv_recv_addr;
1712 msg->msg_namelen = c->srv_recv_addr_size;
1713 }
1714
1715 c->msgbytes = 0;
1716 c->msgused++;
1717
1718 if (c->udp) {
1719 /* Leave room for the UDP header, which we'll fill in later. */
1720 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
1721 }
1722
1723 return EXIT_SUCCESS;
1724 } /* ms_add_msghdr */
1725
1726 /**
1727 * Ensures that there is room for another structure iovec in a connection's
1728 * iov list.
1729 *
1730 * @param c, pointer of the concurrency
1731 *
1732 * @return int, if success, return EXIT_SUCCESS, else return -1
1733 */
1734 static int ms_ensure_iov_space(ms_conn_t *c) {
1735 assert(c != NULL);
1736
1737 if (c->iovused >= c->iovsize) {
1738 int i, iovnum;
1739 struct iovec *new_iov =
1740 (struct iovec *) realloc(c->iov, ((size_t) c->iovsize * 2) * sizeof(struct iovec));
1741 if (!new_iov)
1742 return -1;
1743
1744 c->iov = new_iov;
1745 c->iovsize *= 2;
1746
1747 /* Point all the msghdr structures at the new list. */
1748 for (i = 0, iovnum = 0; i < c->msgused; i++) {
1749 c->msglist[i].msg_iov = &c->iov[iovnum];
1750 iovnum += (int) c->msglist[i].msg_iovlen;
1751 }
1752 }
1753
1754 return EXIT_SUCCESS;
1755 } /* ms_ensure_iov_space */
1756
1757 /**
1758 * Adds data to the list of pending data that will be written out to a
1759 * connection.
1760 *
1761 * @param c, pointer of the concurrency
1762 * @param buf, the buffer includes data to send
1763 * @param len, the data length in the buffer
1764 *
1765 * @return int, if success, return EXIT_SUCCESS, else return -1
1766 */
1767 static int ms_add_iov(ms_conn_t *c, const void *buf, int len) {
1768 struct msghdr *m;
1769 int leftover;
1770 bool limit_to_mtu;
1771
1772 assert(c != NULL);
1773
1774 do {
1775 m = &c->msglist[c->msgused - 1];
1776
1777 /*
1778 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
1779 */
1780 limit_to_mtu = c->udp;
1781
1782 #ifdef IOV_MAX
1783 /* We may need to start a new msghdr if this one is full. */
1784 if ((m->msg_iovlen == IOV_MAX) || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
1785 {
1786 ms_add_msghdr(c);
1787 m = &c->msglist[c->msgused - 1];
1788 }
1789 #endif
1790
1791 if (ms_ensure_iov_space(c) != 0)
1792 return -1;
1793
1794 /* If the fragment is too big to fit in the datagram, split it up */
1795 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) {
1796 leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
1797 len -= leftover;
1798 } else {
1799 leftover = 0;
1800 }
1801
1802 m = &c->msglist[c->msgused - 1];
1803 m->msg_iov[m->msg_iovlen].iov_base = (void *) buf;
1804 m->msg_iov[m->msg_iovlen].iov_len = (size_t) len;
1805
1806 c->msgbytes += len;
1807 c->iovused++;
1808 m->msg_iovlen++;
1809
1810 buf = ((char *) buf) + len;
1811 len = leftover;
1812 } while (leftover > 0);
1813
1814 return EXIT_SUCCESS;
1815 } /* ms_add_iov */
1816
1817 /**
1818 * Constructs a set of UDP headers and attaches them to the outgoing messages.
1819 *
1820 * @param c, pointer of the concurrency
1821 *
1822 * @return int, if success, return EXIT_SUCCESS, else return -1
1823 */
1824 static int ms_build_udp_headers(ms_conn_t *c) {
1825 int i;
1826 unsigned char *hdr;
1827
1828 assert(c != NULL);
1829
1830 c->request_id = ms_get_udp_request_id();
1831
1832 if (c->msgused > c->hdrsize) {
1833 void *new_hdrbuf;
1834 if (c->hdrbuf)
1835 new_hdrbuf = realloc(c->hdrbuf, (size_t) c->msgused * 2 * UDP_HEADER_SIZE);
1836 else
1837 new_hdrbuf = malloc((size_t) c->msgused * 2 * UDP_HEADER_SIZE);
1838 if (!new_hdrbuf)
1839 return -1;
1840
1841 c->hdrbuf = (unsigned char *) new_hdrbuf;
1842 c->hdrsize = c->msgused * 2;
1843 }
1844
1845 /* If this is a multi-packet request, drop it. */
1846 if (c->udp && (c->msgused > 1)) {
1847 fprintf(stderr, "multi-packet request for UDP not supported.\n");
1848 return -1;
1849 }
1850
1851 hdr = c->hdrbuf;
1852 for (i = 0; i < c->msgused; i++) {
1853 c->msglist[i].msg_iov[0].iov_base = (void *) hdr;
1854 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
1855 *hdr++ = (unsigned char) (c->request_id / 256);
1856 *hdr++ = (unsigned char) (c->request_id % 256);
1857 *hdr++ = (unsigned char) (i / 256);
1858 *hdr++ = (unsigned char) (i % 256);
1859 *hdr++ = (unsigned char) (c->msgused / 256);
1860 *hdr++ = (unsigned char) (c->msgused % 256);
1861 *hdr++ = (unsigned char) 1; /* support facebook memcached */
1862 *hdr++ = (unsigned char) 0;
1863 assert(hdr == ((unsigned char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE));
1864 }
1865
1866 return EXIT_SUCCESS;
1867 } /* ms_build_udp_headers */
1868
1869 /**
1870 * Transmit the next chunk of data from our list of msgbuf structures.
1871 *
1872 * @param c, pointer of the concurrency
1873 *
1874 * @return TRANSMIT_COMPLETE All done writing.
1875 * TRANSMIT_INCOMPLETE More data remaining to write.
1876 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1877 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1878 */
1879 static int ms_transmit(ms_conn_t *c) {
1880 assert(c != NULL);
1881
1882 if ((c->msgcurr < c->msgused) && (c->msglist[c->msgcurr].msg_iovlen == 0)) {
1883 /* Finished writing the current msg; advance to the next. */
1884 c->msgcurr++;
1885 }
1886
1887 if (c->msgcurr < c->msgused) {
1888 ssize_t res;
1889 struct msghdr *m = &c->msglist[c->msgcurr];
1890
1891 res = sendmsg(c->sfd, m, 0);
1892 if (res > 0) {
1893 atomic_add_size(&ms_stats.bytes_written, res);
1894
1895 /* We've written some of the data. Remove the completed
1896 * iovec entries from the list of pending writes. */
1897 while (m->msg_iovlen > 0 && res >= (ssize_t) m->msg_iov->iov_len) {
1898 res -= (ssize_t) m->msg_iov->iov_len;
1899 m->msg_iovlen--;
1900 m->msg_iov++;
1901 }
1902
1903 /* Might have written just part of the last iovec entry;
1904 * adjust it so the next write will do the rest. */
1905 if (res > 0) {
1906 m->msg_iov->iov_base = (void *) ((unsigned char *) m->msg_iov->iov_base + res);
1907 m->msg_iov->iov_len -= (size_t) res;
1908 }
1909 return TRANSMIT_INCOMPLETE;
1910 }
1911 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1912 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
1913 fprintf(stderr, "Couldn't update event.\n");
1914 ms_conn_set_state(c, conn_closing);
1915 return TRANSMIT_HARD_ERROR;
1916 }
1917 return TRANSMIT_SOFT_ERROR;
1918 }
1919
1920 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1921 * we have a real error, on which we close the connection */
1922 fprintf(stderr, "Failed to write, and not due to blocking.\n");
1923
1924 ms_conn_set_state(c, conn_closing);
1925 return TRANSMIT_HARD_ERROR;
1926 } else {
1927 return TRANSMIT_COMPLETE;
1928 }
1929 } /* ms_transmit */
1930
1931 /**
1932 * Shrinks a connection's buffers if they're too big. This prevents
1933 * periodic large "mget" response from server chewing lots of client
1934 * memory.
1935 *
1936 * This should only be called in between requests since it can wipe output
1937 * buffers!
1938 *
1939 * @param c, pointer of the concurrency
1940 */
1941 static void ms_conn_shrink(ms_conn_t *c) {
1942 assert(c != NULL);
1943
1944 if (c->udp)
1945 return;
1946
1947 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) {
1948 char *newbuf;
1949
1950 if (c->rcurr != c->rbuf)
1951 memmove(c->rbuf, c->rcurr, (size_t) c->rbytes);
1952
1953 newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE);
1954
1955 if (newbuf) {
1956 c->rbuf = newbuf;
1957 c->rsize = DATA_BUFFER_SIZE;
1958 }
1959 c->rcurr = c->rbuf;
1960 }
1961
1962 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
1963 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
1964 {
1965 char *new_rbuf = (char *) realloc(c->rudpbuf, (size_t) c->rudpsize * 2);
1966 if (new_rbuf) {
1967 c->rudpbuf = new_rbuf;
1968 c->rudpsize = UDP_DATA_BUFFER_SIZE;
1969 }
1970 /* TODO check error condition? */
1971 }
1972
1973 if (c->msgsize > MSG_LIST_HIGHWAT) {
1974 struct msghdr *newbuf =
1975 (struct msghdr *) realloc((void *) c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1976 if (newbuf) {
1977 c->msglist = newbuf;
1978 c->msgsize = MSG_LIST_INITIAL;
1979 }
1980 /* TODO check error condition? */
1981 }
1982
1983 if (c->iovsize > IOV_LIST_HIGHWAT) {
1984 struct iovec *newbuf =
1985 (struct iovec *) realloc((void *) c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1986 if (newbuf) {
1987 c->iov = newbuf;
1988 c->iovsize = IOV_LIST_INITIAL;
1989 }
1990 /* TODO check return value */
1991 }
1992 } /* ms_conn_shrink */
1993
1994 /**
1995 * Sets a connection's current state in the state machine. Any special
1996 * processing that needs to happen on certain state transitions can
1997 * happen here.
1998 *
1999 * @param c, pointer of the concurrency
2000 * @param state, connection state
2001 */
2002 static void ms_conn_set_state(ms_conn_t *c, int state) {
2003 assert(c != NULL);
2004
2005 if (state != c->state) {
2006 if (state == conn_read) {
2007 ms_conn_shrink(c);
2008 }
2009 c->state = state;
2010 }
2011 } /* ms_conn_set_state */
2012
2013 /**
2014 * update the event if socks change state. for example: when
2015 * change the listen scoket read event to sock write event, or
2016 * change socket handler, we could call this function.
2017 *
2018 * @param c, pointer of the concurrency
2019 * @param new_flags, new event flags
2020 *
2021 * @return bool, if success, return true, else return false
2022 */
2023 static bool ms_update_event(ms_conn_t *c, const int new_flags) {
2024 assert(c != NULL);
2025
2026 struct event_base *base = c->event.ev_base;
2027 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2028 && (!ms_setting.facebook_test || (c->total_sfds == 1)))
2029 {
2030 return true;
2031 }
2032
2033 if (event_del(&c->event) == -1) {
2034 /* try to delete the event again */
2035 if (event_del(&c->event) == -1) {
2036 return false;
2037 }
2038 }
2039
2040 event_set(&c->event, c->sfd, (short) new_flags, ms_event_handler, (void *) c);
2041 event_base_set(base, &c->event);
2042 c->ev_flags = (short) new_flags;
2043
2044 if (event_add(&c->event, NULL) == -1) {
2045 return false;
2046 }
2047
2048 return true;
2049 } /* ms_update_event */
2050
2051 /**
2052 * If user want to get the expected throughput, we could limit
2053 * the performance of memslap. we could give up some work and
2054 * just wait a short time. The function is used to check this
2055 * case.
2056 *
2057 * @param c, pointer of the concurrency
2058 *
2059 * @return bool, if success, return true, else return false
2060 */
2061 static bool ms_need_yield(ms_conn_t *c) {
2062 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
2063 int64_t tps = 0;
2064 int64_t time_diff = 0;
2065 struct timeval curr_time;
2066 ms_task_t *task = &c->curr_task;
2067
2068 if (ms_setting.expected_tps > 0) {
2069 gettimeofday(&curr_time, NULL);
2070 time_diff = ms_time_diff(&ms_thread->startup_time, &curr_time);
2071 tps = (int64_t)(((task->get_opt + task->set_opt) / (uint64_t) time_diff) * 1000000);
2072
2073 /* current throughput is greater than expected throughput */
2074 if (tps > ms_thread->thread_ctx->tps_perconn) {
2075 return true;
2076 }
2077 }
2078
2079 return false;
2080 } /* ms_need_yield */
2081
2082 /**
2083 * used to update the start time of each operation
2084 *
2085 * @param c, pointer of the concurrency
2086 */
2087 static void ms_update_start_time(ms_conn_t *c) {
2088 ms_task_item_t *item = c->curr_task.item;
2089
2090 if ((ms_setting.stat_freq > 0) || c->udp || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2091 {
2092 gettimeofday(&c->start_time, NULL);
2093 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) {
2094 /* record the current time */
2095 item->client_time = c->start_time.tv_sec;
2096 }
2097 }
2098 } /* ms_update_start_time */
2099
2100 /**
2101 * run the state machine
2102 *
2103 * @param c, pointer of the concurrency
2104 */
2105 static void ms_drive_machine(ms_conn_t *c) {
2106 bool stop = false;
2107
2108 assert(c != NULL);
2109
2110 while (!stop) {
2111 switch (c->state) {
2112 case conn_read:
2113 if (c->readval) {
2114 if (c->rbytes >= c->rvbytes) {
2115 ms_complete_nread(c);
2116 break;
2117 }
2118 } else {
2119 if (ms_try_read_line(c) != 0) {
2120 break;
2121 }
2122 }
2123
2124 if (ms_try_read_network(c) != 0) {
2125 break;
2126 }
2127
2128 /* doesn't read all the response data, wait event wake up */
2129 if (!c->currcmd.isfinish) {
2130 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2131 fprintf(stderr, "Couldn't update event.\n");
2132 ms_conn_set_state(c, conn_closing);
2133 break;
2134 }
2135 stop = true;
2136 break;
2137 }
2138
2139 /* we have no command line and no data to read from network, next write */
2140 ms_conn_set_state(c, conn_write);
2141 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2142
2143 break;
2144
2145 case conn_write:
2146 if (!c->ctnwrite && ms_need_yield(c)) {
2147 usleep(10);
2148
2149 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2150 fprintf(stderr, "Couldn't update event.\n");
2151 ms_conn_set_state(c, conn_closing);
2152 break;
2153 }
2154 stop = true;
2155 break;
2156 }
2157
2158 if (!c->ctnwrite && (ms_exec_task(c) != 0)) {
2159 ms_conn_set_state(c, conn_closing);
2160 break;
2161 }
2162
2163 /* record the start time before starting to send data if necessary */
2164 if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) {
2165 if (c->change_sfd) {
2166 c->change_sfd = false;
2167 }
2168 ms_update_start_time(c);
2169 }
2170
2171 /* change sfd if necessary */
2172 if (c->change_sfd) {
2173 c->ctnwrite = true;
2174 stop = true;
2175 break;
2176 }
2177
2178 /* execute task until nothing need be written to network */
2179 if (!c->ctnwrite && (c->msgcurr == c->msgused)) {
2180 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2181 fprintf(stderr, "Couldn't update event.\n");
2182 ms_conn_set_state(c, conn_closing);
2183 break;
2184 }
2185 stop = true;
2186 break;
2187 }
2188
2189 switch (ms_transmit(c)) {
2190 case TRANSMIT_COMPLETE:
2191 /* we have no data to write to network, next wait repose */
2192 if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
2193 fprintf(stderr, "Couldn't update event.\n");
2194 ms_conn_set_state(c, conn_closing);
2195 c->ctnwrite = false;
2196 break;
2197 }
2198 ms_conn_set_state(c, conn_read);
2199 c->ctnwrite = false;
2200 stop = true;
2201 break;
2202
2203 case TRANSMIT_INCOMPLETE: c->ctnwrite = true; break; /* Continue in state machine. */
2204
2205 case TRANSMIT_HARD_ERROR: c->ctnwrite = false; break;
2206
2207 case TRANSMIT_SOFT_ERROR:
2208 c->ctnwrite = true;
2209 stop = true;
2210 break;
2211
2212 default: break;
2213 } /* switch */
2214
2215 break;
2216
2217 case conn_closing:
2218 /* recovery mode, need reconnect if connection close */
2219 if (ms_setting.reconnect
2220 && (!ms_global.time_out || ((ms_setting.run_time == 0) && (c->remain_exec_num > 0))))
2221 {
2222 if (ms_reconn(c) != 0) {
2223 ms_conn_close(c);
2224 stop = true;
2225 break;
2226 }
2227
2228 ms_reset_conn(c, false);
2229
2230 if (c->total_sfds == 1) {
2231 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2232 fprintf(stderr, "Couldn't update event.\n");
2233 ms_conn_set_state(c, conn_closing);
2234 break;
2235 }
2236 }
2237
2238 break;
2239 } else {
2240 ms_conn_close(c);
2241 stop = true;
2242 break;
2243 }
2244
2245 default: assert(0);
2246 } /* switch */
2247 }
2248 } /* ms_drive_machine */
2249
2250 /**
2251 * the event handler of each thread
2252 *
2253 * @param fd, the file descriptor of socket
2254 * @param which, event flag
2255 * @param arg, argument
2256 */
2257 void ms_event_handler(const int fd, const short which, void *arg) {
2258 ms_conn_t *c = (ms_conn_t *) arg;
2259
2260 assert(c != NULL);
2261
2262 c->which = which;
2263
2264 /* sanity */
2265 if (fd != c->sfd) {
2266 fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n", fd, c->sfd);
2267 ms_conn_close(c);
2268 exit(1);
2269 }
2270 assert(fd == c->sfd);
2271
2272 ms_drive_machine(c);
2273
2274 /* wait for next event */
2275 } /* ms_event_handler */
2276
2277 /**
2278 * get the next socket descriptor index to run for replication
2279 *
2280 * @param c, pointer of the concurrency
2281 * @param cmd, command(get or set )
2282 *
2283 * @return int, if success, return the index, else return EXIT_SUCCESS
2284 */
2285 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) {
2286 uint32_t sock_index = 0;
2287 uint32_t i = 0;
2288
2289 if (c->total_sfds == 1) {
2290 return EXIT_SUCCESS;
2291 }
2292
2293 if (ms_setting.rep_write_srv == 0) {
2294 return sock_index;
2295 }
2296
2297 do {
2298 if (cmd == CMD_SET) {
2299 for (i = 0; i < ms_setting.rep_write_srv; i++) {
2300 if (c->tcpsfd[i] > 0) {
2301 break;
2302 }
2303 }
2304
2305 if (i == ms_setting.rep_write_srv) {
2306 /* random get one replication server to read */
2307 sock_index = (uint32_t) random() % c->total_sfds;
2308 } else {
2309 /* random get one replication writing server to write */
2310 sock_index = (uint32_t) random() % ms_setting.rep_write_srv;
2311 }
2312 } else if (cmd == CMD_GET) {
2313 /* random get one replication server to read */
2314 sock_index = (uint32_t) random() % c->total_sfds;
2315 }
2316 } while (c->tcpsfd[sock_index] == 0);
2317
2318 return sock_index;
2319 } /* ms_get_rep_sock_index */
2320
2321 /**
2322 * get the next socket descriptor index to run
2323 *
2324 * @param c, pointer of the concurrency
2325 *
2326 * @return int, return the index
2327 */
2328 static uint32_t ms_get_next_sock_index(ms_conn_t *c) {
2329 uint32_t sock_index = 0;
2330
2331 do {
2332 sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2333 } while (c->tcpsfd[sock_index] == 0);
2334
2335 return sock_index;
2336 } /* ms_get_next_sock_index */
2337
2338 /**
2339 * update socket event of the connections
2340 *
2341 * @param c, pointer of the concurrency
2342 *
2343 * @return int, if success, return EXIT_SUCCESS, else return -1
2344 */
2345 static int ms_update_conn_sock_event(ms_conn_t *c) {
2346 assert(c != NULL);
2347
2348 switch (c->currcmd.cmd) {
2349 case CMD_SET:
2350 if (ms_setting.facebook_test && c->udp) {
2351 c->sfd = c->tcpsfd[0];
2352 c->udp = false;
2353 c->change_sfd = true;
2354 }
2355 break;
2356
2357 case CMD_GET:
2358 if (ms_setting.facebook_test && !c->udp) {
2359 c->sfd = c->udpsfd;
2360 c->udp = true;
2361 c->change_sfd = true;
2362 }
2363 break;
2364
2365 default: break;
2366 } /* switch */
2367
2368 if (!c->udp && (c->total_sfds > 1)) {
2369 if (c->cur_idx != c->total_sfds) {
2370 if (ms_setting.rep_write_srv == 0) {
2371 c->cur_idx = ms_get_next_sock_index(c);
2372 } else {
2373 c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd);
2374 }
2375 } else {
2376 /* must select the first sock of the connection at the beginning */
2377 c->cur_idx = 0;
2378 }
2379
2380 c->sfd = c->tcpsfd[c->cur_idx];
2381 assert(c->sfd != 0);
2382 c->change_sfd = true;
2383 }
2384
2385 if (c->change_sfd) {
2386 if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
2387 fprintf(stderr, "Couldn't update event.\n");
2388 ms_conn_set_state(c, conn_closing);
2389 return -1;
2390 }
2391 }
2392
2393 return EXIT_SUCCESS;
2394 } /* ms_update_conn_sock_event */
2395
2396 /**
2397 * for ASCII protocol, this function build the set command
2398 * string and send the command.
2399 *
2400 * @param c, pointer of the concurrency
2401 * @param item, pointer of task item which includes the object
2402 * information
2403 *
2404 * @return int, if success, return EXIT_SUCCESS, else return -1
2405 */
2406 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) {
2407 int value_offset;
2408 int write_len;
2409 char *buffer = c->wbuf;
2410
2411 write_len = snprintf(buffer, c->wsize, " %u %d %d\r\n", 0, item->exp_time, item->value_size);
2412
2413 if (write_len > c->wsize || write_len < 0) {
2414 /* ought to be always enough. just fail for simplicity */
2415 fprintf(stderr, "output command line too long.\n");
2416 return -1;
2417 }
2418
2419 if (item->value_offset == INVALID_OFFSET) {
2420 value_offset = item->key_suffix_offset;
2421 } else {
2422 value_offset = item->value_offset;
2423 }
2424
2425 if ((ms_add_iov(c, "set ", 4) != 0)
2426 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE) != 0)
2427 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2428 item->key_size - (int) KEY_PREFIX_SIZE)
2429 != 0)
2430 || (ms_add_iov(c, buffer, write_len) != 0)
2431 || (ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size) != 0)
2432 || (ms_add_iov(c, "\r\n", 2) != 0) || (c->udp && (ms_build_udp_headers(c) != 0)))
2433 {
2434 return -1;
2435 }
2436
2437 return EXIT_SUCCESS;
2438 } /* ms_build_ascii_write_buf_set */
2439
2440 /**
2441 * used to send set command to server
2442 *
2443 * @param c, pointer of the concurrency
2444 * @param item, pointer of task item which includes the object
2445 * information
2446 *
2447 * @return int, if success, return EXIT_SUCCESS, else return -1
2448 */
2449 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) {
2450 assert(c != NULL);
2451
2452 c->currcmd.cmd = CMD_SET;
2453 c->currcmd.isfinish = false;
2454 c->currcmd.retstat = MCD_FAILURE;
2455
2456 if (ms_update_conn_sock_event(c) != 0) {
2457 return -1;
2458 }
2459
2460 c->msgcurr = 0;
2461 c->msgused = 0;
2462 c->iovused = 0;
2463 if (ms_add_msghdr(c) != 0) {
2464 fprintf(stderr, "Out of memory preparing request.");
2465 return -1;
2466 }
2467
2468 /* binary protocol */
2469 if (c->protocol == binary_prot) {
2470 if (ms_build_bin_write_buf_set(c, item) != 0) {
2471 return -1;
2472 }
2473 } else {
2474 if (ms_build_ascii_write_buf_set(c, item) != 0) {
2475 return -1;
2476 }
2477 }
2478
2479 atomic_add_size(&ms_stats.obj_bytes, item->key_size + item->value_size);
2480 atomic_add_size(&ms_stats.cmd_set, 1);
2481
2482 return EXIT_SUCCESS;
2483 } /* ms_mcd_set */
2484
2485 /**
2486 * for ASCII protocol, this function build the get command
2487 * string and send the command.
2488 *
2489 * @param c, pointer of the concurrency
2490 * @param item, pointer of task item which includes the object
2491 * information
2492 *
2493 * @return int, if success, return EXIT_SUCCESS, else return -1
2494 */
2495 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) {
2496 if ((ms_add_iov(c, "get ", 4) != 0)
2497 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE) != 0)
2498 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2499 item->key_size - (int) KEY_PREFIX_SIZE)
2500 != 0)
2501 || (ms_add_iov(c, "\r\n", 2) != 0) || (c->udp && (ms_build_udp_headers(c) != 0)))
2502 {
2503 return -1;
2504 }
2505
2506 return EXIT_SUCCESS;
2507 } /* ms_build_ascii_write_buf_get */
2508
2509 /**
2510 * used to send the get command to server
2511 *
2512 * @param c, pointer of the concurrency
2513 * @param item, pointer of task item which includes the object
2514 * information
2515 *
2516 * @return int, if success, return EXIT_SUCCESS, else return -1
2517 */
2518 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) {
2519 assert(c != NULL);
2520
2521 c->currcmd.cmd = CMD_GET;
2522 c->currcmd.isfinish = false;
2523 c->currcmd.retstat = MCD_FAILURE;
2524
2525 if (ms_update_conn_sock_event(c) != 0) {
2526 return -1;
2527 }
2528
2529 c->msgcurr = 0;
2530 c->msgused = 0;
2531 c->iovused = 0;
2532 if (ms_add_msghdr(c) != 0) {
2533 fprintf(stderr, "Out of memory preparing request.");
2534 return -1;
2535 }
2536
2537 /* binary protocol */
2538 if (c->protocol == binary_prot) {
2539 if (ms_build_bin_write_buf_get(c, item) != 0) {
2540 return -1;
2541 }
2542 } else {
2543 if (ms_build_ascii_write_buf_get(c, item) != 0) {
2544 return -1;
2545 }
2546 }
2547
2548 atomic_add_size(&ms_stats.cmd_get, 1);
2549
2550 return EXIT_SUCCESS;
2551 } /* ms_mcd_get */
2552
2553 /**
2554 * for ASCII protocol, this function build the multi-get command
2555 * string and send the command.
2556 *
2557 * @param c, pointer of the concurrency
2558 *
2559 * @return int, if success, return EXIT_SUCCESS, else return -1
2560 */
2561 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) {
2562 ms_task_item_t *item;
2563
2564 if (ms_add_iov(c, "get", 3) != 0) {
2565 return -1;
2566 }
2567
2568 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2569 item = c->mlget_task.mlget_item[i].item;
2570 assert(item != NULL);
2571 if ((ms_add_iov(c, " ", 1) != 0)
2572 || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE) != 0)
2573 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2574 item->key_size - (int) KEY_PREFIX_SIZE)
2575 != 0))
2576 {
2577 return -1;
2578 }
2579 }
2580
2581 if ((ms_add_iov(c, "\r\n", 2) != 0) || (c->udp && (ms_build_udp_headers(c) != 0))) {
2582 return -1;
2583 }
2584
2585 return EXIT_SUCCESS;
2586 } /* ms_build_ascii_write_buf_mlget */
2587
2588 /**
2589 * used to send the multi-get command to server
2590 *
2591 * @param c, pointer of the concurrency
2592 *
2593 * @return int, if success, return EXIT_SUCCESS, else return -1
2594 */
2595 int ms_mcd_mlget(ms_conn_t *c) {
2596 ms_task_item_t *item;
2597
2598 assert(c != NULL);
2599 assert(c->mlget_task.mlget_num >= 1);
2600
2601 c->currcmd.cmd = CMD_GET;
2602 c->currcmd.isfinish = false;
2603 c->currcmd.retstat = MCD_FAILURE;
2604
2605 if (ms_update_conn_sock_event(c) != 0) {
2606 return -1;
2607 }
2608
2609 c->msgcurr = 0;
2610 c->msgused = 0;
2611 c->iovused = 0;
2612 if (ms_add_msghdr(c) != 0) {
2613 fprintf(stderr, "Out of memory preparing request.");
2614 return -1;
2615 }
2616
2617 /* binary protocol */
2618 if (c->protocol == binary_prot) {
2619 if (ms_build_bin_write_buf_mlget(c) != 0) {
2620 return -1;
2621 }
2622 } else {
2623 if (ms_build_ascii_write_buf_mlget(c) != 0) {
2624 return -1;
2625 }
2626 }
2627
2628 /* decrease operation time of each item */
2629 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2630 item = c->mlget_task.mlget_item[i].item;
2631 atomic_add_size(&ms_stats.cmd_get, 1);
2632 }
2633
2634 (void) item;
2635
2636 return EXIT_SUCCESS;
2637 } /* ms_mcd_mlget */
2638
2639 /**
2640 * binary protocol support
2641 */
2642
2643 /**
2644 * for binary protocol, parse the response of server
2645 *
2646 * @param c, pointer of the concurrency
2647 *
2648 * @return int, if success, return EXIT_SUCCESS, else return -1
2649 */
2650 static int ms_bin_process_response(ms_conn_t *c) {
2651 const char *errstr = NULL;
2652
2653 assert(c != NULL);
2654
2655 uint32_t bodylen = c->binary_header.response.bodylen;
2656 uint8_t opcode = c->binary_header.response.opcode;
2657 uint16_t status = c->binary_header.response.status;
2658
2659 if (bodylen > 0) {
2660 c->rvbytes = (int32_t) bodylen;
2661 c->readval = true;
2662 return EXIT_FAILURE;
2663 } else {
2664 switch (status) {
2665 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
2666 if (opcode == PROTOCOL_BINARY_CMD_SET) {
2667 c->currcmd.retstat = MCD_STORED;
2668 } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) {
2669 c->currcmd.retstat = MCD_DELETED;
2670 } else if (opcode == PROTOCOL_BINARY_CMD_GET) {
2671 c->currcmd.retstat = MCD_END;
2672 }
2673 break;
2674
2675 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
2676 errstr = "Out of memory";
2677 c->currcmd.retstat = MCD_SERVER_ERROR;
2678 break;
2679
2680 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
2681 errstr = "Unknown command";
2682 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2683 break;
2684
2685 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
2686 errstr = "Not found";
2687 c->currcmd.retstat = MCD_NOTFOUND;
2688 break;
2689
2690 case PROTOCOL_BINARY_RESPONSE_EINVAL:
2691 errstr = "Invalid arguments";
2692 c->currcmd.retstat = MCD_PROTOCOL_ERROR;
2693 break;
2694
2695 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: errstr = "Data exists for key."; break;
2696
2697 case PROTOCOL_BINARY_RESPONSE_E2BIG:
2698 errstr = "Too large.";
2699 c->currcmd.retstat = MCD_SERVER_ERROR;
2700 break;
2701
2702 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
2703 errstr = "Not stored.";
2704 c->currcmd.retstat = MCD_NOTSTORED;
2705 break;
2706
2707 default:
2708 errstr = "Unknown error";
2709 c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
2710 break;
2711 } /* switch */
2712
2713 if (errstr != NULL) {
2714 fprintf(stderr, "%s\n", errstr);
2715 }
2716 }
2717
2718 return EXIT_SUCCESS;
2719 } /* ms_bin_process_response */
2720
2721 /* build binary header and add the header to the buffer to send */
2722
2723 /**
2724 * build binary header and add the header to the buffer to send
2725 *
2726 * @param c, pointer of the concurrency
2727 * @param opcode, operation code
2728 * @param hdr_len, length of header
2729 * @param key_len, length of key
2730 * @param body_len. length of body
2731 */
2732 static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len,
2733 uint32_t body_len) {
2734 protocol_binary_request_header *header;
2735
2736 assert(c != NULL);
2737
2738 header = (protocol_binary_request_header *) c->wcurr;
2739
2740 header->request.magic = (uint8_t) PROTOCOL_BINARY_REQ;
2741 header->request.opcode = (uint8_t) opcode;
2742 header->request.keylen = htons(key_len);
2743
2744 header->request.extlen = (uint8_t) hdr_len;
2745 header->request.datatype = (uint8_t) PROTOCOL_BINARY_RAW_BYTES;
2746 header->request.vbucket = 0;
2747
2748 header->request.bodylen = htonl(body_len);
2749 header->request.opaque = 0;
2750 header->request.cas = 0;
2751
2752 ms_add_iov(c, c->wcurr, sizeof(header->request));
2753 } /* ms_add_bin_header */
2754
2755 /**
2756 * add the key to the socket write buffer array
2757 *
2758 * @param c, pointer of the concurrency
2759 * @param item, pointer of task item which includes the object
2760 * information
2761 */
2762 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) {
2763 ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE);
2764 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2765 item->key_size - (int) KEY_PREFIX_SIZE);
2766 }
2767
2768 /**
2769 * for binary protocol, this function build the set command
2770 * and add the command to send buffer array.
2771 *
2772 * @param c, pointer of the concurrency
2773 * @param item, pointer of task item which includes the object
2774 * information
2775 *
2776 * @return int, if success, return EXIT_SUCCESS, else return -1
2777 */
2778 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) {
2779 assert(c->wbuf == c->wcurr);
2780
2781 int value_offset;
2782 protocol_binary_request_set *rep = (protocol_binary_request_set *) c->wcurr;
2783 uint16_t keylen = (uint16_t) item->key_size;
2784 uint32_t bodylen =
2785 (uint32_t) sizeof(rep->message.body) + (uint32_t) keylen + (uint32_t) item->value_size;
2786
2787 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen);
2788 rep->message.body.flags = 0;
2789 rep->message.body.expiration = htonl((uint32_t) item->exp_time);
2790 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
2791 ms_add_key_to_iov(c, item);
2792
2793 if (item->value_offset == INVALID_OFFSET) {
2794 value_offset = item->key_suffix_offset;
2795 } else {
2796 value_offset = item->value_offset;
2797 }
2798 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
2799
2800 return EXIT_SUCCESS;
2801 } /* ms_build_bin_write_buf_set */
2802
2803 /**
2804 * for binary protocol, this function build the get command and
2805 * add the command to send buffer array.
2806 *
2807 * @param c, pointer of the concurrency
2808 * @param item, pointer of task item which includes the object
2809 * information
2810 *
2811 * @return int, if success, return EXIT_SUCCESS, else return -1
2812 */
2813 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) {
2814 assert(c->wbuf == c->wcurr);
2815
2816 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size,
2817 (uint32_t) item->key_size);
2818 ms_add_key_to_iov(c, item);
2819
2820 return EXIT_SUCCESS;
2821 } /* ms_build_bin_write_buf_get */
2822
2823 /**
2824 * for binary protocol, this function build the multi-get
2825 * command and add the command to send buffer array.
2826 *
2827 * @param c, pointer of the concurrency
2828 * @param item, pointer of task item which includes the object
2829 * information
2830 *
2831 * @return int, if success, return EXIT_SUCCESS, else return -1
2832 */
2833 static int ms_build_bin_write_buf_mlget(ms_conn_t *c) {
2834 ms_task_item_t *item;
2835
2836 assert(c->wbuf == c->wcurr);
2837
2838 for (int i = 0; i < c->mlget_task.mlget_num; i++) {
2839 item = c->mlget_task.mlget_item[i].item;
2840 assert(item != NULL);
2841
2842 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size,
2843 (uint32_t) item->key_size);
2844 ms_add_key_to_iov(c, item);
2845 c->wcurr += sizeof(protocol_binary_request_get);
2846 }
2847
2848 c->wcurr = c->wbuf;
2849
2850 return EXIT_SUCCESS;
2851 } /* ms_build_bin_write_buf_mlget */