*
*/
#include <getopt.h>
+#include <limits.h>
#include "ms_sigsegv.h"
#include "ms_setting.h"
second++;
if ((ms_setting.stat_freq > 0) && (second % ms_setting.stat_freq == 0)
- && (ms_stats.active_conns >= ms_setting.nconns))
+ && ((int32_t)ms_stats.active_conns >= ms_setting.nconns)
+ && (ms_stats.active_conns <= (uint32_t)INT_MAX))
{
ms_print_statistics(second);
}
#ifndef CLIENTS_MS_ATOMIC_H
#define CLIENTS_MS_ATOMIC_H
-#if defined(__SUNC)
+#if defined(__SUNPRO_C)
+# define _KERNEL
+# include <atomic.h>
+# undef _KERNEL
+#else
+# define atomic_add_8(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_16(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_32(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_64(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_dec_8(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_16(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_32(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_64(X) __sync_fetch_and_sub((X), 1)
+/* The same as above, but these return the new value instead of void */
+# define atomic_add_8_nv(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_16_nv(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_32_nv(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_64_nv(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_dec_8_nv(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_16_nv(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_32_nv(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_64_nv(X) __sync_fetch_and_sub((X), 1)
+#endif /* defined(__SUNPRO_C) */
-#define _KERNEL
-#include <atomic.h>
-#undef _KERNEL
-
-inline int32_t __sync_add_and_fetch(volatile int32_t* ptr, int32_t val)
-{
- (val == 1) ? atomic_inc_32((volatile uint32_t*)ptr) : atomic_add_32((volatile uint32_t*)ptr, val);
- return *ptr;
-}
-
-
-inline uint32_t __sync_sub_and_fetch(volatile uint32_t* ptr, uint32_t val)
-{
- (val == 1) ? atomic_dec_32(ptr) : atomic_add_32(ptr, 0-(int32_t)val);
- return *ptr;
-}
-
-#endif /* defined(__SUNC) */
#endif /* CLIENTS_MS_ATOMIC_H */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
/* global increasing counter, generating request id for UDP */
-static int32_t udp_request_id= 0;
+static volatile uint32_t udp_request_id= 0;
extern __thread ms_thread_t ms_thread;
/* generate upd request id */
-static int ms_get_udp_request_id(void);
+static uint32_t ms_get_udp_request_id(void);
/* connect initialize */
*
* @return an unique UDP request id
*/
-static int ms_get_udp_request_id(void)
+static uint32_t ms_get_udp_request_id(void)
{
- return __sync_fetch_and_add(&udp_request_id, 1);
+ return atomic_add_32_nv(&udp_request_id, 1);
}
if (! (ms_setting.facebook_test && is_udp))
{
- __sync_fetch_and_add(&ms_stats.active_conns, 1);
+ atomic_add_32(&ms_stats.active_conns, 1);
}
return 0;
close(c->udpsfd);
}
- __sync_fetch_and_sub(&ms_stats.active_conns, 1);
+ atomic_dec_32(&ms_stats.active_conns);
ms_conn_free(c);
static int ms_reconn(ms_conn_t *c)
{
int srv_idx= 0;
- int srv_conn_cnt= 0;
+ int32_t srv_conn_cnt= 0;
if (ms_setting.rep_write_srv > 0)
{
close(c->sfd);
c->tcpsfd[c->cur_idx]= 0;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+ if (atomic_add_32_nv((volatile uint32_t *)&ms_setting.servers[srv_idx].disconn_cnt, 1)
% srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
ms_setting.udp, &c->sfd) == 0)
{
c->tcpsfd[c->cur_idx]= c->sfd;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ if (atomic_add_32_nv((volatile uint32_t *)(&ms_setting.servers[srv_idx].reconn_cnt), 1)
% srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
c->tcpsfd[i]= ret_sfd;
c->alive_sfds++;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ if (atomic_add_32_nv((volatile uint32_t *)(&ms_setting.servers[srv_idx].reconn_cnt), 1)
% srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
if (res > 0)
{
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
+ atomic_add_64(&ms_stats.bytes_read, res);
c->rudpbytes+= res;
rbytes+= res;
if (res == avail)
if (copybytes == -1)
{
- __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
+ atomic_add_64(&ms_stats.pkt_disorder, 1);
}
return copybytes;
{
if (! c->udp)
{
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
+ atomic_add_64(&ms_stats.bytes_read, res);
}
gotdata= 1;
c->rbytes+= res;
if (curr_time.tv_sec - c->curr_task.item->client_time
> c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
{
- __sync_fetch_and_add(&ms_stats.exp_get, 1);
+ atomic_add_64(&ms_stats.exp_get, 1);
if (ms_setting.verbose)
{
if ((c->curr_task.item->value_size != vlen)
|| (memcmp(orignval, value, (size_t)vlen) != 0))
{
- __sync_fetch_and_add(&ms_stats.vef_failed, 1);
+ atomic_add_64(&ms_stats.vef_failed, 1);
if (ms_setting.verbose)
{
hdr= c->hdrbuf;
for (i= 0; i < c->msgused; i++)
{
- c->msglist[i].msg_iov[0].iov_base= hdr;
+ c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
*hdr++= (unsigned char)(c->request_id / 256);
*hdr++= (unsigned char)(c->request_id % 256);
res= sendmsg(c->sfd, m, 0);
if (res > 0)
{
- __sync_fetch_and_add(&ms_stats.bytes_written, res);
+ atomic_add_64(&ms_stats.bytes_written, res);
/* We've written some of the data. Remove the completed
* iovec entries from the list of pending writes. */
* adjust it so the next write will do the rest. */
if (res > 0)
{
- m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
+ m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
m->msg_iov->iov_len-= (uint64_t)res;
}
return TRANSMIT_INCOMPLETE;
}
}
- __sync_fetch_and_add(&ms_stats.obj_bytes,
- item->key_size + item->value_size);
- __sync_fetch_and_add(&ms_stats.cmd_set, 1);
+ atomic_add_64(&ms_stats.obj_bytes,
+ item->key_size + item->value_size);
+ atomic_add_64(&ms_stats.cmd_set, 1);
return 0;
} /* ms_mcd_set */
}
}
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+ atomic_add_64(&ms_stats.cmd_get, 1);
return 0;
} /* ms_mcd_get */
for (int i= 0; i < c->mlget_task.mlget_num; i++)
{
item= c->mlget_task.mlget_item[i].item;
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+ atomic_add_64(&ms_stats.cmd_get, 1);
}
return 0;
/* data for UDP clients */
int udp; /* is this is a UDP "connection" */
- int request_id; /* UDP request ID of current operation, if this is a UDP "connection" */
+ uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */
uint8_t *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */
/* global status statistic structure */
typedef struct stats
{
- int32_t active_conns; /* active connections */
+ volatile uint32_t active_conns; /* active connections */
uint64_t bytes_read; /* read bytes */
uint64_t bytes_written; /* written bytes */
uint64_t obj_bytes; /* object bytes */
uint64_t vef_failed; /* total objects of verification failed */
uint64_t unexp_unget; /* total objects which is unexpired but not get */
uint64_t exp_get; /* total objects which is expired but get */
- uint64_t pkt_disorder; /* disorder packages of UDP */
+ volatile uint64_t pkt_disorder; /* disorder packages of UDP */
uint64_t pkt_drop; /* packages dropped of UDP */
uint64_t udp_timeout; /* how many times timeout of UDP happens */
} ms_stats_t;
static void ms_setting_slapmode_init_pre(void);
static void ms_setting_slapmode_init_post(void);
+#if defined(__SUNPRO_C)
+#include <limits.h>
+static ssize_t getline (char **line, size_t *line_size, FILE *fp)
+{
+ char delim= '\n';
+ ssize_t result= 0;
+ size_t cur_len= 0;
+
+ if (line == NULL || line_size == NULL || fp == NULL)
+ {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (*line == NULL || *line_size == 0)
+ {
+ char *new_line;
+ *line_size = 120;
+ new_line= (char *) realloc (*line, *line_size);
+ if (new_line == NULL)
+ {
+ result= -1;
+ return result;
+ }
+ *line= new_line;
+ }
+
+ for (;;)
+ {
+ char i;
+
+ i = getc(fp);
+ if (i == EOF)
+ {
+ result = -1;
+ break;
+ }
+
+ /* Make enough space for len+1 (for final NUL) bytes. */
+ if (cur_len + 1 >= *line_size)
+ {
+ size_t needed_max=
+ SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
+ size_t needed= (2 * (*line_size)) + 1;
+ char *new_line;
+
+ if (needed_max < needed)
+ needed= needed_max;
+ if (cur_len + 1 >= needed)
+ {
+ result= -1;
+ errno= EOVERFLOW;
+ return result;
+ }
+
+ new_line= (char *)realloc(*line, needed);
+ if (new_line == NULL)
+ {
+ result= -1;
+ return result;
+ }
+
+ *line= new_line;
+ *line_size= needed;
+ }
+
+ (*line)[cur_len]= i;
+ cur_len++;
+
+ if (i == delim)
+ break;
+ }
+ (*line)[cur_len] = '\0';
+ result= cur_len ? cur_len : result;
+
+ return result;
+}
+#endif
/**
* parse the server list string, and build the servers
int srv_port; /* server port */
/* for calculating how long the server disconnects */
- int32_t disconn_cnt; /* number of disconnections count */
- int32_t reconn_cnt; /* number of reconnections count */
+ volatile int32_t disconn_cnt; /* number of disconnections count */
+ volatile int32_t reconn_cnt; /* number of reconnections count */
struct timeval disconn_time; /* start time of disconnection */
struct timeval reconn_time; /* end time of reconnection */
} ms_mcd_server_t;
{
int ncpu; /* cpu count of this system */
int nthreads; /* total thread count, must equal or less than cpu cores */
- int nconns; /* total conn count, must multiply by total thread count */
+ int nconns; /* total conn count, must multiply by total thread count */
int64_t exec_num; /* total execute number */
int run_time; /* total run time */
/* update get miss counter */
if (mlget_item->get_miss)
{
- __sync_fetch_and_add(&ms_stats.get_misses, 1);
+ atomic_add_64(&ms_stats.get_misses, 1);
}
/* get nothing from server for this task item */
if (curr_time.tv_sec - item->client_time
< item->exp_time - EXPIRE_TIME_ERROR)
{
- __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
+ atomic_add_64(&ms_stats.unexp_unget, 1);
if (ms_setting.verbose)
{
}
else
{
- __sync_fetch_and_add(&ms_stats.vef_miss, 1);
+ atomic_add_64(&ms_stats.vef_miss, 1);
if (ms_setting.verbose)
{
/* update get miss counter */
if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
{
- __sync_fetch_and_add(&ms_stats.get_misses, 1);
+ atomic_add_64(&ms_stats.get_misses, 1);
}
/* get nothing from server for this task item */
if (curr_time.tv_sec - item->client_time
< item->exp_time - EXPIRE_TIME_ERROR)
{
- __sync_fetch_and_add(&ms_stats.unexp_unget, 1);
+ atomic_add_64(&ms_stats.unexp_unget, 1);
if (ms_setting.verbose)
{
}
else
{
- __sync_fetch_and_add(&ms_stats.vef_miss, 1);
+ atomic_add_64(&ms_stats.vef_miss, 1);
if (ms_setting.verbose)
{
/* calculate dropped packets count */
if (c->recvpkt > 0)
{
- __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
+ atomic_add_64(&ms_stats.pkt_drop, c->packets - c->recvpkt);
}
- __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
+ atomic_add_64(&ms_stats.udp_timeout, 1);
ms_reset_conn(c, true);
}
}
ms_thread.thread_ctx= thread_ctx;
ms_thread.nactive_conn= thread_ctx->nconns;
ms_thread.initialized= false;
- static int cnt= 0;
+ static volatile uint32_t cnt= 0;
gettimeofday(&ms_thread.startup_time, NULL);
ms_thread.base= event_init();
if (ms_thread.base == NULL)
{
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(stderr, "Can't allocate event base.\n");
}
(ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
if (ms_thread.conn == NULL)
{
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(
stderr,
if (ms_setup_conn(&ms_thread.conn[i]) != 0)
{
/* only output this error once */
- if (__sync_fetch_and_add(&cnt, 1) == 0)
+ if (atomic_add_32_nv(&cnt, 1) == 0)
{
fprintf(stderr, "Initializing connection failed.\n");
}