3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
16 #include <netinet/tcp.h>
17 #include <arpa/inet.h>
18 #include "ms_setting.h"
19 #include "ms_thread.h"
21 /* for network write */
22 #define TRANSMIT_COMPLETE 0
23 #define TRANSMIT_INCOMPLETE 1
24 #define TRANSMIT_SOFT_ERROR 2
25 #define TRANSMIT_HARD_ERROR 3
27 /* for generating key */
28 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
29 #define KEY_PREFIX_MASK 0x1010101010101010
31 /* For parse the value length return by server */
33 #define VALUELEN_TOKEN 3
35 /* global increasing counter, to ensure the key prefix unique */
36 static uint64_t key_prefix_seq
= KEY_PREFIX_BASE
;
38 /* global increasing counter, generating request id for UDP */
39 static int udp_request_id
= 0;
41 extern __thread ms_thread_t ms_thread
;
43 /* generate upd request id */
44 static int ms_get_udp_request_id(void);
46 /* connect initialize */
47 static void ms_task_init(ms_conn_t
*c
);
48 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
);
49 static int ms_conn_sock_init(ms_conn_t
*c
);
50 static int ms_conn_event_init(ms_conn_t
*c
);
51 static int ms_conn_init(ms_conn_t
*c
, const int init_state
,
52 const int read_buffer_size
, const bool is_udp
);
53 static void ms_warmup_num_init(ms_conn_t
*c
);
54 static int ms_item_win_init(ms_conn_t
*c
);
56 /* connection close */
57 void ms_conn_free(ms_conn_t
*c
);
58 static void ms_conn_close(ms_conn_t
*c
);
60 /* create network connection */
61 static int ms_new_socket(struct addrinfo
*ai
);
62 static void ms_maximize_sndbuf(const int sfd
);
63 static int ms_network_connect(ms_conn_t
*c
, char * srv_host_name
,
64 const int srv_port
, const bool is_udp
, int *ret_sfd
);
65 static int ms_reconn(ms_conn_t
*c
);
68 static int ms_tokenize_command(char *command
, token_t
*tokens
, const int max_tokens
);
69 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
);
70 static int ms_try_read_line(ms_conn_t
*c
);
71 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
);
72 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
);
73 static int ms_try_read_network(ms_conn_t
*c
);
74 static void ms_verify_value(ms_conn_t
*c
, ms_mlget_task_item_t
*mlget_item
,
75 char *value
, int vlen
);
76 static void ms_ascii_complete_nread(ms_conn_t
*c
);
77 static void ms_bin_complete_nread(ms_conn_t
*c
);
78 static void ms_complete_nread(ms_conn_t
*c
);
81 static int ms_add_msghdr(ms_conn_t
*c
);
82 static int ms_ensure_iov_space(ms_conn_t
*c
);
83 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
);
84 static int ms_build_udp_headers(ms_conn_t
*c
);
85 static int ms_transmit(ms_conn_t
*c
);
87 /* status adjustment */
88 static void ms_conn_shrink(ms_conn_t
*c
);
89 static void ms_conn_set_state(ms_conn_t
*c
, int state
);
90 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
);
91 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
);
92 static int ms_get_next_sock_index(ms_conn_t
*c
);
93 static int ms_update_conn_sock_event(ms_conn_t
*c
);
94 static bool ms_need_yield(ms_conn_t
*c
);
95 static void ms_update_start_time(ms_conn_t
*c
);
98 static void ms_drive_machine(ms_conn_t
*c
);
99 void ms_event_handler(const int fd
, const short which
, void *arg
);
102 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
103 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
104 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
);
106 /* binary protocol */
107 static int ms_bin_process_response(ms_conn_t
*c
);
108 static void ms_add_bin_header(ms_conn_t
*c
, uint8_t opcode
, uint8_t hdr_len
,
109 uint16_t key_len
, uint32_t body_len
);
110 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
);
111 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
112 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
113 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
);
116 * each key has two parts, prefix and suffix. The suffix is a
117 * string random get form the character table. The prefix is a
118 * uint64_t variable. And the prefix must be unique. we use the
119 * prefix to identify a key. And the prefix can't include
120 * character ' ' '\r' '\n' '\0'.
124 uint64_t ms_get_key_prefix(void)
128 pthread_mutex_lock(&ms_global
.seq_mutex
);
129 key_prefix_seq
|= KEY_PREFIX_MASK
;
130 key_prefix
= key_prefix_seq
;
132 pthread_mutex_unlock(&ms_global
.seq_mutex
);
138 * get an unique udp request id
140 * @return an unique UDP request id
142 static int ms_get_udp_request_id(void)
144 return(__sync_fetch_and_add(&udp_request_id
, 1));
148 * initialize current task structure
150 * @param c, pointer of the concurrency
152 static void ms_task_init(ms_conn_t
*c
)
154 c
->curr_task
.cmd
= CMD_NULL
;
155 c
->curr_task
.item
= 0;
156 c
->curr_task
.verify
= false;
157 c
->curr_task
.finish_verify
= true;
158 c
->curr_task
.get_miss
= true;
160 c
->curr_task
.get_opt
= 0;
161 c
->curr_task
.set_opt
= 0;
162 c
->curr_task
.cycle_undo_get
= 0;
163 c
->curr_task
.cycle_undo_set
= 0;
164 c
->curr_task
.verified_get
= 0;
165 c
->curr_task
.overwrite_set
= 0;
169 * initialize udp for the connection structure
171 * @param c, pointer of the concurrency
172 * @param is_udp, whether it's udp
174 * @return int, if success, return 0, else return -1
176 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
)
182 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
193 if (c
->udp
|| (!c
->udp
&& ms_setting
.facebook_test
)) {
194 c
->rudpbuf
= (char *)malloc((size_t)c
->rudpsize
);
195 c
->udppkt
= (ms_udppkt_t
*)malloc(MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
197 if (c
->rudpbuf
== NULL
|| c
->udppkt
== NULL
) {
198 if (c
->rudpbuf
!= NULL
) free(c
->rudpbuf
);
199 if (c
->udppkt
!= NULL
) free(c
->udppkt
);
200 fprintf(stderr
, "malloc()\n");
203 memset(c
->udppkt
, 0, MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
210 * initialize the connection structure
212 * @param c, pointer of the concurrency
213 * @param init_state, (conn_read, conn_write, conn_closing)
214 * @param read_buffer_size
215 * @param is_udp, whether it's udp
217 * @return int, if success, return 0, else return -1
219 static int ms_conn_init(ms_conn_t
*c
, const int init_state
,
220 const int read_buffer_size
, const bool is_udp
)
224 c
->rbuf
= c
->wbuf
= 0;
228 c
->rsize
= read_buffer_size
;
229 c
->wsize
= WRITE_BUFFER_SIZE
;
230 c
->iovsize
= IOV_LIST_INITIAL
;
231 c
->msgsize
= MSG_LIST_INITIAL
;
233 /* for replication, each connection need connect all the server */
234 if (ms_setting
.rep_write_srv
> 0) {
235 c
->total_sfds
= ms_setting
.srv_cnt
;
237 c
->total_sfds
= ms_setting
.sock_per_conn
;
241 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
242 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
243 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * (size_t)c
->iovsize
);
244 c
->msglist
= (struct msghdr
*)malloc(sizeof(struct msghdr
) * (size_t)c
->msgsize
);
245 if (ms_setting
.mult_key_num
> 1) {
246 c
->mlget_task
.mlget_item
= (ms_mlget_task_item_t
*)
247 malloc(sizeof(ms_mlget_task_item_t
) * (size_t)ms_setting
.mult_key_num
);
249 c
->tcpsfd
= (int *)malloc((size_t)c
->total_sfds
* sizeof(int));
251 if (c
->rbuf
== NULL
|| c
->wbuf
== NULL
|| c
->iov
== NULL
252 || c
->msglist
== NULL
|| c
->tcpsfd
== NULL
253 || (ms_setting
.mult_key_num
> 1 && c
->mlget_task
.mlget_item
== NULL
)) {
255 if (c
->rbuf
!= NULL
) free(c
->rbuf
);
256 if (c
->wbuf
!= NULL
) free(c
->wbuf
);
257 if (c
->iov
!= NULL
) free(c
->iov
);
258 if (c
->msglist
!= NULL
) free(c
->msglist
);
259 if (c
->mlget_task
.mlget_item
!= NULL
) free(c
->mlget_task
.mlget_item
);
260 if (c
->tcpsfd
!= NULL
) free(c
->tcpsfd
);
261 fprintf(stderr
, "malloc()\n");
265 c
->state
= init_state
;
273 c
->cur_idx
= c
->total_sfds
; /* default index is a invalid value */
277 c
->change_sfd
= false;
279 c
->precmd
.cmd
= c
->currcmd
.cmd
= CMD_NULL
;
280 c
->precmd
.isfinish
= true; /* default the previous command finished */
281 c
->currcmd
.isfinish
= false;
282 c
->precmd
.retstat
= c
->currcmd
.retstat
= MCD_FAILURE
;
283 c
->precmd
.key_prefix
= c
->currcmd
.key_prefix
= 0;
285 c
->mlget_task
.mlget_num
= 0;
286 c
->mlget_task
.value_index
= -1; /* default invalid value */
288 if (ms_setting
.binary_prot
) {
289 c
->protocol
= binary_prot
;
291 c
->protocol
= ascii_udp_prot
;
293 c
->protocol
= ascii_prot
;
297 if (ms_conn_udp_init(c
, is_udp
) != 0) {
301 /* initialize task */
304 if (!(ms_setting
.facebook_test
&& is_udp
)) {
305 __sync_fetch_and_add(&ms_stats
.active_conns
, 1);
312 * when doing 100% get operation, it could preset some objects
313 * to warmup the server. this function is used to initialize the
314 * number of the objects to preset.
316 * @param c, pointer of the concurrency
318 static void ms_warmup_num_init(ms_conn_t
*c
)
320 /* no set operation, preset all the items in the window */
321 if (ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
< PROP_ERROR
) {
322 c
->warmup_num
= c
->win_size
;
323 c
->remain_warmup_num
= c
->warmup_num
;
326 c
->remain_warmup_num
= c
->warmup_num
;
331 * each connection has an item window, this function initialize
332 * the window. The window is used to generate task.
334 * @param c, pointer of the concurrency
336 * @return int, if success, return 0, else return -1
338 static int ms_item_win_init(ms_conn_t
*c
)
342 c
->win_size
= (int)ms_setting
.win_size
;
344 c
->exec_num
= ms_thread
.thread_ctx
->exec_num_perconn
;
345 c
->remain_exec_num
= c
->exec_num
;
347 c
->item_win
= (ms_task_item_t
*)malloc(sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
348 if (c
->item_win
== NULL
) {
349 fprintf(stderr
, "Can't allocate task item array for conn.\n");
352 memset(c
->item_win
, 0, sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
354 for (int i
= 0; i
< c
->win_size
; i
++) {
355 c
->item_win
[i
].key_size
= (int)ms_setting
.distr
[i
].key_size
;
356 c
->item_win
[i
].key_prefix
= ms_get_key_prefix();
357 c
->item_win
[i
].key_suffix_offset
= ms_setting
.distr
[i
].key_offset
;
358 c
->item_win
[i
].value_size
= (int)ms_setting
.distr
[i
].value_size
;
359 c
->item_win
[i
].value_offset
= INVALID_OFFSET
; /* default in invalid offset */
360 c
->item_win
[i
].client_time
= 0;
362 /* set expire time base on the proportion */
363 if (exp_cnt
< ms_setting
.exp_ver_per
* i
) {
364 c
->item_win
[i
].exp_time
= FIXED_EXPIRE_TIME
;
367 c
->item_win
[i
].exp_time
= 0;
371 ms_warmup_num_init(c
);
377 * each connection structure can include one or more sock
378 * handlers. this function create these socks and connect the
381 * @param c, pointer of the concurrency
383 * @return int, if success, return 0, else return -1
385 static int ms_conn_sock_init(ms_conn_t
*c
)
392 assert(c
->tcpsfd
!= NULL
);
394 for (i
= 0; i
< c
->total_sfds
; i
++) {
396 if (ms_setting
.rep_write_srv
> 0) {
397 /* for replication, each connection need connect all the server */
400 /* all the connections in a thread connects the same server */
401 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
404 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
405 ms_setting
.servers
[srv_idx
].srv_port
,
406 ms_setting
.udp
, &ret_sfd
) != 0) {
414 if (!ms_setting
.udp
) {
415 c
->tcpsfd
[i
] = ret_sfd
;
421 /* initialize udp sock handler if necessary */
422 if (ms_setting
.facebook_test
) {
424 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
425 ms_setting
.servers
[srv_idx
].srv_port
,
426 true, &ret_sfd
) != 0) {
433 if (i
!= c
->total_sfds
|| (ms_setting
.facebook_test
&& c
->udpsfd
== 0)) {
434 if (ms_setting
.udp
) {
437 for (int j
= 0; j
< i
; j
++) {
442 if (c
->udpsfd
!= 0) {
453 * each connection is managed by libevent, this function
454 * initialize the event of the connection structure.
456 * @param c, pointer of the concurrency
458 * @return int, if success, return 0, else return -1
460 static int ms_conn_event_init(ms_conn_t
*c
)
462 /* default event timeout 10 seconds */
463 struct timeval t
= {.tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0};
464 short event_flags
= EV_WRITE
| EV_PERSIST
;
466 event_set(&c
->event
, c
->sfd
, event_flags
, ms_event_handler
, (void *)c
);
467 event_base_set(ms_thread
.base
, &c
->event
);
468 c
->ev_flags
= event_flags
;
470 if (c
->total_sfds
== 1) {
471 if (event_add(&c
->event
, NULL
) == -1) {
475 if (event_add(&c
->event
, &t
) == -1) {
484 * setup a connection, each connection structure of each
485 * thread must call this function to initialize.
487 * @param c, pointer of the concurrency
489 * @return int, if success, return 0, else return -1
491 int ms_setup_conn(ms_conn_t
*c
)
493 if (ms_item_win_init(c
) != 0) {
497 if (ms_conn_init(c
, conn_write
, DATA_BUFFER_SIZE
, ms_setting
.udp
) != 0) {
501 if (ms_conn_sock_init(c
) != 0) {
505 if (ms_conn_event_init(c
) != 0) {
513 * Frees a connection.
515 * @param c, pointer of the concurrency
517 void ms_conn_free(ms_conn_t
*c
)
520 if (c
->hdrbuf
!= NULL
)
522 if (c
->msglist
!= NULL
)
530 if (c
->mlget_task
.mlget_item
!= NULL
)
531 free(c
->mlget_task
.mlget_item
);
532 if (c
->rudpbuf
!= NULL
)
534 if (c
->udppkt
!= NULL
)
536 if (c
->item_win
!= NULL
)
538 if (c
->tcpsfd
!= NULL
)
541 if (--ms_thread
.nactive_conn
== 0) {
542 free(ms_thread
.conn
);
550 * @param c, pointer of the concurrency
552 static void ms_conn_close(ms_conn_t
*c
)
556 /* delete the event, the socket and the connection */
557 event_del(&c
->event
);
559 for (int i
= 0; i
< c
->total_sfds
; i
++) {
560 if (c
->tcpsfd
[i
] > 0) {
566 if (ms_setting
.facebook_test
) {
570 __sync_fetch_and_sub(&ms_stats
.active_conns
, 1);
574 if (ms_setting
.run_time
== 0) {
575 pthread_mutex_lock(&ms_global
.run_lock
.lock
);
576 ms_global
.run_lock
.count
++;
577 pthread_cond_signal(&ms_global
.run_lock
.cond
);
578 pthread_mutex_unlock(&ms_global
.run_lock
.lock
);
581 if (ms_thread
.nactive_conn
== 0) {
589 * @param ai, server address information
591 * @return int, if success, return 0, else return -1
593 static int ms_new_socket(struct addrinfo
*ai
)
597 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1) {
598 fprintf(stderr
, "socket() error: %s.\n", strerror(errno
));
606 * Sets a socket's send buffer size to the maximum allowed by the system.
608 * @param sfd, file descriptor of socket
610 static void ms_maximize_sndbuf(const int sfd
)
612 socklen_t intsize
= sizeof(int);
613 unsigned int last_good
= 0;
614 unsigned int min
, max
, avg
;
615 unsigned int old_size
;
617 /* Start with the default size. */
618 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0) {
619 fprintf(stderr
, "getsockopt(SO_SNDBUF)\n");
623 /* Binary-search for the real maximum. */
625 max
= MAX_SENDBUF_SIZE
;
628 avg
= ((unsigned int)(min
+ max
)) / 2;
629 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0) {
639 * socket connects the server
641 * @param c, pointer of the concurrency
642 * @param srv_host_name, the host name of the server
643 * @param srv_port, port of server
644 * @param is_udp, whether it's udp
645 * @param ret_sfd, the connected socket file descriptor
647 * @return int, if success, return 0, else return -1
649 static int ms_network_connect(ms_conn_t
*c
, char * srv_host_name
,
650 const int srv_port
, const bool is_udp
, int *ret_sfd
)
653 struct linger ling
= {0, 0};
655 struct addrinfo
*next
;
656 struct addrinfo hints
;
657 char port_buf
[NI_MAXSERV
];
664 * the memset call clears nonstandard fields in some impementations
665 * that otherwise mess things up.
667 memset(&hints
, 0, sizeof (hints
));
668 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
670 hints
.ai_protocol
= IPPROTO_UDP
;
671 hints
.ai_socktype
= SOCK_DGRAM
;
672 hints
.ai_family
= AF_INET
; /* This left here because of issues with OSX 10.5 */
674 hints
.ai_family
= AF_UNSPEC
;
675 hints
.ai_protocol
= IPPROTO_TCP
;
676 hints
.ai_socktype
= SOCK_STREAM
;
679 snprintf(port_buf
, NI_MAXSERV
, "%d", srv_port
);
680 error
= getaddrinfo(srv_host_name
, port_buf
, &hints
, &ai
);
682 if (error
!= EAI_SYSTEM
)
683 fprintf(stderr
, "getaddrinfo(): %s.\n", gai_strerror(error
));
685 perror("getaddrinfo()\n");
690 for (next
= ai
; next
; next
= next
->ai_next
) {
691 if ((sfd
= ms_new_socket(next
)) == -1) {
696 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
698 ms_maximize_sndbuf(sfd
);
700 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
701 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
702 setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
));
706 c
->srv_recv_addr_size
= sizeof(struct sockaddr
);
707 memcpy(&c
->srv_recv_addr
, next
->ai_addr
, c
->srv_recv_addr_size
);
709 if (connect(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1) {
716 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
717 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
718 fprintf(stderr
, "setting O_NONBLOCK\n");
724 if (ret_sfd
!= NULL
) {
733 /* Return zero if we detected no errors in starting up connections */
738 * reconnect a disconnected sock
740 * @param c, pointer of the concurrency
742 * @return int, if success, return 0, else return -1
744 static int ms_reconn(ms_conn_t
*c
)
747 int srv_conn_cnt
= 0;
749 if (ms_setting
.rep_write_srv
> 0) {
750 srv_idx
= c
->cur_idx
;
751 srv_conn_cnt
= ms_setting
.nconns
;
753 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
754 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
757 /* close the old socket handler */
759 c
->tcpsfd
[c
->cur_idx
] = 0;
761 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].disconn_cnt
, 1)
762 % srv_conn_cnt
== 0) {
764 gettimeofday(&ms_setting
.servers
[srv_idx
].disconn_time
, NULL
);
765 fprintf(stderr
, "Server %s:%d disconnect\n",
766 ms_setting
.servers
[srv_idx
].srv_host_name
,
767 ms_setting
.servers
[srv_idx
].srv_port
);
770 if (ms_setting
.rep_write_srv
> 0) {
772 for (i
= 0; i
< c
->total_sfds
; i
++) {
773 if (c
->tcpsfd
[i
] != 0) {
778 /* all socks disconnect */
779 if (i
== c
->total_sfds
) {
784 /* reconnect success, break the loop */
785 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
786 ms_setting
.servers
[srv_idx
].srv_port
,
787 ms_setting
.udp
, &c
->sfd
) == 0) {
789 c
->tcpsfd
[c
->cur_idx
] = c
->sfd
;
790 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
791 % srv_conn_cnt
== 0) {
793 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
794 int reconn_time
= (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
-
795 ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
);
796 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
797 ms_setting
.servers
[srv_idx
].srv_host_name
,
798 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
803 if (c
->total_sfds
== 1) {
804 /* wait a second and reconnect */
807 } while (c
->total_sfds
== 1);
810 if (c
->total_sfds
> 1 && c
->tcpsfd
[c
->cur_idx
] == 0) {
819 * reconnect several disconnected socks in the connection
820 * structure, the ever-1-second timer of the thread will check
821 * whether some socks in the connections disconnect. if
822 * disconnect, reconnect the sock.
824 * @param c, pointer of the concurrency
826 * @return int, if success, return 0, else return -1
828 int ms_reconn_socks(ms_conn_t
*c
)
832 int srv_conn_cnt
= 0;
833 struct timeval cur_time
;
837 if (c
->total_sfds
== 1 || c
->total_sfds
== c
->alive_sfds
) {
841 for (int i
= 0; i
< c
->total_sfds
; i
++) {
842 if (c
->tcpsfd
[i
] == 0) {
843 gettimeofday(&cur_time
, NULL
);
846 * For failover test of replication, reconnect the socks after
847 * it disconnects more than 5 seconds, Otherwise memslap will
848 * block at connect() function and the work threads can't work
851 if (cur_time
.tv_sec
- ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
< 5) {
855 if (ms_setting
.rep_write_srv
> 0) {
857 srv_conn_cnt
= ms_setting
.nconns
;
859 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
860 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
863 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
864 ms_setting
.servers
[srv_idx
].srv_port
,
865 ms_setting
.udp
, &ret_sfd
) == 0) {
867 c
->tcpsfd
[i
] = ret_sfd
;
870 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
871 % srv_conn_cnt
== 0) {
873 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
874 int reconn_time
= (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
-
875 ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
);
876 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
877 ms_setting
.servers
[srv_idx
].srv_host_name
,
878 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
888 * Tokenize the command string by replacing whitespace with '\0' and update
889 * the token array tokens with pointer to start of each token and length.
890 * Returns total number of tokens. The last valid token is the terminal
891 * token (value points to the first unprocessed character of the string and
896 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
897 * for(int ix = 0; tokens[ix].length != 0; ix++) {
900 * ncommand = tokens[ix].value - command;
901 * command = tokens[ix].value;
904 * @param command, the command string to token
905 * @param tokens, array to store tokens
906 * @param max_tokens, maximum tokens number
908 * @return int, the number of tokens
910 static int ms_tokenize_command(char *command
, token_t
*tokens
, const int max_tokens
)
915 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
917 for (s
= e
= command
; ntokens
< max_tokens
- 1; ++e
) {
920 tokens
[ntokens
].value
= s
;
921 tokens
[ntokens
].length
= (size_t)(e
- s
);
926 } else if (*e
== '\0') {
928 tokens
[ntokens
].value
= s
;
929 tokens
[ntokens
].length
= (size_t)(e
- s
);
933 break; /* string end */
941 * parse the response of server.
943 * @param c, pointer of the concurrency
944 * @param command, the string responded by server
946 * @return int, if the command completed return 0, else return
949 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
)
953 char *buffer
= command
;
958 * for command get, we store the returned value into local buffer
959 * then continue in ms_complete_nread().
963 case 'V': /* VALUE || VERSION */
964 if (buffer
[1] == 'A') { /* VALUE */
965 token_t tokens
[MAX_TOKENS
];
966 ms_tokenize_command(command
, tokens
, MAX_TOKENS
);
967 value_len
= strtol(tokens
[VALUELEN_TOKEN
].value
, NULL
, 10);
968 c
->currcmd
.key_prefix
= *(uint64_t *)tokens
[KEY_TOKEN
].value
;
971 We read the \r\n into the string since not doing so is more
972 cycles then the waster of memory to do so.
974 We are null terminating through, which will most likely make
975 some people lazy about using the return length.
977 c
->rvbytes
= (int)(value_len
+ 2);
985 c
->currcmd
.retstat
= MCD_SUCCESS
;
987 case 'S': /* STORED STATS SERVER_ERROR */
988 if (buffer
[2] == 'A') {/* STORED STATS */
990 c
->currcmd
.retstat
= MCD_STAT
;
991 } else if (buffer
[1] == 'E') {
993 printf("<%d %s\n", c
->sfd
, buffer
);
995 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
996 } else if (buffer
[1] == 'T') {
998 c
->currcmd
.retstat
= MCD_STORED
;
1000 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1004 case 'D': /* DELETED DATA */
1005 if (buffer
[1] == 'E') {
1006 c
->currcmd
.retstat
= MCD_DELETED
;
1008 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1013 case 'N': /* NOT_FOUND NOT_STORED*/
1014 if (buffer
[4] == 'F') {
1015 c
->currcmd
.retstat
= MCD_NOTFOUND
;
1016 } else if (buffer
[4] == 'S') {
1017 printf("<%d %s\n", c
->sfd
, buffer
);
1018 c
->currcmd
.retstat
= MCD_NOTSTORED
;
1020 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1024 case 'E': /* PROTOCOL ERROR or END */
1025 if (buffer
[1] == 'N') {
1027 c
->currcmd
.retstat
= MCD_END
;
1028 } else if (buffer
[1] == 'R') {
1029 printf("<%d ERROR\n", c
->sfd
);
1030 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
1031 } else if (buffer
[1] == 'X') {
1032 c
->currcmd
.retstat
= MCD_DATA_EXISTS
;
1033 printf("<%d %s\n", c
->sfd
, buffer
);
1035 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1039 case 'C': /* CLIENT ERROR */
1040 printf("<%d %s\n", c
->sfd
, buffer
);
1041 c
->currcmd
.retstat
= MCD_CLIENT_ERROR
;
1045 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1053 * after one operation completes, reset the concurrency
1055 * @param c, pointer of the concurrency
1056 * @param timeout, whether it's timeout
1058 void ms_reset_conn(ms_conn_t
*c
, bool timeout
)
1063 if (c
->packets
> 0 && c
->packets
< MAX_UDP_PACKET
) {
1064 memset(c
->udppkt
, 0, sizeof(ms_udppkt_t
) * (uint64_t)c
->packets
);
1073 c
->currcmd
.isfinish
= true;
1074 c
->ctnwrite
= false;
1077 ms_conn_set_state(c
, conn_write
);
1078 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
1081 ms_drive_machine(c
);
1086 * if we have a complete line in the buffer, process it.
1088 * @param c, pointer of the concurrency
1090 * @return int, if success, return 0, else return -1
1092 static int ms_try_read_line(ms_conn_t
*c
)
1094 if (c
->protocol
== binary_prot
) {
1095 /* Do we have the complete packet header? */
1096 if ((uint64_t)c
->rbytes
< sizeof(c
->binary_header
)) {
1097 /* need more data! */
1101 if (((long)(c
->rcurr
)) % 8 != 0) {
1102 /* must realign input buffer */
1103 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1105 if (settings
.verbose
) {
1106 fprintf(stderr
, "%d: Realign input buffer.\n", c
->sfd
);
1110 protocol_binary_response_header
* rsp
;
1111 rsp
= (protocol_binary_response_header
*)c
->rcurr
;
1113 c
->binary_header
= *rsp
;
1114 c
->binary_header
.response
.extlen
= rsp
->response
.extlen
;
1115 c
->binary_header
.response
.keylen
= ntohl(rsp
->response
.keylen
);
1116 c
->binary_header
.response
.bodylen
= ntohl(rsp
->response
.bodylen
);
1117 c
->binary_header
.response
.status
= ntohl(rsp
->response
.status
);
1119 if (c
->binary_header
.response
.magic
!= PROTOCOL_BINARY_RES
) {
1120 fprintf(stderr
, "Invalid magic: %x\n",
1121 c
->binary_header
.response
.magic
);
1122 ms_conn_set_state(c
, conn_closing
);
1126 /* process this complete response */
1127 if (ms_bin_process_response(c
) == 0) {
1128 /* current operation completed */
1129 ms_reset_conn(c
, false);
1132 c
->rbytes
-= (int32_t)sizeof(c
->binary_header
);
1133 c
->rcurr
+= sizeof(c
->binary_header
);
1140 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1144 el
= memchr(c
->rcurr
, '\n', (size_t)c
->rbytes
);
1148 if ((el
- c
->rcurr
) > 1 && *(el
- 1) == '\r') {
1153 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
1155 /* process this complete line */
1156 if (ms_ascii_process_line(c
, c
->rcurr
) == 0) {
1157 /* current operation completed */
1158 ms_reset_conn(c
, false);
1161 /* current operation didn't complete */
1162 c
->rbytes
-= (int32_t)(cont
- c
->rcurr
);
1166 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1173 * because the packet of UDP can't ensure the order, the
1174 * function is used to sort the received udp packet.
1176 * @param c, pointer of the concurrency
1177 * @param buf, the buffer to store the ordered packages data
1178 * @param rbytes, the maximum capacity of the buffer
1180 * @return int, if success, return the copy bytes, else return
1183 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
)
1187 uint16_t req_id
= 0;
1188 uint16_t seq_num
= 0;
1189 uint16_t packets
= 0;
1190 unsigned char *header
= NULL
;
1192 /* no enough data */
1194 assert(buf
!= NULL
);
1195 assert(c
->rudpbytes
>= UDP_HEADER_SIZE
);
1197 /* calculate received packets count */
1198 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
>= UDP_HEADER_SIZE
) {
1199 /* the last packet has some data */
1200 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
+ 1;
1202 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
;
1205 /* get the total packets count if necessary */
1206 if (c
->packets
== 0) {
1207 c
->packets
= HEADER_TO_PACKETS((unsigned char *)c
->rudpbuf
);
1210 /* build the ordered packet array */
1211 for (int i
= c
->pktcurr
; i
< c
->recvpkt
; i
++) {
1212 header
= (unsigned char *)c
->rudpbuf
+ i
* UDP_MAX_PAYLOAD_SIZE
;
1213 req_id
= (uint16_t)HEADER_TO_REQID(header
);
1214 assert(req_id
== c
->request_id
% (1 << 16));
1216 packets
= (uint16_t)HEADER_TO_PACKETS(header
);
1217 assert(c
->packets
== HEADER_TO_PACKETS(header
));
1219 seq_num
= (uint16_t)HEADER_TO_SEQNUM(header
);
1220 c
->udppkt
[seq_num
].header
= header
;
1221 c
->udppkt
[seq_num
].data
= (char *)header
+ UDP_HEADER_SIZE
;
1223 if (i
== c
->recvpkt
- 1) {
1224 /* last received packet */
1225 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
== 0) {
1226 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1229 c
->udppkt
[seq_num
].rbytes
= c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1232 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1237 for (int i
= c
->ordcurr
; i
< c
->recvpkt
; i
++) {
1238 /* there is some data to copy */
1239 if (c
->udppkt
[i
].data
!= NULL
&& c
->udppkt
[i
].copybytes
< c
->udppkt
[i
].rbytes
) {
1240 header
= c
->udppkt
[i
].header
;
1241 len
= c
->udppkt
[i
].rbytes
- c
->udppkt
[i
].copybytes
;
1242 if (len
> rbytes
- wbytes
) {
1243 len
= rbytes
- wbytes
;
1246 assert(len
<= rbytes
- wbytes
);
1247 assert(i
== HEADER_TO_SEQNUM(header
));
1249 memcpy(buf
+ wbytes
, c
->udppkt
[i
].data
+ c
->udppkt
[i
].copybytes
, (size_t)len
);
1251 c
->udppkt
[i
].copybytes
+= len
;
1253 if (c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
1254 && c
->udppkt
[i
].rbytes
== UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
) {
1255 /* finish copying all the data of this packet, next */
1259 /* last received packet, and finish copying all the data */
1260 if (c
->recvpkt
== c
->packets
&& i
== c
->recvpkt
- 1
1261 && c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
) {
1265 /* no space to copy data */
1266 if (wbytes
>= rbytes
) {
1270 /* it doesn't finish reading all the data of the packet from network */
1271 if (i
!= c
->recvpkt
- 1
1272 && c
->udppkt
[i
].rbytes
< UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
) {
1276 /* no data to copy */
1281 return(wbytes
== 0 ? -1 : wbytes
);
1285 * encapsulate upd read like tcp read
1287 * @param c, pointer of the concurrency
1288 * @param buf, read buffer
1289 * @param len, length to read
1291 * @return int, if success, return the read bytes, else return
1294 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
)
1304 if (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
> c
->rudpsize
) {
1305 char *new_rbuf
= realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
1307 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1308 c
->rudpbytes
= 0; /* ignore what we read */
1311 c
->rudpbuf
= new_rbuf
;
1315 avail
= c
->rudpsize
- c
->rudpbytes
;
1316 /* UDP each time read a packet, 1400 bytes */
1317 res
= (int)read(c
->sfd
, c
->rudpbuf
+ c
->rudpbytes
, (size_t)avail
);
1320 __sync_fetch_and_add(&ms_stats
.bytes_read
, res
);
1321 c
->rudpbytes
+= res
;
1331 /* "connection" closed */
1336 /* no data to read */
1341 /* copy data to read buffer */
1343 copybytes
= ms_sort_udp_packet(c
, buf
, len
);
1346 if (copybytes
== -1) {
1347 __sync_fetch_and_add(&ms_stats
.pkt_disorder
, 1);
1354 * read from network as much as we can, handle buffer overflow and connection
1356 * before reading, move the remaining incomplete fragment of a command
1357 * (if any) to the beginning of the buffer.
1358 * return 0 if there's nothing to read on the first read.
1361 * read from network as much as we can, handle buffer overflow and connection
1362 * close. before reading, move the remaining incomplete fragment of a command
1363 * (if any) to the beginning of the buffer.
1365 * @param c, pointer of the concurrency
1368 * return 0 if there's nothing to read on the first read.
1369 * return 1 if get data
1370 * return -1 if error happens
1372 static int ms_try_read_network(ms_conn_t
*c
)
1380 if (c
->rcurr
!= c
->rbuf
&&
1381 (!c
->readval
|| c
->rvbytes
> c
->rsize
- (c
->rcurr
- c
->rbuf
)
1382 || (c
->readval
&& c
->rcurr
- c
->rbuf
> c
->rbytes
))) {
1383 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
1384 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
1389 if (c
->rbytes
>= c
->rsize
) {
1390 char *new_rbuf
= realloc(c
->rbuf
, (size_t)c
->rsize
* 2);
1392 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1393 c
->rbytes
= 0; /* ignore what we read */
1396 c
->rcurr
= c
->rbuf
= new_rbuf
;
1400 avail
= c
->rsize
- c
->rbytes
- (c
->rcurr
- c
->rbuf
);
1406 res
= (int32_t)ms_udp_read(c
, c
->rcurr
+ c
->rbytes
, (int32_t)avail
);
1408 res
= (int)read(c
->sfd
, c
->rcurr
+ c
->rbytes
, (size_t)avail
);
1413 __sync_fetch_and_add(&ms_stats
.bytes_read
, res
);
1424 /* connection closed */
1425 ms_conn_set_state(c
, conn_closing
);
1429 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) break;
1430 /* Should close on unhandled errors. */
1431 ms_conn_set_state(c
, conn_closing
);
1440 * after get the object from server, verify the value if
1443 * @param c, pointer of the concurrency
1444 * @param mlget_item, pointer of mulit-get task item structure
1445 * @param value, received value string
1446 * @param vlen, received value string length
1448 static void ms_verify_value(ms_conn_t
*c
, ms_mlget_task_item_t
*mlget_item
,
1449 char *value
, int vlen
)
1451 if (c
->curr_task
.verify
) {
1452 assert(c
->curr_task
.item
->value_offset
!= INVALID_OFFSET
);
1453 char *orignval
= &ms_setting
.char_block
[c
->curr_task
.item
->value_offset
];
1454 char *orignkey
= &ms_setting
.char_block
[c
->curr_task
.item
->key_suffix_offset
];
1456 /* verify expire time if necessary */
1457 if (c
->curr_task
.item
->exp_time
> 0) {
1458 struct timeval curr_time
;
1459 gettimeofday(&curr_time
, NULL
);
1461 /* object expired but get it now */
1462 if (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
1463 > c
->curr_task
.item
->exp_time
+ EXPIRE_TIME_ERROR
) {
1464 __sync_fetch_and_add(&ms_stats
.exp_get
, 1);
1466 if (ms_setting
.verbose
) {
1469 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
1470 localtime(&c
->curr_task
.item
->client_time
));
1471 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
1472 localtime(&curr_time
.tv_sec
));
1473 fprintf(stderr
, "\n<%d expire time verification failed, "
1474 "object expired but get it now\n"
1477 "\tset time: %s current time: %s "
1478 "diff time: %d expire time: %d\n"
1479 "\texpected data: \n"
1480 "\treceived data len: %d\n"
1481 "\treceived data: %.*s\n",
1482 c
->sfd
, c
->curr_task
.item
->key_size
, c
->curr_task
.item
->key_prefix
,
1483 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1484 orignkey
, set_time
, cur_time
,
1485 (int)(curr_time
.tv_sec
- c
->curr_task
.item
->client_time
),
1486 c
->curr_task
.item
->exp_time
,
1492 if (c
->curr_task
.item
->value_size
!= vlen
1493 || memcmp(orignval
, value
, (size_t)vlen
) != 0) {
1494 __sync_fetch_and_add(&ms_stats
.vef_failed
, 1);
1496 if (ms_setting
.verbose
) {
1497 fprintf(stderr
, "\n<%d data verification failed\n"
1500 "\texpected data len: %d\n"
1501 "\texpected data: %.*s\n"
1502 "\treceived data len: %d\n"
1503 "\treceived data: %.*s\n",
1504 c
->sfd
, c
->curr_task
.item
->key_size
, c
->curr_task
.item
->key_prefix
,
1505 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1506 orignkey
, c
->curr_task
.item
->value_size
,
1507 c
->curr_task
.item
->value_size
,
1508 orignval
, vlen
, vlen
, value
);
1514 c
->curr_task
.finish_verify
= true;
1516 if (mlget_item
!= NULL
) {
1517 mlget_item
->finish_verify
= true;
1523 * For ASCII protocol, after store the data into the local
1524 * buffer, run this function to handle the data.
1526 * @param c, pointer of the concurrency
1528 static void ms_ascii_complete_nread(ms_conn_t
*c
)
1531 assert(c
->rbytes
>= c
->rvbytes
);
1532 assert(c
->protocol
== ascii_udp_prot
|| c
->protocol
== ascii_prot
);
1533 if (c
->rvbytes
> 2) {
1534 assert(c
->rcurr
[c
->rvbytes
- 1] == '\n' && c
->rcurr
[c
->rvbytes
- 2] == '\r');
1538 ms_mlget_task_item_t
*mlget_item
= NULL
;
1539 if ((ms_setting
.mult_key_num
> 1 &&
1540 c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
) ||
1541 (c
->remain_exec_num
== 0 && c
->mlget_task
.mlget_num
> 0)) {
1543 c
->mlget_task
.value_index
++;
1544 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1546 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
) {
1547 c
->curr_task
.item
= mlget_item
->item
;
1548 c
->curr_task
.verify
= mlget_item
->verify
;
1549 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1550 mlget_item
->get_miss
= false;
1552 /* Try to find the task item in multi-get task array */
1553 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
1554 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
1555 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
) {
1557 c
->curr_task
.item
= mlget_item
->item
;
1558 c
->curr_task
.verify
= mlget_item
->verify
;
1559 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1560 mlget_item
->get_miss
= false;
1568 ms_verify_value(c
, mlget_item
, c
->rcurr
, c
->rvbytes
- 2);
1570 c
->curr_task
.get_miss
= false;
1571 c
->rbytes
-= c
->rvbytes
;
1572 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1573 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1579 * For binary protocol, after store the data into the local
1580 * buffer, run this function to handle the data.
1582 * @param c, pointer of the concurrency
1584 static void ms_bin_complete_nread(ms_conn_t
*c
)
1587 assert(c
->rbytes
>= c
->rvbytes
);
1588 assert(c
->protocol
== binary_prot
);
1590 int extlen
= c
->binary_header
.response
.extlen
;
1591 int keylen
= c
->binary_header
.response
.keylen
;
1592 uint8_t opcode
= c
->binary_header
.response
.opcode
;
1594 /* not get command or not include value, just return */
1595 if ((opcode
!= PROTOCOL_BINARY_CMD_GET
&& opcode
!= PROTOCOL_BINARY_CMD_GETQ
) ||
1596 c
->rvbytes
<= extlen
+ keylen
) {
1598 if (c
->binary_header
.response
.opcode
== PROTOCOL_BINARY_CMD_GET
) {
1599 c
->currcmd
.retstat
= MCD_END
;
1600 c
->curr_task
.get_miss
= true;
1605 ms_reset_conn(c
, false);
1610 ms_mlget_task_item_t
*mlget_item
= NULL
;
1611 if ((ms_setting
.mult_key_num
> 1 &&
1612 c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
) ||
1613 (c
->remain_exec_num
== 0 && c
->mlget_task
.mlget_num
> 0)) {
1615 c
->mlget_task
.value_index
++;
1616 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1618 c
->curr_task
.item
= mlget_item
->item
;
1619 c
->curr_task
.verify
= mlget_item
->verify
;
1620 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1621 mlget_item
->get_miss
= false;
1624 ms_verify_value(c
, mlget_item
, c
->rcurr
+ extlen
+ keylen
, c
->rvbytes
- extlen
- keylen
);
1626 c
->currcmd
.retstat
= MCD_END
;
1627 c
->curr_task
.get_miss
= false;
1628 c
->rbytes
-= c
->rvbytes
;
1629 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1630 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1634 if (ms_setting
.mult_key_num
> 1) {
1635 /* multi-get have check all the item */
1636 if (c
->mlget_task
.value_index
== c
->mlget_task
.mlget_num
- 1) {
1637 ms_reset_conn(c
, false);
1641 ms_reset_conn(c
, false);
1646 * we get here after reading the value of get commands.
1648 * @param c, pointer of the concurrency
1650 static void ms_complete_nread(ms_conn_t
*c
)
1653 assert(c
->rbytes
>= c
->rvbytes
);
1654 assert(c
->protocol
== ascii_udp_prot
1655 || c
->protocol
== ascii_prot
1656 || c
->protocol
== binary_prot
);
1658 if (c
->protocol
== binary_prot
) {
1659 ms_bin_complete_nread(c
);
1661 ms_ascii_complete_nread(c
);
1666 * Adds a message header to a connection.
1668 * @param c, pointer of the concurrency
1670 * @return int, if success, return 0, else return -1
1672 static int ms_add_msghdr(ms_conn_t
*c
)
1678 if (c
->msgsize
== c
->msgused
) {
1679 msg
= realloc(c
->msglist
, (uint64_t)c
->msgsize
* 2 * sizeof(struct msghdr
));
1686 msg
= c
->msglist
+ c
->msgused
;
1689 * this wipes msg_iovlen, msg_control, msg_controllen, and
1690 * msg_flags, the last 3 of which aren't defined on solaris:
1692 memset(msg
, 0, sizeof(struct msghdr
));
1694 msg
->msg_iov
= &c
->iov
[c
->iovused
];
1696 if (c
->udp
&& c
->srv_recv_addr_size
> 0) {
1697 msg
->msg_name
= &c
->srv_recv_addr
;
1698 msg
->msg_namelen
= c
->srv_recv_addr_size
;
1705 /* Leave room for the UDP header, which we'll fill in later. */
1706 return ms_add_iov(c
, NULL
, UDP_HEADER_SIZE
);
1713 * Ensures that there is room for another structure iovec in a connection's
1716 * @param c, pointer of the concurrency
1718 * @return int, if success, return 0, else return -1
1720 static int ms_ensure_iov_space(ms_conn_t
*c
)
1724 if (c
->iovused
>= c
->iovsize
) {
1726 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
1727 ((uint64_t)c
->iovsize
* 2) * sizeof(struct iovec
));
1733 /* Point all the msghdr structures at the new list. */
1734 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++) {
1735 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
1736 iovnum
+= (int)c
->msglist
[i
].msg_iovlen
;
1744 * Adds data to the list of pending data that will be written out to a
1747 * @param c, pointer of the concurrency
1748 * @param buf, the buffer includes data to send
1749 * @param len, the data length in the buffer
1751 * @return int, if success, return 0, else return -1
1753 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
)
1762 m
= &c
->msglist
[c
->msgused
- 1];
1765 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
1767 limit_to_mtu
= c
->udp
;
1769 /* We may need to start a new msghdr if this one is full. */
1770 if (m
->msg_iovlen
== IOV_MAX
||
1771 (limit_to_mtu
&& c
->msgbytes
>= UDP_MAX_SEND_PAYLOAD_SIZE
)) {
1773 m
= &c
->msglist
[c
->msgused
- 1];
1776 if (ms_ensure_iov_space(c
) != 0)
1779 /* If the fragment is too big to fit in the datagram, split it up */
1780 if (limit_to_mtu
&& len
+ c
->msgbytes
> UDP_MAX_SEND_PAYLOAD_SIZE
) {
1781 leftover
= len
+ c
->msgbytes
- UDP_MAX_SEND_PAYLOAD_SIZE
;
1787 m
= &c
->msglist
[c
->msgused
- 1];
1788 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
1789 m
->msg_iov
[m
->msg_iovlen
].iov_len
= (size_t)len
;
1795 buf
= ((char *)buf
) + len
;
1797 } while (leftover
> 0);
1803 * Constructs a set of UDP headers and attaches them to the outgoing messages.
1805 * @param c, pointer of the concurrency
1807 * @return int, if success, return 0, else return -1
1809 static int ms_build_udp_headers(ms_conn_t
*c
)
1816 c
->request_id
= ms_get_udp_request_id();
1818 if (c
->msgused
> c
->hdrsize
) {
1821 new_hdrbuf
= realloc(c
->hdrbuf
, (size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
1823 new_hdrbuf
= malloc((size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
1826 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
1827 c
->hdrsize
= c
->msgused
* 2;
1830 /* If this is a multi-packet request, drop it. */
1831 if (c
->udp
&& c
->msgused
> 1) {
1832 fprintf(stderr
, "multi-packet request for UDP not supported.\n");
1837 for (i
= 0; i
< c
->msgused
; i
++) {
1838 c
->msglist
[i
].msg_iov
[0].iov_base
= hdr
;
1839 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
1840 *hdr
++ = (unsigned char)(c
->request_id
/ 256);
1841 *hdr
++ = (unsigned char)(c
->request_id
% 256);
1842 *hdr
++ = (unsigned char)(i
/ 256);
1843 *hdr
++ = (unsigned char)(i
% 256);
1844 *hdr
++ = (unsigned char)(c
->msgused
/ 256);
1845 *hdr
++ = (unsigned char)(c
->msgused
% 256);
1846 *hdr
++ = (unsigned char)1; /* support facebook memcached */
1847 *hdr
++ = (unsigned char)0;
1848 assert(hdr
== ((unsigned char *)c
->msglist
[i
].msg_iov
[0].iov_base
+ UDP_HEADER_SIZE
));
1855 * Transmit the next chunk of data from our list of msgbuf structures.
1857 * @param c, pointer of the concurrency
1859 * @return TRANSMIT_COMPLETE All done writing.
1860 * TRANSMIT_INCOMPLETE More data remaining to write.
1861 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1862 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1864 static int ms_transmit(ms_conn_t
*c
)
1868 if (c
->msgcurr
< c
->msgused
&&
1869 c
->msglist
[c
->msgcurr
].msg_iovlen
== 0) {
1870 /* Finished writing the current msg; advance to the next. */
1874 if (c
->msgcurr
< c
->msgused
) {
1876 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
1878 res
= sendmsg(c
->sfd
, m
, 0);
1880 __sync_fetch_and_add(&ms_stats
.bytes_written
, res
);
1882 /* We've written some of the data. Remove the completed
1883 iovec entries from the list of pending writes. */
1884 while (m
->msg_iovlen
> 0 && res
>= (ssize_t
)m
->msg_iov
->iov_len
) {
1885 res
-= (ssize_t
)m
->msg_iov
->iov_len
;
1890 /* Might have written just part of the last iovec entry;
1891 adjust it so the next write will do the rest. */
1893 m
->msg_iov
->iov_base
= (unsigned char *)m
->msg_iov
->iov_base
+ res
;
1894 m
->msg_iov
->iov_len
-= (uint64_t)res
;
1896 return TRANSMIT_INCOMPLETE
;
1898 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
1899 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
1900 fprintf(stderr
, "Couldn't update event.\n");
1901 ms_conn_set_state(c
, conn_closing
);
1902 return TRANSMIT_HARD_ERROR
;
1904 return TRANSMIT_SOFT_ERROR
;
1907 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1908 we have a real error, on which we close the connection */
1909 fprintf(stderr
, "Failed to write, and not due to blocking.\n");
1911 ms_conn_set_state(c
, conn_closing
);
1912 return TRANSMIT_HARD_ERROR
;
1914 return TRANSMIT_COMPLETE
;
1919 * Shrinks a connection's buffers if they're too big. This prevents
1920 * periodic large "mget" response from server chewing lots of client
1923 * This should only be called in between requests since it can wipe output
1926 * @param c, pointer of the concurrency
1928 static void ms_conn_shrink(ms_conn_t
*c
)
1935 if (c
->rsize
> READ_BUFFER_HIGHWAT
&& c
->rbytes
< DATA_BUFFER_SIZE
) {
1938 if (c
->rcurr
!= c
->rbuf
)
1939 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
1941 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
1945 c
->rsize
= DATA_BUFFER_SIZE
;
1950 if (c
->udp
&& c
->rudpsize
> UDP_DATA_BUFFER_HIGHWAT
1951 && c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
< UDP_DATA_BUFFER_SIZE
) {
1952 char *new_rbuf
= (char *)realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
1954 c
->rudpbuf
= new_rbuf
;
1955 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
1957 /* TODO check error condition? */
1960 if (c
->msgsize
> MSG_LIST_HIGHWAT
) {
1961 struct msghdr
*newbuf
= (struct msghdr
*) realloc((void *)c
->msglist
,
1962 MSG_LIST_INITIAL
* sizeof(c
->msglist
[0]));
1964 c
->msglist
= newbuf
;
1965 c
->msgsize
= MSG_LIST_INITIAL
;
1967 /* TODO check error condition? */
1970 if (c
->iovsize
> IOV_LIST_HIGHWAT
) {
1971 struct iovec
*newbuf
= (struct iovec
*) realloc((void *)c
->iov
,
1972 IOV_LIST_INITIAL
* sizeof(c
->iov
[0]));
1975 c
->iovsize
= IOV_LIST_INITIAL
;
1977 /* TODO check return value */
1982 * Sets a connection's current state in the state machine. Any special
1983 * processing that needs to happen on certain state transitions can
1986 * @param c, pointer of the concurrency
1987 * @param state, connection state
1989 static void ms_conn_set_state(ms_conn_t
*c
, int state
)
1993 if (state
!= c
->state
) {
1994 if (state
== conn_read
) {
2002 * update the event if socks change state. for example: when
2003 * change the listen scoket read event to sock write event, or
2004 * change socket handler, we could call this function.
2006 * @param c, pointer of the concurrency
2007 * @param new_flags, new event flags
2009 * @return bool, if success, return true, else return false
2011 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
)
2013 /* default event timeout 10 seconds */
2014 struct timeval t
= {.tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0};
2018 struct event_base
*base
= c
->event
.ev_base
;
2019 if (c
->ev_flags
== new_flags
&& ms_setting
.rep_write_srv
== 0
2020 && (!ms_setting
.facebook_test
|| c
->total_sfds
== 1)) {
2024 if (event_del(&c
->event
) == -1) {
2025 /* try to delete the event again */
2026 if (event_del(&c
->event
) == -1) {
2031 event_set(&c
->event
, c
->sfd
, (short)new_flags
, ms_event_handler
, (void *)c
);
2032 event_base_set(base
, &c
->event
);
2033 c
->ev_flags
= (short)new_flags
;
2035 if (c
->total_sfds
== 1) {
2036 if (event_add(&c
->event
, NULL
) == -1) {
2040 if (event_add(&c
->event
, &t
) == -1) {
2049 * If user want to get the expected throughput, we could limit
2050 * the performance of memslap. we could give up some work and
2051 * just wait a short time. The function is used to check this
2054 * @param c, pointer of the concurrency
2056 * @return bool, if success, return true, else return false
2058 static bool ms_need_yield(ms_conn_t
*c
)
2061 int64_t time_diff
= 0;
2062 struct timeval curr_time
;
2063 ms_task_t
*task
= &c
->curr_task
;
2065 if (ms_setting
.expected_tps
> 0) {
2066 gettimeofday(&curr_time
, NULL
);
2067 time_diff
= ms_time_diff(&ms_thread
.startup_time
, &curr_time
);
2068 tps
= (int64_t)((task
->get_opt
+ task
->set_opt
) / ((uint64_t)time_diff
/ 1000000));
2070 /* current throughput is greater than expected throughput */
2071 if (tps
> ms_thread
.thread_ctx
->tps_perconn
) {
2080 * used to update the start time of each operation
2082 * @param c, pointer of the concurrency
2084 static void ms_update_start_time(ms_conn_t
*c
)
2086 ms_task_item_t
*item
= c
->curr_task
.item
;
2088 if (ms_setting
.stat_freq
> 0 || c
->udp
2089 || (c
->currcmd
.cmd
== CMD_SET
&& item
->exp_time
> 0)) {
2091 gettimeofday(&c
->start_time
, NULL
);
2092 if (c
->currcmd
.cmd
== CMD_SET
&& item
->exp_time
> 0) {
2093 /* record the current time */
2094 item
->client_time
= c
->start_time
.tv_sec
;
2100 * run the state machine
2102 * @param c, pointer of the concurrency
2104 static void ms_drive_machine(ms_conn_t
*c
)
2114 if (c
->rbytes
>= c
->rvbytes
) {
2115 ms_complete_nread(c
);
2119 if (ms_try_read_line(c
) != 0) {
2124 if (ms_try_read_network(c
) != 0) {
2128 /* doesn't read all the response data, wait event wake up */
2129 if (!c
->currcmd
.isfinish
) {
2130 if (!ms_update_event(c
, EV_READ
| EV_PERSIST
)) {
2131 fprintf(stderr
, "Couldn't update event.\n");
2132 ms_conn_set_state(c
, conn_closing
);
2139 /* we have no command line and no data to read from network, next write */
2140 ms_conn_set_state(c
, conn_write
);
2141 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
2146 if (!c
->ctnwrite
&& ms_need_yield(c
)) {
2149 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2150 fprintf(stderr
, "Couldn't update event.\n");
2151 ms_conn_set_state(c
, conn_closing
);
2158 if (!c
->ctnwrite
&& ms_exec_task(c
) != 0) {
2159 ms_conn_set_state(c
, conn_closing
);
2163 /* record the start time before starting to send data if necessary */
2164 if (!c
->ctnwrite
|| (c
->change_sfd
&& c
->ctnwrite
)) {
2165 if (c
->change_sfd
) {
2166 c
->change_sfd
= false;
2168 ms_update_start_time(c
);
2171 /* change sfd if necessary */
2172 if (c
->change_sfd
) {
2178 /* execute task until nothing need be written to network */
2179 if (!c
->ctnwrite
&& c
->msgcurr
== c
->msgused
) {
2180 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2181 fprintf(stderr
, "Couldn't update event.\n");
2182 ms_conn_set_state(c
, conn_closing
);
2189 switch (ms_transmit(c
)) {
2190 case TRANSMIT_COMPLETE
:
2191 /* we have no data to write to network, next wait repose */
2192 if (!ms_update_event(c
, EV_READ
| EV_PERSIST
)) {
2193 fprintf(stderr
, "Couldn't update event.\n");
2194 ms_conn_set_state(c
, conn_closing
);
2195 c
->ctnwrite
= false;
2198 ms_conn_set_state(c
, conn_read
);
2199 c
->ctnwrite
= false;
2203 case TRANSMIT_INCOMPLETE
:
2205 break; /* Continue in state machine. */
2207 case TRANSMIT_HARD_ERROR
:
2208 c
->ctnwrite
= false;
2211 case TRANSMIT_SOFT_ERROR
:
2221 /* recovery mode, need reconnect if connection close */
2222 if (ms_setting
.reconnect
&& (!ms_global
.time_out
2223 || (ms_setting
.run_time
== 0 && c
->remain_exec_num
> 0))) {
2225 if (ms_reconn(c
) != 0) {
2231 ms_reset_conn(c
, false);
2233 if (c
->total_sfds
== 1) {
2234 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2235 fprintf(stderr
, "Couldn't update event.\n");
2236 ms_conn_set_state(c
, conn_closing
);
2256 * the event handler of each thread
2258 * @param fd, the file descriptor of socket
2259 * @param which, event flag
2260 * @param arg, argument
2262 void ms_event_handler(const int fd
, const short which
, void *arg
)
2264 ms_conn_t
*c
= (ms_conn_t
*)arg
;
2271 fprintf(stderr
, "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2276 assert(fd
== c
->sfd
);
2278 /* event timeout, close the current connection */
2279 if (c
->which
== EV_TIMEOUT
) {
2280 ms_conn_set_state(c
, conn_closing
);
2283 ms_drive_machine(c
);
2285 /* wait for next event */
2290 * get the next socket descriptor index to run for replication
2292 * @param c, pointer of the concurrency
2293 * @param cmd, command(get or set )
2295 * @return int, if success, return the index, else return 0
2297 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
)
2299 int sock_index
= -1;
2302 if (c
->total_sfds
== 1) {
2306 if (ms_setting
.rep_write_srv
== 0) {
2311 if (cmd
== CMD_SET
) {
2312 for (i
= 0; i
< ms_setting
.rep_write_srv
; i
++) {
2313 if (c
->tcpsfd
[i
] > 0) {
2318 if (i
== ms_setting
.rep_write_srv
) {
2319 /* random get one replication server to read */
2320 sock_index
= (int)(random() % c
->total_sfds
);
2322 /* random get one replication writing server to write */
2323 sock_index
= (int)(random() % ms_setting
.rep_write_srv
);
2325 } else if (cmd
== CMD_GET
) {
2326 /* random get one replication server to read */
2327 sock_index
= (int)(random() % c
->total_sfds
);
2329 } while (c
->tcpsfd
[sock_index
] == 0);
2335 * get the next socket descriptor index to run
2337 * @param c, pointer of the concurrency
2339 * @return int, return the index
2341 static int ms_get_next_sock_index(ms_conn_t
*c
)
2346 sock_index
= (++c
->cur_idx
== c
->total_sfds
) ? 0 : c
->cur_idx
;
2347 } while (c
->tcpsfd
[sock_index
] == 0);
2353 * update socket event of the connections
2355 * @param c, pointer of the concurrency
2357 * @return int, if success, return 0, else return -1
2359 static int ms_update_conn_sock_event(ms_conn_t
*c
)
2363 switch (c
->currcmd
.cmd
) {
2365 if (ms_setting
.facebook_test
&& c
->udp
) {
2366 c
->sfd
= c
->tcpsfd
[0];
2368 c
->change_sfd
= true;
2372 if (ms_setting
.facebook_test
&& !c
->udp
) {
2375 c
->change_sfd
= true;
2382 if (!c
->udp
&& c
->total_sfds
> 1) {
2383 if (c
->cur_idx
!= c
->total_sfds
) {
2384 if (ms_setting
.rep_write_srv
== 0) {
2385 c
->cur_idx
= ms_get_next_sock_index(c
);
2387 c
->cur_idx
= ms_get_rep_sock_index(c
, c
->currcmd
.cmd
);
2390 /* must select the first sock of the connection at the beginning */
2394 c
->sfd
= c
->tcpsfd
[c
->cur_idx
];
2395 assert(c
->sfd
!= 0);
2396 c
->change_sfd
= true;
2399 if (c
->change_sfd
) {
2400 if (!ms_update_event(c
, EV_WRITE
| EV_PERSIST
)) {
2401 fprintf(stderr
, "Couldn't update event.\n");
2402 ms_conn_set_state(c
, conn_closing
);
2411 * for ASCII protocol, this function build the set command
2412 * string and send the command.
2414 * @param c, pointer of the concurrency
2415 * @param item, pointer of task item which includes the object
2418 * @return int, if success, return 0, else return -1
2420 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2424 char *buffer
= c
->wbuf
;
2426 write_len
= sprintf(buffer
, " %u %d %d\r\n", 0, item
->exp_time
, item
->value_size
);
2428 if (write_len
> c
->wsize
) {
2429 /* ought to be always enough. just fail for simplicity */
2430 fprintf(stderr
, "output command line too long.\n");
2434 if (item
->value_offset
== INVALID_OFFSET
) {
2435 value_offset
= item
->key_suffix_offset
;
2437 value_offset
= item
->value_offset
;
2440 if (ms_add_iov(c
, "set ", 4) != 0 ||
2441 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
) != 0 ||
2442 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2443 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0 ||
2444 ms_add_iov(c
, buffer
, write_len
) != 0 ||
2445 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
) != 0 ||
2446 ms_add_iov(c
, "\r\n", 2) != 0 ||
2447 (c
->udp
&& ms_build_udp_headers(c
) != 0)) {
2456 * used to send set command to server
2458 * @param c, pointer of the concurrency
2459 * @param item, pointer of task item which includes the object
2462 * @return int, if success, return 0, else return -1
2464 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2468 c
->currcmd
.cmd
= CMD_SET
;
2469 c
->currcmd
.isfinish
= false;
2470 c
->currcmd
.retstat
= MCD_FAILURE
;
2472 if (ms_update_conn_sock_event(c
) != 0) {
2479 if (ms_add_msghdr(c
) != 0) {
2480 fprintf(stderr
, "Out of memory preparing request.");
2484 /* binary protocol */
2485 if (c
->protocol
== binary_prot
) {
2486 if (ms_build_bin_write_buf_set(c
, item
) != 0) {
2490 if (ms_build_ascii_write_buf_set(c
, item
) != 0) {
2495 __sync_fetch_and_add(&ms_stats
.obj_bytes
, item
->key_size
+ item
->value_size
);
2496 __sync_fetch_and_add(&ms_stats
.cmd_set
, 1);
2502 * for ASCII protocol, this function build the get command
2503 * string and send the command.
2505 * @param c, pointer of the concurrency
2506 * @param item, pointer of task item which includes the object
2509 * @return int, if success, return 0, else return -1
2511 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
2513 if (ms_add_iov(c
, "get ", 4) != 0 ||
2514 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
) != 0 ||
2515 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2516 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0 ||
2517 ms_add_iov(c
, "\r\n", 2) != 0 ||
2518 (c
->udp
&& ms_build_udp_headers(c
) != 0)) {
2527 * used to send the get command to server
2529 * @param c, pointer of the concurrency
2530 * @param item, pointer of task item which includes the object
2532 * @param verify, whether do verification
2534 * @return int, if success, return 0, else return -1
2536 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
, bool verify
)
2538 /* verify not supported yet */
2539 UNUSED_ARGUMENT(verify
);
2543 c
->currcmd
.cmd
= CMD_GET
;
2544 c
->currcmd
.isfinish
= false;
2545 c
->currcmd
.retstat
= MCD_FAILURE
;
2547 if (ms_update_conn_sock_event(c
) != 0) {
2554 if (ms_add_msghdr(c
) != 0) {
2555 fprintf(stderr
, "Out of memory preparing request.");
2559 /* binary protocol */
2560 if (c
->protocol
== binary_prot
) {
2561 if (ms_build_bin_write_buf_get(c
, item
) != 0) {
2565 if (ms_build_ascii_write_buf_get(c
, item
) != 0) {
2570 __sync_fetch_and_add(&ms_stats
.cmd_get
, 1);
2576 * for ASCII protocol, this function build the multi-get command
2577 * string and send the command.
2579 * @param c, pointer of the concurrency
2581 * @return int, if success, return 0, else return -1
2583 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
)
2585 ms_task_item_t
*item
;
2587 if (ms_add_iov(c
, "get", 3) != 0) {
2591 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2592 item
= c
->mlget_task
.mlget_item
[i
].item
;
2593 assert(item
!= NULL
);
2594 if (ms_add_iov(c
, " ", 1) != 0 ||
2595 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
) != 0 ||
2596 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2597 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0) {
2602 if (ms_add_iov(c
, "\r\n", 2) != 0 ||
2603 (c
->udp
&& ms_build_udp_headers(c
) != 0)) {
2611 * used to send the multi-get command to server
2613 * @param c, pointer of the concurrency
2615 * @return int, if success, return 0, else return -1
2617 int ms_mcd_mlget(ms_conn_t
*c
)
2619 ms_task_item_t
*item
;
2622 assert(c
->mlget_task
.mlget_num
>= 1);
2624 c
->currcmd
.cmd
= CMD_GET
;
2625 c
->currcmd
.isfinish
= false;
2626 c
->currcmd
.retstat
= MCD_FAILURE
;
2628 if (ms_update_conn_sock_event(c
) != 0) {
2635 if (ms_add_msghdr(c
) != 0) {
2636 fprintf(stderr
, "Out of memory preparing request.");
2640 /* binary protocol */
2641 if (c
->protocol
== binary_prot
) {
2642 if (ms_build_bin_write_buf_mlget(c
) != 0) {
2646 if (ms_build_ascii_write_buf_mlget(c
) != 0) {
2651 /* decrease operation time of each item */
2652 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2653 item
= c
->mlget_task
.mlget_item
[i
].item
;
2654 __sync_fetch_and_add(&ms_stats
.cmd_get
, 1);
2661 * binary protocol support
2665 * for binary protocol, parse the response of server
2667 * @param c, pointer of the concurrency
2669 * @return int, if success, return 0, else return -1
2671 static int ms_bin_process_response(ms_conn_t
*c
)
2673 const char *errstr
= NULL
;
2677 uint32_t bodylen
= c
->binary_header
.response
.bodylen
;
2678 uint8_t opcode
= c
->binary_header
.response
.opcode
;
2679 uint16_t status
= c
->binary_header
.response
.status
;
2682 c
->rvbytes
= (int32_t)bodylen
;
2687 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
2688 if (opcode
== PROTOCOL_BINARY_CMD_SET
) {
2689 c
->currcmd
.retstat
= MCD_STORED
;
2690 } else if (opcode
== PROTOCOL_BINARY_CMD_DELETE
) {
2691 c
->currcmd
.retstat
= MCD_DELETED
;
2692 } else if (opcode
== PROTOCOL_BINARY_CMD_GET
) {
2693 c
->currcmd
.retstat
= MCD_END
;
2696 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
2697 errstr
= "Out of memory";
2698 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
2700 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
2701 errstr
= "Unknown command";
2702 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
2704 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
2705 errstr
= "Not found";
2706 c
->currcmd
.retstat
= MCD_NOTFOUND
;
2708 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
2709 errstr
= "Invalid arguments";
2710 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
2712 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
2713 errstr
= "Data exists for key.";
2715 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
2716 errstr
= "Too large.";
2717 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
2719 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
2720 errstr
= "Not stored.";
2721 c
->currcmd
.retstat
= MCD_NOTSTORED
;
2724 errstr
= "Unknown error";
2725 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
2729 if (errstr
!= NULL
) {
2730 fprintf(stderr
, "%s\n", errstr
);
2737 /* build binary header and add the header to the buffer to send */
2739 * build binary header and add the header to the buffer to send
2741 * @param c, pointer of the concurrency
2742 * @param opcode, operation code
2743 * @param hdr_len, length of header
2744 * @param key_len, length of key
2745 * @param body_len. length of body
2747 static void ms_add_bin_header(ms_conn_t
*c
, uint8_t opcode
, uint8_t hdr_len
,
2748 uint16_t key_len
, uint32_t body_len
) {
2749 protocol_binary_request_header
* header
;
2753 header
= (protocol_binary_request_header
*)c
->wcurr
;
2755 header
->request
.magic
= (uint8_t)PROTOCOL_BINARY_REQ
;
2756 header
->request
.opcode
= (uint8_t)opcode
;
2757 header
->request
.keylen
= htonl(key_len
);
2759 header
->request
.extlen
= (uint8_t)hdr_len
;
2760 header
->request
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
2761 header
->request
.reserved
= 0;
2763 header
->request
.bodylen
= htonl(body_len
);
2764 header
->request
.opaque
= 0;
2765 header
->request
.cas
= 0;
2767 ms_add_iov(c
, c
->wcurr
, sizeof(header
->request
));
2771 * add the key to the socket write buffer array
2773 * @param c, pointer of the concurrency
2774 * @param item, pointer of task item which includes the object
2777 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
)
2779 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
);
2780 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2781 item
->key_size
- (int)KEY_PREFIX_SIZE
);
/*
 * Builds a binary-protocol SET request for item: the header, the
 * extras (flags + expiration), the key and the value are queued as
 * separate iov entries.
 */
2785 * for binary protocol, this function build the set command
2786 * and add the command to send buffer array.
2788 * @param c, pointer of the concurrency
2789 * @param item, pointer of task item which includes the object
2792 * @return int, if success, return 0, else return -1
2794 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
/* The request is built at the start of the write buffer, so it must be empty. */
2796 assert(c
->wbuf
== c
->wcurr
);
/* The SET request struct is overlaid on the write buffer at wcurr. */
2799 protocol_binary_request_set
* rep
= (protocol_binary_request_set
*)c
->wcurr
;
2800 uint16_t keylen
= (uint16_t)item
->key_size
;
/* Body length = extras + key + value. */
2801 uint32_t bodylen
= (uint32_t)sizeof(rep
->message
.body
) + (uint32_t)keylen
+ (uint32_t)item
->value_size
;
/* The extras size is passed as the header's extlen. */
2803 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_SET
, sizeof(rep
->message
.body
), keylen
, bodylen
);
/* Fill the extras; expiration is sent in network byte order. */
2804 rep
->message
.body
.flags
= 0;
2805 rep
->message
.body
.expiration
= htonl((uint32_t)item
->exp_time
);
/* Queue the extras right after the header, then the key. */
2806 ms_add_iov(c
, &rep
->message
.body
, sizeof(rep
->message
.body
));
2807 ms_add_key_to_iov(c
, item
);
/*
 * Value bytes come from the shared char block. An item without its own
 * value offset reuses its key suffix offset as the value start.
 */
2809 if (item
->value_offset
== INVALID_OFFSET
) {
2810 value_offset
= item
->key_suffix_offset
;
2812 value_offset
= item
->value_offset
;
2814 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
);
/*
 * Builds a binary-protocol GET request for item: the header followed
 * by the key, queued as iov entries.
 */
2820 * for binary protocol, this function build the get command and
2821 * add the command to send buffer array.
2823 * @param c, pointer of the concurrency
2824 * @param item, pointer of task item which includes the object
2827 * @return int, if success, return 0, else return -1
2829 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
/* The request is built at the start of the write buffer, so it must be empty. */
2831 assert(c
->wbuf
== c
->wcurr
);
/* GET carries no extras and no value, so the body length equals the key length. */
2833 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t)item
->key_size
, (uint32_t)item
->key_size
);
2834 ms_add_key_to_iov(c
, item
);
2840 * for binary protocol, this function build the multi-get
2841 * command and add the command to send buffer array.
2843 * @param c, pointer of the concurrency
2844 * @param item, pointer of task item which includes the object
2847 * @return int, if success, return 0, else return -1
2849 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
)
2851 ms_task_item_t
*item
;
2853 assert(c
->wbuf
== c
->wcurr
);
2855 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++) {
2856 item
= c
->mlget_task
.mlget_item
[i
].item
;
2857 assert(item
!= NULL
);
2859 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t)item
->key_size
, (uint32_t)item
->key_size
);
2860 ms_add_key_to_iov(c
, item
);
2861 c
->wcurr
+= sizeof(protocol_binary_request_get
);