2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
16 #include "mem_config.h"
24 #include <netinet/tcp.h>
25 #include <netinet/in.h>
27 #if defined(HAVE_ARPA_INET_H)
28 # include <arpa/inet.h>
31 #if defined(HAVE_SYS_TIME_H)
32 # include <sys/time.h>
35 #if defined(HAVE_TIME_H)
39 #include "ms_setting.h"
40 #include "ms_thread.h"
41 #include "ms_atomic.h"
44 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
45 * optimize the conversion functions, but the prototypes generate warnings
46 * from gcc. The conversion methods isn't the bottleneck for my app, so
47 * just remove the warnings by undef'ing the optimization ..
55 /* for network write */
56 #define TRANSMIT_COMPLETE 0
57 #define TRANSMIT_INCOMPLETE 1
58 #define TRANSMIT_SOFT_ERROR 2
59 #define TRANSMIT_HARD_ERROR 3
61 /* for generating key */
62 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
63 #define KEY_PREFIX_MASK 0x1010101010101010
65 /* For parse the value length return by server */
67 #define VALUELEN_TOKEN 3
69 /* global increasing counter, to ensure the key prefix unique */
70 static uint64_t key_prefix_seq
= KEY_PREFIX_BASE
;
72 /* global increasing counter, generating request id for UDP */
73 static ATOMIC
uint32_t udp_request_id
= 0;
75 extern pthread_key_t ms_thread_key
;
77 /* generate upd request id */
78 static uint32_t ms_get_udp_request_id(void);
80 /* connect initialize */
81 static void ms_task_init(ms_conn_t
*c
);
82 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
);
83 static int ms_conn_sock_init(ms_conn_t
*c
);
84 static int ms_conn_event_init(ms_conn_t
*c
);
85 static int ms_conn_init(ms_conn_t
*c
, const int init_state
, const int read_buffer_size
,
87 static void ms_warmup_num_init(ms_conn_t
*c
);
88 static int ms_item_win_init(ms_conn_t
*c
);
90 /* connection close */
91 void ms_conn_free(ms_conn_t
*c
);
92 static void ms_conn_close(ms_conn_t
*c
);
94 /* create network connection */
95 static int ms_new_socket(struct addrinfo
*ai
);
96 static void ms_maximize_sndbuf(const int sfd
);
97 static int ms_network_connect(ms_conn_t
*c
, char *srv_host_name
, const int srv_port
,
98 const bool is_udp
, int *ret_sfd
);
99 static int ms_reconn(ms_conn_t
*c
);
102 static int ms_tokenize_command(char *command
, token_t
*tokens
, const int max_tokens
);
103 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
);
104 static int ms_try_read_line(ms_conn_t
*c
);
105 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
);
106 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
);
107 static int ms_try_read_network(ms_conn_t
*c
);
108 static void ms_verify_value(ms_conn_t
*c
, ms_mlget_task_item_t
*mlget_item
, char *value
, int vlen
);
109 static void ms_ascii_complete_nread(ms_conn_t
*c
);
110 static void ms_bin_complete_nread(ms_conn_t
*c
);
111 static void ms_complete_nread(ms_conn_t
*c
);
114 static int ms_add_msghdr(ms_conn_t
*c
);
115 static int ms_ensure_iov_space(ms_conn_t
*c
);
116 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
);
117 static int ms_build_udp_headers(ms_conn_t
*c
);
118 static int ms_transmit(ms_conn_t
*c
);
120 /* status adjustment */
121 static void ms_conn_shrink(ms_conn_t
*c
);
122 static void ms_conn_set_state(ms_conn_t
*c
, int state
);
123 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
);
124 static uint32_t ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
);
125 static uint32_t ms_get_next_sock_index(ms_conn_t
*c
);
126 static int ms_update_conn_sock_event(ms_conn_t
*c
);
127 static bool ms_need_yield(ms_conn_t
*c
);
128 static void ms_update_start_time(ms_conn_t
*c
);
131 static void ms_drive_machine(ms_conn_t
*c
);
132 void ms_event_handler(const int fd
, const short which
, void *arg
);
135 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
136 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
137 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
);
139 /* binary protocol */
140 static int ms_bin_process_response(ms_conn_t
*c
);
141 static void ms_add_bin_header(ms_conn_t
*c
, uint8_t opcode
, uint8_t hdr_len
, uint16_t key_len
,
143 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
);
144 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
145 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
146 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
);
149 * each key has two parts, prefix and suffix. The suffix is a
150 * string random get form the character table. The prefix is a
151 * uint64_t variable. And the prefix must be unique. we use the
152 * prefix to identify a key. And the prefix can't include
153 * character ' ' '\r' '\n' '\0'.
157 uint64_t ms_get_key_prefix(void) {
160 pthread_mutex_lock(&ms_global
.seq_mutex
);
161 key_prefix_seq
|= KEY_PREFIX_MASK
;
162 key_prefix
= key_prefix_seq
;
164 pthread_mutex_unlock(&ms_global
.seq_mutex
);
167 } /* ms_get_key_prefix */
170 * get an unique udp request id
172 * @return an unique UDP request id
174 static uint32_t ms_get_udp_request_id(void) {
175 return atomic_add_32_nv(&udp_request_id
, 1);
179 * initialize current task structure
181 * @param c, pointer of the concurrency
183 static void ms_task_init(ms_conn_t
*c
) {
184 c
->curr_task
.cmd
= CMD_NULL
;
185 c
->curr_task
.item
= 0;
186 c
->curr_task
.verify
= false;
187 c
->curr_task
.finish_verify
= true;
188 c
->curr_task
.get_miss
= true;
190 c
->curr_task
.get_opt
= 0;
191 c
->curr_task
.set_opt
= 0;
192 c
->curr_task
.cycle_undo_get
= 0;
193 c
->curr_task
.cycle_undo_set
= 0;
194 c
->curr_task
.verified_get
= 0;
195 c
->curr_task
.overwrite_set
= 0;
199 * initialize udp for the connection structure
201 * @param c, pointer of the concurrency
202 * @param is_udp, whether it's udp
204 * @return int, if success, return EXIT_SUCCESS, else return -1
206 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
) {
211 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
222 if (c
->udp
|| (!c
->udp
&& ms_setting
.facebook_test
)) {
223 c
->rudpbuf
= (char *) malloc((size_t) c
->rudpsize
);
224 c
->udppkt
= (ms_udppkt_t
*) malloc(MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
226 if ((c
->rudpbuf
== NULL
) || (c
->udppkt
== NULL
)) {
227 if (c
->rudpbuf
!= NULL
)
229 if (c
->udppkt
!= NULL
)
231 fprintf(stderr
, "malloc()\n");
234 memset(c
->udppkt
, 0, MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
238 } /* ms_conn_udp_init */
241 * initialize the connection structure
243 * @param c, pointer of the concurrency
244 * @param init_state, (conn_read, conn_write, conn_closing)
245 * @param read_buffer_size
246 * @param is_udp, whether it's udp
248 * @return int, if success, return EXIT_SUCCESS, else return -1
250 static int ms_conn_init(ms_conn_t
*c
, const int init_state
, const int read_buffer_size
,
254 c
->rbuf
= c
->wbuf
= 0;
258 c
->rsize
= read_buffer_size
;
259 c
->wsize
= WRITE_BUFFER_SIZE
;
260 c
->iovsize
= IOV_LIST_INITIAL
;
261 c
->msgsize
= MSG_LIST_INITIAL
;
263 /* for replication, each connection need connect all the server */
264 if (ms_setting
.rep_write_srv
> 0) {
265 c
->total_sfds
= ms_setting
.srv_cnt
* ms_setting
.sock_per_conn
;
267 c
->total_sfds
= ms_setting
.sock_per_conn
;
271 c
->rbuf
= (char *) malloc((size_t) c
->rsize
);
272 c
->wbuf
= (char *) malloc((size_t) c
->wsize
);
273 c
->iov
= (struct iovec
*) malloc(sizeof(struct iovec
) * (size_t) c
->iovsize
);
274 c
->msglist
= (struct msghdr
*) malloc(sizeof(struct msghdr
) * (size_t) c
->msgsize
);
275 if (ms_setting
.mult_key_num
> 1) {
276 c
->mlget_task
.mlget_item
= (ms_mlget_task_item_t
*) malloc(sizeof(ms_mlget_task_item_t
)
277 * (size_t) ms_setting
.mult_key_num
);
279 c
->tcpsfd
= (int *) malloc((size_t) c
->total_sfds
* sizeof(int));
281 if ((c
->rbuf
== NULL
) || (c
->wbuf
== NULL
) || (c
->iov
== NULL
) || (c
->msglist
== NULL
)
282 || (c
->tcpsfd
== NULL
)
283 || ((ms_setting
.mult_key_num
> 1) && (c
->mlget_task
.mlget_item
== NULL
)))
291 if (c
->msglist
!= NULL
)
293 if (c
->mlget_task
.mlget_item
!= NULL
)
294 free(c
->mlget_task
.mlget_item
);
295 if (c
->tcpsfd
!= NULL
)
297 fprintf(stderr
, "malloc()\n");
301 c
->state
= init_state
;
309 c
->cur_idx
= c
->total_sfds
; /* default index is a invalid value */
313 c
->change_sfd
= false;
315 c
->precmd
.cmd
= c
->currcmd
.cmd
= CMD_NULL
;
316 c
->precmd
.isfinish
= true; /* default the previous command finished */
317 c
->currcmd
.isfinish
= false;
318 c
->precmd
.retstat
= c
->currcmd
.retstat
= MCD_FAILURE
;
319 c
->precmd
.key_prefix
= c
->currcmd
.key_prefix
= 0;
321 c
->mlget_task
.mlget_num
= 0;
322 c
->mlget_task
.value_index
= -1; /* default invalid value */
324 if (ms_setting
.binary_prot_
) {
325 c
->protocol
= binary_prot
;
327 c
->protocol
= ascii_prot
;
331 if (ms_conn_udp_init(c
, is_udp
) != 0) {
335 /* initialize task */
338 if (!(ms_setting
.facebook_test
&& is_udp
)) {
339 atomic_add_32(&ms_stats
.active_conns
, 1);
346 * when doing 100% get operation, it could preset some objects
347 * to warmup the server. this function is used to initialize the
348 * number of the objects to preset.
350 * @param c, pointer of the concurrency
352 static void ms_warmup_num_init(ms_conn_t
*c
) {
353 /* no set operation, preset all the items in the window */
354 if (ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
< PROP_ERROR
) {
355 c
->warmup_num
= c
->win_size
;
356 c
->remain_warmup_num
= c
->warmup_num
;
359 c
->remain_warmup_num
= c
->warmup_num
;
361 } /* ms_warmup_num_init */
364 * each connection has an item window, this function initialize
365 * the window. The window is used to generate task.
367 * @param c, pointer of the concurrency
369 * @return int, if success, return EXIT_SUCCESS, else return -1
371 static int ms_item_win_init(ms_conn_t
*c
) {
372 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
375 c
->win_size
= (int) ms_setting
.win_size
;
377 c
->exec_num
= ms_thread
->thread_ctx
->exec_num_perconn
;
378 c
->remain_exec_num
= c
->exec_num
;
380 c
->item_win
= (ms_task_item_t
*) malloc(sizeof(ms_task_item_t
) * (size_t) c
->win_size
);
381 if (c
->item_win
== NULL
) {
382 fprintf(stderr
, "Can't allocate task item array for conn.\n");
385 memset(c
->item_win
, 0, sizeof(ms_task_item_t
) * (size_t) c
->win_size
);
387 for (int i
= 0; i
< c
->win_size
; i
++) {
388 c
->item_win
[i
].key_size
= (int) ms_setting
.distr
[i
].key_size
;
389 c
->item_win
[i
].key_prefix
= ms_get_key_prefix();
390 c
->item_win
[i
].key_suffix_offset
= ms_setting
.distr
[i
].key_offset
;
391 c
->item_win
[i
].value_size
= (int) ms_setting
.distr
[i
].value_size
;
392 c
->item_win
[i
].value_offset
= INVALID_OFFSET
; /* default in invalid offset */
393 c
->item_win
[i
].client_time
= 0;
395 /* set expire time base on the proportion */
396 if (exp_cnt
< ms_setting
.exp_ver_per
* i
) {
397 c
->item_win
[i
].exp_time
= FIXED_EXPIRE_TIME
;
400 c
->item_win
[i
].exp_time
= 0;
404 ms_warmup_num_init(c
);
407 } /* ms_item_win_init */
410 * each connection structure can include one or more sock
411 * handlers. this function create these socks and connect the
414 * @param c, pointer of the concurrency
416 * @return int, if success, return EXIT_SUCCESS, else return -1
418 static int ms_conn_sock_init(ms_conn_t
*c
) {
419 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
422 uint32_t srv_idx
= 0;
425 assert(c
->tcpsfd
!= NULL
);
427 for (i
= 0; i
< c
->total_sfds
; i
++) {
429 if (ms_setting
.rep_write_srv
> 0) {
430 /* for replication, each connection need connect all the server */
431 srv_idx
= i
% ms_setting
.srv_cnt
;
433 /* all the connections in a thread connects the same server */
434 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
437 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
438 ms_setting
.servers
[srv_idx
].srv_port
, ms_setting
.udp
, &ret_sfd
)
448 if (!ms_setting
.udp
) {
449 c
->tcpsfd
[i
] = ret_sfd
;
455 /* initialize udp sock handler if necessary */
456 if (ms_setting
.facebook_test
) {
458 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
459 ms_setting
.servers
[srv_idx
].srv_port
, true, &ret_sfd
)
468 if ((i
!= c
->total_sfds
) || (ms_setting
.facebook_test
&& (c
->udpsfd
== 0))) {
469 if (ms_setting
.udp
) {
472 for (uint32_t j
= 0; j
< i
; j
++) {
477 if (c
->udpsfd
!= 0) {
485 } /* ms_conn_sock_init */
488 * each connection is managed by libevent, this function
489 * initialize the event of the connection structure.
491 * @param c, pointer of the concurrency
493 * @return int, if success, return EXIT_SUCCESS, else return -1
495 static int ms_conn_event_init(ms_conn_t
*c
) {
496 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
497 short event_flags
= EV_WRITE
| EV_PERSIST
;
499 event_set(&c
->event
, c
->sfd
, event_flags
, ms_event_handler
, (void *) c
);
500 event_base_set(ms_thread
->base
, &c
->event
);
501 c
->ev_flags
= event_flags
;
503 if (event_add(&c
->event
, NULL
) == -1) {
508 } /* ms_conn_event_init */
511 * setup a connection, each connection structure of each
512 * thread must call this function to initialize.
514 * @param c, pointer of the concurrency
516 * @return int, if success, return EXIT_SUCCESS, else return -1
518 int ms_setup_conn(ms_conn_t
*c
) {
519 if (ms_item_win_init(c
) != 0) {
523 if (ms_conn_init(c
, conn_write
, DATA_BUFFER_SIZE
, ms_setting
.udp
) != 0) {
527 if (ms_conn_sock_init(c
) != 0) {
531 if (ms_conn_event_init(c
) != 0) {
536 } /* ms_setup_conn */
539 * Frees a connection.
541 * @param c, pointer of the concurrency
543 void ms_conn_free(ms_conn_t
*c
) {
544 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
546 if (c
->hdrbuf
!= NULL
)
548 if (c
->msglist
!= NULL
)
556 if (c
->mlget_task
.mlget_item
!= NULL
)
557 free(c
->mlget_task
.mlget_item
);
558 if (c
->rudpbuf
!= NULL
)
560 if (c
->udppkt
!= NULL
)
562 if (c
->item_win
!= NULL
)
564 if (c
->tcpsfd
!= NULL
)
567 if (--ms_thread
->nactive_conn
== 0) {
568 free(ms_thread
->conn
);
576 * @param c, pointer of the concurrency
578 static void ms_conn_close(ms_conn_t
*c
) {
579 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
582 /* delete the event, the socket and the connection */
583 event_del(&c
->event
);
585 for (uint32_t i
= 0; i
< c
->total_sfds
; i
++) {
586 if (c
->tcpsfd
[i
] > 0) {
592 if (ms_setting
.facebook_test
) {
596 atomic_dec_32(&ms_stats
.active_conns
);
600 if (ms_setting
.run_time
== 0) {
601 pthread_mutex_lock(&ms_global
.run_lock
.lock
);
602 ms_global
.run_lock
.count
++;
603 pthread_cond_signal(&ms_global
.run_lock
.cond
);
604 pthread_mutex_unlock(&ms_global
.run_lock
.lock
);
607 if (ms_thread
->nactive_conn
== 0) {
610 } /* ms_conn_close */
615 * @param ai, server address information
617 * @return int, if success, return EXIT_SUCCESS, else return -1
619 static int ms_new_socket(struct addrinfo
*ai
) {
622 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1) {
623 fprintf(stderr
, "socket() error: %s.\n", strerror(errno
));
628 } /* ms_new_socket */
631 * Sets a socket's send buffer size to the maximum allowed by the system.
633 * @param sfd, file descriptor of socket
635 static void ms_maximize_sndbuf(const int sfd
) {
636 socklen_t intsize
= sizeof(int);
637 unsigned int last_good
= 0;
638 unsigned int min
, max
, avg
;
639 unsigned int old_size
;
641 /* Start with the default size. */
642 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0) {
643 fprintf(stderr
, "getsockopt(SO_SNDBUF)\n");
647 /* Binary-search for the real maximum. */
649 max
= MAX_SENDBUF_SIZE
;
652 avg
= ((unsigned int) (min
+ max
)) / 2;
653 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *) &avg
, intsize
) == 0) {
661 } /* ms_maximize_sndbuf */
664 * socket connects the server
666 * @param c, pointer of the concurrency
667 * @param srv_host_name, the host name of the server
668 * @param srv_port, port of server
669 * @param is_udp, whether it's udp
670 * @param ret_sfd, the connected socket file descriptor
672 * @return int, if success, return EXIT_SUCCESS, else return -1
674 static int ms_network_connect(ms_conn_t
*c
, char *srv_host_name
, const int srv_port
,
675 const bool is_udp
, int *ret_sfd
) {
677 struct linger ling
= {0, 0};
679 struct addrinfo
*next
;
680 struct addrinfo hints
;
681 char port_buf
[NI_MAXSERV
];
688 * the memset call clears nonstandard fields in some impementations
689 * that otherwise mess things up.
691 memset(&hints
, 0, sizeof(hints
));
693 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
695 hints
.ai_flags
= AI_PASSIVE
;
696 #endif /* AI_ADDRCONFIG */
698 hints
.ai_protocol
= IPPROTO_UDP
;
699 hints
.ai_socktype
= SOCK_DGRAM
;
700 hints
.ai_family
= AF_INET
; /* This left here because of issues with OSX 10.5 */
702 hints
.ai_family
= AF_UNSPEC
;
703 hints
.ai_protocol
= IPPROTO_TCP
;
704 hints
.ai_socktype
= SOCK_STREAM
;
707 snprintf(port_buf
, NI_MAXSERV
, "%d", srv_port
);
708 error
= getaddrinfo(srv_host_name
, port_buf
, &hints
, &ai
);
710 if (error
!= EAI_SYSTEM
)
711 fprintf(stderr
, "getaddrinfo(): %s.\n", gai_strerror(error
));
713 perror("getaddrinfo()");
718 for (next
= ai
; next
; next
= next
->ai_next
) {
719 if ((sfd
= ms_new_socket(next
)) == -1) {
724 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *) &flags
, sizeof(flags
));
726 ms_maximize_sndbuf(sfd
);
728 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *) &flags
, sizeof(flags
));
729 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *) &ling
, sizeof(ling
));
730 setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *) &flags
, sizeof(flags
));
734 c
->srv_recv_addr_size
= sizeof(struct sockaddr
);
735 memcpy(&c
->srv_recv_addr
, next
->ai_addr
, c
->srv_recv_addr_size
);
737 if (connect(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1) {
744 if (((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0) || (fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0)) {
745 fprintf(stderr
, "setting O_NONBLOCK\n");
751 if (ret_sfd
!= NULL
) {
760 /* Return zero if we detected no errors in starting up connections */
762 } /* ms_network_connect */
765 * reconnect a disconnected sock
767 * @param c, pointer of the concurrency
769 * @return int, if success, return EXIT_SUCCESS, else return -1
771 static int ms_reconn(ms_conn_t
*c
) {
772 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
773 uint32_t srv_idx
= 0;
774 uint32_t srv_conn_cnt
= 0;
776 if (ms_setting
.rep_write_srv
> 0) {
777 srv_idx
= c
->cur_idx
% ms_setting
.srv_cnt
;
778 srv_conn_cnt
= ms_setting
.sock_per_conn
* ms_setting
.nconns
;
780 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
781 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
784 /* close the old socket handler */
786 c
->tcpsfd
[c
->cur_idx
] = 0;
788 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].disconn_cnt
, 1) % srv_conn_cnt
== 0) {
789 gettimeofday(&ms_setting
.servers
[srv_idx
].disconn_time
, NULL
);
790 fprintf(stderr
, "Server %s:%d disconnect\n", ms_setting
.servers
[srv_idx
].srv_host_name
,
791 ms_setting
.servers
[srv_idx
].srv_port
);
794 if (ms_setting
.rep_write_srv
> 0) {
797 for (i
= 0; i
< c
->total_sfds
; i
++) {
798 if (c
->tcpsfd
[i
] != 0) {
803 /* all socks disconnect */
804 if (i
== c
->total_sfds
) {
809 /* reconnect success, break the loop */
810 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
811 ms_setting
.servers
[srv_idx
].srv_port
, ms_setting
.udp
, &c
->sfd
)
814 c
->tcpsfd
[c
->cur_idx
] = c
->sfd
;
815 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1) % (uint32_t) srv_conn_cnt
817 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
818 int reconn_time
= (int) (ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
819 - ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
);
820 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
821 ms_setting
.servers
[srv_idx
].srv_host_name
, ms_setting
.servers
[srv_idx
].srv_port
,
827 if (ms_setting
.rep_write_srv
== 0 && c
->total_sfds
> 0) {
828 /* wait a second and reconnect */
831 } while (ms_setting
.rep_write_srv
== 0 && c
->total_sfds
> 0);
834 if ((c
->total_sfds
> 1) && (c
->tcpsfd
[c
->cur_idx
] == 0)) {
843 * reconnect several disconnected socks in the connection
844 * structure, the ever-1-second timer of the thread will check
845 * whether some socks in the connections disconnect. if
846 * disconnect, reconnect the sock.
848 * @param c, pointer of the concurrency
850 * @return int, if success, return EXIT_SUCCESS, else return -1
852 int ms_reconn_socks(ms_conn_t
*c
) {
853 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
854 uint32_t srv_idx
= 0;
856 uint32_t srv_conn_cnt
= 0;
857 struct timeval cur_time
;
861 if ((c
->total_sfds
== 1) || (c
->total_sfds
== c
->alive_sfds
)) {
865 for (uint32_t i
= 0; i
< c
->total_sfds
; i
++) {
866 if (c
->tcpsfd
[i
] == 0) {
867 gettimeofday(&cur_time
, NULL
);
870 * For failover test of replication, reconnect the socks after
871 * it disconnects more than 5 seconds, Otherwise memslap will
872 * block at connect() function and the work threads can't work
875 if (cur_time
.tv_sec
- ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
< 5) {
879 if (ms_setting
.rep_write_srv
> 0) {
880 srv_idx
= i
% ms_setting
.srv_cnt
;
881 srv_conn_cnt
= ms_setting
.sock_per_conn
* ms_setting
.nconns
;
883 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
884 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
887 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
888 ms_setting
.servers
[srv_idx
].srv_port
, ms_setting
.udp
, &ret_sfd
)
891 c
->tcpsfd
[i
] = ret_sfd
;
894 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1) % (uint32_t) srv_conn_cnt
896 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
897 int reconn_time
= (int) (ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
898 - ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
);
899 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
900 ms_setting
.servers
[srv_idx
].srv_host_name
, ms_setting
.servers
[srv_idx
].srv_port
,
908 } /* ms_reconn_socks */
911 * Tokenize the command string by replacing whitespace with '\0' and update
912 * the token array tokens with pointer to start of each token and length.
913 * Returns total number of tokens. The last valid token is the terminal
914 * token (value points to the first unprocessed character of the string and
919 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
920 * for(int ix = 0; tokens[ix].length != 0; ix++) {
923 * ncommand = tokens[ix].value - command;
924 * command = tokens[ix].value;
927 * @param command, the command string to token
928 * @param tokens, array to store tokens
929 * @param max_tokens, maximum tokens number
931 * @return int, the number of tokens
933 static int ms_tokenize_command(char *command
, token_t
*tokens
, const int max_tokens
) {
937 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
939 for (s
= e
= command
; ntokens
< max_tokens
- 1; ++e
) {
942 tokens
[ntokens
].value
= s
;
943 tokens
[ntokens
].length
= (size_t)(e
- s
);
948 } else if (*e
== '\0') {
950 tokens
[ntokens
].value
= s
;
951 tokens
[ntokens
].length
= (size_t)(e
- s
);
955 break; /* string end */
960 } /* ms_tokenize_command */
963 * parse the response of server.
965 * @param c, pointer of the concurrency
966 * @param command, the string responded by server
968 * @return int, if the command completed return EXIT_SUCCESS, else return
971 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
) {
974 char *buffer
= command
;
979 * for command get, we store the returned value into local buffer
980 * then continue in ms_complete_nread().
984 case 'V': /* VALUE || VERSION */
985 if (buffer
[1] == 'A') /* VALUE */ {
986 token_t tokens
[MAX_TOKENS
];
987 ms_tokenize_command(command
, tokens
, MAX_TOKENS
);
989 value_len
= strtol(tokens
[VALUELEN_TOKEN
].value
, NULL
, 10);
991 printf("<%d ERROR %s\n", c
->sfd
, strerror(errno
));
993 memcpy(&c
->currcmd
.key_prefix
, tokens
[KEY_TOKEN
].value
, sizeof(c
->currcmd
.key_prefix
));
996 * We read the \r\n into the string since not doing so is more
997 * cycles then the waster of memory to do so.
999 * We are null terminating through, which will most likely make
1000 * some people lazy about using the return length.
1002 c
->rvbytes
= (int) (value_len
+ 2);
1009 case 'O': /* OK */ c
->currcmd
.retstat
= MCD_SUCCESS
; break;
1011 case 'S': /* STORED STATS SERVER_ERROR */
1012 if (buffer
[2] == 'A') /* STORED STATS */ { /* STATS*/
1013 c
->currcmd
.retstat
= MCD_STAT
;
1014 } else if (buffer
[1] == 'E') {
1016 printf("<%d %s\n", c
->sfd
, buffer
);
1018 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
1019 } else if (buffer
[1] == 'T') {
1021 c
->currcmd
.retstat
= MCD_STORED
;
1023 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1027 case 'D': /* DELETED DATA */
1028 if (buffer
[1] == 'E') {
1029 c
->currcmd
.retstat
= MCD_DELETED
;
1031 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1036 case 'N': /* NOT_FOUND NOT_STORED*/
1037 if (buffer
[4] == 'F') {
1038 c
->currcmd
.retstat
= MCD_NOTFOUND
;
1039 } else if (buffer
[4] == 'S') {
1040 printf("<%d %s\n", c
->sfd
, buffer
);
1041 c
->currcmd
.retstat
= MCD_NOTSTORED
;
1043 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1047 case 'E': /* PROTOCOL ERROR or END */
1048 if (buffer
[1] == 'N') {
1050 c
->currcmd
.retstat
= MCD_END
;
1051 } else if (buffer
[1] == 'R') {
1052 printf("<%d ERROR\n", c
->sfd
);
1053 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
1054 } else if (buffer
[1] == 'X') {
1055 c
->currcmd
.retstat
= MCD_DATA_EXISTS
;
1056 printf("<%d %s\n", c
->sfd
, buffer
);
1058 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1062 case 'C': /* CLIENT ERROR */
1063 printf("<%d %s\n", c
->sfd
, buffer
);
1064 c
->currcmd
.retstat
= MCD_CLIENT_ERROR
;
1067 default: c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
; break;
1071 } /* ms_ascii_process_line */
1074 * after one operation completes, reset the concurrency
1076 * @param c, pointer of the concurrency
1077 * @param timeout, whether it's timeout
1079 void ms_reset_conn(ms_conn_t
*c
, bool timeout
) {
1083 if ((c
->packets
> 0) && (c
->packets
< MAX_UDP_PACKET
)) {
1084 memset(c
->udppkt
, 0, sizeof(ms_udppkt_t
) * (size_t) c
->packets
);
1093 c
->currcmd
.isfinish
= true;
1094 c
->ctnwrite
= false;
1100 ms_conn_set_state(c
, conn_write
);
1101 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
1104 ms_drive_machine(c
);
1106 } /* ms_reset_conn */
1109 * if we have a complete line in the buffer, process it.
1111 * @param c, pointer of the concurrency
1113 * @return int, if success, return EXIT_SUCCESS, else return -1
1115 static int ms_try_read_line(ms_conn_t
*c
) {
1116 if (c
->protocol
== binary_prot
) {
1117 /* Do we have the complete packet header? */
1118 if ((uint64_t) c
->rbytes
< sizeof(c
->binary_header
)) {
1119 /* need more data! */
1120 return EXIT_SUCCESS
;
1123 if (((long) (c
->rcurr
)) % 8 != 0) {
1124 /* must realign input buffer */
1125 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1127 if (settings
.verbose
) {
1128 fprintf(stderr
, "%d: Realign input buffer.\n", c
->sfd
);
1132 protocol_binary_response_header
*rsp
;
1133 rsp
= (protocol_binary_response_header
*) c
->rcurr
;
1135 c
->binary_header
= *rsp
;
1136 c
->binary_header
.response
.extlen
= rsp
->response
.extlen
;
1137 c
->binary_header
.response
.keylen
= ntohs(rsp
->response
.keylen
);
1138 c
->binary_header
.response
.bodylen
= ntohl(rsp
->response
.bodylen
);
1139 c
->binary_header
.response
.status
= ntohs(rsp
->response
.status
);
1141 if (c
->binary_header
.response
.magic
!= PROTOCOL_BINARY_RES
) {
1142 fprintf(stderr
, "Invalid magic: %x\n", c
->binary_header
.response
.magic
);
1143 ms_conn_set_state(c
, conn_closing
);
1144 return EXIT_SUCCESS
;
1147 /* process this complete response */
1148 if (ms_bin_process_response(c
) == 0) {
1149 /* current operation completed */
1150 ms_reset_conn(c
, false);
1153 c
->rbytes
-= (int32_t) sizeof(c
->binary_header
);
1154 c
->rcurr
+= sizeof(c
->binary_header
);
1161 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1164 return EXIT_SUCCESS
;
1166 el
= memchr(c
->rcurr
, '\n', (size_t) c
->rbytes
);
1168 return EXIT_SUCCESS
;
1171 if (((el
- c
->rcurr
) > 1) && (*(el
- 1) == '\r')) {
1176 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
1178 /* process this complete line */
1179 if (ms_ascii_process_line(c
, c
->rcurr
) == 0) {
1180 /* current operation completed */
1181 ms_reset_conn(c
, false);
1184 /* current operation didn't complete */
1185 c
->rbytes
-= (int32_t)(cont
- c
->rcurr
);
1189 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1193 } /* ms_try_read_line */
1196 * because the packet of UDP can't ensure the order, the
1197 * function is used to sort the received udp packet.
1199 * @param c, pointer of the concurrency
1200 * @param buf, the buffer to store the ordered packages data
1201 * @param rbytes, the maximum capacity of the buffer
1203 * @return int, if success, return the copy bytes, else return
1206 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
) {
1209 uint16_t req_id
= 0;
1210 uint16_t seq_num
= 0;
1211 uint16_t packets
= 0;
1212 unsigned char *header
= NULL
;
1214 /* no enough data */
1216 assert(buf
!= NULL
);
1217 assert(c
->rudpbytes
>= UDP_HEADER_SIZE
);
1219 /* calculate received packets count */
1220 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
>= UDP_HEADER_SIZE
) {
1221 /* the last packet has some data */
1222 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
+ 1;
1224 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
;
1227 /* get the total packets count if necessary */
1228 if (c
->packets
== 0) {
1229 c
->packets
= HEADER_TO_PACKETS((unsigned char *) c
->rudpbuf
);
1232 /* build the ordered packet array */
1233 for (int i
= c
->pktcurr
; i
< c
->recvpkt
; i
++) {
1234 header
= (unsigned char *) c
->rudpbuf
+ i
* UDP_MAX_PAYLOAD_SIZE
;
1235 req_id
= (uint16_t) HEADER_TO_REQID(header
);
1236 assert(req_id
== c
->request_id
% (1 << 16));
1238 packets
= (uint16_t) HEADER_TO_PACKETS(header
);
1239 assert(c
->packets
== HEADER_TO_PACKETS(header
));
1241 seq_num
= (uint16_t) HEADER_TO_SEQNUM(header
);
1242 c
->udppkt
[seq_num
].header
= header
;
1243 c
->udppkt
[seq_num
].data
= (char *) header
+ UDP_HEADER_SIZE
;
1245 if (i
== c
->recvpkt
- 1) {
1246 /* last received packet */
1247 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
== 0) {
1248 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1251 c
->udppkt
[seq_num
].rbytes
= c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1254 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1259 for (int i
= c
->ordcurr
; i
< c
->recvpkt
; i
++) {
1260 /* there is some data to copy */
1261 if ((c
->udppkt
[i
].data
!= NULL
) && (c
->udppkt
[i
].copybytes
< c
->udppkt
[i
].rbytes
)) {
1262 header
= c
->udppkt
[i
].header
;
1263 len
= c
->udppkt
[i
].rbytes
- c
->udppkt
[i
].copybytes
;
1264 if (len
> rbytes
- wbytes
) {
1265 len
= rbytes
- wbytes
;
1268 assert(len
<= rbytes
- wbytes
);
1269 assert(i
== HEADER_TO_SEQNUM(header
));
1271 memcpy(buf
+ wbytes
, c
->udppkt
[i
].data
+ c
->udppkt
[i
].copybytes
, (size_t) len
);
1273 c
->udppkt
[i
].copybytes
+= len
;
1275 if ((c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
)
1276 && (c
->udppkt
[i
].rbytes
== UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1278 /* finish copying all the data of this packet, next */
1282 /* last received packet, and finish copying all the data */
1283 if ((c
->recvpkt
== c
->packets
) && (i
== c
->recvpkt
- 1)
1284 && (c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
))
1289 /* no space to copy data */
1290 if (wbytes
>= rbytes
) {
1294 /* it doesn't finish reading all the data of the packet from network */
1295 if ((i
!= c
->recvpkt
- 1) && (c
->udppkt
[i
].rbytes
< UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
)) {
1299 /* no data to copy */
1305 return wbytes
== 0 ? -1 : wbytes
;
1306 } /* ms_sort_udp_packet */
1309 * encapsulate upd read like tcp read
1311 * @param c, pointer of the concurrency
1312 * @param buf, read buffer
1313 * @param len, length to read
1315 * @return int, if success, return the read bytes, else return
1318 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
) {
1327 if (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
> c
->rudpsize
) {
1328 char *new_rbuf
= realloc(c
->rudpbuf
, (size_t) c
->rudpsize
* 2);
1330 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1331 c
->rudpbytes
= 0; /* ignore what we read */
1334 c
->rudpbuf
= new_rbuf
;
1338 avail
= c
->rudpsize
- c
->rudpbytes
;
1339 /* UDP each time read a packet, 1400 bytes */
1340 res
= (int) read(c
->sfd
, c
->rudpbuf
+ c
->rudpbytes
, (size_t) avail
);
1343 atomic_add_size(&ms_stats
.bytes_read
, res
);
1344 c
->rudpbytes
+= res
;
1354 /* "connection" closed */
1359 /* no data to read */
1364 /* copy data to read buffer */
1366 copybytes
= ms_sort_udp_packet(c
, buf
, len
);
1369 if (copybytes
== -1) {
1370 atomic_add_size(&ms_stats
.pkt_disorder
, 1);
1377 * read from network as much as we can, handle buffer overflow and connection
1379 * before reading, move the remaining incomplete fragment of a command
1380 * (if any) to the beginning of the buffer.
1381 * return EXIT_SUCCESS if there's nothing to read on the first read.
1385 * read from network as much as we can, handle buffer overflow and connection
1386 * close. before reading, move the remaining incomplete fragment of a command
1387 * (if any) to the beginning of the buffer.
1389 * @param c, pointer of the concurrency
1392 * return EXIT_SUCCESS if there's nothing to read on the first read.
1393 * return EXIT_FAILURE if get data
1394 * return -1 if error happens
1396 static int ms_try_read_network(ms_conn_t
*c
) {
1403 if ((c
->rcurr
!= c
->rbuf
)
1404 && (!c
->readval
|| (c
->rvbytes
> c
->rsize
- (c
->rcurr
- c
->rbuf
))
1405 || (c
->readval
&& (c
->rcurr
- c
->rbuf
> c
->rbytes
))))
1407 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
1408 memmove(c
->rbuf
, c
->rcurr
, (size_t) c
->rbytes
);
1413 if (c
->rbytes
>= c
->rsize
) {
1414 char *new_rbuf
= realloc(c
->rbuf
, (size_t) c
->rsize
* 2);
1416 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1417 c
->rbytes
= 0; /* ignore what we read */
1420 c
->rcurr
= c
->rbuf
= new_rbuf
;
1424 avail
= c
->rsize
- c
->rbytes
- (c
->rcurr
- c
->rbuf
);
1430 res
= (int32_t) ms_udp_read(c
, c
->rcurr
+ c
->rbytes
, (int32_t) avail
);
1432 res
= (int) read(c
->sfd
, c
->rcurr
+ c
->rbytes
, (size_t) avail
);
1437 atomic_add_size(&ms_stats
.bytes_read
, res
);
1448 /* connection closed */
1449 ms_conn_set_state(c
, conn_closing
);
1453 if ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
))
1455 /* Should close on unhandled errors. */
1456 ms_conn_set_state(c
, conn_closing
);
1462 } /* ms_try_read_network */
1465 * after get the object from server, verify the value if
1468 * @param c, pointer of the concurrency
1469 * @param mlget_item, pointer of mulit-get task item structure
1470 * @param value, received value string
1471 * @param vlen, received value string length
1473 static void ms_verify_value(ms_conn_t
*c
, ms_mlget_task_item_t
*mlget_item
, char *value
, int vlen
) {
1474 if (c
->curr_task
.verify
) {
1475 assert(c
->curr_task
.item
->value_offset
!= INVALID_OFFSET
);
1476 char *orignval
= &ms_setting
.char_block
[c
->curr_task
.item
->value_offset
];
1477 char *orignkey
= &ms_setting
.char_block
[c
->curr_task
.item
->key_suffix_offset
];
1479 /* verify expire time if necessary */
1480 if (c
->curr_task
.item
->exp_time
> 0) {
1481 struct timeval curr_time
;
1482 gettimeofday(&curr_time
, NULL
);
1484 /* object expired but get it now */
1485 if (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
1486 > c
->curr_task
.item
->exp_time
+ EXPIRE_TIME_ERROR
)
1488 atomic_add_size(&ms_stats
.exp_get
, 1);
1490 if (ms_setting
.verbose
) {
1493 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S", localtime(&c
->curr_task
.item
->client_time
));
1494 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time
.tv_sec
));
1496 "\n<%d expire time verification failed, "
1497 "object expired but get it now\n"
1499 "\tkey: %" PRIx64
" %.*s\n"
1500 "\tset time: %s current time: %s "
1501 "diff time: %d expire time: %d\n"
1502 "\texpected data: \n"
1503 "\treceived data len: %d\n"
1504 "\treceived data: %.*s\n",
1505 c
->sfd
, c
->curr_task
.item
->key_size
, c
->curr_task
.item
->key_prefix
,
1506 c
->curr_task
.item
->key_size
- (int) KEY_PREFIX_SIZE
, orignkey
, set_time
, cur_time
,
1507 (int) (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
),
1508 c
->curr_task
.item
->exp_time
, vlen
, vlen
, value
);
1513 if ((c
->curr_task
.item
->value_size
!= vlen
) || (memcmp(orignval
, value
, (size_t) vlen
) != 0))
1515 atomic_add_size(&ms_stats
.vef_failed
, 1);
1517 if (ms_setting
.verbose
) {
1519 "\n<%d data verification failed\n"
1521 "\tkey: %" PRIx64
" %.*s\n"
1522 "\texpected data len: %d\n"
1523 "\texpected data: %.*s\n"
1524 "\treceived data len: %d\n"
1525 "\treceived data: %.*s\n",
1526 c
->sfd
, c
->curr_task
.item
->key_size
, c
->curr_task
.item
->key_prefix
,
1527 c
->curr_task
.item
->key_size
- (int) KEY_PREFIX_SIZE
, orignkey
,
1528 c
->curr_task
.item
->value_size
, c
->curr_task
.item
->value_size
, orignval
, vlen
,
1535 c
->curr_task
.finish_verify
= true;
1537 if (mlget_item
!= NULL
) {
1538 mlget_item
->finish_verify
= true;
1541 } /* ms_verify_value */
1544 * For ASCII protocol, after store the data into the local
1545 * buffer, run this function to handle the data.
1547 * @param c, pointer of the concurrency
1549 static void ms_ascii_complete_nread(ms_conn_t
*c
) {
1551 assert(c
->rbytes
>= c
->rvbytes
);
1552 assert(c
->protocol
== ascii_prot
);
1553 if (c
->rvbytes
> 2) {
1554 assert(c
->rcurr
[c
->rvbytes
- 1] == '\n' && c
->rcurr
[c
->rvbytes
- 2] == '\r');
1558 ms_mlget_task_item_t
*mlget_item
= NULL
;
1559 if (((ms_setting
.mult_key_num
> 1) && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1560 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1562 c
->mlget_task
.value_index
++;
1563 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1565 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
) {
1566 c
->curr_task
.item
= mlget_item
->item
;
1567 c
->curr_task
.verify
= mlget_item
->verify
;
1568 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1569 mlget_item
->get_miss
= false;
1571 /* Try to find the task item in multi-get task array */
1572 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
1573 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
1574 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
) {
1575 c
->curr_task
.item
= mlget_item
->item
;
1576 c
->curr_task
.verify
= mlget_item
->verify
;
1577 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1578 mlget_item
->get_miss
= false;
1586 ms_verify_value(c
, mlget_item
, c
->rcurr
, c
->rvbytes
- 2);
1588 c
->curr_task
.get_miss
= false;
1589 c
->rbytes
-= c
->rvbytes
;
1590 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1591 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1594 } /* ms_ascii_complete_nread */
1597 * For binary protocol, after store the data into the local
1598 * buffer, run this function to handle the data.
1600 * @param c, pointer of the concurrency
1602 static void ms_bin_complete_nread(ms_conn_t
*c
) {
1604 assert(c
->rbytes
>= c
->rvbytes
);
1605 assert(c
->protocol
== binary_prot
);
1607 int extlen
= c
->binary_header
.response
.extlen
;
1608 int keylen
= c
->binary_header
.response
.keylen
;
1609 uint8_t opcode
= c
->binary_header
.response
.opcode
;
1611 /* not get command or not include value, just return */
1612 if (((opcode
!= PROTOCOL_BINARY_CMD_GET
) && (opcode
!= PROTOCOL_BINARY_CMD_GETQ
))
1613 || (c
->rvbytes
<= extlen
+ keylen
))
1616 if (c
->binary_header
.response
.opcode
== PROTOCOL_BINARY_CMD_GET
) {
1617 c
->currcmd
.retstat
= MCD_END
;
1618 c
->curr_task
.get_miss
= true;
1623 ms_reset_conn(c
, false);
1628 ms_mlget_task_item_t
*mlget_item
= NULL
;
1629 if (((ms_setting
.mult_key_num
> 1) && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1630 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1632 c
->mlget_task
.value_index
++;
1633 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1635 c
->curr_task
.item
= mlget_item
->item
;
1636 c
->curr_task
.verify
= mlget_item
->verify
;
1637 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1638 mlget_item
->get_miss
= false;
1641 ms_verify_value(c
, mlget_item
, c
->rcurr
+ extlen
+ keylen
, c
->rvbytes
- extlen
- keylen
);
1643 c
->currcmd
.retstat
= MCD_END
;
1644 c
->curr_task
.get_miss
= false;
1645 c
->rbytes
-= c
->rvbytes
;
1646 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1647 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1651 if (ms_setting
.mult_key_num
> 1) {
1652 /* multi-get have check all the item */
1653 if (c
->mlget_task
.value_index
== c
->mlget_task
.mlget_num
- 1) {
1654 ms_reset_conn(c
, false);
1658 ms_reset_conn(c
, false);
1660 } /* ms_bin_complete_nread */
1663 * we get here after reading the value of get commands.
1665 * @param c, pointer of the concurrency
1667 static void ms_complete_nread(ms_conn_t
*c
) {
1669 assert(c
->rbytes
>= c
->rvbytes
);
1670 assert(c
->protocol
== ascii_prot
|| c
->protocol
== binary_prot
);
1672 if (c
->protocol
== binary_prot
) {
1673 ms_bin_complete_nread(c
);
1675 ms_ascii_complete_nread(c
);
1677 } /* ms_complete_nread */
1680 * Adds a message header to a connection.
1682 * @param c, pointer of the concurrency
1684 * @return int, if success, return EXIT_SUCCESS, else return -1
1686 static int ms_add_msghdr(ms_conn_t
*c
) {
1691 if (c
->msgsize
== c
->msgused
) {
1692 msg
= realloc(c
->msglist
, (size_t) c
->msgsize
* 2 * sizeof(struct msghdr
));
1700 msg
= c
->msglist
+ c
->msgused
;
1703 * this wipes msg_iovlen, msg_control, msg_controllen, and
1704 * msg_flags, the last 3 of which aren't defined on solaris:
1706 memset(msg
, 0, sizeof(struct msghdr
));
1708 msg
->msg_iov
= &c
->iov
[c
->iovused
];
1710 if (c
->udp
&& (c
->srv_recv_addr_size
> 0)) {
1711 msg
->msg_name
= &c
->srv_recv_addr
;
1712 msg
->msg_namelen
= c
->srv_recv_addr_size
;
1719 /* Leave room for the UDP header, which we'll fill in later. */
1720 return ms_add_iov(c
, NULL
, UDP_HEADER_SIZE
);
1723 return EXIT_SUCCESS
;
1724 } /* ms_add_msghdr */
1727 * Ensures that there is room for another structure iovec in a connection's
1730 * @param c, pointer of the concurrency
1732 * @return int, if success, return EXIT_SUCCESS, else return -1
1734 static int ms_ensure_iov_space(ms_conn_t
*c
) {
1737 if (c
->iovused
>= c
->iovsize
) {
1739 struct iovec
*new_iov
=
1740 (struct iovec
*) realloc(c
->iov
, ((size_t) c
->iovsize
* 2) * sizeof(struct iovec
));
1747 /* Point all the msghdr structures at the new list. */
1748 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++) {
1749 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
1750 iovnum
+= (int) c
->msglist
[i
].msg_iovlen
;
1754 return EXIT_SUCCESS
;
1755 } /* ms_ensure_iov_space */
1758 * Adds data to the list of pending data that will be written out to a
1761 * @param c, pointer of the concurrency
1762 * @param buf, the buffer includes data to send
1763 * @param len, the data length in the buffer
1765 * @return int, if success, return EXIT_SUCCESS, else return -1
1767 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
) {
1775 m
= &c
->msglist
[c
->msgused
- 1];
1778 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
1780 limit_to_mtu
= c
->udp
;
1783 /* We may need to start a new msghdr if this one is full. */
1784 if ((m
->msg_iovlen
== IOV_MAX
) || (limit_to_mtu
&& (c
->msgbytes
>= UDP_MAX_SEND_PAYLOAD_SIZE
)))
1787 m
= &c
->msglist
[c
->msgused
- 1];
1791 if (ms_ensure_iov_space(c
) != 0)
1794 /* If the fragment is too big to fit in the datagram, split it up */
1795 if (limit_to_mtu
&& (len
+ c
->msgbytes
> UDP_MAX_SEND_PAYLOAD_SIZE
)) {
1796 leftover
= len
+ c
->msgbytes
- UDP_MAX_SEND_PAYLOAD_SIZE
;
1802 m
= &c
->msglist
[c
->msgused
- 1];
1803 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *) buf
;
1804 m
->msg_iov
[m
->msg_iovlen
].iov_len
= (size_t) len
;
1810 buf
= ((char *) buf
) + len
;
1812 } while (leftover
> 0);
1814 return EXIT_SUCCESS
;
1818 * Constructs a set of UDP headers and attaches them to the outgoing messages.
1820 * @param c, pointer of the concurrency
1822 * @return int, if success, return EXIT_SUCCESS, else return -1
1824 static int ms_build_udp_headers(ms_conn_t
*c
) {
1830 c
->request_id
= ms_get_udp_request_id();
1832 if (c
->msgused
> c
->hdrsize
) {
1835 new_hdrbuf
= realloc(c
->hdrbuf
, (size_t) c
->msgused
* 2 * UDP_HEADER_SIZE
);
1837 new_hdrbuf
= malloc((size_t) c
->msgused
* 2 * UDP_HEADER_SIZE
);
1841 c
->hdrbuf
= (unsigned char *) new_hdrbuf
;
1842 c
->hdrsize
= c
->msgused
* 2;
1845 /* If this is a multi-packet request, drop it. */
1846 if (c
->udp
&& (c
->msgused
> 1)) {
1847 fprintf(stderr
, "multi-packet request for UDP not supported.\n");
1852 for (i
= 0; i
< c
->msgused
; i
++) {
1853 c
->msglist
[i
].msg_iov
[0].iov_base
= (void *) hdr
;
1854 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
1855 *hdr
++ = (unsigned char) (c
->request_id
/ 256);
1856 *hdr
++ = (unsigned char) (c
->request_id
% 256);
1857 *hdr
++ = (unsigned char) (i
/ 256);
1858 *hdr
++ = (unsigned char) (i
% 256);
1859 *hdr
++ = (unsigned char) (c
->msgused
/ 256);
1860 *hdr
++ = (unsigned char) (c
->msgused
% 256);
1861 *hdr
++ = (unsigned char) 1; /* support facebook memcached */
1862 *hdr
++ = (unsigned char) 0;
1863 assert(hdr
== ((unsigned char *) c
->msglist
[i
].msg_iov
[0].iov_base
+ UDP_HEADER_SIZE
));
1866 return EXIT_SUCCESS
;
1867 } /* ms_build_udp_headers */
1870 * Transmit the next chunk of data from our list of msgbuf structures.
1872 * @param c, pointer of the concurrency
1874 * @return TRANSMIT_COMPLETE All done writing.
1875 * TRANSMIT_INCOMPLETE More data remaining to write.
1876 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1877 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1879 static int ms_transmit(ms_conn_t
*c
) {
1882 if ((c
->msgcurr
< c
->msgused
) && (c
->msglist
[c
->msgcurr
].msg_iovlen
== 0)) {
1883 /* Finished writing the current msg; advance to the next. */
1887 if (c
->msgcurr
< c
->msgused
) {
1889 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
1891 res
= sendmsg(c
->sfd
, m
, 0);
1893 atomic_add_size(&ms_stats
.bytes_written
, res
);
1895 /* We've written some of the data. Remove the completed
1896 * iovec entries from the list of pending writes. */
1897 while (m
->msg_iovlen
> 0 && res
>= (ssize_t
) m
->msg_iov
->iov_len
) {
1898 res
-= (ssize_t
) m
->msg_iov
->iov_len
;
1903 /* Might have written just part of the last iovec entry;
1904 * adjust it so the next write will do the rest. */
1906 m
->msg_iov
->iov_base
= (void *) ((unsigned char *) m
->msg_iov
->iov_base
+ res
);
1907 m
->msg_iov
->iov_len
-= (size_t) res
;
1909 return TRANSMIT_INCOMPLETE
;
1911 if ((res
== -1) && ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
))) {
1912 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
1913 fprintf(stderr
, "Couldn't update event.\n");
1914 ms_conn_set_state(c
, conn_closing
);
1915 return TRANSMIT_HARD_ERROR
;
1917 return TRANSMIT_SOFT_ERROR
;
1920 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1921 * we have a real error, on which we close the connection */
1922 fprintf(stderr
, "Failed to write, and not due to blocking.\n");
1924 ms_conn_set_state(c
, conn_closing
);
1925 return TRANSMIT_HARD_ERROR
;
1927 return TRANSMIT_COMPLETE
;
1932 * Shrinks a connection's buffers if they're too big. This prevents
1933 * periodic large "mget" response from server chewing lots of client
1936 * This should only be called in between requests since it can wipe output
1939 * @param c, pointer of the concurrency
1941 static void ms_conn_shrink(ms_conn_t
*c
) {
1947 if ((c
->rsize
> READ_BUFFER_HIGHWAT
) && (c
->rbytes
< DATA_BUFFER_SIZE
)) {
1950 if (c
->rcurr
!= c
->rbuf
)
1951 memmove(c
->rbuf
, c
->rcurr
, (size_t) c
->rbytes
);
1953 newbuf
= (char *) realloc((void *) c
->rbuf
, DATA_BUFFER_SIZE
);
1957 c
->rsize
= DATA_BUFFER_SIZE
;
1962 if (c
->udp
&& (c
->rudpsize
> UDP_DATA_BUFFER_HIGHWAT
)
1963 && (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
< UDP_DATA_BUFFER_SIZE
))
1965 char *new_rbuf
= (char *) realloc(c
->rudpbuf
, (size_t) c
->rudpsize
* 2);
1967 c
->rudpbuf
= new_rbuf
;
1968 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
1970 /* TODO check error condition? */
1973 if (c
->msgsize
> MSG_LIST_HIGHWAT
) {
1974 struct msghdr
*newbuf
=
1975 (struct msghdr
*) realloc((void *) c
->msglist
, MSG_LIST_INITIAL
* sizeof(c
->msglist
[0]));
1977 c
->msglist
= newbuf
;
1978 c
->msgsize
= MSG_LIST_INITIAL
;
1980 /* TODO check error condition? */
1983 if (c
->iovsize
> IOV_LIST_HIGHWAT
) {
1984 struct iovec
*newbuf
=
1985 (struct iovec
*) realloc((void *) c
->iov
, IOV_LIST_INITIAL
* sizeof(c
->iov
[0]));
1988 c
->iovsize
= IOV_LIST_INITIAL
;
1990 /* TODO check return value */
1992 } /* ms_conn_shrink */
1995 * Sets a connection's current state in the state machine. Any special
1996 * processing that needs to happen on certain state transitions can
1999 * @param c, pointer of the concurrency
2000 * @param state, connection state
2002 static void ms_conn_set_state(ms_conn_t
*c
, int state
) {
2005 if (state
!= c
->state
) {
2006 if (state
== conn_read
) {
2011 } /* ms_conn_set_state */
2014 * update the event if socks change state. for example: when
2015 * change the listen scoket read event to sock write event, or
2016 * change socket handler, we could call this function.
2018 * @param c, pointer of the concurrency
2019 * @param new_flags, new event flags
2021 * @return bool, if success, return true, else return false
2023 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
) {
2026 struct event_base
*base
= c
->event
.ev_base
;
2027 if ((c
->ev_flags
== new_flags
) && (ms_setting
.rep_write_srv
== 0)
2028 && (!ms_setting
.facebook_test
|| (c
->total_sfds
== 1)))
2033 if (event_del(&c
->event
) == -1) {
2034 /* try to delete the event again */
2035 if (event_del(&c
->event
) == -1) {
2040 event_set(&c
->event
, c
->sfd
, (short) new_flags
, ms_event_handler
, (void *) c
);
2041 event_base_set(base
, &c
->event
);
2042 c
->ev_flags
= (short) new_flags
;
2044 if (event_add(&c
->event
, NULL
) == -1) {
2049 } /* ms_update_event */
2052 * If user want to get the expected throughput, we could limit
2053 * the performance of memslap. we could give up some work and
2054 * just wait a short time. The function is used to check this
2057 * @param c, pointer of the concurrency
2059 * @return bool, if success, return true, else return false
2061 static bool ms_need_yield(ms_conn_t
*c
) {
2062 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
2064 int64_t time_diff
= 0;
2065 struct timeval curr_time
;
2066 ms_task_t
*task
= &c
->curr_task
;
2068 if (ms_setting
.expected_tps
> 0) {
2069 gettimeofday(&curr_time
, NULL
);
2070 time_diff
= ms_time_diff(&ms_thread
->startup_time
, &curr_time
);
2071 tps
= (int64_t)(((task
->get_opt
+ task
->set_opt
) / (uint64_t) time_diff
) * 1000000);
2073 /* current throughput is greater than expected throughput */
2074 if (tps
> ms_thread
->thread_ctx
->tps_perconn
) {
2080 } /* ms_need_yield */
2083 * used to update the start time of each operation
2085 * @param c, pointer of the concurrency
2087 static void ms_update_start_time(ms_conn_t
*c
) {
2088 ms_task_item_t
*item
= c
->curr_task
.item
;
2090 if ((ms_setting
.stat_freq
> 0) || c
->udp
|| ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0)))
2092 gettimeofday(&c
->start_time
, NULL
);
2093 if ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0)) {
2094 /* record the current time */
2095 item
->client_time
= c
->start_time
.tv_sec
;
2098 } /* ms_update_start_time */
2101 * run the state machine
2103 * @param c, pointer of the concurrency
2105 static void ms_drive_machine(ms_conn_t
*c
) {
2114 if (c
->rbytes
>= c
->rvbytes
) {
2115 ms_complete_nread(c
);
2119 if (ms_try_read_line(c
) != 0) {
2124 if (ms_try_read_network(c
) != 0) {
2128 /* doesn't read all the response data, wait event wake up */
2129 if (!c
->currcmd
.isfinish
) {
2130 if (!ms_update_event(c
, EV_READ
| EV_PERSIST
)) {
2131 fprintf(stderr
, "Couldn't update event.\n");
2132 ms_conn_set_state(c
, conn_closing
);
2139 /* we have no command line and no data to read from network, next write */
2140 ms_conn_set_state(c
, conn_write
);
2141 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
2146 if (!c
->ctnwrite
&& ms_need_yield(c
)) {
2149 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2150 fprintf(stderr
, "Couldn't update event.\n");
2151 ms_conn_set_state(c
, conn_closing
);
2158 if (!c
->ctnwrite
&& (ms_exec_task(c
) != 0)) {
2159 ms_conn_set_state(c
, conn_closing
);
2163 /* record the start time before starting to send data if necessary */
2164 if (!c
->ctnwrite
|| (c
->change_sfd
&& c
->ctnwrite
)) {
2165 if (c
->change_sfd
) {
2166 c
->change_sfd
= false;
2168 ms_update_start_time(c
);
2171 /* change sfd if necessary */
2172 if (c
->change_sfd
) {
2178 /* execute task until nothing need be written to network */
2179 if (!c
->ctnwrite
&& (c
->msgcurr
== c
->msgused
)) {
2180 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2181 fprintf(stderr
, "Couldn't update event.\n");
2182 ms_conn_set_state(c
, conn_closing
);
2189 switch (ms_transmit(c
)) {
2190 case TRANSMIT_COMPLETE
:
2191 /* we have no data to write to network, next wait repose */
2192 if (!ms_update_event(c
, EV_READ
| EV_PERSIST
)) {
2193 fprintf(stderr
, "Couldn't update event.\n");
2194 ms_conn_set_state(c
, conn_closing
);
2195 c
->ctnwrite
= false;
2198 ms_conn_set_state(c
, conn_read
);
2199 c
->ctnwrite
= false;
2203 case TRANSMIT_INCOMPLETE
: c
->ctnwrite
= true; break; /* Continue in state machine. */
2205 case TRANSMIT_HARD_ERROR
: c
->ctnwrite
= false; break;
2207 case TRANSMIT_SOFT_ERROR
:
2218 /* recovery mode, need reconnect if connection close */
2219 if (ms_setting
.reconnect
2220 && (!ms_global
.time_out
|| ((ms_setting
.run_time
== 0) && (c
->remain_exec_num
> 0))))
2222 if (ms_reconn(c
) != 0) {
2228 ms_reset_conn(c
, false);
2230 if (c
->total_sfds
== 1) {
2231 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2232 fprintf(stderr
, "Couldn't update event.\n");
2233 ms_conn_set_state(c
, conn_closing
);
2248 } /* ms_drive_machine */
2251 * the event handler of each thread
2253 * @param fd, the file descriptor of socket
2254 * @param which, event flag
2255 * @param arg, argument
2257 void ms_event_handler(const int fd
, const short which
, void *arg
) {
2258 ms_conn_t
*c
= (ms_conn_t
*) arg
;
2266 fprintf(stderr
, "Catastrophic: event fd: %d doesn't match conn fd: %d\n", fd
, c
->sfd
);
2270 assert(fd
== c
->sfd
);
2272 ms_drive_machine(c
);
2274 /* wait for next event */
2275 } /* ms_event_handler */
2278 * get the next socket descriptor index to run for replication
2280 * @param c, pointer of the concurrency
2281 * @param cmd, command(get or set )
2283 * @return int, if success, return the index, else return EXIT_SUCCESS
2285 static uint32_t ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
) {
2286 uint32_t sock_index
= 0;
2289 if (c
->total_sfds
== 1) {
2290 return EXIT_SUCCESS
;
2293 if (ms_setting
.rep_write_srv
== 0) {
2298 if (cmd
== CMD_SET
) {
2299 for (i
= 0; i
< ms_setting
.rep_write_srv
; i
++) {
2300 if (c
->tcpsfd
[i
] > 0) {
2305 if (i
== ms_setting
.rep_write_srv
) {
2306 /* random get one replication server to read */
2307 sock_index
= (uint32_t) random() % c
->total_sfds
;
2309 /* random get one replication writing server to write */
2310 sock_index
= (uint32_t) random() % ms_setting
.rep_write_srv
;
2312 } else if (cmd
== CMD_GET
) {
2313 /* random get one replication server to read */
2314 sock_index
= (uint32_t) random() % c
->total_sfds
;
2316 } while (c
->tcpsfd
[sock_index
] == 0);
2319 } /* ms_get_rep_sock_index */
2322 * get the next socket descriptor index to run
2324 * @param c, pointer of the concurrency
2326 * @return int, return the index
2328 static uint32_t ms_get_next_sock_index(ms_conn_t
*c
) {
2329 uint32_t sock_index
= 0;
2332 sock_index
= (++c
->cur_idx
== c
->total_sfds
) ? 0 : c
->cur_idx
;
2333 } while (c
->tcpsfd
[sock_index
] == 0);
2336 } /* ms_get_next_sock_index */
2339 * update socket event of the connections
2341 * @param c, pointer of the concurrency
2343 * @return int, if success, return EXIT_SUCCESS, else return -1
2345 static int ms_update_conn_sock_event(ms_conn_t
*c
) {
2348 switch (c
->currcmd
.cmd
) {
2350 if (ms_setting
.facebook_test
&& c
->udp
) {
2351 c
->sfd
= c
->tcpsfd
[0];
2353 c
->change_sfd
= true;
2358 if (ms_setting
.facebook_test
&& !c
->udp
) {
2361 c
->change_sfd
= true;
2368 if (!c
->udp
&& (c
->total_sfds
> 1)) {
2369 if (c
->cur_idx
!= c
->total_sfds
) {
2370 if (ms_setting
.rep_write_srv
== 0) {
2371 c
->cur_idx
= ms_get_next_sock_index(c
);
2373 c
->cur_idx
= ms_get_rep_sock_index(c
, c
->currcmd
.cmd
);
2376 /* must select the first sock of the connection at the beginning */
2380 c
->sfd
= c
->tcpsfd
[c
->cur_idx
];
2381 assert(c
->sfd
!= 0);
2382 c
->change_sfd
= true;
2385 if (c
->change_sfd
) {
2386 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2387 fprintf(stderr
, "Couldn't update event.\n");
2388 ms_conn_set_state(c
, conn_closing
);
2393 return EXIT_SUCCESS
;
2394 } /* ms_update_conn_sock_event */
2397 * for ASCII protocol, this function build the set command
2398 * string and send the command.
2400 * @param c, pointer of the concurrency
2401 * @param item, pointer of task item which includes the object
2404 * @return int, if success, return EXIT_SUCCESS, else return -1
2406 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
) {
2409 char *buffer
= c
->wbuf
;
2411 write_len
= snprintf(buffer
, c
->wsize
, " %u %d %d\r\n", 0, item
->exp_time
, item
->value_size
);
2413 if (write_len
> c
->wsize
|| write_len
< 0) {
2414 /* ought to be always enough. just fail for simplicity */
2415 fprintf(stderr
, "output command line too long.\n");
2419 if (item
->value_offset
== INVALID_OFFSET
) {
2420 value_offset
= item
->key_suffix_offset
;
2422 value_offset
= item
->value_offset
;
2425 if ((ms_add_iov(c
, "set ", 4) != 0)
2426 || (ms_add_iov(c
, (char *) &item
->key_prefix
, (int) KEY_PREFIX_SIZE
) != 0)
2427 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2428 item
->key_size
- (int) KEY_PREFIX_SIZE
)
2430 || (ms_add_iov(c
, buffer
, write_len
) != 0)
2431 || (ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
) != 0)
2432 || (ms_add_iov(c
, "\r\n", 2) != 0) || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
2437 return EXIT_SUCCESS
;
2438 } /* ms_build_ascii_write_buf_set */
2441 * used to send set command to server
2443 * @param c, pointer of the concurrency
2444 * @param item, pointer of task item which includes the object
2447 * @return int, if success, return EXIT_SUCCESS, else return -1
2449 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
) {
2452 c
->currcmd
.cmd
= CMD_SET
;
2453 c
->currcmd
.isfinish
= false;
2454 c
->currcmd
.retstat
= MCD_FAILURE
;
2456 if (ms_update_conn_sock_event(c
) != 0) {
2463 if (ms_add_msghdr(c
) != 0) {
2464 fprintf(stderr
, "Out of memory preparing request.");
2468 /* binary protocol */
2469 if (c
->protocol
== binary_prot
) {
2470 if (ms_build_bin_write_buf_set(c
, item
) != 0) {
2474 if (ms_build_ascii_write_buf_set(c
, item
) != 0) {
2479 atomic_add_size(&ms_stats
.obj_bytes
, item
->key_size
+ item
->value_size
);
2480 atomic_add_size(&ms_stats
.cmd_set
, 1);
2482 return EXIT_SUCCESS
;
2486 * for ASCII protocol, this function build the get command
2487 * string and send the command.
2489 * @param c, pointer of the concurrency
2490 * @param item, pointer of task item which includes the object
2493 * @return int, if success, return EXIT_SUCCESS, else return -1
2495 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
) {
2496 if ((ms_add_iov(c
, "get ", 4) != 0)
2497 || (ms_add_iov(c
, (char *) &item
->key_prefix
, (int) KEY_PREFIX_SIZE
) != 0)
2498 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2499 item
->key_size
- (int) KEY_PREFIX_SIZE
)
2501 || (ms_add_iov(c
, "\r\n", 2) != 0) || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
2506 return EXIT_SUCCESS
;
2507 } /* ms_build_ascii_write_buf_get */
2510 * used to send the get command to server
2512 * @param c, pointer of the concurrency
2513 * @param item, pointer of task item which includes the object
2516 * @return int, if success, return EXIT_SUCCESS, else return -1
2518 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
) {
2521 c
->currcmd
.cmd
= CMD_GET
;
2522 c
->currcmd
.isfinish
= false;
2523 c
->currcmd
.retstat
= MCD_FAILURE
;
2525 if (ms_update_conn_sock_event(c
) != 0) {
2532 if (ms_add_msghdr(c
) != 0) {
2533 fprintf(stderr
, "Out of memory preparing request.");
2537 /* binary protocol */
2538 if (c
->protocol
== binary_prot
) {
2539 if (ms_build_bin_write_buf_get(c
, item
) != 0) {
2543 if (ms_build_ascii_write_buf_get(c
, item
) != 0) {
2548 atomic_add_size(&ms_stats
.cmd_get
, 1);
2550 return EXIT_SUCCESS
;
2554 * for ASCII protocol, this function build the multi-get command
2555 * string and send the command.
2557 * @param c, pointer of the concurrency
2559 * @return int, if success, return EXIT_SUCCESS, else return -1
2561 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
) {
2562 ms_task_item_t
*item
;
2564 if (ms_add_iov(c
, "get", 3) != 0) {
2568 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2569 item
= c
->mlget_task
.mlget_item
[i
].item
;
2570 assert(item
!= NULL
);
2571 if ((ms_add_iov(c
, " ", 1) != 0)
2572 || (ms_add_iov(c
, (char *) &item
->key_prefix
, (int) KEY_PREFIX_SIZE
) != 0)
2573 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2574 item
->key_size
- (int) KEY_PREFIX_SIZE
)
2581 if ((ms_add_iov(c
, "\r\n", 2) != 0) || (c
->udp
&& (ms_build_udp_headers(c
) != 0))) {
2585 return EXIT_SUCCESS
;
2586 } /* ms_build_ascii_write_buf_mlget */
2589 * used to send the multi-get command to server
2591 * @param c, pointer of the concurrency
2593 * @return int, if success, return EXIT_SUCCESS, else return -1
2595 int ms_mcd_mlget(ms_conn_t
*c
) {
2596 ms_task_item_t
*item
;
2599 assert(c
->mlget_task
.mlget_num
>= 1);
2601 c
->currcmd
.cmd
= CMD_GET
;
2602 c
->currcmd
.isfinish
= false;
2603 c
->currcmd
.retstat
= MCD_FAILURE
;
2605 if (ms_update_conn_sock_event(c
) != 0) {
2612 if (ms_add_msghdr(c
) != 0) {
2613 fprintf(stderr
, "Out of memory preparing request.");
2617 /* binary protocol */
2618 if (c
->protocol
== binary_prot
) {
2619 if (ms_build_bin_write_buf_mlget(c
) != 0) {
2623 if (ms_build_ascii_write_buf_mlget(c
) != 0) {
2628 /* decrease operation time of each item */
2629 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2630 item
= c
->mlget_task
.mlget_item
[i
].item
;
2631 atomic_add_size(&ms_stats
.cmd_get
, 1);
2636 return EXIT_SUCCESS
;
2637 } /* ms_mcd_mlget */
2640 * binary protocol support
2644 * for binary protocol, parse the response of server
2646 * @param c, pointer of the concurrency
2648 * @return int, if success, return EXIT_SUCCESS, else return -1
2650 static int ms_bin_process_response(ms_conn_t
*c
) {
2651 const char *errstr
= NULL
;
2655 uint32_t bodylen
= c
->binary_header
.response
.bodylen
;
2656 uint8_t opcode
= c
->binary_header
.response
.opcode
;
2657 uint16_t status
= c
->binary_header
.response
.status
;
2660 c
->rvbytes
= (int32_t) bodylen
;
2662 return EXIT_FAILURE
;
2665 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
2666 if (opcode
== PROTOCOL_BINARY_CMD_SET
) {
2667 c
->currcmd
.retstat
= MCD_STORED
;
2668 } else if (opcode
== PROTOCOL_BINARY_CMD_DELETE
) {
2669 c
->currcmd
.retstat
= MCD_DELETED
;
2670 } else if (opcode
== PROTOCOL_BINARY_CMD_GET
) {
2671 c
->currcmd
.retstat
= MCD_END
;
2675 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
2676 errstr
= "Out of memory";
2677 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
2680 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
2681 errstr
= "Unknown command";
2682 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
2685 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
2686 errstr
= "Not found";
2687 c
->currcmd
.retstat
= MCD_NOTFOUND
;
2690 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
2691 errstr
= "Invalid arguments";
2692 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
2695 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
: errstr
= "Data exists for key."; break;
2697 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
2698 errstr
= "Too large.";
2699 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
2702 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
2703 errstr
= "Not stored.";
2704 c
->currcmd
.retstat
= MCD_NOTSTORED
;
2708 errstr
= "Unknown error";
2709 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
2713 if (errstr
!= NULL
) {
2714 fprintf(stderr
, "%s\n", errstr
);
2718 return EXIT_SUCCESS
;
2719 } /* ms_bin_process_response */
2721 /* build binary header and add the header to the buffer to send */
2724 * build binary header and add the header to the buffer to send
2726 * @param c, pointer of the concurrency
2727 * @param opcode, operation code
2728 * @param hdr_len, length of header
2729 * @param key_len, length of key
2730 * @param body_len. length of body
2732 static void ms_add_bin_header(ms_conn_t
*c
, uint8_t opcode
, uint8_t hdr_len
, uint16_t key_len
,
2733 uint32_t body_len
) {
2734 protocol_binary_request_header
*header
;
2738 header
= (protocol_binary_request_header
*) c
->wcurr
;
2740 header
->request
.magic
= (uint8_t) PROTOCOL_BINARY_REQ
;
2741 header
->request
.opcode
= (uint8_t) opcode
;
2742 header
->request
.keylen
= htons(key_len
);
2744 header
->request
.extlen
= (uint8_t) hdr_len
;
2745 header
->request
.datatype
= (uint8_t) PROTOCOL_BINARY_RAW_BYTES
;
2746 header
->request
.vbucket
= 0;
2748 header
->request
.bodylen
= htonl(body_len
);
2749 header
->request
.opaque
= 0;
2750 header
->request
.cas
= 0;
2752 ms_add_iov(c
, c
->wcurr
, sizeof(header
->request
));
2753 } /* ms_add_bin_header */
2756 * add the key to the socket write buffer array
2758 * @param c, pointer of the concurrency
2759 * @param item, pointer of task item which includes the object
2762 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
) {
2763 ms_add_iov(c
, (char *) &item
->key_prefix
, (int) KEY_PREFIX_SIZE
);
2764 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2765 item
->key_size
- (int) KEY_PREFIX_SIZE
);
2769 * for binary protocol, this function build the set command
2770 * and add the command to send buffer array.
2772 * @param c, pointer of the concurrency
2773 * @param item, pointer of task item which includes the object
2776 * @return int, if success, return EXIT_SUCCESS, else return -1
2778 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
) {
2779 assert(c
->wbuf
== c
->wcurr
);
2782 protocol_binary_request_set
*rep
= (protocol_binary_request_set
*) c
->wcurr
;
2783 uint16_t keylen
= (uint16_t) item
->key_size
;
2785 (uint32_t) sizeof(rep
->message
.body
) + (uint32_t) keylen
+ (uint32_t) item
->value_size
;
2787 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_SET
, sizeof(rep
->message
.body
), keylen
, bodylen
);
2788 rep
->message
.body
.flags
= 0;
2789 rep
->message
.body
.expiration
= htonl((uint32_t) item
->exp_time
);
2790 ms_add_iov(c
, &rep
->message
.body
, sizeof(rep
->message
.body
));
2791 ms_add_key_to_iov(c
, item
);
2793 if (item
->value_offset
== INVALID_OFFSET
) {
2794 value_offset
= item
->key_suffix_offset
;
2796 value_offset
= item
->value_offset
;
2798 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
);
2800 return EXIT_SUCCESS
;
2801 } /* ms_build_bin_write_buf_set */
2804 * for binary protocol, this function build the get command and
2805 * add the command to send buffer array.
2807 * @param c, pointer of the concurrency
2808 * @param item, pointer of task item which includes the object
2811 * @return int, if success, return EXIT_SUCCESS, else return -1
2813 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
) {
2814 assert(c
->wbuf
== c
->wcurr
);
2816 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t) item
->key_size
,
2817 (uint32_t) item
->key_size
);
2818 ms_add_key_to_iov(c
, item
);
2820 return EXIT_SUCCESS
;
2821 } /* ms_build_bin_write_buf_get */
2824 * for binary protocol, this function build the multi-get
2825 * command and add the command to send buffer array.
2827 * @param c, pointer of the concurrency
2828 * @param item, pointer of task item which includes the object
2831 * @return int, if success, return EXIT_SUCCESS, else return -1
2833 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
) {
2834 ms_task_item_t
*item
;
2836 assert(c
->wbuf
== c
->wcurr
);
2838 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2839 item
= c
->mlget_task
.mlget_item
[i
].item
;
2840 assert(item
!= NULL
);
2842 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t) item
->key_size
,
2843 (uint32_t) item
->key_size
);
2844 ms_add_key_to_iov(c
, item
);
2845 c
->wcurr
+= sizeof(protocol_binary_request_get
);
2850 return EXIT_SUCCESS
;
2851 } /* ms_build_bin_write_buf_mlget */