X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_conn.c;h=005879bdaaed7a994df109a7493fe82565a3fe0c;hb=c79b88bc75d37c933e9d05c83113ccb26ded3683;hp=c062dc2958df424eb1e165e0e6cf6ce33396033f;hpb=d39962cd481eb10a7d9dceb6d442af0fd49521fb;p=m6w6%2Flibmemcached diff --git a/clients/ms_conn.c b/clients/ms_conn.c index c062dc29..005879bd 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -19,6 +19,16 @@ #include #include #include +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif #include "ms_setting.h" #include "ms_thread.h" #include "ms_atomic.h" @@ -120,8 +130,8 @@ static int ms_transmit(ms_conn_t *c); static void ms_conn_shrink(ms_conn_t *c); static void ms_conn_set_state(ms_conn_t *c, int state); static bool ms_update_event(ms_conn_t *c, const int new_flags); -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static int ms_get_next_sock_index(ms_conn_t *c); +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); +static uint32_t ms_get_next_sock_index(ms_conn_t *c); static int ms_update_conn_sock_event(ms_conn_t *c); static bool ms_need_yield(ms_conn_t *c); static void ms_update_start_time(ms_conn_t *c); @@ -282,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c, /* for replication, each connection need connect all the server */ if (ms_setting.rep_write_srv > 0) { - c->total_sfds= ms_setting.srv_cnt; + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; } else { @@ -351,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c, { c->protocol= binary_prot; } - else if (is_udp) - { - c->protocol= ascii_udp_prot; - } else { c->protocol= ascii_prot; @@ -467,9 +473,9 @@ static int ms_item_win_init(ms_conn_t *c) static int ms_conn_sock_init(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int i; + uint32_t i; int ret_sfd; - int srv_idx= 0; + uint32_t srv_idx= 0; assert(c != NULL); assert(c->tcpsfd != NULL); @@ -480,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { /* for replication, each connection need connect all the server */ - srv_idx= i; + srv_idx= i % ms_setting.srv_cnt; } else { @@ -532,7 +538,7 @@ static int ms_conn_sock_init(ms_conn_t *c) } else { - for (int j= 0; j < i; j++) + for (uint32_t j= 0; j < i; j++) { close(c->tcpsfd[j]); } @@ -561,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c) static int ms_conn_event_init(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return -1; - } + return -1; } return 0; @@ -677,7 +668,7 @@ static void ms_conn_close(ms_conn_t *c) /* delete the event, the socket and the connection */ event_del(&c->event); - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] > 0) { @@ -806,7 +797,11 @@ static int ms_network_connect(ms_conn_t *c, * that otherwise mess things up. */ memset(&hints, 0, sizeof(hints)); +#ifdef AI_ADDRCONFIG hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; +#else ++ hints.ai_flags= AI_PASSIVE; +#endif /* AI_ADDRCONFIG */ if (is_udp) { hints.ai_protocol= IPPROTO_UDP; @@ -903,13 +898,13 @@ static int ms_network_connect(ms_conn_t *c, static int ms_reconn(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int srv_idx= 0; - int32_t srv_conn_cnt= 0; + uint32_t srv_idx= 0; + uint32_t srv_conn_cnt= 0; if (ms_setting.rep_write_srv > 0) { - srv_idx= c->cur_idx; - srv_conn_cnt= ms_setting.nconns; + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { @@ -922,7 +917,7 @@ static int ms_reconn(ms_conn_t *c) c->tcpsfd[c->cur_idx]= 0; if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) - % (uint32_t)srv_conn_cnt == 0) + % srv_conn_cnt == 0) { gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); fprintf(stderr, "Server %s:%d disconnect\n", @@ -932,7 +927,8 @@ static int ms_reconn(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - int i= 0; + uint32_t i= 0; + for (i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] != 0) @@ -972,13 +968,13 @@ static int ms_reconn(ms_conn_t *c) break; } - if (c->total_sfds == 1) + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { /* wait a second and reconnect */ sleep(1); } } - while (c->total_sfds == 1); + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); } if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) @@ -1004,9 +1000,9 @@ static int ms_reconn(ms_conn_t *c) int ms_reconn_socks(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int srv_idx= 0; + uint32_t srv_idx= 0; int ret_sfd= 0; - int srv_conn_cnt= 0; + uint32_t srv_conn_cnt= 0; struct timeval cur_time; assert(c != NULL); @@ -1016,7 +1012,7 @@ int ms_reconn_socks(ms_conn_t *c) return 0; } - for (int i= 0; i < c->total_sfds; i++) + for (uint32_t i= 0; i < c->total_sfds; i++) { if (c->tcpsfd[i] == 0) { @@ -1036,8 +1032,8 @@ int ms_reconn_socks(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= i; - srv_conn_cnt= ms_setting.nconns; + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; } else { @@ -1297,6 +1293,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) c->ctnwrite= false; c->rbytes= 0; c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; ms_conn_set_state(c, conn_write); memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ @@ -1854,7 +1853,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); + assert(c->protocol == ascii_prot); if (c->rvbytes > 2) { assert( @@ -1994,8 +1993,7 @@ static void ms_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); if (c->protocol == binary_prot) @@ -2409,12 +2407,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; - assert(c != NULL); struct event_base *base= c->event.ev_base; @@ -2441,19 +2433,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) event_base_set(base, &c->event); c->ev_flags= (short)new_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return false; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return false; - } + return false; } return true; @@ -2730,12 +2712,6 @@ void ms_event_handler(const int fd, const short which, void *arg) } assert(fd == c->sfd); - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) - { - ms_conn_set_state(c, conn_closing); - } - ms_drive_machine(c); /* wait for next event */ @@ -2750,10 +2726,10 @@ void ms_event_handler(const int fd, const short which, void *arg) * * @return int, if success, return the index, else return 0 */ -static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) +static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) { - int sock_index= -1; - int i= 0; + uint32_t sock_index= 0; + uint32_t i= 0; if (c->total_sfds == 1) { @@ -2780,18 +2756,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) if (i == ms_setting.rep_write_srv) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } else { /* random get one replication writing server to write */ - sock_index= (int)(random() % ms_setting.rep_write_srv); + sock_index= (uint32_t)random() % ms_setting.rep_write_srv; } } else if (cmd == CMD_GET) { /* random get one replication server to read */ - sock_index= (int)(random() % c->total_sfds); + sock_index= (uint32_t)random() % c->total_sfds; } } while (c->tcpsfd[sock_index] == 0); @@ -2807,9 +2783,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd) * * @return int, return the index */ -static int ms_get_next_sock_index(ms_conn_t *c) +static uint32_t ms_get_next_sock_index(ms_conn_t *c) { - int sock_index= 0; + uint32_t sock_index= 0; do { @@ -3038,15 +3014,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * * @return int, if success, return 0, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); - assert(c != NULL); c->currcmd.cmd= CMD_GET;