1.Fix bug that memslap can't start up(ms_thread=NULL). 2.ignore signal SIGPIPE. 3...
authorXiaoyun Mao <xmao@lab14.schoonerinfotech.net>
Tue, 19 Jan 2010 03:38:53 +0000 (19:38 -0800)
committerXiaoyun Mao <xmao@lab14.schoonerinfotech.net>
Tue, 19 Jan 2010 03:38:53 +0000 (19:38 -0800)
clients/memslap.c
clients/ms_conn.c
clients/ms_conn.h
clients/ms_memslap.h
clients/ms_setting.c
clients/ms_sigsegv.c
clients/ms_task.c
clients/ms_thread.c

index 4b342e76f462f70303037c32a16fa4e8a76437c7..006d3c2b5d8488f97b249be37e7a3e6cf21250a3 100644 (file)
@@ -126,6 +126,10 @@ static void ms_sync_lock_init()
   pthread_mutex_init(&ms_global.init_lock.lock, NULL);
   pthread_cond_init(&ms_global.init_lock.cond, NULL);
 
+  ms_global.warmup_lock.count = 0;
+  pthread_mutex_init(&ms_global.warmup_lock.lock, NULL);
+  pthread_cond_init(&ms_global.warmup_lock.cond, NULL);
+
   ms_global.run_lock.count= 0;
   pthread_mutex_init(&ms_global.run_lock.lock, NULL);
   pthread_cond_init(&ms_global.run_lock.cond, NULL);
@@ -141,6 +145,9 @@ static void ms_sync_lock_destroy()
   pthread_mutex_destroy(&ms_global.init_lock.lock);
   pthread_cond_destroy(&ms_global.init_lock.cond);
 
+  pthread_mutex_destroy(&ms_global.warmup_lock.lock);
+  pthread_cond_destroy(&ms_global.warmup_lock.cond);
+
   pthread_mutex_destroy(&ms_global.run_lock.lock);
   pthread_cond_destroy(&ms_global.run_lock.cond);
 
@@ -771,19 +778,25 @@ static void ms_monitor_slap_mode()
   int second= 0;
   struct timeval start_time, end_time;
 
+  /* Wait all the threads complete initialization. */
+  pthread_mutex_lock(&ms_global.init_lock.lock);
+  while (ms_global.init_lock.count < ms_setting.nthreads)
+  {
+    pthread_cond_wait(&ms_global.init_lock.cond,
+                      &ms_global.init_lock.lock);
+  }
+  pthread_mutex_unlock(&ms_global.init_lock.lock);
+
   /* only when there is no set operation it need warm up */
   if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
   {
     /* Wait all the connects complete warm up. */
-    pthread_mutex_lock(&ms_global.init_lock.lock);
-    while (ms_global.init_lock.count < (int)ms_setting.nconns)
-    {
-      pthread_cond_wait(&ms_global.init_lock.cond,
-                        &ms_global.init_lock.lock);
+    pthread_mutex_lock(&ms_global.warmup_lock.lock);
+    while (ms_global.warmup_lock.count < (int)ms_setting.nconns) {
+      pthread_cond_wait(&ms_global.warmup_lock.cond, &ms_global.warmup_lock.lock);
     }
-    pthread_mutex_unlock(&ms_global.init_lock.lock);
+    pthread_mutex_unlock(&ms_global.warmup_lock.lock);
   }
-
   ms_global.finish_warmup= true;
 
   /* running in "run time" mode, user specify run time */
index b488401c126d6298919ad22cbb72f71bdb3c5327..32578b3e920323d34547b1d1133b44d30272eb3b 100644 (file)
@@ -292,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c,
   /* for replication, each connection need connect all the server */
   if (ms_setting.rep_write_srv > 0)
   {
-    c->total_sfds= ms_setting.srv_cnt;
+    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
   }
   else
   {
@@ -361,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c,
   {
     c->protocol= binary_prot;
   }
-  else if (is_udp)
-  {
-    c->protocol= ascii_udp_prot;
-  }
   else
   {
     c->protocol= ascii_prot;
@@ -490,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     if (ms_setting.rep_write_srv > 0)
     {
       /* for replication, each connection need connect all the server */
-      srv_idx= i;
+      srv_idx= i % ms_setting.srv_cnt;
     }
     else
     {
@@ -571,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c)
 static int ms_conn_event_init(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
   short event_flags= EV_WRITE | EV_PERSIST;
 
   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
   event_base_set(ms_thread->base, &c->event);
   c->ev_flags= event_flags;
 
-  if (c->total_sfds == 1)
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return -1;
-    }
-  }
-  else
-  {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return -1;
-    }
+    return -1;
   }
 
   return 0;
@@ -918,8 +899,8 @@ static int ms_reconn(ms_conn_t *c)
 
   if (ms_setting.rep_write_srv > 0)
   {
-    srv_idx= c->cur_idx;
-    srv_conn_cnt= (int)ms_setting.nconns;
+    srv_idx= c->cur_idx % ms_setting.srv_cnt;
+    srv_conn_cnt= (int)(ms_setting.sock_per_conn  * ms_setting.nconns);
   }
   else
   {
@@ -982,13 +963,13 @@ static int ms_reconn(ms_conn_t *c)
         break;
       }
 
-      if (c->total_sfds == 1)
+      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
       {
         /* wait a second and reconnect */
         sleep(1);
       }
     }
