X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=clients%2Fms_thread.c;h=1efe5cfbb6f62df9dc8975033065d6ae933db002;hb=450a9f280cd5d5b83da7561ff0362854f5c8b204;hp=9af3a10b24bf5897150e702aa102c843a37bdb99;hpb=e926cd518e3605dd3fa050734061a5fabf5460bd;p=m6w6%2Flibmemcached diff --git a/clients/ms_thread.c b/clients/ms_thread.c index 9af3a10b..1efe5cfb 100644 --- a/clients/ms_thread.c +++ b/clients/ms_thread.c @@ -11,11 +11,22 @@ #include "config.h" +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif #include "ms_thread.h" #include "ms_setting.h" +#include "ms_atomic.h" /* global variable */ -__thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */ +pthread_key_t ms_thread_key; /* array of thread context structure, each thread has a thread context structure */ static ms_thread_ctx_t *ms_thread_ctx; @@ -24,7 +35,7 @@ static ms_thread_ctx_t *ms_thread_ctx; static void ms_set_current_time(void); static void ms_check_sock_timeout(void); static void ms_clock_handler(const int fd, const short which, void *arg); -static int ms_set_thread_cpu_affinity(int cpu); +static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu); static int ms_setup_thread(ms_thread_ctx_t *thread_ctx); static void *ms_worker_libevent(void *arg); static void ms_create_worker(void *(*func)(void *), void *arg); @@ -37,9 +48,10 @@ static void ms_create_worker(void *(*func)(void *), void *arg); static void ms_set_current_time() { struct timeval timer; + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); gettimeofday(&timer, NULL); - ms_thread.curr_time= (rel_time_t)timer.tv_sec; + ms_thread->curr_time= (rel_time_t)timer.tv_sec; } /* ms_set_current_time */ @@ -49,16 +61,17 @@ static void ms_set_current_time() */ static void ms_check_sock_timeout(void) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); ms_conn_t *c= NULL; int time_diff= 0; - for (int i= 0; i < ms_thread.thread_ctx->nconns; i++) + for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++) { - c= &ms_thread.conn[i]; + c= &ms_thread->conn[i]; if (c->udp) { - time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec); + time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec); /* wait time out */ if (time_diff > SOCK_WAIT_TIMEOUT) @@ -66,10 +79,10 @@ static void ms_check_sock_timeout(void) /* calculate dropped packets count */ if (c->recvpkt > 0) { - __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt); + atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt); } - __sync_fetch_and_add(&ms_stats.udp_timeout, 1); + atomic_add_size(&ms_stats.udp_timeout, 1); ms_reset_conn(c, true); } } @@ -80,9 +93,10 @@ static void ms_check_sock_timeout(void) /* if disconnect, the ever-1-second timer will call this function to reconnect */ static void ms_reconn_thread_socks(void) { - for (int i= 0; i < ms_thread.thread_ctx->nconns; i++) + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++) { - ms_reconn_socks(&ms_thread.conn[i]); + ms_reconn_socks(&ms_thread->conn[i]); } } /* ms_reconn_thread_socks */ @@ -96,6 +110,7 @@ static void ms_reconn_thread_socks(void) */ static void ms_clock_handler(const int fd, const short which, void *arg) { + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); struct timeval t= { .tv_sec= 1, .tv_usec= 0 @@ -107,22 +122,22 @@ static void ms_clock_handler(const int fd, const short which, void *arg) ms_set_current_time(); - if (ms_thread.initialized) + if (ms_thread->initialized) { /* only delete the event if it's actually there. */ - evtimer_del(&ms_thread.clock_event); + evtimer_del(&ms_thread->clock_event); ms_check_sock_timeout(); } else { - ms_thread.initialized= true; + ms_thread->initialized= true; } ms_reconn_thread_socks(); - evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0); - event_base_set(ms_thread.base, &ms_thread.clock_event); - evtimer_add(&ms_thread.clock_event, &t); + evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0); + event_base_set(ms_thread->base, &ms_thread->clock_event); + evtimer_add(&ms_thread->clock_event, &t); } /* ms_clock_handler */ @@ -131,11 +146,11 @@ static void ms_clock_handler(const int fd, const short which, void *arg) * * @param cpu, cpu index * - * @return if success, return 0, else return -1 + * @return if success, return EXIT_SUCCESS, else return -1 */ -static int ms_set_thread_cpu_affinity(int cpu) +static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu) { - int ret= 0; + uint32_t ret= 0; #ifdef HAVE_CPU_SET_T cpu_set_t cpu_set; @@ -160,21 +175,25 @@ static int ms_set_thread_cpu_affinity(int cpu) * * @param thread_ctx, pointer of the thread context structure * - * @return if success, return 0, else return -1 + * @return if success, return EXIT_SUCCESS, else return -1 */ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) { - ms_thread.thread_ctx= thread_ctx; - ms_thread.nactive_conn= thread_ctx->nconns; - ms_thread.initialized= false; - static int cnt= 0; - gettimeofday(&ms_thread.startup_time, NULL); + ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1); + pthread_setspecific(ms_thread_key, (void *)ms_thread); + + ms_thread->thread_ctx= thread_ctx; + ms_thread->nactive_conn= thread_ctx->nconns; + ms_thread->initialized= false; + static volatile uint32_t cnt= 0; + + gettimeofday(&ms_thread->startup_time, NULL); - ms_thread.base= event_init(); - if (ms_thread.base == NULL) + ms_thread->base= event_init(); + if (ms_thread->base == NULL) { - if (__sync_fetch_and_add(&cnt, 1) == 0) + if (atomic_add_32_nv(&cnt, 1) == 0) { fprintf(stderr, "Can't allocate event base.\n"); } @@ -182,11 +201,11 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) return -1; } - ms_thread.conn= + ms_thread->conn= (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t)); - if (ms_thread.conn == NULL) + if (ms_thread->conn == NULL) { - if (__sync_fetch_and_add(&cnt, 1) == 0) + if (atomic_add_32_nv(&cnt, 1) == 0) { fprintf( stderr, @@ -195,15 +214,15 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) return -1; } - memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t)); + memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t)); - for (int i= 0; i < thread_ctx->nconns; i++) + for (uint32_t i= 0; i < thread_ctx->nconns; i++) { - ms_thread.conn[i].conn_idx= i; - if (ms_setup_conn(&ms_thread.conn[i]) != 0) + ms_thread->conn[i].conn_idx= i; + if (ms_setup_conn(&ms_thread->conn[i]) != 0) { /* only output this error once */ - if (__sync_fetch_and_add(&cnt, 1) == 0) + if (atomic_add_32_nv(&cnt, 1) == 0) { fprintf(stderr, "Initializing connection failed.\n"); } @@ -212,7 +231,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) } } - return 0; + return EXIT_SUCCESS; } /* ms_setup_thread */ @@ -225,6 +244,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) */ static void *ms_worker_libevent(void *arg) { + ms_thread_t *ms_thread= NULL; ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg; /** @@ -244,7 +264,13 @@ static void *ms_worker_libevent(void *arg) /* each thread with a timer */ ms_clock_handler(0, 0, 0); - event_base_loop(ms_thread.base, 0); + pthread_mutex_lock(&ms_global.init_lock.lock); + ms_global.init_lock.count++; + pthread_cond_signal(&ms_global.init_lock.cond); + pthread_mutex_unlock(&ms_global.init_lock.lock); + + ms_thread= pthread_getspecific(ms_thread_key); + event_base_loop(ms_thread->base, 0); return NULL; } /* ms_worker_libevent */ @@ -284,7 +310,7 @@ void ms_thread_init() exit(1); } - for (int i= 0; i < ms_setting.nthreads; i++) + for (uint32_t i= 0; i < ms_setting.nthreads; i++) { ms_thread_ctx[i].thd_idx= i; ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads; @@ -296,13 +322,18 @@ void ms_thread_init() */ ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt; ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps - / ms_setting.nconns; + / (int)ms_setting.nconns; ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num / ms_setting.nconns; } + if (pthread_key_create(&ms_thread_key, NULL)) + { + fprintf(stderr, "Can't create pthread keys. Major malfunction!\n"); + exit(1); + } /* Create threads after we've done all the epoll setup. */ - for (int i= 0; i < ms_setting.nthreads; i++) + for (uint32_t i= 0; i < ms_setting.nthreads; i++) { ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]); } @@ -316,4 +347,5 @@ void ms_thread_cleanup() { free(ms_thread_ctx); } + pthread_key_delete(ms_thread_key); } /* ms_thread_cleanup */