cppcheck: fix warnings
[awesomized/libmemcached] / clients / ms_conn.c
index ad12a113b11f5015c7097d0e4345e9ea7bdc7ca6..85e84739835b0c3af8b8709a189f5eefa438c79d 100644 (file)
@@ -9,17 +9,41 @@
  *
  */
 
-#include "config.h"
+#include "mem_config.h"
 
 #include <stdio.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <sys/uio.h>
 #include <event.h>
 #include <fcntl.h>
 #include <netinet/tcp.h>
+#include <netinet/in.h>
 #include <arpa/inet.h>
+
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
+
 #include "ms_setting.h"
 #include "ms_thread.h"
+#include "ms_atomic.h"
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
 
 /* for network write */
 #define TRANSMIT_COMPLETE      0
 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
 
 /* global increasing counter, generating request id for UDP */
-static int udp_request_id= 0;
+static volatile uint32_t udp_request_id= 0;
 
-extern __thread ms_thread_t ms_thread;
+extern pthread_key_t ms_thread_key;
 
 /* generate upd request id */
-static int ms_get_udp_request_id(void);
+static uint32_t ms_get_udp_request_id(void);
 
 
 /* connect initialize */
@@ -106,8 +130,8 @@ static int ms_transmit(ms_conn_t *c);
 static void ms_conn_shrink(ms_conn_t *c);
 static void ms_conn_set_state(ms_conn_t *c, int state);
 static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
 static int ms_update_conn_sock_event(ms_conn_t *c);
 static bool ms_need_yield(ms_conn_t *c);
 static void ms_update_start_time(ms_conn_t *c);
@@ -165,9 +189,9 @@ uint64_t ms_get_key_prefix(void)
  *
  * @return an unique UDP request id
  */
-static int ms_get_udp_request_id(void)
+static uint32_t ms_get_udp_request_id(void)
 {
-  return __sync_fetch_and_add(&udp_request_id, 1);
+  return atomic_add_32_nv(&udp_request_id, 1);
 }
 
 
