X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_conn.c;h=c062dc2958df424eb1e165e0e6cf6ce33396033f;hb=9dc50c36c14c34741170db9da948b62ae1fe6a24;hp=ad12a113b11f5015c7097d0e4345e9ea7bdc7ca6;hpb=604540c15315c6ed876082529f639697ba3eceab;p=awesomized%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index ad12a113..c062dc29 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -12,6 +12,7 @@ #include "config.h" #include +#include #include #include #include @@ -20,6 +21,19 @@ #include #include "ms_setting.h" #include "ms_thread.h" +#include "ms_atomic.h" + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl +#endif /* for network write */ #define TRANSMIT_COMPLETE 0 @@ -39,12 +53,12 @@ static uint64_t key_prefix_seq= KEY_PREFIX_BASE; /* global increasing counter, generating request id for UDP */ -static int udp_request_id= 0; +static volatile uint32_t udp_request_id= 0; -extern __thread ms_thread_t ms_thread; +extern pthread_key_t ms_thread_key; /* generate upd request id */ -static int ms_get_udp_request_id(void); +static uint32_t ms_get_udp_request_id(void); /* connect initialize */ @@ -165,9 +179,9 @@ uint64_t ms_get_key_prefix(void) * * @return an unique UDP request id */ -static int ms_get_udp_request_id(void) +static uint32_t ms_get_udp_request_id(void) { - return __sync_fetch_and_add(&udp_request_id, 1); + return atomic_add_32_nv(&udp_request_id, 1); } @@ -357,7 +371,7 @@ static int ms_conn_init(ms_conn_t *c, if (! (ms_setting.facebook_test && is_udp)) { - __sync_fetch_and_add(&ms_stats.active_conns, 1); + atomic_add_32(&ms_stats.active_conns, 1); } return 0; @@ -397,11 +411,12 @@ static void ms_warmup_num_init(ms_conn_t *c) */ static int ms_item_win_init(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int exp_cnt= 0; c->win_size= (int)ms_setting.win_size; c->set_cursor= 0; - c->exec_num= ms_thread.thread_ctx->exec_num_perconn; + c->exec_num= ms_thread->thread_ctx->exec_num_perconn; c->remain_exec_num= c->exec_num; c->item_win= (ms_task_item_t *)malloc( @@ -451,6 +466,7 @@ static int ms_item_win_init(ms_conn_t *c) */ static int ms_conn_sock_init(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int i; int ret_sfd; int srv_idx= 0; @@ -469,7 +485,7 @@ static int ms_conn_sock_init(ms_conn_t *c) else { /* all the connections in a thread connects the same server */ - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; } if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, @@ -544,6 +560,7 @@ static int ms_conn_sock_init(ms_conn_t *c) */ static int ms_conn_event_init(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); /* default event timeout 10 seconds */ struct timeval t= { @@ -552,7 +569,7 @@ static int ms_conn_event_init(ms_conn_t *c) short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); - event_base_set(ms_thread.base, &c->event); + event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; if (c->total_sfds == 1) @@ -615,6 +632,7 @@ int ms_setup_conn(ms_conn_t *c) */ void ms_conn_free(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); if (c != NULL) { if (c->hdrbuf != NULL) @@ -638,9 +656,9 @@ void ms_conn_free(ms_conn_t *c) if (c->tcpsfd != NULL) free(c->tcpsfd); - if (--ms_thread.nactive_conn == 0) + if (--ms_thread->nactive_conn == 0) { - free(ms_thread.conn); + free(ms_thread->conn); } } } /* ms_conn_free */ @@ -653,6 +671,7 @@ void ms_conn_free(ms_conn_t *c) */ static void ms_conn_close(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); assert(c != NULL); /* delete the event, the socket and the connection */ @@ -672,7 +691,7 @@ static void ms_conn_close(ms_conn_t *c) close(c->udpsfd); } - __sync_fetch_and_sub(&ms_stats.active_conns, 1); + atomic_dec_32(&ms_stats.active_conns); ms_conn_free(c); @@ -684,7 +703,7 @@ static void ms_conn_close(ms_conn_t *c) pthread_mutex_unlock(&ms_global.run_lock.lock); } - if (ms_thread.nactive_conn == 0) + if (ms_thread->nactive_conn == 0) { pthread_exit(NULL); } @@ -883,8 +902,9 @@ static int ms_network_connect(ms_conn_t *c, */ static int ms_reconn(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int srv_idx= 0; - int srv_conn_cnt= 0; + int32_t srv_conn_cnt= 0; if (ms_setting.rep_write_srv > 0) { @@ -893,7 +913,7 @@ static int ms_reconn(ms_conn_t *c) } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -901,8 +921,8 @@ static int ms_reconn(ms_conn_t *c) close(c->sfd); c->tcpsfd[c->cur_idx]= 0; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); fprintf(stderr, "Server %s:%d disconnect\n", @@ -937,8 +957,8 @@ static int ms_reconn(ms_conn_t *c) ms_setting.udp, &c->sfd) == 0) { c->tcpsfd[c->cur_idx]= c->sfd; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -983,6 +1003,7 @@ static int ms_reconn(ms_conn_t *c) */ int ms_reconn_socks(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int srv_idx= 0; int ret_sfd= 0; int srv_conn_cnt= 0; @@ -1020,7 +1041,7 @@ int ms_reconn_socks(ms_conn_t *c) } else { - srv_idx= ms_thread.thread_ctx->srv_idx; + srv_idx= ms_thread->thread_ctx->srv_idx; srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; } @@ -1031,8 +1052,8 @@ int ms_reconn_socks(ms_conn_t *c) c->tcpsfd[i]= ret_sfd; c->alive_sfds++; - if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % srv_conn_cnt == 0) + if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) + % (uint32_t)srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); int reconn_time= @@ -1263,7 +1284,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) { if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) { - memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets); + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); } c->packets= 0; @@ -1322,9 +1343,9 @@ static int ms_try_read_line(ms_conn_t *c) c->binary_header= *rsp; c->binary_header.response.extlen= rsp->response.extlen; - c->binary_header.response.keylen= ntohl(rsp->response.keylen); + c->binary_header.response.keylen= ntohs(rsp->response.keylen); c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); - c->binary_header.response.status= ntohl(rsp->response.status); + c->binary_header.response.status= ntohs(rsp->response.status); if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) { @@ -1568,7 +1589,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); c->rudpbytes+= res; rbytes+= res; if (res == avail) @@ -1602,7 +1623,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (copybytes == -1) { - __sync_fetch_and_add(&ms_stats.pkt_disorder, 1); + atomic_add_size(&ms_stats.pkt_disorder, 1); } return copybytes; @@ -1680,7 +1701,7 @@ static int ms_try_read_network(ms_conn_t *c) { if (! c->udp) { - __sync_fetch_and_add(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); } gotdata= 1; c->rbytes+= res; @@ -1744,7 +1765,7 @@ static void ms_verify_value(ms_conn_t *c, if (curr_time.tv_sec - c->curr_task.item->client_time > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) { - __sync_fetch_and_add(&ms_stats.exp_get, 1); + atomic_add_size(&ms_stats.exp_get, 1); if (ms_setting.verbose) { @@ -1758,7 +1779,7 @@ static void ms_verify_value(ms_conn_t *c, "\n<%d expire time verification failed, " "object expired but get it now\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64 " %.*s\n" "\tset time: %s current time: %s " "diff time: %d expire time: %d\n" "\texpected data: \n" @@ -1785,14 +1806,14 @@ static void ms_verify_value(ms_conn_t *c, if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t)vlen) != 0)) { - __sync_fetch_and_add(&ms_stats.vef_failed, 1); + atomic_add_size(&ms_stats.vef_failed, 1); if (ms_setting.verbose) { fprintf(stderr, "\n<%d data verification failed\n" "\tkey len: %d\n" - "\tkey: %lx %.*s\n" + "\tkey: %" PRIx64" %.*s\n" "\texpected data len: %d\n" "\texpected data: %.*s\n" "\treceived data len: %d\n" @@ -2004,7 +2025,7 @@ static int ms_add_msghdr(ms_conn_t *c) if (c->msgsize == c->msgused) { msg= - realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr)); + realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); if (! msg) return -1; @@ -2057,7 +2078,7 @@ static int ms_ensure_iov_space(ms_conn_t *c) { int i, iovnum; struct iovec *new_iov= (struct iovec *)realloc(c->iov, - ((uint64_t)c->iovsize + ((size_t)c->iovsize * 2) * sizeof(struct iovec)); if (! new_iov) @@ -2185,7 +2206,7 @@ static int ms_build_udp_headers(ms_conn_t *c) hdr= c->hdrbuf; for (i= 0; i < c->msgused; i++) { - c->msglist[i].msg_iov[0].iov_base= hdr; + c->msglist[i].msg_iov[0].iov_base= (void *)hdr; c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; *hdr++= (unsigned char)(c->request_id / 256); *hdr++= (unsigned char)(c->request_id % 256); @@ -2233,7 +2254,7 @@ static int ms_transmit(ms_conn_t *c) res= sendmsg(c->sfd, m, 0); if (res > 0) { - __sync_fetch_and_add(&ms_stats.bytes_written, res); + atomic_add_size(&ms_stats.bytes_written, res); /* We've written some of the data. Remove the completed * iovec entries from the list of pending writes. */ @@ -2248,8 +2269,8 @@ static int ms_transmit(ms_conn_t *c) * adjust it so the next write will do the rest. */ if (res > 0) { - m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res; - m->msg_iov->iov_len-= (uint64_t)res; + m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); + m->msg_iov->iov_len-= (size_t)res; } return TRANSMIT_INCOMPLETE; } @@ -2451,6 +2472,7 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) */ static bool ms_need_yield(ms_conn_t *c) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); int64_t tps= 0; int64_t time_diff= 0; struct timeval curr_time; @@ -2459,13 +2481,13 @@ static bool ms_need_yield(ms_conn_t *c) if (ms_setting.expected_tps > 0) { gettimeofday(&curr_time, NULL); - time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time); + time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); tps= (int64_t)((task->get_opt + task->set_opt) / ((uint64_t)time_diff / 1000000)); /* current throughput is greater than expected throughput */ - if (tps > ms_thread.thread_ctx->tps_perconn) + if (tps > ms_thread->thread_ctx->tps_perconn) { return true; } @@ -2975,9 +2997,9 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) } } - __sync_fetch_and_add(&ms_stats.obj_bytes, - item->key_size + item->value_size); - __sync_fetch_and_add(&ms_stats.cmd_set, 1); + atomic_add_size(&ms_stats.obj_bytes, + item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); return 0; } /* ms_mcd_set */ @@ -3061,7 +3083,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) } } - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); return 0; } /* ms_mcd_get */ @@ -3160,7 +3182,7 @@ int ms_mcd_mlget(ms_conn_t *c) for (int i= 0; i < c->mlget_task.mlget_num; i++) { item= c->mlget_task.mlget_item[i].item; - __sync_fetch_and_add(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); } return 0; @@ -3288,7 +3310,7 @@ static void ms_add_bin_header(ms_conn_t *c, header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; header->request.opcode= (uint8_t)opcode; - header->request.keylen= htonl(key_len); + header->request.keylen= htons(key_len); header->request.extlen= (uint8_t)hdr_len; header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;