-    while (c->total_sfds == 1);
+    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
   }
 
   if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
@@ -1046,8 +1027,8 @@ int ms_reconn_socks(ms_conn_t *c)
 
       if (ms_setting.rep_write_srv > 0)
       {
-        srv_idx= i;
-        srv_conn_cnt= (int)ms_setting.nconns;
+        srv_idx= i % ms_setting.srv_cnt;
+        srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
       }
       else
       {
@@ -1307,6 +1288,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   c->ctnwrite= false;
   c->rbytes= 0;
   c->rcurr= c->rbuf;
+  c->msgcurr = 0;
+  c->msgused = 0;
+  c->iovused = 0;
   ms_conn_set_state(c, conn_write);
   memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
 
@@ -1864,7 +1848,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+  assert(c->protocol == ascii_prot);
   if (c->rvbytes > 2)
   {
     assert(
@@ -2004,8 +1988,7 @@ static void ms_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot
-         || c->protocol == ascii_prot
+  assert(c->protocol == ascii_prot
          || c->protocol == binary_prot);
 
   if (c->protocol == binary_prot)
@@ -2419,12 +2402,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state)
  */
 static bool ms_update_event(ms_conn_t *c, const int new_flags)
 {
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
-
   assert(c != NULL);
 
   struct event_base *base= c->event.ev_base;
@@ -2451,19 +2428,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
   event_base_set(base, &c->event);
   c->ev_flags= (short)new_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return false;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return false;
-    }
+    return false;
   }
 
   return true;
@@ -2740,12 +2707,6 @@ void ms_event_handler(const int fd, const short which, void *arg)
   }
   assert(fd == c->sfd);
 
-  /* event timeout, close the current connection */
-  if (c->which == EV_TIMEOUT)
-  {
-    ms_conn_set_state(c, conn_closing);
-  }
-
   ms_drive_machine(c);
 
   /* wait for next event */
@@ -3048,15 +3009,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
  * @param c, pointer of the concurrency
  * @param item, pointer of task item which includes the object
  *            information
- * @param verify, whether do verification
  *
  * @return int, if success, return 0, else return -1
  */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
 {
-  /* verify not supported yet */
-  UNUSED_ARGUMENT(verify);
-
   assert(c != NULL);
 
   c->currcmd.cmd= CMD_GET;
index d95191ece80c3881dbcf1ed99d525cc6c7fd91bd..a52a55c95ec62a6f3685e0a394ac092de27be49d 100644 (file)
@@ -30,8 +30,7 @@ extern "C" {
 #define UDP_MAX_SEND_PAYLOAD_SIZE    1400                 /* mtu size is 1500 */
 #define UDP_HEADER_SIZE              8                    /* UDP header size */
 #define MAX_SENDBUF_SIZE             (256 * 1024 * 1024)  /* Maximum socket buffer size */
-#define SOCK_WAIT_TIMEOUT            10                   /* maximum waiting time of UDP, 10s */
-#define EVENT_TIMEOUT                10                   /* maximum waiting time of event,10s */
+#define SOCK_WAIT_TIMEOUT            30                   /* maximum waiting time of UDP, 30s */
 #define MAX_UDP_PACKET               (1 << 16)            /* maximum UDP packets, 65536 */
 
 /* Initial size of the sendmsg() scatter/gather array. */
@@ -104,7 +103,6 @@ typedef struct udppkt
 enum protocol
 {
   ascii_prot = 3,           /* ASCII protocol */
-  ascii_udp_prot,           /* ASCII UDP protocol*/
   binary_prot,              /* binary protocol */
 };
 
@@ -229,7 +227,7 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item);
 
 
 /* used to send the get command to server */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify);
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item);
 
 
 /* used to send the multi-get command to server */
