Merge Lee
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <arpa/inet.h>
22 #if TIME_WITH_SYS_TIME
23 # include <sys/time.h>
24 # include <time.h>
25 #else
26 # if HAVE_SYS_TIME_H
27 # include <sys/time.h>
28 # else
29 # include <time.h>
30 # endif
31 #endif
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
35
36 #ifdef linux
37 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
38 * optimize the conversion functions, but the prototypes generate warnings
39 * from gcc. The conversion methods isn't the bottleneck for my app, so
40 * just remove the warnings by undef'ing the optimization ..
41 */
42 #undef ntohs
43 #undef ntohl
44 #undef htons
45 #undef htonl
46 #endif
47
48 /* results of a network write (presumably returned by ms_transmit() -- confirm) */
49 #define TRANSMIT_COMPLETE 0
50 #define TRANSMIT_INCOMPLETE 1
51 #define TRANSMIT_SOFT_ERROR 2
52 #define TRANSMIT_HARD_ERROR 3
53
54 /* key prefix generation: every byte of the mask has bit 0x10 set, so a
 * value OR-ed with it never contains ' ' (0x20), '\r', '\n' or '\0' */
55 #define KEY_PREFIX_BASE 0x1010101010101010 /* initial value of key_prefix_seq */
56 #define KEY_PREFIX_MASK 0x1010101010101010
57
58 /* token indexes in a "VALUE <key> <flags> <bytes>" server response line */
59 #define KEY_TOKEN 1
60 #define VALUELEN_TOKEN 3
61
62 /* global increasing counter that keeps key prefixes unique; guarded by
 * ms_global.seq_mutex (see ms_get_key_prefix) */
63 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
64
65 /* global increasing counter used to generate UDP request ids; only
 * updated through atomic_add_32_nv (see ms_get_udp_request_id) */
66 static volatile uint32_t udp_request_id= 0;
67
/* thread-specific key; pthread_getspecific() on it yields the calling
 * thread's ms_thread_t */
