Small cleanup for for() to array.
[awesomized/libmemcached] / clients / ms_conn.c
index ad12a113b11f5015c7097d0e4345e9ea7bdc7ca6..c062dc2958df424eb1e165e0e6cf6ce33396033f 100644 (file)
@@ -12,6 +12,7 @@
 #include "config.h"
 
 #include <stdio.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <sys/uio.h>
 #include <event.h>
 #include <arpa/inet.h>
 #include "ms_setting.h"
 #include "ms_thread.h"
+#include "ms_atomic.h"
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
 
 /* for network write */
 #define TRANSMIT_COMPLETE      0
 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
 
 /* global increasing counter, generating request id for UDP */
-static int udp_request_id= 0;
+static volatile uint32_t udp_request_id= 0;
 
-extern __thread ms_thread_t ms_thread;
+extern pthread_key_t ms_thread_key;
 
 /* generate upd request id */
-static int ms_get_udp_request_id(void);
+static uint32_t ms_get_udp_request_id(void);
 
 
 /* connect initialize */
@@ -165,9 +179,9 @@ uint64_t ms_get_key_prefix(void)
  *
  * @return an unique UDP request id
  */
-static int ms_get_udp_request_id(void)
+static uint32_t ms_get_udp_request_id(void)
 {
-  return __sync_fetch_and_add(&udp_request_id, 1);
+  return atomic_add_32_nv(&udp_request_id, 1);
 }
 
 