index 87caca5dc1408ba422c5fec17c7e09ee1520dddd..540fed0e16c218aa3f6ffe015852df4a491aab00 100644 (file)
@@ -102,6 +102,7 @@ typedef struct global
 {
   /* synchronize lock */
   ms_sync_lock_t init_lock;
+  ms_sync_lock_t warmup_lock;
   ms_sync_lock_t run_lock;
 
   /* mutex for outputing error log synchronously when memslap crashes */
index 95af535c74c89e0212670c43c1b88acc1da8b745..eb5e40557ba512149a334d65ad9ea368a6668ed3 100644 (file)
@@ -342,7 +342,6 @@ static void ms_parse_cfg_file(char *cfg_file)
   FILE *f;
   size_t start_len, end_len;
   double proportion;
-  size_t frequence;
   char *line= NULL;
   size_t  read_len;
   ssize_t nread;
@@ -435,8 +434,8 @@ static void ms_parse_cfg_file(char *cfg_file)
 
         if (nread != EOF)
         {
-          if (sscanf(line, "%zu %zu %lf %zu", &start_len, &end_len,
-                     &proportion, &frequence) != 3)
+          if (sscanf(line, "%zu %zu %lf", &start_len, &end_len,
+                     &proportion) != 3)
           {
             conf_type= ms_get_conf_type(line);
             break;
@@ -479,11 +478,15 @@ static void ms_parse_cfg_file(char *cfg_file)
 
         if (nread != EOF)
         {
-          if (sscanf(line, "%d %lf\n", &cmd_type, &proportion) != 2)
+          if (sscanf(line, "%d %lf", &cmd_type, &proportion) != 2)
           {
             conf_type= ms_get_conf_type(line);
             break;
           }
+          if (cmd_type >= CMD_NULL)
+          {
+            continue;
+          }
           ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_type=
             cmd_type;
           ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_prop=
@@ -920,7 +923,7 @@ void ms_setting_init_pre()
 static void ms_setting_slapmode_init_post()
 {
   ms_setting.total_key_rng_cnt= KEY_RANGE_COUNT_INIT;
-  ms_setting.key_distr= 
+  ms_setting.key_distr=
     (ms_key_distr_t *)malloc((size_t)ms_setting.total_key_rng_cnt * sizeof(ms_key_distr_t));
 
   if (ms_setting.key_distr == NULL)
@@ -931,7 +934,7 @@ static void ms_setting_slapmode_init_post()
 
   ms_setting.total_val_rng_cnt= VALUE_RANGE_COUNT_INIT;
 
-  ms_setting.value_distr= 
+  ms_setting.value_distr=
     (ms_value_distr_t *)malloc((size_t)ms_setting.total_val_rng_cnt * sizeof( ms_value_distr_t));
 
   if (ms_setting.value_distr == NULL)
@@ -980,13 +983,6 @@ static void ms_setting_slapmode_init_post()
     exit(1);
   }
 
-  if ((ms_setting.udp
-       || ms_setting.facebook_test) && ms_setting.binary_prot)
-  {
-    fprintf(stderr, "Binary protocol doesn't support UDP now.\n");
-    exit(1);
-  }
-
   if ((ms_setting.rep_write_srv > 0) && (ms_setting.srv_cnt < 2))
   {
     fprintf(stderr, "Please specify 2 servers at least for replication\n");
@@ -1007,26 +1003,12 @@ static void ms_setting_slapmode_init_post()
     exit(1);
   }
 
-  if ((ms_setting.rep_write_srv > 0) && (ms_setting.sock_per_conn > 1))
-  {
-    fprintf(stderr, "Replication doesn't support multi-socks "
-                    "in one connection structure.\n");
-    exit(1);
-  }
-
   if (ms_setting.facebook_test && (ms_setting.rep_write_srv > 0))
   {
     fprintf(stderr, "facebook test couldn't work with replication.\n");
     exit(1);
   }
 
-  if (ms_setting.reconnect && (ms_setting.sock_per_conn > 1))
-  {
-    fprintf(stderr, "Reconnection doesn't support multi-socks "
-                    "in one connection structure.\n");
-    exit(1);
-  }
-
   ms_build_distr();
 
   /* initialize global character block */
index 33029b42cc35d74b29406ff547c2f65c3926df82..60e1f59c581a26a74469cc102766eb9003892422 100644 (file)
@@ -44,23 +44,6 @@ static void ms_signal_segv(int signum, siginfo_t *info, void *ptr)
   abort();
 }
 
-/* signal pipe reaches, this function will run */
-static void ms_signal_pipe(int signum, siginfo_t *info, void *ptr)
-{
-  UNUSED_ARGUMENT(signum);
-  UNUSED_ARGUMENT(info);
-  UNUSED_ARGUMENT(ptr);
-
-  pthread_mutex_lock(&ms_global.quit_mutex);
-  fprintf(stderr, "\tMemslap encountered a server error. Quitting...\n");
-  fprintf(stderr, "\tError info: SIGPIPE captured (from write?)\n");
-  fprintf(stderr,
-          "\tProbably a socket I/O error when the server is down.\n");
-  pthread_mutex_unlock(&ms_global.quit_mutex);
-  exit(1);
-} /* ms_signal_pipe */
-
-
 /* signal int reaches, this function will run */
 static void ms_signal_int(int signum, siginfo_t *info, void *ptr)
 {
@@ -104,16 +87,8 @@ int ms_setup_sigsegv(void)
  */
 int ms_setup_sigpipe(void)
 {
-  struct sigaction action_2;
-
-  memset(&action_2, 0, sizeof(action_2));
-  action_2.sa_sigaction= ms_signal_pipe;
-  action_2.sa_flags= SA_SIGINFO;
-  if (sigaction(SIGPIPE, &action_2, NULL) < 0)
-  {
-    perror("sigaction");
-    return 0;
-  }
+  /* ignore the SIGPIPE signal */
+  signal(SIGPIPE, SIG_IGN);
 
   return -1;
 } /* ms_setup_sigpipe */
index 5fbd7d1302c9ffb0d1e7b41c838596e89bcf3091..631c35ea1a7bf4ab883a4a7efb320973ffa684f9 100644 (file)
@@ -40,7 +40,7 @@
 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
-static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
 
 
 /* select next operation to do */
@@ -54,7 +54,7 @@ static void ms_kick_out_item(ms_task_item_t *item);
 
 
 /* miss rate adjustment */
-static bool ms_need_overwirte_item(ms_task_t *task);
+static bool ms_need_overwrite_item(ms_task_t *task);
 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
 
 
@@ -154,18 +154,10 @@ static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
  * @return ms_task_item_t*, the pointer of the previous item of
  *         set operation
  */
-static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
 {
-  if (c->set_cursor <= 0)
-  {
-    return &c->item_win[0];
-  }
-  else
-  {
-    return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
-  }
-} /* ms_get_pre_set_item */
-
+    return ms_get_next_get_item(c);
+} /* ms_get_random_overwrite_item */
 
 /**
  * According to the proportion of operations(get or set), select
@@ -301,7 +293,7 @@ static void ms_kick_out_item(ms_task_item_t *item)
  * @return bool, if need overwrite, return true, else return
  *         false
  */
-static bool ms_need_overwirte_item(ms_task_t *task)
+static bool ms_need_overwrite_item(ms_task_t *task)
 {
   ms_task_item_t *item= task->item;
 
@@ -355,7 +347,7 @@ static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
     /* If the current item is not a new item, kick it out */
     if (item->value_offset != INVALID_OFFSET)
     {
-      if (ms_need_overwirte_item(task))
+      if (ms_need_overwrite_item(task))
       {
         /* overwrite */
         task->overwrite_set++;
@@ -369,17 +361,23 @@ static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
     else            /* it's a new item */
     {
       /* need overwrite */
-      if (ms_need_overwirte_item(task))
+      if (ms_need_overwrite_item(task))
       {
-        item= ms_get_pre_set_item(c);
+        /**
+         *  overwrite not use the item with current set cursor, revert
+         *  set cursor.
+         */
+        c->set_cursor--;
+
+        item= ms_get_random_overwrite_item(c);
         if (item->value_offset != INVALID_OFFSET)
         {
           task->item= item;
           task->overwrite_set++;
         }
-        else                /* previous set item is a new item */
+        else                /* item is a new item */
         {
-          /* select the previous item to run, and cancel overwrite */
+          /* select the item to run, and cancel overwrite */
           task->item= item;
         }
       }
@@ -601,7 +599,7 @@ static void ms_warmup_server(ms_conn_t *c)
    */
   if (c->remain_warmup_num == -1)
   {
-    ms_send_signal(&ms_global.init_lock);
+    ms_send_signal(&ms_global.warmup_lock);
     c->remain_warmup_num--;       /* never run the if branch */
   }
 } /* ms_warmup_server */
@@ -629,7 +627,7 @@ static void ms_single_getset_task_sch(ms_conn_t *c)
     else if (task->cmd == CMD_GET)
     {
       assert(task->cmd == CMD_GET);
-      ms_mcd_get(c, item, task->verify);
+      ms_mcd_get(c, item);
     }
   }
 } /* ms_single_getset_task_sch */
index dbfc62a7965194bc49c9c92f0527e15a239054f2..60f8ef7947a2860a9b92b8d670da0f4f8364efef 100644 (file)
@@ -244,7 +244,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
  */
 static void *ms_worker_libevent(void *arg)
 {
-  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  ms_thread_t *ms_thread= NULL;
   ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
 
   /**
@@ -264,6 +264,12 @@ static void *ms_worker_libevent(void *arg)
   /* each thread with a timer */
   ms_clock_handler(0, 0, 0);
 
+  pthread_mutex_lock(&ms_global.init_lock.lock);
+  ms_global.init_lock.count++;
+  pthread_cond_signal(&ms_global.init_lock.cond);
+  pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+  ms_thread= pthread_getspecific(ms_thread_key);
   event_base_loop(ms_thread->base, 0);
 
   return NULL;