68 extern pthread_key_t ms_thread_key;
69
70 /* generate UDP request id */
71 static uint32_t ms_get_udp_request_id(void);
72
73
74 /* connection initialization */
75 static void ms_task_init(ms_conn_t *c);
76 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
77 static int ms_conn_sock_init(ms_conn_t *c);
78 static int ms_conn_event_init(ms_conn_t *c);
79 static int ms_conn_init(ms_conn_t *c,
80 const int init_state,
81 const int read_buffer_size,
82 const bool is_udp);
83 static void ms_warmup_num_init(ms_conn_t *c);
84 static int ms_item_win_init(ms_conn_t *c);
85
86
87 /* connection teardown (ms_conn_free has external linkage) */
88 void ms_conn_free(ms_conn_t *c);
89 static void ms_conn_close(ms_conn_t *c);
90
91
92 /* create, connect and reconnect network sockets */
93 static int ms_new_socket(struct addrinfo *ai);
94 static void ms_maximize_sndbuf(const int sfd);
95 static int ms_network_connect(ms_conn_t *c,
96 char *srv_host_name,
97 const int srv_port,
98 const bool is_udp,
99 int *ret_sfd);
100 static int ms_reconn(ms_conn_t *c);
101
102
103 /* read and parse server responses */
104 static int ms_tokenize_command(char *command,
105 token_t *tokens,
106 const int max_tokens);
107 static int ms_ascii_process_line(ms_conn_t *c, char *command);
108 static int ms_try_read_line(ms_conn_t *c);
109 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
110 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
111 static int ms_try_read_network(ms_conn_t *c);
112 static void ms_verify_value(ms_conn_t *c,
113 ms_mlget_task_item_t *mlget_item,
114 char *value,
115 int vlen);
116 static void ms_ascii_complete_nread(ms_conn_t *c);
117 static void ms_bin_complete_nread(ms_conn_t *c);
118 static void ms_complete_nread(ms_conn_t *c);
119
120
121 /* send functions: msghdr/iovec management and transmission */
122 static int ms_add_msghdr(ms_conn_t *c);
123 static int ms_ensure_iov_space(ms_conn_t *c);
124 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
125 static int ms_build_udp_headers(ms_conn_t *c);
126 static int ms_transmit(ms_conn_t *c);
127
128
129 /* connection state adjustment */
130 static void ms_conn_shrink(ms_conn_t *c);
131 static void ms_conn_set_state(ms_conn_t *c, int state);
132 static bool ms_update_event(ms_conn_t *c, const int new_flags);
133 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
134 static int ms_get_next_sock_index(ms_conn_t *c);
135 static int ms_update_conn_sock_event(ms_conn_t *c);
136 static bool ms_need_yield(ms_conn_t *c);
137 static void ms_update_start_time(ms_conn_t *c);
138
139
140 /* main loop (ms_event_handler has external linkage) */
141 static void ms_drive_machine(ms_conn_t *c);
142 void ms_event_handler(const int fd, const short which, void *arg);
143
144
145 /* ascii protocol: build write buffers for set / get / multi-get */
146 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
147 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
148 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
149
150
151 /* binary protocol */
152 static int ms_bin_process_response(ms_conn_t *c);
153 static void ms_add_bin_header(ms_conn_t *c,
154 uint8_t opcode,
155 uint8_t hdr_len,
156 uint16_t key_len,
157 uint32_t body_len);
158 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
159 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
160 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
161 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
162
163
164 /**
165 * each key has two parts, prefix and suffix. The suffix is a
166 * string random get form the character table. The prefix is a
167 * uint64_t variable. And the prefix must be unique. we use the
168 * prefix to identify a key. And the prefix can't include
169 * character ' ' '\r' '\n' '\0'.
170 *
171 * @return uint64_t
172 */
173 uint64_t ms_get_key_prefix(void)
174 {
175 uint64_t key_prefix;
176
177 pthread_mutex_lock(&ms_global.seq_mutex);
178 key_prefix_seq|= KEY_PREFIX_MASK;
179 key_prefix= key_prefix_seq;
180 key_prefix_seq++;
181 pthread_mutex_unlock(&ms_global.seq_mutex);
182
183 return key_prefix;
184 } /* ms_get_key_prefix */
185
186
187 /**
188 * get an unique udp request id
189 *
190 * @return an unique UDP request id
191 */
192 static uint32_t ms_get_udp_request_id(void)
193 {
194 return atomic_add_32_nv(&udp_request_id, 1);
195 }
196
197
198 /**
199 * initialize current task structure
200 *
201 * @param c, pointer of the concurrency
202 */
203 static void ms_task_init(ms_conn_t *c)
204 {
205 c->curr_task.cmd= CMD_NULL;
206 c->curr_task.item= 0;
207 c->curr_task.verify= false;
208 c->curr_task.finish_verify= true;
209 c->curr_task.get_miss= true;
210
211 c->curr_task.get_opt= 0;
212 c->curr_task.set_opt= 0;
213 c->curr_task.cycle_undo_get= 0;
214 c->curr_task.cycle_undo_set= 0;
215 c->curr_task.verified_get= 0;
216 c->curr_task.overwrite_set= 0;
217 } /* ms_task_init */
218
219
220 /**
221 * initialize udp for the connection structure
222 *
223 * @param c, pointer of the concurrency
224 * @param is_udp, whether it's udp
225 *
226 * @return int, if success, return 0, else return -1
227 */
228 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
229 {
230 c->hdrbuf= 0;
231 c->rudpbuf= 0;
232 c->udppkt= 0;
233
234 c->rudpsize= UDP_DATA_BUFFER_SIZE;
235 c->hdrsize= 0;
236
237 c->rudpbytes= 0;
238 c->packets= 0;
239 c->recvpkt= 0;
240 c->pktcurr= 0;
241 c->ordcurr= 0;
242
243 c->udp= is_udp;
244
245 if (c->udp || (! c->udp && ms_setting.facebook_test))
246 {
247 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
248 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
249
250 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
251 {
252 if (c->rudpbuf != NULL)
253 free(c->rudpbuf);
254 if (c->udppkt != NULL)
255 free(c->udppkt);
256 fprintf(stderr, "malloc()\n");
257 return -1;
258 }
259 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
260 }
261
262 return 0;
263 } /* ms_conn_udp_init */
264
265
266 /**
267 * initialize the connection structure
268 *
269 * @param c, pointer of the concurrency
270 * @param init_state, (conn_read, conn_write, conn_closing)
271 * @param read_buffer_size
272 * @param is_udp, whether it's udp
273 *
274 * @return int, if success, return 0, else return -1
275 */
276 static int ms_conn_init(ms_conn_t *c,
277 const int init_state,
278 const int read_buffer_size,
279 const bool is_udp)
280 {
281 assert(c != NULL);
282
283 c->rbuf= c->wbuf= 0;
284 c->iov= 0;
285 c->msglist= 0;
286
287 c->rsize= read_buffer_size;
288 c->wsize= WRITE_BUFFER_SIZE;
289 c->iovsize= IOV_LIST_INITIAL;
290 c->msgsize= MSG_LIST_INITIAL;
291
292 /* for replication, each connection need connect all the server */
293 if (ms_setting.rep_write_srv > 0)
294 {
295 c->total_sfds= ms_setting.srv_cnt;
296 }
297 else
298 {
299 c->total_sfds= ms_setting.sock_per_conn;
300 }
301 c->alive_sfds= 0;
302
303 c->rbuf= (char *)malloc((size_t)c->rsize);
304 c->wbuf= (char *)malloc((size_t)c->wsize);
305 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
306 c->msglist= (struct msghdr *)malloc(
307 sizeof(struct msghdr) * (size_t)c->msgsize);
308 if (ms_setting.mult_key_num > 1)
309 {
310 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
311 malloc(
312 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
313 }
314 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
315
316 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
317 || (c->msglist == NULL) || (c->tcpsfd == NULL)
318 || ((ms_setting.mult_key_num > 1)
319 && (c->mlget_task.mlget_item == NULL)))
320 {
321 if (c->rbuf != NULL)
322 free(c->rbuf);
323 if (c->wbuf != NULL)
324 free(c->wbuf);
325 if (c->iov != NULL)
326 free(c->iov);
327 if (c->msglist != NULL)
328 free(c->msglist);
329 if (c->mlget_task.mlget_item != NULL)
330 free(c->mlget_task.mlget_item);
331 if (c->tcpsfd != NULL)
332 free(c->tcpsfd);
333 fprintf(stderr, "malloc()\n");
334 return -1;
335 }
336
337 c->state= init_state;
338 c->rvbytes= 0;
339 c->rbytes= 0;
340 c->rcurr= c->rbuf;
341 c->wcurr= c->wbuf;
342 c->iovused= 0;
343 c->msgcurr= 0;
344 c->msgused= 0;
345 c->cur_idx= c->total_sfds; /* default index is a invalid value */
346
347 c->ctnwrite= false;
348 c->readval= false;
349 c->change_sfd= false;
350
351 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
352 c->precmd.isfinish= true; /* default the previous command finished */
353 c->currcmd.isfinish= false;
354 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
355 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
356
357 c->mlget_task.mlget_num= 0;
358 c->mlget_task.value_index= -1; /* default invalid value */
359
360 if (ms_setting.binary_prot)
361 {
362 c->protocol= binary_prot;
363 }
364 else if (is_udp)
365 {
366 c->protocol= ascii_udp_prot;
367 }
368 else
369 {
370 c->protocol= ascii_prot;
371 }
372
373 /* initialize udp */
374 if (ms_conn_udp_init(c, is_udp) != 0)
375 {
376 return -1;
377 }
378
379 /* initialize task */
380 ms_task_init(c);
381
382 if (! (ms_setting.facebook_test && is_udp))
383 {
384 atomic_add_32(&ms_stats.active_conns, 1);
385 }
386
387 return 0;
388 } /* ms_conn_init */
389
390
391 /**
392 * when doing 100% get operation, it could preset some objects
393 * to warmup the server. this function is used to initialize the
394 * number of the objects to preset.
395 *
396 * @param c, pointer of the concurrency
397 */
398 static void ms_warmup_num_init(ms_conn_t *c)
399 {
400 /* no set operation, preset all the items in the window */
401 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
402 {
403 c->warmup_num= c->win_size;
404 c->remain_warmup_num= c->warmup_num;
405 }
406 else
407 {
408 c->warmup_num= 0;
409 c->remain_warmup_num= c->warmup_num;
410 }
411 } /* ms_warmup_num_init */
412
413
414 /**
415 * each connection has an item window, this function initialize
416 * the window. The window is used to generate task.
417 *
418 * @param c, pointer of the concurrency
419 *
420 * @return int, if success, return 0, else return -1
421 */
422 static int ms_item_win_init(ms_conn_t *c)
423 {
424 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
425 int exp_cnt= 0;
426
427 c->win_size= (int)ms_setting.win_size;
428 c->set_cursor= 0;
429 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
430 c->remain_exec_num= c->exec_num;
431
432 c->item_win= (ms_task_item_t *)malloc(
433 sizeof(ms_task_item_t) * (size_t)c->win_size);
434 if (c->item_win == NULL)
435 {
436 fprintf(stderr, "Can't allocate task item array for conn.\n");
437 return -1;
438 }
439 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
440
441 for (int i= 0; i < c->win_size; i++)
442 {
443 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
444 c->item_win[i].key_prefix= ms_get_key_prefix();
445 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
446 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
447 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
448 c->item_win[i].client_time= 0;
449
450 /* set expire time base on the proportion */
451 if (exp_cnt < ms_setting.exp_ver_per * i)
452 {
453 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
454 exp_cnt++;
455 }
456 else
457 {
458 c->item_win[i].exp_time= 0;
459 }
460 }
461
462 ms_warmup_num_init(c);
463
464 return 0;
465 } /* ms_item_win_init */
466
467
468 /**
469 * each connection structure can include one or more sock
470 * handlers. this function create these socks and connect the
471 * server(s).
472 *
473 * @param c, pointer of the concurrency
474 *
475 * @return int, if success, return 0, else return -1
476 */
477 static int ms_conn_sock_init(ms_conn_t *c)
478 {
479 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
480 int i;
481 int ret_sfd;
482 int srv_idx= 0;
483
484 assert(c != NULL);
485 assert(c->tcpsfd != NULL);
486
487 for (i= 0; i < c->total_sfds; i++)
488 {
489 ret_sfd= 0;
490 if (ms_setting.rep_write_srv > 0)
491 {
492 /* for replication, each connection need connect all the server */
493 srv_idx= i;
494 }
495 else
496 {
497 /* all the connections in a thread connects the same server */
498 srv_idx= ms_thread->thread_ctx->srv_idx;
499 }
500
501 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
502 ms_setting.servers[srv_idx].srv_port,
503 ms_setting.udp, &ret_sfd) != 0)
504 {
505 break;
506 }
507
508 if (i == 0)
509 {
510 c->sfd= ret_sfd;
511 }
512
513 if (! ms_setting.udp)
514 {
515 c->tcpsfd[i]= ret_sfd;
516 }
517
518 c->alive_sfds++;
519 }
520
521 /* initialize udp sock handler if necessary */
522 if (ms_setting.facebook_test)
523 {
524 ret_sfd= 0;
525 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
526 ms_setting.servers[srv_idx].srv_port,
527 true, &ret_sfd) != 0)
528 {
529 c->udpsfd= 0;
530 }
531 else
532 {
533 c->udpsfd= ret_sfd;
534 }
535 }
536
537 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
538 {
539 if (ms_setting.udp)
540 {
541 close(c->sfd);
542 }
543 else
544 {
545 for (int j= 0; j < i; j++)
546 {
547 close(c->tcpsfd[j]);
548 }
549 }
550
551 if (c->udpsfd != 0)
552 {
553 close(c->udpsfd);
554 }
555
556 return -1;
557 }
558
559 return 0;
560 } /* ms_conn_sock_init */
561
562
563 /**
564 * each connection is managed by libevent, this function
565 * initialize the event of the connection structure.
566 *
567 * @param c, pointer of the concurrency
568 *
569 * @return int, if success, return 0, else return -1
570 */
571 static int ms_conn_event_init(ms_conn_t *c)
572 {
573 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
574 /* default event timeout 10 seconds */
575 struct timeval t=
576 {
577 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
578 };
579 short event_flags= EV_WRITE | EV_PERSIST;
580
581 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
582 event_base_set(ms_thread->base, &c->event);
583 c->ev_flags= event_flags;
584
585 if (c->total_sfds == 1)
586 {
587 if (event_add(&c->event, NULL) == -1)
588 {
589 return -1;
590 }
591 }
592 else
593 {
594 if (event_add(&c->event, &t) == -1)
595 {
596 return -1;
597 }
598 }
599
600 return 0;
601 } /* ms_conn_event_init */
602
603
604 /**
605 * setup a connection, each connection structure of each
606 * thread must call this function to initialize.
607 *
608 * @param c, pointer of the concurrency
609 *
610 * @return int, if success, return 0, else return -1
611 */
612 int ms_setup_conn(ms_conn_t *c)
613 {
614 if (ms_item_win_init(c) != 0)
615 {
616 return -1;
617 }
618
619 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
620 {
621 return -1;
622 }
623
624 if (ms_conn_sock_init(c) != 0)
625 {
626 return -1;
627 }
628
629 if (ms_conn_event_init(c) != 0)
630 {
631 return -1;
632 }
633
634 return 0;
635 } /* ms_setup_conn */
636
637
638 /**
639 * Frees a connection.
640 *
641 * @param c, pointer of the concurrency
642 */
643 void ms_conn_free(ms_conn_t *c)
644 {
645 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
646 if (c != NULL)
647 {
648 if (c->hdrbuf != NULL)
649 free(c->hdrbuf);
650 if (c->msglist != NULL)
651 free(c->msglist);
652 if (c->rbuf != NULL)
653 free(c->rbuf);
654 if (c->wbuf != NULL)
655 free(c->wbuf);
656 if (c->iov != NULL)
657 free(c->iov);
658 if (c->mlget_task.mlget_item != NULL)
659 free(c->mlget_task.mlget_item);
660 if (c->rudpbuf != NULL)
661 free(c->rudpbuf);
662 if (c->udppkt != NULL)
663 free(c->udppkt);
664 if (c->item_win != NULL)
665 free(c->item_win);
666 if (c->tcpsfd != NULL)
667 free(c->tcpsfd);
668
669 if (--ms_thread->nactive_conn == 0)
670 {
671 free(ms_thread->conn);
672 }
673 }
674 } /* ms_conn_free */
675
676
677 /**
678 * close a connection
679 *
680 * @param c, pointer of the concurrency
681 */
682 static void ms_conn_close(ms_conn_t *c)
683 {
684 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
685 assert(c != NULL);
686
687 /* delete the event, the socket and the connection */
688 event_del(&c->event);
689
690 for (int i= 0; i < c->total_sfds; i++)
691 {
692 if (c->tcpsfd[i] > 0)
693 {
694 close(c->tcpsfd[i]);
695 }
696 }
697 c->sfd= 0;
698
699 if (ms_setting.facebook_test)
700 {
701 close(c->udpsfd);
702 }
703
704 atomic_dec_32(&ms_stats.active_conns);
705
706 ms_conn_free(c);
707
708 if (ms_setting.run_time == 0)
709 {
710 pthread_mutex_lock(&ms_global.run_lock.lock);
711 ms_global.run_lock.count++;
712 pthread_cond_signal(&ms_global.run_lock.cond);
713 pthread_mutex_unlock(&ms_global.run_lock.lock);
714 }
715
716 if (ms_thread->nactive_conn == 0)
717 {
718 pthread_exit(NULL);
719 }
720 } /* ms_conn_close */
721
722
723 /**
724 * create a new sock
725 *
726 * @param ai, server address information
727 *
728 * @return int, if success, return 0, else return -1
729 */
730 static int ms_new_socket(struct addrinfo *ai)
731 {
732 int sfd;
733
734 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
735 {
736 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
737 return -1;
738 }
739
740 return sfd;
741 } /* ms_new_socket */
742
743
744 /**
745 * Sets a socket's send buffer size to the maximum allowed by the system.
746 *
747 * @param sfd, file descriptor of socket
748 */
749 static void ms_maximize_sndbuf(const int sfd)
750 {
751 socklen_t intsize= sizeof(int);
752 unsigned int last_good= 0;
753 unsigned int min, max, avg;
754 unsigned int old_size;
755
756 /* Start with the default size. */
757 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
758 {
759 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
760 return;
761 }
762
763 /* Binary-search for the real maximum. */
764 min= old_size;
765 max= MAX_SENDBUF_SIZE;
766
767 while (min <= max)
768 {
769 avg= ((unsigned int)(min + max)) / 2;
770 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
771 {
772 last_good= avg;
773 min= avg + 1;
774 }
775 else
776 {
777 max= avg - 1;
778 }
779 }
780 } /* ms_maximize_sndbuf */
781
782
783 /**
784 * socket connects the server
785 *
786 * @param c, pointer of the concurrency
787 * @param srv_host_name, the host name of the server
788 * @param srv_port, port of server
789 * @param is_udp, whether it's udp
790 * @param ret_sfd, the connected socket file descriptor
791 *
792 * @return int, if success, return 0, else return -1
793 */
794 static int ms_network_connect(ms_conn_t *c,
795 char *srv_host_name,
796 const int srv_port,
797 const bool is_udp,
798 int *ret_sfd)
799 {
800 int sfd;
801 struct linger ling=
802 {
803 0, 0
804 };
805 struct addrinfo *ai;
806 struct addrinfo *next;
807 struct addrinfo hints;
808 char port_buf[NI_MAXSERV];
809 int error;
810 int success= 0;
811
812 int flags= 1;
813
814 /*
815 * the memset call clears nonstandard fields in some impementations
816 * that otherwise mess things up.
817 */
818 memset(&hints, 0, sizeof(hints));
819 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
820 if (is_udp)
821 {
822 hints.ai_protocol= IPPROTO_UDP;
823 hints.ai_socktype= SOCK_DGRAM;
824 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
825 }
826 else
827 {
828 hints.ai_family= AF_UNSPEC;
829 hints.ai_protocol= IPPROTO_TCP;
830 hints.ai_socktype= SOCK_STREAM;
831 }
832
833 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
834 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
835 if (error != 0)
836 {
837 if (error != EAI_SYSTEM)
838 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
839 else
840 perror("getaddrinfo()\n");
841
842 return -1;
843 }
844
845 for (next= ai; next; next= next->ai_next)
846 {
847 if ((sfd= ms_new_socket(next)) == -1)
848 {
849 freeaddrinfo(ai);
850 return -1;
851 }
852
853 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
854 if (is_udp)
855 {
856 ms_maximize_sndbuf(sfd);
857 }
858 else
859 {
860 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
861 sizeof(flags));
862 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
863 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
864 sizeof(flags));
865 }
866
867 if (is_udp)
868 {
869 c->srv_recv_addr_size= sizeof(struct sockaddr);
870 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
871 }
872 else
873 {
874 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
875 {
876 close(sfd);
877 freeaddrinfo(ai);
878 return -1;
879 }
880 }
881
882 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
883 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
884 {
885 fprintf(stderr, "setting O_NONBLOCK\n");
886 close(sfd);
887 freeaddrinfo(ai);
888 return -1;
889 }
890
891 if (ret_sfd != NULL)
892 {
893 *ret_sfd= sfd;
894 }
895
896 success++;
897 }
898
899 freeaddrinfo(ai);
900
901 /* Return zero if we detected no errors in starting up connections */
902 return success == 0;
903 } /* ms_network_connect */
904
905
906 /**
907 * reconnect a disconnected sock
908 *
909 * @param c, pointer of the concurrency
910 *
911 * @return int, if success, return 0, else return -1
912 */
913 static int ms_reconn(ms_conn_t *c)
914 {
915 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
916 int srv_idx= 0;
917 int32_t srv_conn_cnt= 0;
918
919 if (ms_setting.rep_write_srv > 0)
920 {
921 srv_idx= c->cur_idx;
922 srv_conn_cnt= (int)ms_setting.nconns;
923 }
924 else
925 {
926 srv_idx= ms_thread->thread_ctx->srv_idx;
927 srv_conn_cnt= (int32_t)((int)ms_setting.nconns / ms_setting.srv_cnt);
928 }
929
930 /* close the old socket handler */
931 close(c->sfd);
932 c->tcpsfd[c->cur_idx]= 0;
933
934 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
935 % (uint32_t)srv_conn_cnt == 0)
936 {
937 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
938 fprintf(stderr, "Server %s:%d disconnect\n",
939 ms_setting.servers[srv_idx].srv_host_name,
940 ms_setting.servers[srv_idx].srv_port);
941 }
942
943 if (ms_setting.rep_write_srv > 0)
944 {
945 int i= 0;
946 for (i= 0; i < c->total_sfds; i++)
947 {
948 if (c->tcpsfd[i] != 0)
949 {
950 break;
951 }
952 }
953
954 /* all socks disconnect */
955 if (i == c->total_sfds)
956 {
957 return -1;
958 }
959 }
960 else
961 {
962 do
963 {
964 /* reconnect success, break the loop */
965 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
966 ms_setting.servers[srv_idx].srv_port,
967 ms_setting.udp, &c->sfd) == 0)
968 {
969 c->tcpsfd[c->cur_idx]= c->sfd;
970 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
971 % (uint32_t)srv_conn_cnt == 0)
972 {
973 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
974 int reconn_time=
975 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
976 - ms_setting.servers[srv_idx].disconn_time
977 .tv_sec);
978 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
979 ms_setting.servers[srv_idx].srv_host_name,
980 ms_setting.servers[srv_idx].srv_port, reconn_time);
981 }
982 break;
983 }
984
985 if (c->total_sfds == 1)
986 {
987 /* wait a second and reconnect */
988 sleep(1);
989 }
990 }
991 while (c->total_sfds == 1);
992 }
993
994 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
995 {
996 c->sfd= 0;
997 c->alive_sfds--;
998 }
999
1000 return 0;
1001 } /* ms_reconn */
1002
1003
1004 /**
1005 * reconnect several disconnected socks in the connection
1006 * structure, the ever-1-second timer of the thread will check
1007 * whether some socks in the connections disconnect. if
1008 * disconnect, reconnect the sock.
1009 *
1010 * @param c, pointer of the concurrency
1011 *
1012 * @return int, if success, return 0, else return -1
1013 */
1014 int ms_reconn_socks(ms_conn_t *c)
1015 {
1016 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1017 int srv_idx= 0;
1018 int ret_sfd= 0;
1019 int srv_conn_cnt= 0;
1020 struct timeval cur_time;
1021
1022 assert(c != NULL);
1023
1024 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1025 {
1026 return 0;
1027 }
1028
1029 for (int i= 0; i < c->total_sfds; i++)
1030 {
1031 if (c->tcpsfd[i] == 0)
1032 {
1033 gettimeofday(&cur_time, NULL);
1034
1035 /**
1036 * For failover test of replication, reconnect the socks after
1037 * it disconnects more than 5 seconds, Otherwise memslap will
1038 * block at connect() function and the work threads can't work
1039 * in this interval.
1040 */
1041 if (cur_time.tv_sec
1042 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1043 {
1044 break;
1045 }
1046
1047 if (ms_setting.rep_write_srv > 0)
1048 {
1049 srv_idx= i;
1050 srv_conn_cnt= (int)ms_setting.nconns;
1051 }
1052 else
1053 {
1054 srv_idx= ms_thread->thread_ctx->srv_idx;
1055 srv_conn_cnt= (int)ms_setting.nconns / ms_setting.srv_cnt;
1056 }
1057
1058 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1059 ms_setting.servers[srv_idx].srv_port,
1060 ms_setting.udp, &ret_sfd) == 0)
1061 {
1062 c->tcpsfd[i]= ret_sfd;
1063 c->alive_sfds++;
1064
1065 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1066 % (uint32_t)srv_conn_cnt == 0)
1067 {
1068 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1069 int reconn_time=
1070 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1071 - ms_setting.servers[srv_idx].disconn_time
1072 .tv_sec);
1073 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1074 ms_setting.servers[srv_idx].srv_host_name,
1075 ms_setting.servers[srv_idx].srv_port, reconn_time);
1076 }
1077 }
1078 }
1079 }
1080
1081 return 0;
1082 } /* ms_reconn_socks */
1083
1084
1085 /**
1086 * Tokenize the command string by replacing whitespace with '\0' and update
1087 * the token array tokens with pointer to start of each token and length.
1088 * Returns total number of tokens. The last valid token is the terminal
1089 * token (value points to the first unprocessed character of the string and
1090 * length zero).
1091 *
1092 * Usage example:
1093 *
1094 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1095 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1096 * ...
1097 * }
1098 * ncommand = tokens[ix].value - command;
1099 * command = tokens[ix].value;
1100 * }
1101 *
1102 * @param command, the command string to token
1103 * @param tokens, array to store tokens
1104 * @param max_tokens, maximum tokens number
1105 *
1106 * @return int, the number of tokens
1107 */
1108 static int ms_tokenize_command(char *command,
1109 token_t *tokens,
1110 const int max_tokens)
1111 {
1112 char *s, *e;
1113 int ntokens= 0;
1114
1115 assert(command != NULL && tokens != NULL && max_tokens > 1);
1116
1117 for (s= e= command; ntokens < max_tokens - 1; ++e)
1118 {
1119 if (*e == ' ')
1120 {
1121 if (s != e)
1122 {
1123 tokens[ntokens].value= s;
1124 tokens[ntokens].length= (size_t)(e - s);
1125 ntokens++;
1126 *e= '\0';
1127 }
1128 s= e + 1;
1129 }
1130 else if (*e == '\0')
1131 {
1132 if (s != e)
1133 {
1134 tokens[ntokens].value= s;
1135 tokens[ntokens].length= (size_t)(e - s);
1136 ntokens++;
1137 }
1138
1139 break; /* string end */
1140 }
1141 }
1142
1143 return ntokens;
1144 } /* ms_tokenize_command */
1145
1146
1147 /**
1148 * parse the response of server.
1149 *
1150 * @param c, pointer of the concurrency
1151 * @param command, the string responded by server
1152 *
1153 * @return int, if the command completed return 0, else return
1154 * -1
1155 */
1156 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1157 {
1158 int ret= 0;
1159 int64_t value_len;
1160 char *buffer= command;
1161
1162 assert(c != NULL);
1163
1164 /**
1165 * for command get, we store the returned value into local buffer
1166 * then continue in ms_complete_nread().
1167 */
1168
1169 switch (buffer[0])
1170 {
1171 case 'V': /* VALUE || VERSION */
1172 if (buffer[1] == 'A') /* VALUE */
1173 {
1174 token_t tokens[MAX_TOKENS];
1175 ms_tokenize_command(command, tokens, MAX_TOKENS);
1176 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1177 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1178
1179 /*
1180 * We read the \r\n into the string since not doing so is more
1181 * cycles then the waster of memory to do so.
1182 *
1183 * We are null terminating through, which will most likely make
1184 * some people lazy about using the return length.
1185 */
1186 c->rvbytes= (int)(value_len + 2);
1187 c->readval= true;
1188 ret= -1;
1189 }
1190
1191 break;
1192
1193 case 'O': /* OK */
1194 c->currcmd.retstat= MCD_SUCCESS;
1195
1196 case 'S': /* STORED STATS SERVER_ERROR */
1197 if (buffer[2] == 'A') /* STORED STATS */
1198 { /* STATS*/
1199 c->currcmd.retstat= MCD_STAT;
1200 }
1201 else if (buffer[1] == 'E')
1202 {
1203 /* SERVER_ERROR */
1204 printf("<%d %s\n", c->sfd, buffer);
1205
1206 c->currcmd.retstat= MCD_SERVER_ERROR;
1207 }
1208 else if (buffer[1] == 'T')
1209 {
1210 /* STORED */
1211 c->currcmd.retstat= MCD_STORED;
1212 }
1213 else
1214 {
1215 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1216 }
1217 break;
1218
1219 case 'D': /* DELETED DATA */
1220 if (buffer[1] == 'E')
1221 {
1222 c->currcmd.retstat= MCD_DELETED;
1223 }
1224 else
1225 {
1226 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1227 }
1228
1229 break;
1230
1231 case 'N': /* NOT_FOUND NOT_STORED*/
1232 if (buffer[4] == 'F')
1233 {
1234 c->currcmd.retstat= MCD_NOTFOUND;
1235 }
1236 else if (buffer[4] == 'S')
1237 {
1238 printf("<%d %s\n", c->sfd, buffer);
1239 c->currcmd.retstat= MCD_NOTSTORED;
1240 }
1241 else
1242 {
1243 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1244 }
1245 break;
1246
1247 case 'E': /* PROTOCOL ERROR or END */
1248 if (buffer[1] == 'N')
1249 {
1250 /* END */
1251 c->currcmd.retstat= MCD_END;
1252 }
1253 else if (buffer[1] == 'R')
1254 {
1255 printf("<%d ERROR\n", c->sfd);
1256 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1257 }
1258 else if (buffer[1] == 'X')
1259 {
1260 c->currcmd.retstat= MCD_DATA_EXISTS;
1261 printf("<%d %s\n", c->sfd, buffer);
1262 }
1263 else
1264 {
1265 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1266 }
1267 break;
1268
1269 case 'C': /* CLIENT ERROR */
1270 printf("<%d %s\n", c->sfd, buffer);
1271 c->currcmd.retstat= MCD_CLIENT_ERROR;
1272 break;
1273
1274 default:
1275 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1276 break;
1277 } /* switch */
1278
1279 return ret;
1280 } /* ms_ascii_process_line */
1281
1282
1283 /**
1284 * after one operation completes, reset the concurrency
1285 *
1286 * @param c, pointer of the concurrency
1287 * @param timeout, whether it's timeout
1288 */
1289 void ms_reset_conn(ms_conn_t *c, bool timeout)
1290 {
1291 assert(c != NULL);
1292
1293 if (c->udp)
1294 {
1295 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1296 {
1297 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1298 }
1299
1300 c->packets= 0;
1301 c->recvpkt= 0;
1302 c->pktcurr= 0;
1303 c->ordcurr= 0;
1304 c->rudpbytes= 0;
1305 }
1306 c->currcmd.isfinish= true;
1307 c->ctnwrite= false;
1308 c->rbytes= 0;
1309 c->rcurr= c->rbuf;
1310 ms_conn_set_state(c, conn_write);
1311 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1312
1313 if (timeout)
1314 {
1315 ms_drive_machine(c);
1316 }
1317 } /* ms_reset_conn */
1318
1319
/**
 * Try to consume one complete response unit from the read buffer.
 *
 * Binary protocol: once at least a full response header is buffered, it is
 * copied into c->binary_header (keylen, bodylen and status converted from
 * network byte order) and handed to ms_bin_process_response().
 * ASCII protocol: once a '\n'-terminated line is buffered, it is
 * NUL-terminated in place (a preceding '\r' is stripped) and handed to
 * ms_ascii_process_line().
 *
 * @param c, pointer of the concurrency
 *
 * @return int, 0 when no progress was made: there is not enough buffered
 *         data yet, or an invalid binary magic byte put the connection into
 *         conn_closing. -1 after a header/line has been processed; the
 *         caller uses a non-zero result to run the state machine again.
 */
