#include "ms_atomic.h"
/* global variable */
-__thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */
+pthread_key_t ms_thread_key;
/* array of thread context structure, each thread has a thread context structure */
static ms_thread_ctx_t *ms_thread_ctx;
static void ms_set_current_time()
{
struct timeval timer;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
gettimeofday(&timer, NULL);
- ms_thread.curr_time= (rel_time_t)timer.tv_sec;
+ ms_thread->curr_time= (rel_time_t)timer.tv_sec;
} /* ms_set_current_time */
*/
static void ms_check_sock_timeout(void)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
ms_conn_t *c= NULL;
int time_diff= 0;
- for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
+ for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
- c= &ms_thread.conn[i];
+ c= &ms_thread->conn[i];
if (c->udp)
{
- time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec);
+ time_diff= (int)(ms_thread->curr_time - c->start_time.tv_sec);
/* wait time out */
if (time_diff > SOCK_WAIT_TIMEOUT)
/* calculate dropped packets count */
if (c->recvpkt > 0)
{
- __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
+ atomic_add_64(&ms_stats.pkt_drop, c->packets - c->recvpkt);
}
- __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
+ atomic_add_64(&ms_stats.udp_timeout, 1);
ms_reset_conn(c, true);
}
}
/* if disconnect, the ever-1-second timer will call this function to reconnect */
static void ms_reconn_thread_socks(void)
{
- for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
- ms_reconn_socks(&ms_thread.conn[i]);
+ ms_reconn_socks(&ms_thread->conn[i]);
}
} /* ms_reconn_thread_socks */
*/
static void ms_clock_handler(const int fd, const short which, void *arg)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
struct timeval t=
{
.tv_sec= 1, .tv_usec= 0
ms_set_current_time();
- if (ms_thread.initialized)
+ if (ms_thread->initialized)
{
/* only delete the event if it's actually there. */
- evtimer_del(&ms_thread.clock_event);
+ evtimer_del(&ms_thread->clock_event);
ms_check_sock_timeout();
}
else
{
- ms_thread.initialized= true;
+ ms_thread->initialized= true;
}
ms_reconn_thread_socks();
- evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
- event_base_set(ms_thread.base, &ms_thread.clock_event);
- evtimer_add(&ms_thread.clock_event, &t);
+ evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
+ event_base_set(ms_thread->base, &ms_thread->clock_event);
+ evtimer_add(&ms_thread->clock_event, &t);
} /* ms_clock_handler */
*/
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
{
- ms_thread.thread_ctx= thread_ctx;
- ms_thread.nactive_conn= thread_ctx->nconns;
- ms_thread.initialized= false;
- static int cnt= 0;
+
+ ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
+ pthread_setspecific(ms_thread_key, (void *)ms_thread);
- gettimeofday(&ms_thread.startup_time, NULL);
+ ms_thread->thread_ctx= thread_ctx;
+ ms_thread->nactive_conn= thread_ctx->nconns;
+ ms_thread->initialized= false;
+ static volatile uint32_t cnt= 0;
- ms_thread.base= event_init();
- if (ms_thread.base == NULL)
+ gettimeofday(&ms_thread->startup_time, NULL);
+
+ ms_thread->base= event_init();
+ if (ms_thread->base == NULL)
{
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(stderr, "Can't allocate event base.\n");
}
return -1;
}
- ms_thread.conn=
+ ms_thread->conn=
(ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
- if (ms_thread.conn == NULL)
+ if (ms_thread->conn == NULL)
{
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(
stderr,
return -1;
}
- memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
+ memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
for (int i= 0; i < thread_ctx->nconns; i++)
{
- ms_thread.conn[i].conn_idx= i;
- if (ms_setup_conn(&ms_thread.conn[i]) != 0)
+ ms_thread->conn[i].conn_idx= i;
+ if (ms_setup_conn(&ms_thread->conn[i]) != 0)
{
/* only output this error once */
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(stderr, "Initializing connection failed.\n");
}
*/
static void *ms_worker_libevent(void *arg)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
/**
/* each thread with a timer */
ms_clock_handler(0, 0, 0);
- event_base_loop(ms_thread.base, 0);
+ event_base_loop(ms_thread->base, 0);
return NULL;
} /* ms_worker_libevent */
/ ms_setting.nconns;
}
+ if (pthread_key_create(&ms_thread_key, NULL))
+ {
+ fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
+ exit(1);
+ }
/* Create threads after we've done all the epoll setup. */
for (int i= 0; i < ms_setting.nthreads; i++)
{
{
free(ms_thread_ctx);
}
+ pthread_key_delete(ms_thread_key);
} /* ms_thread_cleanup */