3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
14 #include <sys/socket.h>
15 #include <netinet/in.h>
20 #include "protocol_binary.h"
/* Buffer sizes, network limits and timeouts used by the connection code. */
26 #define DATA_BUFFER_SIZE (1024 * 1024 + 2048) /* read buffer, 1M + 2k, large enough for the maximum value size (1M) */
27 #define WRITE_BUFFER_SIZE (32 * 1024) /* write buffer, 32k */
28 #define UDP_DATA_BUFFER_SIZE (1 * 1024 * 1024) /* read buffer for UDP, 1M */
29 #define UDP_MAX_PAYLOAD_SIZE 1400 /* UDP payload size limit on the server side */
30 #define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* max payload per sent UDP packet; kept below the 1500-byte MTU */
31 #define UDP_HEADER_SIZE 8 /* UDP header size, in bytes */
32 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* maximum socket buffer size, 256M */
33 #define SOCK_WAIT_TIMEOUT 10 /* maximum time to wait on UDP, 10s */
34 #define EVENT_TIMEOUT 10 /* maximum time to wait for an event, 10s */
35 #define MAX_UDP_PACKET (1 << 16) /* maximum number of UDP packets, 65536 */
37 /* Initial size of the sendmsg() scatter/gather array. */
38 #define IOV_LIST_INITIAL 400
40 /* Initial number of sendmsg() argument structures to allocate. */
41 #define MSG_LIST_INITIAL 10
43 /* High-water marks used when shrinking buffers. */
44 #define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE)
45 #define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE)
46 #define IOV_LIST_HIGHWAT 600
47 #define MSG_LIST_HIGHWAT 100
/*
 * Parse the header at the start of a received UDP packet.
 *
 * Each macro reads one 16-bit big-endian field from the byte buffer `ptr`:
 *   bytes 0-1: request ID
 *   bytes 2-3: sequence number
 *   bytes 4-5: total number of packets
 * The result is an int in the range 0..65535.
 *
 * `ptr` is fully parenthesized so that arguments such as `buf + off` expand
 * correctly. Each byte is converted through uint8_t first, so a (possibly
 * signed) `char *` buffer cannot sign-extend bytes >= 0x80.
 * Note: `ptr` is evaluated twice; do not pass an expression with side effects.
 */
#define HEADER_TO_REQID(ptr) \
  ((uint16_t)(uint8_t)*(ptr) * 256 + (uint16_t)(uint8_t)*((ptr) + 1))
#define HEADER_TO_SEQNUM(ptr) \
  ((uint16_t)(uint8_t)*((ptr) + 2) * 256 + (uint16_t)(uint8_t)*((ptr) + 3))
#define HEADER_TO_PACKETS(ptr) \
  ((uint16_t)(uint8_t)*((ptr) + 4) * 256 + (uint16_t)(uint8_t)*((ptr) + 5))