static int ms_try_read_line(ms_conn_t *c)
{
  if (c->protocol == binary_prot)
  {
    /* Do we have the complete packet header? */
    if ((uint64_t)c->rbytes < sizeof(c->binary_header))
    {
      /* need more data! */
      return 0;
    }
    else
    {
#ifdef NEED_ALIGN
      if (((long)(c->rcurr)) % 8 != 0)
      {
        /* must realign input buffer */
        memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr= c->rbuf;
        if (settings.verbose)
        {
          fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
        }
      }
#endif
      protocol_binary_response_header *rsp;
      rsp= (protocol_binary_response_header *)c->rcurr;

      /* copy the raw header, then fix up the multi-byte fields' byte order */
      c->binary_header= *rsp;
      c->binary_header.response.extlen= rsp->response.extlen;
      c->binary_header.response.keylen= ntohs(rsp->response.keylen);
      c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
      c->binary_header.response.status= ntohs(rsp->response.status);

      if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
      {
        fprintf(stderr, "Invalid magic: %x\n",
                c->binary_header.response.magic);
        ms_conn_set_state(c, conn_closing);
        return 0;
      }

      /* process this complete response */
      if (ms_bin_process_response(c) == 0)
      {
        /* current operation completed */
        ms_reset_conn(c, false);
        return -1;
      }
      else
      {
        /* consume only the header; any body bytes stay in the buffer */
        c->rbytes-= (int32_t)sizeof(c->binary_header);
        c->rcurr+= sizeof(c->binary_header);
      }
    }
  }
  else
  {
    char *el, *cont;

    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));

    if (c->rbytes == 0)
      return 0;

    el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
    if (! el)
      return 0;

    /* cont marks the first byte after this line */
    cont= el + 1;
    /* drop a '\r' before the '\n', unless it is the first byte of the line */
    if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
    {
      el--;
    }
    *el= '\0';

    assert(cont <= (c->rcurr + c->rbytes));

    /* process this complete line */
    if (ms_ascii_process_line(c, c->rcurr) == 0)
    {
      /* current operation completed */
      ms_reset_conn(c, false);
      return -1;
    }
    else
    {
      /* current operation didn't complete */
      c->rbytes-= (int32_t)(cont - c->rcurr);
      c->rcurr= cont;
    }

    assert(c->rcurr <= (c->rbuf + c->rsize));
  }

  return -1;
} /* ms_try_read_line */
1424
1425
/**
 * UDP does not guarantee delivery order, so this function reassembles the
 * received datagrams of the current response by sequence number and copies
 * their payload, in order, into buf.
 *
 * Datagrams sit back to back in c->rudpbuf, one per UDP_MAX_PAYLOAD_SIZE
 * slot, and each begins with a UDP_HEADER_SIZE header. The first loop
 * indexes every newly received datagram into c->udppkt[] by its sequence
 * number. The second loop copies payload starting at packet c->ordcurr. It
 * stops at the first packet with no data, when buf is full, or once the
 * last packet of the response has been copied completely.
 *
 * @param c, pointer of the concurrency
 * @param buf, the buffer to store the ordered packages data
 * @param rbytes, the maximum capacity of the buffer
 *
 * @return int, the number of bytes copied into buf, or -1 if no bytes
 *         could be copied
 */
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
{
  int len= 0;
  int wbytes= 0;
  uint16_t req_id= 0;
  uint16_t seq_num= 0;
  uint16_t packets= 0; /* NOTE(review): assigned below but never read */
  unsigned char *header= NULL;

  /* no enough data */
  assert(c != NULL);
  assert(buf != NULL);
  assert(c->rudpbytes >= UDP_HEADER_SIZE);

  /* calculate received packets count */
  if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
  {
    /* the last packet has some data */
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
  }
  else
  {
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
  }

  /* get the total packets count if necessary */
  if (c->packets == 0)
  {
    c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
  }

  /* build the ordered packet array */
  for (int i= c->pktcurr; i < c->recvpkt; i++)
  {
    header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
    req_id= (uint16_t)HEADER_TO_REQID(header);
    assert(req_id == c->request_id % (1 << 16));

    packets= (uint16_t)HEADER_TO_PACKETS(header);
    assert(c->packets == HEADER_TO_PACKETS(header));

    /* NOTE(review): seq_num comes straight from the server's datagram
     * header and is not bounds-checked against the size of c->udppkt
     * (presumably MAX_UDP_PACKET entries -- confirm); a malformed datagram
     * could index out of range. */
    seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
    c->udppkt[seq_num].header= header;
    c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;

    if (i == c->recvpkt - 1)
    {
      /* last received packet */
      if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
      {
        c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
        c->pktcurr++;
      }
      else
      {
        /* partially received slot: pktcurr is not advanced, so this slot
         * is re-indexed on the next call once more bytes arrive */
        c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
                                   - UDP_HEADER_SIZE;
      }
    }
    else
    {
      c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
      c->pktcurr++;
    }
  }

  for (int i= c->ordcurr; i < c->recvpkt; i++)
  {
    /* there is some data to copy */
    if ((c->udppkt[i].data != NULL)
        && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
    {
      header= c->udppkt[i].header;
      len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
      if (len > rbytes - wbytes)
      {
        len= rbytes - wbytes;
      }

      assert(len <= rbytes - wbytes);
      assert(i == HEADER_TO_SEQNUM(header));

      memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
             (size_t)len);
      wbytes+= len;
      c->udppkt[i].copybytes+= len;

      if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
          && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        /* finish copying all the data of this packet, next */
        c->ordcurr++;
      }

      /* last received packet, and finish copying all the data */
      if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
          && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
      {
        break;
      }

      /* no space to copy data */
      if (wbytes >= rbytes)
      {
        break;
      }

      /* it doesn't finish reading all the data of the packet from network */
      if ((i != c->recvpkt - 1)
          && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        break;
      }
    }
    else
    {
      /* no data to copy (e.g. the next packet in order has not arrived) */
      break;
    }
  }

  return wbytes == 0 ? -1 : wbytes;
} /* ms_sort_udp_packet */
1560
1561
/**
 * Emulate a TCP-style read() on a UDP connection.
 *
 * Datagrams are read into c->rudpbuf, which grows by doubling whenever
 * fewer than UDP_MAX_PAYLOAD_SIZE bytes are free. Reading continues only
 * while a read() fills all the available space. Any newly received data is
 * then reordered into buf by ms_sort_udp_packet().
 *
 * @param c, pointer of the concurrency
 * @param buf, read buffer
 * @param len, capacity of buf in bytes
 *
 * @return int, the number of bytes copied into buf.
 *         0 if read() returned 0.
 *         -1 if read() failed (errno is left as read() set it), if the
 *         buffer could not be grown, or if nothing could be copied in order
 *         (counted as ms_stats.pkt_disorder).
 *         NOTE(review): when a later read() in the loop fails after earlier
 *         reads succeeded, -1 is returned without sorting; the bytes stay
 *         in c->rudpbuf.
 */