@@ -357,7 +371,7 @@ static int ms_conn_init(ms_conn_t *c,
 
   if (! (ms_setting.facebook_test && is_udp))
   {
-    __sync_fetch_and_add(&ms_stats.active_conns, 1);
+    atomic_add_32(&ms_stats.active_conns, 1);
   }
 
   return 0;
@@ -397,11 +411,12 @@ static void ms_warmup_num_init(ms_conn_t *c)
  */
 static int ms_item_win_init(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int exp_cnt= 0;
 
   c->win_size= (int)ms_setting.win_size;
   c->set_cursor= 0;
-  c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
+  c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
   c->remain_exec_num= c->exec_num;
 
   c->item_win= (ms_task_item_t *)malloc(
@@ -451,6 +466,7 @@ static int ms_item_win_init(ms_conn_t *c)
  */
 static int ms_conn_sock_init(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int i;
   int ret_sfd;
   int srv_idx= 0;
@@ -469,7 +485,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
     else
     {
       /* all the connections in a thread connects the same server */
-      srv_idx= ms_thread.thread_ctx->srv_idx;
+      srv_idx= ms_thread->thread_ctx->srv_idx;
     }
 
     if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
@@ -544,6 +560,7 @@ static int ms_conn_sock_init(ms_conn_t *c)
  */
 static int ms_conn_event_init(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   /* default event timeout 10 seconds */
   struct timeval t=
   {
@@ -552,7 +569,7 @@ static int ms_conn_event_init(ms_conn_t *c)
   short event_flags= EV_WRITE | EV_PERSIST;
 
   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
-  event_base_set(ms_thread.base, &c->event);
+  event_base_set(ms_thread->base, &c->event);
   c->ev_flags= event_flags;
 
   if (c->total_sfds == 1)
@@ -615,6 +632,7 @@ int ms_setup_conn(ms_conn_t *c)
  */
 void ms_conn_free(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   if (c != NULL)
   {
     if (c->hdrbuf != NULL)
@@ -638,9 +656,9 @@ void ms_conn_free(ms_conn_t *c)
     if (c->tcpsfd != NULL)
       free(c->tcpsfd);
 
-    if (--ms_thread.nactive_conn == 0)
+    if (--ms_thread->nactive_conn == 0)
     {
-      free(ms_thread.conn);
+      free(ms_thread->conn);
     }
   }
 } /* ms_conn_free */
@@ -653,6 +671,7 @@ void ms_conn_free(ms_conn_t *c)
  */
 static void ms_conn_close(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   assert(c != NULL);
 
   /* delete the event, the socket and the connection */
@@ -672,7 +691,7 @@ static void ms_conn_close(ms_conn_t *c)
     close(c->udpsfd);
   }
 
-  __sync_fetch_and_sub(&ms_stats.active_conns, 1);
+  atomic_dec_32(&ms_stats.active_conns);
 
   ms_conn_free(c);
 
@@ -684,7 +703,7 @@ static void ms_conn_close(ms_conn_t *c)
     pthread_mutex_unlock(&ms_global.run_lock.lock);
   }
 
-  if (ms_thread.nactive_conn == 0)
+  if (ms_thread->nactive_conn == 0)
   {
     pthread_exit(NULL);
   }
@@ -883,8 +902,9 @@ static int ms_network_connect(ms_conn_t *c,
  */
 static int ms_reconn(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int srv_idx= 0;
-  int srv_conn_cnt= 0;
+  int32_t srv_conn_cnt= 0;
 
   if (ms_setting.rep_write_srv > 0)
   {
@@ -893,7 +913,7 @@ static int ms_reconn(ms_conn_t *c)
   }
   else
   {
-    srv_idx= ms_thread.thread_ctx->srv_idx;
+    srv_idx= ms_thread->thread_ctx->srv_idx;
     srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
   }
 
@@ -901,8 +921,8 @@ static int ms_reconn(ms_conn_t *c)
   close(c->sfd);
   c->tcpsfd[c->cur_idx]= 0;
 
-  if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
-      % srv_conn_cnt == 0)
+  if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+      % (uint32_t)srv_conn_cnt == 0)
   {
     gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
     fprintf(stderr, "Server %s:%d disconnect\n",
@@ -937,8 +957,8 @@ static int ms_reconn(ms_conn_t *c)
                              ms_setting.udp, &c->sfd) == 0)
       {
         c->tcpsfd[c->cur_idx]= c->sfd;
-        if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
-            % srv_conn_cnt == 0)
+        if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+            % (uint32_t)srv_conn_cnt == 0)
         {
           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
           int reconn_time=
@@ -983,6 +1003,7 @@ static int ms_reconn(ms_conn_t *c)
  */
 int ms_reconn_socks(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int srv_idx= 0;
   int ret_sfd= 0;
   int srv_conn_cnt= 0;
@@ -1020,7 +1041,7 @@ int ms_reconn_socks(ms_conn_t *c)
       }
       else
       {
-        srv_idx= ms_thread.thread_ctx->srv_idx;
+        srv_idx= ms_thread->thread_ctx->srv_idx;
         srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
       }
 
@@ -1031,8 +1052,8 @@ int ms_reconn_socks(ms_conn_t *c)
         c->tcpsfd[i]= ret_sfd;
         c->alive_sfds++;
 
-        if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
-            % srv_conn_cnt == 0)
+        if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+            % (uint32_t)srv_conn_cnt == 0)
         {
           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
           int reconn_time=
@@ -1263,7 +1284,7 @@ void ms_reset_conn(ms_conn_t *c, bool timeout)
   {
     if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
     {
-      memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
+      memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
     }
 
     c->packets= 0;
@@ -1322,9 +1343,9 @@ static int ms_try_read_line(ms_conn_t *c)
 
       c->binary_header= *rsp;
       c->binary_header.response.extlen= rsp->response.extlen;
-      c->binary_header.response.keylen= ntohl(rsp->response.keylen);
+      c->binary_header.response.keylen= ntohs(rsp->response.keylen);
       c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
-      c->binary_header.response.status= ntohl(rsp->response.status);
+      c->binary_header.response.status= ntohs(rsp->response.status);
 
       if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
       {
@@ -1568,7 +1589,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
 
     if (res > 0)
     {
-      __sync_fetch_and_add(&ms_stats.bytes_read, res);
+      atomic_add_size(&ms_stats.bytes_read, res);
       c->rudpbytes+= res;
       rbytes+= res;
       if (res == avail)
@@ -1602,7 +1623,7 @@ static int ms_udp_read(ms_conn_t *c, char *buf, int len)
 
   if (copybytes == -1)
   {
-    __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
+    atomic_add_size(&ms_stats.pkt_disorder, 1);
   }
 
   return copybytes;
@@ -1680,7 +1701,7 @@ static int ms_try_read_network(ms_conn_t *c)
     {
       if (! c->udp)
       {
-        __sync_fetch_and_add(&ms_stats.bytes_read, res);
+        atomic_add_size(&ms_stats.bytes_read, res);
       }
       gotdata= 1;
       c->rbytes+= res;
@@ -1744,7 +1765,7 @@ static void ms_verify_value(ms_conn_t *c,
       if (curr_time.tv_sec - c->curr_task.item->client_time
           > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
       {
-        __sync_fetch_and_add(&ms_stats.exp_get, 1);
+        atomic_add_size(&ms_stats.exp_get, 1);
 
         if (ms_setting.verbose)
         {
@@ -1758,7 +1779,7 @@ static void ms_verify_value(ms_conn_t *c,
                   "\n<%d expire time verification failed, "
                   "object expired but get it now\n"
                   "\tkey len: %d\n"
-                  "\tkey: %lx %.*s\n"
+                  "\tkey: %" PRIx64 " %.*s\n"
                   "\tset time: %s current time: %s "
                   "diff time: %d expire time: %d\n"
                   "\texpected data: \n"
@@ -1785,14 +1806,14 @@ static void ms_verify_value(ms_conn_t *c,
       if ((c->curr_task.item->value_size != vlen)
           || (memcmp(orignval, value, (size_t)vlen) != 0))
       {
-        __sync_fetch_and_add(&ms_stats.vef_failed, 1);
+        atomic_add_size(&ms_stats.vef_failed, 1);
 
         if (ms_setting.verbose)
         {
           fprintf(stderr,
                   "\n<%d data verification failed\n"
                   "\tkey len: %d\n"
-                  "\tkey: %lx %.*s\n"
+                  "\tkey: %" PRIx64" %.*s\n"
                   "\texpected data len: %d\n"
                   "\texpected data: %.*s\n"
                   "\treceived data len: %d\n"
@@ -2004,7 +2025,7 @@ static int ms_add_msghdr(ms_conn_t *c)
   if (c->msgsize == c->msgused)
   {
     msg=
-      realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
+      realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
     if (! msg)
       return -1;
 
@@ -2057,7 +2078,7 @@ static int ms_ensure_iov_space(ms_conn_t *c)
   {
     int i, iovnum;
     struct iovec *new_iov= (struct iovec *)realloc(c->iov,
-                                                   ((uint64_t)c->iovsize
+                                                   ((size_t)c->iovsize
                                                     * 2)
                                                    * sizeof(struct iovec));
     if (! new_iov)
@@ -2185,7 +2206,7 @@ static int ms_build_udp_headers(ms_conn_t *c)
   hdr= c->hdrbuf;
   for (i= 0; i < c->msgused; i++)
   {
-    c->msglist[i].msg_iov[0].iov_base= hdr;
+    c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
     c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
     *hdr++= (unsigned char)(c->request_id / 256);
     *hdr++= (unsigned char)(c->request_id % 256);
@@ -2233,7 +2254,7 @@ static int ms_transmit(ms_conn_t *c)
     res= sendmsg(c->sfd, m, 0);
     if (res > 0)
     {
-      __sync_fetch_and_add(&ms_stats.bytes_written, res);
+      atomic_add_size(&ms_stats.bytes_written, res);
 
       /* We've written some of the data. Remove the completed
        *  iovec entries from the list of pending writes. */
@@ -2248,8 +2269,8 @@ static int ms_transmit(ms_conn_t *c)
        *  adjust it so the next write will do the rest. */
       if (res > 0)
       {
-        m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
-        m->msg_iov->iov_len-= (uint64_t)res;
+        m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
+        m->msg_iov->iov_len-= (size_t)res;
       }
       return TRANSMIT_INCOMPLETE;
     }
@@ -2451,6 +2472,7 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags)
  */
 static bool ms_need_yield(ms_conn_t *c)
 {
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
   int64_t tps= 0;
   int64_t time_diff= 0;
   struct timeval curr_time;
@@ -2459,13 +2481,13 @@ static bool ms_need_yield(ms_conn_t *c)
   if (ms_setting.expected_tps > 0)
   {
     gettimeofday(&curr_time, NULL);
-    time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
+    time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
     tps=
       (int64_t)((task->get_opt
                  + task->set_opt) / ((uint64_t)time_diff / 1000000));
 
     /* current throughput is greater than expected throughput */
-    if (tps > ms_thread.thread_ctx->tps_perconn)
+    if (tps > ms_thread->thread_ctx->tps_perconn)
     {
       return true;
     }
@@ -2975,9 +2997,9 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
     }
   }
 
-  __sync_fetch_and_add(&ms_stats.obj_bytes,
-                       item->key_size + item->value_size);
-  __sync_fetch_and_add(&ms_stats.cmd_set, 1);
+  atomic_add_size(&ms_stats.obj_bytes,
+                  item->key_size + item->value_size);
+  atomic_add_size(&ms_stats.cmd_set, 1);
 
   return 0;
 } /* ms_mcd_set */
@@ -3061,7 +3083,7 @@ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
     }
   }
 
-  __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+  atomic_add_size(&ms_stats.cmd_get, 1);
 
   return 0;
 } /* ms_mcd_get */
@@ -3160,7 +3182,7 @@ int ms_mcd_mlget(ms_conn_t *c)
   for (int i= 0; i < c->mlget_task.mlget_num; i++)
   {
     item= c->mlget_task.mlget_item[i].item;
-    __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+    atomic_add_size(&ms_stats.cmd_get, 1);
   }
 
   return 0;
@@ -3288,7 +3310,7 @@ static void ms_add_bin_header(ms_conn_t *c,
 
   header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
   header->request.opcode= (uint8_t)opcode;
-  header->request.keylen= htonl(key_len);
+  header->request.keylen= htons(key_len);
 
   header->request.extlen= (uint8_t)hdr_len;
   header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;