54 /* connection states */
56 conn_read
, /* reading in a command line */
57 conn_write
, /* writing out a simple response */
58 conn_closing
, /* closing this connection */
61 /* result states returned by a memcached command */
63 MCD_SUCCESS
, /* command succeeded */
64 MCD_FAILURE
, /* command failed */
65 MCD_UNKNOWN_READ_FAILURE
, /* unknown read failure */
66 MCD_PROTOCOL_ERROR
, /* protocol error */
67 MCD_CLIENT_ERROR
, /* client error, wrong command */
68 MCD_SERVER_ERROR
, /* server error, the server failed to run the command */
69 MCD_DATA_EXISTS
, /* the object already exists on the server */
70 MCD_NOTSTORED
, /* the server did not store the object */
71 MCD_STORED
, /* the server stored the object successfully */
72 MCD_NOTFOUND
, /* the server did not find the object */
73 MCD_END
, /* end of the response to a get command */
74 MCD_DELETED
, /* the server deleted the object successfully */
75 MCD_STAT
, /* response to a stats command */
78 /* stores the state of the current or previous running command */
79 typedef struct cmdstat
{
80 int cmd
; /* command name, stored as an int */
81 int retstat
; /* return state of this command (presumably an MCD_* value - confirm) */
82 bool isfinish
; /* whether all of the response data has been read */
83 uint64_t key_prefix
; /* key prefix */
86 /* one received UDP packet */
87 typedef struct udppkt
{
88 uint8_t *header
; /* UDP header of the packet */
89 char *data
; /* UDP data (payload) of the packet */
90 int rbytes
; /* number of data bytes in the packet */
91 int copybytes
; /* number of data bytes already copied from the packet */
94 /* the three supported protocols; values start at 3 */
96 ascii_prot
= 3, /* ASCII protocol */
97 ascii_udp_prot
, /* ASCII protocol over UDP */
98 binary_prot
, /* binary protocol */
102 * concurrency structure
104 * Each thread has its own libevent instance to manage network events.
105 * Each thread has one or more independent concurrencies;
106 * each concurrency has one or more socket connections. This
107 * concurrency structure holds all the private variables of a concurrency.
110 typedef struct conn
{
111 int conn_idx
; /* connection index in the thread */
112 int sfd
; /* current TCP socket handle of this connection structure */
113 int udpsfd
; /* current UDP socket handle of this connection structure */
114 int state
; /* state of the connection */
115 struct event event
; /* event for libevent */
116 short ev_flags
; /* event flags for libevent */
117 short which
; /* which events were just triggered */
118 bool change_sfd
; /* whether to change sfd */
120 int *tcpsfd
; /* array of TCP sockets */
121 int total_sfds
; /* number of sockets in the tcpsfd array */
122 int alive_sfds
; /* number of alive sockets */
123 int cur_idx
; /* index of the current socket in the tcpsfd array */
125 ms_cmdstat_t precmd
; /* previous command state */
126 ms_cmdstat_t currcmd
; /* current command state */
128 char *rbuf
; /* read buffer; rsize bytes are allocated */
129 char *rcurr
; /* if some data was already parsed, this is where parsing stopped */
130 int rsize
; /* total allocated size of rbuf */
131 int rbytes
; /* number of unparsed bytes, starting at rcurr */
133 bool readval
; /* read-value state: reading a value of known size */
134 int rvbytes
; /* total size of the value still to read */
136 char *wbuf
; /* buffer to write commands out */
137 char *wcurr
; /* for multi-get, where writing stopped */
138 int wsize
; /* total allocated size of wbuf */
139 bool ctnwrite
; /* whether to continue writing */
141 /* data for the mwrite state */
/* NOTE(review): iovsize/iovused describe an iov[] array whose declaration is not visible here - confirm it exists */
143 int iovsize
; /* number of elements allocated in iov[] */
144 int iovused
; /* number of elements used in iov[] */
146 struct msghdr
*msglist
; /* array of sendmsg() message headers */
147 int msgsize
; /* number of elements allocated in msglist[] */
148 int msgused
; /* number of elements used in msglist[] */
149 int msgcurr
; /* element in msglist[] being transmitted now */
150 int msgbytes
; /* number of bytes in the current msg */
152 /* data for UDP clients */
153 int udp
; /* whether this is a UDP "connection" */
154 int request_id
; /* UDP request ID of the current operation, if this is a UDP "connection" */
155 uint8_t *hdrbuf
; /* UDP packet headers */
156 int hdrsize
; /* number of headers' worth of space allocated in hdrbuf */
157 struct sockaddr srv_recv_addr
; /* address of the server the most recent request was sent to */
158 socklen_t srv_recv_addr_size
; /* size of srv_recv_addr */
160 /* UDP read buffer */
161 char *rudpbuf
; /* buffer to read UDP data into */
162 int rudpsize
; /* total allocated size of rudpbuf */
163 int rudpbytes
; /* number of bytes stored, starting at rudpbuf */
165 /* UDP packet ordering */
166 ms_udppkt_t
*udppkt
; /* locates each UDP packet within rudpbuf */
167 int packets
; /* total number of packets to read */
168 int recvpkt
; /* number of received packets */
169 int pktcurr
; /* current packet in rudpbuf being ordered */
170 int ordcurr
; /* current ordered packet */
172 ms_task_item_t
*item_win
; /* task sequence (the item window) */
173 int win_size
; /* current task window size */
174 uint64_t set_cursor
; /* current set item index in the item window */
175 ms_task_t curr_task
; /* currently running task */
176 ms_mlget_task_t mlget_task
; /* multi-get task */
178 int warmup_num
; /* number of warm-up operations to run */
179 int remain_warmup_num
; /* number of warm-up operations left to run */
180 int64_t exec_num
; /* number of task operations to run */
181 int64_t remain_exec_num
; /* number of task operations left to run */
183 /* response-time statistics and timeout control */
184 struct timeval start_time
; /* start time of the current operation(s) */
185 struct timeval end_time
; /* end time of the current operation(s) */
187 /* binary protocol state */
188 protocol_binary_response_header binary_header
; /* local temporary binary header */
189 enum protocol protocol
; /* which protocol this connection speaks */
192 /* Generate a key prefix; the prefix is returned as a uint64_t. */
193 uint64_t ms_get_key_prefix(void);
196 * Set up a connection. Each connection structure of each
197 * thread must be initialized by calling this function.
/* c: the connection structure to set up. Returns an int status code (the success/failure convention is not visible in this header - TODO confirm). */
199 int ms_setup_conn(ms_conn_t
*c
);
201 /* Reset the connection after one operation completes. `timeout` presumably reports whether the operation timed out - confirm against the implementation. */
202 void ms_reset_conn(ms_conn_t
*c
, bool timeout
);
205 * Reconnect the disconnected socks in the connection
206 * structure. The thread's once-per-second timer checks
207 * whether any socks in the connection have disconnected;
208 * if so, the disconnected socks are reconnected.
/* c: the connection whose socks are reconnected. Returns an int status code (convention not visible in this header - TODO confirm). */
210 int ms_reconn_socks(ms_conn_t
*c
);
212 /* Send a set command to the server on connection c; item is the task item to set. Returns an int status code (convention not visible in this header - TODO confirm). */
213 int ms_mcd_set(ms_conn_t
*c
, ms_task_item_t
*item
);
215 /* Send a get command to the server on connection c; item is the task item to get. `verify` presumably enables checking the returned value - confirm. Returns an int status code (convention not visible here - TODO confirm). */
216 int ms_mcd_get(ms_conn_t
*c
, ms_task_item_t
*item
, bool verify
);
218 /* Send the multi-get command to the server on connection c. Returns an int status code (convention not visible in this header - TODO confirm). */
219 int ms_mcd_mlget(ms_conn_t
*c
);
225 #endif /* end of MS_CONN_H */