3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
16 #include <netinet/tcp.h>
17 #include <arpa/inet.h>
18 #include "ms_setting.h"
19 #include "ms_thread.h"
21 /* for network write */
22 #define TRANSMIT_COMPLETE 0
23 #define TRANSMIT_INCOMPLETE 1
24 #define TRANSMIT_SOFT_ERROR 2
25 #define TRANSMIT_HARD_ERROR 3
27 /* for generating key */
28 #define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
29 #define KEY_PREFIX_MASK 0x1010101010101010
/* For parsing the value length returned by the server */
33 #define VALUELEN_TOKEN 3
/* global increasing counter, to ensure each key prefix is unique */
36 static uint64_t key_prefix_seq
= KEY_PREFIX_BASE
;
38 /* global increasing counter, generating request id for UDP */
39 static int udp_request_id
= 0;
41 extern __thread ms_thread_t ms_thread
;
/* generate udp request id */
44 static int ms_get_udp_request_id(void);
47 /* connect initialize */
48 static void ms_task_init(ms_conn_t
*c
);
49 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
);
50 static int ms_conn_sock_init(ms_conn_t
*c
);
51 static int ms_conn_event_init(ms_conn_t
*c
);
52 static int ms_conn_init(ms_conn_t
*c
,
54 const int read_buffer_size
,
56 static void ms_warmup_num_init(ms_conn_t
*c
);
57 static int ms_item_win_init(ms_conn_t
*c
);
60 /* connection close */
61 void ms_conn_free(ms_conn_t
*c
);
62 static void ms_conn_close(ms_conn_t
*c
);
65 /* create network connection */
66 static int ms_new_socket(struct addrinfo
*ai
);
67 static void ms_maximize_sndbuf(const int sfd
);
68 static int ms_network_connect(ms_conn_t
*c
,
73 static int ms_reconn(ms_conn_t
*c
);
77 static int ms_tokenize_command(char *command
,
79 const int max_tokens
);
80 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
);
81 static int ms_try_read_line(ms_conn_t
*c
);
82 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
);
83 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
);
84 static int ms_try_read_network(ms_conn_t
*c
);
85 static void ms_verify_value(ms_conn_t
*c
,
86 ms_mlget_task_item_t
*mlget_item
,
89 static void ms_ascii_complete_nread(ms_conn_t
*c
);
90 static void ms_bin_complete_nread(ms_conn_t
*c
);
91 static void ms_complete_nread(ms_conn_t
*c
);
95 static int ms_add_msghdr(ms_conn_t
*c
);
96 static int ms_ensure_iov_space(ms_conn_t
*c
);
97 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
);
98 static int ms_build_udp_headers(ms_conn_t
*c
);
99 static int ms_transmit(ms_conn_t
*c
);
102 /* status adjustment */
103 static void ms_conn_shrink(ms_conn_t
*c
);
104 static void ms_conn_set_state(ms_conn_t
*c
, int state
);
105 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
);
106 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
);
107 static int ms_get_next_sock_index(ms_conn_t
*c
);
108 static int ms_update_conn_sock_event(ms_conn_t
*c
);
109 static bool ms_need_yield(ms_conn_t
*c
);
110 static void ms_update_start_time(ms_conn_t
*c
);
114 static void ms_drive_machine(ms_conn_t
*c
);
115 void ms_event_handler(const int fd
, const short which
, void *arg
);
119 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
120 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
121 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
);
124 /* binary protocol */
125 static int ms_bin_process_response(ms_conn_t
*c
);
126 static void ms_add_bin_header(ms_conn_t
*c
,
131 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
);
132 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
);
133 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
);
134 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
);
138 * each key has two parts, prefix and suffix. The suffix is a
139 * string random get form the character table. The prefix is a
140 * uint64_t variable. And the prefix must be unique. we use the
141 * prefix to identify a key. And the prefix can't include
142 * character ' ' '\r' '\n' '\0'.
146 uint64_t ms_get_key_prefix(void)
150 pthread_mutex_lock(&ms_global
.seq_mutex
);
151 key_prefix_seq
|= KEY_PREFIX_MASK
;
152 key_prefix
= key_prefix_seq
;
154 pthread_mutex_unlock(&ms_global
.seq_mutex
);
157 } /* ms_get_key_prefix */
161 * get an unique udp request id
163 * @return an unique UDP request id
165 static int ms_get_udp_request_id(void)
167 return __sync_fetch_and_add(&udp_request_id
, 1);
172 * initialize current task structure
174 * @param c, pointer of the concurrency
176 static void ms_task_init(ms_conn_t
*c
)
178 c
->curr_task
.cmd
= CMD_NULL
;
179 c
->curr_task
.item
= 0;
180 c
->curr_task
.verify
= false;
181 c
->curr_task
.finish_verify
= true;
182 c
->curr_task
.get_miss
= true;
184 c
->curr_task
.get_opt
= 0;
185 c
->curr_task
.set_opt
= 0;
186 c
->curr_task
.cycle_undo_get
= 0;
187 c
->curr_task
.cycle_undo_set
= 0;
188 c
->curr_task
.verified_get
= 0;
189 c
->curr_task
.overwrite_set
= 0;
194 * initialize udp for the connection structure
196 * @param c, pointer of the concurrency
197 * @param is_udp, whether it's udp
199 * @return int, if success, return 0, else return -1
201 static int ms_conn_udp_init(ms_conn_t
*c
, const bool is_udp
)
207 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
218 if (c
->udp
|| (! c
->udp
&& ms_setting
.facebook_test
))
220 c
->rudpbuf
= (char *)malloc((size_t)c
->rudpsize
);
221 c
->udppkt
= (ms_udppkt_t
*)malloc(MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
223 if ((c
->rudpbuf
== NULL
) || (c
->udppkt
== NULL
))
225 if (c
->rudpbuf
!= NULL
)
227 if (c
->udppkt
!= NULL
)
229 fprintf(stderr
, "malloc()\n");
232 memset(c
->udppkt
, 0, MAX_UDP_PACKET
* sizeof(ms_udppkt_t
));
236 } /* ms_conn_udp_init */
240 * initialize the connection structure
242 * @param c, pointer of the concurrency
243 * @param init_state, (conn_read, conn_write, conn_closing)
244 * @param read_buffer_size
245 * @param is_udp, whether it's udp
247 * @return int, if success, return 0, else return -1
249 static int ms_conn_init(ms_conn_t
*c
,
250 const int init_state
,
251 const int read_buffer_size
,
260 c
->rsize
= read_buffer_size
;
261 c
->wsize
= WRITE_BUFFER_SIZE
;
262 c
->iovsize
= IOV_LIST_INITIAL
;
263 c
->msgsize
= MSG_LIST_INITIAL
;
265 /* for replication, each connection need connect all the server */
266 if (ms_setting
.rep_write_srv
> 0)
268 c
->total_sfds
= ms_setting
.srv_cnt
;
272 c
->total_sfds
= ms_setting
.sock_per_conn
;
276 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
277 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
278 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * (size_t)c
->iovsize
);
279 c
->msglist
= (struct msghdr
*)malloc(
280 sizeof(struct msghdr
) * (size_t)c
->msgsize
);
281 if (ms_setting
.mult_key_num
> 1)
283 c
->mlget_task
.mlget_item
= (ms_mlget_task_item_t
*)
285 sizeof(ms_mlget_task_item_t
) * (size_t)ms_setting
.mult_key_num
);
287 c
->tcpsfd
= (int *)malloc((size_t)c
->total_sfds
* sizeof(int));
289 if ((c
->rbuf
== NULL
) || (c
->wbuf
== NULL
) || (c
->iov
== NULL
)
290 || (c
->msglist
== NULL
) || (c
->tcpsfd
== NULL
)
291 || ((ms_setting
.mult_key_num
> 1)
292 && (c
->mlget_task
.mlget_item
== NULL
)))
300 if (c
->msglist
!= NULL
)
302 if (c
->mlget_task
.mlget_item
!= NULL
)
303 free(c
->mlget_task
.mlget_item
);
304 if (c
->tcpsfd
!= NULL
)
306 fprintf(stderr
, "malloc()\n");
310 c
->state
= init_state
;
318 c
->cur_idx
= c
->total_sfds
; /* default index is a invalid value */
322 c
->change_sfd
= false;
324 c
->precmd
.cmd
= c
->currcmd
.cmd
= CMD_NULL
;
325 c
->precmd
.isfinish
= true; /* default the previous command finished */
326 c
->currcmd
.isfinish
= false;
327 c
->precmd
.retstat
= c
->currcmd
.retstat
= MCD_FAILURE
;
328 c
->precmd
.key_prefix
= c
->currcmd
.key_prefix
= 0;
330 c
->mlget_task
.mlget_num
= 0;
331 c
->mlget_task
.value_index
= -1; /* default invalid value */
333 if (ms_setting
.binary_prot
)
335 c
->protocol
= binary_prot
;
339 c
->protocol
= ascii_udp_prot
;
343 c
->protocol
= ascii_prot
;
347 if (ms_conn_udp_init(c
, is_udp
) != 0)
352 /* initialize task */
355 if (! (ms_setting
.facebook_test
&& is_udp
))
357 __sync_fetch_and_add(&ms_stats
.active_conns
, 1);
365 * when doing 100% get operation, it could preset some objects
366 * to warmup the server. this function is used to initialize the
367 * number of the objects to preset.
369 * @param c, pointer of the concurrency
371 static void ms_warmup_num_init(ms_conn_t
*c
)
373 /* no set operation, preset all the items in the window */
374 if (ms_setting
.cmd_distr
[CMD_SET
].cmd_prop
< PROP_ERROR
)
376 c
->warmup_num
= c
->win_size
;
377 c
->remain_warmup_num
= c
->warmup_num
;
382 c
->remain_warmup_num
= c
->warmup_num
;
384 } /* ms_warmup_num_init */
388 * each connection has an item window, this function initialize
389 * the window. The window is used to generate task.
391 * @param c, pointer of the concurrency
393 * @return int, if success, return 0, else return -1
395 static int ms_item_win_init(ms_conn_t
*c
)
399 c
->win_size
= (int)ms_setting
.win_size
;
401 c
->exec_num
= ms_thread
.thread_ctx
->exec_num_perconn
;
402 c
->remain_exec_num
= c
->exec_num
;
404 c
->item_win
= (ms_task_item_t
*)malloc(
405 sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
406 if (c
->item_win
== NULL
)
408 fprintf(stderr
, "Can't allocate task item array for conn.\n");
411 memset(c
->item_win
, 0, sizeof(ms_task_item_t
) * (size_t)c
->win_size
);
413 for (int i
= 0; i
< c
->win_size
; i
++)
415 c
->item_win
[i
].key_size
= (int)ms_setting
.distr
[i
].key_size
;
416 c
->item_win
[i
].key_prefix
= ms_get_key_prefix();
417 c
->item_win
[i
].key_suffix_offset
= ms_setting
.distr
[i
].key_offset
;
418 c
->item_win
[i
].value_size
= (int)ms_setting
.distr
[i
].value_size
;
419 c
->item_win
[i
].value_offset
= INVALID_OFFSET
; /* default in invalid offset */
420 c
->item_win
[i
].client_time
= 0;
422 /* set expire time base on the proportion */
423 if (exp_cnt
< ms_setting
.exp_ver_per
* i
)
425 c
->item_win
[i
].exp_time
= FIXED_EXPIRE_TIME
;
430 c
->item_win
[i
].exp_time
= 0;
434 ms_warmup_num_init(c
);
437 } /* ms_item_win_init */
441 * each connection structure can include one or more sock
442 * handlers. this function create these socks and connect the
445 * @param c, pointer of the concurrency
447 * @return int, if success, return 0, else return -1
449 static int ms_conn_sock_init(ms_conn_t
*c
)
456 assert(c
->tcpsfd
!= NULL
);
458 for (i
= 0; i
< c
->total_sfds
; i
++)
461 if (ms_setting
.rep_write_srv
> 0)
463 /* for replication, each connection need connect all the server */
468 /* all the connections in a thread connects the same server */
469 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
472 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
473 ms_setting
.servers
[srv_idx
].srv_port
,
474 ms_setting
.udp
, &ret_sfd
) != 0)
484 if (! ms_setting
.udp
)
486 c
->tcpsfd
[i
]= ret_sfd
;
492 /* initialize udp sock handler if necessary */
493 if (ms_setting
.facebook_test
)
496 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
497 ms_setting
.servers
[srv_idx
].srv_port
,
498 true, &ret_sfd
) != 0)
508 if ((i
!= c
->total_sfds
) || (ms_setting
.facebook_test
&& (c
->udpsfd
== 0)))
516 for (int j
= 0; j
< i
; j
++)
531 } /* ms_conn_sock_init */
535 * each connection is managed by libevent, this function
536 * initialize the event of the connection structure.
538 * @param c, pointer of the concurrency
540 * @return int, if success, return 0, else return -1
542 static int ms_conn_event_init(ms_conn_t
*c
)
544 /* default event timeout 10 seconds */
547 .tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0
549 short event_flags
= EV_WRITE
| EV_PERSIST
;
551 event_set(&c
->event
, c
->sfd
, event_flags
, ms_event_handler
, (void *)c
);
552 event_base_set(ms_thread
.base
, &c
->event
);
553 c
->ev_flags
= event_flags
;
555 if (c
->total_sfds
== 1)
557 if (event_add(&c
->event
, NULL
) == -1)
564 if (event_add(&c
->event
, &t
) == -1)
571 } /* ms_conn_event_init */
575 * setup a connection, each connection structure of each
576 * thread must call this function to initialize.
578 * @param c, pointer of the concurrency
580 * @return int, if success, return 0, else return -1
582 int ms_setup_conn(ms_conn_t
*c
)
584 if (ms_item_win_init(c
) != 0)
589 if (ms_conn_init(c
, conn_write
, DATA_BUFFER_SIZE
, ms_setting
.udp
) != 0)
594 if (ms_conn_sock_init(c
) != 0)
599 if (ms_conn_event_init(c
) != 0)
605 } /* ms_setup_conn */
609 * Frees a connection.
611 * @param c, pointer of the concurrency
613 void ms_conn_free(ms_conn_t
*c
)
617 if (c
->hdrbuf
!= NULL
)
619 if (c
->msglist
!= NULL
)
627 if (c
->mlget_task
.mlget_item
!= NULL
)
628 free(c
->mlget_task
.mlget_item
);
629 if (c
->rudpbuf
!= NULL
)
631 if (c
->udppkt
!= NULL
)
633 if (c
->item_win
!= NULL
)
635 if (c
->tcpsfd
!= NULL
)
638 if (--ms_thread
.nactive_conn
== 0)
640 free(ms_thread
.conn
);
649 * @param c, pointer of the concurrency
651 static void ms_conn_close(ms_conn_t
*c
)
655 /* delete the event, the socket and the connection */
656 event_del(&c
->event
);
658 for (int i
= 0; i
< c
->total_sfds
; i
++)
660 if (c
->tcpsfd
[i
] > 0)
667 if (ms_setting
.facebook_test
)
672 __sync_fetch_and_sub(&ms_stats
.active_conns
, 1);
676 if (ms_setting
.run_time
== 0)
678 pthread_mutex_lock(&ms_global
.run_lock
.lock
);
679 ms_global
.run_lock
.count
++;
680 pthread_cond_signal(&ms_global
.run_lock
.cond
);
681 pthread_mutex_unlock(&ms_global
.run_lock
.lock
);
684 if (ms_thread
.nactive_conn
== 0)
688 } /* ms_conn_close */
694 * @param ai, server address information
696 * @return int, if success, return 0, else return -1
698 static int ms_new_socket(struct addrinfo
*ai
)
702 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1)
704 fprintf(stderr
, "socket() error: %s.\n", strerror(errno
));
709 } /* ms_new_socket */
713 * Sets a socket's send buffer size to the maximum allowed by the system.
715 * @param sfd, file descriptor of socket
717 static void ms_maximize_sndbuf(const int sfd
)
719 socklen_t intsize
= sizeof(int);
720 unsigned int last_good
= 0;
721 unsigned int min
, max
, avg
;
722 unsigned int old_size
;
724 /* Start with the default size. */
725 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0)
727 fprintf(stderr
, "getsockopt(SO_SNDBUF)\n");
731 /* Binary-search for the real maximum. */
733 max
= MAX_SENDBUF_SIZE
;
737 avg
= ((unsigned int)(min
+ max
)) / 2;
738 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0)
748 } /* ms_maximize_sndbuf */
752 * socket connects the server
754 * @param c, pointer of the concurrency
755 * @param srv_host_name, the host name of the server
756 * @param srv_port, port of server
757 * @param is_udp, whether it's udp
758 * @param ret_sfd, the connected socket file descriptor
760 * @return int, if success, return 0, else return -1
762 static int ms_network_connect(ms_conn_t
*c
,
774 struct addrinfo
*next
;
775 struct addrinfo hints
;
776 char port_buf
[NI_MAXSERV
];
783 * the memset call clears nonstandard fields in some impementations
784 * that otherwise mess things up.
786 memset(&hints
, 0, sizeof(hints
));
787 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
790 hints
.ai_protocol
= IPPROTO_UDP
;
791 hints
.ai_socktype
= SOCK_DGRAM
;
792 hints
.ai_family
= AF_INET
; /* This left here because of issues with OSX 10.5 */
796 hints
.ai_family
= AF_UNSPEC
;
797 hints
.ai_protocol
= IPPROTO_TCP
;
798 hints
.ai_socktype
= SOCK_STREAM
;
801 snprintf(port_buf
, NI_MAXSERV
, "%d", srv_port
);
802 error
= getaddrinfo(srv_host_name
, port_buf
, &hints
, &ai
);
805 if (error
!= EAI_SYSTEM
)
806 fprintf(stderr
, "getaddrinfo(): %s.\n", gai_strerror(error
));
808 perror("getaddrinfo()\n");
813 for (next
= ai
; next
; next
= next
->ai_next
)
815 if ((sfd
= ms_new_socket(next
)) == -1)
821 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
824 ms_maximize_sndbuf(sfd
);
828 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
,
830 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
831 setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
,
837 c
->srv_recv_addr_size
= sizeof(struct sockaddr
);
838 memcpy(&c
->srv_recv_addr
, next
->ai_addr
, c
->srv_recv_addr_size
);
842 if (connect(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1)
850 if (((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0)
851 || (fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0))
853 fprintf(stderr
, "setting O_NONBLOCK\n");
869 /* Return zero if we detected no errors in starting up connections */
871 } /* ms_network_connect */
875 * reconnect a disconnected sock
877 * @param c, pointer of the concurrency
879 * @return int, if success, return 0, else return -1
881 static int ms_reconn(ms_conn_t
*c
)
886 if (ms_setting
.rep_write_srv
> 0)
889 srv_conn_cnt
= ms_setting
.nconns
;
893 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
894 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
897 /* close the old socket handler */
899 c
->tcpsfd
[c
->cur_idx
]= 0;
901 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].disconn_cnt
, 1)
904 gettimeofday(&ms_setting
.servers
[srv_idx
].disconn_time
, NULL
);
905 fprintf(stderr
, "Server %s:%d disconnect\n",
906 ms_setting
.servers
[srv_idx
].srv_host_name
,
907 ms_setting
.servers
[srv_idx
].srv_port
);
910 if (ms_setting
.rep_write_srv
> 0)
913 for (i
= 0; i
< c
->total_sfds
; i
++)
915 if (c
->tcpsfd
[i
] != 0)
921 /* all socks disconnect */
922 if (i
== c
->total_sfds
)
931 /* reconnect success, break the loop */
932 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
933 ms_setting
.servers
[srv_idx
].srv_port
,
934 ms_setting
.udp
, &c
->sfd
) == 0)
936 c
->tcpsfd
[c
->cur_idx
]= c
->sfd
;
937 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
940 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
942 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
943 - ms_setting
.servers
[srv_idx
].disconn_time
945 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
946 ms_setting
.servers
[srv_idx
].srv_host_name
,
947 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
952 if (c
->total_sfds
== 1)
954 /* wait a second and reconnect */
958 while (c
->total_sfds
== 1);
961 if ((c
->total_sfds
> 1) && (c
->tcpsfd
[c
->cur_idx
] == 0))
972 * reconnect several disconnected socks in the connection
 * structure, the once-per-second timer of the thread will check
