Compiles.
[m6w6/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <arpa/inet.h>
22 #if TIME_WITH_SYS_TIME
23 # include <sys/time.h>
24 # include <time.h>
25 #else
26 # if HAVE_SYS_TIME_H
27 # include <sys/time.h>
28 # else
29 # include <time.h>
30 # endif
31 #endif
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
35
#ifdef linux
/* On Linux, /usr/include/netinet/in.h defines ntohs() and friends as macros
 * expanding to the _bswap_nn optimizations, and their prototypes generate
 * gcc warnings. Byte-order conversion is not a bottleneck for this tool, so
 * the macros are undefined to fall back to the plain functions and silence
 * the warnings.
 */
#undef ntohs
#undef ntohl
#undef htons
#undef htonl
#endif

/* result codes of the network write path (presumably returned by
 * ms_transmit(); confirm against its definition) */
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/* for generating keys: every byte of this value has bit 0x10 set, and none
 * of ' ' (0x20), '\r' (0x0D), '\n' (0x0A), '\0' has that bit, so OR-ing the
 * mask into a prefix keeps those characters out of every prefix byte */
#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
#define KEY_PREFIX_MASK 0x1010101010101010

/* token indexes used to parse a "VALUE <key> <flags> <bytes>" response line */
#define KEY_TOKEN 1
#define VALUELEN_TOKEN 3

/* global increasing counter that keeps key prefixes unique; guarded by
 * ms_global.seq_mutex in ms_get_key_prefix() */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;

/* global increasing counter used to generate UDP request ids; updated
 * atomically by ms_get_udp_request_id() */
static volatile uint32_t udp_request_id= 0;

/* thread-specific key mapping the calling thread to its ms_thread_t;
 * defined in another translation unit */
extern pthread_key_t ms_thread_key;
69
/* generate a UDP request id */
static uint32_t ms_get_udp_request_id(void);


/* connection initialization */
static void ms_task_init(ms_conn_t *c);
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
static int ms_conn_sock_init(ms_conn_t *c);
static int ms_conn_event_init(ms_conn_t *c);
static int ms_conn_init(ms_conn_t *c,
                        const int init_state,
                        const int read_buffer_size,
                        const bool is_udp);
static void ms_warmup_num_init(ms_conn_t *c);
static int ms_item_win_init(ms_conn_t *c);


/* connection teardown */
void ms_conn_free(ms_conn_t *c);
static void ms_conn_close(ms_conn_t *c);


/* network connection setup */
static int ms_new_socket(struct addrinfo *ai);
static void ms_maximize_sndbuf(const int sfd);
static int ms_network_connect(ms_conn_t *c,
                              char *srv_host_name,
                              const int srv_port,
                              const bool is_udp,
                              int *ret_sfd);
static int ms_reconn(ms_conn_t *c);


/* reading and parsing responses */
static int ms_tokenize_command(char *command,
                               token_t *tokens,
                               const int max_tokens);
static int ms_ascii_process_line(ms_conn_t *c, char *command);
static int ms_try_read_line(ms_conn_t *c);
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
static int ms_udp_read(ms_conn_t *c, char *buf, int len);
static int ms_try_read_network(ms_conn_t *c);
static void ms_verify_value(ms_conn_t *c,
                            ms_mlget_task_item_t *mlget_item,
                            char *value,
                            int vlen);
static void ms_ascii_complete_nread(ms_conn_t *c);
static void ms_bin_complete_nread(ms_conn_t *c);
static void ms_complete_nread(ms_conn_t *c);


/* sending requests */
static int ms_add_msghdr(ms_conn_t *c);
static int ms_ensure_iov_space(ms_conn_t *c);
static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
static int ms_build_udp_headers(ms_conn_t *c);
static int ms_transmit(ms_conn_t *c);


/* connection state adjustment */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);


/* main event loop */
static void ms_drive_machine(ms_conn_t *c);
void ms_event_handler(const int fd, const short which, void *arg);


/* ASCII protocol request builders */
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);


