2afc9fa10f029dc4a3b477317516a5f9ee408fff
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "mem_config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23
24 #if defined(HAVE_SYS_TIME_H)
25 # include <sys/time.h>
26 #endif
27
28 #if defined(HAVE_TIME_H)
29 # include <time.h>
30 #endif
31
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
35
36 #ifdef linux
37 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
38 * optimize the conversion functions, but the prototypes generate warnings
39 * from gcc. The conversion methods aren't the bottleneck for my app, so
40 * just remove the warnings by undef'ing the optimization ..
41 */
42 #undef ntohs
43 #undef ntohl
44 #undef htons
45 #undef htonl
46 #endif
47
48 /* for network write */
49 #define TRANSMIT_COMPLETE 0
50 #define TRANSMIT_INCOMPLETE 1
51 #define TRANSMIT_SOFT_ERROR 2
52 #define TRANSMIT_HARD_ERROR 3
53
54 /* for generating key */
55 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
56 #define KEY_PREFIX_MASK 0x1010101010101010
57
58 /* For parsing the value length returned by the server */
59 #define KEY_TOKEN 1
60 #define VALUELEN_TOKEN 3
61
62 /* global increasing counter, to ensure the key prefix unique */
63 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
64
65 /* global increasing counter, generating request id for UDP */
66 static volatile uint32_t udp_request_id= 0;
67
68 extern pthread_key_t ms_thread_key;
69
70 /* generate udp request id */
71 static uint32_t ms_get_udp_request_id(void);
72
73
74 /* connect initialize */
75 static void ms_task_init(ms_conn_t *c);
76 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
77 static int ms_conn_sock_init(ms_conn_t *c);
78 static int ms_conn_event_init(ms_conn_t *c);
79 static int ms_conn_init(ms_conn_t *c,
80 const int init_state,
81 const int read_buffer_size,
82 const bool is_udp);
83 static void ms_warmup_num_init(ms_conn_t *c);
84 static int ms_item_win_init(ms_conn_t *c);
85
86
87 /* connection close */
88 void ms_conn_free(ms_conn_t *c);
89 static void ms_conn_close(ms_conn_t *c);
90
91
92 /* create network connection */
93 static int ms_new_socket(struct addrinfo *ai);
94 static void ms_maximize_sndbuf(const int sfd);
95 static int ms_network_connect(ms_conn_t *c,
96 char *srv_host_name,
97 const int srv_port,
98 const bool is_udp,
99 int *ret_sfd);
100 static int ms_reconn(ms_conn_t *c);
101
102
103 /* read and parse */
104 static int ms_tokenize_command(char *command,
105 token_t *tokens,
106 const int max_tokens);
107 static int ms_ascii_process_line(ms_conn_t *c, char *command);
108 static int ms_try_read_line(ms_conn_t *c);
109 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
110 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
111 static int ms_try_read_network(ms_conn_t *c);
112 static void ms_verify_value(ms_conn_t *c,
113 ms_mlget_task_item_t *mlget_item,
114 char *value,
115 int vlen);
116 static void ms_ascii_complete_nread(ms_conn_t *c);
117 static void ms_bin_complete_nread(ms_conn_t *c);
118 static void ms_complete_nread(ms_conn_t *c);
119
120
121 /* send functions */
122 static int ms_add_msghdr(ms_conn_t *c);
123 static int ms_ensure_iov_space(ms_conn_t *c);
124 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
125 static int ms_build_udp_headers(ms_conn_t *c);
126 static int ms_transmit(ms_conn_t *c);
127
128
129 /* status adjustment */
130 static void ms_conn_shrink(ms_conn_t *c);
131 static void ms_conn_set_state(ms_conn_t *c, int state);
132 static bool ms_update_event(ms_conn_t *c, const int new_flags);
133 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
134 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
135 static int ms_update_conn_sock_event(ms_conn_t *c);
136 static bool ms_need_yield(ms_conn_t *c);
137 static void ms_update_start_time(ms_conn_t *c);
138
139
140 /* main loop */
141 static void ms_drive_machine(ms_conn_t *c);
142 void ms_event_handler(const int fd, const short which, void *arg);
143
144
145 /* ascii protocol */
146 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
147 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
148 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
149
150
151 /* binary protocol */
152 static int ms_bin_process_response(ms_conn_t *c);
153 static void ms_add_bin_header(ms_conn_t *c,
154 uint8_t opcode,
155 uint8_t hdr_len,
156 uint16_t key_len,
157 uint32_t body_len);
158 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
159 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
160 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
161 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
162
163
164 /**
165 * each key has two parts, prefix and suffix. The suffix is a
166 * string random get form the character table. The prefix is a
167 * uint64_t variable. And the prefix must be unique. we use the
168 * prefix to identify a key. And the prefix can't include
169 * character ' ' '\r' '\n' '\0'.
170 *
171 * @return uint64_t
172 */
173 uint64_t ms_get_key_prefix(void)
174 {
175 uint64_t key_prefix;
176
177 pthread_mutex_lock(&ms_global.seq_mutex);
178 key_prefix_seq|= KEY_PREFIX_MASK;
179 key_prefix= key_prefix_seq;
180 key_prefix_seq++;
181 pthread_mutex_unlock(&ms_global.seq_mutex);
182
183 return key_prefix;
184 } /* ms_get_key_prefix */
185
186
187 /**
188 * get an unique udp request id
189 *
190 * @return an unique UDP request id
191 */
192 static uint32_t ms_get_udp_request_id(void)
193 {
194 return atomic_add_32_nv(&udp_request_id, 1);
195 }
196
197
198 /**
199 * initialize current task structure
200 *
201 * @param c, pointer of the concurrency
202 */
203 static void ms_task_init(ms_conn_t *c)
204 {
205 c->curr_task.cmd= CMD_NULL;
206 c->curr_task.item= 0;
207 c->curr_task.verify= false;
208 c->curr_task.finish_verify= true;
209 c->curr_task.get_miss= true;
210
211 c->curr_task.get_opt= 0;
212 c->curr_task.set_opt= 0;
213 c->curr_task.cycle_undo_get= 0;
214 c->curr_task.cycle_undo_set= 0;
215 c->curr_task.verified_get= 0;
216 c->curr_task.overwrite_set= 0;
217 } /* ms_task_init */
218
219
220 /**
221 * initialize udp for the connection structure
222 *
223 * @param c, pointer of the concurrency
224 * @param is_udp, whether it's udp
225 *
226 * @return int, if success, return EXIT_SUCCESS, else return -1
227 */
228 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
229 {
230 c->hdrbuf= 0;
231 c->rudpbuf= 0;
232 c->udppkt= 0;
233
234 c->rudpsize= UDP_DATA_BUFFER_SIZE;
235 c->hdrsize= 0;
236
237 c->rudpbytes= 0;
238 c->packets= 0;
239 c->recvpkt= 0;
240 c->pktcurr= 0;
241 c->ordcurr= 0;
242
243 c->udp= is_udp;
244
245 if (c->udp || (! c->udp && ms_setting.facebook_test))
246 {
247 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
248 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
249
250 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
251 {
252 if (c->rudpbuf != NULL)
253 free(c->rudpbuf);
254 if (c->udppkt != NULL)
255 free(c->udppkt);
256 fprintf(stderr, "malloc()\n");
257 return -1;
258 }
259 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
260 }
261
262 return EXIT_SUCCESS;
263 } /* ms_conn_udp_init */
264
265
266 /**
267 * initialize the connection structure
268 *
269 * @param c, pointer of the concurrency
270 * @param init_state, (conn_read, conn_write, conn_closing)
271 * @param read_buffer_size
272 * @param is_udp, whether it's udp
273 *
274 * @return int, if success, return EXIT_SUCCESS, else return -1
275 */
276 static int ms_conn_init(ms_conn_t *c,
277 const int init_state,
278 const int read_buffer_size,
279 const bool is_udp)
280 {
281 assert(c != NULL);
282
283 c->rbuf= c->wbuf= 0;
284 c->iov= 0;
285 c->msglist= 0;
286
287 c->rsize= read_buffer_size;
288 c->wsize= WRITE_BUFFER_SIZE;
289 c->iovsize= IOV_LIST_INITIAL;
290 c->msgsize= MSG_LIST_INITIAL;
291
292 /* for replication, each connection need connect all the server */
293 if (ms_setting.rep_write_srv > 0)
294 {
295 c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
296 }
297 else
298 {
299 c->total_sfds= ms_setting.sock_per_conn;
300 }
301 c->alive_sfds= 0;
302
303 c->rbuf= (char *)malloc((size_t)c->rsize);
304 c->wbuf= (char *)malloc((size_t)c->wsize);
305 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
306 c->msglist= (struct msghdr *)malloc(
307 sizeof(struct msghdr) * (size_t)c->msgsize);
308 if (ms_setting.mult_key_num > 1)
309 {
310 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
311 malloc(
312 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
313 }
314 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
315
316 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
317 || (c->msglist == NULL) || (c->tcpsfd == NULL)
318 || ((ms_setting.mult_key_num > 1)
319 && (c->mlget_task.mlget_item == NULL)))
320 {
321 if (c->rbuf != NULL)
322 free(c->rbuf);
323 if (c->wbuf != NULL)
324 free(c->wbuf);
325 if (c->iov != NULL)
326 free(c->iov);
327 if (c->msglist != NULL)
328 free(c->msglist);
329 if (c->mlget_task.mlget_item != NULL)
330 free(c->mlget_task.mlget_item);
331 if (c->tcpsfd != NULL)
332 free(c->tcpsfd);
333 fprintf(stderr, "malloc()\n");
334 return -1;
335 }
336
337 c->state= init_state;
338 c->rvbytes= 0;
339 c->rbytes= 0;
340 c->rcurr= c->rbuf;
341 c->wcurr= c->wbuf;
342 c->iovused= 0;
343 c->msgcurr= 0;
344 c->msgused= 0;
345 c->cur_idx= c->total_sfds; /* default index is a invalid value */
346
347 c->ctnwrite= false;
348 c->readval= false;
349 c->change_sfd= false;
350
351 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
352 c->precmd.isfinish= true; /* default the previous command finished */
353 c->currcmd.isfinish= false;
354 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
355 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
356
357 c->mlget_task.mlget_num= 0;
358 c->mlget_task.value_index= -1; /* default invalid value */
359
360 if (ms_setting.binary_prot_)
361 {
362 c->protocol= binary_prot;
363 }
364 else
365 {
366 c->protocol= ascii_prot;
367 }
368
369 /* initialize udp */
370 if (ms_conn_udp_init(c, is_udp) != 0)
371 {
372 return -1;
373 }
374
375 /* initialize task */
376 ms_task_init(c);
377
378 if (! (ms_setting.facebook_test && is_udp))
379 {
380 atomic_add_32(&ms_stats.active_conns, 1);
381 }
382
383 return EXIT_SUCCESS;
384 } /* ms_conn_init */
385
386
387 /**
388 * when doing 100% get operation, it could preset some objects
389 * to warmup the server. this function is used to initialize the
390 * number of the objects to preset.
391 *
392 * @param c, pointer of the concurrency
393 */
394 static void ms_warmup_num_init(ms_conn_t *c)
395 {
396 /* no set operation, preset all the items in the window */
397 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
398 {
399 c->warmup_num= c->win_size;
400 c->remain_warmup_num= c->warmup_num;
401 }
402 else
403 {
404 c->warmup_num= 0;
405 c->remain_warmup_num= c->warmup_num;
406 }
407 } /* ms_warmup_num_init */
408
409
410 /**
411 * each connection has an item window, this function initialize
412 * the window. The window is used to generate task.
413 *
414 * @param c, pointer of the concurrency
415 *
416 * @return int, if success, return EXIT_SUCCESS, else return -1
417 */
418 static int ms_item_win_init(ms_conn_t *c)
419 {
420 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
421 int exp_cnt= 0;
422
423 c->win_size= (int)ms_setting.win_size;
424 c->set_cursor= 0;
425 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
426 c->remain_exec_num= c->exec_num;
427
428 c->item_win= (ms_task_item_t *)malloc(
429 sizeof(ms_task_item_t) * (size_t)c->win_size);
430 if (c->item_win == NULL)
431 {
432 fprintf(stderr, "Can't allocate task item array for conn.\n");
433 return -1;
434 }
435 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
436
437 for (int i= 0; i < c->win_size; i++)
438 {
439 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
440 c->item_win[i].key_prefix= ms_get_key_prefix();
441 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
442 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
443 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
444 c->item_win[i].client_time= 0;
445
446 /* set expire time base on the proportion */
447 if (exp_cnt < ms_setting.exp_ver_per * i)
448 {
449 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
450 exp_cnt++;
451 }
452 else
453 {
454 c->item_win[i].exp_time= 0;
455 }
456 }
457
458 ms_warmup_num_init(c);
459
460 return EXIT_SUCCESS;
461 } /* ms_item_win_init */
462
463
464 /**
465 * each connection structure can include one or more sock
466 * handlers. this function create these socks and connect the
467 * server(s).
468 *
469 * @param c, pointer of the concurrency
470 *
471 * @return int, if success, return EXIT_SUCCESS, else return -1
472 */
473 static int ms_conn_sock_init(ms_conn_t *c)
474 {
475 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
476 uint32_t i;
477 int ret_sfd;
478 uint32_t srv_idx= 0;
479
480 assert(c != NULL);
481 assert(c->tcpsfd != NULL);
482
483 for (i= 0; i < c->total_sfds; i++)
484 {
485 ret_sfd= 0;
486 if (ms_setting.rep_write_srv > 0)
487 {
488 /* for replication, each connection need connect all the server */
489 srv_idx= i % ms_setting.srv_cnt;
490 }
491 else
492 {
493 /* all the connections in a thread connects the same server */
494 srv_idx= ms_thread->thread_ctx->srv_idx;
495 }
496
497 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
498 ms_setting.servers[srv_idx].srv_port,
499 ms_setting.udp, &ret_sfd) != 0)
500 {
501 break;
502 }
503
504 if (i == 0)
505 {
506 c->sfd= ret_sfd;
507 }
508
509 if (! ms_setting.udp)
510 {
511 c->tcpsfd[i]= ret_sfd;
512 }
513
514 c->alive_sfds++;
515 }
516
517 /* initialize udp sock handler if necessary */
518 if (ms_setting.facebook_test)
519 {
520 ret_sfd= 0;
521 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
522 ms_setting.servers[srv_idx].srv_port,
523 true, &ret_sfd) != 0)
524 {
525 c->udpsfd= 0;
526 }
527 else
528 {
529 c->udpsfd= ret_sfd;
530 }
531 }
532
533 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
534 {
535 if (ms_setting.udp)
536 {
537 close(c->sfd);
538 }
539 else
540 {
541 for (uint32_t j= 0; j < i; j++)
542 {
543 close(c->tcpsfd[j]);
544 }
545 }
546
547 if (c->udpsfd != 0)
548 {
549 close(c->udpsfd);
550 }
551
552 return -1;
553 }
554
555 return EXIT_SUCCESS;
556 } /* ms_conn_sock_init */
557
558
559 /**
560 * each connection is managed by libevent, this function
561 * initialize the event of the connection structure.
562 *
563 * @param c, pointer of the concurrency
564 *
565 * @return int, if success, return EXIT_SUCCESS, else return -1
566 */
567 static int ms_conn_event_init(ms_conn_t *c)
568 {
569 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
570 short event_flags= EV_WRITE | EV_PERSIST;
571
572 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
573 event_base_set(ms_thread->base, &c->event);
574 c->ev_flags= event_flags;
575
576 if (event_add(&c->event, NULL) == -1)
577 {
578 return -1;
579 }
580
581 return EXIT_SUCCESS;
582 } /* ms_conn_event_init */
583
584
585 /**
586 * setup a connection, each connection structure of each
587 * thread must call this function to initialize.
588 *
589 * @param c, pointer of the concurrency
590 *
591 * @return int, if success, return EXIT_SUCCESS, else return -1
592 */
593 int ms_setup_conn(ms_conn_t *c)
594 {
595 if (ms_item_win_init(c) != 0)
596 {
597 return -1;
598 }
599
600 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
601 {
602 return -1;
603 }
604
605 if (ms_conn_sock_init(c) != 0)
606 {
607 return -1;
608 }
609
610 if (ms_conn_event_init(c) != 0)
611 {
612 return -1;
613 }
614
615 return EXIT_SUCCESS;
616 } /* ms_setup_conn */
617
618
619 /**
620 * Frees a connection.
621 *
622 * @param c, pointer of the concurrency
623 */
624 void ms_conn_free(ms_conn_t *c)
625 {
626 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
627 if (c != NULL)
628 {
629 if (c->hdrbuf != NULL)
630 free(c->hdrbuf);
631 if (c->msglist != NULL)
632 free(c->msglist);
633 if (c->rbuf != NULL)
634 free(c->rbuf);
635 if (c->wbuf != NULL)
636 free(c->wbuf);
637 if (c->iov != NULL)
638 free(c->iov);
639 if (c->mlget_task.mlget_item != NULL)
640 free(c->mlget_task.mlget_item);
641 if (c->rudpbuf != NULL)
642 free(c->rudpbuf);
643 if (c->udppkt != NULL)
644 free(c->udppkt);
645 if (c->item_win != NULL)
646 free(c->item_win);
647 if (c->tcpsfd != NULL)
648 free(c->tcpsfd);
649
650 if (--ms_thread->nactive_conn == 0)
651 {
652 free(ms_thread->conn);
653 }
654 }
655 } /* ms_conn_free */
656
657
658 /**
659 * close a connection
660 *
661 * @param c, pointer of the concurrency
662 */
663 static void ms_conn_close(ms_conn_t *c)
664 {
665 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
666 assert(c != NULL);
667
668 /* delete the event, the socket and the connection */
669 event_del(&c->event);
670
671 for (uint32_t i= 0; i < c->total_sfds; i++)
672 {
673 if (c->tcpsfd[i] > 0)
674 {
675 close(c->tcpsfd[i]);
676 }
677 }
678 c->sfd= 0;
679
680 if (ms_setting.facebook_test)
681 {
682 close(c->udpsfd);
683 }
684
685 atomic_dec_32(&ms_stats.active_conns);
686
687 ms_conn_free(c);
688
689 if (ms_setting.run_time == 0)
690 {
691 pthread_mutex_lock(&ms_global.run_lock.lock);
692 ms_global.run_lock.count++;
693 pthread_cond_signal(&ms_global.run_lock.cond);
694 pthread_mutex_unlock(&ms_global.run_lock.lock);
695 }
696
697 if (ms_thread->nactive_conn == 0)
698 {
699 pthread_exit(NULL);
700 }
701 } /* ms_conn_close */
702
703
704 /**
705 * create a new sock
706 *
707 * @param ai, server address information
708 *
709 * @return int, if success, return EXIT_SUCCESS, else return -1
710 */
711 static int ms_new_socket(struct addrinfo *ai)
712 {
713 int sfd;
714
715 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
716 {
717 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
718 return -1;
719 }
720
721 return sfd;
722 } /* ms_new_socket */
723
724
725 /**
726 * Sets a socket's send buffer size to the maximum allowed by the system.
727 *
728 * @param sfd, file descriptor of socket
729 */
730 static void ms_maximize_sndbuf(const int sfd)
731 {
732 socklen_t intsize= sizeof(int);
733 unsigned int last_good= 0;
734 unsigned int min, max, avg;
735 unsigned int old_size;
736
737 /* Start with the default size. */
738 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
739 {
740 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
741 return;
742 }
743
744 /* Binary-search for the real maximum. */
745 min= old_size;
746 max= MAX_SENDBUF_SIZE;
747
748 while (min <= max)
749 {
750 avg= ((unsigned int)(min + max)) / 2;
751 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
752 {
753 last_good= avg;
754 min= avg + 1;
755 }
756 else
757 {
758 max= avg - 1;
759 }
760 }
761 (void)last_good;
762 } /* ms_maximize_sndbuf */
763
764
765 /**
766 * socket connects the server
767 *
768 * @param c, pointer of the concurrency
769 * @param srv_host_name, the host name of the server
770 * @param srv_port, port of server
771 * @param is_udp, whether it's udp
772 * @param ret_sfd, the connected socket file descriptor
773 *
774 * @return int, if success, return EXIT_SUCCESS, else return -1
775 */
776 static int ms_network_connect(ms_conn_t *c,
777 char *srv_host_name,
778 const int srv_port,
779 const bool is_udp,
780 int *ret_sfd)
781 {
782 int sfd;
783 struct linger ling=
784 {
785 0, 0
786 };
787 struct addrinfo *ai;
788 struct addrinfo *next;
789 struct addrinfo hints;
790 char port_buf[NI_MAXSERV];
791 int error;
792 int success= 0;
793
794 int flags= 1;
795
796 /*
797 * the memset call clears nonstandard fields in some impementations
798 * that otherwise mess things up.
799 */
800 memset(&hints, 0, sizeof(hints));
801 #ifdef AI_ADDRCONFIG
802 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
803 #else
804 hints.ai_flags= AI_PASSIVE;
805 #endif /* AI_ADDRCONFIG */
806 if (is_udp)
807 {
808 hints.ai_protocol= IPPROTO_UDP;
809 hints.ai_socktype= SOCK_DGRAM;
810 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
811 }
812 else
813 {
814 hints.ai_family= AF_UNSPEC;
815 hints.ai_protocol= IPPROTO_TCP;
816 hints.ai_socktype= SOCK_STREAM;
817 }
818
819 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
820 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
821 if (error != 0)
822 {
823 if (error != EAI_SYSTEM)
824 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
825 else
826 perror("getaddrinfo()\n");
827
828 return -1;
829 }
830
831 for (next= ai; next; next= next->ai_next)
832 {
833 if ((sfd= ms_new_socket(next)) == -1)
834 {
835 freeaddrinfo(ai);
836 return -1;
837 }
838
839 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
840 if (is_udp)
841 {
842 ms_maximize_sndbuf(sfd);
843 }
844 else
845 {
846 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
847 sizeof(flags));
848 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
849 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
850 sizeof(flags));
851 }
852
853 if (is_udp)
854 {
855 c->srv_recv_addr_size= sizeof(struct sockaddr);
856 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
857 }
858 else
859 {
860 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
861 {
862 close(sfd);
863 freeaddrinfo(ai);
864 return -1;
865 }
866 }
867
868 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
869 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
870 {
871 fprintf(stderr, "setting O_NONBLOCK\n");
872 close(sfd);
873 freeaddrinfo(ai);
874 return -1;
875 }
876
877 if (ret_sfd != NULL)
878 {
879 *ret_sfd= sfd;
880 }
881
882 success++;
883 }
884
885 freeaddrinfo(ai);
886
887 /* Return zero if we detected no errors in starting up connections */
888 return success == 0;
889 } /* ms_network_connect */
890
891
892 /**
893 * reconnect a disconnected sock
894 *
895 * @param c, pointer of the concurrency
896 *
897 * @return int, if success, return EXIT_SUCCESS, else return -1
898 */
899 static int ms_reconn(ms_conn_t *c)
900 {
901 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
902 uint32_t srv_idx= 0;
903 uint32_t srv_conn_cnt= 0;
904
905 if (ms_setting.rep_write_srv > 0)
906 {
907 srv_idx= c->cur_idx % ms_setting.srv_cnt;
908 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
909 }
910 else
911 {
912 srv_idx= ms_thread->thread_ctx->srv_idx;
913 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
914 }
915
916 /* close the old socket handler */
917 close(c->sfd);
918 c->tcpsfd[c->cur_idx]= 0;
919
920 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
921 % srv_conn_cnt == 0)
922 {
923 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
924 fprintf(stderr, "Server %s:%d disconnect\n",
925 ms_setting.servers[srv_idx].srv_host_name,
926 ms_setting.servers[srv_idx].srv_port);
927 }
928
929 if (ms_setting.rep_write_srv > 0)
930 {
931 uint32_t i= 0;
932
933 for (i= 0; i < c->total_sfds; i++)
934 {
935 if (c->tcpsfd[i] != 0)
936 {
937 break;
938 }
939 }
940
941 /* all socks disconnect */
942 if (i == c->total_sfds)
943 {
944 return -1;
945 }
946 }
947 else
948 {
949 do
950 {
951 /* reconnect success, break the loop */
952 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
953 ms_setting.servers[srv_idx].srv_port,
954 ms_setting.udp, &c->sfd) == 0)
955 {
956 c->tcpsfd[c->cur_idx]= c->sfd;
957 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
958 % (uint32_t)srv_conn_cnt == 0)
959 {
960 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
961 int reconn_time=
962 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
963 - ms_setting.servers[srv_idx].disconn_time
964 .tv_sec);
965 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
966 ms_setting.servers[srv_idx].srv_host_name,
967 ms_setting.servers[srv_idx].srv_port, reconn_time);
968 }
969 break;
970 }
971
972 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
973 {
974 /* wait a second and reconnect */
975 sleep(1);
976 }
977 }
978 while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
979 }
980
981 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
982 {
983 c->sfd= 0;
984 c->alive_sfds--;
985 }
986
987 return EXIT_SUCCESS;
988 } /* ms_reconn */
989
990
991 /**
992 * reconnect several disconnected socks in the connection
993 * structure, the ever-1-second timer of the thread will check
994 * whether some socks in the connections disconnect. if
995 * disconnect, reconnect the sock.
996 *
997 * @param c, pointer of the concurrency
998 *
999 * @return int, if success, return EXIT_SUCCESS, else return -1
1000 */
1001 int ms_reconn_socks(ms_conn_t *c)
1002 {
1003 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1004 uint32_t srv_idx= 0;
1005 int ret_sfd= 0;
1006 uint32_t srv_conn_cnt= 0;
1007 struct timeval cur_time;
1008
1009 assert(c != NULL);
1010
1011 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1012 {
1013 return EXIT_SUCCESS;
1014 }
1015
1016 for (uint32_t i= 0; i < c->total_sfds; i++)
1017 {
1018 if (c->tcpsfd[i] == 0)
1019 {
1020 gettimeofday(&cur_time, NULL);
1021
1022 /**
1023 * For failover test of replication, reconnect the socks after
1024 * it disconnects more than 5 seconds, Otherwise memslap will
1025 * block at connect() function and the work threads can't work
1026 * in this interval.
1027 */
1028 if (cur_time.tv_sec
1029 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1030 {
1031 break;
1032 }
1033
1034 if (ms_setting.rep_write_srv > 0)
1035 {
1036 srv_idx= i % ms_setting.srv_cnt;
1037 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1038 }
1039 else
1040 {
1041 srv_idx= ms_thread->thread_ctx->srv_idx;
1042 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1043 }
1044
1045 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1046 ms_setting.servers[srv_idx].srv_port,
1047 ms_setting.udp, &ret_sfd) == 0)
1048 {
1049 c->tcpsfd[i]= ret_sfd;
1050 c->alive_sfds++;
1051
1052 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1053 % (uint32_t)srv_conn_cnt == 0)
1054 {
1055 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1056 int reconn_time=
1057 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1058 - ms_setting.servers[srv_idx].disconn_time
1059 .tv_sec);
1060 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1061 ms_setting.servers[srv_idx].srv_host_name,
1062 ms_setting.servers[srv_idx].srv_port, reconn_time);
1063 }
1064 }
1065 }
1066 }
1067
1068 return EXIT_SUCCESS;
1069 } /* ms_reconn_socks */
1070
1071
1072 /**
1073 * Tokenize the command string by replacing whitespace with '\0' and update
1074 * the token array tokens with pointer to start of each token and length.
1075 * Returns total number of tokens. The last valid token is the terminal
1076 * token (value points to the first unprocessed character of the string and
1077 * length zero).
1078 *
1079 * Usage example:
1080 *
1081 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1082 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1083 * ...
1084 * }
1085 * ncommand = tokens[ix].value - command;
1086 * command = tokens[ix].value;
1087 * }
1088 *
1089 * @param command, the command string to token
1090 * @param tokens, array to store tokens
1091 * @param max_tokens, maximum tokens number
1092 *
1093 * @return int, the number of tokens
1094 */
1095 static int ms_tokenize_command(char *command,
1096 token_t *tokens,
1097 const int max_tokens)
1098 {
1099 char *s, *e;
1100 int ntokens= 0;
1101
1102 assert(command != NULL && tokens != NULL && max_tokens > 1);
1103
1104 for (s= e= command; ntokens < max_tokens - 1; ++e)
1105 {
1106 if (*e == ' ')
1107 {
1108 if (s != e)
1109 {
1110 tokens[ntokens].value= s;
1111 tokens[ntokens].length= (size_t)(e - s);
1112 ntokens++;
1113 *e= '\0';
1114 }
1115 s= e + 1;
1116 }
1117 else if (*e == '\0')
1118 {
1119 if (s != e)
1120 {
1121 tokens[ntokens].value= s;
1122 tokens[ntokens].length= (size_t)(e - s);
1123 ntokens++;
1124 }
1125
1126 break; /* string end */
1127 }
1128 }
1129
1130 return ntokens;
1131 } /* ms_tokenize_command */
1132
1133
1134 /**
1135 * parse the response of server.
1136 *
1137 * @param c, pointer of the concurrency
1138 * @param command, the string responded by server
1139 *
1140 * @return int, if the command completed return EXIT_SUCCESS, else return
1141 * -1
1142 */
1143 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1144 {
1145 int ret= 0;
1146 int64_t value_len;
1147 char *buffer= command;
1148
1149 assert(c != NULL);
1150
1151 /**
1152 * for command get, we store the returned value into local buffer
1153 * then continue in ms_complete_nread().
1154 */
1155
1156 switch (buffer[0])
1157 {
1158 case 'V': /* VALUE || VERSION */
1159 if (buffer[1] == 'A') /* VALUE */
1160 {
1161 token_t tokens[MAX_TOKENS];
1162 ms_tokenize_command(command, tokens, MAX_TOKENS);
1163 errno= 0;
1164 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1165 if (errno != 0)
1166 {
1167 printf("<%d ERROR %s\n", c->sfd, strerror(errno));
1168 }
1169 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1170
1171 /*
1172 * We read the \r\n into the string since not doing so is more
1173 * cycles then the waster of memory to do so.
1174 *
1175 * We are null terminating through, which will most likely make
1176 * some people lazy about using the return length.
1177 */
1178 c->rvbytes= (int)(value_len + 2);
1179 c->readval= true;
1180 ret= -1;
1181 }
1182
1183 break;
1184
1185 case 'O': /* OK */
1186 c->currcmd.retstat= MCD_SUCCESS;
1187
1188 case 'S': /* STORED STATS SERVER_ERROR */
1189 if (buffer[2] == 'A') /* STORED STATS */
1190 { /* STATS*/
1191 c->currcmd.retstat= MCD_STAT;
1192 }
1193 else if (buffer[1] == 'E')
1194 {
1195 /* SERVER_ERROR */
1196 printf("<%d %s\n", c->sfd, buffer);
1197
1198 c->currcmd.retstat= MCD_SERVER_ERROR;
1199 }
1200 else if (buffer[1] == 'T')
1201 {
1202 /* STORED */
1203 c->currcmd.retstat= MCD_STORED;
1204 }
1205 else
1206 {
1207 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1208 }
1209 break;
1210
1211 case 'D': /* DELETED DATA */
1212 if (buffer[1] == 'E')
1213 {
1214 c->currcmd.retstat= MCD_DELETED;
1215 }
1216 else
1217 {
1218 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1219 }
1220
1221 break;
1222
1223 case 'N': /* NOT_FOUND NOT_STORED*/
1224 if (buffer[4] == 'F')
1225 {
1226 c->currcmd.retstat= MCD_NOTFOUND;
1227 }
1228 else if (buffer[4] == 'S')
1229 {
1230 printf("<%d %s\n", c->sfd, buffer);
1231 c->currcmd.retstat= MCD_NOTSTORED;
1232 }
1233 else
1234 {
1235 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1236 }
1237 break;
1238
1239 case 'E': /* PROTOCOL ERROR or END */
1240 if (buffer[1] == 'N')
1241 {
1242 /* END */
1243 c->currcmd.retstat= MCD_END;
1244 }
1245 else if (buffer[1] == 'R')
1246 {
1247 printf("<%d ERROR\n", c->sfd);
1248 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1249 }
1250 else if (buffer[1] == 'X')
1251 {
1252 c->currcmd.retstat= MCD_DATA_EXISTS;
1253 printf("<%d %s\n", c->sfd, buffer);
1254 }
1255 else
1256 {
1257 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1258 }
1259 break;
1260
1261 case 'C': /* CLIENT ERROR */
1262 printf("<%d %s\n", c->sfd, buffer);
1263 c->currcmd.retstat= MCD_CLIENT_ERROR;
1264 break;
1265
1266 default:
1267 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1268 break;
1269 } /* switch */
1270
1271 return ret;
1272 } /* ms_ascii_process_line */
1273
1274
1275 /**
1276 * after one operation completes, reset the concurrency
1277 *
1278 * @param c, pointer of the concurrency
1279 * @param timeout, whether it's timeout
1280 */
1281 void ms_reset_conn(ms_conn_t *c, bool timeout)
1282 {
1283 assert(c != NULL);
1284
1285 if (c->udp)
1286 {
1287 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1288 {
1289 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1290 }
1291
1292 c->packets= 0;
1293 c->recvpkt= 0;
1294 c->pktcurr= 0;
1295 c->ordcurr= 0;
1296 c->rudpbytes= 0;
1297 }
1298 c->currcmd.isfinish= true;
1299 c->ctnwrite= false;
1300 c->rbytes= 0;
1301 c->rcurr= c->rbuf;
1302 c->msgcurr = 0;
1303 c->msgused = 0;
1304 c->iovused = 0;
1305 ms_conn_set_state(c, conn_write);
1306 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1307
1308 if (timeout)
1309 {
1310 ms_drive_machine(c);
1311 }
1312 } /* ms_reset_conn */
1313
1314
1315 /**
1316 * if we have a complete line in the buffer, process it.
1317 *
1318 * @param c, pointer of the concurrency
1319 *
1320 * @return int, if success, return EXIT_SUCCESS, else return -1
1321 */
1322 static int ms_try_read_line(ms_conn_t *c)
1323 {
1324 if (c->protocol == binary_prot)
1325 {
1326 /* Do we have the complete packet header? */
1327 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1328 {
1329 /* need more data! */
1330 return EXIT_SUCCESS;
1331 }
1332 else
1333 {
1334 #ifdef NEED_ALIGN
1335 if (((long)(c->rcurr)) % 8 != 0)
1336 {
1337 /* must realign input buffer */
1338 memmove(c->rbuf, c->rcurr, c->rbytes);
1339 c->rcurr= c->rbuf;
1340 if (settings.verbose)
1341 {
1342 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1343 }
1344 }
1345 #endif
1346 protocol_binary_response_header *rsp;
1347 rsp= (protocol_binary_response_header *)c->rcurr;
1348
1349 c->binary_header= *rsp;
1350 c->binary_header.response.extlen= rsp->response.extlen;
1351 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1352 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1353 c->binary_header.response.status= ntohs(rsp->response.status);
1354
1355 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1356 {
1357 fprintf(stderr, "Invalid magic: %x\n",
1358 c->binary_header.response.magic);
1359 ms_conn_set_state(c, conn_closing);
1360 return EXIT_SUCCESS;
1361 }
1362
1363 /* process this complete response */
1364 if (ms_bin_process_response(c) == 0)
1365 {
1366 /* current operation completed */
1367 ms_reset_conn(c, false);
1368 return -1;
1369 }
1370 else
1371 {
1372 c->rbytes-= (int32_t)sizeof(c->binary_header);
1373 c->rcurr+= sizeof(c->binary_header);
1374 }
1375 }
1376 }
1377 else
1378 {
1379 char *el, *cont;
1380
1381 assert(c != NULL);
1382 assert(c->rcurr <= (c->rbuf + c->rsize));
1383
1384 if (c->rbytes == 0)
1385 return EXIT_SUCCESS;
1386
1387 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1388 if (! el)
1389 return EXIT_SUCCESS;
1390
1391 cont= el + 1;
1392 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1393 {
1394 el--;
1395 }
1396 *el= '\0';
1397
1398 assert(cont <= (c->rcurr + c->rbytes));
1399
1400 /* process this complete line */
1401 if (ms_ascii_process_line(c, c->rcurr) == 0)
1402 {
1403 /* current operation completed */
1404 ms_reset_conn(c, false);
1405 return -1;
1406 }
1407 else
1408 {
1409 /* current operation didn't complete */
1410 c->rbytes-= (int32_t)(cont - c->rcurr);
1411 c->rcurr= cont;
1412 }
1413
1414 assert(c->rcurr <= (c->rbuf + c->rsize));
1415 }
1416
1417 return -1;
1418 } /* ms_try_read_line */
1419
1420
1421 /**
1422 * because the packet of UDP can't ensure the order, the
1423 * function is used to sort the received udp packet.
1424 *
1425 * @param c, pointer of the concurrency
1426 * @param buf, the buffer to store the ordered packages data
1427 * @param rbytes, the maximum capacity of the buffer
1428 *
1429 * @return int, if success, return the copy bytes, else return
1430 * -1
1431 */
1432 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1433 {
1434 int len= 0;
1435 int wbytes= 0;
1436 uint16_t req_id= 0;
1437 uint16_t seq_num= 0;
1438 uint16_t packets= 0;
1439 unsigned char *header= NULL;
1440
1441 /* no enough data */
1442 assert(c != NULL);
1443 assert(buf != NULL);
1444 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1445
1446 /* calculate received packets count */
1447 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1448 {
1449 /* the last packet has some data */
1450 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1451 }
1452 else
1453 {
1454 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1455 }
1456
1457 /* get the total packets count if necessary */
1458 if (c->packets == 0)
1459 {
1460 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1461 }
1462
1463 /* build the ordered packet array */
1464 for (int i= c->pktcurr; i < c->recvpkt; i++)
1465 {
1466 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1467 req_id= (uint16_t)HEADER_TO_REQID(header);
1468 assert(req_id == c->request_id % (1 << 16));
1469
1470 packets= (uint16_t)HEADER_TO_PACKETS(header);
1471 assert(c->packets == HEADER_TO_PACKETS(header));
1472
1473 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1474 c->udppkt[seq_num].header= header;
1475 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1476
1477 if (i == c->recvpkt - 1)
1478 {
1479 /* last received packet */
1480 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1481 {
1482 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1483 c->pktcurr++;
1484 }
1485 else
1486 {
1487 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1488 - UDP_HEADER_SIZE;
1489 }
1490 }
1491 else
1492 {
1493 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1494 c->pktcurr++;
1495 }
1496 }
1497
1498 for (int i= c->ordcurr; i < c->recvpkt; i++)
1499 {
1500 /* there is some data to copy */
1501 if ((c->udppkt[i].data != NULL)
1502 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1503 {
1504 header= c->udppkt[i].header;
1505 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1506 if (len > rbytes - wbytes)
1507 {
1508 len= rbytes - wbytes;
1509 }
1510
1511 assert(len <= rbytes - wbytes);
1512 assert(i == HEADER_TO_SEQNUM(header));
1513
1514 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1515 (size_t)len);
1516 wbytes+= len;
1517 c->udppkt[i].copybytes+= len;
1518
1519 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1520 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1521 {
1522 /* finish copying all the data of this packet, next */
1523 c->ordcurr++;
1524 }
1525
1526 /* last received packet, and finish copying all the data */
1527 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1528 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1529 {
1530 break;
1531 }
1532
1533 /* no space to copy data */
1534 if (wbytes >= rbytes)
1535 {
1536 break;
1537 }
1538
1539 /* it doesn't finish reading all the data of the packet from network */
1540 if ((i != c->recvpkt - 1)
1541 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1542 {
1543 break;
1544 }
1545 }
1546 else
1547 {
1548 /* no data to copy */
1549 break;
1550 }
1551 }
1552 (void)packets;
1553
1554 return wbytes == 0 ? -1 : wbytes;
1555 } /* ms_sort_udp_packet */
1556
1557
1558 /**
1559 * encapsulate upd read like tcp read
1560 *
1561 * @param c, pointer of the concurrency
1562 * @param buf, read buffer
1563 * @param len, length to read
1564 *
1565 * @return int, if success, return the read bytes, else return
1566 * -1
1567 */
1568 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1569 {
1570 int res= 0;
1571 int avail= 0;
1572 int rbytes= 0;
1573 int copybytes= 0;
1574
1575 assert(c->udp);
1576
1577 while (1)
1578 {
1579 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1580 {
1581 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1582 if (! new_rbuf)
1583 {
1584 fprintf(stderr, "Couldn't realloc input buffer.\n");
1585 c->rudpbytes= 0; /* ignore what we read */
1586 return -1;
1587 }
1588 c->rudpbuf= new_rbuf;
1589 c->rudpsize*= 2;
1590 }
1591
1592 avail= c->rudpsize - c->rudpbytes;
1593 /* UDP each time read a packet, 1400 bytes */
1594 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1595
1596 if (res > 0)
1597 {
1598 atomic_add_size(&ms_stats.bytes_read, res);
1599 c->rudpbytes+= res;
1600 rbytes+= res;
1601 if (res == avail)
1602 {
1603 continue;
1604 }
1605 else
1606 {
1607 break;
1608 }
1609 }
1610
1611 if (res == 0)
1612 {
1613 /* "connection" closed */
1614 return res;
1615 }
1616
1617 if (res == -1)
1618 {
1619 /* no data to read */
1620 return res;
1621 }
1622 }
1623
1624 /* copy data to read buffer */
1625 if (rbytes > 0)
1626 {
1627 copybytes= ms_sort_udp_packet(c, buf, len);
1628 }
1629
1630 if (copybytes == -1)
1631 {
1632 atomic_add_size(&ms_stats.pkt_disorder, 1);
1633 }
1634
1635 return copybytes;
1636 } /* ms_udp_read */
1637
1638
1639 /*
1640 * read from network as much as we can, handle buffer overflow and connection
1641 * close.
1642 * before reading, move the remaining incomplete fragment of a command
1643 * (if any) to the beginning of the buffer.
1644 * return EXIT_SUCCESS if there's nothing to read on the first read.
1645 */
1646
1647 /**
1648 * read from network as much as we can, handle buffer overflow and connection
1649 * close. before reading, move the remaining incomplete fragment of a command
1650 * (if any) to the beginning of the buffer.
1651 *
1652 * @param c, pointer of the concurrency
1653 *
1654 * @return int,
1655 * return EXIT_SUCCESS if there's nothing to read on the first read.
1656 * return EXIT_FAILURE if get data
1657 * return -1 if error happens
1658 */
1659 static int ms_try_read_network(ms_conn_t *c)
1660 {
1661 int gotdata= 0;
1662 int res;
1663 int64_t avail;
1664
1665 assert(c != NULL);
1666
1667 if ((c->rcurr != c->rbuf)
1668 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1669 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1670 {
1671 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1672 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1673 c->rcurr= c->rbuf;
1674 }
1675
1676 while (1)
1677 {
1678 if (c->rbytes >= c->rsize)
1679 {
1680 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1681 if (! new_rbuf)
1682 {
1683 fprintf(stderr, "Couldn't realloc input buffer.\n");
1684 c->rbytes= 0; /* ignore what we read */
1685 return -1;
1686 }
1687 c->rcurr= c->rbuf= new_rbuf;
1688 c->rsize*= 2;
1689 }
1690
1691 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1692 if (avail == 0)
1693 {
1694 break;
1695 }
1696
1697 if (c->udp)
1698 {
1699 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1700 }
1701 else
1702 {
1703 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1704 }
1705
1706 if (res > 0)
1707 {
1708 if (! c->udp)
1709 {
1710 atomic_add_size(&ms_stats.bytes_read, res);
1711 }
1712 gotdata= 1;
1713 c->rbytes+= res;
1714 if (res == avail)
1715 {
1716 continue;
1717 }
1718 else
1719 {
1720 break;
1721 }
1722 }
1723 if (res == 0)
1724 {
1725 /* connection closed */
1726 ms_conn_set_state(c, conn_closing);
1727 return -1;
1728 }
1729 if (res == -1)
1730 {
1731 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1732 break;
1733 /* Should close on unhandled errors. */
1734 ms_conn_set_state(c, conn_closing);
1735 return -1;
1736 }
1737 }
1738
1739 return gotdata;
1740 } /* ms_try_read_network */
1741
1742
1743 /**
1744 * after get the object from server, verify the value if
1745 * necessary.
1746 *
1747 * @param c, pointer of the concurrency
1748 * @param mlget_item, pointer of mulit-get task item structure
1749 * @param value, received value string
1750 * @param vlen, received value string length
1751 */
1752 static void ms_verify_value(ms_conn_t *c,
1753 ms_mlget_task_item_t *mlget_item,
1754 char *value,
1755 int vlen)
1756 {
1757 if (c->curr_task.verify)
1758 {
1759 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1760 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1761 char *orignkey=
1762 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1763
1764 /* verify expire time if necessary */
1765 if (c->curr_task.item->exp_time > 0)
1766 {
1767 struct timeval curr_time;
1768 gettimeofday(&curr_time, NULL);
1769
1770 /* object expired but get it now */
1771 if (curr_time.tv_sec - c->curr_task.item->client_time
1772 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1773 {
1774 atomic_add_size(&ms_stats.exp_get, 1);
1775
1776 if (ms_setting.verbose)
1777 {
1778 char set_time[64];
1779 char cur_time[64];
1780 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1781 localtime(&c->curr_task.item->client_time));
1782 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1783 localtime(&curr_time.tv_sec));
1784 fprintf(stderr,
1785 "\n<%d expire time verification failed, "
1786 "object expired but get it now\n"
1787 "\tkey len: %d\n"
1788 "\tkey: %" PRIx64 " %.*s\n"
1789 "\tset time: %s current time: %s "
1790 "diff time: %d expire time: %d\n"
1791 "\texpected data: \n"
1792 "\treceived data len: %d\n"
1793 "\treceived data: %.*s\n",
1794 c->sfd,
1795 c->curr_task.item->key_size,
1796 c->curr_task.item->key_prefix,
1797 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1798 orignkey,
1799 set_time,
1800 cur_time,
1801 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1802 c->curr_task.item->exp_time,
1803 vlen,
1804 vlen,
1805 value);
1806 fflush(stderr);
1807 }
1808 }
1809 }
1810 else
1811 {
1812 if ((c->curr_task.item->value_size != vlen)
1813 || (memcmp(orignval, value, (size_t)vlen) != 0))
1814 {
1815 atomic_add_size(&ms_stats.vef_failed, 1);
1816
1817 if (ms_setting.verbose)
1818 {
1819 fprintf(stderr,
1820 "\n<%d data verification failed\n"
1821 "\tkey len: %d\n"
1822 "\tkey: %" PRIx64" %.*s\n"
1823 "\texpected data len: %d\n"
1824 "\texpected data: %.*s\n"
1825 "\treceived data len: %d\n"
1826 "\treceived data: %.*s\n",
1827 c->sfd,
1828 c->curr_task.item->key_size,
1829 c->curr_task.item->key_prefix,
1830 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1831 orignkey,
1832 c->curr_task.item->value_size,
1833 c->curr_task.item->value_size,
1834 orignval,
1835 vlen,
1836 vlen,
1837 value);
1838 fflush(stderr);
1839 }
1840 }
1841 }
1842
1843 c->curr_task.finish_verify= true;
1844
1845 if (mlget_item != NULL)
1846 {
1847 mlget_item->finish_verify= true;
1848 }
1849 }
1850 } /* ms_verify_value */
1851
1852
1853 /**
1854 * For ASCII protocol, after store the data into the local
1855 * buffer, run this function to handle the data.
1856 *
1857 * @param c, pointer of the concurrency
1858 */
1859 static void ms_ascii_complete_nread(ms_conn_t *c)
1860 {
1861 assert(c != NULL);
1862 assert(c->rbytes >= c->rvbytes);
1863 assert(c->protocol == ascii_prot);
1864 if (c->rvbytes > 2)
1865 {
1866 assert(
1867 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1868 }
1869
1870 /* multi-get */
1871 ms_mlget_task_item_t *mlget_item= NULL;
1872 if (((ms_setting.mult_key_num > 1)
1873 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1874 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1875 {
1876 c->mlget_task.value_index++;
1877 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1878
1879 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1880 {
1881 c->curr_task.item= mlget_item->item;
1882 c->curr_task.verify= mlget_item->verify;
1883 c->curr_task.finish_verify= mlget_item->finish_verify;
1884 mlget_item->get_miss= false;
1885 }
1886 else
1887 {
1888 /* Try to find the task item in multi-get task array */
1889 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1890 {
1891 mlget_item= &c->mlget_task.mlget_item[i];
1892 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1893 {
1894 c->curr_task.item= mlget_item->item;
1895 c->curr_task.verify= mlget_item->verify;
1896 c->curr_task.finish_verify= mlget_item->finish_verify;
1897 mlget_item->get_miss= false;
1898
1899 break;
1900 }
1901 }
1902 }
1903 }
1904
1905 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1906
1907 c->curr_task.get_miss= false;
1908 c->rbytes-= c->rvbytes;
1909 c->rcurr= c->rcurr + c->rvbytes;
1910 assert(c->rcurr <= (c->rbuf + c->rsize));
1911 c->readval= false;
1912 c->rvbytes= 0;
1913 } /* ms_ascii_complete_nread */
1914
1915
1916 /**
1917 * For binary protocol, after store the data into the local
1918 * buffer, run this function to handle the data.
1919 *
1920 * @param c, pointer of the concurrency
1921 */
1922 static void ms_bin_complete_nread(ms_conn_t *c)
1923 {
1924 assert(c != NULL);
1925 assert(c->rbytes >= c->rvbytes);
1926 assert(c->protocol == binary_prot);
1927
1928 int extlen= c->binary_header.response.extlen;
1929 int keylen= c->binary_header.response.keylen;
1930 uint8_t opcode= c->binary_header.response.opcode;
1931
1932 /* not get command or not include value, just return */
1933 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1934 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1935 || (c->rvbytes <= extlen + keylen))
1936 {
1937 /* get miss */
1938 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1939 {
1940 c->currcmd.retstat= MCD_END;
1941 c->curr_task.get_miss= true;
1942 }
1943
1944 c->readval= false;
1945 c->rvbytes= 0;
1946 ms_reset_conn(c, false);
1947 return;
1948 }
1949
1950 /* multi-get */
1951 ms_mlget_task_item_t *mlget_item= NULL;
1952 if (((ms_setting.mult_key_num > 1)
1953 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1954 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1955 {
1956 c->mlget_task.value_index++;
1957 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1958
1959 c->curr_task.item= mlget_item->item;
1960 c->curr_task.verify= mlget_item->verify;
1961 c->curr_task.finish_verify= mlget_item->finish_verify;
1962 mlget_item->get_miss= false;
1963 }
1964
1965 ms_verify_value(c,
1966 mlget_item,
1967 c->rcurr + extlen + keylen,
1968 c->rvbytes - extlen - keylen);
1969
1970 c->currcmd.retstat= MCD_END;
1971 c->curr_task.get_miss= false;
1972 c->rbytes-= c->rvbytes;
1973 c->rcurr= c->rcurr + c->rvbytes;
1974 assert(c->rcurr <= (c->rbuf + c->rsize));
1975 c->readval= false;
1976 c->rvbytes= 0;
1977
1978 if (ms_setting.mult_key_num > 1)
1979 {
1980 /* multi-get have check all the item */
1981 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1982 {
1983 ms_reset_conn(c, false);
1984 }
1985 }
1986 else
1987 {
1988 /* single get */
1989 ms_reset_conn(c, false);
1990 }
1991 } /* ms_bin_complete_nread */
1992
1993
1994 /**
1995 * we get here after reading the value of get commands.
1996 *
1997 * @param c, pointer of the concurrency
1998 */
1999 static void ms_complete_nread(ms_conn_t *c)
2000 {
2001 assert(c != NULL);
2002 assert(c->rbytes >= c->rvbytes);
2003 assert(c->protocol == ascii_prot
2004 || c->protocol == binary_prot);
2005
2006 if (c->protocol == binary_prot)
2007 {
2008 ms_bin_complete_nread(c);
2009 }
2010 else
2011 {
2012 ms_ascii_complete_nread(c);
2013 }
2014 } /* ms_complete_nread */
2015
2016
2017 /**
2018 * Adds a message header to a connection.
2019 *
2020 * @param c, pointer of the concurrency
2021 *
2022 * @return int, if success, return EXIT_SUCCESS, else return -1
2023 */
2024 static int ms_add_msghdr(ms_conn_t *c)
2025 {
2026 struct msghdr *msg;
2027
2028 assert(c != NULL);
2029
2030 if (c->msgsize == c->msgused)
2031 {
2032 msg=
2033 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2034 if (! msg)
2035 return -1;
2036
2037 c->msglist= msg;
2038 c->msgsize*= 2;
2039 }
2040
2041 msg= c->msglist + c->msgused;
2042
2043 /**
2044 * this wipes msg_iovlen, msg_control, msg_controllen, and
2045 * msg_flags, the last 3 of which aren't defined on solaris:
2046 */
2047 memset(msg, 0, sizeof(struct msghdr));
2048
2049 msg->msg_iov= &c->iov[c->iovused];
2050
2051 if (c->udp && (c->srv_recv_addr_size > 0))
2052 {
2053 msg->msg_name= &c->srv_recv_addr;
2054 msg->msg_namelen= c->srv_recv_addr_size;
2055 }
2056
2057 c->msgbytes= 0;
2058 c->msgused++;
2059
2060 if (c->udp)
2061 {
2062 /* Leave room for the UDP header, which we'll fill in later. */
2063 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2064 }
2065
2066 return EXIT_SUCCESS;
2067 } /* ms_add_msghdr */
2068
2069
2070 /**
2071 * Ensures that there is room for another structure iovec in a connection's
2072 * iov list.
2073 *
2074 * @param c, pointer of the concurrency
2075 *
2076 * @return int, if success, return EXIT_SUCCESS, else return -1
2077 */
2078 static int ms_ensure_iov_space(ms_conn_t *c)
2079 {
2080 assert(c != NULL);
2081
2082 if (c->iovused >= c->iovsize)
2083 {
2084 int i, iovnum;
2085 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2086 ((size_t)c->iovsize
2087 * 2)
2088 * sizeof(struct iovec));
2089 if (! new_iov)
2090 return -1;
2091
2092 c->iov= new_iov;
2093 c->iovsize*= 2;
2094
2095 /* Point all the msghdr structures at the new list. */
2096 for (i= 0, iovnum= 0; i < c->msgused; i++)
2097 {
2098 c->msglist[i].msg_iov= &c->iov[iovnum];
2099 iovnum+= (int)c->msglist[i].msg_iovlen;
2100 }
2101 }
2102
2103 return EXIT_SUCCESS;
2104 } /* ms_ensure_iov_space */
2105
2106
2107 /**
2108 * Adds data to the list of pending data that will be written out to a
2109 * connection.
2110 *
2111 * @param c, pointer of the concurrency
2112 * @param buf, the buffer includes data to send
2113 * @param len, the data length in the buffer
2114 *
2115 * @return int, if success, return EXIT_SUCCESS, else return -1
2116 */
2117 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2118 {
2119 struct msghdr *m;
2120 int leftover;
2121 bool limit_to_mtu;
2122
2123 assert(c != NULL);
2124
2125 do
2126 {
2127 m= &c->msglist[c->msgused - 1];
2128
2129 /*
2130 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2131 */
2132 limit_to_mtu= c->udp;
2133
2134 #ifdef IOV_MAX
2135 /* We may need to start a new msghdr if this one is full. */
2136 if ((m->msg_iovlen == IOV_MAX)
2137 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2138 {
2139 ms_add_msghdr(c);
2140 m= &c->msglist[c->msgused - 1];
2141 }
2142 #endif
2143
2144 if (ms_ensure_iov_space(c) != 0)
2145 return -1;
2146
2147 /* If the fragment is too big to fit in the datagram, split it up */
2148 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2149 {
2150 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2151 len-= leftover;
2152 }
2153 else
2154 {
2155 leftover= 0;
2156 }
2157
2158 m= &c->msglist[c->msgused - 1];
2159 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2160 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2161
2162 c->msgbytes+= len;
2163 c->iovused++;
2164 m->msg_iovlen++;
2165
2166 buf= ((char *)buf) + len;
2167 len= leftover;
2168 }
2169 while (leftover > 0);
2170
2171 return EXIT_SUCCESS;
2172 } /* ms_add_iov */
2173
2174
2175 /**
2176 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2177 *
2178 * @param c, pointer of the concurrency
2179 *
2180 * @return int, if success, return EXIT_SUCCESS, else return -1
2181 */
2182 static int ms_build_udp_headers(ms_conn_t *c)
2183 {
2184 int i;
2185 unsigned char *hdr;
2186
2187 assert(c != NULL);
2188
2189 c->request_id= ms_get_udp_request_id();
2190
2191 if (c->msgused > c->hdrsize)
2192 {
2193 void *new_hdrbuf;
2194 if (c->hdrbuf)
2195 new_hdrbuf= realloc(c->hdrbuf,
2196 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2197 else
2198 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2199 if (! new_hdrbuf)
2200 return -1;
2201
2202 c->hdrbuf= (unsigned char *)new_hdrbuf;
2203 c->hdrsize= c->msgused * 2;
2204 }
2205
2206 /* If this is a multi-packet request, drop it. */
2207 if (c->udp && (c->msgused > 1))
2208 {
2209 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2210 return -1;
2211 }
2212
2213 hdr= c->hdrbuf;
2214 for (i= 0; i < c->msgused; i++)
2215 {
2216 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2217 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2218 *hdr++= (unsigned char)(c->request_id / 256);
2219 *hdr++= (unsigned char)(c->request_id % 256);
2220 *hdr++= (unsigned char)(i / 256);
2221 *hdr++= (unsigned char)(i % 256);
2222 *hdr++= (unsigned char)(c->msgused / 256);
2223 *hdr++= (unsigned char)(c->msgused % 256);
2224 *hdr++= (unsigned char)1; /* support facebook memcached */
2225 *hdr++= (unsigned char)0;
2226 assert(hdr ==
2227 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2228 + UDP_HEADER_SIZE));
2229 }
2230
2231 return EXIT_SUCCESS;
2232 } /* ms_build_udp_headers */
2233
2234
2235 /**
2236 * Transmit the next chunk of data from our list of msgbuf structures.
2237 *
2238 * @param c, pointer of the concurrency
2239 *
2240 * @return TRANSMIT_COMPLETE All done writing.
2241 * TRANSMIT_INCOMPLETE More data remaining to write.
2242 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2243 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2244 */
2245 static int ms_transmit(ms_conn_t *c)
2246 {
2247 assert(c != NULL);
2248
2249 if ((c->msgcurr < c->msgused)
2250 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2251 {
2252 /* Finished writing the current msg; advance to the next. */
2253 c->msgcurr++;
2254 }
2255
2256 if (c->msgcurr < c->msgused)
2257 {
2258 ssize_t res;
2259 struct msghdr *m= &c->msglist[c->msgcurr];
2260
2261 res= sendmsg(c->sfd, m, 0);
2262 if (res > 0)
2263 {
2264 atomic_add_size(&ms_stats.bytes_written, res);
2265
2266 /* We've written some of the data. Remove the completed
2267 * iovec entries from the list of pending writes. */
2268 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2269 {
2270 res-= (ssize_t)m->msg_iov->iov_len;
2271 m->msg_iovlen--;
2272 m->msg_iov++;
2273 }
2274
2275 /* Might have written just part of the last iovec entry;
2276 * adjust it so the next write will do the rest. */
2277 if (res > 0)
2278 {
2279 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2280 m->msg_iov->iov_len-= (size_t)res;
2281 }
2282 return TRANSMIT_INCOMPLETE;
2283 }
2284 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2285 {
2286 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2287 {
2288 fprintf(stderr, "Couldn't update event.\n");
2289 ms_conn_set_state(c, conn_closing);
2290 return TRANSMIT_HARD_ERROR;
2291 }
2292 return TRANSMIT_SOFT_ERROR;
2293 }
2294
2295 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2296 * we have a real error, on which we close the connection */
2297 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2298
2299 ms_conn_set_state(c, conn_closing);
2300 return TRANSMIT_HARD_ERROR;
2301 }
2302 else
2303 {
2304 return TRANSMIT_COMPLETE;
2305 }
2306 } /* ms_transmit */
2307
2308
2309 /**
2310 * Shrinks a connection's buffers if they're too big. This prevents
2311 * periodic large "mget" response from server chewing lots of client
2312 * memory.
2313 *
2314 * This should only be called in between requests since it can wipe output
2315 * buffers!
2316 *
2317 * @param c, pointer of the concurrency
2318 */
2319 static void ms_conn_shrink(ms_conn_t *c)
2320 {
2321 assert(c != NULL);
2322
2323 if (c->udp)
2324 return;
2325
2326 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2327 {
2328 char *newbuf;
2329
2330 if (c->rcurr != c->rbuf)
2331 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2332
2333 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2334
2335 if (newbuf)
2336 {
2337 c->rbuf= newbuf;
2338 c->rsize= DATA_BUFFER_SIZE;
2339 }
2340 c->rcurr= c->rbuf;
2341 }
2342
2343 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2344 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2345 {
2346 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2347 if (! new_rbuf)
2348 {
2349 c->rudpbuf= new_rbuf;
2350 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2351 }
2352 /* TODO check error condition? */
2353 }
2354
2355 if (c->msgsize > MSG_LIST_HIGHWAT)
2356 {
2357 struct msghdr *newbuf= (struct msghdr *)realloc(
2358 (void *)c->msglist,
2359 MSG_LIST_INITIAL
2360 * sizeof(c->msglist[0]));
2361 if (newbuf)
2362 {
2363 c->msglist= newbuf;
2364 c->msgsize= MSG_LIST_INITIAL;
2365 }
2366 /* TODO check error condition? */
2367 }
2368
2369 if (c->iovsize > IOV_LIST_HIGHWAT)
2370 {
2371 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2372 IOV_LIST_INITIAL
2373 * sizeof(c->iov[0]));
2374 if (newbuf)
2375 {
2376 c->iov= newbuf;
2377 c->iovsize= IOV_LIST_INITIAL;
2378 }
2379 /* TODO check return value */
2380 }
2381 } /* ms_conn_shrink */
2382
2383
2384 /**
2385 * Sets a connection's current state in the state machine. Any special
2386 * processing that needs to happen on certain state transitions can
2387 * happen here.
2388 *
2389 * @param c, pointer of the concurrency
2390 * @param state, connection state
2391 */
2392 static void ms_conn_set_state(ms_conn_t *c, int state)
2393 {
2394 assert(c != NULL);
2395
2396 if (state != c->state)
2397 {
2398 if (state == conn_read)
2399 {
2400 ms_conn_shrink(c);
2401 }
2402 c->state= state;
2403 }
2404 } /* ms_conn_set_state */
2405
2406
2407 /**
2408 * update the event if socks change state. for example: when
2409 * change the listen scoket read event to sock write event, or
2410 * change socket handler, we could call this function.
2411 *
2412 * @param c, pointer of the concurrency
2413 * @param new_flags, new event flags
2414 *
2415 * @return bool, if success, return true, else return false
2416 */
2417 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2418 {
2419 assert(c != NULL);
2420
2421 struct event_base *base= c->event.ev_base;
2422 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2423 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2424 {
2425 return true;
2426 }
2427
2428 if (event_del(&c->event) == -1)
2429 {
2430 /* try to delete the event again */
2431 if (event_del(&c->event) == -1)
2432 {
2433 return false;
2434 }
2435 }
2436
2437 event_set(&c->event,
2438 c->sfd,
2439 (short)new_flags,
2440 ms_event_handler,
2441 (void *)c);
2442 event_base_set(base, &c->event);
2443 c->ev_flags= (short)new_flags;
2444
2445 if (event_add(&c->event, NULL) == -1)
2446 {
2447 return false;
2448 }
2449
2450 return true;
2451 } /* ms_update_event */
2452
2453
2454 /**
2455 * If user want to get the expected throughput, we could limit
2456 * the performance of memslap. we could give up some work and
2457 * just wait a short time. The function is used to check this
2458 * case.
2459 *
2460 * @param c, pointer of the concurrency
2461 *
2462 * @return bool, if success, return true, else return false
2463 */
2464 static bool ms_need_yield(ms_conn_t *c)
2465 {
2466 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2467 int64_t tps= 0;
2468 int64_t time_diff= 0;
2469 struct timeval curr_time;
2470 ms_task_t *task= &c->curr_task;
2471
2472 if (ms_setting.expected_tps > 0)
2473 {
2474 gettimeofday(&curr_time, NULL);
2475 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2476 tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
2477
2478 /* current throughput is greater than expected throughput */
2479 if (tps > ms_thread->thread_ctx->tps_perconn)
2480 {
2481 return true;
2482 }
2483 }
2484
2485 return false;
2486 } /* ms_need_yield */
2487
2488
2489 /**
2490 * used to update the start time of each operation
2491 *
2492 * @param c, pointer of the concurrency
2493 */
2494 static void ms_update_start_time(ms_conn_t *c)
2495 {
2496 ms_task_item_t *item= c->curr_task.item;
2497
2498 if ((ms_setting.stat_freq > 0) || c->udp
2499 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2500 {
2501 gettimeofday(&c->start_time, NULL);
2502 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2503 {
2504 /* record the current time */
2505 item->client_time= c->start_time.tv_sec;
2506 }
2507 }
2508 } /* ms_update_start_time */
2509
2510
2511 /**
2512 * run the state machine
2513 *
2514 * @param c, pointer of the concurrency
2515 */
2516 static void ms_drive_machine(ms_conn_t *c)
2517 {
2518 bool stop= false;
2519
2520 assert(c != NULL);
2521
2522 while (! stop)
2523 {
2524 switch (c->state)
2525 {
2526 case conn_read:
2527 if (c->readval)
2528 {
2529 if (c->rbytes >= c->rvbytes)
2530 {
2531 ms_complete_nread(c);
2532 break;
2533 }
2534 }
2535 else
2536 {
2537 if (ms_try_read_line(c) != 0)
2538 {
2539 break;
2540 }
2541 }
2542
2543 if (ms_try_read_network(c) != 0)
2544 {
2545 break;
2546 }
2547
2548 /* doesn't read all the response data, wait event wake up */
2549 if (! c->currcmd.isfinish)
2550 {
2551 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2552 {
2553 fprintf(stderr, "Couldn't update event.\n");
2554 ms_conn_set_state(c, conn_closing);
2555 break;
2556 }
2557 stop= true;
2558 break;
2559 }
2560
2561 /* we have no command line and no data to read from network, next write */
2562 ms_conn_set_state(c, conn_write);
2563 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2564
2565 break;
2566
2567 case conn_write:
2568 if (! c->ctnwrite && ms_need_yield(c))
2569 {
2570 usleep(10);
2571
2572 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2573 {
2574 fprintf(stderr, "Couldn't update event.\n");
2575 ms_conn_set_state(c, conn_closing);
2576 break;
2577 }
2578 stop= true;
2579 break;
2580 }
2581
2582 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2583 {
2584 ms_conn_set_state(c, conn_closing);
2585 break;
2586 }
2587
2588 /* record the start time before starting to send data if necessary */
2589 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2590 {
2591 if (c->change_sfd)
2592 {
2593 c->change_sfd= false;
2594 }
2595 ms_update_start_time(c);
2596 }
2597
2598 /* change sfd if necessary */
2599 if (c->change_sfd)
2600 {
2601 c->ctnwrite= true;
2602 stop= true;
2603 break;
2604 }
2605
2606 /* execute task until nothing need be written to network */
2607 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2608 {
2609 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2610 {
2611 fprintf(stderr, "Couldn't update event.\n");
2612 ms_conn_set_state(c, conn_closing);
2613 break;
2614 }
2615 stop= true;
2616 break;
2617 }
2618
2619 switch (ms_transmit(c))
2620 {
2621 case TRANSMIT_COMPLETE:
2622 /* we have no data to write to network, next wait repose */
2623 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2624 {
2625 fprintf(stderr, "Couldn't update event.\n");
2626 ms_conn_set_state(c, conn_closing);
2627 c->ctnwrite= false;
2628 break;
2629 }
2630 ms_conn_set_state(c, conn_read);
2631 c->ctnwrite= false;
2632 stop= true;
2633 break;
2634
2635 case TRANSMIT_INCOMPLETE:
2636 c->ctnwrite= true;
2637 break; /* Continue in state machine. */
2638
2639 case TRANSMIT_HARD_ERROR:
2640 c->ctnwrite= false;
2641 break;
2642
2643 case TRANSMIT_SOFT_ERROR:
2644 c->ctnwrite= true;
2645 stop= true;
2646 break;
2647
2648 default:
2649 break;
2650 } /* switch */
2651
2652 break;
2653
2654 case conn_closing:
2655 /* recovery mode, need reconnect if connection close */
2656 if (ms_setting.reconnect && (! ms_global.time_out
2657 || ((ms_setting.run_time == 0)
2658 && (c->remain_exec_num > 0))))
2659 {
2660 if (ms_reconn(c) != 0)
2661 {
2662 ms_conn_close(c);
2663 stop= true;
2664 break;
2665 }
2666
2667 ms_reset_conn(c, false);
2668
2669 if (c->total_sfds == 1)
2670 {
2671 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2672 {
2673 fprintf(stderr, "Couldn't update event.\n");
2674 ms_conn_set_state(c, conn_closing);
2675 break;
2676 }
2677 }
2678
2679 break;
2680 }
2681 else
2682 {
2683 ms_conn_close(c);
2684 stop= true;
2685 break;
2686 }
2687
2688 default:
2689 assert(0);
2690 } /* switch */
2691 }
2692 } /* ms_drive_machine */
2693
2694
2695 /**
2696 * the event handler of each thread
2697 *
2698 * @param fd, the file descriptor of socket
2699 * @param which, event flag
2700 * @param arg, argument
2701 */
2702 void ms_event_handler(const int fd, const short which, void *arg)
2703 {
2704 ms_conn_t *c= (ms_conn_t *)arg;
2705
2706 assert(c != NULL);
2707
2708 c->which= which;
2709
2710 /* sanity */
2711 if (fd != c->sfd)
2712 {
2713 fprintf(stderr,
2714 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2715 fd,
2716 c->sfd);
2717 ms_conn_close(c);
2718 exit(1);
2719 }
2720 assert(fd == c->sfd);
2721
2722 ms_drive_machine(c);
2723
2724 /* wait for next event */
2725 } /* ms_event_handler */
2726
2727
2728 /**
2729 * get the next socket descriptor index to run for replication
2730 *
2731 * @param c, pointer of the concurrency
2732 * @param cmd, command(get or set )
2733 *
2734 * @return int, if success, return the index, else return EXIT_SUCCESS
2735 */
2736 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2737 {
2738 uint32_t sock_index= 0;
2739 uint32_t i= 0;
2740
2741 if (c->total_sfds == 1)
2742 {
2743 return EXIT_SUCCESS;
2744 }
2745
2746 if (ms_setting.rep_write_srv == 0)
2747 {
2748 return sock_index;
2749 }
2750
2751 do
2752 {
2753 if (cmd == CMD_SET)
2754 {
2755 for (i= 0; i < ms_setting.rep_write_srv; i++)
2756 {
2757 if (c->tcpsfd[i] > 0)
2758 {
2759 break;
2760 }
2761 }
2762
2763 if (i == ms_setting.rep_write_srv)
2764 {
2765 /* random get one replication server to read */
2766 sock_index= (uint32_t)random() % c->total_sfds;
2767 }
2768 else
2769 {
2770 /* random get one replication writing server to write */
2771 sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
2772 }
2773 }
2774 else if (cmd == CMD_GET)
2775 {
2776 /* random get one replication server to read */
2777 sock_index= (uint32_t)random() % c->total_sfds;
2778 }
2779 }
2780 while (c->tcpsfd[sock_index] == 0);
2781
2782 return sock_index;
2783 } /* ms_get_rep_sock_index */
2784
2785
2786 /**
2787 * get the next socket descriptor index to run
2788 *
2789 * @param c, pointer of the concurrency
2790 *
2791 * @return int, return the index
2792 */
2793 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2794 {
2795 uint32_t sock_index= 0;
2796
2797 do
2798 {
2799 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2800 }
2801 while (c->tcpsfd[sock_index] == 0);
2802
2803 return sock_index;
2804 } /* ms_get_next_sock_index */
2805
2806
2807 /**
2808 * update socket event of the connections
2809 *
2810 * @param c, pointer of the concurrency
2811 *
2812 * @return int, if success, return EXIT_SUCCESS, else return -1
2813 */
2814 static int ms_update_conn_sock_event(ms_conn_t *c)
2815 {
2816 assert(c != NULL);
2817
2818 switch (c->currcmd.cmd)
2819 {
2820 case CMD_SET:
2821 if (ms_setting.facebook_test && c->udp)
2822 {
2823 c->sfd= c->tcpsfd[0];
2824 c->udp= false;
2825 c->change_sfd= true;
2826 }
2827 break;
2828
2829 case CMD_GET:
2830 if (ms_setting.facebook_test && ! c->udp)
2831 {
2832 c->sfd= c->udpsfd;
2833 c->udp= true;
2834 c->change_sfd= true;
2835 }
2836 break;
2837
2838 default:
2839 break;
2840 } /* switch */
2841
2842 if (! c->udp && (c->total_sfds > 1))
2843 {
2844 if (c->cur_idx != c->total_sfds)
2845 {
2846 if (ms_setting.rep_write_srv == 0)
2847 {
2848 c->cur_idx= ms_get_next_sock_index(c);
2849 }
2850 else
2851 {
2852 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2853 }
2854 }
2855 else
2856 {
2857 /* must select the first sock of the connection at the beginning */
2858 c->cur_idx= 0;
2859 }
2860
2861 c->sfd= c->tcpsfd[c->cur_idx];
2862 assert(c->sfd != 0);
2863 c->change_sfd= true;
2864 }
2865
2866 if (c->change_sfd)
2867 {
2868 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2869 {
2870 fprintf(stderr, "Couldn't update event.\n");
2871 ms_conn_set_state(c, conn_closing);
2872 return -1;
2873 }
2874 }
2875
2876 return EXIT_SUCCESS;
2877 } /* ms_update_conn_sock_event */
2878
2879
2880 /**
2881 * for ASCII protocol, this function build the set command
2882 * string and send the command.
2883 *
2884 * @param c, pointer of the concurrency
2885 * @param item, pointer of task item which includes the object
2886 * information
2887 *
2888 * @return int, if success, return EXIT_SUCCESS, else return -1
2889 */
2890 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2891 {
2892 int value_offset;
2893 int write_len;
2894 char *buffer= c->wbuf;
2895
2896 write_len= snprintf(buffer,
2897 c->wsize,
2898 " %u %d %d\r\n",
2899 0,
2900 item->exp_time,
2901 item->value_size);
2902
2903 if (write_len > c->wsize || write_len < 0)
2904 {
2905 /* ought to be always enough. just fail for simplicity */
2906 fprintf(stderr, "output command line too long.\n");
2907 return -1;
2908 }
2909
2910 if (item->value_offset == INVALID_OFFSET)
2911 {
2912 value_offset= item->key_suffix_offset;
2913 }
2914 else
2915 {
2916 value_offset= item->value_offset;
2917 }
2918
2919 if ((ms_add_iov(c, "set ", 4) != 0)
2920 || (ms_add_iov(c, (char *)&item->key_prefix,
2921 (int)KEY_PREFIX_SIZE) != 0)
2922 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2923 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2924 || (ms_add_iov(c, buffer, write_len) != 0)
2925 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2926 item->value_size) != 0)
2927 || (ms_add_iov(c, "\r\n", 2) != 0)
2928 || (c->udp && (ms_build_udp_headers(c) != 0)))
2929 {
2930 return -1;
2931 }
2932
2933 return EXIT_SUCCESS;
2934 } /* ms_build_ascii_write_buf_set */
2935
2936
2937 /**
2938 * used to send set command to server
2939 *
2940 * @param c, pointer of the concurrency
2941 * @param item, pointer of task item which includes the object
2942 * information
2943 *
2944 * @return int, if success, return EXIT_SUCCESS, else return -1
2945 */
2946 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2947 {
2948 assert(c != NULL);
2949
2950 c->currcmd.cmd= CMD_SET;
2951 c->currcmd.isfinish= false;
2952 c->currcmd.retstat= MCD_FAILURE;
2953
2954 if (ms_update_conn_sock_event(c) != 0)
2955 {
2956 return -1;
2957 }
2958
2959 c->msgcurr= 0;
2960 c->msgused= 0;
2961 c->iovused= 0;
2962 if (ms_add_msghdr(c) != 0)
2963 {
2964 fprintf(stderr, "Out of memory preparing request.");
2965 return -1;
2966 }
2967
2968 /* binary protocol */
2969 if (c->protocol == binary_prot)
2970 {
2971 if (ms_build_bin_write_buf_set(c, item) != 0)
2972 {
2973 return -1;
2974 }
2975 }
2976 else
2977 {
2978 if (ms_build_ascii_write_buf_set(c, item) != 0)
2979 {
2980 return -1;
2981 }
2982 }
2983
2984 atomic_add_size(&ms_stats.obj_bytes,
2985 item->key_size + item->value_size);
2986 atomic_add_size(&ms_stats.cmd_set, 1);
2987
2988 return EXIT_SUCCESS;
2989 } /* ms_mcd_set */
2990
2991
2992 /**
2993 * for ASCII protocol, this function build the get command
2994 * string and send the command.
2995 *
2996 * @param c, pointer of the concurrency
2997 * @param item, pointer of task item which includes the object
2998 * information
2999 *
3000 * @return int, if success, return EXIT_SUCCESS, else return -1
3001 */
3002 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3003 {
3004 if ((ms_add_iov(c, "get ", 4) != 0)
3005 || (ms_add_iov(c, (char *)&item->key_prefix,
3006 (int)KEY_PREFIX_SIZE) != 0)
3007 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3008 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3009 || (ms_add_iov(c, "\r\n", 2) != 0)
3010 || (c->udp && (ms_build_udp_headers(c) != 0)))
3011 {
3012 return -1;
3013 }
3014
3015 return EXIT_SUCCESS;
3016 } /* ms_build_ascii_write_buf_get */
3017
3018
3019 /**
3020 * used to send the get command to server
3021 *
3022 * @param c, pointer of the concurrency
3023 * @param item, pointer of task item which includes the object
3024 * information
3025 *
3026 * @return int, if success, return EXIT_SUCCESS, else return -1
3027 */
3028 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3029 {
3030 assert(c != NULL);
3031
3032 c->currcmd.cmd= CMD_GET;
3033 c->currcmd.isfinish= false;
3034 c->currcmd.retstat= MCD_FAILURE;
3035
3036 if (ms_update_conn_sock_event(c) != 0)
3037 {
3038 return -1;
3039 }
3040
3041 c->msgcurr= 0;
3042 c->msgused= 0;
3043 c->iovused= 0;
3044 if (ms_add_msghdr(c) != 0)
3045 {
3046 fprintf(stderr, "Out of memory preparing request.");
3047 return -1;
3048 }
3049
3050 /* binary protocol */
3051 if (c->protocol == binary_prot)
3052 {
3053 if (ms_build_bin_write_buf_get(c, item) != 0)
3054 {
3055 return -1;
3056 }
3057 }
3058 else
3059 {
3060 if (ms_build_ascii_write_buf_get(c, item) != 0)
3061 {
3062 return -1;
3063 }
3064 }
3065
3066 atomic_add_size(&ms_stats.cmd_get, 1);
3067
3068 return EXIT_SUCCESS;
3069 } /* ms_mcd_get */
3070
3071
3072 /**
3073 * for ASCII protocol, this function build the multi-get command
3074 * string and send the command.
3075 *
3076 * @param c, pointer of the concurrency
3077 *
3078 * @return int, if success, return EXIT_SUCCESS, else return -1
3079 */
3080 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3081 {
3082 ms_task_item_t *item;
3083
3084 if (ms_add_iov(c, "get", 3) != 0)
3085 {
3086 return -1;
3087 }
3088
3089 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3090 {
3091 item= c->mlget_task.mlget_item[i].item;
3092 assert(item != NULL);
3093 if ((ms_add_iov(c, " ", 1) != 0)
3094 || (ms_add_iov(c, (char *)&item->key_prefix,
3095 (int)KEY_PREFIX_SIZE) != 0)
3096 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3097 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3098 {
3099 return -1;
3100 }
3101 }
3102
3103 if ((ms_add_iov(c, "\r\n", 2) != 0)
3104 || (c->udp && (ms_build_udp_headers(c) != 0)))
3105 {
3106 return -1;
3107 }
3108
3109 return EXIT_SUCCESS;
3110 } /* ms_build_ascii_write_buf_mlget */
3111
3112
3113 /**
3114 * used to send the multi-get command to server
3115 *
3116 * @param c, pointer of the concurrency
3117 *
3118 * @return int, if success, return EXIT_SUCCESS, else return -1
3119 */
3120 int ms_mcd_mlget(ms_conn_t *c)
3121 {
3122 ms_task_item_t *item;
3123
3124 assert(c != NULL);
3125 assert(c->mlget_task.mlget_num >= 1);
3126
3127 c->currcmd.cmd= CMD_GET;
3128 c->currcmd.isfinish= false;
3129 c->currcmd.retstat= MCD_FAILURE;
3130
3131 if (ms_update_conn_sock_event(c) != 0)
3132 {
3133 return -1;
3134 }
3135
3136 c->msgcurr= 0;
3137 c->msgused= 0;
3138 c->iovused= 0;
3139 if (ms_add_msghdr(c) != 0)
3140 {
3141 fprintf(stderr, "Out of memory preparing request.");
3142 return -1;
3143 }
3144
3145 /* binary protocol */
3146 if (c->protocol == binary_prot)
3147 {
3148 if (ms_build_bin_write_buf_mlget(c) != 0)
3149 {
3150 return -1;
3151 }
3152 }
3153 else
3154 {
3155 if (ms_build_ascii_write_buf_mlget(c) != 0)
3156 {
3157 return -1;
3158 }
3159 }
3160
3161 /* decrease operation time of each item */
3162 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3163 {
3164 item= c->mlget_task.mlget_item[i].item;
3165 atomic_add_size(&ms_stats.cmd_get, 1);
3166 }
3167
3168 (void)item;
3169
3170 return EXIT_SUCCESS;
3171 } /* ms_mcd_mlget */
3172
3173
3174 /**
3175 * binary protocol support
3176 */
3177
3178 /**
3179 * for binary protocol, parse the response of server
3180 *
3181 * @param c, pointer of the concurrency
3182 *
3183 * @return int, if success, return EXIT_SUCCESS, else return -1
3184 */
3185 static int ms_bin_process_response(ms_conn_t *c)
3186 {
3187 const char *errstr= NULL;
3188
3189 assert(c != NULL);
3190
3191 uint32_t bodylen= c->binary_header.response.bodylen;
3192 uint8_t opcode= c->binary_header.response.opcode;
3193 uint16_t status= c->binary_header.response.status;
3194
3195 if (bodylen > 0)
3196 {
3197 c->rvbytes= (int32_t)bodylen;
3198 c->readval= true;
3199 return EXIT_FAILURE;
3200 }
3201 else
3202 {
3203 switch (status)
3204 {
3205 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3206 if (opcode == PROTOCOL_BINARY_CMD_SET)
3207 {
3208 c->currcmd.retstat= MCD_STORED;
3209 }
3210 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3211 {
3212 c->currcmd.retstat= MCD_DELETED;
3213 }
3214 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3215 {
3216 c->currcmd.retstat= MCD_END;
3217 }
3218 break;
3219
3220 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3221 errstr= "Out of memory";
3222 c->currcmd.retstat= MCD_SERVER_ERROR;
3223 break;
3224
3225 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3226 errstr= "Unknown command";
3227 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3228 break;
3229
3230 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3231 errstr= "Not found";
3232 c->currcmd.retstat= MCD_NOTFOUND;
3233 break;
3234
3235 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3236 errstr= "Invalid arguments";
3237 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3238 break;
3239
3240 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3241 errstr= "Data exists for key.";
3242 break;
3243
3244 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3245 errstr= "Too large.";
3246 c->currcmd.retstat= MCD_SERVER_ERROR;
3247 break;
3248
3249 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3250 errstr= "Not stored.";
3251 c->currcmd.retstat= MCD_NOTSTORED;
3252 break;
3253
3254 default:
3255 errstr= "Unknown error";
3256 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3257 break;
3258 } /* switch */
3259
3260 if (errstr != NULL)
3261 {
3262 fprintf(stderr, "%s\n", errstr);
3263 }
3264 }
3265
3266 return EXIT_SUCCESS;
3267 } /* ms_bin_process_response */
3268
3269
3270 /* build binary header and add the header to the buffer to send */
3271
3272 /**
3273 * build binary header and add the header to the buffer to send
3274 *
3275 * @param c, pointer of the concurrency
3276 * @param opcode, operation code
3277 * @param hdr_len, length of header
3278 * @param key_len, length of key
3279 * @param body_len. length of body
3280 */
3281 static void ms_add_bin_header(ms_conn_t *c,
3282 uint8_t opcode,
3283 uint8_t hdr_len,
3284 uint16_t key_len,
3285 uint32_t body_len)
3286 {
3287 protocol_binary_request_header *header;
3288
3289 assert(c != NULL);
3290
3291 header= (protocol_binary_request_header *)c->wcurr;
3292
3293 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3294 header->request.opcode= (uint8_t)opcode;
3295 header->request.keylen= htons(key_len);
3296
3297 header->request.extlen= (uint8_t)hdr_len;
3298 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3299 header->request.vbucket= 0;
3300
3301 header->request.bodylen= htonl(body_len);
3302 header->request.opaque= 0;
3303 header->request.cas= 0;
3304
3305 ms_add_iov(c, c->wcurr, sizeof(header->request));
3306 } /* ms_add_bin_header */
3307
3308
3309 /**
3310 * add the key to the socket write buffer array
3311 *
3312 * @param c, pointer of the concurrency
3313 * @param item, pointer of task item which includes the object
3314 * information
3315 */
3316 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3317 {
3318 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3319 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3320 item->key_size - (int)KEY_PREFIX_SIZE);
3321 }
3322
3323
3324 /**
3325 * for binary protocol, this function build the set command
3326 * and add the command to send buffer array.
3327 *
3328 * @param c, pointer of the concurrency
3329 * @param item, pointer of task item which includes the object
3330 * information
3331 *
3332 * @return int, if success, return EXIT_SUCCESS, else return -1
3333 */
3334 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3335 {
3336 assert(c->wbuf == c->wcurr);
3337
3338 int value_offset;
3339 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3340 uint16_t keylen= (uint16_t)item->key_size;
3341 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3342 + (uint32_t)keylen + (uint32_t)item->value_size;
3343
3344 ms_add_bin_header(c,
3345 PROTOCOL_BINARY_CMD_SET,
3346 sizeof(rep->message.body),
3347 keylen,
3348 bodylen);
3349 rep->message.body.flags= 0;
3350 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3351 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3352 ms_add_key_to_iov(c, item);
3353
3354 if (item->value_offset == INVALID_OFFSET)
3355 {
3356 value_offset= item->key_suffix_offset;
3357 }
3358 else
3359 {
3360 value_offset= item->value_offset;
3361 }
3362 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3363
3364 return EXIT_SUCCESS;
3365 } /* ms_build_bin_write_buf_set */
3366
3367
3368 /**
3369 * for binary protocol, this function build the get command and
3370 * add the command to send buffer array.
3371 *
3372 * @param c, pointer of the concurrency
3373 * @param item, pointer of task item which includes the object
3374 * information
3375 *
3376 * @return int, if success, return EXIT_SUCCESS, else return -1
3377 */
3378 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3379 {
3380 assert(c->wbuf == c->wcurr);
3381
3382 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3383 (uint32_t)item->key_size);
3384 ms_add_key_to_iov(c, item);
3385
3386 return EXIT_SUCCESS;
3387 } /* ms_build_bin_write_buf_get */
3388
3389
3390 /**
3391 * for binary protocol, this function build the multi-get
3392 * command and add the command to send buffer array.
3393 *
3394 * @param c, pointer of the concurrency
3395 * @param item, pointer of task item which includes the object
3396 * information
3397 *
3398 * @return int, if success, return EXIT_SUCCESS, else return -1
3399 */
3400 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3401 {
3402 ms_task_item_t *item;
3403
3404 assert(c->wbuf == c->wcurr);
3405
3406 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3407 {
3408 item= c->mlget_task.mlget_item[i].item;
3409 assert(item != NULL);
3410
3411 ms_add_bin_header(c,
3412 PROTOCOL_BINARY_CMD_GET,
3413 0,
3414 (uint16_t)item->key_size,
3415 (uint32_t)item->key_size);
3416 ms_add_key_to_iov(c, item);
3417 c->wcurr+= sizeof(protocol_binary_request_get);
3418 }
3419
3420 c->wcurr= c->wbuf;
3421
3422 return EXIT_SUCCESS;
3423 } /* ms_build_bin_write_buf_mlget */