974 * whether some socks in the connections disconnect. if
975 * disconnect, reconnect the sock.
977 * @param c, pointer of the concurrency
979 * @return int, if success, return 0, else return -1
981 int ms_reconn_socks(ms_conn_t
*c
)
986 struct timeval cur_time
;
990 if ((c
->total_sfds
== 1) || (c
->total_sfds
== c
->alive_sfds
))
995 for (int i
= 0; i
< c
->total_sfds
; i
++)
997 if (c
->tcpsfd
[i
] == 0)
999 gettimeofday(&cur_time
, NULL
);
1002 * For failover test of replication, reconnect the socks after
1003 * it disconnects more than 5 seconds, Otherwise memslap will
1004 * block at connect() function and the work threads can't work
1008 - ms_setting
.servers
[srv_idx
].disconn_time
.tv_sec
< 5)
1013 if (ms_setting
.rep_write_srv
> 0)
1016 srv_conn_cnt
= ms_setting
.nconns
;
1020 srv_idx
= ms_thread
.thread_ctx
->srv_idx
;
1021 srv_conn_cnt
= ms_setting
.nconns
/ ms_setting
.srv_cnt
;
1024 if (ms_network_connect(c
, ms_setting
.servers
[srv_idx
].srv_host_name
,
1025 ms_setting
.servers
[srv_idx
].srv_port
,
1026 ms_setting
.udp
, &ret_sfd
) == 0)
1028 c
->tcpsfd
[i
]= ret_sfd
;
1031 if (__sync_fetch_and_add(&ms_setting
.servers
[srv_idx
].reconn_cnt
, 1)
1032 % srv_conn_cnt
== 0)
1034 gettimeofday(&ms_setting
.servers
[srv_idx
].reconn_time
, NULL
);
1036 (int)(ms_setting
.servers
[srv_idx
].reconn_time
.tv_sec
1037 - ms_setting
.servers
[srv_idx
].disconn_time
1039 fprintf(stderr
, "Server %s:%d reconnect after %ds\n",
1040 ms_setting
.servers
[srv_idx
].srv_host_name
,
1041 ms_setting
.servers
[srv_idx
].srv_port
, reconn_time
);
1048 } /* ms_reconn_socks */
1052 * Tokenize the command string by replacing whitespace with '\0' and update
1053 * the token array tokens with pointer to start of each token and length.
1054 * Returns total number of tokens. The last valid token is the terminal
1055 * token (value points to the first unprocessed character of the string and
1060 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1061 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1064 * ncommand = tokens[ix].value - command;
1065 * command = tokens[ix].value;
1068 * @param command, the command string to token
1069 * @param tokens, array to store tokens
1070 * @param max_tokens, maximum tokens number
1072 * @return int, the number of tokens
1074 static int ms_tokenize_command(char *command
,
1076 const int max_tokens
)
1081 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
1083 for (s
= e
= command
; ntokens
< max_tokens
- 1; ++e
)
1089 tokens
[ntokens
].value
= s
;
1090 tokens
[ntokens
].length
= (size_t)(e
- s
);
1096 else if (*e
== '\0')
1100 tokens
[ntokens
].value
= s
;
1101 tokens
[ntokens
].length
= (size_t)(e
- s
);
1105 break; /* string end */
1110 } /* ms_tokenize_command */
1114 * parse the response of server.
1116 * @param c, pointer of the concurrency
1117 * @param command, the string responded by server
1119 * @return int, if the command completed return 0, else return
1122 static int ms_ascii_process_line(ms_conn_t
*c
, char *command
)
1126 char *buffer
= command
;
1131 * for command get, we store the returned value into local buffer
1132 * then continue in ms_complete_nread().
1137 case 'V': /* VALUE || VERSION */
1138 if (buffer
[1] == 'A') /* VALUE */
1140 token_t tokens
[MAX_TOKENS
];
1141 ms_tokenize_command(command
, tokens
, MAX_TOKENS
);
1142 value_len
= strtol(tokens
[VALUELEN_TOKEN
].value
, NULL
, 10);
1143 c
->currcmd
.key_prefix
= *(uint64_t *)tokens
[KEY_TOKEN
].value
;
1146 * We read the \r\n into the string since not doing so is more
1147 * cycles then the waster of memory to do so.
1149 * We are null terminating through, which will most likely make
1150 * some people lazy about using the return length.
1152 c
->rvbytes
= (int)(value_len
+ 2);
1160 c
->currcmd
.retstat
= MCD_SUCCESS
;
1162 case 'S': /* STORED STATS SERVER_ERROR */
1163 if (buffer
[2] == 'A') /* STORED STATS */
1165 c
->currcmd
.retstat
= MCD_STAT
;
1167 else if (buffer
[1] == 'E')
1170 printf("<%d %s\n", c
->sfd
, buffer
);
1172 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
1174 else if (buffer
[1] == 'T')
1177 c
->currcmd
.retstat
= MCD_STORED
;
1181 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1185 case 'D': /* DELETED DATA */
1186 if (buffer
[1] == 'E')
1188 c
->currcmd
.retstat
= MCD_DELETED
;
1192 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1197 case 'N': /* NOT_FOUND NOT_STORED*/
1198 if (buffer
[4] == 'F')
1200 c
->currcmd
.retstat
= MCD_NOTFOUND
;
1202 else if (buffer
[4] == 'S')
1204 printf("<%d %s\n", c
->sfd
, buffer
);
1205 c
->currcmd
.retstat
= MCD_NOTSTORED
;
1209 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1213 case 'E': /* PROTOCOL ERROR or END */
1214 if (buffer
[1] == 'N')
1217 c
->currcmd
.retstat
= MCD_END
;
1219 else if (buffer
[1] == 'R')
1221 printf("<%d ERROR\n", c
->sfd
);
1222 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
1224 else if (buffer
[1] == 'X')
1226 c
->currcmd
.retstat
= MCD_DATA_EXISTS
;
1227 printf("<%d %s\n", c
->sfd
, buffer
);
1231 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1235 case 'C': /* CLIENT ERROR */
1236 printf("<%d %s\n", c
->sfd
, buffer
);
1237 c
->currcmd
.retstat
= MCD_CLIENT_ERROR
;
1241 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
1246 } /* ms_ascii_process_line */
1250 * after one operation completes, reset the concurrency
1252 * @param c, pointer of the concurrency
1253 * @param timeout, whether it's timeout
1255 void ms_reset_conn(ms_conn_t
*c
, bool timeout
)
1261 if ((c
->packets
> 0) && (c
->packets
< MAX_UDP_PACKET
))
1263 memset(c
->udppkt
, 0, sizeof(ms_udppkt_t
) * (uint64_t)c
->packets
);
1272 c
->currcmd
.isfinish
= true;
1276 ms_conn_set_state(c
, conn_write
);
1277 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
1281 ms_drive_machine(c
);
1283 } /* ms_reset_conn */
1287 * if we have a complete line in the buffer, process it.
1289 * @param c, pointer of the concurrency
1291 * @return int, if success, return 0, else return -1
1293 static int ms_try_read_line(ms_conn_t
*c
)
1295 if (c
->protocol
== binary_prot
)
1297 /* Do we have the complete packet header? */
1298 if ((uint64_t)c
->rbytes
< sizeof(c
->binary_header
))
1300 /* need more data! */
1306 if (((long)(c
->rcurr
)) % 8 != 0)
1308 /* must realign input buffer */
1309 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1311 if (settings
.verbose
)
1313 fprintf(stderr
, "%d: Realign input buffer.\n", c
->sfd
);
1317 protocol_binary_response_header
*rsp
;
1318 rsp
= (protocol_binary_response_header
*)c
->rcurr
;
1320 c
->binary_header
= *rsp
;
1321 c
->binary_header
.response
.extlen
= rsp
->response
.extlen
;
1322 c
->binary_header
.response
.keylen
= ntohl(rsp
->response
.keylen
);
1323 c
->binary_header
.response
.bodylen
= ntohl(rsp
->response
.bodylen
);
1324 c
->binary_header
.response
.status
= ntohl(rsp
->response
.status
);
1326 if (c
->binary_header
.response
.magic
!= PROTOCOL_BINARY_RES
)
1328 fprintf(stderr
, "Invalid magic: %x\n",
1329 c
->binary_header
.response
.magic
);
1330 ms_conn_set_state(c
, conn_closing
);
1334 /* process this complete response */
1335 if (ms_bin_process_response(c
) == 0)
1337 /* current operation completed */
1338 ms_reset_conn(c
, false);
1343 c
->rbytes
-= (int32_t)sizeof(c
->binary_header
);
1344 c
->rcurr
+= sizeof(c
->binary_header
);
1353 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1358 el
= memchr(c
->rcurr
, '\n', (size_t)c
->rbytes
);
1363 if (((el
- c
->rcurr
) > 1) && (*(el
- 1) == '\r'))
1369 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
1371 /* process this complete line */
1372 if (ms_ascii_process_line(c
, c
->rcurr
) == 0)
1374 /* current operation completed */
1375 ms_reset_conn(c
, false);
1380 /* current operation didn't complete */
1381 c
->rbytes
-= (int32_t)(cont
- c
->rcurr
);
1385 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1389 } /* ms_try_read_line */
 * because UDP does not guarantee packet order, this
 * function is used to sort the received udp packets.
