3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
12 #include "mem_config.h"
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
24 #if defined(HAVE_SYS_TIME_H)
25 # include <sys/time.h>
28 #if defined(HAVE_TIME_H)
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
/*
 * /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
 * optimize the conversion functions, but the prototypes generate warnings
 * from gcc. The conversion functions aren't the bottleneck for this tool,
 * so just remove the warnings by undef'ing the optimization.
 */
#undef ntohs
#undef ntohl
#undef htons
#undef htonl

/* for network write */
#define TRANSMIT_COMPLETE      0
#define TRANSMIT_INCOMPLETE    1
#define TRANSMIT_SOFT_ERROR    2
#define TRANSMIT_HARD_ERROR    3

/*
 * for generating key: every byte of the mask is 0x10, and OR-ing a byte
 * with 0x10 guarantees it can never be ' ' (0x20), '\r' (0x0d),
 * '\n' (0x0a) or '\0' (0x00)
 */
#define KEY_PREFIX_BASE        0x1010101010101010
#define KEY_PREFIX_MASK        0x1010101010101010

/* token indexes used to parse a "VALUE <key> <flags> <bytes>" reply line */
#define KEY_TOKEN              1
#define VALUELEN_TOKEN         3

/* global increasing counter, to ensure the key prefix is unique */
static uint64_t key_prefix_seq = KEY_PREFIX_BASE;

/* global increasing counter, generating request id for UDP */
static volatile uint32_t udp_request_id = 0;

