1.Fix bug that memslap can't start up(ms_thread=NULL). 2.ignore signal SIGPIPE. 3...
[m6w6/libmemcached] / clients / ms_conn.c
index b488401c126d6298919ad22cbb72f71bdb3c5327..32578b3e920323d34547b1d1133b44d30272eb3b 100644 (file)
@@ -292,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c,
   /* for replication, each connection need connect all the server */
   if (ms_setting.rep_write_srv > 0)
   {
-    c->total_sfds= ms_setting.srv_cnt;
+    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
   }
   else
   {
@@ -361,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c,
   {
     c->protocol= binary_prot;
   }
-  else if (is_udp)
-  {
-    c->protocol= ascii_udp_prot;
-  }
   else
   {
     c->protocol= ascii_prot;
@@ -490,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     if (ms_setting.rep_write_srv > 0)
     {
       /* for replication, each connection need connect all the server */
-      srv_idx= i;
+      srv_idx= i % ms_setting.srv_cnt;
     }
     else
     {
@@ -571,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c)
 static int ms_conn_event_init(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
   short event_flags= EV_WRITE | EV_PERSIST;
 
   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
   event_base_set(ms_thread->base, &c->event);
   c->ev_flags= event_flags;
 
-  if (c->total_sfds == 1)
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return -1;
-    }
-  }
-  else
-  {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return -1;
-    }
+    return -1;
   }
 
   return 0;
@@ -918,8 +899,8 @@ static int ms_reconn(ms_conn_t *c)
 
   if (ms_setting.rep_write_srv > 0)
   {
-    srv_idx= c->cur_idx;
-    srv_conn_cnt= (int)ms_setting.nconns;
+    srv_idx= c->cur_idx % ms_setting.srv_cnt;
+    srv_conn_cnt= (int)(ms_setting.sock_per_conn  * ms_setting.nconns);
   }
   else
   {
@@ -982,13 +963,13 @@ static int ms_reconn(ms_conn_t *c)
         break;
       }
 
-      if (c->total_sfds == 1)
+      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
       {
         /* wait a second and reconnect */
         sleep(1);
       }
     }
-    while (c->total_sfds == 1);
+    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
   }
 
   if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
@@ -1046,8 +1027,8 @@ int ms_reconn_socks(ms_conn_t *c)
 
       if (ms_setting.rep_write_srv > 0)
       {
-        srv_idx= i;
-        srv_conn_cnt= (int)ms_setting.nconns;
+        srv_idx= i % ms_setting.srv_cnt;
+        srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
       }
       else
       {
@@ -1307,6 +1288,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   c->ctnwrite= false;
   c->rbytes= 0;
   c->rcurr= c->rbuf;
+  c->msgcurr = 0;
+  c->msgused = 0;
+  c->iovused = 0;
   ms_conn_set_state(c, conn_write);
   memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
 
@@ -1864,7 +1848,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+  assert(c->protocol == ascii_prot);
   if (c->rvbytes > 2)
   {
     assert(
@@ -2004,8 +1988,7 @@ static void ms_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot
-         || c->protocol == ascii_prot
+  assert(c->protocol == ascii_prot
          || c->protocol == binary_prot);
 
   if (c->protocol == binary_prot)
@@ -2419,12 +2402,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state)
  */
 static bool ms_update_event(ms_conn_t *c, const int new_flags)
 {
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
-
   assert(c != NULL);
 
   struct event_base *base= c->event.ev_base;
@@ -2451,19 +2428,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
   event_base_set(base, &c->event);
   c->ev_flags= (short)new_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return false;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return false;
-    }
+    return false;
   }
 
   return true;
@@ -2740,12 +2707,6 @@ void ms_event_handler(const int fd, const short which, void *arg)
   }
   assert(fd == c->sfd);
 
-  /* event timeout, close the current connection */
-  if (c->which == EV_TIMEOUT)
-  {
-    ms_conn_set_state(c, conn_closing);
-  }
-
   ms_drive_machine(c);
 
   /* wait for next event */
@@ -3048,15 +3009,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
  * @param c, pointer of the concurrency
  * @param item, pointer of task item which includes the object
  *            information
- * @param verify, whether do verification
  *
  * @return int, if success, return 0, else return -1
  */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
 {
-  /* verify not supported yet */
-  UNUSED_ARGUMENT(verify);
-
   assert(c != NULL);
 
   c->currcmd.cmd= CMD_GET;