Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-170
[awesomized/libmemcached] / clients / ms_thread.c
index f381c115fcae985b09ae35729094c4fb8fc2ad51..264cca7c3a0e162b2d6a1083ca9dc77c33915fff 100644 (file)
@@ -8,11 +8,23 @@
  * http://www.schoonerinfotech.com/
  *
  */
+
+#include "mem_config.h"
+
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
+
 #include "ms_thread.h"
 #include "ms_setting.h"
+#include "ms_atomic.h"
 
 /* global variable */
-__thread ms_thread_t ms_thread;    /* each thread with a private ms_thread structure */
+pthread_key_t ms_thread_key;
 
 /* array of thread context structure, each thread has a thread context structure */
 static ms_thread_ctx_t *ms_thread_ctx;
@@ -21,7 +33,7 @@ static ms_thread_ctx_t *ms_thread_ctx;
 static void ms_set_current_time(void);
 static void ms_check_sock_timeout(void);
 static void ms_clock_handler(const int fd, const short which, void *arg);
-static int ms_set_thread_cpu_affinity(int cpu);
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
 static void *ms_worker_libevent(void *arg);
 static void ms_create_worker(void *(*func)(void *), void *arg);
@@ -33,11 +45,13 @@ static void ms_create_worker(void *(*func)(void *), void *arg);
  */
 static void ms_set_current_time()
 {
-    struct timeval timer;
+  struct timeval timer;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+
+  gettimeofday(&timer, NULL);
+  ms_thread->curr_time= (rel_time_t)timer.tv_sec;
+} /* ms_set_current_time */
 
-    gettimeofday(&timer, NULL);
-    ms_thread.curr_time = (rel_time_t)timer.tv_sec;
-}
 
 /**
  *  used to check whether UDP of command are waiting timeout
@@ -45,36 +59,45 @@ static void ms_set_current_time()
  */
 static void ms_check_sock_timeout(void)
 {
-    ms_conn_t *c = NULL;
-    int time_diff = 0;
-
-    for (int i = 0; i < ms_thread.thread_ctx->nconns; i++) {
-        c = &ms_thread.conn[i];
-
-        if (c->udp) {
-            time_diff = (int)(ms_thread.curr_time - c->start_time.tv_sec);
-
-            /* wait time out */
-            if (time_diff > SOCK_WAIT_TIMEOUT) {
-                /* calculate dropped packets count */
-                if (c->recvpkt > 0) {
-                    __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
-                }
-
-                __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
-                ms_reset_conn(c, true);
-            }
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  ms_conn_t *c= NULL;
+  int time_diff= 0;
+
+  for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
+  {
+    c= &ms_thread->conn[i];
+
+    if (c->udp)
+    {
+      time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
+
+      /* wait time out */
+      if (time_diff > SOCK_WAIT_TIMEOUT)
+      {
+        /* calculate dropped packets count */
+        if (c->recvpkt > 0)
+        {
+          atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
         }
+
+        atomic_add_size(&ms_stats.udp_timeout, 1);
+        ms_reset_conn(c, true);
+      }
     }
-}
+  }
+} /* ms_check_sock_timeout */
+
 
 /* if disconnect, the ever-1-second timer will call this function to reconnect */
 static void ms_reconn_thread_socks(void)
 {
-    for (int i = 0; i < ms_thread.thread_ctx->nconns; i++) {
-        ms_reconn_socks(&ms_thread.conn[i]);
-    }
-}
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
+  {
+    ms_reconn_socks(&ms_thread->conn[i]);
+  }
+} /* ms_reconn_thread_socks */
+
 
 /**
  * the handler of the ever-1-second timer
@@ -85,106 +108,130 @@ static void ms_reconn_thread_socks(void)
  */
 static void ms_clock_handler(const int fd, const short which, void *arg)
 {
-    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  struct timeval t=
+  {
+    .tv_sec= 1, .tv_usec= 0
+  };
 
-    UNUSED_ARGUMENT(fd);
-    UNUSED_ARGUMENT(which);
-    UNUSED_ARGUMENT(arg);
+  UNUSED_ARGUMENT(fd);
+  UNUSED_ARGUMENT(which);
+  UNUSED_ARGUMENT(arg);
 
-    ms_set_current_time();
+  ms_set_current_time();
 
-    if (ms_thread.initialized) {
-        /* only delete the event if it's actually there. */
-        evtimer_del(&ms_thread.clock_event);
-        ms_check_sock_timeout();
-    } else {
-        ms_thread.initialized = true;
-    }
+  if (ms_thread->initialized)
+  {
+    /* only delete the event if it's actually there. */
+    evtimer_del(&ms_thread->clock_event);
+    ms_check_sock_timeout();
+  }
+  else
+  {
+    ms_thread->initialized= true;
+  }
+
+  ms_reconn_thread_socks();
 
-    ms_reconn_thread_socks();
+  evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
+  event_base_set(ms_thread->base, &ms_thread->clock_event);
+  evtimer_add(&ms_thread->clock_event, &t);
+} /* ms_clock_handler */
 