extern pthread_key_t ms_thread_key;
70 /* generate upd request id */
71 static uint32_t ms_get_udp_request_id(void);
74 /* connect initialize */
75 static void ms_task_init(ms_conn_t
*c
);
76 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
);
77 static int ms_conn_sock_init(ms_conn_t
*c
);
78 static int ms_conn_event_init(ms_conn_t
*c
);
79 static int ms_conn_init(ms_conn_t
*c
,
81 const int read_buffer_size
,
83 static void ms_warmup_num_init(ms_conn_t
*c
);
84 static int ms_item_win_init(ms_conn_t
*c
);
87 /* connection close */
88 void ms_conn_free(ms_conn_t
*c
);
89 static void ms_conn_close(ms_conn_t
*c
);
92 /* create network connection */
93 static int ms_new_socket(struct addrinfo
*ai
);
94 static void ms_maximize_sndbuf(const int sfd
);
95 static int ms_network_connect(ms_conn_t
*c
,
100 static int ms_reconn(ms_conn_t
*c
);
104 static int ms_tokenize_command(char *command
,
106 const int max_tokens
);
107 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
);
108 static int ms_try_read_line(ms_conn_t
*c
);
109 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
);
110 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
);
111 static int ms_try_read_network(ms_conn_t
*c
);
112 static void ms_verify_value(ms_conn_t
*c
,
113 ms_mlget_task_item_t
*mlget_item
,
116 static void ms_ascii_complete_nread(ms_conn_t
*c
);
117 static void ms_bin_complete_nread(ms_conn_t
*c
);
118 static void ms_complete_nread(ms_conn_t
*c
);
122 static int ms_add_msghdr(ms_conn_t
*c
);
123 static int ms_ensure_iov_space(ms_conn_t
*c
);
124 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
);
125 static int ms_build_udp_headers(ms_conn_t
*c
);
126 static int ms_transmit(ms_conn_t
*c
);
129 /* status adjustment */
130 static void ms_conn_shrink(ms_conn_t
*c
);
131 static void ms_conn_set_state(ms_conn_t
*c
, int state
);
132 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
);
133 static uint32_t ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
);
134 static uint32_t ms_get_next_sock_index(ms_conn_t
*c
);
135 static int ms_update_conn_sock_event(ms_conn_t
*c
);
136 static bool ms_need_yield(ms_conn_t
*c
);
137 static void ms_update_start_time(ms_conn_t
*c
);
141 static void ms_drive_machine(ms_conn_t
*c
);
142 void ms_event_handler(const int fd
, const short which
, void *arg
);
146 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
147 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
148 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
);
151 /* binary protocol */
152 static int ms_bin_process_response(ms_conn_t
*c
);
153 static void ms_add_bin_header(ms_conn_t
*c
,
158 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
);
159 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
160 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
161 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
);
165 * each key has two parts, prefix and suffix. The suffix is a
166 * string random get form the character table. The prefix is a
167 * uint64_t variable. And the prefix must be unique. we use the
168 * prefix to identify a key. And the prefix can't include
169 * character ' ' '\r' '\n' '\0'.
173 uint64_t ms_get_key_prefix(void)
177 pthread_mutex_lock(&ms_global
.seq_mutex
);
178 key_prefix_seq
|= KEY_PREFIX_MASK
;
179 key_prefix
= key_prefix_seq
;
181 pthread_mutex_unlock(&ms_global
.seq_mutex
);
184 } /* ms_get_key_prefix */
188 * get an unique udp request id
190 * @return an unique UDP request id
192 static uint32_t ms_get_udp_request_id(void)
194 return atomic_add_32_nv(&udp_request_id
, 1);
199 * initialize current task structure
201 * @param c, pointer of the concurrency
203 static void ms_task_init(ms_conn_t
*c
)
205 c
->curr_task
.cmd
= CMD_NULL
;
206 c
->curr_task
.item
= 0;
207 c
->curr_task
.verify
= false;
208 c
->curr_task
.finish_verify
= true;
209 c
->curr_task
.get_miss
= true;
211 c
->curr_task
.get_opt
= 0;
212 c
->curr_task
.set_opt
= 0;
213 c
->curr_task
.cycle_undo_get
= 0;
214 c
->curr_task
.cycle_undo_set
= 0;
215 c
->curr_task
.verified_get
= 0;
216 c
->curr_task
.overwrite_set
= 0;
221 * initialize udp for the connection structure
223 * @param c, pointer of the concurrency
224 * @param is_udp, whether it's udp
226 * @return int, if success, return EXIT_SUCCESS, else return -1
228 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
)
234 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
245 if (c
->udp
|| (! c
->udp
&& ms_setting
.facebook_test
))
247 c
->rudpbuf
= (char *)malloc((size_t)c
->rudpsize
);
248 c
->udppkt
= (ms_udppkt_t
*)malloc(MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
250 if ((c
->rudpbuf
== NULL
) || (c
->udppkt
== NULL
))
252 if (c
->rudpbuf
!= NULL
)
254 if (c
->udppkt
!= NULL
)
256 fprintf(stderr
, "malloc()\n");
259 memset(c
->udppkt
, 0, MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
263 } /* ms_conn_udp_init */
267 * initialize the connection structure
269 * @param c, pointer of the concurrency
270 * @param init_state, (conn_read, conn_write, conn_closing)
271 * @param read_buffer_size
272 * @param is_udp, whether it's udp
274 * @return int, if success, return EXIT_SUCCESS, else return -1
276 static int ms_conn_init(ms_conn_t
*c
,
277 const int init_state
,
278 const int read_buffer_size
,
287 c
->rsize
= read_buffer_size
;
288 c
->wsize
= WRITE_BUFFER_SIZE
;
289 c
->iovsize
= IOV_LIST_INITIAL
;
290 c
->msgsize
= MSG_LIST_INITIAL
;
292 /* for replication, each connection need connect all the server */
293 if (ms_setting
.rep_write_srv
> 0)
295 c
->total_sfds
= ms_setting
.srv_cnt
* ms_setting
.sock_per_conn
;
299 c
->total_sfds
= ms_setting
.sock_per_conn
;
303 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
304 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
305 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * (size_t)c
->iovsize
);
306 c
->msglist
= (struct msghdr
*)malloc(
307 sizeof(struct msghdr
) * (size_t)c
->msgsize
);
308 if (ms_setting
.mult_key_num
> 1)
310 c
->mlget_task
.mlget_item
= (ms_mlget_task_item_t
*)
312 sizeof(ms_mlget_task_item_t
) * (size_t)ms_setting
.mult_key_num
);
314 c
->tcpsfd
= (int *)malloc((size_t)c
->total_sfds
* sizeof(int));
316 if ((c
->rbuf
== NULL
) || (c
->wbuf
== NULL
) || (c
->iov
== NULL
)
317 || (c
->msglist
== NULL
) || (c
->tcpsfd
== NULL
)
318 || ((ms_setting
.mult_key_num
> 1)
319 && (c
->mlget_task
.mlget_item
== NULL
)))
327 if (c
->msglist
!= NULL
)
329 if (c
->mlget_task
.mlget_item
!= NULL
)
330 free(c
->mlget_task
.mlget_item
);
331 if (c
->tcpsfd
!= NULL
)
333 fprintf(stderr
, "malloc()\n");
337 c
->state
= init_state
;
345 c
->cur_idx
= c
->total_sfds
; /* default index is a invalid value */
349 c
->change_sfd
= false;
351 c
->precmd
.cmd
= c
->currcmd
.cmd
= CMD_NULL
;
352 c
->precmd
.isfinish
= true; /* default the previous command finished */
353 c
->currcmd
.isfinish
= false;
354 c
->precmd
.retstat
= c
->currcmd
.retstat
= MCD_FAILURE
;
355 c
->precmd
.key_prefix
= c
->currcmd
.key_prefix
= 0;
357 c
->mlget_task
.mlget_num
= 0;
358 c
->mlget_task
.value_index
= -1; /* default invalid value */
360 if (ms_setting
.binary_prot_
)
362 c
->protocol
= binary_prot
;
366 c
->protocol
= ascii_prot
;
370 if (ms_conn_udp_init(c
, is_udp
) != 0)
375 /* initialize task */
378 if (! (ms_setting
.facebook_test
&& is_udp
))
380 atomic_add_32(&ms_stats
.active_conns
, 1);
388 * when doing 100% get operation, it could preset some objects
389 * to warmup the server. this function is used to initialize the
390 * number of the objects to preset.
392 * @param c, pointer of the concurrency
394 static void ms_warmup_num_init(ms_conn_t
*c
)
396 /* no set operation, preset all the items in the window */
397 if (ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
< PROP_ERROR
)
399 c
->warmup_num
= c
->win_size
;
400 c
->remain_warmup_num
= c
->warmup_num
;
405 c
->remain_warmup_num
= c
->warmup_num
;
407 } /* ms_warmup_num_init */
411 * each connection has an item window, this function initialize
412 * the window. The window is used to generate task.
414 * @param c, pointer of the concurrency
416 * @return int, if success, return EXIT_SUCCESS, else return -1
418 static int ms_item_win_init(ms_conn_t
*c
)
420 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
423 c
->win_size
= (int)ms_setting
.win_size
;
425 c
->exec_num
= ms_thread
->thread_ctx
->exec_num_perconn
;
426 c
->remain_exec_num
= c
->exec_num
;
428 c
->item_win
= (ms_task_item_t
*)malloc(
429 sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
430 if (c
->item_win
== NULL
)
432 fprintf(stderr
, "Can't allocate task item array for conn.\n");
435 memset(c
->item_win
, 0, sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
437 for (int i
= 0; i
< c
->win_size
; i
++)
439 c
->item_win
[i
].key_size
= (int)ms_setting
.distr
[i
].key_size
;
440 c
->item_win
[i
].key_prefix
= ms_get_key_prefix();
441 c
->item_win
[i
].key_suffix_offset
= ms_setting
.distr
[i
].key_offset
;
442 c
->item_win
[i
].value_size
= (int)ms_setting
.distr
[i
].value_size
;
443 c
->item_win
[i
].value_offset
= INVALID_OFFSET
; /* default in invalid offset */
444 c
->item_win
[i
].client_time
= 0;
446 /* set expire time base on the proportion */
447 if (exp_cnt
< ms_setting
.exp_ver_per
* i
)
449 c
->item_win
[i
].exp_time
= FIXED_EXPIRE_TIME
;
454 c
->item_win
[i
].exp_time
= 0;
458 ms_warmup_num_init(c
);
461 } /* ms_item_win_init */
465 * each connection structure can include one or more sock
466 * handlers. this function create these socks and connect the
469 * @param c, pointer of the concurrency
471 * @return int, if success, return EXIT_SUCCESS, else return -1
473 static int ms_conn_sock_init(ms_conn_t
*c
)
475 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
481 assert(c
->tcpsfd
!= NULL
);
483 for (i
= 0; i
< c
->total_sfds
; i
++)
486 if (ms_setting
.rep_write_srv
> 0)
488 /* for replication, each connection need connect all the server */
489 srv_idx
= i
% ms_setting
.srv_cnt
;
493 /* all the connections in a thread connects the same server */
494 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
497 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
498 ms_setting
.servers
[srv_idx
].srv_port
,
499 ms_setting
.udp
, &ret_sfd
) != 0)
509 if (! ms_setting
.udp
)
511 c
->tcpsfd
[i
]= ret_sfd
;
517 /* initialize udp sock handler if necessary */
518 if (ms_setting
.facebook_test
)
521 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
522 ms_setting
.servers
[srv_idx
].srv_port
,
523 true, &ret_sfd
) != 0)
533 if ((i
!= c
->total_sfds
) || (ms_setting
.facebook_test
&& (c
->udpsfd
== 0)))
541 for (uint32_t j
= 0; j
< i
; j
++)
556 } /* ms_conn_sock_init */
560 * each connection is managed by libevent, this function
561 * initialize the event of the connection structure.
563 * @param c, pointer of the concurrency
565 * @return int, if success, return EXIT_SUCCESS, else return -1
567 static int ms_conn_event_init(ms_conn_t
*c
)
569 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
570 short event_flags
= EV_WRITE
| EV_PERSIST
;
572 event_set(&c
->event
, c
->sfd
, event_flags
, ms_event_handler
, (void *)c
);
573 event_base_set(ms_thread
->base
, &c
->event
);
574 c
->ev_flags
= event_flags
;
576 if (event_add(&c
->event
, NULL
) == -1)
582 } /* ms_conn_event_init */
586 * setup a connection, each connection structure of each
587 * thread must call this function to initialize.
589 * @param c, pointer of the concurrency
591 * @return int, if success, return EXIT_SUCCESS, else return -1
593 int ms_setup_conn(ms_conn_t
*c
)
595 if (ms_item_win_init(c
) != 0)
600 if (ms_conn_init(c
, conn_write
, DATA_BUFFER_SIZE
, ms_setting
.udp
) != 0)
605 if (ms_conn_sock_init(c
) != 0)
610 if (ms_conn_event_init(c
) != 0)
616 } /* ms_setup_conn */
620 * Frees a connection.
622 * @param c, pointer of the concurrency
624 void ms_conn_free(ms_conn_t
*c
)
626 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
629 if (c
->hdrbuf
!= NULL
)
631 if (c
->msglist
!= NULL
)
639 if (c
->mlget_task
.mlget_item
!= NULL
)
640 free(c
->mlget_task
.mlget_item
);
641 if (c
->rudpbuf
!= NULL
)
643 if (c
->udppkt
!= NULL
)
645 if (c
->item_win
!= NULL
)
647 if (c
->tcpsfd
!= NULL
)
650 if (--ms_thread
->nactive_conn
== 0)
652 free(ms_thread
->conn
);
661 * @param c, pointer of the concurrency
663 static void ms_conn_close(ms_conn_t
*c
)
665 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
668 /* delete the event, the socket and the connection */
669 event_del(&c
->event
);
671 for (uint32_t i
= 0; i
< c
->total_sfds
; i
++)
673 if (c
->tcpsfd
[i
] > 0)
680 if (ms_setting
.facebook_test
)
685 atomic_dec_32(&ms_stats
.active_conns
);
689 if (ms_setting
.run_time
== 0)
691 pthread_mutex_lock(&ms_global
.run_lock
.lock
);
692 ms_global
.run_lock
.count
++;
693 pthread_cond_signal(&ms_global
.run_lock
.cond
);
694 pthread_mutex_unlock(&ms_global
.run_lock
.lock
);
697 if (ms_thread
->nactive_conn
== 0)
701 } /* ms_conn_close */
707 * @param ai, server address information
709 * @return int, if success, return EXIT_SUCCESS, else return -1
711 static int ms_new_socket(struct addrinfo
*ai
)
715 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1)
717 fprintf(stderr
, "socket() error: %s.\n", strerror(errno
));
722 } /* ms_new_socket */
726 * Sets a socket's send buffer size to the maximum allowed by the system.
728 * @param sfd, file descriptor of socket
730 static void ms_maximize_sndbuf(const int sfd
)
732 socklen_t intsize
= sizeof(int);
733 unsigned int last_good
= 0;
734 unsigned int min
, max
, avg
;
735 unsigned int old_size
;
737 /* Start with the default size. */
738 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0)
740 fprintf(stderr
, "getsockopt(SO_SNDBUF)\n");
744 /* Binary-search for the real maximum. */
746 max
= MAX_SENDBUF_SIZE
;
750 avg
= ((unsigned int)(min
+ max
)) / 2;
751 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0)
762 } /* ms_maximize_sndbuf */
766 * socket connects the server
768 * @param c, pointer of the concurrency
769 * @param srv_host_name, the host name of the server
770 * @param srv_port, port of server
771 * @param is_udp, whether it's udp
772 * @param ret_sfd, the connected socket file descriptor
774 * @return int, if success, return EXIT_SUCCESS, else return -1
776 static int ms_network_connect(ms_conn_t
*c
,
788 struct addrinfo
*next
;
789 struct addrinfo hints
;
790 char port_buf
[NI_MAXSERV
];
797 * the memset call clears nonstandard fields in some impementations
798 * that otherwise mess things up.
800 memset(&hints
, 0, sizeof(hints
));
802 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
804 hints
.ai_flags
= AI_PASSIVE
;
805 #endif /* AI_ADDRCONFIG */
808 hints
.ai_protocol
= IPPROTO_UDP
;
809 hints
.ai_socktype
= SOCK_DGRAM
;
810 hints
.ai_family
= AF_INET
; /* This left here because of issues with OSX 10.5 */
814 hints
.ai_family
= AF_UNSPEC
;
815 hints
.ai_protocol
= IPPROTO_TCP
;
816 hints
.ai_socktype
= SOCK_STREAM
;
819 snprintf(port_buf
, NI_MAXSERV
, "%d", srv_port
);
820 error
= getaddrinfo(srv_host_name
, port_buf
, &hints
, &ai
);
823 if (error
!= EAI_SYSTEM
)
824 fprintf(stderr
, "getaddrinfo(): %s.\n", gai_strerror(error
));
826 perror("getaddrinfo()\n");
831 for (next
= ai
; next
; next
= next
->ai_next
)
833 if ((sfd
= ms_new_socket(next
)) == -1)
839 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
842 ms_maximize_sndbuf(sfd
);
846 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
,
848 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
849 setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
,
855 c
->srv_recv_addr_size
= sizeof(struct sockaddr
);
856 memcpy(&c
->srv_recv_addr
, next
->ai_addr
, c
->srv_recv_addr_size
);
860 if (connect(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1)
868 if (((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0)
869 || (fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0))
871 fprintf(stderr
, "setting O_NONBLOCK\n");
887 /* Return zero if we detected no errors in starting up connections */
889 } /* ms_network_connect */
893 * reconnect a disconnected sock
895 * @param c, pointer of the concurrency
897 * @return int, if success, return EXIT_SUCCESS, else return -1
899 static int ms_reconn(ms_conn_t
*c
)
901 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
903 uint32_t srv_conn_cnt
= 0;
905 if (ms_setting
.rep_write_srv
> 0)
907 srv_idx
= c
->cur_idx
% ms_setting
.srv_cnt
;
908 srv_conn_cnt
= ms_setting
.sock_per_conn
* ms_setting
.nconns
;
912 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
913 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
916 /* close the old socket handler */
918 c
->tcpsfd
[c
->cur_idx
]= 0;
920 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].disconn_cnt
, 1)
923 gettimeofday(&ms_setting
.servers
[srv_idx
].disconn_time
, NULL
);
924 fprintf(stderr
, "Server %s:%d disconnect\n",
925 ms_setting
.servers
[srv_idx
].srv_host_name
,
926 ms_setting
.servers
[srv_idx
].srv_port
);
929 if (ms_setting
.rep_write_srv
> 0)
933 for (i
= 0; i
< c
->total_sfds
; i
++)
935 if (c
->tcpsfd
[i
] != 0)
941 /* all socks disconnect */
942 if (i
== c
->total_sfds
)
951 /* reconnect success, break the loop */
952 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
953 ms_setting
.servers
[srv_idx
].srv_port
,
954 ms_setting
.udp
, &c
->sfd
) == 0)
956 c
->tcpsfd
[c
->cur_idx
]= c
->sfd
;
957 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
958 % (uint32_t)srv_conn_cnt
== 0)
960 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
962 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
963 - ms_setting
.servers
[srv_idx
].disconn_time
965 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
966 ms_setting
.servers
[srv_idx
].srv_host_name
,
967 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
972 if (ms_setting
.rep_write_srv
== 0 && c
->total_sfds
> 0)
974 /* wait a second and reconnect */
978 while (ms_setting
.rep_write_srv
== 0 && c
->total_sfds
> 0);
981 if ((c
->total_sfds
> 1) && (c
->tcpsfd
[c
->cur_idx
] == 0))
992 * reconnect several disconnected socks in the connection
993 * structure, the ever-1-second timer of the thread will check
994 * whether some socks in the connections disconnect. if
995 * disconnect, reconnect the sock.
997 * @param c, pointer of the concurrency
999 * @return int, if success, return EXIT_SUCCESS, else return -1
1001 int ms_reconn_socks(ms_conn_t
*c
)
1003 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
1004 uint32_t srv_idx
= 0;
1006 uint32_t srv_conn_cnt
= 0;
1007 struct timeval cur_time
;
1011 if ((c
->total_sfds
== 1) || (c
->total_sfds
== c
->alive_sfds
))
1013 return EXIT_SUCCESS
;
1016 for (uint32_t i
= 0; i
< c
->total_sfds
; i
++)
1018 if (c
->tcpsfd
[i
] == 0)
1020 gettimeofday(&cur_time
, NULL
);
1023 * For failover test of replication, reconnect the socks after
1024 * it disconnects more than 5 seconds, Otherwise memslap will
1025 * block at connect() function and the work threads can't work
1029 - ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
< 5)
1034 if (ms_setting
.rep_write_srv
> 0)
1036 srv_idx
= i
% ms_setting
.srv_cnt
;
1037 srv_conn_cnt
= ms_setting
.sock_per_conn
* ms_setting
.nconns
;
1041 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
1042 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
1045 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
1046 ms_setting
.servers
[srv_idx
].srv_port
,
1047 ms_setting
.udp
, &ret_sfd
) == 0)
1049 c
->tcpsfd
[i
]= ret_sfd
;
1052 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
1053 % (uint32_t)srv_conn_cnt
== 0)
1055 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
1057 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
1058 - ms_setting
.servers
[srv_idx
].disconn_time
1060 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
1061 ms_setting
.servers
[srv_idx
].srv_host_name
,
1062 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
1068 return EXIT_SUCCESS
;
1069 } /* ms_reconn_socks */
1073 * Tokenize the command string by replacing whitespace with '\0' and update
1074 * the token array tokens with pointer to start of each token and length.
1075 * Returns total number of tokens. The last valid token is the terminal
1076 * token (value points to the first unprocessed character of the string and
1081 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1082 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1085 * ncommand = tokens[ix].value - command;
1086 * command = tokens[ix].value;
1089 * @param command, the command string to token
1090 * @param tokens, array to store tokens
1091 * @param max_tokens, maximum tokens number
1093 * @return int, the number of tokens
1095 static int ms_tokenize_command(char *command
,
1097 const int max_tokens
)
1102 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
1104 for (s
= e
= command
; ntokens
< max_tokens
- 1; ++e
)
1110 tokens
[ntokens
].value
= s
;
1111 tokens
[ntokens
].length
= (size_t)(e
- s
);
1117 else if (*e
== '\0')
1121 tokens
[ntokens
].value
= s
;
1122 tokens
[ntokens
].length
= (size_t)(e
- s
);
1126 break; /* string end */
1131 } /* ms_tokenize_command */
1135 * parse the response of server.
1137 * @param c, pointer of the concurrency
1138 * @param command, the string responded by server
1140 * @return int, if the command completed return EXIT_SUCCESS, else return
1143 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
)
1147 char *buffer
= command
;
1152 * for command get, we store the returned value into local buffer
1153 * then continue in ms_complete_nread().
1158 case 'V': /* VALUE || VERSION */
1159 if (buffer
[1] == 'A') /* VALUE */
1161 token_t tokens
[MAX_TOKENS
];
1162 ms_tokenize_command(command
, tokens
, MAX_TOKENS
);
1164 value_len
= strtol(tokens
[VALUELEN_TOKEN
].value
, NULL
, 10);
1167 printf("<%d ERROR %s\n", c
->sfd
, strerror(errno
));
1169 c
->currcmd
.key_prefix
= *(uint64_t *)tokens
[KEY_TOKEN
].value
;
1172 * We read the \r\n into the string since not doing so is more
1173 * cycles then the waster of memory to do so.
1175 * We are null terminating through, which will most likely make
1176 * some people lazy about using the return length.
1178 c
->rvbytes
= (int)(value_len
+ 2);
1186 c
->currcmd
.retstat
= MCD_SUCCESS
;
1188 case 'S': /* STORED STATS SERVER_ERROR */
1189 if (buffer
[2] == 'A') /* STORED STATS */
1191 c
->currcmd
.retstat
= MCD_STAT
;
1193 else if (buffer
[1] == 'E')
1196 printf("<%d %s\n", c
->sfd
, buffer
);
1198 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
1200 else if (buffer
[1] == 'T')
1203 c
->currcmd
.retstat
= MCD_STORED
;
1207 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1211 case 'D': /* DELETED DATA */
1212 if (buffer
[1] == 'E')
1214 c
->currcmd
.retstat
= MCD_DELETED
;
1218 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1223 case 'N': /* NOT_FOUND NOT_STORED*/
1224 if (buffer
[4] == 'F')
1226 c
->currcmd
.retstat
= MCD_NOTFOUND
;
1228 else if (buffer
[4] == 'S')
1230 printf("<%d %s\n", c
->sfd
, buffer
);
1231 c
->currcmd
.retstat
= MCD_NOTSTORED
;
1235 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1239 case 'E': /* PROTOCOL ERROR or END */
1240 if (buffer
[1] == 'N')
1243 c
->currcmd
.retstat
= MCD_END
;
1245 else if (buffer
[1] == 'R')
1247 printf("<%d ERROR\n", c
->sfd
);
1248 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
1250 else if (buffer
[1] == 'X')
1252 c
->currcmd
.retstat
= MCD_DATA_EXISTS
;
1253 printf("<%d %s\n", c
->sfd
, buffer
);
1257 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1261 case 'C': /* CLIENT ERROR */
1262 printf("<%d %s\n", c
->sfd
, buffer
);
1263 c
->currcmd
.retstat
= MCD_CLIENT_ERROR
;
1267 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1272 } /* ms_ascii_process_line */
1276 * after one operation completes, reset the concurrency
1278 * @param c, pointer of the concurrency
1279 * @param timeout, whether it's timeout
1281 void ms_reset_conn(ms_conn_t
*c
, bool timeout
)
1287 if ((c
->packets
> 0) && (c
->packets
< MAX_UDP_PACKET
))
1289 memset(c
->udppkt
, 0, sizeof(ms_udppkt_t
) * (size_t)c
->packets
);
1298 c
->currcmd
.isfinish
= true;
1305 ms_conn_set_state(c
, conn_write
);
1306 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
1310 ms_drive_machine(c
);
1312 } /* ms_reset_conn */
1316 * if we have a complete line in the buffer, process it.
1318 * @param c, pointer of the concurrency
1320 * @return int, if success, return EXIT_SUCCESS, else return -1
1322 static int ms_try_read_line(ms_conn_t
*c
)
1324 if (c
->protocol
== binary_prot
)
1326 /* Do we have the complete packet header? */
1327 if ((uint64_t)c
->rbytes
< sizeof(c
->binary_header
))
1329 /* need more data! */
1330 return EXIT_SUCCESS
;
1335 if (((long)(c
->rcurr
)) % 8 != 0)
1337 /* must realign input buffer */
1338 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1340 if (settings
.verbose
)
1342 fprintf(stderr
, "%d: Realign input buffer.\n", c
->sfd
);
1346 protocol_binary_response_header
*rsp
;
1347 rsp
= (protocol_binary_response_header
*)c
->rcurr
;
1349 c
->binary_header
= *rsp
;
1350 c
->binary_header
.response
.extlen
= rsp
->response
.extlen
;
1351 c
->binary_header
.response
.keylen
= ntohs(rsp
->response
.keylen
);
1352 c
->binary_header
.response
.bodylen
= ntohl(rsp
->response
.bodylen
);
1353 c
->binary_header
.response
.status
= ntohs(rsp
->response
.status
);
1355 if (c
->binary_header
.response
.magic
!= PROTOCOL_BINARY_RES
)
1357 fprintf(stderr
, "Invalid magic: %x\n",
1358 c
->binary_header
.response
.magic
);
1359 ms_conn_set_state(c
, conn_closing
);
1360 return EXIT_SUCCESS
;
1363 /* process this complete response */
1364 if (ms_bin_process_response(c
) == 0)
1366 /* current operation completed */
1367 ms_reset_conn(c
, false);
1372 c
->rbytes
-= (int32_t)sizeof(c
->binary_header
);
1373 c
->rcurr
+= sizeof(c
->binary_header
);
1382 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1385 return EXIT_SUCCESS
;
1387 el
= memchr(c
->rcurr
, '\n', (size_t)c
->rbytes
);
1389 return EXIT_SUCCESS
;
1392 if (((el
- c
->rcurr
) > 1) && (*(el
- 1) == '\r'))
1398 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
1400 /* process this complete line */
1401 if (ms_ascii_process_line(c
, c
->rcurr
) == 0)
1403 /* current operation completed */
1404 ms_reset_conn(c
, false);
1409 /* current operation didn't complete */
1410 c
->rbytes
-= (int32_t)(cont
- c
->rcurr
);
1414 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1418 } /* ms_try_read_line */
1422 * because the packet of UDP can't ensure the order, the
1423 * function is used to sort the received udp packet.
1425 * @param c, pointer of the concurrency
1426 * @param buf, the buffer to store the ordered packages data
1427 * @param rbytes, the maximum capacity of the buffer
1429 * @return int, if success, return the copy bytes, else return
1432 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
)
1437 uint16_t seq_num
= 0;
1438 uint16_t packets
= 0;
1439 unsigned char *header
= NULL
;
1441 /* no enough data */
1443 assert(buf
!= NULL
);
1444 assert(c
->rudpbytes
>= UDP_HEADER_SIZE
);
1446 /* calculate received packets count */
1447 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
>= UDP_HEADER_SIZE
)
1449 /* the last packet has some data */
1450 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
+ 1;
1454 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
;
1457 /* get the total packets count if necessary */
1458 if (c
->packets
== 0)
1460 c
->packets
= HEADER_TO_PACKETS((unsigned char *)c
->rudpbuf
);
1463 /* build the ordered packet array */
1464 for (int i
= c
->pktcurr
; i
< c
->recvpkt
; i
++)
1466 header
= (unsigned char *)c
->rudpbuf
+ i
* UDP_MAX_PAYLOAD_SIZE
;
1467 req_id
= (uint16_t)HEADER_TO_REQID(header
);
1468 assert(req_id
== c
->request_id
% (1 << 16));
1470 packets
= (uint16_t)HEADER_TO_PACKETS(header
);
1471 assert(c
->packets
== HEADER_TO_PACKETS(header
));
1473 seq_num
= (uint16_t)HEADER_TO_SEQNUM(header
);
1474 c
->udppkt
[seq_num
].header
= header
;
1475 c
->udppkt
[seq_num
].data
= (char *)header
+ UDP_HEADER_SIZE
;
1477 if (i
== c
->recvpkt
- 1)
1479 /* last received packet */
1480 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
== 0)
1482 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1487 c
->udppkt
[seq_num
].rbytes
= c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
1493 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1498 for (int i
= c
->ordcurr
; i
< c
->recvpkt
; i
++)
1500 /* there is some data to copy */
1501 if ((c
->udppkt
[i
].data
!= NULL
)
1502 && (c
->udppkt
[i
].copybytes
< c
->udppkt
[i
].rbytes
))
1504 header
= c
->udppkt
[i
].header
;
1505 len
= c
->udppkt
[i
].rbytes
- c
->udppkt
[i
].copybytes
;
1506 if (len
> rbytes
- wbytes
)
1508 len
= rbytes
- wbytes
;
1511 assert(len
<= rbytes
- wbytes
);
1512 assert(i
== HEADER_TO_SEQNUM(header
));
1514 memcpy(buf
+ wbytes
, c
->udppkt
[i
].data
+ c
->udppkt
[i
].copybytes
,
1517 c
->udppkt
[i
].copybytes
+= len
;
1519 if ((c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
)
1520 && (c
->udppkt
[i
].rbytes
== UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1522 /* finish copying all the data of this packet, next */
1526 /* last received packet, and finish copying all the data */
1527 if ((c
->recvpkt
== c
->packets
) && (i
== c
->recvpkt
- 1)
1528 && (c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
))
1533 /* no space to copy data */
1534 if (wbytes
>= rbytes
)
1539 /* it doesn't finish reading all the data of the packet from network */
1540 if ((i
!= c
->recvpkt
- 1)
1541 && (c
->udppkt
[i
].rbytes
< UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1548 /* no data to copy */
1554 return wbytes
== 0 ? -1 : wbytes
;
1555 } /* ms_sort_udp_packet */
1559 * encapsulate upd read like tcp read
1561 * @param c, pointer of the concurrency
1562 * @param buf, read buffer
1563 * @param len, length to read
1565 * @return int, if success, return the read bytes, else return
1568 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
)
1579 if (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
> c
->rudpsize
)
1581 char *new_rbuf
= realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
1584 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1585 c
->rudpbytes
= 0; /* ignore what we read */
1588 c
->rudpbuf
= new_rbuf
;
1592 avail
= c
->rudpsize
- c
->rudpbytes
;
1593 /* UDP each time read a packet, 1400 bytes */
1594 res
= (int)read(c
->sfd
, c
->rudpbuf
+ c
->rudpbytes
, (size_t)avail
);
1598 atomic_add_size(&ms_stats
.bytes_read
, res
);
1613 /* "connection" closed */
1619 /* no data to read */
1624 /* copy data to read buffer */
1627 copybytes
= ms_sort_udp_packet(c
, buf
, len
);
1630 if (copybytes
== -1)
1632 atomic_add_size(&ms_stats
.pkt_disorder
, 1);
1640 * read from network as much as we can, handle buffer overflow and connection
1642 * before reading, move the remaining incomplete fragment of a command
1643 * (if any) to the beginning of the buffer.
1644 * return EXIT_SUCCESS if there's nothing to read on the first read.
1648 * read from network as much as we can, handle buffer overflow and connection
1649 * close. before reading, move the remaining incomplete fragment of a command
1650 * (if any) to the beginning of the buffer.
1652 * @param c, pointer of the concurrency
1655 * return EXIT_SUCCESS if there's nothing to read on the first read.
1656 * return EXIT_FAILURE if get data
1657 * return -1 if error happens
1659 static int ms_try_read_network(ms_conn_t
*c
)
1667 if ((c
->rcurr
!= c
->rbuf
)
1668 && (! c
->readval
|| (c
->rvbytes
> c
->rsize
- (c
->rcurr
- c
->rbuf
))
1669 || (c
->readval
&& (c
->rcurr
- c
->rbuf
> c
->rbytes
))))
1671 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
1672 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
1678 if (c
->rbytes
>= c
->rsize
)
1680 char *new_rbuf
= realloc(c
->rbuf
, (size_t)c
->rsize
* 2);
1683 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1684 c
->rbytes
= 0; /* ignore what we read */
1687 c
->rcurr
= c
->rbuf
= new_rbuf
;
1691 avail
= c
->rsize
- c
->rbytes
- (c
->rcurr
- c
->rbuf
);
1699 res
= (int32_t)ms_udp_read(c
, c
->rcurr
+ c
->rbytes
, (int32_t)avail
);
1703 res
= (int)read(c
->sfd
, c
->rcurr
+ c
->rbytes
, (size_t)avail
);
1710 atomic_add_size(&ms_stats
.bytes_read
, res
);
1725 /* connection closed */
1726 ms_conn_set_state(c
, conn_closing
);
1731 if ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
))
1733 /* Should close on unhandled errors. */
1734 ms_conn_set_state(c
, conn_closing
);
1740 } /* ms_try_read_network */
1744 * after get the object from server, verify the value if
1747 * @param c, pointer of the concurrency
1748 * @param mlget_item, pointer of mulit-get task item structure
1749 * @param value, received value string
1750 * @param vlen, received value string length
1752 static void ms_verify_value(ms_conn_t
*c
,
1753 ms_mlget_task_item_t
*mlget_item
,
1757 if (c
->curr_task
.verify
)
1759 assert(c
->curr_task
.item
->value_offset
!= INVALID_OFFSET
);
1760 char *orignval
= &ms_setting
.char_block
[c
->curr_task
.item
->value_offset
];
1762 &ms_setting
.char_block
[c
->curr_task
.item
->key_suffix_offset
];
1764 /* verify expire time if necessary */
1765 if (c
->curr_task
.item
->exp_time
> 0)
1767 struct timeval curr_time
;
1768 gettimeofday(&curr_time
, NULL
);
1770 /* object expired but get it now */
1771 if (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
1772 > c
->curr_task
.item
->exp_time
+ EXPIRE_TIME_ERROR
)
1774 atomic_add_size(&ms_stats
.exp_get
, 1);
1776 if (ms_setting
.verbose
)
1780 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
1781 localtime(&c
->curr_task
.item
->client_time
));
1782 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
1783 localtime(&curr_time
.tv_sec
));
1785 "\n<%d expire time verification failed, "
1786 "object expired but get it now\n"
1788 "\tkey: %" PRIx64
" %.*s\n"
1789 "\tset time: %s current time: %s "
1790 "diff time: %d expire time: %d\n"
1791 "\texpected data: \n"
1792 "\treceived data len: %d\n"
1793 "\treceived data: %.*s\n",
1795 c
->curr_task
.item
->key_size
,
1796 c
->curr_task
.item
->key_prefix
,
1797 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1801 (int)(curr_time
.tv_sec
- c
->curr_task
.item
->client_time
),
1802 c
->curr_task
.item
->exp_time
,
1812 if ((c
->curr_task
.item
->value_size
!= vlen
)
1813 || (memcmp(orignval
, value
, (size_t)vlen
) != 0))
1815 atomic_add_size(&ms_stats
.vef_failed
, 1);
1817 if (ms_setting
.verbose
)
1820 "\n<%d data verification failed\n"
1822 "\tkey: %" PRIx64
" %.*s\n"
1823 "\texpected data len: %d\n"
1824 "\texpected data: %.*s\n"
1825 "\treceived data len: %d\n"
1826 "\treceived data: %.*s\n",
1828 c
->curr_task
.item
->key_size
,
1829 c
->curr_task
.item
->key_prefix
,
1830 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1832 c
->curr_task
.item
->value_size
,
1833 c
->curr_task
.item
->value_size
,
1843 c
->curr_task
.finish_verify
= true;
1845 if (mlget_item
!= NULL
)
1847 mlget_item
->finish_verify
= true;
1850 } /* ms_verify_value */
1854 * For ASCII protocol, after store the data into the local
1855 * buffer, run this function to handle the data.
1857 * @param c, pointer of the concurrency
1859 static void ms_ascii_complete_nread(ms_conn_t
*c
)
1862 assert(c
->rbytes
>= c
->rvbytes
);
1863 assert(c
->protocol
== ascii_prot
);
1867 c
->rcurr
[c
->rvbytes
- 1] == '\n' && c
->rcurr
[c
->rvbytes
- 2] == '\r');
1871 ms_mlget_task_item_t
*mlget_item
= NULL
;
1872 if (((ms_setting
.mult_key_num
> 1)
1873 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1874 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1876 c
->mlget_task
.value_index
++;
1877 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1879 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1881 c
->curr_task
.item
= mlget_item
->item
;
1882 c
->curr_task
.verify
= mlget_item
->verify
;
1883 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1884 mlget_item
->get_miss
= false;
1888 /* Try to find the task item in multi-get task array */
1889 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
1891 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
1892 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1894 c
->curr_task
.item
= mlget_item
->item
;
1895 c
->curr_task
.verify
= mlget_item
->verify
;
1896 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1897 mlget_item
->get_miss
= false;
1905 ms_verify_value(c
, mlget_item
, c
->rcurr
, c
->rvbytes
- 2);
1907 c
->curr_task
.get_miss
= false;
1908 c
->rbytes
-= c
->rvbytes
;
1909 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1910 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1913 } /* ms_ascii_complete_nread */
1917 * For binary protocol, after store the data into the local
1918 * buffer, run this function to handle the data.
1920 * @param c, pointer of the concurrency
1922 static void ms_bin_complete_nread(ms_conn_t
*c
)
1925 assert(c
->rbytes
>= c
->rvbytes
);
1926 assert(c
->protocol
== binary_prot
);
1928 int extlen
= c
->binary_header
.response
.extlen
;
1929 int keylen
= c
->binary_header
.response
.keylen
;
1930 uint8_t opcode
= c
->binary_header
.response
.opcode
;
1932 /* not get command or not include value, just return */
1933 if (((opcode
!= PROTOCOL_BINARY_CMD_GET
)
1934 && (opcode
!= PROTOCOL_BINARY_CMD_GETQ
))
1935 || (c
->rvbytes
<= extlen
+ keylen
))
1938 if (c
->binary_header
.response
.opcode
== PROTOCOL_BINARY_CMD_GET
)
1940 c
->currcmd
.retstat
= MCD_END
;
1941 c
->curr_task
.get_miss
= true;
1946 ms_reset_conn(c
, false);
1951 ms_mlget_task_item_t
*mlget_item
= NULL
;
1952 if (((ms_setting
.mult_key_num
> 1)
1953 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1954 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1956 c
->mlget_task
.value_index
++;
1957 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1959 c
->curr_task
.item
= mlget_item
->item
;
1960 c
->curr_task
.verify
= mlget_item
->verify
;
1961 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1962 mlget_item
->get_miss
= false;
1967 c
->rcurr
+ extlen
+ keylen
,
1968 c
->rvbytes
- extlen
- keylen
);
1970 c
->currcmd
.retstat
= MCD_END
;
1971 c
->curr_task
.get_miss
= false;
1972 c
->rbytes
-= c
->rvbytes
;
1973 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1974 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1978 if (ms_setting
.mult_key_num
> 1)
1980 /* multi-get have check all the item */
1981 if (c
->mlget_task
.value_index
== c
->mlget_task
.mlget_num
- 1)
1983 ms_reset_conn(c
, false);
1989 ms_reset_conn(c
, false);
1991 } /* ms_bin_complete_nread */
1995 * we get here after reading the value of get commands.
1997 * @param c, pointer of the concurrency
1999 static void ms_complete_nread(ms_conn_t
*c
)
2002 assert(c
->rbytes
>= c
->rvbytes
);
2003 assert(c
->protocol
== ascii_prot
2004 || c
->protocol
== binary_prot
);
2006 if (c
->protocol
== binary_prot
)
2008 ms_bin_complete_nread(c
);
2012 ms_ascii_complete_nread(c
);
2014 } /* ms_complete_nread */
2018 * Adds a message header to a connection.
2020 * @param c, pointer of the concurrency
2022 * @return int, if success, return EXIT_SUCCESS, else return -1
2024 static int ms_add_msghdr(ms_conn_t
*c
)
2030 if (c
->msgsize
== c
->msgused
)
2033 realloc(c
->msglist
, (size_t)c
->msgsize
* 2 * sizeof(struct msghdr
));
2041 msg
= c
->msglist
+ c
->msgused
;
2044 * this wipes msg_iovlen, msg_control, msg_controllen, and
2045 * msg_flags, the last 3 of which aren't defined on solaris:
2047 memset(msg
, 0, sizeof(struct msghdr
));
2049 msg
->msg_iov
= &c
->iov
[c
->iovused
];
2051 if (c
->udp
&& (c
->srv_recv_addr_size
> 0))
2053 msg
->msg_name
= &c
->srv_recv_addr
;
2054 msg
->msg_namelen
= c
->srv_recv_addr_size
;
2062 /* Leave room for the UDP header, which we'll fill in later. */
2063 return ms_add_iov(c
, NULL
, UDP_HEADER_SIZE
);
2066 return EXIT_SUCCESS
;
2067 } /* ms_add_msghdr */
2071 * Ensures that there is room for another structure iovec in a connection's
2074 * @param c, pointer of the concurrency
2076 * @return int, if success, return EXIT_SUCCESS, else return -1
2078 static int ms_ensure_iov_space(ms_conn_t
*c
)
2082 if (c
->iovused
>= c
->iovsize
)
2085 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
2088 * sizeof(struct iovec
));
2095 /* Point all the msghdr structures at the new list. */
2096 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++)
2098 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
2099 iovnum
+= (int)c
->msglist
[i
].msg_iovlen
;
2103 return EXIT_SUCCESS
;
2104 } /* ms_ensure_iov_space */
2108 * Adds data to the list of pending data that will be written out to a
2111 * @param c, pointer of the concurrency
2112 * @param buf, the buffer includes data to send
2113 * @param len, the data length in the buffer
2115 * @return int, if success, return EXIT_SUCCESS, else return -1
2117 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
)
2127 m
= &c
->msglist
[c
->msgused
- 1];
2130 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2132 limit_to_mtu
= c
->udp
;
2135 /* We may need to start a new msghdr if this one is full. */
2136 if ((m
->msg_iovlen
== IOV_MAX
)
2137 || (limit_to_mtu
&& (c
->msgbytes
>= UDP_MAX_SEND_PAYLOAD_SIZE
)))
2140 m
= &c
->msglist
[c
->msgused
- 1];
2144 if (ms_ensure_iov_space(c
) != 0)
2147 /* If the fragment is too big to fit in the datagram, split it up */
2148 if (limit_to_mtu
&& (len
+ c
->msgbytes
> UDP_MAX_SEND_PAYLOAD_SIZE
))
2150 leftover
= len
+ c
->msgbytes
- UDP_MAX_SEND_PAYLOAD_SIZE
;
2158 m
= &c
->msglist
[c
->msgused
- 1];
2159 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
2160 m
->msg_iov
[m
->msg_iovlen
].iov_len
= (size_t)len
;
2166 buf
= ((char *)buf
) + len
;
2169 while (leftover
> 0);
2171 return EXIT_SUCCESS
;
2176 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2178 * @param c, pointer of the concurrency
2180 * @return int, if success, return EXIT_SUCCESS, else return -1
2182 static int ms_build_udp_headers(ms_conn_t
*c
)
2189 c
->request_id
= ms_get_udp_request_id();
2191 if (c
->msgused
> c
->hdrsize
)
2195 new_hdrbuf
= realloc(c
->hdrbuf
,
2196 (size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2198 new_hdrbuf
= malloc((size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2202 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
2203 c
->hdrsize
= c
->msgused
* 2;
2206 /* If this is a multi-packet request, drop it. */
2207 if (c
->udp
&& (c
->msgused
> 1))
2209 fprintf(stderr
, "multi-packet request for UDP not supported.\n");
2214 for (i
= 0; i
< c
->msgused
; i
++)
2216 c
->msglist
[i
].msg_iov
[0].iov_base
= (void *)hdr
;
2217 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
2218 *hdr
++= (unsigned char)(c
->request_id
/ 256);
2219 *hdr
++= (unsigned char)(c
->request_id
% 256);
2220 *hdr
++= (unsigned char)(i
/ 256);
2221 *hdr
++= (unsigned char)(i
% 256);
2222 *hdr
++= (unsigned char)(c
->msgused
/ 256);
2223 *hdr
++= (unsigned char)(c
->msgused
% 256);
2224 *hdr
++= (unsigned char)1; /* support facebook memcached */
2225 *hdr
++= (unsigned char)0;
2227 ((unsigned char *)c
->msglist
[i
].msg_iov
[0].iov_base
2228 + UDP_HEADER_SIZE
));
2231 return EXIT_SUCCESS
;
2232 } /* ms_build_udp_headers */
2236 * Transmit the next chunk of data from our list of msgbuf structures.
2238 * @param c, pointer of the concurrency
2240 * @return TRANSMIT_COMPLETE All done writing.
2241 * TRANSMIT_INCOMPLETE More data remaining to write.
2242 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2243 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2245 static int ms_transmit(ms_conn_t
*c
)
2249 if ((c
->msgcurr
< c
->msgused
)
2250 && (c
->msglist
[c
->msgcurr
].msg_iovlen
== 0))
2252 /* Finished writing the current msg; advance to the next. */
2256 if (c
->msgcurr
< c
->msgused
)
2259 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
2261 res
= sendmsg(c
->sfd
, m
, 0);
2264 atomic_add_size(&ms_stats
.bytes_written
, res
);
2266 /* We've written some of the data. Remove the completed
2267 * iovec entries from the list of pending writes. */
2268 while (m
->msg_iovlen
> 0 && res
>= (ssize_t
)m
->msg_iov
->iov_len
)
2270 res
-= (ssize_t
)m
->msg_iov
->iov_len
;
2275 /* Might have written just part of the last iovec entry;
2276 * adjust it so the next write will do the rest. */
2279 m
->msg_iov
->iov_base
= (void *)((unsigned char *)m
->msg_iov
->iov_base
+ res
);
2280 m
->msg_iov
->iov_len
-= (size_t)res
;
2282 return TRANSMIT_INCOMPLETE
;
2284 if ((res
== -1) && ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
)))
2286 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2288 fprintf(stderr
, "Couldn't update event.\n");
2289 ms_conn_set_state(c
, conn_closing
);
2290 return TRANSMIT_HARD_ERROR
;
2292 return TRANSMIT_SOFT_ERROR
;
2295 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2296 * we have a real error, on which we close the connection */
2297 fprintf(stderr
, "Failed to write, and not due to blocking.\n");
2299 ms_conn_set_state(c
, conn_closing
);
2300 return TRANSMIT_HARD_ERROR
;
2304 return TRANSMIT_COMPLETE
;
2310 * Shrinks a connection's buffers if they're too big. This prevents
2311 * periodic large "mget" response from server chewing lots of client
2314 * This should only be called in between requests since it can wipe output
2317 * @param c, pointer of the concurrency
2319 static void ms_conn_shrink(ms_conn_t
*c
)
2326 if ((c
->rsize
> READ_BUFFER_HIGHWAT
) && (c
->rbytes
< DATA_BUFFER_SIZE
))
2330 if (c
->rcurr
!= c
->rbuf
)
2331 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
2333 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
2338 c
->rsize
= DATA_BUFFER_SIZE
;
2343 if (c
->udp
&& (c
->rudpsize
> UDP_DATA_BUFFER_HIGHWAT
)
2344 && (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
< UDP_DATA_BUFFER_SIZE
))
2346 char *new_rbuf
= (char *)realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
2349 c
->rudpbuf
= new_rbuf
;
2350 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
2352 /* TODO check error condition? */
2355 if (c
->msgsize
> MSG_LIST_HIGHWAT
)
2357 struct msghdr
*newbuf
= (struct msghdr
*)realloc(
2360 * sizeof(c
->msglist
[0]));
2364 c
->msgsize
= MSG_LIST_INITIAL
;
2366 /* TODO check error condition? */
2369 if (c
->iovsize
> IOV_LIST_HIGHWAT
)
2371 struct iovec
*newbuf
= (struct iovec
*)realloc((void *)c
->iov
,
2373 * sizeof(c
->iov
[0]));
2377 c
->iovsize
= IOV_LIST_INITIAL
;
2379 /* TODO check return value */
2381 } /* ms_conn_shrink */
2385 * Sets a connection's current state in the state machine. Any special
2386 * processing that needs to happen on certain state transitions can
2389 * @param c, pointer of the concurrency
2390 * @param state, connection state
2392 static void ms_conn_set_state(ms_conn_t
*c
, int state
)
2396 if (state
!= c
->state
)
2398 if (state
== conn_read
)
2404 } /* ms_conn_set_state */
2408 * update the event if socks change state. for example: when
2409 * change the listen scoket read event to sock write event, or
2410 * change socket handler, we could call this function.
2412 * @param c, pointer of the concurrency
2413 * @param new_flags, new event flags
2415 * @return bool, if success, return true, else return false
2417 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
)
2421 struct event_base
*base
= c
->event
.ev_base
;
2422 if ((c
->ev_flags
== new_flags
) && (ms_setting
.rep_write_srv
== 0)
2423 && (! ms_setting
.facebook_test
|| (c
->total_sfds
== 1)))
2428 if (event_del(&c
->event
) == -1)
2430 /* try to delete the event again */
2431 if (event_del(&c
->event
) == -1)
2437 event_set(&c
->event
,
2442 event_base_set(base
, &c
->event
);
2443 c
->ev_flags
= (short)new_flags
;
2445 if (event_add(&c
->event
, NULL
) == -1)
2451 } /* ms_update_event */
2455 * If user want to get the expected throughput, we could limit
2456 * the performance of memslap. we could give up some work and
2457 * just wait a short time. The function is used to check this
2460 * @param c, pointer of the concurrency
2462 * @return bool, if success, return true, else return false
2464 static bool ms_need_yield(ms_conn_t
*c
)
2466 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
2468 int64_t time_diff
= 0;
2469 struct timeval curr_time
;
2470 ms_task_t
*task
= &c
->curr_task
;
2472 if (ms_setting
.expected_tps
> 0)
2474 gettimeofday(&curr_time
, NULL
);
2475 time_diff
= ms_time_diff(&ms_thread
->startup_time
, &curr_time
);
2476 tps
= (int64_t)(((task
->get_opt
+ task
->set_opt
) / (uint64_t)time_diff
) * 1000000);
2478 /* current throughput is greater than expected throughput */
2479 if (tps
> ms_thread
->thread_ctx
->tps_perconn
)
2486 } /* ms_need_yield */
2490 * used to update the start time of each operation
2492 * @param c, pointer of the concurrency
2494 static void ms_update_start_time(ms_conn_t
*c
)
2496 ms_task_item_t
*item
= c
->curr_task
.item
;
2498 if ((ms_setting
.stat_freq
> 0) || c
->udp
2499 || ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0)))
2501 gettimeofday(&c
->start_time
, NULL
);
2502 if ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0))
2504 /* record the current time */
2505 item
->client_time
= c
->start_time
.tv_sec
;
2508 } /* ms_update_start_time */
2512 * run the state machine
2514 * @param c, pointer of the concurrency
2516 static void ms_drive_machine(ms_conn_t
*c
)
2529 if (c
->rbytes
>= c
->rvbytes
)
2531 ms_complete_nread(c
);
2537 if (ms_try_read_line(c
) != 0)
2543 if (ms_try_read_network(c
) != 0)
2548 /* doesn't read all the response data, wait event wake up */
2549 if (! c
->currcmd
.isfinish
)
2551 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2553 fprintf(stderr
, "Couldn't update event.\n");
2554 ms_conn_set_state(c
, conn_closing
);
2561 /* we have no command line and no data to read from network, next write */
2562 ms_conn_set_state(c
, conn_write
);
2563 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
2568 if (! c
->ctnwrite
&& ms_need_yield(c
))
2572 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2574 fprintf(stderr
, "Couldn't update event.\n");
2575 ms_conn_set_state(c
, conn_closing
);
2582 if (! c
->ctnwrite
&& (ms_exec_task(c
) != 0))
2584 ms_conn_set_state(c
, conn_closing
);
2588 /* record the start time before starting to send data if necessary */
2589 if (! c
->ctnwrite
|| (c
->change_sfd
&& c
->ctnwrite
))
2593 c
->change_sfd
= false;
2595 ms_update_start_time(c
);
2598 /* change sfd if necessary */
2606 /* execute task until nothing need be written to network */
2607 if (! c
->ctnwrite
&& (c
->msgcurr
== c
->msgused
))
2609 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2611 fprintf(stderr
, "Couldn't update event.\n");
2612 ms_conn_set_state(c
, conn_closing
);
2619 switch (ms_transmit(c
))
2621 case TRANSMIT_COMPLETE
:
2622 /* we have no data to write to network, next wait repose */
2623 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2625 fprintf(stderr
, "Couldn't update event.\n");
2626 ms_conn_set_state(c
, conn_closing
);
2630 ms_conn_set_state(c
, conn_read
);
2635 case TRANSMIT_INCOMPLETE
:
2637 break; /* Continue in state machine. */
2639 case TRANSMIT_HARD_ERROR
:
2643 case TRANSMIT_SOFT_ERROR
:
2655 /* recovery mode, need reconnect if connection close */
2656 if (ms_setting
.reconnect
&& (! ms_global
.time_out
2657 || ((ms_setting
.run_time
== 0)
2658 && (c
->remain_exec_num
> 0))))
2660 if (ms_reconn(c
) != 0)
2667 ms_reset_conn(c
, false);
2669 if (c
->total_sfds
== 1)
2671 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2673 fprintf(stderr
, "Couldn't update event.\n");
2674 ms_conn_set_state(c
, conn_closing
);
2692 } /* ms_drive_machine */
2696 * the event handler of each thread
2698 * @param fd, the file descriptor of socket
2699 * @param which, event flag
2700 * @param arg, argument
2702 void ms_event_handler(const int fd
, const short which
, void *arg
)
2704 ms_conn_t
*c
= (ms_conn_t
*)arg
;
2714 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2720 assert(fd
== c
->sfd
);
2722 ms_drive_machine(c
);
2724 /* wait for next event */
2725 } /* ms_event_handler */
2729 * get the next socket descriptor index to run for replication
2731 * @param c, pointer of the concurrency
2732 * @param cmd, command(get or set )
2734 * @return int, if success, return the index, else return EXIT_SUCCESS
2736 static uint32_t ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
)
2738 uint32_t sock_index
= 0;
2741 if (c
->total_sfds
== 1)
2743 return EXIT_SUCCESS
;
2746 if (ms_setting
.rep_write_srv
== 0)
2755 for (i
= 0; i
< ms_setting
.rep_write_srv
; i
++)
2757 if (c
->tcpsfd
[i
] > 0)
2763 if (i
== ms_setting
.rep_write_srv
)
2765 /* random get one replication server to read */
2766 sock_index
= (uint32_t)random() % c
->total_sfds
;
2770 /* random get one replication writing server to write */
2771 sock_index
= (uint32_t)random() % ms_setting
.rep_write_srv
;
2774 else if (cmd
== CMD_GET
)
2776 /* random get one replication server to read */
2777 sock_index
= (uint32_t)random() % c
->total_sfds
;
2780 while (c
->tcpsfd
[sock_index
] == 0);
2783 } /* ms_get_rep_sock_index */
2787 * get the next socket descriptor index to run
2789 * @param c, pointer of the concurrency
2791 * @return int, return the index
2793 static uint32_t ms_get_next_sock_index(ms_conn_t
*c
)
2795 uint32_t sock_index
= 0;
2799 sock_index
= (++c
->cur_idx
== c
->total_sfds
) ? 0 : c
->cur_idx
;
2801 while (c
->tcpsfd
[sock_index
] == 0);
2804 } /* ms_get_next_sock_index */
2808 * update socket event of the connections
2810 * @param c, pointer of the concurrency
2812 * @return int, if success, return EXIT_SUCCESS, else return -1
2814 static int ms_update_conn_sock_event(ms_conn_t
*c
)
2818 switch (c
->currcmd
.cmd
)
2821 if (ms_setting
.facebook_test
&& c
->udp
)
2823 c
->sfd
= c
->tcpsfd
[0];
2825 c
->change_sfd
= true;
2830 if (ms_setting
.facebook_test
&& ! c
->udp
)
2834 c
->change_sfd
= true;
2842 if (! c
->udp
&& (c
->total_sfds
> 1))
2844 if (c
->cur_idx
!= c
->total_sfds
)
2846 if (ms_setting
.rep_write_srv
== 0)
2848 c
->cur_idx
= ms_get_next_sock_index(c
);
2852 c
->cur_idx
= ms_get_rep_sock_index(c
, c
->currcmd
.cmd
);
2857 /* must select the first sock of the connection at the beginning */
2861 c
->sfd
= c
->tcpsfd
[c
->cur_idx
];
2862 assert(c
->sfd
!= 0);
2863 c
->change_sfd
= true;
2868 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2870 fprintf(stderr
, "Couldn't update event.\n");
2871 ms_conn_set_state(c
, conn_closing
);
2876 return EXIT_SUCCESS
;
2877 } /* ms_update_conn_sock_event */
2881 * for ASCII protocol, this function build the set command
2882 * string and send the command.
2884 * @param c, pointer of the concurrency
2885 * @param item, pointer of task item which includes the object
2888 * @return int, if success, return EXIT_SUCCESS, else return -1
2890 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2894 char *buffer
= c
->wbuf
;
2896 write_len
= snprintf(buffer
,
2903 if (write_len
> c
->wsize
|| write_len
< 0)
2905 /* ought to be always enough. just fail for simplicity */
2906 fprintf(stderr
, "output command line too long.\n");
2910 if (item
->value_offset
== INVALID_OFFSET
)
2912 value_offset
= item
->key_suffix_offset
;
2916 value_offset
= item
->value_offset
;
2919 if ((ms_add_iov(c
, "set ", 4) != 0)
2920 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
2921 (int)KEY_PREFIX_SIZE
) != 0)
2922 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2923 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
2924 || (ms_add_iov(c
, buffer
, write_len
) != 0)
2925 || (ms_add_iov(c
, &ms_setting
.char_block
[value_offset
],
2926 item
->value_size
) != 0)
2927 || (ms_add_iov(c
, "\r\n", 2) != 0)
2928 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
2933 return EXIT_SUCCESS
;
2934 } /* ms_build_ascii_write_buf_set */
2938 * used to send set command to server
2940 * @param c, pointer of the concurrency
2941 * @param item, pointer of task item which includes the object
2944 * @return int, if success, return EXIT_SUCCESS, else return -1
2946 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2950 c
->currcmd
.cmd
= CMD_SET
;
2951 c
->currcmd
.isfinish
= false;
2952 c
->currcmd
.retstat
= MCD_FAILURE
;
2954 if (ms_update_conn_sock_event(c
) != 0)
2962 if (ms_add_msghdr(c
) != 0)
2964 fprintf(stderr
, "Out of memory preparing request.");
2968 /* binary protocol */
2969 if (c
->protocol
== binary_prot
)
2971 if (ms_build_bin_write_buf_set(c
, item
) != 0)
2978 if (ms_build_ascii_write_buf_set(c
, item
) != 0)
2984 atomic_add_size(&ms_stats
.obj_bytes
,
2985 item
->key_size
+ item
->value_size
);
2986 atomic_add_size(&ms_stats
.cmd_set
, 1);
2988 return EXIT_SUCCESS
;
2993 * for ASCII protocol, this function build the get command
2994 * string and send the command.
2996 * @param c, pointer of the concurrency
2997 * @param item, pointer of task item which includes the object
3000 * @return int, if success, return EXIT_SUCCESS, else return -1
3002 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3004 if ((ms_add_iov(c
, "get ", 4) != 0)
3005 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
3006 (int)KEY_PREFIX_SIZE
) != 0)
3007 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3008 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
3009 || (ms_add_iov(c
, "\r\n", 2) != 0)
3010 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3015 return EXIT_SUCCESS
;
3016 } /* ms_build_ascii_write_buf_get */
3020 * used to send the get command to server
3022 * @param c, pointer of the concurrency
3023 * @param item, pointer of task item which includes the object
3026 * @return int, if success, return EXIT_SUCCESS, else return -1
3028 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3032 c
->currcmd
.cmd
= CMD_GET
;
3033 c
->currcmd
.isfinish
= false;
3034 c
->currcmd
.retstat
= MCD_FAILURE
;
3036 if (ms_update_conn_sock_event(c
) != 0)
3044 if (ms_add_msghdr(c
) != 0)
3046 fprintf(stderr
, "Out of memory preparing request.");
3050 /* binary protocol */
3051 if (c
->protocol
== binary_prot
)
3053 if (ms_build_bin_write_buf_get(c
, item
) != 0)
3060 if (ms_build_ascii_write_buf_get(c
, item
) != 0)
3066 atomic_add_size(&ms_stats
.cmd_get
, 1);
3068 return EXIT_SUCCESS
;
3073 * for ASCII protocol, this function build the multi-get command
3074 * string and send the command.
3076 * @param c, pointer of the concurrency
3078 * @return int, if success, return EXIT_SUCCESS, else return -1
3080 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
)
3082 ms_task_item_t
*item
;
3084 if (ms_add_iov(c
, "get", 3) != 0)
3089 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3091 item
= c
->mlget_task
.mlget_item
[i
].item
;
3092 assert(item
!= NULL
);
3093 if ((ms_add_iov(c
, " ", 1) != 0)
3094 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
3095 (int)KEY_PREFIX_SIZE
) != 0)
3096 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3097 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0))
3103 if ((ms_add_iov(c
, "\r\n", 2) != 0)
3104 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3109 return EXIT_SUCCESS
;
3110 } /* ms_build_ascii_write_buf_mlget */
3114 * used to send the multi-get command to server
3116 * @param c, pointer of the concurrency
3118 * @return int, if success, return EXIT_SUCCESS, else return -1
3120 int ms_mcd_mlget(ms_conn_t
*c
)
3122 ms_task_item_t
*item
;
3125 assert(c
->mlget_task
.mlget_num
>= 1);
3127 c
->currcmd
.cmd
= CMD_GET
;
3128 c
->currcmd
.isfinish
= false;
3129 c
->currcmd
.retstat
= MCD_FAILURE
;
3131 if (ms_update_conn_sock_event(c
) != 0)
3139 if (ms_add_msghdr(c
) != 0)
3141 fprintf(stderr
, "Out of memory preparing request.");
3145 /* binary protocol */
3146 if (c
->protocol
== binary_prot
)
3148 if (ms_build_bin_write_buf_mlget(c
) != 0)
3155 if (ms_build_ascii_write_buf_mlget(c
) != 0)
3161 /* decrease operation time of each item */
3162 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3164 item
= c
->mlget_task
.mlget_item
[i
].item
;
3165 atomic_add_size(&ms_stats
.cmd_get
, 1);
3170 return EXIT_SUCCESS
;
3171 } /* ms_mcd_mlget */
3175 * binary protocol support
3179 * for binary protocol, parse the response of server
3181 * @param c, pointer of the concurrency
3183 * @return int, if success, return EXIT_SUCCESS, else return -1
3185 static int ms_bin_process_response(ms_conn_t
*c
)
3187 const char *errstr
= NULL
;
3191 uint32_t bodylen
= c
->binary_header
.response
.bodylen
;
3192 uint8_t opcode
= c
->binary_header
.response
.opcode
;
3193 uint16_t status
= c
->binary_header
.response
.status
;
3197 c
->rvbytes
= (int32_t)bodylen
;
3199 return EXIT_FAILURE
;
3205 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
3206 if (opcode
== PROTOCOL_BINARY_CMD_SET
)
3208 c
->currcmd
.retstat
= MCD_STORED
;
3210 else if (opcode
== PROTOCOL_BINARY_CMD_DELETE
)
3212 c
->currcmd
.retstat
= MCD_DELETED
;
3214 else if (opcode
== PROTOCOL_BINARY_CMD_GET
)
3216 c
->currcmd
.retstat
= MCD_END
;
3220 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
3221 errstr
= "Out of memory";
3222 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3225 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
3226 errstr
= "Unknown command";
3227 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3230 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
3231 errstr
= "Not found";
3232 c
->currcmd
.retstat
= MCD_NOTFOUND
;
3235 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
3236 errstr
= "Invalid arguments";
3237 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
3240 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
3241 errstr
= "Data exists for key.";
3244 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
3245 errstr
= "Too large.";
3246 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3249 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
3250 errstr
= "Not stored.";
3251 c
->currcmd
.retstat
= MCD_NOTSTORED
;
3255 errstr
= "Unknown error";
3256 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3262 fprintf(stderr
, "%s\n", errstr
);
3266 return EXIT_SUCCESS
;
3267 } /* ms_bin_process_response */
3270 /* build binary header and add the header to the buffer to send */
3273 * build binary header and add the header to the buffer to send
3275 * @param c, pointer of the concurrency
3276 * @param opcode, operation code
3277 * @param hdr_len, length of header
3278 * @param key_len, length of key
3279 * @param body_len. length of body
3281 static void ms_add_bin_header(ms_conn_t
*c
,
3287 protocol_binary_request_header
*header
;
3291 header
= (protocol_binary_request_header
*)c
->wcurr
;
3293 header
->request
.magic
= (uint8_t)PROTOCOL_BINARY_REQ
;
3294 header
->request
.opcode
= (uint8_t)opcode
;
3295 header
->request
.keylen
= htons(key_len
);
3297 header
->request
.extlen
= (uint8_t)hdr_len
;
3298 header
->request
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
3299 header
->request
.vbucket
= 0;
3301 header
->request
.bodylen
= htonl(body_len
);
3302 header
->request
.opaque
= 0;
3303 header
->request
.cas
= 0;
3305 ms_add_iov(c
, c
->wcurr
, sizeof(header
->request
));
3306 } /* ms_add_bin_header */
3310 * add the key to the socket write buffer array
3312 * @param c, pointer of the concurrency
3313 * @param item, pointer of task item which includes the object
3316 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
)
3318 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
);
3319 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3320 item
->key_size
- (int)KEY_PREFIX_SIZE
);
3325 * for binary protocol, this function build the set command
3326 * and add the command to send buffer array.
3328 * @param c, pointer of the concurrency
3329 * @param item, pointer of task item which includes the object
3332 * @return int, if success, return EXIT_SUCCESS, else return -1
3334 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
3336 assert(c
->wbuf
== c
->wcurr
);
3339 protocol_binary_request_set
*rep
= (protocol_binary_request_set
*)c
->wcurr
;
3340 uint16_t keylen
= (uint16_t)item
->key_size
;
3341 uint32_t bodylen
= (uint32_t)sizeof(rep
->message
.body
)
3342 + (uint32_t)keylen
+ (uint32_t)item
->value_size
;
3344 ms_add_bin_header(c
,
3345 PROTOCOL_BINARY_CMD_SET
,
3346 sizeof(rep
->message
.body
),
3349 rep
->message
.body
.flags
= 0;
3350 rep
->message
.body
.expiration
= htonl((uint32_t)item
->exp_time
);
3351 ms_add_iov(c
, &rep
->message
.body
, sizeof(rep
->message
.body
));
3352 ms_add_key_to_iov(c
, item
);
3354 if (item
->value_offset
== INVALID_OFFSET
)
3356 value_offset
= item
->key_suffix_offset
;
3360 value_offset
= item
->value_offset
;
3362 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
);
3364 return EXIT_SUCCESS
;
3365 } /* ms_build_bin_write_buf_set */
3369 * for binary protocol, this function build the get command and
3370 * add the command to send buffer array.
3372 * @param c, pointer of the concurrency
3373 * @param item, pointer of task item which includes the object
3376 * @return int, if success, return EXIT_SUCCESS, else return -1
3378 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3380 assert(c
->wbuf
== c
->wcurr
);
3382 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t)item
->key_size
,
3383 (uint32_t)item
->key_size
);
3384 ms_add_key_to_iov(c
, item
);
3386 return EXIT_SUCCESS
;
3387 } /* ms_build_bin_write_buf_get */
3391 * for binary protocol, this function build the multi-get
3392 * command and add the command to send buffer array.
3394 * @param c, pointer of the concurrency
3395 * @param item, pointer of task item which includes the object
3398 * @return int, if success, return EXIT_SUCCESS, else return -1
3400 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
)
3402 ms_task_item_t
*item
;
3404 assert(c
->wbuf
== c
->wcurr
);
3406 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3408 item
= c
->mlget_task
.mlget_item
[i
].item
;
3409 assert(item
!= NULL
);
3411 ms_add_bin_header(c
,
3412 PROTOCOL_BINARY_CMD_GET
,
3414 (uint16_t)item
->key_size
,
3415 (uint32_t)item
->key_size
);
3416 ms_add_key_to_iov(c
, item
);
3417 c
->wcurr
+= sizeof(protocol_binary_request_get
);
3422 return EXIT_SUCCESS
;
3423 } /* ms_build_bin_write_buf_mlget */