3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
20 #include <netinet/tcp.h>
21 #include <arpa/inet.h>
22 #include "ms_setting.h"
23 #include "ms_thread.h"
24 #include "ms_atomic.h"
27 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
28 * optimize the conversion functions, but the prototypes generate warnings
29 * from gcc. The conversion methods isn't the bottleneck for my app, so
30 * just remove the warnings by undef'ing the optimization ..
38 /* for network write */
39 #define TRANSMIT_COMPLETE 0
40 #define TRANSMIT_INCOMPLETE 1
41 #define TRANSMIT_SOFT_ERROR 2
42 #define TRANSMIT_HARD_ERROR 3
44 /* for generating key */
45 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
46 #define KEY_PREFIX_MASK 0x1010101010101010
48 /* For parse the value length return by server */
50 #define VALUELEN_TOKEN 3
52 /* global increasing counter, to ensure the key prefix unique */
53 static uint64_t key_prefix_seq
= KEY_PREFIX_BASE
;
55 /* global increasing counter, generating request id for UDP */
56 static volatile uint32_t udp_request_id
= 0;
58 extern pthread_key_t ms_thread_key
;
60 /* generate upd request id */
61 static uint32_t ms_get_udp_request_id(void);
64 /* connect initialize */
65 static void ms_task_init(ms_conn_t
*c
);
66 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
);
67 static int ms_conn_sock_init(ms_conn_t
*c
);
68 static int ms_conn_event_init(ms_conn_t
*c
);
69 static int ms_conn_init(ms_conn_t
*c
,
71 const int read_buffer_size
,
73 static void ms_warmup_num_init(ms_conn_t
*c
);
74 static int ms_item_win_init(ms_conn_t
*c
);
77 /* connection close */
78 void ms_conn_free(ms_conn_t
*c
);
79 static void ms_conn_close(ms_conn_t
*c
);
82 /* create network connection */
83 static int ms_new_socket(struct addrinfo
*ai
);
84 static void ms_maximize_sndbuf(const int sfd
);
85 static int ms_network_connect(ms_conn_t
*c
,
90 static int ms_reconn(ms_conn_t
*c
);
94 static int ms_tokenize_command(char *command
,
96 const int max_tokens
);
97 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
);
98 static int ms_try_read_line(ms_conn_t
*c
);
99 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
);
100 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
);
101 static int ms_try_read_network(ms_conn_t
*c
);
102 static void ms_verify_value(ms_conn_t
*c
,
103 ms_mlget_task_item_t
*mlget_item
,
106 static void ms_ascii_complete_nread(ms_conn_t
*c
);
107 static void ms_bin_complete_nread(ms_conn_t
*c
);
108 static void ms_complete_nread(ms_conn_t
*c
);
112 static int ms_add_msghdr(ms_conn_t
*c
);
113 static int ms_ensure_iov_space(ms_conn_t
*c
);
114 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
);
115 static int ms_build_udp_headers(ms_conn_t
*c
);
116 static int ms_transmit(ms_conn_t
*c
);
119 /* status adjustment */
120 static void ms_conn_shrink(ms_conn_t
*c
);
121 static void ms_conn_set_state(ms_conn_t
*c
, int state
);
122 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
);
123 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
);
124 static int ms_get_next_sock_index(ms_conn_t
*c
);
125 static int ms_update_conn_sock_event(ms_conn_t
*c
);
126 static bool ms_need_yield(ms_conn_t
*c
);
127 static void ms_update_start_time(ms_conn_t
*c
);
131 static void ms_drive_machine(ms_conn_t
*c
);
132 void ms_event_handler(const int fd
, const short which
, void *arg
);
136 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
137 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
138 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
);
141 /* binary protocol */
142 static int ms_bin_process_response(ms_conn_t
*c
);
143 static void ms_add_bin_header(ms_conn_t
*c
,
148 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
);
149 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
150 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
151 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
);
155 * each key has two parts, prefix and suffix. The suffix is a
156 * string random get form the character table. The prefix is a
157 * uint64_t variable. And the prefix must be unique. we use the
158 * prefix to identify a key. And the prefix can't include
159 * character ' ' '\r' '\n' '\0'.
163 uint64_t ms_get_key_prefix(void)
167 pthread_mutex_lock(&ms_global
.seq_mutex
);
168 key_prefix_seq
|= KEY_PREFIX_MASK
;
169 key_prefix
= key_prefix_seq
;
171 pthread_mutex_unlock(&ms_global
.seq_mutex
);
174 } /* ms_get_key_prefix */
178 * get an unique udp request id
180 * @return an unique UDP request id
182 static uint32_t ms_get_udp_request_id(void)
184 return atomic_add_32_nv(&udp_request_id
, 1);
189 * initialize current task structure
191 * @param c, pointer of the concurrency
193 static void ms_task_init(ms_conn_t
*c
)
195 c
->curr_task
.cmd
= CMD_NULL
;
196 c
->curr_task
.item
= 0;
197 c
->curr_task
.verify
= false;
198 c
->curr_task
.finish_verify
= true;
199 c
->curr_task
.get_miss
= true;
201 c
->curr_task
.get_opt
= 0;
202 c
->curr_task
.set_opt
= 0;
203 c
->curr_task
.cycle_undo_get
= 0;
204 c
->curr_task
.cycle_undo_set
= 0;
205 c
->curr_task
.verified_get
= 0;
206 c
->curr_task
.overwrite_set
= 0;
211 * initialize udp for the connection structure
213 * @param c, pointer of the concurrency
214 * @param is_udp, whether it's udp
216 * @return int, if success, return 0, else return -1
218 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
)
224 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
235 if (c
->udp
|| (! c
->udp
&& ms_setting
.facebook_test
))
237 c
->rudpbuf
= (char *)malloc((size_t)c
->rudpsize
);
238 c
->udppkt
= (ms_udppkt_t
*)malloc(MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
240 if ((c
->rudpbuf
== NULL
) || (c
->udppkt
== NULL
))
242 if (c
->rudpbuf
!= NULL
)
244 if (c
->udppkt
!= NULL
)
246 fprintf(stderr
, "malloc()\n");
249 memset(c
->udppkt
, 0, MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
253 } /* ms_conn_udp_init */
257 * initialize the connection structure
259 * @param c, pointer of the concurrency
260 * @param init_state, (conn_read, conn_write, conn_closing)
261 * @param read_buffer_size
262 * @param is_udp, whether it's udp
264 * @return int, if success, return 0, else return -1
266 static int ms_conn_init(ms_conn_t
*c
,
267 const int init_state
,
268 const int read_buffer_size
,
277 c
->rsize
= read_buffer_size
;
278 c
->wsize
= WRITE_BUFFER_SIZE
;
279 c
->iovsize
= IOV_LIST_INITIAL
;
280 c
->msgsize
= MSG_LIST_INITIAL
;
282 /* for replication, each connection need connect all the server */
283 if (ms_setting
.rep_write_srv
> 0)
285 c
->total_sfds
= ms_setting
.srv_cnt
;
289 c
->total_sfds
= ms_setting
.sock_per_conn
;
293 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
294 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
295 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * (size_t)c
->iovsize
);
296 c
->msglist
= (struct msghdr
*)malloc(
297 sizeof(struct msghdr
) * (size_t)c
->msgsize
);
298 if (ms_setting
.mult_key_num
> 1)
300 c
->mlget_task
.mlget_item
= (ms_mlget_task_item_t
*)
302 sizeof(ms_mlget_task_item_t
) * (size_t)ms_setting
.mult_key_num
);
304 c
->tcpsfd
= (int *)malloc((size_t)c
->total_sfds
* sizeof(int));
306 if ((c
->rbuf
== NULL
) || (c
->wbuf
== NULL
) || (c
->iov
== NULL
)
307 || (c
->msglist
== NULL
) || (c
->tcpsfd
== NULL
)
308 || ((ms_setting
.mult_key_num
> 1)
309 && (c
->mlget_task
.mlget_item
== NULL
)))
317 if (c
->msglist
!= NULL
)
319 if (c
->mlget_task
.mlget_item
!= NULL
)
320 free(c
->mlget_task
.mlget_item
);
321 if (c
->tcpsfd
!= NULL
)
323 fprintf(stderr
, "malloc()\n");
327 c
->state
= init_state
;
335 c
->cur_idx
= c
->total_sfds
; /* default index is a invalid value */
339 c
->change_sfd
= false;
341 c
->precmd
.cmd
= c
->currcmd
.cmd
= CMD_NULL
;
342 c
->precmd
.isfinish
= true; /* default the previous command finished */
343 c
->currcmd
.isfinish
= false;
344 c
->precmd
.retstat
= c
->currcmd
.retstat
= MCD_FAILURE
;
345 c
->precmd
.key_prefix
= c
->currcmd
.key_prefix
= 0;
347 c
->mlget_task
.mlget_num
= 0;
348 c
->mlget_task
.value_index
= -1; /* default invalid value */
350 if (ms_setting
.binary_prot
)
352 c
->protocol
= binary_prot
;
356 c
->protocol
= ascii_udp_prot
;
360 c
->protocol
= ascii_prot
;
364 if (ms_conn_udp_init(c
, is_udp
) != 0)
369 /* initialize task */
372 if (! (ms_setting
.facebook_test
&& is_udp
))
374 atomic_add_32(&ms_stats
.active_conns
, 1);
382 * when doing 100% get operation, it could preset some objects
383 * to warmup the server. this function is used to initialize the
384 * number of the objects to preset.
386 * @param c, pointer of the concurrency
388 static void ms_warmup_num_init(ms_conn_t
*c
)
390 /* no set operation, preset all the items in the window */
391 if (ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
< PROP_ERROR
)
393 c
->warmup_num
= c
->win_size
;
394 c
->remain_warmup_num
= c
->warmup_num
;
399 c
->remain_warmup_num
= c
->warmup_num
;
401 } /* ms_warmup_num_init */
405 * each connection has an item window, this function initialize
406 * the window. The window is used to generate task.
408 * @param c, pointer of the concurrency
410 * @return int, if success, return 0, else return -1
412 static int ms_item_win_init(ms_conn_t
*c
)
414 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
417 c
->win_size
= (int)ms_setting
.win_size
;
419 c
->exec_num
= ms_thread
->thread_ctx
->exec_num_perconn
;
420 c
->remain_exec_num
= c
->exec_num
;
422 c
->item_win
= (ms_task_item_t
*)malloc(
423 sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
424 if (c
->item_win
== NULL
)
426 fprintf(stderr
, "Can't allocate task item array for conn.\n");
429 memset(c
->item_win
, 0, sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
431 for (int i
= 0; i
< c
->win_size
; i
++)
433 c
->item_win
[i
].key_size
= (int)ms_setting
.distr
[i
].key_size
;
434 c
->item_win
[i
].key_prefix
= ms_get_key_prefix();
435 c
->item_win
[i
].key_suffix_offset
= ms_setting
.distr
[i
].key_offset
;
436 c
->item_win
[i
].value_size
= (int)ms_setting
.distr
[i
].value_size
;
437 c
->item_win
[i
].value_offset
= INVALID_OFFSET
; /* default in invalid offset */
438 c
->item_win
[i
].client_time
= 0;
440 /* set expire time base on the proportion */
441 if (exp_cnt
< ms_setting
.exp_ver_per
* i
)
443 c
->item_win
[i
].exp_time
= FIXED_EXPIRE_TIME
;
448 c
->item_win
[i
].exp_time
= 0;
452 ms_warmup_num_init(c
);
455 } /* ms_item_win_init */
459 * each connection structure can include one or more sock
460 * handlers. this function create these socks and connect the
463 * @param c, pointer of the concurrency
465 * @return int, if success, return 0, else return -1
467 static int ms_conn_sock_init(ms_conn_t
*c
)
469 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
475 assert(c
->tcpsfd
!= NULL
);
477 for (i
= 0; i
< c
->total_sfds
; i
++)
480 if (ms_setting
.rep_write_srv
> 0)
482 /* for replication, each connection need connect all the server */
487 /* all the connections in a thread connects the same server */
488 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
491 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
492 ms_setting
.servers
[srv_idx
].srv_port
,
493 ms_setting
.udp
, &ret_sfd
) != 0)
503 if (! ms_setting
.udp
)
505 c
->tcpsfd
[i
]= ret_sfd
;
511 /* initialize udp sock handler if necessary */
512 if (ms_setting
.facebook_test
)
515 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
516 ms_setting
.servers
[srv_idx
].srv_port
,
517 true, &ret_sfd
) != 0)
527 if ((i
!= c
->total_sfds
) || (ms_setting
.facebook_test
&& (c
->udpsfd
== 0)))
535 for (int j
= 0; j
< i
; j
++)
550 } /* ms_conn_sock_init */
554 * each connection is managed by libevent, this function
555 * initialize the event of the connection structure.
557 * @param c, pointer of the concurrency
559 * @return int, if success, return 0, else return -1
561 static int ms_conn_event_init(ms_conn_t
*c
)
563 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
564 /* default event timeout 10 seconds */
567 .tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0
569 short event_flags
= EV_WRITE
| EV_PERSIST
;
571 event_set(&c
->event
, c
->sfd
, event_flags
, ms_event_handler
, (void *)c
);
572 event_base_set(ms_thread
->base
, &c
->event
);
573 c
->ev_flags
= event_flags
;
575 if (c
->total_sfds
== 1)
577 if (event_add(&c
->event
, NULL
) == -1)
584 if (event_add(&c
->event
, &t
) == -1)
591 } /* ms_conn_event_init */
595 * setup a connection, each connection structure of each
596 * thread must call this function to initialize.
598 * @param c, pointer of the concurrency
600 * @return int, if success, return 0, else return -1
602 int ms_setup_conn(ms_conn_t
*c
)
604 if (ms_item_win_init(c
) != 0)
609 if (ms_conn_init(c
, conn_write
, DATA_BUFFER_SIZE
, ms_setting
.udp
) != 0)
614 if (ms_conn_sock_init(c
) != 0)
619 if (ms_conn_event_init(c
) != 0)
625 } /* ms_setup_conn */
629 * Frees a connection.
631 * @param c, pointer of the concurrency
633 void ms_conn_free(ms_conn_t
*c
)
635 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
638 if (c
->hdrbuf
!= NULL
)
640 if (c
->msglist
!= NULL
)
648 if (c
->mlget_task
.mlget_item
!= NULL
)
649 free(c
->mlget_task
.mlget_item
);
650 if (c
->rudpbuf
!= NULL
)
652 if (c
->udppkt
!= NULL
)
654 if (c
->item_win
!= NULL
)
656 if (c
->tcpsfd
!= NULL
)
659 if (--ms_thread
->nactive_conn
== 0)
661 free(ms_thread
->conn
);
670 * @param c, pointer of the concurrency
672 static void ms_conn_close(ms_conn_t
*c
)
674 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
677 /* delete the event, the socket and the connection */
678 event_del(&c
->event
);
680 for (int i
= 0; i
< c
->total_sfds
; i
++)
682 if (c
->tcpsfd
[i
] > 0)
689 if (ms_setting
.facebook_test
)
694 atomic_dec_32(&ms_stats
.active_conns
);
698 if (ms_setting
.run_time
== 0)
700 pthread_mutex_lock(&ms_global
.run_lock
.lock
);
701 ms_global
.run_lock
.count
++;
702 pthread_cond_signal(&ms_global
.run_lock
.cond
);
703 pthread_mutex_unlock(&ms_global
.run_lock
.lock
);
706 if (ms_thread
->nactive_conn
== 0)
710 } /* ms_conn_close */
716 * @param ai, server address information
718 * @return int, if success, return 0, else return -1
720 static int ms_new_socket(struct addrinfo
*ai
)
724 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1)
726 fprintf(stderr
, "socket() error: %s.\n", strerror(errno
));
731 } /* ms_new_socket */
735 * Sets a socket's send buffer size to the maximum allowed by the system.
737 * @param sfd, file descriptor of socket
739 static void ms_maximize_sndbuf(const int sfd
)
741 socklen_t intsize
= sizeof(int);
742 unsigned int last_good
= 0;
743 unsigned int min
, max
, avg
;
744 unsigned int old_size
;
746 /* Start with the default size. */
747 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0)
749 fprintf(stderr
, "getsockopt(SO_SNDBUF)\n");
753 /* Binary-search for the real maximum. */
755 max
= MAX_SENDBUF_SIZE
;
759 avg
= ((unsigned int)(min
+ max
)) / 2;
760 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0)
770 } /* ms_maximize_sndbuf */
774 * socket connects the server
776 * @param c, pointer of the concurrency
777 * @param srv_host_name, the host name of the server
778 * @param srv_port, port of server
779 * @param is_udp, whether it's udp
780 * @param ret_sfd, the connected socket file descriptor
782 * @return int, if success, return 0, else return -1
784 static int ms_network_connect(ms_conn_t
*c
,
796 struct addrinfo
*next
;
797 struct addrinfo hints
;
798 char port_buf
[NI_MAXSERV
];
805 * the memset call clears nonstandard fields in some impementations
806 * that otherwise mess things up.
808 memset(&hints
, 0, sizeof(hints
));
809 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
812 hints
.ai_protocol
= IPPROTO_UDP
;
813 hints
.ai_socktype
= SOCK_DGRAM
;
814 hints
.ai_family
= AF_INET
; /* This left here because of issues with OSX 10.5 */
818 hints
.ai_family
= AF_UNSPEC
;
819 hints
.ai_protocol
= IPPROTO_TCP
;
820 hints
.ai_socktype
= SOCK_STREAM
;
823 snprintf(port_buf
, NI_MAXSERV
, "%d", srv_port
);
824 error
= getaddrinfo(srv_host_name
, port_buf
, &hints
, &ai
);
827 if (error
!= EAI_SYSTEM
)
828 fprintf(stderr
, "getaddrinfo(): %s.\n", gai_strerror(error
));
830 perror("getaddrinfo()\n");
835 for (next
= ai
; next
; next
= next
->ai_next
)
837 if ((sfd
= ms_new_socket(next
)) == -1)
843 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
846 ms_maximize_sndbuf(sfd
);
850 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
,
852 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
853 setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
,
859 c
->srv_recv_addr_size
= sizeof(struct sockaddr
);
860 memcpy(&c
->srv_recv_addr
, next
->ai_addr
, c
->srv_recv_addr_size
);
864 if (connect(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1)
872 if (((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0)
873 || (fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0))
875 fprintf(stderr
, "setting O_NONBLOCK\n");
891 /* Return zero if we detected no errors in starting up connections */
893 } /* ms_network_connect */
897 * reconnect a disconnected sock
899 * @param c, pointer of the concurrency
901 * @return int, if success, return 0, else return -1
903 static int ms_reconn(ms_conn_t
*c
)
905 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
907 int32_t srv_conn_cnt
= 0;
909 if (ms_setting
.rep_write_srv
> 0)
912 srv_conn_cnt
= ms_setting
.nconns
;
916 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
917 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
920 /* close the old socket handler */
922 c
->tcpsfd
[c
->cur_idx
]= 0;
924 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].disconn_cnt
, 1)
925 % (uint32_t)srv_conn_cnt
== 0)
927 gettimeofday(&ms_setting
.servers
[srv_idx
].disconn_time
, NULL
);
928 fprintf(stderr
, "Server %s:%d disconnect\n",
929 ms_setting
.servers
[srv_idx
].srv_host_name
,
930 ms_setting
.servers
[srv_idx
].srv_port
);
933 if (ms_setting
.rep_write_srv
> 0)
936 for (i
= 0; i
< c
->total_sfds
; i
++)
938 if (c
->tcpsfd
[i
] != 0)
944 /* all socks disconnect */
945 if (i
== c
->total_sfds
)
954 /* reconnect success, break the loop */
955 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
956 ms_setting
.servers
[srv_idx
].srv_port
,
957 ms_setting
.udp
, &c
->sfd
) == 0)
959 c
->tcpsfd
[c
->cur_idx
]= c
->sfd
;
960 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
961 % (uint32_t)srv_conn_cnt
== 0)
963 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
965 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
966 - ms_setting
.servers
[srv_idx
].disconn_time
968 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
969 ms_setting
.servers
[srv_idx
].srv_host_name
,
970 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
975 if (c
->total_sfds
== 1)
977 /* wait a second and reconnect */
981 while (c
->total_sfds
== 1);
984 if ((c
->total_sfds
> 1) && (c
->tcpsfd
[c
->cur_idx
] == 0))
995 * reconnect several disconnected socks in the connection
996 * structure, the ever-1-second timer of the thread will check
997 * whether some socks in the connections disconnect. if
998 * disconnect, reconnect the sock.
1000 * @param c, pointer of the concurrency
1002 * @return int, if success, return 0, else return -1
1004 int ms_reconn_socks(ms_conn_t
*c
)
1006 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
1009 int srv_conn_cnt
= 0;
1010 struct timeval cur_time
;
1014 if ((c
->total_sfds
== 1) || (c
->total_sfds
== c
->alive_sfds
))
1019 for (int i
= 0; i
< c
->total_sfds
; i
++)
1021 if (c
->tcpsfd
[i
] == 0)
1023 gettimeofday(&cur_time
, NULL
);
1026 * For failover test of replication, reconnect the socks after
1027 * it disconnects more than 5 seconds, Otherwise memslap will
1028 * block at connect() function and the work threads can't work
1032 - ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
< 5)
1037 if (ms_setting
.rep_write_srv
> 0)
1040 srv_conn_cnt
= ms_setting
.nconns
;
1044 srv_idx
= ms_thread
->thread_ctx
->srv_idx
;
1045 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
1048 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
1049 ms_setting
.servers
[srv_idx
].srv_port
,
1050 ms_setting
.udp
, &ret_sfd
) == 0)
1052 c
->tcpsfd
[i
]= ret_sfd
;
1055 if (atomic_add_32_nv(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
1056 % (uint32_t)srv_conn_cnt
== 0)
1058 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
1060 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
1061 - ms_setting
.servers
[srv_idx
].disconn_time
1063 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
1064 ms_setting
.servers
[srv_idx
].srv_host_name
,
1065 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
1072 } /* ms_reconn_socks */
1076 * Tokenize the command string by replacing whitespace with '\0' and update
1077 * the token array tokens with pointer to start of each token and length.
1078 * Returns total number of tokens. The last valid token is the terminal
1079 * token (value points to the first unprocessed character of the string and
1084 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1085 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1088 * ncommand = tokens[ix].value - command;
1089 * command = tokens[ix].value;
1092 * @param command, the command string to token
1093 * @param tokens, array to store tokens
1094 * @param max_tokens, maximum tokens number
1096 * @return int, the number of tokens
1098 static int ms_tokenize_command(char *command
,
1100 const int max_tokens
)
1105 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
1107 for (s
= e
= command
; ntokens
< max_tokens
- 1; ++e
)
1113 tokens
[ntokens
].value
= s
;
1114 tokens
[ntokens
].length
= (size_t)(e
- s
);
1120 else if (*e
== '\0')
1124 tokens
[ntokens
].value
= s
;
1125 tokens
[ntokens
].length
= (size_t)(e
- s
);
1129 break; /* string end */
1134 } /* ms_tokenize_command */
1138 * parse the response of server.
1140 * @param c, pointer of the concurrency
1141 * @param command, the string responded by server
1143 * @return int, if the command completed return 0, else return
1146 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
)
1150 char *buffer
= command
;
1155 * for command get, we store the returned value into local buffer
1156 * then continue in ms_complete_nread().
1161 case 'V': /* VALUE || VERSION */
1162 if (buffer
[1] == 'A') /* VALUE */
1164 token_t tokens
[MAX_TOKENS
];
1165 ms_tokenize_command(command
, tokens
, MAX_TOKENS
);
1166 value_len
= strtol(tokens
[VALUELEN_TOKEN
].value
, NULL
, 10);
1167 c
->currcmd
.key_prefix
= *(uint64_t *)tokens
[KEY_TOKEN
].value
;
1170 * We read the \r\n into the string since not doing so is more
1171 * cycles then the waster of memory to do so.
1173 * We are null terminating through, which will most likely make
1174 * some people lazy about using the return length.
1176 c
->rvbytes
= (int)(value_len
+ 2);
1184 c
->currcmd
.retstat
= MCD_SUCCESS
;
1186 case 'S': /* STORED STATS SERVER_ERROR */
1187 if (buffer
[2] == 'A') /* STORED STATS */
1189 c
->currcmd
.retstat
= MCD_STAT
;
1191 else if (buffer
[1] == 'E')
1194 printf("<%d %s\n", c
->sfd
, buffer
);
1196 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
1198 else if (buffer
[1] == 'T')
1201 c
->currcmd
.retstat
= MCD_STORED
;
1205 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1209 case 'D': /* DELETED DATA */
1210 if (buffer
[1] == 'E')
1212 c
->currcmd
.retstat
= MCD_DELETED
;
1216 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1221 case 'N': /* NOT_FOUND NOT_STORED*/
1222 if (buffer
[4] == 'F')
1224 c
->currcmd
.retstat
= MCD_NOTFOUND
;
1226 else if (buffer
[4] == 'S')
1228 printf("<%d %s\n", c
->sfd
, buffer
);
1229 c
->currcmd
.retstat
= MCD_NOTSTORED
;
1233 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1237 case 'E': /* PROTOCOL ERROR or END */
1238 if (buffer
[1] == 'N')
1241 c
->currcmd
.retstat
= MCD_END
;
1243 else if (buffer
[1] == 'R')
1245 printf("<%d ERROR\n", c
->sfd
);
1246 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
1248 else if (buffer
[1] == 'X')
1250 c
->currcmd
.retstat
= MCD_DATA_EXISTS
;
1251 printf("<%d %s\n", c
->sfd
, buffer
);
1255 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1259 case 'C': /* CLIENT ERROR */
1260 printf("<%d %s\n", c
->sfd
, buffer
);
1261 c
->currcmd
.retstat
= MCD_CLIENT_ERROR
;
1265 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1270 } /* ms_ascii_process_line */
1274 * after one operation completes, reset the concurrency
1276 * @param c, pointer of the concurrency
1277 * @param timeout, whether it's timeout
1279 void ms_reset_conn(ms_conn_t
*c
, bool timeout
)
1285 if ((c
->packets
> 0) && (c
->packets
< MAX_UDP_PACKET
))
1287 memset(c
->udppkt
, 0, sizeof(ms_udppkt_t
) * (size_t)c
->packets
);
1296 c
->currcmd
.isfinish
= true;
1300 ms_conn_set_state(c
, conn_write
);
1301 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
1305 ms_drive_machine(c
);
1307 } /* ms_reset_conn */
1311 * if we have a complete line in the buffer, process it.
1313 * @param c, pointer of the concurrency
1315 * @return int, if success, return 0, else return -1
1317 static int ms_try_read_line(ms_conn_t
*c
)
1319 if (c
->protocol
== binary_prot
)
1321 /* Do we have the complete packet header? */
1322 if ((uint64_t)c
->rbytes
< sizeof(c
->binary_header
))
1324 /* need more data! */
1330 if (((long)(c
->rcurr
)) % 8 != 0)
1332 /* must realign input buffer */
1333 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1335 if (settings
.verbose
)
1337 fprintf(stderr
, "%d: Realign input buffer.\n", c
->sfd
);
1341 protocol_binary_response_header
*rsp
;
1342 rsp
= (protocol_binary_response_header
*)c
->rcurr
;
1344 c
->binary_header
= *rsp
;
1345 c
->binary_header
.response
.extlen
= rsp
->response
.extlen
;
1346 c
->binary_header
.response
.keylen
= ntohs(rsp
->response
.keylen
);
1347 c
->binary_header
.response
.bodylen
= ntohl(rsp
->response
.bodylen
);
1348 c
->binary_header
.response
.status
= ntohs(rsp
->response
.status
);
1350 if (c
->binary_header
.response
.magic
!= PROTOCOL_BINARY_RES
)
1352 fprintf(stderr
, "Invalid magic: %x\n",
1353 c
->binary_header
.response
.magic
);
1354 ms_conn_set_state(c
, conn_closing
);
1358 /* process this complete response */
1359 if (ms_bin_process_response(c
) == 0)
1361 /* current operation completed */
1362 ms_reset_conn(c
, false);
1367 c
->rbytes
-= (int32_t)sizeof(c
->binary_header
);
1368 c
->rcurr
+= sizeof(c
->binary_header
);
1377 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1382 el
= memchr(c
->rcurr
, '\n', (size_t)c
->rbytes
);
1387 if (((el
- c
->rcurr
) > 1) && (*(el
- 1) == '\r'))
1393 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
1395 /* process this complete line */
1396 if (ms_ascii_process_line(c
, c
->rcurr
) == 0)
1398 /* current operation completed */
1399 ms_reset_conn(c
, false);
1404 /* current operation didn't complete */
1405 c
->rbytes
-= (int32_t)(cont
- c
->rcurr
);
1409 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1413 } /* ms_try_read_line */
1417 * because the packet of UDP can't ensure the order, the
1418 * function is used to sort the received udp packet.
1420 * @param c, pointer of the concurrency
1421 * @param buf, the buffer to store the ordered packages data
1422 * @param rbytes, the maximum capacity of the buffer
1424 * @return int, if success, return the copy bytes, else return
1427 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
)
1432 uint16_t seq_num
= 0;
1433 uint16_t packets
= 0;
1434 unsigned char *header
= NULL
;
1436 /* no enough data */
1438 assert(buf
!= NULL
);
1439 assert(c
->rudpbytes
>= UDP_HEADER_SIZE
);
1441 /* calculate received packets count */
1442 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
>= UDP_HEADER_SIZE
)
1444 /* the last packet has some data */
1445 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
+ 1;
1449 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
;
1452 /* get the total packets count if necessary */
1453 if (c
->packets
== 0)
1455 c
->packets
= HEADER_TO_PACKETS((unsigned char *)c
->rudpbuf
);
1458 /* build the ordered packet array */
1459 for (int i
= c
->pktcurr
; i
< c
->recvpkt
; i
++)
1461 header
= (unsigned char *)c
->rudpbuf
+ i
* UDP_MAX_PAYLOAD_SIZE
;
1462 req_id
= (uint16_t)HEADER_TO_REQID(header
);
1463 assert(req_id
== c
->request_id
% (1 << 16));
1465 packets
= (uint16_t)HEADER_TO_PACKETS(header
);
1466 assert(c
->packets
== HEADER_TO_PACKETS(header
));
1468 seq_num
= (uint16_t)HEADER_TO_SEQNUM(header
);
1469 c
->udppkt
[seq_num
].header
= header
;
1470 c
->udppkt
[seq_num
].data
= (char *)header
+ UDP_HEADER_SIZE
;
1472 if (i
== c
->recvpkt
- 1)
1474 /* last received packet */
1475 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
== 0)
1477 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1482 c
->udppkt
[seq_num
].rbytes
= c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
1488 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1493 for (int i
= c
->ordcurr
; i
< c
->recvpkt
; i
++)
1495 /* there is some data to copy */
1496 if ((c
->udppkt
[i
].data
!= NULL
)
1497 && (c
->udppkt
[i
].copybytes
< c
->udppkt
[i
].rbytes
))
1499 header
= c
->udppkt
[i
].header
;
1500 len
= c
->udppkt
[i
].rbytes
- c
->udppkt
[i
].copybytes
;
1501 if (len
> rbytes
- wbytes
)
1503 len
= rbytes
- wbytes
;
1506 assert(len
<= rbytes
- wbytes
);
1507 assert(i
== HEADER_TO_SEQNUM(header
));
1509 memcpy(buf
+ wbytes
, c
->udppkt
[i
].data
+ c
->udppkt
[i
].copybytes
,
1512 c
->udppkt
[i
].copybytes
+= len
;
1514 if ((c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
)
1515 && (c
->udppkt
[i
].rbytes
== UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1517 /* finish copying all the data of this packet, next */
1521 /* last received packet, and finish copying all the data */
1522 if ((c
->recvpkt
== c
->packets
) && (i
== c
->recvpkt
- 1)
1523 && (c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
))
1528 /* no space to copy data */
1529 if (wbytes
>= rbytes
)
1534 /* it doesn't finish reading all the data of the packet from network */
1535 if ((i
!= c
->recvpkt
- 1)
1536 && (c
->udppkt
[i
].rbytes
< UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1543 /* no data to copy */
1548 return wbytes
== 0 ? -1 : wbytes
;
1549 } /* ms_sort_udp_packet */
1553 * encapsulate upd read like tcp read
1555 * @param c, pointer of the concurrency
1556 * @param buf, read buffer
1557 * @param len, length to read
1559 * @return int, if success, return the read bytes, else return
1562 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
)
1573 if (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
> c
->rudpsize
)
1575 char *new_rbuf
= realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
1578 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1579 c
->rudpbytes
= 0; /* ignore what we read */
1582 c
->rudpbuf
= new_rbuf
;
1586 avail
= c
->rudpsize
- c
->rudpbytes
;
1587 /* UDP each time read a packet, 1400 bytes */
1588 res
= (int)read(c
->sfd
, c
->rudpbuf
+ c
->rudpbytes
, (size_t)avail
);
1592 atomic_add_size(&ms_stats
.bytes_read
, res
);
1607 /* "connection" closed */
1613 /* no data to read */
1618 /* copy data to read buffer */
1621 copybytes
= ms_sort_udp_packet(c
, buf
, len
);
1624 if (copybytes
== -1)
1626 atomic_add_size(&ms_stats
.pkt_disorder
, 1);
1634 * read from network as much as we can, handle buffer overflow and connection
1636 * before reading, move the remaining incomplete fragment of a command
1637 * (if any) to the beginning of the buffer.
1638 * return 0 if there's nothing to read on the first read.
1642 * read from network as much as we can, handle buffer overflow and connection
1643 * close. before reading, move the remaining incomplete fragment of a command
1644 * (if any) to the beginning of the buffer.
1646 * @param c, pointer of the concurrency
1649 * return 0 if there's nothing to read on the first read.
1650 * return 1 if get data
1651 * return -1 if error happens
1653 static int ms_try_read_network(ms_conn_t
*c
)
1661 if ((c
->rcurr
!= c
->rbuf
)
1662 && (! c
->readval
|| (c
->rvbytes
> c
->rsize
- (c
->rcurr
- c
->rbuf
))
1663 || (c
->readval
&& (c
->rcurr
- c
->rbuf
> c
->rbytes
))))
1665 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
1666 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
1672 if (c
->rbytes
>= c
->rsize
)
1674 char *new_rbuf
= realloc(c
->rbuf
, (size_t)c
->rsize
* 2);
1677 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1678 c
->rbytes
= 0; /* ignore what we read */
1681 c
->rcurr
= c
->rbuf
= new_rbuf
;
1685 avail
= c
->rsize
- c
->rbytes
- (c
->rcurr
- c
->rbuf
);
1693 res
= (int32_t)ms_udp_read(c
, c
->rcurr
+ c
->rbytes
, (int32_t)avail
);
1697 res
= (int)read(c
->sfd
, c
->rcurr
+ c
->rbytes
, (size_t)avail
);
1704 atomic_add_size(&ms_stats
.bytes_read
, res
);
1719 /* connection closed */
1720 ms_conn_set_state(c
, conn_closing
);
1725 if ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
))
1727 /* Should close on unhandled errors. */
1728 ms_conn_set_state(c
, conn_closing
);
1734 } /* ms_try_read_network */
1738 * after get the object from server, verify the value if
1741 * @param c, pointer of the concurrency
1742 * @param mlget_item, pointer of mulit-get task item structure
1743 * @param value, received value string
1744 * @param vlen, received value string length
1746 static void ms_verify_value(ms_conn_t
*c
,
1747 ms_mlget_task_item_t
*mlget_item
,
1751 if (c
->curr_task
.verify
)
1753 assert(c
->curr_task
.item
->value_offset
!= INVALID_OFFSET
);
1754 char *orignval
= &ms_setting
.char_block
[c
->curr_task
.item
->value_offset
];
1756 &ms_setting
.char_block
[c
->curr_task
.item
->key_suffix_offset
];
1758 /* verify expire time if necessary */
1759 if (c
->curr_task
.item
->exp_time
> 0)
1761 struct timeval curr_time
;
1762 gettimeofday(&curr_time
, NULL
);
1764 /* object expired but get it now */
1765 if (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
1766 > c
->curr_task
.item
->exp_time
+ EXPIRE_TIME_ERROR
)
1768 atomic_add_size(&ms_stats
.exp_get
, 1);
1770 if (ms_setting
.verbose
)
1774 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
1775 localtime(&c
->curr_task
.item
->client_time
));
1776 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
1777 localtime(&curr_time
.tv_sec
));
1779 "\n<%d expire time verification failed, "
1780 "object expired but get it now\n"
1782 "\tkey: %" PRIx64
" %.*s\n"
1783 "\tset time: %s current time: %s "
1784 "diff time: %d expire time: %d\n"
1785 "\texpected data: \n"
1786 "\treceived data len: %d\n"
1787 "\treceived data: %.*s\n",
1789 c
->curr_task
.item
->key_size
,
1790 c
->curr_task
.item
->key_prefix
,
1791 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1795 (int)(curr_time
.tv_sec
- c
->curr_task
.item
->client_time
),
1796 c
->curr_task
.item
->exp_time
,
1806 if ((c
->curr_task
.item
->value_size
!= vlen
)
1807 || (memcmp(orignval
, value
, (size_t)vlen
) != 0))
1809 atomic_add_size(&ms_stats
.vef_failed
, 1);
1811 if (ms_setting
.verbose
)
1814 "\n<%d data verification failed\n"
1816 "\tkey: %" PRIx64
" %.*s\n"
1817 "\texpected data len: %d\n"
1818 "\texpected data: %.*s\n"
1819 "\treceived data len: %d\n"
1820 "\treceived data: %.*s\n",
1822 c
->curr_task
.item
->key_size
,
1823 c
->curr_task
.item
->key_prefix
,
1824 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1826 c
->curr_task
.item
->value_size
,
1827 c
->curr_task
.item
->value_size
,
1837 c
->curr_task
.finish_verify
= true;
1839 if (mlget_item
!= NULL
)
1841 mlget_item
->finish_verify
= true;
1844 } /* ms_verify_value */
1848 * For ASCII protocol, after store the data into the local
1849 * buffer, run this function to handle the data.
1851 * @param c, pointer of the concurrency
1853 static void ms_ascii_complete_nread(ms_conn_t
*c
)
1856 assert(c
->rbytes
>= c
->rvbytes
);
1857 assert(c
->protocol
== ascii_udp_prot
|| c
->protocol
== ascii_prot
);
1861 c
->rcurr
[c
->rvbytes
- 1] == '\n' && c
->rcurr
[c
->rvbytes
- 2] == '\r');
1865 ms_mlget_task_item_t
*mlget_item
= NULL
;
1866 if (((ms_setting
.mult_key_num
> 1)
1867 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1868 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1870 c
->mlget_task
.value_index
++;
1871 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1873 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1875 c
->curr_task
.item
= mlget_item
->item
;
1876 c
->curr_task
.verify
= mlget_item
->verify
;
1877 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1878 mlget_item
->get_miss
= false;
1882 /* Try to find the task item in multi-get task array */
1883 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
1885 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
1886 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1888 c
->curr_task
.item
= mlget_item
->item
;
1889 c
->curr_task
.verify
= mlget_item
->verify
;
1890 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1891 mlget_item
->get_miss
= false;
1899 ms_verify_value(c
, mlget_item
, c
->rcurr
, c
->rvbytes
- 2);
1901 c
->curr_task
.get_miss
= false;
1902 c
->rbytes
-= c
->rvbytes
;
1903 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1904 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1907 } /* ms_ascii_complete_nread */
1911 * For binary protocol, after store the data into the local
1912 * buffer, run this function to handle the data.
1914 * @param c, pointer of the concurrency
1916 static void ms_bin_complete_nread(ms_conn_t
*c
)
1919 assert(c
->rbytes
>= c
->rvbytes
);
1920 assert(c
->protocol
== binary_prot
);
1922 int extlen
= c
->binary_header
.response
.extlen
;
1923 int keylen
= c
->binary_header
.response
.keylen
;
1924 uint8_t opcode
= c
->binary_header
.response
.opcode
;
1926 /* not get command or not include value, just return */
1927 if (((opcode
!= PROTOCOL_BINARY_CMD_GET
)
1928 && (opcode
!= PROTOCOL_BINARY_CMD_GETQ
))
1929 || (c
->rvbytes
<= extlen
+ keylen
))
1932 if (c
->binary_header
.response
.opcode
== PROTOCOL_BINARY_CMD_GET
)
1934 c
->currcmd
.retstat
= MCD_END
;
1935 c
->curr_task
.get_miss
= true;
1940 ms_reset_conn(c
, false);
1945 ms_mlget_task_item_t
*mlget_item
= NULL
;
1946 if (((ms_setting
.mult_key_num
> 1)
1947 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1948 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1950 c
->mlget_task
.value_index
++;
1951 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1953 c
->curr_task
.item
= mlget_item
->item
;
1954 c
->curr_task
.verify
= mlget_item
->verify
;
1955 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1956 mlget_item
->get_miss
= false;
1961 c
->rcurr
+ extlen
+ keylen
,
1962 c
->rvbytes
- extlen
- keylen
);
1964 c
->currcmd
.retstat
= MCD_END
;
1965 c
->curr_task
.get_miss
= false;
1966 c
->rbytes
-= c
->rvbytes
;
1967 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1968 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1972 if (ms_setting
.mult_key_num
> 1)
1974 /* multi-get have check all the item */
1975 if (c
->mlget_task
.value_index
== c
->mlget_task
.mlget_num
- 1)
1977 ms_reset_conn(c
, false);
1983 ms_reset_conn(c
, false);
1985 } /* ms_bin_complete_nread */
1989 * we get here after reading the value of get commands.
1991 * @param c, pointer of the concurrency
1993 static void ms_complete_nread(ms_conn_t
*c
)
1996 assert(c
->rbytes
>= c
->rvbytes
);
1997 assert(c
->protocol
== ascii_udp_prot
1998 || c
->protocol
== ascii_prot
1999 || c
->protocol
== binary_prot
);
2001 if (c
->protocol
== binary_prot
)
2003 ms_bin_complete_nread(c
);
2007 ms_ascii_complete_nread(c
);
2009 } /* ms_complete_nread */
2013 * Adds a message header to a connection.
2015 * @param c, pointer of the concurrency
2017 * @return int, if success, return 0, else return -1
2019 static int ms_add_msghdr(ms_conn_t
*c
)
2025 if (c
->msgsize
== c
->msgused
)
2028 realloc(c
->msglist
, (size_t)c
->msgsize
* 2 * sizeof(struct msghdr
));
2036 msg
= c
->msglist
+ c
->msgused
;
2039 * this wipes msg_iovlen, msg_control, msg_controllen, and
2040 * msg_flags, the last 3 of which aren't defined on solaris:
2042 memset(msg
, 0, sizeof(struct msghdr
));
2044 msg
->msg_iov
= &c
->iov
[c
->iovused
];
2046 if (c
->udp
&& (c
->srv_recv_addr_size
> 0))
2048 msg
->msg_name
= &c
->srv_recv_addr
;
2049 msg
->msg_namelen
= c
->srv_recv_addr_size
;
2057 /* Leave room for the UDP header, which we'll fill in later. */
2058 return ms_add_iov(c
, NULL
, UDP_HEADER_SIZE
);
2062 } /* ms_add_msghdr */
2066 * Ensures that there is room for another structure iovec in a connection's
2069 * @param c, pointer of the concurrency
2071 * @return int, if success, return 0, else return -1
2073 static int ms_ensure_iov_space(ms_conn_t
*c
)
2077 if (c
->iovused
>= c
->iovsize
)
2080 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
2083 * sizeof(struct iovec
));
2090 /* Point all the msghdr structures at the new list. */
2091 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++)
2093 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
2094 iovnum
+= (int)c
->msglist
[i
].msg_iovlen
;
2099 } /* ms_ensure_iov_space */
2103 * Adds data to the list of pending data that will be written out to a
2106 * @param c, pointer of the concurrency
2107 * @param buf, the buffer includes data to send
2108 * @param len, the data length in the buffer
2110 * @return int, if success, return 0, else return -1
2112 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
)
2122 m
= &c
->msglist
[c
->msgused
- 1];
2125 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2127 limit_to_mtu
= c
->udp
;
2129 /* We may need to start a new msghdr if this one is full. */
2130 if ((m
->msg_iovlen
== IOV_MAX
)
2131 || (limit_to_mtu
&& (c
->msgbytes
>= UDP_MAX_SEND_PAYLOAD_SIZE
)))
2134 m
= &c
->msglist
[c
->msgused
- 1];
2137 if (ms_ensure_iov_space(c
) != 0)
2140 /* If the fragment is too big to fit in the datagram, split it up */
2141 if (limit_to_mtu
&& (len
+ c
->msgbytes
> UDP_MAX_SEND_PAYLOAD_SIZE
))
2143 leftover
= len
+ c
->msgbytes
- UDP_MAX_SEND_PAYLOAD_SIZE
;
2151 m
= &c
->msglist
[c
->msgused
- 1];
2152 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
2153 m
->msg_iov
[m
->msg_iovlen
].iov_len
= (size_t)len
;
2159 buf
= ((char *)buf
) + len
;
2162 while (leftover
> 0);
2169 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2171 * @param c, pointer of the concurrency
2173 * @return int, if success, return 0, else return -1
2175 static int ms_build_udp_headers(ms_conn_t
*c
)
2182 c
->request_id
= ms_get_udp_request_id();
2184 if (c
->msgused
> c
->hdrsize
)
2188 new_hdrbuf
= realloc(c
->hdrbuf
,
2189 (size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2191 new_hdrbuf
= malloc((size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2195 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
2196 c
->hdrsize
= c
->msgused
* 2;
2199 /* If this is a multi-packet request, drop it. */
2200 if (c
->udp
&& (c
->msgused
> 1))
2202 fprintf(stderr
, "multi-packet request for UDP not supported.\n");
2207 for (i
= 0; i
< c
->msgused
; i
++)
2209 c
->msglist
[i
].msg_iov
[0].iov_base
= (void *)hdr
;
2210 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
2211 *hdr
++= (unsigned char)(c
->request_id
/ 256);
2212 *hdr
++= (unsigned char)(c
->request_id
% 256);
2213 *hdr
++= (unsigned char)(i
/ 256);
2214 *hdr
++= (unsigned char)(i
% 256);
2215 *hdr
++= (unsigned char)(c
->msgused
/ 256);
2216 *hdr
++= (unsigned char)(c
->msgused
% 256);
2217 *hdr
++= (unsigned char)1; /* support facebook memcached */
2218 *hdr
++= (unsigned char)0;
2220 ((unsigned char *)c
->msglist
[i
].msg_iov
[0].iov_base
2221 + UDP_HEADER_SIZE
));
2225 } /* ms_build_udp_headers */
2229 * Transmit the next chunk of data from our list of msgbuf structures.
2231 * @param c, pointer of the concurrency
2233 * @return TRANSMIT_COMPLETE All done writing.
2234 * TRANSMIT_INCOMPLETE More data remaining to write.
2235 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2236 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2238 static int ms_transmit(ms_conn_t
*c
)
2242 if ((c
->msgcurr
< c
->msgused
)
2243 && (c
->msglist
[c
->msgcurr
].msg_iovlen
== 0))
2245 /* Finished writing the current msg; advance to the next. */
2249 if (c
->msgcurr
< c
->msgused
)
2252 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
2254 res
= sendmsg(c
->sfd
, m
, 0);
2257 atomic_add_size(&ms_stats
.bytes_written
, res
);
2259 /* We've written some of the data. Remove the completed
2260 * iovec entries from the list of pending writes. */
2261 while (m
->msg_iovlen
> 0 && res
>= (ssize_t
)m
->msg_iov
->iov_len
)
2263 res
-= (ssize_t
)m
->msg_iov
->iov_len
;
2268 /* Might have written just part of the last iovec entry;
2269 * adjust it so the next write will do the rest. */
2272 m
->msg_iov
->iov_base
= (void *)((unsigned char *)m
->msg_iov
->iov_base
+ res
);
2273 m
->msg_iov
->iov_len
-= (size_t)res
;
2275 return TRANSMIT_INCOMPLETE
;
2277 if ((res
== -1) && ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
)))
2279 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2281 fprintf(stderr
, "Couldn't update event.\n");
2282 ms_conn_set_state(c
, conn_closing
);
2283 return TRANSMIT_HARD_ERROR
;
2285 return TRANSMIT_SOFT_ERROR
;
2288 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2289 * we have a real error, on which we close the connection */
2290 fprintf(stderr
, "Failed to write, and not due to blocking.\n");
2292 ms_conn_set_state(c
, conn_closing
);
2293 return TRANSMIT_HARD_ERROR
;
2297 return TRANSMIT_COMPLETE
;
2303 * Shrinks a connection's buffers if they're too big. This prevents
2304 * periodic large "mget" response from server chewing lots of client
2307 * This should only be called in between requests since it can wipe output
2310 * @param c, pointer of the concurrency
2312 static void ms_conn_shrink(ms_conn_t
*c
)
2319 if ((c
->rsize
> READ_BUFFER_HIGHWAT
) && (c
->rbytes
< DATA_BUFFER_SIZE
))
2323 if (c
->rcurr
!= c
->rbuf
)
2324 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
2326 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
2331 c
->rsize
= DATA_BUFFER_SIZE
;
2336 if (c
->udp
&& (c
->rudpsize
> UDP_DATA_BUFFER_HIGHWAT
)
2337 && (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
< UDP_DATA_BUFFER_SIZE
))
2339 char *new_rbuf
= (char *)realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
2342 c
->rudpbuf
= new_rbuf
;
2343 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
2345 /* TODO check error condition? */
2348 if (c
->msgsize
> MSG_LIST_HIGHWAT
)
2350 struct msghdr
*newbuf
= (struct msghdr
*)realloc(
2353 * sizeof(c
->msglist
[0]));
2357 c
->msgsize
= MSG_LIST_INITIAL
;
2359 /* TODO check error condition? */
2362 if (c
->iovsize
> IOV_LIST_HIGHWAT
)
2364 struct iovec
*newbuf
= (struct iovec
*)realloc((void *)c
->iov
,
2366 * sizeof(c
->iov
[0]));
2370 c
->iovsize
= IOV_LIST_INITIAL
;
2372 /* TODO check return value */
2374 } /* ms_conn_shrink */
2378 * Sets a connection's current state in the state machine. Any special
2379 * processing that needs to happen on certain state transitions can
2382 * @param c, pointer of the concurrency
2383 * @param state, connection state
2385 static void ms_conn_set_state(ms_conn_t
*c
, int state
)
2389 if (state
!= c
->state
)
2391 if (state
== conn_read
)
2397 } /* ms_conn_set_state */
2401 * update the event if socks change state. for example: when
2402 * change the listen scoket read event to sock write event, or
2403 * change socket handler, we could call this function.
2405 * @param c, pointer of the concurrency
2406 * @param new_flags, new event flags
2408 * @return bool, if success, return true, else return false
2410 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
)
2412 /* default event timeout 10 seconds */
2415 .tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0
2420 struct event_base
*base
= c
->event
.ev_base
;
2421 if ((c
->ev_flags
== new_flags
) && (ms_setting
.rep_write_srv
== 0)
2422 && (! ms_setting
.facebook_test
|| (c
->total_sfds
== 1)))
2427 if (event_del(&c
->event
) == -1)
2429 /* try to delete the event again */
2430 if (event_del(&c
->event
) == -1)
2436 event_set(&c
->event
,
2441 event_base_set(base
, &c
->event
);
2442 c
->ev_flags
= (short)new_flags
;
2444 if (c
->total_sfds
== 1)
2446 if (event_add(&c
->event
, NULL
) == -1)
2453 if (event_add(&c
->event
, &t
) == -1)
2460 } /* ms_update_event */
2464 * If user want to get the expected throughput, we could limit
2465 * the performance of memslap. we could give up some work and
2466 * just wait a short time. The function is used to check this
2469 * @param c, pointer of the concurrency
2471 * @return bool, if success, return true, else return false
2473 static bool ms_need_yield(ms_conn_t
*c
)
2475 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
2477 int64_t time_diff
= 0;
2478 struct timeval curr_time
;
2479 ms_task_t
*task
= &c
->curr_task
;
2481 if (ms_setting
.expected_tps
> 0)
2483 gettimeofday(&curr_time
, NULL
);
2484 time_diff
= ms_time_diff(&ms_thread
->startup_time
, &curr_time
);
2486 (int64_t)((task
->get_opt
2487 + task
->set_opt
) / ((uint64_t)time_diff
/ 1000000));
2489 /* current throughput is greater than expected throughput */
2490 if (tps
> ms_thread
->thread_ctx
->tps_perconn
)
2497 } /* ms_need_yield */
2501 * used to update the start time of each operation
2503 * @param c, pointer of the concurrency
2505 static void ms_update_start_time(ms_conn_t
*c
)
2507 ms_task_item_t
*item
= c
->curr_task
.item
;
2509 if ((ms_setting
.stat_freq
> 0) || c
->udp
2510 || ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0)))
2512 gettimeofday(&c
->start_time
, NULL
);
2513 if ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0))
2515 /* record the current time */
2516 item
->client_time
= c
->start_time
.tv_sec
;
2519 } /* ms_update_start_time */
2523 * run the state machine
2525 * @param c, pointer of the concurrency
2527 static void ms_drive_machine(ms_conn_t
*c
)
2540 if (c
->rbytes
>= c
->rvbytes
)
2542 ms_complete_nread(c
);
2548 if (ms_try_read_line(c
) != 0)
2554 if (ms_try_read_network(c
) != 0)
2559 /* doesn't read all the response data, wait event wake up */
2560 if (! c
->currcmd
.isfinish
)
2562 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2564 fprintf(stderr
, "Couldn't update event.\n");
2565 ms_conn_set_state(c
, conn_closing
);
2572 /* we have no command line and no data to read from network, next write */
2573 ms_conn_set_state(c
, conn_write
);
2574 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
2579 if (! c
->ctnwrite
&& ms_need_yield(c
))
2583 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2585 fprintf(stderr
, "Couldn't update event.\n");
2586 ms_conn_set_state(c
, conn_closing
);
2593 if (! c
->ctnwrite
&& (ms_exec_task(c
) != 0))
2595 ms_conn_set_state(c
, conn_closing
);
2599 /* record the start time before starting to send data if necessary */
2600 if (! c
->ctnwrite
|| (c
->change_sfd
&& c
->ctnwrite
))
2604 c
->change_sfd
= false;
2606 ms_update_start_time(c
);
2609 /* change sfd if necessary */
2617 /* execute task until nothing need be written to network */
2618 if (! c
->ctnwrite
&& (c
->msgcurr
== c
->msgused
))
2620 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2622 fprintf(stderr
, "Couldn't update event.\n");
2623 ms_conn_set_state(c
, conn_closing
);
2630 switch (ms_transmit(c
))
2632 case TRANSMIT_COMPLETE
:
2633 /* we have no data to write to network, next wait repose */
2634 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2636 fprintf(stderr
, "Couldn't update event.\n");
2637 ms_conn_set_state(c
, conn_closing
);
2641 ms_conn_set_state(c
, conn_read
);
2646 case TRANSMIT_INCOMPLETE
:
2648 break; /* Continue in state machine. */
2650 case TRANSMIT_HARD_ERROR
:
2654 case TRANSMIT_SOFT_ERROR
:
2666 /* recovery mode, need reconnect if connection close */
2667 if (ms_setting
.reconnect
&& (! ms_global
.time_out
2668 || ((ms_setting
.run_time
== 0)
2669 && (c
->remain_exec_num
> 0))))
2671 if (ms_reconn(c
) != 0)
2678 ms_reset_conn(c
, false);
2680 if (c
->total_sfds
== 1)
2682 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2684 fprintf(stderr
, "Couldn't update event.\n");
2685 ms_conn_set_state(c
, conn_closing
);
2703 } /* ms_drive_machine */
2707 * the event handler of each thread
2709 * @param fd, the file descriptor of socket
2710 * @param which, event flag
2711 * @param arg, argument
2713 void ms_event_handler(const int fd
, const short which
, void *arg
)
2715 ms_conn_t
*c
= (ms_conn_t
*)arg
;
2725 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2731 assert(fd
== c
->sfd
);
2733 /* event timeout, close the current connection */
2734 if (c
->which
== EV_TIMEOUT
)
2736 ms_conn_set_state(c
, conn_closing
);
2739 ms_drive_machine(c
);
2741 /* wait for next event */
2742 } /* ms_event_handler */
2746 * get the next socket descriptor index to run for replication
2748 * @param c, pointer of the concurrency
2749 * @param cmd, command(get or set )
2751 * @return int, if success, return the index, else return 0
2753 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
)
2758 if (c
->total_sfds
== 1)
2763 if (ms_setting
.rep_write_srv
== 0)
2772 for (i
= 0; i
< ms_setting
.rep_write_srv
; i
++)
2774 if (c
->tcpsfd
[i
] > 0)
2780 if (i
== ms_setting
.rep_write_srv
)
2782 /* random get one replication server to read */
2783 sock_index
= (int)(random() % c
->total_sfds
);
2787 /* random get one replication writing server to write */
2788 sock_index
= (int)(random() % ms_setting
.rep_write_srv
);
2791 else if (cmd
== CMD_GET
)
2793 /* random get one replication server to read */
2794 sock_index
= (int)(random() % c
->total_sfds
);
2797 while (c
->tcpsfd
[sock_index
] == 0);
2800 } /* ms_get_rep_sock_index */
2804 * get the next socket descriptor index to run
2806 * @param c, pointer of the concurrency
2808 * @return int, return the index
2810 static int ms_get_next_sock_index(ms_conn_t
*c
)
2816 sock_index
= (++c
->cur_idx
== c
->total_sfds
) ? 0 : c
->cur_idx
;
2818 while (c
->tcpsfd
[sock_index
] == 0);
2821 } /* ms_get_next_sock_index */
2825 * update socket event of the connections
2827 * @param c, pointer of the concurrency
2829 * @return int, if success, return 0, else return -1
2831 static int ms_update_conn_sock_event(ms_conn_t
*c
)
2835 switch (c
->currcmd
.cmd
)
2838 if (ms_setting
.facebook_test
&& c
->udp
)
2840 c
->sfd
= c
->tcpsfd
[0];
2842 c
->change_sfd
= true;
2847 if (ms_setting
.facebook_test
&& ! c
->udp
)
2851 c
->change_sfd
= true;
2859 if (! c
->udp
&& (c
->total_sfds
> 1))
2861 if (c
->cur_idx
!= c
->total_sfds
)
2863 if (ms_setting
.rep_write_srv
== 0)
2865 c
->cur_idx
= ms_get_next_sock_index(c
);
2869 c
->cur_idx
= ms_get_rep_sock_index(c
, c
->currcmd
.cmd
);
2874 /* must select the first sock of the connection at the beginning */
2878 c
->sfd
= c
->tcpsfd
[c
->cur_idx
];
2879 assert(c
->sfd
!= 0);
2880 c
->change_sfd
= true;
2885 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2887 fprintf(stderr
, "Couldn't update event.\n");
2888 ms_conn_set_state(c
, conn_closing
);
2894 } /* ms_update_conn_sock_event */
2898 * for ASCII protocol, this function build the set command
2899 * string and send the command.
2901 * @param c, pointer of the concurrency
2902 * @param item, pointer of task item which includes the object
2905 * @return int, if success, return 0, else return -1
2907 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2911 char *buffer
= c
->wbuf
;
2913 write_len
= sprintf(buffer
,
2919 if (write_len
> c
->wsize
)
2921 /* ought to be always enough. just fail for simplicity */
2922 fprintf(stderr
, "output command line too long.\n");
2926 if (item
->value_offset
== INVALID_OFFSET
)
2928 value_offset
= item
->key_suffix_offset
;
2932 value_offset
= item
->value_offset
;
2935 if ((ms_add_iov(c
, "set ", 4) != 0)
2936 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
2937 (int)KEY_PREFIX_SIZE
) != 0)
2938 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2939 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
2940 || (ms_add_iov(c
, buffer
, write_len
) != 0)
2941 || (ms_add_iov(c
, &ms_setting
.char_block
[value_offset
],
2942 item
->value_size
) != 0)
2943 || (ms_add_iov(c
, "\r\n", 2) != 0)
2944 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
2950 } /* ms_build_ascii_write_buf_set */
2954 * used to send set command to server
2956 * @param c, pointer of the concurrency
2957 * @param item, pointer of task item which includes the object
2960 * @return int, if success, return 0, else return -1
2962 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2966 c
->currcmd
.cmd
= CMD_SET
;
2967 c
->currcmd
.isfinish
= false;
2968 c
->currcmd
.retstat
= MCD_FAILURE
;
2970 if (ms_update_conn_sock_event(c
) != 0)
2978 if (ms_add_msghdr(c
) != 0)
2980 fprintf(stderr
, "Out of memory preparing request.");
2984 /* binary protocol */
2985 if (c
->protocol
== binary_prot
)
2987 if (ms_build_bin_write_buf_set(c
, item
) != 0)
2994 if (ms_build_ascii_write_buf_set(c
, item
) != 0)
3000 atomic_add_size(&ms_stats
.obj_bytes
,
3001 item
->key_size
+ item
->value_size
);
3002 atomic_add_size(&ms_stats
.cmd_set
, 1);
3009 * for ASCII protocol, this function build the get command
3010 * string and send the command.
3012 * @param c, pointer of the concurrency
3013 * @param item, pointer of task item which includes the object
3016 * @return int, if success, return 0, else return -1
3018 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3020 if ((ms_add_iov(c
, "get ", 4) != 0)
3021 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
3022 (int)KEY_PREFIX_SIZE
) != 0)
3023 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3024 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
3025 || (ms_add_iov(c
, "\r\n", 2) != 0)
3026 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3032 } /* ms_build_ascii_write_buf_get */
3036 * used to send the get command to server
3038 * @param c, pointer of the concurrency
3039 * @param item, pointer of task item which includes the object
3041 * @param verify, whether do verification
3043 * @return int, if success, return 0, else return -1
3045 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
, bool verify
)
3047 /* verify not supported yet */
3048 UNUSED_ARGUMENT(verify
);
3052 c
->currcmd
.cmd
= CMD_GET
;
3053 c
->currcmd
.isfinish
= false;
3054 c
->currcmd
.retstat
= MCD_FAILURE
;
3056 if (ms_update_conn_sock_event(c
) != 0)
3064 if (ms_add_msghdr(c
) != 0)
3066 fprintf(stderr
, "Out of memory preparing request.");
3070 /* binary protocol */
3071 if (c
->protocol
== binary_prot
)
3073 if (ms_build_bin_write_buf_get(c
, item
) != 0)
3080 if (ms_build_ascii_write_buf_get(c
, item
) != 0)
3086 atomic_add_size(&ms_stats
.cmd_get
, 1);
3093 * for ASCII protocol, this function build the multi-get command
3094 * string and send the command.
3096 * @param c, pointer of the concurrency
3098 * @return int, if success, return 0, else return -1
3100 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
)
3102 ms_task_item_t
*item
;
3104 if (ms_add_iov(c
, "get", 3) != 0)
3109 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3111 item
= c
->mlget_task
.mlget_item
[i
].item
;
3112 assert(item
!= NULL
);
3113 if ((ms_add_iov(c
, " ", 1) != 0)
3114 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
3115 (int)KEY_PREFIX_SIZE
) != 0)
3116 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3117 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0))
3123 if ((ms_add_iov(c
, "\r\n", 2) != 0)
3124 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3130 } /* ms_build_ascii_write_buf_mlget */
3134 * used to send the multi-get command to server
3136 * @param c, pointer of the concurrency
3138 * @return int, if success, return 0, else return -1
3140 int ms_mcd_mlget(ms_conn_t
*c
)
3142 ms_task_item_t
*item
;
3145 assert(c
->mlget_task
.mlget_num
>= 1);
3147 c
->currcmd
.cmd
= CMD_GET
;
3148 c
->currcmd
.isfinish
= false;
3149 c
->currcmd
.retstat
= MCD_FAILURE
;
3151 if (ms_update_conn_sock_event(c
) != 0)
3159 if (ms_add_msghdr(c
) != 0)
3161 fprintf(stderr
, "Out of memory preparing request.");
3165 /* binary protocol */
3166 if (c
->protocol
== binary_prot
)
3168 if (ms_build_bin_write_buf_mlget(c
) != 0)
3175 if (ms_build_ascii_write_buf_mlget(c
) != 0)
3181 /* decrease operation time of each item */
3182 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3184 item
= c
->mlget_task
.mlget_item
[i
].item
;
3185 atomic_add_size(&ms_stats
.cmd_get
, 1);
3189 } /* ms_mcd_mlget */
3193 * binary protocol support
3197 * for binary protocol, parse the response of server
3199 * @param c, pointer of the concurrency
3201 * @return int, if success, return 0, else return -1
3203 static int ms_bin_process_response(ms_conn_t
*c
)
3205 const char *errstr
= NULL
;
3209 uint32_t bodylen
= c
->binary_header
.response
.bodylen
;
3210 uint8_t opcode
= c
->binary_header
.response
.opcode
;
3211 uint16_t status
= c
->binary_header
.response
.status
;
3215 c
->rvbytes
= (int32_t)bodylen
;
3223 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
3224 if (opcode
== PROTOCOL_BINARY_CMD_SET
)
3226 c
->currcmd
.retstat
= MCD_STORED
;
3228 else if (opcode
== PROTOCOL_BINARY_CMD_DELETE
)
3230 c
->currcmd
.retstat
= MCD_DELETED
;
3232 else if (opcode
== PROTOCOL_BINARY_CMD_GET
)
3234 c
->currcmd
.retstat
= MCD_END
;
3238 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
3239 errstr
= "Out of memory";
3240 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3243 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
3244 errstr
= "Unknown command";
3245 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3248 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
3249 errstr
= "Not found";
3250 c
->currcmd
.retstat
= MCD_NOTFOUND
;
3253 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
3254 errstr
= "Invalid arguments";
3255 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
3258 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
3259 errstr
= "Data exists for key.";
3262 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
3263 errstr
= "Too large.";
3264 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3267 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
3268 errstr
= "Not stored.";
3269 c
->currcmd
.retstat
= MCD_NOTSTORED
;
3273 errstr
= "Unknown error";
3274 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3280 fprintf(stderr
, "%s\n", errstr
);
3285 } /* ms_bin_process_response */
3288 /* build binary header and add the header to the buffer to send */
3291 * build binary header and add the header to the buffer to send
3293 * @param c, pointer of the concurrency
3294 * @param opcode, operation code
3295 * @param hdr_len, length of header
3296 * @param key_len, length of key
3297 * @param body_len. length of body
3299 static void ms_add_bin_header(ms_conn_t
*c
,
3305 protocol_binary_request_header
*header
;
3309 header
= (protocol_binary_request_header
*)c
->wcurr
;
3311 header
->request
.magic
= (uint8_t)PROTOCOL_BINARY_REQ
;
3312 header
->request
.opcode
= (uint8_t)opcode
;
3313 header
->request
.keylen
= htons(key_len
);
3315 header
->request
.extlen
= (uint8_t)hdr_len
;
3316 header
->request
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
3317 header
->request
.reserved
= 0;
3319 header
->request
.bodylen
= htonl(body_len
);
3320 header
->request
.opaque
= 0;
3321 header
->request
.cas
= 0;
3323 ms_add_iov(c
, c
->wcurr
, sizeof(header
->request
));
3324 } /* ms_add_bin_header */
3328 * add the key to the socket write buffer array
3330 * @param c, pointer of the concurrency
3331 * @param item, pointer of task item which includes the object
3334 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
)
3336 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
);
3337 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3338 item
->key_size
- (int)KEY_PREFIX_SIZE
);
3343 * for binary protocol, this function build the set command
3344 * and add the command to send buffer array.
3346 * @param c, pointer of the concurrency
3347 * @param item, pointer of task item which includes the object
3350 * @return int, if success, return 0, else return -1
3352 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
3354 assert(c
->wbuf
== c
->wcurr
);
3357 protocol_binary_request_set
*rep
= (protocol_binary_request_set
*)c
->wcurr
;
3358 uint16_t keylen
= (uint16_t)item
->key_size
;
3359 uint32_t bodylen
= (uint32_t)sizeof(rep
->message
.body
)
3360 + (uint32_t)keylen
+ (uint32_t)item
->value_size
;
3362 ms_add_bin_header(c
,
3363 PROTOCOL_BINARY_CMD_SET
,
3364 sizeof(rep
->message
.body
),
3367 rep
->message
.body
.flags
= 0;
3368 rep
->message
.body
.expiration
= htonl((uint32_t)item
->exp_time
);
3369 ms_add_iov(c
, &rep
->message
.body
, sizeof(rep
->message
.body
));
3370 ms_add_key_to_iov(c
, item
);
3372 if (item
->value_offset
== INVALID_OFFSET
)
3374 value_offset
= item
->key_suffix_offset
;
3378 value_offset
= item
->value_offset
;
3380 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
);
3383 } /* ms_build_bin_write_buf_set */
3387 * for binary protocol, this function build the get command and
3388 * add the command to send buffer array.
3390 * @param c, pointer of the concurrency
3391 * @param item, pointer of task item which includes the object
3394 * @return int, if success, return 0, else return -1
3396 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3398 assert(c
->wbuf
== c
->wcurr
);
3400 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t)item
->key_size
,
3401 (uint32_t)item
->key_size
);
3402 ms_add_key_to_iov(c
, item
);
3405 } /* ms_build_bin_write_buf_get */
3409 * for binary protocol, this function build the multi-get
3410 * command and add the command to send buffer array.
3412 * @param c, pointer of the concurrency
3413 * @param item, pointer of task item which includes the object
3416 * @return int, if success, return 0, else return -1
3418 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
)
3420 ms_task_item_t
*item
;
3422 assert(c
->wbuf
== c
->wcurr
);
3424 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3426 item
= c
->mlget_task
.mlget_item
[i
].item
;
3427 assert(item
!= NULL
);
3429 ms_add_bin_header(c
,
3430 PROTOCOL_BINARY_CMD_GET
,
3432 (uint16_t)item
->key_size
,
3433 (uint32_t)item
->key_size
);
3434 ms_add_key_to_iov(c
, item
);
3435 c
->wcurr
+= sizeof(protocol_binary_request_get
);
3441 } /* ms_build_bin_write_buf_mlget */