static int ms_udp_read(ms_conn_t *c, char *buf, int len)
{
  int res= 0;
  int avail= 0;
  int rbytes= 0;
  int copybytes= 0;

  assert(c->udp);

  while (1)
  {
    /* make sure a full datagram still fits after the buffered bytes */
    if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
    {
      char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
      if (! new_rbuf)
      {
        fprintf(stderr, "Couldn't realloc input buffer.\n");
        c->rudpbytes= 0; /* ignore what we read */
        return -1;
      }
      c->rudpbuf= new_rbuf;
      c->rudpsize*= 2;
    }

    avail= c->rudpsize - c->rudpbytes;
    /* each read() on a UDP socket returns at most one datagram
     * (the original note says 1400 bytes -- presumably UDP_MAX_PAYLOAD_SIZE) */
    res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);

    if (res > 0)
    {
      atomic_add_size(&ms_stats.bytes_read, res);
      c->rudpbytes+= res;
      rbytes+= res;
      if (res == avail)
      {
        continue;
      }
      else
      {
        break;
      }
    }

    if (res == 0)
    {
      /* "connection" closed */
      return res;
    }

    if (res == -1)
    {
      /* read() failed, e.g. no data to read (EAGAIN); errno is kept */
      return res;
    }
  }

  /* copy data to read buffer */
  if (rbytes > 0)
  {
    copybytes= ms_sort_udp_packet(c, buf, len);
  }

  if (copybytes == -1)
  {
    atomic_add_size(&ms_stats.pkt_disorder, 1);
  }

  return copybytes;
} /* ms_udp_read */
1641
1642
/**
 * Read from the network as much as we can, handling buffer growth and
 * connection close. Before reading, move the remaining incomplete fragment
 * of a response (if any) to the beginning of the buffer.
 *
 * UDP connections read through ms_udp_read(); TCP connections use read()
 * directly.
 *
 * @param c, pointer of the concurrency
 *
 * @return int,
 *         0 if there was nothing to read,
 *         1 if some data was read,
 *         -1 if an error happened. The connection is put into conn_closing
 *         on close/read errors, but NOT when growing the buffer fails.
 */
