X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fbin%2Fcontrib%2Fmemaslap%2Fms_conn.h;fp=src%2Fbin%2Fcontrib%2Fmemaslap%2Fms_conn.h;h=426eae82d44264d5a39571d5692ee9f7a1e5a970;hb=77fc9c3ba95eb502d1a66146251acfcfd5606df8;hp=0000000000000000000000000000000000000000;hpb=a7e11259b99326ef0d32a1029166aa53546c2708;p=awesomized%2Flibmemcached diff --git a/src/bin/contrib/memaslap/ms_conn.h b/src/bin/contrib/memaslap/ms_conn.h new file mode 100644 index 00000000..426eae82 --- /dev/null +++ b/src/bin/contrib/memaslap/ms_conn.h @@ -0,0 +1,229 @@ +/* + +--------------------------------------------------------------------+ + | libmemcached - C/C++ Client Library for memcached | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted under the terms of the BSD license. | + | You should have received a copy of the license in a bundled file | + | named LICENSE; in case you did not receive a copy you can review | + | the terms online at: https://opensource.org/licenses/BSD-3-Clause | + +--------------------------------------------------------------------+ + | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ | + | Copyright (c) 2020 Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#ifndef MS_CONN_H +#define MS_CONN_H + +#include +#include +#include +#include + +#include "ms_task.h" +#include "libmemcachedprotocol-0.0/binary.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define DATA_BUFFER_SIZE \ + (1024 * 1024 + 2048) /* read buffer, 1M + 2k, enough for the max value(1M) */ +#define WRITE_BUFFER_SIZE (32 * 1024) /* write buffer, 32k */ +#define UDP_DATA_BUFFER_SIZE (1 * 1024 * 1024) /* read buffer for UDP, 1M */ +#define UDP_MAX_PAYLOAD_SIZE 1400 /* server limit UDP payload size */ +#define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */ +#define UDP_HEADER_SIZE 8 /* UDP header size */ +#define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */ +#define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */ +#define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */ + +/* Initial size of the sendmsg() scatter/gather array. */ +#define IOV_LIST_INITIAL 400 + +/* Initial number of sendmsg() argument structures to allocate. */ +#define MSG_LIST_INITIAL 10 + +/* High water marks for buffer shrinking */ +#define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE) +#define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE) +#define IOV_LIST_HIGHWAT 600 +#define MSG_LIST_HIGHWAT 100 + +/* parse udp header */ +#define HEADER_TO_REQID(ptr) ((uint16_t) *ptr * 256 + (uint16_t) * (ptr + 1)) +#define HEADER_TO_SEQNUM(ptr) ((uint16_t) * (ptr + 2) * 256 + (uint16_t) * (ptr + 3)) +#define HEADER_TO_PACKETS(ptr) ((uint16_t) * (ptr + 4) * 256 + (uint16_t) * (ptr + 5)) + +/* states of connection */ +enum conn_states { + conn_read, /* reading in a command line */ + conn_write, /* writing out a simple response */ + conn_closing /* closing this connection */ +}; + +/* returned states of memcached command */ +enum mcd_ret { + MCD_SUCCESS, /* command success */ + MCD_FAILURE, /* command failure */ + MCD_UNKNOWN_READ_FAILURE, /* unknown read failure */ + MCD_PROTOCOL_ERROR, /* protocol error */ + MCD_CLIENT_ERROR, /* client error, wrong command */ + MCD_SERVER_ERROR, /* server error, server run command failed */ + MCD_DATA_EXISTS, /* object is existent in server */ + MCD_NOTSTORED, /* server doesn't set the object successfully */ + MCD_STORED, /* server set the object successfully */ + MCD_NOTFOUND, /* server not find the object */ + MCD_END, /* end of the response of get command */ + MCD_DELETED, /* server delete the object successfully */ + MCD_STAT /* response of stats command */ +}; + +/* used to store the current or previous running command state */ +typedef struct cmdstat { + int cmd; /* command name */ + int retstat; /* return state of this command */ + bool isfinish; /* if it read all the response data */ + uint64_t key_prefix; /* key prefix */ +} ms_cmdstat_t; + +/* udp packet structure */ +typedef struct udppkt { + uint8_t *header; /* udp header of the packet */ + char *data; /* udp data of the packet */ + int rbytes; /* number of data in the packet */ + int copybytes; /* number of copied data in the packet */ +} ms_udppkt_t; + +/* three protocols supported */ +enum protocol { + ascii_prot = 3, /* ASCII protocol */ + binary_prot /* binary protocol */ +}; + +/** + * concurrency structure + * + * Each thread has a libevent to manage the events of network. + * Each thread has one or more self-governed concurrencies; + * each concurrency has one or more socket connections. This + * concurrency structure includes all the private variables of + * the concurrency. + */ +typedef struct conn { + uint32_t conn_idx; /* connection index in the thread */ + int sfd; /* current tcp sock handler of the connection structure */ + int udpsfd; /* current udp sock handler of the connection structure*/ + int state; /* state of the connection */ + struct event event; /* event for libevent */ + short ev_flags; /* event flag for libevent */ + short which; /* which events were just triggered */ + bool change_sfd; /* whether change sfd */ + + int *tcpsfd; /* TCP sock array */ + uint32_t total_sfds; /* how many socks in the tcpsfd array */ + uint32_t alive_sfds; /* alive socks */ + uint32_t cur_idx; /* current sock index in tcpsfd array */ + + ms_cmdstat_t precmd; /* previous command state */ + ms_cmdstat_t currcmd; /* current command state */ + + char *rbuf; /* buffer to read commands into */ + char *rcurr; /* but if we parsed some already, this is where we stopped */ + int rsize; /* total allocated size of rbuf */ + int rbytes; /* how much data, starting from rcur, do we have unparsed */ + + bool readval; /* read value state, read known data size */ + int rvbytes; /* total value size need to read */ + + char *wbuf; /* buffer to write commands out */ + char *wcurr; /* for multi-get, where we stopped */ + int wsize; /* total allocated size of wbuf */ + bool ctnwrite; /* continue to write */ + + /* data for the mwrite state */ + struct iovec *iov; + int iovsize; /* number of elements allocated in iov[] */ + int iovused; /* number of elements used in iov[] */ + + struct msghdr *msglist; + int msgsize; /* number of elements allocated in msglist[] */ + int msgused; /* number of elements used in msglist[] */ + int msgcurr; /* element in msglist[] being transmitted now */ + int msgbytes; /* number of bytes in current msg */ + + /* data for UDP clients */ + bool udp; /* is this is a UDP "connection" */ + uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */ + uint8_t *hdrbuf; /* udp packet headers */ + int hdrsize; /* number of headers' worth of space is allocated */ + struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */ + socklen_t srv_recv_addr_size; + + /* udp read buffer */ + char *rudpbuf; /* buffer to read commands into for udp */ + int rudpsize; /* total allocated size of rudpbuf */ + int rudpbytes; /* how much data, starting from rudpbuf */ + + /* order udp packet */ + ms_udppkt_t *udppkt; /* the offset of udp packet in rudpbuf */ + int packets; /* number of total packets need to read */ + int recvpkt; /* number of received packets */ + int pktcurr; /* current packet in rudpbuf being ordered */ + int ordcurr; /* current ordered packet */ + + ms_task_item_t *item_win; /* task sequence */ + int win_size; /* current task window size */ + uint64_t set_cursor; /* current set item index in the item window */ + ms_task_t curr_task; /* current running task */ + ms_mlget_task_t mlget_task; /* multi-get task */ + + int warmup_num; /* to run how many warm up operations*/ + int remain_warmup_num; /* left how many warm up operations to run */ + int64_t exec_num; /* to run how many task operations */ + int64_t remain_exec_num; /* how many remained task operations to run */ + + /* response time statistic and time out control */ + struct timeval start_time; /* start time of current operation(s) */ + struct timeval end_time; /* end time of current operation(s) */ + + /* Binary protocol stuff */ + protocol_binary_response_header binary_header; /* local temporary binary header */ + enum protocol protocol; /* which protocol this connection speaks */ +} ms_conn_t; + +/* used to generate the key prefix */ +uint64_t ms_get_key_prefix(void); + +/** + * setup a connection, each connection structure of each + * thread must call this function to initialize. + */ +int ms_setup_conn(ms_conn_t *c); + +/* after one operation completes, reset the connection */ +void ms_reset_conn(ms_conn_t *c, bool timeout); + +/** + * reconnect several disconnected socks in the connection + * structure, the ever-1-second timer of the thread will check + * whether some socks in the connections disconnect. if + * disconnect, reconnect the sock. + */ +int ms_reconn_socks(ms_conn_t *c); + +/* used to send set command to server */ +int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); + +/* used to send the get command to server */ +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); + +/* used to send the multi-get command to server */ +int ms_mcd_mlget(ms_conn_t *c); + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_CONN_H */