@@ -199,7 +223,7 @@ static void ms_task_init(ms_conn_t *c)
  * @param c, pointer of the concurrency
  * @param is_udp, whether it's udp
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
 {
@@ -235,7 +259,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
     memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_conn_udp_init */
 
 
@@ -247,7 +271,7 @@ static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
  * @param read_buffer_size
  * @param is_udp, whether it's udp
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_conn_init(ms_conn_t *c,
                         const int init_state,
@@ -268,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c,
   /* for replication, each connection need connect all the server */
   if (ms_setting.rep_write_srv > 0)
   {
-    c->total_sfds= ms_setting.srv_cnt;
+    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
   }
   else
   {
@@ -333,14 +357,10 @@ static int ms_conn_init(ms_conn_t *c,
   c->mlget_task.mlget_num= 0;
   c->mlget_task.value_index= -1;         /* default invalid value */
 
-  if (ms_setting.binary_prot)
+  if (ms_setting.binary_prot_)
   {
     c->protocol= binary_prot;
   }
-  else if (is_udp)
-  {
-    c->protocol= ascii_udp_prot;
-  }
   else
   {
     c->protocol= ascii_prot;
@@ -357,10 +377,10 @@ static int ms_conn_init(ms_conn_t *c,
 
   if (! (ms_setting.facebook_test && is_udp))
   {
-    __sync_fetch_and_add(&ms_stats.active_conns, 1);
+    atomic_add_32(&ms_stats.active_conns, 1);
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_conn_init */
 
 
@@ -393,15 +413,16 @@ static void ms_warmup_num_init(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_item_win_init(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int exp_cnt= 0;
 
   c->win_size= (int)ms_setting.win_size;
   c->set_cursor= 0;
-  c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
+  c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
   c->remain_exec_num= c->exec_num;
 
   c->item_win= (ms_task_item_t *)malloc(
@@ -436,7 +457,7 @@ static int ms_item_win_init(ms_conn_t *c)
 
   ms_warmup_num_init(c);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_item_win_init */
 
 
@@ -447,13 +468,14 @@ static int ms_item_win_init(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_conn_sock_init(ms_conn_t *c)
 {
-  int i;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t i;
   int ret_sfd;
-  int srv_idx= 0;
+  uint32_t srv_idx= 0;
 
   assert(c != NULL);
   assert(c->tcpsfd != NULL);
@@ -464,12 +486,12 @@ static int ms_conn_sock_init(ms_conn_t *c)
     if (ms_setting.rep_write_srv > 0)
     {
       /* for replication, each connection need connect all the server */
-      srv_idx= i;
+      srv_idx= i % ms_setting.srv_cnt;
     }
     else
     {
       /* all the connections in a thread connects the same server */
-      srv_idx= ms_thread.thread_ctx->srv_idx;
+      srv_idx= ms_thread->thread_ctx->srv_idx;
     }
 
     if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
@@ -516,7 +538,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     }
     else
     {
-      for (int j= 0; j < i; j++)
+      for (uint32_t j= 0; j < i; j++)
       {
         close(c->tcpsfd[j]);
       }
@@ -530,7 +552,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_conn_sock_init */
 
 
@@ -540,37 +562,23 @@ static int ms_conn_sock_init(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_conn_event_init(ms_conn_t *c)
 {
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   short event_flags= EV_WRITE | EV_PERSIST;
 
   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
-  event_base_set(ms_thread.base, &c->event);
+  event_base_set(ms_thread->base, &c->event);
   c->ev_flags= event_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return -1;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return -1;
-    }
+    return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_conn_event_init */
 
 
@@ -580,7 +588,7 @@ static int ms_conn_event_init(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 int ms_setup_conn(ms_conn_t *c)
 {
@@ -604,7 +612,7 @@ int ms_setup_conn(ms_conn_t *c)
     return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_setup_conn */
 
 
@@ -615,6 +623,7 @@ int ms_setup_conn(ms_conn_t *c)
  */
 void ms_conn_free(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   if (c != NULL)
   {
     if (c->hdrbuf != NULL)
@@ -638,9 +647,9 @@ void ms_conn_free(ms_conn_t *c)
     if (c->tcpsfd != NULL)
       free(c->tcpsfd);
 
-    if (--ms_thread.nactive_conn == 0)
+    if (--ms_thread->nactive_conn == 0)
     {
-      free(ms_thread.conn);
+      free(ms_thread->conn);
     }
   }
 } /* ms_conn_free */
@@ -653,12 +662,13 @@ void ms_conn_free(ms_conn_t *c)
  */
 static void ms_conn_close(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   assert(c != NULL);
 
   /* delete the event, the socket and the connection */
   event_del(&c->event);
 
-  for (int i= 0; i < c->total_sfds; i++)
+  for (uint32_t i= 0; i < c->total_sfds; i++)
   {
     if (c->tcpsfd[i] > 0)
     {
@@ -672,7 +682,7 @@ static void ms_conn_close(ms_conn_t *c)
     close(c->udpsfd);
   }
 
-  __sync_fetch_and_sub(&ms_stats.active_conns, 1);
+  atomic_dec_32(&ms_stats.active_conns);
 
   ms_conn_free(c);
 
@@ -684,7 +694,7 @@ static void ms_conn_close(ms_conn_t *c)
     pthread_mutex_unlock(&ms_global.run_lock.lock);
   }
 
-  if (ms_thread.nactive_conn == 0)
+  if (ms_thread->nactive_conn == 0)
   {
     pthread_exit(NULL);
   }
@@ -696,7 +706,7 @@ static void ms_conn_close(ms_conn_t *c)
  *
  * @param ai, server address information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_new_socket(struct addrinfo *ai)
 {
@@ -748,6 +758,7 @@ static void ms_maximize_sndbuf(const int sfd)
       max= avg - 1;
     }
   }
+  (void)last_good;
 } /* ms_maximize_sndbuf */
 
 
@@ -760,7 +771,7 @@ static void ms_maximize_sndbuf(const int sfd)
  * @param is_udp, whether it's udp
  * @param ret_sfd, the connected socket file descriptor
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_network_connect(ms_conn_t *c,
                               char *srv_host_name,
@@ -787,7 +798,11 @@ static int ms_network_connect(ms_conn_t *c,
    * that otherwise mess things up.
    */
   memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
   hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
+  hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
   if (is_udp)
   {
     hints.ai_protocol= IPPROTO_UDP;
@@ -879,21 +894,22 @@ static int ms_network_connect(ms_conn_t *c,
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_reconn(ms_conn_t *c)
 {
-  int srv_idx= 0;
-  int srv_conn_cnt= 0;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t srv_idx= 0;
+  uint32_t srv_conn_cnt= 0;
 
   if (ms_setting.rep_write_srv > 0)
   {
-    srv_idx= c->cur_idx;
-    srv_conn_cnt= ms_setting.nconns;
+    srv_idx= c->cur_idx % ms_setting.srv_cnt;
+    srv_conn_cnt= ms_setting.sock_per_conn  * ms_setting.nconns;
   }
   else
   {
-    srv_idx= ms_thread.thread_ctx->srv_idx;
+    srv_idx= ms_thread->thread_ctx->srv_idx;
     srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
   }
 
@@ -901,7 +917,7 @@ static int ms_reconn(ms_conn_t *c)
   close(c->sfd);
   c->tcpsfd[c->cur_idx]= 0;
 
-  if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+  if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
       % srv_conn_cnt == 0)
   {
     gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
@@ -912,7 +928,8 @@ static int ms_reconn(ms_conn_t *c)
 
   if (ms_setting.rep_write_srv > 0)
   {
-    int i= 0;
+    uint32_t i= 0;
+
     for (i= 0; i < c->total_sfds; i++)
     {
       if (c->tcpsfd[i] != 0)
@@ -937,8 +954,8 @@ static int ms_reconn(ms_conn_t *c)
                              ms_setting.udp, &c->sfd) == 0)
       {
         c->tcpsfd[c->cur_idx]= c->sfd;
-        if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
-            % srv_conn_cnt == 0)
+        if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+            % (uint32_t)srv_conn_cnt == 0)
         {
           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
           int reconn_time=
@@ -952,13 +969,13 @@ static int ms_reconn(ms_conn_t *c)
         break;
       }
 
-      if (c->total_sfds == 1)
+      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
       {
         /* wait a second and reconnect */
         sleep(1);
       }
     }
-    while (c->total_sfds == 1);
+    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
   }
 
   if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
@@ -967,7 +984,7 @@ static int ms_reconn(ms_conn_t *c)
     c->alive_sfds--;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_reconn */
 
 
@@ -979,23 +996,24 @@ static int ms_reconn(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 int ms_reconn_socks(ms_conn_t *c)
 {
-  int srv_idx= 0;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t srv_idx= 0;
   int ret_sfd= 0;
-  int srv_conn_cnt= 0;
+  uint32_t srv_conn_cnt= 0;
   struct timeval cur_time;
 
   assert(c != NULL);
 
   if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
   {
-    return 0;
+    return EXIT_SUCCESS;
   }
 
-  for (int i= 0; i < c->total_sfds; i++)
+  for (uint32_t i= 0; i < c->total_sfds; i++)
   {
     if (c->tcpsfd[i] == 0)
     {
@@ -1015,12 +1033,12 @@ int ms_reconn_socks(ms_conn_t *c)
 
       if (ms_setting.rep_write_srv > 0)
       {
-        srv_idx= i;
-        srv_conn_cnt= ms_setting.nconns;
+        srv_idx= i % ms_setting.srv_cnt;
+        srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
       }
       else
       {
-        srv_idx= ms_thread.thread_ctx->srv_idx;
+        srv_idx= ms_thread->thread_ctx->srv_idx;
         srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
       }
 
@@ -1031,8 +1049,8 @@ int ms_reconn_socks(ms_conn_t *c)
         c->tcpsfd[i]= ret_sfd;
         c->alive_sfds++;
 
-        if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
-            % srv_conn_cnt == 0)
+        if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+            % (uint32_t)srv_conn_cnt == 0)
         {
           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
           int reconn_time=
@@ -1047,7 +1065,7 @@ int ms_reconn_socks(ms_conn_t *c)
     }
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_reconn_socks */
 
 
@@ -1119,7 +1137,7 @@ static int ms_tokenize_command(char *command,
  * @param c, pointer of the concurrency
  * @param command, the string responded by server
  *
- * @return int, if the command completed return 0, else return
+ * @return int, if the command completed return EXIT_SUCCESS, else return
  *         -1
  */
 static int ms_ascii_process_line(ms_conn_t *c, char *command)
@@ -1142,7 +1160,12 @@ static int ms_ascii_process_line(ms_conn_t *c, char *command)
     {
       token_t tokens[MAX_TOKENS];
       ms_tokenize_command(command, tokens, MAX_TOKENS);
+      errno= 0;
       value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
+      if (errno != 0)
+      {
+        printf("<%d ERROR %s\n", c->sfd, strerror(errno));
+      }
       c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
 
       /*
@@ -1161,6 +1184,7 @@ static int ms_ascii_process_line(ms_conn_t *c, char *command)
 
   case 'O':   /* OK */
     c->currcmd.retstat= MCD_SUCCESS;
+    break;
 
   case 'S':                    /* STORED STATS SERVER_ERROR */
     if (buffer[2] == 'A')      /* STORED STATS */
@@ -1263,7 +1287,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   {
     if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
     {
-      memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
+      memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
     }
 
     c->packets= 0;
@@ -1276,6 +1300,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   c->ctnwrite= false;
   c->rbytes= 0;
   c->rcurr= c->rbuf;
+  c->msgcurr = 0;
+  c->msgused = 0;
+  c->iovused = 0;
   ms_conn_set_state(c, conn_write);
   memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
 
@@ -1291,7 +1318,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_try_read_line(ms_conn_t *c)
 {
@@ -1301,7 +1328,7 @@ static int ms_try_read_line(ms_conn_t *c)
     if ((uint64_t)c->rbytes < sizeof(c->binary_header))
     {
       /* need more data! */
-      return 0;
+      return EXIT_SUCCESS;
     }
     else
     {
@@ -1322,16 +1349,16 @@ static int ms_try_read_line(ms_conn_t *c)
 
       c->binary_header= *rsp;
       c->binary_header.response.extlen= rsp->response.extlen;
-      c->binary_header.response.keylen= ntohl(rsp->response.keylen);
+      c->binary_header.response.keylen= ntohs(rsp->response.keylen);
       c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
-      c->binary_header.response.status= ntohl(rsp->response.status);
+      c->binary_header.response.status= ntohs(rsp->response.status);
 
       if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
       {
         fprintf(stderr, "Invalid magic:  %x\n",
                 c->binary_header.response.magic);
         ms_conn_set_state(c, conn_closing);
-        return 0;
+        return EXIT_SUCCESS;
       }
 
       /* process this complete response */
@@ -1356,11 +1383,11 @@ static int ms_try_read_line(ms_conn_t *c)
     assert(c->rcurr <= (c->rbuf + c->rsize));
 
     if (c->rbytes == 0)
-      return 0;
+      return EXIT_SUCCESS;
 
     el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
     if (! el)
-      return 0;
+      return EXIT_SUCCESS;
 
     cont= el + 1;
     if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
@@ -1523,6 +1550,7 @@ static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
       break;
     }
   }
+  (void)packets;
 
   return wbytes == 0 ? -1 : wbytes;
 } /* ms_sort_udp_packet */
@@ -1568,7 +1596,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
 
     if (res > 0)
     {
-      __sync_fetch_and_add(&ms_stats.bytes_read, res);
+      atomic_add_size(&ms_stats.bytes_read, res);
       c->rudpbytes+= res;
       rbytes+= res;
       if (res == avail)
@@ -1602,7 +1630,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
 
   if (copybytes == -1)
   {
-    __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
+    atomic_add_size(&ms_stats.pkt_disorder, 1);
   }
 
   return copybytes;
@@ -1614,7 +1642,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
  * close.
  * before reading, move the remaining incomplete fragment of a command
  * (if any) to the beginning of the buffer.
- * return 0 if there's nothing to read on the first read.
+ * return EXIT_SUCCESS if there's nothing to read on the first read.
  */
 
 /**
@@ -1625,8 +1653,8 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
  * @param c, pointer of the concurrency
  *
  * @return int,
- *         return 0 if there's nothing to read on the first read.
- *         return 1 if get data
+ *         return EXIT_SUCCESS if there's nothing to read on the first read.
+ *         return EXIT_FAILURE if get data
  *         return -1 if error happens
  */
 static int ms_try_read_network(ms_conn_t *c)
@@ -1680,7 +1708,7 @@ static int ms_try_read_network(ms_conn_t *c)
     {
       if (! c->udp)
       {
-        __sync_fetch_and_add(&ms_stats.bytes_read, res);
+        atomic_add_size(&ms_stats.bytes_read, res);
       }
       gotdata= 1;
       c->rbytes+= res;
@@ -1744,7 +1772,7 @@ static void ms_verify_value(ms_conn_t *c,
       if (curr_time.tv_sec - c->curr_task.item->client_time
           > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
       {
-        __sync_fetch_and_add(&ms_stats.exp_get, 1);
+        atomic_add_size(&ms_stats.exp_get, 1);
 
         if (ms_setting.verbose)
         {
@@ -1758,7 +1786,7 @@ static void ms_verify_value(ms_conn_t *c,
                   "\n<%d expire time verification failed, "
                   "object expired but get it now\n"
                   "\tkey len: %d\n"
-                  "\tkey: %lx %.*s\n"
+                  "\tkey: %" PRIx64 " %.*s\n"
                   "\tset time: %s current time: %s "
                   "diff time: %d expire time: %d\n"
                   "\texpected data: \n"
@@ -1785,14 +1813,14 @@ static void ms_verify_value(ms_conn_t *c,
       if ((c->curr_task.item->value_size != vlen)
           || (memcmp(orignval, value, (size_t)vlen) != 0))
       {
-        __sync_fetch_and_add(&ms_stats.vef_failed, 1);
+        atomic_add_size(&ms_stats.vef_failed, 1);
 
         if (ms_setting.verbose)
         {
           fprintf(stderr,
                   "\n<%d data verification failed\n"
                   "\tkey len: %d\n"
-                  "\tkey: %lx %.*s\n"
+                  "\tkey: %" PRIx64" %.*s\n"
                   "\texpected data len: %d\n"
                   "\texpected data: %.*s\n"
                   "\treceived data len: %d\n"
@@ -1833,7 +1861,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+  assert(c->protocol == ascii_prot);
   if (c->rvbytes > 2)
   {
     assert(
@@ -1973,8 +2001,7 @@ static void ms_complete_nread(ms_conn_t *c)
 {
   assert(c != NULL);
   assert(c->rbytes >= c->rvbytes);
-  assert(c->protocol == ascii_udp_prot
-         || c->protocol == ascii_prot
+  assert(c->protocol == ascii_prot
          || c->protocol == binary_prot);
 
   if (c->protocol == binary_prot)
@@ -1993,7 +2020,7 @@ static void ms_complete_nread(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_add_msghdr(ms_conn_t *c)
 {
@@ -2004,7 +2031,7 @@ static int ms_add_msghdr(ms_conn_t *c)
   if (c->msgsize == c->msgused)
   {
     msg=
-      realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
+      realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
     if (! msg)
       return -1;
 
@@ -2037,7 +2064,7 @@ static int ms_add_msghdr(ms_conn_t *c)
     return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_add_msghdr */
 
 
@@ -2047,7 +2074,7 @@ static int ms_add_msghdr(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_ensure_iov_space(ms_conn_t *c)
 {
@@ -2057,7 +2084,7 @@ static int ms_ensure_iov_space(ms_conn_t *c)
   {
     int i, iovnum;
     struct iovec *new_iov= (struct iovec *)realloc(c->iov,
-                                                   ((uint64_t)c->iovsize
+                                                   ((size_t)c->iovsize
                                                     * 2)
                                                    * sizeof(struct iovec));
     if (! new_iov)
@@ -2074,7 +2101,7 @@ static int ms_ensure_iov_space(ms_conn_t *c)
     }
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_ensure_iov_space */
 
 
@@ -2086,7 +2113,7 @@ static int ms_ensure_iov_space(ms_conn_t *c)
  * @param buf, the buffer includes data to send
  * @param len, the data length in the buffer
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
 {
@@ -2105,6 +2132,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
      */
     limit_to_mtu= c->udp;
 
+#ifdef IOV_MAX
     /* We may need to start a new msghdr if this one is full. */
     if ((m->msg_iovlen == IOV_MAX)
         || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
@@ -2112,6 +2140,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
       ms_add_msghdr(c);
       m= &c->msglist[c->msgused - 1];
     }
+#endif
 
     if (ms_ensure_iov_space(c) != 0)
       return -1;
@@ -2140,7 +2169,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
   }
   while (leftover > 0);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_add_iov */
 
 
@@ -2149,7 +2178,7 @@ static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_udp_headers(ms_conn_t *c)
 {
@@ -2185,7 +2214,7 @@ static int ms_build_udp_headers(ms_conn_t *c)
   hdr= c->hdrbuf;
   for (i= 0; i < c->msgused; i++)
   {
-    c->msglist[i].msg_iov[0].iov_base= hdr;
+    c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
     c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
     *hdr++= (unsigned char)(c->request_id / 256);
     *hdr++= (unsigned char)(c->request_id % 256);
@@ -2200,7 +2229,7 @@ static int ms_build_udp_headers(ms_conn_t *c)
             + UDP_HEADER_SIZE));
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_udp_headers */
 
 
@@ -2233,7 +2262,7 @@ static int ms_transmit(ms_conn_t *c)
     res= sendmsg(c->sfd, m, 0);
     if (res > 0)
     {
-      __sync_fetch_and_add(&ms_stats.bytes_written, res);
+      atomic_add_size(&ms_stats.bytes_written, res);
 
       /* We've written some of the data. Remove the completed
        *  iovec entries from the list of pending writes. */
@@ -2248,8 +2277,8 @@ static int ms_transmit(ms_conn_t *c)
        *  adjust it so the next write will do the rest. */
       if (res > 0)
       {
-        m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
-        m->msg_iov->iov_len-= (uint64_t)res;
+        m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
+        m->msg_iov->iov_len-= (size_t)res;
       }
       return TRANSMIT_INCOMPLETE;
     }
@@ -2316,7 +2345,7 @@ static void ms_conn_shrink(ms_conn_t *c)
       && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
   {
     char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
-    if (new_rbuf)
+    if (new_rbuf)
     {
       c->rudpbuf= new_rbuf;
       c->rudpsize= UDP_DATA_BUFFER_SIZE;
@@ -2388,12 +2417,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state)
  */
 static bool ms_update_event(ms_conn_t *c, const int new_flags)
 {
-  /* default event timeout 10 seconds */
-  struct timeval t=
-  {
-    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
-  };
-
   assert(c != NULL);
 
   struct event_base *base= c->event.ev_base;
@@ -2420,19 +2443,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
   event_base_set(base, &c->event);
   c->ev_flags= (short)new_flags;
 
-  if (c->total_sfds == 1)
-  {
-    if (event_add(&c->event, NULL) == -1)
-    {
-      return false;
-    }
-  }
-  else
+  if (event_add(&c->event, NULL) == -1)
   {
-    if (event_add(&c->event, &t) == -1)
-    {
-      return false;
-    }
+    return false;
   }
 
   return true;
@@ -2451,6 +2464,7 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
  */
 static bool ms_need_yield(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int64_t tps= 0;
   int64_t time_diff= 0;
   struct timeval curr_time;
@@ -2459,13 +2473,11 @@ static bool ms_need_yield(ms_conn_t *c)
   if (ms_setting.expected_tps > 0)
   {
     gettimeofday(&curr_time, NULL);
-    time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
-    tps=
-      (int64_t)((task->get_opt
-                 + task->set_opt) / ((uint64_t)time_diff / 1000000));
+    time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
+    tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
 
     /* current throughput is greater than expected throughput */
-    if (tps > ms_thread.thread_ctx->tps_perconn)
+    if (tps > ms_thread->thread_ctx->tps_perconn)
     {
       return true;
     }
@@ -2708,12 +2720,6 @@ void ms_event_handler(const int fd, const short which, void *arg)
   }
   assert(fd == c->sfd);
 
-  /* event timeout, close the current connection */
-  if (c->which == EV_TIMEOUT)
-  {
-    ms_conn_set_state(c, conn_closing);
-  }
-
   ms_drive_machine(c);
 
   /* wait for next event */
@@ -2726,16 +2732,16 @@ void ms_event_handler(const int fd, const short which, void *arg)
  * @param c, pointer of the concurrency
  * @param cmd, command(get or set )
  *
- * @return int, if success, return the index, else return 0
+ * @return int, if success, return the index, else return EXIT_SUCCESS
  */
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
 {
-  int sock_index= -1;
-  int i= 0;
+  uint32_t sock_index= 0;
+  uint32_t i= 0;
 
   if (c->total_sfds == 1)
   {
-    return 0;
+    return EXIT_SUCCESS;
   }
 
   if (ms_setting.rep_write_srv == 0)
@@ -2758,18 +2764,18 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
       if (i == ms_setting.rep_write_srv)
       {
         /* random get one replication server to read */
-        sock_index= (int)(random() % c->total_sfds);
+        sock_index= (uint32_t)random() % c->total_sfds;
       }
       else
       {
         /* random get one replication writing server to write */
-        sock_index= (int)(random() % ms_setting.rep_write_srv);
+        sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
       }
     }
     else if (cmd == CMD_GET)
     {
       /* random get one replication server to read */
-      sock_index= (int)(random() % c->total_sfds);
+      sock_index= (uint32_t)random() % c->total_sfds;
     }
   }
   while (c->tcpsfd[sock_index] == 0);
@@ -2785,9 +2791,9 @@ static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
  *
  * @return int, return the index
  */
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
 {
-  int sock_index= 0;
+  uint32_t sock_index= 0;
 
   do
   {
@@ -2804,7 +2810,7 @@ static int ms_get_next_sock_index(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_update_conn_sock_event(ms_conn_t *c)
 {
@@ -2868,7 +2874,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c)
     }
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_update_conn_sock_event */
 
 
@@ -2880,7 +2886,7 @@ static int ms_update_conn_sock_event(ms_conn_t *c)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
 {
@@ -2888,13 +2894,14 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
   int write_len;
   char *buffer= c->wbuf;
 
-  write_len= sprintf(buffer,
-                     " %u %d %d\r\n",
-                     0,
-                     item->exp_time,
-                     item->value_size);
+  write_len= snprintf(buffer,
+                      c->wsize,
+                      " %u %d %d\r\n",
+                      0,
+                      item->exp_time,
+                      item->value_size);
 
-  if (write_len > c->wsize)
+  if (write_len > c->wsize || write_len < 0)
   {
     /* ought to be always enough. just fail for simplicity */
     fprintf(stderr, "output command line too long.\n");
@@ -2924,7 +2931,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
     return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_ascii_write_buf_set */
 
 
@@ -2935,7 +2942,7 @@ static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
 {
@@ -2975,11 +2982,11 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
     }
   }
 
-  __sync_fetch_and_add(&ms_stats.obj_bytes,
-                       item->key_size + item->value_size);
-  __sync_fetch_and_add(&ms_stats.cmd_set, 1);
+  atomic_add_size(&ms_stats.obj_bytes,
+                  item->key_size + item->value_size);
+  atomic_add_size(&ms_stats.cmd_set, 1);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_mcd_set */
 
 
@@ -2991,7 +2998,7 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
 {
@@ -3006,7 +3013,7 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
     return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_ascii_write_buf_get */
 
 
@@ -3016,15 +3023,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
  * @param c, pointer of the concurrency
  * @param item, pointer of task item which includes the object
  *            information
- * @param verify, whether do verification
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
 {
-  /* verify not supported yet */
-  UNUSED_ARGUMENT(verify);
-
   assert(c != NULL);
 
   c->currcmd.cmd= CMD_GET;
@@ -3061,9 +3064,9 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
     }
   }
 
-  __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+  atomic_add_size(&ms_stats.cmd_get, 1);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_mcd_get */
 
 
@@ -3073,7 +3076,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
 {
@@ -3104,7 +3107,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
     return -1;
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_ascii_write_buf_mlget */
 
 
@@ -3113,7 +3116,7 @@ static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 int ms_mcd_mlget(ms_conn_t *c)
 {
@@ -3160,10 +3163,12 @@ int ms_mcd_mlget(ms_conn_t *c)
   for (int i= 0; i < c->mlget_task.mlget_num; i++)
   {
     item= c->mlget_task.mlget_item[i].item;
-    __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+    atomic_add_size(&ms_stats.cmd_get, 1);
   }
 
-  return 0;
+  (void)item;
+
+  return EXIT_SUCCESS;
 } /* ms_mcd_mlget */
 
 
@@ -3176,7 +3181,7 @@ int ms_mcd_mlget(ms_conn_t *c)
  *
  * @param c, pointer of the concurrency
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_bin_process_response(ms_conn_t *c)
 {
@@ -3192,7 +3197,7 @@ static int ms_bin_process_response(ms_conn_t *c)
   {
     c->rvbytes= (int32_t)bodylen;
     c->readval= true;
-    return 1;
+    return EXIT_FAILURE;
   }
   else
   {
@@ -3259,7 +3264,7 @@ static int ms_bin_process_response(ms_conn_t *c)
     }
   }
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_bin_process_response */
 
 
@@ -3288,11 +3293,11 @@ static void ms_add_bin_header(ms_conn_t *c,
 
   header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
   header->request.opcode= (uint8_t)opcode;
-  header->request.keylen= htonl(key_len);
+  header->request.keylen= htons(key_len);
 
   header->request.extlen= (uint8_t)hdr_len;
   header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
-  header->request.reserved= 0;
+  header->request.vbucket= 0;
 
   header->request.bodylen= htonl(body_len);
   header->request.opaque= 0;
@@ -3325,7 +3330,7 @@ static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
 {
@@ -3357,7 +3362,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
   }
   ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_bin_write_buf_set */
 
 
@@ -3369,7 +3374,7 @@ static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
 {
@@ -3379,7 +3384,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
                     (uint32_t)item->key_size);
   ms_add_key_to_iov(c, item);
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_bin_write_buf_get */
 
 
@@ -3391,7 +3396,7 @@ static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
  * @param item, pointer of task item which includes the object
  *            information
  *
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
  */
 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
 {
@@ -3415,5 +3420,5 @@ static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
 
   c->wcurr= c->wbuf;
 
-  return 0;
+  return EXIT_SUCCESS;
 } /* ms_build_bin_write_buf_mlget */