static int ms_try_read_network(ms_conn_t *c)
{
  int gotdata= 0;
  int res;
  int64_t avail;

  assert(c != NULL);

  /* Compact the buffer when there is a consumed prefix and either no value
   * is being read, the pending value (rvbytes) does not fit in the space
   * after rcurr, or the unread bytes are fewer than the consumed prefix. */
  if ((c->rcurr != c->rbuf)
      && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
          || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
  {
    if (c->rbytes != 0) /* otherwise there's nothing to copy */
      memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
    c->rcurr= c->rbuf;
  }

  while (1)
  {
    if (c->rbytes >= c->rsize)
    {
      /* NOTE(review): rcurr is reset to the new buffer start; this assumes
       * rcurr == rbuf whenever the buffer is completely full -- confirm. */
      char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
      if (! new_rbuf)
      {
        fprintf(stderr, "Couldn't realloc input buffer.\n");
        c->rbytes= 0; /* ignore what we read */
        return -1;
      }
      c->rcurr= c->rbuf= new_rbuf;
      c->rsize*= 2;
    }

    /* free space after the unread bytes */
    avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
    if (avail == 0)
    {
      break;
    }

    if (c->udp)
    {
      res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
    }
    else
    {
      res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
    }

    if (res > 0)
    {
      /* UDP bytes were already counted inside ms_udp_read() */
      if (! c->udp)
      {
        atomic_add_size(&ms_stats.bytes_read, res);
      }
      gotdata= 1;
      c->rbytes+= res;
      if (res == avail)
      {
        continue;
      }
      else
      {
        break;
      }
    }
    if (res == 0)
    {
      /* connection closed */
      ms_conn_set_state(c, conn_closing);
      return -1;
    }
    if (res == -1)
    {
      /* NOTE(review): for UDP, -1 can also mean "packets out of order"
       * (ms_sort_udp_packet), in which case errno is not set by a failing
       * read() and the check below inspects a stale value. */
      if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
        break;
      /* Should close on unhandled errors. */
      ms_conn_set_state(c, conn_closing);
      return -1;
    }
  }

  return gotdata;
} /* ms_try_read_network */
1745
1746
/**
 * After getting an object from the server, verify it if the current task
 * asked for verification.
 *
 * If the item was stored with an expire time (exp_time > 0), only expiry
 * is checked: a hit later than exp_time + EXPIRE_TIME_ERROR seconds after
 * the set is counted in ms_stats.exp_get. Otherwise the value's length and
 * bytes are compared with the original data in ms_setting.char_block, and
 * mismatches are counted in ms_stats.vef_failed. With ms_setting.verbose,
 * details are printed to stderr. The task (and mlget_item, if given) is
 * then marked finish_verify.
 *
 * @param c, pointer of the concurrency
 * @param mlget_item, pointer of multi-get task item structure (may be NULL)
 * @param value, received value string
 * @param vlen, received value string length
 */
static void ms_verify_value(ms_conn_t *c,
                            ms_mlget_task_item_t *mlget_item,
                            char *value,
                            int vlen)
{
  if (c->curr_task.verify)
  {
    assert(c->curr_task.item->value_offset != INVALID_OFFSET);
    char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
    char *orignkey=
      &ms_setting.char_block[c->curr_task.item->key_suffix_offset];

    /* verify expire time if necessary */
    if (c->curr_task.item->exp_time > 0)
    {
      struct timeval curr_time;
      gettimeofday(&curr_time, NULL);

      /* object expired but get it now */
      if (curr_time.tv_sec - c->curr_task.item->client_time
          > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
      {
        atomic_add_size(&ms_stats.exp_get, 1);

        if (ms_setting.verbose)
        {
          char set_time[64];
          char cur_time[64];
          /* NOTE(review): localtime() returns a pointer to shared static
           * storage and is not thread-safe; this client runs several
           * threads, so concurrent verbose reports could garble the
           * timestamps. localtime_r() would avoid that. */
          strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
                   localtime(&c->curr_task.item->client_time));
          strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
                   localtime(&curr_time.tv_sec));
          fprintf(stderr,
                  "\n<%d expire time verification failed, "
                  "object expired but get it now\n"
                  "\tkey len: %d\n"
                  "\tkey: %" PRIx64 " %.*s\n"
                  "\tset time: %s current time: %s "
                  "diff time: %d expire time: %d\n"
                  "\texpected data: \n"
                  "\treceived data len: %d\n"
                  "\treceived data: %.*s\n",
                  c->sfd,
                  c->curr_task.item->key_size,
                  c->curr_task.item->key_prefix,
                  c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
                  orignkey,
                  set_time,
                  cur_time,
                  (int)(curr_time.tv_sec - c->curr_task.item->client_time),
                  c->curr_task.item->exp_time,
                  vlen,
                  vlen,
                  value);
          fflush(stderr);
        }
      }
    }
    else
    {
      /* length check first so memcmp never reads past either buffer */
      if ((c->curr_task.item->value_size != vlen)
          || (memcmp(orignval, value, (size_t)vlen) != 0))
      {
        atomic_add_size(&ms_stats.vef_failed, 1);

        if (ms_setting.verbose)
        {
          fprintf(stderr,
                  "\n<%d data verification failed\n"
                  "\tkey len: %d\n"
                  "\tkey: %" PRIx64" %.*s\n"
                  "\texpected data len: %d\n"
                  "\texpected data: %.*s\n"
                  "\treceived data len: %d\n"
                  "\treceived data: %.*s\n",
                  c->sfd,
                  c->curr_task.item->key_size,
                  c->curr_task.item->key_prefix,
                  c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
                  orignkey,
                  c->curr_task.item->value_size,
                  c->curr_task.item->value_size,
                  orignval,
                  vlen,
                  vlen,
                  value);
          fflush(stderr);
        }
      }
    }

    c->curr_task.finish_verify= true;

    if (mlget_item != NULL)
    {
      mlget_item->finish_verify= true;
    }
  }
} /* ms_verify_value */
1855
1856
/**
 * For the ASCII protocol: once a value block (data plus trailing "\r\n",
 * c->rvbytes bytes in total) is fully buffered, verify it and consume it
 * from the read buffer.
 *
 * In multi-get mode the task item matching c->currcmd.key_prefix is
 * selected first, trying the next expected slot before a linear search.
 *
 * @param c, pointer of the concurrency
 */
