Update all docs.
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23 #if TIME_WITH_SYS_TIME
24 # include <sys/time.h>
25 # include <time.h>
26 #else
27 # if HAVE_SYS_TIME_H
28 # include <sys/time.h>
29 # else
30 # include <time.h>
31 # endif
32 #endif
33 #include "ms_setting.h"
34 #include "ms_thread.h"
35 #include "ms_atomic.h"
36
37 #ifdef linux
38 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
39 * optimize the conversion functions, but the prototypes generate warnings
40 * from gcc. The conversion methods isn't the bottleneck for my app, so
41 * just remove the warnings by undef'ing the optimization ..
42 */
43 #undef ntohs
44 #undef ntohl
45 #undef htons
46 #undef htonl
47 #endif
48
/* status codes for network write */
#define TRANSMIT_COMPLETE   0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * Key prefix generation. Every byte of the mask has bit 0x10 set, so a
 * prefix that has the mask OR-ed in cannot contain ' ', '\r', '\n' or '\0'
 * (none of those characters has that bit set).
 */
#define KEY_PREFIX_BASE 0x1010101010101010
#define KEY_PREFIX_MASK 0x1010101010101010

/* token positions used when parsing the value length returned by the server */
#define KEY_TOKEN      1
#define VALUELEN_TOKEN 3

/* global increasing counter that keeps every key prefix unique */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;

