Add option -P and -T to memcapable
[m6w6/libmemcached] / clients / ms_conn.c
index c062dc2958df424eb1e165e0e6cf6ce33396033f..005879bdaaed7a994df109a7493fe82565a3fe0c 100644 (file)
 #include <fcntl.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+#  include <sys/time.h>
+# else
+#  include <time.h>
+# endif
+#endif
 #include "ms_setting.h"
 #include "ms_thread.h"
 #include "ms_atomic.h"
@@ -120,8 +130,8 @@ static int ms_transmit(ms_conn_t *c);
 static void ms_conn_shrink(ms_conn_t *c);
 static void ms_conn_set_state(ms_conn_t *c, int state);
 static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
 static int ms_update_conn_sock_event(ms_conn_t *c);
 static bool ms_need_yield(ms_conn_t *c);
 static void ms_update_start_time(ms_conn_t *c);
@@ -282,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c,
   /* for replication, each connection need connect all the server */
   if (ms_setting.rep_write_srv > 0)
   {
-    c->total_sfds= ms_setting.srv_cnt;
+    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
   }
   else
   {
@@ -351,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c,
   {
     c->protocol= binary_prot;
   }
-  else if (is_udp)
-  {
-    c->protocol= ascii_udp_prot;
-  }
   else
   {
     c->protocol= ascii_prot;
@@ -467,9 +473,9 @@ static int ms_item_win_init(ms_conn_t *c)
 static int ms_conn_sock_init(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  int i;
+  uint32_t i;
   int ret_sfd;
-  int srv_idx= 0;
+  uint32_t srv_idx= 0;
 
   assert(c != NULL);
   assert(c->tcpsfd != NULL);
@@ -480,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     if (ms_setting.rep_write_srv > 0)
     {
       /* for replication, each connection need connect all the server */
-      srv_idx= i;
+      srv_idx= i % ms_setting.srv_cnt;
     }
     else
     {
@@ -532,7 +538,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     }
     else
     {
-      for (int j= 0; j < i; j++)
+      for (uint32_t j= 0; j < i; j++)
       {
         close(c->tcpsfd[j]);
       }
@@ -561,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c)
 static int ms_conn_event_init(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
   short event_flags= EV_WRITE | EV_PERSIST;
 
   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
   event_base_set(ms_thread->base, &c->event);
   c->ev_flags= event_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return -1;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return -1;
-    }
+    return -1;
   }
 
   return 0;
@@ -677,7 +668,7 @@ static void ms_conn_close(ms_conn_t *c)
   /* delete the event, the socket and the connection */
   event_del(&c->event);
 
-  for (int i= 0; i < c->total_sfds; i++)
+  for (uint32_t i= 0; i < c->total_sfds; i++)
   {
     if (c->tcpsfd[i] > 0)
     {
@@ -806,7 +797,11 @@ static int ms_network_connect(ms_conn_t *c,
    * that otherwise mess things up.
    */
   memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
   hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
++  hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
   if (is_udp)
   {
     hints.ai_protocol= IPPROTO_UDP;
@@ -903,13 +898,13 @@ static int ms_network_connect(ms_conn_t *c,
 static int ms_reconn(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  int srv_idx= 0;
-  int32_t srv_conn_cnt= 0;
+  uint32_t srv_idx= 0;
+  uint32_t srv_conn_cnt= 0;
 
   if (ms_setting.rep_write_srv > 0)
   {
-    srv_idx= c->cur_idx;
-    srv_conn_cnt= ms_setting.nconns;
+    srv_idx= c->cur_idx % ms_setting.srv_cnt;
+    srv_conn_cnt= ms_setting.sock_per_conn  * ms_setting.nconns;
   }
   else
   {
@@ -922,7 +917,7 @@ static int ms_reconn(ms_conn_t *c)
   c->tcpsfd[c->cur_idx]= 0;
 
   if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
-      % (uint32_t)srv_conn_cnt == 0)
+      % srv_conn_cnt == 0)
   {
     gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
     fprintf(stderr, "Server %s:%d disconnect\n",
@@ -932,7 +927,8 @@ static int ms_reconn(ms_conn_t *c)
 
   if (ms_setting.rep_write_srv > 0)
   {
-    int i= 0;
+    uint32_t i= 0;
+
     for (i= 0; i < c->total_sfds; i++)
     {
       if (c->tcpsfd[i] != 0)
@@ -972,13 +968,13 @@ static int ms_reconn(ms_conn_t *c)
         break;
       }
 
-      if (c->total_sfds == 1)
+      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
       {
         /* wait a second and reconnect */
         sleep(1);
       }
     }
-    while (c->total_sfds == 1);
+    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
   }
 
   if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
@@ -1004,9 +1000,9 @@ static int ms_reconn(ms_conn_t *c)
 int ms_reconn_socks(ms_conn_t *c)
 {
   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
-  int srv_idx= 0;
+  uint32_t srv_idx= 0;
   int ret_sfd= 0;
-  int srv_conn_cnt= 0;
+  uint32_t srv_conn_cnt= 0;
   struct timeval cur_time;
 
   assert(c != NULL);
@@ -1016,7 +1012,7 @@ int ms_reconn_socks(ms_conn_t *c)
     return 0;
   }
 
-  for (int i= 0; i < c->total_sfds; i++)
+  for (uint32_t i= 0; i < c->total_sfds; i++)
   {
     if (c->tcpsfd[i] == 0)
     {
@@ -1036,8 +1032,8 @@ int ms_reconn_socks(ms_conn_t *c)
 
       if (ms_setting.rep_write_srv > 0)
       {
-        srv_idx= i;
-        srv_conn_cnt= ms_setting.nconns;
+        srv_idx= i % ms_setting.srv_cnt;
+        srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
       }
       else
       {
@@ -1297,6 +1293,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   c->ctnwrite= false;
   c->rbytes= 0;
   c->rcurr= c->rbuf;
+  c->msgcurr = 0;
+  c->msgused = 0;
+  c->iovused = 0;
   ms_conn_set_state(c, conn_write);
   memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
 
@@ -1854,7 +1853,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+  assert(c->protocol == ascii_prot);
   if (c->rvbytes > 2)
   {
     assert(
@@ -1994,8 +1993,7 @@ static void ms_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot
-         || c->protocol == ascii_prot
+  assert(c->protocol == ascii_prot
          || c->protocol == binary_prot);
 
   if (c->protocol == binary_prot)
@@ -2409,12 +2407,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state)
  */
 static bool ms_update_event(ms_conn_t *c, const int new_flags)
 {
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
-
   assert(c != NULL);
 
   struct event_base *base= c->event.ev_base;
@@ -2441,19 +2433,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
   event_base_set(base, &c->event);
   c->ev_flags= (short)new_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return false;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return false;
-    }
+    return false;
   }
 
   return true;
@@ -2730,12 +2712,6 @@ void ms_event_handler(const int fd, const short which, void *arg)
   }
   assert(fd == c->sfd);
 
-  /* event timeout, close the current connection */
-  if (c->which == EV_TIMEOUT)
-  {
-    ms_conn_set_state(c, conn_closing);
-  }
-
   ms_drive_machine(c);
 
   /* wait for next event */
@@ -2750,10 +2726,10 @@ void ms_event_handler(const int fd, const short which, void *arg)
  *
  * @return int, if success, return the index, else return 0
  */
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
 {
-  int sock_index= -1;
-  int i= 0;
+  uint32_t sock_index= 0;
+  uint32_t i= 0;
 
   if (c->total_sfds == 1)
   {
@@ -2780,18 +2756,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
       if (i == ms_setting.rep_write_srv)
       {
         /* random get one replication server to read */
-        sock_index= (int)(random() % c->total_sfds);
+        sock_index= (uint32_t)random() % c->total_sfds;
       }
       else
       {
         /* random get one replication writing server to write */
-        sock_index= (int)(random() % ms_setting.rep_write_srv);
+        sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
       }
     }
     else if (cmd == CMD_GET)
     {
       /* random get one replication server to read */
-      sock_index= (int)(random() % c->total_sfds);
+      sock_index= (uint32_t)random() % c->total_sfds;
     }
   }
   while (c->tcpsfd[sock_index] == 0);
@@ -2807,9 +2783,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
  *
  * @return int, return the index
  */
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
 {
-  int sock_index= 0;
+  uint32_t sock_index= 0;
 
   do
   {
@@ -3038,15 +3014,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
  * @param c, pointer of the concurrency
  * @param item, pointer of task item which includes the object
  *            information
- * @param verify, whether do verification
  *
  * @return int, if success, return 0, else return -1
  */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
 {
-  /* verify not supported yet */
-  UNUSED_ARGUMENT(verify);
-
   assert(c != NULL);
 
   c->currcmd.cmd= CMD_GET;