* http://www.schoonerinfotech.com/
*
*/
+
+#include "config.h"
+
#include <stdio.h>
+#include <inttypes.h>
#include <limits.h>
#include <sys/uio.h>
#include <event.h>
#include <fcntl.h>
#include <netinet/tcp.h>
+#include <netinet/in.h>
#include <arpa/inet.h>
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+# include <sys/time.h>
+# else
+# include <time.h>
+# endif
+#endif
#include "ms_setting.h"
#include "ms_thread.h"
+#include "ms_atomic.h"
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
/* for network write */
-#define TRANSMIT_COMPLETE 0
-#define TRANSMIT_INCOMPLETE 1
-#define TRANSMIT_SOFT_ERROR 2
-#define TRANSMIT_HARD_ERROR 3
+#define TRANSMIT_COMPLETE 0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
/* for generating key */
-#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
-#define KEY_PREFIX_MASK 0x1010101010101010
+#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
+#define KEY_PREFIX_MASK 0x1010101010101010
/* For parse the value length return by server */
-#define KEY_TOKEN 1
-#define VALUELEN_TOKEN 3
+#define KEY_TOKEN 1
+#define VALUELEN_TOKEN 3
/* global increasing counter, to ensure the key prefix unique */
-static uint64_t key_prefix_seq = KEY_PREFIX_BASE;
+static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
/* global increasing counter, generating request id for UDP */
-static int udp_request_id = 0;
+static volatile uint32_t udp_request_id= 0;
-extern __thread ms_thread_t ms_thread;
+extern pthread_key_t ms_thread_key;
/* generate upd request id */
-static int ms_get_udp_request_id(void);
+static uint32_t ms_get_udp_request_id(void);
+
/* connect initialize */
static void ms_task_init(ms_conn_t *c);
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
static int ms_conn_sock_init(ms_conn_t *c);
static int ms_conn_event_init(ms_conn_t *c);
-static int ms_conn_init(ms_conn_t *c, const int init_state,
- const int read_buffer_size, const bool is_udp);
+static int ms_conn_init(ms_conn_t *c,
+ const int init_state,
+ const int read_buffer_size,
+ const bool is_udp);
static void ms_warmup_num_init(ms_conn_t *c);
static int ms_item_win_init(ms_conn_t *c);
+
/* connection close */
void ms_conn_free(ms_conn_t *c);
static void ms_conn_close(ms_conn_t *c);
+
/* create network connection */
static int ms_new_socket(struct addrinfo *ai);
static void ms_maximize_sndbuf(const int sfd);
-static int ms_network_connect(ms_conn_t *c, char * srv_host_name,
- const int srv_port, const bool is_udp, int *ret_sfd);
+static int ms_network_connect(ms_conn_t *c,
+ char *srv_host_name,
+ const int srv_port,
+ const bool is_udp,
+ int *ret_sfd);
static int ms_reconn(ms_conn_t *c);
+
/* read and parse */
-static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens);
+static int ms_tokenize_command(char *command,
+ token_t *tokens,
+ const int max_tokens);
static int ms_ascii_process_line(ms_conn_t *c, char *command);
static int ms_try_read_line(ms_conn_t *c);
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
static int ms_udp_read(ms_conn_t *c, char *buf, int len);
static int ms_try_read_network(ms_conn_t *c);
-static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item,
- char *value, int vlen);
+static void ms_verify_value(ms_conn_t *c,
+ ms_mlget_task_item_t *mlget_item,
+ char *value,
+ int vlen);
static void ms_ascii_complete_nread(ms_conn_t *c);
static void ms_bin_complete_nread(ms_conn_t *c);
static void ms_complete_nread(ms_conn_t *c);
+
/* send functions */
static int ms_add_msghdr(ms_conn_t *c);
static int ms_ensure_iov_space(ms_conn_t *c);
static int ms_build_udp_headers(ms_conn_t *c);
static int ms_transmit(ms_conn_t *c);
+
/* status adjustment */
static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);
+
/* main loop */
static void ms_drive_machine(ms_conn_t *c);
void ms_event_handler(const int fd, const short which, void *arg);
+
/* ascii protocol */
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
+
/* binary protocol */
static int ms_bin_process_response(ms_conn_t *c);
-static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len,
- uint16_t key_len, uint32_t body_len);
+static void ms_add_bin_header(ms_conn_t *c,
+ uint8_t opcode,
+ uint8_t hdr_len,
+ uint16_t key_len,
+ uint32_t body_len);
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
+
/**
* each key has two parts, prefix and suffix. The suffix is a
* string random get form the character table. The prefix is a
*/
uint64_t ms_get_key_prefix(void)
{
- uint64_t key_prefix;
+ uint64_t key_prefix;
- pthread_mutex_lock(&ms_global.seq_mutex);
- key_prefix_seq |= KEY_PREFIX_MASK;
- key_prefix = key_prefix_seq;
- key_prefix_seq++;
- pthread_mutex_unlock(&ms_global.seq_mutex);
+ pthread_mutex_lock(&ms_global.seq_mutex);
+ key_prefix_seq|= KEY_PREFIX_MASK;
+ key_prefix= key_prefix_seq;
+ key_prefix_seq++;
+ pthread_mutex_unlock(&ms_global.seq_mutex);
+
+ return key_prefix;
+} /* ms_get_key_prefix */
- return key_prefix;
-}
/**
* get an unique udp request id
*
* @return an unique UDP request id
*/
-static int ms_get_udp_request_id(void)
+static uint32_t ms_get_udp_request_id(void)
{
- return(__sync_fetch_and_add(&udp_request_id, 1));
+ return atomic_add_32_nv(&udp_request_id, 1);
}
+
/**
* initialize current task structure
*
*/
static void ms_task_init(ms_conn_t *c)
{
- c->curr_task.cmd = CMD_NULL;
- c->curr_task.item = 0;
- c->curr_task.verify = false;
- c->curr_task.finish_verify = true;
- c->curr_task.get_miss = true;
-
- c->curr_task.get_opt = 0;
- c->curr_task.set_opt = 0;
- c->curr_task.cycle_undo_get = 0;
- c->curr_task.cycle_undo_set = 0;
- c->curr_task.verified_get = 0;
- c->curr_task.overwrite_set = 0;
-}
+ c->curr_task.cmd= CMD_NULL;
+ c->curr_task.item= 0;
+ c->curr_task.verify= false;
+ c->curr_task.finish_verify= true;
+ c->curr_task.get_miss= true;
+
+ c->curr_task.get_opt= 0;
+ c->curr_task.set_opt= 0;
+ c->curr_task.cycle_undo_get= 0;
+ c->curr_task.cycle_undo_set= 0;
+ c->curr_task.verified_get= 0;
+ c->curr_task.overwrite_set= 0;
+} /* ms_task_init */
+
/**
* initialize udp for the connection structure
* @param c, pointer of the concurrency
* @param is_udp, whether it's udp
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
{
- c->hdrbuf = 0;
- c->rudpbuf = 0;
- c->udppkt = 0;
-
- c->rudpsize = UDP_DATA_BUFFER_SIZE;
- c->hdrsize = 0;
-
- c->rudpbytes = 0;
- c->packets = 0;
- c->recvpkt = 0;
- c->pktcurr = 0;
- c->ordcurr = 0;
-
- c->udp = is_udp;
-
- if (c->udp || (!c->udp && ms_setting.facebook_test)) {
- c->rudpbuf = (char *)malloc((size_t)c->rudpsize);
- c->udppkt = (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
-
- if (c->rudpbuf == NULL || c->udppkt == NULL) {
- if (c->rudpbuf != NULL) free(c->rudpbuf);
- if (c->udppkt != NULL) free(c->udppkt);
- fprintf(stderr, "malloc()\n");
- return -1;
- }
- memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
+ c->hdrbuf= 0;
+ c->rudpbuf= 0;
+ c->udppkt= 0;
+
+ c->rudpsize= UDP_DATA_BUFFER_SIZE;
+ c->hdrsize= 0;
+
+ c->rudpbytes= 0;
+ c->packets= 0;
+ c->recvpkt= 0;
+ c->pktcurr= 0;
+ c->ordcurr= 0;
+
+ c->udp= is_udp;
+
+ if (c->udp || (! c->udp && ms_setting.facebook_test))
+ {
+ c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
+ c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
+
+ if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
+ {
+ if (c->rudpbuf != NULL)
+ free(c->rudpbuf);
+ if (c->udppkt != NULL)
+ free(c->udppkt);
+ fprintf(stderr, "malloc()\n");
+ return -1;
}
+ memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_conn_udp_init */
- return 0;
-}
/**
* initialize the connection structure
* @param read_buffer_size
* @param is_udp, whether it's udp
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
-static int ms_conn_init(ms_conn_t *c, const int init_state,
- const int read_buffer_size, const bool is_udp)
+static int ms_conn_init(ms_conn_t *c,
+ const int init_state,
+ const int read_buffer_size,
+ const bool is_udp)
{
- assert(c != NULL);
-
- c->rbuf = c->wbuf = 0;
- c->iov = 0;
- c->msglist = 0;
-
- c->rsize = read_buffer_size;
- c->wsize = WRITE_BUFFER_SIZE;
- c->iovsize = IOV_LIST_INITIAL;
- c->msgsize = MSG_LIST_INITIAL;
-
- /* for replication, each connection need connect all the server */
- if (ms_setting.rep_write_srv > 0) {
- c->total_sfds = ms_setting.srv_cnt;
- } else {
- c->total_sfds = ms_setting.sock_per_conn;
- }
- c->alive_sfds = 0;
-
- c->rbuf = (char *)malloc((size_t)c->rsize);
- c->wbuf = (char *)malloc((size_t)c->wsize);
- c->iov = (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
- c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * (size_t)c->msgsize);
- if (ms_setting.mult_key_num > 1) {
- c->mlget_task.mlget_item = (ms_mlget_task_item_t *)
- malloc(sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
- }
- c->tcpsfd = (int *)malloc((size_t)c->total_sfds * sizeof(int));
-
- if (c->rbuf == NULL || c->wbuf == NULL || c->iov == NULL
- || c->msglist == NULL || c->tcpsfd == NULL
- || (ms_setting.mult_key_num > 1 && c->mlget_task.mlget_item == NULL)) {
-
- if (c->rbuf != NULL) free(c->rbuf);
- if (c->wbuf != NULL) free(c->wbuf);
- if (c->iov != NULL) free(c->iov);
- if (c->msglist != NULL) free(c->msglist);
- if (c->mlget_task.mlget_item != NULL) free(c->mlget_task.mlget_item);
- if (c->tcpsfd != NULL) free(c->tcpsfd);
- fprintf(stderr, "malloc()\n");
- return -1;
- }
-
- c->state = init_state;
- c->rvbytes = 0;
- c->rbytes = 0;
- c->rcurr = c->rbuf;
- c->wcurr = c->wbuf;
- c->iovused = 0;
- c->msgcurr = 0;
- c->msgused = 0;
- c->cur_idx = c->total_sfds; /* default index is a invalid value */
-
- c->ctnwrite = false;
- c->readval = false;
- c->change_sfd = false;
-
- c->precmd.cmd = c->currcmd.cmd = CMD_NULL;
- c->precmd.isfinish = true; /* default the previous command finished */
- c->currcmd.isfinish = false;
- c->precmd.retstat = c->currcmd.retstat = MCD_FAILURE;
- c->precmd.key_prefix = c->currcmd.key_prefix = 0;
-
- c->mlget_task.mlget_num = 0;
- c->mlget_task.value_index = -1; /* default invalid value */
-
- if (ms_setting.binary_prot) {
- c->protocol = binary_prot;
- } else if (is_udp) {
- c->protocol = ascii_udp_prot;
- } else {
- c->protocol = ascii_prot;
- }
+ assert(c != NULL);
+
+ c->rbuf= c->wbuf= 0;
+ c->iov= 0;
+ c->msglist= 0;
+
+ c->rsize= read_buffer_size;
+ c->wsize= WRITE_BUFFER_SIZE;
+ c->iovsize= IOV_LIST_INITIAL;
+ c->msgsize= MSG_LIST_INITIAL;
+
+ /* for replication, each connection need connect all the server */
+ if (ms_setting.rep_write_srv > 0)
+ {
+ c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
+ }
+ else
+ {
+ c->total_sfds= ms_setting.sock_per_conn;
+ }
+ c->alive_sfds= 0;
+
+ c->rbuf= (char *)malloc((size_t)c->rsize);
+ c->wbuf= (char *)malloc((size_t)c->wsize);
+ c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
+ c->msglist= (struct msghdr *)malloc(
+ sizeof(struct msghdr) * (size_t)c->msgsize);
+ if (ms_setting.mult_key_num > 1)
+ {
+ c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
+ malloc(
+ sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
+ }
+ c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
+
+ if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
+ || (c->msglist == NULL) || (c->tcpsfd == NULL)
+ || ((ms_setting.mult_key_num > 1)
+ && (c->mlget_task.mlget_item == NULL)))
+ {
+ if (c->rbuf != NULL)
+ free(c->rbuf);
+ if (c->wbuf != NULL)
+ free(c->wbuf);
+ if (c->iov != NULL)
+ free(c->iov);
+ if (c->msglist != NULL)
+ free(c->msglist);
+ if (c->mlget_task.mlget_item != NULL)
+ free(c->mlget_task.mlget_item);
+ if (c->tcpsfd != NULL)
+ free(c->tcpsfd);
+ fprintf(stderr, "malloc()\n");
+ return -1;
+ }
+
+ c->state= init_state;
+ c->rvbytes= 0;
+ c->rbytes= 0;
+ c->rcurr= c->rbuf;
+ c->wcurr= c->wbuf;
+ c->iovused= 0;
+ c->msgcurr= 0;
+ c->msgused= 0;
+ c->cur_idx= c->total_sfds; /* default index is a invalid value */
+
+ c->ctnwrite= false;
+ c->readval= false;
+ c->change_sfd= false;
+
+ c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
+ c->precmd.isfinish= true; /* default the previous command finished */
+ c->currcmd.isfinish= false;
+ c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
+ c->precmd.key_prefix= c->currcmd.key_prefix= 0;
+
+ c->mlget_task.mlget_num= 0;
+ c->mlget_task.value_index= -1; /* default invalid value */
+
+ if (ms_setting.binary_prot)
+ {
+ c->protocol= binary_prot;
+ }
+ else
+ {
+ c->protocol= ascii_prot;
+ }
+
+ /* initialize udp */
+ if (ms_conn_udp_init(c, is_udp) != 0)
+ {
+ return -1;
+ }
- /* initialize udp */
- if (ms_conn_udp_init(c, is_udp) != 0) {
- return -1;
- }
+ /* initialize task */
+ ms_task_init(c);
- /* initialize task */
- ms_task_init(c);
+ if (! (ms_setting.facebook_test && is_udp))
+ {
+ atomic_add_32(&ms_stats.active_conns, 1);
+ }
- if (!(ms_setting.facebook_test && is_udp)) {
- __sync_fetch_and_add(&ms_stats.active_conns, 1);
- }
+ return EXIT_SUCCESS;
+} /* ms_conn_init */
- return 0;
-}
/**
* when doing 100% get operation, it could preset some objects
*/
static void ms_warmup_num_init(ms_conn_t *c)
{
- /* no set operation, preset all the items in the window */
- if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) {
- c->warmup_num = c->win_size;
- c->remain_warmup_num = c->warmup_num;
- } else {
- c->warmup_num = 0;
- c->remain_warmup_num = c->warmup_num;
- }
-}
+ /* no set operation, preset all the items in the window */
+ if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
+ {
+ c->warmup_num= c->win_size;
+ c->remain_warmup_num= c->warmup_num;
+ }
+ else
+ {
+ c->warmup_num= 0;
+ c->remain_warmup_num= c->warmup_num;
+ }
+} /* ms_warmup_num_init */
+
/**
* each connection has an item window, this function initialize
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_item_win_init(ms_conn_t *c)
{
- int exp_cnt = 0;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ int exp_cnt= 0;
+
+ c->win_size= (int)ms_setting.win_size;
+ c->set_cursor= 0;
+ c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
+ c->remain_exec_num= c->exec_num;
+
+ c->item_win= (ms_task_item_t *)malloc(
+ sizeof(ms_task_item_t) * (size_t)c->win_size);
+ if (c->item_win == NULL)
+ {
+ fprintf(stderr, "Can't allocate task item array for conn.\n");
+ return -1;
+ }
+ memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
- c->win_size = (int)ms_setting.win_size;
- c->set_cursor = 0;
- c->exec_num = ms_thread.thread_ctx->exec_num_perconn;
- c->remain_exec_num = c->exec_num;
+ for (int i= 0; i < c->win_size; i++)
+ {
+ c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
+ c->item_win[i].key_prefix= ms_get_key_prefix();
+ c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
+ c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
+ c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
+ c->item_win[i].client_time= 0;
- c->item_win = (ms_task_item_t *)malloc(sizeof(ms_task_item_t) * (size_t)c->win_size);
- if (c->item_win == NULL) {
- fprintf(stderr, "Can't allocate task item array for conn.\n");
- return -1;
+ /* set expire time base on the proportion */
+ if (exp_cnt < ms_setting.exp_ver_per * i)
+ {
+ c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
+ exp_cnt++;
}
- memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
-
- for (int i = 0; i < c->win_size; i++) {
- c->item_win[i].key_size = (int)ms_setting.distr[i].key_size;
- c->item_win[i].key_prefix = ms_get_key_prefix();
- c->item_win[i].key_suffix_offset = ms_setting.distr[i].key_offset;
- c->item_win[i].value_size = (int)ms_setting.distr[i].value_size;
- c->item_win[i].value_offset = INVALID_OFFSET; /* default in invalid offset */
- c->item_win[i].client_time = 0;
-
- /* set expire time base on the proportion */
- if (exp_cnt < ms_setting.exp_ver_per * i) {
- c->item_win[i].exp_time = FIXED_EXPIRE_TIME;
- exp_cnt++;
- } else {
- c->item_win[i].exp_time = 0;
- }
+ else
+ {
+ c->item_win[i].exp_time= 0;
}
+ }
- ms_warmup_num_init(c);
+ ms_warmup_num_init(c);
+
+ return EXIT_SUCCESS;
+} /* ms_item_win_init */
- return 0;
-}
/**
* each connection structure can include one or more sock
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_sock_init(ms_conn_t *c)
{
- int i;
- int ret_sfd;
- int srv_idx = 0;
-
- assert(c != NULL);
- assert(c->tcpsfd != NULL);
-
- for (i = 0; i < c->total_sfds; i++) {
- ret_sfd = 0;
- if (ms_setting.rep_write_srv > 0) {
- /* for replication, each connection need connect all the server */
- srv_idx = i;
- } else {
- /* all the connections in a thread connects the same server */
- srv_idx = ms_thread.thread_ctx->srv_idx;
- }
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t i;
+ int ret_sfd;
+ uint32_t srv_idx= 0;
- if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port,
- ms_setting.udp, &ret_sfd) != 0) {
- break;
- }
+ assert(c != NULL);
+ assert(c->tcpsfd != NULL);
- if (i == 0) {
- c->sfd = ret_sfd;
- }
+ for (i= 0; i < c->total_sfds; i++)
+ {
+ ret_sfd= 0;
+ if (ms_setting.rep_write_srv > 0)
+ {
+ /* for replication, each connection need connect all the server */
+ srv_idx= i % ms_setting.srv_cnt;
+ }
+ else
+ {
+ /* all the connections in a thread connects the same server */
+ srv_idx= ms_thread->thread_ctx->srv_idx;
+ }
- if (!ms_setting.udp) {
- c->tcpsfd[i] = ret_sfd;
- }
+ if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port,
+ ms_setting.udp, &ret_sfd) != 0)
+ {
+ break;
+ }
- c->alive_sfds++;
+ if (i == 0)
+ {
+ c->sfd= ret_sfd;
}
- /* initialize udp sock handler if necessary */
- if (ms_setting.facebook_test) {
- ret_sfd = 0;
- if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port,
- true, &ret_sfd) != 0) {
- c->udpsfd = 0;
- } else {
- c->udpsfd = ret_sfd;
- }
+ if (! ms_setting.udp)
+ {
+ c->tcpsfd[i]= ret_sfd;
}
- if (i != c->total_sfds || (ms_setting.facebook_test && c->udpsfd == 0)) {
- if (ms_setting.udp) {
- close(c->sfd);
- } else {
- for (int j = 0; j < i; j++) {
- close(c->tcpsfd[j]);
- }
- }
+ c->alive_sfds++;
+ }
- if (c->udpsfd != 0) {
- close(c->udpsfd);
- }
+ /* initialize udp sock handler if necessary */
+ if (ms_setting.facebook_test)
+ {
+ ret_sfd= 0;
+ if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port,
+ true, &ret_sfd) != 0)
+ {
+ c->udpsfd= 0;
+ }
+ else
+ {
+ c->udpsfd= ret_sfd;
+ }
+ }
- return -1;
+ if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
+ {
+ if (ms_setting.udp)
+ {
+ close(c->sfd);
+ }
+ else
+ {
+ for (uint32_t j= 0; j < i; j++)
+ {
+ close(c->tcpsfd[j]);
+ }
}
- return 0;
-}
+ if (c->udpsfd != 0)
+ {
+ close(c->udpsfd);
+ }
+
+ return -1;
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_conn_sock_init */
+
/**
* each connection is managed by libevent, this function
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_conn_event_init(ms_conn_t *c)
{
- /* default event timeout 10 seconds */
- struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0};
- short event_flags = EV_WRITE | EV_PERSIST;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ short event_flags= EV_WRITE | EV_PERSIST;
- event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
- event_base_set(ms_thread.base, &c->event);
- c->ev_flags = event_flags;
+ event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
+ event_base_set(ms_thread->base, &c->event);
+ c->ev_flags= event_flags;
- if (c->total_sfds == 1) {
- if (event_add(&c->event, NULL) == -1) {
- return -1;
- }
- } else {
- if (event_add(&c->event, &t) == -1) {
- return -1;
- }
- }
+ if (event_add(&c->event, NULL) == -1)
+ {
+ return -1;
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_conn_event_init */
- return 0;
-}
/**
* setup a connection, each connection structure of each
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_setup_conn(ms_conn_t *c)
{
- if (ms_item_win_init(c) != 0) {
- return -1;
- }
+ if (ms_item_win_init(c) != 0)
+ {
+ return -1;
+ }
- if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) {
- return -1;
- }
+ if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
+ {
+ return -1;
+ }
- if (ms_conn_sock_init(c) != 0) {
- return -1;
- }
+ if (ms_conn_sock_init(c) != 0)
+ {
+ return -1;
+ }
- if (ms_conn_event_init(c) != 0) {
- return -1;
- }
+ if (ms_conn_event_init(c) != 0)
+ {
+ return -1;
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_setup_conn */
- return 0;
-}
/**
* Frees a connection.
*/
void ms_conn_free(ms_conn_t *c)
{
- if (c != NULL) {
- if (c->hdrbuf != NULL)
- free(c->hdrbuf);
- if (c->msglist != NULL)
- free(c->msglist);
- if (c->rbuf != NULL)
- free(c->rbuf);
- if (c->wbuf != NULL)
- free(c->wbuf);
- if (c->iov != NULL)
- free(c->iov);
- if (c->mlget_task.mlget_item != NULL)
- free(c->mlget_task.mlget_item);
- if (c->rudpbuf != NULL)
- free(c->rudpbuf);
- if (c->udppkt != NULL)
- free(c->udppkt);
- if (c->item_win != NULL)
- free(c->item_win);
- if (c->tcpsfd != NULL)
- free(c->tcpsfd);
-
- if (--ms_thread.nactive_conn == 0) {
- free(ms_thread.conn);
- }
- }
-}
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ if (c != NULL)
+ {
+ if (c->hdrbuf != NULL)
+ free(c->hdrbuf);
+ if (c->msglist != NULL)
+ free(c->msglist);
+ if (c->rbuf != NULL)
+ free(c->rbuf);
+ if (c->wbuf != NULL)
+ free(c->wbuf);
+ if (c->iov != NULL)
+ free(c->iov);
+ if (c->mlget_task.mlget_item != NULL)
+ free(c->mlget_task.mlget_item);
+ if (c->rudpbuf != NULL)
+ free(c->rudpbuf);
+ if (c->udppkt != NULL)
+ free(c->udppkt);
+ if (c->item_win != NULL)
+ free(c->item_win);
+ if (c->tcpsfd != NULL)
+ free(c->tcpsfd);
+
+ if (--ms_thread->nactive_conn == 0)
+ {
+ free(ms_thread->conn);
+ }
+ }
+} /* ms_conn_free */
+
/**
* close a connection
*/
static void ms_conn_close(ms_conn_t *c)
{
- assert(c != NULL);
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ assert(c != NULL);
- /* delete the event, the socket and the connection */
- event_del(&c->event);
+ /* delete the event, the socket and the connection */
+ event_del(&c->event);
- for (int i = 0; i < c->total_sfds; i++) {
- if (c->tcpsfd[i] > 0) {
- close(c->tcpsfd[i]);
- }
+ for (uint32_t i= 0; i < c->total_sfds; i++)
+ {
+ if (c->tcpsfd[i] > 0)
+ {
+ close(c->tcpsfd[i]);
}
- c->sfd = 0;
+ }
+ c->sfd= 0;
- if (ms_setting.facebook_test) {
- close(c->udpsfd);
- }
+ if (ms_setting.facebook_test)
+ {
+ close(c->udpsfd);
+ }
- __sync_fetch_and_sub(&ms_stats.active_conns, 1);
+ atomic_dec_32(&ms_stats.active_conns);
- ms_conn_free(c);
+ ms_conn_free(c);
- if (ms_setting.run_time == 0) {
- pthread_mutex_lock(&ms_global.run_lock.lock);
- ms_global.run_lock.count++;
- pthread_cond_signal(&ms_global.run_lock.cond);
- pthread_mutex_unlock(&ms_global.run_lock.lock);
- }
+ if (ms_setting.run_time == 0)
+ {
+ pthread_mutex_lock(&ms_global.run_lock.lock);
+ ms_global.run_lock.count++;
+ pthread_cond_signal(&ms_global.run_lock.cond);
+ pthread_mutex_unlock(&ms_global.run_lock.lock);
+ }
+
+ if (ms_thread->nactive_conn == 0)
+ {
+ pthread_exit(NULL);
+ }
+} /* ms_conn_close */
- if (ms_thread.nactive_conn == 0) {
- pthread_exit(NULL);
- }
-}
/**
* create a new sock
*
* @param ai, server address information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_new_socket(struct addrinfo *ai)
{
- int sfd;
+ int sfd;
- if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
- fprintf(stderr, "socket() error: %s.\n", strerror(errno));
- return -1;
- }
+ if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
+ {
+ fprintf(stderr, "socket() error: %s.\n", strerror(errno));
+ return -1;
+ }
+
+ return sfd;
+} /* ms_new_socket */
- return sfd;
-}
/**
* Sets a socket's send buffer size to the maximum allowed by the system.
*/
static void ms_maximize_sndbuf(const int sfd)
{
- socklen_t intsize = sizeof(int);
- unsigned int last_good = 0;
- unsigned int min, max, avg;
- unsigned int old_size;
-
- /* Start with the default size. */
- if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
- fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
- return;
- }
-
- /* Binary-search for the real maximum. */
- min = old_size;
- max = MAX_SENDBUF_SIZE;
-
- while (min <= max) {
- avg = ((unsigned int)(min + max)) / 2;
- if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
- last_good = avg;
- min = avg + 1;
- } else {
- max = avg - 1;
- }
+ socklen_t intsize= sizeof(int);
+ unsigned int last_good= 0;
+ unsigned int min, max, avg;
+ unsigned int old_size;
+
+ /* Start with the default size. */
+ if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
+ {
+ fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
+ return;
+ }
+
+ /* Binary-search for the real maximum. */
+ min= old_size;
+ max= MAX_SENDBUF_SIZE;
+
+ while (min <= max)
+ {
+ avg= ((unsigned int)(min + max)) / 2;
+ if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
+ {
+ last_good= avg;
+ min= avg + 1;
}
-}
+ else
+ {
+ max= avg - 1;
+ }
+ }
+} /* ms_maximize_sndbuf */
+
/**
* socket connects the server
* @param is_udp, whether it's udp
* @param ret_sfd, the connected socket file descriptor
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
-static int ms_network_connect(ms_conn_t *c, char * srv_host_name,
- const int srv_port, const bool is_udp, int *ret_sfd)
+static int ms_network_connect(ms_conn_t *c,
+ char *srv_host_name,
+ const int srv_port,
+ const bool is_udp,
+ int *ret_sfd)
{
- int sfd;
- struct linger ling = {0, 0};
- struct addrinfo *ai;
- struct addrinfo *next;
- struct addrinfo hints;
- char port_buf[NI_MAXSERV];
- int error;
- int success = 0;
-
- int flags = 1;
-
- /*
- * the memset call clears nonstandard fields in some impementations
- * that otherwise mess things up.
- */
- memset(&hints, 0, sizeof (hints));
- hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
- if (is_udp) {
- hints.ai_protocol = IPPROTO_UDP;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
- } else {
- hints.ai_family = AF_UNSPEC;
- hints.ai_protocol = IPPROTO_TCP;
- hints.ai_socktype = SOCK_STREAM;
- }
-
- snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
- error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
- if (error != 0) {
- if (error != EAI_SYSTEM)
- fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
- else
- perror("getaddrinfo()\n");
+ int sfd;
+ struct linger ling=
+ {
+ 0, 0
+ };
+ struct addrinfo *ai;
+ struct addrinfo *next;
+ struct addrinfo hints;
+ char port_buf[NI_MAXSERV];
+ int error;
+ int success= 0;
+
+ int flags= 1;
+
+ /*
+ * the memset call clears nonstandard fields in some impementations
+ * that otherwise mess things up.
+ */
+ memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
+ hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
+ hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
+ if (is_udp)
+ {
+ hints.ai_protocol= IPPROTO_UDP;
+ hints.ai_socktype= SOCK_DGRAM;
+ hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
+ }
+ else
+ {
+ hints.ai_family= AF_UNSPEC;
+ hints.ai_protocol= IPPROTO_TCP;
+ hints.ai_socktype= SOCK_STREAM;
+ }
+
+ snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
+ error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
+ if (error != 0)
+ {
+ if (error != EAI_SYSTEM)
+ fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
+ else
+ perror("getaddrinfo()\n");
+ return -1;
+ }
+
+ for (next= ai; next; next= next->ai_next)
+ {
+ if ((sfd= ms_new_socket(next)) == -1)
+ {
+ freeaddrinfo(ai);
+ return -1;
+ }
+
+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
+ if (is_udp)
+ {
+ ms_maximize_sndbuf(sfd);
+ }
+ else
+ {
+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
+ sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
+ sizeof(flags));
+ }
+
+ if (is_udp)
+ {
+ c->srv_recv_addr_size= sizeof(struct sockaddr);
+ memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
+ }
+ else
+ {
+ if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
+ {
+ close(sfd);
+ freeaddrinfo(ai);
return -1;
+ }
}
- for (next = ai; next; next = next->ai_next) {
- if ((sfd = ms_new_socket(next)) == -1) {
- freeaddrinfo(ai);
- return -1;
- }
-
- setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
- if (is_udp) {
- ms_maximize_sndbuf(sfd);
- } else {
- setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
- setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
- }
-
- if (is_udp) {
- c->srv_recv_addr_size = sizeof(struct sockaddr);
- memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
- } else {
- if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) {
- close(sfd);
- freeaddrinfo(ai);
- return -1;
- }
- }
+ if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
+ || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
+ {
+ fprintf(stderr, "setting O_NONBLOCK\n");
+ close(sfd);
+ freeaddrinfo(ai);
+ return -1;
+ }
- if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
- fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
- fprintf(stderr, "setting O_NONBLOCK\n");
- close(sfd);
- freeaddrinfo(ai);
- return -1;
- }
+ if (ret_sfd != NULL)
+ {
+ *ret_sfd= sfd;
+ }
- if (ret_sfd != NULL) {
- *ret_sfd = sfd;
- }
+ success++;
+ }
- success++;
- }
+ freeaddrinfo(ai);
- freeaddrinfo(ai);
+ /* Return zero if we detected no errors in starting up connections */
+ return success == 0;
+} /* ms_network_connect */
- /* Return zero if we detected no errors in starting up connections */
- return success == 0;
-}
/**
* reconnect a disconnected sock
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_reconn(ms_conn_t *c)
{
- int srv_idx = 0;
- int srv_conn_cnt = 0;
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t srv_idx= 0;
+ uint32_t srv_conn_cnt= 0;
+
+ if (ms_setting.rep_write_srv > 0)
+ {
+ srv_idx= c->cur_idx % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
+ }
+ else
+ {
+ srv_idx= ms_thread->thread_ctx->srv_idx;
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
+ }
+
+ /* close the old socket handler */
+ close(c->sfd);
+ c->tcpsfd[c->cur_idx]= 0;
+
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+ % srv_conn_cnt == 0)
+ {
+ gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
+ fprintf(stderr, "Server %s:%d disconnect\n",
+ ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port);
+ }
+
+ if (ms_setting.rep_write_srv > 0)
+ {
+ uint32_t i= 0;
+
+ for (i= 0; i < c->total_sfds; i++)
+ {
+ if (c->tcpsfd[i] != 0)
+ {
+ break;
+ }
+ }
+
+ /* all socks disconnect */
+ if (i == c->total_sfds)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ do
+ {
+ /* reconnect success, break the loop */
+ if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port,
+ ms_setting.udp, &c->sfd) == 0)
+ {
+ c->tcpsfd[c->cur_idx]= c->sfd;
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ % (uint32_t)srv_conn_cnt == 0)
+ {
+ gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
+ int reconn_time=
+ (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
+ - ms_setting.servers[srv_idx].disconn_time
+ .tv_sec);
+ fprintf(stderr, "Server %s:%d reconnect after %ds\n",
+ ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port, reconn_time);
+ }
+ break;
+ }
- if (ms_setting.rep_write_srv > 0) {
- srv_idx = c->cur_idx;
- srv_conn_cnt = ms_setting.nconns;
- } else {
- srv_idx = ms_thread.thread_ctx->srv_idx;
- srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
+ if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
+ {
+ /* wait a second and reconnect */
+ sleep(1);
+ }
}
+ while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
+ }
- /* close the old socket handler */
- close(c->sfd);
- c->tcpsfd[c->cur_idx] = 0;
+ if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
+ {
+ c->sfd= 0;
+ c->alive_sfds--;
+ }
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].disconn_cnt, 1)
- % srv_conn_cnt == 0) {
-
- gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
- fprintf(stderr, "Server %s:%d disconnect\n",
- ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port);
- }
-
- if (ms_setting.rep_write_srv > 0) {
- int i = 0;
- for (i = 0; i < c->total_sfds; i++) {
- if (c->tcpsfd[i] != 0) {
- break;
- }
- }
+ return EXIT_SUCCESS;
+} /* ms_reconn */
- /* all socks disconnect */
- if (i == c->total_sfds) {
- return -1;
- }
- } else {
- do {
- /* reconnect success, break the loop */
- if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port,
- ms_setting.udp, &c->sfd) == 0) {
-
- c->tcpsfd[c->cur_idx] = c->sfd;
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0) {
-
- gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
- int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec -
- ms_setting.servers[srv_idx].disconn_time.tv_sec);
- fprintf(stderr, "Server %s:%d reconnect after %ds\n",
- ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port, reconn_time);
- }
- break;
- }
-
- if (c->total_sfds == 1) {
- /* wait a second and reconnect */
- sleep(1);
- }
- } while (c->total_sfds == 1);
- }
-
- if (c->total_sfds > 1 && c->tcpsfd[c->cur_idx] == 0) {
- c->sfd = 0;
- c->alive_sfds--;
- }
-
- return 0;
-}
/**
* reconnect several disconnected socks in the connection
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_reconn_socks(ms_conn_t *c)
{
- int srv_idx = 0;
- int ret_sfd = 0;
- int srv_conn_cnt = 0;
- struct timeval cur_time;
-
- assert(c != NULL);
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ uint32_t srv_idx= 0;
+ int ret_sfd= 0;
+ uint32_t srv_conn_cnt= 0;
+ struct timeval cur_time;
+
+ assert(c != NULL);
+
+ if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
+ {
+ return EXIT_SUCCESS;
+ }
+
+ for (uint32_t i= 0; i < c->total_sfds; i++)
+ {
+ if (c->tcpsfd[i] == 0)
+ {
+ gettimeofday(&cur_time, NULL);
+
+ /**
+ * For failover test of replication, reconnect the socks after
+ * it disconnects more than 5 seconds, Otherwise memslap will
+ * block at connect() function and the work threads can't work
+ * in this interval.
+ */
+ if (cur_time.tv_sec
+ - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
+ {
+ break;
+ }
+
+ if (ms_setting.rep_write_srv > 0)
+ {
+ srv_idx= i % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
+ }
+ else
+ {
+ srv_idx= ms_thread->thread_ctx->srv_idx;
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
+ }
+
+ if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port,
+ ms_setting.udp, &ret_sfd) == 0)
+ {
+ c->tcpsfd[i]= ret_sfd;
+ c->alive_sfds++;
- if (c->total_sfds == 1 || c->total_sfds == c->alive_sfds) {
- return 0;
- }
-
- for (int i = 0; i < c->total_sfds; i++) {
- if (c->tcpsfd[i] == 0) {
- gettimeofday(&cur_time, NULL);
-
- /**
- * For failover test of replication, reconnect the socks after
- * it disconnects more than 5 seconds, Otherwise memslap will
- * block at connect() function and the work threads can't work
- * in this interval.
- */
- if (cur_time.tv_sec - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) {
- break;
- }
-
- if (ms_setting.rep_write_srv > 0) {
- srv_idx = i;
- srv_conn_cnt = ms_setting.nconns;
- } else {
- srv_idx = ms_thread.thread_ctx->srv_idx;
- srv_conn_cnt = ms_setting.nconns / ms_setting.srv_cnt;
- }
-
- if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port,
- ms_setting.udp, &ret_sfd) == 0) {
-
- c->tcpsfd[i] = ret_sfd;
- c->alive_sfds++;
-
- if (__sync_fetch_and_add(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0) {
-
- gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
- int reconn_time = (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec -
- ms_setting.servers[srv_idx].disconn_time.tv_sec);
- fprintf(stderr, "Server %s:%d reconnect after %ds\n",
- ms_setting.servers[srv_idx].srv_host_name,
- ms_setting.servers[srv_idx].srv_port, reconn_time);
- }
- }
+ if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ % (uint32_t)srv_conn_cnt == 0)
+ {
+ gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
+ int reconn_time=
+ (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
+ - ms_setting.servers[srv_idx].disconn_time
+ .tv_sec);
+ fprintf(stderr, "Server %s:%d reconnect after %ds\n",
+ ms_setting.servers[srv_idx].srv_host_name,
+ ms_setting.servers[srv_idx].srv_port, reconn_time);
}
+ }
}
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_reconn_socks */
- return 0;
-}
/**
* Tokenize the command string by replacing whitespace with '\0' and update
*
* @return int, the number of tokens
*/
-static int ms_tokenize_command(char *command, token_t *tokens, const int max_tokens)
+static int ms_tokenize_command(char *command,
+ token_t *tokens,
+ const int max_tokens)
{
- char *s, *e;
- int ntokens = 0;
-
- assert(command != NULL && tokens != NULL && max_tokens > 1);
-
- for (s = e = command; ntokens < max_tokens - 1; ++e) {
- if (*e == ' ') {
- if (s != e) {
- tokens[ntokens].value = s;
- tokens[ntokens].length = (size_t)(e - s);
- ntokens++;
- *e = '\0';
- }
- s = e + 1;
- } else if (*e == '\0') {
- if (s != e) {
- tokens[ntokens].value = s;
- tokens[ntokens].length = (size_t)(e - s);
- ntokens++;
- }
-
- break; /* string end */
- }
+ char *s, *e;
+ int ntokens= 0;
+
+ assert(command != NULL && tokens != NULL && max_tokens > 1);
+
+ for (s= e= command; ntokens < max_tokens - 1; ++e)
+ {
+ if (*e == ' ')
+ {
+ if (s != e)
+ {
+ tokens[ntokens].value= s;
+ tokens[ntokens].length= (size_t)(e - s);
+ ntokens++;
+ *e= '\0';
+ }
+ s= e + 1;
}
+ else if (*e == '\0')
+ {
+ if (s != e)
+ {
+ tokens[ntokens].value= s;
+ tokens[ntokens].length= (size_t)(e - s);
+ ntokens++;
+ }
+
+ break; /* string end */
+ }
+ }
+
+ return ntokens;
+} /* ms_tokenize_command */
- return ntokens;
-}
/**
* parse the response of server.
* @param c, pointer of the concurrency
* @param command, the string responded by server
*
- * @return int, if the command completed return 0, else return
+ * @return int, if the command completed return EXIT_SUCCESS, else return
* -1
*/
static int ms_ascii_process_line(ms_conn_t *c, char *command)
{
- int ret = 0;
- int64_t value_len;
- char *buffer = command;
+ int ret= 0;
+ int64_t value_len;
+ char *buffer= command;
+
+ assert(c != NULL);
+
+ /**
+ * for command get, we store the returned value into local buffer
+ * then continue in ms_complete_nread().
+ */
- assert(c != NULL);
+ switch (buffer[0])
+ {
+ case 'V': /* VALUE || VERSION */
+ if (buffer[1] == 'A') /* VALUE */
+ {
+ token_t tokens[MAX_TOKENS];
+ ms_tokenize_command(command, tokens, MAX_TOKENS);
+ value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
+ c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
+
+ /*
+ * We read the \r\n into the string since not doing so is more
+ * cycles then the waster of memory to do so.
+ *
+ * We are null terminating through, which will most likely make
+ * some people lazy about using the return length.
+ */
+ c->rvbytes= (int)(value_len + 2);
+ c->readval= true;
+ ret= -1;
+ }
- /**
- * for command get, we store the returned value into local buffer
- * then continue in ms_complete_nread().
- */
+ break;
- switch (buffer[0]) {
- case 'V': /* VALUE || VERSION */
- if (buffer[1] == 'A') { /* VALUE */
- token_t tokens[MAX_TOKENS];
- ms_tokenize_command(command, tokens, MAX_TOKENS);
- value_len = strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
- c->currcmd.key_prefix = *(uint64_t *)tokens[KEY_TOKEN].value;
-
- /*
- We read the \r\n into the string since not doing so is more
- cycles then the waster of memory to do so.
-
- We are null terminating through, which will most likely make
- some people lazy about using the return length.
- */
- c->rvbytes = (int)(value_len + 2);
- c->readval = true;
- ret = -1;
- }
+ case 'O': /* OK */
+ c->currcmd.retstat= MCD_SUCCESS;
- break;
+ case 'S': /* STORED STATS SERVER_ERROR */
+ if (buffer[2] == 'A') /* STORED STATS */
+ { /* STATS*/
+ c->currcmd.retstat= MCD_STAT;
+ }
+ else if (buffer[1] == 'E')
+ {
+ /* SERVER_ERROR */
+ printf("<%d %s\n", c->sfd, buffer);
- case 'O': /* OK */
- c->currcmd.retstat = MCD_SUCCESS;
-
- case 'S': /* STORED STATS SERVER_ERROR */
- if (buffer[2] == 'A') {/* STORED STATS */
- /* STATS*/
- c->currcmd.retstat = MCD_STAT;
- } else if (buffer[1] == 'E') {
- /* SERVER_ERROR */
- printf("<%d %s\n", c->sfd, buffer);
-
- c->currcmd.retstat = MCD_SERVER_ERROR;
- } else if (buffer[1] == 'T') {
- /* STORED */
- c->currcmd.retstat = MCD_STORED;
- } else {
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- }
- break;
+ c->currcmd.retstat= MCD_SERVER_ERROR;
+ }
+ else if (buffer[1] == 'T')
+ {
+ /* STORED */
+ c->currcmd.retstat= MCD_STORED;
+ }
+ else
+ {
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ }
+ break;
- case 'D': /* DELETED DATA */
- if (buffer[1] == 'E') {
- c->currcmd.retstat = MCD_DELETED;
- } else {
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- }
+ case 'D': /* DELETED DATA */
+ if (buffer[1] == 'E')
+ {
+ c->currcmd.retstat= MCD_DELETED;
+ }
+ else
+ {
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ }
- break;
+ break;
- case 'N': /* NOT_FOUND NOT_STORED*/
- if (buffer[4] == 'F') {
- c->currcmd.retstat = MCD_NOTFOUND;
- } else if (buffer[4] == 'S') {
- printf("<%d %s\n", c->sfd, buffer);
- c->currcmd.retstat = MCD_NOTSTORED;
- } else {
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- }
- break;
+ case 'N': /* NOT_FOUND NOT_STORED*/
+ if (buffer[4] == 'F')
+ {
+ c->currcmd.retstat= MCD_NOTFOUND;
+ }
+ else if (buffer[4] == 'S')
+ {
+ printf("<%d %s\n", c->sfd, buffer);
+ c->currcmd.retstat= MCD_NOTSTORED;
+ }
+ else
+ {
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ }
+ break;
- case 'E': /* PROTOCOL ERROR or END */
- if (buffer[1] == 'N') {
- /* END */
- c->currcmd.retstat = MCD_END;
- } else if (buffer[1] == 'R') {
- printf("<%d ERROR\n", c->sfd);
- c->currcmd.retstat = MCD_PROTOCOL_ERROR;
- } else if (buffer[1] == 'X') {
- c->currcmd.retstat = MCD_DATA_EXISTS;
- printf("<%d %s\n", c->sfd, buffer);
- } else {
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- }
- break;
+ case 'E': /* PROTOCOL ERROR or END */
+ if (buffer[1] == 'N')
+ {
+ /* END */
+ c->currcmd.retstat= MCD_END;
+ }
+ else if (buffer[1] == 'R')
+ {
+ printf("<%d ERROR\n", c->sfd);
+ c->currcmd.retstat= MCD_PROTOCOL_ERROR;
+ }
+ else if (buffer[1] == 'X')
+ {
+ c->currcmd.retstat= MCD_DATA_EXISTS;
+ printf("<%d %s\n", c->sfd, buffer);
+ }
+ else
+ {
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ }
+ break;
- case 'C': /* CLIENT ERROR */
- printf("<%d %s\n", c->sfd, buffer);
- c->currcmd.retstat = MCD_CLIENT_ERROR;
- break;
+ case 'C': /* CLIENT ERROR */
+ printf("<%d %s\n", c->sfd, buffer);
+ c->currcmd.retstat= MCD_CLIENT_ERROR;
+ break;
- default:
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- break;
- }
+ default:
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ break;
+ } /* switch */
+
+ return ret;
+} /* ms_ascii_process_line */
- return ret;
-}
/**
* after one operation completes, reset the concurrency
*/
void ms_reset_conn(ms_conn_t *c, bool timeout)
{
- assert(c != NULL);
-
- if (c->udp) {
- if (c->packets > 0 && c->packets < MAX_UDP_PACKET) {
- memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
- }
-
- c->packets = 0;
- c->recvpkt = 0;
- c->pktcurr = 0;
- c->ordcurr = 0;
- c->rudpbytes = 0;
- }
- c->currcmd.isfinish = true;
- c->ctnwrite = false;
- c->rbytes = 0;
- c->rcurr = c->rbuf;
- ms_conn_set_state(c, conn_write);
- memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
+ assert(c != NULL);
+
+ if (c->udp)
+ {
+ if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
+ {
+ memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
+ }
+
+ c->packets= 0;
+ c->recvpkt= 0;
+ c->pktcurr= 0;
+ c->ordcurr= 0;
+ c->rudpbytes= 0;
+ }
+ c->currcmd.isfinish= true;
+ c->ctnwrite= false;
+ c->rbytes= 0;
+ c->rcurr= c->rbuf;
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
+ ms_conn_set_state(c, conn_write);
+ memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
+
+ if (timeout)
+ {
+ ms_drive_machine(c);
+ }
+} /* ms_reset_conn */
- if (timeout) {
- ms_drive_machine(c);
- }
-}
/**
* if we have a complete line in the buffer, process it.
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_try_read_line(ms_conn_t *c)
{
- if (c->protocol == binary_prot) {
- /* Do we have the complete packet header? */
- if ((uint64_t)c->rbytes < sizeof(c->binary_header)) {
- /* need more data! */
- return 0;
- } else {
+ if (c->protocol == binary_prot)
+ {
+ /* Do we have the complete packet header? */
+ if ((uint64_t)c->rbytes < sizeof(c->binary_header))
+ {
+ /* need more data! */
+ return EXIT_SUCCESS;
+ }
+ else
+ {
#ifdef NEED_ALIGN
- if (((long)(c->rcurr)) % 8 != 0) {
- /* must realign input buffer */
- memmove(c->rbuf, c->rcurr, c->rbytes);
- c->rcurr = c->rbuf;
- if (settings.verbose) {
- fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
- }
- }
+ if (((long)(c->rcurr)) % 8 != 0)
+ {
+ /* must realign input buffer */
+ memmove(c->rbuf, c->rcurr, c->rbytes);
+ c->rcurr= c->rbuf;
+ if (settings.verbose)
+ {
+ fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
+ }
+ }
#endif
- protocol_binary_response_header* rsp;
- rsp = (protocol_binary_response_header*)c->rcurr;
-
- c->binary_header = *rsp;
- c->binary_header.response.extlen = rsp->response.extlen;
- c->binary_header.response.keylen = ntohl(rsp->response.keylen);
- c->binary_header.response.bodylen = ntohl(rsp->response.bodylen);
- c->binary_header.response.status = ntohl(rsp->response.status);
-
- if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) {
- fprintf(stderr, "Invalid magic: %x\n",
- c->binary_header.response.magic);
- ms_conn_set_state(c, conn_closing);
- return 0;
- }
-
- /* process this complete response */
- if (ms_bin_process_response(c) == 0) {
- /* current operation completed */
- ms_reset_conn(c, false);
- return -1;
- } else {
- c->rbytes -= (int32_t)sizeof(c->binary_header);
- c->rcurr += sizeof(c->binary_header);
- }
- }
- } else {
- char *el, *cont;
-
- assert(c != NULL);
- assert(c->rcurr <= (c->rbuf + c->rsize));
-
- if (c->rbytes == 0)
- return 0;
- el = memchr(c->rcurr, '\n', (size_t)c->rbytes);
- if (!el)
- return 0;
- cont = el + 1;
- if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
- el--;
- }
- *el = '\0';
-
- assert(cont <= (c->rcurr + c->rbytes));
-
- /* process this complete line */
- if (ms_ascii_process_line(c, c->rcurr) == 0) {
- /* current operation completed */
- ms_reset_conn(c, false);
- return -1;
- } else {
- /* current operation didn't complete */
- c->rbytes -= (int32_t)(cont - c->rcurr);
- c->rcurr = cont;
- }
+ protocol_binary_response_header *rsp;
+ rsp= (protocol_binary_response_header *)c->rcurr;
+
+ c->binary_header= *rsp;
+ c->binary_header.response.extlen= rsp->response.extlen;
+ c->binary_header.response.keylen= ntohs(rsp->response.keylen);
+ c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
+ c->binary_header.response.status= ntohs(rsp->response.status);
+
+ if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
+ {
+ fprintf(stderr, "Invalid magic: %x\n",
+ c->binary_header.response.magic);
+ ms_conn_set_state(c, conn_closing);
+ return EXIT_SUCCESS;
+ }
+
+ /* process this complete response */
+ if (ms_bin_process_response(c) == 0)
+ {
+ /* current operation completed */
+ ms_reset_conn(c, false);
+ return -1;
+ }
+ else
+ {
+ c->rbytes-= (int32_t)sizeof(c->binary_header);
+ c->rcurr+= sizeof(c->binary_header);
+ }
+ }
+ }
+ else
+ {
+ char *el, *cont;
+
+ assert(c != NULL);
+ assert(c->rcurr <= (c->rbuf + c->rsize));
+
+ if (c->rbytes == 0)
+ return EXIT_SUCCESS;
- assert(c->rcurr <= (c->rbuf + c->rsize));
+ el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
+ if (! el)
+ return EXIT_SUCCESS;
+
+ cont= el + 1;
+ if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
+ {
+ el--;
}
+ *el= '\0';
+
+ assert(cont <= (c->rcurr + c->rbytes));
+
+ /* process this complete line */
+ if (ms_ascii_process_line(c, c->rcurr) == 0)
+ {
+ /* current operation completed */
+ ms_reset_conn(c, false);
+ return -1;
+ }
+ else
+ {
+ /* current operation didn't complete */
+ c->rbytes-= (int32_t)(cont - c->rcurr);
+ c->rcurr= cont;
+ }
+
+ assert(c->rcurr <= (c->rbuf + c->rsize));
+ }
+
+ return -1;
+} /* ms_try_read_line */
- return -1;
-}
/**
* because the packet of UDP can't ensure the order, the
*/
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
{
- int len = 0;
- int wbytes = 0;
- uint16_t req_id = 0;
- uint16_t seq_num = 0;
- uint16_t packets = 0;
- unsigned char *header = NULL;
-
- /* no enough data */
- assert(c != NULL);
- assert(buf != NULL);
- assert(c->rudpbytes >= UDP_HEADER_SIZE);
-
- /* calculate received packets count */
- if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) {
- /* the last packet has some data */
- c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
- } else {
- c->recvpkt = c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
- }
-
- /* get the total packets count if necessary */
- if (c->packets == 0) {
- c->packets = HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
- }
-
- /* build the ordered packet array */
- for (int i = c->pktcurr; i < c->recvpkt; i++) {
- header = (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
- req_id = (uint16_t)HEADER_TO_REQID(header);
- assert(req_id == c->request_id % (1 << 16));
-
- packets = (uint16_t)HEADER_TO_PACKETS(header);
- assert(c->packets == HEADER_TO_PACKETS(header));
-
- seq_num = (uint16_t)HEADER_TO_SEQNUM(header);
- c->udppkt[seq_num].header = header;
- c->udppkt[seq_num].data = (char *)header + UDP_HEADER_SIZE;
-
- if (i == c->recvpkt - 1) {
- /* last received packet */
- if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) {
- c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
- c->pktcurr++;
- } else {
- c->udppkt[seq_num].rbytes = c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
- }
- } else {
- c->udppkt[seq_num].rbytes = UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
- c->pktcurr++;
- }
- }
+ int len= 0;
+ int wbytes= 0;
+ uint16_t req_id= 0;
+ uint16_t seq_num= 0;
+ uint16_t packets= 0;
+ unsigned char *header= NULL;
+
+ /* no enough data */
+ assert(c != NULL);
+ assert(buf != NULL);
+ assert(c->rudpbytes >= UDP_HEADER_SIZE);
+
+ /* calculate received packets count */
+ if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
+ {
+ /* the last packet has some data */
+ c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
+ }
+ else
+ {
+ c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
+ }
+
+ /* get the total packets count if necessary */
+ if (c->packets == 0)
+ {
+ c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
+ }
+
+ /* build the ordered packet array */
+ for (int i= c->pktcurr; i < c->recvpkt; i++)
+ {
+ header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
+ req_id= (uint16_t)HEADER_TO_REQID(header);
+ assert(req_id == c->request_id % (1 << 16));
+
+ packets= (uint16_t)HEADER_TO_PACKETS(header);
+ assert(c->packets == HEADER_TO_PACKETS(header));
+
+ seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
+ c->udppkt[seq_num].header= header;
+ c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
+
+ if (i == c->recvpkt - 1)
+ {
+ /* last received packet */
+ if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
+ {
+ c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
+ c->pktcurr++;
+ }
+ else
+ {
+ c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
+ - UDP_HEADER_SIZE;
+ }
+ }
+ else
+ {
+ c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
+ c->pktcurr++;
+ }
+ }
+
+ for (int i= c->ordcurr; i < c->recvpkt; i++)
+ {
+ /* there is some data to copy */
+ if ((c->udppkt[i].data != NULL)
+ && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
+ {
+ header= c->udppkt[i].header;
+ len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
+ if (len > rbytes - wbytes)
+ {
+ len= rbytes - wbytes;
+ }
+
+ assert(len <= rbytes - wbytes);
+ assert(i == HEADER_TO_SEQNUM(header));
+
+ memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
+ (size_t)len);
+ wbytes+= len;
+ c->udppkt[i].copybytes+= len;
+
+ if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
+ && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
+ {
+ /* finish copying all the data of this packet, next */
+ c->ordcurr++;
+ }
+
+ /* last received packet, and finish copying all the data */
+ if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
+ && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
+ {
+ break;
+ }
- for (int i = c->ordcurr; i < c->recvpkt; i++) {
- /* there is some data to copy */
- if (c->udppkt[i].data != NULL && c->udppkt[i].copybytes < c->udppkt[i].rbytes) {
- header = c->udppkt[i].header;
- len = c->udppkt[i].rbytes - c->udppkt[i].copybytes;
- if (len > rbytes - wbytes) {
- len = rbytes - wbytes;
- }
-
- assert(len <= rbytes - wbytes);
- assert(i == HEADER_TO_SEQNUM(header));
-
- memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, (size_t)len);
- wbytes += len;
- c->udppkt[i].copybytes += len;
-
- if (c->udppkt[i].copybytes == c->udppkt[i].rbytes
- && c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) {
- /* finish copying all the data of this packet, next */
- c->ordcurr++;
- }
-
- /* last received packet, and finish copying all the data */
- if (c->recvpkt == c->packets && i == c->recvpkt - 1
- && c->udppkt[i].copybytes == c->udppkt[i].rbytes) {
- break;
- }
-
- /* no space to copy data */
- if (wbytes >= rbytes) {
- break;
- }
-
- /* it doesn't finish reading all the data of the packet from network */
- if (i != c->recvpkt - 1
- && c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE) {
- break;
- }
- } else {
- /* no data to copy */
- break;
- }
+ /* no space to copy data */
+ if (wbytes >= rbytes)
+ {
+ break;
+ }
+
+ /* it doesn't finish reading all the data of the packet from network */
+ if ((i != c->recvpkt - 1)
+ && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
+ {
+ break;
+ }
+ }
+ else
+ {
+ /* no data to copy */
+ break;
}
+ }
+
+ return wbytes == 0 ? -1 : wbytes;
+} /* ms_sort_udp_packet */
- return(wbytes == 0 ? -1 : wbytes);
-}
/**
* encapsulate upd read like tcp read
*/
static int ms_udp_read(ms_conn_t *c, char *buf, int len)
{
- int res = 0;
- int avail = 0;
- int rbytes = 0;
- int copybytes = 0;
-
- assert(c->udp);
-
- while (1) {
- if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) {
- char *new_rbuf = realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
- if (!new_rbuf) {
- fprintf(stderr, "Couldn't realloc input buffer.\n");
- c->rudpbytes = 0; /* ignore what we read */
- return -1;
- }
- c->rudpbuf = new_rbuf;
- c->rudpsize *= 2;
- }
-
- avail = c->rudpsize - c->rudpbytes;
- /* UDP each time read a packet, 1400 bytes */
- res = (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
-
- if (res > 0) {
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
- c->rudpbytes += res;
- rbytes += res;
- if (res == avail) {
- continue;
- } else {
- break;
- }
- }
-
- if (res == 0) {
- /* "connection" closed */
- return res;
- }
-
- if (res == -1) {
- /* no data to read */
- return res;
- }
+ int res= 0;
+ int avail= 0;
+ int rbytes= 0;
+ int copybytes= 0;
+
+ assert(c->udp);
+
+ while (1)
+ {
+ if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
+ {
+ char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
+ if (! new_rbuf)
+ {
+ fprintf(stderr, "Couldn't realloc input buffer.\n");
+ c->rudpbytes= 0; /* ignore what we read */
+ return -1;
+ }
+ c->rudpbuf= new_rbuf;
+ c->rudpsize*= 2;
+ }
+
+ avail= c->rudpsize - c->rudpbytes;
+ /* UDP each time read a packet, 1400 bytes */
+ res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
+
+ if (res > 0)
+ {
+ atomic_add_size(&ms_stats.bytes_read, res);
+ c->rudpbytes+= res;
+ rbytes+= res;
+ if (res == avail)
+ {
+ continue;
+ }
+ else
+ {
+ break;
+ }
}
- /* copy data to read buffer */
- if (rbytes > 0) {
- copybytes = ms_sort_udp_packet(c, buf, len);
+ if (res == 0)
+ {
+ /* "connection" closed */
+ return res;
}
- if (copybytes == -1) {
- __sync_fetch_and_add(&ms_stats.pkt_disorder, 1);
+ if (res == -1)
+ {
+ /* no data to read */
+ return res;
}
+ }
+
+ /* copy data to read buffer */
+ if (rbytes > 0)
+ {
+ copybytes= ms_sort_udp_packet(c, buf, len);
+ }
+
+ if (copybytes == -1)
+ {
+ atomic_add_size(&ms_stats.pkt_disorder, 1);
+ }
+
+ return copybytes;
+} /* ms_udp_read */
- return copybytes;
-}
/*
* read from network as much as we can, handle buffer overflow and connection
* close.
* before reading, move the remaining incomplete fragment of a command
* (if any) to the beginning of the buffer.
- * return 0 if there's nothing to read on the first read.
+ * return EXIT_SUCCESS if there's nothing to read on the first read.
*/
+
/**
* read from network as much as we can, handle buffer overflow and connection
* close. before reading, move the remaining incomplete fragment of a command
* @param c, pointer of the concurrency
*
* @return int,
- * return 0 if there's nothing to read on the first read.
- * return 1 if get data
+ * return EXIT_SUCCESS if there's nothing to read on the first read.
+ * return EXIT_FAILURE if get data
* return -1 if error happens
*/
static int ms_try_read_network(ms_conn_t *c)
{
- int gotdata = 0;
- int res;
- int64_t avail;
-
- assert(c != NULL);
-
- if (c->rcurr != c->rbuf &&
- (!c->readval || c->rvbytes > c->rsize - (c->rcurr - c->rbuf)
- || (c->readval && c->rcurr - c->rbuf > c->rbytes))) {
- if (c->rbytes != 0) /* otherwise there's nothing to copy */
- memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
- c->rcurr = c->rbuf;
- }
-
- while (1) {
- if (c->rbytes >= c->rsize) {
- char *new_rbuf = realloc(c->rbuf, (size_t)c->rsize * 2);
- if (!new_rbuf) {
- fprintf(stderr, "Couldn't realloc input buffer.\n");
- c->rbytes = 0; /* ignore what we read */
- return -1;
- }
- c->rcurr = c->rbuf = new_rbuf;
- c->rsize *= 2;
- }
+ int gotdata= 0;
+ int res;
+ int64_t avail;
+
+ assert(c != NULL);
+
+ if ((c->rcurr != c->rbuf)
+ && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
+ || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
+ {
+ if (c->rbytes != 0) /* otherwise there's nothing to copy */
+ memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
+ c->rcurr= c->rbuf;
+ }
+
+ while (1)
+ {
+ if (c->rbytes >= c->rsize)
+ {
+ char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
+ if (! new_rbuf)
+ {
+ fprintf(stderr, "Couldn't realloc input buffer.\n");
+ c->rbytes= 0; /* ignore what we read */
+ return -1;
+ }
+ c->rcurr= c->rbuf= new_rbuf;
+ c->rsize*= 2;
+ }
- avail = c->rsize - c->rbytes - (c->rcurr - c->rbuf);
- if (avail == 0) {
- break;
- }
+ avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
+ if (avail == 0)
+ {
+ break;
+ }
- if (c->udp) {
- res = (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
- } else {
- res = (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
- }
-
- if (res > 0) {
- if (!c->udp) {
- __sync_fetch_and_add(&ms_stats.bytes_read, res);
- }
- gotdata = 1;
- c->rbytes += res;
- if (res == avail) {
- continue;
- } else {
- break;
- }
- }
- if (res == 0) {
- /* connection closed */
- ms_conn_set_state(c, conn_closing);
- return -1;
- }
- if (res == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) break;
- /* Should close on unhandled errors. */
- ms_conn_set_state(c, conn_closing);
- return -1;
- }
+ if (c->udp)
+ {
+ res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
+ }
+ else
+ {
+ res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
+ }
+
+ if (res > 0)
+ {
+ if (! c->udp)
+ {
+ atomic_add_size(&ms_stats.bytes_read, res);
+ }
+ gotdata= 1;
+ c->rbytes+= res;
+ if (res == avail)
+ {
+ continue;
+ }
+ else
+ {
+ break;
+ }
}
+ if (res == 0)
+ {
+ /* connection closed */
+ ms_conn_set_state(c, conn_closing);
+ return -1;
+ }
+ if (res == -1)
+ {
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+ break;
+ /* Should close on unhandled errors. */
+ ms_conn_set_state(c, conn_closing);
+ return -1;
+ }
+ }
+
+ return gotdata;
+} /* ms_try_read_network */
- return gotdata;
-}
/**
* after get the object from server, verify the value if
* @param value, received value string
* @param vlen, received value string length
*/
-static void ms_verify_value(ms_conn_t *c, ms_mlget_task_item_t *mlget_item,
- char *value, int vlen)
+static void ms_verify_value(ms_conn_t *c,
+ ms_mlget_task_item_t *mlget_item,
+ char *value,
+ int vlen)
{
- if (c->curr_task.verify) {
- assert(c->curr_task.item->value_offset != INVALID_OFFSET);
- char *orignval = &ms_setting.char_block[c->curr_task.item->value_offset];
- char *orignkey = &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
-
- /* verify expire time if necessary */
- if (c->curr_task.item->exp_time > 0) {
- struct timeval curr_time;
- gettimeofday(&curr_time, NULL);
-
- /* object expired but get it now */
- if (curr_time.tv_sec - c->curr_task.item->client_time
- > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) {
- __sync_fetch_and_add(&ms_stats.exp_get, 1);
-
- if (ms_setting.verbose) {
- char set_time[64];
- char cur_time[64];
- strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
- localtime(&c->curr_task.item->client_time));
- strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
- localtime(&curr_time.tv_sec));
- fprintf(stderr, "\n<%d expire time verification failed, "
- "object expired but get it now\n"
- "\tkey len: %d\n"
- "\tkey: %lx %.*s\n"
- "\tset time: %s current time: %s "
- "diff time: %d expire time: %d\n"
- "\texpected data: \n"
- "\treceived data len: %d\n"
- "\treceived data: %.*s\n",
- c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
- c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
- orignkey, set_time, cur_time,
- (int)(curr_time.tv_sec - c->curr_task.item->client_time),
- c->curr_task.item->exp_time,
- vlen, vlen, value);
- fflush(stderr);
- }
- }
- } else {
- if (c->curr_task.item->value_size != vlen
- || memcmp(orignval, value, (size_t)vlen) != 0) {
- __sync_fetch_and_add(&ms_stats.vef_failed, 1);
-
- if (ms_setting.verbose) {
- fprintf(stderr, "\n<%d data verification failed\n"
- "\tkey len: %d\n"
- "\tkey: %lx %.*s\n"
- "\texpected data len: %d\n"
- "\texpected data: %.*s\n"
- "\treceived data len: %d\n"
- "\treceived data: %.*s\n",
- c->sfd, c->curr_task.item->key_size, c->curr_task.item->key_prefix,
- c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
- orignkey, c->curr_task.item->value_size,
- c->curr_task.item->value_size,
- orignval, vlen, vlen, value);
- fflush(stderr);
- }
- }
- }
+ if (c->curr_task.verify)
+ {
+ assert(c->curr_task.item->value_offset != INVALID_OFFSET);
+ char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
+ char *orignkey=
+ &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
+
+ /* verify expire time if necessary */
+ if (c->curr_task.item->exp_time > 0)
+ {
+ struct timeval curr_time;
+ gettimeofday(&curr_time, NULL);
+
+ /* object expired but get it now */
+ if (curr_time.tv_sec - c->curr_task.item->client_time
+ > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
+ {
+ atomic_add_size(&ms_stats.exp_get, 1);
+
+ if (ms_setting.verbose)
+ {
+ char set_time[64];
+ char cur_time[64];
+ strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
+ localtime(&c->curr_task.item->client_time));
+ strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
+ localtime(&curr_time.tv_sec));
+ fprintf(stderr,
+ "\n<%d expire time verification failed, "
+ "object expired but get it now\n"
+ "\tkey len: %d\n"
+ "\tkey: %" PRIx64 " %.*s\n"
+ "\tset time: %s current time: %s "
+ "diff time: %d expire time: %d\n"
+ "\texpected data: \n"
+ "\treceived data len: %d\n"
+ "\treceived data: %.*s\n",
+ c->sfd,
+ c->curr_task.item->key_size,
+ c->curr_task.item->key_prefix,
+ c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
+ orignkey,
+ set_time,
+ cur_time,
+ (int)(curr_time.tv_sec - c->curr_task.item->client_time),
+ c->curr_task.item->exp_time,
+ vlen,
+ vlen,
+ value);
+ fflush(stderr);
+ }
+ }
+ }
+ else
+ {
+ if ((c->curr_task.item->value_size != vlen)
+ || (memcmp(orignval, value, (size_t)vlen) != 0))
+ {
+ atomic_add_size(&ms_stats.vef_failed, 1);
+
+ if (ms_setting.verbose)
+ {
+ fprintf(stderr,
+ "\n<%d data verification failed\n"
+ "\tkey len: %d\n"
+ "\tkey: %" PRIx64" %.*s\n"
+ "\texpected data len: %d\n"
+ "\texpected data: %.*s\n"
+ "\treceived data len: %d\n"
+ "\treceived data: %.*s\n",
+ c->sfd,
+ c->curr_task.item->key_size,
+ c->curr_task.item->key_prefix,
+ c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
+ orignkey,
+ c->curr_task.item->value_size,
+ c->curr_task.item->value_size,
+ orignval,
+ vlen,
+ vlen,
+ value);
+ fflush(stderr);
+ }
+ }
+ }
+
+ c->curr_task.finish_verify= true;
+
+ if (mlget_item != NULL)
+ {
+ mlget_item->finish_verify= true;
+ }
+ }
+} /* ms_verify_value */
- c->curr_task.finish_verify = true;
-
- if (mlget_item != NULL) {
- mlget_item->finish_verify = true;
- }
- }
-}
/**
* For ASCII protocol, after store the data into the local
*/
static void ms_ascii_complete_nread(ms_conn_t *c)
{
- assert(c != NULL);
- assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
- if (c->rvbytes > 2) {
- assert(c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
- }
-
- /* multi-get */
- ms_mlget_task_item_t *mlget_item = NULL;
- if ((ms_setting.mult_key_num > 1 &&
- c->mlget_task.mlget_num >= ms_setting.mult_key_num) ||
- (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
-
- c->mlget_task.value_index++;
- mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
-
- if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
- c->curr_task.item = mlget_item->item;
- c->curr_task.verify = mlget_item->verify;
- c->curr_task.finish_verify = mlget_item->finish_verify;
- mlget_item->get_miss = false;
- } else {
- /* Try to find the task item in multi-get task array */
- for (int i = 0; i < c->mlget_task.mlget_num; i++) {
- mlget_item = &c->mlget_task.mlget_item[i];
- if (mlget_item->item->key_prefix == c->currcmd.key_prefix) {
-
- c->curr_task.item = mlget_item->item;
- c->curr_task.verify = mlget_item->verify;
- c->curr_task.finish_verify = mlget_item->finish_verify;
- mlget_item->get_miss = false;
-
- break;
- }
- }
- }
- }
-
- ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
+ assert(c != NULL);
+ assert(c->rbytes >= c->rvbytes);
+ assert(c->protocol == ascii_prot);
+ if (c->rvbytes > 2)
+ {
+ assert(
+ c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
+ }
+
+ /* multi-get */
+ ms_mlget_task_item_t *mlget_item= NULL;
+ if (((ms_setting.mult_key_num > 1)
+ && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
+ || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
+ {
+ c->mlget_task.value_index++;
+ mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
+
+ if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
+ {
+ c->curr_task.item= mlget_item->item;
+ c->curr_task.verify= mlget_item->verify;
+ c->curr_task.finish_verify= mlget_item->finish_verify;
+ mlget_item->get_miss= false;
+ }
+ else
+ {
+ /* Try to find the task item in multi-get task array */
+ for (int i= 0; i < c->mlget_task.mlget_num; i++)
+ {
+ mlget_item= &c->mlget_task.mlget_item[i];
+ if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
+ {
+ c->curr_task.item= mlget_item->item;
+ c->curr_task.verify= mlget_item->verify;
+ c->curr_task.finish_verify= mlget_item->finish_verify;
+ mlget_item->get_miss= false;
+
+ break;
+ }
+ }
+ }
+ }
+
+ ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
+
+ c->curr_task.get_miss= false;
+ c->rbytes-= c->rvbytes;
+ c->rcurr= c->rcurr + c->rvbytes;
+ assert(c->rcurr <= (c->rbuf + c->rsize));
+ c->readval= false;
+ c->rvbytes= 0;
+} /* ms_ascii_complete_nread */
- c->curr_task.get_miss = false;
- c->rbytes -= c->rvbytes;
- c->rcurr = c->rcurr + c->rvbytes;
- assert(c->rcurr <= (c->rbuf + c->rsize));
- c->readval = false;
- c->rvbytes = 0;
-}
/**
* For binary protocol, after store the data into the local
*/
static void ms_bin_complete_nread(ms_conn_t *c)
{
- assert(c != NULL);
- assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == binary_prot);
-
- int extlen = c->binary_header.response.extlen;
- int keylen = c->binary_header.response.keylen;
- uint8_t opcode = c->binary_header.response.opcode;
-
- /* not get command or not include value, just return */
- if ((opcode != PROTOCOL_BINARY_CMD_GET && opcode != PROTOCOL_BINARY_CMD_GETQ) ||
- c->rvbytes <= extlen + keylen) {
- /* get miss */
- if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) {
- c->currcmd.retstat = MCD_END;
- c->curr_task.get_miss = true;
- }
-
- c->readval = false;
- c->rvbytes = 0;
- ms_reset_conn(c, false);
- return;
- }
-
- /* multi-get */
- ms_mlget_task_item_t *mlget_item = NULL;
- if ((ms_setting.mult_key_num > 1 &&
- c->mlget_task.mlget_num >= ms_setting.mult_key_num) ||
- (c->remain_exec_num == 0 && c->mlget_task.mlget_num > 0)) {
-
- c->mlget_task.value_index++;
- mlget_item = &c->mlget_task.mlget_item[c->mlget_task.value_index];
-
- c->curr_task.item = mlget_item->item;
- c->curr_task.verify = mlget_item->verify;
- c->curr_task.finish_verify = mlget_item->finish_verify;
- mlget_item->get_miss = false;
- }
-
- ms_verify_value(c, mlget_item, c->rcurr + extlen + keylen, c->rvbytes - extlen - keylen);
-
- c->currcmd.retstat = MCD_END;
- c->curr_task.get_miss = false;
- c->rbytes -= c->rvbytes;
- c->rcurr = c->rcurr + c->rvbytes;
- assert(c->rcurr <= (c->rbuf + c->rsize));
- c->readval = false;
- c->rvbytes = 0;
+ assert(c != NULL);
+ assert(c->rbytes >= c->rvbytes);
+ assert(c->protocol == binary_prot);
+
+ int extlen= c->binary_header.response.extlen;
+ int keylen= c->binary_header.response.keylen;
+ uint8_t opcode= c->binary_header.response.opcode;
+
+ /* not get command or not include value, just return */
+ if (((opcode != PROTOCOL_BINARY_CMD_GET)
+ && (opcode != PROTOCOL_BINARY_CMD_GETQ))
+ || (c->rvbytes <= extlen + keylen))
+ {
+ /* get miss */
+ if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
+ {
+ c->currcmd.retstat= MCD_END;
+ c->curr_task.get_miss= true;
+ }
+
+ c->readval= false;
+ c->rvbytes= 0;
+ ms_reset_conn(c, false);
+ return;
+ }
+
+ /* multi-get */
+ ms_mlget_task_item_t *mlget_item= NULL;
+ if (((ms_setting.mult_key_num > 1)
+ && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
+ || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
+ {
+ c->mlget_task.value_index++;
+ mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
+
+ c->curr_task.item= mlget_item->item;
+ c->curr_task.verify= mlget_item->verify;
+ c->curr_task.finish_verify= mlget_item->finish_verify;
+ mlget_item->get_miss= false;
+ }
+
+ ms_verify_value(c,
+ mlget_item,
+ c->rcurr + extlen + keylen,
+ c->rvbytes - extlen - keylen);
+
+ c->currcmd.retstat= MCD_END;
+ c->curr_task.get_miss= false;
+ c->rbytes-= c->rvbytes;
+ c->rcurr= c->rcurr + c->rvbytes;
+ assert(c->rcurr <= (c->rbuf + c->rsize));
+ c->readval= false;
+ c->rvbytes= 0;
+
+ if (ms_setting.mult_key_num > 1)
+ {
+ /* multi-get have check all the item */
+ if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
+ {
+ ms_reset_conn(c, false);
+ }
+ }
+ else
+ {
+ /* single get */
+ ms_reset_conn(c, false);
+ }
+} /* ms_bin_complete_nread */
- if (ms_setting.mult_key_num > 1) {
- /* multi-get have check all the item */
- if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) {
- ms_reset_conn(c, false);
- }
- } else {
- /* single get */
- ms_reset_conn(c, false);
- }
-}
/**
* we get here after reading the value of get commands.
*/
static void ms_complete_nread(ms_conn_t *c)
{
- assert(c != NULL);
- assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot
- || c->protocol == ascii_prot
- || c->protocol == binary_prot);
+ assert(c != NULL);
+ assert(c->rbytes >= c->rvbytes);
+ assert(c->protocol == ascii_prot
+ || c->protocol == binary_prot);
+
+ if (c->protocol == binary_prot)
+ {
+ ms_bin_complete_nread(c);
+ }
+ else
+ {
+ ms_ascii_complete_nread(c);
+ }
+} /* ms_complete_nread */
- if (c->protocol == binary_prot) {
- ms_bin_complete_nread(c);
- } else {
- ms_ascii_complete_nread(c);
- }
-}
/**
* Adds a message header to a connection.
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_add_msghdr(ms_conn_t *c)
{
- struct msghdr *msg;
+ struct msghdr *msg;
- assert(c != NULL);
+ assert(c != NULL);
- if (c->msgsize == c->msgused) {
- msg = realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
- if (! msg)
- return -1;
- c->msglist = msg;
- c->msgsize *= 2;
- }
+ if (c->msgsize == c->msgused)
+ {
+ msg=
+ realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
+ if (! msg)
+ return -1;
- msg = c->msglist + c->msgused;
+ c->msglist= msg;
+ c->msgsize*= 2;
+ }
- /**
- * this wipes msg_iovlen, msg_control, msg_controllen, and
- * msg_flags, the last 3 of which aren't defined on solaris:
- */
- memset(msg, 0, sizeof(struct msghdr));
+ msg= c->msglist + c->msgused;
- msg->msg_iov = &c->iov[c->iovused];
+ /**
+ * this wipes msg_iovlen, msg_control, msg_controllen, and
+ * msg_flags, the last 3 of which aren't defined on solaris:
+ */
+ memset(msg, 0, sizeof(struct msghdr));
- if (c->udp && c->srv_recv_addr_size > 0) {
- msg->msg_name = &c->srv_recv_addr;
- msg->msg_namelen = c->srv_recv_addr_size;
- }
+ msg->msg_iov= &c->iov[c->iovused];
- c->msgbytes = 0;
- c->msgused++;
+ if (c->udp && (c->srv_recv_addr_size > 0))
+ {
+ msg->msg_name= &c->srv_recv_addr;
+ msg->msg_namelen= c->srv_recv_addr_size;
+ }
- if (c->udp) {
- /* Leave room for the UDP header, which we'll fill in later. */
- return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
- }
+ c->msgbytes= 0;
+ c->msgused++;
+
+ if (c->udp)
+ {
+ /* Leave room for the UDP header, which we'll fill in later. */
+ return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_add_msghdr */
- return 0;
-}
/**
* Ensures that there is room for another structure iovec in a connection's
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_ensure_iov_space(ms_conn_t *c)
{
- assert(c != NULL);
+ assert(c != NULL);
- if (c->iovused >= c->iovsize) {
- int i, iovnum;
- struct iovec *new_iov = (struct iovec *)realloc(c->iov,
- ((uint64_t)c->iovsize * 2) * sizeof(struct iovec));
- if (! new_iov)
- return -1;
- c->iov = new_iov;
- c->iovsize *= 2;
-
- /* Point all the msghdr structures at the new list. */
- for (i = 0, iovnum = 0; i < c->msgused; i++) {
- c->msglist[i].msg_iov = &c->iov[iovnum];
- iovnum += (int)c->msglist[i].msg_iovlen;
- }
+ if (c->iovused >= c->iovsize)
+ {
+ int i, iovnum;
+ struct iovec *new_iov= (struct iovec *)realloc(c->iov,
+ ((size_t)c->iovsize
+ * 2)
+ * sizeof(struct iovec));
+ if (! new_iov)
+ return -1;
+
+ c->iov= new_iov;
+ c->iovsize*= 2;
+
+ /* Point all the msghdr structures at the new list. */
+ for (i= 0, iovnum= 0; i < c->msgused; i++)
+ {
+ c->msglist[i].msg_iov= &c->iov[iovnum];
+ iovnum+= (int)c->msglist[i].msg_iovlen;
}
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_ensure_iov_space */
- return 0;
-}
/**
* Adds data to the list of pending data that will be written out to a
* @param buf, the buffer includes data to send
* @param len, the data length in the buffer
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
{
- struct msghdr *m;
- int leftover;
- bool limit_to_mtu;
+ struct msghdr *m;
+ int leftover;
+ bool limit_to_mtu;
- assert(c != NULL);
+ assert(c != NULL);
- do {
- m = &c->msglist[c->msgused - 1];
+ do
+ {
+ m= &c->msglist[c->msgused - 1];
- /*
- * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
- */
- limit_to_mtu = c->udp;
+ /*
+ * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
+ */
+ limit_to_mtu= c->udp;
- /* We may need to start a new msghdr if this one is full. */
- if (m->msg_iovlen == IOV_MAX ||
- (limit_to_mtu && c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)) {
- ms_add_msghdr(c);
- m = &c->msglist[c->msgused - 1];
- }
+#ifdef IOV_MAX
+ /* We may need to start a new msghdr if this one is full. */
+ if ((m->msg_iovlen == IOV_MAX)
+ || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
+ {
+ ms_add_msghdr(c);
+ m= &c->msglist[c->msgused - 1];
+ }
+#endif
- if (ms_ensure_iov_space(c) != 0)
- return -1;
+ if (ms_ensure_iov_space(c) != 0)
+ return -1;
- /* If the fragment is too big to fit in the datagram, split it up */
- if (limit_to_mtu && len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE) {
- leftover = len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
- len -= leftover;
- } else {
- leftover = 0;
- }
+ /* If the fragment is too big to fit in the datagram, split it up */
+ if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
+ {
+ leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
+ len-= leftover;
+ }
+ else
+ {
+ leftover= 0;
+ }
- m = &c->msglist[c->msgused - 1];
- m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
- m->msg_iov[m->msg_iovlen].iov_len = (size_t)len;
+ m= &c->msglist[c->msgused - 1];
+ m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
+ m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
- c->msgbytes += len;
- c->iovused++;
- m->msg_iovlen++;
+ c->msgbytes+= len;
+ c->iovused++;
+ m->msg_iovlen++;
- buf = ((char *)buf) + len;
- len = leftover;
- } while (leftover > 0);
+ buf= ((char *)buf) + len;
+ len= leftover;
+ }
+ while (leftover > 0);
+
+ return EXIT_SUCCESS;
+} /* ms_add_iov */
- return 0;
-}
/**
* Constructs a set of UDP headers and attaches them to the outgoing messages.
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_udp_headers(ms_conn_t *c)
{
- int i;
- unsigned char *hdr;
-
- assert(c != NULL);
-
- c->request_id = ms_get_udp_request_id();
-
- if (c->msgused > c->hdrsize) {
- void *new_hdrbuf;
- if (c->hdrbuf)
- new_hdrbuf = realloc(c->hdrbuf, (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
- else
- new_hdrbuf = malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
- if (! new_hdrbuf)
- return -1;
- c->hdrbuf = (unsigned char *)new_hdrbuf;
- c->hdrsize = c->msgused * 2;
- }
-
- /* If this is a multi-packet request, drop it. */
- if (c->udp && c->msgused > 1) {
- fprintf(stderr, "multi-packet request for UDP not supported.\n");
- return -1;
- }
-
- hdr = c->hdrbuf;
- for (i = 0; i < c->msgused; i++) {
- c->msglist[i].msg_iov[0].iov_base = hdr;
- c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
- *hdr++ = (unsigned char)(c->request_id / 256);
- *hdr++ = (unsigned char)(c->request_id % 256);
- *hdr++ = (unsigned char)(i / 256);
- *hdr++ = (unsigned char)(i % 256);
- *hdr++ = (unsigned char)(c->msgused / 256);
- *hdr++ = (unsigned char)(c->msgused % 256);
- *hdr++ = (unsigned char)1; /* support facebook memcached */
- *hdr++ = (unsigned char)0;
- assert(hdr == ((unsigned char *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE));
- }
+ int i;
+ unsigned char *hdr;
+
+ assert(c != NULL);
+
+ c->request_id= ms_get_udp_request_id();
+
+ if (c->msgused > c->hdrsize)
+ {
+ void *new_hdrbuf;
+ if (c->hdrbuf)
+ new_hdrbuf= realloc(c->hdrbuf,
+ (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
+ else
+ new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
+ if (! new_hdrbuf)
+ return -1;
+
+ c->hdrbuf= (unsigned char *)new_hdrbuf;
+ c->hdrsize= c->msgused * 2;
+ }
+
+ /* If this is a multi-packet request, drop it. */
+ if (c->udp && (c->msgused > 1))
+ {
+ fprintf(stderr, "multi-packet request for UDP not supported.\n");
+ return -1;
+ }
+
+ hdr= c->hdrbuf;
+ for (i= 0; i < c->msgused; i++)
+ {
+ c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
+ c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
+ *hdr++= (unsigned char)(c->request_id / 256);
+ *hdr++= (unsigned char)(c->request_id % 256);
+ *hdr++= (unsigned char)(i / 256);
+ *hdr++= (unsigned char)(i % 256);
+ *hdr++= (unsigned char)(c->msgused / 256);
+ *hdr++= (unsigned char)(c->msgused % 256);
+ *hdr++= (unsigned char)1; /* support facebook memcached */
+ *hdr++= (unsigned char)0;
+ assert(hdr ==
+ ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
+ + UDP_HEADER_SIZE));
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_build_udp_headers */
- return 0;
-}
/**
* Transmit the next chunk of data from our list of msgbuf structures.
*/
static int ms_transmit(ms_conn_t *c)
{
- assert(c != NULL);
-
- if (c->msgcurr < c->msgused &&
- c->msglist[c->msgcurr].msg_iovlen == 0) {
- /* Finished writing the current msg; advance to the next. */
- c->msgcurr++;
- }
-
- if (c->msgcurr < c->msgused) {
- ssize_t res;
- struct msghdr *m = &c->msglist[c->msgcurr];
-
- res = sendmsg(c->sfd, m, 0);
- if (res > 0) {
- __sync_fetch_and_add(&ms_stats.bytes_written, res);
-
- /* We've written some of the data. Remove the completed
- iovec entries from the list of pending writes. */
- while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) {
- res -= (ssize_t)m->msg_iov->iov_len;
- m->msg_iovlen--;
- m->msg_iov++;
- }
-
- /* Might have written just part of the last iovec entry;
- adjust it so the next write will do the rest. */
- if (res > 0) {
- m->msg_iov->iov_base = (unsigned char *)m->msg_iov->iov_base + res;
- m->msg_iov->iov_len -= (uint64_t)res;
- }
- return TRANSMIT_INCOMPLETE;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- return TRANSMIT_HARD_ERROR;
- }
- return TRANSMIT_SOFT_ERROR;
- }
-
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- fprintf(stderr, "Failed to write, and not due to blocking.\n");
-
+ assert(c != NULL);
+
+ if ((c->msgcurr < c->msgused)
+ && (c->msglist[c->msgcurr].msg_iovlen == 0))
+ {
+ /* Finished writing the current msg; advance to the next. */
+ c->msgcurr++;
+ }
+
+ if (c->msgcurr < c->msgused)
+ {
+ ssize_t res;
+ struct msghdr *m= &c->msglist[c->msgcurr];
+
+ res= sendmsg(c->sfd, m, 0);
+ if (res > 0)
+ {
+ atomic_add_size(&ms_stats.bytes_written, res);
+
+ /* We've written some of the data. Remove the completed
+ * iovec entries from the list of pending writes. */
+ while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
+ {
+ res-= (ssize_t)m->msg_iov->iov_len;
+ m->msg_iovlen--;
+ m->msg_iov++;
+ }
+
+ /* Might have written just part of the last iovec entry;
+ * adjust it so the next write will do the rest. */
+ if (res > 0)
+ {
+ m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
+ m->msg_iov->iov_len-= (size_t)res;
+ }
+ return TRANSMIT_INCOMPLETE;
+ }
+ if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
+ {
+ if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
ms_conn_set_state(c, conn_closing);
return TRANSMIT_HARD_ERROR;
- } else {
- return TRANSMIT_COMPLETE;
+ }
+ return TRANSMIT_SOFT_ERROR;
}
-}
+
+ /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
+ * we have a real error, on which we close the connection */
+ fprintf(stderr, "Failed to write, and not due to blocking.\n");
+
+ ms_conn_set_state(c, conn_closing);
+ return TRANSMIT_HARD_ERROR;
+ }
+ else
+ {
+ return TRANSMIT_COMPLETE;
+ }
+} /* ms_transmit */
+
/**
* Shrinks a connection's buffers if they're too big. This prevents
*/
static void ms_conn_shrink(ms_conn_t *c)
{
- assert(c != NULL);
-
- if (c->udp)
- return;
-
- if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
- char *newbuf;
+ assert(c != NULL);
- if (c->rcurr != c->rbuf)
- memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
-
- newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
-
- if (newbuf) {
- c->rbuf = newbuf;
- c->rsize = DATA_BUFFER_SIZE;
- }
- c->rcurr = c->rbuf;
- }
+ if (c->udp)
+ return;
- if (c->udp && c->rudpsize > UDP_DATA_BUFFER_HIGHWAT
- && c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE) {
- char *new_rbuf = (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
- if (!new_rbuf) {
- c->rudpbuf = new_rbuf;
- c->rudpsize = UDP_DATA_BUFFER_SIZE;
- }
- /* TODO check error condition? */
- }
+ if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
+ {
+ char *newbuf;
+
+ if (c->rcurr != c->rbuf)
+ memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
+
+ newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
+
+ if (newbuf)
+ {
+ c->rbuf= newbuf;
+ c->rsize= DATA_BUFFER_SIZE;
+ }
+ c->rcurr= c->rbuf;
+ }
+
+ if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
+ && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
+ {
+ char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
+ if (! new_rbuf)
+ {
+ c->rudpbuf= new_rbuf;
+ c->rudpsize= UDP_DATA_BUFFER_SIZE;
+ }
+ /* TODO check error condition? */
+ }
+
+ if (c->msgsize > MSG_LIST_HIGHWAT)
+ {
+ struct msghdr *newbuf= (struct msghdr *)realloc(
+ (void *)c->msglist,
+ MSG_LIST_INITIAL
+ * sizeof(c->msglist[0]));
+ if (newbuf)
+ {
+ c->msglist= newbuf;
+ c->msgsize= MSG_LIST_INITIAL;
+ }
+ /* TODO check error condition? */
+ }
+
+ if (c->iovsize > IOV_LIST_HIGHWAT)
+ {
+ struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
+ IOV_LIST_INITIAL
+ * sizeof(c->iov[0]));
+ if (newbuf)
+ {
+ c->iov= newbuf;
+ c->iovsize= IOV_LIST_INITIAL;
+ }
+ /* TODO check return value */
+ }
+} /* ms_conn_shrink */
- if (c->msgsize > MSG_LIST_HIGHWAT) {
- struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist,
- MSG_LIST_INITIAL * sizeof(c->msglist[0]));
- if (newbuf) {
- c->msglist = newbuf;
- c->msgsize = MSG_LIST_INITIAL;
- }
- /* TODO check error condition? */
- }
-
- if (c->iovsize > IOV_LIST_HIGHWAT) {
- struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov,
- IOV_LIST_INITIAL * sizeof(c->iov[0]));
- if (newbuf) {
- c->iov = newbuf;
- c->iovsize = IOV_LIST_INITIAL;
- }
- /* TODO check return value */
- }
-}
/**
* Sets a connection's current state in the state machine. Any special
*/
static void ms_conn_set_state(ms_conn_t *c, int state)
{
- assert(c != NULL);
+ assert(c != NULL);
- if (state != c->state) {
- if (state == conn_read) {
- ms_conn_shrink(c);
- }
- c->state = state;
+ if (state != c->state)
+ {
+ if (state == conn_read)
+ {
+ ms_conn_shrink(c);
}
-}
+ c->state= state;
+ }
+} /* ms_conn_set_state */
+
/**
* update the event if socks change state. for example: when
*/
static bool ms_update_event(ms_conn_t *c, const int new_flags)
{
- /* default event timeout 10 seconds */
- struct timeval t = {.tv_sec = EVENT_TIMEOUT, .tv_usec = 0};
-
- assert(c != NULL);
+ assert(c != NULL);
- struct event_base *base = c->event.ev_base;
- if (c->ev_flags == new_flags && ms_setting.rep_write_srv == 0
- && (!ms_setting.facebook_test || c->total_sfds == 1)) {
- return true;
- }
-
- if (event_del(&c->event) == -1) {
- /* try to delete the event again */
- if (event_del(&c->event) == -1) {
- return false;
- }
- }
+ struct event_base *base= c->event.ev_base;
+ if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
+ && (! ms_setting.facebook_test || (c->total_sfds == 1)))
+ {
+ return true;
+ }
+
+ if (event_del(&c->event) == -1)
+ {
+ /* try to delete the event again */
+ if (event_del(&c->event) == -1)
+ {
+ return false;
+ }
+ }
+
+ event_set(&c->event,
+ c->sfd,
+ (short)new_flags,
+ ms_event_handler,
+ (void *)c);
+ event_base_set(base, &c->event);
+ c->ev_flags= (short)new_flags;
+
+ if (event_add(&c->event, NULL) == -1)
+ {
+ return false;
+ }
- event_set(&c->event, c->sfd, (short)new_flags, ms_event_handler, (void *)c);
- event_base_set(base, &c->event);
- c->ev_flags = (short)new_flags;
+ return true;
+} /* ms_update_event */
- if (c->total_sfds == 1) {
- if (event_add(&c->event, NULL) == -1) {
- return false;
- }
- } else {
- if (event_add(&c->event, &t) == -1) {
- return false;
- }
- }
-
- return true;
-}
/**
* If user want to get the expected throughput, we could limit
*/
static bool ms_need_yield(ms_conn_t *c)
{
- int64_t tps = 0;
- int64_t time_diff = 0;
- struct timeval curr_time;
- ms_task_t *task = &c->curr_task;
-
- if (ms_setting.expected_tps > 0) {
- gettimeofday(&curr_time, NULL);
- time_diff = ms_time_diff(&ms_thread.startup_time, &curr_time);
- tps = (int64_t)((task->get_opt + task->set_opt) / ((uint64_t)time_diff / 1000000));
-
- /* current throughput is greater than expected throughput */
- if (tps > ms_thread.thread_ctx->tps_perconn) {
- return true;
- }
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ int64_t tps= 0;
+ int64_t time_diff= 0;
+ struct timeval curr_time;
+ ms_task_t *task= &c->curr_task;
+
+ if (ms_setting.expected_tps > 0)
+ {
+ gettimeofday(&curr_time, NULL);
+ time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
+ tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
+
+ /* current throughput is greater than expected throughput */
+ if (tps > ms_thread->thread_ctx->tps_perconn)
+ {
+ return true;
}
+ }
+
+ return false;
+} /* ms_need_yield */
- return false;
-}
/**
* used to update the start time of each operation
*/
static void ms_update_start_time(ms_conn_t *c)
{
- ms_task_item_t *item = c->curr_task.item;
-
- if (ms_setting.stat_freq > 0 || c->udp
- || (c->currcmd.cmd == CMD_SET && item->exp_time > 0)) {
+ ms_task_item_t *item= c->curr_task.item;
- gettimeofday(&c->start_time, NULL);
- if (c->currcmd.cmd == CMD_SET && item->exp_time > 0) {
- /* record the current time */
- item->client_time = c->start_time.tv_sec;
- }
+ if ((ms_setting.stat_freq > 0) || c->udp
+ || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
+ {
+ gettimeofday(&c->start_time, NULL);
+ if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
+ {
+ /* record the current time */
+ item->client_time= c->start_time.tv_sec;
}
-}
+ }
+} /* ms_update_start_time */
+
/**
* run the state machine
*/
static void ms_drive_machine(ms_conn_t *c)
{
- bool stop = false;
+ bool stop= false;
+
+ assert(c != NULL);
+
+ while (! stop)
+ {
+ switch (c->state)
+ {
+ case conn_read:
+ if (c->readval)
+ {
+ if (c->rbytes >= c->rvbytes)
+ {
+ ms_complete_nread(c);
+ break;
+ }
+ }
+ else
+ {
+ if (ms_try_read_line(c) != 0)
+ {
+ break;
+ }
+ }
+
+ if (ms_try_read_network(c) != 0)
+ {
+ break;
+ }
+
+ /* doesn't read all the response data, wait event wake up */
+ if (! c->currcmd.isfinish)
+ {
+ if (! ms_update_event(c, EV_READ | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ break;
+ }
+ stop= true;
+ break;
+ }
- assert(c != NULL);
+ /* we have no command line and no data to read from network, next write */
+ ms_conn_set_state(c, conn_write);
+ memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
- while (!stop) {
- switch (c->state) {
- case conn_read:
- if (c->readval) {
- if (c->rbytes >= c->rvbytes) {
- ms_complete_nread(c);
- break;
- }
- } else {
- if (ms_try_read_line(c) != 0) {
- break;
- }
- }
-
- if (ms_try_read_network(c) != 0) {
- break;
- }
-
- /* doesn't read all the response data, wait event wake up */
- if (!c->currcmd.isfinish) {
- if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- break;
- }
- stop = true;
- break;
- }
-
- /* we have no command line and no data to read from network, next write */
- ms_conn_set_state(c, conn_write);
- memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
+ break;
- break;
+ case conn_write:
+ if (! c->ctnwrite && ms_need_yield(c))
+ {
+ usleep(10);
- case conn_write:
- if (!c->ctnwrite && ms_need_yield(c)) {
- usleep(10);
-
- if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- break;
- }
- stop = true;
- break;
- }
-
- if (!c->ctnwrite && ms_exec_task(c) != 0) {
- ms_conn_set_state(c, conn_closing);
- break;
- }
-
- /* record the start time before starting to send data if necessary */
- if (!c->ctnwrite || (c->change_sfd && c->ctnwrite)) {
- if (c->change_sfd) {
- c->change_sfd = false;
- }
- ms_update_start_time(c);
- }
-
- /* change sfd if necessary */
- if (c->change_sfd) {
- c->ctnwrite = true;
- stop = true;
- break;
- }
-
- /* execute task until nothing need be written to network */
- if (!c->ctnwrite && c->msgcurr == c->msgused) {
- if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- break;
- }
- stop = true;
- break;
- }
-
- switch (ms_transmit(c)) {
- case TRANSMIT_COMPLETE:
- /* we have no data to write to network, next wait repose */
- if (!ms_update_event(c, EV_READ | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- c->ctnwrite = false;
- break;
- }
- ms_conn_set_state(c, conn_read);
- c->ctnwrite = false;
- stop = true;
- break;
-
- case TRANSMIT_INCOMPLETE:
- c->ctnwrite = true;
- break; /* Continue in state machine. */
-
- case TRANSMIT_HARD_ERROR:
- c->ctnwrite = false;
- break;
-
- case TRANSMIT_SOFT_ERROR:
- c->ctnwrite = true;
- stop = true;
- break;
- default:
- break;
- }
- break;
+ if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ break;
+ }
+ stop= true;
+ break;
+ }
+
+ if (! c->ctnwrite && (ms_exec_task(c) != 0))
+ {
+ ms_conn_set_state(c, conn_closing);
+ break;
+ }
+
+ /* record the start time before starting to send data if necessary */
+ if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
+ {
+ if (c->change_sfd)
+ {
+ c->change_sfd= false;
+ }
+ ms_update_start_time(c);
+ }
+
+ /* change sfd if necessary */
+ if (c->change_sfd)
+ {
+ c->ctnwrite= true;
+ stop= true;
+ break;
+ }
+
+ /* execute task until nothing need be written to network */
+ if (! c->ctnwrite && (c->msgcurr == c->msgused))
+ {
+ if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ break;
+ }
+ stop= true;
+ break;
+ }
+
+ switch (ms_transmit(c))
+ {
+ case TRANSMIT_COMPLETE:
+ /* we have no data to write to network, next wait repose */
+ if (! ms_update_event(c, EV_READ | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ c->ctnwrite= false;
+ break;
+ }
+ ms_conn_set_state(c, conn_read);
+ c->ctnwrite= false;
+ stop= true;
+ break;
+
+ case TRANSMIT_INCOMPLETE:
+ c->ctnwrite= true;
+ break; /* Continue in state machine. */
+
+ case TRANSMIT_HARD_ERROR:
+ c->ctnwrite= false;
+ break;
+
+ case TRANSMIT_SOFT_ERROR:
+ c->ctnwrite= true;
+ stop= true;
+ break;
+
+ default:
+ break;
+ } /* switch */
- case conn_closing:
- /* recovery mode, need reconnect if connection close */
- if (ms_setting.reconnect && (!ms_global.time_out
- || (ms_setting.run_time == 0 && c->remain_exec_num > 0))) {
-
- if (ms_reconn(c) != 0) {
- ms_conn_close(c);
- stop = true;
- break;
- }
-
- ms_reset_conn(c, false);
-
- if (c->total_sfds == 1) {
- if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- break;
- }
- }
-
- break;
- } else {
- ms_conn_close(c);
- stop = true;
- break;
- }
- default:
- assert(0);
+ break;
+
+ case conn_closing:
+ /* recovery mode, need reconnect if connection close */
+ if (ms_setting.reconnect && (! ms_global.time_out
+ || ((ms_setting.run_time == 0)
+ && (c->remain_exec_num > 0))))
+ {
+ if (ms_reconn(c) != 0)
+ {
+ ms_conn_close(c);
+ stop= true;
+ break;
}
- }
- return;
-}
+ ms_reset_conn(c, false);
+
+ if (c->total_sfds == 1)
+ {
+ if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ break;
+ }
+ }
+
+ break;
+ }
+ else
+ {
+ ms_conn_close(c);
+ stop= true;
+ break;
+ }
+
+ default:
+ assert(0);
+ } /* switch */
+ }
+} /* ms_drive_machine */
+
/**
* the event handler of each thread
*/
void ms_event_handler(const int fd, const short which, void *arg)
{
- ms_conn_t *c = (ms_conn_t *)arg;
- assert(c != NULL);
+ ms_conn_t *c= (ms_conn_t *)arg;
- c->which = which;
+ assert(c != NULL);
- /* sanity */
- if (fd != c->sfd) {
- fprintf(stderr, "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
- fd, c->sfd);
- ms_conn_close(c);
- exit(1);
- }
- assert(fd == c->sfd);
+ c->which= which;
- /* event timeout, close the current connection */
- if (c->which == EV_TIMEOUT) {
- ms_conn_set_state(c, conn_closing);
- }
+ /* sanity */
+ if (fd != c->sfd)
+ {
+ fprintf(stderr,
+ "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
+ fd,
+ c->sfd);
+ ms_conn_close(c);
+ exit(1);
+ }
+ assert(fd == c->sfd);
- ms_drive_machine(c);
+ ms_drive_machine(c);
+
+ /* wait for next event */
+} /* ms_event_handler */
- /* wait for next event */
- return;
-}
/**
* get the next socket descriptor index to run for replication
* @param c, pointer of the concurrency
* @param cmd, command(get or set )
*
- * @return int, if success, return the index, else return 0
+ * @return int, if success, return the index, else return EXIT_SUCCESS
*/
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
{
- int sock_index = -1;
- int i = 0;
-
- if (c->total_sfds == 1) {
- return 0;
- }
-
- if (ms_setting.rep_write_srv == 0) {
- return sock_index;
- }
-
- do {
- if (cmd == CMD_SET) {
- for (i = 0; i < ms_setting.rep_write_srv; i++) {
- if (c->tcpsfd[i] > 0) {
- break;
- }
- }
-
- if (i == ms_setting.rep_write_srv) {
- /* random get one replication server to read */
- sock_index = (int)(random() % c->total_sfds);
- } else {
- /* random get one replication writing server to write */
- sock_index = (int)(random() % ms_setting.rep_write_srv);
- }
- } else if (cmd == CMD_GET) {
- /* random get one replication server to read */
- sock_index = (int)(random() % c->total_sfds);
- }
- } while (c->tcpsfd[sock_index] == 0);
+ uint32_t sock_index= 0;
+ uint32_t i= 0;
+ if (c->total_sfds == 1)
+ {
+ return EXIT_SUCCESS;
+ }
+
+ if (ms_setting.rep_write_srv == 0)
+ {
return sock_index;
-}
+ }
+
+ do
+ {
+ if (cmd == CMD_SET)
+ {
+ for (i= 0; i < ms_setting.rep_write_srv; i++)
+ {
+ if (c->tcpsfd[i] > 0)
+ {
+ break;
+ }
+ }
+
+ if (i == ms_setting.rep_write_srv)
+ {
+ /* random get one replication server to read */
+ sock_index= (uint32_t)random() % c->total_sfds;
+ }
+ else
+ {
+ /* random get one replication writing server to write */
+ sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
+ }
+ }
+ else if (cmd == CMD_GET)
+ {
+ /* random get one replication server to read */
+ sock_index= (uint32_t)random() % c->total_sfds;
+ }
+ }
+ while (c->tcpsfd[sock_index] == 0);
+
+ return sock_index;
+} /* ms_get_rep_sock_index */
+
/**
* get the next socket descriptor index to run
*
* @return int, return the index
*/
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
{
- int sock_index = 0;
+ uint32_t sock_index= 0;
- do {
- sock_index = (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
- } while (c->tcpsfd[sock_index] == 0);
+ do
+ {
+ sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
+ }
+ while (c->tcpsfd[sock_index] == 0);
+
+ return sock_index;
+} /* ms_get_next_sock_index */
- return sock_index;
-}
/**
* update socket event of the connections
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_update_conn_sock_event(ms_conn_t *c)
{
- assert(c != NULL);
-
- switch (c->currcmd.cmd) {
- case CMD_SET:
- if (ms_setting.facebook_test && c->udp) {
- c->sfd = c->tcpsfd[0];
- c->udp = false;
- c->change_sfd = true;
- }
- break;
- case CMD_GET:
- if (ms_setting.facebook_test && !c->udp) {
- c->sfd = c->udpsfd;
- c->udp = true;
- c->change_sfd = true;
- }
- break;
- default:
- break;
- }
+ assert(c != NULL);
+
+ switch (c->currcmd.cmd)
+ {
+ case CMD_SET:
+ if (ms_setting.facebook_test && c->udp)
+ {
+ c->sfd= c->tcpsfd[0];
+ c->udp= false;
+ c->change_sfd= true;
+ }
+ break;
+
+ case CMD_GET:
+ if (ms_setting.facebook_test && ! c->udp)
+ {
+ c->sfd= c->udpsfd;
+ c->udp= true;
+ c->change_sfd= true;
+ }
+ break;
+
+ default:
+ break;
+ } /* switch */
+
+ if (! c->udp && (c->total_sfds > 1))
+ {
+ if (c->cur_idx != c->total_sfds)
+ {
+ if (ms_setting.rep_write_srv == 0)
+ {
+ c->cur_idx= ms_get_next_sock_index(c);
+ }
+ else
+ {
+ c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
+ }
+ }
+ else
+ {
+ /* must select the first sock of the connection at the beginning */
+ c->cur_idx= 0;
+ }
+
+ c->sfd= c->tcpsfd[c->cur_idx];
+ assert(c->sfd != 0);
+ c->change_sfd= true;
+ }
+
+ if (c->change_sfd)
+ {
+ if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+ {
+ fprintf(stderr, "Couldn't update event.\n");
+ ms_conn_set_state(c, conn_closing);
+ return -1;
+ }
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_update_conn_sock_event */
- if (!c->udp && c->total_sfds > 1) {
- if (c->cur_idx != c->total_sfds) {
- if (ms_setting.rep_write_srv == 0) {
- c->cur_idx = ms_get_next_sock_index(c);
- } else {
- c->cur_idx = ms_get_rep_sock_index(c, c->currcmd.cmd);
- }
- } else {
- /* must select the first sock of the connection at the beginning */
- c->cur_idx = 0;
- }
-
- c->sfd = c->tcpsfd[c->cur_idx];
- assert(c->sfd != 0);
- c->change_sfd = true;
- }
-
- if (c->change_sfd) {
- if (!ms_update_event(c, EV_WRITE | EV_PERSIST)) {
- fprintf(stderr, "Couldn't update event.\n");
- ms_conn_set_state(c, conn_closing);
- return -1;
- }
- }
-
- return 0;
-}
/**
* for ASCII protocol, this function build the set command
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
{
- int value_offset;
- int write_len;
- char *buffer = c->wbuf;
-
- write_len= sprintf(buffer, " %u %d %d\r\n", 0, item->exp_time, item->value_size);
-
- if (write_len > c->wsize) {
- /* ought to be always enough. just fail for simplicity */
- fprintf(stderr, "output command line too long.\n");
- return -1;
- }
-
- if (item->value_offset == INVALID_OFFSET) {
- value_offset = item->key_suffix_offset;
- } else {
- value_offset = item->value_offset;
- }
+ int value_offset;
+ int write_len;
+ char *buffer= c->wbuf;
+
+ write_len= snprintf(buffer,
+ c->wsize,
+ " %u %d %d\r\n",
+ 0,
+ item->exp_time,
+ item->value_size);
+
+ if (write_len > c->wsize || write_len < 0)
+ {
+ /* ought to be always enough. just fail for simplicity */
+ fprintf(stderr, "output command line too long.\n");
+ return -1;
+ }
+
+ if (item->value_offset == INVALID_OFFSET)
+ {
+ value_offset= item->key_suffix_offset;
+ }
+ else
+ {
+ value_offset= item->value_offset;
+ }
+
+ if ((ms_add_iov(c, "set ", 4) != 0)
+ || (ms_add_iov(c, (char *)&item->key_prefix,
+ (int)KEY_PREFIX_SIZE) != 0)
+ || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+ item->key_size - (int)KEY_PREFIX_SIZE) != 0)
+ || (ms_add_iov(c, buffer, write_len) != 0)
+ || (ms_add_iov(c, &ms_setting.char_block[value_offset],
+ item->value_size) != 0)
+ || (ms_add_iov(c, "\r\n", 2) != 0)
+ || (c->udp && (ms_build_udp_headers(c) != 0)))
+ {
+ return -1;
+ }
- if (ms_add_iov(c, "set ", 4) != 0 ||
- ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
- ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
- item->key_size - (int)KEY_PREFIX_SIZE) != 0 ||
- ms_add_iov(c, buffer, write_len) != 0 ||
- ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size) != 0 ||
- ms_add_iov(c, "\r\n", 2) != 0 ||
- (c->udp && ms_build_udp_headers(c) != 0)) {
+ return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_set */
- return -1;
- }
-
- return 0;
-}
/**
* used to send set command to server
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
{
- assert(c != NULL);
+ assert(c != NULL);
- c->currcmd.cmd = CMD_SET;
- c->currcmd.isfinish = false;
- c->currcmd.retstat = MCD_FAILURE;
+ c->currcmd.cmd= CMD_SET;
+ c->currcmd.isfinish= false;
+ c->currcmd.retstat= MCD_FAILURE;
- if (ms_update_conn_sock_event(c) != 0) {
- return -1;
- }
+ if (ms_update_conn_sock_event(c) != 0)
+ {
+ return -1;
+ }
+
+ c->msgcurr= 0;
+ c->msgused= 0;
+ c->iovused= 0;
+ if (ms_add_msghdr(c) != 0)
+ {
+ fprintf(stderr, "Out of memory preparing request.");
+ return -1;
+ }
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
- if (ms_add_msghdr(c) != 0) {
- fprintf(stderr, "Out of memory preparing request.");
- return -1;
+ /* binary protocol */
+ if (c->protocol == binary_prot)
+ {
+ if (ms_build_bin_write_buf_set(c, item) != 0)
+ {
+ return -1;
}
-
- /* binary protocol */
- if (c->protocol == binary_prot) {
- if (ms_build_bin_write_buf_set(c, item) != 0) {
- return -1;
- }
- } else {
- if (ms_build_ascii_write_buf_set(c, item) != 0) {
- return -1;
- }
+ }
+ else
+ {
+ if (ms_build_ascii_write_buf_set(c, item) != 0)
+ {
+ return -1;
}
+ }
- __sync_fetch_and_add(&ms_stats.obj_bytes, item->key_size + item->value_size);
- __sync_fetch_and_add(&ms_stats.cmd_set, 1);
+ atomic_add_size(&ms_stats.obj_bytes,
+ item->key_size + item->value_size);
+ atomic_add_size(&ms_stats.cmd_set, 1);
+
+ return EXIT_SUCCESS;
+} /* ms_mcd_set */
- return 0;
-}
/**
* for ASCII protocol, this function build the get command
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
{
- if (ms_add_iov(c, "get ", 4) != 0 ||
- ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
- ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
- item->key_size - (int)KEY_PREFIX_SIZE) != 0 ||
- ms_add_iov(c, "\r\n", 2) != 0 ||
- (c->udp && ms_build_udp_headers(c) != 0)) {
+ if ((ms_add_iov(c, "get ", 4) != 0)
+ || (ms_add_iov(c, (char *)&item->key_prefix,
+ (int)KEY_PREFIX_SIZE) != 0)
+ || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+ item->key_size - (int)KEY_PREFIX_SIZE) != 0)
+ || (ms_add_iov(c, "\r\n", 2) != 0)
+ || (c->udp && (ms_build_udp_headers(c) != 0)))
+ {
+ return -1;
+ }
- return -1;
- }
+ return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_get */
- return 0;
-}
/**
* used to send the get command to server
* @param c, pointer of the concurrency
* @param item, pointer of task item which includes the object
* information
- * @param verify, whether do verification
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
{
- /* verify not supported yet */
- UNUSED_ARGUMENT(verify);
+ assert(c != NULL);
- assert(c != NULL);
+ c->currcmd.cmd= CMD_GET;
+ c->currcmd.isfinish= false;
+ c->currcmd.retstat= MCD_FAILURE;
- c->currcmd.cmd = CMD_GET;
- c->currcmd.isfinish = false;
- c->currcmd.retstat = MCD_FAILURE;
+ if (ms_update_conn_sock_event(c) != 0)
+ {
+ return -1;
+ }
+
+ c->msgcurr= 0;
+ c->msgused= 0;
+ c->iovused= 0;
+ if (ms_add_msghdr(c) != 0)
+ {
+ fprintf(stderr, "Out of memory preparing request.");
+ return -1;
+ }
- if (ms_update_conn_sock_event(c) != 0) {
- return -1;
+ /* binary protocol */
+ if (c->protocol == binary_prot)
+ {
+ if (ms_build_bin_write_buf_get(c, item) != 0)
+ {
+ return -1;
}
-
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
- if (ms_add_msghdr(c) != 0) {
- fprintf(stderr, "Out of memory preparing request.");
- return -1;
+ }
+ else
+ {
+ if (ms_build_ascii_write_buf_get(c, item) != 0)
+ {
+ return -1;
}
+ }
- /* binary protocol */
- if (c->protocol == binary_prot) {
- if (ms_build_bin_write_buf_get(c, item) != 0) {
- return -1;
- }
- } else {
- if (ms_build_ascii_write_buf_get(c, item) != 0) {
- return -1;
- }
- }
+ atomic_add_size(&ms_stats.cmd_get, 1);
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
+ return EXIT_SUCCESS;
+} /* ms_mcd_get */
- return 0;
-}
/**
* for ASCII protocol, this function build the multi-get command
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
{
- ms_task_item_t *item;
-
- if (ms_add_iov(c, "get", 3) != 0) {
- return -1;
- }
+ ms_task_item_t *item;
- for (int i = 0; i < c->mlget_task.mlget_num; i++) {
- item = c->mlget_task.mlget_item[i].item;
- assert(item != NULL);
- if (ms_add_iov(c, " ", 1) != 0 ||
- ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE) != 0 ||
- ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
- item->key_size - (int)KEY_PREFIX_SIZE) != 0) {
- return -1;
- }
- }
+ if (ms_add_iov(c, "get", 3) != 0)
+ {
+ return -1;
+ }
+
+ for (int i= 0; i < c->mlget_task.mlget_num; i++)
+ {
+ item= c->mlget_task.mlget_item[i].item;
+ assert(item != NULL);
+ if ((ms_add_iov(c, " ", 1) != 0)
+ || (ms_add_iov(c, (char *)&item->key_prefix,
+ (int)KEY_PREFIX_SIZE) != 0)
+ || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+ item->key_size - (int)KEY_PREFIX_SIZE) != 0))
+ {
+ return -1;
+ }
+ }
+
+ if ((ms_add_iov(c, "\r\n", 2) != 0)
+ || (c->udp && (ms_build_udp_headers(c) != 0)))
+ {
+ return -1;
+ }
- if (ms_add_iov(c, "\r\n", 2) != 0 ||
- (c->udp && ms_build_udp_headers(c) != 0)) {
- return -1;
- }
+ return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_mlget */
- return 0;
-}
/**
* used to send the multi-get command to server
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
int ms_mcd_mlget(ms_conn_t *c)
{
- ms_task_item_t *item;
+ ms_task_item_t *item;
- assert(c != NULL);
- assert(c->mlget_task.mlget_num >= 1);
+ assert(c != NULL);
+ assert(c->mlget_task.mlget_num >= 1);
- c->currcmd.cmd = CMD_GET;
- c->currcmd.isfinish = false;
- c->currcmd.retstat = MCD_FAILURE;
+ c->currcmd.cmd= CMD_GET;
+ c->currcmd.isfinish= false;
+ c->currcmd.retstat= MCD_FAILURE;
- if (ms_update_conn_sock_event(c) != 0) {
- return -1;
- }
+ if (ms_update_conn_sock_event(c) != 0)
+ {
+ return -1;
+ }
+
+ c->msgcurr= 0;
+ c->msgused= 0;
+ c->iovused= 0;
+ if (ms_add_msghdr(c) != 0)
+ {
+ fprintf(stderr, "Out of memory preparing request.");
+ return -1;
+ }
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
- if (ms_add_msghdr(c) != 0) {
- fprintf(stderr, "Out of memory preparing request.");
- return -1;
+ /* binary protocol */
+ if (c->protocol == binary_prot)
+ {
+ if (ms_build_bin_write_buf_mlget(c) != 0)
+ {
+ return -1;
}
-
- /* binary protocol */
- if (c->protocol == binary_prot) {
- if (ms_build_bin_write_buf_mlget(c) != 0) {
- return -1;
- }
- } else {
- if (ms_build_ascii_write_buf_mlget(c) != 0) {
- return -1;
- }
+ }
+ else
+ {
+ if (ms_build_ascii_write_buf_mlget(c) != 0)
+ {
+ return -1;
}
+ }
- /* decrease operation time of each item */
- for (int i = 0; i < c->mlget_task.mlget_num; i++) {
- item = c->mlget_task.mlget_item[i].item;
- __sync_fetch_and_add(&ms_stats.cmd_get, 1);
- }
+ /* decrease operation time of each item */
+ for (int i= 0; i < c->mlget_task.mlget_num; i++)
+ {
+ item= c->mlget_task.mlget_item[i].item;
+ atomic_add_size(&ms_stats.cmd_get, 1);
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_mcd_mlget */
- return 0;
-}
/**
* binary protocol support
*
* @param c, pointer of the concurrency
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_bin_process_response(ms_conn_t *c)
{
- const char *errstr = NULL;
+ const char *errstr= NULL;
+
+ assert(c != NULL);
+
+ uint32_t bodylen= c->binary_header.response.bodylen;
+ uint8_t opcode= c->binary_header.response.opcode;
+ uint16_t status= c->binary_header.response.status;
+
+ if (bodylen > 0)
+ {
+ c->rvbytes= (int32_t)bodylen;
+ c->readval= true;
+ return EXIT_FAILURE;
+ }
+ else
+ {
+ switch (status)
+ {
+ case PROTOCOL_BINARY_RESPONSE_SUCCESS:
+ if (opcode == PROTOCOL_BINARY_CMD_SET)
+ {
+ c->currcmd.retstat= MCD_STORED;
+ }
+ else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
+ {
+ c->currcmd.retstat= MCD_DELETED;
+ }
+ else if (opcode == PROTOCOL_BINARY_CMD_GET)
+ {
+ c->currcmd.retstat= MCD_END;
+ }
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_ENOMEM:
+ errstr= "Out of memory";
+ c->currcmd.retstat= MCD_SERVER_ERROR;
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
+ errstr= "Unknown command";
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
+ errstr= "Not found";
+ c->currcmd.retstat= MCD_NOTFOUND;
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_EINVAL:
+ errstr= "Invalid arguments";
+ c->currcmd.retstat= MCD_PROTOCOL_ERROR;
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
+ errstr= "Data exists for key.";
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_E2BIG:
+ errstr= "Too large.";
+ c->currcmd.retstat= MCD_SERVER_ERROR;
+ break;
+
+ case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
+ errstr= "Not stored.";
+ c->currcmd.retstat= MCD_NOTSTORED;
+ break;
- assert(c != NULL);
-
- uint32_t bodylen = c->binary_header.response.bodylen;
- uint8_t opcode = c->binary_header.response.opcode;
- uint16_t status = c->binary_header.response.status;
-
- if (bodylen > 0) {
- c->rvbytes = (int32_t)bodylen;
- c->readval = true;
- return 1;
- } else {
- switch (status) {
- case PROTOCOL_BINARY_RESPONSE_SUCCESS:
- if (opcode == PROTOCOL_BINARY_CMD_SET) {
- c->currcmd.retstat = MCD_STORED;
- } else if (opcode == PROTOCOL_BINARY_CMD_DELETE) {
- c->currcmd.retstat = MCD_DELETED;
- } else if (opcode == PROTOCOL_BINARY_CMD_GET) {
- c->currcmd.retstat = MCD_END;
- }
- break;
- case PROTOCOL_BINARY_RESPONSE_ENOMEM:
- errstr = "Out of memory";
- c->currcmd.retstat = MCD_SERVER_ERROR;
- break;
- case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
- errstr = "Unknown command";
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- break;
- case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
- errstr = "Not found";
- c->currcmd.retstat = MCD_NOTFOUND;
- break;
- case PROTOCOL_BINARY_RESPONSE_EINVAL:
- errstr = "Invalid arguments";
- c->currcmd.retstat = MCD_PROTOCOL_ERROR;
- break;
- case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
- errstr = "Data exists for key.";
- break;
- case PROTOCOL_BINARY_RESPONSE_E2BIG:
- errstr = "Too large.";
- c->currcmd.retstat = MCD_SERVER_ERROR;
- break;
- case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
- errstr = "Not stored.";
- c->currcmd.retstat = MCD_NOTSTORED;
- break;
- default:
- errstr = "Unknown error";
- c->currcmd.retstat = MCD_UNKNOWN_READ_FAILURE;
- break;
- }
+ default:
+ errstr= "Unknown error";
+ c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+ break;
+ } /* switch */
- if (errstr != NULL) {
- fprintf(stderr, "%s\n", errstr);
- }
+ if (errstr != NULL)
+ {
+ fprintf(stderr, "%s\n", errstr);
}
+ }
+
+ return EXIT_SUCCESS;
+} /* ms_bin_process_response */
- return 0;
-}
/* build binary header and add the header to the buffer to send */
+
/**
* build binary header and add the header to the buffer to send
*
* @param key_len, length of key
* @param body_len. length of body
*/
-static void ms_add_bin_header(ms_conn_t *c, uint8_t opcode, uint8_t hdr_len,
- uint16_t key_len, uint32_t body_len) {
- protocol_binary_request_header* header;
+static void ms_add_bin_header(ms_conn_t *c,
+ uint8_t opcode,
+ uint8_t hdr_len,
+ uint16_t key_len,
+ uint32_t body_len)
+{
+ protocol_binary_request_header *header;
- assert(c != NULL);
+ assert(c != NULL);
- header = (protocol_binary_request_header *)c->wcurr;
+ header= (protocol_binary_request_header *)c->wcurr;
- header->request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
- header->request.opcode = (uint8_t)opcode;
- header->request.keylen = htonl(key_len);
+ header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
+ header->request.opcode= (uint8_t)opcode;
+ header->request.keylen= htons(key_len);
- header->request.extlen = (uint8_t)hdr_len;
- header->request.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
- header->request.reserved = 0;
+ header->request.extlen= (uint8_t)hdr_len;
+ header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
+ header->request.vbucket= 0;
- header->request.bodylen = htonl(body_len);
- header->request.opaque = 0;
- header->request.cas = 0;
+ header->request.bodylen= htonl(body_len);
+ header->request.opaque= 0;
+ header->request.cas= 0;
+
+ ms_add_iov(c, c->wcurr, sizeof(header->request));
+} /* ms_add_bin_header */
- ms_add_iov(c, c->wcurr, sizeof(header->request));
-}
/**
* add the key to the socket write buffer array
*/
static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
{
- ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
- ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
- item->key_size - (int)KEY_PREFIX_SIZE);
+ ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
+ ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+ item->key_size - (int)KEY_PREFIX_SIZE);
}
+
/**
* for binary protocol, this function build the set command
* and add the command to send buffer array.
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
{
- assert(c->wbuf == c->wcurr);
-
- int value_offset;
- protocol_binary_request_set* rep = (protocol_binary_request_set*)c->wcurr;
- uint16_t keylen = (uint16_t)item->key_size;
- uint32_t bodylen = (uint32_t)sizeof(rep->message.body) + (uint32_t)keylen + (uint32_t)item->value_size;
-
- ms_add_bin_header(c, PROTOCOL_BINARY_CMD_SET, sizeof(rep->message.body), keylen, bodylen);
- rep->message.body.flags = 0;
- rep->message.body.expiration = htonl((uint32_t)item->exp_time);
- ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
- ms_add_key_to_iov(c, item);
-
- if (item->value_offset == INVALID_OFFSET) {
- value_offset = item->key_suffix_offset;
- } else {
- value_offset = item->value_offset;
- }
- ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
+ assert(c->wbuf == c->wcurr);
+
+ int value_offset;
+ protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
+ uint16_t keylen= (uint16_t)item->key_size;
+ uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
+ + (uint32_t)keylen + (uint32_t)item->value_size;
+
+ ms_add_bin_header(c,
+ PROTOCOL_BINARY_CMD_SET,
+ sizeof(rep->message.body),
+ keylen,
+ bodylen);
+ rep->message.body.flags= 0;
+ rep->message.body.expiration= htonl((uint32_t)item->exp_time);
+ ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
+ ms_add_key_to_iov(c, item);
+
+ if (item->value_offset == INVALID_OFFSET)
+ {
+ value_offset= item->key_suffix_offset;
+ }
+ else
+ {
+ value_offset= item->value_offset;
+ }
+ ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
+
+ return EXIT_SUCCESS;
+} /* ms_build_bin_write_buf_set */
- return 0;
-}
/**
* for binary protocol, this function build the get command and
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
{
- assert(c->wbuf == c->wcurr);
+ assert(c->wbuf == c->wcurr);
- ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size);
- ms_add_key_to_iov(c, item);
+ ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
+ (uint32_t)item->key_size);
+ ms_add_key_to_iov(c, item);
+
+ return EXIT_SUCCESS;
+} /* ms_build_bin_write_buf_get */
- return 0;
-}
/**
* for binary protocol, this function build the multi-get
* @param item, pointer of task item which includes the object
* information
*
- * @return int, if success, return 0, else return -1
+ * @return int, if success, return EXIT_SUCCESS, else return -1
*/
static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
{
- ms_task_item_t *item;
+ ms_task_item_t *item;
- assert(c->wbuf == c->wcurr);
+ assert(c->wbuf == c->wcurr);
- for (int i = 0; i < c->mlget_task.mlget_num; i++) {
- item = c->mlget_task.mlget_item[i].item;
- assert(item != NULL);
+ for (int i= 0; i < c->mlget_task.mlget_num; i++)
+ {
+ item= c->mlget_task.mlget_item[i].item;
+ assert(item != NULL);
- ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, (uint32_t)item->key_size);
- ms_add_key_to_iov(c, item);
- c->wcurr += sizeof(protocol_binary_request_get);
- }
-
- c->wcurr = c->wbuf;
+ ms_add_bin_header(c,
+ PROTOCOL_BINARY_CMD_GET,
+ 0,
+ (uint16_t)item->key_size,
+ (uint32_t)item->key_size);
+ ms_add_key_to_iov(c, item);
+ c->wcurr+= sizeof(protocol_binary_request_get);
+ }
- return 0;
-}
+ c->wcurr= c->wbuf;
+ return EXIT_SUCCESS;
+} /* ms_build_bin_write_buf_mlget */