Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-170
[awesomized/libmemcached] / clients / ms_thread.c
index 9af3a10b24bf5897150e702aa102c843a37bdb99..264cca7c3a0e162b2d6a1083ca9dc77c33915fff 100644 (file)
@@ -9,13 +9,22 @@
  *
  */
 
-#include "config.h"
+#include "mem_config.h"
+
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
 
 #include "ms_thread.h"
 #include "ms_setting.h"
+#include "ms_atomic.h"
 
 /* global variable */
-__thread ms_thread_t ms_thread;    /* each thread with a private ms_thread structure */
+pthread_key_t ms_thread_key;
 
 /* array of thread context structure, each thread has a thread context structure */
 static ms_thread_ctx_t *ms_thread_ctx;
@@ -24,7 +33,7 @@ static ms_thread_ctx_t *ms_thread_ctx;
 static void ms_set_current_time(void);
 static void ms_check_sock_timeout(void);
 static void ms_clock_handler(const int fd, const short which, void *arg);
-static int ms_set_thread_cpu_affinity(int cpu);
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
 static void *ms_worker_libevent(void *arg);
 static void ms_create_worker(void *(*func)(void *), void *arg);
@@ -37,9 +46,10 @@ static void ms_create_worker(void *(*func)(void *), void *arg);
 static void ms_set_current_time()
 {
   struct timeval timer;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
 
   gettimeofday(&timer, NULL);
-  ms_thread.curr_time= (rel_time_t)timer.tv_sec;
+  ms_thread->curr_time= (rel_time_t)timer.tv_sec;
 } /* ms_set_current_time */
 
 
@@ -49,16 +59,17 @@ static void ms_set_current_time()
  */
 static void ms_check_sock_timeout(void)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   ms_conn_t *c= NULL;
   int time_diff= 0;
 
-  for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
+  for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
   {
-    c= &ms_thread.conn[i];
+    c= &ms_thread->conn[i];
 
     if (c->udp)
     {
-      time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec);
+      time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
 
       /* wait time out */
       if (time_diff > SOCK_WAIT_TIMEOUT)
@@ -66,10 +77,10 @@ static void ms_check_sock_timeout(void)
         /* calculate dropped packets count */
         if (c->recvpkt > 0)
         {
-          __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
+          atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
         }
 
-        __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
+        atomic_add_size(&ms_stats.udp_timeout, 1);
         ms_reset_conn(c, true);
       }
     }
@@ -80,9 +91,10 @@ static void ms_check_sock_timeout(void)
 /* if disconnect, the ever-1-second timer will call this function to reconnect */
 static void ms_reconn_thread_socks(void)
 {
-  for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
   {
-    ms_reconn_socks(&ms_thread.conn[i]);
+    ms_reconn_socks(&ms_thread->conn[i]);
   }
 } /* ms_reconn_thread_socks */
 
@@ -96,6 +108,7 @@ static void ms_reconn_thread_socks(void)
  */
 static void ms_clock_handler(const int fd, const short which, void *arg)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   struct timeval t=
   {
     .tv_sec= 1, .tv_usec= 0
@@ -107,22 +120,22 @@ static void ms_clock_handler(const int fd, const short which, void *arg)
 
   ms_set_current_time();
 
-  if (ms_thread.initialized)
+  if (ms_thread->initialized)
   {
     /* only delete the event if it's actually there. */
-    evtimer_del(&ms_thread.clock_event);
+    evtimer_del(&ms_thread->clock_event);
     ms_check_sock_timeout();
   }
   else
   {
-    ms_thread.initialized= true;
+    ms_thread->initialized= true;
   }
 
   ms_reconn_thread_socks();
 
-  evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
-  event_base_set(ms_thread.base, &ms_thread.clock_event);
-  evtimer_add(&ms_thread.clock_event, &t);
+  evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
+  event_base_set(ms_thread->base, &ms_thread->clock_event);
+  evtimer_add(&ms_thread->clock_event, &t);
 } /* ms_clock_handler */
 
 
@@ -131,11 +144,11 @@ static void ms_clock_handler(const int fd, const short which, void *arg)
  *
  * @param cpu, cpu index
  *
- * @return if success, return 0, else return -1
+ * @return if success, return EXIT_SUCCESS, else return -1
  */