1396 * @param c, pointer of the concurrency
1397 * @param buf, the buffer to store the ordered packages data
1398 * @param rbytes, the maximum capacity of the buffer
1400 * @return int, if success, return the copy bytes, else return
1403 static int ms_sort_udp_packet(ms_conn_t
*c
, char *buf
, int rbytes
)
1408 uint16_t seq_num
= 0;
1409 uint16_t packets
= 0;
1410 unsigned char *header
= NULL
;
1412 /* no enough data */
1414 assert(buf
!= NULL
);
1415 assert(c
->rudpbytes
>= UDP_HEADER_SIZE
);
1417 /* calculate received packets count */
1418 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
>= UDP_HEADER_SIZE
)
1420 /* the last packet has some data */
1421 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
+ 1;
1425 c
->recvpkt
= c
->rudpbytes
/ UDP_MAX_PAYLOAD_SIZE
;
1428 /* get the total packets count if necessary */
1429 if (c
->packets
== 0)
1431 c
->packets
= HEADER_TO_PACKETS((unsigned char *)c
->rudpbuf
);
1434 /* build the ordered packet array */
1435 for (int i
= c
->pktcurr
; i
< c
->recvpkt
; i
++)
1437 header
= (unsigned char *)c
->rudpbuf
+ i
* UDP_MAX_PAYLOAD_SIZE
;
1438 req_id
= (uint16_t)HEADER_TO_REQID(header
);
1439 assert(req_id
== c
->request_id
% (1 << 16));
1441 packets
= (uint16_t)HEADER_TO_PACKETS(header
);
1442 assert(c
->packets
== HEADER_TO_PACKETS(header
));
1444 seq_num
= (uint16_t)HEADER_TO_SEQNUM(header
);
1445 c
->udppkt
[seq_num
].header
= header
;
1446 c
->udppkt
[seq_num
].data
= (char *)header
+ UDP_HEADER_SIZE
;
1448 if (i
== c
->recvpkt
- 1)
1450 /* last received packet */
1451 if (c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
== 0)
1453 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1458 c
->udppkt
[seq_num
].rbytes
= c
->rudpbytes
% UDP_MAX_PAYLOAD_SIZE
1464 c
->udppkt
[seq_num
].rbytes
= UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
;
1469 for (int i
= c
->ordcurr
; i
< c
->recvpkt
; i
++)
1471 /* there is some data to copy */
1472 if ((c
->udppkt
[i
].data
!= NULL
)
1473 && (c
->udppkt
[i
].copybytes
< c
->udppkt
[i
].rbytes
))
1475 header
= c
->udppkt
[i
].header
;
1476 len
= c
->udppkt
[i
].rbytes
- c
->udppkt
[i
].copybytes
;
1477 if (len
> rbytes
- wbytes
)
1479 len
= rbytes
- wbytes
;
1482 assert(len
<= rbytes
- wbytes
);
1483 assert(i
== HEADER_TO_SEQNUM(header
));
1485 memcpy(buf
+ wbytes
, c
->udppkt
[i
].data
+ c
->udppkt
[i
].copybytes
,
1488 c
->udppkt
[i
].copybytes
+= len
;
1490 if ((c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
)
1491 && (c
->udppkt
[i
].rbytes
== UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1493 /* finish copying all the data of this packet, next */
1497 /* last received packet, and finish copying all the data */
1498 if ((c
->recvpkt
== c
->packets
) && (i
== c
->recvpkt
- 1)
1499 && (c
->udppkt
[i
].copybytes
== c
->udppkt
[i
].rbytes
))
1504 /* no space to copy data */
1505 if (wbytes
>= rbytes
)
1510 /* it doesn't finish reading all the data of the packet from network */
1511 if ((i
!= c
->recvpkt
- 1)
1512 && (c
->udppkt
[i
].rbytes
< UDP_MAX_PAYLOAD_SIZE
- UDP_HEADER_SIZE
))
1519 /* no data to copy */
1524 return wbytes
== 0 ? -1 : wbytes
;
1525 } /* ms_sort_udp_packet */
 * encapsulate udp read like tcp read
1531 * @param c, pointer of the concurrency
1532 * @param buf, read buffer
1533 * @param len, length to read
1535 * @return int, if success, return the read bytes, else return
1538 static int ms_udp_read(ms_conn_t
*c
, char *buf
, int len
)
1549 if (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
> c
->rudpsize
)
1551 char *new_rbuf
= realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
1554 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1555 c
->rudpbytes
= 0; /* ignore what we read */
1558 c
->rudpbuf
= new_rbuf
;
1562 avail
= c
->rudpsize
- c
->rudpbytes
;
1563 /* UDP each time read a packet, 1400 bytes */
1564 res
= (int)read(c
->sfd
, c
->rudpbuf
+ c
->rudpbytes
, (size_t)avail
);
1568 __sync_fetch_and_add(&ms_stats
.bytes_read
, res
);
1583 /* "connection" closed */
1589 /* no data to read */
1594 /* copy data to read buffer */
1597 copybytes
= ms_sort_udp_packet(c
, buf
, len
);
1600 if (copybytes
== -1)
1602 __sync_fetch_and_add(&ms_stats
.pkt_disorder
, 1);
1610 * read from network as much as we can, handle buffer overflow and connection
1612 * before reading, move the remaining incomplete fragment of a command
1613 * (if any) to the beginning of the buffer.
1614 * return 0 if there's nothing to read on the first read.
1618 * read from network as much as we can, handle buffer overflow and connection
1619 * close. before reading, move the remaining incomplete fragment of a command
1620 * (if any) to the beginning of the buffer.
1622 * @param c, pointer of the concurrency
1625 * return 0 if there's nothing to read on the first read.
1626 * return 1 if get data
1627 * return -1 if error happens
1629 static int ms_try_read_network(ms_conn_t
*c
)
1637 if ((c
->rcurr
!= c
->rbuf
)
1638 && (! c
->readval
|| (c
->rvbytes
> c
->rsize
- (c
->rcurr
- c
->rbuf
))
1639 || (c
->readval
&& (c
->rcurr
- c
->rbuf
> c
->rbytes
))))
1641 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
1642 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
1648 if (c
->rbytes
>= c
->rsize
)
1650 char *new_rbuf
= realloc(c
->rbuf
, (size_t)c
->rsize
* 2);
1653 fprintf(stderr
, "Couldn't realloc input buffer.\n");
1654 c
->rbytes
= 0; /* ignore what we read */
1657 c
->rcurr
= c
->rbuf
= new_rbuf
;
1661 avail
= c
->rsize
- c
->rbytes
- (c
->rcurr
- c
->rbuf
);
1669 res
= (int32_t)ms_udp_read(c
, c
->rcurr
+ c
->rbytes
, (int32_t)avail
);
1673 res
= (int)read(c
->sfd
, c
->rcurr
+ c
->rbytes
, (size_t)avail
);
1680 __sync_fetch_and_add(&ms_stats
.bytes_read
, res
);
1695 /* connection closed */
1696 ms_conn_set_state(c
, conn_closing
);
1701 if ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
))
1703 /* Should close on unhandled errors. */
1704 ms_conn_set_state(c
, conn_closing
);
1710 } /* ms_try_read_network */
1714 * after get the object from server, verify the value if
1717 * @param c, pointer of the concurrency
1718 * @param mlget_item, pointer of mulit-get task item structure
1719 * @param value, received value string
1720 * @param vlen, received value string length
1722 static void ms_verify_value(ms_conn_t
*c
,
1723 ms_mlget_task_item_t
*mlget_item
,
1727 if (c
->curr_task
.verify
)
1729 assert(c
->curr_task
.item
->value_offset
!= INVALID_OFFSET
);
1730 char *orignval
= &ms_setting
.char_block
[c
->curr_task
.item
->value_offset
];
1732 &ms_setting
.char_block
[c
->curr_task
.item
->key_suffix_offset
];
1734 /* verify expire time if necessary */
1735 if (c
->curr_task
.item
->exp_time
> 0)
1737 struct timeval curr_time
;
1738 gettimeofday(&curr_time
, NULL
);
1740 /* object expired but get it now */
1741 if (curr_time
.tv_sec
- c
->curr_task
.item
->client_time
1742 > c
->curr_task
.item
->exp_time
+ EXPIRE_TIME_ERROR
)
1744 __sync_fetch_and_add(&ms_stats
.exp_get
, 1);
1746 if (ms_setting
.verbose
)
1750 strftime(set_time
, 64, "%Y-%m-%d %H:%M:%S",
1751 localtime(&c
->curr_task
.item
->client_time
));
1752 strftime(cur_time
, 64, "%Y-%m-%d %H:%M:%S",
1753 localtime(&curr_time
.tv_sec
));
1755 "\n<%d expire time verification failed, "
1756 "object expired but get it now\n"
1759 "\tset time: %s current time: %s "
1760 "diff time: %d expire time: %d\n"
1761 "\texpected data: \n"
1762 "\treceived data len: %d\n"
1763 "\treceived data: %.*s\n",
1765 c
->curr_task
.item
->key_size
,
1766 c
->curr_task
.item
->key_prefix
,
1767 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1771 (int)(curr_time
.tv_sec
- c
->curr_task
.item
->client_time
),
1772 c
->curr_task
.item
->exp_time
,
1782 if ((c
->curr_task
.item
->value_size
!= vlen
)
1783 || (memcmp(orignval
, value
, (size_t)vlen
) != 0))
1785 __sync_fetch_and_add(&ms_stats
.vef_failed
, 1);
1787 if (ms_setting
.verbose
)
1790 "\n<%d data verification failed\n"
1793 "\texpected data len: %d\n"
1794 "\texpected data: %.*s\n"
1795 "\treceived data len: %d\n"
1796 "\treceived data: %.*s\n",
1798 c
->curr_task
.item
->key_size
,
1799 c
->curr_task
.item
->key_prefix
,
1800 c
->curr_task
.item
->key_size
- (int)KEY_PREFIX_SIZE
,
1802 c
->curr_task
.item
->value_size
,
1803 c
->curr_task
.item
->value_size
,
1813 c
->curr_task
.finish_verify
= true;
1815 if (mlget_item
!= NULL
)
1817 mlget_item
->finish_verify
= true;
1820 } /* ms_verify_value */
1824 * For ASCII protocol, after store the data into the local
1825 * buffer, run this function to handle the data.
1827 * @param c, pointer of the concurrency
1829 static void ms_ascii_complete_nread(ms_conn_t
*c
)
1832 assert(c
->rbytes
>= c
->rvbytes
);
1833 assert(c
->protocol
== ascii_udp_prot
|| c
->protocol
== ascii_prot
);
1837 c
->rcurr
[c
->rvbytes
- 1] == '\n' && c
->rcurr
[c
->rvbytes
- 2] == '\r');
1841 ms_mlget_task_item_t
*mlget_item
= NULL
;
1842 if (((ms_setting
.mult_key_num
> 1)
1843 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1844 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1846 c
->mlget_task
.value_index
++;
1847 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1849 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1851 c
->curr_task
.item
= mlget_item
->item
;
1852 c
->curr_task
.verify
= mlget_item
->verify
;
1853 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1854 mlget_item
->get_miss
= false;
1858 /* Try to find the task item in multi-get task array */
1859 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
1861 mlget_item
= &c
->mlget_task
.mlget_item
[i
];
1862 if (mlget_item
->item
->key_prefix
== c
->currcmd
.key_prefix
)
1864 c
->curr_task
.item
= mlget_item
->item
;
1865 c
->curr_task
.verify
= mlget_item
->verify
;
1866 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1867 mlget_item
->get_miss
= false;
1875 ms_verify_value(c
, mlget_item
, c
->rcurr
, c
->rvbytes
- 2);
1877 c
->curr_task
.get_miss
= false;
1878 c
->rbytes
-= c
->rvbytes
;
1879 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1880 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1883 } /* ms_ascii_complete_nread */
1887 * For binary protocol, after store the data into the local
1888 * buffer, run this function to handle the data.
1890 * @param c, pointer of the concurrency
1892 static void ms_bin_complete_nread(ms_conn_t
*c
)
1895 assert(c
->rbytes
>= c
->rvbytes
);
1896 assert(c
->protocol
== binary_prot
);
1898 int extlen
= c
->binary_header
.response
.extlen
;
1899 int keylen
= c
->binary_header
.response
.keylen
;
1900 uint8_t opcode
= c
->binary_header
.response
.opcode
;
1902 /* not get command or not include value, just return */
1903 if (((opcode
!= PROTOCOL_BINARY_CMD_GET
)
1904 && (opcode
!= PROTOCOL_BINARY_CMD_GETQ
))
1905 || (c
->rvbytes
<= extlen
+ keylen
))
1908 if (c
->binary_header
.response
.opcode
== PROTOCOL_BINARY_CMD_GET
)
1910 c
->currcmd
.retstat
= MCD_END
;
1911 c
->curr_task
.get_miss
= true;
1916 ms_reset_conn(c
, false);
1921 ms_mlget_task_item_t
*mlget_item
= NULL
;
1922 if (((ms_setting
.mult_key_num
> 1)
1923 && (c
->mlget_task
.mlget_num
>= ms_setting
.mult_key_num
))
1924 || ((c
->remain_exec_num
== 0) && (c
->mlget_task
.mlget_num
> 0)))
1926 c
->mlget_task
.value_index
++;
1927 mlget_item
= &c
->mlget_task
.mlget_item
[c
->mlget_task
.value_index
];
1929 c
->curr_task
.item
= mlget_item
->item
;
1930 c
->curr_task
.verify
= mlget_item
->verify
;
1931 c
->curr_task
.finish_verify
= mlget_item
->finish_verify
;
1932 mlget_item
->get_miss
= false;
1937 c
->rcurr
+ extlen
+ keylen
,
1938 c
->rvbytes
- extlen
- keylen
);
1940 c
->currcmd
.retstat
= MCD_END
;
1941 c
->curr_task
.get_miss
= false;
1942 c
->rbytes
-= c
->rvbytes
;
1943 c
->rcurr
= c
->rcurr
+ c
->rvbytes
;
1944 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
1948 if (ms_setting
.mult_key_num
> 1)
1950 /* multi-get have check all the item */
1951 if (c
->mlget_task
.value_index
== c
->mlget_task
.mlget_num
- 1)
1953 ms_reset_conn(c
, false);
1959 ms_reset_conn(c
, false);
1961 } /* ms_bin_complete_nread */
1965 * we get here after reading the value of get commands.
1967 * @param c, pointer of the concurrency
1969 static void ms_complete_nread(ms_conn_t
*c
)
1972 assert(c
->rbytes
>= c
->rvbytes
);
1973 assert(c
->protocol
== ascii_udp_prot
1974 || c
->protocol
== ascii_prot
1975 || c
->protocol
== binary_prot
);
1977 if (c
->protocol
== binary_prot
)
1979 ms_bin_complete_nread(c
);
1983 ms_ascii_complete_nread(c
);
1985 } /* ms_complete_nread */
1989 * Adds a message header to a connection.
1991 * @param c, pointer of the concurrency
1993 * @return int, if success, return 0, else return -1
1995 static int ms_add_msghdr(ms_conn_t
*c
)
2001 if (c
->msgsize
== c
->msgused
)
2004 realloc(c
->msglist
, (uint64_t)c
->msgsize
* 2 * sizeof(struct msghdr
));
2012 msg
= c
->msglist
+ c
->msgused
;
2015 * this wipes msg_iovlen, msg_control, msg_controllen, and
2016 * msg_flags, the last 3 of which aren't defined on solaris:
2018 memset(msg
, 0, sizeof(struct msghdr
));
2020 msg
->msg_iov
= &c
->iov
[c
->iovused
];
2022 if (c
->udp
&& (c
->srv_recv_addr_size
> 0))
2024 msg
->msg_name
= &c
->srv_recv_addr
;
2025 msg
->msg_namelen
= c
->srv_recv_addr_size
;
2033 /* Leave room for the UDP header, which we'll fill in later. */
2034 return ms_add_iov(c
, NULL
, UDP_HEADER_SIZE
);
2038 } /* ms_add_msghdr */
2042 * Ensures that there is room for another structure iovec in a connection's
2045 * @param c, pointer of the concurrency
2047 * @return int, if success, return 0, else return -1
2049 static int ms_ensure_iov_space(ms_conn_t
*c
)
2053 if (c
->iovused
>= c
->iovsize
)
2056 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
2057 ((uint64_t)c
->iovsize
2059 * sizeof(struct iovec
));
2066 /* Point all the msghdr structures at the new list. */
2067 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++)
2069 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
2070 iovnum
+= (int)c
->msglist
[i
].msg_iovlen
;
2075 } /* ms_ensure_iov_space */
2079 * Adds data to the list of pending data that will be written out to a
2082 * @param c, pointer of the concurrency
2083 * @param buf, the buffer includes data to send
2084 * @param len, the data length in the buffer
2086 * @return int, if success, return 0, else return -1
2088 static int ms_add_iov(ms_conn_t
*c
, const void *buf
, int len
)
2098 m
= &c
->msglist
[c
->msgused
- 1];
2101 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2103 limit_to_mtu
= c
->udp
;
2105 /* We may need to start a new msghdr if this one is full. */
2106 if ((m
->msg_iovlen
== IOV_MAX
)
2107 || (limit_to_mtu
&& (c
->msgbytes
>= UDP_MAX_SEND_PAYLOAD_SIZE
)))
2110 m
= &c
->msglist
[c
->msgused
- 1];
2113 if (ms_ensure_iov_space(c
) != 0)
2116 /* If the fragment is too big to fit in the datagram, split it up */
2117 if (limit_to_mtu
&& (len
+ c
->msgbytes
> UDP_MAX_SEND_PAYLOAD_SIZE
))
2119 leftover
= len
+ c
->msgbytes
- UDP_MAX_SEND_PAYLOAD_SIZE
;
2127 m
= &c
->msglist
[c
->msgused
- 1];
2128 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
2129 m
->msg_iov
[m
->msg_iovlen
].iov_len
= (size_t)len
;
2135 buf
= ((char *)buf
) + len
;
2138 while (leftover
> 0);
2145 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2147 * @param c, pointer of the concurrency
2149 * @return int, if success, return 0, else return -1
2151 static int ms_build_udp_headers(ms_conn_t
*c
)
2158 c
->request_id
= ms_get_udp_request_id();
2160 if (c
->msgused
> c
->hdrsize
)
2164 new_hdrbuf
= realloc(c
->hdrbuf
,
2165 (size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2167 new_hdrbuf
= malloc((size_t)c
->msgused
* 2 * UDP_HEADER_SIZE
);
2171 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
2172 c
->hdrsize
= c
->msgused
* 2;
2175 /* If this is a multi-packet request, drop it. */
2176 if (c
->udp
&& (c
->msgused
> 1))
2178 fprintf(stderr
, "multi-packet request for UDP not supported.\n");
2183 for (i
= 0; i
< c
->msgused
; i
++)
2185 c
->msglist
[i
].msg_iov
[0].iov_base
= hdr
;
2186 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
2187 *hdr
++= (unsigned char)(c
->request_id
/ 256);
2188 *hdr
++= (unsigned char)(c
->request_id
% 256);
2189 *hdr
++= (unsigned char)(i
/ 256);
2190 *hdr
++= (unsigned char)(i
% 256);
2191 *hdr
++= (unsigned char)(c
->msgused
/ 256);
2192 *hdr
++= (unsigned char)(c
->msgused
% 256);
2193 *hdr
++= (unsigned char)1; /* support facebook memcached */
2194 *hdr
++= (unsigned char)0;
2196 ((unsigned char *)c
->msglist
[i
].msg_iov
[0].iov_base
2197 + UDP_HEADER_SIZE
));
2201 } /* ms_build_udp_headers */
2205 * Transmit the next chunk of data from our list of msgbuf structures.
2207 * @param c, pointer of the concurrency
2209 * @return TRANSMIT_COMPLETE All done writing.
2210 * TRANSMIT_INCOMPLETE More data remaining to write.
2211 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2212 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2214 static int ms_transmit(ms_conn_t
*c
)
2218 if ((c
->msgcurr
< c
->msgused
)
2219 && (c
->msglist
[c
->msgcurr
].msg_iovlen
== 0))
2221 /* Finished writing the current msg; advance to the next. */
2225 if (c
->msgcurr
< c
->msgused
)
2228 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
2230 res
= sendmsg(c
->sfd
, m
, 0);
2233 __sync_fetch_and_add(&ms_stats
.bytes_written
, res
);
2235 /* We've written some of the data. Remove the completed
2236 * iovec entries from the list of pending writes. */
2237 while (m
->msg_iovlen
> 0 && res
>= (ssize_t
)m
->msg_iov
->iov_len
)
2239 res
-= (ssize_t
)m
->msg_iov
->iov_len
;
2244 /* Might have written just part of the last iovec entry;
2245 * adjust it so the next write will do the rest. */
2248 m
->msg_iov
->iov_base
= (unsigned char *)m
->msg_iov
->iov_base
+ res
;
2249 m
->msg_iov
->iov_len
-= (uint64_t)res
;
2251 return TRANSMIT_INCOMPLETE
;
2253 if ((res
== -1) && ((errno
== EAGAIN
) || (errno
== EWOULDBLOCK
)))
2255 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2257 fprintf(stderr
, "Couldn't update event.\n");
2258 ms_conn_set_state(c
, conn_closing
);
2259 return TRANSMIT_HARD_ERROR
;
2261 return TRANSMIT_SOFT_ERROR
;
2264 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2265 * we have a real error, on which we close the connection */
2266 fprintf(stderr
, "Failed to write, and not due to blocking.\n");
2268 ms_conn_set_state(c
, conn_closing
);
2269 return TRANSMIT_HARD_ERROR
;
2273 return TRANSMIT_COMPLETE
;
2279 * Shrinks a connection's buffers if they're too big. This prevents
2280 * periodic large "mget" response from server chewing lots of client
2283 * This should only be called in between requests since it can wipe output
2286 * @param c, pointer of the concurrency
2288 static void ms_conn_shrink(ms_conn_t
*c
)
2295 if ((c
->rsize
> READ_BUFFER_HIGHWAT
) && (c
->rbytes
< DATA_BUFFER_SIZE
))
2299 if (c
->rcurr
!= c
->rbuf
)
2300 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
2302 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
2307 c
->rsize
= DATA_BUFFER_SIZE
;
2312 if (c
->udp
&& (c
->rudpsize
> UDP_DATA_BUFFER_HIGHWAT
)
2313 && (c
->rudpbytes
+ UDP_MAX_PAYLOAD_SIZE
< UDP_DATA_BUFFER_SIZE
))
2315 char *new_rbuf
= (char *)realloc(c
->rudpbuf
, (size_t)c
->rudpsize
* 2);
2318 c
->rudpbuf
= new_rbuf
;
2319 c
->rudpsize
= UDP_DATA_BUFFER_SIZE
;
2321 /* TODO check error condition? */
2324 if (c
->msgsize
> MSG_LIST_HIGHWAT
)
2326 struct msghdr
*newbuf
= (struct msghdr
*)realloc(
2329 * sizeof(c
->msglist
[0]));
2333 c
->msgsize
= MSG_LIST_INITIAL
;
2335 /* TODO check error condition? */
2338 if (c
->iovsize
> IOV_LIST_HIGHWAT
)
2340 struct iovec
*newbuf
= (struct iovec
*)realloc((void *)c
->iov
,
2342 * sizeof(c
->iov
[0]));
2346 c
->iovsize
= IOV_LIST_INITIAL
;
2348 /* TODO check return value */
2350 } /* ms_conn_shrink */
2354 * Sets a connection's current state in the state machine. Any special
2355 * processing that needs to happen on certain state transitions can
2358 * @param c, pointer of the concurrency
2359 * @param state, connection state
2361 static void ms_conn_set_state(ms_conn_t
*c
, int state
)
2365 if (state
!= c
->state
)
2367 if (state
== conn_read
)
2373 } /* ms_conn_set_state */
2377 * update the event if socks change state. for example: when
2378 * change the listen scoket read event to sock write event, or
2379 * change socket handler, we could call this function.
2381 * @param c, pointer of the concurrency
2382 * @param new_flags, new event flags
2384 * @return bool, if success, return true, else return false
2386 static bool ms_update_event(ms_conn_t
*c
, const int new_flags
)
2388 /* default event timeout 10 seconds */
2391 .tv_sec
= EVENT_TIMEOUT
, .tv_usec
= 0
2396 struct event_base
*base
= c
->event
.ev_base
;
2397 if ((c
->ev_flags
== new_flags
) && (ms_setting
.rep_write_srv
== 0)
2398 && (! ms_setting
.facebook_test
|| (c
->total_sfds
== 1)))
2403 if (event_del(&c
->event
) == -1)
2405 /* try to delete the event again */
2406 if (event_del(&c
->event
) == -1)
2412 event_set(&c
->event
,
2417 event_base_set(base
, &c
->event
);
2418 c
->ev_flags
= (short)new_flags
;
2420 if (c
->total_sfds
== 1)
2422 if (event_add(&c
->event
, NULL
) == -1)
2429 if (event_add(&c
->event
, &t
) == -1)
2436 } /* ms_update_event */
2440 * If user want to get the expected throughput, we could limit
2441 * the performance of memslap. we could give up some work and
2442 * just wait a short time. The function is used to check this
2445 * @param c, pointer of the concurrency
2447 * @return bool, if success, return true, else return false
2449 static bool ms_need_yield(ms_conn_t
*c
)
2452 int64_t time_diff
= 0;
2453 struct timeval curr_time
;
2454 ms_task_t
*task
= &c
->curr_task
;
2456 if (ms_setting
.expected_tps
> 0)
2458 gettimeofday(&curr_time
, NULL
);
2459 time_diff
= ms_time_diff(&ms_thread
.startup_time
, &curr_time
);
2461 (int64_t)((task
->get_opt
2462 + task
->set_opt
) / ((uint64_t)time_diff
/ 1000000));
2464 /* current throughput is greater than expected throughput */
2465 if (tps
> ms_thread
.thread_ctx
->tps_perconn
)
2472 } /* ms_need_yield */
2476 * used to update the start time of each operation
2478 * @param c, pointer of the concurrency
2480 static void ms_update_start_time(ms_conn_t
*c
)
2482 ms_task_item_t
*item
= c
->curr_task
.item
;
2484 if ((ms_setting
.stat_freq
> 0) || c
->udp
2485 || ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0)))
2487 gettimeofday(&c
->start_time
, NULL
);
2488 if ((c
->currcmd
.cmd
== CMD_SET
) && (item
->exp_time
> 0))
2490 /* record the current time */
2491 item
->client_time
= c
->start_time
.tv_sec
;
2494 } /* ms_update_start_time */
2498 * run the state machine
2500 * @param c, pointer of the concurrency
2502 static void ms_drive_machine(ms_conn_t
*c
)
2515 if (c
->rbytes
>= c
->rvbytes
)
2517 ms_complete_nread(c
);
2523 if (ms_try_read_line(c
) != 0)
2529 if (ms_try_read_network(c
) != 0)
2534 /* doesn't read all the response data, wait event wake up */
2535 if (! c
->currcmd
.isfinish
)
2537 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2539 fprintf(stderr
, "Couldn't update event.\n");
2540 ms_conn_set_state(c
, conn_closing
);
2547 /* we have no command line and no data to read from network, next write */
2548 ms_conn_set_state(c
, conn_write
);
2549 memcpy(&c
->precmd
, &c
->currcmd
, sizeof(ms_cmdstat_t
)); /* replicate command state */
2554 if (! c
->ctnwrite
&& ms_need_yield(c
))
2558 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2560 fprintf(stderr
, "Couldn't update event.\n");
2561 ms_conn_set_state(c
, conn_closing
);
2568 if (! c
->ctnwrite
&& (ms_exec_task(c
) != 0))
2570 ms_conn_set_state(c
, conn_closing
);
2574 /* record the start time before starting to send data if necessary */
2575 if (! c
->ctnwrite
|| (c
->change_sfd
&& c
->ctnwrite
))
2579 c
->change_sfd
= false;
2581 ms_update_start_time(c
);
2584 /* change sfd if necessary */
2592 /* execute task until nothing need be written to network */
2593 if (! c
->ctnwrite
&& (c
->msgcurr
== c
->msgused
))
2595 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2597 fprintf(stderr
, "Couldn't update event.\n");
2598 ms_conn_set_state(c
, conn_closing
);
2605 switch (ms_transmit(c
))
2607 case TRANSMIT_COMPLETE
:
2608 /* we have no data to write to network, next wait repose */
2609 if (! ms_update_event(c
, EV_READ
| EV_PERSIST
))
2611 fprintf(stderr
, "Couldn't update event.\n");
2612 ms_conn_set_state(c
, conn_closing
);
2616 ms_conn_set_state(c
, conn_read
);
2621 case TRANSMIT_INCOMPLETE
:
2623 break; /* Continue in state machine. */
2625 case TRANSMIT_HARD_ERROR
:
2629 case TRANSMIT_SOFT_ERROR
:
2641 /* recovery mode, need reconnect if connection close */
2642 if (ms_setting
.reconnect
&& (! ms_global
.time_out
2643 || ((ms_setting
.run_time
== 0)
2644 && (c
->remain_exec_num
> 0))))
2646 if (ms_reconn(c
) != 0)
2653 ms_reset_conn(c
, false);
2655 if (c
->total_sfds
== 1)
2657 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2659 fprintf(stderr
, "Couldn't update event.\n");
2660 ms_conn_set_state(c
, conn_closing
);
2678 } /* ms_drive_machine */
2682 * the event handler of each thread
2684 * @param fd, the file descriptor of socket
2685 * @param which, event flag
2686 * @param arg, argument
2688 void ms_event_handler(const int fd
, const short which
, void *arg
)
2690 ms_conn_t
*c
= (ms_conn_t
*)arg
;
2700 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2706 assert(fd
== c
->sfd
);
2708 /* event timeout, close the current connection */
2709 if (c
->which
== EV_TIMEOUT
)
2711 ms_conn_set_state(c
, conn_closing
);
2714 ms_drive_machine(c
);
2716 /* wait for next event */
2717 } /* ms_event_handler */
2721 * get the next socket descriptor index to run for replication
2723 * @param c, pointer of the concurrency
2724 * @param cmd, command(get or set )
2726 * @return int, if success, return the index, else return 0
2728 static int ms_get_rep_sock_index(ms_conn_t
*c
, int cmd
)
2733 if (c
->total_sfds
== 1)
2738 if (ms_setting
.rep_write_srv
== 0)
2747 for (i
= 0; i
< ms_setting
.rep_write_srv
; i
++)
2749 if (c
->tcpsfd
[i
] > 0)
2755 if (i
== ms_setting
.rep_write_srv
)
2757 /* random get one replication server to read */
2758 sock_index
= (int)(random() % c
->total_sfds
);
2762 /* random get one replication writing server to write */
2763 sock_index
= (int)(random() % ms_setting
.rep_write_srv
);
2766 else if (cmd
== CMD_GET
)
2768 /* random get one replication server to read */
2769 sock_index
= (int)(random() % c
->total_sfds
);
2772 while (c
->tcpsfd
[sock_index
] == 0);
2775 } /* ms_get_rep_sock_index */
2779 * get the next socket descriptor index to run
2781 * @param c, pointer of the concurrency
2783 * @return int, return the index
2785 static int ms_get_next_sock_index(ms_conn_t
*c
)
2791 sock_index
= (++c
->cur_idx
== c
->total_sfds
) ? 0 : c
->cur_idx
;
2793 while (c
->tcpsfd
[sock_index
] == 0);
2796 } /* ms_get_next_sock_index */
2800 * update socket event of the connections
2802 * @param c, pointer of the concurrency
2804 * @return int, if success, return 0, else return -1
2806 static int ms_update_conn_sock_event(ms_conn_t
*c
)
2810 switch (c
->currcmd
.cmd
)
2813 if (ms_setting
.facebook_test
&& c
->udp
)
2815 c
->sfd
= c
->tcpsfd
[0];
2817 c
->change_sfd
= true;
2822 if (ms_setting
.facebook_test
&& ! c
->udp
)
2826 c
->change_sfd
= true;
2834 if (! c
->udp
&& (c
->total_sfds
> 1))
2836 if (c
->cur_idx
!= c
->total_sfds
)
2838 if (ms_setting
.rep_write_srv
== 0)
2840 c
->cur_idx
= ms_get_next_sock_index(c
);
2844 c
->cur_idx
= ms_get_rep_sock_index(c
, c
->currcmd
.cmd
);
2849 /* must select the first sock of the connection at the beginning */
2853 c
->sfd
= c
->tcpsfd
[c
->cur_idx
];
2854 assert(c
->sfd
!= 0);
2855 c
->change_sfd
= true;
2860 if (! ms_update_event(c
, EV_WRITE
| EV_PERSIST
))
2862 fprintf(stderr
, "Couldn't update event.\n");
2863 ms_conn_set_state(c
, conn_closing
);
2869 } /* ms_update_conn_sock_event */
2873 * for ASCII protocol, this function build the set command
2874 * string and send the command.
2876 * @param c, pointer of the concurrency
2877 * @param item, pointer of task item which includes the object
2880 * @return int, if success, return 0, else return -1
2882 static int ms_build_ascii_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2886 char *buffer
= c
->wbuf
;
2888 write_len
= sprintf(buffer
,
2894 if (write_len
> c
->wsize
)
2896 /* ought to be always enough. just fail for simplicity */
2897 fprintf(stderr
, "output command line too long.\n");
2901 if (item
->value_offset
== INVALID_OFFSET
)
2903 value_offset
= item
->key_suffix_offset
;
2907 value_offset
= item
->value_offset
;
2910 if ((ms_add_iov(c
, "set ", 4) != 0)
2911 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
2912 (int)KEY_PREFIX_SIZE
) != 0)
2913 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2914 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
2915 || (ms_add_iov(c
, buffer
, write_len
) != 0)
2916 || (ms_add_iov(c
, &ms_setting
.char_block
[value_offset
],
2917 item
->value_size
) != 0)
2918 || (ms_add_iov(c
, "\r\n", 2) != 0)
2919 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
2925 } /* ms_build_ascii_write_buf_set */
2929 * used to send set command to server
2931 * @param c, pointer of the concurrency
2932 * @param item, pointer of task item which includes the object
2935 * @return int, if success, return 0, else return -1
2937 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
)
2941 c
->currcmd
.cmd
= CMD_SET
;
2942 c
->currcmd
.isfinish
= false;
2943 c
->currcmd
.retstat
= MCD_FAILURE
;
2945 if (ms_update_conn_sock_event(c
) != 0)
2953 if (ms_add_msghdr(c
) != 0)
2955 fprintf(stderr
, "Out of memory preparing request.");
2959 /* binary protocol */
2960 if (c
->protocol
== binary_prot
)
2962 if (ms_build_bin_write_buf_set(c
, item
) != 0)
2969 if (ms_build_ascii_write_buf_set(c
, item
) != 0)
2975 __sync_fetch_and_add(&ms_stats
.obj_bytes
,
2976 item
->key_size
+ item
->value_size
);
2977 __sync_fetch_and_add(&ms_stats
.cmd_set
, 1);
2984 * for ASCII protocol, this function build the get command
2985 * string and send the command.
2987 * @param c, pointer of the concurrency
2988 * @param item, pointer of task item which includes the object
2991 * @return int, if success, return 0, else return -1
2993 static int ms_build_ascii_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
2995 if ((ms_add_iov(c
, "get ", 4) != 0)
2996 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
2997 (int)KEY_PREFIX_SIZE
) != 0)
2998 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
2999 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0)
3000 || (ms_add_iov(c
, "\r\n", 2) != 0)
3001 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3007 } /* ms_build_ascii_write_buf_get */
3011 * used to send the get command to server
3013 * @param c, pointer of the concurrency
3014 * @param item, pointer of task item which includes the object
3016 * @param verify, whether do verification
3018 * @return int, if success, return 0, else return -1
3020 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
, bool verify
)
3022 /* verify not supported yet */
3023 UNUSED_ARGUMENT(verify
);
3027 c
->currcmd
.cmd
= CMD_GET
;
3028 c
->currcmd
.isfinish
= false;
3029 c
->currcmd
.retstat
= MCD_FAILURE
;
3031 if (ms_update_conn_sock_event(c
) != 0)
3039 if (ms_add_msghdr(c
) != 0)
3041 fprintf(stderr
, "Out of memory preparing request.");
3045 /* binary protocol */
3046 if (c
->protocol
== binary_prot
)
3048 if (ms_build_bin_write_buf_get(c
, item
) != 0)
3055 if (ms_build_ascii_write_buf_get(c
, item
) != 0)
3061 __sync_fetch_and_add(&ms_stats
.cmd_get
, 1);
3068 * for ASCII protocol, this function build the multi-get command
3069 * string and send the command.
3071 * @param c, pointer of the concurrency
3073 * @return int, if success, return 0, else return -1
3075 static int ms_build_ascii_write_buf_mlget(ms_conn_t
*c
)
3077 ms_task_item_t
*item
;
3079 if (ms_add_iov(c
, "get", 3) != 0)
3084 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3086 item
= c
->mlget_task
.mlget_item
[i
].item
;
3087 assert(item
!= NULL
);
3088 if ((ms_add_iov(c
, " ", 1) != 0)
3089 || (ms_add_iov(c
, (char *)&item
->key_prefix
,
3090 (int)KEY_PREFIX_SIZE
) != 0)
3091 || (ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3092 item
->key_size
- (int)KEY_PREFIX_SIZE
) != 0))
3098 if ((ms_add_iov(c
, "\r\n", 2) != 0)
3099 || (c
->udp
&& (ms_build_udp_headers(c
) != 0)))
3105 } /* ms_build_ascii_write_buf_mlget */
3109 * used to send the multi-get command to server
3111 * @param c, pointer of the concurrency
3113 * @return int, if success, return 0, else return -1
3115 int ms_mcd_mlget(ms_conn_t
*c
)
3117 ms_task_item_t
*item
;
3120 assert(c
->mlget_task
.mlget_num
>= 1);
3122 c
->currcmd
.cmd
= CMD_GET
;
3123 c
->currcmd
.isfinish
= false;
3124 c
->currcmd
.retstat
= MCD_FAILURE
;
3126 if (ms_update_conn_sock_event(c
) != 0)
3134 if (ms_add_msghdr(c
) != 0)
3136 fprintf(stderr
, "Out of memory preparing request.");
3140 /* binary protocol */
3141 if (c
->protocol
== binary_prot
)
3143 if (ms_build_bin_write_buf_mlget(c
) != 0)
3150 if (ms_build_ascii_write_buf_mlget(c
) != 0)
3156 /* decrease operation time of each item */
3157 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3159 item
= c
->mlget_task
.mlget_item
[i
].item
;
3160 __sync_fetch_and_add(&ms_stats
.cmd_get
, 1);
3164 } /* ms_mcd_mlget */
3168 * binary protocol support
3172 * for binary protocol, parse the response of server
3174 * @param c, pointer of the concurrency
3176 * @return int, if success, return 0, else return -1
3178 static int ms_bin_process_response(ms_conn_t
*c
)
3180 const char *errstr
= NULL
;
3184 uint32_t bodylen
= c
->binary_header
.response
.bodylen
;
3185 uint8_t opcode
= c
->binary_header
.response
.opcode
;
3186 uint16_t status
= c
->binary_header
.response
.status
;
3190 c
->rvbytes
= (int32_t)bodylen
;
3198 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
3199 if (opcode
== PROTOCOL_BINARY_CMD_SET
)
3201 c
->currcmd
.retstat
= MCD_STORED
;
3203 else if (opcode
== PROTOCOL_BINARY_CMD_DELETE
)
3205 c
->currcmd
.retstat
= MCD_DELETED
;
3207 else if (opcode
== PROTOCOL_BINARY_CMD_GET
)
3209 c
->currcmd
.retstat
= MCD_END
;
3213 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
3214 errstr
= "Out of memory";
3215 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3218 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
3219 errstr
= "Unknown command";
3220 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3223 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
3224 errstr
= "Not found";
3225 c
->currcmd
.retstat
= MCD_NOTFOUND
;
3228 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
3229 errstr
= "Invalid arguments";
3230 c
->currcmd
.retstat
= MCD_PROTOCOL_ERROR
;
3233 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
3234 errstr
= "Data exists for key.";
3237 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
3238 errstr
= "Too large.";
3239 c
->currcmd
.retstat
= MCD_SERVER_ERROR
;
3242 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
3243 errstr
= "Not stored.";
3244 c
->currcmd
.retstat
= MCD_NOTSTORED
;
3248 errstr
= "Unknown error";
3249 c
->currcmd
.retstat
= MCD_UNKNOWN_READ_FAILURE
;
3255 fprintf(stderr
, "%s\n", errstr
);
3260 } /* ms_bin_process_response */
3263 /* build binary header and add the header to the buffer to send */
3266 * build binary header and add the header to the buffer to send
3268 * @param c, pointer of the concurrency
3269 * @param opcode, operation code
3270 * @param hdr_len, length of header
3271 * @param key_len, length of key
3272 * @param body_len. length of body
3274 static void ms_add_bin_header(ms_conn_t
*c
,
3280 protocol_binary_request_header
*header
;
3284 header
= (protocol_binary_request_header
*)c
->wcurr
;
3286 header
->request
.magic
= (uint8_t)PROTOCOL_BINARY_REQ
;
3287 header
->request
.opcode
= (uint8_t)opcode
;
3288 header
->request
.keylen
= htonl(key_len
);
3290 header
->request
.extlen
= (uint8_t)hdr_len
;
3291 header
->request
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
3292 header
->request
.reserved
= 0;
3294 header
->request
.bodylen
= htonl(body_len
);
3295 header
->request
.opaque
= 0;
3296 header
->request
.cas
= 0;
3298 ms_add_iov(c
, c
->wcurr
, sizeof(header
->request
));
3299 } /* ms_add_bin_header */
3303 * add the key to the socket write buffer array
3305 * @param c, pointer of the concurrency
3306 * @param item, pointer of task item which includes the object
3309 static void ms_add_key_to_iov(ms_conn_t
*c
, ms_task_item_t
*item
)
3311 ms_add_iov(c
, (char *)&item
->key_prefix
, (int)KEY_PREFIX_SIZE
);
3312 ms_add_iov(c
, &ms_setting
.char_block
[item
->key_suffix_offset
],
3313 item
->key_size
- (int)KEY_PREFIX_SIZE
);
3318 * for binary protocol, this function build the set command
3319 * and add the command to send buffer array.
3321 * @param c, pointer of the concurrency
3322 * @param item, pointer of task item which includes the object
3325 * @return int, if success, return 0, else return -1
3327 static int ms_build_bin_write_buf_set(ms_conn_t
*c
, ms_task_item_t
*item
)
3329 assert(c
->wbuf
== c
->wcurr
);
3332 protocol_binary_request_set
*rep
= (protocol_binary_request_set
*)c
->wcurr
;
3333 uint16_t keylen
= (uint16_t)item
->key_size
;
3334 uint32_t bodylen
= (uint32_t)sizeof(rep
->message
.body
)
3335 + (uint32_t)keylen
+ (uint32_t)item
->value_size
;
3337 ms_add_bin_header(c
,
3338 PROTOCOL_BINARY_CMD_SET
,
3339 sizeof(rep
->message
.body
),
3342 rep
->message
.body
.flags
= 0;
3343 rep
->message
.body
.expiration
= htonl((uint32_t)item
->exp_time
);
3344 ms_add_iov(c
, &rep
->message
.body
, sizeof(rep
->message
.body
));
3345 ms_add_key_to_iov(c
, item
);
3347 if (item
->value_offset
== INVALID_OFFSET
)
3349 value_offset
= item
->key_suffix_offset
;
3353 value_offset
= item
->value_offset
;
3355 ms_add_iov(c
, &ms_setting
.char_block
[value_offset
], item
->value_size
);
3358 } /* ms_build_bin_write_buf_set */
3362 * for binary protocol, this function build the get command and
3363 * add the command to send buffer array.
3365 * @param c, pointer of the concurrency
3366 * @param item, pointer of task item which includes the object
3369 * @return int, if success, return 0, else return -1
3371 static int ms_build_bin_write_buf_get(ms_conn_t
*c
, ms_task_item_t
*item
)
3373 assert(c
->wbuf
== c
->wcurr
);
3375 ms_add_bin_header(c
, PROTOCOL_BINARY_CMD_GET
, 0, (uint16_t)item
->key_size
,
3376 (uint32_t)item
->key_size
);
3377 ms_add_key_to_iov(c
, item
);
3380 } /* ms_build_bin_write_buf_get */
3384 * for binary protocol, this function build the multi-get
3385 * command and add the command to send buffer array.
3387 * @param c, pointer of the concurrency
3388 * @param item, pointer of task item which includes the object
3391 * @return int, if success, return 0, else return -1
3393 static int ms_build_bin_write_buf_mlget(ms_conn_t
*c
)
3395 ms_task_item_t
*item
;
3397 assert(c
->wbuf
== c
->wcurr
);
3399 for (int i
= 0; i
< c
->mlget_task
.mlget_num
; i
++)
3401 item
= c
->mlget_task
.mlget_item
[i
].item
;
3402 assert(item
!= NULL
);
3404 ms_add_bin_header(c
,
3405 PROTOCOL_BINARY_CMD_GET
,
3407 (uint16_t)item
->key_size
,
3408 (uint32_t)item
->key_size
);
3409 ms_add_key_to_iov(c
, item
);
3410 c
->wcurr
+= sizeof(protocol_binary_request_get
);
3416 } /* ms_build_bin_write_buf_mlget */