Uncrustify
[m6w6/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include <stdio.h>
12 #include <limits.h>
13 #include <sys/uio.h>
14 #include <event.h>
15 #include <fcntl.h>
16 #include <netinet/tcp.h>
17 #include <arpa/inet.h>
18 #include "ms_setting.h"
19 #include "ms_thread.h"
20
/* Result codes for the network write (transmit) path. */
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * Key prefix generation. Every byte of the base/mask has bit 0x10 set, so a
 * value OR-ed with the mask can never contain the bytes ' ' (0x20),
 * '\r' (0x0d), '\n' (0x0a) or '\0'.
 */
#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
#define KEY_PREFIX_MASK 0x1010101010101010

/* Token positions used when parsing the value length returned by the server. */
#define KEY_TOKEN 1
#define VALUELEN_TOKEN 3

/* Global increasing counter that keeps every key prefix unique. */
static uint64_t key_prefix_seq = KEY_PREFIX_BASE;

/* Global increasing counter used to generate request ids for UDP. */
static int udp_request_id = 0;
40
41 extern __thread ms_thread_t ms_thread;
42
43 /* generate upd request id */
44 static int ms_get_udp_request_id(void);
45
46
47 /* connect initialize */
48 static void ms_task_init(ms_conn_t *c);
49 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
50 static int ms_conn_sock_init(ms_conn_t *c);
51 static int ms_conn_event_init(ms_conn_t *c);
52 static int ms_conn_init(ms_conn_t *c,
53 const int init_state,
54 const int read_buffer_size,
55 const bool is_udp);
56 static void ms_warmup_num_init(ms_conn_t *c);
57 static int ms_item_win_init(ms_conn_t *c);
58
59
60 /* connection close */
61 void ms_conn_free(ms_conn_t *c);
62 static void ms_conn_close(ms_conn_t *c);
63
64
65 /* create network connection */
66 static int ms_new_socket(struct addrinfo *ai);
67 static void ms_maximize_sndbuf(const int sfd);
68 static int ms_network_connect(ms_conn_t *c,
69 char *srv_host_name,
70 const int srv_port,
71 const bool is_udp,
72 int *ret_sfd);
73 static int ms_reconn(ms_conn_t *c);
74
75
76 /* read and parse */
77 static int ms_tokenize_command(char *command,
78 token_t *tokens,
79 const int max_tokens);
80 static int ms_ascii_process_line(ms_conn_t *c, char *command);
81 static int ms_try_read_line(ms_conn_t *c);
82 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
83 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
84 static int ms_try_read_network(ms_conn_t *c);
85 static void ms_verify_value(ms_conn_t *c,
86 ms_mlget_task_item_t *mlget_item,
87 char *value,
88 int vlen);
89 static void ms_ascii_complete_nread(ms_conn_t *c);
90 static void ms_bin_complete_nread(ms_conn_t *c);
91 static void ms_complete_nread(ms_conn_t *c);
92
93
94 /* send functions */
95 static int ms_add_msghdr(ms_conn_t *c);
96 static int ms_ensure_iov_space(ms_conn_t *c);
97 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
98 static int ms_build_udp_headers(ms_conn_t *c);
99 static int ms_transmit(ms_conn_t *c);
100
101
102 /* status adjustment */
103 static void ms_conn_shrink(ms_conn_t *c);
104 static void ms_conn_set_state(ms_conn_t *c, int state);
105 static bool ms_update_event(ms_conn_t *c, const int new_flags);
106 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
107 static int ms_get_next_sock_index(ms_conn_t *c);
108 static int ms_update_conn_sock_event(ms_conn_t *c);
109 static bool ms_need_yield(ms_conn_t *c);
110 static void ms_update_start_time(ms_conn_t *c);
111
112
113 /* main loop */
114 static void ms_drive_machine(ms_conn_t *c);
115 void ms_event_handler(const int fd, const short which, void *arg);
116
117
118 /* ascii protocol */
119 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
120 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
121 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
122
123
124 /* binary protocol */
125 static int ms_bin_process_response(ms_conn_t *c);
126 static void ms_add_bin_header(ms_conn_t *c,
127 uint8_t opcode,
128 uint8_t hdr_len,
129 uint16_t key_len,
130 uint32_t body_len);
131 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
132 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
133 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
134 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
135
136
137 /**
138 * each key has two parts, prefix and suffix. The suffix is a
139 * string random get form the character table. The prefix is a
140 * uint64_t variable. And the prefix must be unique. we use the
141 * prefix to identify a key. And the prefix can't include
142 * character ' ' '\r' '\n' '\0'.
143 *
144 * @return uint64_t
145 */
146 uint64_t ms_get_key_prefix(void)
147 {
148 uint64_t key_prefix;
149
150 pthread_mutex_lock(&ms_global.seq_mutex);
151 key_prefix_seq|= KEY_PREFIX_MASK;
152 key_prefix= key_prefix_seq;
153 key_prefix_seq++;
154 pthread_mutex_unlock(&ms_global.seq_mutex);
155
156 return key_prefix;
157 } /* ms_get_key_prefix */
158
159
160 /**
161 * get an unique udp request id
162 *
163 * @return an unique UDP request id
164 */
165 static int ms_get_udp_request_id(void)
166 {
167 return __sync_fetch_and_add(&udp_request_id, 1);
168 }
169
170
171 /**
172 * initialize current task structure
173 *
174 * @param c, pointer of the concurrency
175 */
176 static void ms_task_init(ms_conn_t *c)
177 {
178 c->curr_task.cmd= CMD_NULL;
179 c->curr_task.item= 0;
180 c->curr_task.verify= false;
181 c->curr_task.finish_verify= true;
182 c->curr_task.get_miss= true;
183
184 c->curr_task.get_opt= 0;
185 c->curr_task.set_opt= 0;
186 c->curr_task.cycle_undo_get= 0;
187 c->curr_task.cycle_undo_set= 0;
188 c->curr_task.verified_get= 0;
189 c->curr_task.overwrite_set= 0;
190 } /* ms_task_init */
191
192
193 /**
194 * initialize udp for the connection structure
195 *
196 * @param c, pointer of the concurrency
197 * @param is_udp, whether it's udp
198 *
199 * @return int, if success, return 0, else return -1
200 */
201 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
202 {
203 c->hdrbuf= 0;
204 c->rudpbuf= 0;
205 c->udppkt= 0;
206
207 c->rudpsize= UDP_DATA_BUFFER_SIZE;
208 c->hdrsize= 0;
209
210 c->rudpbytes= 0;
211 c->packets= 0;
212 c->recvpkt= 0;
213 c->pktcurr= 0;
214 c->ordcurr= 0;
215
216 c->udp= is_udp;
217
218 if (c->udp || (! c->udp && ms_setting.facebook_test))
219 {
220 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
221 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
222
223 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
224 {
225 if (c->rudpbuf != NULL)
226 free(c->rudpbuf);
227 if (c->udppkt != NULL)
228 free(c->udppkt);
229 fprintf(stderr, "malloc()\n");
230 return -1;
231 }
232 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
233 }
234
235 return 0;
236 } /* ms_conn_udp_init */
237
238
239 /**
240 * initialize the connection structure
241 *
242 * @param c, pointer of the concurrency
243 * @param init_state, (conn_read, conn_write, conn_closing)
244 * @param read_buffer_size
245 * @param is_udp, whether it's udp
246 *
247 * @return int, if success, return 0, else return -1
248 */
249 static int ms_conn_init(ms_conn_t *c,
250 const int init_state,
251 const int read_buffer_size,
252 const bool is_udp)
253 {
254 assert(c != NULL);
255
256 c->rbuf= c->wbuf= 0;
257 c->iov= 0;
258 c->msglist= 0;
259
260 c->rsize= read_buffer_size;
261 c->wsize= WRITE_BUFFER_SIZE;
262 c->iovsize= IOV_LIST_INITIAL;
263 c->msgsize= MSG_LIST_INITIAL;
264
265 /* for replication, each connection need connect all the server */
266 if (ms_setting.rep_write_srv > 0)
267 {
268 c->total_sfds= ms_setting.srv_cnt;
269 }
270 else
271 {
272 c->total_sfds= ms_setting.sock_per_conn;
273 }
274 c->alive_sfds= 0;
275
276 c->rbuf= (char *)malloc((size_t)c->rsize);
277 c->wbuf= (char *)malloc((size_t)c->wsize);
278 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
279 c->msglist= (struct msghdr *)malloc(
280 sizeof(struct msghdr) * (size_t)c->msgsize);
281 if (ms_setting.mult_key_num > 1)
282 {
283 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
284 malloc(
285 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
286 }
287 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
288
289 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
290 || (c->msglist == NULL) || (c->tcpsfd == NULL)
291 || ((ms_setting.mult_key_num > 1)
292 && (c->mlget_task.mlget_item == NULL)))
293 {
294 if (c->rbuf != NULL)
295 free(c->rbuf);
296 if (c->wbuf != NULL)
297 free(c->wbuf);
298 if (c->iov != NULL)
299 free(c->iov);
300 if (c->msglist != NULL)
301 free(c->msglist);
302 if (c->mlget_task.mlget_item != NULL)
303 free(c->mlget_task.mlget_item);
304 if (c->tcpsfd != NULL)
305 free(c->tcpsfd);
306 fprintf(stderr, "malloc()\n");
307 return -1;
308 }
309
310 c->state= init_state;
311 c->rvbytes= 0;
312 c->rbytes= 0;
313 c->rcurr= c->rbuf;
314 c->wcurr= c->wbuf;
315 c->iovused= 0;
316 c->msgcurr= 0;
317 c->msgused= 0;
318 c->cur_idx= c->total_sfds; /* default index is a invalid value */
319
320 c->ctnwrite= false;
321 c->readval= false;
322 c->change_sfd= false;
323
324 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
325 c->precmd.isfinish= true; /* default the previous command finished */
326 c->currcmd.isfinish= false;
327 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
328 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
329
330 c->mlget_task.mlget_num= 0;
331 c->mlget_task.value_index= -1; /* default invalid value */
332
333 if (ms_setting.binary_prot)
334 {
335 c->protocol= binary_prot;
336 }
337 else if (is_udp)
338 {
339 c->protocol= ascii_udp_prot;
340 }
341 else
342 {
343 c->protocol= ascii_prot;
344 }
345
346 /* initialize udp */
347 if (ms_conn_udp_init(c, is_udp) != 0)
348 {
349 return -1;
350 }
351
352 /* initialize task */
353 ms_task_init(c);
354
355 if (! (ms_setting.facebook_test && is_udp))
356 {
357 __sync_fetch_and_add(&ms_stats.active_conns, 1);
358 }
359
360 return 0;
361 } /* ms_conn_init */
362
363
364 /**
365 * when doing 100% get operation, it could preset some objects
366 * to warmup the server. this function is used to initialize the
367 * number of the objects to preset.
368 *
369 * @param c, pointer of the concurrency
370 */
371 static void ms_warmup_num_init(ms_conn_t *c)
372 {
373 /* no set operation, preset all the items in the window */
374 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
375 {
376 c->warmup_num= c->win_size;
377 c->remain_warmup_num= c->warmup_num;
378 }
379 else
380 {
381 c->warmup_num= 0;
382 c->remain_warmup_num= c->warmup_num;
383 }
384 } /* ms_warmup_num_init */
385
386
387 /**
388 * each connection has an item window, this function initialize
389 * the window. The window is used to generate task.
390 *
391 * @param c, pointer of the concurrency
392 *
393 * @return int, if success, return 0, else return -1
394 */
395 static int ms_item_win_init(ms_conn_t *c)
396 {
397 int exp_cnt= 0;
398
399 c->win_size= (int)ms_setting.win_size;
400 c->set_cursor= 0;
401 c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
402 c->remain_exec_num= c->exec_num;
403
404 c->item_win= (ms_task_item_t *)malloc(
405 sizeof(ms_task_item_t) * (size_t)c->win_size);
406 if (c->item_win == NULL)
407 {
408 fprintf(stderr, "Can't allocate task item array for conn.\n");
409 return -1;
410 }
411 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
412
413 for (int i= 0; i < c->win_size; i++)
414 {
415 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
416 c->item_win[i].key_prefix= ms_get_key_prefix();
417 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
418 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
419 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
420 c->item_win[i].client_time= 0;
421
422 /* set expire time base on the proportion */
423 if (exp_cnt < ms_setting.exp_ver_per * i)
424 {
425 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
426 exp_cnt++;
427 }
428 else
429 {
430 c->item_win[i].exp_time= 0;
431 }
432 }
433
434 ms_warmup_num_init(c);
435
436 return 0;
437 } /* ms_item_win_init */
438
439
440 /**
441 * each connection structure can include one or more sock
442 * handlers. this function create these socks and connect the
443 * server(s).
444 *
445 * @param c, pointer of the concurrency
446 *
447 * @return int, if success, return 0, else return -1
448 */
449 static int ms_conn_sock_init(ms_conn_t *c)
450 {
451 int i;
452 int ret_sfd;
453 int srv_idx= 0;
454
455 assert(c != NULL);
456 assert(c->tcpsfd != NULL);
457
458 for (i= 0; i < c->total_sfds; i++)
459 {
460 ret_sfd= 0;
461 if (ms_setting.rep_write_srv > 0)
462 {
463 /* for replication, each connection need connect all the server */
464 srv_idx= i;
465 }
466 else
467 {
468 /* all the connections in a thread connects the same server */
469 srv_idx= ms_thread.thread_ctx->srv_idx;
470 }
471
472 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
473 ms_setting.servers[srv_idx].srv_port,
474 ms_setting.udp, &ret_sfd) != 0)
475 {
476 break;
477 }
478
479 if (i == 0)
480 {
481 c->sfd= ret_sfd;
482 }
483
484 if (! ms_setting.udp)
485 {
486 c->tcpsfd[i]= ret_sfd;
487 }
488
489 c->alive_sfds++;
490 }
491
492 /* initialize udp sock handler if necessary */
493 if (ms_setting.facebook_test)
494 {
495 ret_sfd= 0;
496 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
497 ms_setting.servers[srv_idx].srv_port,
498 true, &ret_sfd) != 0)
499 {
500 c->udpsfd= 0;
501 }
502 else
503 {
504 c->udpsfd= ret_sfd;
505 }
506 }
507
508 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
509 {
510 if (ms_setting.udp)
511 {
512 close(c->sfd);
513 }
514 else
515 {
516 for (int j= 0; j < i; j++)
517 {
518 close(c->tcpsfd[j]);
519 }
520 }
521
522 if (c->udpsfd != 0)
523 {
524 close(c->udpsfd);
525 }
526
527 return -1;
528 }
529
530 return 0;
531 } /* ms_conn_sock_init */
532
533
534 /**
535 * each connection is managed by libevent, this function
536 * initialize the event of the connection structure.
537 *
538 * @param c, pointer of the concurrency
539 *
540 * @return int, if success, return 0, else return -1
541 */
542 static int ms_conn_event_init(ms_conn_t *c)
543 {
544 /* default event timeout 10 seconds */
545 struct timeval t=
546 {
547 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
548 };
549 short event_flags= EV_WRITE | EV_PERSIST;
550
551 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
552 event_base_set(ms_thread.base, &c->event);
553 c->ev_flags= event_flags;
554
555 if (c->total_sfds == 1)
556 {
557 if (event_add(&c->event, NULL) == -1)
558 {
559 return -1;
560 }
561 }
562 else
563 {
564 if (event_add(&c->event, &t) == -1)
565 {
566 return -1;
567 }
568 }
569
570 return 0;
571 } /* ms_conn_event_init */
572
573
574 /**
575 * setup a connection, each connection structure of each
576 * thread must call this function to initialize.
577 *
578 * @param c, pointer of the concurrency
579 *
580 * @return int, if success, return 0, else return -1
581 */
582 int ms_setup_conn(ms_conn_t *c)
583 {
584 if (ms_item_win_init(c) != 0)
585 {
586 return -1;
587 }
588
589 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
590 {
591 return -1;
592 }
593
594 if (ms_conn_sock_init(c) != 0)
595 {
596 return -1;
597 }
598
599 if (ms_conn_event_init(c) != 0)
600 {
601 return -1;
602 }
603
604 return 0;
605 } /* ms_setup_conn */
606
607
608 /**
609 * Frees a connection.
610 *
611 * @param c, pointer of the concurrency
612 */
613 void ms_conn_free(ms_conn_t *c)
614 {
615 if (c != NULL)
616 {
617 if (c->hdrbuf != NULL)
618 free(c->hdrbuf);
619 if (c->msglist != NULL)
620 free(c->msglist);
621 if (c->rbuf != NULL)
622 free(c->rbuf);
623 if (c->wbuf != NULL)
624 free(c->wbuf);
625 if (c->iov != NULL)
626 free(c->iov);
627 if (c->mlget_task.mlget_item != NULL)
628 free(c->mlget_task.mlget_item);
629 if (c->rudpbuf != NULL)
630 free(c->rudpbuf);
631 if (c->udppkt != NULL)
632 free(c->udppkt);
633 if (c->item_win != NULL)
634 free(c->item_win);
635 if (c->tcpsfd != NULL)
636 free(c->tcpsfd);
637
638 if (--ms_thread.nactive_conn == 0)
639 {
640 free(ms_thread.conn);
641 }
642 }
643 } /* ms_conn_free */
644
645
646 /**
647 * close a connection
648 *
649 * @param c, pointer of the concurrency
650 */
651 static void ms_conn_close(ms_conn_t *c)
652 {
653 assert(c != NULL);
654
655 /* delete the event, the socket and the connection */
656 event_del(&c->event);
657
658 for (int i= 0; i < c->total_sfds; i++)
659 {
660 if (c->tcpsfd[i] > 0)
661 {
662 close(c->tcpsfd[i]);
663 }
664 }
665 c->sfd= 0;
666
667 if (ms_setting.facebook_test)
668 {
669 close(c->udpsfd);
670 }
671
672 __sync_fetch_and_sub(&ms_stats.active_conns, 1);
673
674 ms_conn_free(c);
675
676 if (ms_setting.run_time == 0)
677 {
678 pthread_mutex_lock(&ms_global.run_lock.lock);
679 ms_global.run_lock.count++;
680 pthread_cond_signal(&ms_global.run_lock.cond);
681 pthread_mutex_unlock(&ms_global.run_lock.lock);
682 }
683
684 if (ms_thread.nactive_conn == 0)
685 {
686 pthread_exit(NULL);
687 }
688 } /* ms_conn_close */
689
690
691 /**
692 * create a new sock
693 *
694 * @param ai, server address information
695 *
696 * @return int, if success, return 0, else return -1
697 */
698 static int ms_new_socket(struct addrinfo *ai)
699 {
700 int sfd;
701
702 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
703 {
704 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
705 return -1;
706 }
707
708 return sfd;
709 } /* ms_new_socket */
710
711
712 /**
713 * Sets a socket's send buffer size to the maximum allowed by the system.
714 *
715 * @param sfd, file descriptor of socket
716 */
717 static void ms_maximize_sndbuf(const int sfd)
718 {
719 socklen_t intsize= sizeof(int);
720 unsigned int last_good= 0;
721 unsigned int min, max, avg;
722 unsigned int old_size;
723
724 /* Start with the default size. */
725 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
726 {
727 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
728 return;
729 }
730
731 /* Binary-search for the real maximum. */
732 min= old_size;
733 max= MAX_SENDBUF_SIZE;
734
735 while (min <= max)
736 {
737 avg= ((unsigned int)(min + max)) / 2;
738 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
739 {
740 last_good= avg;
741 min= avg + 1;
742 }
743 else
744 {
745 max= avg - 1;
746 }
747 }
748 } /* ms_maximize_sndbuf */
749
750
751 /**
752 * socket connects the server
753 *
754 * @param c, pointer of the concurrency
755 * @param srv_host_name, the host name of the server
756 * @param srv_port, port of server
757 * @param is_udp, whether it's udp
758 * @param ret_sfd, the connected socket file descriptor
759 *
760 * @return int, if success, return 0, else return -1
761 */
762 static int ms_network_connect(ms_conn_t *c,
763 char *srv_host_name,
764 const int srv_port,
765 const bool is_udp,
766 int *ret_sfd)
767 {
768 int sfd;
769 struct linger ling=
770 {
771 0, 0
772 };
773 struct addrinfo *ai;
774 struct addrinfo *next;
775 struct addrinfo hints;
776 char port_buf[NI_MAXSERV];
777 int error;
778 int success= 0;
779
780 int flags= 1;
781
782 /*
783 * the memset call clears nonstandard fields in some impementations
784 * that otherwise mess things up.
785 */
786 memset(&hints, 0, sizeof(hints));
787 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
788 if (is_udp)
789 {
790 hints.ai_protocol= IPPROTO_UDP;
791 hints.ai_socktype= SOCK_DGRAM;
792 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
793 }
794 else
795 {
796 hints.ai_family= AF_UNSPEC;
797 hints.ai_protocol= IPPROTO_TCP;
798 hints.ai_socktype= SOCK_STREAM;
799 }
800
801 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
802 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
803 if (error != 0)
804 {
805 if (error != EAI_SYSTEM)
806 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
807 else
808 perror("getaddrinfo()\n");
809
810 return -1;
811 }
812
813 for (next= ai; next; next= next->ai_next)
814 {
815 if ((sfd= ms_new_socket(next)) == -1)
816 {
817 freeaddrinfo(ai);
818 return -1;
819 }
820
821 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
822 if (is_udp)
823 {
824 ms_maximize_sndbuf(sfd);
825 }
826 else
827 {
828 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
829 sizeof(flags));
830 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
831 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
832 sizeof(flags));
833 }
834
835 if (is_udp)
836 {
837 c->srv_recv_addr_size= sizeof(struct sockaddr);
838 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
839 }
840 else
841 {
842 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
843 {
844 close(sfd);
845 freeaddrinfo(ai);
846 return -1;
847 }
848 }
849
850 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
851 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
852 {
853 fprintf(stderr, "setting O_NONBLOCK\n");
854 close(sfd);
855 freeaddrinfo(ai);
856 return -1;
857 }
858
859 if (ret_sfd != NULL)
860 {
861 *ret_sfd= sfd;
862 }
863
864 success++;
865 }
866
867 freeaddrinfo(ai);
868
869 /* Return zero if we detected no errors in starting up connections */
870 return success == 0;
871 } /* ms_network_connect */
872
873
874 /**
875 * reconnect a disconnected sock
876 *
877 * @param c, pointer of the concurrency
878 *
879 * @return int, if success, return 0, else return -1
880 */
881 static int ms_reconn(ms_conn_t *c)
882 {
883 int srv_idx= 0;
884 int srv_conn_cnt= 0;
885
886 if (ms_setting.rep_write_srv > 0)
887 {
888 srv_idx= c->cur_idx;
889 srv_conn_cnt= ms_setting.nconns;
890 }
891 else
892 {
893 srv_idx= ms_thread.thread_ctx->srv_idx;
894 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
895 }
896
897 /* close the old socket handler */
898 close(c->sfd);
899 c->tcpsfd[c->cur_idx]= 0;
900
901 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
902 % srv_conn_cnt == 0)
903 {
904 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
905 fprintf(stderr, "Server %s:%d disconnect\n",
906 ms_setting.servers[srv_idx].srv_host_name,
907 ms_setting.servers[srv_idx].srv_port);
908 }
909
910 if (ms_setting.rep_write_srv > 0)
911 {
912 int i= 0;
913 for (i= 0; i < c->total_sfds; i++)
914 {
915 if (c->tcpsfd[i] != 0)
916 {
917 break;
918 }
919 }
920
921 /* all socks disconnect */
922 if (i == c->total_sfds)
923 {
924 return -1;
925 }
926 }
927 else
928 {
929 do
930 {
931 /* reconnect success, break the loop */
932 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
933 ms_setting.servers[srv_idx].srv_port,
934 ms_setting.udp, &c->sfd) == 0)
935 {
936 c->tcpsfd[c->cur_idx]= c->sfd;
937 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
938 % srv_conn_cnt == 0)
939 {
940 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
941 int reconn_time=
942 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
943 - ms_setting.servers[srv_idx].disconn_time
944 .tv_sec);
945 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
946 ms_setting.servers[srv_idx].srv_host_name,
947 ms_setting.servers[srv_idx].srv_port, reconn_time);
948 }
949 break;
950 }
951
952 if (c->total_sfds == 1)
953 {
954 /* wait a second and reconnect */
955 sleep(1);
956 }
957 }
958 while (c->total_sfds == 1);
959 }
960
961 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
962 {
963 c->sfd= 0;
964 c->alive_sfds--;
965 }
966
967 return 0;
968 } /* ms_reconn */
969
970
971 /**
972 * reconnect several disconnected socks in the connection
973 * structure, the ever-1-second timer of the thread will check
974 * whether some socks in the connections disconnect. if
975 * disconnect, reconnect the sock.
976 *
977 * @param c, pointer of the concurrency
978 *
979 * @return int, if success, return 0, else return -1
980 */
981 int ms_reconn_socks(ms_conn_t *c)
982 {
983 int srv_idx= 0;
984 int ret_sfd= 0;
985 int srv_conn_cnt= 0;
986 struct timeval cur_time;
987
988 assert(c != NULL);
989
990 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
991 {
992 return 0;
993 }
994
995 for (int i= 0; i < c->total_sfds; i++)
996 {
997 if (c->tcpsfd[i] == 0)
998 {
999 gettimeofday(&cur_time, NULL);
1000
1001 /**
1002 * For failover test of replication, reconnect the socks after
1003 * it disconnects more than 5 seconds, Otherwise memslap will
1004 * block at connect() function and the work threads can't work
1005 * in this interval.
1006 */
1007 if (cur_time.tv_sec
1008 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1009 {
1010 break;
1011 }
1012
1013 if (ms_setting.rep_write_srv > 0)
1014 {
1015 srv_idx= i;
1016 srv_conn_cnt= ms_setting.nconns;
1017 }
1018 else
1019 {
1020 srv_idx= ms_thread.thread_ctx->srv_idx;
1021 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1022 }
1023
1024 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1025 ms_setting.servers[srv_idx].srv_port,
1026 ms_setting.udp, &ret_sfd) == 0)
1027 {
1028 c->tcpsfd[i]= ret_sfd;
1029 c->alive_sfds++;
1030
1031 if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1032 % srv_conn_cnt == 0)
1033 {
1034 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1035 int reconn_time=
1036 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1037 - ms_setting.servers[srv_idx].disconn_time
1038 .tv_sec);
1039 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1040 ms_setting.servers[srv_idx].srv_host_name,
1041 ms_setting.servers[srv_idx].srv_port, reconn_time);
1042 }
1043 }
1044 }
1045 }
1046
1047 return 0;
1048 } /* ms_reconn_socks */
1049
1050
1051 /**
1052 * Tokenize the command string by replacing whitespace with '\0' and update
1053 * the token array tokens with pointer to start of each token and length.
1054 * Returns total number of tokens. The last valid token is the terminal
1055 * token (value points to the first unprocessed character of the string and
1056 * length zero).
1057 *
1058 * Usage example:
1059 *
1060 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1061 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1062 * ...
1063 * }
1064 * ncommand = tokens[ix].value - command;
1065 * command = tokens[ix].value;
1066 * }
1067 *
1068 * @param command, the command string to token
1069 * @param tokens, array to store tokens
1070 * @param max_tokens, maximum tokens number
1071 *
1072 * @return int, the number of tokens
1073 */
1074 static int ms_tokenize_command(char *command,
1075 token_t *tokens,
1076 const int max_tokens)
1077 {
1078 char *s, *e;
1079 int ntokens= 0;
1080
1081 assert(command != NULL && tokens != NULL && max_tokens > 1);
1082
1083 for (s= e= command; ntokens < max_tokens - 1; ++e)
1084 {
1085 if (*e == ' ')
1086 {
1087 if (s != e)
1088 {
1089 tokens[ntokens].value= s;
1090 tokens[ntokens].length= (size_t)(e - s);
1091 ntokens++;
1092 *e= '\0';
1093 }
1094 s= e + 1;
1095 }
1096 else if (*e == '\0')
1097 {
1098 if (s != e)
1099 {
1100 tokens[ntokens].value= s;
1101 tokens[ntokens].length= (size_t)(e - s);
1102 ntokens++;
1103 }
1104
1105 break; /* string end */
1106 }
1107 }
1108
1109 return ntokens;
1110 } /* ms_tokenize_command */
1111
1112
1113 /**
1114 * parse the response of server.
1115 *
1116 * @param c, pointer of the concurrency
1117 * @param command, the string responded by server
1118 *
1119 * @return int, if the command completed return 0, else return
1120 * -1
1121 */
1122 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1123 {
1124 int ret= 0;
1125 int64_t value_len;
1126 char *buffer= command;
1127
1128 assert(c != NULL);
1129
1130 /**
1131 * for command get, we store the returned value into local buffer
1132 * then continue in ms_complete_nread().
1133 */
1134
1135 switch (buffer[0])
1136 {
1137 case 'V': /* VALUE || VERSION */
1138 if (buffer[1] == 'A') /* VALUE */
1139 {
1140 token_t tokens[MAX_TOKENS];
1141 ms_tokenize_command(command, tokens, MAX_TOKENS);
1142 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1143 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1144
1145 /*
1146 * We read the \r\n into the string since not doing so is more
1147 * cycles then the waster of memory to do so.
1148 *
1149 * We are null terminating through, which will most likely make
1150 * some people lazy about using the return length.
1151 */
1152 c->rvbytes= (int)(value_len + 2);
1153 c->readval= true;
1154 ret= -1;
1155 }
1156
1157 break;
1158
1159 case 'O': /* OK */
1160 c->currcmd.retstat= MCD_SUCCESS;
1161
1162 case 'S': /* STORED STATS SERVER_ERROR */
1163 if (buffer[2] == 'A') /* STORED STATS */
1164 { /* STATS*/
1165 c->currcmd.retstat= MCD_STAT;
1166 }
1167 else if (buffer[1] == 'E')
1168 {
1169 /* SERVER_ERROR */
1170 printf("<%d %s\n", c->sfd, buffer);
1171
1172 c->currcmd.retstat= MCD_SERVER_ERROR;
1173 }
1174 else if (buffer[1] == 'T')
1175 {
1176 /* STORED */
1177 c->currcmd.retstat= MCD_STORED;
1178 }
1179 else
1180 {
1181 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1182 }
1183 break;
1184
1185 case 'D': /* DELETED DATA */
1186 if (buffer[1] == 'E')
1187 {
1188 c->currcmd.retstat= MCD_DELETED;
1189 }
1190 else
1191 {
1192 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1193 }
1194
1195 break;
1196
1197 case 'N': /* NOT_FOUND NOT_STORED*/
1198 if (buffer[4] == 'F')
1199 {
1200 c->currcmd.retstat= MCD_NOTFOUND;
1201 }
1202 else if (buffer[4] == 'S')
1203 {
1204 printf("<%d %s\n", c->sfd, buffer);
1205 c->currcmd.retstat= MCD_NOTSTORED;
1206 }
1207 else
1208 {
1209 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1210 }
1211 break;
1212
1213 case 'E': /* PROTOCOL ERROR or END */
1214 if (buffer[1] == 'N')
1215 {
1216 /* END */
1217 c->currcmd.retstat= MCD_END;
1218 }
1219 else if (buffer[1] == 'R')
1220 {
1221 printf("<%d ERROR\n", c->sfd);
1222 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1223 }
1224 else if (buffer[1] == 'X')
1225 {
1226 c->currcmd.retstat= MCD_DATA_EXISTS;
1227 printf("<%d %s\n", c->sfd, buffer);
1228 }
1229 else
1230 {
1231 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1232 }
1233 break;
1234
1235 case 'C': /* CLIENT ERROR */
1236 printf("<%d %s\n", c->sfd, buffer);
1237 c->currcmd.retstat= MCD_CLIENT_ERROR;
1238 break;
1239
1240 default:
1241 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1242 break;
1243 } /* switch */
1244
1245 return ret;
1246 } /* ms_ascii_process_line */
1247
1248
1249 /**
1250 * after one operation completes, reset the concurrency
1251 *
1252 * @param c, pointer of the concurrency
1253 * @param timeout, whether it's timeout
1254 */
1255 void ms_reset_conn(ms_conn_t *c, bool timeout)
1256 {
1257 assert(c != NULL);
1258
1259 if (c->udp)
1260 {
1261 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1262 {
1263 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
1264 }
1265
1266 c->packets= 0;
1267 c->recvpkt= 0;
1268 c->pktcurr= 0;
1269 c->ordcurr= 0;
1270 c->rudpbytes= 0;
1271 }
1272 c->currcmd.isfinish= true;
1273 c->ctnwrite= false;
1274 c->rbytes= 0;
1275 c->rcurr= c->rbuf;
1276 ms_conn_set_state(c, conn_write);
1277 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1278
1279 if (timeout)
1280 {
1281 ms_drive_machine(c);
1282 }
1283 } /* ms_reset_conn */
1284
1285
1286 /**
1287 * if we have a complete line in the buffer, process it.
1288 *
1289 * @param c, pointer of the concurrency
1290 *
1291 * @return int, if success, return 0, else return -1
1292 */
1293 static int ms_try_read_line(ms_conn_t *c)
1294 {
1295 if (c->protocol == binary_prot)
1296 {
1297 /* Do we have the complete packet header? */
1298 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1299 {
1300 /* need more data! */
1301 return 0;
1302 }
1303 else
1304 {
1305 #ifdef NEED_ALIGN
1306 if (((long)(c->rcurr)) % 8 != 0)
1307 {
1308 /* must realign input buffer */
1309 memmove(c->rbuf, c->rcurr, c->rbytes);
1310 c->rcurr= c->rbuf;
1311 if (settings.verbose)
1312 {
1313 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1314 }
1315 }
1316 #endif
1317 protocol_binary_response_header *rsp;
1318 rsp= (protocol_binary_response_header *)c->rcurr;
1319
1320 c->binary_header= *rsp;
1321 c->binary_header.response.extlen= rsp->response.extlen;
1322 c->binary_header.response.keylen= ntohl(rsp->response.keylen);
1323 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1324 c->binary_header.response.status= ntohl(rsp->response.status);
1325
1326 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1327 {
1328 fprintf(stderr, "Invalid magic: %x\n",
1329 c->binary_header.response.magic);
1330 ms_conn_set_state(c, conn_closing);
1331 return 0;
1332 }
1333
1334 /* process this complete response */
1335 if (ms_bin_process_response(c) == 0)
1336 {
1337 /* current operation completed */
1338 ms_reset_conn(c, false);
1339 return -1;
1340 }
1341 else
1342 {
1343 c->rbytes-= (int32_t)sizeof(c->binary_header);
1344 c->rcurr+= sizeof(c->binary_header);
1345 }
1346 }
1347 }
1348 else
1349 {
1350 char *el, *cont;
1351
1352 assert(c != NULL);
1353 assert(c->rcurr <= (c->rbuf + c->rsize));
1354
1355 if (c->rbytes == 0)
1356 return 0;
1357
1358 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1359 if (! el)
1360 return 0;
1361
1362 cont= el + 1;
1363 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1364 {
1365 el--;
1366 }
1367 *el= '\0';
1368
1369 assert(cont <= (c->rcurr + c->rbytes));
1370
1371 /* process this complete line */
1372 if (ms_ascii_process_line(c, c->rcurr) == 0)
1373 {
1374 /* current operation completed */
1375 ms_reset_conn(c, false);
1376 return -1;
1377 }
1378 else
1379 {
1380 /* current operation didn't complete */
1381 c->rbytes-= (int32_t)(cont - c->rcurr);
1382 c->rcurr= cont;
1383 }
1384
1385 assert(c->rcurr <= (c->rbuf + c->rsize));
1386 }
1387
1388 return -1;
1389 } /* ms_try_read_line */
1390
1391
1392 /**
1393 * because the packet of UDP can't ensure the order, the
1394 * function is used to sort the received udp packet.
1395 *
1396 * @param c, pointer of the concurrency
1397 * @param buf, the buffer to store the ordered packages data
1398 * @param rbytes, the maximum capacity of the buffer
1399 *
1400 * @return int, if success, return the copy bytes, else return
1401 * -1
1402 */
1403 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1404 {
1405 int len= 0;
1406 int wbytes= 0;
1407 uint16_t req_id= 0;
1408 uint16_t seq_num= 0;
1409 uint16_t packets= 0;
1410 unsigned char *header= NULL;
1411
1412 /* no enough data */
1413 assert(c != NULL);
1414 assert(buf != NULL);
1415 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1416
1417 /* calculate received packets count */
1418 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1419 {
1420 /* the last packet has some data */
1421 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1422 }
1423 else
1424 {
1425 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1426 }
1427
1428 /* get the total packets count if necessary */
1429 if (c->packets == 0)
1430 {
1431 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1432 }
1433
1434 /* build the ordered packet array */
1435 for (int i= c->pktcurr; i < c->recvpkt; i++)
1436 {
1437 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1438 req_id= (uint16_t)HEADER_TO_REQID(header);
1439 assert(req_id == c->request_id % (1 << 16));
1440
1441 packets= (uint16_t)HEADER_TO_PACKETS(header);
1442 assert(c->packets == HEADER_TO_PACKETS(header));
1443
1444 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1445 c->udppkt[seq_num].header= header;
1446 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1447
1448 if (i == c->recvpkt - 1)
1449 {
1450 /* last received packet */
1451 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1452 {
1453 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1454 c->pktcurr++;
1455 }
1456 else
1457 {
1458 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1459 - UDP_HEADER_SIZE;
1460 }
1461 }
1462 else
1463 {
1464 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1465 c->pktcurr++;
1466 }
1467 }
1468
1469 for (int i= c->ordcurr; i < c->recvpkt; i++)
1470 {
1471 /* there is some data to copy */
1472 if ((c->udppkt[i].data != NULL)
1473 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1474 {
1475 header= c->udppkt[i].header;
1476 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1477 if (len > rbytes - wbytes)
1478 {
1479 len= rbytes - wbytes;
1480 }
1481
1482 assert(len <= rbytes - wbytes);
1483 assert(i == HEADER_TO_SEQNUM(header));
1484
1485 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1486 (size_t)len);
1487 wbytes+= len;
1488 c->udppkt[i].copybytes+= len;
1489
1490 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1491 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1492 {
1493 /* finish copying all the data of this packet, next */
1494 c->ordcurr++;
1495 }
1496
1497 /* last received packet, and finish copying all the data */
1498 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1499 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1500 {
1501 break;
1502 }
1503
1504 /* no space to copy data */
1505 if (wbytes >= rbytes)
1506 {
1507 break;
1508 }
1509
1510 /* it doesn't finish reading all the data of the packet from network */
1511 if ((i != c->recvpkt - 1)
1512 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1513 {
1514 break;
1515 }
1516 }
1517 else
1518 {
1519 /* no data to copy */
1520 break;
1521 }
1522 }
1523
1524 return wbytes == 0 ? -1 : wbytes;
1525 } /* ms_sort_udp_packet */
1526
1527
1528 /**
1529 * encapsulate upd read like tcp read
1530 *
1531 * @param c, pointer of the concurrency
1532 * @param buf, read buffer
1533 * @param len, length to read
1534 *
1535 * @return int, if success, return the read bytes, else return
1536 * -1
1537 */
1538 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1539 {
1540 int res= 0;
1541 int avail= 0;
1542 int rbytes= 0;
1543 int copybytes= 0;
1544
1545 assert(c->udp);
1546
1547 while (1)
1548 {
1549 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1550 {
1551 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1552 if (! new_rbuf)
1553 {
1554 fprintf(stderr, "Couldn't realloc input buffer.\n");
1555 c->rudpbytes= 0; /* ignore what we read */
1556 return -1;
1557 }
1558 c->rudpbuf= new_rbuf;
1559 c->rudpsize*= 2;
1560 }
1561
1562 avail= c->rudpsize - c->rudpbytes;
1563 /* UDP each time read a packet, 1400 bytes */
1564 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1565
1566 if (res > 0)
1567 {
1568 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1569 c->rudpbytes+= res;
1570 rbytes+= res;
1571 if (res == avail)
1572 {
1573 continue;
1574 }
1575 else
1576 {
1577 break;
1578 }
1579 }
1580
1581 if (res == 0)
1582 {
1583 /* "connection" closed */
1584 return res;
1585 }
1586
1587 if (res == -1)
1588 {
1589 /* no data to read */
1590 return res;
1591 }
1592 }
1593
1594 /* copy data to read buffer */
1595 if (rbytes > 0)
1596 {
1597 copybytes= ms_sort_udp_packet(c, buf, len);
1598 }
1599
1600 if (copybytes == -1)
1601 {
1602 __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
1603 }
1604
1605 return copybytes;
1606 } /* ms_udp_read */
1607
1608
1609 /*
1610 * read from network as much as we can, handle buffer overflow and connection
1611 * close.
1612 * before reading, move the remaining incomplete fragment of a command
1613 * (if any) to the beginning of the buffer.
1614 * return 0 if there's nothing to read on the first read.
1615 */
1616
1617 /**
1618 * read from network as much as we can, handle buffer overflow and connection
1619 * close. before reading, move the remaining incomplete fragment of a command
1620 * (if any) to the beginning of the buffer.
1621 *
1622 * @param c, pointer of the concurrency
1623 *
1624 * @return int,
1625 * return 0 if there's nothing to read on the first read.
1626 * return 1 if get data
1627 * return -1 if error happens
1628 */
1629 static int ms_try_read_network(ms_conn_t *c)
1630 {
1631 int gotdata= 0;
1632 int res;
1633 int64_t avail;
1634
1635 assert(c != NULL);
1636
1637 if ((c->rcurr != c->rbuf)
1638 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1639 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1640 {
1641 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1642 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1643 c->rcurr= c->rbuf;
1644 }
1645
1646 while (1)
1647 {
1648 if (c->rbytes >= c->rsize)
1649 {
1650 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1651 if (! new_rbuf)
1652 {
1653 fprintf(stderr, "Couldn't realloc input buffer.\n");
1654 c->rbytes= 0; /* ignore what we read */
1655 return -1;
1656 }
1657 c->rcurr= c->rbuf= new_rbuf;
1658 c->rsize*= 2;
1659 }
1660
1661 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1662 if (avail == 0)
1663 {
1664 break;
1665 }
1666
1667 if (c->udp)
1668 {
1669 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1670 }
1671 else
1672 {
1673 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1674 }
1675
1676 if (res > 0)
1677 {
1678 if (! c->udp)
1679 {
1680 __sync_fetch_and_add(&ms_stats.bytes_read, res);
1681 }
1682 gotdata= 1;
1683 c->rbytes+= res;
1684 if (res == avail)
1685 {
1686 continue;
1687 }
1688 else
1689 {
1690 break;
1691 }
1692 }
1693 if (res == 0)
1694 {
1695 /* connection closed */
1696 ms_conn_set_state(c, conn_closing);
1697 return -1;
1698 }
1699 if (res == -1)
1700 {
1701 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1702 break;
1703 /* Should close on unhandled errors. */
1704 ms_conn_set_state(c, conn_closing);
1705 return -1;
1706 }
1707 }
1708
1709 return gotdata;
1710 } /* ms_try_read_network */
1711
1712
1713 /**
1714 * after get the object from server, verify the value if
1715 * necessary.
1716 *
1717 * @param c, pointer of the concurrency
1718 * @param mlget_item, pointer of mulit-get task item structure
1719 * @param value, received value string
1720 * @param vlen, received value string length
1721 */
1722 static void ms_verify_value(ms_conn_t *c,
1723 ms_mlget_task_item_t *mlget_item,
1724 char *value,
1725 int vlen)
1726 {
1727 if (c->curr_task.verify)
1728 {
1729 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1730 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1731 char *orignkey=
1732 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1733
1734 /* verify expire time if necessary */
1735 if (c->curr_task.item->exp_time > 0)
1736 {
1737 struct timeval curr_time;
1738 gettimeofday(&curr_time, NULL);
1739
1740 /* object expired but get it now */
1741 if (curr_time.tv_sec - c->curr_task.item->client_time
1742 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1743 {
1744 __sync_fetch_and_add(&ms_stats.exp_get, 1);
1745
1746 if (ms_setting.verbose)
1747 {
1748 char set_time[64];
1749 char cur_time[64];
1750 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1751 localtime(&c->curr_task.item->client_time));
1752 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1753 localtime(&curr_time.tv_sec));
1754 fprintf(stderr,
1755 "\n<%d expire time verification failed, "
1756 "object expired but get it now\n"
1757 "\tkey len: %d\n"
1758 "\tkey: %lx %.*s\n"
1759 "\tset time: %s current time: %s "
1760 "diff time: %d expire time: %d\n"
1761 "\texpected data: \n"
1762 "\treceived data len: %d\n"
1763 "\treceived data: %.*s\n",
1764 c->sfd,
1765 c->curr_task.item->key_size,
1766 c->curr_task.item->key_prefix,
1767 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1768 orignkey,
1769 set_time,
1770 cur_time,
1771 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1772 c->curr_task.item->exp_time,
1773 vlen,
1774 vlen,
1775 value);
1776 fflush(stderr);
1777 }
1778 }
1779 }
1780 else
1781 {
1782 if ((c->curr_task.item->value_size != vlen)
1783 || (memcmp(orignval, value, (size_t)vlen) != 0))
1784 {
1785 __sync_fetch_and_add(&ms_stats.vef_failed, 1);
1786
1787 if (ms_setting.verbose)
1788 {
1789 fprintf(stderr,
1790 "\n<%d data verification failed\n"
1791 "\tkey len: %d\n"
1792 "\tkey: %lx %.*s\n"
1793 "\texpected data len: %d\n"
1794 "\texpected data: %.*s\n"
1795 "\treceived data len: %d\n"
1796 "\treceived data: %.*s\n",
1797 c->sfd,
1798 c->curr_task.item->key_size,
1799 c->curr_task.item->key_prefix,
1800 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1801 orignkey,
1802 c->curr_task.item->value_size,
1803 c->curr_task.item->value_size,
1804 orignval,
1805 vlen,
1806 vlen,
1807 value);
1808 fflush(stderr);
1809 }
1810 }
1811 }
1812
1813 c->curr_task.finish_verify= true;
1814
1815 if (mlget_item != NULL)
1816 {
1817 mlget_item->finish_verify= true;
1818 }
1819 }
1820 } /* ms_verify_value */
1821
1822
1823 /**
1824 * For ASCII protocol, after store the data into the local
1825 * buffer, run this function to handle the data.
1826 *
1827 * @param c, pointer of the concurrency
1828 */
1829 static void ms_ascii_complete_nread(ms_conn_t *c)
1830 {
1831 assert(c != NULL);
1832 assert(c->rbytes >= c->rvbytes);
1833 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1834 if (c->rvbytes > 2)
1835 {
1836 assert(
1837 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1838 }
1839
1840 /* multi-get */
1841 ms_mlget_task_item_t *mlget_item= NULL;
1842 if (((ms_setting.mult_key_num > 1)
1843 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1844 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1845 {
1846 c->mlget_task.value_index++;
1847 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1848
1849 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1850 {
1851 c->curr_task.item= mlget_item->item;
1852 c->curr_task.verify= mlget_item->verify;
1853 c->curr_task.finish_verify= mlget_item->finish_verify;
1854 mlget_item->get_miss= false;
1855 }
1856 else
1857 {
1858 /* Try to find the task item in multi-get task array */
1859 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1860 {
1861 mlget_item= &c->mlget_task.mlget_item[i];
1862 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1863 {
1864 c->curr_task.item= mlget_item->item;
1865 c->curr_task.verify= mlget_item->verify;
1866 c->curr_task.finish_verify= mlget_item->finish_verify;
1867 mlget_item->get_miss= false;
1868
1869 break;
1870 }
1871 }
1872 }
1873 }
1874
1875 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1876
1877 c->curr_task.get_miss= false;
1878 c->rbytes-= c->rvbytes;
1879 c->rcurr= c->rcurr + c->rvbytes;
1880 assert(c->rcurr <= (c->rbuf + c->rsize));
1881 c->readval= false;
1882 c->rvbytes= 0;
1883 } /* ms_ascii_complete_nread */
1884
1885
1886 /**
1887 * For binary protocol, after store the data into the local
1888 * buffer, run this function to handle the data.
1889 *
1890 * @param c, pointer of the concurrency
1891 */
1892 static void ms_bin_complete_nread(ms_conn_t *c)
1893 {
1894 assert(c != NULL);
1895 assert(c->rbytes >= c->rvbytes);
1896 assert(c->protocol == binary_prot);
1897
1898 int extlen= c->binary_header.response.extlen;
1899 int keylen= c->binary_header.response.keylen;
1900 uint8_t opcode= c->binary_header.response.opcode;
1901
1902 /* not get command or not include value, just return */
1903 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1904 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1905 || (c->rvbytes <= extlen + keylen))
1906 {
1907 /* get miss */
1908 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1909 {
1910 c->currcmd.retstat= MCD_END;
1911 c->curr_task.get_miss= true;
1912 }
1913
1914 c->readval= false;
1915 c->rvbytes= 0;
1916 ms_reset_conn(c, false);
1917 return;
1918 }
1919
1920 /* multi-get */
1921 ms_mlget_task_item_t *mlget_item= NULL;
1922 if (((ms_setting.mult_key_num > 1)
1923 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1924 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1925 {
1926 c->mlget_task.value_index++;
1927 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1928
1929 c->curr_task.item= mlget_item->item;
1930 c->curr_task.verify= mlget_item->verify;
1931 c->curr_task.finish_verify= mlget_item->finish_verify;
1932 mlget_item->get_miss= false;
1933 }
1934
1935 ms_verify_value(c,
1936 mlget_item,
1937 c->rcurr + extlen + keylen,
1938 c->rvbytes - extlen - keylen);
1939
1940 c->currcmd.retstat= MCD_END;
1941 c->curr_task.get_miss= false;
1942 c->rbytes-= c->rvbytes;
1943 c->rcurr= c->rcurr + c->rvbytes;
1944 assert(c->rcurr <= (c->rbuf + c->rsize));
1945 c->readval= false;
1946 c->rvbytes= 0;
1947
1948 if (ms_setting.mult_key_num > 1)
1949 {
1950 /* multi-get have check all the item */
1951 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1952 {
1953 ms_reset_conn(c, false);
1954 }
1955 }
1956 else
1957 {
1958 /* single get */
1959 ms_reset_conn(c, false);
1960 }
1961 } /* ms_bin_complete_nread */
1962
1963
1964 /**
1965 * we get here after reading the value of get commands.
1966 *
1967 * @param c, pointer of the concurrency
1968 */
1969 static void ms_complete_nread(ms_conn_t *c)
1970 {
1971 assert(c != NULL);
1972 assert(c->rbytes >= c->rvbytes);
1973 assert(c->protocol == ascii_udp_prot
1974 || c->protocol == ascii_prot
1975 || c->protocol == binary_prot);
1976
1977 if (c->protocol == binary_prot)
1978 {
1979 ms_bin_complete_nread(c);
1980 }
1981 else
1982 {
1983 ms_ascii_complete_nread(c);
1984 }
1985 } /* ms_complete_nread */
1986
1987
1988 /**
1989 * Adds a message header to a connection.
1990 *
1991 * @param c, pointer of the concurrency
1992 *
1993 * @return int, if success, return 0, else return -1
1994 */
1995 static int ms_add_msghdr(ms_conn_t *c)
1996 {
1997 struct msghdr *msg;
1998
1999 assert(c != NULL);
2000
2001 if (c->msgsize == c->msgused)
2002 {
2003 msg=
2004 realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
2005 if (! msg)
2006 return -1;
2007
2008 c->msglist= msg;
2009 c->msgsize*= 2;
2010 }
2011
2012 msg= c->msglist + c->msgused;
2013
2014 /**
2015 * this wipes msg_iovlen, msg_control, msg_controllen, and
2016 * msg_flags, the last 3 of which aren't defined on solaris:
2017 */
2018 memset(msg, 0, sizeof(struct msghdr));
2019
2020 msg->msg_iov= &c->iov[c->iovused];
2021
2022 if (c->udp && (c->srv_recv_addr_size > 0))
2023 {
2024 msg->msg_name= &c->srv_recv_addr;
2025 msg->msg_namelen= c->srv_recv_addr_size;
2026 }
2027
2028 c->msgbytes= 0;
2029 c->msgused++;
2030
2031 if (c->udp)
2032 {
2033 /* Leave room for the UDP header, which we'll fill in later. */
2034 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2035 }
2036
2037 return 0;
2038 } /* ms_add_msghdr */
2039
2040
2041 /**
2042 * Ensures that there is room for another structure iovec in a connection's
2043 * iov list.
2044 *
2045 * @param c, pointer of the concurrency
2046 *
2047 * @return int, if success, return 0, else return -1
2048 */
2049 static int ms_ensure_iov_space(ms_conn_t *c)
2050 {
2051 assert(c != NULL);
2052
2053 if (c->iovused >= c->iovsize)
2054 {
2055 int i, iovnum;
2056 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2057 ((uint64_t)c->iovsize
2058 * 2)
2059 * sizeof(struct iovec));
2060 if (! new_iov)
2061 return -1;
2062
2063 c->iov= new_iov;
2064 c->iovsize*= 2;
2065
2066 /* Point all the msghdr structures at the new list. */
2067 for (i= 0, iovnum= 0; i < c->msgused; i++)
2068 {
2069 c->msglist[i].msg_iov= &c->iov[iovnum];
2070 iovnum+= (int)c->msglist[i].msg_iovlen;
2071 }
2072 }
2073
2074 return 0;
2075 } /* ms_ensure_iov_space */
2076
2077
2078 /**
2079 * Adds data to the list of pending data that will be written out to a
2080 * connection.
2081 *
2082 * @param c, pointer of the concurrency
2083 * @param buf, the buffer includes data to send
2084 * @param len, the data length in the buffer
2085 *
2086 * @return int, if success, return 0, else return -1
2087 */
2088 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2089 {
2090 struct msghdr *m;
2091 int leftover;
2092 bool limit_to_mtu;
2093
2094 assert(c != NULL);
2095
2096 do
2097 {
2098 m= &c->msglist[c->msgused - 1];
2099
2100 /*
2101 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2102 */
2103 limit_to_mtu= c->udp;
2104
2105 /* We may need to start a new msghdr if this one is full. */
2106 if ((m->msg_iovlen == IOV_MAX)
2107 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2108 {
2109 ms_add_msghdr(c);
2110 m= &c->msglist[c->msgused - 1];
2111 }
2112
2113 if (ms_ensure_iov_space(c) != 0)
2114 return -1;
2115
2116 /* If the fragment is too big to fit in the datagram, split it up */
2117 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2118 {
2119 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2120 len-= leftover;
2121 }
2122 else
2123 {
2124 leftover= 0;
2125 }
2126
2127 m= &c->msglist[c->msgused - 1];
2128 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2129 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2130
2131 c->msgbytes+= len;
2132 c->iovused++;
2133 m->msg_iovlen++;
2134
2135 buf= ((char *)buf) + len;
2136 len= leftover;
2137 }
2138 while (leftover > 0);
2139
2140 return 0;
2141 } /* ms_add_iov */
2142
2143
2144 /**
2145 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2146 *
2147 * @param c, pointer of the concurrency
2148 *
2149 * @return int, if success, return 0, else return -1
2150 */
2151 static int ms_build_udp_headers(ms_conn_t *c)
2152 {
2153 int i;
2154 unsigned char *hdr;
2155
2156 assert(c != NULL);
2157
2158 c->request_id= ms_get_udp_request_id();
2159
2160 if (c->msgused > c->hdrsize)
2161 {
2162 void *new_hdrbuf;
2163 if (c->hdrbuf)
2164 new_hdrbuf= realloc(c->hdrbuf,
2165 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2166 else
2167 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2168 if (! new_hdrbuf)
2169 return -1;
2170
2171 c->hdrbuf= (unsigned char *)new_hdrbuf;
2172 c->hdrsize= c->msgused * 2;
2173 }
2174
2175 /* If this is a multi-packet request, drop it. */
2176 if (c->udp && (c->msgused > 1))
2177 {
2178 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2179 return -1;
2180 }
2181
2182 hdr= c->hdrbuf;
2183 for (i= 0; i < c->msgused; i++)
2184 {
2185 c->msglist[i].msg_iov[0].iov_base= hdr;
2186 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2187 *hdr++= (unsigned char)(c->request_id / 256);
2188 *hdr++= (unsigned char)(c->request_id % 256);
2189 *hdr++= (unsigned char)(i / 256);
2190 *hdr++= (unsigned char)(i % 256);
2191 *hdr++= (unsigned char)(c->msgused / 256);
2192 *hdr++= (unsigned char)(c->msgused % 256);
2193 *hdr++= (unsigned char)1; /* support facebook memcached */
2194 *hdr++= (unsigned char)0;
2195 assert(hdr ==
2196 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2197 + UDP_HEADER_SIZE));
2198 }
2199
2200 return 0;
2201 } /* ms_build_udp_headers */
2202
2203
2204 /**
2205 * Transmit the next chunk of data from our list of msgbuf structures.
2206 *
2207 * @param c, pointer of the concurrency
2208 *
2209 * @return TRANSMIT_COMPLETE All done writing.
2210 * TRANSMIT_INCOMPLETE More data remaining to write.
2211 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2212 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2213 */
2214 static int ms_transmit(ms_conn_t *c)
2215 {
2216 assert(c != NULL);
2217
2218 if ((c->msgcurr < c->msgused)
2219 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2220 {
2221 /* Finished writing the current msg; advance to the next. */
2222 c->msgcurr++;
2223 }
2224
2225 if (c->msgcurr < c->msgused)
2226 {
2227 ssize_t res;
2228 struct msghdr *m= &c->msglist[c->msgcurr];
2229
2230 res= sendmsg(c->sfd, m, 0);
2231 if (res > 0)
2232 {
2233 __sync_fetch_and_add(&ms_stats.bytes_written, res);
2234
2235 /* We've written some of the data. Remove the completed
2236 * iovec entries from the list of pending writes. */
2237 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2238 {
2239 res-= (ssize_t)m->msg_iov->iov_len;
2240 m->msg_iovlen--;
2241 m->msg_iov++;
2242 }
2243
2244 /* Might have written just part of the last iovec entry;
2245 * adjust it so the next write will do the rest. */
2246 if (res > 0)
2247 {
2248 m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
2249 m->msg_iov->iov_len-= (uint64_t)res;
2250 }
2251 return TRANSMIT_INCOMPLETE;
2252 }
2253 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2254 {
2255 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2256 {
2257 fprintf(stderr, "Couldn't update event.\n");
2258 ms_conn_set_state(c, conn_closing);
2259 return TRANSMIT_HARD_ERROR;
2260 }
2261 return TRANSMIT_SOFT_ERROR;
2262 }
2263
2264 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2265 * we have a real error, on which we close the connection */
2266 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2267
2268 ms_conn_set_state(c, conn_closing);
2269 return TRANSMIT_HARD_ERROR;
2270 }
2271 else
2272 {
2273 return TRANSMIT_COMPLETE;
2274 }
2275 } /* ms_transmit */
2276
2277
2278 /**
2279 * Shrinks a connection's buffers if they're too big. This prevents
2280 * periodic large "mget" response from server chewing lots of client
2281 * memory.
2282 *
2283 * This should only be called in between requests since it can wipe output
2284 * buffers!
2285 *
2286 * @param c, pointer of the concurrency
2287 */
2288 static void ms_conn_shrink(ms_conn_t *c)
2289 {
2290 assert(c != NULL);
2291
2292 if (c->udp)
2293 return;
2294
2295 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2296 {
2297 char *newbuf;
2298
2299 if (c->rcurr != c->rbuf)
2300 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2301
2302 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2303
2304 if (newbuf)
2305 {
2306 c->rbuf= newbuf;
2307 c->rsize= DATA_BUFFER_SIZE;
2308 }
2309 c->rcurr= c->rbuf;
2310 }
2311
2312 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2313 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2314 {
2315 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2316 if (! new_rbuf)
2317 {
2318 c->rudpbuf= new_rbuf;
2319 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2320 }
2321 /* TODO check error condition? */
2322 }
2323
2324 if (c->msgsize > MSG_LIST_HIGHWAT)
2325 {
2326 struct msghdr *newbuf= (struct msghdr *)realloc(
2327 (void *)c->msglist,
2328 MSG_LIST_INITIAL
2329 * sizeof(c->msglist[0]));
2330 if (newbuf)
2331 {
2332 c->msglist= newbuf;
2333 c->msgsize= MSG_LIST_INITIAL;
2334 }
2335 /* TODO check error condition? */
2336 }
2337
2338 if (c->iovsize > IOV_LIST_HIGHWAT)
2339 {
2340 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2341 IOV_LIST_INITIAL
2342 * sizeof(c->iov[0]));
2343 if (newbuf)
2344 {
2345 c->iov= newbuf;
2346 c->iovsize= IOV_LIST_INITIAL;
2347 }
2348 /* TODO check return value */
2349 }
2350 } /* ms_conn_shrink */
2351
2352
2353 /**
2354 * Sets a connection's current state in the state machine. Any special
2355 * processing that needs to happen on certain state transitions can
2356 * happen here.
2357 *
2358 * @param c, pointer of the concurrency
2359 * @param state, connection state
2360 */
2361 static void ms_conn_set_state(ms_conn_t *c, int state)
2362 {
2363 assert(c != NULL);
2364
2365 if (state != c->state)
2366 {
2367 if (state == conn_read)
2368 {
2369 ms_conn_shrink(c);
2370 }
2371 c->state= state;
2372 }
2373 } /* ms_conn_set_state */
2374
2375
2376 /**
2377 * update the event if socks change state. for example: when
2378 * change the listen scoket read event to sock write event, or
2379 * change socket handler, we could call this function.
2380 *
2381 * @param c, pointer of the concurrency
2382 * @param new_flags, new event flags
2383 *
2384 * @return bool, if success, return true, else return false
2385 */
2386 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2387 {
2388 /* default event timeout 10 seconds */
2389 struct timeval t=
2390 {
2391 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2392 };
2393
2394 assert(c != NULL);
2395
2396 struct event_base *base= c->event.ev_base;
2397 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2398 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2399 {
2400 return true;
2401 }
2402
2403 if (event_del(&c->event) == -1)
2404 {
2405 /* try to delete the event again */
2406 if (event_del(&c->event) == -1)
2407 {
2408 return false;
2409 }
2410 }
2411
2412 event_set(&c->event,
2413 c->sfd,
2414 (short)new_flags,
2415 ms_event_handler,
2416 (void *)c);
2417 event_base_set(base, &c->event);
2418 c->ev_flags= (short)new_flags;
2419
2420 if (c->total_sfds == 1)
2421 {
2422 if (event_add(&c->event, NULL) == -1)
2423 {
2424 return false;
2425 }
2426 }
2427 else
2428 {
2429 if (event_add(&c->event, &t) == -1)
2430 {
2431 return false;
2432 }
2433 }
2434
2435 return true;
2436 } /* ms_update_event */
2437
2438
2439 /**
2440 * If user want to get the expected throughput, we could limit
2441 * the performance of memslap. we could give up some work and
2442 * just wait a short time. The function is used to check this
2443 * case.
2444 *
2445 * @param c, pointer of the concurrency
2446 *
2447 * @return bool, if success, return true, else return false
2448 */
2449 static bool ms_need_yield(ms_conn_t *c)
2450 {
2451 int64_t tps= 0;
2452 int64_t time_diff= 0;
2453 struct timeval curr_time;
2454 ms_task_t *task= &c->curr_task;
2455
2456 if (ms_setting.expected_tps > 0)
2457 {
2458 gettimeofday(&curr_time, NULL);
2459 time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
2460 tps=
2461 (int64_t)((task->get_opt
2462 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2463
2464 /* current throughput is greater than expected throughput */
2465 if (tps > ms_thread.thread_ctx->tps_perconn)
2466 {
2467 return true;
2468 }
2469 }
2470
2471 return false;
2472 } /* ms_need_yield */
2473
2474
2475 /**
2476 * used to update the start time of each operation
2477 *
2478 * @param c, pointer of the concurrency
2479 */
2480 static void ms_update_start_time(ms_conn_t *c)
2481 {
2482 ms_task_item_t *item= c->curr_task.item;
2483
2484 if ((ms_setting.stat_freq > 0) || c->udp
2485 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2486 {
2487 gettimeofday(&c->start_time, NULL);
2488 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2489 {
2490 /* record the current time */
2491 item->client_time= c->start_time.tv_sec;
2492 }
2493 }
2494 } /* ms_update_start_time */
2495
2496
2497 /**
2498 * run the state machine
2499 *
2500 * @param c, pointer of the concurrency
2501 */
2502 static void ms_drive_machine(ms_conn_t *c)
2503 {
2504 bool stop= false;
2505
2506 assert(c != NULL);
2507
2508 while (! stop)
2509 {
2510 switch (c->state)
2511 {
2512 case conn_read:
2513 if (c->readval)
2514 {
2515 if (c->rbytes >= c->rvbytes)
2516 {
2517 ms_complete_nread(c);
2518 break;
2519 }
2520 }
2521 else
2522 {
2523 if (ms_try_read_line(c) != 0)
2524 {
2525 break;
2526 }
2527 }
2528
2529 if (ms_try_read_network(c) != 0)
2530 {
2531 break;
2532 }
2533
2534 /* doesn't read all the response data, wait event wake up */
2535 if (! c->currcmd.isfinish)
2536 {
2537 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2538 {
2539 fprintf(stderr, "Couldn't update event.\n");
2540 ms_conn_set_state(c, conn_closing);
2541 break;
2542 }
2543 stop= true;
2544 break;
2545 }
2546
2547 /* we have no command line and no data to read from network, next write */
2548 ms_conn_set_state(c, conn_write);
2549 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2550
2551 break;
2552
2553 case conn_write:
2554 if (! c->ctnwrite && ms_need_yield(c))
2555 {
2556 usleep(10);
2557
2558 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2559 {
2560 fprintf(stderr, "Couldn't update event.\n");
2561 ms_conn_set_state(c, conn_closing);
2562 break;
2563 }
2564 stop= true;
2565 break;
2566 }
2567
2568 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2569 {
2570 ms_conn_set_state(c, conn_closing);
2571 break;
2572 }
2573
2574 /* record the start time before starting to send data if necessary */
2575 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2576 {
2577 if (c->change_sfd)
2578 {
2579 c->change_sfd= false;
2580 }
2581 ms_update_start_time(c);
2582 }
2583
2584 /* change sfd if necessary */
2585 if (c->change_sfd)
2586 {
2587 c->ctnwrite= true;
2588 stop= true;
2589 break;
2590 }
2591
2592 /* execute task until nothing need be written to network */
2593 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2594 {
2595 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2596 {
2597 fprintf(stderr, "Couldn't update event.\n");
2598 ms_conn_set_state(c, conn_closing);
2599 break;
2600 }
2601 stop= true;
2602 break;
2603 }
2604
2605 switch (ms_transmit(c))
2606 {
2607 case TRANSMIT_COMPLETE:
2608 /* we have no data to write to network, next wait repose */
2609 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2610 {
2611 fprintf(stderr, "Couldn't update event.\n");
2612 ms_conn_set_state(c, conn_closing);
2613 c->ctnwrite= false;
2614 break;
2615 }
2616 ms_conn_set_state(c, conn_read);
2617 c->ctnwrite= false;
2618 stop= true;
2619 break;
2620
2621 case TRANSMIT_INCOMPLETE:
2622 c->ctnwrite= true;
2623 break; /* Continue in state machine. */
2624
2625 case TRANSMIT_HARD_ERROR:
2626 c->ctnwrite= false;
2627 break;
2628
2629 case TRANSMIT_SOFT_ERROR:
2630 c->ctnwrite= true;
2631 stop= true;
2632 break;
2633
2634 default:
2635 break;
2636 } /* switch */
2637
2638 break;
2639
2640 case conn_closing:
2641 /* recovery mode, need reconnect if connection close */
2642 if (ms_setting.reconnect && (! ms_global.time_out
2643 || ((ms_setting.run_time == 0)
2644 && (c->remain_exec_num > 0))))
2645 {
2646 if (ms_reconn(c) != 0)
2647 {
2648 ms_conn_close(c);
2649 stop= true;
2650 break;
2651 }
2652
2653 ms_reset_conn(c, false);
2654
2655 if (c->total_sfds == 1)
2656 {
2657 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2658 {
2659 fprintf(stderr, "Couldn't update event.\n");
2660 ms_conn_set_state(c, conn_closing);
2661 break;
2662 }
2663 }
2664
2665 break;
2666 }
2667 else
2668 {
2669 ms_conn_close(c);
2670 stop= true;
2671 break;
2672 }
2673
2674 default:
2675 assert(0);
2676 } /* switch */
2677 }
2678 } /* ms_drive_machine */
2679
2680
2681 /**
2682 * the event handler of each thread
2683 *
2684 * @param fd, the file descriptor of socket
2685 * @param which, event flag
2686 * @param arg, argument
2687 */
2688 void ms_event_handler(const int fd, const short which, void *arg)
2689 {
2690 ms_conn_t *c= (ms_conn_t *)arg;
2691
2692 assert(c != NULL);
2693
2694 c->which= which;
2695
2696 /* sanity */
2697 if (fd != c->sfd)
2698 {
2699 fprintf(stderr,
2700 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2701 fd,
2702 c->sfd);
2703 ms_conn_close(c);
2704 exit(1);
2705 }
2706 assert(fd == c->sfd);
2707
2708 /* event timeout, close the current connection */
2709 if (c->which == EV_TIMEOUT)
2710 {
2711 ms_conn_set_state(c, conn_closing);
2712 }
2713
2714 ms_drive_machine(c);
2715
2716 /* wait for next event */
2717 } /* ms_event_handler */
2718
2719
2720 /**
2721 * get the next socket descriptor index to run for replication
2722 *
2723 * @param c, pointer of the concurrency
2724 * @param cmd, command(get or set )
2725 *
2726 * @return int, if success, return the index, else return 0
2727 */
2728 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2729 {
2730 int sock_index= -1;
2731 int i= 0;
2732
2733 if (c->total_sfds == 1)
2734 {
2735 return 0;
2736 }
2737
2738 if (ms_setting.rep_write_srv == 0)
2739 {
2740 return sock_index;
2741 }
2742
2743 do
2744 {
2745 if (cmd == CMD_SET)
2746 {
2747 for (i= 0; i < ms_setting.rep_write_srv; i++)
2748 {
2749 if (c->tcpsfd[i] > 0)
2750 {
2751 break;
2752 }
2753 }
2754
2755 if (i == ms_setting.rep_write_srv)
2756 {
2757 /* random get one replication server to read */
2758 sock_index= (int)(random() % c->total_sfds);
2759 }
2760 else
2761 {
2762 /* random get one replication writing server to write */
2763 sock_index= (int)(random() % ms_setting.rep_write_srv);
2764 }
2765 }
2766 else if (cmd == CMD_GET)
2767 {
2768 /* random get one replication server to read */
2769 sock_index= (int)(random() % c->total_sfds);
2770 }
2771 }
2772 while (c->tcpsfd[sock_index] == 0);
2773
2774 return sock_index;
2775 } /* ms_get_rep_sock_index */
2776
2777
2778 /**
2779 * get the next socket descriptor index to run
2780 *
2781 * @param c, pointer of the concurrency
2782 *
2783 * @return int, return the index
2784 */
2785 static int ms_get_next_sock_index(ms_conn_t *c)
2786 {
2787 int sock_index= 0;
2788
2789 do
2790 {
2791 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2792 }
2793 while (c->tcpsfd[sock_index] == 0);
2794
2795 return sock_index;
2796 } /* ms_get_next_sock_index */
2797
2798
2799 /**
2800 * update socket event of the connections
2801 *
2802 * @param c, pointer of the concurrency
2803 *
2804 * @return int, if success, return 0, else return -1
2805 */
2806 static int ms_update_conn_sock_event(ms_conn_t *c)
2807 {
2808 assert(c != NULL);
2809
2810 switch (c->currcmd.cmd)
2811 {
2812 case CMD_SET:
2813 if (ms_setting.facebook_test && c->udp)
2814 {
2815 c->sfd= c->tcpsfd[0];
2816 c->udp= false;
2817 c->change_sfd= true;
2818 }
2819 break;
2820
2821 case CMD_GET:
2822 if (ms_setting.facebook_test && ! c->udp)
2823 {
2824 c->sfd= c->udpsfd;
2825 c->udp= true;
2826 c->change_sfd= true;
2827 }
2828 break;
2829
2830 default:
2831 break;
2832 } /* switch */
2833
2834 if (! c->udp && (c->total_sfds > 1))
2835 {
2836 if (c->cur_idx != c->total_sfds)
2837 {
2838 if (ms_setting.rep_write_srv == 0)
2839 {
2840 c->cur_idx= ms_get_next_sock_index(c);
2841 }
2842 else
2843 {
2844 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2845 }
2846 }
2847 else
2848 {
2849 /* must select the first sock of the connection at the beginning */
2850 c->cur_idx= 0;
2851 }
2852
2853 c->sfd= c->tcpsfd[c->cur_idx];
2854 assert(c->sfd != 0);
2855 c->change_sfd= true;
2856 }
2857
2858 if (c->change_sfd)
2859 {
2860 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2861 {
2862 fprintf(stderr, "Couldn't update event.\n");
2863 ms_conn_set_state(c, conn_closing);
2864 return -1;
2865 }
2866 }
2867
2868 return 0;
2869 } /* ms_update_conn_sock_event */
2870
2871
2872 /**
2873 * for ASCII protocol, this function build the set command
2874 * string and send the command.
2875 *
2876 * @param c, pointer of the concurrency
2877 * @param item, pointer of task item which includes the object
2878 * information
2879 *
2880 * @return int, if success, return 0, else return -1
2881 */
2882 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2883 {
2884 int value_offset;
2885 int write_len;
2886 char *buffer= c->wbuf;
2887
2888 write_len= sprintf(buffer,
2889 " %u %d %d\r\n",
2890 0,
2891 item->exp_time,
2892 item->value_size);
2893
2894 if (write_len > c->wsize)
2895 {
2896 /* ought to be always enough. just fail for simplicity */
2897 fprintf(stderr, "output command line too long.\n");
2898 return -1;
2899 }
2900
2901 if (item->value_offset == INVALID_OFFSET)
2902 {
2903 value_offset= item->key_suffix_offset;
2904 }
2905 else
2906 {
2907 value_offset= item->value_offset;
2908 }
2909
2910 if ((ms_add_iov(c, "set ", 4) != 0)
2911 || (ms_add_iov(c, (char *)&item->key_prefix,
2912 (int)KEY_PREFIX_SIZE) != 0)
2913 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2914 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2915 || (ms_add_iov(c, buffer, write_len) != 0)
2916 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2917 item->value_size) != 0)
2918 || (ms_add_iov(c, "\r\n", 2) != 0)
2919 || (c->udp && (ms_build_udp_headers(c) != 0)))
2920 {
2921 return -1;
2922 }
2923
2924 return 0;
2925 } /* ms_build_ascii_write_buf_set */
2926
2927
2928 /**
2929 * used to send set command to server
2930 *
2931 * @param c, pointer of the concurrency
2932 * @param item, pointer of task item which includes the object
2933 * information
2934 *
2935 * @return int, if success, return 0, else return -1
2936 */
2937 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2938 {
2939 assert(c != NULL);
2940
2941 c->currcmd.cmd= CMD_SET;
2942 c->currcmd.isfinish= false;
2943 c->currcmd.retstat= MCD_FAILURE;
2944
2945 if (ms_update_conn_sock_event(c) != 0)
2946 {
2947 return -1;
2948 }
2949
2950 c->msgcurr= 0;
2951 c->msgused= 0;
2952 c->iovused= 0;
2953 if (ms_add_msghdr(c) != 0)
2954 {
2955 fprintf(stderr, "Out of memory preparing request.");
2956 return -1;
2957 }
2958
2959 /* binary protocol */
2960 if (c->protocol == binary_prot)
2961 {
2962 if (ms_build_bin_write_buf_set(c, item) != 0)
2963 {
2964 return -1;
2965 }
2966 }
2967 else
2968 {
2969 if (ms_build_ascii_write_buf_set(c, item) != 0)
2970 {
2971 return -1;
2972 }
2973 }
2974
2975 __sync_fetch_and_add(&ms_stats.obj_bytes,
2976 item->key_size + item->value_size);
2977 __sync_fetch_and_add(&ms_stats.cmd_set, 1);
2978
2979 return 0;
2980 } /* ms_mcd_set */
2981
2982
2983 /**
2984 * for ASCII protocol, this function build the get command
2985 * string and send the command.
2986 *
2987 * @param c, pointer of the concurrency
2988 * @param item, pointer of task item which includes the object
2989 * information
2990 *
2991 * @return int, if success, return 0, else return -1
2992 */
2993 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2994 {
2995 if ((ms_add_iov(c, "get ", 4) != 0)
2996 || (ms_add_iov(c, (char *)&item->key_prefix,
2997 (int)KEY_PREFIX_SIZE) != 0)
2998 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2999 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3000 || (ms_add_iov(c, "\r\n", 2) != 0)
3001 || (c->udp && (ms_build_udp_headers(c) != 0)))
3002 {
3003 return -1;
3004 }
3005
3006 return 0;
3007 } /* ms_build_ascii_write_buf_get */
3008
3009
3010 /**
3011 * used to send the get command to server
3012 *
3013 * @param c, pointer of the concurrency
3014 * @param item, pointer of task item which includes the object
3015 * information
3016 * @param verify, whether do verification
3017 *
3018 * @return int, if success, return 0, else return -1
3019 */
3020 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3021 {
3022 /* verify not supported yet */
3023 UNUSED_ARGUMENT(verify);
3024
3025 assert(c != NULL);
3026
3027 c->currcmd.cmd= CMD_GET;
3028 c->currcmd.isfinish= false;
3029 c->currcmd.retstat= MCD_FAILURE;
3030
3031 if (ms_update_conn_sock_event(c) != 0)
3032 {
3033 return -1;
3034 }
3035
3036 c->msgcurr= 0;
3037 c->msgused= 0;
3038 c->iovused= 0;
3039 if (ms_add_msghdr(c) != 0)
3040 {
3041 fprintf(stderr, "Out of memory preparing request.");
3042 return -1;
3043 }
3044
3045 /* binary protocol */
3046 if (c->protocol == binary_prot)
3047 {
3048 if (ms_build_bin_write_buf_get(c, item) != 0)
3049 {
3050 return -1;
3051 }
3052 }
3053 else
3054 {
3055 if (ms_build_ascii_write_buf_get(c, item) != 0)
3056 {
3057 return -1;
3058 }
3059 }
3060
3061 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3062
3063 return 0;
3064 } /* ms_mcd_get */
3065
3066
3067 /**
3068 * for ASCII protocol, this function build the multi-get command
3069 * string and send the command.
3070 *
3071 * @param c, pointer of the concurrency
3072 *
3073 * @return int, if success, return 0, else return -1
3074 */
3075 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3076 {
3077 ms_task_item_t *item;
3078
3079 if (ms_add_iov(c, "get", 3) != 0)
3080 {
3081 return -1;
3082 }
3083
3084 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3085 {
3086 item= c->mlget_task.mlget_item[i].item;
3087 assert(item != NULL);
3088 if ((ms_add_iov(c, " ", 1) != 0)
3089 || (ms_add_iov(c, (char *)&item->key_prefix,
3090 (int)KEY_PREFIX_SIZE) != 0)
3091 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3092 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3093 {
3094 return -1;
3095 }
3096 }
3097
3098 if ((ms_add_iov(c, "\r\n", 2) != 0)
3099 || (c->udp && (ms_build_udp_headers(c) != 0)))
3100 {
3101 return -1;
3102 }
3103
3104 return 0;
3105 } /* ms_build_ascii_write_buf_mlget */
3106
3107
3108 /**
3109 * used to send the multi-get command to server
3110 *
3111 * @param c, pointer of the concurrency
3112 *
3113 * @return int, if success, return 0, else return -1
3114 */
3115 int ms_mcd_mlget(ms_conn_t *c)
3116 {
3117 ms_task_item_t *item;
3118
3119 assert(c != NULL);
3120 assert(c->mlget_task.mlget_num >= 1);
3121
3122 c->currcmd.cmd= CMD_GET;
3123 c->currcmd.isfinish= false;
3124 c->currcmd.retstat= MCD_FAILURE;
3125
3126 if (ms_update_conn_sock_event(c) != 0)
3127 {
3128 return -1;
3129 }
3130
3131 c->msgcurr= 0;
3132 c->msgused= 0;
3133 c->iovused= 0;
3134 if (ms_add_msghdr(c) != 0)
3135 {
3136 fprintf(stderr, "Out of memory preparing request.");
3137 return -1;
3138 }
3139
3140 /* binary protocol */
3141 if (c->protocol == binary_prot)
3142 {
3143 if (ms_build_bin_write_buf_mlget(c) != 0)
3144 {
3145 return -1;
3146 }
3147 }
3148 else
3149 {
3150 if (ms_build_ascii_write_buf_mlget(c) != 0)
3151 {
3152 return -1;
3153 }
3154 }
3155
3156 /* decrease operation time of each item */
3157 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3158 {
3159 item= c->mlget_task.mlget_item[i].item;
3160 __sync_fetch_and_add(&ms_stats.cmd_get, 1);
3161 }
3162
3163 return 0;
3164 } /* ms_mcd_mlget */
3165
3166
3167 /**
3168 * binary protocol support
3169 */
3170
3171 /**
3172 * for binary protocol, parse the response of server
3173 *
3174 * @param c, pointer of the concurrency
3175 *
3176 * @return int, if success, return 0, else return -1
3177 */
3178 static int ms_bin_process_response(ms_conn_t *c)
3179 {
3180 const char *errstr= NULL;
3181
3182 assert(c != NULL);
3183
3184 uint32_t bodylen= c->binary_header.response.bodylen;
3185 uint8_t opcode= c->binary_header.response.opcode;
3186 uint16_t status= c->binary_header.response.status;
3187
3188 if (bodylen > 0)
3189 {
3190 c->rvbytes= (int32_t)bodylen;
3191 c->readval= true;
3192 return 1;
3193 }
3194 else
3195 {
3196 switch (status)
3197 {
3198 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3199 if (opcode == PROTOCOL_BINARY_CMD_SET)
3200 {
3201 c->currcmd.retstat= MCD_STORED;
3202 }
3203 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3204 {
3205 c->currcmd.retstat= MCD_DELETED;
3206 }
3207 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3208 {
3209 c->currcmd.retstat= MCD_END;
3210 }
3211 break;
3212
3213 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3214 errstr= "Out of memory";
3215 c->currcmd.retstat= MCD_SERVER_ERROR;
3216 break;
3217
3218 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3219 errstr= "Unknown command";
3220 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3221 break;
3222
3223 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3224 errstr= "Not found";
3225 c->currcmd.retstat= MCD_NOTFOUND;
3226 break;
3227
3228 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3229 errstr= "Invalid arguments";
3230 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3231 break;
3232
3233 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3234 errstr= "Data exists for key.";
3235 break;
3236
3237 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3238 errstr= "Too large.";
3239 c->currcmd.retstat= MCD_SERVER_ERROR;
3240 break;
3241
3242 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3243 errstr= "Not stored.";
3244 c->currcmd.retstat= MCD_NOTSTORED;
3245 break;
3246
3247 default:
3248 errstr= "Unknown error";
3249 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3250 break;
3251 } /* switch */
3252
3253 if (errstr != NULL)
3254 {
3255 fprintf(stderr, "%s\n", errstr);
3256 }
3257 }
3258
3259 return 0;
3260 } /* ms_bin_process_response */
3261
3262
3263 /* build binary header and add the header to the buffer to send */
3264
3265 /**
3266 * build binary header and add the header to the buffer to send
3267 *
3268 * @param c, pointer of the concurrency
3269 * @param opcode, operation code
3270 * @param hdr_len, length of header
3271 * @param key_len, length of key
3272 * @param body_len. length of body
3273 */
3274 static void ms_add_bin_header(ms_conn_t *c,
3275 uint8_t opcode,
3276 uint8_t hdr_len,
3277 uint16_t key_len,
3278 uint32_t body_len)
3279 {
3280 protocol_binary_request_header *header;
3281
3282 assert(c != NULL);
3283
3284 header= (protocol_binary_request_header *)c->wcurr;
3285
3286 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3287 header->request.opcode= (uint8_t)opcode;
3288 header->request.keylen= htonl(key_len);
3289
3290 header->request.extlen= (uint8_t)hdr_len;
3291 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3292 header->request.reserved= 0;
3293
3294 header->request.bodylen= htonl(body_len);
3295 header->request.opaque= 0;
3296 header->request.cas= 0;
3297
3298 ms_add_iov(c, c->wcurr, sizeof(header->request));
3299 } /* ms_add_bin_header */
3300
3301
3302 /**
3303 * add the key to the socket write buffer array
3304 *
3305 * @param c, pointer of the concurrency
3306 * @param item, pointer of task item which includes the object
3307 * information
3308 */
3309 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3310 {
3311 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3312 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3313 item->key_size - (int)KEY_PREFIX_SIZE);
3314 }
3315
3316
3317 /**
3318 * for binary protocol, this function build the set command
3319 * and add the command to send buffer array.
3320 *
3321 * @param c, pointer of the concurrency
3322 * @param item, pointer of task item which includes the object
3323 * information
3324 *
3325 * @return int, if success, return 0, else return -1
3326 */
3327 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3328 {
3329 assert(c->wbuf == c->wcurr);
3330
3331 int value_offset;
3332 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3333 uint16_t keylen= (uint16_t)item->key_size;
3334 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3335 + (uint32_t)keylen + (uint32_t)item->value_size;
3336
3337 ms_add_bin_header(c,
3338 PROTOCOL_BINARY_CMD_SET,
3339 sizeof(rep->message.body),
3340 keylen,
3341 bodylen);
3342 rep->message.body.flags= 0;
3343 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3344 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3345 ms_add_key_to_iov(c, item);
3346
3347 if (item->value_offset == INVALID_OFFSET)
3348 {
3349 value_offset= item->key_suffix_offset;
3350 }
3351 else
3352 {
3353 value_offset= item->value_offset;
3354 }
3355 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3356
3357 return 0;
3358 } /* ms_build_bin_write_buf_set */
3359
3360
3361 /**
3362 * for binary protocol, this function build the get command and
3363 * add the command to send buffer array.
3364 *
3365 * @param c, pointer of the concurrency
3366 * @param item, pointer of task item which includes the object
3367 * information
3368 *
3369 * @return int, if success, return 0, else return -1
3370 */
3371 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3372 {
3373 assert(c->wbuf == c->wcurr);
3374
3375 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3376 (uint32_t)item->key_size);
3377 ms_add_key_to_iov(c, item);
3378
3379 return 0;
3380 } /* ms_build_bin_write_buf_get */
3381
3382
3383 /**
3384 * for binary protocol, this function build the multi-get
3385 * command and add the command to send buffer array.
3386 *
3387 * @param c, pointer of the concurrency
3388 * @param item, pointer of task item which includes the object
3389 * information
3390 *
3391 * @return int, if success, return 0, else return -1
3392 */
3393 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3394 {
3395 ms_task_item_t *item;
3396
3397 assert(c->wbuf == c->wcurr);
3398
3399 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3400 {
3401 item= c->mlget_task.mlget_item[i].item;
3402 assert(item != NULL);
3403
3404 ms_add_bin_header(c,
3405 PROTOCOL_BINARY_CMD_GET,
3406 0,
3407 (uint16_t)item->key_size,
3408 (uint32_t)item->key_size);
3409 ms_add_key_to_iov(c, item);
3410 c->wcurr+= sizeof(protocol_binary_request_get);
3411 }
3412
3413 c->wcurr= c->wbuf;
3414
3415 return 0;
3416 } /* ms_build_bin_write_buf_mlget */