* http://www.schoonerinfotech.com/
*
*/
+
+#include "mem_config.h"
+
#include <stdio.h>
+#include <inttypes.h>
#include <limits.h>
#include <sys/uio.h>
#include <event.h>
#include <fcntl.h>
#include <netinet/tcp.h>
+#include <netinet/in.h>
#include <arpa/inet.h>
+
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
+
#include "ms_setting.h"
#include "ms_thread.h"
+#include "ms_atomic.h"
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
/* for network write */
#define TRANSMIT_COMPLETE 0
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
/* global increasing counter, generating request id for UDP */
-static int udp_request_id= 0;
+static volatile uint32_t udp_request_id= 0;
-extern __thread ms_thread_t ms_thread;
+extern pthread_key_t ms_thread_key;
/* generate upd request id */
-static int ms_get_udp_request_id(void);
+static uint32_t ms_get_udp_request_id(void);
/* connect initialize */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);
*
* @return an unique UDP request id
*/
-static int ms_get_udp_request_id(void)
+static uint32_t ms_get_udp_request_id(void)
{
- return __sync_fetch_and_add(&udp_request_id, 1);
+ return atomic_add_32_nv(&udp_request_id, 1);
}
* @param c, pointer of the concurrency
* @param is_udp, whether it's udp
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
{
memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_conn_udp_init */
* @param read_buffer_size
* @param is_udp, whether it's udp
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_init(ms_conn_t *c,
const int init_state,
/* for replication, each connection need connect all the server */
if (ms_setting.rep_write_srv > 0)
{
- c->total_sfds= ms_setting.srv_cnt;
+ c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
}
else
{
c->mlget_task.mlget_num= 0;
c->mlget_task.value_index= -1; /* default invalid value */
- if (ms_setting.binary_prot)
+ if (ms_setting.binary_prot_)
{
c->protocol= binary_prot;
}
- else if (is_udp)
- {
- c->protocol= ascii_udp_prot;
- }
else
{
c->protocol= ascii_prot;
if (! (ms_setting.facebook_test && is_udp))
{
- __sync_fetch_and_add(&ms_stats.active_conns, 1);
+ atomic_add_32(&ms_stats.active_conns, 1);
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_conn_init */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_item_win_init(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int exp_cnt= 0;
c->win_size= (int)ms_setting.win_size;
c->set_cursor= 0;
- c->exec_num= ms_thread.thread_ctx->exec_num_perconn;
+ c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
c->remain_exec_num= c->exec_num;
c->item_win= (ms_task_item_t *)malloc(
ms_warmup_num_init(c);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_item_win_init */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_sock_init(ms_conn_t *c)
{
- int i;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t i;
int ret_sfd;
- int srv_idx= 0;
+ uint32_t srv_idx= 0;
assert(c != NULL);
assert(c->tcpsfd != NULL);
if (ms_setting.rep_write_srv > 0)
{
/* for replication, each connection need connect all the server */
- srv_idx= i;
+ srv_idx= i % ms_setting.srv_cnt;
}
else
{
/* all the connections in a thread connects the same server */
- srv_idx= ms_thread.thread_ctx->srv_idx;
+ srv_idx= ms_thread->thread_ctx->srv_idx;
}
if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
}
else
{
- for (int j= 0; j < i; j++)
+ for (uint32_t j= 0; j < i; j++)
{
close(c->tcpsfd[j]);
}
return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_conn_sock_init */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_event_init(ms_conn_t *c)
{
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
short event_flags= EV_WRITE | EV_PERSIST;
event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
- event_base_set(ms_thread.base, &c->event);
+ event_base_set(ms_thread->base, &c->event);
c->ev_flags= event_flags;
- if (c->total_sfds == 1)
- {
- if (event_add(&c->event, NULL) == -1)
- {
- return -1;
- }
- }
- else
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, &t) == -1)
- {
- return -1;
- }
+ return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_conn_event_init */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_setup_conn(ms_conn_t *c)
{
return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_setup_conn */
*/
void ms_conn_free(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
if (c != NULL)
{
if (c->hdrbuf != NULL)
if (c->tcpsfd != NULL)
free(c->tcpsfd);
- if (--ms_thread.nactive_conn == 0)
+ if (--ms_thread->nactive_conn == 0)
{
- free(ms_thread.conn);
+ free(ms_thread->conn);
}
}
} /* ms_conn_free */
*/
static void ms_conn_close(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
assert(c != NULL);
/* delete the event, the socket and the connection */
event_del(&c->event);
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] > 0)
{
close(c->udpsfd);
}
- __sync_fetch_and_sub(&ms_stats.active_conns, 1);
+ atomic_dec_32(&ms_stats.active_conns);
ms_conn_free(c);
pthread_mutex_unlock(&ms_global.run_lock.lock);
}
- if (ms_thread.nactive_conn == 0)
+ if (ms_thread->nactive_conn == 0)
{
pthread_exit(NULL);
}
*
* @param ai, server address information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_new_socket(struct addrinfo *ai)
{
max= avg - 1;
}
}
+ (void)last_good;
} /* ms_maximize_sndbuf */
* @param is_udp, whether it's udp
* @param ret_sfd, the connected socket file descriptor
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_network_connect(ms_conn_t *c,
char *srv_host_name,
* that otherwise mess things up.
*/
memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
+ hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
if (is_udp)
{
hints.ai_protocol= IPPROTO_UDP;
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_reconn(ms_conn_t *c)
{
- int srv_idx= 0;
- int srv_conn_cnt= 0;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t srv_idx= 0;
+ uint32_t srv_conn_cnt= 0;
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= c->cur_idx;
- srv_conn_cnt= ms_setting.nconns;
+ srv_idx= c->cur_idx % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
- srv_idx= ms_thread.thread_ctx->srv_idx;
+ srv_idx= ms_thread->thread_ctx->srv_idx;
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
close(c->sfd);
c->tcpsfd[c->cur_idx]= 0;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
% srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
if (ms_setting.rep_write_srv > 0)
{
- int i= 0;
+ uint32_t i= 0;
+
for (i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] != 0)
ms_setting.udp, &c->sfd) == 0)
{
c->tcpsfd[c->cur_idx]= c->sfd;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0)
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ % (uint32_t)srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
break;
}
- if (c->total_sfds == 1)
+ if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
{
/* wait a second and reconnect */
sleep(1);
}
}
- while (c->total_sfds == 1);
+ while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
}
if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
c->alive_sfds--;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_reconn */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_reconn_socks(ms_conn_t *c)
{
- int srv_idx= 0;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t srv_idx= 0;
int ret_sfd= 0;
- int srv_conn_cnt= 0;
+ uint32_t srv_conn_cnt= 0;
struct timeval cur_time;
assert(c != NULL);
if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
{
- return 0;
+ return EXIT_SUCCESS;
}
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] == 0)
{
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= i;
- srv_conn_cnt= ms_setting.nconns;
+ srv_idx= i % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
- srv_idx= ms_thread.thread_ctx->srv_idx;
+ srv_idx= ms_thread->thread_ctx->srv_idx;
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
c->tcpsfd[i]= ret_sfd;
c->alive_sfds++;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0)
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ % (uint32_t)srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
}
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_reconn_socks */
* @param c, pointer of the concurrency
* @param command, the string responded by server
*
- * @return int, if the command completed return 0, else return
+ * @return int, if the command completed return EXIT_SUCCESS, else return
* -1
*/
static int ms_ascii_process_line(ms_conn_t *c, char *command)
{
token_t tokens[MAX_TOKENS];
ms_tokenize_command(command, tokens, MAX_TOKENS);
+ errno= 0;
value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
+ if (errno != 0)
+ {
+ printf("<%d ERROR %s\n", c->sfd, strerror(errno));
+ }
c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
/*
{
if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
{
- memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
+ memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
}
c->packets= 0;
c->ctnwrite= false;
c->rbytes= 0;
c->rcurr= c->rbuf;
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
ms_conn_set_state(c, conn_write);
memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_try_read_line(ms_conn_t *c)
{
if ((uint64_t)c->rbytes < sizeof(c->binary_header))
{
/* need more data! */
- return 0;
+ return EXIT_SUCCESS;
}
else
{
c->binary_header= *rsp;
c->binary_header.response.extlen= rsp->response.extlen;
- c->binary_header.response.keylen= ntohl(rsp->response.keylen);
+ c->binary_header.response.keylen= ntohs(rsp->response.keylen);
c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
- c->binary_header.response.status= ntohl(rsp->response.status);
+ c->binary_header.response.status= ntohs(rsp->response.status);
if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
{
fprintf(stderr, "Invalid magic: %x\n",
c->binary_header.response.magic);
ms_conn_set_state(c, conn_closing);
- return 0;
+ return EXIT_SUCCESS;
}
/* process this complete response */
assert(c->rcurr <= (c->rbuf + c->rsize));
if (c->rbytes == 0)
- return 0;
+ return EXIT_SUCCESS;
el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
if (! el)
- return 0;
+ return EXIT_SUCCESS;
cont= el + 1;
if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
break;
}
}
+ (void)packets;
return wbytes == 0 ? -1 : wbytes;
} /* ms_sort_udp_packet */
if (res > 0)
{
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
+ atomic_add_size(&ms_stats.bytes_read, res);
c->rudpbytes+= res;
rbytes+= res;
if (res == avail)
if (copybytes == -1)
{
- __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
+ atomic_add_size(&ms_stats.pkt_disorder, 1);
}
return copybytes;
* close.
* before reading, move the remaining incomplete fragment of a command
* (if any) to the beginning of the buffer.
- * return 0 if there's nothing to read on the first read.
+ * return EXIT_SUCCESS if there's nothing to read on the first read.
*/
/**
* @param c, pointer of the concurrency
*
* @return int,
- * return 0 if there's nothing to read on the first read.
- * return 1 if get data
+ * return EXIT_SUCCESS if there's nothing to read on the first read.
+ * return EXIT_FAILURE if get data
* return -1 if error happens
*/
static int ms_try_read_network(ms_conn_t *c)
{
if (! c->udp)
{
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
+ atomic_add_size(&ms_stats.bytes_read, res);
}
gotdata= 1;
c->rbytes+= res;
if (curr_time.tv_sec - c->curr_task.item->client_time
> c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
{
- __sync_fetch_and_add(&ms_stats.exp_get, 1);
+ atomic_add_size(&ms_stats.exp_get, 1);
if (ms_setting.verbose)
{
"\n<%d expire time verification failed, "
"object expired but get it now\n"
"\tkey len: %d\n"
- "\tkey: %lx %.*s\n"
+ "\tkey: %" PRIx64 " %.*s\n"
"\tset time: %s current time: %s "
"diff time: %d expire time: %d\n"
"\texpected data: \n"
if ((c->curr_task.item->value_size != vlen)
|| (memcmp(orignval, value, (size_t)vlen) != 0))
{
- __sync_fetch_and_add(&ms_stats.vef_failed, 1);
+ atomic_add_size(&ms_stats.vef_failed, 1);
if (ms_setting.verbose)
{
fprintf(stderr,
"\n<%d data verification failed\n"
"\tkey len: %d\n"
- "\tkey: %lx %.*s\n"
+ "\tkey: %" PRIx64" %.*s\n"
"\texpected data len: %d\n"
"\texpected data: %.*s\n"
"\treceived data len: %d\n"
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+ assert(c->protocol == ascii_prot);
if (c->rvbytes > 2)
{
assert(
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot
- || c->protocol == ascii_prot
+ assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);
if (c->protocol == binary_prot)
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_add_msghdr(ms_conn_t *c)
{
if (c->msgsize == c->msgused)
{
msg=
- realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
+ realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
if (! msg)
return -1;
return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_add_msghdr */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_ensure_iov_space(ms_conn_t *c)
{
{
int i, iovnum;
struct iovec *new_iov= (struct iovec *)realloc(c->iov,
- ((uint64_t)c->iovsize
+ ((size_t)c->iovsize
* 2)
* sizeof(struct iovec));
if (! new_iov)
}
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_ensure_iov_space */
* @param buf, the buffer includes data to send
* @param len, the data length in the buffer
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
{
*/
limit_to_mtu= c->udp;
+#ifdef IOV_MAX
/* We may need to start a new msghdr if this one is full. */
if ((m->msg_iovlen == IOV_MAX)
|| (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
ms_add_msghdr(c);
m= &c->msglist[c->msgused - 1];
}
+#endif
if (ms_ensure_iov_space(c) != 0)
return -1;
}
while (leftover > 0);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_add_iov */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_udp_headers(ms_conn_t *c)
{
hdr= c->hdrbuf;
for (i= 0; i < c->msgused; i++)
{
- c->msglist[i].msg_iov[0].iov_base= hdr;
+ c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
*hdr++= (unsigned char)(c->request_id / 256);
*hdr++= (unsigned char)(c->request_id % 256);
+ UDP_HEADER_SIZE));
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_udp_headers */
res= sendmsg(c->sfd, m, 0);
if (res > 0)
{
- __sync_fetch_and_add(&ms_stats.bytes_written, res);
+ atomic_add_size(&ms_stats.bytes_written, res);
/* We've written some of the data. Remove the completed
* iovec entries from the list of pending writes. */
* adjust it so the next write will do the rest. */
if (res > 0)
{
- m->msg_iov->iov_base= (unsigned char *)m->msg_iov->iov_base + res;
- m->msg_iov->iov_len-= (uint64_t)res;
+ m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
+ m->msg_iov->iov_len-= (size_t)res;
}
return TRANSMIT_INCOMPLETE;
}
*/
static bool ms_update_event(ms_conn_t *c, const int new_flags)
{
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
-
assert(c != NULL);
struct event_base *base= c->event.ev_base;
event_base_set(base, &c->event);
c->ev_flags= (short)new_flags;
- if (c->total_sfds == 1)
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, NULL) == -1)
- {
- return false;
- }
- }
- else
- {
- if (event_add(&c->event, &t) == -1)
- {
- return false;
- }
+ return false;
}
return true;
*/
static bool ms_need_yield(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int64_t tps= 0;
int64_t time_diff= 0;
struct timeval curr_time;
if (ms_setting.expected_tps > 0)
{
gettimeofday(&curr_time, NULL);
- time_diff= ms_time_diff(&ms_thread.startup_time, &curr_time);
- tps=
- (int64_t)((task->get_opt
- + task->set_opt) / ((uint64_t)time_diff / 1000000));
+ time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
+ tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
/* current throughput is greater than expected throughput */
- if (tps > ms_thread.thread_ctx->tps_perconn)
+ if (tps > ms_thread->thread_ctx->tps_perconn)
{
return true;
}
}
assert(fd == c->sfd);
- /* event timeout, close the current connection */
- if (c->which == EV_TIMEOUT)
- {
- ms_conn_set_state(c, conn_closing);
- }
-
ms_drive_machine(c);
/* wait for next event */
* @param c, pointer of the concurrency
* @param cmd, command(get or set )
*
- * @return int, if success, return the index, else return 0
+ * @return int, if success, return the index, else return EXIT_SUCCESS
*/
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
{
- int sock_index= -1;
- int i= 0;
+ uint32_t sock_index= 0;
+ uint32_t i= 0;
if (c->total_sfds == 1)
{
- return 0;
+ return EXIT_SUCCESS;
}
if (ms_setting.rep_write_srv == 0)
if (i == ms_setting.rep_write_srv)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)random() % c->total_sfds;
}
else
{
/* random get one replication writing server to write */
- sock_index= (int)(random() % ms_setting.rep_write_srv);
+ sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
}
}
else if (cmd == CMD_GET)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)random() % c->total_sfds;
}
}
while (c->tcpsfd[sock_index] == 0);
*
* @return int, return the index
*/
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
{
- int sock_index= 0;
+ uint32_t sock_index= 0;
do
{
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_update_conn_sock_event(ms_conn_t *c)
{
}
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_update_conn_sock_event */
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
{
int write_len;
char *buffer= c->wbuf;
- write_len= sprintf(buffer,
- " %u %d %d\r\n",
- 0,
- item->exp_time,
- item->value_size);
+ write_len= snprintf(buffer,
+ c->wsize,
+ " %u %d %d\r\n",
+ 0,
+ item->exp_time,
+ item->value_size);
- if (write_len > c->wsize)
+ if (write_len > c->wsize || write_len < 0)
{
/* ought to be always enough. just fail for simplicity */
fprintf(stderr, "output command line too long.\n");
return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_ascii_write_buf_set */
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
{
}
}
- __sync_fetch_and_add(&ms_stats.obj_bytes,
- item->key_size + item->value_size);
- __sync_fetch_and_add(&ms_stats.cmd_set, 1);
+ atomic_add_size(&ms_stats.obj_bytes,
+ item->key_size + item->value_size);
+ atomic_add_size(&ms_stats.cmd_set, 1);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_mcd_set */
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
{
return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_ascii_write_buf_get */
* @param c, pointer of the concurrency
* @param item, pointer of task item which includes the object
* information
- * @param verify, whether do verification
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
{
- /* verify not supported yet */
- UNUSED_ARGUMENT(verify);
-
assert(c != NULL);
c->currcmd.cmd= CMD_GET;
}
}
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+ atomic_add_size(&ms_stats.cmd_get, 1);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_mcd_get */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
{
return -1;
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_ascii_write_buf_mlget */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_mcd_mlget(ms_conn_t *c)
{
for (int i= 0; i < c->mlget_task.mlget_num; i++)
{
item= c->mlget_task.mlget_item[i].item;
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+ atomic_add_size(&ms_stats.cmd_get, 1);
}
- return 0;
+ (void)item;
+
+ return EXIT_SUCCESS;
} /* ms_mcd_mlget */
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_bin_process_response(ms_conn_t *c)
{
{
c->rvbytes= (int32_t)bodylen;
c->readval= true;
- return 1;
+ return EXIT_FAILURE;
}
else
{
}
}
- return 0;
+ return EXIT_SUCCESS;
} /* ms_bin_process_response */
header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
header->request.opcode= (uint8_t)opcode;
- header->request.keylen= htonl(key_len);
+ header->request.keylen= htons(key_len);
header->request.extlen= (uint8_t)hdr_len;
header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
- header->request.reserved= 0;
+ header->request.vbucket= 0;
header->request.bodylen= htonl(body_len);
header->request.opaque= 0;
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
{
}
ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_bin_write_buf_set */
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
{
(uint32_t)item->key_size);
ms_add_key_to_iov(c, item);
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_bin_write_buf_get */
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
{
c->wcurr= c->wbuf;
- return 0;
+ return EXIT_SUCCESS;
} /* ms_build_bin_write_buf_mlget */