X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_conn.c;h=2afc9fa10f029dc4a3b477317516a5f9ee408fff;hb=1899ec2a3d1f4f180b0109879a966fd6603ece86;hp=69bfaaa967f05c46d29d841630b4e4cb539e10d4;hpb=ea32b463888ecfd7525eb88859cc2bb4af9b24d3;p=awesomized%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index 69bfaaa9..2afc9fa1 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -8,75 +8,116 @@ * http://www.schoonerinfotech.com/ * */ + +#include "mem_config.h" + #include +#include #include #include #include #include #include +#include #include + +#if defined(HAVE_SYS_TIME_H) +# include +#endif + +#if defined(HAVE_TIME_H) +# include +#endif + #include "ms_setting.h" #include "ms_thread.h" +#include "ms_atomic.h" + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl +#endif /* for network write */ -#define TRANSMIT_COMPLETE 0 -#define TRANSMIT_INCOMPLETE 1 -#define TRANSMIT_SOFT_ERROR 2 -#define TRANSMIT_HARD_ERROR 3 +#define TRANSMIT_COMPLETE 0 +#define TRANSMIT_INCOMPLETE 1 +#define TRANSMIT_SOFT_ERROR 2 +#define TRANSMIT_HARD_ERROR 3 /* for generating key */ -#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */ -#define KEY_PREFIX_MASK 0x1010101010101010 +#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */ +#define KEY_PREFIX_MASK 0x1010101010101010 /* For parse the value length return by server */ -#define KEY_TOKEN 1 -#define VALUELEN_TOKEN 3 +#define KEY_TOKEN 1 +#define VALUELEN_TOKEN 3 /* global increasing counter, to ensure the key prefix unique */ -static uint64_t key_prefix_seq = KEY_PREFIX_BASE; +static uint64_t key_prefix_seq= KEY_PREFIX_BASE; /* global increasing counter, generating request id for UDP */ -static int udp_request_id = 0; +static volatile uint32_t udp_request_id= 0; -extern __thread ms_thread_t ms_thread; +extern pthread_key_t ms_thread_key; /* generate upd request id */ -static int ms_get_udp_request_id(void); +static uint32_t ms_get_udp_request_id(void); + /* connect initialize */ static void ms_task_init(ms_conn_t *c); static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp); static int ms_conn_sock_init(ms_conn_t *c); static int ms_conn_event_init(ms_conn_t *c); -static int ms_conn_init(ms_conn_t *c, const int init_state, - const int read_buffer_size, const bool is_udp); +static int ms_conn_init(ms_conn_t *c, + const int init_state, + const int read_buffer_size, + const bool is_udp); static void ms_warmup_num_init(ms_conn_t *c); static int ms_item_win_init(ms_conn_t *c); + /* connection close */ void ms_conn_free(ms_conn_t *c); static void ms_conn_close(ms_conn_t *c); + /* create network connection */ static int ms_new_socket(struct addrinfo *ai); static void ms_maximize_sndbuf(const int sfd); -static int ms_network_connect(ms_conn_t *c, char * srv_host_name, - const int srv_port, const bool is_udp, int *ret_sfd); +static int ms_network_connect(ms_conn_t *c, + char *srv_host_name, + const int srv_port, + const bool is_udp, + int *ret_sfd); static int ms_reconn(ms_conn_t *c); + /* read and parse */ -static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens); +static int ms_tokenize_command(char *command, + token_t *tokens, + const int max_tokens); static int ms_ascii_process_line(ms_conn_t *c, char *command); static int ms_try_read_line(ms_conn_t *c); static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes); static int ms_udp_read(ms_conn_t *c, char *buf, int len); static int ms_try_read_network(ms_conn_t *c); -static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, - char *value, int vlen); +static void ms_verify_value(ms_conn_t *c, + ms_mlget_task_item_t *mlget_item, + char *value, + int vlen); static void ms_ascii_complete_nread(ms_conn_t *c); static void ms_bin_complete_nread(ms_conn_t *c); static void ms_complete_nread(ms_conn_t *c); + /* send functions */ static int ms_add_msghdr(ms_conn_t *c); static int ms_ensure_iov_space(ms_conn_t *c); @@ -84,34 +125,42 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len); static int ms_build_udp_headers(ms_conn_t *c); static int ms_transmit(ms_conn_t *c); + /* status adjustment */ static void ms_conn_shrink(ms_conn_t *c); static void ms_conn_set_state(ms_conn_t *c, int state); static bool ms_update_event(ms_conn_t *c, const int new_flags); -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static int ms_get_next_sock_index(ms_conn_t *c); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); static int ms_update_conn_sock_event(ms_conn_t *c); static bool ms_need_yield(ms_conn_t *c); static void ms_update_start_time(ms_conn_t *c); + /* main loop */ static void ms_drive_machine(ms_conn_t *c); void ms_event_handler(const int fd, const short which, void *arg); + /* ascii protocol */ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item); static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item); static int ms_build_ascii_write_buf_mlget(ms_conn_t *c); + /* binary protocol */ static int ms_bin_process_response(ms_conn_t *c); -static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, - uint16_t key_len, uint32_t body_len); +static void ms_add_bin_header(ms_conn_t *c, + uint8_t opcode, + uint8_t hdr_len, + uint16_t key_len, + uint32_t body_len); static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item); static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item); static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item); static int ms_build_bin_write_buf_mlget(ms_conn_t *c); + /** * each key has two parts, prefix and suffix. The suffix is a * string random get form the character table. The prefix is a @@ -123,27 +172,29 @@ static int ms_build_bin_write_buf_mlget(ms_conn_t *c); */ uint64_t ms_get_key_prefix(void) { - uint64_t key_prefix; + uint64_t key_prefix; - pthread_mutex_lock(&ms_global.seq_mutex); - key_prefix_seq |= KEY_PREFIX_MASK; - key_prefix = key_prefix_seq; - key_prefix_seq++; - pthread_mutex_unlock(&ms_global.seq_mutex); + pthread_mutex_lock(&ms_global.seq_mutex); + key_prefix_seq|= KEY_PREFIX_MASK; + key_prefix= key_prefix_seq; + key_prefix_seq++; + pthread_mutex_unlock(&ms_global.seq_mutex); + + return key_prefix; +} /* ms_get_key_prefix */ - return key_prefix; -} /** * get an unique udp request id * * @return an unique UDP request id */ -static int ms_get_udp_request_id(void) +static uint32_t ms_get_udp_request_id(void) { - return(__sync_fetch_and_add(&udp_request_id, 1)); + return atomic_add_32_nv(&udp_request_id, 1); } + /** * initialize current task structure * @@ -151,19 +202,20 @@ static int ms_get_udp_request_id(void) */ static void ms_task_init(ms_conn_t *c) { - c->curr_task.cmd = CMD_NULL; - c->curr_task.item = 0; - c->curr_task.verify = false; - c->curr_task.finish_verify = true; - c->curr_task.get_miss = true; - - c->curr_task.get_opt = 0; - c->curr_task.set_opt = 0; - c->curr_task.cycle_undo_get = 0; - c->curr_task.cycle_undo_set = 0; - c->curr_task.verified_get = 0; - c->curr_task.overwrite_set = 0; -} + c->curr_task.cmd= CMD_NULL; + c->curr_task.item= 0; + c->curr_task.verify= false; + c->curr_task.finish_verify= true; + c->curr_task.get_miss= true; + + c->curr_task.get_opt= 0; + c->curr_task.set_opt= 0; + c->curr_task.cycle_undo_get= 0; + c->curr_task.cycle_undo_set= 0; + c->curr_task.verified_get= 0; + c->curr_task.overwrite_set= 0; +} /* ms_task_init */ + /** * initialize udp for the connection structure @@ -171,40 +223,45 @@ static void ms_task_init(ms_conn_t *c) * @param c, pointer of the concurrency * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) { - c->hdrbuf = 0; - c->rudpbuf = 0; - c->udppkt = 0; - - c->rudpsize = UDP_DATA_BUFFER_SIZE; - c->hdrsize = 0; - - c->rudpbytes = 0; - c->packets = 0; - c->recvpkt = 0; - c->pktcurr = 0; - c->ordcurr = 0; - - c->udp = is_udp; - - if (c->udp || (!c->udp && ms_setting.facebook_test)) { - c->rudpbuf = (char *)malloc((size_t)c->rudpsize); - c->udppkt = (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t)); - - if (c->rudpbuf == NULL || c->udppkt == NULL) { - if (c->rudpbuf != NULL) free(c->rudpbuf); - if (c->udppkt != NULL) free(c->udppkt); - fprintf(stderr, "malloc()\n"); - return -1; - } - memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); + c->hdrbuf= 0; + c->rudpbuf= 0; + c->udppkt= 0; + + c->rudpsize= UDP_DATA_BUFFER_SIZE; + c->hdrsize= 0; + + c->rudpbytes= 0; + c->packets= 0; + c->recvpkt= 0; + c->pktcurr= 0; + c->ordcurr= 0; + + c->udp= is_udp; + + if (c->udp || (! c->udp && ms_setting.facebook_test)) + { + c->rudpbuf= (char *)malloc((size_t)c->rudpsize); + c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t)); + + if ((c->rudpbuf == NULL) || (c->udppkt == NULL)) + { + if (c->rudpbuf != NULL) + free(c->rudpbuf); + if (c->udppkt != NULL) + free(c->udppkt); + fprintf(stderr, "malloc()\n"); + return -1; } + memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); + } + + return EXIT_SUCCESS; +} /* ms_conn_udp_init */ - return 0; -} /** * initialize the connection structure @@ -214,99 +271,118 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) * @param read_buffer_size * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ -static int ms_conn_init(ms_conn_t *c, const int init_state, - const int read_buffer_size, const bool is_udp) +static int ms_conn_init(ms_conn_t *c, + const int init_state, + const int read_buffer_size, + const bool is_udp) { - assert(c != NULL); - - c->rbuf = c->wbuf = 0; - c->iov = 0; - c->msglist = 0; - - c->rsize = read_buffer_size; - c->wsize = WRITE_BUFFER_SIZE; - c->iovsize = IOV_LIST_INITIAL; - c->msgsize = MSG_LIST_INITIAL; - - /* for replication, each connection need connect all the server */ - if (ms_setting.rep_write_srv > 0) { - c->total_sfds = ms_setting.srv_cnt; - } else { - c->total_sfds = ms_setting.sock_per_conn; - } - c->alive_sfds = 0; - - c->rbuf = (char *)malloc((size_t)c->rsize); - c->wbuf = (char *)malloc((size_t)c->wsize); - c->iov = (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize); - c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * (size_t)c->msgsize); - if (ms_setting.mult_key_num > 1) { - c->mlget_task.mlget_item = (ms_mlget_task_item_t *) - malloc(sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num); - } - c->tcpsfd = (int *)malloc((size_t)c->total_sfds * sizeof(int)); - - if (c->rbuf == NULL || c->wbuf == NULL || c->iov == NULL - || c->msglist == NULL || c->tcpsfd == NULL - || (ms_setting.mult_key_num > 1 && c->mlget_task.mlget_item == NULL)) { - - if (c->rbuf != NULL) free(c->rbuf); - if (c->wbuf != NULL) free(c->wbuf); - if (c->iov != NULL) free(c->iov); - if (c->msglist != NULL) free(c->msglist); - if (c->mlget_task.mlget_item != NULL) free(c->mlget_task.mlget_item); - if (c->tcpsfd != NULL) free(c->tcpsfd); - fprintf(stderr, "malloc()\n"); - return -1; - } - - c->state = init_state; - c->rvbytes = 0; - c->rbytes = 0; - c->rcurr = c->rbuf; - c->wcurr = c->wbuf; - c->iovused = 0; - c->msgcurr = 0; - c->msgused = 0; - c->cur_idx = c->total_sfds; /* default index is a invalid value */ - - c->ctnwrite = false; - c->readval = false; - c->change_sfd = false; - - c->precmd.cmd = c->currcmd.cmd = CMD_NULL; - c->precmd.isfinish = true; /* default the previous command finished */ - c->currcmd.isfinish = false; - c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE; - c->precmd.key_prefix = c->currcmd.key_prefix = 0; - - c->mlget_task.mlget_num = 0; - c->mlget_task.value_index = -1; /* default invalid value */ - - if (ms_setting.binary_prot) { - c->protocol = binary_prot; - } else if (is_udp) { - c->protocol = ascii_udp_prot; - } else { - c->protocol = ascii_prot; - } + assert(c != NULL); + + c->rbuf= c->wbuf= 0; + c->iov= 0; + c->msglist= 0; + + c->rsize= read_buffer_size; + c->wsize= WRITE_BUFFER_SIZE; + c->iovsize= IOV_LIST_INITIAL; + c->msgsize= MSG_LIST_INITIAL; + + /* for replication, each connection need connect all the server */ + if (ms_setting.rep_write_srv > 0) + { + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; + } + else + { + c->total_sfds= ms_setting.sock_per_conn; + } + c->alive_sfds= 0; + + c->rbuf= (char *)malloc((size_t)c->rsize); + c->wbuf= (char *)malloc((size_t)c->wsize); + c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize); + c->msglist= (struct msghdr *)malloc( + sizeof(struct msghdr) * (size_t)c->msgsize); + if (ms_setting.mult_key_num > 1) + { + c->mlget_task.mlget_item= (ms_mlget_task_item_t *) + malloc( + sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num); + } + c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int)); + + if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL) + || (c->msglist == NULL) || (c->tcpsfd == NULL) + || ((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_item == NULL))) + { + if (c->rbuf != NULL) + free(c->rbuf); + if (c->wbuf != NULL) + free(c->wbuf); + if (c->iov != NULL) + free(c->iov); + if (c->msglist != NULL) + free(c->msglist); + if (c->mlget_task.mlget_item != NULL) + free(c->mlget_task.mlget_item); + if (c->tcpsfd != NULL) + free(c->tcpsfd); + fprintf(stderr, "malloc()\n"); + return -1; + } + + c->state= init_state; + c->rvbytes= 0; + c->rbytes= 0; + c->rcurr= c->rbuf; + c->wcurr= c->wbuf; + c->iovused= 0; + c->msgcurr= 0; + c->msgused= 0; + c->cur_idx= c->total_sfds; /* default index is a invalid value */ + + c->ctnwrite= false; + c->readval= false; + c->change_sfd= false; + + c->precmd.cmd= c->currcmd.cmd= CMD_NULL; + c->precmd.isfinish= true; /* default the previous command finished */ + c->currcmd.isfinish= false; + c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE; + c->precmd.key_prefix= c->currcmd.key_prefix= 0; + + c->mlget_task.mlget_num= 0; + c->mlget_task.value_index= -1; /* default invalid value */ + + if (ms_setting.binary_prot_) + { + c->protocol= binary_prot; + } + else + { + c->protocol= ascii_prot; + } + + /* initialize udp */ + if (ms_conn_udp_init(c, is_udp) != 0) + { + return -1; + } - /* initialize udp */ - if (ms_conn_udp_init(c, is_udp) != 0) { - return -1; - } + /* initialize task */ + ms_task_init(c); - /* initialize task */ - ms_task_init(c); + if (! (ms_setting.facebook_test && is_udp)) + { + atomic_add_32(&ms_stats.active_conns, 1); + } - if (!(ms_setting.facebook_test && is_udp)) { - __sync_fetch_and_add(&ms_stats.active_conns, 1); - } + return EXIT_SUCCESS; +} /* ms_conn_init */ - return 0; -} /** * when doing 100% get operation, it could preset some objects @@ -317,15 +393,19 @@ static int ms_conn_init(ms_conn_t *c, const int init_state, */ static void ms_warmup_num_init(ms_conn_t *c) { - /* no set operation, preset all the items in the window */ - if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) { - c->warmup_num = c->win_size; - c->remain_warmup_num = c->warmup_num; - } else { - c->warmup_num = 0; - c->remain_warmup_num = c->warmup_num; - } -} + /* no set operation, preset all the items in the window */ + if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) + { + c->warmup_num= c->win_size; + c->remain_warmup_num= c->warmup_num; + } + else + { + c->warmup_num= 0; + c->remain_warmup_num= c->warmup_num; + } +} /* ms_warmup_num_init */ + /** * each connection has an item window, this function initialize @@ -333,45 +413,53 @@ static void ms_warmup_num_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_item_win_init(ms_conn_t *c) { - int exp_cnt = 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + int exp_cnt= 0; + + c->win_size= (int)ms_setting.win_size; + c->set_cursor= 0; + c->exec_num= ms_thread->thread_ctx->exec_num_perconn; + c->remain_exec_num= c->exec_num; + + c->item_win= (ms_task_item_t *)malloc( + sizeof(ms_task_item_t) * (size_t)c->win_size); + if (c->item_win == NULL) + { + fprintf(stderr, "Can't allocate task item array for conn.\n"); + return -1; + } + memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size); - c->win_size = (int)ms_setting.win_size; - c->set_cursor = 0; - c->exec_num = ms_thread.thread_ctx->exec_num_perconn; - c->remain_exec_num = c->exec_num; + for (int i= 0; i < c->win_size; i++) + { + c->item_win[i].key_size= (int)ms_setting.distr[i].key_size; + c->item_win[i].key_prefix= ms_get_key_prefix(); + c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset; + c->item_win[i].value_size= (int)ms_setting.distr[i].value_size; + c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */ + c->item_win[i].client_time= 0; - c->item_win = (ms_task_item_t *)malloc(sizeof(ms_task_item_t) * (size_t)c->win_size); - if (c->item_win == NULL) { - fprintf(stderr, "Can't allocate task item array for conn.\n"); - return -1; + /* set expire time base on the proportion */ + if (exp_cnt < ms_setting.exp_ver_per * i) + { + c->item_win[i].exp_time= FIXED_EXPIRE_TIME; + exp_cnt++; } - memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size); - - for (int i = 0; i < c->win_size; i++) { - c->item_win[i].key_size = (int)ms_setting.distr[i].key_size; - c->item_win[i].key_prefix = ms_get_key_prefix(); - c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset; - c->item_win[i].value_size = (int)ms_setting.distr[i].value_size; - c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */ - c->item_win[i].client_time = 0; - - /* set expire time base on the proportion */ - if (exp_cnt < ms_setting.exp_ver_per * i) { - c->item_win[i].exp_time = FIXED_EXPIRE_TIME; - exp_cnt++; - } else { - c->item_win[i].exp_time = 0; - } + else + { + c->item_win[i].exp_time= 0; } + } - ms_warmup_num_init(c); + ms_warmup_num_init(c); + + return EXIT_SUCCESS; +} /* ms_item_win_init */ - return 0; -} /** * each connection structure can include one or more sock @@ -380,74 +468,93 @@ static int ms_item_win_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_sock_init(ms_conn_t *c) { - int i; - int ret_sfd; - int srv_idx = 0; - - assert(c != NULL); - assert(c->tcpsfd != NULL); - - for (i = 0; i < c->total_sfds; i++) { - ret_sfd = 0; - if (ms_setting.rep_write_srv > 0) { - /* for replication, each connection need connect all the server */ - srv_idx = i; - } else { - /* all the connections in a thread connects the same server */ - srv_idx = ms_thread.thread_ctx->srv_idx; - } + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t i; + int ret_sfd; + uint32_t srv_idx= 0; - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &ret_sfd) != 0) { - break; - } + assert(c != NULL); + assert(c->tcpsfd != NULL); - if (i == 0) { - c->sfd = ret_sfd; - } + for (i= 0; i < c->total_sfds; i++) + { + ret_sfd= 0; + if (ms_setting.rep_write_srv > 0) + { + /* for replication, each connection need connect all the server */ + srv_idx= i % ms_setting.srv_cnt; + } + else + { + /* all the connections in a thread connects the same server */ + srv_idx= ms_thread->thread_ctx->srv_idx; + } - if (!ms_setting.udp) { - c->tcpsfd[i] = ret_sfd; - } + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, + ms_setting.udp, &ret_sfd) != 0) + { + break; + } - c->alive_sfds++; + if (i == 0) + { + c->sfd= ret_sfd; } - /* initialize udp sock handler if necessary */ - if (ms_setting.facebook_test) { - ret_sfd = 0; - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - true, &ret_sfd) != 0) { - c->udpsfd = 0; - } else { - c->udpsfd = ret_sfd; - } + if (! ms_setting.udp) + { + c->tcpsfd[i]= ret_sfd; } - if (i != c->total_sfds || (ms_setting.facebook_test && c->udpsfd == 0)) { - if (ms_setting.udp) { - close(c->sfd); - } else { - for (int j = 0; j < i; j++) { - close(c->tcpsfd[j]); - } - } + c->alive_sfds++; + } - if (c->udpsfd != 0) { - close(c->udpsfd); - } + /* initialize udp sock handler if necessary */ + if (ms_setting.facebook_test) + { + ret_sfd= 0; + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, + true, &ret_sfd) != 0) + { + c->udpsfd= 0; + } + else + { + c->udpsfd= ret_sfd; + } + } - return -1; + if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0))) + { + if (ms_setting.udp) + { + close(c->sfd); + } + else + { + for (uint32_t j= 0; j < i; j++) + { + close(c->tcpsfd[j]); + } } - return 0; -} + if (c->udpsfd != 0) + { + close(c->udpsfd); + } + + return -1; + } + + return EXIT_SUCCESS; +} /* ms_conn_sock_init */ + /** * each connection is managed by libevent, this function @@ -455,30 +562,25 @@ static int ms_conn_sock_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_event_init(ms_conn_t *c) { - /* default event timeout 10 seconds */ - struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0}; - short event_flags = EV_WRITE | EV_PERSIST; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + short event_flags= EV_WRITE | EV_PERSIST; - event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); - event_base_set(ms_thread.base, &c->event); - c->ev_flags = event_flags; + event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); + event_base_set(ms_thread->base, &c->event); + c->ev_flags= event_flags; - if (c->total_sfds == 1) { - if (event_add(&c->event, NULL) == -1) { - return -1; - } - } else { - if (event_add(&c->event, &t) == -1) { - return -1; - } - } + if (event_add(&c->event, NULL) == -1) + { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_conn_event_init */ - return 0; -} /** * setup a connection, each connection structure of each @@ -486,28 +588,33 @@ static int ms_conn_event_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_setup_conn(ms_conn_t *c) { - if (ms_item_win_init(c) != 0) { - return -1; - } + if (ms_item_win_init(c) != 0) + { + return -1; + } - if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) { - return -1; - } + if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) + { + return -1; + } - if (ms_conn_sock_init(c) != 0) { - return -1; - } + if (ms_conn_sock_init(c) != 0) + { + return -1; + } - if (ms_conn_event_init(c) != 0) { - return -1; - } + if (ms_conn_event_init(c) != 0) + { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_setup_conn */ - return 0; -} /** * Frees a connection. @@ -516,33 +623,37 @@ int ms_setup_conn(ms_conn_t *c) */ void ms_conn_free(ms_conn_t *c) { - if (c != NULL) { - if (c->hdrbuf != NULL) - free(c->hdrbuf); - if (c->msglist != NULL) - free(c->msglist); - if (c->rbuf != NULL) - free(c->rbuf); - if (c->wbuf != NULL) - free(c->wbuf); - if (c->iov != NULL) - free(c->iov); - if (c->mlget_task.mlget_item != NULL) - free(c->mlget_task.mlget_item); - if (c->rudpbuf != NULL) - free(c->rudpbuf); - if (c->udppkt != NULL) - free(c->udppkt); - if (c->item_win != NULL) - free(c->item_win); - if (c->tcpsfd != NULL) - free(c->tcpsfd); - - if (--ms_thread.nactive_conn == 0) { - free(ms_thread.conn); - } - } -} + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + if (c != NULL) + { + if (c->hdrbuf != NULL) + free(c->hdrbuf); + if (c->msglist != NULL) + free(c->msglist); + if (c->rbuf != NULL) + free(c->rbuf); + if (c->wbuf != NULL) + free(c->wbuf); + if (c->iov != NULL) + free(c->iov); + if (c->mlget_task.mlget_item != NULL) + free(c->mlget_task.mlget_item); + if (c->rudpbuf != NULL) + free(c->rudpbuf); + if (c->udppkt != NULL) + free(c->udppkt); + if (c->item_win != NULL) + free(c->item_win); + if (c->tcpsfd != NULL) + free(c->tcpsfd); + + if (--ms_thread->nactive_conn == 0) + { + free(ms_thread->conn); + } + } +} /* ms_conn_free */ + /** * close a connection @@ -551,56 +662,65 @@ void ms_conn_free(ms_conn_t *c) */ static void ms_conn_close(ms_conn_t *c) { - assert(c != NULL); + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + assert(c != NULL); - /* delete the event, the socket and the connection */ - event_del(&c->event); + /* delete the event, the socket and the connection */ + event_del(&c->event); - for (int i = 0; i < c->total_sfds; i++) { - if (c->tcpsfd[i] > 0) { - close(c->tcpsfd[i]); - } + for (uint32_t i= 0; i < c->total_sfds; i++) + { + if (c->tcpsfd[i] > 0) + { + close(c->tcpsfd[i]); } - c->sfd = 0; + } + c->sfd= 0; - if (ms_setting.facebook_test) { - close(c->udpsfd); - } + if (ms_setting.facebook_test) + { + close(c->udpsfd); + } - __sync_fetch_and_sub(&ms_stats.active_conns, 1); + atomic_dec_32(&ms_stats.active_conns); - ms_conn_free(c); + ms_conn_free(c); - if (ms_setting.run_time == 0) { - pthread_mutex_lock(&ms_global.run_lock.lock); - ms_global.run_lock.count++; - pthread_cond_signal(&ms_global.run_lock.cond); - pthread_mutex_unlock(&ms_global.run_lock.lock); - } + if (ms_setting.run_time == 0) + { + pthread_mutex_lock(&ms_global.run_lock.lock); + ms_global.run_lock.count++; + pthread_cond_signal(&ms_global.run_lock.cond); + pthread_mutex_unlock(&ms_global.run_lock.lock); + } + + if (ms_thread->nactive_conn == 0) + { + pthread_exit(NULL); + } +} /* ms_conn_close */ - if (ms_thread.nactive_conn == 0) { - pthread_exit(NULL); - } -} /** * create a new sock * * @param ai, server address information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_new_socket(struct addrinfo *ai) { - int sfd; + int sfd; - if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) { - fprintf(stderr, "socket() error: %s.\n", strerror(errno)); - return -1; - } + if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) + { + fprintf(stderr, "socket() error: %s.\n", strerror(errno)); + return -1; + } + + return sfd; +} /* ms_new_socket */ - return sfd; -} /** * Sets a socket's send buffer size to the maximum allowed by the system. @@ -609,31 +729,38 @@ static int ms_new_socket(struct addrinfo *ai) */ static void ms_maximize_sndbuf(const int sfd) { - socklen_t intsize = sizeof(int); - unsigned int last_good = 0; - unsigned int min, max, avg; - unsigned int old_size; - - /* Start with the default size. */ - if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) { - fprintf(stderr, "getsockopt(SO_SNDBUF)\n"); - return; - } - - /* Binary-search for the real maximum. */ - min = old_size; - max = MAX_SENDBUF_SIZE; - - while (min <= max) { - avg = ((unsigned int)(min + max)) / 2; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) { - last_good = avg; - min = avg + 1; - } else { - max = avg - 1; - } + socklen_t intsize= sizeof(int); + unsigned int last_good= 0; + unsigned int min, max, avg; + unsigned int old_size; + + /* Start with the default size. */ + if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) + { + fprintf(stderr, "getsockopt(SO_SNDBUF)\n"); + return; + } + + /* Binary-search for the real maximum. */ + min= old_size; + max= MAX_SENDBUF_SIZE; + + while (min <= max) + { + avg= ((unsigned int)(min + max)) / 2; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) + { + last_good= avg; + min= avg + 1; } -} + else + { + max= avg - 1; + } + } + (void)last_good; +} /* ms_maximize_sndbuf */ + /** * socket connects the server @@ -644,176 +771,222 @@ static void ms_maximize_sndbuf(const int sfd) * @param is_udp, whether it's udp * @param ret_sfd, the connected socket file descriptor * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ -static int ms_network_connect(ms_conn_t *c, char * srv_host_name, - const int srv_port, const bool is_udp, int *ret_sfd) +static int ms_network_connect(ms_conn_t *c, + char *srv_host_name, + const int srv_port, + const bool is_udp, + int *ret_sfd) { - int sfd; - struct linger ling = {0, 0}; - struct addrinfo *ai; - struct addrinfo *next; - struct addrinfo hints; - char port_buf[NI_MAXSERV]; - int error; - int success = 0; - - int flags = 1; - - /* - * the memset call clears nonstandard fields in some impementations - * that otherwise mess things up. - */ - memset(&hints, 0, sizeof (hints)); - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - if (is_udp) { - hints.ai_protocol = IPPROTO_UDP; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */ - } else { - hints.ai_family = AF_UNSPEC; - hints.ai_protocol = IPPROTO_TCP; - hints.ai_socktype = SOCK_STREAM; - } - - snprintf(port_buf, NI_MAXSERV, "%d", srv_port); - error= getaddrinfo(srv_host_name, port_buf, &hints, &ai); - if (error != 0) { - if (error != EAI_SYSTEM) - fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error)); - else - perror("getaddrinfo()\n"); + int sfd; + struct linger ling= + { + 0, 0 + }; + struct addrinfo *ai; + struct addrinfo *next; + struct addrinfo hints; + char port_buf[NI_MAXSERV]; + int error; + int success= 0; + + int flags= 1; + + /* + * the memset call clears nonstandard fields in some impementations + * that otherwise mess things up. + */ + memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG + hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; +#else + hints.ai_flags= AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ + if (is_udp) + { + hints.ai_protocol= IPPROTO_UDP; + hints.ai_socktype= SOCK_DGRAM; + hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */ + } + else + { + hints.ai_family= AF_UNSPEC; + hints.ai_protocol= IPPROTO_TCP; + hints.ai_socktype= SOCK_STREAM; + } + + snprintf(port_buf, NI_MAXSERV, "%d", srv_port); + error= getaddrinfo(srv_host_name, port_buf, &hints, &ai); + if (error != 0) + { + if (error != EAI_SYSTEM) + fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error)); + else + perror("getaddrinfo()\n"); + return -1; + } + + for (next= ai; next; next= next->ai_next) + { + if ((sfd= ms_new_socket(next)) == -1) + { + freeaddrinfo(ai); + return -1; + } + + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); + if (is_udp) + { + ms_maximize_sndbuf(sfd); + } + else + { + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, + sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, + sizeof(flags)); + } + + if (is_udp) + { + c->srv_recv_addr_size= sizeof(struct sockaddr); + memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size); + } + else + { + if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) + { + close(sfd); + freeaddrinfo(ai); return -1; + } } - for (next = ai; next; next = next->ai_next) { - if ((sfd = ms_new_socket(next)) == -1) { - freeaddrinfo(ai); - return -1; - } - - setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); - if (is_udp) { - ms_maximize_sndbuf(sfd); - } else { - setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); - setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); - setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); - } - - if (is_udp) { - c->srv_recv_addr_size = sizeof(struct sockaddr); - memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size); - } else { - if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) { - close(sfd); - freeaddrinfo(ai); - return -1; - } - } + if (((flags= fcntl(sfd, F_GETFL, 0)) < 0) + || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)) + { + fprintf(stderr, "setting O_NONBLOCK\n"); + close(sfd); + freeaddrinfo(ai); + return -1; + } - if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || - fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { - fprintf(stderr, "setting O_NONBLOCK\n"); - close(sfd); - freeaddrinfo(ai); - return -1; - } + if (ret_sfd != NULL) + { + *ret_sfd= sfd; + } - if (ret_sfd != NULL) { - *ret_sfd = sfd; - } + success++; + } - success++; - } + freeaddrinfo(ai); - freeaddrinfo(ai); + /* Return zero if we detected no errors in starting up connections */ + return success == 0; +} /* ms_network_connect */ - /* Return zero if we detected no errors in starting up connections */ - return success == 0; -} /** * reconnect a disconnected sock * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_reconn(ms_conn_t *c) { - int srv_idx = 0; - int srv_conn_cnt = 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; + uint32_t srv_conn_cnt= 0; + + if (ms_setting.rep_write_srv > 0) + { + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; + } + else + { + srv_idx= ms_thread->thread_ctx->srv_idx; + srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; + } + + /* close the old socket handler */ + close(c->sfd); + c->tcpsfd[c->cur_idx]= 0; + + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) + % srv_conn_cnt == 0) + { + gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); + fprintf(stderr, "Server %s:%d disconnect\n", + ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port); + } + + if (ms_setting.rep_write_srv > 0) + { + uint32_t i= 0; + + for (i= 0; i < c->total_sfds; i++) + { + if (c->tcpsfd[i] != 0) + { + break; + } + } + + /* all socks disconnect */ + if (i == c->total_sfds) + { + return -1; + } + } + else + { + do + { + /* reconnect success, break the loop */ + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, + ms_setting.udp, &c->sfd) == 0) + { + c->tcpsfd[c->cur_idx]= c->sfd; + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) + { + gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); + int reconn_time= + (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec + - ms_setting.servers[srv_idx].disconn_time + .tv_sec); + fprintf(stderr, "Server %s:%d reconnect after %ds\n", + ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, reconn_time); + } + break; + } - if (ms_setting.rep_write_srv > 0) { - srv_idx = c->cur_idx; - srv_conn_cnt = ms_setting.nconns; - } else { - srv_idx = ms_thread.thread_ctx->srv_idx; - srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt; + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) + { + /* wait a second and reconnect */ + sleep(1); + } } + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); + } - /* close the old socket handler */ - close(c->sfd); - c->tcpsfd[c->cur_idx] = 0; + if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) + { + c->sfd= 0; + c->alive_sfds--; + } - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1) - % srv_conn_cnt == 0) { + return EXIT_SUCCESS; +} /* ms_reconn */ - gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); - fprintf(stderr, "Server %s:%d disconnect\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port); - } - - if (ms_setting.rep_write_srv > 0) { - int i = 0; - for (i = 0; i < c->total_sfds; i++) { - if (c->tcpsfd[i] != 0) { - break; - } - } - - /* all socks disconnect */ - if (i == c->total_sfds) { - return -1; - } - } else { - do { - /* reconnect success, break the loop */ - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &c->sfd) == 0) { - - c->tcpsfd[c->cur_idx] = c->sfd; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) { - - gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); - int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec - - ms_setting.servers[srv_idx].disconn_time.tv_sec); - fprintf(stderr, "Server %s:%d reconnect after %ds\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, reconn_time); - } - break; - } - - if (c->total_sfds == 1) { - /* wait a second and reconnect */ - sleep(1); - } - } while (c->total_sfds == 1); - } - - if (c->total_sfds > 1 && c->tcpsfd[c->cur_idx] == 0) { - c->sfd = 0; - c->alive_sfds--; - } - - return 0; -} /** * reconnect several disconnected socks in the connection @@ -823,66 +996,78 @@ static int ms_reconn(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_reconn_socks(ms_conn_t *c) { - int srv_idx = 0; - int ret_sfd = 0; - int srv_conn_cnt = 0; - struct timeval cur_time; - - assert(c != NULL); + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; + int ret_sfd= 0; + uint32_t srv_conn_cnt= 0; + struct timeval cur_time; + + assert(c != NULL); + + if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) + { + return EXIT_SUCCESS; + } + + for (uint32_t i= 0; i < c->total_sfds; i++) + { + if (c->tcpsfd[i] == 0) + { + gettimeofday(&cur_time, NULL); + + /** + * For failover test of replication, reconnect the socks after + * it disconnects more than 5 seconds, Otherwise memslap will + * block at connect() function and the work threads can't work + * in this interval. + */ + if (cur_time.tv_sec + - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) + { + break; + } + + if (ms_setting.rep_write_srv > 0) + { + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; + } + else + { + srv_idx= ms_thread->thread_ctx->srv_idx; + srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; + } + + if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, + ms_setting.udp, &ret_sfd) == 0) + { + c->tcpsfd[i]= ret_sfd; + c->alive_sfds++; - if (c->total_sfds == 1 || c->total_sfds == c->alive_sfds) { - return 0; - } - - for (int i = 0; i < c->total_sfds; i++) { - if (c->tcpsfd[i] == 0) { - gettimeofday(&cur_time, NULL); - - /** - * For failover test of replication, reconnect the socks after - * it disconnects more than 5 seconds, Otherwise memslap will - * block at connect() function and the work threads can't work - * in this interval. - */ - if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) { - break; - } - - if (ms_setting.rep_write_srv > 0) { - srv_idx = i; - srv_conn_cnt = ms_setting.nconns; - } else { - srv_idx = ms_thread.thread_ctx->srv_idx; - srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt; - } - - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &ret_sfd) == 0) { - - c->tcpsfd[i] = ret_sfd; - c->alive_sfds++; - - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) { - - gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); - int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec - - ms_setting.servers[srv_idx].disconn_time.tv_sec); - fprintf(stderr, "Server %s:%d reconnect after %ds\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, reconn_time); - } - } + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) + { + gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); + int reconn_time= + (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec + - ms_setting.servers[srv_idx].disconn_time + .tv_sec); + fprintf(stderr, "Server %s:%d reconnect after %ds\n", + ms_setting.servers[srv_idx].srv_host_name, + ms_setting.servers[srv_idx].srv_port, reconn_time); } + } } + } + + return EXIT_SUCCESS; +} /* ms_reconn_socks */ - return 0; -} /** * Tokenize the command string by replacing whitespace with '\0' and update @@ -907,35 +1092,44 @@ int ms_reconn_socks(ms_conn_t *c) * * @return int, the number of tokens */ -static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens) +static int ms_tokenize_command(char *command, + token_t *tokens, + const int max_tokens) { - char *s, *e; - int ntokens = 0; - - assert(command != NULL && tokens != NULL && max_tokens > 1); - - for (s = e = command; ntokens < max_tokens - 1; ++e) { - if (*e == ' ') { - if (s != e) { - tokens[ntokens].value = s; - tokens[ntokens].length = (size_t)(e - s); - ntokens++; - *e = '\0'; - } - s = e + 1; - } else if (*e == '\0') { - if (s != e) { - tokens[ntokens].value = s; - tokens[ntokens].length = (size_t)(e - s); - ntokens++; - } - - break; /* string end */ - } + char *s, *e; + int ntokens= 0; + + assert(command != NULL && tokens != NULL && max_tokens > 1); + + for (s= e= command; ntokens < max_tokens - 1; ++e) + { + if (*e == ' ') + { + if (s != e) + { + tokens[ntokens].value= s; + tokens[ntokens].length= (size_t)(e - s); + ntokens++; + *e= '\0'; + } + s= e + 1; } + else if (*e == '\0') + { + if (s != e) + { + tokens[ntokens].value= s; + tokens[ntokens].length= (size_t)(e - s); + ntokens++; + } + + break; /* string end */ + } + } + + return ntokens; +} /* ms_tokenize_command */ - return ntokens; -} /** * parse the response of server. @@ -943,111 +1137,140 @@ static int ms_tokenize_command(char *command, token_t *tokens, const int max_tok * @param c, pointer of the concurrency * @param command, the string responded by server * - * @return int, if the command completed return 0, else return + * @return int, if the command completed return EXIT_SUCCESS, else return * -1 */ static int ms_ascii_process_line(ms_conn_t *c, char *command) { - int ret = 0; - int64_t value_len; - char *buffer = command; - - assert(c != NULL); - - /** - * for command get, we store the returned value into local buffer - * then continue in ms_complete_nread(). - */ - - switch (buffer[0]) { - case 'V': /* VALUE || VERSION */ - if (buffer[1] == 'A') { /* VALUE */ - token_t tokens[MAX_TOKENS]; - ms_tokenize_command(command, tokens, MAX_TOKENS); - value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); - c->currcmd.key_prefix = *(uint64_t *)tokens[KEY_TOKEN].value; - - /* - We read the \r\n into the string since not doing so is more - cycles then the waster of memory to do so. - - We are null terminating through, which will most likely make - some people lazy about using the return length. - */ - c->rvbytes = (int)(value_len + 2); - c->readval = true; - ret = -1; - } - - break; - - case 'O': /* OK */ - c->currcmd.retstat = MCD_SUCCESS; - - case 'S': /* STORED STATS SERVER_ERROR */ - if (buffer[2] == 'A') {/* STORED STATS */ - /* STATS*/ - c->currcmd.retstat = MCD_STAT; - } else if (buffer[1] == 'E') { - /* SERVER_ERROR */ - printf("<%d %s\n", c->sfd, buffer); - - c->currcmd.retstat = MCD_SERVER_ERROR; - } else if (buffer[1] == 'T') { - /* STORED */ - c->currcmd.retstat = MCD_STORED; - } else { - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - } - break; - - case 'D': /* DELETED DATA */ - if (buffer[1] == 'E') { - c->currcmd.retstat = MCD_DELETED; - } else { - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - } + int ret= 0; + int64_t value_len; + char *buffer= command; + + assert(c != NULL); + + /** + * for command get, we store the returned value into local buffer + * then continue in ms_complete_nread(). + */ + + switch (buffer[0]) + { + case 'V': /* VALUE || VERSION */ + if (buffer[1] == 'A') /* VALUE */ + { + token_t tokens[MAX_TOKENS]; + ms_tokenize_command(command, tokens, MAX_TOKENS); + errno= 0; + value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); + if (errno != 0) + { + printf("<%d ERROR %s\n", c->sfd, strerror(errno)); + } + c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value; + + /* + * We read the \r\n into the string since not doing so is more + * cycles then the waster of memory to do so. + * + * We are null terminating through, which will most likely make + * some people lazy about using the return length. + */ + c->rvbytes= (int)(value_len + 2); + c->readval= true; + ret= -1; + } + + break; + + case 'O': /* OK */ + c->currcmd.retstat= MCD_SUCCESS; + + case 'S': /* STORED STATS SERVER_ERROR */ + if (buffer[2] == 'A') /* STORED STATS */ + { /* STATS*/ + c->currcmd.retstat= MCD_STAT; + } + else if (buffer[1] == 'E') + { + /* SERVER_ERROR */ + printf("<%d %s\n", c->sfd, buffer); + + c->currcmd.retstat= MCD_SERVER_ERROR; + } + else if (buffer[1] == 'T') + { + /* STORED */ + c->currcmd.retstat= MCD_STORED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'D': /* DELETED DATA */ + if (buffer[1] == 'E') + { + c->currcmd.retstat= MCD_DELETED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + + break; + + case 'N': /* NOT_FOUND NOT_STORED*/ + if (buffer[4] == 'F') + { + c->currcmd.retstat= MCD_NOTFOUND; + } + else if (buffer[4] == 'S') + { + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat= MCD_NOTSTORED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'E': /* PROTOCOL ERROR or END */ + if (buffer[1] == 'N') + { + /* END */ + c->currcmd.retstat= MCD_END; + } + else if (buffer[1] == 'R') + { + printf("<%d ERROR\n", c->sfd); + c->currcmd.retstat= MCD_PROTOCOL_ERROR; + } + else if (buffer[1] == 'X') + { + c->currcmd.retstat= MCD_DATA_EXISTS; + printf("<%d %s\n", c->sfd, buffer); + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; - break; - - case 'N': /* NOT_FOUND NOT_STORED*/ - if (buffer[4] == 'F') { - c->currcmd.retstat = MCD_NOTFOUND; - } else if (buffer[4] == 'S') { - printf("<%d %s\n", c->sfd, buffer); - c->currcmd.retstat = MCD_NOTSTORED; - } else { - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - } - break; + case 'C': /* CLIENT ERROR */ + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat= MCD_CLIENT_ERROR; + break; - case 'E': /* PROTOCOL ERROR or END */ - if (buffer[1] == 'N') { - /* END */ - c->currcmd.retstat = MCD_END; - } else if (buffer[1] == 'R') { - printf("<%d ERROR\n", c->sfd); - c->currcmd.retstat = MCD_PROTOCOL_ERROR; - } else if (buffer[1] == 'X') { - c->currcmd.retstat = MCD_DATA_EXISTS; - printf("<%d %s\n", c->sfd, buffer); - } else { - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - } - break; - - case 'C': /* CLIENT ERROR */ - printf("<%d %s\n", c->sfd, buffer); - c->currcmd.retstat = MCD_CLIENT_ERROR; - break; + default: + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + break; + } /* switch */ - default: - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - break; - } + return ret; +} /* ms_ascii_process_line */ - return ret; -} /** * after one operation completes, reset the concurrency @@ -1057,117 +1280,143 @@ static int ms_ascii_process_line(ms_conn_t *c, char *command) */ void ms_reset_conn(ms_conn_t *c, bool timeout) { - assert(c != NULL); - - if (c->udp) { - if (c->packets > 0 && c->packets < MAX_UDP_PACKET) { - memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets); - } - - c->packets = 0; - c->recvpkt = 0; - c->pktcurr = 0; - c->ordcurr = 0; - c->rudpbytes = 0; - } - c->currcmd.isfinish = true; - c->ctnwrite = false; - c->rbytes = 0; - c->rcurr = c->rbuf; - ms_conn_set_state(c, conn_write); - memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + assert(c != NULL); + + if (c->udp) + { + if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) + { + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); + } + + c->packets= 0; + c->recvpkt= 0; + c->pktcurr= 0; + c->ordcurr= 0; + c->rudpbytes= 0; + } + c->currcmd.isfinish= true; + c->ctnwrite= false; + c->rbytes= 0; + c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + ms_conn_set_state(c, conn_write); + memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + + if (timeout) + { + ms_drive_machine(c); + } +} /* ms_reset_conn */ - if (timeout) { - ms_drive_machine(c); - } -} /** * if we have a complete line in the buffer, process it. * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_try_read_line(ms_conn_t *c) { - if (c->protocol == binary_prot) { - /* Do we have the complete packet header? */ - if ((uint64_t)c->rbytes < sizeof(c->binary_header)) { - /* need more data! */ - return 0; - } else { + if (c->protocol == binary_prot) + { + /* Do we have the complete packet header? */ + if ((uint64_t)c->rbytes < sizeof(c->binary_header)) + { + /* need more data! */ + return EXIT_SUCCESS; + } + else + { #ifdef NEED_ALIGN - if (((long)(c->rcurr)) % 8 != 0) { - /* must realign input buffer */ - memmove(c->rbuf, c->rcurr, c->rbytes); - c->rcurr = c->rbuf; - if (settings.verbose) { - fprintf(stderr, "%d: Realign input buffer.\n", c->sfd); - } - } + if (((long)(c->rcurr)) % 8 != 0) + { + /* must realign input buffer */ + memmove(c->rbuf, c->rcurr, c->rbytes); + c->rcurr= c->rbuf; + if (settings.verbose) + { + fprintf(stderr, "%d: Realign input buffer.\n", c->sfd); + } + } #endif - protocol_binary_response_header* rsp; - rsp = (protocol_binary_response_header*)c->rcurr; - - c->binary_header = *rsp; - c->binary_header.response.extlen = rsp->response.extlen; - c->binary_header.response.keylen = ntohl(rsp->response.keylen); - c->binary_header.response.bodylen = ntohl(rsp->response.bodylen); - c->binary_header.response.status = ntohl(rsp->response.status); - - if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) { - fprintf(stderr, "Invalid magic: %x\n", - c->binary_header.response.magic); - ms_conn_set_state(c, conn_closing); - return 0; - } - - /* process this complete response */ - if (ms_bin_process_response(c) == 0) { - /* current operation completed */ - ms_reset_conn(c, false); - return -1; - } else { - c->rbytes -= (int32_t)sizeof(c->binary_header); - c->rcurr += sizeof(c->binary_header); - } - } - } else { - char *el, *cont; - - assert(c != NULL); - assert(c->rcurr <= (c->rbuf + c->rsize)); - - if (c->rbytes == 0) - return 0; - el = memchr(c->rcurr, '\n', (size_t)c->rbytes); - if (!el) - return 0; - cont = el + 1; - if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { - el--; - } - *el = '\0'; - - assert(cont <= (c->rcurr + c->rbytes)); - - /* process this complete line */ - if (ms_ascii_process_line(c, c->rcurr) == 0) { - /* current operation completed */ - ms_reset_conn(c, false); - return -1; - } else { - /* current operation didn't complete */ - c->rbytes -= (int32_t)(cont - c->rcurr); - c->rcurr = cont; - } + protocol_binary_response_header *rsp; + rsp= (protocol_binary_response_header *)c->rcurr; + + c->binary_header= *rsp; + c->binary_header.response.extlen= rsp->response.extlen; + c->binary_header.response.keylen= ntohs(rsp->response.keylen); + c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); + c->binary_header.response.status= ntohs(rsp->response.status); + + if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) + { + fprintf(stderr, "Invalid magic: %x\n", + c->binary_header.response.magic); + ms_conn_set_state(c, conn_closing); + return EXIT_SUCCESS; + } + + /* process this complete response */ + if (ms_bin_process_response(c) == 0) + { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } + else + { + c->rbytes-= (int32_t)sizeof(c->binary_header); + c->rcurr+= sizeof(c->binary_header); + } + } + } + else + { + char *el, *cont; + + assert(c != NULL); + assert(c->rcurr <= (c->rbuf + c->rsize)); + + if (c->rbytes == 0) + return EXIT_SUCCESS; + + el= memchr(c->rcurr, '\n', (size_t)c->rbytes); + if (! el) + return EXIT_SUCCESS; - assert(c->rcurr <= (c->rbuf + c->rsize)); + cont= el + 1; + if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) + { + el--; } + *el= '\0'; + + assert(cont <= (c->rcurr + c->rbytes)); + + /* process this complete line */ + if (ms_ascii_process_line(c, c->rcurr) == 0) + { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } + else + { + /* current operation didn't complete */ + c->rbytes-= (int32_t)(cont - c->rcurr); + c->rcurr= cont; + } + + assert(c->rcurr <= (c->rbuf + c->rsize)); + } + + return -1; +} /* ms_try_read_line */ - return -1; -} /** * because the packet of UDP can't ensure the order, the @@ -1182,104 +1431,129 @@ static int ms_try_read_line(ms_conn_t *c) */ static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) { - int len = 0; - int wbytes = 0; - uint16_t req_id = 0; - uint16_t seq_num = 0; - uint16_t packets = 0; - unsigned char *header = NULL; - - /* no enough data */ - assert(c != NULL); - assert(buf != NULL); - assert(c->rudpbytes >= UDP_HEADER_SIZE); - - /* calculate received packets count */ - if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) { - /* the last packet has some data */ - c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1; - } else { - c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE; - } - - /* get the total packets count if necessary */ - if (c->packets == 0) { - c->packets = HEADER_TO_PACKETS((unsigned char *)c->rudpbuf); - } - - /* build the ordered packet array */ - for (int i = c->pktcurr; i < c->recvpkt; i++) { - header = (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE; - req_id = (uint16_t)HEADER_TO_REQID(header); - assert(req_id == c->request_id % (1 << 16)); - - packets = (uint16_t)HEADER_TO_PACKETS(header); - assert(c->packets == HEADER_TO_PACKETS(header)); - - seq_num = (uint16_t)HEADER_TO_SEQNUM(header); - c->udppkt[seq_num].header = header; - c->udppkt[seq_num].data = (char *)header + UDP_HEADER_SIZE; - - if (i == c->recvpkt - 1) { - /* last received packet */ - if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) { - c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; - c->pktcurr++; - } else { - c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; - } - } else { - c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; - c->pktcurr++; - } - } + int len= 0; + int wbytes= 0; + uint16_t req_id= 0; + uint16_t seq_num= 0; + uint16_t packets= 0; + unsigned char *header= NULL; + + /* no enough data */ + assert(c != NULL); + assert(buf != NULL); + assert(c->rudpbytes >= UDP_HEADER_SIZE); + + /* calculate received packets count */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) + { + /* the last packet has some data */ + c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1; + } + else + { + c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE; + } + + /* get the total packets count if necessary */ + if (c->packets == 0) + { + c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf); + } + + /* build the ordered packet array */ + for (int i= c->pktcurr; i < c->recvpkt; i++) + { + header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE; + req_id= (uint16_t)HEADER_TO_REQID(header); + assert(req_id == c->request_id % (1 << 16)); + + packets= (uint16_t)HEADER_TO_PACKETS(header); + assert(c->packets == HEADER_TO_PACKETS(header)); + + seq_num= (uint16_t)HEADER_TO_SEQNUM(header); + c->udppkt[seq_num].header= header; + c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE; + + if (i == c->recvpkt - 1) + { + /* last received packet */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) + { + c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } + else + { + c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE + - UDP_HEADER_SIZE; + } + } + else + { + c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } + } + + for (int i= c->ordcurr; i < c->recvpkt; i++) + { + /* there is some data to copy */ + if ((c->udppkt[i].data != NULL) + && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) + { + header= c->udppkt[i].header; + len= c->udppkt[i].rbytes - c->udppkt[i].copybytes; + if (len > rbytes - wbytes) + { + len= rbytes - wbytes; + } + + assert(len <= rbytes - wbytes); + assert(i == HEADER_TO_SEQNUM(header)); + + memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, + (size_t)len); + wbytes+= len; + c->udppkt[i].copybytes+= len; + + if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes) + && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) + { + /* finish copying all the data of this packet, next */ + c->ordcurr++; + } + + /* last received packet, and finish copying all the data */ + if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1) + && (c->udppkt[i].copybytes == c->udppkt[i].rbytes)) + { + break; + } - for (int i = c->ordcurr; i < c->recvpkt; i++) { - /* there is some data to copy */ - if (c->udppkt[i].data != NULL && c->udppkt[i].copybytes < c->udppkt[i].rbytes) { - header = c->udppkt[i].header; - len = c->udppkt[i].rbytes - c->udppkt[i].copybytes; - if (len > rbytes - wbytes) { - len = rbytes - wbytes; - } - - assert(len <= rbytes - wbytes); - assert(i == HEADER_TO_SEQNUM(header)); - - memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t)len); - wbytes += len; - c->udppkt[i].copybytes += len; - - if (c->udppkt[i].copybytes == c->udppkt[i].rbytes - && c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) { - /* finish copying all the data of this packet, next */ - c->ordcurr++; - } - - /* last received packet, and finish copying all the data */ - if (c->recvpkt == c->packets && i == c->recvpkt - 1 - && c->udppkt[i].copybytes == c->udppkt[i].rbytes) { - break; - } - - /* no space to copy data */ - if (wbytes >= rbytes) { - break; - } - - /* it doesn't finish reading all the data of the packet from network */ - if (i != c->recvpkt - 1 - && c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) { - break; - } - } else { - /* no data to copy */ - break; - } + /* no space to copy data */ + if (wbytes >= rbytes) + { + break; + } + + /* it doesn't finish reading all the data of the packet from network */ + if ((i != c->recvpkt - 1) + && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) + { + break; + } } + else + { + /* no data to copy */ + break; + } + } + (void)packets; + + return wbytes == 0 ? -1 : wbytes; +} /* ms_sort_udp_packet */ - return(wbytes == 0 ? -1 : wbytes); -} /** * encapsulate upd read like tcp read @@ -1293,70 +1567,83 @@ static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) */ static int ms_udp_read(ms_conn_t *c, char *buf, int len) { - int res = 0; - int avail = 0; - int rbytes = 0; - int copybytes = 0; - - assert(c->udp); - - while (1) { - if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) { - char *new_rbuf = realloc(c->rudpbuf, (size_t)c->rudpsize * 2); - if (!new_rbuf) { - fprintf(stderr, "Couldn't realloc input buffer.\n"); - c->rudpbytes = 0; /* ignore what we read */ - return -1; - } - c->rudpbuf = new_rbuf; - c->rudpsize *= 2; - } - - avail = c->rudpsize - c->rudpbytes; - /* UDP each time read a packet, 1400 bytes */ - res = (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail); - - if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); - c->rudpbytes += res; - rbytes += res; - if (res == avail) { - continue; - } else { - break; - } - } - - if (res == 0) { - /* "connection" closed */ - return res; - } - - if (res == -1) { - /* no data to read */ - return res; - } + int res= 0; + int avail= 0; + int rbytes= 0; + int copybytes= 0; + + assert(c->udp); + + while (1) + { + if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) + { + char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2); + if (! new_rbuf) + { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rudpbytes= 0; /* ignore what we read */ + return -1; + } + c->rudpbuf= new_rbuf; + c->rudpsize*= 2; + } + + avail= c->rudpsize - c->rudpbytes; + /* UDP each time read a packet, 1400 bytes */ + res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail); + + if (res > 0) + { + atomic_add_size(&ms_stats.bytes_read, res); + c->rudpbytes+= res; + rbytes+= res; + if (res == avail) + { + continue; + } + else + { + break; + } } - /* copy data to read buffer */ - if (rbytes > 0) { - copybytes = ms_sort_udp_packet(c, buf, len); + if (res == 0) + { + /* "connection" closed */ + return res; } - if (copybytes == -1) { - __sync_fetch_and_add(&ms_stats.pkt_disorder, 1); + if (res == -1) + { + /* no data to read */ + return res; } + } + + /* copy data to read buffer */ + if (rbytes > 0) + { + copybytes= ms_sort_udp_packet(c, buf, len); + } + + if (copybytes == -1) + { + atomic_add_size(&ms_stats.pkt_disorder, 1); + } + + return copybytes; +} /* ms_udp_read */ - return copybytes; -} /* * read from network as much as we can, handle buffer overflow and connection * close. * before reading, move the remaining incomplete fragment of a command * (if any) to the beginning of the buffer. - * return 0 if there's nothing to read on the first read. + * return EXIT_SUCCESS if there's nothing to read on the first read. */ + /** * read from network as much as we can, handle buffer overflow and connection * close. before reading, move the remaining incomplete fragment of a command @@ -1365,76 +1652,93 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) * @param c, pointer of the concurrency * * @return int, - * return 0 if there's nothing to read on the first read. - * return 1 if get data + * return EXIT_SUCCESS if there's nothing to read on the first read. + * return EXIT_FAILURE if get data * return -1 if error happens */ static int ms_try_read_network(ms_conn_t *c) { - int gotdata = 0; - int res; - int64_t avail; + int gotdata= 0; + int res; + int64_t avail; + + assert(c != NULL); + + if ((c->rcurr != c->rbuf) + && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf)) + || (c->readval && (c->rcurr - c->rbuf > c->rbytes)))) + { + if (c->rbytes != 0) /* otherwise there's nothing to copy */ + memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); + c->rcurr= c->rbuf; + } + + while (1) + { + if (c->rbytes >= c->rsize) + { + char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2); + if (! new_rbuf) + { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rbytes= 0; /* ignore what we read */ + return -1; + } + c->rcurr= c->rbuf= new_rbuf; + c->rsize*= 2; + } - assert(c != NULL); + avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf); + if (avail == 0) + { + break; + } - if (c->rcurr != c->rbuf && - (!c->readval || c->rvbytes > c->rsize - (c->rcurr - c->rbuf) - || (c->readval && c->rcurr - c->rbuf > c->rbytes))) { - if (c->rbytes != 0) /* otherwise there's nothing to copy */ - memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); - c->rcurr = c->rbuf; - } - - while (1) { - if (c->rbytes >= c->rsize) { - char *new_rbuf = realloc(c->rbuf, (size_t)c->rsize * 2); - if (!new_rbuf) { - fprintf(stderr, "Couldn't realloc input buffer.\n"); - c->rbytes = 0; /* ignore what we read */ - return -1; - } - c->rcurr = c->rbuf = new_rbuf; - c->rsize *= 2; - } + if (c->udp) + { + res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail); + } + else + { + res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail); + } + + if (res > 0) + { + if (! c->udp) + { + atomic_add_size(&ms_stats.bytes_read, res); + } + gotdata= 1; + c->rbytes+= res; + if (res == avail) + { + continue; + } + else + { + break; + } + } + if (res == 0) + { + /* connection closed */ + ms_conn_set_state(c, conn_closing); + return -1; + } + if (res == -1) + { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + break; + /* Should close on unhandled errors. */ + ms_conn_set_state(c, conn_closing); + return -1; + } + } - avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf); - if (avail == 0) { - break; - } + return gotdata; +} /* ms_try_read_network */ - if (c->udp) { - res = (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail); - } else { - res = (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail); - } - - if (res > 0) { - if (!c->udp) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); - } - gotdata = 1; - c->rbytes += res; - if (res == avail) { - continue; - } else { - break; - } - } - if (res == 0) { - /* connection closed */ - ms_conn_set_state(c, conn_closing); - return -1; - } - if (res == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) break; - /* Should close on unhandled errors. */ - ms_conn_set_state(c, conn_closing); - return -1; - } - } - - return gotdata; -} /** * after get the object from server, verify the value if @@ -1445,79 +1749,106 @@ static int ms_try_read_network(ms_conn_t *c) * @param value, received value string * @param vlen, received value string length */ -static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, - char *value, int vlen) +static void ms_verify_value(ms_conn_t *c, + ms_mlget_task_item_t *mlget_item, + char *value, + int vlen) { - if (c->curr_task.verify) { - assert(c->curr_task.item->value_offset != INVALID_OFFSET); - char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset]; - char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset]; - - /* verify expire time if necessary */ - if (c->curr_task.item->exp_time > 0) { - struct timeval curr_time; - gettimeofday(&curr_time, NULL); - - /* object expired but get it now */ - if (curr_time.tv_sec - c->curr_task.item->client_time - > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) { - __sync_fetch_and_add(&ms_stats.exp_get, 1); - - if (ms_setting.verbose) { - char set_time[64]; - char cur_time[64]; - strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&c->curr_task.item->client_time)); - strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&curr_time.tv_sec)); - fprintf(stderr, "\n<%d expire time verification failed, " - "object expired but get it now\n" - "\tkey len: %d\n" - "\tkey: %lx %.*s\n" - "\tset time: %s current time: %s " - "diff time: %d expire time: %d\n" - "\texpected data: \n" - "\treceived data len: %d\n" - "\treceived data: %.*s\n", - c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix, - c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, set_time, cur_time, - (int)(curr_time.tv_sec - c->curr_task.item->client_time), - c->curr_task.item->exp_time, - vlen, vlen, value); - fflush(stderr); - } - } - } else { - if (c->curr_task.item->value_size != vlen - || memcmp(orignval, value, (size_t)vlen) != 0) { - __sync_fetch_and_add(&ms_stats.vef_failed, 1); - - if (ms_setting.verbose) { - fprintf(stderr, "\n<%d data verification failed\n" - "\tkey len: %d\n" - "\tkey: %lx %.*s\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data len: %d\n" - "\treceived data: %.*s\n", - c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix, - c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, c->curr_task.item->value_size, - c->curr_task.item->value_size, - orignval, vlen, vlen, value); - fflush(stderr); - } - } - } - - c->curr_task.finish_verify = true; + if (c->curr_task.verify) + { + assert(c->curr_task.item->value_offset != INVALID_OFFSET); + char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset]; + char *orignkey= + &ms_setting.char_block[c->curr_task.item->key_suffix_offset]; + + /* verify expire time if necessary */ + if (c->curr_task.item->exp_time > 0) + { + struct timeval curr_time; + gettimeofday(&curr_time, NULL); + + /* object expired but get it now */ + if (curr_time.tv_sec - c->curr_task.item->client_time + > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) + { + atomic_add_size(&ms_stats.exp_get, 1); + + if (ms_setting.verbose) + { + char set_time[64]; + char cur_time[64]; + strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&c->curr_task.item->client_time)); + strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&curr_time.tv_sec)); + fprintf(stderr, + "\n<%d expire time verification failed, " + "object expired but get it now\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\tset time: %s current time: %s " + "diff time: %d expire time: %d\n" + "\texpected data: \n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, + c->curr_task.item->key_size, + c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + set_time, + cur_time, + (int)(curr_time.tv_sec - c->curr_task.item->client_time), + c->curr_task.item->exp_time, + vlen, + vlen, + value); + fflush(stderr); + } + } + } + else + { + if ((c->curr_task.item->value_size != vlen) + || (memcmp(orignval, value, (size_t)vlen) != 0)) + { + atomic_add_size(&ms_stats.vef_failed, 1); + + if (ms_setting.verbose) + { + fprintf(stderr, + "\n<%d data verification failed\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64" %.*s\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, + c->curr_task.item->key_size, + c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + c->curr_task.item->value_size, + c->curr_task.item->value_size, + orignval, + vlen, + vlen, + value); + fflush(stderr); + } + } + } + + c->curr_task.finish_verify= true; + + if (mlget_item != NULL) + { + mlget_item->finish_verify= true; + } + } +} /* ms_verify_value */ - if (mlget_item != NULL) { - mlget_item->finish_verify = true; - } - } -} /** * For ASCII protocol, after store the data into the local @@ -1527,53 +1858,60 @@ static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item, */ static void ms_ascii_complete_nread(ms_conn_t *c) { - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); - if (c->rvbytes > 2) { - assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r'); - } - - /* multi-get */ - ms_mlget_task_item_t *mlget_item = NULL; - if ((ms_setting.mult_key_num > 1 && - c->mlget_task.mlget_num >= ms_setting.mult_key_num) || - (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) { - - c->mlget_task.value_index++; - mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index]; - - if (mlget_item->item->key_prefix == c->currcmd.key_prefix) { - c->curr_task.item = mlget_item->item; - c->curr_task.verify = mlget_item->verify; - c->curr_task.finish_verify = mlget_item->finish_verify; - mlget_item->get_miss = false; - } else { - /* Try to find the task item in multi-get task array */ - for (int i = 0; i < c->mlget_task.mlget_num; i++) { - mlget_item = &c->mlget_task.mlget_item[i]; - if (mlget_item->item->key_prefix == c->currcmd.key_prefix) { - - c->curr_task.item = mlget_item->item; - c->curr_task.verify = mlget_item->verify; - c->curr_task.finish_verify = mlget_item->finish_verify; - mlget_item->get_miss = false; - - break; - } - } - } - } - - ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2); + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot); + if (c->rvbytes > 2) + { + assert( + c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r'); + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item= NULL; + if (((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) + { + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + } + else + { + /* Try to find the task item in multi-get task array */ + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + mlget_item= &c->mlget_task.mlget_item[i]; + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) + { + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + + break; + } + } + } + } + + ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2); + + c->curr_task.get_miss= false; + c->rbytes-= c->rvbytes; + c->rcurr= c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval= false; + c->rvbytes= 0; +} /* ms_ascii_complete_nread */ - c->curr_task.get_miss = false; - c->rbytes -= c->rvbytes; - c->rcurr = c->rcurr + c->rvbytes; - assert(c->rcurr <= (c->rbuf + c->rsize)); - c->readval = false; - c->rvbytes = 0; -} /** * For binary protocol, after store the data into the local @@ -1583,64 +1921,75 @@ static void ms_ascii_complete_nread(ms_conn_t *c) */ static void ms_bin_complete_nread(ms_conn_t *c) { - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == binary_prot); - - int extlen = c->binary_header.response.extlen; - int keylen = c->binary_header.response.keylen; - uint8_t opcode = c->binary_header.response.opcode; - - /* not get command or not include value, just return */ - if ((opcode != PROTOCOL_BINARY_CMD_GET && opcode != PROTOCOL_BINARY_CMD_GETQ) || - c->rvbytes <= extlen + keylen) { - /* get miss */ - if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) { - c->currcmd.retstat = MCD_END; - c->curr_task.get_miss = true; - } - - c->readval = false; - c->rvbytes = 0; - ms_reset_conn(c, false); - return; - } - - /* multi-get */ - ms_mlget_task_item_t *mlget_item = NULL; - if ((ms_setting.mult_key_num > 1 && - c->mlget_task.mlget_num >= ms_setting.mult_key_num) || - (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) { - - c->mlget_task.value_index++; - mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index]; - - c->curr_task.item = mlget_item->item; - c->curr_task.verify = mlget_item->verify; - c->curr_task.finish_verify = mlget_item->finish_verify; - mlget_item->get_miss = false; - } - - ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen); - - c->currcmd.retstat = MCD_END; - c->curr_task.get_miss = false; - c->rbytes -= c->rvbytes; - c->rcurr = c->rcurr + c->rvbytes; - assert(c->rcurr <= (c->rbuf + c->rsize)); - c->readval = false; - c->rvbytes = 0; + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == binary_prot); + + int extlen= c->binary_header.response.extlen; + int keylen= c->binary_header.response.keylen; + uint8_t opcode= c->binary_header.response.opcode; + + /* not get command or not include value, just return */ + if (((opcode != PROTOCOL_BINARY_CMD_GET) + && (opcode != PROTOCOL_BINARY_CMD_GETQ)) + || (c->rvbytes <= extlen + keylen)) + { + /* get miss */ + if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) + { + c->currcmd.retstat= MCD_END; + c->curr_task.get_miss= true; + } + + c->readval= false; + c->rvbytes= 0; + ms_reset_conn(c, false); + return; + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item= NULL; + if (((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + } + + ms_verify_value(c, + mlget_item, + c->rcurr + extlen + keylen, + c->rvbytes - extlen - keylen); + + c->currcmd.retstat= MCD_END; + c->curr_task.get_miss= false; + c->rbytes-= c->rvbytes; + c->rcurr= c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval= false; + c->rvbytes= 0; + + if (ms_setting.mult_key_num > 1) + { + /* multi-get have check all the item */ + if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) + { + ms_reset_conn(c, false); + } + } + else + { + /* single get */ + ms_reset_conn(c, false); + } +} /* ms_bin_complete_nread */ - if (ms_setting.mult_key_num > 1) { - /* multi-get have check all the item */ - if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) { - ms_reset_conn(c, false); - } - } else { - /* single get */ - ms_reset_conn(c, false); - } -} /** * we get here after reading the value of get commands. @@ -1649,65 +1998,74 @@ static void ms_bin_complete_nread(ms_conn_t *c) */ static void ms_complete_nread(ms_conn_t *c) { - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot - || c->protocol == binary_prot); + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot + || c->protocol == binary_prot); + + if (c->protocol == binary_prot) + { + ms_bin_complete_nread(c); + } + else + { + ms_ascii_complete_nread(c); + } +} /* ms_complete_nread */ - if (c->protocol == binary_prot) { - ms_bin_complete_nread(c); - } else { - ms_ascii_complete_nread(c); - } -} /** * Adds a message header to a connection. * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_msghdr(ms_conn_t *c) { - struct msghdr *msg; + struct msghdr *msg; - assert(c != NULL); + assert(c != NULL); - if (c->msgsize == c->msgused) { - msg = realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr)); - if (! msg) - return -1; - c->msglist = msg; - c->msgsize *= 2; - } + if (c->msgsize == c->msgused) + { + msg= + realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); + if (! msg) + return -1; - msg = c->msglist + c->msgused; + c->msglist= msg; + c->msgsize*= 2; + } - /** - * this wipes msg_iovlen, msg_control, msg_controllen, and - * msg_flags, the last 3 of which aren't defined on solaris: - */ - memset(msg, 0, sizeof(struct msghdr)); + msg= c->msglist + c->msgused; - msg->msg_iov = &c->iov[c->iovused]; + /** + * this wipes msg_iovlen, msg_control, msg_controllen, and + * msg_flags, the last 3 of which aren't defined on solaris: + */ + memset(msg, 0, sizeof(struct msghdr)); - if (c->udp && c->srv_recv_addr_size > 0) { - msg->msg_name = &c->srv_recv_addr; - msg->msg_namelen = c->srv_recv_addr_size; - } + msg->msg_iov= &c->iov[c->iovused]; - c->msgbytes = 0; - c->msgused++; + if (c->udp && (c->srv_recv_addr_size > 0)) + { + msg->msg_name= &c->srv_recv_addr; + msg->msg_namelen= c->srv_recv_addr_size; + } - if (c->udp) { - /* Leave room for the UDP header, which we'll fill in later. */ - return ms_add_iov(c, NULL, UDP_HEADER_SIZE); - } + c->msgbytes= 0; + c->msgused++; + + if (c->udp) + { + /* Leave room for the UDP header, which we'll fill in later. */ + return ms_add_iov(c, NULL, UDP_HEADER_SIZE); + } + + return EXIT_SUCCESS; +} /* ms_add_msghdr */ - return 0; -} /** * Ensures that there is room for another structure iovec in a connection's @@ -1715,30 +2073,36 @@ static int ms_add_msghdr(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_ensure_iov_space(ms_conn_t *c) { - assert(c != NULL); + assert(c != NULL); - if (c->iovused >= c->iovsize) { - int i, iovnum; - struct iovec *new_iov = (struct iovec *)realloc(c->iov, - ((uint64_t)c->iovsize * 2) * sizeof(struct iovec)); - if (! new_iov) - return -1; - c->iov = new_iov; - c->iovsize *= 2; - - /* Point all the msghdr structures at the new list. */ - for (i = 0, iovnum = 0; i < c->msgused; i++) { - c->msglist[i].msg_iov = &c->iov[iovnum]; - iovnum += (int)c->msglist[i].msg_iovlen; - } + if (c->iovused >= c->iovsize) + { + int i, iovnum; + struct iovec *new_iov= (struct iovec *)realloc(c->iov, + ((size_t)c->iovsize + * 2) + * sizeof(struct iovec)); + if (! new_iov) + return -1; + + c->iov= new_iov; + c->iovsize*= 2; + + /* Point all the msghdr structures at the new list. */ + for (i= 0, iovnum= 0; i < c->msgused; i++) + { + c->msglist[i].msg_iov= &c->iov[iovnum]; + iovnum+= (int)c->msglist[i].msg_iovlen; } + } + + return EXIT_SUCCESS; +} /* ms_ensure_iov_space */ - return 0; -} /** * Adds data to the list of pending data that will be written out to a @@ -1748,108 +2112,125 @@ static int ms_ensure_iov_space(ms_conn_t *c) * @param buf, the buffer includes data to send * @param len, the data length in the buffer * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) { - struct msghdr *m; - int leftover; - bool limit_to_mtu; + struct msghdr *m; + int leftover; + bool limit_to_mtu; - assert(c != NULL); + assert(c != NULL); - do { - m = &c->msglist[c->msgused - 1]; + do + { + m= &c->msglist[c->msgused - 1]; - /* - * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes. - */ - limit_to_mtu = c->udp; + /* + * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes. + */ + limit_to_mtu= c->udp; - /* We may need to start a new msghdr if this one is full. */ - if (m->msg_iovlen == IOV_MAX || - (limit_to_mtu && c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)) { - ms_add_msghdr(c); - m = &c->msglist[c->msgused - 1]; - } +#ifdef IOV_MAX + /* We may need to start a new msghdr if this one is full. */ + if ((m->msg_iovlen == IOV_MAX) + || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) + { + ms_add_msghdr(c); + m= &c->msglist[c->msgused - 1]; + } +#endif - if (ms_ensure_iov_space(c) != 0) - return -1; + if (ms_ensure_iov_space(c) != 0) + return -1; - /* If the fragment is too big to fit in the datagram, split it up */ - if (limit_to_mtu && len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE) { - leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE; - len -= leftover; - } else { - leftover = 0; - } + /* If the fragment is too big to fit in the datagram, split it up */ + if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) + { + leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE; + len-= leftover; + } + else + { + leftover= 0; + } - m = &c->msglist[c->msgused - 1]; - m->msg_iov[m->msg_iovlen].iov_base = (void *)buf; - m->msg_iov[m->msg_iovlen].iov_len = (size_t)len; + m= &c->msglist[c->msgused - 1]; + m->msg_iov[m->msg_iovlen].iov_base= (void *)buf; + m->msg_iov[m->msg_iovlen].iov_len= (size_t)len; - c->msgbytes += len; - c->iovused++; - m->msg_iovlen++; + c->msgbytes+= len; + c->iovused++; + m->msg_iovlen++; - buf = ((char *)buf) + len; - len = leftover; - } while (leftover > 0); + buf= ((char *)buf) + len; + len= leftover; + } + while (leftover > 0); + + return EXIT_SUCCESS; +} /* ms_add_iov */ - return 0; -} /** * Constructs a set of UDP headers and attaches them to the outgoing messages. * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_udp_headers(ms_conn_t *c) { - int i; - unsigned char *hdr; - - assert(c != NULL); - - c->request_id = ms_get_udp_request_id(); - - if (c->msgused > c->hdrsize) { - void *new_hdrbuf; - if (c->hdrbuf) - new_hdrbuf = realloc(c->hdrbuf, (size_t)c->msgused * 2 * UDP_HEADER_SIZE); - else - new_hdrbuf = malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE); - if (! new_hdrbuf) - return -1; - c->hdrbuf = (unsigned char *)new_hdrbuf; - c->hdrsize = c->msgused * 2; - } - - /* If this is a multi-packet request, drop it. */ - if (c->udp && c->msgused > 1) { - fprintf(stderr, "multi-packet request for UDP not supported.\n"); - return -1; - } - - hdr = c->hdrbuf; - for (i = 0; i < c->msgused; i++) { - c->msglist[i].msg_iov[0].iov_base = hdr; - c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE; - *hdr++ = (unsigned char)(c->request_id / 256); - *hdr++ = (unsigned char)(c->request_id % 256); - *hdr++ = (unsigned char)(i / 256); - *hdr++ = (unsigned char)(i % 256); - *hdr++ = (unsigned char)(c->msgused / 256); - *hdr++ = (unsigned char)(c->msgused % 256); - *hdr++ = (unsigned char)1; /* support facebook memcached */ - *hdr++ = (unsigned char)0; - assert(hdr == ((unsigned char *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE)); - } + int i; + unsigned char *hdr; + + assert(c != NULL); + + c->request_id= ms_get_udp_request_id(); + + if (c->msgused > c->hdrsize) + { + void *new_hdrbuf; + if (c->hdrbuf) + new_hdrbuf= realloc(c->hdrbuf, + (size_t)c->msgused * 2 * UDP_HEADER_SIZE); + else + new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE); + if (! new_hdrbuf) + return -1; + + c->hdrbuf= (unsigned char *)new_hdrbuf; + c->hdrsize= c->msgused * 2; + } + + /* If this is a multi-packet request, drop it. */ + if (c->udp && (c->msgused > 1)) + { + fprintf(stderr, "multi-packet request for UDP not supported.\n"); + return -1; + } + + hdr= c->hdrbuf; + for (i= 0; i < c->msgused; i++) + { + c->msglist[i].msg_iov[0].iov_base= (void *)hdr; + c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; + *hdr++= (unsigned char)(c->request_id / 256); + *hdr++= (unsigned char)(c->request_id % 256); + *hdr++= (unsigned char)(i / 256); + *hdr++= (unsigned char)(i % 256); + *hdr++= (unsigned char)(c->msgused / 256); + *hdr++= (unsigned char)(c->msgused % 256); + *hdr++= (unsigned char)1; /* support facebook memcached */ + *hdr++= (unsigned char)0; + assert(hdr == + ((unsigned char *)c->msglist[i].msg_iov[0].iov_base + + UDP_HEADER_SIZE)); + } + + return EXIT_SUCCESS; +} /* ms_build_udp_headers */ - return 0; -} /** * Transmit the next chunk of data from our list of msgbuf structures. @@ -1863,57 +2244,67 @@ static int ms_build_udp_headers(ms_conn_t *c) */ static int ms_transmit(ms_conn_t *c) { - assert(c != NULL); - - if (c->msgcurr < c->msgused && - c->msglist[c->msgcurr].msg_iovlen == 0) { - /* Finished writing the current msg; advance to the next. */ - c->msgcurr++; - } - - if (c->msgcurr < c->msgused) { - ssize_t res; - struct msghdr *m = &c->msglist[c->msgcurr]; - - res = sendmsg(c->sfd, m, 0); - if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_written, res); - - /* We've written some of the data. Remove the completed - iovec entries from the list of pending writes. */ - while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) { - res -= (ssize_t)m->msg_iov->iov_len; - m->msg_iovlen--; - m->msg_iov++; - } - - /* Might have written just part of the last iovec entry; - adjust it so the next write will do the rest. */ - if (res > 0) { - m->msg_iov->iov_base = (unsigned char *)m->msg_iov->iov_base + res; - m->msg_iov->iov_len -= (uint64_t)res; - } - return TRANSMIT_INCOMPLETE; - } - if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - return TRANSMIT_HARD_ERROR; - } - return TRANSMIT_SOFT_ERROR; - } - - /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, - we have a real error, on which we close the connection */ - fprintf(stderr, "Failed to write, and not due to blocking.\n"); - + assert(c != NULL); + + if ((c->msgcurr < c->msgused) + && (c->msglist[c->msgcurr].msg_iovlen == 0)) + { + /* Finished writing the current msg; advance to the next. */ + c->msgcurr++; + } + + if (c->msgcurr < c->msgused) + { + ssize_t res; + struct msghdr *m= &c->msglist[c->msgcurr]; + + res= sendmsg(c->sfd, m, 0); + if (res > 0) + { + atomic_add_size(&ms_stats.bytes_written, res); + + /* We've written some of the data. Remove the completed + * iovec entries from the list of pending writes. */ + while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) + { + res-= (ssize_t)m->msg_iov->iov_len; + m->msg_iovlen--; + m->msg_iov++; + } + + /* Might have written just part of the last iovec entry; + * adjust it so the next write will do the rest. */ + if (res > 0) + { + m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); + m->msg_iov->iov_len-= (size_t)res; + } + return TRANSMIT_INCOMPLETE; + } + if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) + { + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); ms_conn_set_state(c, conn_closing); return TRANSMIT_HARD_ERROR; - } else { - return TRANSMIT_COMPLETE; + } + return TRANSMIT_SOFT_ERROR; } -} + + /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, + * we have a real error, on which we close the connection */ + fprintf(stderr, "Failed to write, and not due to blocking.\n"); + + ms_conn_set_state(c, conn_closing); + return TRANSMIT_HARD_ERROR; + } + else + { + return TRANSMIT_COMPLETE; + } +} /* ms_transmit */ + /** * Shrinks a connection's buffers if they're too big. This prevents @@ -1927,56 +2318,68 @@ static int ms_transmit(ms_conn_t *c) */ static void ms_conn_shrink(ms_conn_t *c) { - assert(c != NULL); - - if (c->udp) - return; - - if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { - char *newbuf; - - if (c->rcurr != c->rbuf) - memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); - - newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); + assert(c != NULL); - if (newbuf) { - c->rbuf = newbuf; - c->rsize = DATA_BUFFER_SIZE; - } - c->rcurr = c->rbuf; - } + if (c->udp) + return; - if (c->udp && c->rudpsize > UDP_DATA_BUFFER_HIGHWAT - && c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE) { - char *new_rbuf = (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2); - if (!new_rbuf) { - c->rudpbuf = new_rbuf; - c->rudpsize = UDP_DATA_BUFFER_SIZE; - } - /* TODO check error condition? */ - } + if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) + { + char *newbuf; + + if (c->rcurr != c->rbuf) + memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); + + newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); + + if (newbuf) + { + c->rbuf= newbuf; + c->rsize= DATA_BUFFER_SIZE; + } + c->rcurr= c->rbuf; + } + + if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT) + && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE)) + { + char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2); + if (! new_rbuf) + { + c->rudpbuf= new_rbuf; + c->rudpsize= UDP_DATA_BUFFER_SIZE; + } + /* TODO check error condition? */ + } + + if (c->msgsize > MSG_LIST_HIGHWAT) + { + struct msghdr *newbuf= (struct msghdr *)realloc( + (void *)c->msglist, + MSG_LIST_INITIAL + * sizeof(c->msglist[0])); + if (newbuf) + { + c->msglist= newbuf; + c->msgsize= MSG_LIST_INITIAL; + } + /* TODO check error condition? */ + } + + if (c->iovsize > IOV_LIST_HIGHWAT) + { + struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov, + IOV_LIST_INITIAL + * sizeof(c->iov[0])); + if (newbuf) + { + c->iov= newbuf; + c->iovsize= IOV_LIST_INITIAL; + } + /* TODO check return value */ + } +} /* ms_conn_shrink */ - if (c->msgsize > MSG_LIST_HIGHWAT) { - struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, - MSG_LIST_INITIAL * sizeof(c->msglist[0])); - if (newbuf) { - c->msglist = newbuf; - c->msgsize = MSG_LIST_INITIAL; - } - /* TODO check error condition? */ - } - - if (c->iovsize > IOV_LIST_HIGHWAT) { - struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, - IOV_LIST_INITIAL * sizeof(c->iov[0])); - if (newbuf) { - c->iov = newbuf; - c->iovsize = IOV_LIST_INITIAL; - } - /* TODO check return value */ - } -} /** * Sets a connection's current state in the state machine. Any special @@ -1988,15 +2391,18 @@ static void ms_conn_shrink(ms_conn_t *c) */ static void ms_conn_set_state(ms_conn_t *c, int state) { - assert(c != NULL); + assert(c != NULL); - if (state != c->state) { - if (state == conn_read) { - ms_conn_shrink(c); - } - c->state = state; + if (state != c->state) + { + if (state == conn_read) + { + ms_conn_shrink(c); } -} + c->state= state; + } +} /* ms_conn_set_state */ + /** * update the event if socks change state. for example: when @@ -2010,40 +2416,40 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0}; - - assert(c != NULL); - - struct event_base *base = c->event.ev_base; - if (c->ev_flags == new_flags && ms_setting.rep_write_srv == 0 - && (!ms_setting.facebook_test || c->total_sfds == 1)) { - return true; - } + assert(c != NULL); - if (event_del(&c->event) == -1) { - /* try to delete the event again */ - if (event_del(&c->event) == -1) { - return false; - } - } - - event_set(&c->event, c->sfd, (short)new_flags, ms_event_handler, (void *)c); - event_base_set(base, &c->event); - c->ev_flags = (short)new_flags; + struct event_base *base= c->event.ev_base; + if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0) + && (! ms_setting.facebook_test || (c->total_sfds == 1))) + { + return true; + } + + if (event_del(&c->event) == -1) + { + /* try to delete the event again */ + if (event_del(&c->event) == -1) + { + return false; + } + } + + event_set(&c->event, + c->sfd, + (short)new_flags, + ms_event_handler, + (void *)c); + event_base_set(base, &c->event); + c->ev_flags= (short)new_flags; + + if (event_add(&c->event, NULL) == -1) + { + return false; + } - if (c->total_sfds == 1) { - if (event_add(&c->event, NULL) == -1) { - return false; - } - } else { - if (event_add(&c->event, &t) == -1) { - return false; - } - } + return true; +} /* ms_update_event */ - return true; -} /** * If user want to get the expected throughput, we could limit @@ -2057,24 +2463,28 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) */ static bool ms_need_yield(ms_conn_t *c) { - int64_t tps = 0; - int64_t time_diff = 0; - struct timeval curr_time; - ms_task_t *task = &c->curr_task; - - if (ms_setting.expected_tps > 0) { - gettimeofday(&curr_time, NULL); - time_diff = ms_time_diff(&ms_thread.startup_time, &curr_time); - tps = (int64_t)((task->get_opt + task->set_opt) / ((uint64_t)time_diff / 1000000)); - - /* current throughput is greater than expected throughput */ - if (tps > ms_thread.thread_ctx->tps_perconn) { - return true; - } + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + int64_t tps= 0; + int64_t time_diff= 0; + struct timeval curr_time; + ms_task_t *task= &c->curr_task; + + if (ms_setting.expected_tps > 0) + { + gettimeofday(&curr_time, NULL); + time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); + tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000); + + /* current throughput is greater than expected throughput */ + if (tps > ms_thread->thread_ctx->tps_perconn) + { + return true; } + } + + return false; +} /* ms_need_yield */ - return false; -} /** * used to update the start time of each operation @@ -2083,18 +2493,20 @@ static bool ms_need_yield(ms_conn_t *c) */ static void ms_update_start_time(ms_conn_t *c) { - ms_task_item_t *item = c->curr_task.item; - - if (ms_setting.stat_freq > 0 || c->udp - || (c->currcmd.cmd == CMD_SET && item->exp_time > 0)) { + ms_task_item_t *item= c->curr_task.item; - gettimeofday(&c->start_time, NULL); - if (c->currcmd.cmd == CMD_SET && item->exp_time > 0) { - /* record the current time */ - item->client_time = c->start_time.tv_sec; - } + if ((ms_setting.stat_freq > 0) || c->udp + || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))) + { + gettimeofday(&c->start_time, NULL); + if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) + { + /* record the current time */ + item->client_time= c->start_time.tv_sec; } -} + } +} /* ms_update_start_time */ + /** * run the state machine @@ -2103,154 +2515,182 @@ static void ms_update_start_time(ms_conn_t *c) */ static void ms_drive_machine(ms_conn_t *c) { - bool stop = false; + bool stop= false; + + assert(c != NULL); + + while (! stop) + { + switch (c->state) + { + case conn_read: + if (c->readval) + { + if (c->rbytes >= c->rvbytes) + { + ms_complete_nread(c); + break; + } + } + else + { + if (ms_try_read_line(c) != 0) + { + break; + } + } + + if (ms_try_read_network(c) != 0) + { + break; + } + + /* doesn't read all the response data, wait event wake up */ + if (! c->currcmd.isfinish) + { + if (! ms_update_event(c, EV_READ | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop= true; + break; + } - assert(c != NULL); + /* we have no command line and no data to read from network, next write */ + ms_conn_set_state(c, conn_write); + memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ - while (!stop) { - switch (c->state) { - case conn_read: - if (c->readval) { - if (c->rbytes >= c->rvbytes) { - ms_complete_nread(c); - break; - } - } else { - if (ms_try_read_line(c) != 0) { - break; - } - } - - if (ms_try_read_network(c) != 0) { - break; - } - - /* doesn't read all the response data, wait event wake up */ - if (!c->currcmd.isfinish) { - if (!ms_update_event(c, EV_READ | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop = true; - break; - } - - /* we have no command line and no data to read from network, next write */ - ms_conn_set_state(c, conn_write); - memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + break; - break; + case conn_write: + if (! c->ctnwrite && ms_need_yield(c)) + { + usleep(10); - case conn_write: - if (!c->ctnwrite && ms_need_yield(c)) { - usleep(10); - - if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop = true; - break; - } - - if (!c->ctnwrite && ms_exec_task(c) != 0) { - ms_conn_set_state(c, conn_closing); - break; - } - - /* record the start time before starting to send data if necessary */ - if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) { - if (c->change_sfd) { - c->change_sfd = false; - } - ms_update_start_time(c); - } - - /* change sfd if necessary */ - if (c->change_sfd) { - c->ctnwrite = true; - stop = true; - break; - } - - /* execute task until nothing need be written to network */ - if (!c->ctnwrite && c->msgcurr == c->msgused) { - if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop = true; - break; - } - - switch (ms_transmit(c)) { - case TRANSMIT_COMPLETE: - /* we have no data to write to network, next wait repose */ - if (!ms_update_event(c, EV_READ | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - c->ctnwrite = false; - break; - } - ms_conn_set_state(c, conn_read); - c->ctnwrite = false; - stop = true; - break; - - case TRANSMIT_INCOMPLETE: - c->ctnwrite = true; - break; /* Continue in state machine. */ - - case TRANSMIT_HARD_ERROR: - c->ctnwrite = false; - break; - - case TRANSMIT_SOFT_ERROR: - c->ctnwrite = true; - stop = true; - break; - default: - break; - } - break; + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop= true; + break; + } + + if (! c->ctnwrite && (ms_exec_task(c) != 0)) + { + ms_conn_set_state(c, conn_closing); + break; + } + + /* record the start time before starting to send data if necessary */ + if (! c->ctnwrite || (c->change_sfd && c->ctnwrite)) + { + if (c->change_sfd) + { + c->change_sfd= false; + } + ms_update_start_time(c); + } + + /* change sfd if necessary */ + if (c->change_sfd) + { + c->ctnwrite= true; + stop= true; + break; + } + + /* execute task until nothing need be written to network */ + if (! c->ctnwrite && (c->msgcurr == c->msgused)) + { + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + stop= true; + break; + } + + switch (ms_transmit(c)) + { + case TRANSMIT_COMPLETE: + /* we have no data to write to network, next wait repose */ + if (! ms_update_event(c, EV_READ | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + c->ctnwrite= false; + break; + } + ms_conn_set_state(c, conn_read); + c->ctnwrite= false; + stop= true; + break; + + case TRANSMIT_INCOMPLETE: + c->ctnwrite= true; + break; /* Continue in state machine. */ + + case TRANSMIT_HARD_ERROR: + c->ctnwrite= false; + break; + + case TRANSMIT_SOFT_ERROR: + c->ctnwrite= true; + stop= true; + break; + + default: + break; + } /* switch */ - case conn_closing: - /* recovery mode, need reconnect if connection close */ - if (ms_setting.reconnect && (!ms_global.time_out - || (ms_setting.run_time == 0 && c->remain_exec_num > 0))) { - - if (ms_reconn(c) != 0) { - ms_conn_close(c); - stop = true; - break; - } - - ms_reset_conn(c, false); - - if (c->total_sfds == 1) { - if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - } - - break; - } else { - ms_conn_close(c); - stop = true; - break; - } - default: - assert(0); + break; + + case conn_closing: + /* recovery mode, need reconnect if connection close */ + if (ms_setting.reconnect && (! ms_global.time_out + || ((ms_setting.run_time == 0) + && (c->remain_exec_num > 0)))) + { + if (ms_reconn(c) != 0) + { + ms_conn_close(c); + stop= true; + break; } - } - return; -} + ms_reset_conn(c, false); + + if (c->total_sfds == 1) + { + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + break; + } + } + + break; + } + else + { + ms_conn_close(c); + stop= true; + break; + } + + default: + assert(0); + } /* switch */ + } +} /* ms_drive_machine */ + /** * the event handler of each thread @@ -2261,30 +2701,29 @@ static void ms_drive_machine(ms_conn_t *c) */ void ms_event_handler(const int fd, const short which, void *arg) { - ms_conn_t *c = (ms_conn_t *)arg; - assert(c != NULL); + ms_conn_t *c= (ms_conn_t *)arg; - c->which = which; + assert(c != NULL); - /* sanity */ - if (fd != c->sfd) { - fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n", - fd, c->sfd); - ms_conn_close(c); - exit(1); - } - assert(fd == c->sfd); + c->which= which; - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) { - ms_conn_set_state(c, conn_closing); - } + /* sanity */ + if (fd != c->sfd) + { + fprintf(stderr, + "Catastrophic: event fd: %d doesn't match conn fd: %d\n", + fd, + c->sfd); + ms_conn_close(c); + exit(1); + } + assert(fd == c->sfd); - ms_drive_machine(c); + ms_drive_machine(c); + + /* wait for next event */ +} /* ms_event_handler */ - /* wait for next event */ - return; -} /** * get the next socket descriptor index to run for replication @@ -2292,44 +2731,57 @@ void ms_event_handler(const int fd, const short which, void *arg) * @param c, pointer of the concurrency * @param cmd, command(get or set ) * - * @return int, if success, return the index, else return 0 + * @return int, if success, return the index, else return EXIT_SUCCESS */ -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { - int sock_index = -1; - int i = 0; - - if (c->total_sfds == 1) { - return 0; - } - - if (ms_setting.rep_write_srv == 0) { - return sock_index; - } - - do { - if (cmd == CMD_SET) { - for (i = 0; i < ms_setting.rep_write_srv; i++) { - if (c->tcpsfd[i] > 0) { - break; - } - } - - if (i == ms_setting.rep_write_srv) { - /* random get one replication server to read */ - sock_index = (int)(random() % c->total_sfds); - } else { - /* random get one replication writing server to write */ - sock_index = (int)(random() % ms_setting.rep_write_srv); - } - } else if (cmd == CMD_GET) { - /* random get one replication server to read */ - sock_index = (int)(random() % c->total_sfds); - } - } while (c->tcpsfd[sock_index] == 0); + uint32_t sock_index= 0; + uint32_t i= 0; + + if (c->total_sfds == 1) + { + return EXIT_SUCCESS; + } + if (ms_setting.rep_write_srv == 0) + { return sock_index; -} + } + + do + { + if (cmd == CMD_SET) + { + for (i= 0; i < ms_setting.rep_write_srv; i++) + { + if (c->tcpsfd[i] > 0) + { + break; + } + } + + if (i == ms_setting.rep_write_srv) + { + /* random get one replication server to read */ + sock_index= (uint32_t)random() % c->total_sfds; + } + else + { + /* random get one replication writing server to write */ + sock_index= (uint32_t)random() % ms_setting.rep_write_srv; + } + } + else if (cmd == CMD_GET) + { + /* random get one replication server to read */ + sock_index= (uint32_t)random() % c->total_sfds; + } + } + while (c->tcpsfd[sock_index] == 0); + + return sock_index; +} /* ms_get_rep_sock_index */ + /** * get the next socket descriptor index to run @@ -2338,74 +2790,92 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) * * @return int, return the index */ -static int ms_get_next_sock_index(ms_conn_t *c) +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { - int sock_index = 0; + uint32_t sock_index= 0; - do { - sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx; - } while (c->tcpsfd[sock_index] == 0); + do + { + sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx; + } + while (c->tcpsfd[sock_index] == 0); + + return sock_index; +} /* ms_get_next_sock_index */ - return sock_index; -} /** * update socket event of the connections * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_update_conn_sock_event(ms_conn_t *c) { - assert(c != NULL); + assert(c != NULL); + + switch (c->currcmd.cmd) + { + case CMD_SET: + if (ms_setting.facebook_test && c->udp) + { + c->sfd= c->tcpsfd[0]; + c->udp= false; + c->change_sfd= true; + } + break; + + case CMD_GET: + if (ms_setting.facebook_test && ! c->udp) + { + c->sfd= c->udpsfd; + c->udp= true; + c->change_sfd= true; + } + break; + + default: + break; + } /* switch */ + + if (! c->udp && (c->total_sfds > 1)) + { + if (c->cur_idx != c->total_sfds) + { + if (ms_setting.rep_write_srv == 0) + { + c->cur_idx= ms_get_next_sock_index(c); + } + else + { + c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd); + } + } + else + { + /* must select the first sock of the connection at the beginning */ + c->cur_idx= 0; + } + + c->sfd= c->tcpsfd[c->cur_idx]; + assert(c->sfd != 0); + c->change_sfd= true; + } + + if (c->change_sfd) + { + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + return -1; + } + } + + return EXIT_SUCCESS; +} /* ms_update_conn_sock_event */ - switch (c->currcmd.cmd) { - case CMD_SET: - if (ms_setting.facebook_test && c->udp) { - c->sfd = c->tcpsfd[0]; - c->udp = false; - c->change_sfd = true; - } - break; - case CMD_GET: - if (ms_setting.facebook_test && !c->udp) { - c->sfd = c->udpsfd; - c->udp = true; - c->change_sfd = true; - } - break; - default: - break; - } - - if (!c->udp && c->total_sfds > 1) { - if (c->cur_idx != c->total_sfds) { - if (ms_setting.rep_write_srv == 0) { - c->cur_idx = ms_get_next_sock_index(c); - } else { - c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd); - } - } else { - /* must select the first sock of the connection at the beginning */ - c->cur_idx = 0; - } - - c->sfd = c->tcpsfd[c->cur_idx]; - assert(c->sfd != 0); - c->change_sfd = true; - } - - if (c->change_sfd) { - if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - return -1; - } - } - - return 0; -} /** * for ASCII protocol, this function build the set command @@ -2415,42 +2885,54 @@ static int ms_update_conn_sock_event(ms_conn_t *c) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { - int value_offset; - int write_len; - char *buffer = c->wbuf; - - write_len= sprintf(buffer, " %u %d %d\r\n", 0, item->exp_time, item->value_size); - - if (write_len > c->wsize) { - /* ought to be always enough. just fail for simplicity */ - fprintf(stderr, "output command line too long.\n"); - return -1; - } - - if (item->value_offset == INVALID_OFFSET) { - value_offset = item->key_suffix_offset; - } else { - value_offset = item->value_offset; - } - - if (ms_add_iov(c, "set ", 4) != 0 || - ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 || - ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0 || - ms_add_iov(c, buffer, write_len) != 0 || - ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size) != 0 || - ms_add_iov(c, "\r\n", 2) != 0 || - (c->udp && ms_build_udp_headers(c) != 0)) { + int value_offset; + int write_len; + char *buffer= c->wbuf; + + write_len= snprintf(buffer, + c->wsize, + " %u %d %d\r\n", + 0, + item->exp_time, + item->value_size); + + if (write_len > c->wsize || write_len < 0) + { + /* ought to be always enough. just fail for simplicity */ + fprintf(stderr, "output command line too long.\n"); + return -1; + } + + if (item->value_offset == INVALID_OFFSET) + { + value_offset= item->key_suffix_offset; + } + else + { + value_offset= item->value_offset; + } + + if ((ms_add_iov(c, "set ", 4) != 0) + || (ms_add_iov(c, (char *)&item->key_prefix, + (int)KEY_PREFIX_SIZE) != 0) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int)KEY_PREFIX_SIZE) != 0) + || (ms_add_iov(c, buffer, write_len) != 0) + || (ms_add_iov(c, &ms_setting.char_block[value_offset], + item->value_size) != 0) + || (ms_add_iov(c, "\r\n", 2) != 0) + || (c->udp && (ms_build_udp_headers(c) != 0))) + { + return -1; + } - return -1; - } + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_set */ - return 0; -} /** * used to send set command to server @@ -2459,44 +2941,53 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) { - assert(c != NULL); + assert(c != NULL); - c->currcmd.cmd = CMD_SET; - c->currcmd.isfinish = false; - c->currcmd.retstat = MCD_FAILURE; + c->currcmd.cmd= CMD_SET; + c->currcmd.isfinish= false; + c->currcmd.retstat= MCD_FAILURE; - if (ms_update_conn_sock_event(c) != 0) { - return -1; - } + if (ms_update_conn_sock_event(c) != 0) + { + return -1; + } + + c->msgcurr= 0; + c->msgused= 0; + c->iovused= 0; + if (ms_add_msghdr(c) != 0) + { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } - c->msgcurr = 0; - c->msgused = 0; - c->iovused = 0; - if (ms_add_msghdr(c) != 0) { - fprintf(stderr, "Out of memory preparing request."); - return -1; + /* binary protocol */ + if (c->protocol == binary_prot) + { + if (ms_build_bin_write_buf_set(c, item) != 0) + { + return -1; } - - /* binary protocol */ - if (c->protocol == binary_prot) { - if (ms_build_bin_write_buf_set(c, item) != 0) { - return -1; - } - } else { - if (ms_build_ascii_write_buf_set(c, item) != 0) { - return -1; - } + } + else + { + if (ms_build_ascii_write_buf_set(c, item) != 0) + { + return -1; } + } - __sync_fetch_and_add(&ms_stats.obj_bytes, item->key_size + item->value_size); - __sync_fetch_and_add(&ms_stats.cmd_set, 1); + atomic_add_size(&ms_stats.obj_bytes, + item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); + + return EXIT_SUCCESS; +} /* ms_mcd_set */ - return 0; -} /** * for ASCII protocol, this function build the get command @@ -2506,22 +2997,24 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { - if (ms_add_iov(c, "get ", 4) != 0 || - ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 || - ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0 || - ms_add_iov(c, "\r\n", 2) != 0 || - (c->udp && ms_build_udp_headers(c) != 0)) { + if ((ms_add_iov(c, "get ", 4) != 0) + || (ms_add_iov(c, (char *)&item->key_prefix, + (int)KEY_PREFIX_SIZE) != 0) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int)KEY_PREFIX_SIZE) != 0) + || (ms_add_iov(c, "\r\n", 2) != 0) + || (c->udp && (ms_build_udp_headers(c) != 0))) + { + return -1; + } - return -1; - } + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_get */ - return 0; -} /** * used to send the get command to server @@ -2529,48 +3022,52 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); + assert(c != NULL); - assert(c != NULL); + c->currcmd.cmd= CMD_GET; + c->currcmd.isfinish= false; + c->currcmd.retstat= MCD_FAILURE; - c->currcmd.cmd = CMD_GET; - c->currcmd.isfinish = false; - c->currcmd.retstat = MCD_FAILURE; + if (ms_update_conn_sock_event(c) != 0) + { + return -1; + } + + c->msgcurr= 0; + c->msgused= 0; + c->iovused= 0; + if (ms_add_msghdr(c) != 0) + { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } - if (ms_update_conn_sock_event(c) != 0) { - return -1; + /* binary protocol */ + if (c->protocol == binary_prot) + { + if (ms_build_bin_write_buf_get(c, item) != 0) + { + return -1; } - - c->msgcurr = 0; - c->msgused = 0; - c->iovused = 0; - if (ms_add_msghdr(c) != 0) { - fprintf(stderr, "Out of memory preparing request."); - return -1; + } + else + { + if (ms_build_ascii_write_buf_get(c, item) != 0) + { + return -1; } + } - /* binary protocol */ - if (c->protocol == binary_prot) { - if (ms_build_bin_write_buf_get(c, item) != 0) { - return -1; - } - } else { - if (ms_build_ascii_write_buf_get(c, item) != 0) { - return -1; - } - } + atomic_add_size(&ms_stats.cmd_get, 1); - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + return EXIT_SUCCESS; +} /* ms_mcd_get */ - return 0; -} /** * for ASCII protocol, this function build the multi-get command @@ -2578,84 +3075,101 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) { - ms_task_item_t *item; - - if (ms_add_iov(c, "get", 3) != 0) { - return -1; - } + ms_task_item_t *item; - for (int i = 0; i < c->mlget_task.mlget_num; i++) { - item = c->mlget_task.mlget_item[i].item; - assert(item != NULL); - if (ms_add_iov(c, " ", 1) != 0 || - ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 || - ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0) { - return -1; - } - } + if (ms_add_iov(c, "get", 3) != 0) + { + return -1; + } + + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + item= c->mlget_task.mlget_item[i].item; + assert(item != NULL); + if ((ms_add_iov(c, " ", 1) != 0) + || (ms_add_iov(c, (char *)&item->key_prefix, + (int)KEY_PREFIX_SIZE) != 0) + || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int)KEY_PREFIX_SIZE) != 0)) + { + return -1; + } + } + + if ((ms_add_iov(c, "\r\n", 2) != 0) + || (c->udp && (ms_build_udp_headers(c) != 0))) + { + return -1; + } - if (ms_add_iov(c, "\r\n", 2) != 0 || - (c->udp && ms_build_udp_headers(c) != 0)) { - return -1; - } + return EXIT_SUCCESS; +} /* ms_build_ascii_write_buf_mlget */ - return 0; -} /** * used to send the multi-get command to server * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_mlget(ms_conn_t *c) { - ms_task_item_t *item; + ms_task_item_t *item; - assert(c != NULL); - assert(c->mlget_task.mlget_num >= 1); + assert(c != NULL); + assert(c->mlget_task.mlget_num >= 1); - c->currcmd.cmd = CMD_GET; - c->currcmd.isfinish = false; - c->currcmd.retstat = MCD_FAILURE; + c->currcmd.cmd= CMD_GET; + c->currcmd.isfinish= false; + c->currcmd.retstat= MCD_FAILURE; - if (ms_update_conn_sock_event(c) != 0) { - return -1; - } + if (ms_update_conn_sock_event(c) != 0) + { + return -1; + } + + c->msgcurr= 0; + c->msgused= 0; + c->iovused= 0; + if (ms_add_msghdr(c) != 0) + { + fprintf(stderr, "Out of memory preparing request."); + return -1; + } - c->msgcurr = 0; - c->msgused = 0; - c->iovused = 0; - if (ms_add_msghdr(c) != 0) { - fprintf(stderr, "Out of memory preparing request."); - return -1; + /* binary protocol */ + if (c->protocol == binary_prot) + { + if (ms_build_bin_write_buf_mlget(c) != 0) + { + return -1; } - - /* binary protocol */ - if (c->protocol == binary_prot) { - if (ms_build_bin_write_buf_mlget(c) != 0) { - return -1; - } - } else { - if (ms_build_ascii_write_buf_mlget(c) != 0) { - return -1; - } + } + else + { + if (ms_build_ascii_write_buf_mlget(c) != 0) + { + return -1; } + } - /* decrease operation time of each item */ - for (int i = 0; i < c->mlget_task.mlget_num; i++) { - item = c->mlget_task.mlget_item[i].item; - __sync_fetch_and_add(&ms_stats.cmd_get, 1); - } + /* decrease operation time of each item */ + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + item= c->mlget_task.mlget_item[i].item; + atomic_add_size(&ms_stats.cmd_get, 1); + } + + (void)item; + + return EXIT_SUCCESS; +} /* ms_mcd_mlget */ - return 0; -} /** * binary protocol support @@ -2666,75 +3180,95 @@ int ms_mcd_mlget(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_bin_process_response(ms_conn_t *c) { - const char *errstr = NULL; + const char *errstr= NULL; + + assert(c != NULL); + + uint32_t bodylen= c->binary_header.response.bodylen; + uint8_t opcode= c->binary_header.response.opcode; + uint16_t status= c->binary_header.response.status; + + if (bodylen > 0) + { + c->rvbytes= (int32_t)bodylen; + c->readval= true; + return EXIT_FAILURE; + } + else + { + switch (status) + { + case PROTOCOL_BINARY_RESPONSE_SUCCESS: + if (opcode == PROTOCOL_BINARY_CMD_SET) + { + c->currcmd.retstat= MCD_STORED; + } + else if (opcode == PROTOCOL_BINARY_CMD_DELETE) + { + c->currcmd.retstat= MCD_DELETED; + } + else if (opcode == PROTOCOL_BINARY_CMD_GET) + { + c->currcmd.retstat= MCD_END; + } + break; + + case PROTOCOL_BINARY_RESPONSE_ENOMEM: + errstr= "Out of memory"; + c->currcmd.retstat= MCD_SERVER_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: + errstr= "Unknown command"; + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + break; + + case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: + errstr= "Not found"; + c->currcmd.retstat= MCD_NOTFOUND; + break; + + case PROTOCOL_BINARY_RESPONSE_EINVAL: + errstr= "Invalid arguments"; + c->currcmd.retstat= MCD_PROTOCOL_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: + errstr= "Data exists for key."; + break; + + case PROTOCOL_BINARY_RESPONSE_E2BIG: + errstr= "Too large."; + c->currcmd.retstat= MCD_SERVER_ERROR; + break; + + case PROTOCOL_BINARY_RESPONSE_NOT_STORED: + errstr= "Not stored."; + c->currcmd.retstat= MCD_NOTSTORED; + break; - assert(c != NULL); - - uint32_t bodylen = c->binary_header.response.bodylen; - uint8_t opcode = c->binary_header.response.opcode; - uint16_t status = c->binary_header.response.status; - - if (bodylen > 0) { - c->rvbytes = (int32_t)bodylen; - c->readval = true; - return 1; - } else { - switch (status) { - case PROTOCOL_BINARY_RESPONSE_SUCCESS: - if (opcode == PROTOCOL_BINARY_CMD_SET) { - c->currcmd.retstat = MCD_STORED; - } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) { - c->currcmd.retstat = MCD_DELETED; - } else if (opcode == PROTOCOL_BINARY_CMD_GET) { - c->currcmd.retstat = MCD_END; - } - break; - case PROTOCOL_BINARY_RESPONSE_ENOMEM: - errstr = "Out of memory"; - c->currcmd.retstat = MCD_SERVER_ERROR; - break; - case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: - errstr = "Unknown command"; - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - break; - case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: - errstr = "Not found"; - c->currcmd.retstat = MCD_NOTFOUND; - break; - case PROTOCOL_BINARY_RESPONSE_EINVAL: - errstr = "Invalid arguments"; - c->currcmd.retstat = MCD_PROTOCOL_ERROR; - break; - case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: - errstr = "Data exists for key."; - break; - case PROTOCOL_BINARY_RESPONSE_E2BIG: - errstr = "Too large."; - c->currcmd.retstat = MCD_SERVER_ERROR; - break; - case PROTOCOL_BINARY_RESPONSE_NOT_STORED: - errstr = "Not stored."; - c->currcmd.retstat = MCD_NOTSTORED; - break; - default: - errstr = "Unknown error"; - c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE; - break; - } + default: + errstr= "Unknown error"; + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + break; + } /* switch */ - if (errstr != NULL) { - fprintf(stderr, "%s\n", errstr); - } + if (errstr != NULL) + { + fprintf(stderr, "%s\n", errstr); } + } + + return EXIT_SUCCESS; +} /* ms_bin_process_response */ - return 0; -} /* build binary header and add the header to the buffer to send */ + /** * build binary header and add the header to the buffer to send * @@ -2744,28 +3278,33 @@ static int ms_bin_process_response(ms_conn_t *c) * @param key_len, length of key * @param body_len. length of body */ -static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, - uint16_t key_len, uint32_t body_len) { - protocol_binary_request_header* header; +static void ms_add_bin_header(ms_conn_t *c, + uint8_t opcode, + uint8_t hdr_len, + uint16_t key_len, + uint32_t body_len) +{ + protocol_binary_request_header *header; - assert(c != NULL); + assert(c != NULL); - header = (protocol_binary_request_header *)c->wcurr; + header= (protocol_binary_request_header *)c->wcurr; - header->request.magic = (uint8_t)PROTOCOL_BINARY_REQ; - header->request.opcode = (uint8_t)opcode; - header->request.keylen = htonl(key_len); + header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; + header->request.opcode= (uint8_t)opcode; + header->request.keylen= htons(key_len); - header->request.extlen = (uint8_t)hdr_len; - header->request.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->request.reserved = 0; + header->request.extlen= (uint8_t)hdr_len; + header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; + header->request.vbucket= 0; - header->request.bodylen = htonl(body_len); - header->request.opaque = 0; - header->request.cas = 0; + header->request.bodylen= htonl(body_len); + header->request.opaque= 0; + header->request.cas= 0; + + ms_add_iov(c, c->wcurr, sizeof(header->request)); +} /* ms_add_bin_header */ - ms_add_iov(c, c->wcurr, sizeof(header->request)); -} /** * add the key to the socket write buffer array @@ -2776,11 +3315,12 @@ static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len, */ static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) { - ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE); - ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE); + ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE); + ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int)KEY_PREFIX_SIZE); } + /** * for binary protocol, this function build the set command * and add the command to send buffer array. @@ -2789,32 +3329,41 @@ static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { - assert(c->wbuf == c->wcurr); - - int value_offset; - protocol_binary_request_set* rep = (protocol_binary_request_set*)c->wcurr; - uint16_t keylen = (uint16_t)item->key_size; - uint32_t bodylen = (uint32_t)sizeof(rep->message.body) + (uint32_t)keylen + (uint32_t)item->value_size; - - ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen); - rep->message.body.flags = 0; - rep->message.body.expiration = htonl((uint32_t)item->exp_time); - ms_add_iov(c, &rep->message.body, sizeof(rep->message.body)); - ms_add_key_to_iov(c, item); - - if (item->value_offset == INVALID_OFFSET) { - value_offset = item->key_suffix_offset; - } else { - value_offset = item->value_offset; - } - ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); + assert(c->wbuf == c->wcurr); + + int value_offset; + protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr; + uint16_t keylen= (uint16_t)item->key_size; + uint32_t bodylen= (uint32_t)sizeof(rep->message.body) + + (uint32_t)keylen + (uint32_t)item->value_size; + + ms_add_bin_header(c, + PROTOCOL_BINARY_CMD_SET, + sizeof(rep->message.body), + keylen, + bodylen); + rep->message.body.flags= 0; + rep->message.body.expiration= htonl((uint32_t)item->exp_time); + ms_add_iov(c, &rep->message.body, sizeof(rep->message.body)); + ms_add_key_to_iov(c, item); + + if (item->value_offset == INVALID_OFFSET) + { + value_offset= item->key_suffix_offset; + } + else + { + value_offset= item->value_offset; + } + ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_set */ - return 0; -} /** * for binary protocol, this function build the get command and @@ -2824,17 +3373,19 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { - assert(c->wbuf == c->wcurr); + assert(c->wbuf == c->wcurr); - ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size); - ms_add_key_to_iov(c, item); + ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, + (uint32_t)item->key_size); + ms_add_key_to_iov(c, item); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_get */ - return 0; -} /** * for binary protocol, this function build the multi-get @@ -2844,25 +3395,29 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_mlget(ms_conn_t *c) { - ms_task_item_t *item; + ms_task_item_t *item; - assert(c->wbuf == c->wcurr); + assert(c->wbuf == c->wcurr); - for (int i = 0; i < c->mlget_task.mlget_num; i++) { - item = c->mlget_task.mlget_item[i].item; - assert(item != NULL); + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + item= c->mlget_task.mlget_item[i].item; + assert(item != NULL); - ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size); - ms_add_key_to_iov(c, item); - c->wcurr += sizeof(protocol_binary_request_get); - } - - c->wcurr = c->wbuf; + ms_add_bin_header(c, + PROTOCOL_BINARY_CMD_GET, + 0, + (uint16_t)item->key_size, + (uint32_t)item->key_size); + ms_add_key_to_iov(c, item); + c->wcurr+= sizeof(protocol_binary_request_get); + } - return 0; -} + c->wcurr= c->wbuf; + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_mlget */