static void ms_ascii_complete_nread(ms_conn_t *c)
{
  assert(c != NULL);
  assert(c->rbytes >= c->rvbytes);
  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
  if (c->rvbytes > 2)
  {
    assert(
      c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
  }

  /* multi-get */
  ms_mlget_task_item_t *mlget_item= NULL;
  if (((ms_setting.mult_key_num > 1)
       && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
      || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
  {
    /* NOTE(review): value_index is not checked against mlget_num before
     * indexing mlget_item[]. */
    c->mlget_task.value_index++;
    mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];

    if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
    {
      c->curr_task.item= mlget_item->item;
      c->curr_task.verify= mlget_item->verify;
      c->curr_task.finish_verify= mlget_item->finish_verify;
      mlget_item->get_miss= false;
    }
    else
    {
      /* Try to find the task item in multi-get task array */
      /* NOTE(review): if no entry matches, mlget_item is left pointing at
       * the last array element and is still passed to ms_verify_value(). */
      for (int i= 0; i < c->mlget_task.mlget_num; i++)
      {
        mlget_item= &c->mlget_task.mlget_item[i];
        if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
        {
          c->curr_task.item= mlget_item->item;
          c->curr_task.verify= mlget_item->verify;
          c->curr_task.finish_verify= mlget_item->finish_verify;
          mlget_item->get_miss= false;

          break;
        }
      }
    }
  }

  /* the verified length excludes the trailing "\r\n" */
  ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);

  c->curr_task.get_miss= false;
  c->rbytes-= c->rvbytes;
  c->rcurr= c->rcurr + c->rvbytes;
  assert(c->rcurr <= (c->rbuf + c->rsize));
  c->readval= false;
  c->rvbytes= 0;
} /* ms_ascii_complete_nread */
1918
1919
/**
 * For the binary protocol: once the response body (c->rvbytes bytes:
 * extras, key, then value) is fully buffered, verify the value of a GET/GETQ
 * hit and consume the body.
 *
 * Responses that are not GET/GETQ, or that carry no value bytes beyond
 * extras and key, end the operation immediately via ms_reset_conn(). A GET
 * in that situation is recorded as a miss. Otherwise the operation ends
 * after a single get, or after the last item of a multi-get has been seen.
 *
 * @param c, pointer of the concurrency
 */
static void ms_bin_complete_nread(ms_conn_t *c)
{
  assert(c != NULL);
  assert(c->rbytes >= c->rvbytes);
  assert(c->protocol == binary_prot);

  int extlen= c->binary_header.response.extlen;
  int keylen= c->binary_header.response.keylen;
  uint8_t opcode= c->binary_header.response.opcode;

  /* not get command or not include value, just return */
  if (((opcode != PROTOCOL_BINARY_CMD_GET)
       && (opcode != PROTOCOL_BINARY_CMD_GETQ))
      || (c->rvbytes <= extlen + keylen))
  {
    /* get miss */
    if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
    {
      c->currcmd.retstat= MCD_END;
      c->curr_task.get_miss= true;
    }

    c->readval= false;
    c->rvbytes= 0;
    ms_reset_conn(c, false);
    return;
  }

  /* multi-get */
  ms_mlget_task_item_t *mlget_item= NULL;
  if (((ms_setting.mult_key_num > 1)
       && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
      || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
  {
    /* binary responses are matched to task items purely by arrival order */
    c->mlget_task.value_index++;
    mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];

    c->curr_task.item= mlget_item->item;
    c->curr_task.verify= mlget_item->verify;
    c->curr_task.finish_verify= mlget_item->finish_verify;
    mlget_item->get_miss= false;
  }

  /* the value follows the extras and the key inside the body */
  ms_verify_value(c,
                  mlget_item,
                  c->rcurr + extlen + keylen,
                  c->rvbytes - extlen - keylen);

  c->currcmd.retstat= MCD_END;
  c->curr_task.get_miss= false;
  c->rbytes-= c->rvbytes;
  c->rcurr= c->rcurr + c->rvbytes;
  assert(c->rcurr <= (c->rbuf + c->rsize));
  c->readval= false;
  c->rvbytes= 0;

  if (ms_setting.mult_key_num > 1)
  {
    /* multi-get have check all the item */
    if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
    {
      ms_reset_conn(c, false);
    }
  }
  else
  {
    /* single get */
    ms_reset_conn(c, false);
  }
} /* ms_bin_complete_nread */
1996
1997
1998 /**
1999 * we get here after reading the value of get commands.
2000 *
2001 * @param c, pointer of the concurrency
2002 */
2003 static void ms_complete_nread(ms_conn_t *c)
2004 {
2005 assert(c != NULL);
2006 assert(c->rbytes >= c->rvbytes);
2007 assert(c->protocol == ascii_udp_prot
2008 || c->protocol == ascii_prot
2009 || c->protocol == binary_prot);
2010
2011 if (c->protocol == binary_prot)
2012 {
2013 ms_bin_complete_nread(c);
2014 }
2015 else
2016 {
2017 ms_ascii_complete_nread(c);
2018 }
2019 } /* ms_complete_nread */
2020
2021
2022 /**
2023 * Adds a message header to a connection.
2024 *
2025 * @param c, pointer of the concurrency
2026 *
2027 * @return int, if success, return 0, else return -1
2028 */
2029 static int ms_add_msghdr(ms_conn_t *c)
2030 {
2031 struct msghdr *msg;
2032
2033 assert(c != NULL);
2034
2035 if (c->msgsize == c->msgused)
2036 {
2037 msg=
2038 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2039 if (! msg)
2040 return -1;
2041
2042 c->msglist= msg;
2043 c->msgsize*= 2;
2044 }
2045
2046 msg= c->msglist + c->msgused;
2047
2048 /**
2049 * this wipes msg_iovlen, msg_control, msg_controllen, and
2050 * msg_flags, the last 3 of which aren't defined on solaris:
2051 */
2052 memset(msg, 0, sizeof(struct msghdr));
2053
2054 msg->msg_iov= &c->iov[c->iovused];
2055
2056 if (c->udp && (c->srv_recv_addr_size > 0))
2057 {
2058 msg->msg_name= &c->srv_recv_addr;
2059 msg->msg_namelen= c->srv_recv_addr_size;
2060 }
2061
2062 c->msgbytes= 0;
2063 c->msgused++;
2064
2065 if (c->udp)
2066 {
2067 /* Leave room for the UDP header, which we'll fill in later. */
2068 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2069 }
2070
2071 return 0;
2072 } /* ms_add_msghdr */
2073
2074
2075 /**
2076 * Ensures that there is room for another structure iovec in a connection's
2077 * iov list.
2078 *
2079 * @param c, pointer of the concurrency
2080 *
2081 * @return int, if success, return 0, else return -1
2082 */
2083 static int ms_ensure_iov_space(ms_conn_t *c)
2084 {
2085 assert(c != NULL);
2086
2087 if (c->iovused >= c->iovsize)
2088 {
2089 int i, iovnum;
2090 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2091 ((size_t)c->iovsize
2092 * 2)
2093 * sizeof(struct iovec));
2094 if (! new_iov)
2095 return -1;
2096
2097 c->iov= new_iov;
2098 c->iovsize*= 2;
2099
2100 /* Point all the msghdr structures at the new list. */
2101 for (i= 0, iovnum= 0; i < c->msgused; i++)
2102 {
2103 c->msglist[i].msg_iov= &c->iov[iovnum];
2104 iovnum+= (int)c->msglist[i].msg_iovlen;
2105 }
2106 }
2107
2108 return 0;
2109 } /* ms_ensure_iov_space */
2110
2111
2112 /**
2113 * Adds data to the list of pending data that will be written out to a
2114 * connection.
2115 *
2116 * @param c, pointer of the concurrency
2117 * @param buf, the buffer includes data to send
2118 * @param len, the data length in the buffer
2119 *
2120 * @return int, if success, return 0, else return -1
2121 */
2122 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2123 {
2124 struct msghdr *m;
2125 int leftover;
2126 bool limit_to_mtu;
2127
2128 assert(c != NULL);
2129
2130 do
2131 {
2132 m= &c->msglist[c->msgused - 1];
2133
2134 /*
2135 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2136 */
2137 limit_to_mtu= c->udp;
2138
2139 /* We may need to start a new msghdr if this one is full. */
2140 if ((m->msg_iovlen == IOV_MAX)
2141 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2142 {
2143 ms_add_msghdr(c);
2144 m= &c->msglist[c->msgused - 1];
2145 }
2146
2147 if (ms_ensure_iov_space(c) != 0)
2148 return -1;
2149
2150 /* If the fragment is too big to fit in the datagram, split it up */
2151 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2152 {
2153 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2154 len-= leftover;
2155 }
2156 else
2157 {
2158 leftover= 0;
2159 }
2160
2161 m= &c->msglist[c->msgused - 1];
2162 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2163 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2164
2165 c->msgbytes+= len;
2166 c->iovused++;
2167 m->msg_iovlen++;
2168
2169 buf= ((char *)buf) + len;
2170 len= leftover;
2171 }
2172 while (leftover > 0);
2173
2174 return 0;
2175 } /* ms_add_iov */
2176
2177
2178 /**
2179 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2180 *
2181 * @param c, pointer of the concurrency
2182 *
2183 * @return int, if success, return 0, else return -1
2184 */
2185 static int ms_build_udp_headers(ms_conn_t *c)
2186 {
2187 int i;
2188 unsigned char *hdr;
2189
2190 assert(c != NULL);
2191
2192 c->request_id= ms_get_udp_request_id();
2193
2194 if (c->msgused > c->hdrsize)
2195 {
2196 void *new_hdrbuf;
2197 if (c->hdrbuf)
2198 new_hdrbuf= realloc(c->hdrbuf,
2199 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2200 else
2201 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2202 if (! new_hdrbuf)
2203 return -1;
2204
2205 c->hdrbuf= (unsigned char *)new_hdrbuf;
2206 c->hdrsize= c->msgused * 2;
2207 }
2208
2209 /* If this is a multi-packet request, drop it. */
2210 if (c->udp && (c->msgused > 1))
2211 {
2212 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2213 return -1;
2214 }
2215
2216 hdr= c->hdrbuf;
2217 for (i= 0; i < c->msgused; i++)
2218 {
2219 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2220 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2221 *hdr++= (unsigned char)(c->request_id / 256);
2222 *hdr++= (unsigned char)(c->request_id % 256);
2223 *hdr++= (unsigned char)(i / 256);
2224 *hdr++= (unsigned char)(i % 256);
2225 *hdr++= (unsigned char)(c->msgused / 256);
2226 *hdr++= (unsigned char)(c->msgused % 256);
2227 *hdr++= (unsigned char)1; /* support facebook memcached */
2228 *hdr++= (unsigned char)0;
2229 assert(hdr ==
2230 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2231 + UDP_HEADER_SIZE));
2232 }
2233
2234 return 0;
2235 } /* ms_build_udp_headers */
2236
2237
2238 /**
2239 * Transmit the next chunk of data from our list of msgbuf structures.
2240 *
2241 * @param c, pointer of the concurrency
2242 *
2243 * @return TRANSMIT_COMPLETE All done writing.
2244 * TRANSMIT_INCOMPLETE More data remaining to write.
2245 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2246 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2247 */
2248 static int ms_transmit(ms_conn_t *c)
2249 {
2250 assert(c != NULL);
2251
2252 if ((c->msgcurr < c->msgused)
2253 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2254 {
2255 /* Finished writing the current msg; advance to the next. */
2256 c->msgcurr++;
2257 }
2258
2259 if (c->msgcurr < c->msgused)
2260 {
2261 ssize_t res;
2262 struct msghdr *m= &c->msglist[c->msgcurr];
2263
2264 res= sendmsg(c->sfd, m, 0);
2265 if (res > 0)
2266 {
2267 atomic_add_size(&ms_stats.bytes_written, res);
2268
2269 /* We've written some of the data. Remove the completed
2270 * iovec entries from the list of pending writes. */
2271 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2272 {
2273 res-= (ssize_t)m->msg_iov->iov_len;
2274 m->msg_iovlen--;
2275 m->msg_iov++;
2276 }
2277
2278 /* Might have written just part of the last iovec entry;
2279 * adjust it so the next write will do the rest. */
2280 if (res > 0)
2281 {
2282 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2283 m->msg_iov->iov_len-= (size_t)res;
2284 }
2285 return TRANSMIT_INCOMPLETE;
2286 }
2287 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2288 {
2289 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2290 {
2291 fprintf(stderr, "Couldn't update event.\n");
2292 ms_conn_set_state(c, conn_closing);
2293 return TRANSMIT_HARD_ERROR;
2294 }
2295 return TRANSMIT_SOFT_ERROR;
2296 }
2297
2298 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2299 * we have a real error, on which we close the connection */
2300 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2301
2302 ms_conn_set_state(c, conn_closing);
2303 return TRANSMIT_HARD_ERROR;
2304 }
2305 else
2306 {
2307 return TRANSMIT_COMPLETE;
2308 }
2309 } /* ms_transmit */
2310
2311
2312 /**
2313 * Shrinks a connection's buffers if they're too big. This prevents
2314 * periodic large "mget" response from server chewing lots of client
2315 * memory.
2316 *
2317 * This should only be called in between requests since it can wipe output
2318 * buffers!
2319 *
2320 * @param c, pointer of the concurrency
2321 */
2322 static void ms_conn_shrink(ms_conn_t *c)
2323 {
2324 assert(c != NULL);
2325
2326 if (c->udp)
2327 return;
2328
2329 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2330 {
2331 char *newbuf;
2332
2333 if (c->rcurr != c->rbuf)
2334 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2335
2336 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2337
2338 if (newbuf)
2339 {
2340 c->rbuf= newbuf;
2341 c->rsize= DATA_BUFFER_SIZE;
2342 }
2343 c->rcurr= c->rbuf;
2344 }
2345
2346 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2347 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2348 {
2349 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2350 if (! new_rbuf)
2351 {
2352 c->rudpbuf= new_rbuf;
2353 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2354 }
2355 /* TODO check error condition? */
2356 }
2357
2358 if (c->msgsize > MSG_LIST_HIGHWAT)
2359 {
2360 struct msghdr *newbuf= (struct msghdr *)realloc(
2361 (void *)c->msglist,
2362 MSG_LIST_INITIAL
2363 * sizeof(c->msglist[0]));
2364 if (newbuf)
2365 {
2366 c->msglist= newbuf;
2367 c->msgsize= MSG_LIST_INITIAL;
2368 }
2369 /* TODO check error condition? */
2370 }
2371
2372 if (c->iovsize > IOV_LIST_HIGHWAT)
2373 {
2374 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2375 IOV_LIST_INITIAL
2376 * sizeof(c->iov[0]));
2377 if (newbuf)
2378 {
2379 c->iov= newbuf;
2380 c->iovsize= IOV_LIST_INITIAL;
2381 }
2382 /* TODO check return value */
2383 }
2384 } /* ms_conn_shrink */
2385
2386
2387 /**
2388 * Sets a connection's current state in the state machine. Any special
2389 * processing that needs to happen on certain state transitions can
2390 * happen here.
2391 *
2392 * @param c, pointer of the concurrency
2393 * @param state, connection state
2394 */
2395 static void ms_conn_set_state(ms_conn_t *c, int state)
2396 {
2397 assert(c != NULL);
2398
2399 if (state != c->state)
2400 {
2401 if (state == conn_read)
2402 {
2403 ms_conn_shrink(c);
2404 }
2405 c->state= state;
2406 }
2407 } /* ms_conn_set_state */
2408
2409
2410 /**
2411 * update the event if socks change state. for example: when
2412 * change the listen scoket read event to sock write event, or
2413 * change socket handler, we could call this function.
2414 *
2415 * @param c, pointer of the concurrency
2416 * @param new_flags, new event flags
2417 *
2418 * @return bool, if success, return true, else return false
2419 */
2420 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2421 {
2422 /* default event timeout 10 seconds */
2423 struct timeval t=
2424 {
2425 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2426 };
2427
2428 assert(c != NULL);
2429
2430 struct event_base *base= c->event.ev_base;
2431 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2432 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2433 {
2434 return true;
2435 }
2436
2437 if (event_del(&c->event) == -1)
2438 {
2439 /* try to delete the event again */
2440 if (event_del(&c->event) == -1)
2441 {
2442 return false;
2443 }
2444 }
2445
2446 event_set(&c->event,
2447 c->sfd,
2448 (short)new_flags,
2449 ms_event_handler,
2450 (void *)c);
2451 event_base_set(base, &c->event);
2452 c->ev_flags= (short)new_flags;
2453
2454 if (c->total_sfds == 1)
2455 {
2456 if (event_add(&c->event, NULL) == -1)
2457 {
2458 return false;
2459 }
2460 }
2461 else
2462 {
2463 if (event_add(&c->event, &t) == -1)
2464 {
2465 return false;
2466 }
2467 }
2468
2469 return true;
2470 } /* ms_update_event */
2471
2472
2473 /**
2474 * If user want to get the expected throughput, we could limit
2475 * the performance of memslap. we could give up some work and
2476 * just wait a short time. The function is used to check this
2477 * case.
2478 *
2479 * @param c, pointer of the concurrency
2480 *
2481 * @return bool, if success, return true, else return false
2482 */
2483 static bool ms_need_yield(ms_conn_t *c)
2484 {
2485 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2486 int64_t tps= 0;
2487 int64_t time_diff= 0;
2488 struct timeval curr_time;
2489 ms_task_t *task= &c->curr_task;
2490
2491 if (ms_setting.expected_tps > 0)
2492 {
2493 gettimeofday(&curr_time, NULL);
2494 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2495 tps=
2496 (int64_t)((task->get_opt
2497 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2498
2499 /* current throughput is greater than expected throughput */
2500 if (tps > ms_thread->thread_ctx->tps_perconn)
2501 {
2502 return true;
2503 }
2504 }
2505
2506 return false;
2507 } /* ms_need_yield */
2508
2509
2510 /**
2511 * used to update the start time of each operation
2512 *
2513 * @param c, pointer of the concurrency
2514 */
2515 static void ms_update_start_time(ms_conn_t *c)
2516 {
2517 ms_task_item_t *item= c->curr_task.item;
2518
2519 if ((ms_setting.stat_freq > 0) || c->udp
2520 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2521 {
2522 gettimeofday(&c->start_time, NULL);
2523 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2524 {
2525 /* record the current time */
2526 item->client_time= c->start_time.tv_sec;
2527 }
2528 }
2529 } /* ms_update_start_time */
2530
2531
/**
 * run the state machine
 *
 * Repeatedly dispatches on c->state until a handler sets `stop`. Most
 * handlers set `stop` right after re-arming the connection's libevent
 * event via ms_update_event(), handing control back to the event loop.
 * A handler that merely `break`s out of the switch without setting
 * `stop` makes the loop run again with the (possibly new) state.
 *
 * States handled:
 *  - conn_read:    consume response data; when the current command is
 *                  finished, copy it to precmd and move to conn_write.
 *  - conn_write:   optionally throttle (ms_need_yield), build the next
 *                  task, then transmit; on a complete write arm EV_READ
 *                  and move to conn_read.
 *  - conn_closing: reconnect when reconnect mode allows it, otherwise
 *                  close the connection and stop.
 *
 * @param c, pointer of the concurrency
 */
static void ms_drive_machine(ms_conn_t *c)
{
  bool stop= false;

  assert(c != NULL);

  while (! stop)
  {
    switch (c->state)
    {
    case conn_read:
      if (c->readval)
      {
        /* reading a value body: finish once all rvbytes are buffered */
        if (c->rbytes >= c->rvbytes)
        {
          ms_complete_nread(c);
          break;
        }
      }
      else
      {
        /* NOTE(review): non-zero presumably means a line was parsed and
         * the machine should loop again -- confirm in ms_try_read_line */
        if (ms_try_read_line(c) != 0)
        {
          break;
        }
      }

      /* NOTE(review): non-zero presumably means new bytes arrived --
       * confirm in ms_try_read_network */
      if (ms_try_read_network(c) != 0)
      {
        break;
      }

      /* doesn't read all the response data, wait event wake up */
      if (! c->currcmd.isfinish)
      {
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* we have no command line and no data to read from network, next write */
      ms_conn_set_state(c, conn_write);
      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */

      break;

    case conn_write:
      /* throttling: only between requests, never in the middle of a write */
      if (! c->ctnwrite && ms_need_yield(c))
      {
        usleep(10);

        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* a new request is built only when no previous write is pending */
      if (! c->ctnwrite && (ms_exec_task(c) != 0))
      {
        ms_conn_set_state(c, conn_closing);
        break;
      }

      /* record the start time before starting to send data if necessary */
      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
      {
        if (c->change_sfd)
        {
          c->change_sfd= false;
        }
        ms_update_start_time(c);
      }

      /* change sfd if necessary */
      /* NOTE(review): the block above clears change_sfd whenever it is
       * true (its condition holds whenever change_sfd is set), so this
       * branch appears unreachable -- confirm the intended ordering */
      if (c->change_sfd)
      {
        c->ctnwrite= true;
        stop= true;
        break;
      }

      /* execute task until nothing need be written to network */
      if (! c->ctnwrite && (c->msgcurr == c->msgused))
      {
        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      switch (ms_transmit(c))
      {
      case TRANSMIT_COMPLETE:
        /* we have no data to write to network, next wait repose */
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          c->ctnwrite= false;
          break;
        }
        ms_conn_set_state(c, conn_read);
        c->ctnwrite= false;
        stop= true;
        break;

      case TRANSMIT_INCOMPLETE:
        c->ctnwrite= true;
        break; /* Continue in state machine. */

      case TRANSMIT_HARD_ERROR:
        /* NOTE(review): the state is not changed here; this relies on
         * ms_transmit having moved the connection to conn_closing --
         * otherwise conn_write is re-entered and a new task is built */
        c->ctnwrite= false;
        break;

      case TRANSMIT_SOFT_ERROR:
        /* keep the pending write and return to the event loop */
        c->ctnwrite= true;
        stop= true;
        break;

      default:
        break;
      } /* switch */

      break;

    case conn_closing:
      /* recovery mode, need reconnect if connection close */
      if (ms_setting.reconnect && (! ms_global.time_out
                                   || ((ms_setting.run_time == 0)
                                       && (c->remain_exec_num > 0))))
      {
        if (ms_reconn(c) != 0)
        {
          ms_conn_close(c);
          stop= true;
          break;
        }

        ms_reset_conn(c, false);

        if (c->total_sfds == 1)
        {
          /* NOTE(review): on failure the state stays conn_closing, so the
           * loop immediately tries to reconnect again */
          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
          {
            fprintf(stderr, "Couldn't update event.\n");
            ms_conn_set_state(c, conn_closing);
            break;
          }
        }

        break;
      }
      else
      {
        ms_conn_close(c);
        stop= true;
        break;
      }

    default:
      /* NOTE(review): with NDEBUG this is a no-op and `stop` is never set,
       * so an unknown state would spin forever */
      assert(0);
    } /* switch */
  }
} /* ms_drive_machine */
2714
2715
2716 /**
2717 * the event handler of each thread
2718 *
2719 * @param fd, the file descriptor of socket
2720 * @param which, event flag
2721 * @param arg, argument
2722 */
2723 void ms_event_handler(const int fd, const short which, void *arg)
2724 {
2725 ms_conn_t *c= (ms_conn_t *)arg;
2726
2727 assert(c != NULL);
2728
2729 c->which= which;
2730
2731 /* sanity */
2732 if (fd != c->sfd)
2733 {
2734 fprintf(stderr,
2735 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2736 fd,
2737 c->sfd);
2738 ms_conn_close(c);
2739 exit(1);
2740 }
2741 assert(fd == c->sfd);
2742
2743 /* event timeout, close the current connection */
2744 if (c->which == EV_TIMEOUT)
2745 {
2746 ms_conn_set_state(c, conn_closing);
2747 }
2748
2749 ms_drive_machine(c);
2750
2751 /* wait for next event */
2752 } /* ms_event_handler */
2753
2754
2755 /**
2756 * get the next socket descriptor index to run for replication
2757 *
2758 * @param c, pointer of the concurrency
2759 * @param cmd, command(get or set )
2760 *
2761 * @return int, if success, return the index, else return 0
2762 */
2763 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2764 {
2765 int sock_index= -1;
2766 int i= 0;
2767
2768 if (c->total_sfds == 1)
2769 {
2770 return 0;
2771 }
2772
2773 if (ms_setting.rep_write_srv == 0)
2774 {
2775 return sock_index;
2776 }
2777
2778 do
2779 {
2780 if (cmd == CMD_SET)
2781 {
2782 for (i= 0; i < ms_setting.rep_write_srv; i++)
2783 {
2784 if (c->tcpsfd[i] > 0)
2785 {
2786 break;
2787 }
2788 }
2789
2790 if (i == ms_setting.rep_write_srv)
2791 {
2792 /* random get one replication server to read */
2793 sock_index= (int)(random() % c->total_sfds);
2794 }
2795 else
2796 {
2797 /* random get one replication writing server to write */
2798 sock_index= (int)(random() % ms_setting.rep_write_srv);
2799 }
2800 }
2801 else if (cmd == CMD_GET)
2802 {
2803 /* random get one replication server to read */
2804 sock_index= (int)(random() % c->total_sfds);
2805 }
2806 }
2807 while (c->tcpsfd[sock_index] == 0);
2808
2809 return sock_index;
2810 } /* ms_get_rep_sock_index */
2811
2812
2813 /**
2814 * get the next socket descriptor index to run
2815 *
2816 * @param c, pointer of the concurrency
2817 *
2818 * @return int, return the index
2819 */
2820 static int ms_get_next_sock_index(ms_conn_t *c)
2821 {
2822 int sock_index= 0;
2823
2824 do
2825 {
2826 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2827 }
2828 while (c->tcpsfd[sock_index] == 0);
2829
2830 return sock_index;
2831 } /* ms_get_next_sock_index */
2832
2833
2834 /**
2835 * update socket event of the connections
2836 *
2837 * @param c, pointer of the concurrency
2838 *
2839 * @return int, if success, return 0, else return -1
2840 */
2841 static int ms_update_conn_sock_event(ms_conn_t *c)
2842 {
2843 assert(c != NULL);
2844
2845 switch (c->currcmd.cmd)
2846 {
2847 case CMD_SET:
2848 if (ms_setting.facebook_test && c->udp)
2849 {
2850 c->sfd= c->tcpsfd[0];
2851 c->udp= false;
2852 c->change_sfd= true;
2853 }
2854 break;
2855
2856 case CMD_GET:
2857 if (ms_setting.facebook_test && ! c->udp)
2858 {
2859 c->sfd= c->udpsfd;
2860 c->udp= true;
2861 c->change_sfd= true;
2862 }
2863 break;
2864
2865 default:
2866 break;
2867 } /* switch */
2868
2869 if (! c->udp && (c->total_sfds > 1))
2870 {
2871 if (c->cur_idx != c->total_sfds)
2872 {
2873 if (ms_setting.rep_write_srv == 0)
2874 {
2875 c->cur_idx= ms_get_next_sock_index(c);
2876 }
2877 else
2878 {
2879 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2880 }
2881 }
2882 else
2883 {
2884 /* must select the first sock of the connection at the beginning */
2885 c->cur_idx= 0;
2886 }
2887
2888 c->sfd= c->tcpsfd[c->cur_idx];
2889 assert(c->sfd != 0);
2890 c->change_sfd= true;
2891 }
2892
2893 if (c->change_sfd)
2894 {
2895 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2896 {
2897 fprintf(stderr, "Couldn't update event.\n");
2898 ms_conn_set_state(c, conn_closing);
2899 return -1;
2900 }
2901 }
2902
2903 return 0;
2904 } /* ms_update_conn_sock_event */
2905
2906
2907 /**
2908 * for ASCII protocol, this function build the set command
2909 * string and send the command.
2910 *
2911 * @param c, pointer of the concurrency
2912 * @param item, pointer of task item which includes the object
2913 * information
2914 *
2915 * @return int, if success, return 0, else return -1
2916 */
2917 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2918 {
2919 int value_offset;
2920 int write_len;
2921 char *buffer= c->wbuf;
2922
2923 write_len= sprintf(buffer,
2924 " %u %d %d\r\n",
2925 0,
2926 item->exp_time,
2927 item->value_size);
2928
2929 if (write_len > c->wsize)
2930 {
2931 /* ought to be always enough. just fail for simplicity */
2932 fprintf(stderr, "output command line too long.\n");
2933 return -1;
2934 }
2935
2936 if (item->value_offset == INVALID_OFFSET)
2937 {
2938 value_offset= item->key_suffix_offset;
2939 }
2940 else
2941 {
2942 value_offset= item->value_offset;
2943 }
2944
2945 if ((ms_add_iov(c, "set ", 4) != 0)
2946 || (ms_add_iov(c, (char *)&item->key_prefix,
2947 (int)KEY_PREFIX_SIZE) != 0)
2948 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2949 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2950 || (ms_add_iov(c, buffer, write_len) != 0)
2951 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2952 item->value_size) != 0)
2953 || (ms_add_iov(c, "\r\n", 2) != 0)
2954 || (c->udp && (ms_build_udp_headers(c) != 0)))
2955 {
2956 return -1;
2957 }
2958
2959 return 0;
2960 } /* ms_build_ascii_write_buf_set */
2961
2962
2963 /**
2964 * used to send set command to server
2965 *
2966 * @param c, pointer of the concurrency
2967 * @param item, pointer of task item which includes the object
2968 * information
2969 *
2970 * @return int, if success, return 0, else return -1
2971 */
2972 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2973 {
2974 assert(c != NULL);
2975
2976 c->currcmd.cmd= CMD_SET;
2977 c->currcmd.isfinish= false;
2978 c->currcmd.retstat= MCD_FAILURE;
2979
2980 if (ms_update_conn_sock_event(c) != 0)
2981 {
2982 return -1;
2983 }
2984
2985 c->msgcurr= 0;
2986 c->msgused= 0;
2987 c->iovused= 0;
2988 if (ms_add_msghdr(c) != 0)
2989 {
2990 fprintf(stderr, "Out of memory preparing request.");
2991 return -1;
2992 }
2993
2994 /* binary protocol */
2995 if (c->protocol == binary_prot)
2996 {
2997 if (ms_build_bin_write_buf_set(c, item) != 0)
2998 {
2999 return -1;
3000 }
3001 }
3002 else
3003 {
3004 if (ms_build_ascii_write_buf_set(c, item) != 0)
3005 {
3006 return -1;
3007 }
3008 }
3009
3010 atomic_add_size(&ms_stats.obj_bytes,
3011 item->key_size + item->value_size);
3012 atomic_add_size(&ms_stats.cmd_set, 1);
3013
3014 return 0;
3015 } /* ms_mcd_set */
3016
3017
3018 /**
3019 * for ASCII protocol, this function build the get command
3020 * string and send the command.
3021 *
3022 * @param c, pointer of the concurrency
3023 * @param item, pointer of task item which includes the object
3024 * information
3025 *
3026 * @return int, if success, return 0, else return -1
3027 */
3028 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3029 {
3030 if ((ms_add_iov(c, "get ", 4) != 0)
3031 || (ms_add_iov(c, (char *)&item->key_prefix,
3032 (int)KEY_PREFIX_SIZE) != 0)
3033 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3034 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3035 || (ms_add_iov(c, "\r\n", 2) != 0)
3036 || (c->udp && (ms_build_udp_headers(c) != 0)))
3037 {
3038 return -1;
3039 }
3040
3041 return 0;
3042 } /* ms_build_ascii_write_buf_get */
3043
3044
3045 /**
3046 * used to send the get command to server
3047 *
3048 * @param c, pointer of the concurrency
3049 * @param item, pointer of task item which includes the object
3050 * information
3051 * @param verify, whether do verification
3052 *
3053 * @return int, if success, return 0, else return -1
3054 */
3055 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3056 {
3057 /* verify not supported yet */
3058 UNUSED_ARGUMENT(verify);
3059
3060 assert(c != NULL);
3061
3062 c->currcmd.cmd= CMD_GET;
3063 c->currcmd.isfinish= false;
3064 c->currcmd.retstat= MCD_FAILURE;
3065
3066 if (ms_update_conn_sock_event(c) != 0)
3067 {
3068 return -1;
3069 }
3070
3071 c->msgcurr= 0;
3072 c->msgused= 0;
3073 c->iovused= 0;
3074 if (ms_add_msghdr(c) != 0)
3075 {
3076 fprintf(stderr, "Out of memory preparing request.");
3077 return -1;
3078 }
3079
3080 /* binary protocol */
3081 if (c->protocol == binary_prot)
3082 {
3083 if (ms_build_bin_write_buf_get(c, item) != 0)
3084 {
3085 return -1;
3086 }
3087 }
3088 else
3089 {
3090 if (ms_build_ascii_write_buf_get(c, item) != 0)
3091 {
3092 return -1;
3093 }
3094 }
3095
3096 atomic_add_size(&ms_stats.cmd_get, 1);
3097
3098 return 0;
3099 } /* ms_mcd_get */
3100
3101
3102 /**
3103 * for ASCII protocol, this function build the multi-get command
3104 * string and send the command.
3105 *
3106 * @param c, pointer of the concurrency
3107 *
3108 * @return int, if success, return 0, else return -1
3109 */
3110 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3111 {
3112 ms_task_item_t *item;
3113
3114 if (ms_add_iov(c, "get", 3) != 0)
3115 {
3116 return -1;
3117 }
3118
3119 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3120 {
3121 item= c->mlget_task.mlget_item[i].item;
3122 assert(item != NULL);
3123 if ((ms_add_iov(c, " ", 1) != 0)
3124 || (ms_add_iov(c, (char *)&item->key_prefix,
3125 (int)KEY_PREFIX_SIZE) != 0)
3126 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3127 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3128 {
3129 return -1;
3130 }
3131 }
3132
3133 if ((ms_add_iov(c, "\r\n", 2) != 0)
3134 || (c->udp && (ms_build_udp_headers(c) != 0)))
3135 {
3136 return -1;
3137 }
3138
3139 return 0;
3140 } /* ms_build_ascii_write_buf_mlget */
3141
3142
3143 /**
3144 * used to send the multi-get command to server
3145 *
3146 * @param c, pointer of the concurrency
3147 *
3148 * @return int, if success, return 0, else return -1
3149 */
3150 int ms_mcd_mlget(ms_conn_t *c)
3151 {
3152 ms_task_item_t *item;
3153
3154 assert(c != NULL);
3155 assert(c->mlget_task.mlget_num >= 1);
3156
3157 c->currcmd.cmd= CMD_GET;
3158 c->currcmd.isfinish= false;
3159 c->currcmd.retstat= MCD_FAILURE;
3160
3161 if (ms_update_conn_sock_event(c) != 0)
3162 {
3163 return -1;
3164 }
3165
3166 c->msgcurr= 0;
3167 c->msgused= 0;
3168 c->iovused= 0;
3169 if (ms_add_msghdr(c) != 0)
3170 {
3171 fprintf(stderr, "Out of memory preparing request.");
3172 return -1;
3173 }
3174
3175 /* binary protocol */
3176 if (c->protocol == binary_prot)
3177 {
3178 if (ms_build_bin_write_buf_mlget(c) != 0)
3179 {
3180 return -1;
3181 }
3182 }
3183 else
3184 {
3185 if (ms_build_ascii_write_buf_mlget(c) != 0)
3186 {
3187 return -1;
3188 }
3189 }
3190
3191 /* decrease operation time of each item */
3192 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3193 {
3194 item= c->mlget_task.mlget_item[i].item;
3195 atomic_add_size(&ms_stats.cmd_get, 1);
3196 }
3197
3198 return 0;
3199 } /* ms_mcd_mlget */
3200
3201
3202 /**
3203 * binary protocol support
3204 */
3205
3206 /**
3207 * for binary protocol, parse the response of server
3208 *
3209 * @param c, pointer of the concurrency
3210 *
3211 * @return int, if success, return 0, else return -1
3212 */
3213 static int ms_bin_process_response(ms_conn_t *c)
3214 {
3215 const char *errstr= NULL;
3216
3217 assert(c != NULL);
3218
3219 uint32_t bodylen= c->binary_header.response.bodylen;
3220 uint8_t opcode= c->binary_header.response.opcode;
3221 uint16_t status= c->binary_header.response.status;
3222
3223 if (bodylen > 0)
3224 {
3225 c->rvbytes= (int32_t)bodylen;
3226 c->readval= true;
3227 return 1;
3228 }
3229 else
3230 {
3231 switch (status)
3232 {
3233 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3234 if (opcode == PROTOCOL_BINARY_CMD_SET)
3235 {
3236 c->currcmd.retstat= MCD_STORED;
3237 }
3238 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3239 {
3240 c->currcmd.retstat= MCD_DELETED;
3241 }
3242 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3243 {
3244 c->currcmd.retstat= MCD_END;
3245 }
3246 break;
3247
3248 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3249 errstr= "Out of memory";
3250 c->currcmd.retstat= MCD_SERVER_ERROR;
3251 break;
3252
3253 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3254 errstr= "Unknown command";
3255 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3256 break;
3257
3258 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3259 errstr= "Not found";
3260 c->currcmd.retstat= MCD_NOTFOUND;
3261 break;
3262
3263 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3264 errstr= "Invalid arguments";
3265 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3266 break;
3267
3268 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3269 errstr= "Data exists for key.";
3270 break;
3271
3272 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3273 errstr= "Too large.";
3274 c->currcmd.retstat= MCD_SERVER_ERROR;
3275 break;
3276
3277 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3278 errstr= "Not stored.";
3279 c->currcmd.retstat= MCD_NOTSTORED;
3280 break;
3281
3282 default:
3283 errstr= "Unknown error";
3284 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3285 break;
3286 } /* switch */
3287
3288 if (errstr != NULL)
3289 {
3290 fprintf(stderr, "%s\n", errstr);
3291 }
3292 }
3293
3294 return 0;
3295 } /* ms_bin_process_response */
3296
3297
3298 /* build binary header and add the header to the buffer to send */
3299
3300 /**
3301 * build binary header and add the header to the buffer to send
3302 *
3303 * @param c, pointer of the concurrency
3304 * @param opcode, operation code
3305 * @param hdr_len, length of header
3306 * @param key_len, length of key
3307 * @param body_len. length of body
3308 */
3309 static void ms_add_bin_header(ms_conn_t *c,
3310 uint8_t opcode,
3311 uint8_t hdr_len,
3312 uint16_t key_len,
3313 uint32_t body_len)
3314 {
3315 protocol_binary_request_header *header;
3316
3317 assert(c != NULL);
3318
3319 header= (protocol_binary_request_header *)c->wcurr;
3320
3321 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3322 header->request.opcode= (uint8_t)opcode;
3323 header->request.keylen= htons(key_len);
3324
3325 header->request.extlen= (uint8_t)hdr_len;
3326 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3327 header->request.reserved= 0;
3328
3329 header->request.bodylen= htonl(body_len);
3330 header->request.opaque= 0;
3331 header->request.cas= 0;
3332
3333 ms_add_iov(c, c->wcurr, sizeof(header->request));
3334 } /* ms_add_bin_header */
3335
3336
3337 /**
3338 * add the key to the socket write buffer array
3339 *
3340 * @param c, pointer of the concurrency
3341 * @param item, pointer of task item which includes the object
3342 * information
3343 */
3344 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3345 {
3346 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3347 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3348 item->key_size - (int)KEY_PREFIX_SIZE);
3349 }
3350
3351
3352 /**
3353 * for binary protocol, this function build the set command
3354 * and add the command to send buffer array.
3355 *
3356 * @param c, pointer of the concurrency
3357 * @param item, pointer of task item which includes the object
3358 * information
3359 *
3360 * @return int, if success, return 0, else return -1
3361 */
3362 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3363 {
3364 assert(c->wbuf == c->wcurr);
3365
3366 int value_offset;
3367 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3368 uint16_t keylen= (uint16_t)item->key_size;
3369 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3370 + (uint32_t)keylen + (uint32_t)item->value_size;
3371
3372 ms_add_bin_header(c,
3373 PROTOCOL_BINARY_CMD_SET,
3374 sizeof(rep->message.body),
3375 keylen,
3376 bodylen);
3377 rep->message.body.flags= 0;
3378 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3379 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3380 ms_add_key_to_iov(c, item);
3381
3382 if (item->value_offset == INVALID_OFFSET)
3383 {
3384 value_offset= item->key_suffix_offset;
3385 }
3386 else
3387 {
3388 value_offset= item->value_offset;
3389 }
3390 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3391
3392 return 0;
3393 } /* ms_build_bin_write_buf_set */
3394
3395
3396 /**
3397 * for binary protocol, this function build the get command and
3398 * add the command to send buffer array.
3399 *
3400 * @param c, pointer of the concurrency
3401 * @param item, pointer of task item which includes the object
3402 * information
3403 *
3404 * @return int, if success, return 0, else return -1
3405 */
3406 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3407 {
3408 assert(c->wbuf == c->wcurr);
3409
3410 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3411 (uint32_t)item->key_size);
3412 ms_add_key_to_iov(c, item);
3413
3414 return 0;
3415 } /* ms_build_bin_write_buf_get */
3416
3417
3418 /**
3419 * for binary protocol, this function build the multi-get
3420 * command and add the command to send buffer array.
3421 *
3422 * @param c, pointer of the concurrency
3423 * @param item, pointer of task item which includes the object
3424 * information
3425 *
3426 * @return int, if success, return 0, else return -1
3427 */
3428 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3429 {
3430 ms_task_item_t *item;
3431
3432 assert(c->wbuf == c->wcurr);
3433
3434 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3435 {
3436 item= c->mlget_task.mlget_item[i].item;
3437 assert(item != NULL);
3438
3439 ms_add_bin_header(c,
3440 PROTOCOL_BINARY_CMD_GET,
3441 0,
3442 (uint16_t)item->key_size,
3443 (uint32_t)item->key_size);
3444 ms_add_key_to_iov(c, item);
3445 c->wcurr+= sizeof(protocol_binary_request_get);
3446 }
3447
3448 c->wcurr= c->wbuf;
3449
3450 return 0;
3451 } /* ms_build_bin_write_buf_mlget */