Merged trunk.
[m6w6/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <limits.h>
16 #include <sys/uio.h>
17 #include <event.h>
18 #include <fcntl.h>
19 #include <netinet/tcp.h>
20 #include <arpa/inet.h>
21 #include "ms_setting.h"
22 #include "ms_thread.h"
23
24 /* for network write */
25 #define TRANSMIT_COMPLETE 0
26 #define TRANSMIT_INCOMPLETE 1
27 #define TRANSMIT_SOFT_ERROR 2
28 #define TRANSMIT_HARD_ERROR 3
29
30 /* for generating key */
31 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
32 #define KEY_PREFIX_MASK 0x1010101010101010
33
34 /* For parsing the value length returned by the server */
35 #define KEY_TOKEN 1
36 #define VALUELEN_TOKEN 3
37
38 /* global increasing counter, ensures that every key prefix is unique */
39 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
40
41 /* global increasing counter, generating request id for UDP */
42 static int udp_request_id= 0;
43
44 extern __thread ms_thread_t ms_thread;
45
46 /* generate udp request id */
47 static int ms_get_udp_request_id(void);
48
49
50 /* connection initialization */
51 static void ms_task_init(ms_conn_t *c);
52 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
53 static int ms_conn_sock_init(ms_conn_t *c);
54 static int ms_conn_event_init(ms_conn_t *c);
55 static int ms_conn_init(ms_conn_t *c,
56 const int init_state,
57 const int read_buffer_size,
58 const bool is_udp);
59 static void ms_warmup_num_init(ms_conn_t *c);
60 static int ms_item_win_init(ms_conn_t *c);
61
62
63 /* connection close */
64 void ms_conn_free(ms_conn_t *c);
65 static void ms_conn_close(ms_conn_t *c);
66
67
68 /* create network connection */
69 static int ms_new_socket(struct addrinfo *ai);
70 static void ms_maximize_sndbuf(const int sfd);
71 static int ms_network_connect(ms_conn_t *c,
72 char *srv_host_name,
73 const int srv_port,
74 const bool is_udp,
75 int *ret_sfd);
76 static int ms_reconn(ms_conn_t *c);
77
78
79 /* read and parse */
80 static int ms_tokenize_command(char *command,
81 token_t *tokens,
82 const int max_tokens);
83 static int ms_ascii_process_line(ms_conn_t *c, char *command);
84 static int ms_try_read_line(ms_conn_t *c);
85 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
86 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
87 static int ms_try_read_network(ms_conn_t *c);
88 static void ms_verify_value(ms_conn_t *c,
89 ms_mlget_task_item_t *mlget_item,
90 char *value,
91 int vlen);
92 static void ms_ascii_complete_nread(ms_conn_t *c);
93 static void ms_bin_complete_nread(ms_conn_t *c);
94 static void ms_complete_nread(ms_conn_t *c);
95
96
97 /* send functions */
98 static int ms_add_msghdr(ms_conn_t *c);
99 static int ms_ensure_iov_space(ms_conn_t *c);
100 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
101 static int ms_build_udp_headers(ms_conn_t *c);
102 static int ms_transmit(ms_conn_t *c);
103
104
105 /* status adjustment */
106 static void ms_conn_shrink(ms_conn_t *c);
107 static void ms_conn_set_state(ms_conn_t *c, int state);
108 static bool ms_update_event(ms_conn_t *c, const int new_flags);
109 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
110 static int ms_get_next_sock_index(ms_conn_t *c);
111 static int ms_update_conn_sock_event(ms_conn_t *c);
112 static bool ms_need_yield(ms_conn_t *c);
113 static void ms_update_start_time(ms_conn_t *c);
114
115
116 /* main loop */
117 static void ms_drive_machine(ms_conn_t *c);
118 void ms_event_handler(const int fd, const short which, void *arg);
119
120
121 /* ascii protocol */
122 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
123 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
124 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
125
126
127 /* binary protocol */
128 static int ms_bin_process_response(ms_conn_t *c);
129 static void ms_add_bin_header(ms_conn_t *c,
130 uint8_t opcode,
131 uint8_t hdr_len,
132 uint16_t key_len,
133 uint32_t body_len);
134 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
135 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
136 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
137 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
138
139
140 /**
141 * each key has two parts, prefix and suffix. The suffix is a
142 * string random get form the character table. The prefix is a
143 * uint64_t variable. And the prefix must be unique. we use the
144 * prefix to identify a key. And the prefix can't include
145 * character ' ' '\r' '\n' '\0'.
146 *
147 * @return uint64_t
148 */
149 uint64_t ms_get_key_prefix(void)
150 {
151 uint64_t key_prefix;
152
153 pthread_mutex_lock(&ms_global.seq_mutex);
154 key_prefix_seq|= KEY_PREFIX_MASK;
155 key_prefix= key_prefix_seq;
156 key_prefix_seq++;
157 pthread_mutex_unlock(&ms_global.seq_mutex);
158
159 return key_prefix;
160 } /* ms_get_key_prefix */
161
162
163 /**
164 * get an unique udp request id
165 *
166 * @return an unique UDP request id
167 */
168 static int ms_get_udp_request_id(void)
169 {
170 return __sync_fetch_and_add(&udp_request_id, 1);
171 }
172
173
174 /**
175 * initialize current task structure
176 *
177 * @param c, pointer of the concurrency
178 */
179 static void ms_task_init(ms_conn_t *c)
180 {
181 c->curr_task.cmd= CMD_NULL;
182 c->curr_task.item= 0;
183 c->curr_task.verify= false;
184 c->curr_task.finish_verify= true;
185 c->curr_task.get_miss= true;
186
187 c->curr_task.get_opt= 0;
188 c->curr_task.set_opt= 0;
189 c->curr_task.cycle_undo_get= 0;
190 c->curr_task.cycle_undo_set= 0;
191 c->curr_task.verified_get= 0;
192 c->curr_task.overwrite_set= 0;
193 } /* ms_task_init */
194
195
196 /**
197 * initialize udp for the connection structure
198 *
199 * @param c, pointer of the concurrency
200 * @param is_udp, whether it's udp
201 *
202 * @return int, if success, return 0, else return -1
203 */
204 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
205 {
206 c->hdrbuf= 0;
207 c->rudpbuf= 0;
208 c->udppkt= 0;
209
210 c->rudpsize= UDP_DATA_BUFFER_SIZE;
211 c->hdrsize= 0;
212
213 c->rudpbytes= 0;
214 c->packets= 0;
215 c->recvpkt= 0;
216 c->pktcurr= 0;
217 c->ordcurr= 0;
218
219 c->udp= is_udp;
220
221 if (c->udp || (! c->udp && ms_setting.facebook_test))
222 {
223 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
224 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
225
226 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
227 {
228 if (c->rudpbuf != NULL)
229 free(c->rudpbuf);
230 if (c->udppkt != NULL)
231 free(c->udppkt);
232 fprintf(stderr, "malloc()\n");
233 return -1;
234 }
235 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
236 }
237
238 return 0;
239 } /* ms_conn_udp_init */
240
241
242 /**
243 * initialize the connection structure
244 *
245 * @param c, pointer of the concurrency
246 * @param init_state, (conn_read, conn_write, conn_closing)
247 * @param read_buffer_size
248 * @param is_udp, whether it's udp
249 *
250 * @return int, if success, return 0, else return -1
251 */
252 static int ms_conn_init(ms_conn_t *c,
253 const int init_state,
254 const int read_buffer_size,
255 const bool is_udp)
256 {
257 assert(c != NULL);
258
259 c->rbuf= c->wbuf= 0;
260 c->iov= 0;
261 c->msglist= 0;
262
263 c->rsize= read_buffer_size;
264 c->wsize= WRITE_BUFFER_SIZE;
265 c->iovsize= IOV_LIST_INITIAL;
266 c->msgsize= MSG_LIST_INITIAL;
267
268 /* for replication, each connection need connect all the server */
269 if (ms_setting.rep_write_srv > 0)
270 {
271 c->total_sfds= ms_setting.srv_cnt;
272 }
273 else
274 {
275 c->total_sfds= ms_setting.sock_per_conn;
276 }
277 c->alive_sfds= 0;
278
279 c->rbuf= (char *)malloc((size_t)c->rsize);
280 c->wbuf= (char *)malloc((size_t)c->wsize);
281 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
282 c->msglist= (struct msghdr *)malloc(
283 sizeof(struct msghdr) * (size_t)c->msgsize);
284 if (ms_setting.mult_key_num > 1)
285 {
286 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
287 malloc(
288 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
289 }
290 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
291
292 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
293 || (c->msglist == NULL) || (c->tcpsfd == NULL)
294 || ((ms_setting.mult_key_num > 1)
295 && (c->mlget_task.mlget_item == NULL)))
296 {
297 if (c->rbuf != NULL)
298 free(c->rbuf);
299 if (c->wbuf != NULL)
300 free(c->wbuf);
301 if (c->iov != NULL)
302 free(c->iov);
303 if (c->msglist != NULL)
304 free(c->msglist);
305 if (c->mlget_task.mlget_item != NULL)
306 free(c->mlget_task.mlget_item);
307 if (c->tcpsfd != NULL)
308 free(c->tcpsfd);
309 fprintf(stderr, "malloc()\n");
310 return -1;
311 }
312
313 c->state= init_state;
314 c->rvbytes= 0;
315 c->rbytes= 0;
316 c->rcurr= c->rbuf;
317 c->wcurr= c->wbuf;
318 c->iovused= 0;
319 c->msgcurr= 0;
320 c->msgused= 0;
321 c->cur_idx= c->total_sfds; /* default index is a invalid value */
322
323 c->ctnwrite= false;
324 c->readval= false;
325 c->change_sfd= false;
326
327 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
328 c->precmd.isfinish= true; /* default the previous command finished */
329 c->currcmd.isfinish= false;
330 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
331 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
332
333 c->mlget_task.mlget_num= 0;
334 c->mlget_task.value_index= -1; /* default invalid value */
335
336 if (ms_setting.binary_prot)
337 {
338 c->protocol= binary_prot;
339 }
340 else if (is_udp)
341 {
342 c->protocol= ascii_udp_prot;
343 }
344 else
345 {
346 c->protocol= ascii_prot;
347 }
348
349 /* initialize udp */
350 if (ms_conn_udp_init(c, is_udp) != 0)
351 {
352 return -1;
353 }
354
355 /* initialize task */
356 ms_task_init(c);
357
358 if (! (ms_setting.facebook_test && is_udp))
359 {
360 __sync_fetch_and_add(&ms_stats.active_conns, 1);
361 }
362
363 return 0;
364 } /* ms_conn_init */
365
366
367 /**
368 * when doing 100% get operation, it could preset some objects
369 * to warmup the server. this function is used to initialize the
370 * number of the objects to preset.
371 *
372 * @param c, pointer of the concurrency
373 */
374 static void ms_warmup_num_init(ms_conn_t *c)
375 {
376 /* no set operation, preset all the items in the window */
377 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
378 {
379 c->warmup_num= c->win_size;
380 c->remain_warmup_num= c->warmup_num;
381 }
382 else
383 {
384 c->warmup_num= 0;
385 c->remain_warmup_num= c->warmup_num;
386 }
387 } /* ms_warmup_num_init */
388
389
390 /**
391 * each connection has an item window, this function initialize
392 * the window. The window is used to generate task.
393 *
394 * @param c, pointer of the concurrency
395 *
396 * @return int, if success, return 0, else return -1
397 */
398 static int ms_item_win_init(ms_conn_t *c)
399 {
400 int exp_cnt= 0;
401
402 c->win_size= (int)ms_setting.win_size;
403 c->set_cursor= 0;
404 c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
405 c->remain_exec_num= c->exec_num;
406
407 c->item_win= (ms_task_item_t *)malloc(
408 sizeof(ms_task_item_t) * (size_t)c->win_size);
409 if (c->item_win == NULL)
410 {
411 fprintf(stderr, "Can't allocate task item array for conn.\n");
412 return -1;
413 }
414 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
415
416 for (int i= 0; i < c->win_size; i++)
417 {
418 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
419 c->item_win[i].key_prefix= ms_get_key_prefix();
420 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
421 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
422 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
423 c->item_win[i].client_time= 0;
424
425 /* set expire time base on the proportion */
426 if (exp_cnt < ms_setting.exp_ver_per * i)
427 {
428 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
429 exp_cnt++;
430 }
431 else
432 {
433 c->item_win[i].exp_time= 0;
434 }
435 }
436
437 ms_warmup_num_init(c);
438
439 return 0;
440 } /* ms_item_win_init */
441
442
443 /**
444 * each connection structure can include one or more sock
445 * handlers. this function create these socks and connect the
446 * server(s).
447 *
448 * @param c, pointer of the concurrency
449 *
450 * @return int, if success, return 0, else return -1
451 */
452 static int ms_conn_sock_init(ms_conn_t *c)
453 {
454 int i;
455 int ret_sfd;
456 int srv_idx= 0;
457
458 assert(c != NULL);
459 assert(c->tcpsfd != NULL);
460
461 for (i= 0; i < c->total_sfds; i++)
462 {
463 ret_sfd= 0;
464 if (ms_setting.rep_write_srv > 0)
465 {
466 /* for replication, each connection need connect all the server */
467 srv_idx= i;
468 }
469 else
470 {
471 /* all the connections in a thread connects the same server */
472 srv_idx= ms_thread.thread_ctx->srv_idx;
473 }
474
475 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
476 ms_setting.servers[srv_idx].srv_port,
477 ms_setting.udp, &ret_sfd) != 0)
478 {
479 break;
480 }
481
482 if (i == 0)
483 {
484 c->sfd= ret_sfd;
485 }
486
487 if (! ms_setting.udp)
488 {
489 c->tcpsfd[i]= ret_sfd;
490 }
491
492 c->alive_sfds++;
493 }
494
495 /* initialize udp sock handler if necessary */
496 if (ms_setting.facebook_test)
497 {
498 ret_sfd= 0;
499 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
500 ms_setting.servers[srv_idx].srv_port,
501 true, &ret_sfd) != 0)
502 {
503 c->udpsfd= 0;
504 }
505 else
506 {
507 c->udpsfd= ret_sfd;
508 }
509 }
510
511 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
512 {
513 if (ms_setting.udp)
514 {
515 close(c->sfd);
516 }
517 else
518 {
519 for (int j= 0; j < i; j++)
520 {
521 close(c->tcpsfd[j]);
522 }
523 }
524
525 if (c->udpsfd != 0)
526 {
527 close(c->udpsfd);
528 }
529
530 return -1;
531 }
532
533 return 0;
534 } /* ms_conn_sock_init */
535
536
537 /**
538 * each connection is managed by libevent, this function
539 * initialize the event of the connection structure.
540 *
541 * @param c, pointer of the concurrency
542 *
543 * @return int, if success, return 0, else return -1
544 */
545 static int ms_conn_event_init(ms_conn_t *c)
546 {
547 /* default event timeout 10 seconds */
548 struct timeval t=
549 {
550 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
551 };
552 short event_flags= EV_WRITE | EV_PERSIST;
553
554 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
555 event_base_set(ms_thread.base, &c->event);
556 c->ev_flags= event_flags;
557
558 if (c->total_sfds == 1)
559 {
560 if (event_add(&c->event, NULL) == -1)
561 {
562 return -1;
563 }
564 }
565 else
566 {
567 if (event_add(&c->event, &t) == -1)
568 {
569 return -1;
570 }
571 }
572
573 return 0;
574 } /* ms_conn_event_init */
575
576
577 /**
578 * setup a connection, each connection structure of each
579 * thread must call this function to initialize.
580 *
581 * @param c, pointer of the concurrency
582 *
583 * @return int, if success, return 0, else return -1
584 */
585 int ms_setup_conn(ms_conn_t *c)
586 {
587 if (ms_item_win_init(c) != 0)
588 {
589 return -1;
590 }
591
592 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
593 {
594 return -1;
595 }
596
597 if (ms_conn_sock_init(c) != 0)
598 {
599 return -1;
600 }
601
602 if (ms_conn_event_init(c) != 0)
603 {
604 return -1;
605 }
606
607 return 0;
608 } /* ms_setup_conn */
609
610
611 /**
612 * Frees a connection.
613 *
614 * @param c, pointer of the concurrency
615 */
616 void ms_conn_free(ms_conn_t *c)
617 {
618 if (c != NULL)
619 {
620 if (c->hdrbuf != NULL)
621 free(c->hdrbuf);
622 if (c->msglist != NULL)
623 free(c->msglist);
624 if (c->rbuf != NULL)
625 free(c->rbuf);
626 if (c->wbuf != NULL)
627 free(c->wbuf);
628 if (c->iov != NULL)
629 free(c->iov);
630 if (c->mlget_task.mlget_item != NULL)
631 free(c->mlget_task.mlget_item);
632 if (c->rudpbuf != NULL)
633 free(c->rudpbuf);
634 if (c->udppkt != NULL)
635 free(c->udppkt);
636 if (c->item_win != NULL)
637 free(c->item_win);
638 if (c->tcpsfd != NULL)
639 free(c->tcpsfd);
640
641 if (--ms_thread.nactive_conn == 0)
642 {
643 free(ms_thread.conn);
644 }
645 }
646 } /* ms_conn_free */
647
648
649 /**
650 * close a connection
651 *
652 * @param c, pointer of the concurrency
653 */
654 static void ms_conn_close(ms_conn_t *c)
655 {
656 assert(c != NULL);
657
658 /* delete the event, the socket and the connection */
659 event_del(&c->event);
660
661 for (int i= 0; i < c->total_sfds; i++)
662 {
663 if (c->tcpsfd[i] > 0)
664 {
665 close(c->tcpsfd[i]);
666 }
667 }
668 c->sfd= 0;
669
670 if (ms_setting.facebook_test)
671 {
672 close(c->udpsfd);
673 }
674
675 __sync_fetch_and_sub(&ms_stats.active_conns, 1);
676
677 ms_conn_free(c);
678
679 if (ms_setting.run_time == 0)
680 {
681 pthread_mutex_lock(&ms_global.run_lock.lock);
682 ms_global.run_lock.count++;
683 pthread_cond_signal(&ms_global.run_lock.cond);
684 pthread_mutex_unlock(&ms_global.run_lock.lock);
685 }
686
687 if (ms_thread.nactive_conn == 0)
688 {
689 pthread_exit(NULL);
690 }
691 } /* ms_conn_close */
692
693
694 /**
695 * create a new sock
696 *
697 * @param ai, server address information
698 *
699 * @return int, if success, return 0, else return -1
700 */
701 static int ms_new_socket(struct addrinfo *ai)
702 {
703 int sfd;
704
705 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
706 {
707 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
708 return -1;
709 }
710
711 return sfd;
712 } /* ms_new_socket */
713
714
715 /**
716 * Sets a socket's send buffer size to the maximum allowed by the system.
717 *
718 * @param sfd, file descriptor of socket
719 */
720 static void ms_maximize_sndbuf(const int sfd)
721 {
722 socklen_t intsize= sizeof(int);
723 unsigned int last_good= 0;
724 unsigned int min, max, avg;
725 unsigned int old_size;
726
727 /* Start with the default size. */
728 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
729 {
730 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
731 return;
732 }
733
734 /* Binary-search for the real maximum. */
735 min= old_size;
736 max= MAX_SENDBUF_SIZE;
737
738 while (min <= max)
739 {
740 avg= ((unsigned int)(min + max)) / 2;
741 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
742 {
743 last_good= avg;
744 min= avg + 1;
745 }
746 else
747 {
748 max= avg - 1;
749 }
750 }
751 } /* ms_maximize_sndbuf */
752
753
754 /**
755 * socket connects the server
756 *
757 * @param c, pointer of the concurrency
758 * @param srv_host_name, the host name of the server
759 * @param srv_port, port of server
760 * @param is_udp, whether it's udp
761 * @param ret_sfd, the connected socket file descriptor
762 *
763 * @return int, if success, return 0, else return -1
764 */
765 static int ms_network_connect(ms_conn_t *c,
766 char *srv_host_name,
767 const int srv_port,
768 const bool is_udp,
769 int *ret_sfd)
770 {
771 int sfd;
772 struct linger ling=
773 {
774 0, 0
775 };
776 struct addrinfo *ai;
777 struct addrinfo *next;
778 struct addrinfo hints;
779 char port_buf[NI_MAXSERV];
780 int error;
781 int success= 0;
782
783 int flags= 1;
784
785 /*
786 * the memset call clears nonstandard fields in some impementations
787 * that otherwise mess things up.
788 */
789 memset(&hints, 0, sizeof(hints));
790 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
791 if (is_udp)
792 {
793 hints.ai_protocol= IPPROTO_UDP;
794 hints.ai_socktype= SOCK_DGRAM;
795 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
796 }
797 else
798 {
799 hints.ai_family= AF_UNSPEC;
800 hints.ai_protocol= IPPROTO_TCP;
801 hints.ai_socktype= SOCK_STREAM;
802 }
803
804 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
805 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
806 if (error != 0)
807 {
808 if (error != EAI_SYSTEM)
809 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
810 else
811 perror("getaddrinfo()\n");
812
813 return -1;
814 }
815
816 for (next= ai; next; next= next->ai_next)
817 {
818 if ((sfd= ms_new_socket(next)) == -1)
819 {
820 freeaddrinfo(ai);
821 return -1;
822 }
823
824 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
825 if (is_udp)
826 {
827 ms_maximize_sndbuf(sfd);
828 }
829 else
830 {
831 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
832 sizeof(flags));
833 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
834 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
835 sizeof(flags));
836 }
837
838 if (is_udp)
839 {
840 c->srv_recv_addr_size= sizeof(struct sockaddr);
841 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
842 }
843 else
844 {
845 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
846 {
847 close(sfd);
848 freeaddrinfo(ai);
849 return -1;
850 }
851 }
852
853 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
854 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
855 {
856 fprintf(stderr, "setting O_NONBLOCK\n");
857 close(sfd);
858 freeaddrinfo(ai);
859 return -1;
860 }
861
862 if (ret_sfd != NULL)
863 {
864 *ret_sfd= sfd;
865 }
866
867 success++;
868 }
869
870 freeaddrinfo(ai);
871
872 /* Return zero if we detected no errors in starting up connections */
873 return success == 0;
874 } /* ms_network_connect */
875
876
877 /**
878 * reconnect a disconnected sock
879 *
880 * @param c, pointer of the concurrency
881 *
882 * @return int, if success, return 0, else return -1
883 */
884 static int ms_reconn(ms_conn_t *c)
885 {
886 int srv_idx= 0;
887 int srv_conn_cnt= 0;
888
889 if (ms_setting.rep_write_srv > 0)
890 {
891 srv_idx= c->cur_idx;
892 srv_conn_cnt= ms_setting.nconns;
893 }
894 else
895 {
896 srv_idx= ms_thread.thread_ctx->srv_idx;
897 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
898 }
899
900 /* close the old socket handler */
901 close(c->sfd);
902 c->tcpsfd[c->cur_idx]= 0;
903
904 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
905 % srv_conn_cnt == 0)
906 {
907 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
908 fprintf(stderr, "Server %s:%d disconnect\n",
909 ms_setting.servers[srv_idx].srv_host_name,
910 ms_setting.servers[srv_idx].srv_port);
911 }
912
913 if (ms_setting.rep_write_srv > 0)
914 {
915 int i= 0;
916 for (i= 0; i < c->total_sfds; i++)
917 {
918 if (c->tcpsfd[i] != 0)
919 {
920 break;
921 }
922 }
923
924 /* all socks disconnect */
925 if (i == c->total_sfds)
926 {
927 return -1;
928 }
929 }
930 else
931 {
932 do
933 {
934 /* reconnect success, break the loop */
935 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
936 ms_setting.servers[srv_idx].srv_port,
937 ms_setting.udp, &c->sfd) == 0)
938 {
939 c->tcpsfd[c->cur_idx]= c->sfd;
940 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
941 % srv_conn_cnt == 0)
942 {
943 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
944 int reconn_time=
945 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
946 - ms_setting.servers[srv_idx].disconn_time
947 .tv_sec);
948 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
949 ms_setting.servers[srv_idx].srv_host_name,
950 ms_setting.servers[srv_idx].srv_port, reconn_time);
951 }
952 break;
953 }
954
955 if (c->total_sfds == 1)
956 {
957 /* wait a second and reconnect */
958 sleep(1);
959 }
960 }
961 while (c->total_sfds == 1);
962 }
963
964 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
965 {
966 c->sfd= 0;
967 c->alive_sfds--;
968 }
969
970 return 0;
971 } /* ms_reconn */
972
973
974 /**
975 * reconnect several disconnected socks in the connection
976 * structure, the ever-1-second timer of the thread will check
977 * whether some socks in the connections disconnect. if
978 * disconnect, reconnect the sock.
979 *
980 * @param c, pointer of the concurrency
981 *
982 * @return int, if success, return 0, else return -1
983 */
984 int ms_reconn_socks(ms_conn_t *c)
985 {
986 int srv_idx= 0;
987 int ret_sfd= 0;
988 int srv_conn_cnt= 0;
989 struct timeval cur_time;
990
991 assert(c != NULL);
992
993 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
994 {
995 return 0;
996 }
997
998 for (int i= 0; i < c->total_sfds; i++)
999 {
1000 if (c->tcpsfd[i] == 0)
1001 {
1002 gettimeofday(&cur_time, NULL);
1003
1004 /**
1005 * For failover test of replication, reconnect the socks after
1006 * it disconnects more than 5 seconds, Otherwise memslap will
1007 * block at connect() function and the work threads can't work
1008 * in this interval.
1009 */
1010 if (cur_time.tv_sec
1011 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1012 {
1013 break;
1014 }
1015
1016 if (ms_setting.rep_write_srv > 0)
1017 {
1018 srv_idx= i;
1019 srv_conn_cnt= ms_setting.nconns;
1020 }
1021 else
1022 {
1023 srv_idx= ms_thread.thread_ctx->srv_idx;
1024 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1025 }
1026
1027 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1028 ms_setting.servers[srv_idx].srv_port,
1029 ms_setting.udp, &ret_sfd) == 0)
1030 {
1031 c->tcpsfd[i]= ret_sfd;
1032 c->alive_sfds++;
1033
1034 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1035 % srv_conn_cnt == 0)
1036 {
1037 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1038 int reconn_time=
1039 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1040 - ms_setting.servers[srv_idx].disconn_time
1041 .tv_sec);
1042 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1043 ms_setting.servers[srv_idx].srv_host_name,
1044 ms_setting.servers[srv_idx].srv_port, reconn_time);
1045 }
1046 }
1047 }
1048 }
1049
1050 return 0;
1051 } /* ms_reconn_socks */
1052
1053
1054 /**
1055 * Tokenize the command string by replacing whitespace with '\0' and update
1056 * the token array tokens with pointer to start of each token and length.
1057 * Returns total number of tokens. The last valid token is the terminal
1058 * token (value points to the first unprocessed character of the string and
1059 * length zero).
1060 *
1061 * Usage example:
1062 *
1063 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1064 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1065 * ...
1066 * }
1067 * ncommand = tokens[ix].value - command;
1068 * command = tokens[ix].value;
1069 * }
1070 *
1071 * @param command, the command string to token
1072 * @param tokens, array to store tokens
1073 * @param max_tokens, maximum tokens number
1074 *
1075 * @return int, the number of tokens
1076 */
1077 static int ms_tokenize_command(char *command,
1078 token_t *tokens,
1079 const int max_tokens)
1080 {
1081 char *s, *e;
1082 int ntokens= 0;
1083
1084 assert(command != NULL && tokens != NULL && max_tokens > 1);
1085
1086 for (s= e= command; ntokens < max_tokens - 1; ++e)
1087 {
1088 if (*e == ' ')
1089 {
1090 if (s != e)
1091 {
1092 tokens[ntokens].value= s;
1093 tokens[ntokens].length= (size_t)(e - s);
1094 ntokens++;
1095 *e= '\0';
1096 }
1097 s= e + 1;
1098 }
1099 else if (*e == '\0')
1100 {
1101 if (s != e)
1102 {
1103 tokens[ntokens].value= s;
1104 tokens[ntokens].length= (size_t)(e - s);
1105 ntokens++;
1106 }
1107
1108 break; /* string end */
1109 }
1110 }
1111
1112 return ntokens;
1113 } /* ms_tokenize_command */
1114
1115
1116 /**
1117 * parse the response of server.
1118 *
1119 * @param c, pointer of the concurrency
1120 * @param command, the string responded by server
1121 *
1122 * @return int, if the command completed return 0, else return
1123 * -1
1124 */
1125 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1126 {
1127 int ret= 0;
1128 int64_t value_len;
1129 char *buffer= command;
1130
1131 assert(c != NULL);
1132
1133 /**
1134 * for command get, we store the returned value into local buffer
1135 * then continue in ms_complete_nread().
1136 */
1137
1138 switch (buffer[0])
1139 {
1140 case 'V': /* VALUE || VERSION */
1141 if (buffer[1] == 'A') /* VALUE */
1142 {
1143 token_t tokens[MAX_TOKENS];
1144 ms_tokenize_command(command, tokens, MAX_TOKENS);
1145 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1146 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1147
1148 /*
1149 * We read the \r\n into the string since not doing so is more
1150 * cycles then the waster of memory to do so.
1151 *
1152 * We are null terminating through, which will most likely make
1153 * some people lazy about using the return length.
1154 */
1155 c->rvbytes= (int)(value_len + 2);
1156 c->readval= true;
1157 ret= -1;
1158 }
1159
1160 break;
1161
1162 case 'O': /* OK */
1163 c->currcmd.retstat= MCD_SUCCESS;
1164
1165 case 'S': /* STORED STATS SERVER_ERROR */
1166 if (buffer[2] == 'A') /* STORED STATS */
1167 { /* STATS*/
1168 c->currcmd.retstat= MCD_STAT;
1169 }
1170 else if (buffer[1] == 'E')
1171 {
1172 /* SERVER_ERROR */
1173 printf("<%d %s\n", c->sfd, buffer);
1174
1175 c->currcmd.retstat= MCD_SERVER_ERROR;
1176 }
1177 else if (buffer[1] == 'T')
1178 {
1179 /* STORED */
1180 c->currcmd.retstat= MCD_STORED;
1181 }
1182 else
1183 {
1184 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1185 }
1186 break;
1187
1188 case 'D': /* DELETED DATA */
1189 if (buffer[1] == 'E')
1190 {
1191 c->currcmd.retstat= MCD_DELETED;
1192 }
1193 else
1194 {
1195 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1196 }
1197
1198 break;
1199
1200 case 'N': /* NOT_FOUND NOT_STORED*/
1201 if (buffer[4] == 'F')
1202 {
1203 c->currcmd.retstat= MCD_NOTFOUND;
1204 }
1205 else if (buffer[4] == 'S')
1206 {
1207 printf("<%d %s\n", c->sfd, buffer);
1208 c->currcmd.retstat= MCD_NOTSTORED;
1209 }
1210 else
1211 {
1212 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1213 }
1214 break;
1215
1216 case 'E': /* PROTOCOL ERROR or END */
1217 if (buffer[1] == 'N')
1218 {
1219 /* END */
1220 c->currcmd.retstat= MCD_END;
1221 }
1222 else if (buffer[1] == 'R')
1223 {
1224 printf("<%d ERROR\n", c->sfd);
1225 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1226 }
1227 else if (buffer[1] == 'X')
1228 {
1229 c->currcmd.retstat= MCD_DATA_EXISTS;
1230 printf("<%d %s\n", c->sfd, buffer);
1231 }
1232 else
1233 {
1234 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1235 }
1236 break;
1237
1238 case 'C': /* CLIENT ERROR */
1239 printf("<%d %s\n", c->sfd, buffer);
1240 c->currcmd.retstat= MCD_CLIENT_ERROR;
1241 break;
1242
1243 default:
1244 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1245 break;
1246 } /* switch */
1247
1248 return ret;
1249 } /* ms_ascii_process_line */
1250
1251
1252 /**
1253 * after one operation completes, reset the concurrency
1254 *
1255 * @param c, pointer of the concurrency
1256 * @param timeout, whether it's timeout
1257 */
1258 void ms_reset_conn(ms_conn_t *c, bool timeout)
1259 {
1260 assert(c != NULL);
1261
1262 if (c->udp)
1263 {
1264 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1265 {
1266 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
1267 }
1268
1269 c->packets= 0;
1270 c->recvpkt= 0;
1271 c->pktcurr= 0;
1272 c->ordcurr= 0;
1273 c->rudpbytes= 0;
1274 }
1275 c->currcmd.isfinish= true;
1276 c->ctnwrite= false;
1277 c->rbytes= 0;
1278 c->rcurr= c->rbuf;
1279 ms_conn_set_state(c, conn_write);
1280 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1281
1282 if (timeout)
1283 {
1284 ms_drive_machine(c);
1285 }
1286 } /* ms_reset_conn */
1287
1288
1289 /**
1290 * if we have a complete line in the buffer, process it.
1291 *
1292 * @param c, pointer of the concurrency
1293 *
1294 * @return int, if success, return 0, else return -1
1295 */
1296 static int ms_try_read_line(ms_conn_t *c)
1297 {
1298 if (c->protocol == binary_prot)
1299 {
1300 /* Do we have the complete packet header? */
1301 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1302 {
1303 /* need more data! */
1304 return 0;
1305 }
1306 else
1307 {
1308 #ifdef NEED_ALIGN
1309 if (((long)(c->rcurr)) % 8 != 0)
1310 {
1311 /* must realign input buffer */
1312 memmove(c->rbuf, c->rcurr, c->rbytes);
1313 c->rcurr= c->rbuf;
1314 if (settings.verbose)
1315 {
1316 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1317 }
1318 }
1319 #endif
1320 protocol_binary_response_header *rsp;
1321 rsp= (protocol_binary_response_header *)c->rcurr;
1322
1323 c->binary_header= *rsp;
1324 c->binary_header.response.extlen= rsp->response.extlen;
1325 c->binary_header.response.keylen= ntohl(rsp->response.keylen);
1326 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1327 c->binary_header.response.status= ntohl(rsp->response.status);
1328
1329 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1330 {
1331 fprintf(stderr, "Invalid magic: %x\n",
1332 c->binary_header.response.magic);
1333 ms_conn_set_state(c, conn_closing);
1334 return 0;
1335 }
1336
1337 /* process this complete response */
1338 if (ms_bin_process_response(c) == 0)
1339 {
1340 /* current operation completed */
1341 ms_reset_conn(c, false);
1342 return -1;
1343 }
1344 else
1345 {
1346 c->rbytes-= (int32_t)sizeof(c->binary_header);
1347 c->rcurr+= sizeof(c->binary_header);
1348 }
1349 }
1350 }
1351 else
1352 {
1353 char *el, *cont;
1354
1355 assert(c != NULL);
1356 assert(c->rcurr <= (c->rbuf + c->rsize));
1357
1358 if (c->rbytes == 0)
1359 return 0;
1360
1361 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1362 if (! el)
1363 return 0;
1364
1365 cont= el + 1;
1366 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1367 {
1368 el--;
1369 }
1370 *el= '\0';
1371
1372 assert(cont <= (c->rcurr + c->rbytes));
1373
1374 /* process this complete line */
1375 if (ms_ascii_process_line(c, c->rcurr) == 0)
1376 {
1377 /* current operation completed */
1378 ms_reset_conn(c, false);
1379 return -1;
1380 }
1381 else
1382 {
1383 /* current operation didn't complete */
1384 c->rbytes-= (int32_t)(cont - c->rcurr);
1385 c->rcurr= cont;
1386 }
1387
1388 assert(c->rcurr <= (c->rbuf + c->rsize));
1389 }
1390
1391 return -1;
1392 } /* ms_try_read_line */
1393
1394
1395 /**
1396 * because the packet of UDP can't ensure the order, the
1397 * function is used to sort the received udp packet.
1398 *
1399 * @param c, pointer of the concurrency
1400 * @param buf, the buffer to store the ordered packages data
1401 * @param rbytes, the maximum capacity of the buffer
1402 *
1403 * @return int, if success, return the copy bytes, else return
1404 * -1
1405 */
1406 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1407 {
1408 int len= 0;
1409 int wbytes= 0;
1410 uint16_t req_id= 0;
1411 uint16_t seq_num= 0;
1412 uint16_t packets= 0;
1413 unsigned char *header= NULL;
1414
1415 /* no enough data */
1416 assert(c != NULL);
1417 assert(buf != NULL);
1418 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1419
1420 /* calculate received packets count */
1421 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1422 {
1423 /* the last packet has some data */
1424 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1425 }
1426 else
1427 {
1428 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1429 }
1430
1431 /* get the total packets count if necessary */
1432 if (c->packets == 0)
1433 {
1434 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1435 }
1436
1437 /* build the ordered packet array */
1438 for (int i= c->pktcurr; i < c->recvpkt; i++)
1439 {
1440 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1441 req_id= (uint16_t)HEADER_TO_REQID(header);
1442 assert(req_id == c->request_id % (1 << 16));
1443
1444 packets= (uint16_t)HEADER_TO_PACKETS(header);
1445 assert(c->packets == HEADER_TO_PACKETS(header));
1446
1447 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1448 c->udppkt[seq_num].header= header;
1449 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1450
1451 if (i == c->recvpkt - 1)
1452 {
1453 /* last received packet */
1454 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1455 {
1456 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1457 c->pktcurr++;
1458 }
1459 else
1460 {
1461 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1462 - UDP_HEADER_SIZE;
1463 }
1464 }
1465 else
1466 {
1467 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1468 c->pktcurr++;
1469 }
1470 }
1471
1472 for (int i= c->ordcurr; i < c->recvpkt; i++)
1473 {
1474 /* there is some data to copy */
1475 if ((c->udppkt[i].data != NULL)
1476 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1477 {
1478 header= c->udppkt[i].header;
1479 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1480 if (len > rbytes - wbytes)
1481 {
1482 len= rbytes - wbytes;
1483 }
1484
1485 assert(len <= rbytes - wbytes);
1486 assert(i == HEADER_TO_SEQNUM(header));
1487
1488 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1489 (size_t)len);
1490 wbytes+= len;
1491 c->udppkt[i].copybytes+= len;
1492
1493 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1494 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1495 {
1496 /* finish copying all the data of this packet, next */
1497 c->ordcurr++;
1498 }
1499
1500 /* last received packet, and finish copying all the data */
1501 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1502 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1503 {
1504 break;
1505 }
1506
1507 /* no space to copy data */
1508 if (wbytes >= rbytes)
1509 {
1510 break;
1511 }
1512
1513 /* it doesn't finish reading all the data of the packet from network */
1514 if ((i != c->recvpkt - 1)
1515 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1516 {
1517 break;
1518 }
1519 }
1520 else
1521 {
1522 /* no data to copy */
1523 break;
1524 }
1525 }
1526
1527 return wbytes == 0 ? -1 : wbytes;
1528 } /* ms_sort_udp_packet */
1529
1530
1531 /**
1532 * encapsulate upd read like tcp read
1533 *
1534 * @param c, pointer of the concurrency
1535 * @param buf, read buffer
1536 * @param len, length to read
1537 *
1538 * @return int, if success, return the read bytes, else return
1539 * -1
1540 */
1541 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1542 {
1543 int res= 0;
1544 int avail= 0;
1545 int rbytes= 0;
1546 int copybytes= 0;
1547
1548 assert(c->udp);
1549
1550 while (1)
1551 {
1552 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1553 {
1554 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1555 if (! new_rbuf)
1556 {
1557 fprintf(stderr, "Couldn't realloc input buffer.\n");
1558 c->rudpbytes= 0; /* ignore what we read */
1559 return -1;
1560 }
1561 c->rudpbuf= new_rbuf;
1562 c->rudpsize*= 2;
1563 }
1564
1565 avail= c->rudpsize - c->rudpbytes;
1566 /* UDP each time read a packet, 1400 bytes */
1567 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1568
1569 if (res > 0)
1570 {
1571 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1572 c->rudpbytes+= res;
1573 rbytes+= res;
1574 if (res == avail)
1575 {
1576 continue;
1577 }
1578 else
1579 {
1580 break;
1581 }
1582 }
1583
1584 if (res == 0)
1585 {
1586 /* "connection" closed */
1587 return res;
1588 }
1589
1590 if (res == -1)
1591 {
1592 /* no data to read */
1593 return res;
1594 }
1595 }
1596
1597 /* copy data to read buffer */
1598 if (rbytes > 0)
1599 {
1600 copybytes= ms_sort_udp_packet(c, buf, len);
1601 }
1602
1603 if (copybytes == -1)
1604 {
1605 __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
1606 }
1607
1608 return copybytes;
1609 } /* ms_udp_read */
1610
1611
1612 /*
1613 * read from network as much as we can, handle buffer overflow and connection
1614 * close.
1615 * before reading, move the remaining incomplete fragment of a command
1616 * (if any) to the beginning of the buffer.
1617 * return 0 if there's nothing to read on the first read.
1618 */
1619
1620 /**
1621 * read from network as much as we can, handle buffer overflow and connection
1622 * close. before reading, move the remaining incomplete fragment of a command
1623 * (if any) to the beginning of the buffer.
1624 *
1625 * @param c, pointer of the concurrency
1626 *
1627 * @return int,
1628 * return 0 if there's nothing to read on the first read.
1629 * return 1 if get data
1630 * return -1 if error happens
1631 */
1632 static int ms_try_read_network(ms_conn_t *c)
1633 {
1634 int gotdata= 0;
1635 int res;
1636 int64_t avail;
1637
1638 assert(c != NULL);
1639
1640 if ((c->rcurr != c->rbuf)
1641 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1642 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1643 {
1644 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1645 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1646 c->rcurr= c->rbuf;
1647 }
1648
1649 while (1)
1650 {
1651 if (c->rbytes >= c->rsize)
1652 {
1653 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1654 if (! new_rbuf)
1655 {
1656 fprintf(stderr, "Couldn't realloc input buffer.\n");
1657 c->rbytes= 0; /* ignore what we read */
1658 return -1;
1659 }
1660 c->rcurr= c->rbuf= new_rbuf;
1661 c->rsize*= 2;
1662 }
1663
1664 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1665 if (avail == 0)
1666 {
1667 break;
1668 }
1669
1670 if (c->udp)
1671 {
1672 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1673 }
1674 else
1675 {
1676 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1677 }
1678
1679 if (res > 0)
1680 {
1681 if (! c->udp)
1682 {
1683 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1684 }
1685 gotdata= 1;
1686 c->rbytes+= res;
1687 if (res == avail)
1688 {
1689 continue;
1690 }
1691 else
1692 {
1693 break;
1694 }
1695 }
1696 if (res == 0)
1697 {
1698 /* connection closed */
1699 ms_conn_set_state(c, conn_closing);
1700 return -1;
1701 }
1702 if (res == -1)
1703 {
1704 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1705 break;
1706 /* Should close on unhandled errors. */
1707 ms_conn_set_state(c, conn_closing);
1708 return -1;
1709 }
1710 }
1711
1712 return gotdata;
1713 } /* ms_try_read_network */
1714
1715
1716 /**
1717 * after get the object from server, verify the value if
1718 * necessary.
1719 *
1720 * @param c, pointer of the concurrency
1721 * @param mlget_item, pointer of mulit-get task item structure
1722 * @param value, received value string
1723 * @param vlen, received value string length
1724 */
1725 static void ms_verify_value(ms_conn_t *c,
1726 ms_mlget_task_item_t *mlget_item,
1727 char *value,
1728 int vlen)
1729 {
1730 if (c->curr_task.verify)
1731 {
1732 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1733 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1734 char *orignkey=
1735 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1736
1737 /* verify expire time if necessary */
1738 if (c->curr_task.item->exp_time > 0)
1739 {
1740 struct timeval curr_time;
1741 gettimeofday(&curr_time, NULL);
1742
1743 /* object expired but get it now */
1744 if (curr_time.tv_sec - c->curr_task.item->client_time
1745 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1746 {
1747 __sync_fetch_and_add(&ms_stats.exp_get, 1);
1748
1749 if (ms_setting.verbose)
1750 {
1751 char set_time[64];
1752 char cur_time[64];
1753 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1754 localtime(&c->curr_task.item->client_time));
1755 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1756 localtime(&curr_time.tv_sec));
1757 fprintf(stderr,
1758 "\n<%d expire time verification failed, "
1759 "object expired but get it now\n"
1760 "\tkey len: %d\n"
1761 "\tkey: %lx %.*s\n"
1762 "\tset time: %s current time: %s "
1763 "diff time: %d expire time: %d\n"
1764 "\texpected data: \n"
1765 "\treceived data len: %d\n"
1766 "\treceived data: %.*s\n",
1767 c->sfd,
1768 c->curr_task.item->key_size,
1769 c->curr_task.item->key_prefix,
1770 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1771 orignkey,
1772 set_time,
1773 cur_time,
1774 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1775 c->curr_task.item->exp_time,
1776 vlen,
1777 vlen,
1778 value);
1779 fflush(stderr);
1780 }
1781 }
1782 }
1783 else
1784 {
1785 if ((c->curr_task.item->value_size != vlen)
1786 || (memcmp(orignval, value, (size_t)vlen) != 0))
1787 {
1788 __sync_fetch_and_add(&ms_stats.vef_failed, 1);
1789
1790 if (ms_setting.verbose)
1791 {
1792 fprintf(stderr,
1793 "\n<%d data verification failed\n"
1794 "\tkey len: %d\n"
1795 "\tkey: %lx %.*s\n"
1796 "\texpected data len: %d\n"
1797 "\texpected data: %.*s\n"
1798 "\treceived data len: %d\n"
1799 "\treceived data: %.*s\n",
1800 c->sfd,
1801 c->curr_task.item->key_size,
1802 c->curr_task.item->key_prefix,
1803 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1804 orignkey,
1805 c->curr_task.item->value_size,
1806 c->curr_task.item->value_size,
1807 orignval,
1808 vlen,
1809 vlen,
1810 value);
1811 fflush(stderr);
1812 }
1813 }
1814 }
1815
1816 c->curr_task.finish_verify= true;
1817
1818 if (mlget_item != NULL)
1819 {
1820 mlget_item->finish_verify= true;
1821 }
1822 }
1823 } /* ms_verify_value */
1824
1825
1826 /**
1827 * For ASCII protocol, after store the data into the local
1828 * buffer, run this function to handle the data.
1829 *
1830 * @param c, pointer of the concurrency
1831 */
1832 static void ms_ascii_complete_nread(ms_conn_t *c)
1833 {
1834 assert(c != NULL);
1835 assert(c->rbytes >= c->rvbytes);
1836 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1837 if (c->rvbytes > 2)
1838 {
1839 assert(
1840 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1841 }
1842
1843 /* multi-get */
1844 ms_mlget_task_item_t *mlget_item= NULL;
1845 if (((ms_setting.mult_key_num > 1)
1846 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1847 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1848 {
1849 c->mlget_task.value_index++;
1850 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1851
1852 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1853 {
1854 c->curr_task.item= mlget_item->item;
1855 c->curr_task.verify= mlget_item->verify;
1856 c->curr_task.finish_verify= mlget_item->finish_verify;
1857 mlget_item->get_miss= false;
1858 }
1859 else
1860 {
1861 /* Try to find the task item in multi-get task array */
1862 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1863 {
1864 mlget_item= &c->mlget_task.mlget_item[i];
1865 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1866 {
1867 c->curr_task.item= mlget_item->item;
1868 c->curr_task.verify= mlget_item->verify;
1869 c->curr_task.finish_verify= mlget_item->finish_verify;
1870 mlget_item->get_miss= false;
1871
1872 break;
1873 }
1874 }
1875 }
1876 }
1877
1878 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1879
1880 c->curr_task.get_miss= false;
1881 c->rbytes-= c->rvbytes;
1882 c->rcurr= c->rcurr + c->rvbytes;
1883 assert(c->rcurr <= (c->rbuf + c->rsize));
1884 c->readval= false;
1885 c->rvbytes= 0;
1886 } /* ms_ascii_complete_nread */
1887
1888
1889 /**
1890 * For binary protocol, after store the data into the local
1891 * buffer, run this function to handle the data.
1892 *
1893 * @param c, pointer of the concurrency
1894 */
1895 static void ms_bin_complete_nread(ms_conn_t *c)
1896 {
1897 assert(c != NULL);
1898 assert(c->rbytes >= c->rvbytes);
1899 assert(c->protocol == binary_prot);
1900
1901 int extlen= c->binary_header.response.extlen;
1902 int keylen= c->binary_header.response.keylen;
1903 uint8_t opcode= c->binary_header.response.opcode;
1904
1905 /* not get command or not include value, just return */
1906 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1907 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1908 || (c->rvbytes <= extlen + keylen))
1909 {
1910 /* get miss */
1911 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1912 {
1913 c->currcmd.retstat= MCD_END;
1914 c->curr_task.get_miss= true;
1915 }
1916
1917 c->readval= false;
1918 c->rvbytes= 0;
1919 ms_reset_conn(c, false);
1920 return;
1921 }
1922
1923 /* multi-get */
1924 ms_mlget_task_item_t *mlget_item= NULL;
1925 if (((ms_setting.mult_key_num > 1)
1926 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1927 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1928 {
1929 c->mlget_task.value_index++;
1930 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1931
1932 c->curr_task.item= mlget_item->item;
1933 c->curr_task.verify= mlget_item->verify;
1934 c->curr_task.finish_verify= mlget_item->finish_verify;
1935 mlget_item->get_miss= false;
1936 }
1937
1938 ms_verify_value(c,
1939 mlget_item,
1940 c->rcurr + extlen + keylen,
1941 c->rvbytes - extlen - keylen);
1942
1943 c->currcmd.retstat= MCD_END;
1944 c->curr_task.get_miss= false;
1945 c->rbytes-= c->rvbytes;
1946 c->rcurr= c->rcurr + c->rvbytes;
1947 assert(c->rcurr <= (c->rbuf + c->rsize));
1948 c->readval= false;
1949 c->rvbytes= 0;
1950
1951 if (ms_setting.mult_key_num > 1)
1952 {
1953 /* multi-get have check all the item */
1954 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1955 {
1956 ms_reset_conn(c, false);
1957 }
1958 }
1959 else
1960 {
1961 /* single get */
1962 ms_reset_conn(c, false);
1963 }
1964 } /* ms_bin_complete_nread */
1965
1966
1967 /**
1968 * we get here after reading the value of get commands.
1969 *
1970 * @param c, pointer of the concurrency
1971 */
1972 static void ms_complete_nread(ms_conn_t *c)
1973 {
1974 assert(c != NULL);
1975 assert(c->rbytes >= c->rvbytes);
1976 assert(c->protocol == ascii_udp_prot
1977 || c->protocol == ascii_prot
1978 || c->protocol == binary_prot);
1979
1980 if (c->protocol == binary_prot)
1981 {
1982 ms_bin_complete_nread(c);
1983 }
1984 else
1985 {
1986 ms_ascii_complete_nread(c);
1987 }
1988 } /* ms_complete_nread */
1989
1990
1991 /**
1992 * Adds a message header to a connection.
1993 *
1994 * @param c, pointer of the concurrency
1995 *
1996 * @return int, if success, return 0, else return -1
1997 */
1998 static int ms_add_msghdr(ms_conn_t *c)
1999 {
2000 struct msghdr *msg;
2001
2002 assert(c != NULL);
2003
2004 if (c->msgsize == c->msgused)
2005 {
2006 msg=
2007 realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
2008 if (! msg)
2009 return -1;
2010
2011 c->msglist= msg;
2012 c->msgsize*= 2;
2013 }
2014
2015 msg= c->msglist + c->msgused;
2016
2017 /**
2018 * this wipes msg_iovlen, msg_control, msg_controllen, and
2019 * msg_flags, the last 3 of which aren't defined on solaris:
2020 */
2021 memset(msg, 0, sizeof(struct msghdr));
2022
2023 msg->msg_iov= &c->iov[c->iovused];
2024
2025 if (c->udp && (c->srv_recv_addr_size > 0))
2026 {
2027 msg->msg_name= &c->srv_recv_addr;
2028 msg->msg_namelen= c->srv_recv_addr_size;
2029 }
2030
2031 c->msgbytes= 0;
2032 c->msgused++;
2033
2034 if (c->udp)
2035 {
2036 /* Leave room for the UDP header, which we'll fill in later. */
2037 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2038 }
2039
2040 return 0;
2041 } /* ms_add_msghdr */
2042
2043
2044 /**
2045 * Ensures that there is room for another structure iovec in a connection's
2046 * iov list.
2047 *
2048 * @param c, pointer of the concurrency
2049 *
2050 * @return int, if success, return 0, else return -1
2051 */
2052 static int ms_ensure_iov_space(ms_conn_t *c)
2053 {
2054 assert(c != NULL);
2055
2056 if (c->iovused >= c->iovsize)
2057 {
2058 int i, iovnum;
2059 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2060 ((uint64_t)c->iovsize
2061 * 2)
2062 * sizeof(struct iovec));
2063 if (! new_iov)
2064 return -1;
2065
2066 c->iov= new_iov;
2067 c->iovsize*= 2;
2068
2069 /* Point all the msghdr structures at the new list. */
2070 for (i= 0, iovnum= 0; i < c->msgused; i++)
2071 {
2072 c->msglist[i].msg_iov= &c->iov[iovnum];
2073 iovnum+= (int)c->msglist[i].msg_iovlen;
2074 }
2075 }
2076
2077 return 0;
2078 } /* ms_ensure_iov_space */
2079
2080
2081 /**
2082 * Adds data to the list of pending data that will be written out to a
2083 * connection.
2084 *
2085 * @param c, pointer of the concurrency
2086 * @param buf, the buffer includes data to send
2087 * @param len, the data length in the buffer
2088 *
2089 * @return int, if success, return 0, else return -1
2090 */
2091 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2092 {
2093 struct msghdr *m;
2094 int leftover;
2095 bool limit_to_mtu;
2096
2097 assert(c != NULL);
2098
2099 do
2100 {
2101 m= &c->msglist[c->msgused - 1];
2102
2103 /*
2104 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2105 */
2106 limit_to_mtu= c->udp;
2107
2108 /* We may need to start a new msghdr if this one is full. */
2109 if ((m->msg_iovlen == IOV_MAX)
2110 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2111 {
2112 ms_add_msghdr(c);
2113 m= &c->msglist[c->msgused - 1];
2114 }
2115
2116 if (ms_ensure_iov_space(c) != 0)
2117 return -1;
2118
2119 /* If the fragment is too big to fit in the datagram, split it up */
2120 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2121 {
2122 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2123 len-= leftover;
2124 }
2125 else
2126 {
2127 leftover= 0;
2128 }
2129
2130 m= &c->msglist[c->msgused - 1];
2131 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2132 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2133
2134 c->msgbytes+= len;
2135 c->iovused++;
2136 m->msg_iovlen++;
2137
2138 buf= ((char *)buf) + len;
2139 len= leftover;
2140 }
2141 while (leftover > 0);
2142
2143 return 0;
2144 } /* ms_add_iov */
2145
2146
2147 /**
2148 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2149 *
2150 * @param c, pointer of the concurrency
2151 *
2152 * @return int, if success, return 0, else return -1
2153 */
2154 static int ms_build_udp_headers(ms_conn_t *c)
2155 {
2156 int i;
2157 unsigned char *hdr;
2158
2159 assert(c != NULL);
2160
2161 c->request_id= ms_get_udp_request_id();
2162
2163 if (c->msgused > c->hdrsize)
2164 {
2165 void *new_hdrbuf;
2166 if (c->hdrbuf)
2167 new_hdrbuf= realloc(c->hdrbuf,
2168 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2169 else
2170 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2171 if (! new_hdrbuf)
2172 return -1;
2173
2174 c->hdrbuf= (unsigned char *)new_hdrbuf;
2175 c->hdrsize= c->msgused * 2;
2176 }
2177
2178 /* If this is a multi-packet request, drop it. */
2179 if (c->udp && (c->msgused > 1))
2180 {
2181 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2182 return -1;
2183 }
2184
2185 hdr= c->hdrbuf;
2186 for (i= 0; i < c->msgused; i++)
2187 {
2188 c->msglist[i].msg_iov[0].iov_base= hdr;
2189 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2190 *hdr++= (unsigned char)(c->request_id / 256);
2191 *hdr++= (unsigned char)(c->request_id % 256);
2192 *hdr++= (unsigned char)(i / 256);
2193 *hdr++= (unsigned char)(i % 256);
2194 *hdr++= (unsigned char)(c->msgused / 256);
2195 *hdr++= (unsigned char)(c->msgused % 256);
2196 *hdr++= (unsigned char)1; /* support facebook memcached */
2197 *hdr++= (unsigned char)0;
2198 assert(hdr ==
2199 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2200 + UDP_HEADER_SIZE));
2201 }
2202
2203 return 0;
2204 } /* ms_build_udp_headers */
2205
2206
2207 /**
2208 * Transmit the next chunk of data from our list of msgbuf structures.
2209 *
2210 * @param c, pointer of the concurrency
2211 *
2212 * @return TRANSMIT_COMPLETE All done writing.
2213 * TRANSMIT_INCOMPLETE More data remaining to write.
2214 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2215 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2216 */
2217 static int ms_transmit(ms_conn_t *c)
2218 {
2219 assert(c != NULL);
2220
2221 if ((c->msgcurr < c->msgused)
2222 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2223 {
2224 /* Finished writing the current msg; advance to the next. */
2225 c->msgcurr++;
2226 }
2227
2228 if (c->msgcurr < c->msgused)
2229 {
2230 ssize_t res;
2231 struct msghdr *m= &c->msglist[c->msgcurr];
2232
2233 res= sendmsg(c->sfd, m, 0);
2234 if (res > 0)
2235 {
2236 __sync_fetch_and_add(&ms_stats.bytes_written, res);
2237
2238 /* We've written some of the data. Remove the completed
2239 * iovec entries from the list of pending writes. */
2240 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2241 {
2242 res-= (ssize_t)m->msg_iov->iov_len;
2243 m->msg_iovlen--;
2244 m->msg_iov++;
2245 }
2246
2247 /* Might have written just part of the last iovec entry;
2248 * adjust it so the next write will do the rest. */
2249 if (res > 0)
2250 {
2251 m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
2252 m->msg_iov->iov_len-= (uint64_t)res;
2253 }
2254 return TRANSMIT_INCOMPLETE;
2255 }
2256 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2257 {
2258 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2259 {
2260 fprintf(stderr, "Couldn't update event.\n");
2261 ms_conn_set_state(c, conn_closing);
2262 return TRANSMIT_HARD_ERROR;
2263 }
2264 return TRANSMIT_SOFT_ERROR;
2265 }
2266
2267 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2268 * we have a real error, on which we close the connection */
2269 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2270
2271 ms_conn_set_state(c, conn_closing);
2272 return TRANSMIT_HARD_ERROR;
2273 }
2274 else
2275 {
2276 return TRANSMIT_COMPLETE;
2277 }
2278 } /* ms_transmit */
2279
2280
2281 /**
2282 * Shrinks a connection's buffers if they're too big. This prevents
2283 * periodic large "mget" response from server chewing lots of client
2284 * memory.
2285 *
2286 * This should only be called in between requests since it can wipe output
2287 * buffers!
2288 *
2289 * @param c, pointer of the concurrency
2290 */
2291 static void ms_conn_shrink(ms_conn_t *c)
2292 {
2293 assert(c != NULL);
2294
2295 if (c->udp)
2296 return;
2297
2298 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2299 {
2300 char *newbuf;
2301
2302 if (c->rcurr != c->rbuf)
2303 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2304
2305 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2306
2307 if (newbuf)
2308 {
2309 c->rbuf= newbuf;
2310 c->rsize= DATA_BUFFER_SIZE;
2311 }
2312 c->rcurr= c->rbuf;
2313 }
2314
2315 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2316 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2317 {
2318 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2319 if (! new_rbuf)
2320 {
2321 c->rudpbuf= new_rbuf;
2322 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2323 }
2324 /* TODO check error condition? */
2325 }
2326
2327 if (c->msgsize > MSG_LIST_HIGHWAT)
2328 {
2329 struct msghdr *newbuf= (struct msghdr *)realloc(
2330 (void *)c->msglist,
2331 MSG_LIST_INITIAL
2332 * sizeof(c->msglist[0]));
2333 if (newbuf)
2334 {
2335 c->msglist= newbuf;
2336 c->msgsize= MSG_LIST_INITIAL;
2337 }
2338 /* TODO check error condition? */
2339 }
2340
2341 if (c->iovsize > IOV_LIST_HIGHWAT)
2342 {
2343 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2344 IOV_LIST_INITIAL
2345 * sizeof(c->iov[0]));
2346 if (newbuf)
2347 {
2348 c->iov= newbuf;
2349 c->iovsize= IOV_LIST_INITIAL;
2350 }
2351 /* TODO check return value */
2352 }
2353 } /* ms_conn_shrink */
2354
2355
2356 /**
2357 * Sets a connection's current state in the state machine. Any special
2358 * processing that needs to happen on certain state transitions can
2359 * happen here.
2360 *
2361 * @param c, pointer of the concurrency
2362 * @param state, connection state
2363 */
2364 static void ms_conn_set_state(ms_conn_t *c, int state)
2365 {
2366 assert(c != NULL);
2367
2368 if (state != c->state)
2369 {
2370 if (state == conn_read)
2371 {
2372 ms_conn_shrink(c);
2373 }
2374 c->state= state;
2375 }
2376 } /* ms_conn_set_state */
2377
2378
2379 /**
2380 * update the event if socks change state. for example: when
2381 * change the listen scoket read event to sock write event, or
2382 * change socket handler, we could call this function.
2383 *
2384 * @param c, pointer of the concurrency
2385 * @param new_flags, new event flags
2386 *
2387 * @return bool, if success, return true, else return false
2388 */
2389 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2390 {
2391 /* default event timeout 10 seconds */
2392 struct timeval t=
2393 {
2394 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2395 };
2396
2397 assert(c != NULL);
2398
2399 struct event_base *base= c->event.ev_base;
2400 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2401 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2402 {
2403 return true;
2404 }
2405
2406 if (event_del(&c->event) == -1)
2407 {
2408 /* try to delete the event again */
2409 if (event_del(&c->event) == -1)
2410 {
2411 return false;
2412 }
2413 }
2414
2415 event_set(&c->event,
2416 c->sfd,
2417 (short)new_flags,
2418 ms_event_handler,
2419 (void *)c);
2420 event_base_set(base, &c->event);
2421 c->ev_flags= (short)new_flags;
2422
2423 if (c->total_sfds == 1)
2424 {
2425 if (event_add(&c->event, NULL) == -1)
2426 {
2427 return false;
2428 }
2429 }
2430 else
2431 {
2432 if (event_add(&c->event, &t) == -1)
2433 {
2434 return false;
2435 }
2436 }
2437
2438 return true;
2439 } /* ms_update_event */
2440
2441
2442 /**
2443 * If user want to get the expected throughput, we could limit
2444 * the performance of memslap. we could give up some work and
2445 * just wait a short time. The function is used to check this
2446 * case.
2447 *
2448 * @param c, pointer of the concurrency
2449 *
2450 * @return bool, if success, return true, else return false
2451 */
2452 static bool ms_need_yield(ms_conn_t *c)
2453 {
2454 int64_t tps= 0;
2455 int64_t time_diff= 0;
2456 struct timeval curr_time;
2457 ms_task_t *task= &c->curr_task;
2458
2459 if (ms_setting.expected_tps > 0)
2460 {
2461 gettimeofday(&curr_time, NULL);
2462 time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
2463 tps=
2464 (int64_t)((task->get_opt
2465 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2466
2467 /* current throughput is greater than expected throughput */
2468 if (tps > ms_thread.thread_ctx->tps_perconn)
2469 {
2470 return true;
2471 }
2472 }
2473
2474 return false;
2475 } /* ms_need_yield */
2476
2477
2478 /**
2479 * used to update the start time of each operation
2480 *
2481 * @param c, pointer of the concurrency
2482 */
2483 static void ms_update_start_time(ms_conn_t *c)
2484 {
2485 ms_task_item_t *item= c->curr_task.item;
2486
2487 if ((ms_setting.stat_freq > 0) || c->udp
2488 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2489 {
2490 gettimeofday(&c->start_time, NULL);
2491 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2492 {
2493 /* record the current time */
2494 item->client_time= c->start_time.tv_sec;
2495 }
2496 }
2497 } /* ms_update_start_time */
2498
2499
2500 /**
2501 * run the state machine
2502 *
2503 * @param c, pointer of the concurrency
2504 */
2505 static void ms_drive_machine(ms_conn_t *c)
2506 {
2507 bool stop= false;
2508
2509 assert(c != NULL);
2510
2511 while (! stop)
2512 {
2513 switch (c->state)
2514 {
2515 case conn_read:
2516 if (c->readval)
2517 {
2518 if (c->rbytes >= c->rvbytes)
2519 {
2520 ms_complete_nread(c);
2521 break;
2522 }
2523 }
2524 else
2525 {
2526 if (ms_try_read_line(c) != 0)
2527 {
2528 break;
2529 }
2530 }
2531
2532 if (ms_try_read_network(c) != 0)
2533 {
2534 break;
2535 }
2536
2537 /* doesn't read all the response data, wait event wake up */
2538 if (! c->currcmd.isfinish)
2539 {
2540 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2541 {
2542 fprintf(stderr, "Couldn't update event.\n");
2543 ms_conn_set_state(c, conn_closing);
2544 break;
2545 }
2546 stop= true;
2547 break;
2548 }
2549
2550 /* we have no command line and no data to read from network, next write */
2551 ms_conn_set_state(c, conn_write);
2552 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2553
2554 break;
2555
2556 case conn_write:
2557 if (! c->ctnwrite && ms_need_yield(c))
2558 {
2559 usleep(10);
2560
2561 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2562 {
2563 fprintf(stderr, "Couldn't update event.\n");
2564 ms_conn_set_state(c, conn_closing);
2565 break;
2566 }
2567 stop= true;
2568 break;
2569 }
2570
2571 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2572 {
2573 ms_conn_set_state(c, conn_closing);
2574 break;
2575 }
2576
2577 /* record the start time before starting to send data if necessary */
2578 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2579 {
2580 if (c->change_sfd)
2581 {
2582 c->change_sfd= false;
2583 }
2584 ms_update_start_time(c);
2585 }
2586
2587 /* change sfd if necessary */
2588 if (c->change_sfd)
2589 {
2590 c->ctnwrite= true;
2591 stop= true;
2592 break;
2593 }
2594
2595 /* execute task until nothing need be written to network */
2596 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2597 {
2598 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2599 {
2600 fprintf(stderr, "Couldn't update event.\n");
2601 ms_conn_set_state(c, conn_closing);
2602 break;
2603 }
2604 stop= true;
2605 break;
2606 }
2607
2608 switch (ms_transmit(c))
2609 {
2610 case TRANSMIT_COMPLETE:
2611 /* we have no data to write to network, next wait repose */
2612 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2613 {
2614 fprintf(stderr, "Couldn't update event.\n");
2615 ms_conn_set_state(c, conn_closing);
2616 c->ctnwrite= false;
2617 break;
2618 }
2619 ms_conn_set_state(c, conn_read);
2620 c->ctnwrite= false;
2621 stop= true;
2622 break;
2623
2624 case TRANSMIT_INCOMPLETE:
2625 c->ctnwrite= true;
2626 break; /* Continue in state machine. */
2627
2628 case TRANSMIT_HARD_ERROR:
2629 c->ctnwrite= false;
2630 break;
2631
2632 case TRANSMIT_SOFT_ERROR:
2633 c->ctnwrite= true;
2634 stop= true;
2635 break;
2636
2637 default:
2638 break;
2639 } /* switch */
2640
2641 break;
2642
2643 case conn_closing:
2644 /* recovery mode, need reconnect if connection close */
2645 if (ms_setting.reconnect && (! ms_global.time_out
2646 || ((ms_setting.run_time == 0)
2647 && (c->remain_exec_num > 0))))
2648 {
2649 if (ms_reconn(c) != 0)
2650 {
2651 ms_conn_close(c);
2652 stop= true;
2653 break;
2654 }
2655
2656 ms_reset_conn(c, false);
2657
2658 if (c->total_sfds == 1)
2659 {
2660 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2661 {
2662 fprintf(stderr, "Couldn't update event.\n");
2663 ms_conn_set_state(c, conn_closing);
2664 break;
2665 }
2666 }
2667
2668 break;
2669 }
2670 else
2671 {
2672 ms_conn_close(c);
2673 stop= true;
2674 break;
2675 }
2676
2677 default:
2678 assert(0);
2679 } /* switch */
2680 }
2681 } /* ms_drive_machine */
2682
2683
2684 /**
2685 * the event handler of each thread
2686 *
2687 * @param fd, the file descriptor of socket
2688 * @param which, event flag
2689 * @param arg, argument
2690 */
2691 void ms_event_handler(const int fd, const short which, void *arg)
2692 {
2693 ms_conn_t *c= (ms_conn_t *)arg;
2694
2695 assert(c != NULL);
2696
2697 c->which= which;
2698
2699 /* sanity */
2700 if (fd != c->sfd)
2701 {
2702 fprintf(stderr,
2703 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2704 fd,
2705 c->sfd);
2706 ms_conn_close(c);
2707 exit(1);
2708 }
2709 assert(fd == c->sfd);
2710
2711 /* event timeout, close the current connection */
2712 if (c->which == EV_TIMEOUT)
2713 {
2714 ms_conn_set_state(c, conn_closing);
2715 }
2716
2717 ms_drive_machine(c);
2718
2719 /* wait for next event */
2720 } /* ms_event_handler */
2721
2722
2723 /**
2724 * get the next socket descriptor index to run for replication
2725 *
2726 * @param c, pointer of the concurrency
2727 * @param cmd, command(get or set )
2728 *
2729 * @return int, if success, return the index, else return 0
2730 */
2731 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2732 {
2733 int sock_index= -1;
2734 int i= 0;
2735
2736 if (c->total_sfds == 1)
2737 {
2738 return 0;
2739 }
2740
2741 if (ms_setting.rep_write_srv == 0)
2742 {
2743 return sock_index;
2744 }
2745
2746 do
2747 {
2748 if (cmd == CMD_SET)
2749 {
2750 for (i= 0; i < ms_setting.rep_write_srv; i++)
2751 {
2752 if (c->tcpsfd[i] > 0)
2753 {
2754 break;
2755 }
2756 }
2757
2758 if (i == ms_setting.rep_write_srv)
2759 {
2760 /* random get one replication server to read */
2761 sock_index= (int)(random() % c->total_sfds);
2762 }
2763 else
2764 {
2765 /* random get one replication writing server to write */
2766 sock_index= (int)(random() % ms_setting.rep_write_srv);
2767 }
2768 }
2769 else if (cmd == CMD_GET)
2770 {
2771 /* random get one replication server to read */
2772 sock_index= (int)(random() % c->total_sfds);
2773 }
2774 }
2775 while (c->tcpsfd[sock_index] == 0);
2776
2777 return sock_index;
2778 } /* ms_get_rep_sock_index */
2779
2780
2781 /**
2782 * get the next socket descriptor index to run
2783 *
2784 * @param c, pointer of the concurrency
2785 *
2786 * @return int, return the index
2787 */
2788 static int ms_get_next_sock_index(ms_conn_t *c)
2789 {
2790 int sock_index= 0;
2791
2792 do
2793 {
2794 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2795 }
2796 while (c->tcpsfd[sock_index] == 0);
2797
2798 return sock_index;
2799 } /* ms_get_next_sock_index */
2800
2801
2802 /**
2803 * update socket event of the connections
2804 *
2805 * @param c, pointer of the concurrency
2806 *
2807 * @return int, if success, return 0, else return -1
2808 */
2809 static int ms_update_conn_sock_event(ms_conn_t *c)
2810 {
2811 assert(c != NULL);
2812
2813 switch (c->currcmd.cmd)
2814 {
2815 case CMD_SET:
2816 if (ms_setting.facebook_test && c->udp)
2817 {
2818 c->sfd= c->tcpsfd[0];
2819 c->udp= false;
2820 c->change_sfd= true;
2821 }
2822 break;
2823
2824 case CMD_GET:
2825 if (ms_setting.facebook_test && ! c->udp)
2826 {
2827 c->sfd= c->udpsfd;
2828 c->udp= true;
2829 c->change_sfd= true;
2830 }
2831 break;
2832
2833 default:
2834 break;
2835 } /* switch */
2836
2837 if (! c->udp && (c->total_sfds > 1))
2838 {
2839 if (c->cur_idx != c->total_sfds)
2840 {
2841 if (ms_setting.rep_write_srv == 0)
2842 {
2843 c->cur_idx= ms_get_next_sock_index(c);
2844 }
2845 else
2846 {
2847 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2848 }
2849 }
2850 else
2851 {
2852 /* must select the first sock of the connection at the beginning */
2853 c->cur_idx= 0;
2854 }
2855
2856 c->sfd= c->tcpsfd[c->cur_idx];
2857 assert(c->sfd != 0);
2858 c->change_sfd= true;
2859 }
2860
2861 if (c->change_sfd)
2862 {
2863 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2864 {
2865 fprintf(stderr, "Couldn't update event.\n");
2866 ms_conn_set_state(c, conn_closing);
2867 return -1;
2868 }
2869 }
2870
2871 return 0;
2872 } /* ms_update_conn_sock_event */
2873
2874
2875 /**
2876 * for ASCII protocol, this function build the set command
2877 * string and send the command.
2878 *
2879 * @param c, pointer of the concurrency
2880 * @param item, pointer of task item which includes the object
2881 * information
2882 *
2883 * @return int, if success, return 0, else return -1
2884 */
2885 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2886 {
2887 int value_offset;
2888 int write_len;
2889 char *buffer= c->wbuf;
2890
2891 write_len= sprintf(buffer,
2892 " %u %d %d\r\n",
2893 0,
2894 item->exp_time,
2895 item->value_size);
2896
2897 if (write_len > c->wsize)
2898 {
2899 /* ought to be always enough. just fail for simplicity */
2900 fprintf(stderr, "output command line too long.\n");
2901 return -1;
2902 }
2903
2904 if (item->value_offset == INVALID_OFFSET)
2905 {
2906 value_offset= item->key_suffix_offset;
2907 }
2908 else
2909 {
2910 value_offset= item->value_offset;
2911 }
2912
2913 if ((ms_add_iov(c, "set ", 4) != 0)
2914 || (ms_add_iov(c, (char *)&item->key_prefix,
2915 (int)KEY_PREFIX_SIZE) != 0)
2916 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2917 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2918 || (ms_add_iov(c, buffer, write_len) != 0)
2919 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2920 item->value_size) != 0)
2921 || (ms_add_iov(c, "\r\n", 2) != 0)
2922 || (c->udp && (ms_build_udp_headers(c) != 0)))
2923 {
2924 return -1;
2925 }
2926
2927 return 0;
2928 } /* ms_build_ascii_write_buf_set */
2929
2930
2931 /**
2932 * used to send set command to server
2933 *
2934 * @param c, pointer of the concurrency
2935 * @param item, pointer of task item which includes the object
2936 * information
2937 *
2938 * @return int, if success, return 0, else return -1
2939 */
2940 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2941 {
2942 assert(c != NULL);
2943
2944 c->currcmd.cmd= CMD_SET;
2945 c->currcmd.isfinish= false;
2946 c->currcmd.retstat= MCD_FAILURE;
2947
2948 if (ms_update_conn_sock_event(c) != 0)
2949 {
2950 return -1;
2951 }
2952
2953 c->msgcurr= 0;
2954 c->msgused= 0;
2955 c->iovused= 0;
2956 if (ms_add_msghdr(c) != 0)
2957 {
2958 fprintf(stderr, "Out of memory preparing request.");
2959 return -1;
2960 }
2961
2962 /* binary protocol */
2963 if (c->protocol == binary_prot)
2964 {
2965 if (ms_build_bin_write_buf_set(c, item) != 0)
2966 {
2967 return -1;
2968 }
2969 }
2970 else
2971 {
2972 if (ms_build_ascii_write_buf_set(c, item) != 0)
2973 {
2974 return -1;
2975 }
2976 }
2977
2978 __sync_fetch_and_add(&ms_stats.obj_bytes,
2979 item->key_size + item->value_size);
2980 __sync_fetch_and_add(&ms_stats.cmd_set, 1);
2981
2982 return 0;
2983 } /* ms_mcd_set */
2984
2985
2986 /**
2987 * for ASCII protocol, this function build the get command
2988 * string and send the command.
2989 *
2990 * @param c, pointer of the concurrency
2991 * @param item, pointer of task item which includes the object
2992 * information
2993 *
2994 * @return int, if success, return 0, else return -1
2995 */
2996 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2997 {
2998 if ((ms_add_iov(c, "get ", 4) != 0)
2999 || (ms_add_iov(c, (char *)&item->key_prefix,
3000 (int)KEY_PREFIX_SIZE) != 0)
3001 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3002 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3003 || (ms_add_iov(c, "\r\n", 2) != 0)
3004 || (c->udp && (ms_build_udp_headers(c) != 0)))
3005 {
3006 return -1;
3007 }
3008
3009 return 0;
3010 } /* ms_build_ascii_write_buf_get */
3011
3012
3013 /**
3014 * used to send the get command to server
3015 *
3016 * @param c, pointer of the concurrency
3017 * @param item, pointer of task item which includes the object
3018 * information
3019 * @param verify, whether do verification
3020 *
3021 * @return int, if success, return 0, else return -1
3022 */
3023 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3024 {
3025 /* verify not supported yet */
3026 UNUSED_ARGUMENT(verify);
3027
3028 assert(c != NULL);
3029
3030 c->currcmd.cmd= CMD_GET;
3031 c->currcmd.isfinish= false;
3032 c->currcmd.retstat= MCD_FAILURE;
3033
3034 if (ms_update_conn_sock_event(c) != 0)
3035 {
3036 return -1;
3037 }
3038
3039 c->msgcurr= 0;
3040 c->msgused= 0;
3041 c->iovused= 0;
3042 if (ms_add_msghdr(c) != 0)
3043 {
3044 fprintf(stderr, "Out of memory preparing request.");
3045 return -1;
3046 }
3047
3048 /* binary protocol */
3049 if (c->protocol == binary_prot)
3050 {
3051 if (ms_build_bin_write_buf_get(c, item) != 0)
3052 {
3053 return -1;
3054 }
3055 }
3056 else
3057 {
3058 if (ms_build_ascii_write_buf_get(c, item) != 0)
3059 {
3060 return -1;
3061 }
3062 }
3063
3064 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3065
3066 return 0;
3067 } /* ms_mcd_get */
3068
3069
3070 /**
3071 * for ASCII protocol, this function build the multi-get command
3072 * string and send the command.
3073 *
3074 * @param c, pointer of the concurrency
3075 *
3076 * @return int, if success, return 0, else return -1
3077 */
3078 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3079 {
3080 ms_task_item_t *item;
3081
3082 if (ms_add_iov(c, "get", 3) != 0)
3083 {
3084 return -1;
3085 }
3086
3087 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3088 {
3089 item= c->mlget_task.mlget_item[i].item;
3090 assert(item != NULL);
3091 if ((ms_add_iov(c, " ", 1) != 0)
3092 || (ms_add_iov(c, (char *)&item->key_prefix,
3093 (int)KEY_PREFIX_SIZE) != 0)
3094 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3095 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3096 {
3097 return -1;
3098 }
3099 }
3100
3101 if ((ms_add_iov(c, "\r\n", 2) != 0)
3102 || (c->udp && (ms_build_udp_headers(c) != 0)))
3103 {
3104 return -1;
3105 }
3106
3107 return 0;
3108 } /* ms_build_ascii_write_buf_mlget */
3109
3110
3111 /**
3112 * used to send the multi-get command to server
3113 *
3114 * @param c, pointer of the concurrency
3115 *
3116 * @return int, if success, return 0, else return -1
3117 */
3118 int ms_mcd_mlget(ms_conn_t *c)
3119 {
3120 ms_task_item_t *item;
3121
3122 assert(c != NULL);
3123 assert(c->mlget_task.mlget_num >= 1);
3124
3125 c->currcmd.cmd= CMD_GET;
3126 c->currcmd.isfinish= false;
3127 c->currcmd.retstat= MCD_FAILURE;
3128
3129 if (ms_update_conn_sock_event(c) != 0)
3130 {
3131 return -1;
3132 }
3133
3134 c->msgcurr= 0;
3135 c->msgused= 0;
3136 c->iovused= 0;
3137 if (ms_add_msghdr(c) != 0)
3138 {
3139 fprintf(stderr, "Out of memory preparing request.");
3140 return -1;
3141 }
3142
3143 /* binary protocol */
3144 if (c->protocol == binary_prot)
3145 {
3146 if (ms_build_bin_write_buf_mlget(c) != 0)
3147 {
3148 return -1;
3149 }
3150 }
3151 else
3152 {
3153 if (ms_build_ascii_write_buf_mlget(c) != 0)
3154 {
3155 return -1;
3156 }
3157 }
3158
3159 /* decrease operation time of each item */
3160 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3161 {
3162 item= c->mlget_task.mlget_item[i].item;
3163 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3164 }
3165
3166 return 0;
3167 } /* ms_mcd_mlget */
3168
3169
3170 /**
3171 * binary protocol support
3172 */
3173
3174 /**
3175 * for binary protocol, parse the response of server
3176 *
3177 * @param c, pointer of the concurrency
3178 *
3179 * @return int, if success, return 0, else return -1
3180 */
3181 static int ms_bin_process_response(ms_conn_t *c)
3182 {
3183 const char *errstr= NULL;
3184
3185 assert(c != NULL);
3186
3187 uint32_t bodylen= c->binary_header.response.bodylen;
3188 uint8_t opcode= c->binary_header.response.opcode;
3189 uint16_t status= c->binary_header.response.status;
3190
3191 if (bodylen > 0)
3192 {
3193 c->rvbytes= (int32_t)bodylen;
3194 c->readval= true;
3195 return 1;
3196 }
3197 else
3198 {
3199 switch (status)
3200 {
3201 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3202 if (opcode == PROTOCOL_BINARY_CMD_SET)
3203 {
3204 c->currcmd.retstat= MCD_STORED;
3205 }
3206 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3207 {
3208 c->currcmd.retstat= MCD_DELETED;
3209 }
3210 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3211 {
3212 c->currcmd.retstat= MCD_END;
3213 }
3214 break;
3215
3216 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3217 errstr= "Out of memory";
3218 c->currcmd.retstat= MCD_SERVER_ERROR;
3219 break;
3220
3221 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3222 errstr= "Unknown command";
3223 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3224 break;
3225
3226 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3227 errstr= "Not found";
3228 c->currcmd.retstat= MCD_NOTFOUND;
3229 break;
3230
3231 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3232 errstr= "Invalid arguments";
3233 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3234 break;
3235
3236 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3237 errstr= "Data exists for key.";
3238 break;
3239
3240 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3241 errstr= "Too large.";
3242 c->currcmd.retstat= MCD_SERVER_ERROR;
3243 break;
3244
3245 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3246 errstr= "Not stored.";
3247 c->currcmd.retstat= MCD_NOTSTORED;
3248 break;
3249
3250 default:
3251 errstr= "Unknown error";
3252 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3253 break;
3254 } /* switch */
3255
3256 if (errstr != NULL)
3257 {
3258 fprintf(stderr, "%s\n", errstr);
3259 }
3260 }
3261
3262 return 0;
3263 } /* ms_bin_process_response */
3264
3265
3266 /* build binary header and add the header to the buffer to send */
3267
3268 /**
3269 * build binary header and add the header to the buffer to send
3270 *
3271 * @param c, pointer of the concurrency
3272 * @param opcode, operation code
3273 * @param hdr_len, length of header
3274 * @param key_len, length of key
3275 * @param body_len. length of body
3276 */
3277 static void ms_add_bin_header(ms_conn_t *c,
3278 uint8_t opcode,
3279 uint8_t hdr_len,
3280 uint16_t key_len,
3281 uint32_t body_len)
3282 {
3283 protocol_binary_request_header *header;
3284
3285 assert(c != NULL);
3286
3287 header= (protocol_binary_request_header *)c->wcurr;
3288
3289 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3290 header->request.opcode= (uint8_t)opcode;
3291 header->request.keylen= htonl(key_len);
3292
3293 header->request.extlen= (uint8_t)hdr_len;
3294 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3295 header->request.reserved= 0;
3296
3297 header->request.bodylen= htonl(body_len);
3298 header->request.opaque= 0;
3299 header->request.cas= 0;
3300
3301 ms_add_iov(c, c->wcurr, sizeof(header->request));
3302 } /* ms_add_bin_header */
3303
3304
3305 /**
3306 * add the key to the socket write buffer array
3307 *
3308 * @param c, pointer of the concurrency
3309 * @param item, pointer of task item which includes the object
3310 * information
3311 */
3312 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3313 {
3314 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3315 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3316 item->key_size - (int)KEY_PREFIX_SIZE);
3317 }
3318
3319
3320 /**
3321 * for binary protocol, this function build the set command
3322 * and add the command to send buffer array.
3323 *
3324 * @param c, pointer of the concurrency
3325 * @param item, pointer of task item which includes the object
3326 * information
3327 *
3328 * @return int, if success, return 0, else return -1
3329 */
3330 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3331 {
3332 assert(c->wbuf == c->wcurr);
3333
3334 int value_offset;
3335 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3336 uint16_t keylen= (uint16_t)item->key_size;
3337 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3338 + (uint32_t)keylen + (uint32_t)item->value_size;
3339
3340 ms_add_bin_header(c,
3341 PROTOCOL_BINARY_CMD_SET,
3342 sizeof(rep->message.body),
3343 keylen,
3344 bodylen);
3345 rep->message.body.flags= 0;
3346 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3347 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3348 ms_add_key_to_iov(c, item);
3349
3350 if (item->value_offset == INVALID_OFFSET)
3351 {
3352 value_offset= item->key_suffix_offset;
3353 }
3354 else
3355 {
3356 value_offset= item->value_offset;
3357 }
3358 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3359
3360 return 0;
3361 } /* ms_build_bin_write_buf_set */
3362
3363
3364 /**
3365 * for binary protocol, this function build the get command and
3366 * add the command to send buffer array.
3367 *
3368 * @param c, pointer of the concurrency
3369 * @param item, pointer of task item which includes the object
3370 * information
3371 *
3372 * @return int, if success, return 0, else return -1
3373 */
3374 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3375 {
3376 assert(c->wbuf == c->wcurr);
3377
3378 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3379 (uint32_t)item->key_size);
3380 ms_add_key_to_iov(c, item);
3381
3382 return 0;
3383 } /* ms_build_bin_write_buf_get */
3384
3385
3386 /**
3387 * for binary protocol, this function build the multi-get
3388 * command and add the command to send buffer array.
3389 *
3390 * @param c, pointer of the concurrency
3391 * @param item, pointer of task item which includes the object
3392 * information
3393 *
3394 * @return int, if success, return 0, else return -1
3395 */
3396 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3397 {
3398 ms_task_item_t *item;
3399
3400 assert(c->wbuf == c->wcurr);
3401
3402 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3403 {
3404 item= c->mlget_task.mlget_item[i].item;
3405 assert(item != NULL);
3406
3407 ms_add_bin_header(c,
3408 PROTOCOL_BINARY_CMD_GET,
3409 0,
3410 (uint16_t)item->key_size,
3411 (uint32_t)item->key_size);
3412 ms_add_key_to_iov(c, item);
3413 c->wcurr+= sizeof(protocol_binary_request_get);
3414 }
3415
3416 c->wcurr= c->wbuf;
3417
3418 return 0;
3419 } /* ms_build_bin_write_buf_mlget */