/* binary protocol request builders and response handling */
static int ms_bin_process_response(ms_conn_t *c);
static void ms_add_bin_header(ms_conn_t *c,
                              uint8_t opcode,
                              uint8_t hdr_len,
                              uint16_t key_len,
                              uint32_t body_len);
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
162
163
164 /**
165 * each key has two parts, prefix and suffix. The suffix is a
166 * string random get form the character table. The prefix is a
167 * uint64_t variable. And the prefix must be unique. we use the
168 * prefix to identify a key. And the prefix can't include
169 * character ' ' '\r' '\n' '\0'.
170 *
171 * @return uint64_t
172 */
173 uint64_t ms_get_key_prefix(void)
174 {
175 uint64_t key_prefix;
176
177 pthread_mutex_lock(&ms_global.seq_mutex);
178 key_prefix_seq|= KEY_PREFIX_MASK;
179 key_prefix= key_prefix_seq;
180 key_prefix_seq++;
181 pthread_mutex_unlock(&ms_global.seq_mutex);
182
183 return key_prefix;
184 } /* ms_get_key_prefix */
185
186
187 /**
188 * get an unique udp request id
189 *
190 * @return an unique UDP request id
191 */
192 static uint32_t ms_get_udp_request_id(void)
193 {
194 return atomic_add_32_nv(&udp_request_id, 1);
195 }
196
197
198 /**
199 * initialize current task structure
200 *
201 * @param c, pointer of the concurrency
202 */
203 static void ms_task_init(ms_conn_t *c)
204 {
205 c->curr_task.cmd= CMD_NULL;
206 c->curr_task.item= 0;
207 c->curr_task.verify= false;
208 c->curr_task.finish_verify= true;
209 c->curr_task.get_miss= true;
210
211 c->curr_task.get_opt= 0;
212 c->curr_task.set_opt= 0;
213 c->curr_task.cycle_undo_get= 0;
214 c->curr_task.cycle_undo_set= 0;
215 c->curr_task.verified_get= 0;
216 c->curr_task.overwrite_set= 0;
217 } /* ms_task_init */
218
219
220 /**
221 * initialize udp for the connection structure
222 *
223 * @param c, pointer of the concurrency
224 * @param is_udp, whether it's udp
225 *
226 * @return int, if success, return 0, else return -1
227 */
228 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
229 {
230 c->hdrbuf= 0;
231 c->rudpbuf= 0;
232 c->udppkt= 0;
233
234 c->rudpsize= UDP_DATA_BUFFER_SIZE;
235 c->hdrsize= 0;
236
237 c->rudpbytes= 0;
238 c->packets= 0;
239 c->recvpkt= 0;
240 c->pktcurr= 0;
241 c->ordcurr= 0;
242
243 c->udp= is_udp;
244
245 if (c->udp || (! c->udp && ms_setting.facebook_test))
246 {
247 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
248 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
249
250 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
251 {
252 if (c->rudpbuf != NULL)
253 free(c->rudpbuf);
254 if (c->udppkt != NULL)
255 free(c->udppkt);
256 fprintf(stderr, "malloc()\n");
257 return -1;
258 }
259 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
260 }
261
262 return 0;
263 } /* ms_conn_udp_init */
264
265
266 /**
267 * initialize the connection structure
268 *
269 * @param c, pointer of the concurrency
270 * @param init_state, (conn_read, conn_write, conn_closing)
271 * @param read_buffer_size
272 * @param is_udp, whether it's udp
273 *
274 * @return int, if success, return 0, else return -1
275 */
276 static int ms_conn_init(ms_conn_t *c,
277 const int init_state,
278 const int read_buffer_size,
279 const bool is_udp)
280 {
281 assert(c != NULL);
282
283 c->rbuf= c->wbuf= 0;
284 c->iov= 0;
285 c->msglist= 0;
286
287 c->rsize= read_buffer_size;
288 c->wsize= WRITE_BUFFER_SIZE;
289 c->iovsize= IOV_LIST_INITIAL;
290 c->msgsize= MSG_LIST_INITIAL;
291
292 /* for replication, each connection need connect all the server */
293 if (ms_setting.rep_write_srv > 0)
294 {
295 c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
296 }
297 else
298 {
299 c->total_sfds= ms_setting.sock_per_conn;
300 }
301 c->alive_sfds= 0;
302
303 c->rbuf= (char *)malloc((size_t)c->rsize);
304 c->wbuf= (char *)malloc((size_t)c->wsize);
305 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
306 c->msglist= (struct msghdr *)malloc(
307 sizeof(struct msghdr) * (size_t)c->msgsize);
308 if (ms_setting.mult_key_num > 1)
309 {
310 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
311 malloc(
312 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
313 }
314 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
315
316 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
317 || (c->msglist == NULL) || (c->tcpsfd == NULL)
318 || ((ms_setting.mult_key_num > 1)
319 && (c->mlget_task.mlget_item == NULL)))
320 {
321 if (c->rbuf != NULL)
322 free(c->rbuf);
323 if (c->wbuf != NULL)
324 free(c->wbuf);
325 if (c->iov != NULL)
326 free(c->iov);
327 if (c->msglist != NULL)
328 free(c->msglist);
329 if (c->mlget_task.mlget_item != NULL)
330 free(c->mlget_task.mlget_item);
331 if (c->tcpsfd != NULL)
332 free(c->tcpsfd);
333 fprintf(stderr, "malloc()\n");
334 return -1;
335 }
336
337 c->state= init_state;
338 c->rvbytes= 0;
339 c->rbytes= 0;
340 c->rcurr= c->rbuf;
341 c->wcurr= c->wbuf;
342 c->iovused= 0;
343 c->msgcurr= 0;
344 c->msgused= 0;
345 c->cur_idx= c->total_sfds; /* default index is a invalid value */
346
347 c->ctnwrite= false;
348 c->readval= false;
349 c->change_sfd= false;
350
351 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
352 c->precmd.isfinish= true; /* default the previous command finished */
353 c->currcmd.isfinish= false;
354 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
355 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
356
357 c->mlget_task.mlget_num= 0;
358 c->mlget_task.value_index= -1; /* default invalid value */
359
360 if (ms_setting.binary_prot)
361 {
362 c->protocol= binary_prot;
363 }
364 else
365 {
366 c->protocol= ascii_prot;
367 }
368
369 /* initialize udp */
370 if (ms_conn_udp_init(c, is_udp) != 0)
371 {
372 return -1;
373 }
374
375 /* initialize task */
376 ms_task_init(c);
377
378 if (! (ms_setting.facebook_test && is_udp))
379 {
380 atomic_add_32(&ms_stats.active_conns, 1);
381 }
382
383 return 0;
384 } /* ms_conn_init */
385
386
387 /**
388 * when doing 100% get operation, it could preset some objects
389 * to warmup the server. this function is used to initialize the
390 * number of the objects to preset.
391 *
392 * @param c, pointer of the concurrency
393 */
394 static void ms_warmup_num_init(ms_conn_t *c)
395 {
396 /* no set operation, preset all the items in the window */
397 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
398 {
399 c->warmup_num= c->win_size;
400 c->remain_warmup_num= c->warmup_num;
401 }
402 else
403 {
404 c->warmup_num= 0;
405 c->remain_warmup_num= c->warmup_num;
406 }
407 } /* ms_warmup_num_init */
408
409
410 /**
411 * each connection has an item window, this function initialize
412 * the window. The window is used to generate task.
413 *
414 * @param c, pointer of the concurrency
415 *
416 * @return int, if success, return 0, else return -1
417 */
418 static int ms_item_win_init(ms_conn_t *c)
419 {
420 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
421 int exp_cnt= 0;
422
423 c->win_size= (int)ms_setting.win_size;
424 c->set_cursor= 0;
425 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
426 c->remain_exec_num= c->exec_num;
427
428 c->item_win= (ms_task_item_t *)malloc(
429 sizeof(ms_task_item_t) * (size_t)c->win_size);
430 if (c->item_win == NULL)
431 {
432 fprintf(stderr, "Can't allocate task item array for conn.\n");
433 return -1;
434 }
435 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
436
437 for (int i= 0; i < c->win_size; i++)
438 {
439 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
440 c->item_win[i].key_prefix= ms_get_key_prefix();
441 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
442 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
443 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
444 c->item_win[i].client_time= 0;
445
446 /* set expire time base on the proportion */
447 if (exp_cnt < ms_setting.exp_ver_per * i)
448 {
449 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
450 exp_cnt++;
451 }
452 else
453 {
454 c->item_win[i].exp_time= 0;
455 }
456 }
457
458 ms_warmup_num_init(c);
459
460 return 0;
461 } /* ms_item_win_init */
462
463
464 /**
465 * each connection structure can include one or more sock
466 * handlers. this function create these socks and connect the
467 * server(s).
468 *
469 * @param c, pointer of the concurrency
470 *
471 * @return int, if success, return 0, else return -1
472 */
473 static int ms_conn_sock_init(ms_conn_t *c)
474 {
475 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
476 uint32_t i;
477 int ret_sfd;
478 uint32_t srv_idx= 0;
479
480 assert(c != NULL);
481 assert(c->tcpsfd != NULL);
482
483 for (i= 0; i < c->total_sfds; i++)
484 {
485 ret_sfd= 0;
486 if (ms_setting.rep_write_srv > 0)
487 {
488 /* for replication, each connection need connect all the server */
489 srv_idx= i % ms_setting.srv_cnt;
490 }
491 else
492 {
493 /* all the connections in a thread connects the same server */
494 srv_idx= ms_thread->thread_ctx->srv_idx;
495 }
496
497 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
498 ms_setting.servers[srv_idx].srv_port,
499 ms_setting.udp, &ret_sfd) != 0)
500 {
501 break;
502 }
503
504 if (i == 0)
505 {
506 c->sfd= ret_sfd;
507 }
508
509 if (! ms_setting.udp)
510 {
511 c->tcpsfd[i]= ret_sfd;
512 }
513
514 c->alive_sfds++;
515 }
516
517 /* initialize udp sock handler if necessary */
518 if (ms_setting.facebook_test)
519 {
520 ret_sfd= 0;
521 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
522 ms_setting.servers[srv_idx].srv_port,
523 true, &ret_sfd) != 0)
524 {
525 c->udpsfd= 0;
526 }
527 else
528 {
529 c->udpsfd= ret_sfd;
530 }
531 }
532
533 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
534 {
535 if (ms_setting.udp)
536 {
537 close(c->sfd);
538 }
539 else
540 {
541 for (uint32_t j= 0; j < i; j++)
542 {
543 close(c->tcpsfd[j]);
544 }
545 }
546
547 if (c->udpsfd != 0)
548 {
549 close(c->udpsfd);
550 }
551
552 return -1;
553 }
554
555 return 0;
556 } /* ms_conn_sock_init */
557
558
559 /**
560 * each connection is managed by libevent, this function
561 * initialize the event of the connection structure.
562 *
563 * @param c, pointer of the concurrency
564 *
565 * @return int, if success, return 0, else return -1
566 */
567 static int ms_conn_event_init(ms_conn_t *c)
568 {
569 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
570 short event_flags= EV_WRITE | EV_PERSIST;
571
572 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
573 event_base_set(ms_thread->base, &c->event);
574 c->ev_flags= event_flags;
575
576 if (event_add(&c->event, NULL) == -1)
577 {
578 return -1;
579 }
580
581 return 0;
582 } /* ms_conn_event_init */
583
584
585 /**
586 * setup a connection, each connection structure of each
587 * thread must call this function to initialize.
588 *
589 * @param c, pointer of the concurrency
590 *
591 * @return int, if success, return 0, else return -1
592 */
593 int ms_setup_conn(ms_conn_t *c)
594 {
595 if (ms_item_win_init(c) != 0)
596 {
597 return -1;
598 }
599
600 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
601 {
602 return -1;
603 }
604
605 if (ms_conn_sock_init(c) != 0)
606 {
607 return -1;
608 }
609
610 if (ms_conn_event_init(c) != 0)
611 {
612 return -1;
613 }
614
615 return 0;
616 } /* ms_setup_conn */
617
618
619 /**
620 * Frees a connection.
621 *
622 * @param c, pointer of the concurrency
623 */
624 void ms_conn_free(ms_conn_t *c)
625 {
626 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
627 if (c != NULL)
628 {
629 if (c->hdrbuf != NULL)
630 free(c->hdrbuf);
631 if (c->msglist != NULL)
632 free(c->msglist);
633 if (c->rbuf != NULL)
634 free(c->rbuf);
635 if (c->wbuf != NULL)
636 free(c->wbuf);
637 if (c->iov != NULL)
638 free(c->iov);
639 if (c->mlget_task.mlget_item != NULL)
640 free(c->mlget_task.mlget_item);
641 if (c->rudpbuf != NULL)
642 free(c->rudpbuf);
643 if (c->udppkt != NULL)
644 free(c->udppkt);
645 if (c->item_win != NULL)
646 free(c->item_win);
647 if (c->tcpsfd != NULL)
648 free(c->tcpsfd);
649
650 if (--ms_thread->nactive_conn == 0)
651 {
652 free(ms_thread->conn);
653 }
654 }
655 } /* ms_conn_free */
656
657
658 /**
659 * close a connection
660 *
661 * @param c, pointer of the concurrency
662 */
663 static void ms_conn_close(ms_conn_t *c)
664 {
665 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
666 assert(c != NULL);
667
668 /* delete the event, the socket and the connection */
669 event_del(&c->event);
670
671 for (uint32_t i= 0; i < c->total_sfds; i++)
672 {
673 if (c->tcpsfd[i] > 0)
674 {
675 close(c->tcpsfd[i]);
676 }
677 }
678 c->sfd= 0;
679
680 if (ms_setting.facebook_test)
681 {
682 close(c->udpsfd);
683 }
684
685 atomic_dec_32(&ms_stats.active_conns);
686
687 ms_conn_free(c);
688
689 if (ms_setting.run_time == 0)
690 {
691 pthread_mutex_lock(&ms_global.run_lock.lock);
692 ms_global.run_lock.count++;
693 pthread_cond_signal(&ms_global.run_lock.cond);
694 pthread_mutex_unlock(&ms_global.run_lock.lock);
695 }
696
697 if (ms_thread->nactive_conn == 0)
698 {
699 pthread_exit(NULL);
700 }
701 } /* ms_conn_close */
702
703
704 /**
705 * create a new sock
706 *
707 * @param ai, server address information
708 *
709 * @return int, if success, return 0, else return -1
710 */
711 static int ms_new_socket(struct addrinfo *ai)
712 {
713 int sfd;
714
715 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
716 {
717 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
718 return -1;
719 }
720
721 return sfd;
722 } /* ms_new_socket */
723
724
725 /**
726 * Sets a socket's send buffer size to the maximum allowed by the system.
727 *
728 * @param sfd, file descriptor of socket
729 */
730 static void ms_maximize_sndbuf(const int sfd)
731 {
732 socklen_t intsize= sizeof(int);
733 unsigned int last_good= 0;
734 unsigned int min, max, avg;
735 unsigned int old_size;
736
737 /* Start with the default size. */
738 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
739 {
740 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
741 return;
742 }
743
744 /* Binary-search for the real maximum. */
745 min= old_size;
746 max= MAX_SENDBUF_SIZE;
747
748 while (min <= max)
749 {
750 avg= ((unsigned int)(min + max)) / 2;
751 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
752 {
753 last_good= avg;
754 min= avg + 1;
755 }
756 else
757 {
758 max= avg - 1;
759 }
760 }
761 } /* ms_maximize_sndbuf */
762
763
764 /**
765 * socket connects the server
766 *
767 * @param c, pointer of the concurrency
768 * @param srv_host_name, the host name of the server
769 * @param srv_port, port of server
770 * @param is_udp, whether it's udp
771 * @param ret_sfd, the connected socket file descriptor
772 *
773 * @return int, if success, return 0, else return -1
774 */
775 static int ms_network_connect(ms_conn_t *c,
776 char *srv_host_name,
777 const int srv_port,
778 const bool is_udp,
779 int *ret_sfd)
780 {
781 int sfd;
782 struct linger ling=
783 {
784 0, 0
785 };
786 struct addrinfo *ai;
787 struct addrinfo *next;
788 struct addrinfo hints;
789 char port_buf[NI_MAXSERV];
790 int error;
791 int success= 0;
792
793 int flags= 1;
794
795 /*
796 * the memset call clears nonstandard fields in some impementations
797 * that otherwise mess things up.
798 */
799 memset(&hints, 0, sizeof(hints));
800 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
801 if (is_udp)
802 {
803 hints.ai_protocol= IPPROTO_UDP;
804 hints.ai_socktype= SOCK_DGRAM;
805 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
806 }
807 else
808 {
809 hints.ai_family= AF_UNSPEC;
810 hints.ai_protocol= IPPROTO_TCP;
811 hints.ai_socktype= SOCK_STREAM;
812 }
813
814 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
815 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
816 if (error != 0)
817 {
818 if (error != EAI_SYSTEM)
819 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
820 else
821 perror("getaddrinfo()\n");
822
823 return -1;
824 }
825
826 for (next= ai; next; next= next->ai_next)
827 {
828 if ((sfd= ms_new_socket(next)) == -1)
829 {
830 freeaddrinfo(ai);
831 return -1;
832 }
833
834 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
835 if (is_udp)
836 {
837 ms_maximize_sndbuf(sfd);
838 }
839 else
840 {
841 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
842 sizeof(flags));
843 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
844 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
845 sizeof(flags));
846 }
847
848 if (is_udp)
849 {
850 c->srv_recv_addr_size= sizeof(struct sockaddr);
851 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
852 }
853 else
854 {
855 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
856 {
857 close(sfd);
858 freeaddrinfo(ai);
859 return -1;
860 }
861 }
862
863 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
864 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
865 {
866 fprintf(stderr, "setting O_NONBLOCK\n");
867 close(sfd);
868 freeaddrinfo(ai);
869 return -1;
870 }
871
872 if (ret_sfd != NULL)
873 {
874 *ret_sfd= sfd;
875 }
876
877 success++;
878 }
879
880 freeaddrinfo(ai);
881
882 /* Return zero if we detected no errors in starting up connections */
883 return success == 0;
884 } /* ms_network_connect */
885
886
887 /**
888 * reconnect a disconnected sock
889 *
890 * @param c, pointer of the concurrency
891 *
892 * @return int, if success, return 0, else return -1
893 */
894 static int ms_reconn(ms_conn_t *c)
895 {
896 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
897 uint32_t srv_idx= 0;
898 uint32_t srv_conn_cnt= 0;
899
900 if (ms_setting.rep_write_srv > 0)
901 {
902 srv_idx= c->cur_idx % ms_setting.srv_cnt;
903 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
904 }
905 else
906 {
907 srv_idx= ms_thread->thread_ctx->srv_idx;
908 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
909 }
910
911 /* close the old socket handler */
912 close(c->sfd);
913 c->tcpsfd[c->cur_idx]= 0;
914
915 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
916 % srv_conn_cnt == 0)
917 {
918 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
919 fprintf(stderr, "Server %s:%d disconnect\n",
920 ms_setting.servers[srv_idx].srv_host_name,
921 ms_setting.servers[srv_idx].srv_port);
922 }
923
924 if (ms_setting.rep_write_srv > 0)
925 {
926 uint32_t i= 0;
927
928 for (i= 0; i < c->total_sfds; i++)
929 {
930 if (c->tcpsfd[i] != 0)
931 {
932 break;
933 }
934 }
935
936 /* all socks disconnect */
937 if (i == c->total_sfds)
938 {
939 return -1;
940 }
941 }
942 else
943 {
944 do
945 {
946 /* reconnect success, break the loop */
947 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
948 ms_setting.servers[srv_idx].srv_port,
949 ms_setting.udp, &c->sfd) == 0)
950 {
951 c->tcpsfd[c->cur_idx]= c->sfd;
952 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
953 % (uint32_t)srv_conn_cnt == 0)
954 {
955 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
956 int reconn_time=
957 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
958 - ms_setting.servers[srv_idx].disconn_time
959 .tv_sec);
960 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
961 ms_setting.servers[srv_idx].srv_host_name,
962 ms_setting.servers[srv_idx].srv_port, reconn_time);
963 }
964 break;
965 }
966
967 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
968 {
969 /* wait a second and reconnect */
970 sleep(1);
971 }
972 }
973 while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
974 }
975
976 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
977 {
978 c->sfd= 0;
979 c->alive_sfds--;
980 }
981
982 return 0;
983 } /* ms_reconn */
984
985
986 /**
987 * reconnect several disconnected socks in the connection
988 * structure, the ever-1-second timer of the thread will check
989 * whether some socks in the connections disconnect. if
990 * disconnect, reconnect the sock.
991 *
992 * @param c, pointer of the concurrency
993 *
994 * @return int, if success, return 0, else return -1
995 */
996 int ms_reconn_socks(ms_conn_t *c)
997 {
998 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
999 uint32_t srv_idx= 0;
1000 int ret_sfd= 0;
1001 uint32_t srv_conn_cnt= 0;
1002 struct timeval cur_time;
1003
1004 assert(c != NULL);
1005
1006 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1007 {
1008 return 0;
1009 }
1010
1011 for (uint32_t i= 0; i < c->total_sfds; i++)
1012 {
1013 if (c->tcpsfd[i] == 0)
1014 {
1015 gettimeofday(&cur_time, NULL);
1016
1017 /**
1018 * For failover test of replication, reconnect the socks after
1019 * it disconnects more than 5 seconds, Otherwise memslap will
1020 * block at connect() function and the work threads can't work
1021 * in this interval.
1022 */
1023 if (cur_time.tv_sec
1024 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1025 {
1026 break;
1027 }
1028
1029 if (ms_setting.rep_write_srv > 0)
1030 {
1031 srv_idx= i % ms_setting.srv_cnt;
1032 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1033 }
1034 else
1035 {
1036 srv_idx= ms_thread->thread_ctx->srv_idx;
1037 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1038 }
1039
1040 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1041 ms_setting.servers[srv_idx].srv_port,
1042 ms_setting.udp, &ret_sfd) == 0)
1043 {
1044 c->tcpsfd[i]= ret_sfd;
1045 c->alive_sfds++;
1046
1047 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1048 % (uint32_t)srv_conn_cnt == 0)
1049 {
1050 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1051 int reconn_time=
1052 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1053 - ms_setting.servers[srv_idx].disconn_time
1054 .tv_sec);
1055 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1056 ms_setting.servers[srv_idx].srv_host_name,
1057 ms_setting.servers[srv_idx].srv_port, reconn_time);
1058 }
1059 }
1060 }
1061 }
1062
1063 return 0;
1064 } /* ms_reconn_socks */
1065
1066
1067 /**
1068 * Tokenize the command string by replacing whitespace with '\0' and update
1069 * the token array tokens with pointer to start of each token and length.
1070 * Returns total number of tokens. The last valid token is the terminal
1071 * token (value points to the first unprocessed character of the string and
1072 * length zero).
1073 *
1074 * Usage example:
1075 *
1076 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1077 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1078 * ...
1079 * }
1080 * ncommand = tokens[ix].value - command;
1081 * command = tokens[ix].value;
1082 * }
1083 *
1084 * @param command, the command string to token
1085 * @param tokens, array to store tokens
1086 * @param max_tokens, maximum tokens number
1087 *
1088 * @return int, the number of tokens
1089 */
1090 static int ms_tokenize_command(char *command,
1091 token_t *tokens,
1092 const int max_tokens)
1093 {
1094 char *s, *e;
1095 int ntokens= 0;
1096
1097 assert(command != NULL && tokens != NULL && max_tokens > 1);
1098
1099 for (s= e= command; ntokens < max_tokens - 1; ++e)
1100 {
1101 if (*e == ' ')
1102 {
1103 if (s != e)
1104 {
1105 tokens[ntokens].value= s;
1106 tokens[ntokens].length= (size_t)(e - s);
1107 ntokens++;
1108 *e= '\0';
1109 }
1110 s= e + 1;
1111 }
1112 else if (*e == '\0')
1113 {
1114 if (s != e)
1115 {
1116 tokens[ntokens].value= s;
1117 tokens[ntokens].length= (size_t)(e - s);
1118 ntokens++;
1119 }
1120
1121 break; /* string end */
1122 }
1123 }
1124
1125 return ntokens;
1126 } /* ms_tokenize_command */
1127
1128
1129 /**
1130 * parse the response of server.
1131 *
1132 * @param c, pointer of the concurrency
1133 * @param command, the string responded by server
1134 *
1135 * @return int, if the command completed return 0, else return
1136 * -1
1137 */
1138 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1139 {
1140 int ret= 0;
1141 int64_t value_len;
1142 char *buffer= command;
1143
1144 assert(c != NULL);
1145
1146 /**
1147 * for command get, we store the returned value into local buffer
1148 * then continue in ms_complete_nread().
1149 */
1150
1151 switch (buffer[0])
1152 {
1153 case 'V': /* VALUE || VERSION */
1154 if (buffer[1] == 'A') /* VALUE */
1155 {
1156 token_t tokens[MAX_TOKENS];
1157 ms_tokenize_command(command, tokens, MAX_TOKENS);
1158 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1159 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1160
1161 /*
1162 * We read the \r\n into the string since not doing so is more
1163 * cycles then the waster of memory to do so.
1164 *
1165 * We are null terminating through, which will most likely make
1166 * some people lazy about using the return length.
1167 */
1168 c->rvbytes= (int)(value_len + 2);
1169 c->readval= true;
1170 ret= -1;
1171 }
1172
1173 break;
1174
1175 case 'O': /* OK */
1176 c->currcmd.retstat= MCD_SUCCESS;
1177
1178 case 'S': /* STORED STATS SERVER_ERROR */
1179 if (buffer[2] == 'A') /* STORED STATS */
1180 { /* STATS*/
1181 c->currcmd.retstat= MCD_STAT;
1182 }
1183 else if (buffer[1] == 'E')
1184 {
1185 /* SERVER_ERROR */
1186 printf("<%d %s\n", c->sfd, buffer);
1187
1188 c->currcmd.retstat= MCD_SERVER_ERROR;
1189 }
1190 else if (buffer[1] == 'T')
1191 {
1192 /* STORED */
1193 c->currcmd.retstat= MCD_STORED;
1194 }
1195 else
1196 {
1197 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1198 }
1199 break;
1200
1201 case 'D': /* DELETED DATA */
1202 if (buffer[1] == 'E')
1203 {
1204 c->currcmd.retstat= MCD_DELETED;
1205 }
1206 else
1207 {
1208 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1209 }
1210
1211 break;
1212
1213 case 'N': /* NOT_FOUND NOT_STORED*/
1214 if (buffer[4] == 'F')
1215 {
1216 c->currcmd.retstat= MCD_NOTFOUND;
1217 }
1218 else if (buffer[4] == 'S')
1219 {
1220 printf("<%d %s\n", c->sfd, buffer);
1221 c->currcmd.retstat= MCD_NOTSTORED;
1222 }
1223 else
1224 {
1225 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1226 }
1227 break;
1228
1229 case 'E': /* PROTOCOL ERROR or END */
1230 if (buffer[1] == 'N')
1231 {
1232 /* END */
1233 c->currcmd.retstat= MCD_END;
1234 }
1235 else if (buffer[1] == 'R')
1236 {
1237 printf("<%d ERROR\n", c->sfd);
1238 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1239 }
1240 else if (buffer[1] == 'X')
1241 {
1242 c->currcmd.retstat= MCD_DATA_EXISTS;
1243 printf("<%d %s\n", c->sfd, buffer);
1244 }
1245 else
1246 {
1247 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1248 }
1249 break;
1250
1251 case 'C': /* CLIENT ERROR */
1252 printf("<%d %s\n", c->sfd, buffer);
1253 c->currcmd.retstat= MCD_CLIENT_ERROR;
1254 break;
1255
1256 default:
1257 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1258 break;
1259 } /* switch */
1260
1261 return ret;
1262 } /* ms_ascii_process_line */
1263
1264
1265 /**
1266 * after one operation completes, reset the concurrency
1267 *
1268 * @param c, pointer of the concurrency
1269 * @param timeout, whether it's timeout
1270 */
1271 void ms_reset_conn(ms_conn_t *c, bool timeout)
1272 {
1273 assert(c != NULL);
1274
1275 if (c->udp)
1276 {
1277 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1278 {
1279 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1280 }
1281
1282 c->packets= 0;
1283 c->recvpkt= 0;
1284 c->pktcurr= 0;
1285 c->ordcurr= 0;
1286 c->rudpbytes= 0;
1287 }
1288 c->currcmd.isfinish= true;
1289 c->ctnwrite= false;
1290 c->rbytes= 0;
1291 c->rcurr= c->rbuf;
1292 c->msgcurr = 0;
1293 c->msgused = 0;
1294 c->iovused = 0;
1295 ms_conn_set_state(c, conn_write);
1296 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1297
1298 if (timeout)
1299 {
1300 ms_drive_machine(c);
1301 }
1302 } /* ms_reset_conn */
1303
1304
1305 /**
1306 * if we have a complete line in the buffer, process it.
1307 *
1308 * @param c, pointer of the concurrency
1309 *
1310 * @return int, if success, return 0, else return -1
1311 */
1312 static int ms_try_read_line(ms_conn_t *c)
1313 {
1314 if (c->protocol == binary_prot)
1315 {
1316 /* Do we have the complete packet header? */
1317 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1318 {
1319 /* need more data! */
1320 return 0;
1321 }
1322 else
1323 {
1324 #ifdef NEED_ALIGN
1325 if (((long)(c->rcurr)) % 8 != 0)
1326 {
1327 /* must realign input buffer */
1328 memmove(c->rbuf, c->rcurr, c->rbytes);
1329 c->rcurr= c->rbuf;
1330 if (settings.verbose)
1331 {
1332 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1333 }
1334 }
1335 #endif
1336 protocol_binary_response_header *rsp;
1337 rsp= (protocol_binary_response_header *)c->rcurr;
1338
1339 c->binary_header= *rsp;
1340 c->binary_header.response.extlen= rsp->response.extlen;
1341 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1342 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1343 c->binary_header.response.status= ntohs(rsp->response.status);
1344
1345 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1346 {
1347 fprintf(stderr, "Invalid magic: %x\n",
1348 c->binary_header.response.magic);
1349 ms_conn_set_state(c, conn_closing);
1350 return 0;
1351 }
1352
1353 /* process this complete response */
1354 if (ms_bin_process_response(c) == 0)
1355 {
1356 /* current operation completed */
1357 ms_reset_conn(c, false);
1358 return -1;
1359 }
1360 else
1361 {
1362 c->rbytes-= (int32_t)sizeof(c->binary_header);
1363 c->rcurr+= sizeof(c->binary_header);
1364 }
1365 }
1366 }
1367 else
1368 {
1369 char *el, *cont;
1370
1371 assert(c != NULL);
1372 assert(c->rcurr <= (c->rbuf + c->rsize));
1373
1374 if (c->rbytes == 0)
1375 return 0;
1376
1377 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1378 if (! el)
1379 return 0;
1380
1381 cont= el + 1;
1382 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1383 {
1384 el--;
1385 }
1386 *el= '\0';
1387
1388 assert(cont <= (c->rcurr + c->rbytes));
1389
1390 /* process this complete line */
1391 if (ms_ascii_process_line(c, c->rcurr) == 0)
1392 {
1393 /* current operation completed */
1394 ms_reset_conn(c, false);
1395 return -1;
1396 }
1397 else
1398 {
1399 /* current operation didn't complete */
1400 c->rbytes-= (int32_t)(cont - c->rcurr);
1401 c->rcurr= cont;
1402 }
1403
1404 assert(c->rcurr <= (c->rbuf + c->rsize));
1405 }
1406
1407 return -1;
1408 } /* ms_try_read_line */
1409
1410
/**
 * UDP does not guarantee delivery order, so this function re-orders the
 * received datagrams and copies their payload, in sequence-number order,
 * into buf.
 *
 * The code treats c->rudpbuf as consecutive slots of UDP_MAX_PAYLOAD_SIZE
 * bytes, one datagram per slot (header at i * UDP_MAX_PAYLOAD_SIZE).
 * NOTE(review): this presumably only holds while every datagram except the
 * last one received is full-size — confirm against ms_udp_read().
 *
 * Phase 1 indexes datagrams pktcurr..recvpkt-1 into c->udppkt[] by their
 * sequence number. Phase 2 copies payload starting at udppkt[ordcurr]
 * until a gap, a full buf, or the end of the response is reached.
 *
 * @param c, pointer of the concurrency
 * @param buf, the buffer to store the ordered packages data
 * @param rbytes, the maximum capacity of the buffer
 *
 * @return int, number of bytes copied into buf, or -1 if nothing could be
 *         copied
 */
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
{
  int len= 0;
  int wbytes= 0;
  uint16_t req_id= 0;
  uint16_t seq_num= 0;
  uint16_t packets= 0;   /* only assigned; consistency is checked by assert */
  unsigned char *header= NULL;

  /* callers must have at least one datagram header buffered */
  assert(c != NULL);
  assert(buf != NULL);
  assert(c->rudpbytes >= UDP_HEADER_SIZE);

  /* calculate received packets count */
  if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
  {
    /* the last packet has some data */
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
  }
  else
  {
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
  }

  /* get the total packets count if necessary (from the first datagram) */
  if (c->packets == 0)
  {
    c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
  }

  /* build the ordered packet array */
  for (int i= c->pktcurr; i < c->recvpkt; i++)
  {
    header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
    req_id= (uint16_t)HEADER_TO_REQID(header);
    assert(req_id == c->request_id % (1 << 16));

    packets= (uint16_t)HEADER_TO_PACKETS(header);
    assert(c->packets == HEADER_TO_PACKETS(header));

    /* NOTE(review): seq_num comes straight from the datagram and indexes
     * c->udppkt with no bounds check (only the asserts above guard the
     * header). In NDEBUG builds a malformed datagram could index past
     * the udppkt array — confirm its capacity. */
    seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
    c->udppkt[seq_num].header= header;
    c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;

    if (i == c->recvpkt - 1)
    {
      /* last received packet: it may still be incomplete, so pktcurr is
       * only advanced when it fills its whole slot; otherwise it is
       * re-indexed on the next call */
      if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
      {
        c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
        c->pktcurr++;
      }
      else
      {
        c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
                                   - UDP_HEADER_SIZE;
      }
    }
    else
    {
      c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
      c->pktcurr++;
    }
  }

  /* copy payload in sequence order, resuming at the first packet that
   * has not been fully copied yet */
  for (int i= c->ordcurr; i < c->recvpkt; i++)
  {
    /* there is some data to copy */
    if ((c->udppkt[i].data != NULL)
        && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
    {
      header= c->udppkt[i].header;
      len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
      if (len > rbytes - wbytes)
      {
        len= rbytes - wbytes;
      }

      assert(len <= rbytes - wbytes);
      assert(i == HEADER_TO_SEQNUM(header));

      memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
             (size_t)len);
      wbytes+= len;
      c->udppkt[i].copybytes+= len;

      if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
          && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        /* finish copying all the data of this packet, next */
        c->ordcurr++;
      }

      /* last received packet, and finish copying all the data */
      if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
          && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
      {
        break;
      }

      /* no space to copy data */
      if (wbytes >= rbytes)
      {
        break;
      }

      /* it doesn't finish reading all the data of the packet from network */
      if ((i != c->recvpkt - 1)
          && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        break;
      }
    }
    else
    {
      /* no data to copy: a gap in the sequence (packet not received yet) */
      break;
    }
  }

  return wbytes == 0 ? -1 : wbytes;
} /* ms_sort_udp_packet */
1545
1546
1547 /**
1548 * encapsulate upd read like tcp read
1549 *
1550 * @param c, pointer of the concurrency
1551 * @param buf, read buffer
1552 * @param len, length to read
1553 *
1554 * @return int, if success, return the read bytes, else return
1555 * -1
1556 */
1557 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1558 {
1559 int res= 0;
1560 int avail= 0;
1561 int rbytes= 0;
1562 int copybytes= 0;
1563
1564 assert(c->udp);
1565
1566 while (1)
1567 {
1568 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1569 {
1570 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1571 if (! new_rbuf)
1572 {
1573 fprintf(stderr, "Couldn't realloc input buffer.\n");
1574 c->rudpbytes= 0; /* ignore what we read */
1575 return -1;
1576 }
1577 c->rudpbuf= new_rbuf;
1578 c->rudpsize*= 2;
1579 }
1580
1581 avail= c->rudpsize - c->rudpbytes;
1582 /* UDP each time read a packet, 1400 bytes */
1583 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1584
1585 if (res > 0)
1586 {
1587 atomic_add_size(&ms_stats.bytes_read, res);
1588 c->rudpbytes+= res;
1589 rbytes+= res;
1590 if (res == avail)
1591 {
1592 continue;
1593 }
1594 else
1595 {
1596 break;
1597 }
1598 }
1599
1600 if (res == 0)
1601 {
1602 /* "connection" closed */
1603 return res;
1604 }
1605
1606 if (res == -1)
1607 {
1608 /* no data to read */
1609 return res;
1610 }
1611 }
1612
1613 /* copy data to read buffer */
1614 if (rbytes > 0)
1615 {
1616 copybytes= ms_sort_udp_packet(c, buf, len);
1617 }
1618
1619 if (copybytes == -1)
1620 {
1621 atomic_add_size(&ms_stats.pkt_disorder, 1);
1622 }
1623
1624 return copybytes;
1625 } /* ms_udp_read */
1626
1627
1628 /*
1629 * read from network as much as we can, handle buffer overflow and connection
1630 * close.
1631 * before reading, move the remaining incomplete fragment of a command
1632 * (if any) to the beginning of the buffer.
1633 * return 0 if there's nothing to read on the first read.
1634 */
1635
1636 /**
1637 * read from network as much as we can, handle buffer overflow and connection
1638 * close. before reading, move the remaining incomplete fragment of a command
1639 * (if any) to the beginning of the buffer.
1640 *
1641 * @param c, pointer of the concurrency
1642 *
1643 * @return int,
1644 * return 0 if there's nothing to read on the first read.
1645 * return 1 if get data
1646 * return -1 if error happens
1647 */
1648 static int ms_try_read_network(ms_conn_t *c)
1649 {
1650 int gotdata= 0;
1651 int res;
1652 int64_t avail;
1653
1654 assert(c != NULL);
1655
1656 if ((c->rcurr != c->rbuf)
1657 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1658 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1659 {
1660 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1661 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1662 c->rcurr= c->rbuf;
1663 }
1664
1665 while (1)
1666 {
1667 if (c->rbytes >= c->rsize)
1668 {
1669 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1670 if (! new_rbuf)
1671 {
1672 fprintf(stderr, "Couldn't realloc input buffer.\n");
1673 c->rbytes= 0; /* ignore what we read */
1674 return -1;
1675 }
1676 c->rcurr= c->rbuf= new_rbuf;
1677 c->rsize*= 2;
1678 }
1679
1680 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1681 if (avail == 0)
1682 {
1683 break;
1684 }
1685
1686 if (c->udp)
1687 {
1688 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1689 }
1690 else
1691 {
1692 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1693 }
1694
1695 if (res > 0)
1696 {
1697 if (! c->udp)
1698 {
1699 atomic_add_size(&ms_stats.bytes_read, res);
1700 }
1701 gotdata= 1;
1702 c->rbytes+= res;
1703 if (res == avail)
1704 {
1705 continue;
1706 }
1707 else
1708 {
1709 break;
1710 }
1711 }
1712 if (res == 0)
1713 {
1714 /* connection closed */
1715 ms_conn_set_state(c, conn_closing);
1716 return -1;
1717 }
1718 if (res == -1)
1719 {
1720 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1721 break;
1722 /* Should close on unhandled errors. */
1723 ms_conn_set_state(c, conn_closing);
1724 return -1;
1725 }
1726 }
1727
1728 return gotdata;
1729 } /* ms_try_read_network */
1730
1731
1732 /**
1733 * after get the object from server, verify the value if
1734 * necessary.
1735 *
1736 * @param c, pointer of the concurrency
1737 * @param mlget_item, pointer of mulit-get task item structure
1738 * @param value, received value string
1739 * @param vlen, received value string length
1740 */
1741 static void ms_verify_value(ms_conn_t *c,
1742 ms_mlget_task_item_t *mlget_item,
1743 char *value,
1744 int vlen)
1745 {
1746 if (c->curr_task.verify)
1747 {
1748 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1749 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1750 char *orignkey=
1751 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1752
1753 /* verify expire time if necessary */
1754 if (c->curr_task.item->exp_time > 0)
1755 {
1756 struct timeval curr_time;
1757 gettimeofday(&curr_time, NULL);
1758
1759 /* object expired but get it now */
1760 if (curr_time.tv_sec - c->curr_task.item->client_time
1761 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1762 {
1763 atomic_add_size(&ms_stats.exp_get, 1);
1764
1765 if (ms_setting.verbose)
1766 {
1767 char set_time[64];
1768 char cur_time[64];
1769 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1770 localtime(&c->curr_task.item->client_time));
1771 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1772 localtime(&curr_time.tv_sec));
1773 fprintf(stderr,
1774 "\n<%d expire time verification failed, "
1775 "object expired but get it now\n"
1776 "\tkey len: %d\n"
1777 "\tkey: %" PRIx64 " %.*s\n"
1778 "\tset time: %s current time: %s "
1779 "diff time: %d expire time: %d\n"
1780 "\texpected data: \n"
1781 "\treceived data len: %d\n"
1782 "\treceived data: %.*s\n",
1783 c->sfd,
1784 c->curr_task.item->key_size,
1785 c->curr_task.item->key_prefix,
1786 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1787 orignkey,
1788 set_time,
1789 cur_time,
1790 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1791 c->curr_task.item->exp_time,
1792 vlen,
1793 vlen,
1794 value);
1795 fflush(stderr);
1796 }
1797 }
1798 }
1799 else
1800 {
1801 if ((c->curr_task.item->value_size != vlen)
1802 || (memcmp(orignval, value, (size_t)vlen) != 0))
1803 {
1804 atomic_add_size(&ms_stats.vef_failed, 1);
1805
1806 if (ms_setting.verbose)
1807 {
1808 fprintf(stderr,
1809 "\n<%d data verification failed\n"
1810 "\tkey len: %d\n"
1811 "\tkey: %" PRIx64" %.*s\n"
1812 "\texpected data len: %d\n"
1813 "\texpected data: %.*s\n"
1814 "\treceived data len: %d\n"
1815 "\treceived data: %.*s\n",
1816 c->sfd,
1817 c->curr_task.item->key_size,
1818 c->curr_task.item->key_prefix,
1819 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1820 orignkey,
1821 c->curr_task.item->value_size,
1822 c->curr_task.item->value_size,
1823 orignval,
1824 vlen,
1825 vlen,
1826 value);
1827 fflush(stderr);
1828 }
1829 }
1830 }
1831
1832 c->curr_task.finish_verify= true;
1833
1834 if (mlget_item != NULL)
1835 {
1836 mlget_item->finish_verify= true;
1837 }
1838 }
1839 } /* ms_verify_value */
1840
1841
1842 /**
1843 * For ASCII protocol, after store the data into the local
1844 * buffer, run this function to handle the data.
1845 *
1846 * @param c, pointer of the concurrency
1847 */
1848 static void ms_ascii_complete_nread(ms_conn_t *c)
1849 {
1850 assert(c != NULL);
1851 assert(c->rbytes >= c->rvbytes);
1852 assert(c->protocol == ascii_prot);
1853 if (c->rvbytes > 2)
1854 {
1855 assert(
1856 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1857 }
1858
1859 /* multi-get */
1860 ms_mlget_task_item_t *mlget_item= NULL;
1861 if (((ms_setting.mult_key_num > 1)
1862 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1863 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1864 {
1865 c->mlget_task.value_index++;
1866 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1867
1868 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1869 {
1870 c->curr_task.item= mlget_item->item;
1871 c->curr_task.verify= mlget_item->verify;
1872 c->curr_task.finish_verify= mlget_item->finish_verify;
1873 mlget_item->get_miss= false;
1874 }
1875 else
1876 {
1877 /* Try to find the task item in multi-get task array */
1878 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1879 {
1880 mlget_item= &c->mlget_task.mlget_item[i];
1881 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1882 {
1883 c->curr_task.item= mlget_item->item;
1884 c->curr_task.verify= mlget_item->verify;
1885 c->curr_task.finish_verify= mlget_item->finish_verify;
1886 mlget_item->get_miss= false;
1887
1888 break;
1889 }
1890 }
1891 }
1892 }
1893
1894 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1895
1896 c->curr_task.get_miss= false;
1897 c->rbytes-= c->rvbytes;
1898 c->rcurr= c->rcurr + c->rvbytes;
1899 assert(c->rcurr <= (c->rbuf + c->rsize));
1900 c->readval= false;
1901 c->rvbytes= 0;
1902 } /* ms_ascii_complete_nread */
1903
1904
1905 /**
1906 * For binary protocol, after store the data into the local
1907 * buffer, run this function to handle the data.
1908 *
1909 * @param c, pointer of the concurrency
1910 */
1911 static void ms_bin_complete_nread(ms_conn_t *c)
1912 {
1913 assert(c != NULL);
1914 assert(c->rbytes >= c->rvbytes);
1915 assert(c->protocol == binary_prot);
1916
1917 int extlen= c->binary_header.response.extlen;
1918 int keylen= c->binary_header.response.keylen;
1919 uint8_t opcode= c->binary_header.response.opcode;
1920
1921 /* not get command or not include value, just return */
1922 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1923 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1924 || (c->rvbytes <= extlen + keylen))
1925 {
1926 /* get miss */
1927 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1928 {
1929 c->currcmd.retstat= MCD_END;
1930 c->curr_task.get_miss= true;
1931 }
1932
1933 c->readval= false;
1934 c->rvbytes= 0;
1935 ms_reset_conn(c, false);
1936 return;
1937 }
1938
1939 /* multi-get */
1940 ms_mlget_task_item_t *mlget_item= NULL;
1941 if (((ms_setting.mult_key_num > 1)
1942 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1943 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1944 {
1945 c->mlget_task.value_index++;
1946 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1947
1948 c->curr_task.item= mlget_item->item;
1949 c->curr_task.verify= mlget_item->verify;
1950 c->curr_task.finish_verify= mlget_item->finish_verify;
1951 mlget_item->get_miss= false;
1952 }
1953
1954 ms_verify_value(c,
1955 mlget_item,
1956 c->rcurr + extlen + keylen,
1957 c->rvbytes - extlen - keylen);
1958
1959 c->currcmd.retstat= MCD_END;
1960 c->curr_task.get_miss= false;
1961 c->rbytes-= c->rvbytes;
1962 c->rcurr= c->rcurr + c->rvbytes;
1963 assert(c->rcurr <= (c->rbuf + c->rsize));
1964 c->readval= false;
1965 c->rvbytes= 0;
1966
1967 if (ms_setting.mult_key_num > 1)
1968 {
1969 /* multi-get have check all the item */
1970 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1971 {
1972 ms_reset_conn(c, false);
1973 }
1974 }
1975 else
1976 {
1977 /* single get */
1978 ms_reset_conn(c, false);
1979 }
1980 } /* ms_bin_complete_nread */
1981
1982
1983 /**
1984 * we get here after reading the value of get commands.
1985 *
1986 * @param c, pointer of the concurrency
1987 */
1988 static void ms_complete_nread(ms_conn_t *c)
1989 {
1990 assert(c != NULL);
1991 assert(c->rbytes >= c->rvbytes);
1992 assert(c->protocol == ascii_prot
1993 || c->protocol == binary_prot);
1994
1995 if (c->protocol == binary_prot)
1996 {
1997 ms_bin_complete_nread(c);
1998 }
1999 else
2000 {
2001 ms_ascii_complete_nread(c);
2002 }
2003 } /* ms_complete_nread */
2004
2005
2006 /**
2007 * Adds a message header to a connection.
2008 *
2009 * @param c, pointer of the concurrency
2010 *
2011 * @return int, if success, return 0, else return -1
2012 */
2013 static int ms_add_msghdr(ms_conn_t *c)
2014 {
2015 struct msghdr *msg;
2016
2017 assert(c != NULL);
2018
2019 if (c->msgsize == c->msgused)
2020 {
2021 msg=
2022 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2023 if (! msg)
2024 return -1;
2025
2026 c->msglist= msg;
2027 c->msgsize*= 2;
2028 }
2029
2030 msg= c->msglist + c->msgused;
2031
2032 /**
2033 * this wipes msg_iovlen, msg_control, msg_controllen, and
2034 * msg_flags, the last 3 of which aren't defined on solaris:
2035 */
2036 memset(msg, 0, sizeof(struct msghdr));
2037
2038 msg->msg_iov= &c->iov[c->iovused];
2039
2040 if (c->udp && (c->srv_recv_addr_size > 0))
2041 {
2042 msg->msg_name= &c->srv_recv_addr;
2043 msg->msg_namelen= c->srv_recv_addr_size;
2044 }
2045
2046 c->msgbytes= 0;
2047 c->msgused++;
2048
2049 if (c->udp)
2050 {
2051 /* Leave room for the UDP header, which we'll fill in later. */
2052 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2053 }
2054
2055 return 0;
2056 } /* ms_add_msghdr */
2057
2058
2059 /**
2060 * Ensures that there is room for another structure iovec in a connection's
2061 * iov list.
2062 *
2063 * @param c, pointer of the concurrency
2064 *
2065 * @return int, if success, return 0, else return -1
2066 */
2067 static int ms_ensure_iov_space(ms_conn_t *c)
2068 {
2069 assert(c != NULL);
2070
2071 if (c->iovused >= c->iovsize)
2072 {
2073 int i, iovnum;
2074 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2075 ((size_t)c->iovsize
2076 * 2)
2077 * sizeof(struct iovec));
2078 if (! new_iov)
2079 return -1;
2080
2081 c->iov= new_iov;
2082 c->iovsize*= 2;
2083
2084 /* Point all the msghdr structures at the new list. */
2085 for (i= 0, iovnum= 0; i < c->msgused; i++)
2086 {
2087 c->msglist[i].msg_iov= &c->iov[iovnum];
2088 iovnum+= (int)c->msglist[i].msg_iovlen;
2089 }
2090 }
2091
2092 return 0;
2093 } /* ms_ensure_iov_space */
2094
2095
2096 /**
2097 * Adds data to the list of pending data that will be written out to a
2098 * connection.
2099 *
2100 * @param c, pointer of the concurrency
2101 * @param buf, the buffer includes data to send
2102 * @param len, the data length in the buffer
2103 *
2104 * @return int, if success, return 0, else return -1
2105 */
2106 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2107 {
2108 struct msghdr *m;
2109 int leftover;
2110 bool limit_to_mtu;
2111
2112 assert(c != NULL);
2113
2114 do
2115 {
2116 m= &c->msglist[c->msgused - 1];
2117
2118 /*
2119 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2120 */
2121 limit_to_mtu= c->udp;
2122
2123 /* We may need to start a new msghdr if this one is full. */
2124 if ((m->msg_iovlen == IOV_MAX)
2125 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2126 {
2127 ms_add_msghdr(c);
2128 m= &c->msglist[c->msgused - 1];
2129 }
2130
2131 if (ms_ensure_iov_space(c) != 0)
2132 return -1;
2133
2134 /* If the fragment is too big to fit in the datagram, split it up */
2135 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2136 {
2137 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2138 len-= leftover;
2139 }
2140 else
2141 {
2142 leftover= 0;
2143 }
2144
2145 m= &c->msglist[c->msgused - 1];
2146 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2147 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2148
2149 c->msgbytes+= len;
2150 c->iovused++;
2151 m->msg_iovlen++;
2152
2153 buf= ((char *)buf) + len;
2154 len= leftover;
2155 }
2156 while (leftover > 0);
2157
2158 return 0;
2159 } /* ms_add_iov */
2160
2161
2162 /**
2163 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2164 *
2165 * @param c, pointer of the concurrency
2166 *
2167 * @return int, if success, return 0, else return -1
2168 */
2169 static int ms_build_udp_headers(ms_conn_t *c)
2170 {
2171 int i;
2172 unsigned char *hdr;
2173
2174 assert(c != NULL);
2175
2176 c->request_id= ms_get_udp_request_id();
2177
2178 if (c->msgused > c->hdrsize)
2179 {
2180 void *new_hdrbuf;
2181 if (c->hdrbuf)
2182 new_hdrbuf= realloc(c->hdrbuf,
2183 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2184 else
2185 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2186 if (! new_hdrbuf)
2187 return -1;
2188
2189 c->hdrbuf= (unsigned char *)new_hdrbuf;
2190 c->hdrsize= c->msgused * 2;
2191 }
2192
2193 /* If this is a multi-packet request, drop it. */
2194 if (c->udp && (c->msgused > 1))
2195 {
2196 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2197 return -1;
2198 }
2199
2200 hdr= c->hdrbuf;
2201 for (i= 0; i < c->msgused; i++)
2202 {
2203 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2204 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2205 *hdr++= (unsigned char)(c->request_id / 256);
2206 *hdr++= (unsigned char)(c->request_id % 256);
2207 *hdr++= (unsigned char)(i / 256);
2208 *hdr++= (unsigned char)(i % 256);
2209 *hdr++= (unsigned char)(c->msgused / 256);
2210 *hdr++= (unsigned char)(c->msgused % 256);
2211 *hdr++= (unsigned char)1; /* support facebook memcached */
2212 *hdr++= (unsigned char)0;
2213 assert(hdr ==
2214 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2215 + UDP_HEADER_SIZE));
2216 }
2217
2218 return 0;
2219 } /* ms_build_udp_headers */
2220
2221
2222 /**
2223 * Transmit the next chunk of data from our list of msgbuf structures.
2224 *
2225 * @param c, pointer of the concurrency
2226 *
2227 * @return TRANSMIT_COMPLETE All done writing.
2228 * TRANSMIT_INCOMPLETE More data remaining to write.
2229 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2230 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2231 */
2232 static int ms_transmit(ms_conn_t *c)
2233 {
2234 assert(c != NULL);
2235
2236 if ((c->msgcurr < c->msgused)
2237 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2238 {
2239 /* Finished writing the current msg; advance to the next. */
2240 c->msgcurr++;
2241 }
2242
2243 if (c->msgcurr < c->msgused)
2244 {
2245 ssize_t res;
2246 struct msghdr *m= &c->msglist[c->msgcurr];
2247
2248 res= sendmsg(c->sfd, m, 0);
2249 if (res > 0)
2250 {
2251 atomic_add_size(&ms_stats.bytes_written, res);
2252
2253 /* We've written some of the data. Remove the completed
2254 * iovec entries from the list of pending writes. */
2255 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2256 {
2257 res-= (ssize_t)m->msg_iov->iov_len;
2258 m->msg_iovlen--;
2259 m->msg_iov++;
2260 }
2261
2262 /* Might have written just part of the last iovec entry;
2263 * adjust it so the next write will do the rest. */
2264 if (res > 0)
2265 {
2266 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2267 m->msg_iov->iov_len-= (size_t)res;
2268 }
2269 return TRANSMIT_INCOMPLETE;
2270 }
2271 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2272 {
2273 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2274 {
2275 fprintf(stderr, "Couldn't update event.\n");
2276 ms_conn_set_state(c, conn_closing);
2277 return TRANSMIT_HARD_ERROR;
2278 }
2279 return TRANSMIT_SOFT_ERROR;
2280 }
2281
2282 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2283 * we have a real error, on which we close the connection */
2284 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2285
2286 ms_conn_set_state(c, conn_closing);
2287 return TRANSMIT_HARD_ERROR;
2288 }
2289 else
2290 {
2291 return TRANSMIT_COMPLETE;
2292 }
2293 } /* ms_transmit */
2294
2295
2296 /**
2297 * Shrinks a connection's buffers if they're too big. This prevents
2298 * periodic large "mget" response from server chewing lots of client
2299 * memory.
2300 *
2301 * This should only be called in between requests since it can wipe output
2302 * buffers!
2303 *
2304 * @param c, pointer of the concurrency
2305 */
2306 static void ms_conn_shrink(ms_conn_t *c)
2307 {
2308 assert(c != NULL);
2309
2310 if (c->udp)
2311 return;
2312
2313 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2314 {
2315 char *newbuf;
2316
2317 if (c->rcurr != c->rbuf)
2318 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2319
2320 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2321
2322 if (newbuf)
2323 {
2324 c->rbuf= newbuf;
2325 c->rsize= DATA_BUFFER_SIZE;
2326 }
2327 c->rcurr= c->rbuf;
2328 }
2329
2330 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2331 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2332 {
2333 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2334 if (! new_rbuf)
2335 {
2336 c->rudpbuf= new_rbuf;
2337 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2338 }
2339 /* TODO check error condition? */
2340 }
2341
2342 if (c->msgsize > MSG_LIST_HIGHWAT)
2343 {
2344 struct msghdr *newbuf= (struct msghdr *)realloc(
2345 (void *)c->msglist,
2346 MSG_LIST_INITIAL
2347 * sizeof(c->msglist[0]));
2348 if (newbuf)
2349 {
2350 c->msglist= newbuf;
2351 c->msgsize= MSG_LIST_INITIAL;
2352 }
2353 /* TODO check error condition? */
2354 }
2355
2356 if (c->iovsize > IOV_LIST_HIGHWAT)
2357 {
2358 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2359 IOV_LIST_INITIAL
2360 * sizeof(c->iov[0]));
2361 if (newbuf)
2362 {
2363 c->iov= newbuf;
2364 c->iovsize= IOV_LIST_INITIAL;
2365 }
2366 /* TODO check return value */
2367 }
2368 } /* ms_conn_shrink */
2369
2370
2371 /**
2372 * Sets a connection's current state in the state machine. Any special
2373 * processing that needs to happen on certain state transitions can
2374 * happen here.
2375 *
2376 * @param c, pointer of the concurrency
2377 * @param state, connection state
2378 */
2379 static void ms_conn_set_state(ms_conn_t *c, int state)
2380 {
2381 assert(c != NULL);
2382
2383 if (state != c->state)
2384 {
2385 if (state == conn_read)
2386 {
2387 ms_conn_shrink(c);
2388 }
2389 c->state= state;
2390 }
2391 } /* ms_conn_set_state */
2392
2393
2394 /**
2395 * update the event if socks change state. for example: when
2396 * change the listen scoket read event to sock write event, or
2397 * change socket handler, we could call this function.
2398 *
2399 * @param c, pointer of the concurrency
2400 * @param new_flags, new event flags
2401 *
2402 * @return bool, if success, return true, else return false
2403 */
2404 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2405 {
2406 assert(c != NULL);
2407
2408 struct event_base *base= c->event.ev_base;
2409 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2410 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2411 {
2412 return true;
2413 }
2414
2415 if (event_del(&c->event) == -1)
2416 {
2417 /* try to delete the event again */
2418 if (event_del(&c->event) == -1)
2419 {
2420 return false;
2421 }
2422 }
2423
2424 event_set(&c->event,
2425 c->sfd,
2426 (short)new_flags,
2427 ms_event_handler,
2428 (void *)c);
2429 event_base_set(base, &c->event);
2430 c->ev_flags= (short)new_flags;
2431
2432 if (event_add(&c->event, NULL) == -1)
2433 {
2434 return false;
2435 }
2436
2437 return true;
2438 } /* ms_update_event */
2439
2440
2441 /**
2442 * If user want to get the expected throughput, we could limit
2443 * the performance of memslap. we could give up some work and
2444 * just wait a short time. The function is used to check this
2445 * case.
2446 *
2447 * @param c, pointer of the concurrency
2448 *
2449 * @return bool, if success, return true, else return false
2450 */
2451 static bool ms_need_yield(ms_conn_t *c)
2452 {
2453 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2454 int64_t tps= 0;
2455 int64_t time_diff= 0;
2456 struct timeval curr_time;
2457 ms_task_t *task= &c->curr_task;
2458
2459 if (ms_setting.expected_tps > 0)
2460 {
2461 gettimeofday(&curr_time, NULL);
2462 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2463 tps=
2464 (int64_t)((task->get_opt
2465 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2466
2467 /* current throughput is greater than expected throughput */
2468 if (tps > ms_thread->thread_ctx->tps_perconn)
2469 {
2470 return true;
2471 }
2472 }
2473
2474 return false;
2475 } /* ms_need_yield */
2476
2477
2478 /**
2479 * used to update the start time of each operation
2480 *
2481 * @param c, pointer of the concurrency
2482 */
2483 static void ms_update_start_time(ms_conn_t *c)
2484 {
2485 ms_task_item_t *item= c->curr_task.item;
2486
2487 if ((ms_setting.stat_freq > 0) || c->udp
2488 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2489 {
2490 gettimeofday(&c->start_time, NULL);
2491 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2492 {
2493 /* record the current time */
2494 item->client_time= c->start_time.tv_sec;
2495 }
2496 }
2497 } /* ms_update_start_time */
2498
2499
/**
 * Run the per-connection state machine until it has to wait for an event.
 *
 * Each loop iteration dispatches on c->state:
 *  - conn_read:    consume buffered / network response data; once the
 *                  current command is finished, switch to conn_write.
 *  - conn_write:   optionally throttle (ms_need_yield), build the next task
 *                  (ms_exec_task) and transmit it; after a complete transmit
 *                  re-arm for EV_READ and switch to conn_read.
 *  - conn_closing: reconnect when ms_setting.reconnect allows it, otherwise
 *                  close the connection.
 * The loop ends when `stop` is set, i.e. after an event has been re-armed
 * with ms_update_event() or the connection has been closed.
 *
 * @param c, pointer of the concurrency
 */
static void ms_drive_machine(ms_conn_t *c)
{
  bool stop= false;

  assert(c != NULL);

  while (! stop)
  {
    switch (c->state)
    {
    case conn_read:
      if (c->readval)
      {
        /* reading a value body: complete it once rvbytes bytes are buffered */
        if (c->rbytes >= c->rvbytes)
        {
          ms_complete_nread(c);
          break;
        }
      }
      else
      {
        /* nonzero: a buffered response line was consumed; loop again */
        if (ms_try_read_line(c) != 0)
        {
          break;
        }
      }

      /* nonzero: more data arrived from the socket; loop again to parse it */
      if (ms_try_read_network(c) != 0)
      {
        break;
      }

      /* not all of the response data has been read; wait for the next read event */
      if (! c->currcmd.isfinish)
      {
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* we have no command line and no data to read from network, next write */
      ms_conn_set_state(c, conn_write);
      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */

      break;

    case conn_write:
      /* throttling: only between requests, never in the middle of a send */
      if (! c->ctnwrite && ms_need_yield(c))
      {
        usleep(10);

        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* ctnwrite == false: build the next request; true: resume pending send */
      if (! c->ctnwrite && (ms_exec_task(c) != 0))
      {
        ms_conn_set_state(c, conn_closing);
        break;
      }

      /* record the start time before starting to send data if necessary */
      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
      {
        if (c->change_sfd)
        {
          c->change_sfd= false;
        }
        ms_update_start_time(c);
      }

      /*
       * change sfd if necessary
       * NOTE(review): the block above clears change_sfd on every path where it
       * could be true, so this branch appears unreachable -- confirm intent.
       */
      if (c->change_sfd)
      {
        c->ctnwrite= true;
        stop= true;
        break;
      }

      /* execute task until nothing needs to be written to the network */
      if (! c->ctnwrite && (c->msgcurr == c->msgused))
      {
        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      switch (ms_transmit(c))
      {
      case TRANSMIT_COMPLETE:
        /* we have no data to write to the network; next, wait for the response */
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          c->ctnwrite= false;
          break;
        }
        ms_conn_set_state(c, conn_read);
        c->ctnwrite= false;
        stop= true;
        break;

      case TRANSMIT_INCOMPLETE:
        c->ctnwrite= true;
        break; /* Continue in state machine. */

      case TRANSMIT_HARD_ERROR:
        /* state is not changed here; presumably ms_transmit() already moved
         * the connection to conn_closing -- TODO confirm */
        c->ctnwrite= false;
        break;

      case TRANSMIT_SOFT_ERROR:
        /* keep the partial send pending and wait for the next event */
        c->ctnwrite= true;
        stop= true;
        break;

      default:
        break;
      } /* switch */

      break;

    case conn_closing:
      /* recovery mode, need reconnect if connection close */
      if (ms_setting.reconnect && (! ms_global.time_out
                                   || ((ms_setting.run_time == 0)
                                       && (c->remain_exec_num > 0))))
      {
        if (ms_reconn(c) != 0)
        {
          ms_conn_close(c);
          stop= true;
          break;
        }

        ms_reset_conn(c, false);

        if (c->total_sfds == 1)
        {
          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
          {
            fprintf(stderr, "Couldn't update event.\n");
            ms_conn_set_state(c, conn_closing);
            break;
          }
        }

        /* stop stays false: loop again with the state left by ms_reset_conn() */
        break;
      }
      else
      {
        ms_conn_close(c);
        stop= true;
        break;
      }

    default:
      assert(0);
    } /* switch */
  }
} /* ms_drive_machine */
2682
2683
2684 /**
2685 * the event handler of each thread
2686 *
2687 * @param fd, the file descriptor of socket
2688 * @param which, event flag
2689 * @param arg, argument
2690 */
2691 void ms_event_handler(const int fd, const short which, void *arg)
2692 {
2693 ms_conn_t *c= (ms_conn_t *)arg;
2694
2695 assert(c != NULL);
2696
2697 c->which= which;
2698
2699 /* sanity */
2700 if (fd != c->sfd)
2701 {
2702 fprintf(stderr,
2703 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2704 fd,
2705 c->sfd);
2706 ms_conn_close(c);
2707 exit(1);
2708 }
2709 assert(fd == c->sfd);
2710
2711 ms_drive_machine(c);
2712
2713 /* wait for next event */
2714 } /* ms_event_handler */
2715
2716
2717 /**
2718 * get the next socket descriptor index to run for replication
2719 *
2720 * @param c, pointer of the concurrency
2721 * @param cmd, command(get or set )
2722 *
2723 * @return int, if success, return the index, else return 0
2724 */
2725 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2726 {
2727 uint32_t sock_index= 0;
2728 uint32_t i= 0;
2729
2730 if (c->total_sfds == 1)
2731 {
2732 return 0;
2733 }
2734
2735 if (ms_setting.rep_write_srv == 0)
2736 {
2737 return sock_index;
2738 }
2739
2740 do
2741 {
2742 if (cmd == CMD_SET)
2743 {
2744 for (i= 0; i < ms_setting.rep_write_srv; i++)
2745 {
2746 if (c->tcpsfd[i] > 0)
2747 {
2748 break;
2749 }
2750 }
2751
2752 if (i == ms_setting.rep_write_srv)
2753 {
2754 /* random get one replication server to read */
2755 sock_index= (uint32_t)(random() % c->total_sfds);
2756 }
2757 else
2758 {
2759 /* random get one replication writing server to write */
2760 sock_index= (uint32_t)(random() % ms_setting.rep_write_srv);
2761 }
2762 }
2763 else if (cmd == CMD_GET)
2764 {
2765 /* random get one replication server to read */
2766 sock_index= (uint32_t)(random() % c->total_sfds);
2767 }
2768 }
2769 while (c->tcpsfd[sock_index] == 0);
2770
2771 return sock_index;
2772 } /* ms_get_rep_sock_index */
2773
2774
2775 /**
2776 * get the next socket descriptor index to run
2777 *
2778 * @param c, pointer of the concurrency
2779 *
2780 * @return int, return the index
2781 */
2782 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2783 {
2784 uint32_t sock_index= 0;
2785
2786 do
2787 {
2788 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2789 }
2790 while (c->tcpsfd[sock_index] == 0);
2791
2792 return sock_index;
2793 } /* ms_get_next_sock_index */
2794
2795
2796 /**
2797 * update socket event of the connections
2798 *
2799 * @param c, pointer of the concurrency
2800 *
2801 * @return int, if success, return 0, else return -1
2802 */
2803 static int ms_update_conn_sock_event(ms_conn_t *c)
2804 {
2805 assert(c != NULL);
2806
2807 switch (c->currcmd.cmd)
2808 {
2809 case CMD_SET:
2810 if (ms_setting.facebook_test && c->udp)
2811 {
2812 c->sfd= c->tcpsfd[0];
2813 c->udp= false;
2814 c->change_sfd= true;
2815 }
2816 break;
2817
2818 case CMD_GET:
2819 if (ms_setting.facebook_test && ! c->udp)
2820 {
2821 c->sfd= c->udpsfd;
2822 c->udp= true;
2823 c->change_sfd= true;
2824 }
2825 break;
2826
2827 default:
2828 break;
2829 } /* switch */
2830
2831 if (! c->udp && (c->total_sfds > 1))
2832 {
2833 if (c->cur_idx != c->total_sfds)
2834 {
2835 if (ms_setting.rep_write_srv == 0)
2836 {
2837 c->cur_idx= ms_get_next_sock_index(c);
2838 }
2839 else
2840 {
2841 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2842 }
2843 }
2844 else
2845 {
2846 /* must select the first sock of the connection at the beginning */
2847 c->cur_idx= 0;
2848 }
2849
2850 c->sfd= c->tcpsfd[c->cur_idx];
2851 assert(c->sfd != 0);
2852 c->change_sfd= true;
2853 }
2854
2855 if (c->change_sfd)
2856 {
2857 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2858 {
2859 fprintf(stderr, "Couldn't update event.\n");
2860 ms_conn_set_state(c, conn_closing);
2861 return -1;
2862 }
2863 }
2864
2865 return 0;
2866 } /* ms_update_conn_sock_event */
2867
2868
2869 /**
2870 * for ASCII protocol, this function build the set command
2871 * string and send the command.
2872 *
2873 * @param c, pointer of the concurrency
2874 * @param item, pointer of task item which includes the object
2875 * information
2876 *
2877 * @return int, if success, return 0, else return -1
2878 */
2879 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2880 {
2881 int value_offset;
2882 int write_len;
2883 char *buffer= c->wbuf;
2884
2885 write_len= sprintf(buffer,
2886 " %u %d %d\r\n",
2887 0,
2888 item->exp_time,
2889 item->value_size);
2890
2891 if (write_len > c->wsize)
2892 {
2893 /* ought to be always enough. just fail for simplicity */
2894 fprintf(stderr, "output command line too long.\n");
2895 return -1;
2896 }
2897
2898 if (item->value_offset == INVALID_OFFSET)
2899 {
2900 value_offset= item->key_suffix_offset;
2901 }
2902 else
2903 {
2904 value_offset= item->value_offset;
2905 }
2906
2907 if ((ms_add_iov(c, "set ", 4) != 0)
2908 || (ms_add_iov(c, (char *)&item->key_prefix,
2909 (int)KEY_PREFIX_SIZE) != 0)
2910 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2911 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2912 || (ms_add_iov(c, buffer, write_len) != 0)
2913 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2914 item->value_size) != 0)
2915 || (ms_add_iov(c, "\r\n", 2) != 0)
2916 || (c->udp && (ms_build_udp_headers(c) != 0)))
2917 {
2918 return -1;
2919 }
2920
2921 return 0;
2922 } /* ms_build_ascii_write_buf_set */
2923
2924
2925 /**
2926 * used to send set command to server
2927 *
2928 * @param c, pointer of the concurrency
2929 * @param item, pointer of task item which includes the object
2930 * information
2931 *
2932 * @return int, if success, return 0, else return -1
2933 */
2934 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2935 {
2936 assert(c != NULL);
2937
2938 c->currcmd.cmd= CMD_SET;
2939 c->currcmd.isfinish= false;
2940 c->currcmd.retstat= MCD_FAILURE;
2941
2942 if (ms_update_conn_sock_event(c) != 0)
2943 {
2944 return -1;
2945 }
2946
2947 c->msgcurr= 0;
2948 c->msgused= 0;
2949 c->iovused= 0;
2950 if (ms_add_msghdr(c) != 0)
2951 {
2952 fprintf(stderr, "Out of memory preparing request.");
2953 return -1;
2954 }
2955
2956 /* binary protocol */
2957 if (c->protocol == binary_prot)
2958 {
2959 if (ms_build_bin_write_buf_set(c, item) != 0)
2960 {
2961 return -1;
2962 }
2963 }
2964 else
2965 {
2966 if (ms_build_ascii_write_buf_set(c, item) != 0)
2967 {
2968 return -1;
2969 }
2970 }
2971
2972 atomic_add_size(&ms_stats.obj_bytes,
2973 item->key_size + item->value_size);
2974 atomic_add_size(&ms_stats.cmd_set, 1);
2975
2976 return 0;
2977 } /* ms_mcd_set */
2978
2979
2980 /**
2981 * for ASCII protocol, this function build the get command
2982 * string and send the command.
2983 *
2984 * @param c, pointer of the concurrency
2985 * @param item, pointer of task item which includes the object
2986 * information
2987 *
2988 * @return int, if success, return 0, else return -1
2989 */
2990 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2991 {
2992 if ((ms_add_iov(c, "get ", 4) != 0)
2993 || (ms_add_iov(c, (char *)&item->key_prefix,
2994 (int)KEY_PREFIX_SIZE) != 0)
2995 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2996 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2997 || (ms_add_iov(c, "\r\n", 2) != 0)
2998 || (c->udp && (ms_build_udp_headers(c) != 0)))
2999 {
3000 return -1;
3001 }
3002
3003 return 0;
3004 } /* ms_build_ascii_write_buf_get */
3005
3006
3007 /**
3008 * used to send the get command to server
3009 *
3010 * @param c, pointer of the concurrency
3011 * @param item, pointer of task item which includes the object
3012 * information
3013 *
3014 * @return int, if success, return 0, else return -1
3015 */
3016 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3017 {
3018 assert(c != NULL);
3019
3020 c->currcmd.cmd= CMD_GET;
3021 c->currcmd.isfinish= false;
3022 c->currcmd.retstat= MCD_FAILURE;
3023
3024 if (ms_update_conn_sock_event(c) != 0)
3025 {
3026 return -1;
3027 }
3028
3029 c->msgcurr= 0;
3030 c->msgused= 0;
3031 c->iovused= 0;
3032 if (ms_add_msghdr(c) != 0)
3033 {
3034 fprintf(stderr, "Out of memory preparing request.");
3035 return -1;
3036 }
3037
3038 /* binary protocol */
3039 if (c->protocol == binary_prot)
3040 {
3041 if (ms_build_bin_write_buf_get(c, item) != 0)
3042 {
3043 return -1;
3044 }
3045 }
3046 else
3047 {
3048 if (ms_build_ascii_write_buf_get(c, item) != 0)
3049 {
3050 return -1;
3051 }
3052 }
3053
3054 atomic_add_size(&ms_stats.cmd_get, 1);
3055
3056 return 0;
3057 } /* ms_mcd_get */
3058
3059
3060 /**
3061 * for ASCII protocol, this function build the multi-get command
3062 * string and send the command.
3063 *
3064 * @param c, pointer of the concurrency
3065 *
3066 * @return int, if success, return 0, else return -1
3067 */
3068 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3069 {
3070 ms_task_item_t *item;
3071
3072 if (ms_add_iov(c, "get", 3) != 0)
3073 {
3074 return -1;
3075 }
3076
3077 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3078 {
3079 item= c->mlget_task.mlget_item[i].item;
3080 assert(item != NULL);
3081 if ((ms_add_iov(c, " ", 1) != 0)
3082 || (ms_add_iov(c, (char *)&item->key_prefix,
3083 (int)KEY_PREFIX_SIZE) != 0)
3084 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3085 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3086 {
3087 return -1;
3088 }
3089 }
3090
3091 if ((ms_add_iov(c, "\r\n", 2) != 0)
3092 || (c->udp && (ms_build_udp_headers(c) != 0)))
3093 {
3094 return -1;
3095 }
3096
3097 return 0;
3098 } /* ms_build_ascii_write_buf_mlget */
3099
3100
3101 /**
3102 * used to send the multi-get command to server
3103 *
3104 * @param c, pointer of the concurrency
3105 *
3106 * @return int, if success, return 0, else return -1
3107 */
3108 int ms_mcd_mlget(ms_conn_t *c)
3109 {
3110 ms_task_item_t *item;
3111
3112 assert(c != NULL);
3113 assert(c->mlget_task.mlget_num >= 1);
3114
3115 c->currcmd.cmd= CMD_GET;
3116 c->currcmd.isfinish= false;
3117 c->currcmd.retstat= MCD_FAILURE;
3118
3119 if (ms_update_conn_sock_event(c) != 0)
3120 {
3121 return -1;
3122 }
3123
3124 c->msgcurr= 0;
3125 c->msgused= 0;
3126 c->iovused= 0;
3127 if (ms_add_msghdr(c) != 0)
3128 {
3129 fprintf(stderr, "Out of memory preparing request.");
3130 return -1;
3131 }
3132
3133 /* binary protocol */
3134 if (c->protocol == binary_prot)
3135 {
3136 if (ms_build_bin_write_buf_mlget(c) != 0)
3137 {
3138 return -1;
3139 }
3140 }
3141 else
3142 {
3143 if (ms_build_ascii_write_buf_mlget(c) != 0)
3144 {
3145 return -1;
3146 }
3147 }
3148
3149 /* decrease operation time of each item */
3150 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3151 {
3152 item= c->mlget_task.mlget_item[i].item;
3153 atomic_add_size(&ms_stats.cmd_get, 1);
3154 }
3155
3156 return 0;
3157 } /* ms_mcd_mlget */
3158
3159
3160 /**
3161 * binary protocol support
3162 */
3163
3164 /**
3165 * for binary protocol, parse the response of server
3166 *
3167 * @param c, pointer of the concurrency
3168 *
3169 * @return int, if success, return 0, else return -1
3170 */
3171 static int ms_bin_process_response(ms_conn_t *c)
3172 {
3173 const char *errstr= NULL;
3174
3175 assert(c != NULL);
3176
3177 uint32_t bodylen= c->binary_header.response.bodylen;
3178 uint8_t opcode= c->binary_header.response.opcode;
3179 uint16_t status= c->binary_header.response.status;
3180
3181 if (bodylen > 0)
3182 {
3183 c->rvbytes= (int32_t)bodylen;
3184 c->readval= true;
3185 return 1;
3186 }
3187 else
3188 {
3189 switch (status)
3190 {
3191 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3192 if (opcode == PROTOCOL_BINARY_CMD_SET)
3193 {
3194 c->currcmd.retstat= MCD_STORED;
3195 }
3196 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3197 {
3198 c->currcmd.retstat= MCD_DELETED;
3199 }
3200 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3201 {
3202 c->currcmd.retstat= MCD_END;
3203 }
3204 break;
3205
3206 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3207 errstr= "Out of memory";
3208 c->currcmd.retstat= MCD_SERVER_ERROR;
3209 break;
3210
3211 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3212 errstr= "Unknown command";
3213 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3214 break;
3215
3216 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3217 errstr= "Not found";
3218 c->currcmd.retstat= MCD_NOTFOUND;
3219 break;
3220
3221 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3222 errstr= "Invalid arguments";
3223 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3224 break;
3225
3226 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3227 errstr= "Data exists for key.";
3228 break;
3229
3230 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3231 errstr= "Too large.";
3232 c->currcmd.retstat= MCD_SERVER_ERROR;
3233 break;
3234
3235 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3236 errstr= "Not stored.";
3237 c->currcmd.retstat= MCD_NOTSTORED;
3238 break;
3239
3240 default:
3241 errstr= "Unknown error";
3242 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3243 break;
3244 } /* switch */
3245
3246 if (errstr != NULL)
3247 {
3248 fprintf(stderr, "%s\n", errstr);
3249 }
3250 }
3251
3252 return 0;
3253 } /* ms_bin_process_response */
3254
3255
3256 /* build binary header and add the header to the buffer to send */
3257
3258 /**
3259 * build binary header and add the header to the buffer to send
3260 *
3261 * @param c, pointer of the concurrency
3262 * @param opcode, operation code
3263 * @param hdr_len, length of header
3264 * @param key_len, length of key
3265 * @param body_len. length of body
3266 */
3267 static void ms_add_bin_header(ms_conn_t *c,
3268 uint8_t opcode,
3269 uint8_t hdr_len,
3270 uint16_t key_len,
3271 uint32_t body_len)
3272 {
3273 protocol_binary_request_header *header;
3274
3275 assert(c != NULL);
3276
3277 header= (protocol_binary_request_header *)c->wcurr;
3278
3279 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3280 header->request.opcode= (uint8_t)opcode;
3281 header->request.keylen= htons(key_len);
3282
3283 header->request.extlen= (uint8_t)hdr_len;
3284 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3285 header->request.reserved= 0;
3286
3287 header->request.bodylen= htonl(body_len);
3288 header->request.opaque= 0;
3289 header->request.cas= 0;
3290
3291 ms_add_iov(c, c->wcurr, sizeof(header->request));
3292 } /* ms_add_bin_header */
3293
3294
3295 /**
3296 * add the key to the socket write buffer array
3297 *
3298 * @param c, pointer of the concurrency
3299 * @param item, pointer of task item which includes the object
3300 * information
3301 */
3302 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3303 {
3304 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3305 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3306 item->key_size - (int)KEY_PREFIX_SIZE);
3307 }
3308
3309
3310 /**
3311 * for binary protocol, this function build the set command
3312 * and add the command to send buffer array.
3313 *
3314 * @param c, pointer of the concurrency
3315 * @param item, pointer of task item which includes the object
3316 * information
3317 *
3318 * @return int, if success, return 0, else return -1
3319 */
3320 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3321 {
3322 assert(c->wbuf == c->wcurr);
3323
3324 int value_offset;
3325 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3326 uint16_t keylen= (uint16_t)item->key_size;
3327 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3328 + (uint32_t)keylen + (uint32_t)item->value_size;
3329
3330 ms_add_bin_header(c,
3331 PROTOCOL_BINARY_CMD_SET,
3332 sizeof(rep->message.body),
3333 keylen,
3334 bodylen);
3335 rep->message.body.flags= 0;
3336 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3337 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3338 ms_add_key_to_iov(c, item);
3339
3340 if (item->value_offset == INVALID_OFFSET)
3341 {
3342 value_offset= item->key_suffix_offset;
3343 }
3344 else
3345 {
3346 value_offset= item->value_offset;
3347 }
3348 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3349
3350 return 0;
3351 } /* ms_build_bin_write_buf_set */
3352
3353
3354 /**
3355 * for binary protocol, this function build the get command and
3356 * add the command to send buffer array.
3357 *
3358 * @param c, pointer of the concurrency
3359 * @param item, pointer of task item which includes the object
3360 * information
3361 *
3362 * @return int, if success, return 0, else return -1
3363 */
3364 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3365 {
3366 assert(c->wbuf == c->wcurr);
3367
3368 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3369 (uint32_t)item->key_size);
3370 ms_add_key_to_iov(c, item);
3371
3372 return 0;
3373 } /* ms_build_bin_write_buf_get */
3374
3375
3376 /**
3377 * for binary protocol, this function build the multi-get
3378 * command and add the command to send buffer array.
3379 *
3380 * @param c, pointer of the concurrency
3381 * @param item, pointer of task item which includes the object
3382 * information
3383 *
3384 * @return int, if success, return 0, else return -1
3385 */
3386 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3387 {
3388 ms_task_item_t *item;
3389
3390 assert(c->wbuf == c->wcurr);
3391
3392 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3393 {
3394 item= c->mlget_task.mlget_item[i].item;
3395 assert(item != NULL);
3396
3397 ms_add_bin_header(c,
3398 PROTOCOL_BINARY_CMD_GET,
3399 0,
3400 (uint16_t)item->key_size,
3401 (uint32_t)item->key_size);
3402 ms_add_key_to_iov(c, item);
3403 c->wcurr+= sizeof(protocol_binary_request_get);
3404 }
3405
3406 c->wcurr= c->wbuf;
3407
3408 return 0;
3409 } /* ms_build_bin_write_buf_mlget */