feb6a476bbf91e2b65372d53f15ac8e9b36c1f6a
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <limits.h>
16 #include <sys/uio.h>
17 #include <event.h>
18 #include <fcntl.h>
19 #include <netinet/tcp.h>
20 #include <arpa/inet.h>
21 #include "ms_setting.h"
22 #include "ms_thread.h"
23 #include "ms_atomic.h"
24
/* for network write: possible outcomes of one transmit attempt */
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * for generating key
 *
 * ms_get_key_prefix() ORs KEY_PREFIX_MASK into every prefix it hands out,
 * so each byte of a prefix has bit 0x10 set. ' ' (0x20), '\r' (0x0d),
 * '\n' (0x0a) and '\0' all have that bit clear, therefore none of them
 * can appear inside a prefix.
 */
#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
#define KEY_PREFIX_MASK 0x1010101010101010

/*
 * Token indexes used to parse the value length returned by the server in
 * a "VALUE <key> <flags> <bytes>" line (see ms_ascii_process_line()).
 */
#define KEY_TOKEN 1
#define VALUELEN_TOKEN 3

/*
 * global increasing counter, to ensure the key prefix unique;
 * ms_get_key_prefix() updates it while holding ms_global.seq_mutex
 */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;

/*
 * global increasing counter, generating request id for UDP;
 * ms_get_udp_request_id() increments it atomically
 */
static int32_t udp_request_id= 0;

/* state of the worker thread running this code (thread-local) */
extern __thread ms_thread_t ms_thread;

/* generate udp request id */
static int ms_get_udp_request_id(void);


/* connect initialize */
static void ms_task_init(ms_conn_t *c);
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
static int ms_conn_sock_init(ms_conn_t *c);
static int ms_conn_event_init(ms_conn_t *c);
static int ms_conn_init(ms_conn_t *c,
                        const int init_state,
                        const int read_buffer_size,
                        const bool is_udp);
static void ms_warmup_num_init(ms_conn_t *c);
static int ms_item_win_init(ms_conn_t *c);


/* connection close */
void ms_conn_free(ms_conn_t *c);
static void ms_conn_close(ms_conn_t *c);


/* create network connection */
static int ms_new_socket(struct addrinfo *ai);
static void ms_maximize_sndbuf(const int sfd);
static int ms_network_connect(ms_conn_t *c,
                              char *srv_host_name,
                              const int srv_port,
                              const bool is_udp,
                              int *ret_sfd);
static int ms_reconn(ms_conn_t *c);


/* read and parse */
static int ms_tokenize_command(char *command,
                               token_t *tokens,
                               const int max_tokens);
static int ms_ascii_process_line(ms_conn_t *c, char *command);
static int ms_try_read_line(ms_conn_t *c);
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
static int ms_udp_read(ms_conn_t *c, char *buf, int len);
static int ms_try_read_network(ms_conn_t *c);
static void ms_verify_value(ms_conn_t *c,
                            ms_mlget_task_item_t *mlget_item,
                            char *value,
                            int vlen);
static void ms_ascii_complete_nread(ms_conn_t *c);
static void ms_bin_complete_nread(ms_conn_t *c);
static void ms_complete_nread(ms_conn_t *c);


/* send functions */
static int ms_add_msghdr(ms_conn_t *c);
static int ms_ensure_iov_space(ms_conn_t *c);
static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
static int ms_build_udp_headers(ms_conn_t *c);
static int ms_transmit(ms_conn_t *c);


/* status adjustment */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
static int ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);


/* main loop */
static void ms_drive_machine(ms_conn_t *c);
void ms_event_handler(const int fd, const short which, void *arg);


/* ascii protocol */
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);


