break;
case OPT_THREAD_NUMBER: /* --threads or -T */
- ms_setting.nthreads= (int)strtol(optarg, (char **) NULL, 10);
+ ms_setting.nthreads= (uint32_t)strtoul(optarg, (char **) NULL, 10);
if (ms_setting.nthreads <= 0)
{
fprintf(stderr, "Threads number must be greater than 0.:-)\n");
break;
case OPT_SOCK_PER_CONN: /* --conn_sock or -n */
- ms_setting.sock_per_conn= (int)strtol(optarg, (char **) NULL, 10);
+ ms_setting.sock_per_conn= (uint32_t)strtoul(optarg, (char **) NULL, 10);
if (ms_setting.sock_per_conn <= 0)
{
fprintf(stderr, "Number of socks of each concurrency "
break;
case OPT_REP_WRITE_SRV: /* --rep_write or -p */
- ms_setting.rep_write_srv= (int)strtol(optarg, (char **) NULL, 10);
+ ms_setting.rep_write_srv= (uint32_t)strtoul(optarg, (char **) NULL, 10);
if (ms_setting.rep_write_srv <= 0)
{
fprintf(stderr,
{
/* Wait all the connects complete warm up. */
pthread_mutex_lock(&ms_global.warmup_lock.lock);
- while (ms_global.warmup_lock.count < (int)ms_setting.nconns) {
+ while (ms_global.warmup_lock.count < ms_setting.nconns)
+ {
pthread_cond_wait(&ms_global.warmup_lock.cond, &ms_global.warmup_lock.lock);
}
pthread_mutex_unlock(&ms_global.warmup_lock.lock);
* We loop until we know that all connects have cleaned up.
*/
pthread_mutex_lock(&ms_global.run_lock.lock);
- while (ms_global.run_lock.count < (int)ms_setting.nconns)
+ while (ms_global.run_lock.count < ms_setting.nconns)
{
pthread_cond_wait(&ms_global.run_lock.cond, &ms_global.run_lock.lock);
}
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);
static int ms_conn_sock_init(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int i;
+ uint32_t i;
int ret_sfd;
- int srv_idx= 0;
+ uint32_t srv_idx= 0;
assert(c != NULL);
assert(c->tcpsfd != NULL);
}
else
{
- for (int j= 0; j < i; j++)
+ for (uint32_t j= 0; j < i; j++)
{
close(c->tcpsfd[j]);
}
/* delete the event, the socket and the connection */
event_del(&c->event);
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] > 0)
{
static int ms_reconn(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int srv_idx= 0;
- int32_t srv_conn_cnt= 0;
+ uint32_t srv_idx= 0;
+ uint32_t srv_conn_cnt= 0;
if (ms_setting.rep_write_srv > 0)
{
srv_idx= c->cur_idx % ms_setting.srv_cnt;
- srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
srv_idx= ms_thread->thread_ctx->srv_idx;
- srv_conn_cnt= (int32_t)((int)ms_setting.nconns / ms_setting.srv_cnt);
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
/* close the old socket handler */
c->tcpsfd[c->cur_idx]= 0;
if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
- % (uint32_t)srv_conn_cnt == 0)
+ % srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
fprintf(stderr, "Server %s:%d disconnect\n",
if (ms_setting.rep_write_srv > 0)
{
- int i= 0;
+ uint32_t i= 0;
+
for (i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] != 0)
int ms_reconn_socks(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int srv_idx= 0;
+ uint32_t srv_idx= 0;
int ret_sfd= 0;
- int srv_conn_cnt= 0;
+ uint32_t srv_conn_cnt= 0;
struct timeval cur_time;
assert(c != NULL);
return 0;
}
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] == 0)
{
if (ms_setting.rep_write_srv > 0)
{
srv_idx= i % ms_setting.srv_cnt;
- srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
srv_idx= ms_thread->thread_ctx->srv_idx;
- srv_conn_cnt= (int)ms_setting.nconns / ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
*
* @return int, if success, return the index, else return 0
*/
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
{
- int sock_index= -1;
- int i= 0;
+ uint32_t sock_index= 0;
+ uint32_t i= 0;
if (c->total_sfds == 1)
{
if (i == ms_setting.rep_write_srv)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)(random() % c->total_sfds);
}
else
{
/* random get one replication writing server to write */
- sock_index= (int)(random() % ms_setting.rep_write_srv);
+ sock_index= (uint32_t)(random() % ms_setting.rep_write_srv);
}
}
else if (cmd == CMD_GET)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)(random() % c->total_sfds);
}
}
while (c->tcpsfd[sock_index] == 0);
*
* @return int, return the index
*/
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
{
- int sock_index= 0;
+ uint32_t sock_index= 0;
do
{
*/
typedef struct conn
{
- int conn_idx; /* connection index in the thread */
+ uint32_t conn_idx; /* connection index in the thread */
int sfd; /* current tcp sock handler of the connection structure */
int udpsfd; /* current udp sock handler of the connection structure*/
int state; /* state of the connection */
bool change_sfd; /* whether change sfd */
int *tcpsfd; /* TCP sock array */
- int total_sfds; /* how many socks in the tcpsfd array */
- int alive_sfds; /* alive socks */
- int cur_idx; /* current sock index in tcpsfd array */
+ uint32_t total_sfds; /* how many socks in the tcpsfd array */
+ uint32_t alive_sfds; /* alive socks */
+ uint32_t cur_idx; /* current sock index in tcpsfd array */
ms_cmdstat_t precmd; /* previous command state */
ms_cmdstat_t currcmd; /* current command state */
int msgbytes; /* number of bytes in current msg */
/* data for UDP clients */
- int udp; /* is this is a UDP "connection" */
+ bool udp; /* is this is a UDP "connection" */
uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */
uint8_t *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
/* lock adapter */
typedef struct sync_lock
{
- int count;
+ uint32_t count;
pthread_mutex_t lock;
pthread_cond_t cond;
} ms_sync_lock_t;
/* read setting from configuration file */
static void ms_get_serverlist(char *str);
-static int ms_get_cpu_count(void);
+static uint32_t ms_get_cpu_count(void);
ms_conf_type_t ms_get_conf_type(char *line);
static int ms_is_line_data(char *line);
static int ms_read_is_data(char *line, ssize_t nread);
*
* @return return the cpu count if get, else return 1
*/
-static int ms_get_cpu_count()
+static uint32_t ms_get_cpu_count()
{
#ifdef HAVE__SC_NPROCESSORS_ONLN
return sysconf(_SC_NPROCESSORS_CONF);
/* global setting structure */
typedef struct setting
{
- int ncpu; /* cpu count of this system */
- int nthreads; /* total thread count, must equal or less than cpu cores */
+ uint32_t ncpu; /* cpu count of this system */
+ uint32_t nthreads; /* total thread count, must equal or less than cpu cores */
uint32_t nconns; /* total conn count, must multiply by total thread count */
int64_t exec_num; /* total execute number */
int run_time; /* total run time */
char *cfg_file; /* configure file name */
ms_mcd_server_t *servers; /* servers array */
- int total_srv_cnt; /* total servers count of the servers array */
- int srv_cnt; /* servers count */
+ uint32_t total_srv_cnt; /* total servers count of the servers array */
+ uint32_t srv_cnt; /* servers count */
ms_key_distr_t *key_distr; /* array of key distribution */
int total_key_rng_cnt; /* total key range count of the array */
bool reconnect; /* whether it reconnect when connection close */
bool verbose; /* whether it outputs detailed information when verification */
bool facebook_test; /* facebook test, TCP set and multi-get with UDP */
- int sock_per_conn; /* number of socks per connection structure */
+ uint32_t sock_per_conn; /* number of socks per connection structure */
bool binary_prot; /* whether it use binary protocol */
int expected_tps; /* expected throughput */
- int rep_write_srv; /* which servers are used to do replication writing */
+ uint32_t rep_write_srv; /* which servers are used to do replication writing */
} ms_setting_st;
extern ms_setting_st ms_setting;
static void ms_set_current_time(void);
static void ms_check_sock_timeout(void);
static void ms_clock_handler(const int fd, const short which, void *arg);
-static int ms_set_thread_cpu_affinity(int cpu);
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
static void *ms_worker_libevent(void *arg);
static void ms_create_worker(void *(*func)(void *), void *arg);
ms_conn_t *c= NULL;
int time_diff= 0;
- for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
+ for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
c= &ms_thread->conn[i];
static void ms_reconn_thread_socks(void)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
+ for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
ms_reconn_socks(&ms_thread->conn[i]);
}
*
* @return if success, return 0, else return -1
*/
-static int ms_set_thread_cpu_affinity(int cpu)
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
{
- int ret= 0;
+ uint32_t ret= 0;
#ifdef HAVE_CPU_SET_T
cpu_set_t cpu_set;
}
memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
- for (int i= 0; i < thread_ctx->nconns; i++)
+ for (uint32_t i= 0; i < thread_ctx->nconns; i++)
{
ms_thread->conn[i].conn_idx= i;
if (ms_setup_conn(&ms_thread->conn[i]) != 0)
exit(1);
}
- for (int i= 0; i < ms_setting.nthreads; i++)
+ for (uint32_t i= 0; i < ms_setting.nthreads; i++)
{
ms_thread_ctx[i].thd_idx= i;
- ms_thread_ctx[i].nconns= (int)((int)ms_setting.nconns / ms_setting.nthreads);
+ ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
/**
* If only one server, all the connections in all threads
exit(1);
}
/* Create threads after we've done all the epoll setup. */
- for (int i= 0; i < ms_setting.nthreads; i++)
+ for (uint32_t i= 0; i < ms_setting.nthreads; i++)
{
ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
}
/* Used to store the context of each thread */
typedef struct thread_ctx
{
- int thd_idx; /* the thread index */
- int nconns; /* how many connections included by the thread */
- int srv_idx; /* index of the thread */
+ uint32_t thd_idx; /* the thread index */
+ uint32_t nconns; /* how many connections included by the thread */
+ uint32_t srv_idx; /* index of the thread */
int tps_perconn; /* expected throughput per connection */
int64_t exec_num_perconn; /* execute number per connection */
} ms_thread_ctx_t;
typedef struct thread
{
ms_conn_t *conn; /* conn array to store all the conn in the thread */
- int nactive_conn; /* how many connects are active */
+ uint32_t nactive_conn; /* how many connects are active */
ms_thread_ctx_t *thread_ctx; /* thread context from the caller */
struct event_base *base; /* libevent handler created by this thread */