X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=src%2Fbin%2Fcontrib%2Fmemaslap%2Fms_conn.c;fp=src%2Fbin%2Fcontrib%2Fmemaslap%2Fms_conn.c;h=f4375ae87bea7c318859d550fcd8b19b0991615b;hb=77fc9c3ba95eb502d1a66146251acfcfd5606df8;hp=0000000000000000000000000000000000000000;hpb=a7e11259b99326ef0d32a1029166aa53546c2708;p=awesomized%2Flibmemcached diff --git a/src/bin/contrib/memaslap/ms_conn.c b/src/bin/contrib/memaslap/ms_conn.c new file mode 100644 index 00000000..f4375ae8 --- /dev/null +++ b/src/bin/contrib/memaslap/ms_conn.c @@ -0,0 +1,2864 @@ +/* + +--------------------------------------------------------------------+ + | libmemcached - C/C++ Client Library for memcached | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted under the terms of the BSD license. | + | You should have received a copy of the license in a bundled file | + | named LICENSE; in case you did not receive a copy you can review | + | the terms online at: https://opensource.org/licenses/BSD-3-Clause | + +--------------------------------------------------------------------+ + | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ | + | Copyright (c) 2020 Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#include "mem_config.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(HAVE_ARPA_INET_H) +# include +#endif + +#if defined(HAVE_SYS_TIME_H) +# include +#endif + +#if defined(HAVE_TIME_H) +# include +#endif + +#include "ms_setting.h" +#include "ms_thread.h" +#include "ms_atomic.h" + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +# undef ntohs +# undef ntohl +# undef htons +# undef htonl +#endif + +/* for network write */ +#define TRANSMIT_COMPLETE 0 +#define TRANSMIT_INCOMPLETE 1 +#define TRANSMIT_SOFT_ERROR 2 +#define TRANSMIT_HARD_ERROR 3 + +/* for generating key */ +#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */ +#define KEY_PREFIX_MASK 0x1010101010101010 + +/* For parse the value length return by server */ +#define KEY_TOKEN 1 +#define VALUELEN_TOKEN 3 + +/* global increasing counter, to ensure the key prefix unique */ +static uint64_t key_prefix_seq = KEY_PREFIX_BASE; + +/* global increasing counter, generating request id for UDP */ +static ATOMIC uint32_t udp_request_id = 0; + +extern pthread_key_t ms_thread_key; + +/* generate upd request id */ +static uint32_t ms_get_udp_request_id(void); + +/* connect initialize */ +static void ms_task_init(ms_conn_t *c); +static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp); +static int ms_conn_sock_init(ms_conn_t *c); +static int ms_conn_event_init(ms_conn_t *c); +static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size, + const bool is_udp); +static void ms_warmup_num_init(ms_conn_t *c); +static int ms_item_win_init(ms_conn_t *c); + +/* connection close */ +void ms_conn_free(ms_conn_t *c); +static void ms_conn_close(ms_conn_t *c); + +/* create network connection */ +static int ms_new_socket(struct addrinfo *ai); +static void ms_maximize_sndbuf(const int sfd); +static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port, + const bool is_udp, int *ret_sfd); +static int ms_reconn(ms_conn_t *c); + +/* read and parse */ +static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens); +static int ms_ascii_process_line(ms_conn_t *c, char *command); +static int ms_try_read_line(ms_conn_t *c); +static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes); +static int ms_udp_read(ms_conn_t *c, char *buf, int len); +static int ms_try_read_network(ms_conn_t *c); +static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen); +static void ms_ascii_complete_nread(ms_conn_t *c); +static void ms_bin_complete_nread(ms_conn_t *c); +static void ms_complete_nread(ms_conn_t *c); + +/* send functions */ +static int ms_add_msghdr(ms_conn_t *c); +static int ms_ensure_iov_space(ms_conn_t *c); +static int ms_add_iov(ms_conn_t *c, const void *buf, int len); +static int ms_build_udp_headers(ms_conn_t *c); +static int ms_transmit(ms_conn_t *c); + +/* status adjustment */ +static void ms_conn_shrink(ms_conn_t *c); +static void ms_conn_set_state(ms_conn_t *c, int state); +static bool ms_update_event(ms_conn_t *c, const int new_flags); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); +static int ms_update_conn_sock_event(ms_conn_t *c); +static bool ms_need_yield(ms_conn_t *c); +static void ms_update_start_time(ms_conn_t *c); + +/* main loop */ +static void ms_drive_machine(ms_conn_t *c); +void ms_event_handler(const int fd, const short which, void *arg); + +/* ascii protocol */ +static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item); +static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item); +static int ms_build_ascii_write_buf_mlget(ms_conn_t *c); + +/* binary protocol */ +static int ms_bin_process_response(ms_conn_t *c); +static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len, + uint32_t body_len); +static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item); +static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item); +static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item); +static int ms_build_bin_write_buf_mlget(ms_conn_t *c); + +/** + * each key has two parts, prefix and suffix. The suffix is a + * string random get form the character table. The prefix is a + * uint64_t variable. And the prefix must be unique. we use the + * prefix to identify a key. And the prefix can't include + * character ' ' '\r' '\n' '\0'. + * + * @return uint64_t + */ +uint64_t ms_get_key_prefix(void) { + uint64_t key_prefix; + + pthread_mutex_lock(&ms_global.seq_mutex); + key_prefix_seq |= KEY_PREFIX_MASK; + key_prefix = key_prefix_seq; + key_prefix_seq++; + pthread_mutex_unlock(&ms_global.seq_mutex); + + return key_prefix; +} /* ms_get_key_prefix */ + +/** + * get an unique udp request id + * + * @return an unique UDP request id + */ +static uint32_t ms_get_udp_request_id(void) { + return atomic_add_32_nv(&udp_request_id, 1); +} + +/** + * initialize current task structure + * + * @param c, pointer of the concurrency + */ +static void ms_task_init(ms_conn_t *c) { + c->curr_task.cmd = CMD_NULL; + c->curr_task.item = 0; + c->curr_task.verify = false; + c->curr_task.finish_verify = true; + c->curr_task.get_miss = true; + + c->curr_task.get_opt = 0; + c->curr_task.set_opt = 0; + c->curr_task.cycle_undo_get = 0; + c->curr_task.cycle_undo_set = 0; + c->curr_task.verified_get = 0; + c->curr_task.overwrite_set = 0; +} /* ms_task_init */ + +/** + * initialize udp for the connection structure + * + * @param c, pointer of the concurrency + * @param is_udp, whether it's udp + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) { + c->hdrbuf = 0; + c->rudpbuf = 0; + c->udppkt = 0; + + c->rudpsize = UDP_DATA_BUFFER_SIZE; + c->hdrsize = 0; + + c->rudpbytes = 0; + c->packets = 0; + c->recvpkt = 0; + c->pktcurr = 0; + c->ordcurr = 0; + + c->udp = is_udp; + + if (c->udp || (!c->udp && ms_setting.facebook_test)) { + c->rudpbuf = (char *) malloc((size_t) c->rudpsize); + c->udppkt = (ms_udppkt_t *) malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t)); + + if ((c->rudpbuf == NULL) || (c->udppkt == NULL)) { + if (c->rudpbuf) + free(c->rudpbuf); + if (c->udppkt) + free(c->udppkt); + fprintf(stderr, "malloc()\n"); + return -1; + } + memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); + } + + return EXIT_SUCCESS; +} /* ms_conn_udp_init */ + +/** + * initialize the connection structure + * + * @param c, pointer of the concurrency + * @param init_state, (conn_read, conn_write, conn_closing) + * @param read_buffer_size + * @param is_udp, whether it's udp + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_conn_init(ms_conn_t *c, const int init_state, const int read_buffer_size, + const bool is_udp) { + assert(c); + + c->rbuf = c->wbuf = 0; + c->iov = 0; + c->msglist = 0; + + c->rsize = read_buffer_size; + c->wsize = WRITE_BUFFER_SIZE; + c->iovsize = IOV_LIST_INITIAL; + c->msgsize = MSG_LIST_INITIAL; + + /* for replication, each connection need connect all the server */ + if (ms_setting.rep_write_srv > 0) { + c->total_sfds = ms_setting.srv_cnt * ms_setting.sock_per_conn; + } else { + c->total_sfds = ms_setting.sock_per_conn; + } + c->alive_sfds = 0; + + c->rbuf = (char *) malloc((size_t) c->rsize); + c->wbuf = (char *) malloc((size_t) c->wsize); + c->iov = (struct iovec *) malloc(sizeof(struct iovec) * (size_t) c->iovsize); + c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * (size_t) c->msgsize); + if (ms_setting.mult_key_num > 1) { + c->mlget_task.mlget_item = (ms_mlget_task_item_t *) malloc(sizeof(ms_mlget_task_item_t) + * (size_t) ms_setting.mult_key_num); + } + c->tcpsfd = (int *) malloc((size_t) c->total_sfds * sizeof(int)); + + if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL) || (c->msglist == NULL) + || (c->tcpsfd == NULL) + || ((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_item == NULL))) + { + if (c->rbuf) + free(c->rbuf); + if (c->wbuf) + free(c->wbuf); + if (c->iov) + free(c->iov); + if (c->msglist) + free(c->msglist); + if (c->mlget_task.mlget_item) + free(c->mlget_task.mlget_item); + if (c->tcpsfd) + free(c->tcpsfd); + fprintf(stderr, "malloc()\n"); + return -1; + } + + c->state = init_state; + c->rvbytes = 0; + c->rbytes = 0; + c->rcurr = c->rbuf; + c->wcurr = c->wbuf; + c->iovused = 0; + c->msgcurr = 0; + c->msgused = 0; + c->cur_idx = c->total_sfds; /* default index is a invalid value */ + + c->ctnwrite = false; + c->readval = false; + c->change_sfd = false; + + c->precmd.cmd = c->currcmd.cmd = CMD_NULL; + c->precmd.isfinish = true; /* default the previous command finished */ + c->currcmd.isfinish = false; + c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE; + c->precmd.key_prefix = c->currcmd.key_prefix = 0; + + c->mlget_task.mlget_num = 0; + c->mlget_task.value_index = -1; /* default invalid value */ + + if (ms_setting.binary_prot_) { + c->protocol = binary_prot; + } else { + c->protocol = ascii_prot; + } + + /* initialize udp */ + if (ms_conn_udp_init(c, is_udp)) { + return -1; + } + + /* initialize task */ + ms_task_init(c); + + if (!(ms_setting.facebook_test && is_udp)) { + atomic_add_32(&ms_stats.active_conns, 1); + } + + return EXIT_SUCCESS; +} /* ms_conn_init */ + +/** + * when doing 100% get operation, it could preset some objects + * to warmup the server. this function is used to initialize the + * number of the objects to preset. + * + * @param c, pointer of the concurrency + */ +static void ms_warmup_num_init(ms_conn_t *c) { + /* no set operation, preset all the items in the window */ + if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) { + c->warmup_num = c->win_size; + c->remain_warmup_num = c->warmup_num; + } else { + c->warmup_num = 0; + c->remain_warmup_num = c->warmup_num; + } +} /* ms_warmup_num_init */ + +/** + * each connection has an item window, this function initialize + * the window. The window is used to generate task. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_item_win_init(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + int exp_cnt = 0; + + c->win_size = (int) ms_setting.win_size; + c->set_cursor = 0; + c->exec_num = ms_thread->thread_ctx->exec_num_perconn; + c->remain_exec_num = c->exec_num; + + c->item_win = (ms_task_item_t *) malloc(sizeof(ms_task_item_t) * (size_t) c->win_size); + if (c->item_win == NULL) { + fprintf(stderr, "Can't allocate task item array for conn.\n"); + return -1; + } + memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t) c->win_size); + + for (int i = 0; i < c->win_size; i++) { + c->item_win[i].key_size = (int) ms_setting.distr[i].key_size; + c->item_win[i].key_prefix = ms_get_key_prefix(); + c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset; + c->item_win[i].value_size = (int) ms_setting.distr[i].value_size; + c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */ + c->item_win[i].client_time = 0; + + /* set expire time base on the proportion */ + if (exp_cnt < ms_setting.exp_ver_per * i) { + c->item_win[i].exp_time = FIXED_EXPIRE_TIME; + exp_cnt++; + } else { + c->item_win[i].exp_time = 0; + } + } + + ms_warmup_num_init(c); + + return EXIT_SUCCESS; +} /* ms_item_win_init */ + +/** + * each connection structure can include one or more sock + * handlers. this function create these socks and connect the + * server(s). + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_conn_sock_init(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + uint32_t i; + int ret_sfd; + uint32_t srv_idx = 0; + + assert(c); + assert(c->tcpsfd); + + for (i = 0; i < c->total_sfds; i++) { + ret_sfd = 0; + if (ms_setting.rep_write_srv > 0) { + /* for replication, each connection need connect all the server */ + srv_idx = i % ms_setting.srv_cnt; + } else { + /* all the connections in a thread connects the same server */ + srv_idx = ms_thread->thread_ctx->srv_idx; + } + + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd) +) + { + break; + } + + if (i == 0) { + c->sfd = ret_sfd; + } + + if (!ms_setting.udp) { + c->tcpsfd[i] = ret_sfd; + } + + c->alive_sfds++; + } + + /* initialize udp sock handler if necessary */ + if (ms_setting.facebook_test) { + ret_sfd = 0; + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, true, &ret_sfd) +) + { + c->udpsfd = 0; + } else { + c->udpsfd = ret_sfd; + } + } + + if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0))) { + if (ms_setting.udp) { + close(c->sfd); + } else { + for (uint32_t j = 0; j < i; j++) { + close(c->tcpsfd[j]); + } + } + + if (c->udpsfd) { + close(c->udpsfd); + } + + return -1; + } + + return EXIT_SUCCESS; +} /* ms_conn_sock_init */ + +/** + * each connection is managed by libevent, this function + * initialize the event of the connection structure. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_conn_event_init(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + short event_flags = EV_WRITE | EV_PERSIST; + + event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *) c); + event_base_set(ms_thread->base, &c->event); + c->ev_flags = event_flags; + + if (event_add(&c->event, NULL) == -1) { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_conn_event_init */ + +/** + * setup a connection, each connection structure of each + * thread must call this function to initialize. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_setup_conn(ms_conn_t *c) { + if (ms_item_win_init(c)) { + return -1; + } + + if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp)) { + return -1; + } + + if (ms_conn_sock_init(c)) { + return -1; + } + + if (ms_conn_event_init(c)) { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_setup_conn */ + +/** + * Frees a connection. + * + * @param c, pointer of the concurrency + */ +void ms_conn_free(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + if (c) { + if (c->hdrbuf) + free(c->hdrbuf); + if (c->msglist) + free(c->msglist); + if (c->rbuf) + free(c->rbuf); + if (c->wbuf) + free(c->wbuf); + if (c->iov) + free(c->iov); + if (c->mlget_task.mlget_item) + free(c->mlget_task.mlget_item); + if (c->rudpbuf) + free(c->rudpbuf); + if (c->udppkt) + free(c->udppkt); + if (c->item_win) + free(c->item_win); + if (c->tcpsfd) + free(c->tcpsfd); + + if (--ms_thread->nactive_conn == 0) { + free(ms_thread->conn); + } + } +} /* ms_conn_free */ + +/** + * close a connection + * + * @param c, pointer of the concurrency + */ +static void ms_conn_close(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + assert(c); + + /* delete the event, the socket and the connection */ + event_del(&c->event); + + for (uint32_t i = 0; i < c->total_sfds; i++) { + if (c->tcpsfd[i] > 0) { + close(c->tcpsfd[i]); + } + } + c->sfd = 0; + + if (ms_setting.facebook_test) { + close(c->udpsfd); + } + + atomic_dec_32(&ms_stats.active_conns); + + ms_conn_free(c); + + if (ms_setting.run_time == 0) { + pthread_mutex_lock(&ms_global.run_lock.lock); + ms_global.run_lock.count++; + pthread_cond_signal(&ms_global.run_lock.cond); + pthread_mutex_unlock(&ms_global.run_lock.lock); + } + + if (ms_thread->nactive_conn == 0) { + event_base_loopbreak(ms_thread->base); + } +} /* ms_conn_close */ + +/** + * create a new sock + * + * @param ai, server address information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_new_socket(struct addrinfo *ai) { + int sfd; + + if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) { + fprintf(stderr, "socket() error: %s.\n", strerror(errno)); + return -1; + } + + return sfd; +} /* ms_new_socket */ + +/** + * Sets a socket's send buffer size to the maximum allowed by the system. + * + * @param sfd, file descriptor of socket + */ +static void ms_maximize_sndbuf(const int sfd) { + socklen_t intsize = sizeof(int); + unsigned int last_good = 0; + unsigned int min, max, avg; + unsigned int old_size; + + /* Start with the default size. */ + if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) { + fprintf(stderr, "getsockopt(SO_SNDBUF)\n"); + return; + } + + /* Binary-search for the real maximum. */ + min = old_size; + max = MAX_SENDBUF_SIZE; + + while (min <= max) { + avg = ((unsigned int) (min + max)) / 2; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *) &avg, intsize) == 0) { + last_good = avg; + min = avg + 1; + } else { + max = avg - 1; + } + } + (void) last_good; +} /* ms_maximize_sndbuf */ + +/** + * socket connects the server + * + * @param c, pointer of the concurrency + * @param srv_host_name, the host name of the server + * @param srv_port, port of server + * @param is_udp, whether it's udp + * @param ret_sfd, the connected socket file descriptor + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_network_connect(ms_conn_t *c, char *srv_host_name, const int srv_port, + const bool is_udp, int *ret_sfd) { + int sfd; + struct linger ling = {0, 0}; + struct addrinfo *ai; + struct addrinfo *next; + struct addrinfo hints; + char port_buf[NI_MAXSERV]; + int error; + int success = 0; + + int flags = 1; + + /* + * the memset call clears nonstandard fields in some impementations + * that otherwise mess things up. + */ + memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; +#else + hints.ai_flags = AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ + if (is_udp) { + hints.ai_protocol = IPPROTO_UDP; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */ + } else { + hints.ai_family = AF_UNSPEC; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_socktype = SOCK_STREAM; + } + + snprintf(port_buf, NI_MAXSERV, "%d", srv_port); + error = getaddrinfo(srv_host_name, port_buf, &hints, &ai); + if (error) { + if (error != EAI_SYSTEM) + fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error)); + else + perror("getaddrinfo()"); + + return -1; + } + + for (next = ai; next; next = next->ai_next) { + if ((sfd = ms_new_socket(next)) == -1) { + freeaddrinfo(ai); + return -1; + } + + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *) &flags, sizeof(flags)); + if (is_udp) { + ms_maximize_sndbuf(sfd); + } else { + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *) &ling, sizeof(ling)); + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags, sizeof(flags)); + } + + if (is_udp) { + c->srv_recv_addr_size = sizeof(struct sockaddr); + memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size); + } else { + if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) { + close(sfd); + freeaddrinfo(ai); + return -1; + } + } + + if (((flags = fcntl(sfd, F_GETFL, 0)) < 0) || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)) { + fprintf(stderr, "setting O_NONBLOCK\n"); + close(sfd); + freeaddrinfo(ai); + return -1; + } + + if (ret_sfd) { + *ret_sfd = sfd; + } + + success++; + } + + freeaddrinfo(ai); + + /* Return zero if we detected no errors in starting up connections */ + return success == 0; +} /* ms_network_connect */ + +/** + * reconnect a disconnected sock + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_reconn(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + uint32_t srv_idx = 0; + uint32_t srv_conn_cnt = 0; + + if (ms_setting.rep_write_srv > 0) { + srv_idx = c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns; + } else { + srv_idx = ms_thread->thread_ctx->srv_idx; + srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt; + } + + /* close the old socket handler */ + close(c->sfd); + c->tcpsfd[c->cur_idx] = 0; + + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) % srv_conn_cnt == 0) { + gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); + fprintf(stderr, "Server %s:%d disconnect\n", ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port); + } + + if (ms_setting.rep_write_srv > 0) { + uint32_t i = 0; + + for (i = 0; i < c->total_sfds; i++) { + if (c->tcpsfd[i]) { + break; + } + } + + /* all socks disconnect */ + if (i == c->total_sfds) { + return -1; + } + } else { + do { + /* reconnect success, break the loop */ + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &c->sfd) + == 0) + { + c->tcpsfd[c->cur_idx] = c->sfd; + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt + == 0) { + gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); + int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec + - ms_setting.servers[srv_idx].disconn_time.tv_sec); + fprintf(stderr, "Server %s:%d reconnect after %ds\n", + ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port, + reconn_time); + } + break; + } + + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { + /* wait a second and reconnect */ + sleep(1); + } + } while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); + } + + if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) { + c->sfd = 0; + c->alive_sfds--; + } + + return EXIT_SUCCESS; +} /* ms_reconn */ + +/** + * reconnect several disconnected socks in the connection + * structure, the ever-1-second timer of the thread will check + * whether some socks in the connections disconnect. if + * disconnect, reconnect the sock. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_reconn_socks(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + uint32_t srv_idx = 0; + int ret_sfd = 0; + uint32_t srv_conn_cnt = 0; + struct timeval cur_time; + + assert(c); + + if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) { + return EXIT_SUCCESS; + } + + for (uint32_t i = 0; i < c->total_sfds; i++) { + if (c->tcpsfd[i] == 0) { + gettimeofday(&cur_time, NULL); + + /** + * For failover test of replication, reconnect the socks after + * it disconnects more than 5 seconds, Otherwise memslap will + * block at connect() function and the work threads can't work + * in this interval. + */ + if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) { + break; + } + + if (ms_setting.rep_write_srv > 0) { + srv_idx = i % ms_setting.srv_cnt; + srv_conn_cnt = ms_setting.sock_per_conn * ms_setting.nconns; + } else { + srv_idx = ms_thread->thread_ctx->srv_idx; + srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt; + } + + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, ms_setting.udp, &ret_sfd) + == 0) + { + c->tcpsfd[i] = ret_sfd; + c->alive_sfds++; + + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) % (uint32_t) srv_conn_cnt + == 0) { + gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); + int reconn_time = (int) (ms_setting.servers[srv_idx].reconn_time.tv_sec + - ms_setting.servers[srv_idx].disconn_time.tv_sec); + fprintf(stderr, "Server %s:%d reconnect after %ds\n", + ms_setting.servers[srv_idx].srv_host_name, ms_setting.servers[srv_idx].srv_port, + reconn_time); + } + } + } + } + + return EXIT_SUCCESS; +} /* ms_reconn_socks */ + +/** + * Tokenize the command string by replacing whitespace with '\0' and update + * the token array tokens with pointer to start of each token and length. + * Returns total number of tokens. The last valid token is the terminal + * token (value points to the first unprocessed character of the string and + * length zero). + * + * Usage example: + * + * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) { + * for(int ix = 0; tokens[ix].length; ix++) { + * ... + * } + * ncommand = tokens[ix].value - command; + * command = tokens[ix].value; + * } + * + * @param command, the command string to token + * @param tokens, array to store tokens + * @param max_tokens, maximum tokens number + * + * @return int, the number of tokens + */ +static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens) { + char *s, *e; + int ntokens = 0; + + assert(command && tokens && max_tokens > 1); + + for (s = e = command; ntokens < max_tokens - 1; ++e) { + if (*e == ' ') { + if (s != e) { + tokens[ntokens].value = s; + tokens[ntokens].length = (size_t)(e - s); + ntokens++; + *e = '\0'; + } + s = e + 1; + } else if (*e == '\0') { + if (s != e) { + tokens[ntokens].value = s; + tokens[ntokens].length = (size_t)(e - s); + ntokens++; + } + + break; /* string end */ + } + } + + return ntokens; +} /* ms_tokenize_command */ + +/** + * parse the response of server. + * + * @param c, pointer of the concurrency + * @param command, the string responded by server + * + * @return int, if the command completed return EXIT_SUCCESS, else return + * -1 + */ +static int ms_ascii_process_line(ms_conn_t *c, char *command) { + int ret = 0; + int64_t value_len; + char *buffer = command; + + assert(c); + + /** + * for command get, we store the returned value into local buffer + * then continue in ms_complete_nread(). + */ + + switch (buffer[0]) { + case 'V': /* VALUE || VERSION */ + if (buffer[1] == 'A') /* VALUE */ { + token_t tokens[MAX_TOKENS]; + ms_tokenize_command(command, tokens, MAX_TOKENS); + errno = 0; + value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); + if (errno) { + printf("<%d ERROR %s\n", c->sfd, strerror(errno)); + } + memcpy(&c->currcmd.key_prefix, tokens[KEY_TOKEN].value, sizeof(c->currcmd.key_prefix)); + + /* + * We read the \r\n into the string since not doing so is more + * cycles then the waster of memory to do so. + * + * We are null terminating through, which will most likely make + * some people lazy about using the return length. + */ + c->rvbytes = (int) (value_len + 2); + c->readval = true; + ret = -1; + } + + break; + + case 'O': /* OK */ + c->currcmd.retstat = MCD_SUCCESS; + break; + + case 'S': /* STORED STATS SERVER_ERROR */ + if (buffer[2] == 'A') /* STORED STATS */ { /* STATS*/ + c->currcmd.retstat = MCD_STAT; + } else if (buffer[1] == 'E') { + /* SERVER_ERROR */ + printf("<%d %s\n", c->sfd, buffer); + + c->currcmd.retstat = MCD_SERVER_ERROR; + } else if (buffer[1] == 'T') { + /* STORED */ + c->currcmd.retstat = MCD_STORED; + } else { + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'D': /* DELETED DATA */ + if (buffer[1] == 'E') { + c->currcmd.retstat = MCD_DELETED; + } else { + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + } + + break; + + case 'N': /* NOT_FOUND NOT_STORED*/ + if (buffer[4] == 'F') { + c->currcmd.retstat = MCD_NOTFOUND; + } else if (buffer[4] == 'S') { + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat = MCD_NOTSTORED; + } else { + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'E': /* PROTOCOL ERROR or END */ + if (buffer[1] == 'N') { + /* END */ + c->currcmd.retstat = MCD_END; + } else if (buffer[1] == 'R') { + printf("<%d ERROR\n", c->sfd); + c->currcmd.retstat = MCD_PROTOCOL_ERROR; + } else if (buffer[1] == 'X') { + c->currcmd.retstat = MCD_DATA_EXISTS; + printf("<%d %s\n", c->sfd, buffer); + } else { + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'C': /* CLIENT ERROR */ + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat = MCD_CLIENT_ERROR; + break; + + default: + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + break; + } /* switch */ + + return ret; +} /* ms_ascii_process_line */ + +/** + * after one operation completes, reset the concurrency + * + * @param c, pointer of the concurrency + * @param timeout, whether it's timeout + */ +void ms_reset_conn(ms_conn_t *c, bool timeout) { + assert(c); + + if (c->udp) { + if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) { + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t) c->packets); + } + + c->packets = 0; + c->recvpkt = 0; + c->pktcurr = 0; + c->ordcurr = 0; + c->rudpbytes = 0; + } + c->currcmd.isfinish = true; + c->ctnwrite = false; + c->rbytes = 0; + c->rcurr = c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + ms_conn_set_state(c, conn_write); + memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + + if (timeout) { + ms_drive_machine(c); + } +} /* ms_reset_conn */ + +/** + * if we have a complete line in the buffer, process it. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_try_read_line(ms_conn_t *c) { + if (c->protocol == binary_prot) { + /* Do we have the complete packet header? */ + if ((uint64_t) c->rbytes < sizeof(c->binary_header)) { + /* need more data! */ + return EXIT_SUCCESS; + } else { +#ifdef NEED_ALIGN + if (((long) (c->rcurr)) % 8) { + /* must realign input buffer */ + memmove(c->rbuf, c->rcurr, c->rbytes); + c->rcurr = c->rbuf; + if (settings.verbose) { + fprintf(stderr, "%d: Realign input buffer.\n", c->sfd); + } + } +#endif + protocol_binary_response_header *rsp; + rsp = (protocol_binary_response_header *) c->rcurr; + + c->binary_header = *rsp; + c->binary_header.response.extlen = rsp->response.extlen; + c->binary_header.response.keylen = ntohs(rsp->response.keylen); + c->binary_header.response.bodylen = ntohl(rsp->response.bodylen); + c->binary_header.response.status = ntohs(rsp->response.status); + + if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) { + fprintf(stderr, "Invalid magic: %x\n", c->binary_header.response.magic); + ms_conn_set_state(c, conn_closing); + return EXIT_SUCCESS; + } + + /* process this complete response */ + if (ms_bin_process_response(c) == 0) { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } else { + c->rbytes -= (int32_t) sizeof(c->binary_header); + c->rcurr += sizeof(c->binary_header); + } + } + } else { + char *el, *cont; + + assert(c); + assert(c->rcurr <= (c->rbuf + c->rsize)); + + if (c->rbytes == 0) + return EXIT_SUCCESS; + + el = memchr(c->rcurr, '\n', (size_t) c->rbytes); + if (!el) + return EXIT_SUCCESS; + + cont = el + 1; + if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) { + el--; + } + *el = '\0'; + + assert(cont <= (c->rcurr + c->rbytes)); + + /* process this complete line */ + if (ms_ascii_process_line(c, c->rcurr) == 0) { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } else { + /* current operation didn't complete */ + c->rbytes -= (int32_t)(cont - c->rcurr); + c->rcurr = cont; + } + + assert(c->rcurr <= (c->rbuf + c->rsize)); + } + + return -1; +} /* ms_try_read_line */ + +/** + * because the packet of UDP can't ensure the order, the + * function is used to sort the received udp packet. + * + * @param c, pointer of the concurrency + * @param buf, the buffer to store the ordered packages data + * @param rbytes, the maximum capacity of the buffer + * + * @return int, if success, return the copy bytes, else return + * -1 + */ +static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) { + int len = 0; + int wbytes = 0; + uint16_t req_id = 0; + uint16_t seq_num = 0; + uint16_t packets = 0; + unsigned char *header = NULL; + + /* no enough data */ + assert(c); + assert(buf); + assert(c->rudpbytes >= UDP_HEADER_SIZE); + + /* calculate received packets count */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) { + /* the last packet has some data */ + c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1; + } else { + c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE; + } + + /* get the total packets count if necessary */ + if (c->packets == 0) { + c->packets = HEADER_TO_PACKETS((unsigned char *) c->rudpbuf); + } + + /* build the ordered packet array */ + for (int i = c->pktcurr; i < c->recvpkt; i++) { + header = (unsigned char *) c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE; + req_id = (uint16_t) HEADER_TO_REQID(header); + assert(req_id == c->request_id % (1 << 16)); + + packets = (uint16_t) HEADER_TO_PACKETS(header); + assert(c->packets == HEADER_TO_PACKETS(header)); + + seq_num = (uint16_t) HEADER_TO_SEQNUM(header); + c->udppkt[seq_num].header = header; + c->udppkt[seq_num].data = (char *) header + UDP_HEADER_SIZE; + + if (i == c->recvpkt - 1) { + /* last received packet */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) { + c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } else { + c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + } + } else { + c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } + } + + for (int i = c->ordcurr; i < c->recvpkt; i++) { + /* there is some data to copy */ + if ((c->udppkt[i].data) && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) { + header = c->udppkt[i].header; + len = c->udppkt[i].rbytes - c->udppkt[i].copybytes; + if (len > rbytes - wbytes) { + len = rbytes - wbytes; + } + + assert(len <= rbytes - wbytes); + assert(i == HEADER_TO_SEQNUM(header)); + + memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t) len); + wbytes += len; + c->udppkt[i].copybytes += len; + + if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes) + && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) + { + /* finish copying all the data of this packet, next */ + c->ordcurr++; + } + + /* last received packet, and finish copying all the data */ + if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1) + && (c->udppkt[i].copybytes == c->udppkt[i].rbytes)) + { + break; + } + + /* no space to copy data */ + if (wbytes >= rbytes) { + break; + } + + /* it doesn't finish reading all the data of the packet from network */ + if ((i != c->recvpkt - 1) && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) { + break; + } + } else { + /* no data to copy */ + break; + } + } + (void) packets; + + return wbytes == 0 ? -1 : wbytes; +} /* ms_sort_udp_packet */ + +/** + * encapsulate upd read like tcp read + * + * @param c, pointer of the concurrency + * @param buf, read buffer + * @param len, length to read + * + * @return int, if success, return the read bytes, else return + * -1 + */ +static int ms_udp_read(ms_conn_t *c, char *buf, int len) { + int res = 0; + int avail = 0; + int rbytes = 0; + int copybytes = 0; + + assert(c->udp); + + while (1) { + if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) { + char *new_rbuf = realloc(c->rudpbuf, (size_t) c->rudpsize * 2); + if (!new_rbuf) { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rudpbytes = 0; /* ignore what we read */ + return -1; + } + c->rudpbuf = new_rbuf; + c->rudpsize *= 2; + } + + avail = c->rudpsize - c->rudpbytes; + /* UDP each time read a packet, 1400 bytes */ + res = (int) read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t) avail); + + if (res > 0) { + atomic_add_size(&ms_stats.bytes_read, res); + c->rudpbytes += res; + rbytes += res; + if (res == avail) { + continue; + } else { + break; + } + } + + if (res == 0) { + /* "connection" closed */ + return res; + } + + if (res == -1) { + /* no data to read */ + return res; + } + } + + /* copy data to read buffer */ + if (rbytes > 0) { + copybytes = ms_sort_udp_packet(c, buf, len); + } + + if (copybytes == -1) { + atomic_add_size(&ms_stats.pkt_disorder, 1); + } + + return copybytes; +} /* ms_udp_read */ + +/* + * read from network as much as we can, handle buffer overflow and connection + * close. + * before reading, move the remaining incomplete fragment of a command + * (if any) to the beginning of the buffer. + * return EXIT_SUCCESS if there's nothing to read on the first read. + */ + +/** + * read from network as much as we can, handle buffer overflow and connection + * close. before reading, move the remaining incomplete fragment of a command + * (if any) to the beginning of the buffer. + * + * @param c, pointer of the concurrency + * + * @return int, + * return EXIT_SUCCESS if there's nothing to read on the first read. + * return EXIT_FAILURE if get data + * return -1 if error happens + */ +static int ms_try_read_network(ms_conn_t *c) { + int gotdata = 0; + int res; + int64_t avail; + + assert(c); + + if ((c->rcurr != c->rbuf) + && (!c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf)) + || (c->readval && (c->rcurr - c->rbuf > c->rbytes)))) + { + if (c->rbytes) /* otherwise there's nothing to copy */ + memmove(c->rbuf, c->rcurr, (size_t) c->rbytes); + c->rcurr = c->rbuf; + } + + while (1) { + if (c->rbytes >= c->rsize) { + char *new_rbuf = realloc(c->rbuf, (size_t) c->rsize * 2); + if (!new_rbuf) { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rbytes = 0; /* ignore what we read */ + return -1; + } + c->rcurr = c->rbuf = new_rbuf; + c->rsize *= 2; + } + + avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf); + if (avail == 0) { + break; + } + + if (c->udp) { + res = (int32_t) ms_udp_read(c, c->rcurr + c->rbytes, (int32_t) avail); + } else { + res = (int) read(c->sfd, c->rcurr + c->rbytes, (size_t) avail); + } + + if (res > 0) { + if (!c->udp) { + atomic_add_size(&ms_stats.bytes_read, res); + } + gotdata = 1; + c->rbytes += res; + if (res == avail) { + continue; + } else { + break; + } + } + if (res == 0) { + /* connection closed */ + ms_conn_set_state(c, conn_closing); + return -1; + } + if (res == -1) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + break; + /* Should close on unhandled errors. */ + ms_conn_set_state(c, conn_closing); + return -1; + } + } + + return gotdata; +} /* ms_try_read_network */ + +/** + * after get the object from server, verify the value if + * necessary. + * + * @param c, pointer of the concurrency + * @param mlget_item, pointer of mulit-get task item structure + * @param value, received value string + * @param vlen, received value string length + */ +static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, char *value, int vlen) { + if (c->curr_task.verify) { + assert(c->curr_task.item->value_offset != INVALID_OFFSET); + char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset]; + char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset]; + + /* verify expire time if necessary */ + if (c->curr_task.item->exp_time > 0) { + struct timeval curr_time; + gettimeofday(&curr_time, NULL); + + /* object expired but get it now */ + if (curr_time.tv_sec - c->curr_task.item->client_time + > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) + { + atomic_add_size(&ms_stats.exp_get, 1); + + if (ms_setting.verbose) { + char set_time[64]; + char cur_time[64]; + strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&c->curr_task.item->client_time)); + strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", localtime(&curr_time.tv_sec)); + fprintf(stderr, + "\n<%d expire time verification failed, " + "object expired but get it now\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\tset time: %s current time: %s " + "diff time: %d expire time: %d\n" + "\texpected data: \n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey, set_time, cur_time, + (int) (curr_time.tv_sec - c->curr_task.item->client_time), + c->curr_task.item->exp_time, vlen, vlen, value); + fflush(stderr); + } + } + } else { + if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t) vlen))) + { + atomic_add_size(&ms_stats.vef_failed, 1); + + if (ms_setting.verbose) { + fprintf(stderr, + "\n<%d data verification failed\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int) KEY_PREFIX_SIZE, orignkey, + c->curr_task.item->value_size, c->curr_task.item->value_size, orignval, vlen, + vlen, value); + fflush(stderr); + } + } + } + + c->curr_task.finish_verify = true; + + if (mlget_item) { + mlget_item->finish_verify = true; + } + } +} /* ms_verify_value */ + +/** + * For ASCII protocol, after store the data into the local + * buffer, run this function to handle the data. + * + * @param c, pointer of the concurrency + */ +static void ms_ascii_complete_nread(ms_conn_t *c) { + assert(c); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot); + if (c->rvbytes > 2) { + assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r'); + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item = NULL; + if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) { + c->curr_task.item = mlget_item->item; + c->curr_task.verify = mlget_item->verify; + c->curr_task.finish_verify = mlget_item->finish_verify; + mlget_item->get_miss = false; + } else { + /* Try to find the task item in multi-get task array */ + for (int i = 0; i < c->mlget_task.mlget_num; i++) { + mlget_item = &c->mlget_task.mlget_item[i]; + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) { + c->curr_task.item = mlget_item->item; + c->curr_task.verify = mlget_item->verify; + c->curr_task.finish_verify = mlget_item->finish_verify; + mlget_item->get_miss = false; + + break; + } + } + } + } + + ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2); + + c->curr_task.get_miss = false; + c->rbytes -= c->rvbytes; + c->rcurr = c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval = false; + c->rvbytes = 0; +} /* ms_ascii_complete_nread */ + +/** + * For binary protocol, after store the data into the local + * buffer, run this function to handle the data. + * + * @param c, pointer of the concurrency + */ +static void ms_bin_complete_nread(ms_conn_t *c) { + assert(c); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == binary_prot); + + int extlen = c->binary_header.response.extlen; + int keylen = c->binary_header.response.keylen; + uint8_t opcode = c->binary_header.response.opcode; + + /* not get command or not include value, just return */ + if (((opcode != PROTOCOL_BINARY_CMD_GET) && (opcode != PROTOCOL_BINARY_CMD_GETQ)) + || (c->rvbytes <= extlen + keylen)) + { + /* get miss */ + if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) { + c->currcmd.retstat = MCD_END; + c->curr_task.get_miss = true; + } + + c->readval = false; + c->rvbytes = 0; + ms_reset_conn(c, false); + return; + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item = NULL; + if (((ms_setting.mult_key_num > 1) && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + c->curr_task.item = mlget_item->item; + c->curr_task.verify = mlget_item->verify; + c->curr_task.finish_verify = mlget_item->finish_verify; + mlget_item->get_miss = false; + } + + ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen); + + c->currcmd.retstat = MCD_END; + c->curr_task.get_miss = false; + c->rbytes -= c->rvbytes; + c->rcurr = c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval = false; + c->rvbytes = 0; + + if (ms_setting.mult_key_num > 1) { + /* multi-get have check all the item */ + if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) { + ms_reset_conn(c, false); + } + } else { + /* single get */ + ms_reset_conn(c, false); + } +} /* ms_bin_complete_nread */ + +/** + * we get here after reading the value of get commands. + * + * @param c, pointer of the concurrency + */ +static void ms_complete_nread(ms_conn_t *c) { + assert(c); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot || c->protocol == binary_prot); + + if (c->protocol == binary_prot) { + ms_bin_complete_nread(c); + } else { + ms_ascii_complete_nread(c); + } +} /* ms_complete_nread */ + +/** + * Adds a message header to a connection. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_add_msghdr(ms_conn_t *c) { + struct msghdr *msg; + + assert(c); + + if (c->msgsize == c->msgused) { + msg = realloc(c->msglist, (size_t) c->msgsize * 2 * sizeof(struct msghdr)); + if (!msg) + return -1; + + c->msglist = msg; + c->msgsize *= 2; + } + + msg = c->msglist + c->msgused; + + /** + * this wipes msg_iovlen, msg_control, msg_controllen, and + * msg_flags, the last 3 of which aren't defined on solaris: + */ + memset(msg, 0, sizeof(struct msghdr)); + + msg->msg_iov = &c->iov[c->iovused]; + + if (c->udp && (c->srv_recv_addr_size > 0)) { + msg->msg_name = &c->srv_recv_addr; + msg->msg_namelen = c->srv_recv_addr_size; + } + + c->msgbytes = 0; + c->msgused++; + + if (c->udp) { + /* Leave room for the UDP header, which we'll fill in later. */ + return ms_add_iov(c, NULL, UDP_HEADER_SIZE); + } + + return EXIT_SUCCESS; +} /* ms_add_msghdr */ + +/** + * Ensures that there is room for another structure iovec in a connection's + * iov list. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_ensure_iov_space(ms_conn_t *c) { + assert(c); + + if (c->iovused >= c->iovsize) { + int i, iovnum; + struct iovec *new_iov = + (struct iovec *) realloc(c->iov, ((size_t) c->iovsize * 2) * sizeof(struct iovec)); + if (!new_iov) + return -1; + + c->iov = new_iov; + c->iovsize *= 2; + + /* Point all the msghdr structures at the new list. */ + for (i = 0, iovnum = 0; i < c->msgused; i++) { + c->msglist[i].msg_iov = &c->iov[iovnum]; + iovnum += (int) c->msglist[i].msg_iovlen; + } + } + + return EXIT_SUCCESS; +} /* ms_ensure_iov_space */ + +/** + * Adds data to the list of pending data that will be written out to a + * connection. + * + * @param c, pointer of the concurrency + * @param buf, the buffer includes data to send + * @param len, the data length in the buffer + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_add_iov(ms_conn_t *c, const void *buf, int len) { + struct msghdr *m; + int leftover; + bool limit_to_mtu; + + assert(c); + + do { + m = &c->msglist[c->msgused - 1]; + + /* + * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes. + */ + limit_to_mtu = c->udp; + +#ifdef IOV_MAX + /* We may need to start a new msghdr if this one is full. */ + if ((m->msg_iovlen == IOV_MAX) || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) + { + ms_add_msghdr(c); + m = &c->msglist[c->msgused - 1]; + } +#endif + + if (ms_ensure_iov_space(c)) + return -1; + + /* If the fragment is too big to fit in the datagram, split it up */ + if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) { + leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE; + len -= leftover; + } else { + leftover = 0; + } + + m = &c->msglist[c->msgused - 1]; + m->msg_iov[m->msg_iovlen].iov_base = (void *) buf; + m->msg_iov[m->msg_iovlen].iov_len = (size_t) len; + + c->msgbytes += len; + c->iovused++; + m->msg_iovlen++; + + buf = ((char *) buf) + len; + len = leftover; + } while (leftover > 0); + + return EXIT_SUCCESS; +} /* ms_add_iov */ + +/** + * Constructs a set of UDP headers and attaches them to the outgoing messages. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_udp_headers(ms_conn_t *c) { + int i; + unsigned char *hdr; + + assert(c); + + c->request_id = ms_get_udp_request_id(); + + if (c->msgused > c->hdrsize) { + void *new_hdrbuf; + if (c->hdrbuf) + new_hdrbuf = realloc(c->hdrbuf, (size_t) c->msgused * 2 * UDP_HEADER_SIZE); + else + new_hdrbuf = malloc((size_t) c->msgused * 2 * UDP_HEADER_SIZE); + if (!new_hdrbuf) + return -1; + + c->hdrbuf = (unsigned char *) new_hdrbuf; + c->hdrsize = c->msgused * 2; + } + + /* If this is a multi-packet request, drop it. */ + if (c->udp && (c->msgused > 1)) { + fprintf(stderr, "multi-packet request for UDP not supported.\n"); + return -1; + } + + hdr = c->hdrbuf; + for (i = 0; i < c->msgused; i++) { + c->msglist[i].msg_iov[0].iov_base = (void *) hdr; + c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE; + *hdr++ = (unsigned char) (c->request_id / 256); + *hdr++ = (unsigned char) (c->request_id % 256); + *hdr++ = (unsigned char) (i / 256); + *hdr++ = (unsigned char) (i % 256); + *hdr++ = (unsigned char) (c->msgused / 256); + *hdr++ = (unsigned char) (c->msgused % 256); + *hdr++ = (unsigned char) 1; /* support facebook memcached */ + *hdr++ = (unsigned char) 0; + assert(hdr == ((unsigned char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE)); + } + + return EXIT_SUCCESS; +} /* ms_build_udp_headers */ + +/** + * Transmit the next chunk of data from our list of msgbuf structures. + * + * @param c, pointer of the concurrency + * + * @return TRANSMIT_COMPLETE All done writing. + * TRANSMIT_INCOMPLETE More data remaining to write. + * TRANSMIT_SOFT_ERROR Can't write any more right now. + * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) + */ +static int ms_transmit(ms_conn_t *c) { + assert(c); + + if ((c->msgcurr < c->msgused) && (c->msglist[c->msgcurr].msg_iovlen == 0)) { + /* Finished writing the current msg; advance to the next. */ + c->msgcurr++; + } + + if (c->msgcurr < c->msgused) { + ssize_t res; + struct msghdr *m = &c->msglist[c->msgcurr]; + + res = sendmsg(c->sfd, m, 0); + if (res > 0) { + atomic_add_size(&ms_stats.bytes_written, res); + + /* We've written some of the data. Remove the completed + * iovec entries from the list of pending writes. */ + while (m->msg_iovlen > 0 && res >= (ssize_t) m->msg_iov->iov_len) { + res -= (ssize_t) m->msg_iov->iov_len; + m->msg_iovlen--; + m->msg_iov++; + } + + /* Might have written just part of the last iovec entry; + * adjust it so the next write will do the rest. */ + if (res > 0) { + m->msg_iov->iov_base = (void *) ((unsigned char *) m->msg_iov->iov_base + res); + m->msg_iov->iov_len -= (size_t) res; + } + return TRANSMIT_INCOMPLETE; + } + if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) { + if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + return TRANSMIT_HARD_ERROR; + } + return TRANSMIT_SOFT_ERROR; + } + + /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, + * we have a real error, on which we close the connection */ + fprintf(stderr, "Failed to write, and not due to blocking.\n"); + + ms_conn_set_state(c, conn_closing); + return TRANSMIT_HARD_ERROR; + } else { + return TRANSMIT_COMPLETE; + } +} /* ms_transmit */ + +/** + * Shrinks a connection's buffers if they're too big. This prevents + * periodic large "mget" response from server chewing lots of client + * memory. + * + * This should only be called in between requests since it can wipe output + * buffers! + * + * @param c, pointer of the concurrency + */ +static void ms_conn_shrink(ms_conn_t *c) { + assert(c); + + if (c->udp) + return; + + if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) { + char *newbuf; + + if (c->rcurr != c->rbuf) + memmove(c->rbuf, c->rcurr, (size_t) c->rbytes); + + newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE); + + if (newbuf) { + c->rbuf = newbuf; + c->rsize = DATA_BUFFER_SIZE; + } + c->rcurr = c->rbuf; + } + + if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT) + && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE)) + { + char *new_rbuf = (char *) realloc(c->rudpbuf, (size_t) c->rudpsize * 2); + if (new_rbuf) { + c->rudpbuf = new_rbuf; + c->rudpsize = UDP_DATA_BUFFER_SIZE; + } + /* TODO check error condition? */ + } + + if (c->msgsize > MSG_LIST_HIGHWAT) { + struct msghdr *newbuf = + (struct msghdr *) realloc((void *) c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0])); + if (newbuf) { + c->msglist = newbuf; + c->msgsize = MSG_LIST_INITIAL; + } + /* TODO check error condition? */ + } + + if (c->iovsize > IOV_LIST_HIGHWAT) { + struct iovec *newbuf = + (struct iovec *) realloc((void *) c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])); + if (newbuf) { + c->iov = newbuf; + c->iovsize = IOV_LIST_INITIAL; + } + /* TODO check return value */ + } +} /* ms_conn_shrink */ + +/** + * Sets a connection's current state in the state machine. Any special + * processing that needs to happen on certain state transitions can + * happen here. + * + * @param c, pointer of the concurrency + * @param state, connection state + */ +static void ms_conn_set_state(ms_conn_t *c, int state) { + assert(c); + + if (state != c->state) { + if (state == conn_read) { + ms_conn_shrink(c); + } + c->state = state; + } +} /* ms_conn_set_state */ + +/** + * update the event if socks change state. for example: when + * change the listen scoket read event to sock write event, or + * change socket handler, we could call this function. + * + * @param c, pointer of the concurrency + * @param new_flags, new event flags + * + * @return bool, if success, return true, else return false + */ +static bool ms_update_event(ms_conn_t *c, const int new_flags) { + assert(c); + + struct event_base *base = c->event.ev_base; + if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0) + && (!ms_setting.facebook_test || (c->total_sfds == 1))) + { + return true; + } + + if (event_del(&c->event) == -1) { + /* try to delete the event again */ + if (event_del(&c->event) == -1) { + return false; + } + } + + event_set(&c->event, c->sfd, (short) new_flags, ms_event_handler, (void *) c); + event_base_set(base, &c->event); + c->ev_flags = (short) new_flags; + + if (event_add(&c->event, NULL) == -1) { + return false; + } + + return true; +} /* ms_update_event */ + +/** + * If user want to get the expected throughput, we could limit + * the performance of memslap. we could give up some work and + * just wait a short time. The function is used to check this + * case. + * + * @param c, pointer of the concurrency + * + * @return bool, if success, return true, else return false + */ +static bool ms_need_yield(ms_conn_t *c) { + ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key); + int64_t tps = 0; + int64_t time_diff = 0; + struct timeval curr_time; + ms_task_t *task = &c->curr_task; + + if (ms_setting.expected_tps > 0) { + gettimeofday(&curr_time, NULL); + time_diff = ms_time_diff(&ms_thread->startup_time, &curr_time); + tps = (int64_t)(((task->get_opt + task->set_opt) / (uint64_t) time_diff) * 1000000); + + /* current throughput is greater than expected throughput */ + if (tps > ms_thread->thread_ctx->tps_perconn) { + return true; + } + } + + return false; +} /* ms_need_yield */ + +/** + * used to update the start time of each operation + * + * @param c, pointer of the concurrency + */ +static void ms_update_start_time(ms_conn_t *c) { + ms_task_item_t *item = c->curr_task.item; + + if ((ms_setting.stat_freq > 0) || c->udp || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))) + { + gettimeofday(&c->start_time, NULL); + if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) { + /* record the current time */ + item->client_time = c->start_time.tv_sec; + } + } +} /* ms_update_start_time */ + +/** + * run the state machine + * + * @param c, pointer of the concurrency + */ +static void ms_drive_machine(ms_conn_t *c) { + bool stop = false; + + assert(c); + + while (!stop) { + switch (c->state) { + case conn_read: + if (c->readval) { + if (c->rbytes >= c->rvbytes) { + ms_complete_nread(c); + break; + } + } else { + if (ms_try_read_line(c)) { + break; + } + } + + if (ms_try_read_network(c)) { + break; + } + + /* doesn't read all the response data, wait event wake up */ + if (!c->currcmd.isfinish) { + if (!ms_update_event(c, EV_READ | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop = true; + break; + } + + /* we have no command line and no data to read from network, next write */ + ms_conn_set_state(c, conn_write); + memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + + break; + + case conn_write: + if (!c->ctnwrite && ms_need_yield(c)) { + usleep(10); + + if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop = true; + break; + } + + if (!c->ctnwrite && (ms_exec_task(c))) { + ms_conn_set_state(c, conn_closing); + break; + } + + /* record the start time before starting to send data if necessary */ + if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) { + if (c->change_sfd) { + c->change_sfd = false; + } + ms_update_start_time(c); + } + + /* change sfd if necessary */ + if (c->change_sfd) { + c->ctnwrite = true; + stop = true; + break; + } + + /* execute task until nothing need be written to network */ + if (!c->ctnwrite && (c->msgcurr == c->msgused)) { + if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop = true; + break; + } + + switch (ms_transmit(c)) { + case TRANSMIT_COMPLETE: + /* we have no data to write to network, next wait repose */ + if (!ms_update_event(c, EV_READ | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + c->ctnwrite = false; + break; + } + ms_conn_set_state(c, conn_read); + c->ctnwrite = false; + stop = true; + break; + + case TRANSMIT_INCOMPLETE: + c->ctnwrite = true; + break; /* Continue in state machine. */ + + case TRANSMIT_HARD_ERROR: + c->ctnwrite = false; + break; + + case TRANSMIT_SOFT_ERROR: + c->ctnwrite = true; + stop = true; + break; + + default: + break; + } /* switch */ + + break; + + case conn_closing: + /* recovery mode, need reconnect if connection close */ + if (ms_setting.reconnect + && (!ms_global.time_out || ((ms_setting.run_time == 0) && (c->remain_exec_num > 0)))) + { + if (ms_reconn(c)) { + ms_conn_close(c); + stop = true; + break; + } + + ms_reset_conn(c, false); + + if (c->total_sfds == 1) { + if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + } + + break; + } else { + ms_conn_close(c); + stop = true; + break; + } + + default: + assert(0); + } /* switch */ + } +} /* ms_drive_machine */ + +/** + * the event handler of each thread + * + * @param fd, the file descriptor of socket + * @param which, event flag + * @param arg, argument + */ +void ms_event_handler(const int fd, const short which, void *arg) { + ms_conn_t *c = (ms_conn_t *) arg; + + assert(c); + + c->which = which; + + /* sanity */ + if (fd != c->sfd) { + fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n", fd, c->sfd); + ms_conn_close(c); + exit(1); + } + assert(fd == c->sfd); + + ms_drive_machine(c); + + /* wait for next event */ +} /* ms_event_handler */ + +/** + * get the next socket descriptor index to run for replication + * + * @param c, pointer of the concurrency + * @param cmd, command(get or set ) + * + * @return int, if success, return the index, else return EXIT_SUCCESS + */ +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { + uint32_t sock_index = 0; + uint32_t i = 0; + + if (c->total_sfds == 1) { + return EXIT_SUCCESS; + } + + if (ms_setting.rep_write_srv == 0) { + return sock_index; + } + + do { + if (cmd == CMD_SET) { + for (i = 0; i < ms_setting.rep_write_srv; i++) { + if (c->tcpsfd[i] > 0) { + break; + } + } + + if (i == ms_setting.rep_write_srv) { + /* random get one replication server to read */ + sock_index = (uint32_t) random() % c->total_sfds; + } else { + /* random get one replication writing server to write */ + sock_index = (uint32_t) random() % ms_setting.rep_write_srv; + } + } else if (cmd == CMD_GET) { + /* random get one replication server to read */ + sock_index = (uint32_t) random() % c->total_sfds; + } + } while (c->tcpsfd[sock_index] == 0); + + return sock_index; +} /* ms_get_rep_sock_index */ + +/** + * get the next socket descriptor index to run + * + * @param c, pointer of the concurrency + * + * @return int, return the index + */ +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { + uint32_t sock_index = 0; + + do { + sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx; + } while (c->tcpsfd[sock_index] == 0); + + return sock_index; +} /* ms_get_next_sock_index */ + +/** + * update socket event of the connections + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_update_conn_sock_event(ms_conn_t *c) { + assert(c); + + switch (c->currcmd.cmd) { + case CMD_SET: + if (ms_setting.facebook_test && c->udp) { + c->sfd = c->tcpsfd[0]; + c->udp = false; + c->change_sfd = true; + } + break; + + case CMD_GET: + if (ms_setting.facebook_test && !c->udp) { + c->sfd = c->udpsfd; + c->udp = true; + c->change_sfd = true; + } + break; + + default: + break; + } /* switch */ + + if (!c->udp && (c->total_sfds > 1)) { + if (c->cur_idx != c->total_sfds) { + if (ms_setting.rep_write_srv == 0) { + c->cur_idx = ms_get_next_sock_index(c); + } else { + c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd); + } + } else { + /* must select the first sock of the connection at the beginning */ + c->cur_idx = 0; + } + + c->sfd = c->tcpsfd[c->cur_idx]; + assert(c->sfd); + c->change_sfd = true; + } + + if (c->change_sfd) { + if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + return -1; + } + } + + return EXIT_SUCCESS; +} /* ms_update_conn_sock_event */ + +/** + * for ASCII protocol, this function build the set command + * string and send the command. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { + int value_offset; + int write_len; + char *buffer = c->wbuf; + + write_len = snprintf(buffer, c->wsize, " %u %d %d\r\n", 0, item->exp_time, item->value_size); + + if (write_len > c->wsize || write_len < 0) { + /* ought to be always enough. just fail for simplicity */ + fprintf(stderr, "output command line too long.\n"); + return -1; + } + + if (item->value_offset == INVALID_OFFSET) { + value_offset = item->key_suffix_offset; + } else { + value_offset = item->value_offset; + } + + if ((ms_add_iov(c, "set ", 4)) + || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE)) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int) KEY_PREFIX_SIZE) +) + || (ms_add_iov(c, buffer, write_len)) + || (ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size)) + || (ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c)))) + { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_set */ + +/** + * used to send set command to server + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) { + assert(c); + + c->currcmd.cmd = CMD_SET; + c->currcmd.isfinish = false; + c->currcmd.retstat = MCD_FAILURE; + + if (ms_update_conn_sock_event(c)) { + return -1; + } + + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + if (ms_add_msghdr(c)) { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } + + /* binary protocol */ + if (c->protocol == binary_prot) { + if (ms_build_bin_write_buf_set(c, item)) { + return -1; + } + } else { + if (ms_build_ascii_write_buf_set(c, item)) { + return -1; + } + } + + atomic_add_size(&ms_stats.obj_bytes, item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); + + return EXIT_SUCCESS; +} /* ms_mcd_set */ + +/** + * for ASCII protocol, this function build the get command + * string and send the command. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { + if ((ms_add_iov(c, "get ", 4)) + || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE)) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int) KEY_PREFIX_SIZE) +) + || (ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c)))) + { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_get */ + +/** + * used to send the get command to server + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { + assert(c); + + c->currcmd.cmd = CMD_GET; + c->currcmd.isfinish = false; + c->currcmd.retstat = MCD_FAILURE; + + if (ms_update_conn_sock_event(c)) { + return -1; + } + + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + if (ms_add_msghdr(c)) { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } + + /* binary protocol */ + if (c->protocol == binary_prot) { + if (ms_build_bin_write_buf_get(c, item)) { + return -1; + } + } else { + if (ms_build_ascii_write_buf_get(c, item)) { + return -1; + } + } + + atomic_add_size(&ms_stats.cmd_get, 1); + + return EXIT_SUCCESS; +} /* ms_mcd_get */ + +/** + * for ASCII protocol, this function build the multi-get command + * string and send the command. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) { + ms_task_item_t *item; + + if (ms_add_iov(c, "get", 3)) { + return -1; + } + + for (int i = 0; i < c->mlget_task.mlget_num; i++) { + item = c->mlget_task.mlget_item[i].item; + assert(item); + if ((ms_add_iov(c, " ", 1)) + || (ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE)) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int) KEY_PREFIX_SIZE) +)) + { + return -1; + } + } + + if ((ms_add_iov(c, "\r\n", 2)) || (c->udp && (ms_build_udp_headers(c)))) { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_mlget */ + +/** + * used to send the multi-get command to server + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_mcd_mlget(ms_conn_t *c) { + ms_task_item_t *item; + + assert(c); + assert(c->mlget_task.mlget_num >= 1); + + c->currcmd.cmd = CMD_GET; + c->currcmd.isfinish = false; + c->currcmd.retstat = MCD_FAILURE; + + if (ms_update_conn_sock_event(c)) { + return -1; + } + + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + if (ms_add_msghdr(c)) { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } + + /* binary protocol */ + if (c->protocol == binary_prot) { + if (ms_build_bin_write_buf_mlget(c)) { + return -1; + } + } else { + if (ms_build_ascii_write_buf_mlget(c)) { + return -1; + } + } + + /* decrease operation time of each item */ + for (int i = 0; i < c->mlget_task.mlget_num; i++) { + item = c->mlget_task.mlget_item[i].item; + atomic_add_size(&ms_stats.cmd_get, 1); + } + + (void) item; + + return EXIT_SUCCESS; +} /* ms_mcd_mlget */ + +/** + * binary protocol support + */ + +/** + * for binary protocol, parse the response of server + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_bin_process_response(ms_conn_t *c) { + const char *errstr = NULL; + + assert(c); + + uint32_t bodylen = c->binary_header.response.bodylen; + uint8_t opcode = c->binary_header.response.opcode; + uint16_t status = c->binary_header.response.status; + + if (bodylen > 0) { + c->rvbytes = (int32_t) bodylen; + c->readval = true; + return EXIT_FAILURE; + } else { + switch (status) { + case PROTOCOL_BINARY_RESPONSE_SUCCESS: + if (opcode == PROTOCOL_BINARY_CMD_SET) { + c->currcmd.retstat = MCD_STORED; + } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) { + c->currcmd.retstat = MCD_DELETED; + } else if (opcode == PROTOCOL_BINARY_CMD_GET) { + c->currcmd.retstat = MCD_END; + } + break; + + case PROTOCOL_BINARY_RESPONSE_ENOMEM: + errstr = "Out of memory"; + c->currcmd.retstat = MCD_SERVER_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: + errstr = "Unknown command"; + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + break; + + case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: + errstr = "Not found"; + c->currcmd.retstat = MCD_NOTFOUND; + break; + + case PROTOCOL_BINARY_RESPONSE_EINVAL: + errstr = "Invalid arguments"; + c->currcmd.retstat = MCD_PROTOCOL_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: + errstr = "Data exists for key."; + break; + + case PROTOCOL_BINARY_RESPONSE_E2BIG: + errstr = "Too large."; + c->currcmd.retstat = MCD_SERVER_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_NOT_STORED: + errstr = "Not stored."; + c->currcmd.retstat = MCD_NOTSTORED; + break; + + default: + errstr = "Unknown error"; + c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; + break; + } /* switch */ + + if (errstr) { + fprintf(stderr, "%s\n", errstr); + } + } + + return EXIT_SUCCESS; +} /* ms_bin_process_response */ + +/* build binary header and add the header to the buffer to send */ + +/** + * build binary header and add the header to the buffer to send + * + * @param c, pointer of the concurrency + * @param opcode, operation code + * @param hdr_len, length of header + * @param key_len, length of key + * @param body_len. length of body + */ +static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, uint16_t key_len, + uint32_t body_len) { + protocol_binary_request_header *header; + + assert(c); + + header = (protocol_binary_request_header *) c->wcurr; + + header->request.magic = (uint8_t) PROTOCOL_BINARY_REQ; + header->request.opcode = (uint8_t) opcode; + header->request.keylen = htons(key_len); + + header->request.extlen = (uint8_t) hdr_len; + header->request.datatype = (uint8_t) PROTOCOL_BINARY_RAW_BYTES; + header->request.vbucket = 0; + + header->request.bodylen = htonl(body_len); + header->request.opaque = 0; + header->request.cas = 0; + + ms_add_iov(c, c->wcurr, sizeof(header->request)); +} /* ms_add_bin_header */ + +/** + * add the key to the socket write buffer array + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + */ +static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) { + ms_add_iov(c, (char *) &item->key_prefix, (int) KEY_PREFIX_SIZE); + ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int) KEY_PREFIX_SIZE); +} + +/** + * for binary protocol, this function build the set command + * and add the command to send buffer array. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { + assert(c->wbuf == c->wcurr); + + int value_offset; + protocol_binary_request_set *rep = (protocol_binary_request_set *) c->wcurr; + uint16_t keylen = (uint16_t) item->key_size; + uint32_t bodylen = + (uint32_t) sizeof(rep->message.body) + (uint32_t) keylen + (uint32_t) item->value_size; + + ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen); + rep->message.body.flags = 0; + rep->message.body.expiration = htonl((uint32_t) item->exp_time); + ms_add_iov(c, &rep->message.body, sizeof(rep->message.body)); + ms_add_key_to_iov(c, item); + + if (item->value_offset == INVALID_OFFSET) { + value_offset = item->key_suffix_offset; + } else { + value_offset = item->value_offset; + } + ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_set */ + +/** + * for binary protocol, this function build the get command and + * add the command to send buffer array. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { + assert(c->wbuf == c->wcurr); + + ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size, + (uint32_t) item->key_size); + ms_add_key_to_iov(c, item); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_get */ + +/** + * for binary protocol, this function build the multi-get + * command and add the command to send buffer array. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_bin_write_buf_mlget(ms_conn_t *c) { + ms_task_item_t *item; + + assert(c->wbuf == c->wcurr); + + for (int i = 0; i < c->mlget_task.mlget_num; i++) { + item = c->mlget_task.mlget_item[i].item; + assert(item); + + ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t) item->key_size, + (uint32_t) item->key_size); + ms_add_key_to_iov(c, item); + c->wcurr += sizeof(protocol_binary_request_get); + } + + c->wcurr = c->wbuf; + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_mlget */