/* binary protocol */
static int ms_bin_process_response(ms_conn_t *c);
static void ms_add_bin_header(ms_conn_t *c,
                              uint8_t opcode,
                              uint8_t hdr_len,
                              uint16_t key_len,
                              uint32_t body_len);
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
139
140
141 /**
142 * each key has two parts, prefix and suffix. The suffix is a
143 * string random get form the character table. The prefix is a
144 * uint64_t variable. And the prefix must be unique. we use the
145 * prefix to identify a key. And the prefix can't include
146 * character ' ' '\r' '\n' '\0'.
147 *
148 * @return uint64_t
149 */
150 uint64_t ms_get_key_prefix(void)
151 {
152 uint64_t key_prefix;
153
154 pthread_mutex_lock(&ms_global.seq_mutex);
155 key_prefix_seq|= KEY_PREFIX_MASK;
156 key_prefix= key_prefix_seq;
157 key_prefix_seq++;
158 pthread_mutex_unlock(&ms_global.seq_mutex);
159
160 return key_prefix;
161 } /* ms_get_key_prefix */
162
163
164 /**
165 * get an unique udp request id
166 *
167 * @return an unique UDP request id
168 */
169 static int ms_get_udp_request_id(void)
170 {
171 return __sync_fetch_and_add(&udp_request_id, 1);
172 }
173
174
175 /**
176 * initialize current task structure
177 *
178 * @param c, pointer of the concurrency
179 */
180 static void ms_task_init(ms_conn_t *c)
181 {
182 c->curr_task.cmd= CMD_NULL;
183 c->curr_task.item= 0;
184 c->curr_task.verify= false;
185 c->curr_task.finish_verify= true;
186 c->curr_task.get_miss= true;
187
188 c->curr_task.get_opt= 0;
189 c->curr_task.set_opt= 0;
190 c->curr_task.cycle_undo_get= 0;
191 c->curr_task.cycle_undo_set= 0;
192 c->curr_task.verified_get= 0;
193 c->curr_task.overwrite_set= 0;
194 } /* ms_task_init */
195
196
197 /**
198 * initialize udp for the connection structure
199 *
200 * @param c, pointer of the concurrency
201 * @param is_udp, whether it's udp
202 *
203 * @return int, if success, return 0, else return -1
204 */
205 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
206 {
207 c->hdrbuf= 0;
208 c->rudpbuf= 0;
209 c->udppkt= 0;
210
211 c->rudpsize= UDP_DATA_BUFFER_SIZE;
212 c->hdrsize= 0;
213
214 c->rudpbytes= 0;
215 c->packets= 0;
216 c->recvpkt= 0;
217 c->pktcurr= 0;
218 c->ordcurr= 0;
219
220 c->udp= is_udp;
221
222 if (c->udp || (! c->udp && ms_setting.facebook_test))
223 {
224 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
225 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
226
227 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
228 {
229 if (c->rudpbuf != NULL)
230 free(c->rudpbuf);
231 if (c->udppkt != NULL)
232 free(c->udppkt);
233 fprintf(stderr, "malloc()\n");
234 return -1;
235 }
236 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
237 }
238
239 return 0;
240 } /* ms_conn_udp_init */
241
242
243 /**
244 * initialize the connection structure
245 *
246 * @param c, pointer of the concurrency
247 * @param init_state, (conn_read, conn_write, conn_closing)
248 * @param read_buffer_size
249 * @param is_udp, whether it's udp
250 *
251 * @return int, if success, return 0, else return -1
252 */
253 static int ms_conn_init(ms_conn_t *c,
254 const int init_state,
255 const int read_buffer_size,
256 const bool is_udp)
257 {
258 assert(c != NULL);
259
260 c->rbuf= c->wbuf= 0;
261 c->iov= 0;
262 c->msglist= 0;
263
264 c->rsize= read_buffer_size;
265 c->wsize= WRITE_BUFFER_SIZE;
266 c->iovsize= IOV_LIST_INITIAL;
267 c->msgsize= MSG_LIST_INITIAL;
268
269 /* for replication, each connection need connect all the server */
270 if (ms_setting.rep_write_srv > 0)
271 {
272 c->total_sfds= ms_setting.srv_cnt;
273 }
274 else
275 {
276 c->total_sfds= ms_setting.sock_per_conn;
277 }
278 c->alive_sfds= 0;
279
280 c->rbuf= (char *)malloc((size_t)c->rsize);
281 c->wbuf= (char *)malloc((size_t)c->wsize);
282 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
283 c->msglist= (struct msghdr *)malloc(
284 sizeof(struct msghdr) * (size_t)c->msgsize);
285 if (ms_setting.mult_key_num > 1)
286 {
287 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
288 malloc(
289 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
290 }
291 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
292
293 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
294 || (c->msglist == NULL) || (c->tcpsfd == NULL)
295 || ((ms_setting.mult_key_num > 1)
296 && (c->mlget_task.mlget_item == NULL)))
297 {
298 if (c->rbuf != NULL)
299 free(c->rbuf);
300 if (c->wbuf != NULL)
301 free(c->wbuf);
302 if (c->iov != NULL)
303 free(c->iov);
304 if (c->msglist != NULL)
305 free(c->msglist);
306 if (c->mlget_task.mlget_item != NULL)
307 free(c->mlget_task.mlget_item);
308 if (c->tcpsfd != NULL)
309 free(c->tcpsfd);
310 fprintf(stderr, "malloc()\n");
311 return -1;
312 }
313
314 c->state= init_state;
315 c->rvbytes= 0;
316 c->rbytes= 0;
317 c->rcurr= c->rbuf;
318 c->wcurr= c->wbuf;
319 c->iovused= 0;
320 c->msgcurr= 0;
321 c->msgused= 0;
322 c->cur_idx= c->total_sfds; /* default index is a invalid value */
323
324 c->ctnwrite= false;
325 c->readval= false;
326 c->change_sfd= false;
327
328 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
329 c->precmd.isfinish= true; /* default the previous command finished */
330 c->currcmd.isfinish= false;
331 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
332 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
333
334 c->mlget_task.mlget_num= 0;
335 c->mlget_task.value_index= -1; /* default invalid value */
336
337 if (ms_setting.binary_prot)
338 {
339 c->protocol= binary_prot;
340 }
341 else if (is_udp)
342 {
343 c->protocol= ascii_udp_prot;
344 }
345 else
346 {
347 c->protocol= ascii_prot;
348 }
349
350 /* initialize udp */
351 if (ms_conn_udp_init(c, is_udp) != 0)
352 {
353 return -1;
354 }
355
356 /* initialize task */
357 ms_task_init(c);
358
359 if (! (ms_setting.facebook_test && is_udp))
360 {
361 __sync_fetch_and_add(&ms_stats.active_conns, 1);
362 }
363
364 return 0;
365 } /* ms_conn_init */
366
367
368 /**
369 * when doing 100% get operation, it could preset some objects
370 * to warmup the server. this function is used to initialize the
371 * number of the objects to preset.
372 *
373 * @param c, pointer of the concurrency
374 */
375 static void ms_warmup_num_init(ms_conn_t *c)
376 {
377 /* no set operation, preset all the items in the window */
378 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
379 {
380 c->warmup_num= c->win_size;
381 c->remain_warmup_num= c->warmup_num;
382 }
383 else
384 {
385 c->warmup_num= 0;
386 c->remain_warmup_num= c->warmup_num;
387 }
388 } /* ms_warmup_num_init */
389
390
391 /**
392 * each connection has an item window, this function initialize
393 * the window. The window is used to generate task.
394 *
395 * @param c, pointer of the concurrency
396 *
397 * @return int, if success, return 0, else return -1
398 */
399 static int ms_item_win_init(ms_conn_t *c)
400 {
401 int exp_cnt= 0;
402
403 c->win_size= (int)ms_setting.win_size;
404 c->set_cursor= 0;
405 c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
406 c->remain_exec_num= c->exec_num;
407
408 c->item_win= (ms_task_item_t *)malloc(
409 sizeof(ms_task_item_t) * (size_t)c->win_size);
410 if (c->item_win == NULL)
411 {
412 fprintf(stderr, "Can't allocate task item array for conn.\n");
413 return -1;
414 }
415 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
416
417 for (int i= 0; i < c->win_size; i++)
418 {
419 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
420 c->item_win[i].key_prefix= ms_get_key_prefix();
421 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
422 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
423 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
424 c->item_win[i].client_time= 0;
425
426 /* set expire time base on the proportion */
427 if (exp_cnt < ms_setting.exp_ver_per * i)
428 {
429 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
430 exp_cnt++;
431 }
432 else
433 {
434 c->item_win[i].exp_time= 0;
435 }
436 }
437
438 ms_warmup_num_init(c);
439
440 return 0;
441 } /* ms_item_win_init */
442
443
444 /**
445 * each connection structure can include one or more sock
446 * handlers. this function create these socks and connect the
447 * server(s).
448 *
449 * @param c, pointer of the concurrency
450 *
451 * @return int, if success, return 0, else return -1
452 */
453 static int ms_conn_sock_init(ms_conn_t *c)
454 {
455 int i;
456 int ret_sfd;
457 int srv_idx= 0;
458
459 assert(c != NULL);
460 assert(c->tcpsfd != NULL);
461
462 for (i= 0; i < c->total_sfds; i++)
463 {
464 ret_sfd= 0;
465 if (ms_setting.rep_write_srv > 0)
466 {
467 /* for replication, each connection need connect all the server */
468 srv_idx= i;
469 }
470 else
471 {
472 /* all the connections in a thread connects the same server */
473 srv_idx= ms_thread.thread_ctx->srv_idx;
474 }
475
476 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
477 ms_setting.servers[srv_idx].srv_port,
478 ms_setting.udp, &ret_sfd) != 0)
479 {
480 break;
481 }
482
483 if (i == 0)
484 {
485 c->sfd= ret_sfd;
486 }
487
488 if (! ms_setting.udp)
489 {
490 c->tcpsfd[i]= ret_sfd;
491 }
492
493 c->alive_sfds++;
494 }
495
496 /* initialize udp sock handler if necessary */
497 if (ms_setting.facebook_test)
498 {
499 ret_sfd= 0;
500 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
501 ms_setting.servers[srv_idx].srv_port,
502 true, &ret_sfd) != 0)
503 {
504 c->udpsfd= 0;
505 }
506 else
507 {
508 c->udpsfd= ret_sfd;
509 }
510 }
511
512 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
513 {
514 if (ms_setting.udp)
515 {
516 close(c->sfd);
517 }
518 else
519 {
520 for (int j= 0; j < i; j++)
521 {
522 close(c->tcpsfd[j]);
523 }
524 }
525
526 if (c->udpsfd != 0)
527 {
528 close(c->udpsfd);
529 }
530
531 return -1;
532 }
533
534 return 0;
535 } /* ms_conn_sock_init */
536
537
538 /**
539 * each connection is managed by libevent, this function
540 * initialize the event of the connection structure.
541 *
542 * @param c, pointer of the concurrency
543 *
544 * @return int, if success, return 0, else return -1
545 */
546 static int ms_conn_event_init(ms_conn_t *c)
547 {
548 /* default event timeout 10 seconds */
549 struct timeval t=
550 {
551 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
552 };
553 short event_flags= EV_WRITE | EV_PERSIST;
554
555 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
556 event_base_set(ms_thread.base, &c->event);
557 c->ev_flags= event_flags;
558
559 if (c->total_sfds == 1)
560 {
561 if (event_add(&c->event, NULL) == -1)
562 {
563 return -1;
564 }
565 }
566 else
567 {
568 if (event_add(&c->event, &t) == -1)
569 {
570 return -1;
571 }
572 }
573
574 return 0;
575 } /* ms_conn_event_init */
576
577
578 /**
579 * setup a connection, each connection structure of each
580 * thread must call this function to initialize.
581 *
582 * @param c, pointer of the concurrency
583 *
584 * @return int, if success, return 0, else return -1
585 */
586 int ms_setup_conn(ms_conn_t *c)
587 {
588 if (ms_item_win_init(c) != 0)
589 {
590 return -1;
591 }
592
593 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
594 {
595 return -1;
596 }
597
598 if (ms_conn_sock_init(c) != 0)
599 {
600 return -1;
601 }
602
603 if (ms_conn_event_init(c) != 0)
604 {
605 return -1;
606 }
607
608 return 0;
609 } /* ms_setup_conn */
610
611
612 /**
613 * Frees a connection.
614 *
615 * @param c, pointer of the concurrency
616 */
617 void ms_conn_free(ms_conn_t *c)
618 {
619 if (c != NULL)
620 {
621 if (c->hdrbuf != NULL)
622 free(c->hdrbuf);
623 if (c->msglist != NULL)
624 free(c->msglist);
625 if (c->rbuf != NULL)
626 free(c->rbuf);
627 if (c->wbuf != NULL)
628 free(c->wbuf);
629 if (c->iov != NULL)
630 free(c->iov);
631 if (c->mlget_task.mlget_item != NULL)
632 free(c->mlget_task.mlget_item);
633 if (c->rudpbuf != NULL)
634 free(c->rudpbuf);
635 if (c->udppkt != NULL)
636 free(c->udppkt);
637 if (c->item_win != NULL)
638 free(c->item_win);
639 if (c->tcpsfd != NULL)
640 free(c->tcpsfd);
641
642 if (--ms_thread.nactive_conn == 0)
643 {
644 free(ms_thread.conn);
645 }
646 }
647 } /* ms_conn_free */
648
649
650 /**
651 * close a connection
652 *
653 * @param c, pointer of the concurrency
654 */
655 static void ms_conn_close(ms_conn_t *c)
656 {
657 assert(c != NULL);
658
659 /* delete the event, the socket and the connection */
660 event_del(&c->event);
661
662 for (int i= 0; i < c->total_sfds; i++)
663 {
664 if (c->tcpsfd[i] > 0)
665 {
666 close(c->tcpsfd[i]);
667 }
668 }
669 c->sfd= 0;
670
671 if (ms_setting.facebook_test)
672 {
673 close(c->udpsfd);
674 }
675
676 __sync_fetch_and_sub(&ms_stats.active_conns, 1);
677
678 ms_conn_free(c);
679
680 if (ms_setting.run_time == 0)
681 {
682 pthread_mutex_lock(&ms_global.run_lock.lock);
683 ms_global.run_lock.count++;
684 pthread_cond_signal(&ms_global.run_lock.cond);
685 pthread_mutex_unlock(&ms_global.run_lock.lock);
686 }
687
688 if (ms_thread.nactive_conn == 0)
689 {
690 pthread_exit(NULL);
691 }
692 } /* ms_conn_close */
693
694
695 /**
696 * create a new sock
697 *
698 * @param ai, server address information
699 *
700 * @return int, if success, return 0, else return -1
701 */
702 static int ms_new_socket(struct addrinfo *ai)
703 {
704 int sfd;
705
706 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
707 {
708 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
709 return -1;
710 }
711
712 return sfd;
713 } /* ms_new_socket */
714
715
716 /**
717 * Sets a socket's send buffer size to the maximum allowed by the system.
718 *
719 * @param sfd, file descriptor of socket
720 */
721 static void ms_maximize_sndbuf(const int sfd)
722 {
723 socklen_t intsize= sizeof(int);
724 unsigned int last_good= 0;
725 unsigned int min, max, avg;
726 unsigned int old_size;
727
728 /* Start with the default size. */
729 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
730 {
731 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
732 return;
733 }
734
735 /* Binary-search for the real maximum. */
736 min= old_size;
737 max= MAX_SENDBUF_SIZE;
738
739 while (min <= max)
740 {
741 avg= ((unsigned int)(min + max)) / 2;
742 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
743 {
744 last_good= avg;
745 min= avg + 1;
746 }
747 else
748 {
749 max= avg - 1;
750 }
751 }
752 } /* ms_maximize_sndbuf */
753
754
755 /**
756 * socket connects the server
757 *
758 * @param c, pointer of the concurrency
759 * @param srv_host_name, the host name of the server
760 * @param srv_port, port of server
761 * @param is_udp, whether it's udp
762 * @param ret_sfd, the connected socket file descriptor
763 *
764 * @return int, if success, return 0, else return -1
765 */
766 static int ms_network_connect(ms_conn_t *c,
767 char *srv_host_name,
768 const int srv_port,
769 const bool is_udp,
770 int *ret_sfd)
771 {
772 int sfd;
773 struct linger ling=
774 {
775 0, 0
776 };
777 struct addrinfo *ai;
778 struct addrinfo *next;
779 struct addrinfo hints;
780 char port_buf[NI_MAXSERV];
781 int error;
782 int success= 0;
783
784 int flags= 1;
785
786 /*
787 * the memset call clears nonstandard fields in some impementations
788 * that otherwise mess things up.
789 */
790 memset(&hints, 0, sizeof(hints));
791 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
792 if (is_udp)
793 {
794 hints.ai_protocol= IPPROTO_UDP;
795 hints.ai_socktype= SOCK_DGRAM;
796 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
797 }
798 else
799 {
800 hints.ai_family= AF_UNSPEC;
801 hints.ai_protocol= IPPROTO_TCP;
802 hints.ai_socktype= SOCK_STREAM;
803 }
804
805 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
806 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
807 if (error != 0)
808 {
809 if (error != EAI_SYSTEM)
810 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
811 else
812 perror("getaddrinfo()\n");
813
814 return -1;
815 }
816
817 for (next= ai; next; next= next->ai_next)
818 {
819 if ((sfd= ms_new_socket(next)) == -1)
820 {
821 freeaddrinfo(ai);
822 return -1;
823 }
824
825 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
826 if (is_udp)
827 {
828 ms_maximize_sndbuf(sfd);
829 }
830 else
831 {
832 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
833 sizeof(flags));
834 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
835 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
836 sizeof(flags));
837 }
838
839 if (is_udp)
840 {
841 c->srv_recv_addr_size= sizeof(struct sockaddr);
842 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
843 }
844 else
845 {
846 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
847 {
848 close(sfd);
849 freeaddrinfo(ai);
850 return -1;
851 }
852 }
853
854 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
855 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
856 {
857 fprintf(stderr, "setting O_NONBLOCK\n");
858 close(sfd);
859 freeaddrinfo(ai);
860 return -1;
861 }
862
863 if (ret_sfd != NULL)
864 {
865 *ret_sfd= sfd;
866 }
867
868 success++;
869 }
870
871 freeaddrinfo(ai);
872
873 /* Return zero if we detected no errors in starting up connections */
874 return success == 0;
875 } /* ms_network_connect */
876
877
878 /**
879 * reconnect a disconnected sock
880 *
881 * @param c, pointer of the concurrency
882 *
883 * @return int, if success, return 0, else return -1
884 */
885 static int ms_reconn(ms_conn_t *c)
886 {
887 int srv_idx= 0;
888 int srv_conn_cnt= 0;
889
890 if (ms_setting.rep_write_srv > 0)
891 {
892 srv_idx= c->cur_idx;
893 srv_conn_cnt= ms_setting.nconns;
894 }
895 else
896 {
897 srv_idx= ms_thread.thread_ctx->srv_idx;
898 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
899 }
900
901 /* close the old socket handler */
902 close(c->sfd);
903 c->tcpsfd[c->cur_idx]= 0;
904
905 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
906 % srv_conn_cnt == 0)
907 {
908 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
909 fprintf(stderr, "Server %s:%d disconnect\n",
910 ms_setting.servers[srv_idx].srv_host_name,
911 ms_setting.servers[srv_idx].srv_port);
912 }
913
914 if (ms_setting.rep_write_srv > 0)
915 {
916 int i= 0;
917 for (i= 0; i < c->total_sfds; i++)
918 {
919 if (c->tcpsfd[i] != 0)
920 {
921 break;
922 }
923 }
924
925 /* all socks disconnect */
926 if (i == c->total_sfds)
927 {
928 return -1;
929 }
930 }
931 else
932 {
933 do
934 {
935 /* reconnect success, break the loop */
936 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
937 ms_setting.servers[srv_idx].srv_port,
938 ms_setting.udp, &c->sfd) == 0)
939 {
940 c->tcpsfd[c->cur_idx]= c->sfd;
941 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
942 % srv_conn_cnt == 0)
943 {
944 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
945 int reconn_time=
946 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
947 - ms_setting.servers[srv_idx].disconn_time
948 .tv_sec);
949 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
950 ms_setting.servers[srv_idx].srv_host_name,
951 ms_setting.servers[srv_idx].srv_port, reconn_time);
952 }
953 break;
954 }
955
956 if (c->total_sfds == 1)
957 {
958 /* wait a second and reconnect */
959 sleep(1);
960 }
961 }
962 while (c->total_sfds == 1);
963 }
964
965 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
966 {
967 c->sfd= 0;
968 c->alive_sfds--;
969 }
970
971 return 0;
972 } /* ms_reconn */
973
974
975 /**
976 * reconnect several disconnected socks in the connection
977 * structure, the ever-1-second timer of the thread will check
978 * whether some socks in the connections disconnect. if
979 * disconnect, reconnect the sock.
980 *
981 * @param c, pointer of the concurrency
982 *
983 * @return int, if success, return 0, else return -1
984 */
985 int ms_reconn_socks(ms_conn_t *c)
986 {
987 int srv_idx= 0;
988 int ret_sfd= 0;
989 int srv_conn_cnt= 0;
990 struct timeval cur_time;
991
992 assert(c != NULL);
993
994 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
995 {
996 return 0;
997 }
998
999 for (int i= 0; i < c->total_sfds; i++)
1000 {
1001 if (c->tcpsfd[i] == 0)
1002 {
1003 gettimeofday(&cur_time, NULL);
1004
1005 /**
1006 * For failover test of replication, reconnect the socks after
1007 * it disconnects more than 5 seconds, Otherwise memslap will
1008 * block at connect() function and the work threads can't work
1009 * in this interval.
1010 */
1011 if (cur_time.tv_sec
1012 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1013 {
1014 break;
1015 }
1016
1017 if (ms_setting.rep_write_srv > 0)
1018 {
1019 srv_idx= i;
1020 srv_conn_cnt= ms_setting.nconns;
1021 }
1022 else
1023 {
1024 srv_idx= ms_thread.thread_ctx->srv_idx;
1025 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1026 }
1027
1028 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1029 ms_setting.servers[srv_idx].srv_port,
1030 ms_setting.udp, &ret_sfd) == 0)
1031 {
1032 c->tcpsfd[i]= ret_sfd;
1033 c->alive_sfds++;
1034
1035 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1036 % srv_conn_cnt == 0)
1037 {
1038 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1039 int reconn_time=
1040 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1041 - ms_setting.servers[srv_idx].disconn_time
1042 .tv_sec);
1043 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1044 ms_setting.servers[srv_idx].srv_host_name,
1045 ms_setting.servers[srv_idx].srv_port, reconn_time);
1046 }
1047 }
1048 }
1049 }
1050
1051 return 0;
1052 } /* ms_reconn_socks */
1053
1054
1055 /**
1056 * Tokenize the command string by replacing whitespace with '\0' and update
1057 * the token array tokens with pointer to start of each token and length.
1058 * Returns total number of tokens. The last valid token is the terminal
1059 * token (value points to the first unprocessed character of the string and
1060 * length zero).
1061 *
1062 * Usage example:
1063 *
1064 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1065 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1066 * ...
1067 * }
1068 * ncommand = tokens[ix].value - command;
1069 * command = tokens[ix].value;
1070 * }
1071 *
1072 * @param command, the command string to token
1073 * @param tokens, array to store tokens
1074 * @param max_tokens, maximum tokens number
1075 *
1076 * @return int, the number of tokens
1077 */
1078 static int ms_tokenize_command(char *command,
1079 token_t *tokens,
1080 const int max_tokens)
1081 {
1082 char *s, *e;
1083 int ntokens= 0;
1084
1085 assert(command != NULL && tokens != NULL && max_tokens > 1);
1086
1087 for (s= e= command; ntokens < max_tokens - 1; ++e)
1088 {
1089 if (*e == ' ')
1090 {
1091 if (s != e)
1092 {
1093 tokens[ntokens].value= s;
1094 tokens[ntokens].length= (size_t)(e - s);
1095 ntokens++;
1096 *e= '\0';
1097 }
1098 s= e + 1;
1099 }
1100 else if (*e == '\0')
1101 {
1102 if (s != e)
1103 {
1104 tokens[ntokens].value= s;
1105 tokens[ntokens].length= (size_t)(e - s);
1106 ntokens++;
1107 }
1108
1109 break; /* string end */
1110 }
1111 }
1112
1113 return ntokens;
1114 } /* ms_tokenize_command */
1115
1116
1117 /**
1118 * parse the response of server.
1119 *
1120 * @param c, pointer of the concurrency
1121 * @param command, the string responded by server
1122 *
1123 * @return int, if the command completed return 0, else return
1124 * -1
1125 */
1126 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1127 {
1128 int ret= 0;
1129 int64_t value_len;
1130 char *buffer= command;
1131
1132 assert(c != NULL);
1133
1134 /**
1135 * for command get, we store the returned value into local buffer
1136 * then continue in ms_complete_nread().
1137 */
1138
1139 switch (buffer[0])
1140 {
1141 case 'V': /* VALUE || VERSION */
1142 if (buffer[1] == 'A') /* VALUE */
1143 {
1144 token_t tokens[MAX_TOKENS];
1145 ms_tokenize_command(command, tokens, MAX_TOKENS);
1146 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1147 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1148
1149 /*
1150 * We read the \r\n into the string since not doing so is more
1151 * cycles then the waster of memory to do so.
1152 *
1153 * We are null terminating through, which will most likely make
1154 * some people lazy about using the return length.
1155 */
1156 c->rvbytes= (int)(value_len + 2);
1157 c->readval= true;
1158 ret= -1;
1159 }
1160
1161 break;
1162
1163 case 'O': /* OK */
1164 c->currcmd.retstat= MCD_SUCCESS;
1165
1166 case 'S': /* STORED STATS SERVER_ERROR */
1167 if (buffer[2] == 'A') /* STORED STATS */
1168 { /* STATS*/
1169 c->currcmd.retstat= MCD_STAT;
1170 }
1171 else if (buffer[1] == 'E')
1172 {
1173 /* SERVER_ERROR */
1174 printf("<%d %s\n", c->sfd, buffer);
1175
1176 c->currcmd.retstat= MCD_SERVER_ERROR;
1177 }
1178 else if (buffer[1] == 'T')
1179 {
1180 /* STORED */
1181 c->currcmd.retstat= MCD_STORED;
1182 }
1183 else
1184 {
1185 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1186 }
1187 break;
1188
1189 case 'D': /* DELETED DATA */
1190 if (buffer[1] == 'E')
1191 {
1192 c->currcmd.retstat= MCD_DELETED;
1193 }
1194 else
1195 {
1196 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1197 }
1198
1199 break;
1200
1201 case 'N': /* NOT_FOUND NOT_STORED*/
1202 if (buffer[4] == 'F')
1203 {
1204 c->currcmd.retstat= MCD_NOTFOUND;
1205 }
1206 else if (buffer[4] == 'S')
1207 {
1208 printf("<%d %s\n", c->sfd, buffer);
1209 c->currcmd.retstat= MCD_NOTSTORED;
1210 }
1211 else
1212 {
1213 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1214 }
1215 break;
1216
1217 case 'E': /* PROTOCOL ERROR or END */
1218 if (buffer[1] == 'N')
1219 {
1220 /* END */
1221 c->currcmd.retstat= MCD_END;
1222 }
1223 else if (buffer[1] == 'R')
1224 {
1225 printf("<%d ERROR\n", c->sfd);
1226 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1227 }
1228 else if (buffer[1] == 'X')
1229 {
1230 c->currcmd.retstat= MCD_DATA_EXISTS;
1231 printf("<%d %s\n", c->sfd, buffer);
1232 }
1233 else
1234 {
1235 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1236 }
1237 break;
1238
1239 case 'C': /* CLIENT ERROR */
1240 printf("<%d %s\n", c->sfd, buffer);
1241 c->currcmd.retstat= MCD_CLIENT_ERROR;
1242 break;
1243
1244 default:
1245 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1246 break;
1247 } /* switch */
1248
1249 return ret;
1250 } /* ms_ascii_process_line */
1251
1252
1253 /**
1254 * after one operation completes, reset the concurrency
1255 *
1256 * @param c, pointer of the concurrency
1257 * @param timeout, whether it's timeout
1258 */
1259 void ms_reset_conn(ms_conn_t *c, bool timeout)
1260 {
1261 assert(c != NULL);
1262
1263 if (c->udp)
1264 {
1265 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1266 {
1267 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
1268 }
1269
1270 c->packets= 0;
1271 c->recvpkt= 0;
1272 c->pktcurr= 0;
1273 c->ordcurr= 0;
1274 c->rudpbytes= 0;
1275 }
1276 c->currcmd.isfinish= true;
1277 c->ctnwrite= false;
1278 c->rbytes= 0;
1279 c->rcurr= c->rbuf;
1280 ms_conn_set_state(c, conn_write);
1281 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1282
1283 if (timeout)
1284 {
1285 ms_drive_machine(c);
1286 }
1287 } /* ms_reset_conn */
1288
1289
1290 /**
1291 * if we have a complete line in the buffer, process it.
1292 *
1293 * @param c, pointer of the concurrency
1294 *
1295 * @return int, if success, return 0, else return -1
1296 */
1297 static int ms_try_read_line(ms_conn_t *c)
1298 {
1299 if (c->protocol == binary_prot)
1300 {
1301 /* Do we have the complete packet header? */
1302 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1303 {
1304 /* need more data! */
1305 return 0;
1306 }
1307 else
1308 {
1309 #ifdef NEED_ALIGN
1310 if (((long)(c->rcurr)) % 8 != 0)
1311 {
1312 /* must realign input buffer */
1313 memmove(c->rbuf, c->rcurr, c->rbytes);
1314 c->rcurr= c->rbuf;
1315 if (settings.verbose)
1316 {
1317 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1318 }
1319 }
1320 #endif
1321 protocol_binary_response_header *rsp;
1322 rsp= (protocol_binary_response_header *)c->rcurr;
1323
1324 c->binary_header= *rsp;
1325 c->binary_header.response.extlen= rsp->response.extlen;
1326 c->binary_header.response.keylen= ntohl(rsp->response.keylen);
1327 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1328 c->binary_header.response.status= ntohl(rsp->response.status);
1329
1330 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1331 {
1332 fprintf(stderr, "Invalid magic: %x\n",
1333 c->binary_header.response.magic);
1334 ms_conn_set_state(c, conn_closing);
1335 return 0;
1336 }
1337
1338 /* process this complete response */
1339 if (ms_bin_process_response(c) == 0)
1340 {
1341 /* current operation completed */
1342 ms_reset_conn(c, false);
1343 return -1;
1344 }
1345 else
1346 {
1347 c->rbytes-= (int32_t)sizeof(c->binary_header);
1348 c->rcurr+= sizeof(c->binary_header);
1349 }
1350 }
1351 }
1352 else
1353 {
1354 char *el, *cont;
1355
1356 assert(c != NULL);
1357 assert(c->rcurr <= (c->rbuf + c->rsize));
1358
1359 if (c->rbytes == 0)
1360 return 0;
1361
1362 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1363 if (! el)
1364 return 0;
1365
1366 cont= el + 1;
1367 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1368 {
1369 el--;
1370 }
1371 *el= '\0';
1372
1373 assert(cont <= (c->rcurr + c->rbytes));
1374
1375 /* process this complete line */
1376 if (ms_ascii_process_line(c, c->rcurr) == 0)
1377 {
1378 /* current operation completed */
1379 ms_reset_conn(c, false);
1380 return -1;
1381 }
1382 else
1383 {
1384 /* current operation didn't complete */
1385 c->rbytes-= (int32_t)(cont - c->rcurr);
1386 c->rcurr= cont;
1387 }
1388
1389 assert(c->rcurr <= (c->rbuf + c->rsize));
1390 }
1391
1392 return -1;
1393 } /* ms_try_read_line */
1394
1395
1396 /**
1397 * because the packet of UDP can't ensure the order, the
1398 * function is used to sort the received udp packet.
1399 *
1400 * @param c, pointer of the concurrency
1401 * @param buf, the buffer to store the ordered packages data
1402 * @param rbytes, the maximum capacity of the buffer
1403 *
1404 * @return int, if success, return the copy bytes, else return
1405 * -1
1406 */
1407 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1408 {
1409 int len= 0;
1410 int wbytes= 0;
1411 uint16_t req_id= 0;
1412 uint16_t seq_num= 0;
1413 uint16_t packets= 0;
1414 unsigned char *header= NULL;
1415
1416 /* no enough data */
1417 assert(c != NULL);
1418 assert(buf != NULL);
1419 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1420
1421 /* calculate received packets count */
1422 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1423 {
1424 /* the last packet has some data */
1425 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1426 }
1427 else
1428 {
1429 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1430 }
1431
1432 /* get the total packets count if necessary */
1433 if (c->packets == 0)
1434 {
1435 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1436 }
1437
1438 /* build the ordered packet array */
1439 for (int i= c->pktcurr; i < c->recvpkt; i++)
1440 {
1441 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1442 req_id= (uint16_t)HEADER_TO_REQID(header);
1443 assert(req_id == c->request_id % (1 << 16));
1444
1445 packets= (uint16_t)HEADER_TO_PACKETS(header);
1446 assert(c->packets == HEADER_TO_PACKETS(header));
1447
1448 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1449 c->udppkt[seq_num].header= header;
1450 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1451
1452 if (i == c->recvpkt - 1)
1453 {
1454 /* last received packet */
1455 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1456 {
1457 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1458 c->pktcurr++;
1459 }
1460 else
1461 {
1462 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1463 - UDP_HEADER_SIZE;
1464 }
1465 }
1466 else
1467 {
1468 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1469 c->pktcurr++;
1470 }
1471 }
1472
1473 for (int i= c->ordcurr; i < c->recvpkt; i++)
1474 {
1475 /* there is some data to copy */
1476 if ((c->udppkt[i].data != NULL)
1477 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1478 {
1479 header= c->udppkt[i].header;
1480 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1481 if (len > rbytes - wbytes)
1482 {
1483 len= rbytes - wbytes;
1484 }
1485
1486 assert(len <= rbytes - wbytes);
1487 assert(i == HEADER_TO_SEQNUM(header));
1488
1489 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1490 (size_t)len);
1491 wbytes+= len;
1492 c->udppkt[i].copybytes+= len;
1493
1494 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1495 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1496 {
1497 /* finish copying all the data of this packet, next */
1498 c->ordcurr++;
1499 }
1500
1501 /* last received packet, and finish copying all the data */
1502 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1503 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1504 {
1505 break;
1506 }
1507
1508 /* no space to copy data */
1509 if (wbytes >= rbytes)
1510 {
1511 break;
1512 }
1513
1514 /* it doesn't finish reading all the data of the packet from network */
1515 if ((i != c->recvpkt - 1)
1516 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1517 {
1518 break;
1519 }
1520 }
1521 else
1522 {
1523 /* no data to copy */
1524 break;
1525 }
1526 }
1527
1528 return wbytes == 0 ? -1 : wbytes;
1529 } /* ms_sort_udp_packet */
1530
1531
1532 /**
1533 * encapsulate upd read like tcp read
1534 *
1535 * @param c, pointer of the concurrency
1536 * @param buf, read buffer
1537 * @param len, length to read
1538 *
1539 * @return int, if success, return the read bytes, else return
1540 * -1
1541 */
1542 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1543 {
1544 int res= 0;
1545 int avail= 0;
1546 int rbytes= 0;
1547 int copybytes= 0;
1548
1549 assert(c->udp);
1550
1551 while (1)
1552 {
1553 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1554 {
1555 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1556 if (! new_rbuf)
1557 {
1558 fprintf(stderr, "Couldn't realloc input buffer.\n");
1559 c->rudpbytes= 0; /* ignore what we read */
1560 return -1;
1561 }
1562 c->rudpbuf= new_rbuf;
1563 c->rudpsize*= 2;
1564 }
1565
1566 avail= c->rudpsize - c->rudpbytes;
1567 /* UDP each time read a packet, 1400 bytes */
1568 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1569
1570 if (res > 0)
1571 {
1572 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1573 c->rudpbytes+= res;
1574 rbytes+= res;
1575 if (res == avail)
1576 {
1577 continue;
1578 }
1579 else
1580 {
1581 break;
1582 }
1583 }
1584
1585 if (res == 0)
1586 {
1587 /* "connection" closed */
1588 return res;
1589 }
1590
1591 if (res == -1)
1592 {
1593 /* no data to read */
1594 return res;
1595 }
1596 }
1597
1598 /* copy data to read buffer */
1599 if (rbytes > 0)
1600 {
1601 copybytes= ms_sort_udp_packet(c, buf, len);
1602 }
1603
1604 if (copybytes == -1)
1605 {
1606 __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
1607 }
1608
1609 return copybytes;
1610 } /* ms_udp_read */
1611
1612
1613 /*
1614 * read from network as much as we can, handle buffer overflow and connection
1615 * close.
1616 * before reading, move the remaining incomplete fragment of a command
1617 * (if any) to the beginning of the buffer.
1618 * return 0 if there's nothing to read on the first read.
1619 */
1620
1621 /**
1622 * read from network as much as we can, handle buffer overflow and connection
1623 * close. before reading, move the remaining incomplete fragment of a command
1624 * (if any) to the beginning of the buffer.
1625 *
1626 * @param c, pointer of the concurrency
1627 *
1628 * @return int,
1629 * return 0 if there's nothing to read on the first read.
1630 * return 1 if get data
1631 * return -1 if error happens
1632 */
1633 static int ms_try_read_network(ms_conn_t *c)
1634 {
1635 int gotdata= 0;
1636 int res;
1637 int64_t avail;
1638
1639 assert(c != NULL);
1640
1641 if ((c->rcurr != c->rbuf)
1642 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1643 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1644 {
1645 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1646 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1647 c->rcurr= c->rbuf;
1648 }
1649
1650 while (1)
1651 {
1652 if (c->rbytes >= c->rsize)
1653 {
1654 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1655 if (! new_rbuf)
1656 {
1657 fprintf(stderr, "Couldn't realloc input buffer.\n");
1658 c->rbytes= 0; /* ignore what we read */
1659 return -1;
1660 }
1661 c->rcurr= c->rbuf= new_rbuf;
1662 c->rsize*= 2;
1663 }
1664
1665 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1666 if (avail == 0)
1667 {
1668 break;
1669 }
1670
1671 if (c->udp)
1672 {
1673 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1674 }
1675 else
1676 {
1677 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1678 }
1679
1680 if (res > 0)
1681 {
1682 if (! c->udp)
1683 {
1684 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1685 }
1686 gotdata= 1;
1687 c->rbytes+= res;
1688 if (res == avail)
1689 {
1690 continue;
1691 }
1692 else
1693 {
1694 break;
1695 }
1696 }
1697 if (res == 0)
1698 {
1699 /* connection closed */
1700 ms_conn_set_state(c, conn_closing);
1701 return -1;
1702 }
1703 if (res == -1)
1704 {
1705 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1706 break;
1707 /* Should close on unhandled errors. */
1708 ms_conn_set_state(c, conn_closing);
1709 return -1;
1710 }
1711 }
1712
1713 return gotdata;
1714 } /* ms_try_read_network */
1715
1716
1717 /**
1718 * after get the object from server, verify the value if
1719 * necessary.
1720 *
1721 * @param c, pointer of the concurrency
1722 * @param mlget_item, pointer of mulit-get task item structure
1723 * @param value, received value string
1724 * @param vlen, received value string length
1725 */
1726 static void ms_verify_value(ms_conn_t *c,
1727 ms_mlget_task_item_t *mlget_item,
1728 char *value,
1729 int vlen)
1730 {
1731 if (c->curr_task.verify)
1732 {
1733 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1734 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1735 char *orignkey=
1736 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1737
1738 /* verify expire time if necessary */
1739 if (c->curr_task.item->exp_time > 0)
1740 {
1741 struct timeval curr_time;
1742 gettimeofday(&curr_time, NULL);
1743
1744 /* object expired but get it now */
1745 if (curr_time.tv_sec - c->curr_task.item->client_time
1746 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1747 {
1748 __sync_fetch_and_add(&ms_stats.exp_get, 1);
1749
1750 if (ms_setting.verbose)
1751 {
1752 char set_time[64];
1753 char cur_time[64];
1754 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1755 localtime(&c->curr_task.item->client_time));
1756 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1757 localtime(&curr_time.tv_sec));
1758 fprintf(stderr,
1759 "\n<%d expire time verification failed, "
1760 "object expired but get it now\n"
1761 "\tkey len: %d\n"
1762 "\tkey: %lx %.*s\n"
1763 "\tset time: %s current time: %s "
1764 "diff time: %d expire time: %d\n"
1765 "\texpected data: \n"
1766 "\treceived data len: %d\n"
1767 "\treceived data: %.*s\n",
1768 c->sfd,
1769 c->curr_task.item->key_size,
1770 c->curr_task.item->key_prefix,
1771 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1772 orignkey,
1773 set_time,
1774 cur_time,
1775 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1776 c->curr_task.item->exp_time,
1777 vlen,
1778 vlen,
1779 value);
1780 fflush(stderr);
1781 }
1782 }
1783 }
1784 else
1785 {
1786 if ((c->curr_task.item->value_size != vlen)
1787 || (memcmp(orignval, value, (size_t)vlen) != 0))
1788 {
1789 __sync_fetch_and_add(&ms_stats.vef_failed, 1);
1790
1791 if (ms_setting.verbose)
1792 {
1793 fprintf(stderr,
1794 "\n<%d data verification failed\n"
1795 "\tkey len: %d\n"
1796 "\tkey: %lx %.*s\n"
1797 "\texpected data len: %d\n"
1798 "\texpected data: %.*s\n"
1799 "\treceived data len: %d\n"
1800 "\treceived data: %.*s\n",
1801 c->sfd,
1802 c->curr_task.item->key_size,
1803 c->curr_task.item->key_prefix,
1804 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1805 orignkey,
1806 c->curr_task.item->value_size,
1807 c->curr_task.item->value_size,
1808 orignval,
1809 vlen,
1810 vlen,
1811 value);
1812 fflush(stderr);
1813 }
1814 }
1815 }
1816
1817 c->curr_task.finish_verify= true;
1818
1819 if (mlget_item != NULL)
1820 {
1821 mlget_item->finish_verify= true;
1822 }
1823 }
1824 } /* ms_verify_value */
1825
1826
1827 /**
1828 * For ASCII protocol, after store the data into the local
1829 * buffer, run this function to handle the data.
1830 *
1831 * @param c, pointer of the concurrency
1832 */
1833 static void ms_ascii_complete_nread(ms_conn_t *c)
1834 {
1835 assert(c != NULL);
1836 assert(c->rbytes >= c->rvbytes);
1837 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1838 if (c->rvbytes > 2)
1839 {
1840 assert(
1841 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1842 }
1843
1844 /* multi-get */
1845 ms_mlget_task_item_t *mlget_item= NULL;
1846 if (((ms_setting.mult_key_num > 1)
1847 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1848 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1849 {
1850 c->mlget_task.value_index++;
1851 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1852
1853 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1854 {
1855 c->curr_task.item= mlget_item->item;
1856 c->curr_task.verify= mlget_item->verify;
1857 c->curr_task.finish_verify= mlget_item->finish_verify;
1858 mlget_item->get_miss= false;
1859 }
1860 else
1861 {
1862 /* Try to find the task item in multi-get task array */
1863 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1864 {
1865 mlget_item= &c->mlget_task.mlget_item[i];
1866 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1867 {
1868 c->curr_task.item= mlget_item->item;
1869 c->curr_task.verify= mlget_item->verify;
1870 c->curr_task.finish_verify= mlget_item->finish_verify;
1871 mlget_item->get_miss= false;
1872
1873 break;
1874 }
1875 }
1876 }
1877 }
1878
1879 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1880
1881 c->curr_task.get_miss= false;
1882 c->rbytes-= c->rvbytes;
1883 c->rcurr= c->rcurr + c->rvbytes;
1884 assert(c->rcurr <= (c->rbuf + c->rsize));
1885 c->readval= false;
1886 c->rvbytes= 0;
1887 } /* ms_ascii_complete_nread */
1888
1889
1890 /**
1891 * For binary protocol, after store the data into the local
1892 * buffer, run this function to handle the data.
1893 *
1894 * @param c, pointer of the concurrency
1895 */
1896 static void ms_bin_complete_nread(ms_conn_t *c)
1897 {
1898 assert(c != NULL);
1899 assert(c->rbytes >= c->rvbytes);
1900 assert(c->protocol == binary_prot);
1901
1902 int extlen= c->binary_header.response.extlen;
1903 int keylen= c->binary_header.response.keylen;
1904 uint8_t opcode= c->binary_header.response.opcode;
1905
1906 /* not get command or not include value, just return */
1907 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1908 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1909 || (c->rvbytes <= extlen + keylen))
1910 {
1911 /* get miss */
1912 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1913 {
1914 c->currcmd.retstat= MCD_END;
1915 c->curr_task.get_miss= true;
1916 }
1917
1918 c->readval= false;
1919 c->rvbytes= 0;
1920 ms_reset_conn(c, false);
1921 return;
1922 }
1923
1924 /* multi-get */
1925 ms_mlget_task_item_t *mlget_item= NULL;
1926 if (((ms_setting.mult_key_num > 1)
1927 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1928 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1929 {
1930 c->mlget_task.value_index++;
1931 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1932
1933 c->curr_task.item= mlget_item->item;
1934 c->curr_task.verify= mlget_item->verify;
1935 c->curr_task.finish_verify= mlget_item->finish_verify;
1936 mlget_item->get_miss= false;
1937 }
1938
1939 ms_verify_value(c,
1940 mlget_item,
1941 c->rcurr + extlen + keylen,
1942 c->rvbytes - extlen - keylen);
1943
1944 c->currcmd.retstat= MCD_END;
1945 c->curr_task.get_miss= false;
1946 c->rbytes-= c->rvbytes;
1947 c->rcurr= c->rcurr + c->rvbytes;
1948 assert(c->rcurr <= (c->rbuf + c->rsize));
1949 c->readval= false;
1950 c->rvbytes= 0;
1951
1952 if (ms_setting.mult_key_num > 1)
1953 {
1954 /* multi-get have check all the item */
1955 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1956 {
1957 ms_reset_conn(c, false);
1958 }
1959 }
1960 else
1961 {
1962 /* single get */
1963 ms_reset_conn(c, false);
1964 }
1965 } /* ms_bin_complete_nread */
1966
1967
1968 /**
1969 * we get here after reading the value of get commands.
1970 *
1971 * @param c, pointer of the concurrency
1972 */
1973 static void ms_complete_nread(ms_conn_t *c)
1974 {
1975 assert(c != NULL);
1976 assert(c->rbytes >= c->rvbytes);
1977 assert(c->protocol == ascii_udp_prot
1978 || c->protocol == ascii_prot
1979 || c->protocol == binary_prot);
1980
1981 if (c->protocol == binary_prot)
1982 {
1983 ms_bin_complete_nread(c);
1984 }
1985 else
1986 {
1987 ms_ascii_complete_nread(c);
1988 }
1989 } /* ms_complete_nread */
1990
1991
1992 /**
1993 * Adds a message header to a connection.
1994 *
1995 * @param c, pointer of the concurrency
1996 *
1997 * @return int, if success, return 0, else return -1
1998 */
1999 static int ms_add_msghdr(ms_conn_t *c)
2000 {
2001 struct msghdr *msg;
2002
2003 assert(c != NULL);
2004
2005 if (c->msgsize == c->msgused)
2006 {
2007 msg=
2008 realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
2009 if (! msg)
2010 return -1;
2011
2012 c->msglist= msg;
2013 c->msgsize*= 2;
2014 }
2015
2016 msg= c->msglist + c->msgused;
2017
2018 /**
2019 * this wipes msg_iovlen, msg_control, msg_controllen, and
2020 * msg_flags, the last 3 of which aren't defined on solaris:
2021 */
2022 memset(msg, 0, sizeof(struct msghdr));
2023
2024 msg->msg_iov= &c->iov[c->iovused];
2025
2026 if (c->udp && (c->srv_recv_addr_size > 0))
2027 {
2028 msg->msg_name= &c->srv_recv_addr;
2029 msg->msg_namelen= c->srv_recv_addr_size;
2030 }
2031
2032 c->msgbytes= 0;
2033 c->msgused++;
2034
2035 if (c->udp)
2036 {
2037 /* Leave room for the UDP header, which we'll fill in later. */
2038 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2039 }
2040
2041 return 0;
2042 } /* ms_add_msghdr */
2043
2044
2045 /**
2046 * Ensures that there is room for another structure iovec in a connection's
2047 * iov list.
2048 *
2049 * @param c, pointer of the concurrency
2050 *
2051 * @return int, if success, return 0, else return -1
2052 */
2053 static int ms_ensure_iov_space(ms_conn_t *c)
2054 {
2055 assert(c != NULL);
2056
2057 if (c->iovused >= c->iovsize)
2058 {
2059 int i, iovnum;
2060 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2061 ((uint64_t)c->iovsize
2062 * 2)
2063 * sizeof(struct iovec));
2064 if (! new_iov)
2065 return -1;
2066
2067 c->iov= new_iov;
2068 c->iovsize*= 2;
2069
2070 /* Point all the msghdr structures at the new list. */
2071 for (i= 0, iovnum= 0; i < c->msgused; i++)
2072 {
2073 c->msglist[i].msg_iov= &c->iov[iovnum];
2074 iovnum+= (int)c->msglist[i].msg_iovlen;
2075 }
2076 }
2077
2078 return 0;
2079 } /* ms_ensure_iov_space */
2080
2081
2082 /**
2083 * Adds data to the list of pending data that will be written out to a
2084 * connection.
2085 *
2086 * @param c, pointer of the concurrency
2087 * @param buf, the buffer includes data to send
2088 * @param len, the data length in the buffer
2089 *
2090 * @return int, if success, return 0, else return -1
2091 */
2092 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2093 {
2094 struct msghdr *m;
2095 int leftover;
2096 bool limit_to_mtu;
2097
2098 assert(c != NULL);
2099
2100 do
2101 {
2102 m= &c->msglist[c->msgused - 1];
2103
2104 /*
2105 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2106 */
2107 limit_to_mtu= c->udp;
2108
2109 /* We may need to start a new msghdr if this one is full. */
2110 if ((m->msg_iovlen == IOV_MAX)
2111 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2112 {
2113 ms_add_msghdr(c);
2114 m= &c->msglist[c->msgused - 1];
2115 }
2116
2117 if (ms_ensure_iov_space(c) != 0)
2118 return -1;
2119
2120 /* If the fragment is too big to fit in the datagram, split it up */
2121 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2122 {
2123 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2124 len-= leftover;
2125 }
2126 else
2127 {
2128 leftover= 0;
2129 }
2130
2131 m= &c->msglist[c->msgused - 1];
2132 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2133 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2134
2135 c->msgbytes+= len;
2136 c->iovused++;
2137 m->msg_iovlen++;
2138
2139 buf= ((char *)buf) + len;
2140 len= leftover;
2141 }
2142 while (leftover > 0);
2143
2144 return 0;
2145 } /* ms_add_iov */
2146
2147
2148 /**
2149 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2150 *
2151 * @param c, pointer of the concurrency
2152 *
2153 * @return int, if success, return 0, else return -1
2154 */
2155 static int ms_build_udp_headers(ms_conn_t *c)
2156 {
2157 int i;
2158 unsigned char *hdr;
2159
2160 assert(c != NULL);
2161
2162 c->request_id= ms_get_udp_request_id();
2163
2164 if (c->msgused > c->hdrsize)
2165 {
2166 void *new_hdrbuf;
2167 if (c->hdrbuf)
2168 new_hdrbuf= realloc(c->hdrbuf,
2169 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2170 else
2171 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2172 if (! new_hdrbuf)
2173 return -1;
2174
2175 c->hdrbuf= (unsigned char *)new_hdrbuf;
2176 c->hdrsize= c->msgused * 2;
2177 }
2178
2179 /* If this is a multi-packet request, drop it. */
2180 if (c->udp && (c->msgused > 1))
2181 {
2182 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2183 return -1;
2184 }
2185
2186 hdr= c->hdrbuf;
2187 for (i= 0; i < c->msgused; i++)
2188 {
2189 c->msglist[i].msg_iov[0].iov_base= hdr;
2190 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2191 *hdr++= (unsigned char)(c->request_id / 256);
2192 *hdr++= (unsigned char)(c->request_id % 256);
2193 *hdr++= (unsigned char)(i / 256);
2194 *hdr++= (unsigned char)(i % 256);
2195 *hdr++= (unsigned char)(c->msgused / 256);
2196 *hdr++= (unsigned char)(c->msgused % 256);
2197 *hdr++= (unsigned char)1; /* support facebook memcached */
2198 *hdr++= (unsigned char)0;
2199 assert(hdr ==
2200 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2201 + UDP_HEADER_SIZE));
2202 }
2203
2204 return 0;
2205 } /* ms_build_udp_headers */
2206
2207
2208 /**
2209 * Transmit the next chunk of data from our list of msgbuf structures.
2210 *
2211 * @param c, pointer of the concurrency
2212 *
2213 * @return TRANSMIT_COMPLETE All done writing.
2214 * TRANSMIT_INCOMPLETE More data remaining to write.
2215 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2216 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2217 */
2218 static int ms_transmit(ms_conn_t *c)
2219 {
2220 assert(c != NULL);
2221
2222 if ((c->msgcurr < c->msgused)
2223 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2224 {
2225 /* Finished writing the current msg; advance to the next. */
2226 c->msgcurr++;
2227 }
2228
2229 if (c->msgcurr < c->msgused)
2230 {
2231 ssize_t res;
2232 struct msghdr *m= &c->msglist[c->msgcurr];
2233
2234 res= sendmsg(c->sfd, m, 0);
2235 if (res > 0)
2236 {
2237 __sync_fetch_and_add(&ms_stats.bytes_written, res);
2238
2239 /* We've written some of the data. Remove the completed
2240 * iovec entries from the list of pending writes. */
2241 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2242 {
2243 res-= (ssize_t)m->msg_iov->iov_len;
2244 m->msg_iovlen--;
2245 m->msg_iov++;
2246 }
2247
2248 /* Might have written just part of the last iovec entry;
2249 * adjust it so the next write will do the rest. */
2250 if (res > 0)
2251 {
2252 m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
2253 m->msg_iov->iov_len-= (uint64_t)res;
2254 }
2255 return TRANSMIT_INCOMPLETE;
2256 }
2257 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2258 {
2259 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2260 {
2261 fprintf(stderr, "Couldn't update event.\n");
2262 ms_conn_set_state(c, conn_closing);
2263 return TRANSMIT_HARD_ERROR;
2264 }
2265 return TRANSMIT_SOFT_ERROR;
2266 }
2267
2268 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2269 * we have a real error, on which we close the connection */
2270 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2271
2272 ms_conn_set_state(c, conn_closing);
2273 return TRANSMIT_HARD_ERROR;
2274 }
2275 else
2276 {
2277 return TRANSMIT_COMPLETE;
2278 }
2279 } /* ms_transmit */
2280
2281
2282 /**
2283 * Shrinks a connection's buffers if they're too big. This prevents
2284 * periodic large "mget" response from server chewing lots of client
2285 * memory.
2286 *
2287 * This should only be called in between requests since it can wipe output
2288 * buffers!
2289 *
2290 * @param c, pointer of the concurrency
2291 */
2292 static void ms_conn_shrink(ms_conn_t *c)
2293 {
2294 assert(c != NULL);
2295
2296 if (c->udp)
2297 return;
2298
2299 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2300 {
2301 char *newbuf;
2302
2303 if (c->rcurr != c->rbuf)
2304 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2305
2306 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2307
2308 if (newbuf)
2309 {
2310 c->rbuf= newbuf;
2311 c->rsize= DATA_BUFFER_SIZE;
2312 }
2313 c->rcurr= c->rbuf;
2314 }
2315
2316 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2317 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2318 {
2319 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2320 if (! new_rbuf)
2321 {
2322 c->rudpbuf= new_rbuf;
2323 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2324 }
2325 /* TODO check error condition? */
2326 }
2327
2328 if (c->msgsize > MSG_LIST_HIGHWAT)
2329 {
2330 struct msghdr *newbuf= (struct msghdr *)realloc(
2331 (void *)c->msglist,
2332 MSG_LIST_INITIAL
2333 * sizeof(c->msglist[0]));
2334 if (newbuf)
2335 {
2336 c->msglist= newbuf;
2337 c->msgsize= MSG_LIST_INITIAL;
2338 }
2339 /* TODO check error condition? */
2340 }
2341
2342 if (c->iovsize > IOV_LIST_HIGHWAT)
2343 {
2344 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2345 IOV_LIST_INITIAL
2346 * sizeof(c->iov[0]));
2347 if (newbuf)
2348 {
2349 c->iov= newbuf;
2350 c->iovsize= IOV_LIST_INITIAL;
2351 }
2352 /* TODO check return value */
2353 }
2354 } /* ms_conn_shrink */
2355
2356
2357 /**
2358 * Sets a connection's current state in the state machine. Any special
2359 * processing that needs to happen on certain state transitions can
2360 * happen here.
2361 *
2362 * @param c, pointer of the concurrency
2363 * @param state, connection state
2364 */
2365 static void ms_conn_set_state(ms_conn_t *c, int state)
2366 {
2367 assert(c != NULL);
2368
2369 if (state != c->state)
2370 {
2371 if (state == conn_read)
2372 {
2373 ms_conn_shrink(c);
2374 }
2375 c->state= state;
2376 }
2377 } /* ms_conn_set_state */
2378
2379
2380 /**
2381 * update the event if socks change state. for example: when
2382 * change the listen scoket read event to sock write event, or
2383 * change socket handler, we could call this function.
2384 *
2385 * @param c, pointer of the concurrency
2386 * @param new_flags, new event flags
2387 *
2388 * @return bool, if success, return true, else return false
2389 */
2390 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2391 {
2392 /* default event timeout 10 seconds */
2393 struct timeval t=
2394 {
2395 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2396 };
2397
2398 assert(c != NULL);
2399
2400 struct event_base *base= c->event.ev_base;
2401 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2402 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2403 {
2404 return true;
2405 }
2406
2407 if (event_del(&c->event) == -1)
2408 {
2409 /* try to delete the event again */
2410 if (event_del(&c->event) == -1)
2411 {
2412 return false;
2413 }
2414 }
2415
2416 event_set(&c->event,
2417 c->sfd,
2418 (short)new_flags,
2419 ms_event_handler,
2420 (void *)c);
2421 event_base_set(base, &c->event);
2422 c->ev_flags= (short)new_flags;
2423
2424 if (c->total_sfds == 1)
2425 {
2426 if (event_add(&c->event, NULL) == -1)
2427 {
2428 return false;
2429 }
2430 }
2431 else
2432 {
2433 if (event_add(&c->event, &t) == -1)
2434 {
2435 return false;
2436 }
2437 }
2438
2439 return true;
2440 } /* ms_update_event */
2441
2442
2443 /**
2444 * If user want to get the expected throughput, we could limit
2445 * the performance of memslap. we could give up some work and
2446 * just wait a short time. The function is used to check this
2447 * case.
2448 *
2449 * @param c, pointer of the concurrency
2450 *
2451 * @return bool, if success, return true, else return false
2452 */
2453 static bool ms_need_yield(ms_conn_t *c)
2454 {
2455 int64_t tps= 0;
2456 int64_t time_diff= 0;
2457 struct timeval curr_time;
2458 ms_task_t *task= &c->curr_task;
2459
2460 if (ms_setting.expected_tps > 0)
2461 {
2462 gettimeofday(&curr_time, NULL);
2463 time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
2464 tps=
2465 (int64_t)((task->get_opt
2466 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2467
2468 /* current throughput is greater than expected throughput */
2469 if (tps > ms_thread.thread_ctx->tps_perconn)
2470 {
2471 return true;
2472 }
2473 }
2474
2475 return false;
2476 } /* ms_need_yield */
2477
2478
2479 /**
2480 * used to update the start time of each operation
2481 *
2482 * @param c, pointer of the concurrency
2483 */
2484 static void ms_update_start_time(ms_conn_t *c)
2485 {
2486 ms_task_item_t *item= c->curr_task.item;
2487
2488 if ((ms_setting.stat_freq > 0) || c->udp
2489 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2490 {
2491 gettimeofday(&c->start_time, NULL);
2492 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2493 {
2494 /* record the current time */
2495 item->client_time= c->start_time.tv_sec;
2496 }
2497 }
2498 } /* ms_update_start_time */
2499
2500
2501 /**
2502 * run the state machine
2503 *
2504 * @param c, pointer of the concurrency
2505 */
2506 static void ms_drive_machine(ms_conn_t *c)
2507 {
2508 bool stop= false;
2509
2510 assert(c != NULL);
2511
2512 while (! stop)
2513 {
2514 switch (c->state)
2515 {
2516 case conn_read:
2517 if (c->readval)
2518 {
2519 if (c->rbytes >= c->rvbytes)
2520 {
2521 ms_complete_nread(c);
2522 break;
2523 }
2524 }
2525 else
2526 {
2527 if (ms_try_read_line(c) != 0)
2528 {
2529 break;
2530 }
2531 }
2532
2533 if (ms_try_read_network(c) != 0)
2534 {
2535 break;
2536 }
2537
2538 /* doesn't read all the response data, wait event wake up */
2539 if (! c->currcmd.isfinish)
2540 {
2541 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2542 {
2543 fprintf(stderr, "Couldn't update event.\n");
2544 ms_conn_set_state(c, conn_closing);
2545 break;
2546 }
2547 stop= true;
2548 break;
2549 }
2550
2551 /* we have no command line and no data to read from network, next write */
2552 ms_conn_set_state(c, conn_write);
2553 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2554
2555 break;
2556
2557 case conn_write:
2558 if (! c->ctnwrite && ms_need_yield(c))
2559 {
2560 usleep(10);
2561
2562 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2563 {
2564 fprintf(stderr, "Couldn't update event.\n");
2565 ms_conn_set_state(c, conn_closing);
2566 break;
2567 }
2568 stop= true;
2569 break;
2570 }
2571
2572 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2573 {
2574 ms_conn_set_state(c, conn_closing);
2575 break;
2576 }
2577
2578 /* record the start time before starting to send data if necessary */
2579 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2580 {
2581 if (c->change_sfd)
2582 {
2583 c->change_sfd= false;
2584 }
2585 ms_update_start_time(c);
2586 }
2587
2588 /* change sfd if necessary */
2589 if (c->change_sfd)
2590 {
2591 c->ctnwrite= true;
2592 stop= true;
2593 break;
2594 }
2595
2596 /* execute task until nothing need be written to network */
2597 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2598 {
2599 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2600 {
2601 fprintf(stderr, "Couldn't update event.\n");
2602 ms_conn_set_state(c, conn_closing);
2603 break;
2604 }
2605 stop= true;
2606 break;
2607 }
2608
2609 switch (ms_transmit(c))
2610 {
2611 case TRANSMIT_COMPLETE:
2612 /* we have no data to write to network, next wait repose */
2613 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2614 {
2615 fprintf(stderr, "Couldn't update event.\n");
2616 ms_conn_set_state(c, conn_closing);
2617 c->ctnwrite= false;
2618 break;
2619 }
2620 ms_conn_set_state(c, conn_read);
2621 c->ctnwrite= false;
2622 stop= true;
2623 break;
2624
2625 case TRANSMIT_INCOMPLETE:
2626 c->ctnwrite= true;
2627 break; /* Continue in state machine. */
2628
2629 case TRANSMIT_HARD_ERROR:
2630 c->ctnwrite= false;
2631 break;
2632
2633 case TRANSMIT_SOFT_ERROR:
2634 c->ctnwrite= true;
2635 stop= true;
2636 break;
2637
2638 default:
2639 break;
2640 } /* switch */
2641
2642 break;
2643
2644 case conn_closing:
2645 /* recovery mode, need reconnect if connection close */
2646 if (ms_setting.reconnect && (! ms_global.time_out
2647 || ((ms_setting.run_time == 0)
2648 && (c->remain_exec_num > 0))))
2649 {
2650 if (ms_reconn(c) != 0)
2651 {
2652 ms_conn_close(c);
2653 stop= true;
2654 break;
2655 }
2656
2657 ms_reset_conn(c, false);
2658
2659 if (c->total_sfds == 1)
2660 {
2661 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2662 {
2663 fprintf(stderr, "Couldn't update event.\n");
2664 ms_conn_set_state(c, conn_closing);
2665 break;
2666 }
2667 }
2668
2669 break;
2670 }
2671 else
2672 {
2673 ms_conn_close(c);
2674 stop= true;
2675 break;
2676 }
2677
2678 default:
2679 assert(0);
2680 } /* switch */
2681 }
2682 } /* ms_drive_machine */
2683
2684
2685 /**
2686 * the event handler of each thread
2687 *
2688 * @param fd, the file descriptor of socket
2689 * @param which, event flag
2690 * @param arg, argument
2691 */
2692 void ms_event_handler(const int fd, const short which, void *arg)
2693 {
2694 ms_conn_t *c= (ms_conn_t *)arg;
2695
2696 assert(c != NULL);
2697
2698 c->which= which;
2699
2700 /* sanity */
2701 if (fd != c->sfd)
2702 {
2703 fprintf(stderr,
2704 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2705 fd,
2706 c->sfd);
2707 ms_conn_close(c);
2708 exit(1);
2709 }
2710 assert(fd == c->sfd);
2711
2712 /* event timeout, close the current connection */
2713 if (c->which == EV_TIMEOUT)
2714 {
2715 ms_conn_set_state(c, conn_closing);
2716 }
2717
2718 ms_drive_machine(c);
2719
2720 /* wait for next event */
2721 } /* ms_event_handler */
2722
2723
2724 /**
2725 * get the next socket descriptor index to run for replication
2726 *
2727 * @param c, pointer of the concurrency
2728 * @param cmd, command(get or set )
2729 *
2730 * @return int, if success, return the index, else return 0
2731 */
2732 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2733 {
2734 int sock_index= -1;
2735 int i= 0;
2736
2737 if (c->total_sfds == 1)
2738 {
2739 return 0;
2740 }
2741
2742 if (ms_setting.rep_write_srv == 0)
2743 {
2744 return sock_index;
2745 }
2746
2747 do
2748 {
2749 if (cmd == CMD_SET)
2750 {
2751 for (i= 0; i < ms_setting.rep_write_srv; i++)
2752 {
2753 if (c->tcpsfd[i] > 0)
2754 {
2755 break;
2756 }
2757 }
2758
2759 if (i == ms_setting.rep_write_srv)
2760 {
2761 /* random get one replication server to read */
2762 sock_index= (int)(random() % c->total_sfds);
2763 }
2764 else
2765 {
2766 /* random get one replication writing server to write */
2767 sock_index= (int)(random() % ms_setting.rep_write_srv);
2768 }
2769 }
2770 else if (cmd == CMD_GET)
2771 {
2772 /* random get one replication server to read */
2773 sock_index= (int)(random() % c->total_sfds);
2774 }
2775 }
2776 while (c->tcpsfd[sock_index] == 0);
2777
2778 return sock_index;
2779 } /* ms_get_rep_sock_index */
2780
2781
2782 /**
2783 * get the next socket descriptor index to run
2784 *
2785 * @param c, pointer of the concurrency
2786 *
2787 * @return int, return the index
2788 */
2789 static int ms_get_next_sock_index(ms_conn_t *c)
2790 {
2791 int sock_index= 0;
2792
2793 do
2794 {
2795 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2796 }
2797 while (c->tcpsfd[sock_index] == 0);
2798
2799 return sock_index;
2800 } /* ms_get_next_sock_index */
2801
2802
2803 /**
2804 * update socket event of the connections
2805 *
2806 * @param c, pointer of the concurrency
2807 *
2808 * @return int, if success, return 0, else return -1
2809 */
2810 static int ms_update_conn_sock_event(ms_conn_t *c)
2811 {
2812 assert(c != NULL);
2813
2814 switch (c->currcmd.cmd)
2815 {
2816 case CMD_SET:
2817 if (ms_setting.facebook_test && c->udp)
2818 {
2819 c->sfd= c->tcpsfd[0];
2820 c->udp= false;
2821 c->change_sfd= true;
2822 }
2823 break;
2824
2825 case CMD_GET:
2826 if (ms_setting.facebook_test && ! c->udp)
2827 {
2828 c->sfd= c->udpsfd;
2829 c->udp= true;
2830 c->change_sfd= true;
2831 }
2832 break;
2833
2834 default:
2835 break;
2836 } /* switch */
2837
2838 if (! c->udp && (c->total_sfds > 1))
2839 {
2840 if (c->cur_idx != c->total_sfds)
2841 {
2842 if (ms_setting.rep_write_srv == 0)
2843 {
2844 c->cur_idx= ms_get_next_sock_index(c);
2845 }
2846 else
2847 {
2848 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2849 }
2850 }
2851 else
2852 {
2853 /* must select the first sock of the connection at the beginning */
2854 c->cur_idx= 0;
2855 }
2856
2857 c->sfd= c->tcpsfd[c->cur_idx];
2858 assert(c->sfd != 0);
2859 c->change_sfd= true;
2860 }
2861
2862 if (c->change_sfd)
2863 {
2864 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2865 {
2866 fprintf(stderr, "Couldn't update event.\n");
2867 ms_conn_set_state(c, conn_closing);
2868 return -1;
2869 }
2870 }
2871
2872 return 0;
2873 } /* ms_update_conn_sock_event */
2874
2875
2876 /**
2877 * for ASCII protocol, this function build the set command
2878 * string and send the command.
2879 *
2880 * @param c, pointer of the concurrency
2881 * @param item, pointer of task item which includes the object
2882 * information
2883 *
2884 * @return int, if success, return 0, else return -1
2885 */
2886 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2887 {
2888 int value_offset;
2889 int write_len;
2890 char *buffer= c->wbuf;
2891
2892 write_len= sprintf(buffer,
2893 " %u %d %d\r\n",
2894 0,
2895 item->exp_time,
2896 item->value_size);
2897
2898 if (write_len > c->wsize)
2899 {
2900 /* ought to be always enough. just fail for simplicity */
2901 fprintf(stderr, "output command line too long.\n");
2902 return -1;
2903 }
2904
2905 if (item->value_offset == INVALID_OFFSET)
2906 {
2907 value_offset= item->key_suffix_offset;
2908 }
2909 else
2910 {
2911 value_offset= item->value_offset;
2912 }
2913
2914 if ((ms_add_iov(c, "set ", 4) != 0)
2915 || (ms_add_iov(c, (char *)&item->key_prefix,
2916 (int)KEY_PREFIX_SIZE) != 0)
2917 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2918 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2919 || (ms_add_iov(c, buffer, write_len) != 0)
2920 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2921 item->value_size) != 0)
2922 || (ms_add_iov(c, "\r\n", 2) != 0)
2923 || (c->udp && (ms_build_udp_headers(c) != 0)))
2924 {
2925 return -1;
2926 }
2927
2928 return 0;
2929 } /* ms_build_ascii_write_buf_set */
2930
2931
2932 /**
2933 * used to send set command to server
2934 *
2935 * @param c, pointer of the concurrency
2936 * @param item, pointer of task item which includes the object
2937 * information
2938 *
2939 * @return int, if success, return 0, else return -1
2940 */
2941 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2942 {
2943 assert(c != NULL);
2944
2945 c->currcmd.cmd= CMD_SET;
2946 c->currcmd.isfinish= false;
2947 c->currcmd.retstat= MCD_FAILURE;
2948
2949 if (ms_update_conn_sock_event(c) != 0)
2950 {
2951 return -1;
2952 }
2953
2954 c->msgcurr= 0;
2955 c->msgused= 0;
2956 c->iovused= 0;
2957 if (ms_add_msghdr(c) != 0)
2958 {
2959 fprintf(stderr, "Out of memory preparing request.");
2960 return -1;
2961 }
2962
2963 /* binary protocol */
2964 if (c->protocol == binary_prot)
2965 {
2966 if (ms_build_bin_write_buf_set(c, item) != 0)
2967 {
2968 return -1;
2969 }
2970 }
2971 else
2972 {
2973 if (ms_build_ascii_write_buf_set(c, item) != 0)
2974 {
2975 return -1;
2976 }
2977 }
2978
2979 __sync_fetch_and_add(&ms_stats.obj_bytes,
2980 item->key_size + item->value_size);
2981 __sync_fetch_and_add(&ms_stats.cmd_set, 1);
2982
2983 return 0;
2984 } /* ms_mcd_set */
2985
2986
2987 /**
2988 * for ASCII protocol, this function build the get command
2989 * string and send the command.
2990 *
2991 * @param c, pointer of the concurrency
2992 * @param item, pointer of task item which includes the object
2993 * information
2994 *
2995 * @return int, if success, return 0, else return -1
2996 */
2997 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2998 {
2999 if ((ms_add_iov(c, "get ", 4) != 0)
3000 || (ms_add_iov(c, (char *)&item->key_prefix,
3001 (int)KEY_PREFIX_SIZE) != 0)
3002 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3003 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3004 || (ms_add_iov(c, "\r\n", 2) != 0)
3005 || (c->udp && (ms_build_udp_headers(c) != 0)))
3006 {
3007 return -1;
3008 }
3009
3010 return 0;
3011 } /* ms_build_ascii_write_buf_get */
3012
3013
3014 /**
3015 * used to send the get command to server
3016 *
3017 * @param c, pointer of the concurrency
3018 * @param item, pointer of task item which includes the object
3019 * information
3020 * @param verify, whether do verification
3021 *
3022 * @return int, if success, return 0, else return -1
3023 */
3024 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3025 {
3026 /* verify not supported yet */
3027 UNUSED_ARGUMENT(verify);
3028
3029 assert(c != NULL);
3030
3031 c->currcmd.cmd= CMD_GET;
3032 c->currcmd.isfinish= false;
3033 c->currcmd.retstat= MCD_FAILURE;
3034
3035 if (ms_update_conn_sock_event(c) != 0)
3036 {
3037 return -1;
3038 }
3039
3040 c->msgcurr= 0;
3041 c->msgused= 0;
3042 c->iovused= 0;
3043 if (ms_add_msghdr(c) != 0)
3044 {
3045 fprintf(stderr, "Out of memory preparing request.");
3046 return -1;
3047 }
3048
3049 /* binary protocol */
3050 if (c->protocol == binary_prot)
3051 {
3052 if (ms_build_bin_write_buf_get(c, item) != 0)
3053 {
3054 return -1;
3055 }
3056 }
3057 else
3058 {
3059 if (ms_build_ascii_write_buf_get(c, item) != 0)
3060 {
3061 return -1;
3062 }
3063 }
3064
3065 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3066
3067 return 0;
3068 } /* ms_mcd_get */
3069
3070
3071 /**
3072 * for ASCII protocol, this function build the multi-get command
3073 * string and send the command.
3074 *
3075 * @param c, pointer of the concurrency
3076 *
3077 * @return int, if success, return 0, else return -1
3078 */
3079 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3080 {
3081 ms_task_item_t *item;
3082
3083 if (ms_add_iov(c, "get", 3) != 0)
3084 {
3085 return -1;
3086 }
3087
3088 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3089 {
3090 item= c->mlget_task.mlget_item[i].item;
3091 assert(item != NULL);
3092 if ((ms_add_iov(c, " ", 1) != 0)
3093 || (ms_add_iov(c, (char *)&item->key_prefix,
3094 (int)KEY_PREFIX_SIZE) != 0)
3095 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3096 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3097 {
3098 return -1;
3099 }
3100 }
3101
3102 if ((ms_add_iov(c, "\r\n", 2) != 0)
3103 || (c->udp && (ms_build_udp_headers(c) != 0)))
3104 {
3105 return -1;
3106 }
3107
3108 return 0;
3109 } /* ms_build_ascii_write_buf_mlget */
3110
3111
3112 /**
3113 * used to send the multi-get command to server
3114 *
3115 * @param c, pointer of the concurrency
3116 *
3117 * @return int, if success, return 0, else return -1
3118 */
3119 int ms_mcd_mlget(ms_conn_t *c)
3120 {
3121 ms_task_item_t *item;
3122
3123 assert(c != NULL);
3124 assert(c->mlget_task.mlget_num >= 1);
3125
3126 c->currcmd.cmd= CMD_GET;
3127 c->currcmd.isfinish= false;
3128 c->currcmd.retstat= MCD_FAILURE;
3129
3130 if (ms_update_conn_sock_event(c) != 0)
3131 {
3132 return -1;
3133 }
3134
3135 c->msgcurr= 0;
3136 c->msgused= 0;
3137 c->iovused= 0;
3138 if (ms_add_msghdr(c) != 0)
3139 {
3140 fprintf(stderr, "Out of memory preparing request.");
3141 return -1;
3142 }
3143
3144 /* binary protocol */
3145 if (c->protocol == binary_prot)
3146 {
3147 if (ms_build_bin_write_buf_mlget(c) != 0)
3148 {
3149 return -1;
3150 }
3151 }
3152 else
3153 {
3154 if (ms_build_ascii_write_buf_mlget(c) != 0)
3155 {
3156 return -1;
3157 }
3158 }
3159
3160 /* decrease operation time of each item */
3161 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3162 {
3163 item= c->mlget_task.mlget_item[i].item;
3164 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3165 }
3166
3167 return 0;
3168 } /* ms_mcd_mlget */
3169
3170
3171 /**
3172 * binary protocol support
3173 */
3174
3175 /**
3176 * for binary protocol, parse the response of server
3177 *
3178 * @param c, pointer of the concurrency
3179 *
3180 * @return int, if success, return 0, else return -1
3181 */
3182 static int ms_bin_process_response(ms_conn_t *c)
3183 {
3184 const char *errstr= NULL;
3185
3186 assert(c != NULL);
3187
3188 uint32_t bodylen= c->binary_header.response.bodylen;
3189 uint8_t opcode= c->binary_header.response.opcode;
3190 uint16_t status= c->binary_header.response.status;
3191
3192 if (bodylen > 0)
3193 {
3194 c->rvbytes= (int32_t)bodylen;
3195 c->readval= true;
3196 return 1;
3197 }
3198 else
3199 {
3200 switch (status)
3201 {
3202 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3203 if (opcode == PROTOCOL_BINARY_CMD_SET)
3204 {
3205 c->currcmd.retstat= MCD_STORED;
3206 }
3207 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3208 {
3209 c->currcmd.retstat= MCD_DELETED;
3210 }
3211 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3212 {
3213 c->currcmd.retstat= MCD_END;
3214 }
3215 break;
3216
3217 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3218 errstr= "Out of memory";
3219 c->currcmd.retstat= MCD_SERVER_ERROR;
3220 break;
3221
3222 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3223 errstr= "Unknown command";
3224 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3225 break;
3226
3227 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3228 errstr= "Not found";
3229 c->currcmd.retstat= MCD_NOTFOUND;
3230 break;
3231
3232 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3233 errstr= "Invalid arguments";
3234 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3235 break;
3236
3237 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3238 errstr= "Data exists for key.";
3239 break;
3240
3241 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3242 errstr= "Too large.";
3243 c->currcmd.retstat= MCD_SERVER_ERROR;
3244 break;
3245
3246 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3247 errstr= "Not stored.";
3248 c->currcmd.retstat= MCD_NOTSTORED;
3249 break;
3250
3251 default:
3252 errstr= "Unknown error";
3253 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3254 break;
3255 } /* switch */
3256
3257 if (errstr != NULL)
3258 {
3259 fprintf(stderr, "%s\n", errstr);
3260 }
3261 }
3262
3263 return 0;
3264 } /* ms_bin_process_response */
3265
3266
3267 /* build binary header and add the header to the buffer to send */
3268
3269 /**
3270 * build binary header and add the header to the buffer to send
3271 *
3272 * @param c, pointer of the concurrency
3273 * @param opcode, operation code
3274 * @param hdr_len, length of header
3275 * @param key_len, length of key
3276 * @param body_len. length of body
3277 */
3278 static void ms_add_bin_header(ms_conn_t *c,
3279 uint8_t opcode,
3280 uint8_t hdr_len,
3281 uint16_t key_len,
3282 uint32_t body_len)
3283 {
3284 protocol_binary_request_header *header;
3285
3286 assert(c != NULL);
3287
3288 header= (protocol_binary_request_header *)c->wcurr;
3289
3290 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3291 header->request.opcode= (uint8_t)opcode;
3292 header->request.keylen= htonl(key_len);
3293
3294 header->request.extlen= (uint8_t)hdr_len;
3295 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3296 header->request.reserved= 0;
3297
3298 header->request.bodylen= htonl(body_len);
3299 header->request.opaque= 0;
3300 header->request.cas= 0;
3301
3302 ms_add_iov(c, c->wcurr, sizeof(header->request));
3303 } /* ms_add_bin_header */
3304
3305
3306 /**
3307 * add the key to the socket write buffer array
3308 *
3309 * @param c, pointer of the concurrency
3310 * @param item, pointer of task item which includes the object
3311 * information
3312 */
3313 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3314 {
3315 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3316 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3317 item->key_size - (int)KEY_PREFIX_SIZE);
3318 }
3319
3320
3321 /**
3322 * for binary protocol, this function build the set command
3323 * and add the command to send buffer array.
3324 *
3325 * @param c, pointer of the concurrency
3326 * @param item, pointer of task item which includes the object
3327 * information
3328 *
3329 * @return int, if success, return 0, else return -1
3330 */
3331 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3332 {
3333 assert(c->wbuf == c->wcurr);
3334
3335 int value_offset;
3336 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3337 uint16_t keylen= (uint16_t)item->key_size;
3338 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3339 + (uint32_t)keylen + (uint32_t)item->value_size;
3340
3341 ms_add_bin_header(c,
3342 PROTOCOL_BINARY_CMD_SET,
3343 sizeof(rep->message.body),
3344 keylen,
3345 bodylen);
3346 rep->message.body.flags= 0;
3347 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3348 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3349 ms_add_key_to_iov(c, item);
3350
3351 if (item->value_offset == INVALID_OFFSET)
3352 {
3353 value_offset= item->key_suffix_offset;
3354 }
3355 else
3356 {
3357 value_offset= item->value_offset;
3358 }
3359 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3360
3361 return 0;
3362 } /* ms_build_bin_write_buf_set */
3363
3364
3365 /**
3366 * for binary protocol, this function build the get command and
3367 * add the command to send buffer array.
3368 *
3369 * @param c, pointer of the concurrency
3370 * @param item, pointer of task item which includes the object
3371 * information
3372 *
3373 * @return int, if success, return 0, else return -1
3374 */
3375 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3376 {
3377 assert(c->wbuf == c->wcurr);
3378
3379 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3380 (uint32_t)item->key_size);
3381 ms_add_key_to_iov(c, item);
3382
3383 return 0;
3384 } /* ms_build_bin_write_buf_get */
3385
3386
3387 /**
3388 * for binary protocol, this function build the multi-get
3389 * command and add the command to send buffer array.
3390 *
3391 * @param c, pointer of the concurrency
3392 * @param item, pointer of task item which includes the object
3393 * information
3394 *
3395 * @return int, if success, return 0, else return -1
3396 */
3397 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3398 {
3399 ms_task_item_t *item;
3400
3401 assert(c->wbuf == c->wcurr);
3402
3403 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3404 {
3405 item= c->mlget_task.mlget_item[i].item;
3406 assert(item != NULL);
3407
3408 ms_add_bin_header(c,
3409 PROTOCOL_BINARY_CMD_GET,
3410 0,
3411 (uint16_t)item->key_size,
3412 (uint32_t)item->key_size);
3413 ms_add_key_to_iov(c, item);
3414 c->wcurr+= sizeof(protocol_binary_request_get);
3415 }
3416
3417 c->wcurr= c->wbuf;
3418
3419 return 0;
3420 } /* ms_build_bin_write_buf_mlget */