/* global increasing counter used to generate request ids for UDP */
static volatile uint32_t udp_request_id= 0;
68
69 extern pthread_key_t ms_thread_key;
70
/* generate udp request id */
72 static uint32_t ms_get_udp_request_id(void);
73
74
75 /* connect initialize */
76 static void ms_task_init(ms_conn_t *c);
77 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
78 static int ms_conn_sock_init(ms_conn_t *c);
79 static int ms_conn_event_init(ms_conn_t *c);
80 static int ms_conn_init(ms_conn_t *c,
81 const int init_state,
82 const int read_buffer_size,
83 const bool is_udp);
84 static void ms_warmup_num_init(ms_conn_t *c);
85 static int ms_item_win_init(ms_conn_t *c);
86
87
88 /* connection close */
89 void ms_conn_free(ms_conn_t *c);
90 static void ms_conn_close(ms_conn_t *c);
91
92
93 /* create network connection */
94 static int ms_new_socket(struct addrinfo *ai);
95 static void ms_maximize_sndbuf(const int sfd);
96 static int ms_network_connect(ms_conn_t *c,
97 char *srv_host_name,
98 const int srv_port,
99 const bool is_udp,
100 int *ret_sfd);
101 static int ms_reconn(ms_conn_t *c);
102
103
104 /* read and parse */
105 static int ms_tokenize_command(char *command,
106 token_t *tokens,
107 const int max_tokens);
108 static int ms_ascii_process_line(ms_conn_t *c, char *command);
109 static int ms_try_read_line(ms_conn_t *c);
110 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
111 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
112 static int ms_try_read_network(ms_conn_t *c);
113 static void ms_verify_value(ms_conn_t *c,
114 ms_mlget_task_item_t *mlget_item,
115 char *value,
116 int vlen);
117 static void ms_ascii_complete_nread(ms_conn_t *c);
118 static void ms_bin_complete_nread(ms_conn_t *c);
119 static void ms_complete_nread(ms_conn_t *c);
120
121
122 /* send functions */
123 static int ms_add_msghdr(ms_conn_t *c);
124 static int ms_ensure_iov_space(ms_conn_t *c);
125 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
126 static int ms_build_udp_headers(ms_conn_t *c);
127 static int ms_transmit(ms_conn_t *c);
128
129
130 /* status adjustment */
131 static void ms_conn_shrink(ms_conn_t *c);
132 static void ms_conn_set_state(ms_conn_t *c, int state);
133 static bool ms_update_event(ms_conn_t *c, const int new_flags);
134 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
135 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
136 static int ms_update_conn_sock_event(ms_conn_t *c);
137 static bool ms_need_yield(ms_conn_t *c);
138 static void ms_update_start_time(ms_conn_t *c);
139
140
141 /* main loop */
142 static void ms_drive_machine(ms_conn_t *c);
143 void ms_event_handler(const int fd, const short which, void *arg);
144
145
146 /* ascii protocol */
147 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
148 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
149 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
150
151
152 /* binary protocol */
153 static int ms_bin_process_response(ms_conn_t *c);
154 static void ms_add_bin_header(ms_conn_t *c,
155 uint8_t opcode,
156 uint8_t hdr_len,
157 uint16_t key_len,
158 uint32_t body_len);
159 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
160 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
161 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
162 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
163
164
165 /**
166 * each key has two parts, prefix and suffix. The suffix is a
167 * string random get form the character table. The prefix is a
168 * uint64_t variable. And the prefix must be unique. we use the
169 * prefix to identify a key. And the prefix can't include
170 * character ' ' '\r' '\n' '\0'.
171 *
172 * @return uint64_t
173 */
174 uint64_t ms_get_key_prefix(void)
175 {
176 uint64_t key_prefix;
177
178 pthread_mutex_lock(&ms_global.seq_mutex);
179 key_prefix_seq|= KEY_PREFIX_MASK;
180 key_prefix= key_prefix_seq;
181 key_prefix_seq++;
182 pthread_mutex_unlock(&ms_global.seq_mutex);
183
184 return key_prefix;
185 } /* ms_get_key_prefix */
186
187
188 /**
189 * get an unique udp request id
190 *
191 * @return an unique UDP request id
192 */
193 static uint32_t ms_get_udp_request_id(void)
194 {
195 return atomic_add_32_nv(&udp_request_id, 1);
196 }
197
198
199 /**
200 * initialize current task structure
201 *
202 * @param c, pointer of the concurrency
203 */
204 static void ms_task_init(ms_conn_t *c)
205 {
206 c->curr_task.cmd= CMD_NULL;
207 c->curr_task.item= 0;
208 c->curr_task.verify= false;
209 c->curr_task.finish_verify= true;
210 c->curr_task.get_miss= true;
211
212 c->curr_task.get_opt= 0;
213 c->curr_task.set_opt= 0;
214 c->curr_task.cycle_undo_get= 0;
215 c->curr_task.cycle_undo_set= 0;
216 c->curr_task.verified_get= 0;
217 c->curr_task.overwrite_set= 0;
218 } /* ms_task_init */
219
220
221 /**
222 * initialize udp for the connection structure
223 *
224 * @param c, pointer of the concurrency
225 * @param is_udp, whether it's udp
226 *
227 * @return int, if success, return EXIT_SUCCESS, else return -1
228 */
229 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
230 {
231 c->hdrbuf= 0;
232 c->rudpbuf= 0;
233 c->udppkt= 0;
234
235 c->rudpsize= UDP_DATA_BUFFER_SIZE;
236 c->hdrsize= 0;
237
238 c->rudpbytes= 0;
239 c->packets= 0;
240 c->recvpkt= 0;
241 c->pktcurr= 0;
242 c->ordcurr= 0;
243
244 c->udp= is_udp;
245
246 if (c->udp || (! c->udp && ms_setting.facebook_test))
247 {
248 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
249 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
250
251 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
252 {
253 if (c->rudpbuf != NULL)
254 free(c->rudpbuf);
255 if (c->udppkt != NULL)
256 free(c->udppkt);
257 fprintf(stderr, "malloc()\n");
258 return -1;
259 }
260 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
261 }
262
263 return EXIT_SUCCESS;
264 } /* ms_conn_udp_init */
265
266
267 /**
268 * initialize the connection structure
269 *
270 * @param c, pointer of the concurrency
271 * @param init_state, (conn_read, conn_write, conn_closing)
272 * @param read_buffer_size
273 * @param is_udp, whether it's udp
274 *
275 * @return int, if success, return EXIT_SUCCESS, else return -1
276 */
277 static int ms_conn_init(ms_conn_t *c,
278 const int init_state,
279 const int read_buffer_size,
280 const bool is_udp)
281 {
282 assert(c != NULL);
283
284 c->rbuf= c->wbuf= 0;
285 c->iov= 0;
286 c->msglist= 0;
287
288 c->rsize= read_buffer_size;
289 c->wsize= WRITE_BUFFER_SIZE;
290 c->iovsize= IOV_LIST_INITIAL;
291 c->msgsize= MSG_LIST_INITIAL;
292
293 /* for replication, each connection need connect all the server */
294 if (ms_setting.rep_write_srv > 0)
295 {
296 c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
297 }
298 else
299 {
300 c->total_sfds= ms_setting.sock_per_conn;
301 }
302 c->alive_sfds= 0;
303
304 c->rbuf= (char *)malloc((size_t)c->rsize);
305 c->wbuf= (char *)malloc((size_t)c->wsize);
306 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
307 c->msglist= (struct msghdr *)malloc(
308 sizeof(struct msghdr) * (size_t)c->msgsize);
309 if (ms_setting.mult_key_num > 1)
310 {
311 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
312 malloc(
313 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
314 }
315 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
316
317 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
318 || (c->msglist == NULL) || (c->tcpsfd == NULL)
319 || ((ms_setting.mult_key_num > 1)
320 && (c->mlget_task.mlget_item == NULL)))
321 {
322 if (c->rbuf != NULL)
323 free(c->rbuf);
324 if (c->wbuf != NULL)
325 free(c->wbuf);
326 if (c->iov != NULL)
327 free(c->iov);
328 if (c->msglist != NULL)
329 free(c->msglist);
330 if (c->mlget_task.mlget_item != NULL)
331 free(c->mlget_task.mlget_item);
332 if (c->tcpsfd != NULL)
333 free(c->tcpsfd);
334 fprintf(stderr, "malloc()\n");
335 return -1;
336 }
337
338 c->state= init_state;
339 c->rvbytes= 0;
340 c->rbytes= 0;
341 c->rcurr= c->rbuf;
342 c->wcurr= c->wbuf;
343 c->iovused= 0;
344 c->msgcurr= 0;
345 c->msgused= 0;
346 c->cur_idx= c->total_sfds; /* default index is a invalid value */
347
348 c->ctnwrite= false;
349 c->readval= false;
350 c->change_sfd= false;
351
352 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
353 c->precmd.isfinish= true; /* default the previous command finished */
354 c->currcmd.isfinish= false;
355 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
356 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
357
358 c->mlget_task.mlget_num= 0;
359 c->mlget_task.value_index= -1; /* default invalid value */
360
361 if (ms_setting.binary_prot)
362 {
363 c->protocol= binary_prot;
364 }
365 else
366 {
367 c->protocol= ascii_prot;
368 }
369
370 /* initialize udp */
371 if (ms_conn_udp_init(c, is_udp) != 0)
372 {
373 return -1;
374 }
375
376 /* initialize task */
377 ms_task_init(c);
378
379 if (! (ms_setting.facebook_test && is_udp))
380 {
381 atomic_add_32(&ms_stats.active_conns, 1);
382 }
383
384 return EXIT_SUCCESS;
385 } /* ms_conn_init */
386
387
388 /**
389 * when doing 100% get operation, it could preset some objects
390 * to warmup the server. this function is used to initialize the
391 * number of the objects to preset.
392 *
393 * @param c, pointer of the concurrency
394 */
395 static void ms_warmup_num_init(ms_conn_t *c)
396 {
397 /* no set operation, preset all the items in the window */
398 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
399 {
400 c->warmup_num= c->win_size;
401 c->remain_warmup_num= c->warmup_num;
402 }
403 else
404 {
405 c->warmup_num= 0;
406 c->remain_warmup_num= c->warmup_num;
407 }
408 } /* ms_warmup_num_init */
409
410
411 /**
412 * each connection has an item window, this function initialize
413 * the window. The window is used to generate task.
414 *
415 * @param c, pointer of the concurrency
416 *
417 * @return int, if success, return EXIT_SUCCESS, else return -1
418 */
419 static int ms_item_win_init(ms_conn_t *c)
420 {
421 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
422 int exp_cnt= 0;
423
424 c->win_size= (int)ms_setting.win_size;
425 c->set_cursor= 0;
426 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
427 c->remain_exec_num= c->exec_num;
428
429 c->item_win= (ms_task_item_t *)malloc(
430 sizeof(ms_task_item_t) * (size_t)c->win_size);
431 if (c->item_win == NULL)
432 {
433 fprintf(stderr, "Can't allocate task item array for conn.\n");
434 return -1;
435 }
436 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
437
438 for (int i= 0; i < c->win_size; i++)
439 {
440 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
441 c->item_win[i].key_prefix= ms_get_key_prefix();
442 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
443 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
444 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
445 c->item_win[i].client_time= 0;
446
447 /* set expire time base on the proportion */
448 if (exp_cnt < ms_setting.exp_ver_per * i)
449 {
450 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
451 exp_cnt++;
452 }
453 else
454 {
455 c->item_win[i].exp_time= 0;
456 }
457 }
458
459 ms_warmup_num_init(c);
460
461 return EXIT_SUCCESS;
462 } /* ms_item_win_init */
463
464
465 /**
466 * each connection structure can include one or more sock
467 * handlers. this function create these socks and connect the
468 * server(s).
469 *
470 * @param c, pointer of the concurrency
471 *
472 * @return int, if success, return EXIT_SUCCESS, else return -1
473 */
474 static int ms_conn_sock_init(ms_conn_t *c)
475 {
476 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
477 uint32_t i;
478 int ret_sfd;
479 uint32_t srv_idx= 0;
480
481 assert(c != NULL);
482 assert(c->tcpsfd != NULL);
483
484 for (i= 0; i < c->total_sfds; i++)
485 {
486 ret_sfd= 0;
487 if (ms_setting.rep_write_srv > 0)
488 {
489 /* for replication, each connection need connect all the server */
490 srv_idx= i % ms_setting.srv_cnt;
491 }
492 else
493 {
494 /* all the connections in a thread connects the same server */
495 srv_idx= ms_thread->thread_ctx->srv_idx;
496 }
497
498 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
499 ms_setting.servers[srv_idx].srv_port,
500 ms_setting.udp, &ret_sfd) != 0)
501 {
502 break;
503 }
504
505 if (i == 0)
506 {
507 c->sfd= ret_sfd;
508 }
509
510 if (! ms_setting.udp)
511 {
512 c->tcpsfd[i]= ret_sfd;
513 }
514
515 c->alive_sfds++;
516 }
517
518 /* initialize udp sock handler if necessary */
519 if (ms_setting.facebook_test)
520 {
521 ret_sfd= 0;
522 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
523 ms_setting.servers[srv_idx].srv_port,
524 true, &ret_sfd) != 0)
525 {
526 c->udpsfd= 0;
527 }
528 else
529 {
530 c->udpsfd= ret_sfd;
531 }
532 }
533
534 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
535 {
536 if (ms_setting.udp)
537 {
538 close(c->sfd);
539 }
540 else
541 {
542 for (uint32_t j= 0; j < i; j++)
543 {
544 close(c->tcpsfd[j]);
545 }
546 }
547
548 if (c->udpsfd != 0)
549 {
550 close(c->udpsfd);
551 }
552
553 return -1;
554 }
555
556 return EXIT_SUCCESS;
557 } /* ms_conn_sock_init */
558
559
560 /**
561 * each connection is managed by libevent, this function
562 * initialize the event of the connection structure.
563 *
564 * @param c, pointer of the concurrency
565 *
566 * @return int, if success, return EXIT_SUCCESS, else return -1
567 */
568 static int ms_conn_event_init(ms_conn_t *c)
569 {
570 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
571 short event_flags= EV_WRITE | EV_PERSIST;
572
573 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
574 event_base_set(ms_thread->base, &c->event);
575 c->ev_flags= event_flags;
576
577 if (event_add(&c->event, NULL) == -1)
578 {
579 return -1;
580 }
581
582 return EXIT_SUCCESS;
583 } /* ms_conn_event_init */
584
585
586 /**
587 * setup a connection, each connection structure of each
588 * thread must call this function to initialize.
589 *
590 * @param c, pointer of the concurrency
591 *
592 * @return int, if success, return EXIT_SUCCESS, else return -1
593 */
594 int ms_setup_conn(ms_conn_t *c)
595 {
596 if (ms_item_win_init(c) != 0)
597 {
598 return -1;
599 }
600
601 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
602 {
603 return -1;
604 }
605
606 if (ms_conn_sock_init(c) != 0)
607 {
608 return -1;
609 }
610
611 if (ms_conn_event_init(c) != 0)
612 {
613 return -1;
614 }
615
616 return EXIT_SUCCESS;
617 } /* ms_setup_conn */
618
619
620 /**
621 * Frees a connection.
622 *
623 * @param c, pointer of the concurrency
624 */
625 void ms_conn_free(ms_conn_t *c)
626 {
627 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
628 if (c != NULL)
629 {
630 if (c->hdrbuf != NULL)
631 free(c->hdrbuf);
632 if (c->msglist != NULL)
633 free(c->msglist);
634 if (c->rbuf != NULL)
635 free(c->rbuf);
636 if (c->wbuf != NULL)
637 free(c->wbuf);
638 if (c->iov != NULL)
639 free(c->iov);
640 if (c->mlget_task.mlget_item != NULL)
641 free(c->mlget_task.mlget_item);
642 if (c->rudpbuf != NULL)
643 free(c->rudpbuf);
644 if (c->udppkt != NULL)
645 free(c->udppkt);
646 if (c->item_win != NULL)
647 free(c->item_win);
648 if (c->tcpsfd != NULL)
649 free(c->tcpsfd);
650
651 if (--ms_thread->nactive_conn == 0)
652 {
653 free(ms_thread->conn);
654 }
655 }
656 } /* ms_conn_free */
657
658
659 /**
660 * close a connection
661 *
662 * @param c, pointer of the concurrency
663 */
664 static void ms_conn_close(ms_conn_t *c)
665 {
666 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
667 assert(c != NULL);
668
669 /* delete the event, the socket and the connection */
670 event_del(&c->event);
671
672 for (uint32_t i= 0; i < c->total_sfds; i++)
673 {
674 if (c->tcpsfd[i] > 0)
675 {
676 close(c->tcpsfd[i]);
677 }
678 }
679 c->sfd= 0;
680
681 if (ms_setting.facebook_test)
682 {
683 close(c->udpsfd);
684 }
685
686 atomic_dec_32(&ms_stats.active_conns);
687
688 ms_conn_free(c);
689
690 if (ms_setting.run_time == 0)
691 {
692 pthread_mutex_lock(&ms_global.run_lock.lock);
693 ms_global.run_lock.count++;
694 pthread_cond_signal(&ms_global.run_lock.cond);
695 pthread_mutex_unlock(&ms_global.run_lock.lock);
696 }
697
698 if (ms_thread->nactive_conn == 0)
699 {
700 pthread_exit(NULL);
701 }
702 } /* ms_conn_close */
703
704
705 /**
706 * create a new sock
707 *
708 * @param ai, server address information
709 *
710 * @return int, if success, return EXIT_SUCCESS, else return -1
711 */
712 static int ms_new_socket(struct addrinfo *ai)
713 {
714 int sfd;
715
716 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
717 {
718 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
719 return -1;
720 }
721
722 return sfd;
723 } /* ms_new_socket */
724
725
726 /**
727 * Sets a socket's send buffer size to the maximum allowed by the system.
728 *
729 * @param sfd, file descriptor of socket
730 */
731 static void ms_maximize_sndbuf(const int sfd)
732 {
733 socklen_t intsize= sizeof(int);
734 unsigned int last_good= 0;
735 unsigned int min, max, avg;
736 unsigned int old_size;
737
738 /* Start with the default size. */
739 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
740 {
741 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
742 return;
743 }
744
745 /* Binary-search for the real maximum. */
746 min= old_size;
747 max= MAX_SENDBUF_SIZE;
748
749 while (min <= max)
750 {
751 avg= ((unsigned int)(min + max)) / 2;
752 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
753 {
754 last_good= avg;
755 min= avg + 1;
756 }
757 else
758 {
759 max= avg - 1;
760 }
761 }
762 } /* ms_maximize_sndbuf */
763
764
765 /**
766 * socket connects the server
767 *
768 * @param c, pointer of the concurrency
769 * @param srv_host_name, the host name of the server
770 * @param srv_port, port of server
771 * @param is_udp, whether it's udp
772 * @param ret_sfd, the connected socket file descriptor
773 *
774 * @return int, if success, return EXIT_SUCCESS, else return -1
775 */
776 static int ms_network_connect(ms_conn_t *c,
777 char *srv_host_name,
778 const int srv_port,
779 const bool is_udp,
780 int *ret_sfd)
781 {
782 int sfd;
783 struct linger ling=
784 {
785 0, 0
786 };
787 struct addrinfo *ai;
788 struct addrinfo *next;
789 struct addrinfo hints;
790 char port_buf[NI_MAXSERV];
791 int error;
792 int success= 0;
793
794 int flags= 1;
795
796 /*
797 * the memset call clears nonstandard fields in some impementations
798 * that otherwise mess things up.
799 */
800 memset(&hints, 0, sizeof(hints));
801 #ifdef AI_ADDRCONFIG
802 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
803 #else
804 hints.ai_flags= AI_PASSIVE;
805 #endif /* AI_ADDRCONFIG */
806 if (is_udp)
807 {
808 hints.ai_protocol= IPPROTO_UDP;
809 hints.ai_socktype= SOCK_DGRAM;
810 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
811 }
812 else
813 {
814 hints.ai_family= AF_UNSPEC;
815 hints.ai_protocol= IPPROTO_TCP;
816 hints.ai_socktype= SOCK_STREAM;
817 }
818
819 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
820 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
821 if (error != 0)
822 {
823 if (error != EAI_SYSTEM)
824 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
825 else
826 perror("getaddrinfo()\n");
827
828 return -1;
829 }
830
831 for (next= ai; next; next= next->ai_next)
832 {
833 if ((sfd= ms_new_socket(next)) == -1)
834 {
835 freeaddrinfo(ai);
836 return -1;
837 }
838
839 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
840 if (is_udp)
841 {
842 ms_maximize_sndbuf(sfd);
843 }
844 else
845 {
846 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
847 sizeof(flags));
848 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
849 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
850 sizeof(flags));
851 }
852
853 if (is_udp)
854 {
855 c->srv_recv_addr_size= sizeof(struct sockaddr);
856 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
857 }
858 else
859 {
860 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
861 {
862 close(sfd);
863 freeaddrinfo(ai);
864 return -1;
865 }
866 }
867
868 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
869 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
870 {
871 fprintf(stderr, "setting O_NONBLOCK\n");
872 close(sfd);
873 freeaddrinfo(ai);
874 return -1;
875 }
876
877 if (ret_sfd != NULL)
878 {
879 *ret_sfd= sfd;
880 }
881
882 success++;
883 }
884
885 freeaddrinfo(ai);
886
887 /* Return zero if we detected no errors in starting up connections */
888 return success == 0;
889 } /* ms_network_connect */
890
891
892 /**
893 * reconnect a disconnected sock
894 *
895 * @param c, pointer of the concurrency
896 *
897 * @return int, if success, return EXIT_SUCCESS, else return -1
898 */
899 static int ms_reconn(ms_conn_t *c)
900 {
901 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
902 uint32_t srv_idx= 0;
903 uint32_t srv_conn_cnt= 0;
904
905 if (ms_setting.rep_write_srv > 0)
906 {
907 srv_idx= c->cur_idx % ms_setting.srv_cnt;
908 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
909 }
910 else
911 {
912 srv_idx= ms_thread->thread_ctx->srv_idx;
913 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
914 }
915
916 /* close the old socket handler */
917 close(c->sfd);
918 c->tcpsfd[c->cur_idx]= 0;
919
920 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
921 % srv_conn_cnt == 0)
922 {
923 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
924 fprintf(stderr, "Server %s:%d disconnect\n",
925 ms_setting.servers[srv_idx].srv_host_name,
926 ms_setting.servers[srv_idx].srv_port);
927 }
928
929 if (ms_setting.rep_write_srv > 0)
930 {
931 uint32_t i= 0;
932
933 for (i= 0; i < c->total_sfds; i++)
934 {
935 if (c->tcpsfd[i] != 0)
936 {
937 break;
938 }
939 }
940
941 /* all socks disconnect */
942 if (i == c->total_sfds)
943 {
944 return -1;
945 }
946 }
947 else
948 {
949 do
950 {
951 /* reconnect success, break the loop */
952 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
953 ms_setting.servers[srv_idx].srv_port,
954 ms_setting.udp, &c->sfd) == 0)
955 {
956 c->tcpsfd[c->cur_idx]= c->sfd;
957 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
958 % (uint32_t)srv_conn_cnt == 0)
959 {
960 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
961 int reconn_time=
962 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
963 - ms_setting.servers[srv_idx].disconn_time
964 .tv_sec);
965 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
966 ms_setting.servers[srv_idx].srv_host_name,
967 ms_setting.servers[srv_idx].srv_port, reconn_time);
968 }
969 break;
970 }
971
972 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
973 {
974 /* wait a second and reconnect */
975 sleep(1);
976 }
977 }
978 while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
979 }
980
981 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
982 {
983 c->sfd= 0;
984 c->alive_sfds--;
985 }
986
987 return EXIT_SUCCESS;
988 } /* ms_reconn */
989
990
991 /**
992 * reconnect several disconnected socks in the connection
993 * structure, the ever-1-second timer of the thread will check
994 * whether some socks in the connections disconnect. if
995 * disconnect, reconnect the sock.
996 *
997 * @param c, pointer of the concurrency
998 *
999 * @return int, if success, return EXIT_SUCCESS, else return -1
1000 */
1001 int ms_reconn_socks(ms_conn_t *c)
1002 {
1003 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1004 uint32_t srv_idx= 0;
1005 int ret_sfd= 0;
1006 uint32_t srv_conn_cnt= 0;
1007 struct timeval cur_time;
1008
1009 assert(c != NULL);
1010
1011 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1012 {
1013 return EXIT_SUCCESS;
1014 }
1015
1016 for (uint32_t i= 0; i < c->total_sfds; i++)
1017 {
1018 if (c->tcpsfd[i] == 0)
1019 {
1020 gettimeofday(&cur_time, NULL);
1021
1022 /**
1023 * For failover test of replication, reconnect the socks after
1024 * it disconnects more than 5 seconds, Otherwise memslap will
1025 * block at connect() function and the work threads can't work
1026 * in this interval.
1027 */
1028 if (cur_time.tv_sec
1029 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1030 {
1031 break;
1032 }
1033
1034 if (ms_setting.rep_write_srv > 0)
1035 {
1036 srv_idx= i % ms_setting.srv_cnt;
1037 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1038 }
1039 else
1040 {
1041 srv_idx= ms_thread->thread_ctx->srv_idx;
1042 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1043 }
1044
1045 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1046 ms_setting.servers[srv_idx].srv_port,
1047 ms_setting.udp, &ret_sfd) == 0)
1048 {
1049 c->tcpsfd[i]= ret_sfd;
1050 c->alive_sfds++;
1051
1052 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1053 % (uint32_t)srv_conn_cnt == 0)
1054 {
1055 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1056 int reconn_time=
1057 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1058 - ms_setting.servers[srv_idx].disconn_time
1059 .tv_sec);
1060 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1061 ms_setting.servers[srv_idx].srv_host_name,
1062 ms_setting.servers[srv_idx].srv_port, reconn_time);
1063 }
1064 }
1065 }
1066 }
1067
1068 return EXIT_SUCCESS;
1069 } /* ms_reconn_socks */
1070
1071
1072 /**
1073 * Tokenize the command string by replacing whitespace with '\0' and update
1074 * the token array tokens with pointer to start of each token and length.
1075 * Returns total number of tokens. The last valid token is the terminal
1076 * token (value points to the first unprocessed character of the string and
1077 * length zero).
1078 *
1079 * Usage example:
1080 *
1081 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1082 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1083 * ...
1084 * }
1085 * ncommand = tokens[ix].value - command;
1086 * command = tokens[ix].value;
1087 * }
1088 *
1089 * @param command, the command string to token
1090 * @param tokens, array to store tokens
1091 * @param max_tokens, maximum tokens number
1092 *
1093 * @return int, the number of tokens
1094 */
1095 static int ms_tokenize_command(char *command,
1096 token_t *tokens,
1097 const int max_tokens)
1098 {
1099 char *s, *e;
1100 int ntokens= 0;
1101
1102 assert(command != NULL && tokens != NULL && max_tokens > 1);
1103
1104 for (s= e= command; ntokens < max_tokens - 1; ++e)
1105 {
1106 if (*e == ' ')
1107 {
1108 if (s != e)
1109 {
1110 tokens[ntokens].value= s;
1111 tokens[ntokens].length= (size_t)(e - s);
1112 ntokens++;
1113 *e= '\0';
1114 }
1115 s= e + 1;
1116 }
1117 else if (*e == '\0')
1118 {
1119 if (s != e)
1120 {
1121 tokens[ntokens].value= s;
1122 tokens[ntokens].length= (size_t)(e - s);
1123 ntokens++;
1124 }
1125
1126 break; /* string end */
1127 }
1128 }
1129
1130 return ntokens;
1131 } /* ms_tokenize_command */
1132
1133
1134 /**
1135 * parse the response of server.
1136 *
1137 * @param c, pointer of the concurrency
1138 * @param command, the string responded by server
1139 *
1140 * @return int, if the command completed return EXIT_SUCCESS, else return
1141 * -1
1142 */
1143 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1144 {
1145 int ret= 0;
1146 int64_t value_len;
1147 char *buffer= command;
1148
1149 assert(c != NULL);
1150
1151 /**
1152 * for command get, we store the returned value into local buffer
1153 * then continue in ms_complete_nread().
1154 */
1155
1156 switch (buffer[0])
1157 {
1158 case 'V': /* VALUE || VERSION */
1159 if (buffer[1] == 'A') /* VALUE */
1160 {
1161 token_t tokens[MAX_TOKENS];
1162 ms_tokenize_command(command, tokens, MAX_TOKENS);
1163 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1164 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1165
1166 /*
1167 * We read the \r\n into the string since not doing so is more
1168 * cycles then the waster of memory to do so.
1169 *
1170 * We are null terminating through, which will most likely make
1171 * some people lazy about using the return length.
1172 */
1173 c->rvbytes= (int)(value_len + 2);
1174 c->readval= true;
1175 ret= -1;
1176 }
1177
1178 break;
1179
1180 case 'O': /* OK */
1181 c->currcmd.retstat= MCD_SUCCESS;
1182
1183 case 'S': /* STORED STATS SERVER_ERROR */
1184 if (buffer[2] == 'A') /* STORED STATS */
1185 { /* STATS*/
1186 c->currcmd.retstat= MCD_STAT;
1187 }
1188 else if (buffer[1] == 'E')
1189 {
1190 /* SERVER_ERROR */
1191 printf("<%d %s\n", c->sfd, buffer);
1192
1193 c->currcmd.retstat= MCD_SERVER_ERROR;
1194 }
1195 else if (buffer[1] == 'T')
1196 {
1197 /* STORED */
1198 c->currcmd.retstat= MCD_STORED;
1199 }
1200 else
1201 {
1202 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1203 }
1204 break;
1205
1206 case 'D': /* DELETED DATA */
1207 if (buffer[1] == 'E')
1208 {
1209 c->currcmd.retstat= MCD_DELETED;
1210 }
1211 else
1212 {
1213 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1214 }
1215
1216 break;
1217
1218 case 'N': /* NOT_FOUND NOT_STORED*/
1219 if (buffer[4] == 'F')
1220 {
1221 c->currcmd.retstat= MCD_NOTFOUND;
1222 }
1223 else if (buffer[4] == 'S')
1224 {
1225 printf("<%d %s\n", c->sfd, buffer);
1226 c->currcmd.retstat= MCD_NOTSTORED;
1227 }
1228 else
1229 {
1230 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1231 }
1232 break;
1233
1234 case 'E': /* PROTOCOL ERROR or END */
1235 if (buffer[1] == 'N')
1236 {
1237 /* END */
1238 c->currcmd.retstat= MCD_END;
1239 }
1240 else if (buffer[1] == 'R')
1241 {
1242 printf("<%d ERROR\n", c->sfd);
1243 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1244 }
1245 else if (buffer[1] == 'X')
1246 {
1247 c->currcmd.retstat= MCD_DATA_EXISTS;
1248 printf("<%d %s\n", c->sfd, buffer);
1249 }
1250 else
1251 {
1252 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1253 }
1254 break;
1255
1256 case 'C': /* CLIENT ERROR */
1257 printf("<%d %s\n", c->sfd, buffer);
1258 c->currcmd.retstat= MCD_CLIENT_ERROR;
1259 break;
1260
1261 default:
1262 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1263 break;
1264 } /* switch */
1265
1266 return ret;
1267 } /* ms_ascii_process_line */
1268
1269
1270 /**
1271 * after one operation completes, reset the concurrency
1272 *
1273 * @param c, pointer of the concurrency
1274 * @param timeout, whether it's timeout
1275 */
1276 void ms_reset_conn(ms_conn_t *c, bool timeout)
1277 {
1278 assert(c != NULL);
1279
1280 if (c->udp)
1281 {
1282 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1283 {
1284 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1285 }
1286
1287 c->packets= 0;
1288 c->recvpkt= 0;
1289 c->pktcurr= 0;
1290 c->ordcurr= 0;
1291 c->rudpbytes= 0;
1292 }
1293 c->currcmd.isfinish= true;
1294 c->ctnwrite= false;
1295 c->rbytes= 0;
1296 c->rcurr= c->rbuf;
1297 c->msgcurr = 0;
1298 c->msgused = 0;
1299 c->iovused = 0;
1300 ms_conn_set_state(c, conn_write);
1301 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1302
1303 if (timeout)
1304 {
1305 ms_drive_machine(c);
1306 }
1307 } /* ms_reset_conn */
1308
1309
1310 /**
1311 * if we have a complete line in the buffer, process it.
1312 *
1313 * @param c, pointer of the concurrency
1314 *
1315 * @return int, if success, return EXIT_SUCCESS, else return -1
1316 */
1317 static int ms_try_read_line(ms_conn_t *c)
1318 {
1319 if (c->protocol == binary_prot)
1320 {
1321 /* Do we have the complete packet header? */
1322 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1323 {
1324 /* need more data! */
1325 return EXIT_SUCCESS;
1326 }
1327 else
1328 {
1329 #ifdef NEED_ALIGN
1330 if (((long)(c->rcurr)) % 8 != 0)
1331 {
1332 /* must realign input buffer */
1333 memmove(c->rbuf, c->rcurr, c->rbytes);
1334 c->rcurr= c->rbuf;
1335 if (settings.verbose)
1336 {
1337 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1338 }
1339 }
1340 #endif
1341 protocol_binary_response_header *rsp;
1342 rsp= (protocol_binary_response_header *)c->rcurr;
1343
1344 c->binary_header= *rsp;
1345 c->binary_header.response.extlen= rsp->response.extlen;
1346 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1347 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1348 c->binary_header.response.status= ntohs(rsp->response.status);
1349
1350 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1351 {
1352 fprintf(stderr, "Invalid magic: %x\n",
1353 c->binary_header.response.magic);
1354 ms_conn_set_state(c, conn_closing);
1355 return EXIT_SUCCESS;
1356 }
1357
1358 /* process this complete response */
1359 if (ms_bin_process_response(c) == 0)
1360 {
1361 /* current operation completed */
1362 ms_reset_conn(c, false);
1363 return -1;
1364 }
1365 else
1366 {
1367 c->rbytes-= (int32_t)sizeof(c->binary_header);
1368 c->rcurr+= sizeof(c->binary_header);
1369 }
1370 }
1371 }
1372 else
1373 {
1374 char *el, *cont;
1375
1376 assert(c != NULL);
1377 assert(c->rcurr <= (c->rbuf + c->rsize));
1378
1379 if (c->rbytes == 0)
1380 return EXIT_SUCCESS;
1381
1382 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1383 if (! el)
1384 return EXIT_SUCCESS;
1385
1386 cont= el + 1;
1387 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1388 {
1389 el--;
1390 }
1391 *el= '\0';
1392
1393 assert(cont <= (c->rcurr + c->rbytes));
1394
1395 /* process this complete line */
1396 if (ms_ascii_process_line(c, c->rcurr) == 0)
1397 {
1398 /* current operation completed */
1399 ms_reset_conn(c, false);
1400 return -1;
1401 }
1402 else
1403 {
1404 /* current operation didn't complete */
1405 c->rbytes-= (int32_t)(cont - c->rcurr);
1406 c->rcurr= cont;
1407 }
1408
1409 assert(c->rcurr <= (c->rbuf + c->rsize));
1410 }
1411
1412 return -1;
1413 } /* ms_try_read_line */
1414
1415
1416 /**
1417 * because the packet of UDP can't ensure the order, the
1418 * function is used to sort the received udp packet.
1419 *
1420 * @param c, pointer of the concurrency
1421 * @param buf, the buffer to store the ordered packages data
1422 * @param rbytes, the maximum capacity of the buffer
1423 *
1424 * @return int, if success, return the copy bytes, else return
1425 * -1
1426 */
1427 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1428 {
1429 int len= 0;
1430 int wbytes= 0;
1431 uint16_t req_id= 0;
1432 uint16_t seq_num= 0;
1433 uint16_t packets= 0;
1434 unsigned char *header= NULL;
1435
1436 /* no enough data */
1437 assert(c != NULL);
1438 assert(buf != NULL);
1439 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1440
1441 /* calculate received packets count */
1442 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1443 {
1444 /* the last packet has some data */
1445 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1446 }
1447 else
1448 {
1449 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1450 }
1451
1452 /* get the total packets count if necessary */
1453 if (c->packets == 0)
1454 {
1455 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1456 }
1457
1458 /* build the ordered packet array */
1459 for (int i= c->pktcurr; i < c->recvpkt; i++)
1460 {
1461 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1462 req_id= (uint16_t)HEADER_TO_REQID(header);
1463 assert(req_id == c->request_id % (1 << 16));
1464
1465 packets= (uint16_t)HEADER_TO_PACKETS(header);
1466 assert(c->packets == HEADER_TO_PACKETS(header));
1467
1468 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1469 c->udppkt[seq_num].header= header;
1470 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1471
1472 if (i == c->recvpkt - 1)
1473 {
1474 /* last received packet */
1475 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1476 {
1477 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1478 c->pktcurr++;
1479 }
1480 else
1481 {
1482 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1483 - UDP_HEADER_SIZE;
1484 }
1485 }
1486 else
1487 {
1488 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1489 c->pktcurr++;
1490 }
1491 }
1492
1493 for (int i= c->ordcurr; i < c->recvpkt; i++)
1494 {
1495 /* there is some data to copy */
1496 if ((c->udppkt[i].data != NULL)
1497 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1498 {
1499 header= c->udppkt[i].header;
1500 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1501 if (len > rbytes - wbytes)
1502 {
1503 len= rbytes - wbytes;
1504 }
1505
1506 assert(len <= rbytes - wbytes);
1507 assert(i == HEADER_TO_SEQNUM(header));
1508
1509 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1510 (size_t)len);
1511 wbytes+= len;
1512 c->udppkt[i].copybytes+= len;
1513
1514 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1515 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1516 {
1517 /* finish copying all the data of this packet, next */
1518 c->ordcurr++;
1519 }
1520
1521 /* last received packet, and finish copying all the data */
1522 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1523 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1524 {
1525 break;
1526 }
1527
1528 /* no space to copy data */
1529 if (wbytes >= rbytes)
1530 {
1531 break;
1532 }
1533
1534 /* it doesn't finish reading all the data of the packet from network */
1535 if ((i != c->recvpkt - 1)
1536 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1537 {
1538 break;
1539 }
1540 }
1541 else
1542 {
1543 /* no data to copy */
1544 break;
1545 }
1546 }
1547
1548 return wbytes == 0 ? -1 : wbytes;
1549 } /* ms_sort_udp_packet */
1550
1551
1552 /**
1553 * encapsulate upd read like tcp read
1554 *
1555 * @param c, pointer of the concurrency
1556 * @param buf, read buffer
1557 * @param len, length to read
1558 *
1559 * @return int, if success, return the read bytes, else return
1560 * -1
1561 */
1562 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1563 {
1564 int res= 0;
1565 int avail= 0;
1566 int rbytes= 0;
1567 int copybytes= 0;
1568
1569 assert(c->udp);
1570
1571 while (1)
1572 {
1573 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1574 {
1575 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1576 if (! new_rbuf)
1577 {
1578 fprintf(stderr, "Couldn't realloc input buffer.\n");
1579 c->rudpbytes= 0; /* ignore what we read */
1580 return -1;
1581 }
1582 c->rudpbuf= new_rbuf;
1583 c->rudpsize*= 2;
1584 }
1585
1586 avail= c->rudpsize - c->rudpbytes;
1587 /* UDP each time read a packet, 1400 bytes */
1588 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1589
1590 if (res > 0)
1591 {
1592 atomic_add_size(&ms_stats.bytes_read, res);
1593 c->rudpbytes+= res;
1594 rbytes+= res;
1595 if (res == avail)
1596 {
1597 continue;
1598 }
1599 else
1600 {
1601 break;
1602 }
1603 }
1604
1605 if (res == 0)
1606 {
1607 /* "connection" closed */
1608 return res;
1609 }
1610
1611 if (res == -1)
1612 {
1613 /* no data to read */
1614 return res;
1615 }
1616 }
1617
1618 /* copy data to read buffer */
1619 if (rbytes > 0)
1620 {
1621 copybytes= ms_sort_udp_packet(c, buf, len);
1622 }
1623
1624 if (copybytes == -1)
1625 {
1626 atomic_add_size(&ms_stats.pkt_disorder, 1);
1627 }
1628
1629 return copybytes;
1630 } /* ms_udp_read */
1631
1632
1633 /*
1634 * read from network as much as we can, handle buffer overflow and connection
1635 * close.
1636 * before reading, move the remaining incomplete fragment of a command
1637 * (if any) to the beginning of the buffer.
1638 * return EXIT_SUCCESS if there's nothing to read on the first read.
1639 */
1640
1641 /**
1642 * read from network as much as we can, handle buffer overflow and connection
1643 * close. before reading, move the remaining incomplete fragment of a command
1644 * (if any) to the beginning of the buffer.
1645 *
1646 * @param c, pointer of the concurrency
1647 *
1648 * @return int,
1649 * return EXIT_SUCCESS if there's nothing to read on the first read.
1650 * return EXIT_FAILURE if get data
1651 * return -1 if error happens
1652 */
1653 static int ms_try_read_network(ms_conn_t *c)
1654 {
1655 int gotdata= 0;
1656 int res;
1657 int64_t avail;
1658
1659 assert(c != NULL);
1660
1661 if ((c->rcurr != c->rbuf)
1662 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1663 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1664 {
1665 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1666 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1667 c->rcurr= c->rbuf;
1668 }
1669
1670 while (1)
1671 {
1672 if (c->rbytes >= c->rsize)
1673 {
1674 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1675 if (! new_rbuf)
1676 {
1677 fprintf(stderr, "Couldn't realloc input buffer.\n");
1678 c->rbytes= 0; /* ignore what we read */
1679 return -1;
1680 }
1681 c->rcurr= c->rbuf= new_rbuf;
1682 c->rsize*= 2;
1683 }
1684
1685 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1686 if (avail == 0)
1687 {
1688 break;
1689 }
1690
1691 if (c->udp)
1692 {
1693 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1694 }
1695 else
1696 {
1697 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1698 }
1699
1700 if (res > 0)
1701 {
1702 if (! c->udp)
1703 {
1704 atomic_add_size(&ms_stats.bytes_read, res);
1705 }
1706 gotdata= 1;
1707 c->rbytes+= res;
1708 if (res == avail)
1709 {
1710 continue;
1711 }
1712 else
1713 {
1714 break;
1715 }
1716 }
1717 if (res == 0)
1718 {
1719 /* connection closed */
1720 ms_conn_set_state(c, conn_closing);
1721 return -1;
1722 }
1723 if (res == -1)
1724 {
1725 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1726 break;
1727 /* Should close on unhandled errors. */
1728 ms_conn_set_state(c, conn_closing);
1729 return -1;
1730 }
1731 }
1732
1733 return gotdata;
1734 } /* ms_try_read_network */
1735
1736
1737 /**
1738 * after get the object from server, verify the value if
1739 * necessary.
1740 *
1741 * @param c, pointer of the concurrency
1742 * @param mlget_item, pointer of mulit-get task item structure
1743 * @param value, received value string
1744 * @param vlen, received value string length
1745 */
1746 static void ms_verify_value(ms_conn_t *c,
1747 ms_mlget_task_item_t *mlget_item,
1748 char *value,
1749 int vlen)
1750 {
1751 if (c->curr_task.verify)
1752 {
1753 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1754 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1755 char *orignkey=
1756 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1757
1758 /* verify expire time if necessary */
1759 if (c->curr_task.item->exp_time > 0)
1760 {
1761 struct timeval curr_time;
1762 gettimeofday(&curr_time, NULL);
1763
1764 /* object expired but get it now */
1765 if (curr_time.tv_sec - c->curr_task.item->client_time
1766 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1767 {
1768 atomic_add_size(&ms_stats.exp_get, 1);
1769
1770 if (ms_setting.verbose)
1771 {
1772 char set_time[64];
1773 char cur_time[64];
1774 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1775 localtime(&c->curr_task.item->client_time));
1776 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1777 localtime(&curr_time.tv_sec));
1778 fprintf(stderr,
1779 "\n<%d expire time verification failed, "
1780 "object expired but get it now\n"
1781 "\tkey len: %d\n"
1782 "\tkey: %" PRIx64 " %.*s\n"
1783 "\tset time: %s current time: %s "
1784 "diff time: %d expire time: %d\n"
1785 "\texpected data: \n"
1786 "\treceived data len: %d\n"
1787 "\treceived data: %.*s\n",
1788 c->sfd,
1789 c->curr_task.item->key_size,
1790 c->curr_task.item->key_prefix,
1791 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1792 orignkey,
1793 set_time,
1794 cur_time,
1795 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1796 c->curr_task.item->exp_time,
1797 vlen,
1798 vlen,
1799 value);
1800 fflush(stderr);
1801 }
1802 }
1803 }
1804 else
1805 {
1806 if ((c->curr_task.item->value_size != vlen)
1807 || (memcmp(orignval, value, (size_t)vlen) != 0))
1808 {
1809 atomic_add_size(&ms_stats.vef_failed, 1);
1810
1811 if (ms_setting.verbose)
1812 {
1813 fprintf(stderr,
1814 "\n<%d data verification failed\n"
1815 "\tkey len: %d\n"
1816 "\tkey: %" PRIx64" %.*s\n"
1817 "\texpected data len: %d\n"
1818 "\texpected data: %.*s\n"
1819 "\treceived data len: %d\n"
1820 "\treceived data: %.*s\n",
1821 c->sfd,
1822 c->curr_task.item->key_size,
1823 c->curr_task.item->key_prefix,
1824 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1825 orignkey,
1826 c->curr_task.item->value_size,
1827 c->curr_task.item->value_size,
1828 orignval,
1829 vlen,
1830 vlen,
1831 value);
1832 fflush(stderr);
1833 }
1834 }
1835 }
1836
1837 c->curr_task.finish_verify= true;
1838
1839 if (mlget_item != NULL)
1840 {
1841 mlget_item->finish_verify= true;
1842 }
1843 }
1844 } /* ms_verify_value */
1845
1846
1847 /**
1848 * For ASCII protocol, after store the data into the local
1849 * buffer, run this function to handle the data.
1850 *
1851 * @param c, pointer of the concurrency
1852 */
1853 static void ms_ascii_complete_nread(ms_conn_t *c)
1854 {
1855 assert(c != NULL);
1856 assert(c->rbytes >= c->rvbytes);
1857 assert(c->protocol == ascii_prot);
1858 if (c->rvbytes > 2)
1859 {
1860 assert(
1861 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1862 }
1863
1864 /* multi-get */
1865 ms_mlget_task_item_t *mlget_item= NULL;
1866 if (((ms_setting.mult_key_num > 1)
1867 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1868 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1869 {
1870 c->mlget_task.value_index++;
1871 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1872
1873 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1874 {
1875 c->curr_task.item= mlget_item->item;
1876 c->curr_task.verify= mlget_item->verify;
1877 c->curr_task.finish_verify= mlget_item->finish_verify;
1878 mlget_item->get_miss= false;
1879 }
1880 else
1881 {
1882 /* Try to find the task item in multi-get task array */
1883 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1884 {
1885 mlget_item= &c->mlget_task.mlget_item[i];
1886 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1887 {
1888 c->curr_task.item= mlget_item->item;
1889 c->curr_task.verify= mlget_item->verify;
1890 c->curr_task.finish_verify= mlget_item->finish_verify;
1891 mlget_item->get_miss= false;
1892
1893 break;
1894 }
1895 }
1896 }
1897 }
1898
1899 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1900
1901 c->curr_task.get_miss= false;
1902 c->rbytes-= c->rvbytes;
1903 c->rcurr= c->rcurr + c->rvbytes;
1904 assert(c->rcurr <= (c->rbuf + c->rsize));
1905 c->readval= false;
1906 c->rvbytes= 0;
1907 } /* ms_ascii_complete_nread */
1908
1909
1910 /**
1911 * For binary protocol, after store the data into the local
1912 * buffer, run this function to handle the data.
1913 *
1914 * @param c, pointer of the concurrency
1915 */
1916 static void ms_bin_complete_nread(ms_conn_t *c)
1917 {
1918 assert(c != NULL);
1919 assert(c->rbytes >= c->rvbytes);
1920 assert(c->protocol == binary_prot);
1921
1922 int extlen= c->binary_header.response.extlen;
1923 int keylen= c->binary_header.response.keylen;
1924 uint8_t opcode= c->binary_header.response.opcode;
1925
1926 /* not get command or not include value, just return */
1927 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1928 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1929 || (c->rvbytes <= extlen + keylen))
1930 {
1931 /* get miss */
1932 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1933 {
1934 c->currcmd.retstat= MCD_END;
1935 c->curr_task.get_miss= true;
1936 }
1937
1938 c->readval= false;
1939 c->rvbytes= 0;
1940 ms_reset_conn(c, false);
1941 return;
1942 }
1943
1944 /* multi-get */
1945 ms_mlget_task_item_t *mlget_item= NULL;
1946 if (((ms_setting.mult_key_num > 1)
1947 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1948 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1949 {
1950 c->mlget_task.value_index++;
1951 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1952
1953 c->curr_task.item= mlget_item->item;
1954 c->curr_task.verify= mlget_item->verify;
1955 c->curr_task.finish_verify= mlget_item->finish_verify;
1956 mlget_item->get_miss= false;
1957 }
1958
1959 ms_verify_value(c,
1960 mlget_item,
1961 c->rcurr + extlen + keylen,
1962 c->rvbytes - extlen - keylen);
1963
1964 c->currcmd.retstat= MCD_END;
1965 c->curr_task.get_miss= false;
1966 c->rbytes-= c->rvbytes;
1967 c->rcurr= c->rcurr + c->rvbytes;
1968 assert(c->rcurr <= (c->rbuf + c->rsize));
1969 c->readval= false;
1970 c->rvbytes= 0;
1971
1972 if (ms_setting.mult_key_num > 1)
1973 {
1974 /* multi-get have check all the item */
1975 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1976 {
1977 ms_reset_conn(c, false);
1978 }
1979 }
1980 else
1981 {
1982 /* single get */
1983 ms_reset_conn(c, false);
1984 }
1985 } /* ms_bin_complete_nread */
1986
1987
1988 /**
1989 * we get here after reading the value of get commands.
1990 *
1991 * @param c, pointer of the concurrency
1992 */
1993 static void ms_complete_nread(ms_conn_t *c)
1994 {
1995 assert(c != NULL);
1996 assert(c->rbytes >= c->rvbytes);
1997 assert(c->protocol == ascii_prot
1998 || c->protocol == binary_prot);
1999
2000 if (c->protocol == binary_prot)
2001 {
2002 ms_bin_complete_nread(c);
2003 }
2004 else
2005 {
2006 ms_ascii_complete_nread(c);
2007 }
2008 } /* ms_complete_nread */
2009
2010
2011 /**
2012 * Adds a message header to a connection.
2013 *
2014 * @param c, pointer of the concurrency
2015 *
2016 * @return int, if success, return EXIT_SUCCESS, else return -1
2017 */
2018 static int ms_add_msghdr(ms_conn_t *c)
2019 {
2020 struct msghdr *msg;
2021
2022 assert(c != NULL);
2023
2024 if (c->msgsize == c->msgused)
2025 {
2026 msg=
2027 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2028 if (! msg)
2029 return -1;
2030
2031 c->msglist= msg;
2032 c->msgsize*= 2;
2033 }
2034
2035 msg= c->msglist + c->msgused;
2036
2037 /**
2038 * this wipes msg_iovlen, msg_control, msg_controllen, and
2039 * msg_flags, the last 3 of which aren't defined on solaris:
2040 */
2041 memset(msg, 0, sizeof(struct msghdr));
2042
2043 msg->msg_iov= &c->iov[c->iovused];
2044
2045 if (c->udp && (c->srv_recv_addr_size > 0))
2046 {
2047 msg->msg_name= &c->srv_recv_addr;
2048 msg->msg_namelen= c->srv_recv_addr_size;
2049 }
2050
2051 c->msgbytes= 0;
2052 c->msgused++;
2053
2054 if (c->udp)
2055 {
2056 /* Leave room for the UDP header, which we'll fill in later. */
2057 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2058 }
2059
2060 return EXIT_SUCCESS;
2061 } /* ms_add_msghdr */
2062
2063
2064 /**
2065 * Ensures that there is room for another structure iovec in a connection's
2066 * iov list.
2067 *
2068 * @param c, pointer of the concurrency
2069 *
2070 * @return int, if success, return EXIT_SUCCESS, else return -1
2071 */
2072 static int ms_ensure_iov_space(ms_conn_t *c)
2073 {
2074 assert(c != NULL);
2075
2076 if (c->iovused >= c->iovsize)
2077 {
2078 int i, iovnum;
2079 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2080 ((size_t)c->iovsize
2081 * 2)
2082 * sizeof(struct iovec));
2083 if (! new_iov)
2084 return -1;
2085
2086 c->iov= new_iov;
2087 c->iovsize*= 2;
2088
2089 /* Point all the msghdr structures at the new list. */
2090 for (i= 0, iovnum= 0; i < c->msgused; i++)
2091 {
2092 c->msglist[i].msg_iov= &c->iov[iovnum];
2093 iovnum+= (int)c->msglist[i].msg_iovlen;
2094 }
2095 }
2096
2097 return EXIT_SUCCESS;
2098 } /* ms_ensure_iov_space */
2099
2100
2101 /**
2102 * Adds data to the list of pending data that will be written out to a
2103 * connection.
2104 *
2105 * @param c, pointer of the concurrency
2106 * @param buf, the buffer includes data to send
2107 * @param len, the data length in the buffer
2108 *
2109 * @return int, if success, return EXIT_SUCCESS, else return -1
2110 */
2111 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2112 {
2113 struct msghdr *m;
2114 int leftover;
2115 bool limit_to_mtu;
2116
2117 assert(c != NULL);
2118
2119 do
2120 {
2121 m= &c->msglist[c->msgused - 1];
2122
2123 /*
2124 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2125 */
2126 limit_to_mtu= c->udp;
2127
2128 #ifdef IOV_MAX
2129 /* We may need to start a new msghdr if this one is full. */
2130 if ((m->msg_iovlen == IOV_MAX)
2131 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2132 {
2133 ms_add_msghdr(c);
2134 m= &c->msglist[c->msgused - 1];
2135 }
2136 #endif
2137
2138 if (ms_ensure_iov_space(c) != 0)
2139 return -1;
2140
2141 /* If the fragment is too big to fit in the datagram, split it up */
2142 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2143 {
2144 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2145 len-= leftover;
2146 }
2147 else
2148 {
2149 leftover= 0;
2150 }
2151
2152 m= &c->msglist[c->msgused - 1];
2153 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2154 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2155
2156 c->msgbytes+= len;
2157 c->iovused++;
2158 m->msg_iovlen++;
2159
2160 buf= ((char *)buf) + len;
2161 len= leftover;
2162 }
2163 while (leftover > 0);
2164
2165 return EXIT_SUCCESS;
2166 } /* ms_add_iov */
2167
2168
2169 /**
2170 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2171 *
2172 * @param c, pointer of the concurrency
2173 *
2174 * @return int, if success, return EXIT_SUCCESS, else return -1
2175 */
2176 static int ms_build_udp_headers(ms_conn_t *c)
2177 {
2178 int i;
2179 unsigned char *hdr;
2180
2181 assert(c != NULL);
2182
2183 c->request_id= ms_get_udp_request_id();
2184
2185 if (c->msgused > c->hdrsize)
2186 {
2187 void *new_hdrbuf;
2188 if (c->hdrbuf)
2189 new_hdrbuf= realloc(c->hdrbuf,
2190 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2191 else
2192 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2193 if (! new_hdrbuf)
2194 return -1;
2195
2196 c->hdrbuf= (unsigned char *)new_hdrbuf;
2197 c->hdrsize= c->msgused * 2;
2198 }
2199
2200 /* If this is a multi-packet request, drop it. */
2201 if (c->udp && (c->msgused > 1))
2202 {
2203 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2204 return -1;
2205 }
2206
2207 hdr= c->hdrbuf;
2208 for (i= 0; i < c->msgused; i++)
2209 {
2210 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2211 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2212 *hdr++= (unsigned char)(c->request_id / 256);
2213 *hdr++= (unsigned char)(c->request_id % 256);
2214 *hdr++= (unsigned char)(i / 256);
2215 *hdr++= (unsigned char)(i % 256);
2216 *hdr++= (unsigned char)(c->msgused / 256);
2217 *hdr++= (unsigned char)(c->msgused % 256);
2218 *hdr++= (unsigned char)1; /* support facebook memcached */
2219 *hdr++= (unsigned char)0;
2220 assert(hdr ==
2221 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2222 + UDP_HEADER_SIZE));
2223 }
2224
2225 return EXIT_SUCCESS;
2226 } /* ms_build_udp_headers */
2227
2228
2229 /**
2230 * Transmit the next chunk of data from our list of msgbuf structures.
2231 *
2232 * @param c, pointer of the concurrency
2233 *
2234 * @return TRANSMIT_COMPLETE All done writing.
2235 * TRANSMIT_INCOMPLETE More data remaining to write.
2236 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2237 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2238 */
2239 static int ms_transmit(ms_conn_t *c)
2240 {
2241 assert(c != NULL);
2242
2243 if ((c->msgcurr < c->msgused)
2244 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2245 {
2246 /* Finished writing the current msg; advance to the next. */
2247 c->msgcurr++;
2248 }
2249
2250 if (c->msgcurr < c->msgused)
2251 {
2252 ssize_t res;
2253 struct msghdr *m= &c->msglist[c->msgcurr];
2254
2255 res= sendmsg(c->sfd, m, 0);
2256 if (res > 0)
2257 {
2258 atomic_add_size(&ms_stats.bytes_written, res);
2259
2260 /* We've written some of the data. Remove the completed
2261 * iovec entries from the list of pending writes. */
2262 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2263 {
2264 res-= (ssize_t)m->msg_iov->iov_len;
2265 m->msg_iovlen--;
2266 m->msg_iov++;
2267 }
2268
2269 /* Might have written just part of the last iovec entry;
2270 * adjust it so the next write will do the rest. */
2271 if (res > 0)
2272 {
2273 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2274 m->msg_iov->iov_len-= (size_t)res;
2275 }
2276 return TRANSMIT_INCOMPLETE;
2277 }
2278 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2279 {
2280 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2281 {
2282 fprintf(stderr, "Couldn't update event.\n");
2283 ms_conn_set_state(c, conn_closing);
2284 return TRANSMIT_HARD_ERROR;
2285 }
2286 return TRANSMIT_SOFT_ERROR;
2287 }
2288
2289 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2290 * we have a real error, on which we close the connection */
2291 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2292
2293 ms_conn_set_state(c, conn_closing);
2294 return TRANSMIT_HARD_ERROR;
2295 }
2296 else
2297 {
2298 return TRANSMIT_COMPLETE;
2299 }
2300 } /* ms_transmit */
2301
2302
2303 /**
2304 * Shrinks a connection's buffers if they're too big. This prevents
2305 * periodic large "mget" response from server chewing lots of client
2306 * memory.
2307 *
2308 * This should only be called in between requests since it can wipe output
2309 * buffers!
2310 *
2311 * @param c, pointer of the concurrency
2312 */
2313 static void ms_conn_shrink(ms_conn_t *c)
2314 {
2315 assert(c != NULL);
2316
2317 if (c->udp)
2318 return;
2319
2320 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2321 {
2322 char *newbuf;
2323
2324 if (c->rcurr != c->rbuf)
2325 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2326
2327 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2328
2329 if (newbuf)
2330 {
2331 c->rbuf= newbuf;
2332 c->rsize= DATA_BUFFER_SIZE;
2333 }
2334 c->rcurr= c->rbuf;
2335 }
2336
2337 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2338 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2339 {
2340 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2341 if (! new_rbuf)
2342 {
2343 c->rudpbuf= new_rbuf;
2344 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2345 }
2346 /* TODO check error condition? */
2347 }
2348
2349 if (c->msgsize > MSG_LIST_HIGHWAT)
2350 {
2351 struct msghdr *newbuf= (struct msghdr *)realloc(
2352 (void *)c->msglist,
2353 MSG_LIST_INITIAL
2354 * sizeof(c->msglist[0]));
2355 if (newbuf)
2356 {
2357 c->msglist= newbuf;
2358 c->msgsize= MSG_LIST_INITIAL;
2359 }
2360 /* TODO check error condition? */
2361 }
2362
2363 if (c->iovsize > IOV_LIST_HIGHWAT)
2364 {
2365 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2366 IOV_LIST_INITIAL
2367 * sizeof(c->iov[0]));
2368 if (newbuf)
2369 {
2370 c->iov= newbuf;
2371 c->iovsize= IOV_LIST_INITIAL;
2372 }
2373 /* TODO check return value */
2374 }
2375 } /* ms_conn_shrink */
2376
2377
2378 /**
2379 * Sets a connection's current state in the state machine. Any special
2380 * processing that needs to happen on certain state transitions can
2381 * happen here.
2382 *
2383 * @param c, pointer of the concurrency
2384 * @param state, connection state
2385 */
2386 static void ms_conn_set_state(ms_conn_t *c, int state)
2387 {
2388 assert(c != NULL);
2389
2390 if (state != c->state)
2391 {
2392 if (state == conn_read)
2393 {
2394 ms_conn_shrink(c);
2395 }
2396 c->state= state;
2397 }
2398 } /* ms_conn_set_state */
2399
2400
2401 /**
2402 * update the event if socks change state. for example: when
2403 * change the listen scoket read event to sock write event, or
2404 * change socket handler, we could call this function.
2405 *
2406 * @param c, pointer of the concurrency
2407 * @param new_flags, new event flags
2408 *
2409 * @return bool, if success, return true, else return false
2410 */
2411 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2412 {
2413 assert(c != NULL);
2414
2415 struct event_base *base= c->event.ev_base;
2416 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2417 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2418 {
2419 return true;
2420 }
2421
2422 if (event_del(&c->event) == -1)
2423 {
2424 /* try to delete the event again */
2425 if (event_del(&c->event) == -1)
2426 {
2427 return false;
2428 }
2429 }
2430
2431 event_set(&c->event,
2432 c->sfd,
2433 (short)new_flags,
2434 ms_event_handler,
2435 (void *)c);
2436 event_base_set(base, &c->event);
2437 c->ev_flags= (short)new_flags;
2438
2439 if (event_add(&c->event, NULL) == -1)
2440 {
2441 return false;
2442 }
2443
2444 return true;
2445 } /* ms_update_event */
2446
2447
2448 /**
2449 * If user want to get the expected throughput, we could limit
2450 * the performance of memslap. we could give up some work and
2451 * just wait a short time. The function is used to check this
2452 * case.
2453 *
2454 * @param c, pointer of the concurrency
2455 *
2456 * @return bool, if success, return true, else return false
2457 */
2458 static bool ms_need_yield(ms_conn_t *c)
2459 {
2460 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2461 int64_t tps= 0;
2462 int64_t time_diff= 0;
2463 struct timeval curr_time;
2464 ms_task_t *task= &c->curr_task;
2465
2466 if (ms_setting.expected_tps > 0)
2467 {
2468 gettimeofday(&curr_time, NULL);
2469 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2470 tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
2471
2472 /* current throughput is greater than expected throughput */
2473 if (tps > ms_thread->thread_ctx->tps_perconn)
2474 {
2475 return true;
2476 }
2477 }
2478
2479 return false;
2480 } /* ms_need_yield */
2481
2482
2483 /**
2484 * used to update the start time of each operation
2485 *
2486 * @param c, pointer of the concurrency
2487 */
2488 static void ms_update_start_time(ms_conn_t *c)
2489 {
2490 ms_task_item_t *item= c->curr_task.item;
2491
2492 if ((ms_setting.stat_freq > 0) || c->udp
2493 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2494 {
2495 gettimeofday(&c->start_time, NULL);
2496 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2497 {
2498 /* record the current time */
2499 item->client_time= c->start_time.tv_sec;
2500 }
2501 }
2502 } /* ms_update_start_time */
2503
2504
2505 /**
2506 * run the state machine
2507 *
2508 * @param c, pointer of the concurrency
2509 */
2510 static void ms_drive_machine(ms_conn_t *c)
2511 {
2512 bool stop= false;
2513
2514 assert(c != NULL);
2515
2516 while (! stop)
2517 {
2518 switch (c->state)
2519 {
2520 case conn_read:
2521 if (c->readval)
2522 {
2523 if (c->rbytes >= c->rvbytes)
2524 {
2525 ms_complete_nread(c);
2526 break;
2527 }
2528 }
2529 else
2530 {
2531 if (ms_try_read_line(c) != 0)
2532 {
2533 break;
2534 }
2535 }
2536
2537 if (ms_try_read_network(c) != 0)
2538 {
2539 break;
2540 }
2541
2542 /* doesn't read all the response data, wait event wake up */
2543 if (! c->currcmd.isfinish)
2544 {
2545 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2546 {
2547 fprintf(stderr, "Couldn't update event.\n");
2548 ms_conn_set_state(c, conn_closing);
2549 break;
2550 }
2551 stop= true;
2552 break;
2553 }
2554
2555 /* we have no command line and no data to read from network, next write */
2556 ms_conn_set_state(c, conn_write);
2557 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
2558
2559 break;
2560
2561 case conn_write:
2562 if (! c->ctnwrite && ms_need_yield(c))
2563 {
2564 usleep(10);
2565
2566 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2567 {
2568 fprintf(stderr, "Couldn't update event.\n");
2569 ms_conn_set_state(c, conn_closing);
2570 break;
2571 }
2572 stop= true;
2573 break;
2574 }
2575
2576 if (! c->ctnwrite && (ms_exec_task(c) != 0))
2577 {
2578 ms_conn_set_state(c, conn_closing);
2579 break;
2580 }
2581
2582 /* record the start time before starting to send data if necessary */
2583 if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2584 {
2585 if (c->change_sfd)
2586 {
2587 c->change_sfd= false;
2588 }
2589 ms_update_start_time(c);
2590 }
2591
2592 /* change sfd if necessary */
2593 if (c->change_sfd)
2594 {
2595 c->ctnwrite= true;
2596 stop= true;
2597 break;
2598 }
2599
2600 /* execute task until nothing need be written to network */
2601 if (! c->ctnwrite && (c->msgcurr == c->msgused))
2602 {
2603 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2604 {
2605 fprintf(stderr, "Couldn't update event.\n");
2606 ms_conn_set_state(c, conn_closing);
2607 break;
2608 }
2609 stop= true;
2610 break;
2611 }
2612
2613 switch (ms_transmit(c))
2614 {
2615 case TRANSMIT_COMPLETE:
2616 /* we have no data to write to network, next wait repose */
2617 if (! ms_update_event(c, EV_READ | EV_PERSIST))
2618 {
2619 fprintf(stderr, "Couldn't update event.\n");
2620 ms_conn_set_state(c, conn_closing);
2621 c->ctnwrite= false;
2622 break;
2623 }
2624 ms_conn_set_state(c, conn_read);
2625 c->ctnwrite= false;
2626 stop= true;
2627 break;
2628
2629 case TRANSMIT_INCOMPLETE:
2630 c->ctnwrite= true;
2631 break; /* Continue in state machine. */
2632
2633 case TRANSMIT_HARD_ERROR:
2634 c->ctnwrite= false;
2635 break;
2636
2637 case TRANSMIT_SOFT_ERROR:
2638 c->ctnwrite= true;
2639 stop= true;
2640 break;
2641
2642 default:
2643 break;
2644 } /* switch */
2645
2646 break;
2647
2648 case conn_closing:
2649 /* recovery mode, need reconnect if connection close */
2650 if (ms_setting.reconnect && (! ms_global.time_out
2651 || ((ms_setting.run_time == 0)
2652 && (c->remain_exec_num > 0))))
2653 {
2654 if (ms_reconn(c) != 0)
2655 {
2656 ms_conn_close(c);
2657 stop= true;
2658 break;
2659 }
2660
2661 ms_reset_conn(c, false);
2662
2663 if (c->total_sfds == 1)
2664 {
2665 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2666 {
2667 fprintf(stderr, "Couldn't update event.\n");
2668 ms_conn_set_state(c, conn_closing);
2669 break;
2670 }
2671 }
2672
2673 break;
2674 }
2675 else
2676 {
2677 ms_conn_close(c);
2678 stop= true;
2679 break;
2680 }
2681
2682 default:
2683 assert(0);
2684 } /* switch */
2685 }
2686 } /* ms_drive_machine */
2687
2688
2689 /**
2690 * the event handler of each thread
2691 *
2692 * @param fd, the file descriptor of socket
2693 * @param which, event flag
2694 * @param arg, argument
2695 */
2696 void ms_event_handler(const int fd, const short which, void *arg)
2697 {
2698 ms_conn_t *c= (ms_conn_t *)arg;
2699
2700 assert(c != NULL);
2701
2702 c->which= which;
2703
2704 /* sanity */
2705 if (fd != c->sfd)
2706 {
2707 fprintf(stderr,
2708 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2709 fd,
2710 c->sfd);
2711 ms_conn_close(c);
2712 exit(1);
2713 }
2714 assert(fd == c->sfd);
2715
2716 ms_drive_machine(c);
2717
2718 /* wait for next event */
2719 } /* ms_event_handler */
2720
2721
2722 /**
2723 * get the next socket descriptor index to run for replication
2724 *
2725 * @param c, pointer of the concurrency
2726 * @param cmd, command(get or set )
2727 *
2728 * @return int, if success, return the index, else return EXIT_SUCCESS
2729 */
2730 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2731 {
2732 uint32_t sock_index= 0;
2733 uint32_t i= 0;
2734
2735 if (c->total_sfds == 1)
2736 {
2737 return EXIT_SUCCESS;
2738 }
2739
2740 if (ms_setting.rep_write_srv == 0)
2741 {
2742 return sock_index;
2743 }
2744
2745 do
2746 {
2747 if (cmd == CMD_SET)
2748 {
2749 for (i= 0; i < ms_setting.rep_write_srv; i++)
2750 {
2751 if (c->tcpsfd[i] > 0)
2752 {
2753 break;
2754 }
2755 }
2756
2757 if (i == ms_setting.rep_write_srv)
2758 {
2759 /* random get one replication server to read */
2760 sock_index= (uint32_t)random() % c->total_sfds;
2761 }
2762 else
2763 {
2764 /* random get one replication writing server to write */
2765 sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
2766 }
2767 }
2768 else if (cmd == CMD_GET)
2769 {
2770 /* random get one replication server to read */
2771 sock_index= (uint32_t)random() % c->total_sfds;
2772 }
2773 }
2774 while (c->tcpsfd[sock_index] == 0);
2775
2776 return sock_index;
2777 } /* ms_get_rep_sock_index */
2778
2779
2780 /**
2781 * get the next socket descriptor index to run
2782 *
2783 * @param c, pointer of the concurrency
2784 *
2785 * @return int, return the index
2786 */
2787 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2788 {
2789 uint32_t sock_index= 0;
2790
2791 do
2792 {
2793 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2794 }
2795 while (c->tcpsfd[sock_index] == 0);
2796
2797 return sock_index;
2798 } /* ms_get_next_sock_index */
2799
2800
2801 /**
2802 * update socket event of the connections
2803 *
2804 * @param c, pointer of the concurrency
2805 *
2806 * @return int, if success, return EXIT_SUCCESS, else return -1
2807 */
2808 static int ms_update_conn_sock_event(ms_conn_t *c)
2809 {
2810 assert(c != NULL);
2811
2812 switch (c->currcmd.cmd)
2813 {
2814 case CMD_SET:
2815 if (ms_setting.facebook_test && c->udp)
2816 {
2817 c->sfd= c->tcpsfd[0];
2818 c->udp= false;
2819 c->change_sfd= true;
2820 }
2821 break;
2822
2823 case CMD_GET:
2824 if (ms_setting.facebook_test && ! c->udp)
2825 {
2826 c->sfd= c->udpsfd;
2827 c->udp= true;
2828 c->change_sfd= true;
2829 }
2830 break;
2831
2832 default:
2833 break;
2834 } /* switch */
2835
2836 if (! c->udp && (c->total_sfds > 1))
2837 {
2838 if (c->cur_idx != c->total_sfds)
2839 {
2840 if (ms_setting.rep_write_srv == 0)
2841 {
2842 c->cur_idx= ms_get_next_sock_index(c);
2843 }
2844 else
2845 {
2846 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2847 }
2848 }
2849 else
2850 {
2851 /* must select the first sock of the connection at the beginning */
2852 c->cur_idx= 0;
2853 }
2854
2855 c->sfd= c->tcpsfd[c->cur_idx];
2856 assert(c->sfd != 0);
2857 c->change_sfd= true;
2858 }
2859
2860 if (c->change_sfd)
2861 {
2862 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2863 {
2864 fprintf(stderr, "Couldn't update event.\n");
2865 ms_conn_set_state(c, conn_closing);
2866 return -1;
2867 }
2868 }
2869
2870 return EXIT_SUCCESS;
2871 } /* ms_update_conn_sock_event */
2872
2873
2874 /**
2875 * for ASCII protocol, this function build the set command
2876 * string and send the command.
2877 *
2878 * @param c, pointer of the concurrency
2879 * @param item, pointer of task item which includes the object
2880 * information
2881 *
2882 * @return int, if success, return EXIT_SUCCESS, else return -1
2883 */
2884 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2885 {
2886 int value_offset;
2887 int write_len;
2888 char *buffer= c->wbuf;
2889
2890 write_len= snprintf(buffer,
2891 c->wsize,
2892 " %u %d %d\r\n",
2893 0,
2894 item->exp_time,
2895 item->value_size);
2896
2897 if (write_len > c->wsize || write_len < 0)
2898 {
2899 /* ought to be always enough. just fail for simplicity */
2900 fprintf(stderr, "output command line too long.\n");
2901 return -1;
2902 }
2903
2904 if (item->value_offset == INVALID_OFFSET)
2905 {
2906 value_offset= item->key_suffix_offset;
2907 }
2908 else
2909 {
2910 value_offset= item->value_offset;
2911 }
2912
2913 if ((ms_add_iov(c, "set ", 4) != 0)
2914 || (ms_add_iov(c, (char *)&item->key_prefix,
2915 (int)KEY_PREFIX_SIZE) != 0)
2916 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2917 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2918 || (ms_add_iov(c, buffer, write_len) != 0)
2919 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2920 item->value_size) != 0)
2921 || (ms_add_iov(c, "\r\n", 2) != 0)
2922 || (c->udp && (ms_build_udp_headers(c) != 0)))
2923 {
2924 return -1;
2925 }
2926
2927 return EXIT_SUCCESS;
2928 } /* ms_build_ascii_write_buf_set */
2929
2930
2931 /**
2932 * used to send set command to server
2933 *
2934 * @param c, pointer of the concurrency
2935 * @param item, pointer of task item which includes the object
2936 * information
2937 *
2938 * @return int, if success, return EXIT_SUCCESS, else return -1
2939 */
2940 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2941 {
2942 assert(c != NULL);
2943
2944 c->currcmd.cmd= CMD_SET;
2945 c->currcmd.isfinish= false;
2946 c->currcmd.retstat= MCD_FAILURE;
2947
2948 if (ms_update_conn_sock_event(c) != 0)
2949 {
2950 return -1;
2951 }
2952
2953 c->msgcurr= 0;
2954 c->msgused= 0;
2955 c->iovused= 0;
2956 if (ms_add_msghdr(c) != 0)
2957 {
2958 fprintf(stderr, "Out of memory preparing request.");
2959 return -1;
2960 }
2961
2962 /* binary protocol */
2963 if (c->protocol == binary_prot)
2964 {
2965 if (ms_build_bin_write_buf_set(c, item) != 0)
2966 {
2967 return -1;
2968 }
2969 }
2970 else
2971 {
2972 if (ms_build_ascii_write_buf_set(c, item) != 0)
2973 {
2974 return -1;
2975 }
2976 }
2977
2978 atomic_add_size(&ms_stats.obj_bytes,
2979 item->key_size + item->value_size);
2980 atomic_add_size(&ms_stats.cmd_set, 1);
2981
2982 return EXIT_SUCCESS;
2983 } /* ms_mcd_set */
2984
2985
2986 /**
2987 * for ASCII protocol, this function build the get command
2988 * string and send the command.
2989 *
2990 * @param c, pointer of the concurrency
2991 * @param item, pointer of task item which includes the object
2992 * information
2993 *
2994 * @return int, if success, return EXIT_SUCCESS, else return -1
2995 */
2996 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
2997 {
2998 if ((ms_add_iov(c, "get ", 4) != 0)
2999 || (ms_add_iov(c, (char *)&item->key_prefix,
3000 (int)KEY_PREFIX_SIZE) != 0)
3001 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3002 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3003 || (ms_add_iov(c, "\r\n", 2) != 0)
3004 || (c->udp && (ms_build_udp_headers(c) != 0)))
3005 {
3006 return -1;
3007 }
3008
3009 return EXIT_SUCCESS;
3010 } /* ms_build_ascii_write_buf_get */
3011
3012
3013 /**
3014 * used to send the get command to server
3015 *
3016 * @param c, pointer of the concurrency
3017 * @param item, pointer of task item which includes the object
3018 * information
3019 *
3020 * @return int, if success, return EXIT_SUCCESS, else return -1
3021 */
3022 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3023 {
3024 assert(c != NULL);
3025
3026 c->currcmd.cmd= CMD_GET;
3027 c->currcmd.isfinish= false;
3028 c->currcmd.retstat= MCD_FAILURE;
3029
3030 if (ms_update_conn_sock_event(c) != 0)
3031 {
3032 return -1;
3033 }
3034
3035 c->msgcurr= 0;
3036 c->msgused= 0;
3037 c->iovused= 0;
3038 if (ms_add_msghdr(c) != 0)
3039 {
3040 fprintf(stderr, "Out of memory preparing request.");
3041 return -1;
3042 }
3043
3044 /* binary protocol */
3045 if (c->protocol == binary_prot)
3046 {
3047 if (ms_build_bin_write_buf_get(c, item) != 0)
3048 {
3049 return -1;
3050 }
3051 }
3052 else
3053 {
3054 if (ms_build_ascii_write_buf_get(c, item) != 0)
3055 {
3056 return -1;
3057 }
3058 }
3059
3060 atomic_add_size(&ms_stats.cmd_get, 1);
3061
3062 return EXIT_SUCCESS;
3063 } /* ms_mcd_get */
3064
3065
3066 /**
3067 * for ASCII protocol, this function build the multi-get command
3068 * string and send the command.
3069 *
3070 * @param c, pointer of the concurrency
3071 *
3072 * @return int, if success, return EXIT_SUCCESS, else return -1
3073 */
3074 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3075 {
3076 ms_task_item_t *item;
3077
3078 if (ms_add_iov(c, "get", 3) != 0)
3079 {
3080 return -1;
3081 }
3082
3083 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3084 {
3085 item= c->mlget_task.mlget_item[i].item;
3086 assert(item != NULL);
3087 if ((ms_add_iov(c, " ", 1) != 0)
3088 || (ms_add_iov(c, (char *)&item->key_prefix,
3089 (int)KEY_PREFIX_SIZE) != 0)
3090 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3091 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3092 {
3093 return -1;
3094 }
3095 }
3096
3097 if ((ms_add_iov(c, "\r\n", 2) != 0)
3098 || (c->udp && (ms_build_udp_headers(c) != 0)))
3099 {
3100 return -1;
3101 }
3102
3103 return EXIT_SUCCESS;
3104 } /* ms_build_ascii_write_buf_mlget */
3105
3106
3107 /**
3108 * used to send the multi-get command to server
3109 *
3110 * @param c, pointer of the concurrency
3111 *
3112 * @return int, if success, return EXIT_SUCCESS, else return -1
3113 */
3114 int ms_mcd_mlget(ms_conn_t *c)
3115 {
3116 ms_task_item_t *item;
3117
3118 assert(c != NULL);
3119 assert(c->mlget_task.mlget_num >= 1);
3120
3121 c->currcmd.cmd= CMD_GET;
3122 c->currcmd.isfinish= false;
3123 c->currcmd.retstat= MCD_FAILURE;
3124
3125 if (ms_update_conn_sock_event(c) != 0)
3126 {
3127 return -1;
3128 }
3129
3130 c->msgcurr= 0;
3131 c->msgused= 0;
3132 c->iovused= 0;
3133 if (ms_add_msghdr(c) != 0)
3134 {
3135 fprintf(stderr, "Out of memory preparing request.");
3136 return -1;
3137 }
3138
3139 /* binary protocol */
3140 if (c->protocol == binary_prot)
3141 {
3142 if (ms_build_bin_write_buf_mlget(c) != 0)
3143 {
3144 return -1;
3145 }
3146 }
3147 else
3148 {
3149 if (ms_build_ascii_write_buf_mlget(c) != 0)
3150 {
3151 return -1;
3152 }
3153 }
3154
3155 /* decrease operation time of each item */
3156 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3157 {
3158 item= c->mlget_task.mlget_item[i].item;
3159 atomic_add_size(&ms_stats.cmd_get, 1);
3160 }
3161
3162 return EXIT_SUCCESS;
3163 } /* ms_mcd_mlget */
3164
3165
3166 /**
3167 * binary protocol support
3168 */
3169
3170 /**
3171 * for binary protocol, parse the response of server
3172 *
3173 * @param c, pointer of the concurrency
3174 *
3175 * @return int, if success, return EXIT_SUCCESS, else return -1
3176 */
3177 static int ms_bin_process_response(ms_conn_t *c)
3178 {
3179 const char *errstr= NULL;
3180
3181 assert(c != NULL);
3182
3183 uint32_t bodylen= c->binary_header.response.bodylen;
3184 uint8_t opcode= c->binary_header.response.opcode;
3185 uint16_t status= c->binary_header.response.status;
3186
3187 if (bodylen > 0)
3188 {
3189 c->rvbytes= (int32_t)bodylen;
3190 c->readval= true;
3191 return EXIT_FAILURE;
3192 }
3193 else
3194 {
3195 switch (status)
3196 {
3197 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3198 if (opcode == PROTOCOL_BINARY_CMD_SET)
3199 {
3200 c->currcmd.retstat= MCD_STORED;
3201 }
3202 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3203 {
3204 c->currcmd.retstat= MCD_DELETED;
3205 }
3206 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3207 {
3208 c->currcmd.retstat= MCD_END;
3209 }
3210 break;
3211
3212 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3213 errstr= "Out of memory";
3214 c->currcmd.retstat= MCD_SERVER_ERROR;
3215 break;
3216
3217 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3218 errstr= "Unknown command";
3219 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3220 break;
3221
3222 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3223 errstr= "Not found";
3224 c->currcmd.retstat= MCD_NOTFOUND;
3225 break;
3226
3227 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3228 errstr= "Invalid arguments";
3229 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3230 break;
3231
3232 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3233 errstr= "Data exists for key.";
3234 break;
3235
3236 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3237 errstr= "Too large.";
3238 c->currcmd.retstat= MCD_SERVER_ERROR;
3239 break;
3240
3241 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3242 errstr= "Not stored.";
3243 c->currcmd.retstat= MCD_NOTSTORED;
3244 break;
3245
3246 default:
3247 errstr= "Unknown error";
3248 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3249 break;
3250 } /* switch */
3251
3252 if (errstr != NULL)
3253 {
3254 fprintf(stderr, "%s\n", errstr);
3255 }
3256 }
3257
3258 return EXIT_SUCCESS;
3259 } /* ms_bin_process_response */
3260
3261
3262 /* build binary header and add the header to the buffer to send */
3263
3264 /**
3265 * build binary header and add the header to the buffer to send
3266 *
3267 * @param c, pointer of the concurrency
3268 * @param opcode, operation code
3269 * @param hdr_len, length of header
3270 * @param key_len, length of key
3271 * @param body_len. length of body
3272 */
3273 static void ms_add_bin_header(ms_conn_t *c,
3274 uint8_t opcode,
3275 uint8_t hdr_len,
3276 uint16_t key_len,
3277 uint32_t body_len)
3278 {
3279 protocol_binary_request_header *header;
3280
3281 assert(c != NULL);
3282
3283 header= (protocol_binary_request_header *)c->wcurr;
3284
3285 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3286 header->request.opcode= (uint8_t)opcode;
3287 header->request.keylen= htons(key_len);
3288
3289 header->request.extlen= (uint8_t)hdr_len;
3290 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3291 header->request.vbucket= 0;
3292
3293 header->request.bodylen= htonl(body_len);
3294 header->request.opaque= 0;
3295 header->request.cas= 0;
3296
3297 ms_add_iov(c, c->wcurr, sizeof(header->request));
3298 } /* ms_add_bin_header */
3299
3300
3301 /**
3302 * add the key to the socket write buffer array
3303 *
3304 * @param c, pointer of the concurrency
3305 * @param item, pointer of task item which includes the object
3306 * information
3307 */
3308 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3309 {
3310 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3311 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3312 item->key_size - (int)KEY_PREFIX_SIZE);
3313 }
3314
3315
3316 /**
3317 * for binary protocol, this function build the set command
3318 * and add the command to send buffer array.
3319 *
3320 * @param c, pointer of the concurrency
3321 * @param item, pointer of task item which includes the object
3322 * information
3323 *
3324 * @return int, if success, return EXIT_SUCCESS, else return -1
3325 */
3326 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3327 {
3328 assert(c->wbuf == c->wcurr);
3329
3330 int value_offset;
3331 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3332 uint16_t keylen= (uint16_t)item->key_size;
3333 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3334 + (uint32_t)keylen + (uint32_t)item->value_size;
3335
3336 ms_add_bin_header(c,
3337 PROTOCOL_BINARY_CMD_SET,
3338 sizeof(rep->message.body),
3339 keylen,
3340 bodylen);
3341 rep->message.body.flags= 0;
3342 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3343 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3344 ms_add_key_to_iov(c, item);
3345
3346 if (item->value_offset == INVALID_OFFSET)
3347 {
3348 value_offset= item->key_suffix_offset;
3349 }
3350 else
3351 {
3352 value_offset= item->value_offset;
3353 }
3354 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3355
3356 return EXIT_SUCCESS;
3357 } /* ms_build_bin_write_buf_set */
3358
3359
3360 /**
3361 * for binary protocol, this function build the get command and
3362 * add the command to send buffer array.
3363 *
3364 * @param c, pointer of the concurrency
3365 * @param item, pointer of task item which includes the object
3366 * information
3367 *
3368 * @return int, if success, return EXIT_SUCCESS, else return -1
3369 */
3370 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3371 {
3372 assert(c->wbuf == c->wcurr);
3373
3374 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3375 (uint32_t)item->key_size);
3376 ms_add_key_to_iov(c, item);
3377
3378 return EXIT_SUCCESS;
3379 } /* ms_build_bin_write_buf_get */
3380
3381
3382 /**
3383 * for binary protocol, this function build the multi-get
3384 * command and add the command to send buffer array.
3385 *
3386 * @param c, pointer of the concurrency
3387 * @param item, pointer of task item which includes the object
3388 * information
3389 *
3390 * @return int, if success, return EXIT_SUCCESS, else return -1
3391 */
3392 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3393 {
3394 ms_task_item_t *item;
3395
3396 assert(c->wbuf == c->wcurr);
3397
3398 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3399 {
3400 item= c->mlget_task.mlget_item[i].item;
3401 assert(item != NULL);
3402
3403 ms_add_bin_header(c,
3404 PROTOCOL_BINARY_CMD_GET,
3405 0,
3406 (uint16_t)item->key_size,
3407 (uint32_t)item->key_size);
3408 ms_add_key_to_iov(c, item);
3409 c->wcurr+= sizeof(protocol_binary_request_get);
3410 }
3411
3412 c->wcurr= c->wbuf;
3413
3414 return EXIT_SUCCESS;
3415 } /* ms_build_bin_write_buf_mlget */