-static int ms_set_thread_cpu_affinity(int cpu)
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
 {
-  int ret= 0;
+  uint32_t ret= 0;
 
 #ifdef HAVE_CPU_SET_T
   cpu_set_t cpu_set;
@@ -160,21 +173,25 @@ static int ms_set_thread_cpu_affinity(int cpu)
  *
  * @param thread_ctx, pointer of the thread context structure
  *
- * @return if success, return 0, else return -1
+ * @return if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
 {
-  ms_thread.thread_ctx= thread_ctx;
-  ms_thread.nactive_conn= thread_ctx->nconns;
-  ms_thread.initialized= false;
-  static int cnt= 0;
 
-  gettimeofday(&ms_thread.startup_time, NULL);
+  ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
+  pthread_setspecific(ms_thread_key, (void *)ms_thread);
+
+  ms_thread->thread_ctx= thread_ctx;
+  ms_thread->nactive_conn= thread_ctx->nconns;
+  ms_thread->initialized= false;
+  static volatile uint32_t cnt= 0;
 
-  ms_thread.base= event_init();
-  if (ms_thread.base == NULL)
+  gettimeofday(&ms_thread->startup_time, NULL);
+
+  ms_thread->base= event_init();
+  if (ms_thread->base == NULL)
   {
-    if (__sync_fetch_and_add(&cnt, 1) == 0)
+    if (atomic_add_32_nv(&cnt, 1) == 0)
     {
       fprintf(stderr, "Can't allocate event base.\n");
     }
@@ -182,11 +199,11 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
     return -1;
   }
 
-  ms_thread.conn=
+  ms_thread->conn=
     (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
-  if (ms_thread.conn == NULL)
+  if (ms_thread->conn == NULL)
   {
-    if (__sync_fetch_and_add(&cnt, 1) == 0)
+    if (atomic_add_32_nv(&cnt, 1) == 0)
     {
       fprintf(
         stderr,
@@ -195,15 +212,15 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
 
     return -1;
   }
-  memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
+  memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
 
-  for (int i= 0; i < thread_ctx->nconns; i++)
+  for (uint32_t i= 0; i < thread_ctx->nconns; i++)
   {
-    ms_thread.conn[i].conn_idx= i;
-    if (ms_setup_conn(&ms_thread.conn[i]) != 0)
+    ms_thread->conn[i].conn_idx= i;
+    if (ms_setup_conn(&ms_thread->conn[i]) != 0)
     {
       /* only output this error once */
-      if (__sync_fetch_and_add(&cnt, 1) == 0)
+      if (atomic_add_32_nv(&cnt, 1) == 0)
       {
         fprintf(stderr, "Initializing connection failed.\n");
       }
@@ -212,7 +229,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
     }
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_setup_thread */
 
 
@@ -225,6 +242,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
  */
 static void *ms_worker_libevent(void *arg)
 {
+  ms_thread_t *ms_thread= NULL;
   ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
 
   /**
@@ -244,7 +262,13 @@ static void *ms_worker_libevent(void *arg)
   /* each thread with a timer */
   ms_clock_handler(0, 0, 0);
 
-  event_base_loop(ms_thread.base, 0);
+  pthread_mutex_lock(&ms_global.init_lock.lock);
+  ms_global.init_lock.count++;
+  pthread_cond_signal(&ms_global.init_lock.cond);
+  pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+  ms_thread= pthread_getspecific(ms_thread_key);
+  event_base_loop(ms_thread->base, 0);
 
   return NULL;
 } /* ms_worker_libevent */
@@ -284,7 +308,7 @@ void ms_thread_init()
     exit(1);
   }
 
-  for (int i= 0; i < ms_setting.nthreads; i++)
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
   {
     ms_thread_ctx[i].thd_idx= i;
     ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
@@ -296,13 +320,18 @@ void ms_thread_init()
      */
     ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
     ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
-                                  / ms_setting.nconns;
+                                  / (int)ms_setting.nconns;
     ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
                                        / ms_setting.nconns;
   }
 
+  if (pthread_key_create(&ms_thread_key, NULL))
+  {
+    fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
+    exit(1);
+  }
   /* Create threads after we've done all the epoll setup. */
-  for (int i= 0; i < ms_setting.nthreads; i++)
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
   {
     ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
   }
@@ -316,4 +345,5 @@ void ms_thread_cleanup()
   {
     free(ms_thread_ctx);
   }
+  pthread_key_delete(ms_thread_key);
 } /* ms_thread_cleanup */