X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_conn.c;h=b4d5f673d11df8ce616ec7494f1c3a4dc49faf76;hb=9e4ae978d7d40c4f1c7d221c61a07a31a5e270d5;hp=ad12a113b11f5015c7097d0e4345e9ea7bdc7ca6;hpb=604540c15315c6ed876082529f639697ba3eceab;p=m6w6%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index ad12a113..b4d5f673 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -12,14 +12,39 @@ #include "config.h" #include +#include #include #include #include #include #include +#include #include +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif #include "ms_setting.h" #include "ms_thread.h" +#include "ms_atomic.h" + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl +#endif /* for network write */ #define TRANSMIT_COMPLETE 0 @@ -39,12 +64,12 @@ static uint64_t key_prefix_seq= KEY_PREFIX_BASE; /* global increasing counter, generating request id for UDP */ -static int udp_request_id= 0; +static volatile uint32_t udp_request_id= 0; -extern __thread ms_thread_t ms_thread; +extern pthread_key_t ms_thread_key; /* generate upd request id */ -static int ms_get_udp_request_id(void); +static uint32_t ms_get_udp_request_id(void); /* connect initialize */ @@ -106,8 +131,8 @@ static int ms_transmit(ms_conn_t *c); static void ms_conn_shrink(ms_conn_t *c); static void ms_conn_set_state(ms_conn_t *c, int state); static bool ms_update_event(ms_conn_t *c, const int new_flags); -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static int ms_get_next_sock_index(ms_conn_t *c); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); static int ms_update_conn_sock_event(ms_conn_t *c); static bool ms_need_yield(ms_conn_t *c); static void ms_update_start_time(ms_conn_t *c); @@ -165,9 +190,9 @@ uint64_t ms_get_key_prefix(void) * * @return an unique UDP request id */ -static int ms_get_udp_request_id(void) +static uint32_t ms_get_udp_request_id(void) { - return __sync_fetch_and_add(&udp_request_id, 1); + return atomic_add_32_nv(&udp_request_id, 1); } @@ -199,7 +224,7 @@ static void ms_task_init(ms_conn_t *c) * @param c, pointer of the concurrency * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) { @@ -235,7 +260,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); } - return 0; + return EXIT_SUCCESS; } /* ms_conn_udp_init */ @@ -247,7 +272,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) * @param read_buffer_size * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_init(ms_conn_t *c, const int init_state, @@ -268,7 +293,7 @@ static int ms_conn_init(ms_conn_t *c, /* for replication, each connection need connect all the server */ if (ms_setting.rep_write_srv > 0) { - c->total_sfds= ms_setting.srv_cnt; + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; } else { @@ -333,14 +358,10 @@ static int ms_conn_init(ms_conn_t *c, c->mlget_task.mlget_num= 0; c->mlget_task.value_index= -1; /* default invalid value */ - if (ms_setting.binary_prot) + if (ms_setting.binary_prot_) { c->protocol= binary_prot; } - else if (is_udp) - { - c->protocol= ascii_udp_prot; - } else { c->protocol= ascii_prot; @@ -357,10 +378,10 @@ static int ms_conn_init(ms_conn_t *c, if (! (ms_setting.facebook_test && is_udp)) { - __sync_fetch_and_add(&ms_stats.active_conns, 1); + atomic_add_32(&ms_stats.active_conns, 1); } - return 0; + return EXIT_SUCCESS; } /* ms_conn_init */ @@ -393,15 +414,16 @@ static void ms_warmup_num_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_item_win_init(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int exp_cnt= 0; c->win_size= (int)ms_setting.win_size; c->set_cursor= 0; - c->exec_num= ms_thread.thread_ctx->exec_num_perconn; + c->exec_num= ms_thread->thread_ctx->exec_num_perconn; c->remain_exec_num= c->exec_num; c->item_win= (ms_task_item_t *)malloc( @@ -436,7 +458,7 @@ static int ms_item_win_init(ms_conn_t *c) ms_warmup_num_init(c); - return 0; + return EXIT_SUCCESS; } /* ms_item_win_init */ @@ -447,13 +469,14 @@ static int ms_item_win_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_sock_init(ms_conn_t *c) { - int i; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t i; int ret_sfd; - int srv_idx= 0; + uint32_t srv_idx= 0; assert(c != NULL); assert(c->tcpsfd != NULL); @@ -464,12 +487,12 @@ static int ms_conn_sock_init(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { /* for replication, each connection need connect all the server */ - srv_idx= i; + srv_idx= i % ms_setting.srv_cnt; } else { /* all the connections in a thread connects the same server */ - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; } if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, @@ -516,7 +539,7 @@ static int ms_conn_sock_init(ms_conn_t *c) } else { - for (int j= 0; j < i; j++) + for (uint32_t j= 0; j < i; j++) { close(c->tcpsfd[j]); } @@ -530,7 +553,7 @@ static int ms_conn_sock_init(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_conn_sock_init */ @@ -540,37 +563,23 @@ static int ms_conn_sock_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_event_init(ms_conn_t *c) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); - event_base_set(ms_thread.base, &c->event); + event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; - if (c->total_sfds == 1) + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - } - else - { - if (event_add(&c->event, &t) == -1) - { - return -1; - } + return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_conn_event_init */ @@ -580,7 +589,7 @@ static int ms_conn_event_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_setup_conn(ms_conn_t *c) { @@ -604,7 +613,7 @@ int ms_setup_conn(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_setup_conn */ @@ -615,6 +624,7 @@ int ms_setup_conn(ms_conn_t *c) */ void ms_conn_free(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); if (c != NULL) { if (c->hdrbuf != NULL) @@ -638,9 +648,9 @@ void ms_conn_free(ms_conn_t *c) if (c->tcpsfd != NULL) free(c->tcpsfd); - if (--ms_thread.nactive_conn == 0) + if (--ms_thread->nactive_conn == 0) { - free(ms_thread.conn); + free(ms_thread->conn); } } } /* ms_conn_free */ @@ -653,12 +663,13 @@ void ms_conn_free(ms_conn_t *c) */ static void ms_conn_close(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); assert(c != NULL); /* delete the event, the socket and the connection */ event_del(&c->event); - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] > 0) { @@ -672,7 +683,7 @@ static void ms_conn_close(ms_conn_t *c) close(c->udpsfd); } - __sync_fetch_and_sub(&ms_stats.active_conns, 1); + atomic_dec_32(&ms_stats.active_conns); ms_conn_free(c); @@ -684,7 +695,7 @@ static void ms_conn_close(ms_conn_t *c) pthread_mutex_unlock(&ms_global.run_lock.lock); } - if (ms_thread.nactive_conn == 0) + if (ms_thread->nactive_conn == 0) { pthread_exit(NULL); } @@ -696,7 +707,7 @@ static void ms_conn_close(ms_conn_t *c) * * @param ai, server address information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_new_socket(struct addrinfo *ai) { @@ -760,7 +771,7 @@ static void ms_maximize_sndbuf(const int sfd) * @param is_udp, whether it's udp * @param ret_sfd, the connected socket file descriptor * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_network_connect(ms_conn_t *c, char *srv_host_name, @@ -787,7 +798,11 @@ static int ms_network_connect(ms_conn_t *c, * that otherwise mess things up. */ memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; +#else + hints.ai_flags= AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ if (is_udp) { hints.ai_protocol= IPPROTO_UDP; @@ -879,21 +894,22 @@ static int ms_network_connect(ms_conn_t *c, * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_reconn(ms_conn_t *c) { - int srv_idx= 0; - int srv_conn_cnt= 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; + uint32_t srv_conn_cnt= 0; if (ms_setting.rep_write_srv > 0) { - srv_idx= c->cur_idx; - srv_conn_cnt= ms_setting.nconns; + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -901,7 +917,7 @@ static int ms_reconn(ms_conn_t *c) close(c->sfd); c->tcpsfd[c->cur_idx]= 0; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) % srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); @@ -912,7 +928,8 @@ static int ms_reconn(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - int i= 0; + uint32_t i= 0; + for (i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] != 0) @@ -937,8 +954,8 @@ static int ms_reconn(ms_conn_t *c) ms_setting.udp, &c->sfd) == 0) { c->tcpsfd[c->cur_idx]= c->sfd; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -952,13 +969,13 @@ static int ms_reconn(ms_conn_t *c) break; } - if (c->total_sfds == 1) + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { /* wait a second and reconnect */ sleep(1); } } - while (c->total_sfds == 1); + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); } if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) @@ -967,7 +984,7 @@ static int ms_reconn(ms_conn_t *c) c->alive_sfds--; } - return 0; + return EXIT_SUCCESS; } /* ms_reconn */ @@ -979,23 +996,24 @@ static int ms_reconn(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_reconn_socks(ms_conn_t *c) { - int srv_idx= 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; int ret_sfd= 0; - int srv_conn_cnt= 0; + uint32_t srv_conn_cnt= 0; struct timeval cur_time; assert(c != NULL); if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) { - return 0; + return EXIT_SUCCESS; } - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] == 0) { @@ -1015,12 +1033,12 @@ int ms_reconn_socks(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= i; - srv_conn_cnt= ms_setting.nconns; + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -1031,8 +1049,8 @@ int ms_reconn_socks(ms_conn_t *c) c->tcpsfd[i]= ret_sfd; c->alive_sfds++; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -1047,7 +1065,7 @@ int ms_reconn_socks(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_reconn_socks */ @@ -1119,7 +1137,7 @@ static int ms_tokenize_command(char *command, * @param c, pointer of the concurrency * @param command, the string responded by server * - * @return int, if the command completed return 0, else return + * @return int, if the command completed return EXIT_SUCCESS, else return * -1 */ static int ms_ascii_process_line(ms_conn_t *c, char *command) @@ -1263,7 +1281,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) { if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) { - memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets); + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); } c->packets= 0; @@ -1276,6 +1294,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) c->ctnwrite= false; c->rbytes= 0; c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; ms_conn_set_state(c, conn_write); memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ @@ -1291,7 +1312,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_try_read_line(ms_conn_t *c) { @@ -1301,7 +1322,7 @@ static int ms_try_read_line(ms_conn_t *c) if ((uint64_t)c->rbytes < sizeof(c->binary_header)) { /* need more data! */ - return 0; + return EXIT_SUCCESS; } else { @@ -1322,16 +1343,16 @@ static int ms_try_read_line(ms_conn_t *c) c->binary_header= *rsp; c->binary_header.response.extlen= rsp->response.extlen; - c->binary_header.response.keylen= ntohl(rsp->response.keylen); + c->binary_header.response.keylen= ntohs(rsp->response.keylen); c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); - c->binary_header.response.status= ntohl(rsp->response.status); + c->binary_header.response.status= ntohs(rsp->response.status); if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) { fprintf(stderr, "Invalid magic: %x\n", c->binary_header.response.magic); ms_conn_set_state(c, conn_closing); - return 0; + return EXIT_SUCCESS; } /* process this complete response */ @@ -1356,11 +1377,11 @@ static int ms_try_read_line(ms_conn_t *c) assert(c->rcurr <= (c->rbuf + c->rsize)); if (c->rbytes == 0) - return 0; + return EXIT_SUCCESS; el= memchr(c->rcurr, '\n', (size_t)c->rbytes); if (! el) - return 0; + return EXIT_SUCCESS; cont= el + 1; if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) @@ -1568,7 +1589,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); c->rudpbytes+= res; rbytes+= res; if (res == avail) @@ -1602,7 +1623,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (copybytes == -1) { - __sync_fetch_and_add(&ms_stats.pkt_disorder, 1); + atomic_add_size(&ms_stats.pkt_disorder, 1); } return copybytes; @@ -1614,7 +1635,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) * close. * before reading, move the remaining incomplete fragment of a command * (if any) to the beginning of the buffer. - * return 0 if there's nothing to read on the first read. + * return EXIT_SUCCESS if there's nothing to read on the first read. */ /** @@ -1625,8 +1646,8 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) * @param c, pointer of the concurrency * * @return int, - * return 0 if there's nothing to read on the first read. - * return 1 if get data + * return EXIT_SUCCESS if there's nothing to read on the first read. + * return EXIT_FAILURE if get data * return -1 if error happens */ static int ms_try_read_network(ms_conn_t *c) @@ -1680,7 +1701,7 @@ static int ms_try_read_network(ms_conn_t *c) { if (! c->udp) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); } gotdata= 1; c->rbytes+= res; @@ -1744,7 +1765,7 @@ static void ms_verify_value(ms_conn_t *c, if (curr_time.tv_sec - c->curr_task.item->client_time > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) { - __sync_fetch_and_add(&ms_stats.exp_get, 1); + atomic_add_size(&ms_stats.exp_get, 1); if (ms_setting.verbose) { @@ -1758,7 +1779,7 @@ static void ms_verify_value(ms_conn_t *c, "\n<%d expire time verification failed, " "object expired but get it now\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64 " %.*s\n" "\tset time: %s current time: %s " "diff time: %d expire time: %d\n" "\texpected data: \n" @@ -1785,14 +1806,14 @@ static void ms_verify_value(ms_conn_t *c, if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t)vlen) != 0)) { - __sync_fetch_and_add(&ms_stats.vef_failed, 1); + atomic_add_size(&ms_stats.vef_failed, 1); if (ms_setting.verbose) { fprintf(stderr, "\n<%d data verification failed\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64" %.*s\n" "\texpected data len: %d\n" "\texpected data: %.*s\n" "\treceived data len: %d\n" @@ -1833,7 +1854,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); + assert(c->protocol == ascii_prot); if (c->rvbytes > 2) { assert( @@ -1973,8 +1994,7 @@ static void ms_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); if (c->protocol == binary_prot) @@ -1993,7 +2013,7 @@ static void ms_complete_nread(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_msghdr(ms_conn_t *c) { @@ -2004,7 +2024,7 @@ static int ms_add_msghdr(ms_conn_t *c) if (c->msgsize == c->msgused) { msg= - realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr)); + realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); if (! msg) return -1; @@ -2037,7 +2057,7 @@ static int ms_add_msghdr(ms_conn_t *c) return ms_add_iov(c, NULL, UDP_HEADER_SIZE); } - return 0; + return EXIT_SUCCESS; } /* ms_add_msghdr */ @@ -2047,7 +2067,7 @@ static int ms_add_msghdr(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_ensure_iov_space(ms_conn_t *c) { @@ -2057,7 +2077,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) { int i, iovnum; struct iovec *new_iov= (struct iovec *)realloc(c->iov, - ((uint64_t)c->iovsize + ((size_t)c->iovsize * 2) * sizeof(struct iovec)); if (! new_iov) @@ -2074,7 +2094,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_ensure_iov_space */ @@ -2086,7 +2106,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) * @param buf, the buffer includes data to send * @param len, the data length in the buffer * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) { @@ -2105,6 +2125,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) */ limit_to_mtu= c->udp; +#ifdef IOV_MAX /* We may need to start a new msghdr if this one is full. */ if ((m->msg_iovlen == IOV_MAX) || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) @@ -2112,6 +2133,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) ms_add_msghdr(c); m= &c->msglist[c->msgused - 1]; } +#endif if (ms_ensure_iov_space(c) != 0) return -1; @@ -2140,7 +2162,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) } while (leftover > 0); - return 0; + return EXIT_SUCCESS; } /* ms_add_iov */ @@ -2149,7 +2171,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_udp_headers(ms_conn_t *c) { @@ -2185,7 +2207,7 @@ static int ms_build_udp_headers(ms_conn_t *c) hdr= c->hdrbuf; for (i= 0; i < c->msgused; i++) { - c->msglist[i].msg_iov[0].iov_base= hdr; + c->msglist[i].msg_iov[0].iov_base= (void *)hdr; c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; *hdr++= (unsigned char)(c->request_id / 256); *hdr++= (unsigned char)(c->request_id % 256); @@ -2200,7 +2222,7 @@ static int ms_build_udp_headers(ms_conn_t *c) + UDP_HEADER_SIZE)); } - return 0; + return EXIT_SUCCESS; } /* ms_build_udp_headers */ @@ -2233,7 +2255,7 @@ static int ms_transmit(ms_conn_t *c) res= sendmsg(c->sfd, m, 0); if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_written, res); + atomic_add_size(&ms_stats.bytes_written, res); /* We've written some of the data. Remove the completed * iovec entries from the list of pending writes. */ @@ -2248,8 +2270,8 @@ static int ms_transmit(ms_conn_t *c) * adjust it so the next write will do the rest. */ if (res > 0) { - m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res; - m->msg_iov->iov_len-= (uint64_t)res; + m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); + m->msg_iov->iov_len-= (size_t)res; } return TRANSMIT_INCOMPLETE; } @@ -2388,12 +2410,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; - assert(c != NULL); struct event_base *base= c->event.ev_base; @@ -2420,19 +2436,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) event_base_set(base, &c->event); c->ev_flags= (short)new_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return false; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return false; - } + return false; } return true; @@ -2451,6 +2457,7 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) */ static bool ms_need_yield(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int64_t tps= 0; int64_t time_diff= 0; struct timeval curr_time; @@ -2459,13 +2466,11 @@ static bool ms_need_yield(ms_conn_t *c) if (ms_setting.expected_tps > 0) { gettimeofday(&curr_time, NULL); - time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time); - tps= - (int64_t)((task->get_opt - + task->set_opt) / ((uint64_t)time_diff / 1000000)); + time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); + tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000); /* current throughput is greater than expected throughput */ - if (tps > ms_thread.thread_ctx->tps_perconn) + if (tps > ms_thread->thread_ctx->tps_perconn) { return true; } @@ -2708,12 +2713,6 @@ void ms_event_handler(const int fd, const short which, void *arg) } assert(fd == c->sfd); - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) - { - ms_conn_set_state(c, conn_closing); - } - ms_drive_machine(c); /* wait for next event */ @@ -2726,16 +2725,16 @@ void ms_event_handler(const int fd, const short which, void *arg) * @param c, pointer of the concurrency * @param cmd, command(get or set ) * - * @return int, if success, return the index, else return 0 + * @return int, if success, return the index, else return EXIT_SUCCESS */ -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { - int sock_index= -1; - int i= 0; + uint32_t sock_index= 0; + uint32_t i= 0; if (c->total_sfds == 1) { - return 0; + return EXIT_SUCCESS; } if (ms_setting.rep_write_srv == 0) @@ -2758,18 +2757,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) if (i == ms_setting.rep_write_srv) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } else { /* random get one replication writing server to write */ - sock_index= (int)(random() % ms_setting.rep_write_srv); + sock_index= (uint32_t)random() % ms_setting.rep_write_srv; } } else if (cmd == CMD_GET) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } } while (c->tcpsfd[sock_index] == 0); @@ -2785,9 +2784,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) * * @return int, return the index */ -static int ms_get_next_sock_index(ms_conn_t *c) +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { - int sock_index= 0; + uint32_t sock_index= 0; do { @@ -2804,7 +2803,7 @@ static int ms_get_next_sock_index(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_update_conn_sock_event(ms_conn_t *c) { @@ -2868,7 +2867,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_update_conn_sock_event */ @@ -2880,7 +2879,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { @@ -2888,13 +2887,14 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) int write_len; char *buffer= c->wbuf; - write_len= sprintf(buffer, - " %u %d %d\r\n", - 0, - item->exp_time, - item->value_size); + write_len= snprintf(buffer, + c->wsize, + " %u %d %d\r\n", + 0, + item->exp_time, + item->value_size); - if (write_len > c->wsize) + if (write_len > c->wsize || write_len < 0) { /* ought to be always enough. just fail for simplicity */ fprintf(stderr, "output command line too long.\n"); @@ -2924,7 +2924,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_set */ @@ -2935,7 +2935,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) { @@ -2975,11 +2975,11 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) } } - __sync_fetch_and_add(&ms_stats.obj_bytes, - item->key_size + item->value_size); - __sync_fetch_and_add(&ms_stats.cmd_set, 1); + atomic_add_size(&ms_stats.obj_bytes, + item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); - return 0; + return EXIT_SUCCESS; } /* ms_mcd_set */ @@ -2991,7 +2991,7 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { @@ -3006,7 +3006,7 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_get */ @@ -3016,15 +3016,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); - assert(c != NULL); c->currcmd.cmd= CMD_GET; @@ -3061,9 +3057,9 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) } } - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); - return 0; + return EXIT_SUCCESS; } /* ms_mcd_get */ @@ -3073,7 +3069,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) { @@ -3104,7 +3100,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_mlget */ @@ -3113,7 +3109,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_mlget(ms_conn_t *c) { @@ -3160,10 +3156,10 @@ int ms_mcd_mlget(ms_conn_t *c) for (int i= 0; i < c->mlget_task.mlget_num; i++) { item= c->mlget_task.mlget_item[i].item; - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); } - return 0; + return EXIT_SUCCESS; } /* ms_mcd_mlget */ @@ -3176,7 +3172,7 @@ int ms_mcd_mlget(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_bin_process_response(ms_conn_t *c) { @@ -3192,7 +3188,7 @@ static int ms_bin_process_response(ms_conn_t *c) { c->rvbytes= (int32_t)bodylen; c->readval= true; - return 1; + return EXIT_FAILURE; } else { @@ -3259,7 +3255,7 @@ static int ms_bin_process_response(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_bin_process_response */ @@ -3288,11 +3284,11 @@ static void ms_add_bin_header(ms_conn_t *c, header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; header->request.opcode= (uint8_t)opcode; - header->request.keylen= htonl(key_len); + header->request.keylen= htons(key_len); header->request.extlen= (uint8_t)hdr_len; header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->request.reserved= 0; + header->request.vbucket= 0; header->request.bodylen= htonl(body_len); header->request.opaque= 0; @@ -3325,7 +3321,7 @@ static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { @@ -3357,7 +3353,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) } ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_set */ @@ -3369,7 +3365,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { @@ -3379,7 +3375,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) (uint32_t)item->key_size); ms_add_key_to_iov(c, item); - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_get */ @@ -3391,7 +3387,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_mlget(ms_conn_t *c) { @@ -3415,5 +3411,5 @@ static int ms_build_bin_write_buf_mlget(ms_conn_t *c) c->wcurr= c->wbuf; - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_mlget */