X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;ds=sidebyside;f=clients%2Fms_conn.c;h=005879bdaaed7a994df109a7493fe82565a3fe0c;hb=c79b88bc75d37c933e9d05c83113ccb26ded3683;hp=34707958aa0d21896c0fd6fe5b3e4f64187b857b;hpb=e14953a7d23cb8b951b8bc7eaaaf2cdd3b67a079;p=m6w6%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index 34707958..005879bd 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -19,6 +19,16 @@ #include #include #include +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif #include "ms_setting.h" #include "ms_thread.h" #include "ms_atomic.h" @@ -120,8 +130,8 @@ static int ms_transmit(ms_conn_t *c); static void ms_conn_shrink(ms_conn_t *c); static void ms_conn_set_state(ms_conn_t *c, int state); static bool ms_update_event(ms_conn_t *c, const int new_flags); -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static int ms_get_next_sock_index(ms_conn_t *c); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); static int ms_update_conn_sock_event(ms_conn_t *c); static bool ms_need_yield(ms_conn_t *c); static void ms_update_start_time(ms_conn_t *c); @@ -282,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c, /* for replication, each connection need connect all the server */ if (ms_setting.rep_write_srv > 0) { - c->total_sfds= ms_setting.srv_cnt; + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; } else { @@ -351,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c, { c->protocol= binary_prot; } - else if (is_udp) - { - c->protocol= ascii_udp_prot; - } else { c->protocol= ascii_prot; @@ -467,9 +473,9 @@ static int ms_item_win_init(ms_conn_t *c) static int ms_conn_sock_init(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int i; + uint32_t i; int ret_sfd; - int srv_idx= 0; + uint32_t srv_idx= 0; assert(c != NULL); assert(c->tcpsfd != NULL); @@ -480,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { /* for replication, each connection need connect all the server */ - srv_idx= i; + srv_idx= i % ms_setting.srv_cnt; } else { @@ -532,7 +538,7 @@ static int ms_conn_sock_init(ms_conn_t *c) } else { - for (int j= 0; j < i; j++) + for (uint32_t j= 0; j < i; j++) { close(c->tcpsfd[j]); } @@ -561,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c) static int ms_conn_event_init(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return -1; - } + return -1; } return 0; @@ -677,7 +668,7 @@ static void ms_conn_close(ms_conn_t *c) /* delete the event, the socket and the connection */ event_del(&c->event); - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] > 0) { @@ -806,7 +797,11 @@ static int ms_network_connect(ms_conn_t *c, * that otherwise mess things up. */ memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; +#else ++ hints.ai_flags= AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ if (is_udp) { hints.ai_protocol= IPPROTO_UDP; @@ -903,13 +898,13 @@ static int ms_network_connect(ms_conn_t *c, static int ms_reconn(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int srv_idx= 0; - int32_t srv_conn_cnt= 0; + uint32_t srv_idx= 0; + uint32_t srv_conn_cnt= 0; if (ms_setting.rep_write_srv > 0) { - srv_idx= c->cur_idx; - srv_conn_cnt= ms_setting.nconns; + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { @@ -922,7 +917,7 @@ static int ms_reconn(ms_conn_t *c) c->tcpsfd[c->cur_idx]= 0; if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) - % (uint32_t)srv_conn_cnt == 0) + % srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); fprintf(stderr, "Server %s:%d disconnect\n", @@ -932,7 +927,8 @@ static int ms_reconn(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - int i= 0; + uint32_t i= 0; + for (i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] != 0) @@ -972,13 +968,13 @@ static int ms_reconn(ms_conn_t *c) break; } - if (c->total_sfds == 1) + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { /* wait a second and reconnect */ sleep(1); } } - while (c->total_sfds == 1); + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); } if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) @@ -1004,9 +1000,9 @@ static int ms_reconn(ms_conn_t *c) int ms_reconn_socks(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int srv_idx= 0; + uint32_t srv_idx= 0; int ret_sfd= 0; - int srv_conn_cnt= 0; + uint32_t srv_conn_cnt= 0; struct timeval cur_time; assert(c != NULL); @@ -1016,7 +1012,7 @@ int ms_reconn_socks(ms_conn_t *c) return 0; } - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] == 0) { @@ -1036,8 +1032,8 @@ int ms_reconn_socks(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= i; - srv_conn_cnt= ms_setting.nconns; + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { @@ -1297,6 +1293,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) c->ctnwrite= false; c->rbytes= 0; c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; ms_conn_set_state(c, conn_write); memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ @@ -1589,7 +1588,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (res > 0) { - atomic_add_64(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); c->rudpbytes+= res; rbytes+= res; if (res == avail) @@ -1623,7 +1622,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len) if (copybytes == -1) { - atomic_add_64(&ms_stats.pkt_disorder, 1); + atomic_add_size(&ms_stats.pkt_disorder, 1); } return copybytes; @@ -1701,7 +1700,7 @@ static int ms_try_read_network(ms_conn_t *c) { if (! c->udp) { - atomic_add_64(&ms_stats.bytes_read, res); + atomic_add_size(&ms_stats.bytes_read, res); } gotdata= 1; c->rbytes+= res; @@ -1765,7 +1764,7 @@ static void ms_verify_value(ms_conn_t *c, if (curr_time.tv_sec - c->curr_task.item->client_time > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) { - atomic_add_64(&ms_stats.exp_get, 1); + atomic_add_size(&ms_stats.exp_get, 1); if (ms_setting.verbose) { @@ -1806,7 +1805,7 @@ static void ms_verify_value(ms_conn_t *c, if ((c->curr_task.item->value_size != vlen) || (memcmp(orignval, value, (size_t)vlen) != 0)) { - atomic_add_64(&ms_stats.vef_failed, 1); + atomic_add_size(&ms_stats.vef_failed, 1); if (ms_setting.verbose) { @@ -1854,7 +1853,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); + assert(c->protocol == ascii_prot); if (c->rvbytes > 2) { assert( @@ -1994,8 +1993,7 @@ static void ms_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); if (c->protocol == binary_prot) @@ -2254,7 +2252,7 @@ static int ms_transmit(ms_conn_t *c) res= sendmsg(c->sfd, m, 0); if (res > 0) { - atomic_add_64(&ms_stats.bytes_written, res); + atomic_add_size(&ms_stats.bytes_written, res); /* We've written some of the data. Remove the completed * iovec entries from the list of pending writes. */ @@ -2409,12 +2407,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; - assert(c != NULL); struct event_base *base= c->event.ev_base; @@ -2441,19 +2433,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) event_base_set(base, &c->event); c->ev_flags= (short)new_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return false; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return false; - } + return false; } return true; @@ -2730,12 +2712,6 @@ void ms_event_handler(const int fd, const short which, void *arg) } assert(fd == c->sfd); - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) - { - ms_conn_set_state(c, conn_closing); - } - ms_drive_machine(c); /* wait for next event */ @@ -2750,10 +2726,10 @@ void ms_event_handler(const int fd, const short which, void *arg) * * @return int, if success, return the index, else return 0 */ -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { - int sock_index= -1; - int i= 0; + uint32_t sock_index= 0; + uint32_t i= 0; if (c->total_sfds == 1) { @@ -2780,18 +2756,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) if (i == ms_setting.rep_write_srv) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } else { /* random get one replication writing server to write */ - sock_index= (int)(random() % ms_setting.rep_write_srv); + sock_index= (uint32_t)random() % ms_setting.rep_write_srv; } } else if (cmd == CMD_GET) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } } while (c->tcpsfd[sock_index] == 0); @@ -2807,9 +2783,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) * * @return int, return the index */ -static int ms_get_next_sock_index(ms_conn_t *c) +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { - int sock_index= 0; + uint32_t sock_index= 0; do { @@ -2997,9 +2973,9 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) } } - atomic_add_64(&ms_stats.obj_bytes, - item->key_size + item->value_size); - atomic_add_64(&ms_stats.cmd_set, 1); + atomic_add_size(&ms_stats.obj_bytes, + item->key_size + item->value_size); + atomic_add_size(&ms_stats.cmd_set, 1); return 0; } /* ms_mcd_set */ @@ -3038,15 +3014,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * * @return int, if success, return 0, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); - assert(c != NULL); c->currcmd.cmd= CMD_GET; @@ -3083,7 +3055,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) } } - atomic_add_64(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); return 0; } /* ms_mcd_get */ @@ -3182,7 +3154,7 @@ int ms_mcd_mlget(ms_conn_t *c) for (int i= 0; i < c->mlget_task.mlget_num; i++) { item= c->mlget_task.mlget_item[i].item; - atomic_add_64(&ms_stats.cmd_get, 1); + atomic_add_size(&ms_stats.cmd_get, 1); } return 0;