2a65f6cfa423cf83951fba2c8e20db67789243ab
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23
24 #if defined(HAVE_SYS_TIME_H)
25 # include <sys/time.h>
26 #endif
27
28 #if defined(HAVE_TIME_H)
29 # include <time.h>
30 #endif
31
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
35
36 #ifdef linux
37 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
38 * optimize the conversion functions, but the prototypes generate warnings
39 * from gcc. The conversion methods isn't the bottleneck for my app, so
40 * just remove the warnings by undef'ing the optimization ..
41 */
42 #undef ntohs
43 #undef ntohl
44 #undef htons
45 #undef htonl
46 #endif
47
48 /* for network write */
49 #define TRANSMIT_COMPLETE 0
50 #define TRANSMIT_INCOMPLETE 1
51 #define TRANSMIT_SOFT_ERROR 2
52 #define TRANSMIT_HARD_ERROR 3
53
54 /* for generating key */
55 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
56 #define KEY_PREFIX_MASK 0x1010101010101010
57
58 /* For parse the value length return by server */
59 #define KEY_TOKEN 1
60 #define VALUELEN_TOKEN 3
61
62 /* global increasing counter, to ensure the key prefix unique */
63 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
64
65 /* global increasing counter, generating request id for UDP */
66 static volatile uint32_t udp_request_id= 0;
67
68 extern pthread_key_t ms_thread_key;
69
70 /* generate upd request id */
71 static uint32_t ms_get_udp_request_id(void);
72
73
74 /* connect initialize */
75 static void ms_task_init(ms_conn_t *c);
76 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
77 static int ms_conn_sock_init(ms_conn_t *c);
78 static int ms_conn_event_init(ms_conn_t *c);
79 static int ms_conn_init(ms_conn_t *c,
80 const int init_state,
81 const int read_buffer_size,
82 const bool is_udp);
83 static void ms_warmup_num_init(ms_conn_t *c);
84 static int ms_item_win_init(ms_conn_t *c);
85
86
87 /* connection close */
88 void ms_conn_free(ms_conn_t *c);
89 static void ms_conn_close(ms_conn_t *c);
90
91
92 /* create network connection */
93 static int ms_new_socket(struct addrinfo *ai);
94 static void ms_maximize_sndbuf(const int sfd);
95 static int ms_network_connect(ms_conn_t *c,
96 char *srv_host_name,
97 const int srv_port,
98 const bool is_udp,
99 int *ret_sfd);
100 static int ms_reconn(ms_conn_t *c);
101
102
103 /* read and parse */
104 static int ms_tokenize_command(char *command,
105 token_t *tokens,
106 const int max_tokens);
107 static int ms_ascii_process_line(ms_conn_t *c, char *command);
108 static int ms_try_read_line(ms_conn_t *c);
109 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
110 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
111 static int ms_try_read_network(ms_conn_t *c);
112 static void ms_verify_value(ms_conn_t *c,
113 ms_mlget_task_item_t *mlget_item,
114 char *value,
115 int vlen);
116 static void ms_ascii_complete_nread(ms_conn_t *c);
117 static void ms_bin_complete_nread(ms_conn_t *c);
118 static void ms_complete_nread(ms_conn_t *c);
119
120
121 /* send functions */
122 static int ms_add_msghdr(ms_conn_t *c);
123 static int ms_ensure_iov_space(ms_conn_t *c);
124 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
125 static int ms_build_udp_headers(ms_conn_t *c);
126 static int ms_transmit(ms_conn_t *c);
127
128
129 /* status adjustment */
130 static void ms_conn_shrink(ms_conn_t *c);
131 static void ms_conn_set_state(ms_conn_t *c, int state);
132 static bool ms_update_event(ms_conn_t *c, const int new_flags);
133 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
134 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
135 static int ms_update_conn_sock_event(ms_conn_t *c);
136 static bool ms_need_yield(ms_conn_t *c);
137 static void ms_update_start_time(ms_conn_t *c);
138
139
140 /* main loop */
141 static void ms_drive_machine(ms_conn_t *c);
142 void ms_event_handler(const int fd, const short which, void *arg);
143
144
145 /* ascii protocol */
146 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
147 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
148 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
149
150
151 /* binary protocol */
152 static int ms_bin_process_response(ms_conn_t *c);
153 static void ms_add_bin_header(ms_conn_t *c,
154 uint8_t opcode,
155 uint8_t hdr_len,
156 uint16_t key_len,
157 uint32_t body_len);
158 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
159 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
160 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
161 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
162
163
164 /**
165 * each key has two parts, prefix and suffix. The suffix is a
166 * string random get form the character table. The prefix is a
167 * uint64_t variable. And the prefix must be unique. we use the
168 * prefix to identify a key. And the prefix can't include
169 * character ' ' '\r' '\n' '\0'.
170 *
171 * @return uint64_t
172 */
173 uint64_t ms_get_key_prefix(void)
174 {
175 uint64_t key_prefix;
176
177 pthread_mutex_lock(&ms_global.seq_mutex);
178 key_prefix_seq|= KEY_PREFIX_MASK;
179 key_prefix= key_prefix_seq;
180 key_prefix_seq++;
181 pthread_mutex_unlock(&ms_global.seq_mutex);
182
183 return key_prefix;
184 } /* ms_get_key_prefix */
185
186
187 /**
188 * get an unique udp request id
189 *
190 * @return an unique UDP request id
191 */
192 static uint32_t ms_get_udp_request_id(void)
193 {
194 return atomic_add_32_nv(&udp_request_id, 1);
195 }
196
197
198 /**
199 * initialize current task structure
200 *
201 * @param c, pointer of the concurrency
202 */
203 static void ms_task_init(ms_conn_t *c)
204 {
205 c->curr_task.cmd= CMD_NULL;
206 c->curr_task.item= 0;
207 c->curr_task.verify= false;
208 c->curr_task.finish_verify= true;
209 c->curr_task.get_miss= true;
210
211 c->curr_task.get_opt= 0;
212 c->curr_task.set_opt= 0;
213 c->curr_task.cycle_undo_get= 0;
214 c->curr_task.cycle_undo_set= 0;
215 c->curr_task.verified_get= 0;
216 c->curr_task.overwrite_set= 0;
217 } /* ms_task_init */
218
219
220 /**
221 * initialize udp for the connection structure
222 *
223 * @param c, pointer of the concurrency
224 * @param is_udp, whether it's udp
225 *
226 * @return int, if success, return EXIT_SUCCESS, else return -1
227 */
228 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
229 {
230 c->hdrbuf= 0;
231 c->rudpbuf= 0;
232 c->udppkt= 0;
233
234 c->rudpsize= UDP_DATA_BUFFER_SIZE;
235 c->hdrsize= 0;
236
237 c->rudpbytes= 0;
238 c->packets= 0;
239 c->recvpkt= 0;
240 c->pktcurr= 0;
241 c->ordcurr= 0;
242
243 c->udp= is_udp;
244
245 if (c->udp || (! c->udp && ms_setting.facebook_test))
246 {
247 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
248 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
249
250 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
251 {
252 if (c->rudpbuf != NULL)
253 free(c->rudpbuf);
254 if (c->udppkt != NULL)
255 free(c->udppkt);
256 fprintf(stderr, "malloc()\n");
257 return -1;
258 }
259 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
260 }
261
262 return EXIT_SUCCESS;
263 } /* ms_conn_udp_init */
264
265
266 /**
267 * initialize the connection structure
268 *
269 * @param c, pointer of the concurrency
270 * @param init_state, (conn_read, conn_write, conn_closing)
271 * @param read_buffer_size
272 * @param is_udp, whether it's udp
273 *
274 * @return int, if success, return EXIT_SUCCESS, else return -1
275 */
276 static int ms_conn_init(ms_conn_t *c,
277 const int init_state,
278 const int read_buffer_size,
279 const bool is_udp)
280 {
281 assert(c != NULL);
282
283 c->rbuf= c->wbuf= 0;
284 c->iov= 0;
285 c->msglist= 0;
286
287 c->rsize= read_buffer_size;
288 c->wsize= WRITE_BUFFER_SIZE;
289 c->iovsize= IOV_LIST_INITIAL;
290 c->msgsize= MSG_LIST_INITIAL;
291
292 /* for replication, each connection need connect all the server */
293 if (ms_setting.rep_write_srv > 0)
294 {
295 c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
296 }
297 else
298 {
299 c->total_sfds= ms_setting.sock_per_conn;
300 }
301 c->alive_sfds= 0;
302
303 c->rbuf= (char *)malloc((size_t)c->rsize);
304 c->wbuf= (char *)malloc((size_t)c->wsize);
305 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
306 c->msglist= (struct msghdr *)malloc(
307 sizeof(struct msghdr) * (size_t)c->msgsize);
308 if (ms_setting.mult_key_num > 1)
309 {
310 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
311 malloc(
312 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
313 }
314 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
315
316 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
317 || (c->msglist == NULL) || (c->tcpsfd == NULL)
318 || ((ms_setting.mult_key_num > 1)
319 && (c->mlget_task.mlget_item == NULL)))
320 {
321 if (c->rbuf != NULL)
322 free(c->rbuf);
323 if (c->wbuf != NULL)
324 free(c->wbuf);
325 if (c->iov != NULL)
326 free(c->iov);
327 if (c->msglist != NULL)
328 free(c->msglist);
329 if (c->mlget_task.mlget_item != NULL)
330 free(c->mlget_task.mlget_item);
331 if (c->tcpsfd != NULL)
332 free(c->tcpsfd);
333 fprintf(stderr, "malloc()\n");
334 return -1;
335 }
336
337 c->state= init_state;
338 c->rvbytes= 0;
339 c->rbytes= 0;
340 c->rcurr= c->rbuf;
341 c->wcurr= c->wbuf;
342 c->iovused= 0;
343 c->msgcurr= 0;
344 c->msgused= 0;
345 c->cur_idx= c->total_sfds; /* default index is a invalid value */
346
347 c->ctnwrite= false;
348 c->readval= false;
349 c->change_sfd= false;
350
351 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
352 c->precmd.isfinish= true; /* default the previous command finished */
353 c->currcmd.isfinish= false;
354 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
355 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
356
357 c->mlget_task.mlget_num= 0;
358 c->mlget_task.value_index= -1; /* default invalid value */
359
360 if (ms_setting.binary_prot_)
361 {
362 c->protocol= binary_prot;
363 }
364 else
365 {
366 c->protocol= ascii_prot;
367 }
368
369 /* initialize udp */
370 if (ms_conn_udp_init(c, is_udp) != 0)
371 {
372 return -1;
373 }
374
375 /* initialize task */
376 ms_task_init(c);
377
378 if (! (ms_setting.facebook_test && is_udp))
379 {
380 atomic_add_32(&ms_stats.active_conns, 1);
381 }
382
383 return EXIT_SUCCESS;
384 } /* ms_conn_init */
385
386
387 /**
388 * when doing 100% get operation, it could preset some objects
389 * to warmup the server. this function is used to initialize the
390 * number of the objects to preset.
391 *
392 * @param c, pointer of the concurrency
393 */
394 static void ms_warmup_num_init(ms_conn_t *c)
395 {
396 /* no set operation, preset all the items in the window */
397 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
398 {
399 c->warmup_num= c->win_size;
400 c->remain_warmup_num= c->warmup_num;
401 }
402 else
403 {
404 c->warmup_num= 0;
405 c->remain_warmup_num= c->warmup_num;
406 }
407 } /* ms_warmup_num_init */
408
409
410 /**
411 * each connection has an item window, this function initialize
412 * the window. The window is used to generate task.
413 *
414 * @param c, pointer of the concurrency
415 *
416 * @return int, if success, return EXIT_SUCCESS, else return -1
417 */
418 static int ms_item_win_init(ms_conn_t *c)
419 {
420 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
421 int exp_cnt= 0;
422
423 c->win_size= (int)ms_setting.win_size;
424 c->set_cursor= 0;
425 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
426 c->remain_exec_num= c->exec_num;
427
428 c->item_win= (ms_task_item_t *)malloc(
429 sizeof(ms_task_item_t) * (size_t)c->win_size);
430 if (c->item_win == NULL)
431 {
432 fprintf(stderr, "Can't allocate task item array for conn.\n");
433 return -1;
434 }
435 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
436
437 for (int i= 0; i < c->win_size; i++)
438 {
439 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
440 c->item_win[i].key_prefix= ms_get_key_prefix();
441 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
442 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
443 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
444 c->item_win[i].client_time= 0;
445
446 /* set expire time base on the proportion */
447 if (exp_cnt < ms_setting.exp_ver_per * i)
448 {
449 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
450 exp_cnt++;
451 }
452 else
453 {
454 c->item_win[i].exp_time= 0;
455 }
456 }
457
458 ms_warmup_num_init(c);
459
460 return EXIT_SUCCESS;
461 } /* ms_item_win_init */
462
463
464 /**
465 * each connection structure can include one or more sock
466 * handlers. this function create these socks and connect the
467 * server(s).
468 *
469 * @param c, pointer of the concurrency
470 *
471 * @return int, if success, return EXIT_SUCCESS, else return -1
472 */
473 static int ms_conn_sock_init(ms_conn_t *c)
474 {
475 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
476 uint32_t i;
477 int ret_sfd;
478 uint32_t srv_idx= 0;
479
480 assert(c != NULL);
481 assert(c->tcpsfd != NULL);
482
483 for (i= 0; i < c->total_sfds; i++)
484 {
485 ret_sfd= 0;
486 if (ms_setting.rep_write_srv > 0)
487 {
488 /* for replication, each connection need connect all the server */
489 srv_idx= i % ms_setting.srv_cnt;
490 }
491 else
492 {
493 /* all the connections in a thread connects the same server */
494 srv_idx= ms_thread->thread_ctx->srv_idx;
495 }
496
497 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
498 ms_setting.servers[srv_idx].srv_port,
499 ms_setting.udp, &ret_sfd) != 0)
500 {
501 break;
502 }
503
504 if (i == 0)
505 {
506 c->sfd= ret_sfd;
507 }
508
509 if (! ms_setting.udp)
510 {
511 c->tcpsfd[i]= ret_sfd;
512 }
513
514 c->alive_sfds++;
515 }
516
517 /* initialize udp sock handler if necessary */
518 if (ms_setting.facebook_test)
519 {
520 ret_sfd= 0;
521 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
522 ms_setting.servers[srv_idx].srv_port,
523 true, &ret_sfd) != 0)
524 {
525 c->udpsfd= 0;
526 }
527 else
528 {
529 c->udpsfd= ret_sfd;
530 }
531 }
532
533 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
534 {
535 if (ms_setting.udp)
536 {
537 close(c->sfd);
538 }
539 else
540 {
541 for (uint32_t j= 0; j < i; j++)
542 {
543 close(c->tcpsfd[j]);
544 }
545 }
546
547 if (c->udpsfd != 0)
548 {
549 close(c->udpsfd);
550 }
551
552 return -1;
553 }
554
555 return EXIT_SUCCESS;
556 } /* ms_conn_sock_init */
557
558
559 /**
560 * each connection is managed by libevent, this function
561 * initialize the event of the connection structure.
562 *
563 * @param c, pointer of the concurrency
564 *
565 * @return int, if success, return EXIT_SUCCESS, else return -1
566 */
567 static int ms_conn_event_init(ms_conn_t *c)
568 {
569 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
570 short event_flags= EV_WRITE | EV_PERSIST;
571
572 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
573 event_base_set(ms_thread->base, &c->event);
574 c->ev_flags= event_flags;
575
576 if (event_add(&c->event, NULL) == -1)
577 {
578 return -1;
579 }
580
581 return EXIT_SUCCESS;
582 } /* ms_conn_event_init */
583
584
585 /**
586 * setup a connection, each connection structure of each
587 * thread must call this function to initialize.
588 *
589 * @param c, pointer of the concurrency
590 *
591 * @return int, if success, return EXIT_SUCCESS, else return -1
592 */
593 int ms_setup_conn(ms_conn_t *c)
594 {
595 if (ms_item_win_init(c) != 0)
596 {
597 return -1;
598 }
599
600 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
601 {
602 return -1;
603 }
604
605 if (ms_conn_sock_init(c) != 0)
606 {
607 return -1;
608 }
609
610 if (ms_conn_event_init(c) != 0)
611 {
612 return -1;
613 }
614
615 return EXIT_SUCCESS;
616 } /* ms_setup_conn */
617
618
619 /**
620 * Frees a connection.
621 *
622 * @param c, pointer of the concurrency
623 */
624 void ms_conn_free(ms_conn_t *c)
625 {
626 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
627 if (c != NULL)
628 {
629 if (c->hdrbuf != NULL)
630 free(c->hdrbuf);
631 if (c->msglist != NULL)
632 free(c->msglist);
633 if (c->rbuf != NULL)
634 free(c->rbuf);
635 if (c->wbuf != NULL)
636 free(c->wbuf);
637 if (c->iov != NULL)
638 free(c->iov);
639 if (c->mlget_task.mlget_item != NULL)
640 free(c->mlget_task.mlget_item);
641 if (c->rudpbuf != NULL)
642 free(c->rudpbuf);
643 if (c->udppkt != NULL)
644 free(c->udppkt);
645 if (c->item_win != NULL)
646 free(c->item_win);
647 if (c->tcpsfd != NULL)
648 free(c->tcpsfd);
649
650 if (--ms_thread->nactive_conn == 0)
651 {
652 free(ms_thread->conn);
653 }
654 }
655 } /* ms_conn_free */
656
657
658 /**
659 * close a connection
660 *
661 * @param c, pointer of the concurrency
662 */
663 static void ms_conn_close(ms_conn_t *c)
664 {
665 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
666 assert(c != NULL);
667
668 /* delete the event, the socket and the connection */
669 event_del(&c->event);
670
671 for (uint32_t i= 0; i < c->total_sfds; i++)
672 {
673 if (c->tcpsfd[i] > 0)
674 {
675 close(c->tcpsfd[i]);
676 }
677 }
678 c->sfd= 0;
679
680 if (ms_setting.facebook_test)
681 {
682 close(c->udpsfd);
683 }
684
685 atomic_dec_32(&ms_stats.active_conns);
686
687 ms_conn_free(c);
688
689 if (ms_setting.run_time == 0)
690 {
691 pthread_mutex_lock(&ms_global.run_lock.lock);
692 ms_global.run_lock.count++;
693 pthread_cond_signal(&ms_global.run_lock.cond);
694 pthread_mutex_unlock(&ms_global.run_lock.lock);
695 }
696
697 if (ms_thread->nactive_conn == 0)
698 {
699 pthread_exit(NULL);
700 }
701 } /* ms_conn_close */
702
703
704 /**
705 * create a new sock
706 *
707 * @param ai, server address information
708 *
709 * @return int, if success, return EXIT_SUCCESS, else return -1
710 */
711 static int ms_new_socket(struct addrinfo *ai)
712 {
713 int sfd;
714
715 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
716 {
717 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
718 return -1;
719 }
720
721 return sfd;
722 } /* ms_new_socket */
723
724
725 /**
726 * Sets a socket's send buffer size to the maximum allowed by the system.
727 *
728 * @param sfd, file descriptor of socket
729 */
730 static void ms_maximize_sndbuf(const int sfd)
731 {
732 socklen_t intsize= sizeof(int);
733 unsigned int last_good= 0;
734 unsigned int min, max, avg;
735 unsigned int old_size;
736
737 /* Start with the default size. */
738 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
739 {
740 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
741 return;
742 }
743
744 /* Binary-search for the real maximum. */
745 min= old_size;
746 max= MAX_SENDBUF_SIZE;
747
748 while (min <= max)
749 {
750 avg= ((unsigned int)(min + max)) / 2;
751 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
752 {
753 last_good= avg;
754 min= avg + 1;
755 }
756 else
757 {
758 max= avg - 1;
759 }
760 }
761 } /* ms_maximize_sndbuf */
762
763
764 /**
765 * socket connects the server
766 *
767 * @param c, pointer of the concurrency
768 * @param srv_host_name, the host name of the server
769 * @param srv_port, port of server
770 * @param is_udp, whether it's udp
771 * @param ret_sfd, the connected socket file descriptor
772 *
773 * @return int, if success, return EXIT_SUCCESS, else return -1
774 */
775 static int ms_network_connect(ms_conn_t *c,
776 char *srv_host_name,
777 const int srv_port,
778 const bool is_udp,
779 int *ret_sfd)
780 {
781 int sfd;
782 struct linger ling=
783 {
784 0, 0
785 };
786 struct addrinfo *ai;
787 struct addrinfo *next;
788 struct addrinfo hints;
789 char port_buf[NI_MAXSERV];
790 int error;
791 int success= 0;
792
793 int flags= 1;
794
795 /*
796 * the memset call clears nonstandard fields in some impementations
797 * that otherwise mess things up.
798 */
799 memset(&hints, 0, sizeof(hints));
800 #ifdef AI_ADDRCONFIG
801 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
802 #else
803 hints.ai_flags= AI_PASSIVE;
804 #endif /* AI_ADDRCONFIG */
805 if (is_udp)
806 {
807 hints.ai_protocol= IPPROTO_UDP;
808 hints.ai_socktype= SOCK_DGRAM;
809 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
810 }
811 else
812 {
813 hints.ai_family= AF_UNSPEC;
814 hints.ai_protocol= IPPROTO_TCP;
815 hints.ai_socktype= SOCK_STREAM;
816 }
817
818 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
819 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
820 if (error != 0)
821 {
822 if (error != EAI_SYSTEM)
823 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
824 else
825 perror("getaddrinfo()\n");
826
827 return -1;
828 }
829
830 for (next= ai; next; next= next->ai_next)
831 {
832 if ((sfd= ms_new_socket(next)) == -1)
833 {
834 freeaddrinfo(ai);
835 return -1;
836 }
837
838 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
839 if (is_udp)
840 {
841 ms_maximize_sndbuf(sfd);
842 }
843 else
844 {
845 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
846 sizeof(flags));
847 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
848 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
849 sizeof(flags));
850 }
851
852 if (is_udp)
853 {
854 c->srv_recv_addr_size= sizeof(struct sockaddr);
855 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
856 }
857 else
858 {
859 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
860 {
861 close(sfd);
862 freeaddrinfo(ai);
863 return -1;
864 }
865 }
866
867 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
868 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
869 {
870 fprintf(stderr, "setting O_NONBLOCK\n");
871 close(sfd);
872 freeaddrinfo(ai);
873 return -1;
874 }
875
876 if (ret_sfd != NULL)
877 {
878 *ret_sfd= sfd;
879 }
880
881 success++;
882 }
883
884 freeaddrinfo(ai);
885
886 /* Return zero if we detected no errors in starting up connections */
887 return success == 0;
888 } /* ms_network_connect */
889
890
891 /**
892 * reconnect a disconnected sock
893 *
894 * @param c, pointer of the concurrency
895 *
896 * @return int, if success, return EXIT_SUCCESS, else return -1
897 */
898 static int ms_reconn(ms_conn_t *c)
899 {
900 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
901 uint32_t srv_idx= 0;
902 uint32_t srv_conn_cnt= 0;
903
904 if (ms_setting.rep_write_srv > 0)
905 {
906 srv_idx= c->cur_idx % ms_setting.srv_cnt;
907 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
908 }
909 else
910 {
911 srv_idx= ms_thread->thread_ctx->srv_idx;
912 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
913 }
914
915 /* close the old socket handler */
916 close(c->sfd);
917 c->tcpsfd[c->cur_idx]= 0;
918
919 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
920 % srv_conn_cnt == 0)
921 {
922 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
923 fprintf(stderr, "Server %s:%d disconnect\n",
924 ms_setting.servers[srv_idx].srv_host_name,
925 ms_setting.servers[srv_idx].srv_port);
926 }
927
928 if (ms_setting.rep_write_srv > 0)
929 {
930 uint32_t i= 0;
931
932 for (i= 0; i < c->total_sfds; i++)
933 {
934 if (c->tcpsfd[i] != 0)
935 {
936 break;
937 }
938 }
939
940 /* all socks disconnect */
941 if (i == c->total_sfds)
942 {
943 return -1;
944 }
945 }
946 else
947 {
948 do
949 {
950 /* reconnect success, break the loop */
951 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
952 ms_setting.servers[srv_idx].srv_port,
953 ms_setting.udp, &c->sfd) == 0)
954 {
955 c->tcpsfd[c->cur_idx]= c->sfd;
956 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
957 % (uint32_t)srv_conn_cnt == 0)
958 {
959 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
960 int reconn_time=
961 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
962 - ms_setting.servers[srv_idx].disconn_time
963 .tv_sec);
964 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
965 ms_setting.servers[srv_idx].srv_host_name,
966 ms_setting.servers[srv_idx].srv_port, reconn_time);
967 }
968 break;
969 }
970
971 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
972 {
973 /* wait a second and reconnect */
974 sleep(1);
975 }
976 }
977 while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
978 }
979
980 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
981 {
982 c->sfd= 0;
983 c->alive_sfds--;
984 }
985
986 return EXIT_SUCCESS;
987 } /* ms_reconn */
988
989
990 /**
991 * reconnect several disconnected socks in the connection
992 * structure, the ever-1-second timer of the thread will check
993 * whether some socks in the connections disconnect. if
994 * disconnect, reconnect the sock.
995 *
996 * @param c, pointer of the concurrency
997 *
998 * @return int, if success, return EXIT_SUCCESS, else return -1
999 */
1000 int ms_reconn_socks(ms_conn_t *c)
1001 {
1002 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1003 uint32_t srv_idx= 0;
1004 int ret_sfd= 0;
1005 uint32_t srv_conn_cnt= 0;
1006 struct timeval cur_time;
1007
1008 assert(c != NULL);
1009
1010 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1011 {
1012 return EXIT_SUCCESS;
1013 }
1014
1015 for (uint32_t i= 0; i < c->total_sfds; i++)
1016 {
1017 if (c->tcpsfd[i] == 0)
1018 {
1019 gettimeofday(&cur_time, NULL);
1020
1021 /**
1022 * For failover test of replication, reconnect the socks after
1023 * it disconnects more than 5 seconds, Otherwise memslap will
1024 * block at connect() function and the work threads can't work
1025 * in this interval.
1026 */
1027 if (cur_time.tv_sec
1028 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1029 {
1030 break;
1031 }
1032
1033 if (ms_setting.rep_write_srv > 0)
1034 {
1035 srv_idx= i % ms_setting.srv_cnt;
1036 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1037 }
1038 else
1039 {
1040 srv_idx= ms_thread->thread_ctx->srv_idx;
1041 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1042 }
1043
1044 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1045 ms_setting.servers[srv_idx].srv_port,
1046 ms_setting.udp, &ret_sfd) == 0)
1047 {
1048 c->tcpsfd[i]= ret_sfd;
1049 c->alive_sfds++;
1050
1051 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1052 % (uint32_t)srv_conn_cnt == 0)
1053 {
1054 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1055 int reconn_time=
1056 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1057 - ms_setting.servers[srv_idx].disconn_time
1058 .tv_sec);
1059 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1060 ms_setting.servers[srv_idx].srv_host_name,
1061 ms_setting.servers[srv_idx].srv_port, reconn_time);
1062 }
1063 }
1064 }
1065 }
1066
1067 return EXIT_SUCCESS;
1068 } /* ms_reconn_socks */
1069
1070
1071 /**
1072 * Tokenize the command string by replacing whitespace with '\0' and update
1073 * the token array tokens with pointer to start of each token and length.
1074 * Returns total number of tokens. The last valid token is the terminal
1075 * token (value points to the first unprocessed character of the string and
1076 * length zero).
1077 *
1078 * Usage example:
1079 *
1080 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1081 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1082 * ...
1083 * }
1084 * ncommand = tokens[ix].value - command;
1085 * command = tokens[ix].value;
1086 * }
1087 *
1088 * @param command, the command string to token
1089 * @param tokens, array to store tokens
1090 * @param max_tokens, maximum tokens number
1091 *
1092 * @return int, the number of tokens
1093 */
1094 static int ms_tokenize_command(char *command,
1095 token_t *tokens,
1096 const int max_tokens)
1097 {
1098 char *s, *e;
1099 int ntokens= 0;
1100
1101 assert(command != NULL && tokens != NULL && max_tokens > 1);
1102
1103 for (s= e= command; ntokens < max_tokens - 1; ++e)
1104 {
1105 if (*e == ' ')
1106 {
1107 if (s != e)
1108 {
1109 tokens[ntokens].value= s;
1110 tokens[ntokens].length= (size_t)(e - s);
1111 ntokens++;
1112 *e= '\0';
1113 }
1114 s= e + 1;
1115 }
1116 else if (*e == '\0')
1117 {
1118 if (s != e)
1119 {
1120 tokens[ntokens].value= s;
1121 tokens[ntokens].length= (size_t)(e - s);
1122 ntokens++;
1123 }
1124
1125 break; /* string end */
1126 }
1127 }
1128
1129 return ntokens;
1130 } /* ms_tokenize_command */
1131
1132
1133 /**
1134 * parse the response of server.
1135 *
1136 * @param c, pointer of the concurrency
1137 * @param command, the string responded by server
1138 *
1139 * @return int, if the command completed return EXIT_SUCCESS, else return
1140 * -1
1141 */
1142 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1143 {
1144 int ret= 0;
1145 int64_t value_len;
1146 char *buffer= command;
1147
1148 assert(c != NULL);
1149
1150 /**
1151 * for command get, we store the returned value into local buffer
1152 * then continue in ms_complete_nread().
1153 */
1154
1155 switch (buffer[0])
1156 {
1157 case 'V': /* VALUE || VERSION */
1158 if (buffer[1] == 'A') /* VALUE */
1159 {
1160 token_t tokens[MAX_TOKENS];
1161 ms_tokenize_command(command, tokens, MAX_TOKENS);
1162 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1163 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1164
1165 /*
1166 * We read the \r\n into the string since not doing so is more
1167 * cycles then the waster of memory to do so.
1168 *
1169 * We are null terminating through, which will most likely make
1170 * some people lazy about using the return length.
1171 */
1172 c->rvbytes= (int)(value_len + 2);
1173 c->readval= true;
1174 ret= -1;
1175 }
1176
1177 break;
1178
1179 case 'O': /* OK */
1180 c->currcmd.retstat= MCD_SUCCESS;
1181
1182 case 'S': /* STORED STATS SERVER_ERROR */
1183 if (buffer[2] == 'A') /* STORED STATS */
1184 { /* STATS*/
1185 c->currcmd.retstat= MCD_STAT;
1186 }
1187 else if (buffer[1] == 'E')
1188 {
1189 /* SERVER_ERROR */
1190 printf("<%d %s\n", c->sfd, buffer);
1191
1192 c->currcmd.retstat= MCD_SERVER_ERROR;
1193 }
1194 else if (buffer[1] == 'T')
1195 {
1196 /* STORED */
1197 c->currcmd.retstat= MCD_STORED;
1198 }
1199 else
1200 {
1201 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1202 }
1203 break;
1204
1205 case 'D': /* DELETED DATA */
1206 if (buffer[1] == 'E')
1207 {
1208 c->currcmd.retstat= MCD_DELETED;
1209 }
1210 else
1211 {
1212 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1213 }
1214
1215 break;
1216
1217 case 'N': /* NOT_FOUND NOT_STORED*/
1218 if (buffer[4] == 'F')
1219 {
1220 c->currcmd.retstat= MCD_NOTFOUND;
1221 }
1222 else if (buffer[4] == 'S')
1223 {
1224 printf("<%d %s\n", c->sfd, buffer);
1225 c->currcmd.retstat= MCD_NOTSTORED;
1226 }
1227 else
1228 {
1229 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1230 }
1231 break;
1232
1233 case 'E': /* PROTOCOL ERROR or END */
1234 if (buffer[1] == 'N')
1235 {
1236 /* END */
1237 c->currcmd.retstat= MCD_END;
1238 }
1239 else if (buffer[1] == 'R')
1240 {
1241 printf("<%d ERROR\n", c->sfd);
1242 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1243 }
1244 else if (buffer[1] == 'X')
1245 {
1246 c->currcmd.retstat= MCD_DATA_EXISTS;
1247 printf("<%d %s\n", c->sfd, buffer);
1248 }
1249 else
1250 {
1251 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1252 }
1253 break;
1254
1255 case 'C': /* CLIENT ERROR */
1256 printf("<%d %s\n", c->sfd, buffer);
1257 c->currcmd.retstat= MCD_CLIENT_ERROR;
1258 break;
1259
1260 default:
1261 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1262 break;
1263 } /* switch */
1264
1265 return ret;
1266 } /* ms_ascii_process_line */
1267
1268
1269 /**
1270 * after one operation completes, reset the concurrency
1271 *
1272 * @param c, pointer of the concurrency
1273 * @param timeout, whether it's timeout
1274 */
1275 void ms_reset_conn(ms_conn_t *c, bool timeout)
1276 {
1277 assert(c != NULL);
1278
1279 if (c->udp)
1280 {
1281 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1282 {
1283 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1284 }
1285
1286 c->packets= 0;
1287 c->recvpkt= 0;
1288 c->pktcurr= 0;
1289 c->ordcurr= 0;
1290 c->rudpbytes= 0;
1291 }
1292 c->currcmd.isfinish= true;
1293 c->ctnwrite= false;
1294 c->rbytes= 0;
1295 c->rcurr= c->rbuf;
1296 c->msgcurr = 0;
1297 c->msgused = 0;
1298 c->iovused = 0;
1299 ms_conn_set_state(c, conn_write);
1300 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1301
1302 if (timeout)
1303 {
1304 ms_drive_machine(c);
1305 }
1306 } /* ms_reset_conn */
1307
1308
1309 /**
1310 * if we have a complete line in the buffer, process it.
1311 *
1312 * @param c, pointer of the concurrency
1313 *
1314 * @return int, if success, return EXIT_SUCCESS, else return -1
1315 */
1316 static int ms_try_read_line(ms_conn_t *c)
1317 {
1318 if (c->protocol == binary_prot)
1319 {
1320 /* Do we have the complete packet header? */
1321 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1322 {
1323 /* need more data! */
1324 return EXIT_SUCCESS;
1325 }
1326 else
1327 {
1328 #ifdef NEED_ALIGN
1329 if (((long)(c->rcurr)) % 8 != 0)
1330 {
1331 /* must realign input buffer */
1332 memmove(c->rbuf, c->rcurr, c->rbytes);
1333 c->rcurr= c->rbuf;
1334 if (settings.verbose)
1335 {
1336 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1337 }
1338 }
1339 #endif
1340 protocol_binary_response_header *rsp;
1341 rsp= (protocol_binary_response_header *)c->rcurr;
1342
1343 c->binary_header= *rsp;
1344 c->binary_header.response.extlen= rsp->response.extlen;
1345 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1346 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1347 c->binary_header.response.status= ntohs(rsp->response.status);
1348
1349 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1350 {
1351 fprintf(stderr, "Invalid magic: %x\n",
1352 c->binary_header.response.magic);
1353 ms_conn_set_state(c, conn_closing);
1354 return EXIT_SUCCESS;
1355 }
1356
1357 /* process this complete response */
1358 if (ms_bin_process_response(c) == 0)
1359 {
1360 /* current operation completed */
1361 ms_reset_conn(c, false);
1362 return -1;
1363 }
1364 else
1365 {
1366 c->rbytes-= (int32_t)sizeof(c->binary_header);
1367 c->rcurr+= sizeof(c->binary_header);
1368 }
1369 }
1370 }
1371 else
1372 {
1373 char *el, *cont;
1374
1375 assert(c != NULL);
1376 assert(c->rcurr <= (c->rbuf + c->rsize));
1377
1378 if (c->rbytes == 0)
1379 return EXIT_SUCCESS;
1380
1381 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1382 if (! el)
1383 return EXIT_SUCCESS;
1384
1385 cont= el + 1;
1386 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1387 {
1388 el--;
1389 }
1390 *el= '\0';
1391
1392 assert(cont <= (c->rcurr + c->rbytes));
1393
1394 /* process this complete line */
1395 if (ms_ascii_process_line(c, c->rcurr) == 0)
1396 {
1397 /* current operation completed */
1398 ms_reset_conn(c, false);
1399 return -1;
1400 }
1401 else
1402 {
1403 /* current operation didn't complete */
1404 c->rbytes-= (int32_t)(cont - c->rcurr);
1405 c->rcurr= cont;
1406 }
1407
1408 assert(c->rcurr <= (c->rbuf + c->rsize));
1409 }
1410
1411 return -1;
1412 } /* ms_try_read_line */
1413
1414
1415 /**
1416 * because the packet of UDP can't ensure the order, the
1417 * function is used to sort the received udp packet.
1418 *
1419 * @param c, pointer of the concurrency
1420 * @param buf, the buffer to store the ordered packages data
1421 * @param rbytes, the maximum capacity of the buffer
1422 *
1423 * @return int, if success, return the copy bytes, else return
1424 * -1
1425 */
1426 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1427 {
1428 int len= 0;
1429 int wbytes= 0;
1430 uint16_t req_id= 0;
1431 uint16_t seq_num= 0;
1432 uint16_t packets= 0;
1433 unsigned char *header= NULL;
1434
1435 /* no enough data */
1436 assert(c != NULL);
1437 assert(buf != NULL);
1438 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1439
1440 /* calculate received packets count */
1441 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1442 {
1443 /* the last packet has some data */
1444 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1445 }
1446 else
1447 {
1448 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1449 }
1450
1451 /* get the total packets count if necessary */
1452 if (c->packets == 0)
1453 {
1454 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1455 }
1456
1457 /* build the ordered packet array */
1458 for (int i= c->pktcurr; i < c->recvpkt; i++)
1459 {
1460 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1461 req_id= (uint16_t)HEADER_TO_REQID(header);
1462 assert(req_id == c->request_id % (1 << 16));
1463
1464 packets= (uint16_t)HEADER_TO_PACKETS(header);
1465 assert(c->packets == HEADER_TO_PACKETS(header));
1466
1467 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1468 c->udppkt[seq_num].header= header;
1469 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1470
1471 if (i == c->recvpkt - 1)
1472 {
1473 /* last received packet */
1474 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1475 {
1476 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1477 c->pktcurr++;
1478 }
1479 else
1480 {
1481 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1482 - UDP_HEADER_SIZE;
1483 }
1484 }
1485 else
1486 {
1487 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1488 c->pktcurr++;
1489 }
1490 }
1491
1492 for (int i= c->ordcurr; i < c->recvpkt; i++)
1493 {
1494 /* there is some data to copy */
1495 if ((c->udppkt[i].data != NULL)
1496 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1497 {
1498 header= c->udppkt[i].header;
1499 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1500 if (len > rbytes - wbytes)
1501 {
1502 len= rbytes - wbytes;
1503 }
1504
1505 assert(len <= rbytes - wbytes);
1506 assert(i == HEADER_TO_SEQNUM(header));
1507
1508 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1509 (size_t)len);
1510 wbytes+= len;
1511 c->udppkt[i].copybytes+= len;
1512
1513 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1514 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1515 {
1516 /* finish copying all the data of this packet, next */
1517 c->ordcurr++;
1518 }
1519
1520 /* last received packet, and finish copying all the data */
1521 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1522 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1523 {
1524 break;
1525 }
1526
1527 /* no space to copy data */
1528 if (wbytes >= rbytes)
1529 {
1530 break;
1531 }
1532
1533 /* it doesn't finish reading all the data of the packet from network */
1534 if ((i != c->recvpkt - 1)
1535 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1536 {
1537 break;
1538 }
1539 }
1540 else
1541 {
1542 /* no data to copy */
1543 break;
1544 }
1545 }
1546
1547 return wbytes == 0 ? -1 : wbytes;
1548 } /* ms_sort_udp_packet */
1549
1550
1551 /**
1552 * encapsulate upd read like tcp read
1553 *
1554 * @param c, pointer of the concurrency
1555 * @param buf, read buffer
1556 * @param len, length to read
1557 *
1558 * @return int, if success, return the read bytes, else return
1559 * -1
1560 */
1561 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1562 {
1563 int res= 0;
1564 int avail= 0;
1565 int rbytes= 0;
1566 int copybytes= 0;
1567
1568 assert(c->udp);
1569
1570 while (1)
1571 {
1572 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1573 {
1574 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1575 if (! new_rbuf)
1576 {
1577 fprintf(stderr, "Couldn't realloc input buffer.\n");
1578 c->rudpbytes= 0; /* ignore what we read */
1579 return -1;
1580 }
1581 c->rudpbuf= new_rbuf;
1582 c->rudpsize*= 2;
1583 }
1584
1585 avail= c->rudpsize - c->rudpbytes;
1586 /* UDP each time read a packet, 1400 bytes */
1587 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1588
1589 if (res > 0)
1590 {
1591 atomic_add_size(&ms_stats.bytes_read, res);
1592 c->rudpbytes+= res;
1593 rbytes+= res;
1594 if (res == avail)
1595 {
1596 continue;
1597 }
1598 else
1599 {
1600 break;
1601 }
1602 }
1603
1604 if (res == 0)
1605 {
1606 /* "connection" closed */
1607 return res;
1608 }
1609
1610 if (res == -1)
1611 {
1612 /* no data to read */
1613 return res;
1614 }
1615 }
1616
1617 /* copy data to read buffer */
1618 if (rbytes > 0)
1619 {
1620 copybytes= ms_sort_udp_packet(c, buf, len);
1621 }
1622
1623 if (copybytes == -1)
1624 {
1625 atomic_add_size(&ms_stats.pkt_disorder, 1);
1626 }
1627
1628 return copybytes;
1629 } /* ms_udp_read */
1630
1631
1632 /*
1633 * read from network as much as we can, handle buffer overflow and connection
1634 * close.
1635 * before reading, move the remaining incomplete fragment of a command
1636 * (if any) to the beginning of the buffer.
1637 * return EXIT_SUCCESS if there's nothing to read on the first read.
1638 */
1639
1640 /**
1641 * read from network as much as we can, handle buffer overflow and connection
1642 * close. before reading, move the remaining incomplete fragment of a command
1643 * (if any) to the beginning of the buffer.
1644 *
1645 * @param c, pointer of the concurrency
1646 *
1647 * @return int,
1648 * return EXIT_SUCCESS if there's nothing to read on the first read.
1649 * return EXIT_FAILURE if get data
1650 * return -1 if error happens
1651 */
1652 static int ms_try_read_network(ms_conn_t *c)
1653 {
1654 int gotdata= 0;
1655 int res;
1656 int64_t avail;
1657
1658 assert(c != NULL);
1659
1660 if ((c->rcurr != c->rbuf)
1661 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1662 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1663 {
1664 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1665 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1666 c->rcurr= c->rbuf;
1667 }
1668
1669 while (1)
1670 {
1671 if (c->rbytes >= c->rsize)
1672 {
1673 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1674 if (! new_rbuf)
1675 {
1676 fprintf(stderr, "Couldn't realloc input buffer.\n");
1677 c->rbytes= 0; /* ignore what we read */
1678 return -1;
1679 }
1680 c->rcurr= c->rbuf= new_rbuf;
1681 c->rsize*= 2;
1682 }
1683
1684 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1685 if (avail == 0)
1686 {
1687 break;
1688 }
1689
1690 if (c->udp)
1691 {
1692 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1693 }
1694 else
1695 {
1696 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1697 }
1698
1699 if (res > 0)
1700 {
1701 if (! c->udp)
1702 {
1703 atomic_add_size(&ms_stats.bytes_read, res);
1704 }
1705 gotdata= 1;
1706 c->rbytes+= res;
1707 if (res == avail)
1708 {
1709 continue;
1710 }
1711 else
1712 {
1713 break;
1714 }
1715 }
1716 if (res == 0)
1717 {
1718 /* connection closed */
1719 ms_conn_set_state(c, conn_closing);
1720 return -1;
1721 }
1722 if (res == -1)
1723 {
1724 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1725 break;
1726 /* Should close on unhandled errors. */
1727 ms_conn_set_state(c, conn_closing);
1728 return -1;
1729 }
1730 }
1731
1732 return gotdata;
1733 } /* ms_try_read_network */
1734
1735
1736 /**
1737 * after get the object from server, verify the value if
1738 * necessary.
1739 *
1740 * @param c, pointer of the concurrency
1741 * @param mlget_item, pointer of mulit-get task item structure
1742 * @param value, received value string
1743 * @param vlen, received value string length
1744 */
1745 static void ms_verify_value(ms_conn_t *c,
1746 ms_mlget_task_item_t *mlget_item,
1747 char *value,
1748 int vlen)
1749 {
1750 if (c->curr_task.verify)
1751 {
1752 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1753 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1754 char *orignkey=
1755 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1756
1757 /* verify expire time if necessary */
1758 if (c->curr_task.item->exp_time > 0)
1759 {
1760 struct timeval curr_time;
1761 gettimeofday(&curr_time, NULL);
1762
1763 /* object expired but get it now */
1764 if (curr_time.tv_sec - c->curr_task.item->client_time
1765 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1766 {
1767 atomic_add_size(&ms_stats.exp_get, 1);
1768
1769 if (ms_setting.verbose)
1770 {
1771 char set_time[64];
1772 char cur_time[64];
1773 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1774 localtime(&c->curr_task.item->client_time));
1775 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1776 localtime(&curr_time.tv_sec));
1777 fprintf(stderr,
1778 "\n<%d expire time verification failed, "
1779 "object expired but get it now\n"
1780 "\tkey len: %d\n"
1781 "\tkey: %" PRIx64 " %.*s\n"
1782 "\tset time: %s current time: %s "
1783 "diff time: %d expire time: %d\n"
1784 "\texpected data: \n"
1785 "\treceived data len: %d\n"
1786 "\treceived data: %.*s\n",
1787 c->sfd,
1788 c->curr_task.item->key_size,
1789 c->curr_task.item->key_prefix,
1790 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1791 orignkey,
1792 set_time,
1793 cur_time,
1794 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1795 c->curr_task.item->exp_time,
1796 vlen,
1797 vlen,
1798 value);
1799 fflush(stderr);
1800 }
1801 }
1802 }
1803 else
1804 {
1805 if ((c->curr_task.item->value_size != vlen)
1806 || (memcmp(orignval, value, (size_t)vlen) != 0))
1807 {
1808 atomic_add_size(&ms_stats.vef_failed, 1);
1809
1810 if (ms_setting.verbose)
1811 {
1812 fprintf(stderr,
1813 "\n<%d data verification failed\n"
1814 "\tkey len: %d\n"
1815 "\tkey: %" PRIx64" %.*s\n"
1816 "\texpected data len: %d\n"
1817 "\texpected data: %.*s\n"
1818 "\treceived data len: %d\n"
1819 "\treceived data: %.*s\n",
1820 c->sfd,
1821 c->curr_task.item->key_size,
1822 c->curr_task.item->key_prefix,
1823 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1824 orignkey,
1825 c->curr_task.item->value_size,
1826 c->curr_task.item->value_size,
1827 orignval,
1828 vlen,
1829 vlen,
1830 value);
1831 fflush(stderr);
1832 }
1833 }
1834 }
1835
1836 c->curr_task.finish_verify= true;
1837
1838 if (mlget_item != NULL)
1839 {
1840 mlget_item->finish_verify= true;
1841 }
1842 }
1843 } /* ms_verify_value */
1844
1845
1846 /**
1847 * For ASCII protocol, after store the data into the local
1848 * buffer, run this function to handle the data.
1849 *
1850 * @param c, pointer of the concurrency
1851 */
1852 static void ms_ascii_complete_nread(ms_conn_t *c)
1853 {
1854 assert(c != NULL);
1855 assert(c->rbytes >= c->rvbytes);
1856 assert(c->protocol == ascii_prot);
1857 if (c->rvbytes > 2)
1858 {
1859 assert(
1860 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1861 }
1862
1863 /* multi-get */
1864 ms_mlget_task_item_t *mlget_item= NULL;
1865 if (((ms_setting.mult_key_num > 1)
1866 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1867 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1868 {
1869 c->mlget_task.value_index++;
1870 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1871
1872 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1873 {
1874 c->curr_task.item= mlget_item->item;
1875 c->curr_task.verify= mlget_item->verify;
1876 c->curr_task.finish_verify= mlget_item->finish_verify;
1877 mlget_item->get_miss= false;
1878 }
1879 else
1880 {
1881 /* Try to find the task item in multi-get task array */
1882 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1883 {
1884 mlget_item= &c->mlget_task.mlget_item[i];
1885 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1886 {
1887 c->curr_task.item= mlget_item->item;
1888 c->curr_task.verify= mlget_item->verify;
1889 c->curr_task.finish_verify= mlget_item->finish_verify;
1890 mlget_item->get_miss= false;
1891
1892 break;
1893 }
1894 }
1895 }
1896 }
1897
1898 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1899
1900 c->curr_task.get_miss= false;
1901 c->rbytes-= c->rvbytes;
1902 c->rcurr= c->rcurr + c->rvbytes;
1903 assert(c->rcurr <= (c->rbuf + c->rsize));
1904 c->readval= false;
1905 c->rvbytes= 0;
1906 } /* ms_ascii_complete_nread */
1907
1908
1909 /**
1910 * For binary protocol, after store the data into the local
1911 * buffer, run this function to handle the data.
1912 *
1913 * @param c, pointer of the concurrency
1914 */
1915 static void ms_bin_complete_nread(ms_conn_t *c)
1916 {
1917 assert(c != NULL);
1918 assert(c->rbytes >= c->rvbytes);
1919 assert(c->protocol == binary_prot);
1920
1921 int extlen= c->binary_header.response.extlen;
1922 int keylen= c->binary_header.response.keylen;
1923 uint8_t opcode= c->binary_header.response.opcode;
1924
1925 /* not get command or not include value, just return */
1926 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1927 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1928 || (c->rvbytes <= extlen + keylen))
1929 {
1930 /* get miss */
1931 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1932 {
1933 c->currcmd.retstat= MCD_END;
1934 c->curr_task.get_miss= true;
1935 }
1936
1937 c->readval= false;
1938 c->rvbytes= 0;
1939 ms_reset_conn(c, false);
1940 return;
1941 }
1942
1943 /* multi-get */
1944 ms_mlget_task_item_t *mlget_item= NULL;
1945 if (((ms_setting.mult_key_num > 1)
1946 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1947 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1948 {
1949 c->mlget_task.value_index++;
1950 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1951
1952 c->curr_task.item= mlget_item->item;
1953 c->curr_task.verify= mlget_item->verify;
1954 c->curr_task.finish_verify= mlget_item->finish_verify;
1955 mlget_item->get_miss= false;
1956 }
1957
1958 ms_verify_value(c,
1959 mlget_item,
1960 c->rcurr + extlen + keylen,
1961 c->rvbytes - extlen - keylen);
1962
1963 c->currcmd.retstat= MCD_END;
1964 c->curr_task.get_miss= false;
1965 c->rbytes-= c->rvbytes;
1966 c->rcurr= c->rcurr + c->rvbytes;
1967 assert(c->rcurr <= (c->rbuf + c->rsize));
1968 c->readval= false;
1969 c->rvbytes= 0;
1970
1971 if (ms_setting.mult_key_num > 1)
1972 {
1973 /* multi-get have check all the item */
1974 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1975 {
1976 ms_reset_conn(c, false);
1977 }
1978 }
1979 else
1980 {
1981 /* single get */
1982 ms_reset_conn(c, false);
1983 }
1984 } /* ms_bin_complete_nread */
1985
1986
1987 /**
1988 * we get here after reading the value of get commands.
1989 *
1990 * @param c, pointer of the concurrency
1991 */
1992 static void ms_complete_nread(ms_conn_t *c)
1993 {
1994 assert(c != NULL);
1995 assert(c->rbytes >= c->rvbytes);
1996 assert(c->protocol == ascii_prot
1997 || c->protocol == binary_prot);
1998
1999 if (c->protocol == binary_prot)
2000 {
2001 ms_bin_complete_nread(c);
2002 }
2003 else
2004 {
2005 ms_ascii_complete_nread(c);
2006 }
2007 } /* ms_complete_nread */
2008
2009
2010 /**
2011 * Adds a message header to a connection.
2012 *
2013 * @param c, pointer of the concurrency
2014 *
2015 * @return int, if success, return EXIT_SUCCESS, else return -1
2016 */
2017 static int ms_add_msghdr(ms_conn_t *c)
2018 {
2019 struct msghdr *msg;
2020
2021 assert(c != NULL);
2022
2023 if (c->msgsize == c->msgused)
2024 {
2025 msg=
2026 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2027 if (! msg)
2028 return -1;
2029
2030 c->msglist= msg;
2031 c->msgsize*= 2;
2032 }
2033
2034 msg= c->msglist + c->msgused;
2035
2036 /**
2037 * this wipes msg_iovlen, msg_control, msg_controllen, and
2038 * msg_flags, the last 3 of which aren't defined on solaris:
2039 */
2040 memset(msg, 0, sizeof(struct msghdr));
2041
2042 msg->msg_iov= &c->iov[c->iovused];
2043
2044 if (c->udp && (c->srv_recv_addr_size > 0))
2045 {
2046 msg->msg_name= &c->srv_recv_addr;
2047 msg->msg_namelen= c->srv_recv_addr_size;
2048 }
2049
2050 c->msgbytes= 0;
2051 c->msgused++;
2052
2053 if (c->udp)
2054 {
2055 /* Leave room for the UDP header, which we'll fill in later. */
2056 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2057 }
2058
2059 return EXIT_SUCCESS;
2060 } /* ms_add_msghdr */
2061
2062
2063 /**
2064 * Ensures that there is room for another structure iovec in a connection's
2065 * iov list.
2066 *
2067 * @param c, pointer of the concurrency
2068 *
2069 * @return int, if success, return EXIT_SUCCESS, else return -1
2070 */
2071 static int ms_ensure_iov_space(ms_conn_t *c)
2072 {
2073 assert(c != NULL);
2074
2075 if (c->iovused >= c->iovsize)
2076 {
2077 int i, iovnum;
2078 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2079 ((size_t)c->iovsize
2080 * 2)
2081 * sizeof(struct iovec));
2082 if (! new_iov)
2083 return -1;
2084
2085 c->iov= new_iov;
2086 c->iovsize*= 2;
2087
2088 /* Point all the msghdr structures at the new list. */
2089 for (i= 0, iovnum= 0; i < c->msgused; i++)
2090 {
2091 c->msglist[i].msg_iov= &c->iov[iovnum];
2092 iovnum+= (int)c->msglist[i].msg_iovlen;
2093 }
2094 }
2095
2096 return EXIT_SUCCESS;
2097 } /* ms_ensure_iov_space */
2098
2099
2100 /**
2101 * Adds data to the list of pending data that will be written out to a
2102 * connection.
2103 *
2104 * @param c, pointer of the concurrency
2105 * @param buf, the buffer includes data to send
2106 * @param len, the data length in the buffer
2107 *
2108 * @return int, if success, return EXIT_SUCCESS, else return -1
2109 */
2110 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2111 {
2112 struct msghdr *m;
2113 int leftover;
2114 bool limit_to_mtu;
2115
2116 assert(c != NULL);
2117
2118 do
2119 {
2120 m= &c->msglist[c->msgused - 1];
2121
2122 /*
2123 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2124 */
2125 limit_to_mtu= c->udp;
2126
2127 #ifdef IOV_MAX
2128 /* We may need to start a new msghdr if this one is full. */
2129 if ((m->msg_iovlen == IOV_MAX)
2130 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2131 {
2132 ms_add_msghdr(c);
2133 m= &c->msglist[c->msgused - 1];
2134 }
2135 #endif
2136
2137 if (ms_ensure_iov_space(c) != 0)
2138 return -1;
2139
2140 /* If the fragment is too big to fit in the datagram, split it up */
2141 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2142 {
2143 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2144 len-= leftover;
2145 }
2146 else
2147 {
2148 leftover= 0;
2149 }
2150
2151 m= &c->msglist[c->msgused - 1];
2152 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2153 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2154
2155 c->msgbytes+= len;
2156 c->iovused++;
2157 m->msg_iovlen++;
2158
2159 buf= ((char *)buf) + len;
2160 len= leftover;
2161 }
2162 while (leftover > 0);
2163
2164 return EXIT_SUCCESS;
2165 } /* ms_add_iov */
2166
2167
2168 /**
2169 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2170 *
2171 * @param c, pointer of the concurrency
2172 *
2173 * @return int, if success, return EXIT_SUCCESS, else return -1
2174 */
2175 static int ms_build_udp_headers(ms_conn_t *c)
2176 {
2177 int i;
2178 unsigned char *hdr;
2179
2180 assert(c != NULL);
2181
2182 c->request_id= ms_get_udp_request_id();
2183
2184 if (c->msgused > c->hdrsize)
2185 {
2186 void *new_hdrbuf;
2187 if (c->hdrbuf)
2188 new_hdrbuf= realloc(c->hdrbuf,
2189 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2190 else
2191 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2192 if (! new_hdrbuf)
2193 return -1;
2194
2195 c->hdrbuf= (unsigned char *)new_hdrbuf;
2196 c->hdrsize= c->msgused * 2;
2197 }
2198
2199 /* If this is a multi-packet request, drop it. */
2200 if (c->udp && (c->msgused > 1))
2201 {
2202 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2203 return -1;
2204 }
2205
2206 hdr= c->hdrbuf;
2207 for (i= 0; i < c->msgused; i++)
2208 {
2209 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2210 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2211 *hdr++= (unsigned char)(c->request_id / 256);
2212 *hdr++= (unsigned char)(c->request_id % 256);
2213 *hdr++= (unsigned char)(i / 256);
2214 *hdr++= (unsigned char)(i % 256);
2215 *hdr++= (unsigned char)(c->msgused / 256);
2216 *hdr++= (unsigned char)(c->msgused % 256);
2217 *hdr++= (unsigned char)1; /* support facebook memcached */
2218 *hdr++= (unsigned char)0;
2219 assert(hdr ==
2220 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2221 + UDP_HEADER_SIZE));
2222 }
2223
2224 return EXIT_SUCCESS;
2225 } /* ms_build_udp_headers */
2226
2227
2228 /**
2229 * Transmit the next chunk of data from our list of msgbuf structures.
2230 *
2231 * @param c, pointer of the concurrency
2232 *
2233 * @return TRANSMIT_COMPLETE All done writing.
2234 * TRANSMIT_INCOMPLETE More data remaining to write.
2235 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2236 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2237 */
2238 static int ms_transmit(ms_conn_t *c)
2239 {
2240 assert(c != NULL);
2241
2242 if ((c->msgcurr < c->msgused)
2243 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2244 {
2245 /* Finished writing the current msg; advance to the next. */
2246 c->msgcurr++;
2247 }
2248
2249 if (c->msgcurr < c->msgused)
2250 {
2251 ssize_t res;
2252 struct msghdr *m= &c->msglist[c->msgcurr];
2253
2254 res= sendmsg(c->sfd, m, 0);
2255 if (res > 0)
2256 {
2257 atomic_add_size(&ms_stats.bytes_written, res);
2258
2259 /* We've written some of the data. Remove the completed
2260 * iovec entries from the list of pending writes. */
2261 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2262 {
2263 res-= (ssize_t)m->msg_iov->iov_len;
2264 m->msg_iovlen--;
2265 m->msg_iov++;
2266 }
2267
2268 /* Might have written just part of the last iovec entry;
2269 * adjust it so the next write will do the rest. */
2270 if (res > 0)
2271 {
2272 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2273 m->msg_iov->iov_len-= (size_t)res;
2274 }
2275 return TRANSMIT_INCOMPLETE;
2276 }
2277 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2278 {
2279 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2280 {
2281 fprintf(stderr, "Couldn't update event.\n");
2282 ms_conn_set_state(c, conn_closing);
2283 return TRANSMIT_HARD_ERROR;
2284 }
2285 return TRANSMIT_SOFT_ERROR;
2286 }
2287
2288 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2289 * we have a real error, on which we close the connection */
2290 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2291
2292 ms_conn_set_state(c, conn_closing);
2293 return TRANSMIT_HARD_ERROR;
2294 }
2295 else
2296 {
2297 return TRANSMIT_COMPLETE;
2298 }
2299 } /* ms_transmit */
2300
2301
2302 /**
2303 * Shrinks a connection's buffers if they're too big. This prevents
2304 * periodic large "mget" response from server chewing lots of client
2305 * memory.
2306 *
2307 * This should only be called in between requests since it can wipe output
2308 * buffers!
2309 *
2310 * @param c, pointer of the concurrency
2311 */
2312 static void ms_conn_shrink(ms_conn_t *c)
2313 {
2314 assert(c != NULL);
2315
2316 if (c->udp)
2317 return;
2318
2319 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2320 {
2321 char *newbuf;
2322
2323 if (c->rcurr != c->rbuf)
2324 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2325
2326 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2327
2328 if (newbuf)
2329 {
2330 c->rbuf= newbuf;
2331 c->rsize= DATA_BUFFER_SIZE;
2332 }
2333 c->rcurr= c->rbuf;
2334 }
2335
2336 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2337 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2338 {
2339 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2340 if (! new_rbuf)
2341 {
2342 c->rudpbuf= new_rbuf;
2343 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2344 }
2345 /* TODO check error condition? */
2346 }
2347
2348 if (c->msgsize > MSG_LIST_HIGHWAT)
2349 {
2350 struct msghdr *newbuf= (struct msghdr *)realloc(
2351 (void *)c->msglist,
2352 MSG_LIST_INITIAL
2353 * sizeof(c->msglist[0]));
2354 if (newbuf)
2355 {
2356 c->msglist= newbuf;
2357 c->msgsize= MSG_LIST_INITIAL;
2358 }
2359 /* TODO check error condition? */
2360 }
2361
2362 if (c->iovsize > IOV_LIST_HIGHWAT)
2363 {
2364 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2365 IOV_LIST_INITIAL
2366 * sizeof(c->iov[0]));
2367 if (newbuf)
2368 {
2369 c->iov= newbuf;
2370 c->iovsize= IOV_LIST_INITIAL;
2371 }
2372 /* TODO check return value */
2373 }
2374 } /* ms_conn_shrink */
2375
2376
2377 /**
2378 * Sets a connection's current state in the state machine. Any special
2379 * processing that needs to happen on certain state transitions can
2380 * happen here.
2381 *
2382 * @param c, pointer of the concurrency
2383 * @param state, connection state
2384 */
2385 static void ms_conn_set_state(ms_conn_t *c, int state)
2386 {
2387 assert(c != NULL);
2388
2389 if (state != c->state)
2390 {
2391 if (state == conn_read)
2392 {
2393 ms_conn_shrink(c);
2394 }
2395 c->state= state;
2396 }
2397 } /* ms_conn_set_state */
2398
2399
2400 /**
2401 * update the event if socks change state. for example: when
2402 * change the listen scoket read event to sock write event, or
2403 * change socket handler, we could call this function.
2404 *
2405 * @param c, pointer of the concurrency
2406 * @param new_flags, new event flags
2407 *
2408 * @return bool, if success, return true, else return false
2409 */
2410 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2411 {
2412 assert(c != NULL);
2413
2414 struct event_base *base= c->event.ev_base;
2415 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2416 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2417 {
2418 return true;
2419 }
2420
2421 if (event_del(&c->event) == -1)
2422 {
2423 /* try to delete the event again */
2424 if (event_del(&c->event) == -1)
2425 {
2426 return false;
2427 }
2428 }
2429
2430 event_set(&c->event,
2431 c->sfd,
2432 (short)new_flags,
2433 ms_event_handler,
2434 (void *)c);
2435 event_base_set(base, &c->event);
2436 c->ev_flags= (short)new_flags;
2437
2438 if (event_add(&c->event, NULL) == -1)
2439 {
2440 return false;
2441 }
2442
2443 return true;
2444 } /* ms_update_event */
2445
2446
2447 /**
2448 * If user want to get the expected throughput, we could limit
2449 * the performance of memslap. we could give up some work and
2450 * just wait a short time. The function is used to check this
2451 * case.
2452 *
2453 * @param c, pointer of the concurrency
2454 *
2455 * @return bool, if success, return true, else return false
2456 */
2457 static bool ms_need_yield(ms_conn_t *c)
2458 {
2459 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2460 int64_t tps= 0;
2461 int64_t time_diff= 0;
2462 struct timeval curr_time;
2463 ms_task_t *task= &c->curr_task;
2464
2465 if (ms_setting.expected_tps > 0)
2466 {
2467 gettimeofday(&curr_time, NULL);
2468 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2469 tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
2470
2471 /* current throughput is greater than expected throughput */
2472 if (tps > ms_thread->thread_ctx->tps_perconn)
2473 {
2474 return true;
2475 }
2476 }
2477
2478 return false;
2479 } /* ms_need_yield */
2480
2481
2482 /**
2483 * used to update the start time of each operation
2484 *
2485 * @param c, pointer of the concurrency
2486 */
2487 static void ms_update_start_time(ms_conn_t *c)
2488 {
2489 ms_task_item_t *item= c->curr_task.item;
2490
2491 if ((ms_setting.stat_freq > 0) || c->udp
2492 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2493 {
2494 gettimeofday(&c->start_time, NULL);
2495 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2496 {
2497 /* record the current time */
2498 item->client_time= c->start_time.tv_sec;
2499 }
2500 }
2501 } /* ms_update_start_time */
2502
2503
2504 /**
2505 * run the state machine
2506 *
2507 * @param c, pointer of the concurrency
2508 */
2509 static void ms_drive_machine(ms_conn_t *c)
2510 {
2511 bool stop= false;
2512
2513 assert(c != NULL);
2514
2515 while (! stop)
2516 {
2517 switch (c->state)
2518 {
2519 case conn_read:
2520 if (c->readval)
2521 {
2522 if (c->rbytes >= c->rvbytes)
2523 {
2524 ms_complete_nread(c);
2525 break;
2526 }
2527 }
2528 else
2529 {
2530 if (ms_try_read_line(c) != 0)
2531 {
2532 break;
2533 }
2534 }
2535
2536 if (ms_try_read_network(c) != 0)
2537 {
2538 break;
2539 }
2540
2541 /* doesn't read all the response data, wait event wake up */
2542 if (! c->currcmd.isfinish)
2543 {
2544 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2545 {
2546 fprintf(stderr, "Couldn't update event.\n");
2547 ms_conn_set_state(c, conn_closing);
2548 break;
2549 }
2550 stop= true;
2551 break;
2552 }
2553
2554 /* we have no command line and no data to read from network, next write */
2555 ms_conn_set_state(c, conn_write);
2556 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2557
2558 break;
2559
2560 case conn_write:
2561 if (! c->ctnwrite && ms_need_yield(c))
2562 {
2563 usleep(10);
2564
2565 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2566 {
2567 fprintf(stderr, "Couldn't update event.\n");
2568 ms_conn_set_state(c, conn_closing);
2569 break;
2570 }
2571 stop= true;
2572 break;
2573 }
2574
2575 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2576 {
2577 ms_conn_set_state(c, conn_closing);
2578 break;
2579 }
2580
2581 /* record the start time before starting to send data if necessary */
2582 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2583 {
2584 if (c->change_sfd)
2585 {
2586 c->change_sfd= false;
2587 }
2588 ms_update_start_time(c);
2589 }
2590
2591 /* change sfd if necessary */
2592 if (c->change_sfd)
2593 {
2594 c->ctnwrite= true;
2595 stop= true;
2596 break;
2597 }
2598
2599 /* execute task until nothing need be written to network */
2600 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2601 {
2602 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2603 {
2604 fprintf(stderr, "Couldn't update event.\n");
2605 ms_conn_set_state(c, conn_closing);
2606 break;
2607 }
2608 stop= true;
2609 break;
2610 }
2611
2612 switch (ms_transmit(c))
2613 {
2614 case TRANSMIT_COMPLETE:
2615 /* we have no data to write to network, next wait repose */
2616 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2617 {
2618 fprintf(stderr, "Couldn't update event.\n");
2619 ms_conn_set_state(c, conn_closing);
2620 c->ctnwrite= false;
2621 break;
2622 }
2623 ms_conn_set_state(c, conn_read);
2624 c->ctnwrite= false;
2625 stop= true;
2626 break;
2627
2628 case TRANSMIT_INCOMPLETE:
2629 c->ctnwrite= true;
2630 break; /* Continue in state machine. */
2631
2632 case TRANSMIT_HARD_ERROR:
2633 c->ctnwrite= false;
2634 break;
2635
2636 case TRANSMIT_SOFT_ERROR:
2637 c->ctnwrite= true;
2638 stop= true;
2639 break;
2640
2641 default:
2642 break;
2643 } /* switch */
2644
2645 break;
2646
2647 case conn_closing:
2648 /* recovery mode, need reconnect if connection close */
2649 if (ms_setting.reconnect && (! ms_global.time_out
2650 || ((ms_setting.run_time == 0)
2651 && (c->remain_exec_num > 0))))
2652 {
2653 if (ms_reconn(c) != 0)
2654 {
2655 ms_conn_close(c);
2656 stop= true;
2657 break;
2658 }
2659
2660 ms_reset_conn(c, false);
2661
2662 if (c->total_sfds == 1)
2663 {
2664 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2665 {
2666 fprintf(stderr, "Couldn't update event.\n");
2667 ms_conn_set_state(c, conn_closing);
2668 break;
2669 }
2670 }
2671
2672 break;
2673 }
2674 else
2675 {
2676 ms_conn_close(c);
2677 stop= true;
2678 break;
2679 }
2680
2681 default:
2682 assert(0);
2683 } /* switch */
2684 }
2685 } /* ms_drive_machine */
2686
2687
2688 /**
2689 * the event handler of each thread
2690 *
2691 * @param fd, the file descriptor of socket
2692 * @param which, event flag
2693 * @param arg, argument
2694 */
2695 void ms_event_handler(const int fd, const short which, void *arg)
2696 {
2697 ms_conn_t *c= (ms_conn_t *)arg;
2698
2699 assert(c != NULL);
2700
2701 c->which= which;
2702
2703 /* sanity */
2704 if (fd != c->sfd)
2705 {
2706 fprintf(stderr,
2707 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2708 fd,
2709 c->sfd);
2710 ms_conn_close(c);
2711 exit(1);
2712 }
2713 assert(fd == c->sfd);
2714
2715 ms_drive_machine(c);
2716
2717 /* wait for next event */
2718 } /* ms_event_handler */
2719
2720
2721 /**
2722 * get the next socket descriptor index to run for replication
2723 *
2724 * @param c, pointer of the concurrency
2725 * @param cmd, command(get or set )
2726 *
2727 * @return int, if success, return the index, else return EXIT_SUCCESS
2728 */
2729 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2730 {
2731 uint32_t sock_index= 0;
2732 uint32_t i= 0;
2733
2734 if (c->total_sfds == 1)
2735 {
2736 return EXIT_SUCCESS;
2737 }
2738
2739 if (ms_setting.rep_write_srv == 0)
2740 {
2741 return sock_index;
2742 }
2743
2744 do
2745 {
2746 if (cmd == CMD_SET)
2747 {
2748 for (i= 0; i < ms_setting.rep_write_srv; i++)
2749 {
2750 if (c->tcpsfd[i] > 0)
2751 {
2752 break;
2753 }
2754 }
2755
2756 if (i == ms_setting.rep_write_srv)
2757 {
2758 /* random get one replication server to read */
2759 sock_index= (uint32_t)random() % c->total_sfds;
2760 }
2761 else
2762 {
2763 /* random get one replication writing server to write */
2764 sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
2765 }
2766 }
2767 else if (cmd == CMD_GET)
2768 {
2769 /* random get one replication server to read */
2770 sock_index= (uint32_t)random() % c->total_sfds;
2771 }
2772 }
2773 while (c->tcpsfd[sock_index] == 0);
2774
2775 return sock_index;
2776 } /* ms_get_rep_sock_index */
2777
2778
2779 /**
2780 * get the next socket descriptor index to run
2781 *
2782 * @param c, pointer of the concurrency
2783 *
2784 * @return int, return the index
2785 */
2786 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2787 {
2788 uint32_t sock_index= 0;
2789
2790 do
2791 {
2792 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2793 }
2794 while (c->tcpsfd[sock_index] == 0);
2795
2796 return sock_index;
2797 } /* ms_get_next_sock_index */
2798
2799
2800 /**
2801 * update socket event of the connections
2802 *
2803 * @param c, pointer of the concurrency
2804 *
2805 * @return int, if success, return EXIT_SUCCESS, else return -1
2806 */
2807 static int ms_update_conn_sock_event(ms_conn_t *c)
2808 {
2809 assert(c != NULL);
2810
2811 switch (c->currcmd.cmd)
2812 {
2813 case CMD_SET:
2814 if (ms_setting.facebook_test && c->udp)
2815 {
2816 c->sfd= c->tcpsfd[0];
2817 c->udp= false;
2818 c->change_sfd= true;
2819 }
2820 break;
2821
2822 case CMD_GET:
2823 if (ms_setting.facebook_test && ! c->udp)
2824 {
2825 c->sfd= c->udpsfd;
2826 c->udp= true;
2827 c->change_sfd= true;
2828 }
2829 break;
2830
2831 default:
2832 break;
2833 } /* switch */
2834
2835 if (! c->udp && (c->total_sfds > 1))
2836 {
2837 if (c->cur_idx != c->total_sfds)
2838 {
2839 if (ms_setting.rep_write_srv == 0)
2840 {
2841 c->cur_idx= ms_get_next_sock_index(c);
2842 }
2843 else
2844 {
2845 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2846 }
2847 }
2848 else
2849 {
2850 /* must select the first sock of the connection at the beginning */
2851 c->cur_idx= 0;
2852 }
2853
2854 c->sfd= c->tcpsfd[c->cur_idx];
2855 assert(c->sfd != 0);
2856 c->change_sfd= true;
2857 }
2858
2859 if (c->change_sfd)
2860 {
2861 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2862 {
2863 fprintf(stderr, "Couldn't update event.\n");
2864 ms_conn_set_state(c, conn_closing);
2865 return -1;
2866 }
2867 }
2868
2869 return EXIT_SUCCESS;
2870 } /* ms_update_conn_sock_event */
2871
2872
2873 /**
2874 * for ASCII protocol, this function build the set command
2875 * string and send the command.
2876 *
2877 * @param c, pointer of the concurrency
2878 * @param item, pointer of task item which includes the object
2879 * information
2880 *
2881 * @return int, if success, return EXIT_SUCCESS, else return -1
2882 */
2883 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2884 {
2885 int value_offset;
2886 int write_len;
2887 char *buffer= c->wbuf;
2888
2889 write_len= snprintf(buffer,
2890 c->wsize,
2891 " %u %d %d\r\n",
2892 0,
2893 item->exp_time,
2894 item->value_size);
2895
2896 if (write_len > c->wsize || write_len < 0)
2897 {
2898 /* ought to be always enough. just fail for simplicity */
2899 fprintf(stderr, "output command line too long.\n");
2900 return -1;
2901 }
2902
2903 if (item->value_offset == INVALID_OFFSET)
2904 {
2905 value_offset= item->key_suffix_offset;
2906 }
2907 else
2908 {
2909 value_offset= item->value_offset;
2910 }
2911
2912 if ((ms_add_iov(c, "set ", 4) != 0)
2913 || (ms_add_iov(c, (char *)&item->key_prefix,
2914 (int)KEY_PREFIX_SIZE) != 0)
2915 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2916 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2917 || (ms_add_iov(c, buffer, write_len) != 0)
2918 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2919 item->value_size) != 0)
2920 || (ms_add_iov(c, "\r\n", 2) != 0)
2921 || (c->udp && (ms_build_udp_headers(c) != 0)))
2922 {
2923 return -1;
2924 }
2925
2926 return EXIT_SUCCESS;
2927 } /* ms_build_ascii_write_buf_set */
2928
2929
2930 /**
2931 * used to send set command to server
2932 *
2933 * @param c, pointer of the concurrency
2934 * @param item, pointer of task item which includes the object
2935 * information
2936 *
2937 * @return int, if success, return EXIT_SUCCESS, else return -1
2938 */
2939 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2940 {
2941 assert(c != NULL);
2942
2943 c->currcmd.cmd= CMD_SET;
2944 c->currcmd.isfinish= false;
2945 c->currcmd.retstat= MCD_FAILURE;
2946
2947 if (ms_update_conn_sock_event(c) != 0)
2948 {
2949 return -1;
2950 }
2951
2952 c->msgcurr= 0;
2953 c->msgused= 0;
2954 c->iovused= 0;
2955 if (ms_add_msghdr(c) != 0)
2956 {
2957 fprintf(stderr, "Out of memory preparing request.");
2958 return -1;
2959 }
2960
2961 /* binary protocol */
2962 if (c->protocol == binary_prot)
2963 {
2964 if (ms_build_bin_write_buf_set(c, item) != 0)
2965 {
2966 return -1;
2967 }
2968 }
2969 else
2970 {
2971 if (ms_build_ascii_write_buf_set(c, item) != 0)
2972 {
2973 return -1;
2974 }
2975 }
2976
2977 atomic_add_size(&ms_stats.obj_bytes,
2978 item->key_size + item->value_size);
2979 atomic_add_size(&ms_stats.cmd_set, 1);
2980
2981 return EXIT_SUCCESS;
2982 } /* ms_mcd_set */
2983
2984
2985 /**
2986 * for ASCII protocol, this function build the get command
2987 * string and send the command.
2988 *
2989 * @param c, pointer of the concurrency
2990 * @param item, pointer of task item which includes the object
2991 * information
2992 *
2993 * @return int, if success, return EXIT_SUCCESS, else return -1
2994 */
2995 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2996 {
2997 if ((ms_add_iov(c, "get ", 4) != 0)
2998 || (ms_add_iov(c, (char *)&item->key_prefix,
2999 (int)KEY_PREFIX_SIZE) != 0)
3000 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3001 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3002 || (ms_add_iov(c, "\r\n", 2) != 0)
3003 || (c->udp && (ms_build_udp_headers(c) != 0)))
3004 {
3005 return -1;
3006 }
3007
3008 return EXIT_SUCCESS;
3009 } /* ms_build_ascii_write_buf_get */
3010
3011
3012 /**
3013 * used to send the get command to server
3014 *
3015 * @param c, pointer of the concurrency
3016 * @param item, pointer of task item which includes the object
3017 * information
3018 *
3019 * @return int, if success, return EXIT_SUCCESS, else return -1
3020 */
3021 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3022 {
3023 assert(c != NULL);
3024
3025 c->currcmd.cmd= CMD_GET;
3026 c->currcmd.isfinish= false;
3027 c->currcmd.retstat= MCD_FAILURE;
3028
3029 if (ms_update_conn_sock_event(c) != 0)
3030 {
3031 return -1;
3032 }
3033
3034 c->msgcurr= 0;
3035 c->msgused= 0;
3036 c->iovused= 0;
3037 if (ms_add_msghdr(c) != 0)
3038 {
3039 fprintf(stderr, "Out of memory preparing request.");
3040 return -1;
3041 }
3042
3043 /* binary protocol */
3044 if (c->protocol == binary_prot)
3045 {
3046 if (ms_build_bin_write_buf_get(c, item) != 0)
3047 {
3048 return -1;
3049 }
3050 }
3051 else
3052 {
3053 if (ms_build_ascii_write_buf_get(c, item) != 0)
3054 {
3055 return -1;
3056 }
3057 }
3058
3059 atomic_add_size(&ms_stats.cmd_get, 1);
3060
3061 return EXIT_SUCCESS;
3062 } /* ms_mcd_get */
3063
3064
3065 /**
3066 * for ASCII protocol, this function build the multi-get command
3067 * string and send the command.
3068 *
3069 * @param c, pointer of the concurrency
3070 *
3071 * @return int, if success, return EXIT_SUCCESS, else return -1
3072 */
3073 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3074 {
3075 ms_task_item_t *item;
3076
3077 if (ms_add_iov(c, "get", 3) != 0)
3078 {
3079 return -1;
3080 }
3081
3082 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3083 {
3084 item= c->mlget_task.mlget_item[i].item;
3085 assert(item != NULL);
3086 if ((ms_add_iov(c, " ", 1) != 0)
3087 || (ms_add_iov(c, (char *)&item->key_prefix,
3088 (int)KEY_PREFIX_SIZE) != 0)
3089 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3090 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3091 {
3092 return -1;
3093 }
3094 }
3095
3096 if ((ms_add_iov(c, "\r\n", 2) != 0)
3097 || (c->udp && (ms_build_udp_headers(c) != 0)))
3098 {
3099 return -1;
3100 }
3101
3102 return EXIT_SUCCESS;
3103 } /* ms_build_ascii_write_buf_mlget */
3104
3105
3106 /**
3107 * used to send the multi-get command to server
3108 *
3109 * @param c, pointer of the concurrency
3110 *
3111 * @return int, if success, return EXIT_SUCCESS, else return -1
3112 */
3113 int ms_mcd_mlget(ms_conn_t *c)
3114 {
3115 ms_task_item_t *item;
3116
3117 assert(c != NULL);
3118 assert(c->mlget_task.mlget_num >= 1);
3119
3120 c->currcmd.cmd= CMD_GET;
3121 c->currcmd.isfinish= false;
3122 c->currcmd.retstat= MCD_FAILURE;
3123
3124 if (ms_update_conn_sock_event(c) != 0)
3125 {
3126 return -1;
3127 }
3128
3129 c->msgcurr= 0;
3130 c->msgused= 0;
3131 c->iovused= 0;
3132 if (ms_add_msghdr(c) != 0)
3133 {
3134 fprintf(stderr, "Out of memory preparing request.");
3135 return -1;
3136 }
3137
3138 /* binary protocol */
3139 if (c->protocol == binary_prot)
3140 {
3141 if (ms_build_bin_write_buf_mlget(c) != 0)
3142 {
3143 return -1;
3144 }
3145 }
3146 else
3147 {
3148 if (ms_build_ascii_write_buf_mlget(c) != 0)
3149 {
3150 return -1;
3151 }
3152 }
3153
3154 /* decrease operation time of each item */
3155 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3156 {
3157 item= c->mlget_task.mlget_item[i].item;
3158 atomic_add_size(&ms_stats.cmd_get, 1);
3159 }
3160
3161 return EXIT_SUCCESS;
3162 } /* ms_mcd_mlget */
3163
3164
3165 /**
3166 * binary protocol support
3167 */
3168
3169 /**
3170 * for binary protocol, parse the response of server
3171 *
3172 * @param c, pointer of the concurrency
3173 *
3174 * @return int, if success, return EXIT_SUCCESS, else return -1
3175 */
3176 static int ms_bin_process_response(ms_conn_t *c)
3177 {
3178 const char *errstr= NULL;
3179
3180 assert(c != NULL);
3181
3182 uint32_t bodylen= c->binary_header.response.bodylen;
3183 uint8_t opcode= c->binary_header.response.opcode;
3184 uint16_t status= c->binary_header.response.status;
3185
3186 if (bodylen > 0)
3187 {
3188 c->rvbytes= (int32_t)bodylen;
3189 c->readval= true;
3190 return EXIT_FAILURE;
3191 }
3192 else
3193 {
3194 switch (status)
3195 {
3196 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3197 if (opcode == PROTOCOL_BINARY_CMD_SET)
3198 {
3199 c->currcmd.retstat= MCD_STORED;
3200 }
3201 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3202 {
3203 c->currcmd.retstat= MCD_DELETED;
3204 }
3205 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3206 {
3207 c->currcmd.retstat= MCD_END;
3208 }
3209 break;
3210
3211 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3212 errstr= "Out of memory";
3213 c->currcmd.retstat= MCD_SERVER_ERROR;
3214 break;
3215
3216 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3217 errstr= "Unknown command";
3218 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3219 break;
3220
3221 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3222 errstr= "Not found";
3223 c->currcmd.retstat= MCD_NOTFOUND;
3224 break;
3225
3226 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3227 errstr= "Invalid arguments";
3228 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3229 break;
3230
3231 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3232 errstr= "Data exists for key.";
3233 break;
3234
3235 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3236 errstr= "Too large.";
3237 c->currcmd.retstat= MCD_SERVER_ERROR;
3238 break;
3239
3240 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3241 errstr= "Not stored.";
3242 c->currcmd.retstat= MCD_NOTSTORED;
3243 break;
3244
3245 default:
3246 errstr= "Unknown error";
3247 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3248 break;
3249 } /* switch */
3250
3251 if (errstr != NULL)
3252 {
3253 fprintf(stderr, "%s\n", errstr);
3254 }
3255 }
3256
3257 return EXIT_SUCCESS;
3258 } /* ms_bin_process_response */
3259
3260
3261 /* build binary header and add the header to the buffer to send */
3262
3263 /**
3264 * build binary header and add the header to the buffer to send
3265 *
3266 * @param c, pointer of the concurrency
3267 * @param opcode, operation code
3268 * @param hdr_len, length of header
3269 * @param key_len, length of key
3270 * @param body_len. length of body
3271 */
3272 static void ms_add_bin_header(ms_conn_t *c,
3273 uint8_t opcode,
3274 uint8_t hdr_len,
3275 uint16_t key_len,
3276 uint32_t body_len)
3277 {
3278 protocol_binary_request_header *header;
3279
3280 assert(c != NULL);
3281
3282 header= (protocol_binary_request_header *)c->wcurr;
3283
3284 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3285 header->request.opcode= (uint8_t)opcode;
3286 header->request.keylen= htons(key_len);
3287
3288 header->request.extlen= (uint8_t)hdr_len;
3289 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3290 header->request.vbucket= 0;
3291
3292 header->request.bodylen= htonl(body_len);
3293 header->request.opaque= 0;
3294 header->request.cas= 0;
3295
3296 ms_add_iov(c, c->wcurr, sizeof(header->request));
3297 } /* ms_add_bin_header */
3298
3299
3300 /**
3301 * add the key to the socket write buffer array
3302 *
3303 * @param c, pointer of the concurrency
3304 * @param item, pointer of task item which includes the object
3305 * information
3306 */
3307 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3308 {
3309 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3310 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3311 item->key_size - (int)KEY_PREFIX_SIZE);
3312 }
3313
3314
3315 /**
3316 * for binary protocol, this function build the set command
3317 * and add the command to send buffer array.
3318 *
3319 * @param c, pointer of the concurrency
3320 * @param item, pointer of task item which includes the object
3321 * information
3322 *
3323 * @return int, if success, return EXIT_SUCCESS, else return -1
3324 */
3325 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3326 {
3327 assert(c->wbuf == c->wcurr);
3328
3329 int value_offset;
3330 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3331 uint16_t keylen= (uint16_t)item->key_size;
3332 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3333 + (uint32_t)keylen + (uint32_t)item->value_size;
3334
3335 ms_add_bin_header(c,
3336 PROTOCOL_BINARY_CMD_SET,
3337 sizeof(rep->message.body),
3338 keylen,
3339 bodylen);
3340 rep->message.body.flags= 0;
3341 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3342 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3343 ms_add_key_to_iov(c, item);
3344
3345 if (item->value_offset == INVALID_OFFSET)
3346 {
3347 value_offset= item->key_suffix_offset;
3348 }
3349 else
3350 {
3351 value_offset= item->value_offset;
3352 }
3353 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3354
3355 return EXIT_SUCCESS;
3356 } /* ms_build_bin_write_buf_set */
3357
3358
3359 /**
3360 * for binary protocol, this function build the get command and
3361 * add the command to send buffer array.
3362 *
3363 * @param c, pointer of the concurrency
3364 * @param item, pointer of task item which includes the object
3365 * information
3366 *
3367 * @return int, if success, return EXIT_SUCCESS, else return -1
3368 */
3369 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3370 {
3371 assert(c->wbuf == c->wcurr);
3372
3373 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3374 (uint32_t)item->key_size);
3375 ms_add_key_to_iov(c, item);
3376
3377 return EXIT_SUCCESS;
3378 } /* ms_build_bin_write_buf_get */
3379
3380
3381 /**
3382 * for binary protocol, this function build the multi-get
3383 * command and add the command to send buffer array.
3384 *
3385 * @param c, pointer of the concurrency
3386 * @param item, pointer of task item which includes the object
3387 * information
3388 *
3389 * @return int, if success, return EXIT_SUCCESS, else return -1
3390 */
3391 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3392 {
3393 ms_task_item_t *item;
3394
3395 assert(c->wbuf == c->wcurr);
3396
3397 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3398 {
3399 item= c->mlget_task.mlget_item[i].item;
3400 assert(item != NULL);
3401
3402 ms_add_bin_header(c,
3403 PROTOCOL_BINARY_CMD_GET,
3404 0,
3405 (uint16_t)item->key_size,
3406 (uint32_t)item->key_size);
3407 ms_add_key_to_iov(c, item);
3408 c->wcurr+= sizeof(protocol_binary_request_get);
3409 }
3410
3411 c->wcurr= c->wbuf;
3412
3413 return EXIT_SUCCESS;
3414 } /* ms_build_bin_write_buf_mlget */