-    evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
-    event_base_set(ms_thread.base, &ms_thread.clock_event);
-    evtimer_add(&ms_thread.clock_event, &t);
-}
 
 /**
  * used to bind thread to CPU if the system supports
  *
  * @param cpu, cpu index
  *
- * @return if success, return 0, else return -1
+ * @return if success, return EXIT_SUCCESS, else return -1
  */
-static int ms_set_thread_cpu_affinity(int cpu)
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
 {
-    int ret = 0;
+  uint32_t ret= 0;
 
 #ifdef HAVE_CPU_SET_T
-    cpu_set_t cpu_set;
-    CPU_ZERO(&cpu_set);
-    CPU_SET(cpu, &cpu_set);
-
-    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1){
-        fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
-        ret = 1;
-    }
+  cpu_set_t cpu_set;
+  CPU_ZERO(&cpu_set);
+  CPU_SET(cpu, &cpu_set);
+
+  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
+  {
+    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
+    ret= 1;
+  }
 #else
-    UNUSED_ARGUMENT(cpu);
-#endif    
+  UNUSED_ARGUMENT(cpu);
+#endif
+
+  return ret;
+} /* ms_set_thread_cpu_affinity */
 
-    return ret;
-}
 
 /**
  * Set up a thread's information.
  *
  * @param thread_ctx, pointer of the thread context structure
  *
- * @return if success, return 0, else return -1
+ * @return if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
 {
-    ms_thread.thread_ctx = thread_ctx;
-    ms_thread.nactive_conn = thread_ctx->nconns;
-    ms_thread.initialized = false;
-    static int cnt = 0;
 
-    gettimeofday(&ms_thread.startup_time, NULL);
-
-    ms_thread.base = event_init();
-    if (ms_thread.base == NULL) {
-        if (__sync_fetch_and_add(&cnt, 1) == 0) {
-            fprintf(stderr, "Can't allocate event base.\n");
-        }
+  ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
+  pthread_setspecific(ms_thread_key, (void *)ms_thread);
 
-        return -1;
-    }
+  ms_thread->thread_ctx= thread_ctx;
+  ms_thread->nactive_conn= thread_ctx->nconns;
+  ms_thread->initialized= false;
+  static volatile uint32_t cnt= 0;
 
-    ms_thread.conn = (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
-    if (ms_thread.conn == NULL) {
-        if (__sync_fetch_and_add(&cnt, 1) == 0) {
-            fprintf(stderr, "Can't allocate concurrency structure for thread descriptors.");
-        }
+  gettimeofday(&ms_thread->startup_time, NULL);
 
-        return -1;
+  ms_thread->base= event_init();
+  if (ms_thread->base == NULL)
+  {
+    if (atomic_add_32_nv(&cnt, 1) == 0)
+    {
+      fprintf(stderr, "Can't allocate event base.\n");
     }
-    memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
-
-    for (int i = 0; i < thread_ctx->nconns; i++) {
-        ms_thread.conn[i].conn_idx = i;
-        if (ms_setup_conn(&ms_thread.conn[i]) != 0) {
 
-            /* only output this error once */
-            if (__sync_fetch_and_add(&cnt, 1) == 0) {
-                fprintf(stderr, "Initializing connection failed.\n");
-            }
+    return -1;
+  }
+
+  ms_thread->conn=
+    (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
+  if (ms_thread->conn == NULL)
+  {
+    if (atomic_add_32_nv(&cnt, 1) == 0)
+    {
+      fprintf(
+        stderr,
+        "Can't allocate concurrency structure for thread descriptors.");
+    }
 
-            return -1;
-        }
+    return -1;
+  }
+  memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
+
+  for (uint32_t i= 0; i < thread_ctx->nconns; i++)
+  {
+    ms_thread->conn[i].conn_idx= i;
+    if (ms_setup_conn(&ms_thread->conn[i]) != 0)
+    {
+      /* only output this error once */
+      if (atomic_add_32_nv(&cnt, 1) == 0)
+      {
+        fprintf(stderr, "Initializing connection failed.\n");
+      }
+
+      return -1;
     }
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_setup_thread */
 
-    return 0;
-}
 
 /**
  * Worker thread: main event loop
@@ -195,27 +242,37 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
  */
 static void *ms_worker_libevent(void *arg)
 {
-    ms_thread_ctx_t *thread_ctx = (ms_thread_ctx_t *)arg;
+  ms_thread_t *ms_thread= NULL;
+  ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
 
-    /**
-     * If system has more than one cpu and supports set cpu
-     * affinity, try to bind each thread to a cpu core;
-     */
-    if (ms_setting.ncpu > 1) {
-        ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
-    }
+  /**
+   * If system has more than one cpu and supports set cpu
+   * affinity, try to bind each thread to a cpu core;
+   */
+  if (ms_setting.ncpu > 1)
+  {
+    ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
+  }
 
-    if (ms_setup_thread(thread_ctx) != 0) {
-        exit(1);
-    }
+  if (ms_setup_thread(thread_ctx) != 0)
+  {
+    exit(1);
+  }
 
-    /* each thread with a timer */
-    ms_clock_handler(0, 0, 0);
+  /* each thread with a timer */
+  ms_clock_handler(0, 0, 0);
 
-    event_base_loop(ms_thread.base, 0);
+  pthread_mutex_lock(&ms_global.init_lock.lock);
+  ms_global.init_lock.count++;
+  pthread_cond_signal(&ms_global.init_lock.cond);
+  pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+  ms_thread= pthread_getspecific(ms_thread_key);
+  event_base_loop(ms_thread->base, 0);
+
+  return NULL;
+} /* ms_worker_libevent */
 
