X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_conn.c;h=85e84739835b0c3af8b8709a189f5eefa438c79d;hb=398f48445cacf12679248142f0c86d3a0d6caab6;hp=feb6a476bbf91e2b65372d53f15ac8e9b36c1f6a;hpb=88642cefb9f215ee7dd2be926af1e8fdbe64b963;p=awesomized%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index feb6a476..85e84739 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -9,19 +9,42 @@ * */ -#include "config.h" +#include "mem_config.h" #include +#include #include #include #include #include #include +#include #include + +#if defined(HAVE_SYS_TIME_H) +# include +#endif + +#if defined(HAVE_TIME_H) +# include +#endif + #include "ms_setting.h" #include "ms_thread.h" #include "ms_atomic.h" +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl +#endif + /* for network write */ #define TRANSMIT_COMPLETE 0 #define TRANSMIT_INCOMPLETE 1 @@ -40,12 +63,12 @@ static uint64_t key_prefix_seq= KEY_PREFIX_BASE; /* global increasing counter, generating request id for UDP */ -static int32_t udp_request_id= 0; +static volatile uint32_t udp_request_id= 0; -extern __thread ms_thread_t ms_thread; +extern pthread_key_t ms_thread_key; /* generate upd request id */ -static int ms_get_udp_request_id(void); +static uint32_t ms_get_udp_request_id(void); /* connect initialize */ @@ -107,8 +130,8 @@ static int ms_transmit(ms_conn_t *c); static void ms_conn_shrink(ms_conn_t *c); static void ms_conn_set_state(ms_conn_t *c, int state); static bool ms_update_event(ms_conn_t *c, const int new_flags); -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static int ms_get_next_sock_index(ms_conn_t *c); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); static int ms_update_conn_sock_event(ms_conn_t *c); static bool ms_need_yield(ms_conn_t *c); static void ms_update_start_time(ms_conn_t *c); @@ -166,9 +189,9 @@ uint64_t ms_get_key_prefix(void) * * @return an unique UDP request id */ -static int ms_get_udp_request_id(void) +static uint32_t ms_get_udp_request_id(void) { - return __sync_fetch_and_add(&udp_request_id, 1); + return atomic_add_32_nv(&udp_request_id, 1); } @@ -200,7 +223,7 @@ static void ms_task_init(ms_conn_t *c) * @param c, pointer of the concurrency * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) { @@ -236,7 +259,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); } - return 0; + return EXIT_SUCCESS; } /* ms_conn_udp_init */ @@ -248,7 +271,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) * @param read_buffer_size * @param is_udp, whether it's udp * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_init(ms_conn_t *c, const int init_state, @@ -269,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c, /* for replication, each connection need connect all the server */ if (ms_setting.rep_write_srv > 0) { - c->total_sfds= ms_setting.srv_cnt; + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; } else { @@ -334,14 +357,10 @@ static int ms_conn_init(ms_conn_t *c, c->mlget_task.mlget_num= 0; c->mlget_task.value_index= -1; /* default invalid value */ - if (ms_setting.binary_prot) + if (ms_setting.binary_prot_) { c->protocol= binary_prot; } - else if (is_udp) - { - c->protocol= ascii_udp_prot; - } else { c->protocol= ascii_prot; @@ -358,10 +377,10 @@ static int ms_conn_init(ms_conn_t *c, if (! (ms_setting.facebook_test && is_udp)) { - __sync_fetch_and_add(&ms_stats.active_conns, 1); + atomic_add_32(&ms_stats.active_conns, 1); } - return 0; + return EXIT_SUCCESS; } /* ms_conn_init */ @@ -394,15 +413,16 @@ static void ms_warmup_num_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_item_win_init(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int exp_cnt= 0; c->win_size= (int)ms_setting.win_size; c->set_cursor= 0; - c->exec_num= ms_thread.thread_ctx->exec_num_perconn; + c->exec_num= ms_thread->thread_ctx->exec_num_perconn; c->remain_exec_num= c->exec_num; c->item_win= (ms_task_item_t *)malloc( @@ -437,7 +457,7 @@ static int ms_item_win_init(ms_conn_t *c) ms_warmup_num_init(c); - return 0; + return EXIT_SUCCESS; } /* ms_item_win_init */ @@ -448,13 +468,14 @@ static int ms_item_win_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_sock_init(ms_conn_t *c) { - int i; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t i; int ret_sfd; - int srv_idx= 0; + uint32_t srv_idx= 0; assert(c != NULL); assert(c->tcpsfd != NULL); @@ -465,12 +486,12 @@ static int ms_conn_sock_init(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { /* for replication, each connection need connect all the server */ - srv_idx= i; + srv_idx= i % ms_setting.srv_cnt; } else { /* all the connections in a thread connects the same server */ - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; } if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, @@ -517,7 +538,7 @@ static int ms_conn_sock_init(ms_conn_t *c) } else { - for (int j= 0; j < i; j++) + for (uint32_t j= 0; j < i; j++) { close(c->tcpsfd[j]); } @@ -531,7 +552,7 @@ static int ms_conn_sock_init(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_conn_sock_init */ @@ -541,37 +562,23 @@ static int ms_conn_sock_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_conn_event_init(ms_conn_t *c) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); - event_base_set(ms_thread.base, &c->event); + event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return -1; - } + return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_conn_event_init */ @@ -581,7 +588,7 @@ static int ms_conn_event_init(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_setup_conn(ms_conn_t *c) { @@ -605,7 +612,7 @@ int ms_setup_conn(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_setup_conn */ @@ -616,6 +623,7 @@ int ms_setup_conn(ms_conn_t *c) */ void ms_conn_free(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); if (c != NULL) { if (c->hdrbuf != NULL) @@ -639,9 +647,9 @@ void ms_conn_free(ms_conn_t *c) if (c->tcpsfd != NULL) free(c->tcpsfd); - if (--ms_thread.nactive_conn == 0) + if (--ms_thread->nactive_conn == 0) { - free(ms_thread.conn); + free(ms_thread->conn); } } } /* ms_conn_free */ @@ -654,12 +662,13 @@ void ms_conn_free(ms_conn_t *c) */ static void ms_conn_close(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); assert(c != NULL); /* delete the event, the socket and the connection */ event_del(&c->event); - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] > 0) { @@ -673,7 +682,7 @@ static void ms_conn_close(ms_conn_t *c) close(c->udpsfd); } - __sync_fetch_and_sub(&ms_stats.active_conns, 1); + atomic_dec_32(&ms_stats.active_conns); ms_conn_free(c); @@ -685,7 +694,7 @@ static void ms_conn_close(ms_conn_t *c) pthread_mutex_unlock(&ms_global.run_lock.lock); } - if (ms_thread.nactive_conn == 0) + if (ms_thread->nactive_conn == 0) { pthread_exit(NULL); } @@ -697,7 +706,7 @@ static void ms_conn_close(ms_conn_t *c) * * @param ai, server address information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_new_socket(struct addrinfo *ai) { @@ -749,6 +758,7 @@ static void ms_maximize_sndbuf(const int sfd) max= avg - 1; } } + (void)last_good; } /* ms_maximize_sndbuf */ @@ -761,7 +771,7 @@ static void ms_maximize_sndbuf(const int sfd) * @param is_udp, whether it's udp * @param ret_sfd, the connected socket file descriptor * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_network_connect(ms_conn_t *c, char *srv_host_name, @@ -788,7 +798,11 @@ static int ms_network_connect(ms_conn_t *c, * that otherwise mess things up. */ memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; +#else + hints.ai_flags= AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ if (is_udp) { hints.ai_protocol= IPPROTO_UDP; @@ -880,21 +894,22 @@ static int ms_network_connect(ms_conn_t *c, * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_reconn(ms_conn_t *c) { - int srv_idx= 0; - int srv_conn_cnt= 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; + uint32_t srv_conn_cnt= 0; if (ms_setting.rep_write_srv > 0) { - srv_idx= c->cur_idx; - srv_conn_cnt= ms_setting.nconns; + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -902,7 +917,7 @@ static int ms_reconn(ms_conn_t *c) close(c->sfd); c->tcpsfd[c->cur_idx]= 0; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) % srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); @@ -913,7 +928,8 @@ static int ms_reconn(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - int i= 0; + uint32_t i= 0; + for (i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] != 0) @@ -938,8 +954,8 @@ static int ms_reconn(ms_conn_t *c) ms_setting.udp, &c->sfd) == 0) { c->tcpsfd[c->cur_idx]= c->sfd; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -953,13 +969,13 @@ static int ms_reconn(ms_conn_t *c) break; } - if (c->total_sfds == 1) + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { /* wait a second and reconnect */ sleep(1); } } - while (c->total_sfds == 1); + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); } if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) @@ -968,7 +984,7 @@ static int ms_reconn(ms_conn_t *c) c->alive_sfds--; } - return 0; + return EXIT_SUCCESS; } /* ms_reconn */ @@ -980,23 +996,24 @@ static int ms_reconn(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_reconn_socks(ms_conn_t *c) { - int srv_idx= 0; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + uint32_t srv_idx= 0; int ret_sfd= 0; - int srv_conn_cnt= 0; + uint32_t srv_conn_cnt= 0; struct timeval cur_time; assert(c != NULL); if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) { - return 0; + return EXIT_SUCCESS; } - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] == 0) { @@ -1016,12 +1033,12 @@ int ms_reconn_socks(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= i; - srv_conn_cnt= ms_setting.nconns; + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -1032,8 +1049,8 @@ int ms_reconn_socks(ms_conn_t *c) c->tcpsfd[i]= ret_sfd; c->alive_sfds++; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -1048,7 +1065,7 @@ int ms_reconn_socks(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_reconn_socks */ @@ -1120,7 +1137,7 @@ static int ms_tokenize_command(char *command, * @param c, pointer of the concurrency * @param command, the string responded by server * - * @return int, if the command completed return 0, else return + * @return int, if the command completed return EXIT_SUCCESS, else return * -1 */ static int ms_ascii_process_line(ms_conn_t *c, char *command) @@ -1143,7 +1160,12 @@ static int ms_ascii_process_line(ms_conn_t *c, char *command) { token_t tokens[MAX_TOKENS]; ms_tokenize_command(command, tokens, MAX_TOKENS); + errno= 0; value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); + if (errno != 0) + { + printf("<%d ERROR %s\n", c->sfd, strerror(errno)); + } c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value; /* @@ -1162,6 +1184,7 @@ static int ms_ascii_process_line(ms_conn_t *c, char *command) case 'O': /* OK */ c->currcmd.retstat= MCD_SUCCESS; + break; case 'S': /* STORED STATS SERVER_ERROR */ if (buffer[2] == 'A') /* STORED STATS */ @@ -1264,7 +1287,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) { if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) { - memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets); + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); } c->packets= 0; @@ -1277,6 +1300,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) c->ctnwrite= false; c->rbytes= 0; c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; ms_conn_set_state(c, conn_write); memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ @@ -1292,7 +1318,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_try_read_line(ms_conn_t *c) { @@ -1302,7 +1328,7 @@ static int ms_try_read_line(ms_conn_t *c) if ((uint64_t)c->rbytes < sizeof(c->binary_header)) { /* need more data! */ - return 0; + return EXIT_SUCCESS; } else { @@ -1323,16 +1349,16 @@ static int ms_try_read_line(ms_conn_t *c) c->binary_header= *rsp; c->binary_header.response.extlen= rsp->response.extlen; - c->binary_header.response.keylen= ntohl(rsp->response.keylen); + c->binary_header.response.keylen= ntohs(rsp->response.keylen); c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); - c->binary_header.response.status= ntohl(rsp->response.status); + c->binary_header.response.status= ntohs(rsp->response.status); if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) { fprintf(stderr, "Invalid magic: %x\n", c->binary_header.response.magic); ms_conn_set_state(c, conn_closing); - return 0; + return EXIT_SUCCESS; } /* process this complete response */ @@ -1357,11 +1383,11 @@ static int ms_try_read_line(ms_conn_t *c) assert(c->rcurr <= (c->rbuf + c->rsize)); if (c->rbytes == 0) - return 0; + return EXIT_SUCCESS; el= memchr(c->rcurr, '\n', (size_t)c->rbytes); if (! el) - return 0; + return EXIT_SUCCESS; cont= el + 1; if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) @@ -1524,6 +1550,7 @@ static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) break; } } + (void)packets; return wbytes == 0 ? -1 : wbytes; } /* ms_sort_udp_packet */ @@ -1569,7 +1596,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); c->rudpbytes+= res; rbytes+= res; if (res == avail) @@ -1603,7 +1630,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (copybytes == -1) { - __sync_fetch_and_add(&ms_stats.pkt_disorder, 1); + atomic_add_size(&ms_stats.pkt_disorder, 1); } return copybytes; @@ -1615,7 +1642,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) * close. * before reading, move the remaining incomplete fragment of a command * (if any) to the beginning of the buffer. - * return 0 if there's nothing to read on the first read. + * return EXIT_SUCCESS if there's nothing to read on the first read. */ /** @@ -1626,8 +1653,8 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) * @param c, pointer of the concurrency * * @return int, - * return 0 if there's nothing to read on the first read. - * return 1 if get data + * return EXIT_SUCCESS if there's nothing to read on the first read. + * return EXIT_FAILURE if get data * return -1 if error happens */ static int ms_try_read_network(ms_conn_t *c) @@ -1681,7 +1708,7 @@ static int ms_try_read_network(ms_conn_t *c) { if (! c->udp) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); } gotdata= 1; c->rbytes+= res; @@ -1745,7 +1772,7 @@ static void ms_verify_value(ms_conn_t *c, if (curr_time.tv_sec - c->curr_task.item->client_time > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) { - __sync_fetch_and_add(&ms_stats.exp_get, 1); + atomic_add_size(&ms_stats.exp_get, 1); if (ms_setting.verbose) { @@ -1759,7 +1786,7 @@ static void ms_verify_value(ms_conn_t *c, "\n<%d expire time verification failed, " "object expired but get it now\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64 " %.*s\n" "\tset time: %s current time: %s " "diff time: %d expire time: %d\n" "\texpected data: \n" @@ -1786,14 +1813,14 @@ static void ms_verify_value(ms_conn_t *c, if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t)vlen) != 0)) { - __sync_fetch_and_add(&ms_stats.vef_failed, 1); + atomic_add_size(&ms_stats.vef_failed, 1); if (ms_setting.verbose) { fprintf(stderr, "\n<%d data verification failed\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64" %.*s\n" "\texpected data len: %d\n" "\texpected data: %.*s\n" "\treceived data len: %d\n" @@ -1834,7 +1861,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); + assert(c->protocol == ascii_prot); if (c->rvbytes > 2) { assert( @@ -1974,8 +2001,7 @@ static void ms_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); if (c->protocol == binary_prot) @@ -1994,7 +2020,7 @@ static void ms_complete_nread(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_msghdr(ms_conn_t *c) { @@ -2005,7 +2031,7 @@ static int ms_add_msghdr(ms_conn_t *c) if (c->msgsize == c->msgused) { msg= - realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr)); + realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); if (! msg) return -1; @@ -2038,7 +2064,7 @@ static int ms_add_msghdr(ms_conn_t *c) return ms_add_iov(c, NULL, UDP_HEADER_SIZE); } - return 0; + return EXIT_SUCCESS; } /* ms_add_msghdr */ @@ -2048,7 +2074,7 @@ static int ms_add_msghdr(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_ensure_iov_space(ms_conn_t *c) { @@ -2058,7 +2084,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) { int i, iovnum; struct iovec *new_iov= (struct iovec *)realloc(c->iov, - ((uint64_t)c->iovsize + ((size_t)c->iovsize * 2) * sizeof(struct iovec)); if (! new_iov) @@ -2075,7 +2101,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_ensure_iov_space */ @@ -2087,7 +2113,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) * @param buf, the buffer includes data to send * @param len, the data length in the buffer * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) { @@ -2106,6 +2132,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) */ limit_to_mtu= c->udp; +#ifdef IOV_MAX /* We may need to start a new msghdr if this one is full. */ if ((m->msg_iovlen == IOV_MAX) || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) @@ -2113,6 +2140,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) ms_add_msghdr(c); m= &c->msglist[c->msgused - 1]; } +#endif if (ms_ensure_iov_space(c) != 0) return -1; @@ -2141,7 +2169,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) } while (leftover > 0); - return 0; + return EXIT_SUCCESS; } /* ms_add_iov */ @@ -2150,7 +2178,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_udp_headers(ms_conn_t *c) { @@ -2186,7 +2214,7 @@ static int ms_build_udp_headers(ms_conn_t *c) hdr= c->hdrbuf; for (i= 0; i < c->msgused; i++) { - c->msglist[i].msg_iov[0].iov_base= hdr; + c->msglist[i].msg_iov[0].iov_base= (void *)hdr; c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; *hdr++= (unsigned char)(c->request_id / 256); *hdr++= (unsigned char)(c->request_id % 256); @@ -2201,7 +2229,7 @@ static int ms_build_udp_headers(ms_conn_t *c) + UDP_HEADER_SIZE)); } - return 0; + return EXIT_SUCCESS; } /* ms_build_udp_headers */ @@ -2234,7 +2262,7 @@ static int ms_transmit(ms_conn_t *c) res= sendmsg(c->sfd, m, 0); if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_written, res); + atomic_add_size(&ms_stats.bytes_written, res); /* We've written some of the data. Remove the completed * iovec entries from the list of pending writes. */ @@ -2249,8 +2277,8 @@ static int ms_transmit(ms_conn_t *c) * adjust it so the next write will do the rest. */ if (res > 0) { - m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res; - m->msg_iov->iov_len-= (uint64_t)res; + m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); + m->msg_iov->iov_len-= (size_t)res; } return TRANSMIT_INCOMPLETE; } @@ -2317,7 +2345,7 @@ static void ms_conn_shrink(ms_conn_t *c) && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE)) { char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2); - if (! new_rbuf) + if (new_rbuf) { c->rudpbuf= new_rbuf; c->rudpsize= UDP_DATA_BUFFER_SIZE; @@ -2389,12 +2417,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; - assert(c != NULL); struct event_base *base= c->event.ev_base; @@ -2421,19 +2443,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) event_base_set(base, &c->event); c->ev_flags= (short)new_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return false; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return false; - } + return false; } return true; @@ -2452,6 +2464,7 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) */ static bool ms_need_yield(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int64_t tps= 0; int64_t time_diff= 0; struct timeval curr_time; @@ -2460,13 +2473,11 @@ static bool ms_need_yield(ms_conn_t *c) if (ms_setting.expected_tps > 0) { gettimeofday(&curr_time, NULL); - time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time); - tps= - (int64_t)((task->get_opt - + task->set_opt) / ((uint64_t)time_diff / 1000000)); + time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); + tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000); /* current throughput is greater than expected throughput */ - if (tps > ms_thread.thread_ctx->tps_perconn) + if (tps > ms_thread->thread_ctx->tps_perconn) { return true; } @@ -2709,12 +2720,6 @@ void ms_event_handler(const int fd, const short which, void *arg) } assert(fd == c->sfd); - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) - { - ms_conn_set_state(c, conn_closing); - } - ms_drive_machine(c); /* wait for next event */ @@ -2727,16 +2732,16 @@ void ms_event_handler(const int fd, const short which, void *arg) * @param c, pointer of the concurrency * @param cmd, command(get or set ) * - * @return int, if success, return the index, else return 0 + * @return int, if success, return the index, else return EXIT_SUCCESS */ -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { - int sock_index= -1; - int i= 0; + uint32_t sock_index= 0; + uint32_t i= 0; if (c->total_sfds == 1) { - return 0; + return EXIT_SUCCESS; } if (ms_setting.rep_write_srv == 0) @@ -2759,18 +2764,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) if (i == ms_setting.rep_write_srv) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } else { /* random get one replication writing server to write */ - sock_index= (int)(random() % ms_setting.rep_write_srv); + sock_index= (uint32_t)random() % ms_setting.rep_write_srv; } } else if (cmd == CMD_GET) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } } while (c->tcpsfd[sock_index] == 0); @@ -2786,9 +2791,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) * * @return int, return the index */ -static int ms_get_next_sock_index(ms_conn_t *c) +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { - int sock_index= 0; + uint32_t sock_index= 0; do { @@ -2805,7 +2810,7 @@ static int ms_get_next_sock_index(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_update_conn_sock_event(ms_conn_t *c) { @@ -2869,7 +2874,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_update_conn_sock_event */ @@ -2881,7 +2886,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { @@ -2889,13 +2894,14 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) int write_len; char *buffer= c->wbuf; - write_len= sprintf(buffer, - " %u %d %d\r\n", - 0, - item->exp_time, - item->value_size); + write_len= snprintf(buffer, + c->wsize, + " %u %d %d\r\n", + 0, + item->exp_time, + item->value_size); - if (write_len > c->wsize) + if (write_len > c->wsize || write_len < 0) { /* ought to be always enough. just fail for simplicity */ fprintf(stderr, "output command line too long.\n"); @@ -2925,7 +2931,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_set */ @@ -2936,7 +2942,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) { @@ -2976,11 +2982,11 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) } } - __sync_fetch_and_add(&ms_stats.obj_bytes, - item->key_size + item->value_size); - __sync_fetch_and_add(&ms_stats.cmd_set, 1); + atomic_add_size(&ms_stats.obj_bytes, + item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); - return 0; + return EXIT_SUCCESS; } /* ms_mcd_set */ @@ -2992,7 +2998,7 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { @@ -3007,7 +3013,7 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_get */ @@ -3017,15 +3023,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); - assert(c != NULL); c->currcmd.cmd= CMD_GET; @@ -3062,9 +3064,9 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) } } - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); - return 0; + return EXIT_SUCCESS; } /* ms_mcd_get */ @@ -3074,7 +3076,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) { @@ -3105,7 +3107,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) return -1; } - return 0; + return EXIT_SUCCESS; } /* ms_build_ascii_write_buf_mlget */ @@ -3114,7 +3116,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ int ms_mcd_mlget(ms_conn_t *c) { @@ -3161,10 +3163,12 @@ int ms_mcd_mlget(ms_conn_t *c) for (int i= 0; i < c->mlget_task.mlget_num; i++) { item= c->mlget_task.mlget_item[i].item; - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); } - return 0; + (void)item; + + return EXIT_SUCCESS; } /* ms_mcd_mlget */ @@ -3177,7 +3181,7 @@ int ms_mcd_mlget(ms_conn_t *c) * * @param c, pointer of the concurrency * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_bin_process_response(ms_conn_t *c) { @@ -3193,7 +3197,7 @@ static int ms_bin_process_response(ms_conn_t *c) { c->rvbytes= (int32_t)bodylen; c->readval= true; - return 1; + return EXIT_FAILURE; } else { @@ -3260,7 +3264,7 @@ static int ms_bin_process_response(ms_conn_t *c) } } - return 0; + return EXIT_SUCCESS; } /* ms_bin_process_response */ @@ -3289,11 +3293,11 @@ static void ms_add_bin_header(ms_conn_t *c, header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; header->request.opcode= (uint8_t)opcode; - header->request.keylen= htonl(key_len); + header->request.keylen= htons(key_len); header->request.extlen= (uint8_t)hdr_len; header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->request.reserved= 0; + header->request.vbucket= 0; header->request.bodylen= htonl(body_len); header->request.opaque= 0; @@ -3326,7 +3330,7 @@ static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) { @@ -3358,7 +3362,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) } ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_set */ @@ -3370,7 +3374,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) { @@ -3380,7 +3384,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) (uint32_t)item->key_size); ms_add_key_to_iov(c, item); - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_get */ @@ -3392,7 +3396,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param item, pointer of task item which includes the object * information * - * @return int, if success, return 0, else return -1 + * @return int, if success, return EXIT_SUCCESS, else return -1 */ static int ms_build_bin_write_buf_mlget(ms_conn_t *c) { @@ -3416,5 +3420,5 @@ static int ms_build_bin_write_buf_mlget(ms_conn_t *c) c->wcurr= c->wbuf; - return 0; + return EXIT_SUCCESS; } /* ms_build_bin_write_buf_mlget */