-    return NULL;
-}
 
 /**
  * Creates a worker thread.
@@ -225,51 +282,68 @@ static void *ms_worker_libevent(void *arg)
  */
 static void ms_create_worker(void *(*func)(void *), void *arg)
 {
-    pthread_t       thread;
-    pthread_attr_t  attr;
-    int             ret;
+  pthread_t thread;
+  pthread_attr_t attr;
+  int ret;
 
-    pthread_attr_init(&attr);
+  pthread_attr_init(&attr);
+
+  if ((ret= pthread_create(&thread, &attr, func, arg)) != 0)
+  {
+    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
+    exit(1);
+  }
+} /* ms_create_worker */
 
-    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
-        fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
-        exit(1);
-    }
-}
 
 /* initialize threads */
 void ms_thread_init()
 {
-    ms_thread_ctx = (ms_thread_ctx_t *)malloc(sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
-    if (ms_thread_ctx == NULL) {
-        fprintf(stderr, "Can't allocate thread descriptors.");
-        exit(1);
-    }
+  ms_thread_ctx=
+    (ms_thread_ctx_t *)malloc(
+      sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
+  if (ms_thread_ctx == NULL)
+  {
+    fprintf(stderr, "Can't allocate thread descriptors.");
+    exit(1);
+  }
+
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
+  {
+    ms_thread_ctx[i].thd_idx= i;
+    ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
 
-    for (int i = 0; i < ms_setting.nthreads; i++) {
-        ms_thread_ctx[i].thd_idx = i;
-        ms_thread_ctx[i].nconns = ms_setting.nconns / ms_setting.nthreads;
-
-        /**
-         *  If only one server, all the connections in all threads
-         *  connects the same server. For support multi-servers, simple
-         *  distribute thread to server.
-         */
-        ms_thread_ctx[i].srv_idx = i % ms_setting.srv_cnt;
-        ms_thread_ctx[i].tps_perconn = ms_setting.expected_tps / ms_setting.nconns;
-        ms_thread_ctx[i].exec_num_perconn = ms_setting.exec_num / ms_setting.nconns;
-    }
+    /**
+     *  If only one server, all the connections in all threads
+     *  connects the same server. For support multi-servers, simple
+     *  distribute thread to server.
+     */
+    ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
+    ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
+                                  / (int)ms_setting.nconns;
+    ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
+                                       / ms_setting.nconns;
+  }
+
+  if (pthread_key_create(&ms_thread_key, NULL))
+  {
+    fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
+    exit(1);
+  }
+  /* Create threads after we've done all the epoll setup. */
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
+  {
+    ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
+  }
+} /* ms_thread_init */
 
-    /* Create threads after we've done all the epoll setup. */
-    for (int i = 0; i < ms_setting.nthreads; i++) {
-        ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
-    }
-}
 
 /* cleanup some resource of threads when all the threads exit */
 void ms_thread_cleanup()
 {
-    if (ms_thread_ctx != NULL) {
-        free(ms_thread_ctx);
-    }
-}
+  if (ms_thread_ctx != NULL)
+  {
+    free(ms_thread_ctx);
+  }
+  pthread_key_delete(ms_thread_key);
+} /* ms_thread_cleanup */