From 0eb2ca0dc8651777c0b6275a450c921ea9b4508d Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Tue, 15 Sep 2020 19:11:51 +0200 Subject: [PATCH] .gitignore: cleanup --- .gitignore | 33 - src/bin/memaslap/ms_atomic.h | 114 ++ src/bin/memaslap/ms_conn.c | 3427 +++++++++++++++++++++++++++++++++ src/bin/memaslap/ms_conn.h | 241 +++ src/bin/memaslap/ms_memslap.h | 133 ++ src/bin/memaslap/ms_setting.c | 1068 ++++++++++ src/bin/memaslap/ms_setting.h | 181 ++ src/bin/memaslap/ms_sigsegv.c | 128 ++ src/bin/memaslap/ms_sigsegv.h | 34 + src/bin/memaslap/ms_stats.c | 303 +++ src/bin/memaslap/ms_stats.h | 69 + src/bin/memaslap/ms_task.c | 1110 +++++++++++ src/bin/memaslap/ms_task.h | 94 + src/bin/memaslap/ms_thread.c | 349 ++++ src/bin/memaslap/ms_thread.h | 78 + 15 files changed, 7329 insertions(+), 33 deletions(-) create mode 100644 src/bin/memaslap/ms_atomic.h create mode 100644 src/bin/memaslap/ms_conn.c create mode 100644 src/bin/memaslap/ms_conn.h create mode 100644 src/bin/memaslap/ms_memslap.h create mode 100644 src/bin/memaslap/ms_setting.c create mode 100644 src/bin/memaslap/ms_setting.h create mode 100644 src/bin/memaslap/ms_sigsegv.c create mode 100644 src/bin/memaslap/ms_sigsegv.h create mode 100644 src/bin/memaslap/ms_stats.c create mode 100644 src/bin/memaslap/ms_stats.h create mode 100644 src/bin/memaslap/ms_task.c create mode 100644 src/bin/memaslap/ms_task.h create mode 100644 src/bin/memaslap/ms_thread.c create mode 100644 src/bin/memaslap/ms_thread.h diff --git a/.gitignore b/.gitignore index ffe575ca..47dcabb5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,50 +1,17 @@ *~ *.log -*.exe -*.gz -*.lo -*.o *.orig *.output *.pop *.rej -*.rpm -*.tar -*/*.l[oa] -*/*/*.l[oa] -*/*/.deps -*/*/.dirstamp -*/*/.libs -*/.deps -*/.dirstamp -*/.libs -*/Makefile -*/Makefile.in *TAGS *patch -.deps .cproject .idea/ .project .settings/ -src/bin/memaslap -src/bin/memcapable -src/bin/memcat -src/bin/memcp -src/bin/memdump -src/bin/memerror -src/bin/memexist -src/bin/memflush 
-src/bin/memparse
-src/bin/memping
-src/bin/memrm
-src/bin/memslap
-src/bin/memstat
-src/bin/memtouch
 cmake-build-*/
 ccmake-*/
-example/memcached_light
-example/t/memcached_light
 tags
 venv/
 /infer-out/
diff --git a/src/bin/memaslap/ms_atomic.h b/src/bin/memaslap/ms_atomic.h
new file mode 100644
index 00000000..6ec990c0
--- /dev/null
+++ b/src/bin/memaslap/ms_atomic.h
@@ -0,0 +1,122 @@
+/* LibMemcached
+ * Copyright (C) 2006-2009 Brian Aker
+ * All rights reserved.
+ *
+ * Use and distribution licensed under the BSD license.  See
+ * the COPYING file in the parent directory for full text.
+ *
+ * Summary: atomic add/decrement wrappers used by memaslap.
+ *
+ * atomic_add_N(X, Y) / atomic_dec_N(X) update the object X points to;
+ * callers must not rely on their result.  The *_nv variants return the
+ * value stored by the operation (the "new value") on every branch below.
+ *
+ */
+
+#ifndef CLIENTS_MS_ATOMIC_H
+#define CLIENTS_MS_ATOMIC_H
+
+#if HAVE_C_STDATOMIC
+# define ATOMIC _Atomic
+#else
+# define ATOMIC volatile
+#endif
+
+#if defined(__SUNPRO_C)
+# define _KERNEL
+# include <atomic.h>
+# if SIZEOF_SIZE_T == 8
+#  define atomic_add_size(X, Y) atomic_add_64((X), (Y))
+#  define atomic_add_size_nv(X, Y) atomic_add_64_nv((X), (Y))
+#  define atomic_dec_size(X) atomic_dec_64((X))
+#  define atomic_dec_size_nv(X) atomic_dec_64_nv((X))
+# else
+#  define atomic_add_size(X, Y) atomic_add_32((X), (Y))
+#  define atomic_add_size_nv(X, Y) atomic_add_32_nv((X), (Y))
+#  define atomic_dec_size(X) atomic_dec_32((X))
+#  define atomic_dec_size_nv(X) atomic_dec_32_nv((X))
+# endif
+# undef _KERNEL
+#elif HAVE_GCC_ATOMIC_BUILTINS
+# define atomic_add_8(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_16(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_32(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_add_size(X, Y) __sync_fetch_and_add((X), (Y))
+# define atomic_dec_8(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_16(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_32(X) __sync_fetch_and_sub((X), 1)
+# define atomic_dec_size(X) __sync_fetch_and_sub((X), 1)
+/* The same as above, but these return the new value instead of void */
+# define atomic_add_8_nv(X, Y) __sync_add_and_fetch((X), (Y))
+# define atomic_add_16_nv(X, Y) __sync_add_and_fetch((X), (Y))
+# define atomic_add_32_nv(X, Y) __sync_add_and_fetch((X), (Y))
+# define atomic_add_size_nv(X, Y) __sync_add_and_fetch((X), (Y))
+# define atomic_dec_8_nv(X) __sync_sub_and_fetch((X), 1)
+# define atomic_dec_16_nv(X) __sync_sub_and_fetch((X), 1)
+# define atomic_dec_32_nv(X) __sync_sub_and_fetch((X), 1)
+# define atomic_dec_size_nv(X) __sync_sub_and_fetch((X), 1)
+#elif HAVE_C_STDATOMIC
+# include <stdatomic.h>
+# define atomic_add_8(X, Y) atomic_fetch_add(X, Y)
+# define atomic_add_16(X, Y) atomic_fetch_add(X, Y)
+# define atomic_add_32(X, Y) atomic_fetch_add(X, Y)
+# define atomic_add_size(X, Y) atomic_fetch_add(X, Y)
+# define atomic_dec_8(X) atomic_fetch_sub(X, 1)
+# define atomic_dec_16(X) atomic_fetch_sub(X, 1)
+# define atomic_dec_32(X) atomic_fetch_sub(X, 1)
+# define atomic_dec_size(X) atomic_fetch_sub(X, 1)
+/* The same as above, but these return the new value instead of void */
+# define ATOMIC_ADD_FETCH_DECL(T) \
+static inline T atomic_add_fetch_##T(ATOMIC T *ptr, T add) { \
+  T des, cur = atomic_load(ptr); \
+  do { \
+    des = cur + add; \
+  } while(!atomic_compare_exchange_weak(ptr, &cur, des)); \
+  return des; \
+}
+# define ATOMIC_SUB_FETCH_DECL(T) \
+static inline T atomic_sub_fetch_##T(ATOMIC T *ptr) { \
+  T des, cur = atomic_load(ptr); \
+  do { \
+    des = cur - 1; \
+  } while(!atomic_compare_exchange_weak(ptr, &cur, des)); \
+  return des; \
+}
+ATOMIC_ADD_FETCH_DECL(uint8_t)
+# define atomic_add_8_nv(X, Y) atomic_add_fetch_uint8_t(X, Y)
+ATOMIC_ADD_FETCH_DECL(uint16_t)
+# define atomic_add_16_nv(X, Y) atomic_add_fetch_uint16_t(X, Y)
+ATOMIC_ADD_FETCH_DECL(uint32_t)
+# define atomic_add_32_nv(X, Y) atomic_add_fetch_uint32_t(X, Y)
+ATOMIC_ADD_FETCH_DECL(size_t)
+# define atomic_add_size_nv(X, Y) atomic_add_fetch_size_t(X, Y)
+ATOMIC_SUB_FETCH_DECL(uint8_t)
+# define atomic_dec_8_nv(X) atomic_sub_fetch_uint8_t(X)
+ATOMIC_SUB_FETCH_DECL(uint16_t)
+# define atomic_dec_16_nv(X) atomic_sub_fetch_uint16_t(X)
+ATOMIC_SUB_FETCH_DECL(uint32_t)
+# define atomic_dec_32_nv(X) atomic_sub_fetch_uint32_t(X)
+ATOMIC_SUB_FETCH_DECL(size_t)
+# define atomic_dec_size_nv(X) atomic_sub_fetch_size_t(X)
+#else
+#warning "Atomic operators not found so memslap will not work correctly"
+# define atomic_add_8(X, Y) 0
+# define atomic_add_16(X, Y) 0
+# define atomic_add_32(X, Y) 0
+# define atomic_add_size(X, Y) 0
+# define atomic_dec_8(X) 0
+# define atomic_dec_16(X) 0
+# define atomic_dec_32(X) 0
+# define atomic_dec_size(X) 0
+/* The same as above, but these return the new value instead of void */
+# define atomic_add_8_nv(X, Y) 0
+# define atomic_add_16_nv(X, Y) 0
+# define atomic_add_32_nv(X, Y) 0
+# define atomic_add_size_nv(X, Y) 0
+# define atomic_dec_8_nv(X) 0
+# define atomic_dec_16_nv(X) 0
+# define atomic_dec_32_nv(X) 0
+# define atomic_dec_size_nv(X) 0
+#endif /* defined(__SUNPRO_C) */
+
+#endif /* CLIENTS_MS_ATOMIC_H */
diff --git a/src/bin/memaslap/ms_conn.c b/src/bin/memaslap/ms_conn.c
new file mode 100644
index 00000000..71353d5b
--- /dev/null
+++ b/src/bin/memaslap/ms_conn.c
@@ -0,0 +1,3427 @@
+/*
+ * File:   ms_conn.c
+ * Author: Mingqiang Zhuang
+ *
+ * Created on February 10, 2009
+ *
+ * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+
+#include "mem_config.h"
+
+#include <stdio.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <sys/uio.h>
+#include <event.h>
+#include <fcntl.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+
+#if defined(HAVE_ARPA_INET_H)
+# include <arpa/inet.h>
+#endif
+
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
+
+#include "ms_setting.h"
+#include "ms_thread.h"
+#include "ms_atomic.h"
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
+
+/* result codes for a network write */
+#define TRANSMIT_COMPLETE 0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
+
+/*
+ * for generating keys: bit 0x10 is set in every byte of a prefix, so no
+ * byte of a prefix can be ' ' (0x20), '\r' (0x0d), '\n' (0x0a) or '\0'
+ */
+#define KEY_PREFIX_BASE 0x1010101010101010 /* does not include ' ' '\r' '\n' '\0' */
+#define KEY_PREFIX_MASK 0x1010101010101010
+
+/* token indexes used when parsing the "VALUE" line returned by the server */
+#define KEY_TOKEN 1
+#define VALUELEN_TOKEN 3
+
+/* global increasing counter, to ensure the key prefix is unique */
+static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
+
+/* global increasing counter, generating request ids for UDP */
+static ATOMIC uint32_t udp_request_id= 0;
+
+extern pthread_key_t ms_thread_key;
+
+/* generate a UDP request id */
+static uint32_t ms_get_udp_request_id(void);
+
+
+/* connection initialization */
+static void ms_task_init(ms_conn_t *c);
+static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
+static int ms_conn_sock_init(ms_conn_t *c);
+static int ms_conn_event_init(ms_conn_t *c);
+static int ms_conn_init(ms_conn_t *c,
+                        const int init_state,
+                        const int read_buffer_size,
+                        const bool is_udp);
+static void ms_warmup_num_init(ms_conn_t *c);
+static int ms_item_win_init(ms_conn_t *c);
+
+
+/* connection close */
+void ms_conn_free(ms_conn_t *c);
+static void ms_conn_close(ms_conn_t *c);
+
+
+/* create network connection */
+static int ms_new_socket(struct addrinfo *ai);
+static void ms_maximize_sndbuf(const int sfd);
+static int ms_network_connect(ms_conn_t *c,
+                              char *srv_host_name,
+                              const int srv_port,
+                              const bool is_udp,
+                              int *ret_sfd);
+static int ms_reconn(ms_conn_t *c);
+
+
+/* read and parse */
+static int ms_tokenize_command(char *command,
+                               token_t *tokens,
+                               const int max_tokens);
+static int ms_ascii_process_line(ms_conn_t *c, char *command);
+static int ms_try_read_line(ms_conn_t *c);
+static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
+static int ms_udp_read(ms_conn_t *c, char *buf, int len);
+static int ms_try_read_network(ms_conn_t *c);
+static void ms_verify_value(ms_conn_t *c,
+                            ms_mlget_task_item_t *mlget_item,
+                            char *value,
+                            int vlen);
+static void ms_ascii_complete_nread(ms_conn_t *c);
+static void ms_bin_complete_nread(ms_conn_t *c);
+static void ms_complete_nread(ms_conn_t *c);
+
+
+/* send functions */
+static int ms_add_msghdr(ms_conn_t *c);
+static int ms_ensure_iov_space(ms_conn_t *c);
+static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
+static int ms_build_udp_headers(ms_conn_t *c);
+static int ms_transmit(ms_conn_t *c);
+
+
+/* status adjustment */
+static void ms_conn_shrink(ms_conn_t *c);
+static void ms_conn_set_state(ms_conn_t *c, int state);
+static bool ms_update_event(ms_conn_t *c, const int new_flags);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
+static int ms_update_conn_sock_event(ms_conn_t *c);
+static bool ms_need_yield(ms_conn_t *c);
+static void ms_update_start_time(ms_conn_t *c);
+
+
+/* main loop */
+static void ms_drive_machine(ms_conn_t *c);
+void ms_event_handler(const int fd, const short which, void *arg);
+
+
+/* ascii protocol */
+static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
+static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
+static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
+
+
+/* binary protocol */
+static int ms_bin_process_response(ms_conn_t *c);
+static void ms_add_bin_header(ms_conn_t *c,
+                              uint8_t opcode,
+                              uint8_t hdr_len,
+                              uint16_t key_len,
+                              uint32_t body_len);
+static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
+static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
+static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
+static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
+
+
+/**
+ * each key has two parts, prefix and suffix.
The suffix is a
+ * string randomly taken from the character table. The prefix is a
+ * uint64_t variable. And the prefix must be unique. we use the
+ * prefix to identify a key. And the prefix can't include
+ * character ' ' '\r' '\n' '\0'.
+ *
+ * @return uint64_t
+ */
+uint64_t ms_get_key_prefix(void)
+{
+  uint64_t key_prefix;
+
+  pthread_mutex_lock(&ms_global.seq_mutex);
+  /* re-apply the mask so a carry out of the low bits never clears bit 0x10 */
+  key_prefix_seq|= KEY_PREFIX_MASK;
+  key_prefix= key_prefix_seq;
+  key_prefix_seq++;
+  pthread_mutex_unlock(&ms_global.seq_mutex);
+
+  return key_prefix;
+} /* ms_get_key_prefix */
+
+
+/**
+ * get an unique udp request id
+ *
+ * @return an unique UDP request id
+ */
+static uint32_t ms_get_udp_request_id(void)
+{
+  return atomic_add_32_nv(&udp_request_id, 1);
+}
+
+
+/**
+ * initialize current task structure
+ *
+ * @param c, pointer of the concurrency
+ */
+static void ms_task_init(ms_conn_t *c)
+{
+  c->curr_task.cmd= CMD_NULL;
+  c->curr_task.item= 0;
+  c->curr_task.verify= false;
+  c->curr_task.finish_verify= true;
+  c->curr_task.get_miss= true;
+
+  c->curr_task.get_opt= 0;
+  c->curr_task.set_opt= 0;
+  c->curr_task.cycle_undo_get= 0;
+  c->curr_task.cycle_undo_set= 0;
+  c->curr_task.verified_get= 0;
+  c->curr_task.overwrite_set= 0;
+} /* ms_task_init */
+
+
+/**
+ * initialize udp for the connection structure
+ *
+ * The receive buffer and the packet table are only allocated when the
+ * connection is UDP or ms_setting.facebook_test is set.  On failure both
+ * pointers are left NULL, so a later ms_conn_free() can't free them twice.
+ *
+ * @param c, pointer of the concurrency
+ * @param is_udp, whether it's udp
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
+{
+  c->hdrbuf= 0;
+  c->rudpbuf= 0;
+  c->udppkt= 0;
+
+  c->rudpsize= UDP_DATA_BUFFER_SIZE;
+  c->hdrsize= 0;
+
+  c->rudpbytes= 0;
+  c->packets= 0;
+  c->recvpkt= 0;
+  c->pktcurr= 0;
+  c->ordcurr= 0;
+
+  c->udp= is_udp;
+
+  if (c->udp || ms_setting.facebook_test)
+  {
+    c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
+    c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
+
+    if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
+    {
+      /* free(NULL) is a no-op; reset the pointers to avoid a double free */
+      free(c->rudpbuf);
+      free(c->udppkt);
+      c->rudpbuf= NULL;
+      c->udppkt= NULL;
+      fprintf(stderr, "malloc()\n");
+      return -1;
+    }
+    memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_conn_udp_init */
+
+
+/**
+ * release every buffer allocated by ms_conn_init() and reset the
+ * pointers, so that a later ms_conn_free() can't free them again
+ *
+ * @param c, pointer of the concurrency
+ */
+static void ms_conn_init_release(ms_conn_t *c)
+{
+  free(c->rbuf);
+  free(c->wbuf);
+  free(c->iov);
+  free(c->msglist);
+  free(c->mlget_task.mlget_item);
+  free(c->tcpsfd);
+
+  c->rbuf= c->wbuf= NULL;
+  c->iov= NULL;
+  c->msglist= NULL;
+  c->mlget_task.mlget_item= NULL;
+  c->tcpsfd= NULL;
+} /* ms_conn_init_release */
+
+
+/**
+ * initialize the connection structure
+ *
+ * Allocates the read/write buffers, the iovec and msghdr lists, the
+ * multi-get item table (only when ms_setting.mult_key_num > 1) and the
+ * socket table, then resets the protocol state.  Everything allocated
+ * here is released again if any step fails.
+ *
+ * @param c, pointer of the concurrency
+ * @param init_state, (conn_read, conn_write, conn_closing)
+ * @param read_buffer_size
+ * @param is_udp, whether it's udp
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_conn_init(ms_conn_t *c,
+                        const int init_state,
+                        const int read_buffer_size,
+                        const bool is_udp)
+{
+  assert(c != NULL);
+
+  c->rbuf= c->wbuf= 0;
+  c->iov= 0;
+  c->msglist= 0;
+  /* the failure path tests these two as well, so they need a known value */
+  c->mlget_task.mlget_item= NULL;
+  c->tcpsfd= NULL;
+
+  c->rsize= read_buffer_size;
+  c->wsize= WRITE_BUFFER_SIZE;
+  c->iovsize= IOV_LIST_INITIAL;
+  c->msgsize= MSG_LIST_INITIAL;
+
+  /* for replication, each connection need connect all the server */
+  if (ms_setting.rep_write_srv > 0)
+  {
+    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
+  }
+  else
+  {
+    c->total_sfds= ms_setting.sock_per_conn;
+  }
+  c->alive_sfds= 0;
+
+  c->rbuf= (char *)malloc((size_t)c->rsize);
+  c->wbuf= (char *)malloc((size_t)c->wsize);
+  c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
+  c->msglist= (struct msghdr *)malloc(
+    sizeof(struct msghdr) * (size_t)c->msgsize);
+  if (ms_setting.mult_key_num > 1)
+  {
+    c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
+                              malloc(
+      sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
+  }
+  /* zero-filled: 0 is the "no socket in this slot" marker used by this file */
+  c->tcpsfd= (int *)calloc((size_t)c->total_sfds, sizeof(int));
+
+  if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
+      || (c->msglist == NULL) || (c->tcpsfd == NULL)
+      || ((ms_setting.mult_key_num > 1)
+          && (c->mlget_task.mlget_item == NULL)))
+  {
+    ms_conn_init_release(c);
+    fprintf(stderr, "malloc()\n");
+    return -1;
+  }
+
+  c->state= init_state;
+  c->rvbytes= 0;
+  c->rbytes= 0;
+  c->rcurr= c->rbuf;
+  c->wcurr= c->wbuf;
+  c->iovused= 0;
+  c->msgcurr= 0;
+  c->msgused= 0;
+  c->cur_idx= c->total_sfds; /* default index is a invalid value */
+
+  c->ctnwrite= false;
+  c->readval= false;
+  c->change_sfd= false;
+
+  c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
+  c->precmd.isfinish= true; /* default the previous command finished */
+  c->currcmd.isfinish= false;
+  c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
+  c->precmd.key_prefix= c->currcmd.key_prefix= 0;
+
+  c->mlget_task.mlget_num= 0;
+  c->mlget_task.value_index= -1; /* default invalid value */
+
+  if (ms_setting.binary_prot_)
+  {
+    c->protocol= binary_prot;
+  }
+  else
+  {
+    c->protocol= ascii_prot;
+  }
+
+  /* initialize udp */
+  if (ms_conn_udp_init(c, is_udp) != 0)
+  {
+    /* don't leak the buffers allocated above */
+    ms_conn_init_release(c);
+    return -1;
+  }
+
+  /* initialize task */
+  ms_task_init(c);
+
+  if (! (ms_setting.facebook_test && is_udp))
+  {
+    atomic_add_32(&ms_stats.active_conns, 1);
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_conn_init */
+
+
+/**
+ * when doing 100% get operation, it could preset some objects
+ * to warmup the server. this function is used to initialize the
+ * number of the objects to preset.
+ *
+ * @param c, pointer of the concurrency
+ */
+static void ms_warmup_num_init(ms_conn_t *c)
+{
+  /* the whole window is preset only when the set proportion is below
+   * PROP_ERROR, i.e. there is no set operation */
+  const bool preset_window=
+    (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR);
+
+  c->warmup_num= preset_window ? c->win_size : 0;
+  c->remain_warmup_num= c->warmup_num;
+} /* ms_warmup_num_init */
+
+
+/**
+ * build the item window of a connection; tasks are generated
+ * from the items of this window.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_item_win_init(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  int expiring= 0; /* items that got an expire time so far */
+  size_t win_bytes;
+
+  c->win_size= (int)ms_setting.win_size;
+  c->set_cursor= 0;
+  c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
+  c->remain_exec_num= c->exec_num;
+
+  win_bytes= sizeof(ms_task_item_t) * (size_t)c->win_size;
+  c->item_win= (ms_task_item_t *)malloc(win_bytes);
+  if (c->item_win == NULL)
+  {
+    fprintf(stderr, "Can't allocate task item array for conn.\n");
+    return -1;
+  }
+  memset(c->item_win, 0, win_bytes);
+
+  for (int idx= 0; idx < c->win_size; idx++)
+  {
+    ms_task_item_t *item= &c->item_win[idx];
+
+    item->key_size= (int)ms_setting.distr[idx].key_size;
+    item->key_prefix= ms_get_key_prefix();
+    item->key_suffix_offset= ms_setting.distr[idx].key_offset;
+    item->value_size= (int)ms_setting.distr[idx].value_size;
+    item->value_offset= INVALID_OFFSET; /* no value offset assigned yet */
+    item->client_time= 0;
+
+    /* hand out expire times according to the configured proportion */
+    if (expiring < ms_setting.exp_ver_per * idx)
+    {
+      item->exp_time= FIXED_EXPIRE_TIME;
+      expiring++;
+    }
+    else
+    {
+      item->exp_time= 0;
+    }
+  }
+
+  ms_warmup_num_init(c);
+
+  return EXIT_SUCCESS;
+} /* ms_item_win_init */
+
+
+/**
+ * each connection structure can include one or more sock
+ * handlers.
this function create these socks and connect the
+ * server(s).
+ *
+ * When any connect fails, every socket opened by this call is closed
+ * again before -1 is returned.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_conn_sock_init(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t i;
+  int ret_sfd;
+  uint32_t srv_idx= 0;
+
+  assert(c != NULL);
+  assert(c->tcpsfd != NULL);
+
+  /* 0 means "no socket"; the failure path below relies on both fields
+   * having a known value even when nothing was connected */
+  c->sfd= 0;
+  c->udpsfd= 0;
+
+  for (i= 0; i < c->total_sfds; i++)
+  {
+    ret_sfd= 0;
+    if (ms_setting.rep_write_srv > 0)
+    {
+      /* for replication, each connection need connect all the server */
+      srv_idx= i % ms_setting.srv_cnt;
+    }
+    else
+    {
+      /* all the connections in a thread connects the same server */
+      srv_idx= ms_thread->thread_ctx->srv_idx;
+    }
+
+    if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+                           ms_setting.servers[srv_idx].srv_port,
+                           ms_setting.udp, &ret_sfd) != 0)
+    {
+      break;
+    }
+
+    if (i == 0)
+    {
+      c->sfd= ret_sfd;
+    }
+
+    if (! ms_setting.udp)
+    {
+      c->tcpsfd[i]= ret_sfd;
+    }
+
+    c->alive_sfds++;
+  }
+
+  /* initialize udp sock handler if necessary */
+  if (ms_setting.facebook_test)
+  {
+    ret_sfd= 0;
+    if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+                           ms_setting.servers[srv_idx].srv_port,
+                           true, &ret_sfd) != 0)
+    {
+      c->udpsfd= 0;
+    }
+    else
+    {
+      c->udpsfd= ret_sfd;
+    }
+  }
+
+  if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
+  {
+    if (ms_setting.udp)
+    {
+      /* c->sfd is only valid once the first connect succeeded */
+      if (i > 0)
+      {
+        close(c->sfd);
+      }
+    }
+    else
+    {
+      for (uint32_t j= 0; j < i; j++)
+      {
+        close(c->tcpsfd[j]);
+      }
+    }
+
+    if (c->udpsfd != 0)
+    {
+      close(c->udpsfd);
+    }
+
+    return -1;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_conn_sock_init */
+
+
+/**
+ * each connection is managed by libevent, this function
+ * initialize the event of the connection structure.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_conn_event_init(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  const short wanted= EV_WRITE | EV_PERSIST;
+
+  /* bind the event to the connection's socket and the thread's event base */
+  event_set(&c->event, c->sfd, wanted, ms_event_handler, (void *)c);
+  event_base_set(ms_thread->base, &c->event);
+  c->ev_flags= wanted;
+
+  return (event_add(&c->event, NULL) == -1) ? -1 : EXIT_SUCCESS;
+} /* ms_conn_event_init */
+
+
+/**
+ * set up one connection: item window, buffers, sockets and event,
+ * in that order; the first step that fails stops the setup. each
+ * connection structure of each thread must go through this function.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+int ms_setup_conn(ms_conn_t *c)
+{
+  /* || short-circuits, so a later step never runs after a failed one */
+  if ((ms_item_win_init(c) != 0)
+      || (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
+      || (ms_conn_sock_init(c) != 0)
+      || (ms_conn_event_init(c) != 0))
+  {
+    return -1;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_setup_conn */
+
+
+/**
+ * Frees a connection.
+ *
+ * Releases every buffer owned by the connection and, when this was the
+ * last active connection of the calling thread, the thread's connection
+ * array as well.  Passing NULL is a no-op.
+ *
+ * @param c, pointer of the concurrency
+ */
+void ms_conn_free(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  if (c != NULL)
+  {
+    /* free(NULL) is a no-op, so no per-pointer guard is needed */
+    free(c->hdrbuf);
+    free(c->msglist);
+    free(c->rbuf);
+    free(c->wbuf);
+    free(c->iov);
+    free(c->mlget_task.mlget_item);
+    free(c->rudpbuf);
+    free(c->udppkt);
+    free(c->item_win);
+    free(c->tcpsfd);
+
+    if (--ms_thread->nactive_conn == 0)
+    {
+      free(ms_thread->conn);
+    }
+  }
+} /* ms_conn_free */
+
+
+/**
+ * close a connection
+ *
+ * Removes the event, closes the sockets, frees the connection and, when
+ * ms_setting.run_time is 0, signals ms_global.run_lock.  The calling
+ * thread exits once it has no active connection left.
+ *
+ * @param c, pointer of the concurrency
+ */
+static void ms_conn_close(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  assert(c != NULL);
+
+  /* delete the event, the socket and the connection */
+  event_del(&c->event);
+
+  if (ms_setting.udp)
+  {
+    /* ms_conn_sock_init() only fills tcpsfd[] for TCP; in UDP mode the
+     * one socket in use is c->sfd */
+    if (c->sfd > 0)
+    {
+      close(c->sfd);
+    }
+  }
+  else
+  {
+    for (uint32_t i= 0; i < c->total_sfds; i++)
+    {
+      if (c->tcpsfd[i] > 0)
+      {
+        close(c->tcpsfd[i]);
+      }
+    }
+  }
+  c->sfd= 0;
+
+  if (ms_setting.facebook_test)
+  {
+    close(c->udpsfd);
+  }
+
+  atomic_dec_32(&ms_stats.active_conns);
+
+  /* c must not be touched after this call */
+  ms_conn_free(c);
+
+  if (ms_setting.run_time == 0)
+  {
+    pthread_mutex_lock(&ms_global.run_lock.lock);
+    ms_global.run_lock.count++;
+    pthread_cond_signal(&ms_global.run_lock.cond);
+    pthread_mutex_unlock(&ms_global.run_lock.lock);
+  }
+
+  if (ms_thread->nactive_conn == 0)
+  {
+    pthread_exit(NULL);
+  }
+} /* ms_conn_close */
+
+
+/**
+ * create a new sock
+ *
+ * @param ai, server address information
+ *
+ * @return int, if success, return the new socket descriptor, else
+ *         return -1
+ */
+static int ms_new_socket(struct addrinfo *ai)
+{
+  int sfd;
+
+  if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
+  {
+    fprintf(stderr, "socket() error: %s.\n", strerror(errno));
+    return -1;
+  }
+
+  return sfd;
+} /* ms_new_socket */
+
+
+/**
+ * Sets a socket's send buffer size to the maximum allowed by the system.
+ *
+ * Binary-searches between the current SO_SNDBUF value and
+ * MAX_SENDBUF_SIZE for the largest size setsockopt() accepts.
+ *
+ * @param sfd, file descriptor of socket
+ */
+static void ms_maximize_sndbuf(const int sfd)
+{
+  socklen_t intsize= sizeof(int);
+  unsigned int min, max, avg;
+  unsigned int old_size;
+
+  /* Start with the default size. */
+  if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
+  {
+    fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
+    return;
+  }
+
+  /* Binary-search for the real maximum. */
+  min= old_size;
+  max= MAX_SENDBUF_SIZE;
+
+  while (min <= max)
+  {
+    /* midpoint without overflowing min + max */
+    avg= min + (max - min) / 2;
+    if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
+    {
+      min= avg + 1;
+    }
+    else
+    {
+      if (avg == 0)
+      {
+        break; /* avg - 1 would wrap around to UINT_MAX */
+      }
+      max= avg - 1;
+    }
+  }
+} /* ms_maximize_sndbuf */
+
+
+/**
+ * socket connects the server
+ *
+ * The addresses returned by getaddrinfo() are tried in order and the
+ * first one that yields a usable non-blocking socket wins; addresses
+ * that fail are skipped and their sockets closed.  For UDP nothing is
+ * connected, the server address is stored in c->srv_recv_addr instead.
+ *
+ * @param c, pointer of the concurrency
+ * @param srv_host_name, the host name of the server
+ * @param srv_port, port of server
+ * @param is_udp, whether it's udp
+ * @param ret_sfd, the connected socket file descriptor
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_network_connect(ms_conn_t *c,
+                              char *srv_host_name,
+                              const int srv_port,
+                              const bool is_udp,
+                              int *ret_sfd)
+{
+  int sfd;
+  struct linger ling=
+  {
+    0, 0
+  };
+  struct addrinfo *ai;
+  struct addrinfo *next;
+  struct addrinfo hints;
+  char port_buf[NI_MAXSERV];
+  int error;
+  int success= 0;
+
+  int enable= 1;  /* value for the boolean socket options */
+  int fd_flags;   /* file status flags, kept apart from 'enable' */
+
+  /*
+   * the memset call clears nonstandard fields in some implementations
+   * that otherwise mess things up.
+   */
+  memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
+  hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
+  hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
+  if (is_udp)
+  {
+    hints.ai_protocol= IPPROTO_UDP;
+    hints.ai_socktype= SOCK_DGRAM;
+    hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
+  }
+  else
+  {
+    hints.ai_family= AF_UNSPEC;
+    hints.ai_protocol= IPPROTO_TCP;
+    hints.ai_socktype= SOCK_STREAM;
+  }
+
+  snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
+  error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
+  if (error != 0)
+  {
+    if (error != EAI_SYSTEM)
+      fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
+    else
+      perror("getaddrinfo()");
+
+    return -1;
+  }
+
+  /* stop at the first address that works, so no socket is left behind */
+  for (next= ai; (next != NULL) && (success == 0); next= next->ai_next)
+  {
+    if ((sfd= ms_new_socket(next)) == -1)
+    {
+      continue; /* e.g. address family not supported, try the next one */
+    }
+
+    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&enable, sizeof(enable));
+    if (is_udp)
+    {
+      ms_maximize_sndbuf(sfd);
+      c->srv_recv_addr_size= sizeof(struct sockaddr);
+      memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
+    }
+    else
+    {
+      setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&enable,
+                 sizeof(enable));
+      setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
+      setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&enable,
+                 sizeof(enable));
+
+      if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
+      {
+        close(sfd);
+        continue;
+      }
+    }
+
+    if (((fd_flags= fcntl(sfd, F_GETFL, 0)) < 0)
+        || (fcntl(sfd, F_SETFL, fd_flags | O_NONBLOCK) < 0))
+    {
+      fprintf(stderr, "setting O_NONBLOCK\n");
+      close(sfd);
+      continue;
+    }
+
+    if (ret_sfd != NULL)
+    {
+      *ret_sfd= sfd;
+    }
+
+    success++;
+  }
+
+  freeaddrinfo(ai);
+
+  /* fail when no address produced a usable socket */
+  return (success > 0) ? EXIT_SUCCESS : -1;
+} /* ms_network_connect */
+
+
+/**
+ * reconnect a disconnected sock
+ *
+ * @param c, pointer of the
concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_reconn(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t srv_idx= 0;
+  uint32_t srv_conn_cnt= 0;
+
+  if (ms_setting.rep_write_srv > 0)
+  {
+    srv_idx= c->cur_idx % ms_setting.srv_cnt;
+    srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
+  }
+  else
+  {
+    srv_idx= ms_thread->thread_ctx->srv_idx;
+    srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
+  }
+
+  /* fewer connections than servers gives 0; it is a divisor below */
+  if (srv_conn_cnt == 0)
+  {
+    srv_conn_cnt= 1;
+  }
+
+  /* close the old socket handler */
+  close(c->sfd);
+  c->tcpsfd[c->cur_idx]= 0;
+
+  if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+      % srv_conn_cnt == 0)
+  {
+    gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
+    fprintf(stderr, "Server %s:%d disconnect\n",
+            ms_setting.servers[srv_idx].srv_host_name,
+            ms_setting.servers[srv_idx].srv_port);
+  }
+
+  if (ms_setting.rep_write_srv > 0)
+  {
+    uint32_t i= 0;
+
+    for (i= 0; i < c->total_sfds; i++)
+    {
+      if (c->tcpsfd[i] != 0)
+      {
+        break;
+      }
+    }
+
+    /* all socks disconnect */
+    if (i == c->total_sfds)
+    {
+      return -1;
+    }
+  }
+  else
+  {
+    do
+    {
+      /* reconnect success, break the loop */
+      if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+                             ms_setting.servers[srv_idx].srv_port,
+                             ms_setting.udp, &c->sfd) == 0)
+      {
+        c->tcpsfd[c->cur_idx]= c->sfd;
+        if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+            % (uint32_t)srv_conn_cnt == 0)
+        {
+          gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
+          int reconn_time=
+            (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
+                  - ms_setting.servers[srv_idx].disconn_time
+                     .tv_sec);
+          fprintf(stderr, "Server %s:%d reconnect after %ds\n",
+                  ms_setting.servers[srv_idx].srv_host_name,
+                  ms_setting.servers[srv_idx].srv_port, reconn_time);
+        }
+        break;
+      }
+
+      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
+      {
+        /* wait a second and reconnect */
+        sleep(1);
+      }
+    }
+    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
+  }
+
+  if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
+  {
+    c->sfd= 0;
+    c->alive_sfds--;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_reconn */
+
+
+/**
+ * reconnect several disconnected socks in the connection
+ * structure, the ever-1-second timer of the thread will check
+ * whether some socks in the connections disconnect. if
+ * disconnect, reconnect the sock.
+ *
+ * A sock is only retried once its own server has been disconnected
+ * for at least 5 seconds; socks that are not due yet are skipped.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, always EXIT_SUCCESS
+ */
+int ms_reconn_socks(ms_conn_t *c)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t srv_idx= 0;
+  int ret_sfd= 0;
+  uint32_t srv_conn_cnt= 0;
+  struct timeval cur_time;
+
+  assert(c != NULL);
+
+  if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
+  {
+    return EXIT_SUCCESS;
+  }
+
+  for (uint32_t i= 0; i < c->total_sfds; i++)
+  {
+    if (c->tcpsfd[i] != 0)
+    {
+      continue; /* this sock is alive */
+    }
+
+    /* resolve the server of THIS sock before looking at its timestamps */
+    if (ms_setting.rep_write_srv > 0)
+    {
+      srv_idx= i % ms_setting.srv_cnt;
+      srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
+    }
+    else
+    {
+      srv_idx= ms_thread->thread_ctx->srv_idx;
+      srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
+    }
+
+    /* fewer connections than servers gives 0; it is a divisor below */
+    if (srv_conn_cnt == 0)
+    {
+      srv_conn_cnt= 1;
+    }
+
+    gettimeofday(&cur_time, NULL);
+
+    /**
+     * For failover test of replication, reconnect the socks after
+     * it disconnects more than 5 seconds, Otherwise memslap will
+     * block at connect() function and the work threads can't work
+     * in this interval.
+     */
+    if (cur_time.tv_sec
+        - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
+    {
+      continue; /* other socks may belong to a server that is due */
+    }
+
+    if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
+                           ms_setting.servers[srv_idx].srv_port,
+                           ms_setting.udp, &ret_sfd) == 0)
+    {
+      c->tcpsfd[i]= ret_sfd;
+      c->alive_sfds++;
+
+      if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+          % (uint32_t)srv_conn_cnt == 0)
+      {
+        gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
+        int reconn_time=
+          (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
+                - ms_setting.servers[srv_idx].disconn_time
+                   .tv_sec);
+        fprintf(stderr, "Server %s:%d reconnect after %ds\n",
+                ms_setting.servers[srv_idx].srv_host_name,
+                ms_setting.servers[srv_idx].srv_port, reconn_time);
+      }
+    }
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_reconn_socks */
+
+
+/**
+ * Tokenize the command string by replacing whitespace with '\0' and update
+ * the token array tokens with pointer to start of each token and length.
+ * Returns total number of tokens. The last valid token is the terminal
+ * token (value points to the first unprocessed character of the string and
+ * length zero).
+ *
+ * Usage example:
+ *
+ * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
+ * for(int ix = 0; tokens[ix].length != 0; ix++) {
+ * ...
+ * } + * ncommand = tokens[ix].value - command; + * command = tokens[ix].value; + * } + * + * @param command, the command string to token + * @param tokens, array to store tokens + * @param max_tokens, maximum tokens number + * + * @return int, the number of tokens + */ +static int ms_tokenize_command(char *command, + token_t *tokens, + const int max_tokens) +{ + char *s, *e; + int ntokens= 0; + + assert(command != NULL && tokens != NULL && max_tokens > 1); + + for (s= e= command; ntokens < max_tokens - 1; ++e) + { + if (*e == ' ') + { + if (s != e) + { + tokens[ntokens].value= s; + tokens[ntokens].length= (size_t)(e - s); + ntokens++; + *e= '\0'; + } + s= e + 1; + } + else if (*e == '\0') + { + if (s != e) + { + tokens[ntokens].value= s; + tokens[ntokens].length= (size_t)(e - s); + ntokens++; + } + + break; /* string end */ + } + } + + return ntokens; +} /* ms_tokenize_command */ + + +/** + * parse the response of server. + * + * @param c, pointer of the concurrency + * @param command, the string responded by server + * + * @return int, if the command completed return EXIT_SUCCESS, else return + * -1 + */ +static int ms_ascii_process_line(ms_conn_t *c, char *command) +{ + int ret= 0; + int64_t value_len; + char *buffer= command; + + assert(c != NULL); + + /** + * for command get, we store the returned value into local buffer + * then continue in ms_complete_nread(). + */ + + switch (buffer[0]) + { + case 'V': /* VALUE || VERSION */ + if (buffer[1] == 'A') /* VALUE */ + { + token_t tokens[MAX_TOKENS]; + ms_tokenize_command(command, tokens, MAX_TOKENS); + errno= 0; + value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); + if (errno != 0) + { + printf("<%d ERROR %s\n", c->sfd, strerror(errno)); + } + c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value; + + /* + * We read the \r\n into the string since not doing so is more + * cycles then the waster of memory to do so. 
+ * + * We are null terminating through, which will most likely make + * some people lazy about using the return length. + */ + c->rvbytes= (int)(value_len + 2); + c->readval= true; + ret= -1; + } + + break; + + case 'O': /* OK */ + c->currcmd.retstat= MCD_SUCCESS; + break; + + case 'S': /* STORED STATS SERVER_ERROR */ + if (buffer[2] == 'A') /* STORED STATS */ + { /* STATS*/ + c->currcmd.retstat= MCD_STAT; + } + else if (buffer[1] == 'E') + { + /* SERVER_ERROR */ + printf("<%d %s\n", c->sfd, buffer); + + c->currcmd.retstat= MCD_SERVER_ERROR; + } + else if (buffer[1] == 'T') + { + /* STORED */ + c->currcmd.retstat= MCD_STORED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'D': /* DELETED DATA */ + if (buffer[1] == 'E') + { + c->currcmd.retstat= MCD_DELETED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + + break; + + case 'N': /* NOT_FOUND NOT_STORED*/ + if (buffer[4] == 'F') + { + c->currcmd.retstat= MCD_NOTFOUND; + } + else if (buffer[4] == 'S') + { + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat= MCD_NOTSTORED; + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'E': /* PROTOCOL ERROR or END */ + if (buffer[1] == 'N') + { + /* END */ + c->currcmd.retstat= MCD_END; + } + else if (buffer[1] == 'R') + { + printf("<%d ERROR\n", c->sfd); + c->currcmd.retstat= MCD_PROTOCOL_ERROR; + } + else if (buffer[1] == 'X') + { + c->currcmd.retstat= MCD_DATA_EXISTS; + printf("<%d %s\n", c->sfd, buffer); + } + else + { + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + } + break; + + case 'C': /* CLIENT ERROR */ + printf("<%d %s\n", c->sfd, buffer); + c->currcmd.retstat= MCD_CLIENT_ERROR; + break; + + default: + c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; + break; + } /* switch */ + + return ret; +} /* ms_ascii_process_line */ + + +/** + * after one operation completes, reset the concurrency + * + * @param c, pointer of the concurrency + * @param timeout, whether it's timeout + 
*/ +void ms_reset_conn(ms_conn_t *c, bool timeout) +{ + assert(c != NULL); + + if (c->udp) + { + if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) + { + memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); + } + + c->packets= 0; + c->recvpkt= 0; + c->pktcurr= 0; + c->ordcurr= 0; + c->rudpbytes= 0; + } + c->currcmd.isfinish= true; + c->ctnwrite= false; + c->rbytes= 0; + c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + ms_conn_set_state(c, conn_write); + memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ + + if (timeout) + { + ms_drive_machine(c); + } +} /* ms_reset_conn */ + + +/** + * if we have a complete line in the buffer, process it. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_try_read_line(ms_conn_t *c) +{ + if (c->protocol == binary_prot) + { + /* Do we have the complete packet header? */ + if ((uint64_t)c->rbytes < sizeof(c->binary_header)) + { + /* need more data! 
*/ + return EXIT_SUCCESS; + } + else + { +#ifdef NEED_ALIGN + if (((long)(c->rcurr)) % 8 != 0) + { + /* must realign input buffer */ + memmove(c->rbuf, c->rcurr, c->rbytes); + c->rcurr= c->rbuf; + if (settings.verbose) + { + fprintf(stderr, "%d: Realign input buffer.\n", c->sfd); + } + } +#endif + protocol_binary_response_header *rsp; + rsp= (protocol_binary_response_header *)c->rcurr; + + c->binary_header= *rsp; + c->binary_header.response.extlen= rsp->response.extlen; + c->binary_header.response.keylen= ntohs(rsp->response.keylen); + c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); + c->binary_header.response.status= ntohs(rsp->response.status); + + if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) + { + fprintf(stderr, "Invalid magic: %x\n", + c->binary_header.response.magic); + ms_conn_set_state(c, conn_closing); + return EXIT_SUCCESS; + } + + /* process this complete response */ + if (ms_bin_process_response(c) == 0) + { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } + else + { + c->rbytes-= (int32_t)sizeof(c->binary_header); + c->rcurr+= sizeof(c->binary_header); + } + } + } + else + { + char *el, *cont; + + assert(c != NULL); + assert(c->rcurr <= (c->rbuf + c->rsize)); + + if (c->rbytes == 0) + return EXIT_SUCCESS; + + el= memchr(c->rcurr, '\n', (size_t)c->rbytes); + if (! 
el) + return EXIT_SUCCESS; + + cont= el + 1; + if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) + { + el--; + } + *el= '\0'; + + assert(cont <= (c->rcurr + c->rbytes)); + + /* process this complete line */ + if (ms_ascii_process_line(c, c->rcurr) == 0) + { + /* current operation completed */ + ms_reset_conn(c, false); + return -1; + } + else + { + /* current operation didn't complete */ + c->rbytes-= (int32_t)(cont - c->rcurr); + c->rcurr= cont; + } + + assert(c->rcurr <= (c->rbuf + c->rsize)); + } + + return -1; +} /* ms_try_read_line */ + + +/** + * because the packet of UDP can't ensure the order, the + * function is used to sort the received udp packet. + * + * @param c, pointer of the concurrency + * @param buf, the buffer to store the ordered packages data + * @param rbytes, the maximum capacity of the buffer + * + * @return int, if success, return the copy bytes, else return + * -1 + */ +static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) +{ + int len= 0; + int wbytes= 0; + uint16_t req_id= 0; + uint16_t seq_num= 0; + uint16_t packets= 0; + unsigned char *header= NULL; + + /* no enough data */ + assert(c != NULL); + assert(buf != NULL); + assert(c->rudpbytes >= UDP_HEADER_SIZE); + + /* calculate received packets count */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) + { + /* the last packet has some data */ + c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1; + } + else + { + c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE; + } + + /* get the total packets count if necessary */ + if (c->packets == 0) + { + c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf); + } + + /* build the ordered packet array */ + for (int i= c->pktcurr; i < c->recvpkt; i++) + { + header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE; + req_id= (uint16_t)HEADER_TO_REQID(header); + assert(req_id == c->request_id % (1 << 16)); + + packets= (uint16_t)HEADER_TO_PACKETS(header); + assert(c->packets == HEADER_TO_PACKETS(header)); + + 
seq_num= (uint16_t)HEADER_TO_SEQNUM(header); + c->udppkt[seq_num].header= header; + c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE; + + if (i == c->recvpkt - 1) + { + /* last received packet */ + if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) + { + c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } + else + { + c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE + - UDP_HEADER_SIZE; + } + } + else + { + c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; + c->pktcurr++; + } + } + + for (int i= c->ordcurr; i < c->recvpkt; i++) + { + /* there is some data to copy */ + if ((c->udppkt[i].data != NULL) + && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) + { + header= c->udppkt[i].header; + len= c->udppkt[i].rbytes - c->udppkt[i].copybytes; + if (len > rbytes - wbytes) + { + len= rbytes - wbytes; + } + + assert(len <= rbytes - wbytes); + assert(i == HEADER_TO_SEQNUM(header)); + + memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, + (size_t)len); + wbytes+= len; + c->udppkt[i].copybytes+= len; + + if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes) + && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) + { + /* finish copying all the data of this packet, next */ + c->ordcurr++; + } + + /* last received packet, and finish copying all the data */ + if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1) + && (c->udppkt[i].copybytes == c->udppkt[i].rbytes)) + { + break; + } + + /* no space to copy data */ + if (wbytes >= rbytes) + { + break; + } + + /* it doesn't finish reading all the data of the packet from network */ + if ((i != c->recvpkt - 1) + && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) + { + break; + } + } + else + { + /* no data to copy */ + break; + } + } + (void)packets; + + return wbytes == 0 ? 
-1 : wbytes; +} /* ms_sort_udp_packet */ + + +/** + * encapsulate upd read like tcp read + * + * @param c, pointer of the concurrency + * @param buf, read buffer + * @param len, length to read + * + * @return int, if success, return the read bytes, else return + * -1 + */ +static int ms_udp_read(ms_conn_t *c, char *buf, int len) +{ + int res= 0; + int avail= 0; + int rbytes= 0; + int copybytes= 0; + + assert(c->udp); + + while (1) + { + if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) + { + char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2); + if (! new_rbuf) + { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rudpbytes= 0; /* ignore what we read */ + return -1; + } + c->rudpbuf= new_rbuf; + c->rudpsize*= 2; + } + + avail= c->rudpsize - c->rudpbytes; + /* UDP each time read a packet, 1400 bytes */ + res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail); + + if (res > 0) + { + atomic_add_size(&ms_stats.bytes_read, res); + c->rudpbytes+= res; + rbytes+= res; + if (res == avail) + { + continue; + } + else + { + break; + } + } + + if (res == 0) + { + /* "connection" closed */ + return res; + } + + if (res == -1) + { + /* no data to read */ + return res; + } + } + + /* copy data to read buffer */ + if (rbytes > 0) + { + copybytes= ms_sort_udp_packet(c, buf, len); + } + + if (copybytes == -1) + { + atomic_add_size(&ms_stats.pkt_disorder, 1); + } + + return copybytes; +} /* ms_udp_read */ + + +/* + * read from network as much as we can, handle buffer overflow and connection + * close. + * before reading, move the remaining incomplete fragment of a command + * (if any) to the beginning of the buffer. + * return EXIT_SUCCESS if there's nothing to read on the first read. + */ + +/** + * read from network as much as we can, handle buffer overflow and connection + * close. before reading, move the remaining incomplete fragment of a command + * (if any) to the beginning of the buffer. 
+ * + * @param c, pointer of the concurrency + * + * @return int, + * return EXIT_SUCCESS if there's nothing to read on the first read. + * return EXIT_FAILURE if get data + * return -1 if error happens + */ +static int ms_try_read_network(ms_conn_t *c) +{ + int gotdata= 0; + int res; + int64_t avail; + + assert(c != NULL); + + if ((c->rcurr != c->rbuf) + && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf)) + || (c->readval && (c->rcurr - c->rbuf > c->rbytes)))) + { + if (c->rbytes != 0) /* otherwise there's nothing to copy */ + memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); + c->rcurr= c->rbuf; + } + + while (1) + { + if (c->rbytes >= c->rsize) + { + char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2); + if (! new_rbuf) + { + fprintf(stderr, "Couldn't realloc input buffer.\n"); + c->rbytes= 0; /* ignore what we read */ + return -1; + } + c->rcurr= c->rbuf= new_rbuf; + c->rsize*= 2; + } + + avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf); + if (avail == 0) + { + break; + } + + if (c->udp) + { + res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail); + } + else + { + res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail); + } + + if (res > 0) + { + if (! c->udp) + { + atomic_add_size(&ms_stats.bytes_read, res); + } + gotdata= 1; + c->rbytes+= res; + if (res == avail) + { + continue; + } + else + { + break; + } + } + if (res == 0) + { + /* connection closed */ + ms_conn_set_state(c, conn_closing); + return -1; + } + if (res == -1) + { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + break; + /* Should close on unhandled errors. */ + ms_conn_set_state(c, conn_closing); + return -1; + } + } + + return gotdata; +} /* ms_try_read_network */ + + +/** + * after get the object from server, verify the value if + * necessary. 
+ * + * @param c, pointer of the concurrency + * @param mlget_item, pointer of mulit-get task item structure + * @param value, received value string + * @param vlen, received value string length + */ +static void ms_verify_value(ms_conn_t *c, + ms_mlget_task_item_t *mlget_item, + char *value, + int vlen) +{ + if (c->curr_task.verify) + { + assert(c->curr_task.item->value_offset != INVALID_OFFSET); + char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset]; + char *orignkey= + &ms_setting.char_block[c->curr_task.item->key_suffix_offset]; + + /* verify expire time if necessary */ + if (c->curr_task.item->exp_time > 0) + { + struct timeval curr_time; + gettimeofday(&curr_time, NULL); + + /* object expired but get it now */ + if (curr_time.tv_sec - c->curr_task.item->client_time + > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) + { + atomic_add_size(&ms_stats.exp_get, 1); + + if (ms_setting.verbose) + { + char set_time[64]; + char cur_time[64]; + strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&c->curr_task.item->client_time)); + strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&curr_time.tv_sec)); + fprintf(stderr, + "\n<%d expire time verification failed, " + "object expired but get it now\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\tset time: %s current time: %s " + "diff time: %d expire time: %d\n" + "\texpected data: \n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, + c->curr_task.item->key_size, + c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + set_time, + cur_time, + (int)(curr_time.tv_sec - c->curr_task.item->client_time), + c->curr_task.item->exp_time, + vlen, + vlen, + value); + fflush(stderr); + } + } + } + else + { + if ((c->curr_task.item->value_size != vlen) + || (memcmp(orignval, value, (size_t)vlen) != 0)) + { + atomic_add_size(&ms_stats.vef_failed, 1); + + if (ms_setting.verbose) + { + fprintf(stderr, + "\n<%d data verification 
failed\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64" %.*s\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data len: %d\n" + "\treceived data: %.*s\n", + c->sfd, + c->curr_task.item->key_size, + c->curr_task.item->key_prefix, + c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + c->curr_task.item->value_size, + c->curr_task.item->value_size, + orignval, + vlen, + vlen, + value); + fflush(stderr); + } + } + } + + c->curr_task.finish_verify= true; + + if (mlget_item != NULL) + { + mlget_item->finish_verify= true; + } + } +} /* ms_verify_value */ + + +/** + * For ASCII protocol, after store the data into the local + * buffer, run this function to handle the data. + * + * @param c, pointer of the concurrency + */ +static void ms_ascii_complete_nread(ms_conn_t *c) +{ + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot); + if (c->rvbytes > 2) + { + assert( + c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r'); + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item= NULL; + if (((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) + { + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + } + else + { + /* Try to find the task item in multi-get task array */ + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + mlget_item= &c->mlget_task.mlget_item[i]; + if (mlget_item->item->key_prefix == c->currcmd.key_prefix) + { + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + + break; 
+ } + } + } + } + + ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2); + + c->curr_task.get_miss= false; + c->rbytes-= c->rvbytes; + c->rcurr= c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval= false; + c->rvbytes= 0; +} /* ms_ascii_complete_nread */ + + +/** + * For binary protocol, after store the data into the local + * buffer, run this function to handle the data. + * + * @param c, pointer of the concurrency + */ +static void ms_bin_complete_nread(ms_conn_t *c) +{ + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == binary_prot); + + int extlen= c->binary_header.response.extlen; + int keylen= c->binary_header.response.keylen; + uint8_t opcode= c->binary_header.response.opcode; + + /* not get command or not include value, just return */ + if (((opcode != PROTOCOL_BINARY_CMD_GET) + && (opcode != PROTOCOL_BINARY_CMD_GETQ)) + || (c->rvbytes <= extlen + keylen)) + { + /* get miss */ + if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) + { + c->currcmd.retstat= MCD_END; + c->curr_task.get_miss= true; + } + + c->readval= false; + c->rvbytes= 0; + ms_reset_conn(c, false); + return; + } + + /* multi-get */ + ms_mlget_task_item_t *mlget_item= NULL; + if (((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + c->mlget_task.value_index++; + mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; + + c->curr_task.item= mlget_item->item; + c->curr_task.verify= mlget_item->verify; + c->curr_task.finish_verify= mlget_item->finish_verify; + mlget_item->get_miss= false; + } + + ms_verify_value(c, + mlget_item, + c->rcurr + extlen + keylen, + c->rvbytes - extlen - keylen); + + c->currcmd.retstat= MCD_END; + c->curr_task.get_miss= false; + c->rbytes-= c->rvbytes; + c->rcurr= c->rcurr + c->rvbytes; + assert(c->rcurr <= (c->rbuf + c->rsize)); + c->readval= false; + c->rvbytes= 0; + + if 
(ms_setting.mult_key_num > 1) + { + /* multi-get have check all the item */ + if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) + { + ms_reset_conn(c, false); + } + } + else + { + /* single get */ + ms_reset_conn(c, false); + } +} /* ms_bin_complete_nread */ + + +/** + * we get here after reading the value of get commands. + * + * @param c, pointer of the concurrency + */ +static void ms_complete_nread(ms_conn_t *c) +{ + assert(c != NULL); + assert(c->rbytes >= c->rvbytes); + assert(c->protocol == ascii_prot + || c->protocol == binary_prot); + + if (c->protocol == binary_prot) + { + ms_bin_complete_nread(c); + } + else + { + ms_ascii_complete_nread(c); + } +} /* ms_complete_nread */ + + +/** + * Adds a message header to a connection. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_add_msghdr(ms_conn_t *c) +{ + struct msghdr *msg; + + assert(c != NULL); + + if (c->msgsize == c->msgused) + { + msg= + realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); + if (! msg) + return -1; + + c->msglist= msg; + c->msgsize*= 2; + } + + msg= c->msglist + c->msgused; + + /** + * this wipes msg_iovlen, msg_control, msg_controllen, and + * msg_flags, the last 3 of which aren't defined on solaris: + */ + memset(msg, 0, sizeof(struct msghdr)); + + msg->msg_iov= &c->iov[c->iovused]; + + if (c->udp && (c->srv_recv_addr_size > 0)) + { + msg->msg_name= &c->srv_recv_addr; + msg->msg_namelen= c->srv_recv_addr_size; + } + + c->msgbytes= 0; + c->msgused++; + + if (c->udp) + { + /* Leave room for the UDP header, which we'll fill in later. */ + return ms_add_iov(c, NULL, UDP_HEADER_SIZE); + } + + return EXIT_SUCCESS; +} /* ms_add_msghdr */ + + +/** + * Ensures that there is room for another structure iovec in a connection's + * iov list. 
+ * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_ensure_iov_space(ms_conn_t *c) +{ + assert(c != NULL); + + if (c->iovused >= c->iovsize) + { + int i, iovnum; + struct iovec *new_iov= (struct iovec *)realloc(c->iov, + ((size_t)c->iovsize + * 2) + * sizeof(struct iovec)); + if (! new_iov) + return -1; + + c->iov= new_iov; + c->iovsize*= 2; + + /* Point all the msghdr structures at the new list. */ + for (i= 0, iovnum= 0; i < c->msgused; i++) + { + c->msglist[i].msg_iov= &c->iov[iovnum]; + iovnum+= (int)c->msglist[i].msg_iovlen; + } + } + + return EXIT_SUCCESS; +} /* ms_ensure_iov_space */ + + +/** + * Adds data to the list of pending data that will be written out to a + * connection. + * + * @param c, pointer of the concurrency + * @param buf, the buffer includes data to send + * @param len, the data length in the buffer + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_add_iov(ms_conn_t *c, const void *buf, int len) +{ + struct msghdr *m; + int leftover; + bool limit_to_mtu; + + assert(c != NULL); + + do + { + m= &c->msglist[c->msgused - 1]; + + /* + * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes. + */ + limit_to_mtu= c->udp; + +#ifdef IOV_MAX + /* We may need to start a new msghdr if this one is full. 
*/ + if ((m->msg_iovlen == IOV_MAX) + || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) + { + ms_add_msghdr(c); + m= &c->msglist[c->msgused - 1]; + } +#endif + + if (ms_ensure_iov_space(c) != 0) + return -1; + + /* If the fragment is too big to fit in the datagram, split it up */ + if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) + { + leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE; + len-= leftover; + } + else + { + leftover= 0; + } + + m= &c->msglist[c->msgused - 1]; + m->msg_iov[m->msg_iovlen].iov_base= (void *)buf; + m->msg_iov[m->msg_iovlen].iov_len= (size_t)len; + + c->msgbytes+= len; + c->iovused++; + m->msg_iovlen++; + + buf= ((char *)buf) + len; + len= leftover; + } + while (leftover > 0); + + return EXIT_SUCCESS; +} /* ms_add_iov */ + + +/** + * Constructs a set of UDP headers and attaches them to the outgoing messages. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_udp_headers(ms_conn_t *c) +{ + int i; + unsigned char *hdr; + + assert(c != NULL); + + c->request_id= ms_get_udp_request_id(); + + if (c->msgused > c->hdrsize) + { + void *new_hdrbuf; + if (c->hdrbuf) + new_hdrbuf= realloc(c->hdrbuf, + (size_t)c->msgused * 2 * UDP_HEADER_SIZE); + else + new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE); + if (! new_hdrbuf) + return -1; + + c->hdrbuf= (unsigned char *)new_hdrbuf; + c->hdrsize= c->msgused * 2; + } + + /* If this is a multi-packet request, drop it. 
*/ + if (c->udp && (c->msgused > 1)) + { + fprintf(stderr, "multi-packet request for UDP not supported.\n"); + return -1; + } + + hdr= c->hdrbuf; + for (i= 0; i < c->msgused; i++) + { + c->msglist[i].msg_iov[0].iov_base= (void *)hdr; + c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; + *hdr++= (unsigned char)(c->request_id / 256); + *hdr++= (unsigned char)(c->request_id % 256); + *hdr++= (unsigned char)(i / 256); + *hdr++= (unsigned char)(i % 256); + *hdr++= (unsigned char)(c->msgused / 256); + *hdr++= (unsigned char)(c->msgused % 256); + *hdr++= (unsigned char)1; /* support facebook memcached */ + *hdr++= (unsigned char)0; + assert(hdr == + ((unsigned char *)c->msglist[i].msg_iov[0].iov_base + + UDP_HEADER_SIZE)); + } + + return EXIT_SUCCESS; +} /* ms_build_udp_headers */ + + +/** + * Transmit the next chunk of data from our list of msgbuf structures. + * + * @param c, pointer of the concurrency + * + * @return TRANSMIT_COMPLETE All done writing. + * TRANSMIT_INCOMPLETE More data remaining to write. + * TRANSMIT_SOFT_ERROR Can't write any more right now. + * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) + */ +static int ms_transmit(ms_conn_t *c) +{ + assert(c != NULL); + + if ((c->msgcurr < c->msgused) + && (c->msglist[c->msgcurr].msg_iovlen == 0)) + { + /* Finished writing the current msg; advance to the next. */ + c->msgcurr++; + } + + if (c->msgcurr < c->msgused) + { + ssize_t res; + struct msghdr *m= &c->msglist[c->msgcurr]; + + res= sendmsg(c->sfd, m, 0); + if (res > 0) + { + atomic_add_size(&ms_stats.bytes_written, res); + + /* We've written some of the data. Remove the completed + * iovec entries from the list of pending writes. */ + while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) + { + res-= (ssize_t)m->msg_iov->iov_len; + m->msg_iovlen--; + m->msg_iov++; + } + + /* Might have written just part of the last iovec entry; + * adjust it so the next write will do the rest. 
*/ + if (res > 0) + { + m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); + m->msg_iov->iov_len-= (size_t)res; + } + return TRANSMIT_INCOMPLETE; + } + if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) + { + if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) + { + fprintf(stderr, "Couldn't update event.\n"); + ms_conn_set_state(c, conn_closing); + return TRANSMIT_HARD_ERROR; + } + return TRANSMIT_SOFT_ERROR; + } + + /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, + * we have a real error, on which we close the connection */ + fprintf(stderr, "Failed to write, and not due to blocking.\n"); + + ms_conn_set_state(c, conn_closing); + return TRANSMIT_HARD_ERROR; + } + else + { + return TRANSMIT_COMPLETE; + } +} /* ms_transmit */ + + +/** + * Shrinks a connection's buffers if they're too big. This prevents + * periodic large "mget" response from server chewing lots of client + * memory. + * + * This should only be called in between requests since it can wipe output + * buffers! + * + * @param c, pointer of the concurrency + */ +static void ms_conn_shrink(ms_conn_t *c) +{ + assert(c != NULL); + + if (c->udp) + return; + + if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) + { + char *newbuf; + + if (c->rcurr != c->rbuf) + memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); + + newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); + + if (newbuf) + { + c->rbuf= newbuf; + c->rsize= DATA_BUFFER_SIZE; + } + c->rcurr= c->rbuf; + } + + if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT) + && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE)) + { + char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2); + if (new_rbuf) + { + c->rudpbuf= new_rbuf; + c->rudpsize= UDP_DATA_BUFFER_SIZE; + } + /* TODO check error condition? 
*/ + } + + if (c->msgsize > MSG_LIST_HIGHWAT) + { + struct msghdr *newbuf= (struct msghdr *)realloc( + (void *)c->msglist, + MSG_LIST_INITIAL + * sizeof(c->msglist[0])); + if (newbuf) + { + c->msglist= newbuf; + c->msgsize= MSG_LIST_INITIAL; + } + /* TODO check error condition? */ + } + + if (c->iovsize > IOV_LIST_HIGHWAT) + { + struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov, + IOV_LIST_INITIAL + * sizeof(c->iov[0])); + if (newbuf) + { + c->iov= newbuf; + c->iovsize= IOV_LIST_INITIAL; + } + /* TODO check return value */ + } +} /* ms_conn_shrink */ + + +/** + * Sets a connection's current state in the state machine. Any special + * processing that needs to happen on certain state transitions can + * happen here. + * + * @param c, pointer of the concurrency + * @param state, connection state + */ +static void ms_conn_set_state(ms_conn_t *c, int state) +{ + assert(c != NULL); + + if (state != c->state) + { + if (state == conn_read) + { + ms_conn_shrink(c); + } + c->state= state; + } +} /* ms_conn_set_state */ + + +/** + * update the event if socks change state. for example: when + * change the listen scoket read event to sock write event, or + * change socket handler, we could call this function. + * + * @param c, pointer of the concurrency + * @param new_flags, new event flags + * + * @return bool, if success, return true, else return false + */ +static bool ms_update_event(ms_conn_t *c, const int new_flags) +{ + assert(c != NULL); + + struct event_base *base= c->event.ev_base; + if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0) + && (! 
ms_setting.facebook_test || (c->total_sfds == 1))) + { + return true; + } + + if (event_del(&c->event) == -1) + { + /* try to delete the event again */ + if (event_del(&c->event) == -1) + { + return false; + } + } + + event_set(&c->event, + c->sfd, + (short)new_flags, + ms_event_handler, + (void *)c); + event_base_set(base, &c->event); + c->ev_flags= (short)new_flags; + + if (event_add(&c->event, NULL) == -1) + { + return false; + } + + return true; +} /* ms_update_event */ + + +/** + * If user want to get the expected throughput, we could limit + * the performance of memslap. we could give up some work and + * just wait a short time. The function is used to check this + * case. + * + * @param c, pointer of the concurrency + * + * @return bool, if success, return true, else return false + */ +static bool ms_need_yield(ms_conn_t *c) +{ + ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + int64_t tps= 0; + int64_t time_diff= 0; + struct timeval curr_time; + ms_task_t *task= &c->curr_task; + + if (ms_setting.expected_tps > 0) + { + gettimeofday(&curr_time, NULL); + time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); + tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000); + + /* current throughput is greater than expected throughput */ + if (tps > ms_thread->thread_ctx->tps_perconn) + { + return true; + } + } + + return false; +} /* ms_need_yield */ + + +/** + * used to update the start time of each operation + * + * @param c, pointer of the concurrency + */ +static void ms_update_start_time(ms_conn_t *c) +{ + ms_task_item_t *item= c->curr_task.item; + + if ((ms_setting.stat_freq > 0) || c->udp + || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))) + { + gettimeofday(&c->start_time, NULL); + if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) + { + /* record the current time */ + item->client_time= c->start_time.tv_sec; + } + } +} /* ms_update_start_time */ + + +/** + * run the state machine + * + * @param c, 
pointer of the concurrency
+ */
+static void ms_drive_machine(ms_conn_t *c)
+{
+  bool stop= false;
+
+  assert(c != NULL);
+
+  while (! stop)
+  {
+    switch (c->state)
+    {
+    case conn_read:
+      if (c->readval)
+      {
+        if (c->rbytes >= c->rvbytes)
+        {
+          ms_complete_nread(c);
+          break;
+        }
+      }
+      else
+      {
+        if (ms_try_read_line(c) != 0)
+        {
+          break;
+        }
+      }
+
+      if (ms_try_read_network(c) != 0)
+      {
+        break;
+      }
+
+      /* doesn't read all the response data, wait event wake up */
+      if (! c->currcmd.isfinish)
+      {
+        if (! ms_update_event(c, EV_READ | EV_PERSIST))
+        {
+          fprintf(stderr, "Couldn't update event.\n");
+          ms_conn_set_state(c, conn_closing);
+          break;
+        }
+        stop= true;
+        break;
+      }
+
+      /* we have no command line and no data to read from network, next write */
+      ms_conn_set_state(c, conn_write);
+      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
+
+      break;
+
+    case conn_write:
+      if (! c->ctnwrite && ms_need_yield(c))
+      {
+        usleep(10);
+
+        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+        {
+          fprintf(stderr, "Couldn't update event.\n");
+          ms_conn_set_state(c, conn_closing);
+          break;
+        }
+        stop= true;
+        break;
+      }
+
+      if (! c->ctnwrite && (ms_exec_task(c) != 0))
+      {
+        ms_conn_set_state(c, conn_closing);
+        break;
+      }
+
+      /* record the start time before starting to send data if necessary */
+      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
+      {
+        if (c->change_sfd)
+        {
+          c->change_sfd= false;
+        }
+        ms_update_start_time(c);
+      }
+
+      /* change sfd if necessary */
+      /* NOTE(review): change_sfd looks always false here (cleared just
+       * above, or already false when that branch is skipped) -- confirm
+       * whether this branch is still reachable */
+      if (c->change_sfd)
+      {
+        c->ctnwrite= true;
+        stop= true;
+        break;
+      }
+
+      /* execute task until nothing need be written to network */
+      if (! c->ctnwrite && (c->msgcurr == c->msgused))
+      {
+        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+        {
+          fprintf(stderr, "Couldn't update event.\n");
+          ms_conn_set_state(c, conn_closing);
+          break;
+        }
+        stop= true;
+        break;
+      }
+
+      switch (ms_transmit(c))
+      {
+      case TRANSMIT_COMPLETE:
+        /* we have no data to write to network, next wait repose */
+        if (! ms_update_event(c, EV_READ | EV_PERSIST))
+        {
+          fprintf(stderr, "Couldn't update event.\n");
+          ms_conn_set_state(c, conn_closing);
+          c->ctnwrite= false;
+          break;
+        }
+        ms_conn_set_state(c, conn_read);
+        c->ctnwrite= false;
+        stop= true;
+        break;
+
+      case TRANSMIT_INCOMPLETE:
+        c->ctnwrite= true;
+        break;                           /* Continue in state machine. */
+
+      case TRANSMIT_HARD_ERROR:
+        c->ctnwrite= false;
+        break;
+
+      case TRANSMIT_SOFT_ERROR:
+        c->ctnwrite= true;
+        stop= true;
+        break;
+
+      default:
+        break;
+      } /* switch */
+
+      break;
+
+    case conn_closing:
+      /* recovery mode, need reconnect if connection close */
+      if (ms_setting.reconnect && (! ms_global.time_out
+                                   || ((ms_setting.run_time == 0)
+                                       && (c->remain_exec_num > 0))))
+      {
+        if (ms_reconn(c) != 0)
+        {
+          ms_conn_close(c);
+          stop= true;
+          break;
+        }
+
+        ms_reset_conn(c, false);
+
+        if (c->total_sfds == 1)
+        {
+          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+          {
+            fprintf(stderr, "Couldn't update event.\n");
+            ms_conn_set_state(c, conn_closing);
+            break;
+          }
+        }
+
+        break;
+      }
+      else
+      {
+        ms_conn_close(c);
+        stop= true;
+        break;
+      }
+
+    default:
+      assert(0);
+      /* with NDEBUG the assert vanishes and an unknown state would spin in
+       * this loop forever; route it through the closing path instead */
+      ms_conn_set_state(c, conn_closing);
+      break;
+    } /* switch */
+  }
+} /* ms_drive_machine */
+
+
+/**
+ * the event handler of each thread
+ *
+ * Records which events fired, checks that the descriptor reported by
+ * libevent is the connection's current one, then runs the state machine.
+ *
+ * @param fd, the file descriptor of socket
+ * @param which, event flag
+ * @param arg, argument (the ms_conn_t the event was registered with)
+ */
+void ms_event_handler(const int fd, const short which, void *arg)
+{
+  ms_conn_t *c= (ms_conn_t *)arg;
+
+  assert(c != NULL);
+
+  c->which= which;
+
+  /* sanity */
+  if (fd != c->sfd)
+  {
+    fprintf(stderr,
+            "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
+            fd,
+            c->sfd);
+    ms_conn_close(c);
+    exit(1);
+  }
+  assert(fd == c->sfd);
+
+  ms_drive_machine(c);
+
+  /* wait for next event */
+} /* ms_event_handler */
+
+
+/**
+ * get the next socket descriptor index to run for replication
+ *
+ * A set prefers one of the first ms_setting.rep_write_srv sockets when at
+ * least one of them is open; every other command picks among all
+ * sockets. The draw is repeated until it lands on an open socket.
+ *
+ * @param c, pointer of the concurrency
+ * @param cmd, command(get or set )
+ *
+ * @return uint32_t, index into c->tcpsfd; 0 when the connection has a
+ *         single socket or no replication write servers are configured
+ */
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+{
+  uint32_t sock_index= 0;
+  uint32_t i= 0;
+
+  if (c->total_sfds == 1)
+  {
+    return sock_index;
+  }
+
+  if (ms_setting.rep_write_srv == 0)
+  {
+    return sock_index;
+  }
+
+  /* NOTE(review): this still spins if every entry of tcpsfd is 0 --
+   * presumably callers guarantee at least one open socket; confirm */
+  do
+  {
+    if (cmd == CMD_SET)
+    {
+      for (i= 0; i < ms_setting.rep_write_srv; i++)
+      {
+        if (c->tcpsfd[i] > 0)
+        {
+          break;
+        }
+      }
+
+      if (i == ms_setting.rep_write_srv)
+      {
+        /* random get one replication server to read */
+        sock_index= (uint32_t)random() % c->total_sfds;
+      }
+      else
+      {
+        /* random get one replication writing server to write */
+        sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
+      }
+    }
+    else
+    {
+      /* get, or any other command: random get one replication server to
+       * read. Previously an unrecognised command never redrew the index
+       * and could loop forever on a closed socket 0. */
+      sock_index= (uint32_t)random() % c->total_sfds;
+    }
+  }
+  while (c->tcpsfd[sock_index] == 0);
+
+  return sock_index;
+} /* ms_get_rep_sock_index */
+
+
+/**
+ * get the next socket descriptor index to run
+ *
+ * Advances c->cur_idx round-robin over c->tcpsfd, skipping closed
+ * sockets (entries equal to 0).
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return uint32_t, the index of the next open socket
+ */
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
+{
+  uint32_t sock_index= 0;
+
+  /* NOTE(review): still spins if every entry of tcpsfd is 0 -- presumably
+   * callers guarantee at least one open socket; confirm */
+  do
+  {
+    /* wrap the cursor itself, not just the returned index: leaving
+     * cur_idx at total_sfds made the next iteration read past the end
+     * of tcpsfd whenever socket 0 was closed */
+    if (++c->cur_idx >= c->total_sfds)
+    {
+      c->cur_idx= 0;
+    }
+    sock_index= c->cur_idx;
+  }
+  while (c->tcpsfd[sock_index] == 0);
+
+  return sock_index;
+} /* ms_get_next_sock_index */
+
+
+/**
+ * update socket event of the connections
+ *
+ * Chooses the socket the current command is sent on: in facebook test
+ * mode sets go over TCP and gets over UDP; with several TCP sockets the
+ * next one is chosen round-robin or by the replication rule. When the
+ * socket changed, the libevent registration is refreshed for writing.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_update_conn_sock_event(ms_conn_t *c)
+{
+  assert(c != NULL);
+
+  switch (c->currcmd.cmd)
+  {
+  case CMD_SET:
+    if (ms_setting.facebook_test && c->udp)
+    {
+      c->sfd= c->tcpsfd[0];
+      c->udp= false;
+      c->change_sfd= true;
+    }
+    break;
+
+  case CMD_GET:
+    if (ms_setting.facebook_test && ! c->udp)
+    {
+      c->sfd= c->udpsfd;
+      c->udp= true;
+      c->change_sfd= true;
+    }
+    break;
+
+  default:
+    break;
+  } /* switch */
+
+  if (! c->udp && (c->total_sfds > 1))
+  {
+    /* cur_idx == total_sfds marks a connection that has not picked a
+     * socket yet */
+    if (c->cur_idx != c->total_sfds)
+    {
+      if (ms_setting.rep_write_srv == 0)
+      {
+        c->cur_idx= ms_get_next_sock_index(c);
+      }
+      else
+      {
+        c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
+      }
+    }
+    else
+    {
+      /* must select the first sock of the connection at the beginning */
+      c->cur_idx= 0;
+    }
+
+    c->sfd= c->tcpsfd[c->cur_idx];
+    assert(c->sfd != 0);
+    c->change_sfd= true;
+  }
+
+  if (c->change_sfd)
+  {
+    if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
+    {
+      fprintf(stderr, "Couldn't update event.\n");
+      ms_conn_set_state(c, conn_closing);
+      return -1;
+    }
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_update_conn_sock_event */
+
+
+/**
+ * for ASCII protocol, this function build the set command
+ * string and send the command.
+ *
+ * @param c, pointer of the concurrency
+ * @param item, pointer of task item which includes the object
+ *        information
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
+{
+  int value_offset;
+  int write_len;
+  char *buffer= c->wbuf;
+
+  /* " <flags> <exptime> <bytes>\r\n", the part of the command line that
+   * follows the key */
+  write_len= snprintf(buffer,
+                      (size_t)c->wsize,
+                      " %u %d %d\r\n",
+                      0,
+                      item->exp_time,
+                      item->value_size);
+
+  /* snprintf returns the length it wanted to write, excluding the NUL, so
+   * a result equal to the buffer size is already truncated */
+  if (write_len < 0 || write_len >= c->wsize)
+  {
+    /* ought to be always enough. just fail for simplicity */
+    fprintf(stderr, "output command line too long.\n");
+    return -1;
+  }
+
+  if (item->value_offset == INVALID_OFFSET)
+  {
+    value_offset= item->key_suffix_offset;
+  }
+  else
+  {
+    value_offset= item->value_offset;
+  }
+
+  if ((ms_add_iov(c, "set ", 4) != 0)
+      || (ms_add_iov(c, (char *)&item->key_prefix,
+                     (int)KEY_PREFIX_SIZE) != 0)
+      || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+                     item->key_size - (int)KEY_PREFIX_SIZE) != 0)
+      || (ms_add_iov(c, buffer, write_len) != 0)
+      || (ms_add_iov(c, &ms_setting.char_block[value_offset],
+                     item->value_size) != 0)
+      || (ms_add_iov(c, "\r\n", 2) != 0)
+      || (c->udp && (ms_build_udp_headers(c) != 0)))
+  {
+    return -1;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_set */
+
+
+/**
+ * used to send set command to server
+ *
+ * @param c, pointer of the concurrency
+ * @param item, pointer of task item which includes the object
+ *        information
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
+{
+  assert(c != NULL);
+
+  c->currcmd.cmd= CMD_SET;
+  c->currcmd.isfinish= false;
+  c->currcmd.retstat= MCD_FAILURE;
+
+  if (ms_update_conn_sock_event(c) != 0)
+  {
+    return -1;
+  }
+
+  c->msgcurr= 0;
+  c->msgused= 0;
+  c->iovused= 0;
+  if (ms_add_msghdr(c) != 0)
+  {
+    fprintf(stderr, "Out of memory preparing request.\n");
+    return -1;
+  }
+
+  /* binary protocol */
+  if (c->protocol == binary_prot)
+  {
+    if (ms_build_bin_write_buf_set(c, item) != 0)
+    {
+      return -1;
+    }
+  }
+  else
+  {
+    if (ms_build_ascii_write_buf_set(c, item) != 0)
+    {
+      return -1;
+    }
+  }
+
+  atomic_add_size(&ms_stats.obj_bytes,
+                  item->key_size + item->value_size);
+  atomic_add_size(&ms_stats.cmd_set, 1);
+
+  return EXIT_SUCCESS;
+} /* ms_mcd_set */
+
+
+/**
+ * for ASCII protocol, this function build the get command
+ * string and send the command.
+ *
+ * @param c, pointer of the concurrency
+ * @param item, pointer of task item which includes the object
+ *        information
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
+{
+  if ((ms_add_iov(c, "get ", 4) != 0)
+      || (ms_add_iov(c, (char *)&item->key_prefix,
+                     (int)KEY_PREFIX_SIZE) != 0)
+      || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+                     item->key_size - (int)KEY_PREFIX_SIZE) != 0)
+      || (ms_add_iov(c, "\r\n", 2) != 0)
+      || (c->udp && (ms_build_udp_headers(c) != 0)))
+  {
+    return -1;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_get */
+
+
+/**
+ * used to send the get command to server
+ *
+ * @param c, pointer of the concurrency
+ * @param item, pointer of task item which includes the object
+ *        information
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
+{
+  assert(c != NULL);
+
+  c->currcmd.cmd= CMD_GET;
+  c->currcmd.isfinish= false;
+  c->currcmd.retstat= MCD_FAILURE;
+
+  if (ms_update_conn_sock_event(c) != 0)
+  {
+    return -1;
+  }
+
+  c->msgcurr= 0;
+  c->msgused= 0;
+  c->iovused= 0;
+  if (ms_add_msghdr(c) != 0)
+  {
+    fprintf(stderr, "Out of memory preparing request.\n");
+    return -1;
+  }
+
+  /* binary protocol */
+  if (c->protocol == binary_prot)
+  {
+    if (ms_build_bin_write_buf_get(c, item) != 0)
+    {
+      return -1;
+    }
+  }
+  else
+  {
+    if (ms_build_ascii_write_buf_get(c, item) != 0)
+    {
+      return -1;
+    }
+  }
+
+  atomic_add_size(&ms_stats.cmd_get, 1);
+
+  return EXIT_SUCCESS;
+} /* ms_mcd_get */
+
+
+/**
+ * for ASCII protocol, this function build the multi-get command
+ * string and send the command.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
+{
+  ms_task_item_t *item;
+
+  if (ms_add_iov(c, "get", 3) != 0)
+  {
+    return -1;
+  }
+
+  /* append " <key>" for every item of the multi-get task */
+  for (int i= 0; i < c->mlget_task.mlget_num; i++)
+  {
+    item= c->mlget_task.mlget_item[i].item;
+    assert(item != NULL);
+    if ((ms_add_iov(c, " ", 1) != 0)
+        || (ms_add_iov(c, (char *)&item->key_prefix,
+                       (int)KEY_PREFIX_SIZE) != 0)
+        || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
+                       item->key_size - (int)KEY_PREFIX_SIZE) != 0))
+    {
+      return -1;
+    }
+  }
+
+  if ((ms_add_iov(c, "\r\n", 2) != 0)
+      || (c->udp && (ms_build_udp_headers(c) != 0)))
+  {
+    return -1;
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_build_ascii_write_buf_mlget */
+
+
+/**
+ * used to send the multi-get command to server
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, if success, return EXIT_SUCCESS, else return -1
+ */
+int ms_mcd_mlget(ms_conn_t *c)
+{
+  assert(c != NULL);
+  assert(c->mlget_task.mlget_num >= 1);
+
+  c->currcmd.cmd= CMD_GET;
+  c->currcmd.isfinish= false;
+  c->currcmd.retstat= MCD_FAILURE;
+
+  if (ms_update_conn_sock_event(c) != 0)
+  {
+    return -1;
+  }
+
+  c->msgcurr= 0;
+  c->msgused= 0;
+  c->iovused= 0;
+  if (ms_add_msghdr(c) != 0)
+  {
+    fprintf(stderr, "Out of memory preparing request.\n");
+    return -1;
+  }
+
+  /* binary protocol */
+  if (c->protocol == binary_prot)
+  {
+    if (ms_build_bin_write_buf_mlget(c) != 0)
+    {
+      return -1;
+    }
+  }
+  else
+  {
+    if (ms_build_ascii_write_buf_mlget(c) != 0)
+    {
+      return -1;
+    }
+  }
+
+  /* count one get for every key requested */
+  for (int i= 0; i < c->mlget_task.mlget_num; i++)
+  {
+    atomic_add_size(&ms_stats.cmd_get, 1);
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_mcd_mlget */
+
+
+/**
+ * binary protocol support
+ */
+
+/**
+ * for binary protocol, parse the response of server
+ *
+ * A response that carries a body is not decoded here: the body length is
+ * stored in c->rvbytes, c->readval is set, and EXIT_FAILURE is returned.
+ * Otherwise the status is mapped onto c->currcmd.retstat and any error
+ * text is printed.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, EXIT_SUCCESS when the header alone completed the response,
+ *         EXIT_FAILURE when a body still has to be read
+ */
+static int ms_bin_process_response(ms_conn_t *c)
+{
+  const char *errstr= NULL;
+
+  assert(c != NULL);
+
+  uint32_t bodylen= c->binary_header.response.bodylen;
+  uint8_t opcode= c->binary_header.response.opcode;
+  uint16_t status= c->binary_header.response.status;
+
+  if (bodylen > 0)
+  {
+    c->rvbytes= (int32_t)bodylen;
+    c->readval= true;
+    return EXIT_FAILURE;
+  }
+  else
+  {
+    switch (status)
+    {
+    case PROTOCOL_BINARY_RESPONSE_SUCCESS:
+      if (opcode == PROTOCOL_BINARY_CMD_SET)
+      {
+        c->currcmd.retstat= MCD_STORED;
+      }
+      else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
+      {
+        c->currcmd.retstat= MCD_DELETED;
+      }
+      else if (opcode == PROTOCOL_BINARY_CMD_GET)
+      {
+        c->currcmd.retstat= MCD_END;
+      }
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
+      errstr= "Out of memory";
+      c->currcmd.retstat= MCD_SERVER_ERROR;
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
+      errstr= "Unknown command";
+      c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
+      errstr= "Not found";
+      c->currcmd.retstat= MCD_NOTFOUND;
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_EINVAL:
+      errstr= "Invalid arguments";
+      c->currcmd.retstat= MCD_PROTOCOL_ERROR;
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
+      /* NOTE(review): the only error case that leaves retstat untouched;
+       * MCD_DATA_EXISTS looks like the intended value -- confirm against
+       * the code that consumes retstat before changing it */
+      errstr= "Data exists for key.";
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_E2BIG:
+      errstr= "Too large.";
+      c->currcmd.retstat= MCD_SERVER_ERROR;
+      break;
+
+    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
+      errstr= "Not stored.";
+      c->currcmd.retstat= MCD_NOTSTORED;
+      break;
+
+    default:
+      errstr= "Unknown error";
+      c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
+      break;
+    } /* switch */
+
+    if (errstr != NULL)
+    {
+
fprintf(stderr, "%s\n", errstr); + } + } + + return EXIT_SUCCESS; +} /* ms_bin_process_response */ + + +/* build binary header and add the header to the buffer to send */ + +/** + * build binary header and add the header to the buffer to send + * + * @param c, pointer of the concurrency + * @param opcode, operation code + * @param hdr_len, length of header + * @param key_len, length of key + * @param body_len. length of body + */ +static void ms_add_bin_header(ms_conn_t *c, + uint8_t opcode, + uint8_t hdr_len, + uint16_t key_len, + uint32_t body_len) +{ + protocol_binary_request_header *header; + + assert(c != NULL); + + header= (protocol_binary_request_header *)c->wcurr; + + header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; + header->request.opcode= (uint8_t)opcode; + header->request.keylen= htons(key_len); + + header->request.extlen= (uint8_t)hdr_len; + header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; + header->request.vbucket= 0; + + header->request.bodylen= htonl(body_len); + header->request.opaque= 0; + header->request.cas= 0; + + ms_add_iov(c, c->wcurr, sizeof(header->request)); +} /* ms_add_bin_header */ + + +/** + * add the key to the socket write buffer array + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + */ +static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) +{ + ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE); + ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], + item->key_size - (int)KEY_PREFIX_SIZE); +} + + +/** + * for binary protocol, this function build the set command + * and add the command to send buffer array. 
+ * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) +{ + assert(c->wbuf == c->wcurr); + + int value_offset; + protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr; + uint16_t keylen= (uint16_t)item->key_size; + uint32_t bodylen= (uint32_t)sizeof(rep->message.body) + + (uint32_t)keylen + (uint32_t)item->value_size; + + ms_add_bin_header(c, + PROTOCOL_BINARY_CMD_SET, + sizeof(rep->message.body), + keylen, + bodylen); + rep->message.body.flags= 0; + rep->message.body.expiration= htonl((uint32_t)item->exp_time); + ms_add_iov(c, &rep->message.body, sizeof(rep->message.body)); + ms_add_key_to_iov(c, item); + + if (item->value_offset == INVALID_OFFSET) + { + value_offset= item->key_suffix_offset; + } + else + { + value_offset= item->value_offset; + } + ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_set */ + + +/** + * for binary protocol, this function build the get command and + * add the command to send buffer array. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) +{ + assert(c->wbuf == c->wcurr); + + ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, + (uint32_t)item->key_size); + ms_add_key_to_iov(c, item); + + return EXIT_SUCCESS; +} /* ms_build_bin_write_buf_get */ + + +/** + * for binary protocol, this function build the multi-get + * command and add the command to send buffer array. 
+ *
+ * One GET request (header followed by key) is queued per item of the
+ * multi-get task.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return int, always EXIT_SUCCESS (the helpers it calls return void)
+ */
+static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
+{
+  ms_task_item_t *item;
+
+  assert(c->wbuf == c->wcurr);
+
+  for (int i= 0; i < c->mlget_task.mlget_num; i++)
+  {
+    item= c->mlget_task.mlget_item[i].item;
+    assert(item != NULL);
+
+    ms_add_bin_header(c,
+                      PROTOCOL_BINARY_CMD_GET,
+                      0,
+                      (uint16_t)item->key_size,
+                      (uint32_t)item->key_size);
+    ms_add_key_to_iov(c, item);
+    c->wcurr+= sizeof(protocol_binary_request_get);
+  }
+
+  c->wcurr= c->wbuf;
+
+  return EXIT_SUCCESS;
+} /* ms_build_bin_write_buf_mlget */
diff --git a/src/bin/memaslap/ms_conn.h b/src/bin/memaslap/ms_conn.h
new file mode 100644
index 00000000..d0530a7c
--- /dev/null
+++ b/src/bin/memaslap/ms_conn.h
@@ -0,0 +1,241 @@
+/*
+ * File:   ms_conn.h
+ * Author: Mingqiang Zhuang
+ *
+ * Created on February 10, 2009
+ *
+ * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+#ifndef MS_CONN_H
+#define MS_CONN_H
+
+#include
+#include
+#include
+#include
+
+#include "ms_task.h"
+#include "libmemcachedprotocol-0.0/binary.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define DATA_BUFFER_SIZE             (1024 * 1024 + 2048) /* read buffer, 1M + 2k, enough for the max value(1M) */
+#define WRITE_BUFFER_SIZE            (32 * 1024)          /* write buffer, 32k */
+#define UDP_DATA_BUFFER_SIZE         (1 * 1024 * 1024)    /* read buffer for UDP, 1M */
+#define UDP_MAX_PAYLOAD_SIZE         1400                 /* server limit UDP payload size */
+#define UDP_MAX_SEND_PAYLOAD_SIZE    1400                 /* mtu size is 1500 */
+#define UDP_HEADER_SIZE              8                    /* UDP header size */
+#define MAX_SENDBUF_SIZE             (256 * 1024 * 1024)  /* Maximum socket buffer size */
+#define SOCK_WAIT_TIMEOUT            30                   /* maximum waiting time of UDP, 30s */
+#define MAX_UDP_PACKET               (1 << 16)            /* maximum UDP packets, 65536 */
+
+/* Initial size of the sendmsg() scatter/gather array.
*/ +#define IOV_LIST_INITIAL 400 + +/* Initial number of sendmsg() argument structures to allocate. */ +#define MSG_LIST_INITIAL 10 + +/* High water marks for buffer shrinking */ +#define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE) +#define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE) +#define IOV_LIST_HIGHWAT 600 +#define MSG_LIST_HIGHWAT 100 + +/* parse udp header */ +#define HEADER_TO_REQID(ptr) ((uint16_t)*ptr * 256 \ + + (uint16_t)*(ptr + 1)) +#define HEADER_TO_SEQNUM(ptr) ((uint16_t)*(ptr \ + + 2) * 256 \ + + (uint16_t)*(ptr + 3)) +#define HEADER_TO_PACKETS(ptr) ((uint16_t)*(ptr \ + + 4) * 256 \ + + (uint16_t)*(ptr + 5)) + +/* states of connection */ +enum conn_states +{ + conn_read, /* reading in a command line */ + conn_write, /* writing out a simple response */ + conn_closing /* closing this connection */ +}; + +/* returned states of memcached command */ +enum mcd_ret +{ + MCD_SUCCESS, /* command success */ + MCD_FAILURE, /* command failure */ + MCD_UNKNOWN_READ_FAILURE, /* unknown read failure */ + MCD_PROTOCOL_ERROR, /* protocol error */ + MCD_CLIENT_ERROR, /* client error, wrong command */ + MCD_SERVER_ERROR, /* server error, server run command failed */ + MCD_DATA_EXISTS, /* object is existent in server */ + MCD_NOTSTORED, /* server doesn't set the object successfully */ + MCD_STORED, /* server set the object successfully */ + MCD_NOTFOUND, /* server not find the object */ + MCD_END, /* end of the response of get command */ + MCD_DELETED, /* server delete the object successfully */ + MCD_STAT /* response of stats command */ +}; + +/* used to store the current or previous running command state */ +typedef struct cmdstat +{ + int cmd; /* command name */ + int retstat; /* return state of this command */ + bool isfinish; /* if it read all the response data */ + uint64_t key_prefix; /* key prefix */ +} ms_cmdstat_t; + +/* udp packet structure */ +typedef struct udppkt +{ + uint8_t *header; /* udp header of the packet */ + char *data; /* udp data of the 
packet */ + int rbytes; /* number of data in the packet */ + int copybytes; /* number of copied data in the packet */ +} ms_udppkt_t; + +/* three protocols supported */ +enum protocol +{ + ascii_prot = 3, /* ASCII protocol */ + binary_prot /* binary protocol */ +}; + +/** + * concurrency structure + * + * Each thread has a libevent to manage the events of network. + * Each thread has one or more self-governed concurrencies; + * each concurrency has one or more socket connections. This + * concurrency structure includes all the private variables of + * the concurrency. + */ +typedef struct conn +{ + uint32_t conn_idx; /* connection index in the thread */ + int sfd; /* current tcp sock handler of the connection structure */ + int udpsfd; /* current udp sock handler of the connection structure*/ + int state; /* state of the connection */ + struct event event; /* event for libevent */ + short ev_flags; /* event flag for libevent */ + short which; /* which events were just triggered */ + bool change_sfd; /* whether change sfd */ + + int *tcpsfd; /* TCP sock array */ + uint32_t total_sfds; /* how many socks in the tcpsfd array */ + uint32_t alive_sfds; /* alive socks */ + uint32_t cur_idx; /* current sock index in tcpsfd array */ + + ms_cmdstat_t precmd; /* previous command state */ + ms_cmdstat_t currcmd; /* current command state */ + + char *rbuf; /* buffer to read commands into */ + char *rcurr; /* but if we parsed some already, this is where we stopped */ + int rsize; /* total allocated size of rbuf */ + int rbytes; /* how much data, starting from rcur, do we have unparsed */ + + bool readval; /* read value state, read known data size */ + int rvbytes; /* total value size need to read */ + + char *wbuf; /* buffer to write commands out */ + char *wcurr; /* for multi-get, where we stopped */ + int wsize; /* total allocated size of wbuf */ + bool ctnwrite; /* continue to write */ + + /* data for the mwrite state */ + struct iovec *iov; + int iovsize; /* number of 
elements allocated in iov[] */ + int iovused; /* number of elements used in iov[] */ + + struct msghdr *msglist; + int msgsize; /* number of elements allocated in msglist[] */ + int msgused; /* number of elements used in msglist[] */ + int msgcurr; /* element in msglist[] being transmitted now */ + int msgbytes; /* number of bytes in current msg */ + + /* data for UDP clients */ + bool udp; /* is this is a UDP "connection" */ + uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */ + uint8_t *hdrbuf; /* udp packet headers */ + int hdrsize; /* number of headers' worth of space is allocated */ + struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */ + socklen_t srv_recv_addr_size; + + /* udp read buffer */ + char *rudpbuf; /* buffer to read commands into for udp */ + int rudpsize; /* total allocated size of rudpbuf */ + int rudpbytes; /* how much data, starting from rudpbuf */ + + /* order udp packet */ + ms_udppkt_t *udppkt; /* the offset of udp packet in rudpbuf */ + int packets; /* number of total packets need to read */ + int recvpkt; /* number of received packets */ + int pktcurr; /* current packet in rudpbuf being ordered */ + int ordcurr; /* current ordered packet */ + + ms_task_item_t *item_win; /* task sequence */ + int win_size; /* current task window size */ + uint64_t set_cursor; /* current set item index in the item window */ + ms_task_t curr_task; /* current running task */ + ms_mlget_task_t mlget_task; /* multi-get task */ + + int warmup_num; /* to run how many warm up operations*/ + int remain_warmup_num; /* left how many warm up operations to run */ + int64_t exec_num; /* to run how many task operations */ + int64_t remain_exec_num; /* how many remained task operations to run */ + + /* response time statistic and time out control */ + struct timeval start_time; /* start time of current operation(s) */ + struct timeval end_time; /* end time of current operation(s) */ + + /* Binary 
protocol stuff */ + protocol_binary_response_header binary_header; /* local temporary binary header */ + enum protocol protocol; /* which protocol this connection speaks */ +} ms_conn_t; + +/* used to generate the key prefix */ +uint64_t ms_get_key_prefix(void); + + +/** + * setup a connection, each connection structure of each + * thread must call this function to initialize. + */ +int ms_setup_conn(ms_conn_t *c); + + +/* after one operation completes, reset the connection */ +void ms_reset_conn(ms_conn_t *c, bool timeout); + + +/** + * reconnect several disconnected socks in the connection + * structure, the ever-1-second timer of the thread will check + * whether some socks in the connections disconnect. if + * disconnect, reconnect the sock. + */ +int ms_reconn_socks(ms_conn_t *c); + + +/* used to send set command to server */ +int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); + + +/* used to send the get command to server */ +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); + + +/* used to send the multi-get command to server */ +int ms_mcd_mlget(ms_conn_t *c); + + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_CONN_H */ diff --git a/src/bin/memaslap/ms_memslap.h b/src/bin/memaslap/ms_memslap.h new file mode 100644 index 00000000..72af301a --- /dev/null +++ b/src/bin/memaslap/ms_memslap.h @@ -0,0 +1,133 @@ +/* + * File: ms_memslap.h + * Author: Mingqiang Zhuang + * + * Created on February 10, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. 
+ * http://www.schoonerinfotech.com/ + * + */ +#ifndef MS_MEMSLAP_H +#define MS_MEMSLAP_H + +#include +#include +#include +#include +#include +#include +#include +#include +#if !defined(__cplusplus) +# include +#endif +#include + +#include "ms_stats.h" +#include "ms_atomic.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* command line option */ +typedef enum +{ + OPT_VERSION= 'V', + OPT_HELP= 'h', + OPT_UDP= 'U', + OPT_SERVERS= 's', + OPT_EXECUTE_NUMBER= 'x', + OPT_THREAD_NUMBER= 'T', + OPT_CONCURRENCY= 'c', + OPT_FIXED_LTH= 'X', + OPT_VERIFY= 'v', + OPT_GETS_DIVISION= 'd', + OPT_TIME= 't', + OPT_CONFIG_CMD= 'F', + OPT_WINDOW_SIZE= 'w', + OPT_EXPIRE= 'e', + OPT_STAT_FREQ= 'S', + OPT_RECONNECT= 'R', + OPT_VERBOSE= 'b', + OPT_FACEBOOK_TEST= 'a', + OPT_SOCK_PER_CONN= 'n', + OPT_BINARY_PROTOCOL= 'B', + OPT_OVERWRITE= 'o', + OPT_TPS= 'P', + OPT_REP_WRITE_SRV= 'p' +} ms_options_t; + +/* global statistic of response time */ +typedef struct statistic +{ + pthread_mutex_t stat_mutex; /* synchronize the following members */ + + ms_stat_t get_stat; /* statistics of get command */ + ms_stat_t set_stat; /* statistics of set command */ + ms_stat_t total_stat; /* statistics of both get and set commands */ +} ms_statistic_t; + +/* global status statistic structure */ +typedef struct stats +{ + ATOMIC uint32_t active_conns; /* active connections */ + ATOMIC size_t bytes_read; /* read bytes */ + ATOMIC size_t bytes_written; /* written bytes */ + ATOMIC size_t obj_bytes; /* object bytes */ + ATOMIC size_t pre_cmd_get; /* previous total get command count */ + ATOMIC size_t pre_cmd_set; /* previous total set command count */ + ATOMIC size_t cmd_get; /* current total get command count */ + ATOMIC size_t cmd_set; /* current total set command count */ + ATOMIC size_t get_misses; /* total objects of get miss */ + ATOMIC size_t vef_miss; /* total objects of verification miss */ + ATOMIC size_t vef_failed; /* total objects of verification failed */ + ATOMIC size_t unexp_unget; /* total 
objects which is unexpired but not get */ + ATOMIC size_t exp_get; /* total objects which is expired but get */ + ATOMIC size_t pkt_disorder; /* disorder packages of UDP */ + ATOMIC size_t pkt_drop; /* packages dropped of UDP */ + ATOMIC size_t udp_timeout; /* how many times timeout of UDP happens */ +} ms_stats_t; + +/* lock adapter */ +typedef struct sync_lock +{ + uint32_t count; + pthread_mutex_t lock; + pthread_cond_t cond; +} ms_sync_lock_t; + +/* global variable structure */ +typedef struct global +{ + /* synchronize lock */ + ms_sync_lock_t init_lock; + ms_sync_lock_t warmup_lock; + ms_sync_lock_t run_lock; + + /* mutex for outputing error log synchronously when memslap crashes */ + pthread_mutex_t quit_mutex; + + /* mutex for generating key prefix */ + pthread_mutex_t seq_mutex; + + /* global synchronous flags for slap mode */ + bool finish_warmup; + bool time_out; +} ms_global_t; + +/* global structure */ +extern ms_global_t ms_global; + +/* global stats information structure */ +extern ms_stats_t ms_stats; + +/* global statistic structure */ +extern ms_statistic_t ms_statistic; + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_MEMSLAP_H */ diff --git a/src/bin/memaslap/ms_setting.c b/src/bin/memaslap/ms_setting.c new file mode 100644 index 00000000..6cb367a3 --- /dev/null +++ b/src/bin/memaslap/ms_setting.c @@ -0,0 +1,1068 @@ +/* + * File: ms_setting.c + * Author: Mingqiang Zhuang + * + * Created on February 10, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. 
+ * http://www.schoonerinfotech.com/ + * + */ + +#include "mem_config.h" + +#include "libmemcached/memcached.h" + +#include +#include +#include +#include +#include +#include +#include + + + +#include "ms_setting.h" +#include "ms_conn.h" + +#define MAX_EXEC_NUM 0x4000000000000000 /* 1 << 62 */ +#define ADDR_ALIGN(addr) ((addr + 15) & ~(16 - 1)) /* 16 bytes aligned */ +#define RAND_CHAR_SIZE (10 * 1024 * 1024) /* 10M character table */ +#define RESERVED_RAND_CHAR_SIZE (2 * 1024 * 1024) /* reserved 2M to avoid pointer sloping over */ + +#define DEFAULT_CONFIG_NAME ".memslap.cnf" + +#define DEFAULT_THREADS_NUM 1 /* default start one thread */ +#define DEFAULT_CONNS_NUM 16 /* default each thread with 16 connections */ +#define DEFAULT_EXE_NUM 0 /* default execute number is 0 */ +#define DEFAULT_VERIFY_RATE 0.0 /* default it doesn't do data verification */ +#define DEFAULT_OVERWRITE_RATE 0.0 /* default it doesn't do overwrite */ +#define DEFAULT_DIV 1 /* default it runs single get */ +#define DEFAULT_RUN_TIME 600 /* default run time 10 minutes */ +#define DEFAULT_WINDOW_SIZE (10 * UNIT_ITEMS_COUNT) /* default window size is 10k */ +#define DEFAULT_SOCK_PER_CONN 1 /* default socks per connection is 1 */ + +/* Use this for string generation */ +#define CHAR_COUNT 64 /* number of characters used to generate character table */ +const char ALPHANUMBERICS[]= + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.-"; + +ms_setting_st ms_setting; /* store the settings specified by user */ + + +/* read setting from configuration file */ +static void ms_get_serverlist(char *str); +static uint32_t ms_get_cpu_count(void); +ms_conf_type_t ms_get_conf_type(char *line); +static int ms_is_line_data(char *line); +static int ms_read_is_data(char *line, ssize_t nread); +static void ms_no_config_file(void); +static void ms_parse_cfg_file(char *cfg_file); + + +/* initialize setting structure */ +static void ms_init_random_block(void); +static void ms_calc_avg_size(void); +static 
int ms_shuffle_distr(ms_distr_t *distr, int length); +static void ms_build_distr(void); +static void ms_print_setting(void); +static void ms_setting_slapmode_init_pre(void); +static void ms_setting_slapmode_init_post(void); + +#if !defined(HAVE_GETLINE) +#include +static ssize_t getline (char **line, size_t *line_size, FILE *fp) +{ + char delim= '\n'; + ssize_t result= 0; + size_t cur_len= 0; + + if (line == NULL || line_size == NULL || fp == NULL) + { + errno = EINVAL; + return -1; + } + + if (*line == NULL || *line_size == 0) + { + char *new_line; + *line_size = 120; + new_line= (char *) realloc (*line, *line_size); + if (new_line == NULL) + { + result= -1; + return result; + } + *line= new_line; + } + + for (;;) + { + int i= getc(fp); + if (i == EOF) + { + result = -1; + break; + } + + /* Make enough space for len+1 (for final NUL) bytes. */ + if (cur_len + 1 >= *line_size) + { + size_t needed_max= + SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX; + size_t needed= (2 * (*line_size)) + 1; + char *new_line; + + if (needed_max < needed) + needed= needed_max; + if (cur_len + 1 >= needed) + { + result= -1; + errno= EOVERFLOW; + return result; + } + + new_line= (char *)realloc(*line, needed); + if (new_line == NULL) + { + result= -1; + return result; + } + + *line= new_line; + *line_size= needed; + } + + (*line)[cur_len]= (char)i; + cur_len++; + + if (i == delim) + break; + } + (*line)[cur_len] = '\0'; + if (cur_len != 0) + return (ssize_t)cur_len; + return result; +} +#endif + +/** + * parse the server list string, and build the servers + * information structure array. this function is used to parse + * the command line options specified by user. + * + * @param str, the string of server list + */ +static void ms_get_serverlist(char *str) +{ + ms_mcd_server_t *srvs= NULL; + + /** + * Servers list format is like this. 
For example: + * "localhost:11108, localhost:11109" + */ + memcached_server_st *server_pool; + server_pool = memcached_servers_parse(str); + + for (uint32_t loop= 0; loop < memcached_server_list_count(server_pool); loop++) + { + assert(ms_setting.srv_cnt < ms_setting.total_srv_cnt); + snprintf(ms_setting.servers[ms_setting.srv_cnt].srv_host_name, sizeof(ms_setting.servers[ms_setting.srv_cnt].srv_host_name), "%s", server_pool[loop].hostname); /* bounded copy: srv_host_name is a fixed-size array, truncate instead of overflowing */ + ms_setting.servers[ms_setting.srv_cnt].srv_port= server_pool[loop].port; + ms_setting.servers[ms_setting.srv_cnt].disconn_cnt= 0; + ms_setting.servers[ms_setting.srv_cnt].reconn_cnt= 0; + ms_setting.srv_cnt++; + + if (ms_setting.srv_cnt >= ms_setting.total_srv_cnt) /* array is full: double its capacity before the next entry */ + { + srvs= (ms_mcd_server_t *)realloc( ms_setting.servers, + (size_t)ms_setting.total_srv_cnt * sizeof(ms_mcd_server_t) * 2); + if (srvs == NULL) + { + fprintf(stderr, "Can't reallocate servers structure.\n"); + exit(1); + } + ms_setting.servers= srvs; + ms_setting.total_srv_cnt*= 2; + } + } + + memcached_server_free(server_pool); +} /* ms_get_serverlist */ + + +/** + * used to get the CPU count of the current system + * + * @return the cpu count if it can be determined, else EXIT_FAILURE (used as the fallback count) + */ +static uint32_t ms_get_cpu_count() +{ +#ifdef HAVE__SC_NPROCESSORS_ONLN + return sysconf(_SC_NPROCESSORS_CONF); + +#else +# ifdef HAVE_CPU_SET_T + int cpu_count= 0; + cpu_set_t cpu_set; + + sched_getaffinity(0, sizeof(cpu_set_t), &cpu_set); + + for (int i= 0; i < (sizeof(cpu_set_t) * 8); i++) + { + if (CPU_ISSET(i, &cpu_set)) + { + cpu_count++; + } + } + + return cpu_count; + +# endif +#endif + + /* the system with one cpu at least */ + return EXIT_FAILURE; +} /* ms_get_cpu_count */ + + +/** + * used to get the configure type based on the type string read + * from the configuration file. + * + * @param line, string of one line + * + * @return ms_conf_type_t + */ +ms_conf_type_t ms_get_conf_type(char *line) +{ + if (! memcmp(line, "key", strlen("key"))) + { + return CONF_KEY; + } + else if (!
memcmp(line, "value", strlen("value"))) + { + return CONF_VALUE; + } + else if (! memcmp(line, "cmd", strlen("cmd"))) + { + return CONF_CMD; + } + else + { + return CONF_NULL; + } +} /* ms_get_conf_type */ + + +/** + * judge whether the line is a line with useful data. used to + * parse the configuration file. + * + * @param line, string of one line + * + * @return if success, return EXIT_FAILURE, else return EXIT_SUCCESS + */ +static int ms_is_line_data(char *line) +{ + assert(line != NULL); + + char *begin_ptr= line; + + while (isspace(*begin_ptr)) + { + begin_ptr++; + } + if ((begin_ptr[0] == '\0') || (begin_ptr[0] == '#')) + return EXIT_SUCCESS; + + return EXIT_FAILURE; +} /* ms_is_line_data */ + + +/** + * function to bypass blank line and comments + * + * @param line, string of one line + * @param nread, length of the line + * + * @return if it's EOF or not line data, return EXIT_SUCCESS, else return EXIT_FAILURE + */ +static int ms_read_is_data(char *line, ssize_t nread) +{ + if ((nread == EOF) || ! ms_is_line_data(line)) + return EXIT_SUCCESS; + + return EXIT_FAILURE; +} /* ms_read_is_data */ + + +/** + * if no configuration file, use this function to create the default + * configuration file. 
+ */ +static void ms_no_config_file() +{ + char userpath[PATH_MAX]; + struct passwd *usr= NULL; + FILE *fd; + + usr= getpwuid(getuid()); + if (usr == NULL) { fprintf(stderr, "Could not determine the home directory.\n"); exit(1); } /* getpwuid() returns NULL when the lookup fails */ + snprintf(userpath, PATH_MAX, "%s/%s", usr->pw_dir, DEFAULT_CONFIG_NAME); + + if (access (userpath, F_OK | R_OK) == 0) /* a readable config already exists: reuse it */ + goto exit; + + fd= fopen(userpath, "w+"); + + if (fd == NULL) + { + fprintf(stderr, "Could not create default configure file %s\n", userpath); + perror("fopen"); + exit(1); + } + fprintf(fd, "%s", DEFAULT_CONGIF_STR); + fclose(fd); + +exit: + ms_setting.cfg_file= strdup(userpath); +} /* ms_no_config_file */ + + +/** + * parse the configuration file + * + * @param cfg_file, the configuration file name; when NULL the default file is created/used + */ +static void ms_parse_cfg_file(char *cfg_file) +{ + FILE *f; + size_t start_len, end_len; + double proportion; + char *line= NULL; + size_t read_len= 0; + ssize_t nread; + int cmd_type; + ms_conf_type_t conf_type; + int end_of_file= 0; + ms_key_distr_t *key_distr= NULL; + ms_value_distr_t *val_distr= NULL; + + if (cfg_file == NULL) + { + ms_no_config_file(); + cfg_file= ms_setting.cfg_file; + } + + /*read key value configure file*/ + if ((f= fopen(cfg_file, "r")) == NULL) + { + fprintf(stderr, "Can not open file: '%s'.\n", cfg_file); + exit(1); + } + + while (1) + { + if ((((nread= getline(&line, &read_len, f)) == 1) + || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ + continue; + + if (nread == EOF) + { + fprintf(stderr, "Bad configuration file, no configuration find.\n"); + exit(1); + } + conf_type= ms_get_conf_type(line); + break; + } + + while (! end_of_file) + { + switch (conf_type) + { + case CONF_KEY: + while (1) + { + if ((((nread= getline(&line, &read_len, f)) == 1) + || !
ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ + continue; + + if (nread != EOF) + { + if (sscanf(line, "%zu %zu %lf ", &start_len, + &end_len, &proportion) != 3) + { + conf_type= ms_get_conf_type(line); + break; + } + ms_setting.key_distr[ms_setting.key_rng_cnt].start_len= start_len; + ms_setting.key_distr[ms_setting.key_rng_cnt].end_len= end_len; + ms_setting.key_distr[ms_setting.key_rng_cnt].key_prop= proportion; + ms_setting.key_rng_cnt++; + + if (ms_setting.key_rng_cnt >= ms_setting.total_key_rng_cnt) + { + key_distr= (ms_key_distr_t *)realloc( + ms_setting.key_distr, + (size_t)ms_setting. + total_key_rng_cnt * sizeof(ms_key_distr_t) * 2); + if (key_distr == NULL) + { + fprintf(stderr, + "Can't reallocate key distribution structure.\n"); + exit(1); + } + ms_setting.key_distr= key_distr; + ms_setting.total_key_rng_cnt*= 2; + } + continue; + } + end_of_file= 1; + break; + } + break; + + case CONF_VALUE: + while (1) + { + if ((((nread= getline(&line, &read_len, f)) == 1) + || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ + continue; + + if (nread != EOF) + { + if (sscanf(line, "%zu %zu %lf", &start_len, &end_len, + &proportion) != 3) + { + conf_type= ms_get_conf_type(line); + break; + } + ms_setting.value_distr[ms_setting.val_rng_cnt].start_len= + start_len; + ms_setting.value_distr[ms_setting.val_rng_cnt].end_len= end_len; + ms_setting.value_distr[ms_setting.val_rng_cnt].value_prop= + proportion; + ms_setting.val_rng_cnt++; + + if (ms_setting.val_rng_cnt >= ms_setting.total_val_rng_cnt) + { + val_distr= (ms_value_distr_t *)realloc( + ms_setting.value_distr, + (size_t)ms_setting. 
+ total_val_rng_cnt * sizeof(ms_value_distr_t) * 2); + if (val_distr == NULL) + { + fprintf(stderr, + "Can't reallocate value distribution structure.\n"); + exit(1); + } + ms_setting.value_distr= val_distr; + ms_setting.total_val_rng_cnt*= 2; + } + continue; + } + end_of_file= 1; + break; + } + break; + + case CONF_CMD: + while (1) + { + if ((((nread= getline(&line, &read_len, f)) == 1) + || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ + continue; + + if (nread != EOF) + { + if (sscanf(line, "%d %lf", &cmd_type, &proportion) != 2) + { + conf_type= ms_get_conf_type(line); + break; + } + if ((cmd_type < 0) || (cmd_type >= CMD_NULL) || (ms_setting.cmd_used_count >= CMD_NULL)) /* skip unknown commands and never write past cmd_distr[CMD_NULL] */ + { + continue; + } + ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_type= + cmd_type; + ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_prop= + proportion; + ms_setting.cmd_used_count++; + continue; + } + end_of_file= 1; + break; + } + break; /* do not fall through: CONF_NULL would swallow the section that follows "cmd" */ + case CONF_NULL: + while (1) + { + if ((((nread= getline(&line, &read_len, f)) == 1) + || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ + continue; + + if (nread != EOF) + { + if ((conf_type= ms_get_conf_type(line)) != CONF_NULL) + { + break; + } + continue; + } + end_of_file= 1; + break; + } + break; + + default: + assert(0); + break; + } /* switch */ + } + + fclose(f); + + if (line != NULL) + { + free(line); + } +} /* ms_parse_cfg_file */ + + +/* calculate the average size of key and value */ +static void ms_calc_avg_size() +{ + double avg_val_size= 0.0; + double avg_key_size= 0.0; + double val_pro= 0.0; + double key_pro= 0.0; + double averge_len= 0.0; + size_t start_len= 0; + size_t end_len= 0; + + for (int j= 0; j < ms_setting.val_rng_cnt; j++) + { + val_pro= ms_setting.value_distr[j].value_prop; + start_len= ms_setting.value_distr[j].start_len; + end_len= ms_setting.value_distr[j].end_len; + + averge_len= val_pro * ((double)(start_len + end_len)) / 2; + avg_val_size+= averge_len; + } + + for (int j= 0; j < ms_setting.key_rng_cnt; j++) + { + key_pro=
ms_setting.key_distr[j].key_prop; + start_len= ms_setting.key_distr[j].start_len; + end_len= ms_setting.key_distr[j].end_len; + + averge_len= key_pro * ((double)(start_len + end_len)) / 2; + avg_key_size+= averge_len; + } + + ms_setting.avg_val_size= (size_t)avg_val_size; + ms_setting.avg_key_size= (size_t)avg_key_size; +} /* ms_calc_avg_size */ + + +/** + * used to shuffle key and value distribution array to ensure + * (key, value) pair with different set. + * + * @param distr, pointer of distribution structure array + * @param length, length of the array + * + * @return always return EXIT_SUCCESS + */ +static int ms_shuffle_distr(ms_distr_t *distr, int length) +{ + int i, j; + int tmp_offset; + size_t tmp_size; + int64_t rnd; + + for (i= 0; i < length; i++) + { + rnd= random(); + j= (int)(rnd % (length - i)) + i; + + switch (rnd % 3) + { + case 0: + tmp_size= distr[j].key_size; + distr[j].key_size= distr[i].key_size; + distr[i].key_size= tmp_size; + break; + + case 1: + tmp_offset= distr[j].key_offset; + distr[j].key_offset= distr[i].key_offset; + distr[i].key_offset= tmp_offset; + break; + + case 2: + tmp_size= distr[j].value_size; + distr[j].value_size= distr[i].value_size; + distr[i].value_size= tmp_size; + break; + + default: + break; + } /* switch */ + } + + return EXIT_SUCCESS; +} /* ms_shuffle_distr */ + + +/** + * according to the key and value distribution, to build the + * (key, value) pair distribution. the (key, value) pair + * distribution array is global, each connection set or get + * object keeping this distribution, for the final result, we + * can reach the expected key and value distribution. 
+ */ +static void ms_build_distr() +{ + int offset= 0; + int end= 0; + int key_cnt= 0; + int value_cnt= 0; + size_t average_len= 0; + size_t diff_len= 0; + size_t start_len= 0; + size_t end_len= 0; + int rnd= 0; + ms_distr_t *distr= NULL; + int units= (int)ms_setting.win_size / UNIT_ITEMS_COUNT; + + /* calculate average value size and key size */ + ms_calc_avg_size(); + + ms_setting.char_blk_size= RAND_CHAR_SIZE; + int key_scope_size= + (int)((ms_setting.char_blk_size - RESERVED_RAND_CHAR_SIZE) + / UNIT_ITEMS_COUNT); + + ms_setting.distr= (ms_distr_t *)malloc( + sizeof(ms_distr_t) * ms_setting.win_size); + if (ms_setting.distr == NULL) + { + fprintf(stderr, "Can't allocate distribution array."); + exit(1); + } + + /** + * character block is divided by how many different key + * size, each different key size has the same size character + * range. + */ + for (int m= 0; m < units; m++) + { + for (int i= 0; i < UNIT_ITEMS_COUNT; i++) + { + ms_setting.distr[m * UNIT_ITEMS_COUNT + i].key_offset= + ADDR_ALIGN(key_scope_size * i); + } + } + + /* initialize key size distribution */ + for (int m= 0; m < units; m++) + { + for (int j= 0; j < ms_setting.key_rng_cnt; j++) + { + key_cnt= (int)(UNIT_ITEMS_COUNT * ms_setting.key_distr[j].key_prop); + start_len= ms_setting.key_distr[j].start_len; + end_len= ms_setting.key_distr[j].end_len; + if ((start_len < MIN_KEY_SIZE) || (end_len < MIN_KEY_SIZE)) + { + fprintf(stderr, "key length must be greater than 16 bytes.\n"); + exit(1); + } + + if (! ms_setting.binary_prot_ + && ((start_len > MAX_KEY_SIZE) || (end_len > MAX_KEY_SIZE))) + { + fprintf(stderr, "key length must be less than 250 bytes.\n"); + exit(1); + } + + average_len= (start_len + end_len) / 2; + diff_len= (end_len - start_len) / 2; + for (int k= 0; k < key_cnt; k++) + { + if (offset >= (m + 1) * UNIT_ITEMS_COUNT) + { + break; + } + rnd= (int)random(); + if (k % 2 == 0) + { + ms_setting.distr[offset].key_size= + (diff_len == 0) ? 
average_len : + average_len + (size_t)rnd + % diff_len; + } + else + { + ms_setting.distr[offset].key_size= + (diff_len == 0) ? average_len : + average_len - (size_t)rnd + % diff_len; + } + offset++; + } + } + + if (offset < (m + 1) * UNIT_ITEMS_COUNT) /* pad the rest of this unit with the average key size */ + { + end= (m + 1) * UNIT_ITEMS_COUNT - offset; + for (int i= 0; i < end; i++) + { + ms_setting.distr[offset].key_size= ms_setting.avg_key_size; + offset++; + } + } + } + offset= 0; + + /* initialize value distribution */ + if (ms_setting.fixed_value_size != 0) + { + for (int i= 0; i < units * UNIT_ITEMS_COUNT; i++) + { + ms_setting.distr[i].value_size= ms_setting.fixed_value_size; + } + } + else + { + for (int m= 0; m < units; m++) + { + for (int j= 0; j < ms_setting.val_rng_cnt; j++) + { + value_cnt= + (int)(UNIT_ITEMS_COUNT * ms_setting.value_distr[j].value_prop); + start_len= ms_setting.value_distr[j].start_len; + end_len= ms_setting.value_distr[j].end_len; + if ((start_len <= 0) || (end_len <= 0)) + { + fprintf(stderr, "value length must be greater than 0 bytes.\n"); + exit(1); + } + + if ((start_len > MAX_VALUE_SIZE) || (end_len > MAX_VALUE_SIZE)) + { + fprintf(stderr, "value length must be less than or equal to 1M.\n"); + exit(1); + } + + average_len= (start_len + end_len) / 2; + diff_len= (end_len - start_len) / 2; + for (int k= 0; k < value_cnt; k++) + { + if (offset >= (m + 1) * UNIT_ITEMS_COUNT) + { + break; + } + rnd= (int)random(); + if (k % 2 == 0) /* alternate above/below the average to spread sizes across the range */ + { + ms_setting.distr[offset].value_size= + (diff_len == 0) ? average_len : + average_len + + (size_t)rnd % diff_len; + } + else + { + ms_setting.distr[offset].value_size= + (diff_len == 0) ?
average_len : + average_len + - (size_t)rnd % diff_len; + } + offset++; + } + } + + if (offset < (m + 1) * UNIT_ITEMS_COUNT) + { + end= (m + 1) * UNIT_ITEMS_COUNT - offset; + for (int i= 0; i < end; i++) + { + ms_setting.distr[offset++].value_size= ms_setting.avg_val_size; + } + } + } + } + + /* shuffle distribution */ + for (int i= 0; i < units; i++) + { + distr= &ms_setting.distr[i * UNIT_ITEMS_COUNT]; + for (int j= 0; j < 4; j++) + { + ms_shuffle_distr(distr, UNIT_ITEMS_COUNT); + } + } +} /* ms_build_distr */ + + +/** + * used to initialize the global character block. The character + * block is used to generate the suffix of the key and value. we + * only store a pointer in the character block for each key + * suffix or value string. It can save much memory to store key + * or value string. + */ +static void ms_init_random_block() +{ + char *ptr= NULL; + + assert(ms_setting.char_blk_size > 0); + + ms_setting.char_block= (char *)malloc(ms_setting.char_blk_size); + if (ms_setting.char_block == NULL) + { + fprintf(stderr, "Can't allocate global char block."); + exit(1); + } + ptr= ms_setting.char_block; + + for (int i= 0; (size_t)i < ms_setting.char_blk_size; i++) + { + *(ptr++)= ALPHANUMBERICS[random() % CHAR_COUNT]; + } +} /* ms_init_random_block */ + + +/** + * after initialization, call this function to output the main + * configuration user specified. 
+ */ +static void ms_print_setting() +{ + fprintf(stdout, "servers : %s\n", ms_setting.srv_str); + fprintf(stdout, "threads count: %" PRIu32 "\n", ms_setting.nthreads); /* nthreads/nconns are uint32_t, so print them unsigned */ + fprintf(stdout, "concurrency: %" PRIu32 "\n", ms_setting.nconns); + if (ms_setting.run_time > 0) + { + fprintf(stdout, "run time: %ds\n", ms_setting.run_time); + } + else + { + fprintf(stdout, "execute number: %" PRId64 "\n", ms_setting.exec_num); + } + fprintf(stdout, "windows size: %" PRId64 "k\n", + (int64_t)(ms_setting.win_size / 1024)); + fprintf(stdout, "set proportion: set_prop=%.2f\n", + ms_setting.cmd_distr[CMD_SET].cmd_prop); + fprintf(stdout, "get proportion: get_prop=%.2f\n", + ms_setting.cmd_distr[CMD_GET].cmd_prop); + fflush(stdout); +} /* ms_print_setting */ + + +/** + * first part of slap mode initialization: reset the slap-mode fields to their defaults + */ +static void ms_setting_slapmode_init_pre() +{ + ms_setting.exec_num= DEFAULT_EXE_NUM; + ms_setting.verify_percent= DEFAULT_VERIFY_RATE; + ms_setting.exp_ver_per= DEFAULT_VERIFY_RATE; + ms_setting.overwrite_percent= DEFAULT_OVERWRITE_RATE; + ms_setting.mult_key_num= DEFAULT_DIV; + ms_setting.fixed_value_size= 0; + ms_setting.win_size= DEFAULT_WINDOW_SIZE; + ms_setting.udp= false; + ms_setting.reconnect= false; + ms_setting.verbose= false; + ms_setting.facebook_test= false; + ms_setting.binary_prot_= false; + ms_setting.stat_freq= 0; + ms_setting.srv_str= NULL; + ms_setting.cfg_file= NULL; + ms_setting.sock_per_conn= DEFAULT_SOCK_PER_CONN; + ms_setting.expected_tps= 0; + ms_setting.rep_write_srv= 0; +} /* ms_setting_slapmode_init_pre */ + + +/** + * first part of initialization of setting structure: zero it, set common defaults and allocate the servers array + */ +void ms_setting_init_pre() +{ + memset(&ms_setting, 0, sizeof(ms_setting)); + + /* common initialize */ + ms_setting.ncpu= ms_get_cpu_count(); + ms_setting.nthreads= DEFAULT_THREADS_NUM; + ms_setting.nconns= DEFAULT_CONNS_NUM; + ms_setting.run_time= DEFAULT_RUN_TIME; + ms_setting.total_srv_cnt= MCD_SRVS_NUM_INIT; + ms_setting.servers= (ms_mcd_server_t *)malloc( +
(size_t)ms_setting.total_srv_cnt + * sizeof(ms_mcd_server_t)); + if (ms_setting.servers == NULL) + { + fprintf(stderr, "Can't allocate servers structure.\n"); + exit(1); + } + + ms_setting_slapmode_init_pre(); +} /* ms_setting_init_pre */ + + +/** + * post part of slap mode initialization of setting structure + */ +static void ms_setting_slapmode_init_post() +{ + ms_setting.total_key_rng_cnt= KEY_RANGE_COUNT_INIT; + ms_setting.key_distr= + (ms_key_distr_t *)malloc((size_t)ms_setting.total_key_rng_cnt * sizeof(ms_key_distr_t)); + + if (ms_setting.key_distr == NULL) + { + fprintf(stderr, "Can't allocate key distribution structure.\n"); + exit(1); + } + + ms_setting.total_val_rng_cnt= VALUE_RANGE_COUNT_INIT; + + ms_setting.value_distr= + (ms_value_distr_t *)malloc((size_t)ms_setting.total_val_rng_cnt * sizeof( ms_value_distr_t)); + + if (ms_setting.value_distr == NULL) + { + fprintf(stderr, "Can't allocate value distribution structure.\n"); + exit(1); + } + + ms_parse_cfg_file(ms_setting.cfg_file); + + /* run time mode */ + if ((ms_setting.exec_num == 0) && (ms_setting.run_time != 0)) + { + ms_setting.exec_num= (int64_t)MAX_EXEC_NUM; + } + else + { + /* execute number mode */ + ms_setting.run_time= 0; + } + + if (ms_setting.rep_write_srv > 0) + { + /* for replication test, need enable reconnect feature */ + ms_setting.reconnect= true; + } + + if (ms_setting.facebook_test && (ms_setting.mult_key_num < 2)) + { + fprintf(stderr, "facebook test must work with multi-get, " + "please specify multi-get key number " + "with '--division' option.\n"); + exit(1); + } + + if (ms_setting.facebook_test && ms_setting.udp) + { + fprintf(stderr, "facebook test couldn't work with UDP.\n"); + exit(1); + } + + if (ms_setting.udp && (ms_setting.sock_per_conn > 1)) + { + fprintf(stderr, "UDP doesn't support multi-socks " + "in one connection structure.\n"); + exit(1); + } + + if ((ms_setting.rep_write_srv > 0) && (ms_setting.srv_cnt < 2)) + { + fprintf(stderr, "Please specify 2 servers at 
least for replication\n"); + exit(1); + } + + if ((ms_setting.rep_write_srv > 0) + && (ms_setting.srv_cnt < ms_setting.rep_write_srv)) + { + fprintf(stderr, "Servers to do replication writing " + "is larger than the total servers\n"); + exit(1); + } + + if (ms_setting.udp && (ms_setting.rep_write_srv > 0)) + { + fprintf(stderr, "UDP doesn't support replication.\n"); + exit(1); + } + + if (ms_setting.facebook_test && (ms_setting.rep_write_srv > 0)) + { + fprintf(stderr, "facebook test couldn't work with replication.\n"); + exit(1); + } + + ms_build_distr(); + + /* initialize global character block */ + ms_init_random_block(); + ms_print_setting(); +} /* ms_setting_slapmode_init_post */ + + +/** + * post part of initialization of setting structure + */ +void ms_setting_init_post() +{ + ms_get_serverlist(ms_setting.srv_str); + ms_setting_slapmode_init_post(); +} + + +/** + * clean up the global setting structure + */ +void ms_setting_cleanup() +{ + if (ms_setting.distr != NULL) + { + free(ms_setting.distr); + } + + if (ms_setting.char_block != NULL) + { + free(ms_setting.char_block); + } + + if (ms_setting.srv_str != NULL) + { + free(ms_setting.srv_str); + } + + if (ms_setting.cfg_file != NULL) + { + free(ms_setting.cfg_file); + } + + if (ms_setting.servers != NULL) + { + free(ms_setting.servers); + } + + if (ms_setting.key_distr != NULL) + { + free(ms_setting.key_distr); + } + + if (ms_setting.value_distr != NULL) + { + free(ms_setting.value_distr); + } +} /* ms_setting_cleanup */ diff --git a/src/bin/memaslap/ms_setting.h b/src/bin/memaslap/ms_setting.h new file mode 100644 index 00000000..9db956c9 --- /dev/null +++ b/src/bin/memaslap/ms_setting.h @@ -0,0 +1,181 @@ +/* + * File: ms_setting.h + * Author: Mingqiang Zhuang + * + * Created on February 10, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. 
+ * http://www.schoonerinfotech.com/ + * + */ +#ifndef MS_SETTING_H +#define MS_SETTING_H + +#include "ms_memslap.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define MCD_SRVS_NUM_INIT 8 +#define MCD_HOST_LENGTH 64 +#define KEY_RANGE_COUNT_INIT 8 +#define VALUE_RANGE_COUNT_INIT 8 +#define PROP_ERROR 0.001 + +#define MIN_KEY_SIZE 16 +#define MAX_KEY_SIZE 250 +#define MAX_VALUE_SIZE (1024 * 1024) + +/* the content of the configuration file for memslap running without configuration file */ +#define DEFAULT_CONGIF_STR \ + "key\n" \ + "64 64 1\n" \ + "value\n" \ + "1024 1024 1\n" \ + "cmd\n" \ + "0 0.1\n" \ + "1 0.9" + +/* Used to parse the value length return by server and path string */ +typedef struct token_s +{ + char *value; + size_t length; +} token_t; + +#define MAX_TOKENS 10 + +/* server information */ +typedef struct mcd_server +{ + char srv_host_name[MCD_HOST_LENGTH]; /* host name of server */ + int srv_port; /* server port */ + + /* for calculating how long the server disconnects */ + ATOMIC uint32_t disconn_cnt; /* number of disconnections count */ + ATOMIC uint32_t reconn_cnt; /* number of reconnections count */ + struct timeval disconn_time; /* start time of disconnection */ + struct timeval reconn_time; /* end time of reconnection */ +} ms_mcd_server_t; + +/* information of an item distribution including key and value */ +typedef struct distr +{ + size_t key_size; /* size of key */ + int key_offset; /* offset of one key in character block */ + size_t value_size; /* size of value */ +} ms_distr_t; + +/* information of key distribution */ +typedef struct key_distr +{ + size_t start_len; /* start of the key length range */ + size_t end_len; /* end of the key length range */ + double key_prop; /* key proportion */ +} ms_key_distr_t; + +/* information of value distribution */ +typedef struct value_distr +{ + size_t start_len; /* start of the value length range */ + size_t end_len; /* end of the value length range */ + double value_prop; /* value 
proportion */ +} ms_value_distr_t; + +/* memcached command types; CMD_NULL is the terminator and sizes cmd_distr[] below */ +typedef enum cmd_type +{ + CMD_SET, + CMD_GET, + CMD_NULL +} ms_cmd_type_t; + +/* section types in the configuration file; CONF_NULL means "not a section header" */ +typedef enum conf_type +{ + CONF_KEY, + CONF_VALUE, + CONF_CMD, + CONF_NULL +} ms_conf_type_t; + +/* information of command distribution */ +typedef struct cmd_distr +{ + ms_cmd_type_t cmd_type; /* command type */ + double cmd_prop; /* proportion of the command */ +} ms_cmd_distr_t; + +/* global setting structure */ +typedef struct setting +{ + uint32_t ncpu; /* cpu count of this system */ + uint32_t nthreads; /* total thread count, must equal or less than cpu cores */ + uint32_t nconns; /* total conn count, must multiply by total thread count */ + int64_t exec_num; /* total execute number */ + int run_time; /* total run time */ + + uint32_t char_blk_size; /* global character block size */ + char *char_block; /* global character block with random character */ + ms_distr_t *distr; /* distribution from configure file */ + + char *srv_str; /* string includes servers information */ + char *cfg_file; /* configure file name */ + + ms_mcd_server_t *servers; /* servers array */ + uint32_t total_srv_cnt; /* allocated capacity of the servers array */ + uint32_t srv_cnt; /* number of servers array entries in use */ + + ms_key_distr_t *key_distr; /* array of key distribution */ + int total_key_rng_cnt; /* allocated capacity of the key_distr array */ + int key_rng_cnt; /* actual key range count */ + + ms_value_distr_t *value_distr; /* array of value distribution */ + int total_val_rng_cnt; /* allocated capacity of the value_distr array */ + int val_rng_cnt; /* actual value range count */ + + ms_cmd_distr_t cmd_distr[CMD_NULL]; /* total we have CMD_NULL commands */ + int cmd_used_count; /* number of cmd_distr entries filled from the configuration file */ + + size_t fixed_value_size; /* fixed value size */ + size_t avg_val_size; /* average value size */ + size_t avg_key_size; /* average key size */ + + double verify_percent; /* percent of data verification */ + double
exp_ver_per; /* percent of data verification with expire time */ + double overwrite_percent; /* percent of overwrite */ + int mult_key_num; /* number of keys used by multi-get once */ + size_t win_size; /* item window size per connection */ + bool udp; /* whether or not use UDP */ + int stat_freq; /* statistic frequency second */ + bool reconnect; /* whether it reconnect when connection close */ + bool verbose; /* whether it outputs detailed information when verification */ + bool facebook_test; /* facebook test, TCP set and multi-get with UDP */ + uint32_t sock_per_conn; /* number of socks per connection structure */ + bool binary_prot_; /* whether it use binary protocol */ + int expected_tps; /* expected throughput */ + uint32_t rep_write_srv; /* which servers are used to do replication writing */ +} ms_setting_st; + +extern ms_setting_st ms_setting; + +/* previous part of initialization of setting structure */ +void ms_setting_init_pre(void); + + +/* post part of initialization of setting structure */ +void ms_setting_init_post(void); + + +/* clean up the global setting structure */ +void ms_setting_cleanup(void); + + +#define UNUSED_ARGUMENT(x) (void)x + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_SETTING_H */ diff --git a/src/bin/memaslap/ms_sigsegv.c b/src/bin/memaslap/ms_sigsegv.c new file mode 100644 index 00000000..303381f4 --- /dev/null +++ b/src/bin/memaslap/ms_sigsegv.c @@ -0,0 +1,128 @@ +/* + * File: ms_sigsegv.c + * Author: Mingqiang Zhuang + * + * Created on March 15, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. 
+ * http://www.schoonerinfotech.com/ + * + * Rewrite of stack dump: + * Copyright (C) 2009 Sun Microsystems + * Author Trond Norbye + */ + +#include "mem_config.h" + +#include +#include +#include +#include +#include + +#include "ms_memslap.h" +#include "ms_setting.h" + +/* prototypes */ +int ms_setup_sigsegv(void); +int ms_setup_sigpipe(void); +int ms_setup_sigint(void); + + +/* SIGSEGV handler: reports the fault and aborts. NOTE(review): fprintf() and pthread_mutex_lock() are not async-signal-safe */ +static void ms_signal_segv(int signum, siginfo_t *info, void *ptr) +{ + UNUSED_ARGUMENT(signum); + UNUSED_ARGUMENT(info); + UNUSED_ARGUMENT(ptr); + + pthread_mutex_lock(&ms_global.quit_mutex); + fprintf(stderr, "Segmentation fault occurred.\nStack trace:\n"); +#if 0 + pandora_print_callstack(stderr); +#endif + fprintf(stderr, "End of stack trace\n"); + pthread_mutex_unlock(&ms_global.quit_mutex); + abort(); +} + +/* SIGINT handler: reports the signal and exits with status 1 */ +static void ms_signal_int(int signum, siginfo_t *info, void *ptr) +{ + UNUSED_ARGUMENT(signum); + UNUSED_ARGUMENT(info); + UNUSED_ARGUMENT(ptr); + + pthread_mutex_lock(&ms_global.quit_mutex); + fprintf(stderr, "SIGINT handled.\n"); + pthread_mutex_unlock(&ms_global.quit_mutex); + exit(1); +} /* ms_signal_int */ + + +/** + * redirect signal seg + * + * @return if success, return EXIT_SUCCESS, else return -1 + */ +int ms_setup_sigsegv(void) +{ + struct sigaction action; + + memset(&action, 0, sizeof(action)); + action.sa_sigaction= ms_signal_segv; + action.sa_flags= SA_SIGINFO; + if (sigaction(SIGSEGV, &action, NULL) < 0) + { + perror("sigaction"); + return -1; + } + + return EXIT_SUCCESS; +} /* ms_setup_sigsegv */ + + +/** + * redirect signal pipe + * + * @return if success, return EXIT_SUCCESS, else return -1 + */ +int ms_setup_sigpipe(void) +{ + /* ignore the SIGPIPE signal */ + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) + return -1; + return EXIT_SUCCESS; +} /* ms_setup_sigpipe */ + + +/** + * redirect signal int + * + * @return if success, return EXIT_SUCCESS, else return -1 + */ +int ms_setup_sigint(void) +{ + struct
sigaction action_3; + + memset(&action_3, 0, sizeof(action_3)); + action_3.sa_sigaction= ms_signal_int; + action_3.sa_flags= SA_SIGINFO; + if (sigaction(SIGINT, &action_3, NULL) < 0) + { + perror("sigaction"); + return -1; + } + + return EXIT_SUCCESS; +} /* ms_setup_sigint */ + + +#ifndef SIGSEGV_NO_AUTO_INIT +static void __attribute((constructor)) ms_init(void) /* installs the three handlers before main() unless SIGSEGV_NO_AUTO_INIT is defined */ +{ + ms_setup_sigsegv(); + ms_setup_sigpipe(); + ms_setup_sigint(); +} +#endif diff --git a/src/bin/memaslap/ms_sigsegv.h b/src/bin/memaslap/ms_sigsegv.h new file mode 100644 index 00000000..7990ff67 --- /dev/null +++ b/src/bin/memaslap/ms_sigsegv.h @@ -0,0 +1,34 @@ +/* + * File: ms_sigsegv.h + * Author: Mingqiang Zhuang + * + * Created on March 15, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. + * http://www.schoonerinfotech.com/ + * + */ +#ifndef MS_SIGSEGV_H +#define MS_SIGSEGV_H + +#ifdef __cplusplus +extern "C" { +#endif + +/* redirect signal seg */ +int ms_setup_sigsegv(void); + + +/* redirect signal pipe */ +int ms_setup_sigpipe(void); + + +/* redirect signal int */ +int ms_setup_sigint(void); + + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_SIGSEGV_H */ diff --git a/src/bin/memaslap/ms_stats.c b/src/bin/memaslap/ms_stats.c new file mode 100644 index 00000000..086fb3ef --- /dev/null +++ b/src/bin/memaslap/ms_stats.c @@ -0,0 +1,303 @@ +/* + * File: ms_stats.c + * Author: Mingqiang Zhuang + * + * Created on March 25, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/ + * + */ + +#include "mem_config.h" + +#include +#include "ms_stats.h" + +#define array_size(x) (sizeof(x) / sizeof((x)[0])) + +static int ms_local_log2(uint64_t value); +static uint64_t ms_get_events(ms_stat_t *stat); + + +/** + * get the index of local log2 array + * + * @param value + * + * @return the smallest r in [0, 64] such that 2^r >= value (0 for value 0 or 1) + */ +static int ms_local_log2(uint64_t value) +{ + int result= 0; + + while (result <= 63 && ((uint64_t)1 << result) < value) /* result <= 63 keeps the shift within the 64-bit width */ + { + result++; + } + + return result; +} /* ms_local_log2 */ + + +/** + * initialize statistic structure + * + * @param stat, pointer of the statistic structure + * @param name, name of the statistic + */ +void ms_init_stats(ms_stat_t *stat, const char *name) +{ + memset(stat, 0, sizeof(*stat)); + + stat->name= (char *)name; + stat->min_time= (uint64_t)-1; + stat->max_time= 0; + stat->period_min_time= (uint64_t)-1; + stat->period_max_time= 0; + stat->log_product= 0; + stat->total_time= 0; + stat->pre_total_time= 0; + stat->squares= 0; + stat->pre_squares= 0; + stat->pre_events= 0; + stat->pre_log_product= 0; + stat->get_miss= 0; + stat->pre_get_miss= 0; +} /* ms_init_stats */ + + +/** + * record one event + * + * @param stat, pointer of the statistic structure + * @param total_time, response time of the command + * @param get_miss, whether it gets miss + */ +void ms_record_event(ms_stat_t *stat, uint64_t total_time, int get_miss) +{ + stat->total_time+= total_time; + + if (total_time < stat->min_time) + { + stat->min_time= total_time; + } + + if (total_time > stat->max_time) + { + stat->max_time= total_time; + } + + if (total_time < stat->period_min_time) + { + stat->period_min_time= total_time; + } + + if (total_time > stat->period_max_time) + { + stat->period_max_time= total_time; + } + + if (get_miss) + { + stat->get_miss++; + } + + stat->dist[ms_local_log2(total_time)]++; + stat->squares+= (double)total_time * (double)total_time; /* multiply as doubles: the uint64_t product can wrap */ + + if (total_time != 0) + { +
stat->log_product+= log((double)total_time); + } +} /* ms_record_event */ + + +/** + * get the events count + * + * @param stat, pointer of the statistic structure + * + * @return total events recorded + */ +static uint64_t ms_get_events(ms_stat_t *stat) +{ + uint64_t events= 0; + + for (uint32_t i= 0; i < array_size(stat->dist); i++) + { + events+= stat->dist[i]; + } + + return events; +} /* ms_get_events */ + + +/** + * dump the statistics + * + * @param stat, pointer of the statistic structure + */ +void ms_dump_stats(ms_stat_t *stat) +{ + uint64_t events= 0; + int max_non_zero= 0; + int min_non_zero= 0; + double average= 0; + + for (uint32_t i= 0; i < array_size(stat->dist); i++) + { + events+= stat->dist[i]; + if (stat->dist[i] != 0) + { + max_non_zero= (int)i; + } + } + + if (events == 0) + { + return; + } + average= (double)(stat->total_time / events); + + printf("%s Statistics (%lld events)\n", stat->name, (long long)events); + printf(" Min: %8lld\n", (long long)stat->min_time); + printf(" Max: %8lld\n", (long long)stat->max_time); + printf(" Avg: %8lld\n", (long long)(stat->total_time / events)); + printf(" Geo: %8.2lf\n", exp(stat->log_product / (double)events)); + + if (events > 1) + { + printf(" Std: %8.2lf\n", + sqrt((stat->squares - (double)events * average + * average) / ((double)events - 1))); + } + printf(" Log2 Dist:"); + + for (int i= 0; i <= max_non_zero - 4; i+= 4) + { + if ((stat->dist[i + 0] != 0) + || (stat->dist[i + 1] != 0) + || (stat->dist[i + 2] != 0) + || (stat->dist[i + 3] != 0)) + { + min_non_zero= i; + break; + } + } + + for (int i= min_non_zero; i <= max_non_zero; i++) + { + if ((i % 4) == 0) + { + printf("\n %2d:", (int)i); + } + printf(" %6" PRIu64 , stat->dist[i]); + } + + printf("\n\n"); +} /* ms_dump_stats */ + + +/** + * dump the format statistics + * + * @param stat, pointer of the statistic structure + * @param run_time, the total run time + * @param freq, statistic frequency + * @param obj_size, average object size + */ 
+void ms_dump_format_stats(ms_stat_t *stat,
+                          int run_time,
+                          int freq,
+                          int obj_size)
+{
+  uint64_t events= 0;
+  double global_average= 0;
+  uint64_t global_tps= 0;
+  double global_rate= 0;
+  double global_std= 0;
+  double global_log= 0;
+
+  double period_average= 0;
+  uint64_t period_tps= 0;
+  double period_rate= 0;
+  double period_std= 0;
+  double period_log= 0;
+
+  /* nothing recorded yet: print nothing and leave the snapshot untouched */
+  if ((events= ms_get_events(stat)) == 0)
+  {
+    return;
+  }
+
+  /* floating-point mean, so the variance below is not biased by truncation */
+  global_average= (double)stat->total_time / (double)events;
+
+  /* a zero (or negative) run time would otherwise divide by zero */
+  if (run_time > 0)
+  {
+    global_tps= events / (uint64_t)run_time;
+    global_rate= (double)events * obj_size / 1024 / 1024 / run_time;
+  }
+
+  /* the sample standard deviation needs at least two events (n - 1 > 0) */
+  if (events > 1)
+  {
+    double global_var= (stat->squares - (double)events * global_average
+                        * global_average) / (double)(events - 1);
+    global_std= (global_var > 0) ? sqrt(global_var) : 0;
+  }
+  global_log= exp(stat->log_product / (double)events);
+
+  /* period values are the difference to the snapshot of the previous dump */
+  uint64_t diff_time= stat->total_time - stat->pre_total_time;
+  uint64_t diff_events= events - stat->pre_events;
+  if (diff_events >= 1)
+  {
+    period_average= (double)diff_time / (double)diff_events;
+    if (freq > 0)
+    {
+      period_tps= diff_events / (uint64_t)freq;
+      period_rate= (double)diff_events * obj_size / 1024 / 1024 / freq;
+    }
+    if (diff_events > 1)
+    {
+      double diff_squares= (double)stat->squares - (double)stat->pre_squares;
+      double period_var= (diff_squares - (double)diff_events * period_average
+                          * period_average) / (double)(diff_events - 1);
+      period_std= (period_var > 0) ? sqrt(period_var) : 0;
+    }
+    double diff_log_product= stat->log_product - stat->pre_log_product;
+    period_log= exp(diff_log_product / (double)diff_events);
+  }
+
+  printf("%s Statistics\n", stat->name);
+  printf("%-8s %-8s %-12s %-12s %-10s %-10s %-8s %-10s %-10s %-10s %-10s\n",
+         "Type",
+         "Time(s)",
+         "Ops",
+         "TPS(ops/s)",
+         "Net(M/s)",
+         "Get_miss",
+         "Min(us)",
+         "Max(us)",
+         "Avg(us)",
+         "Std_dev",
+         "Geo_dist");
+
+  /* "Period" row: the Net(M/s) column is the period rate, not the global one */
+  printf(
+    "%-8s %-8d %-12llu %-12lld %-10.1f %-10lld %-8lld %-10lld %-10lld %-10.2f %.2f\n",
+    "Period",
+    freq,
+    (unsigned long long)diff_events,
+    (long long)period_tps,
+    period_rate,
+    (long long)(stat->get_miss - stat->pre_get_miss),
+    (long long)stat->period_min_time,
+    (long long)stat->period_max_time,
+    (long long)period_average,
+    period_std,
+    period_log);
+
+  /* "Global" row: the Net(M/s) column is the rate over the whole run */
+  printf(
+    "%-8s %-8d %-12llu %-12lld %-10.1f %-10lld %-8lld %-10lld %-10lld %-10.2f %.2f\n\n",
+    "Global",
+    run_time,
+    (unsigned long long)events,
+    (long long)global_tps,
+    global_rate,
+    (long long)stat->get_miss,
+    (long long)stat->min_time,
+    (long long)stat->max_time,
+    (long long)global_average,
+    global_std,
+    global_log);
+
+  /* take the snapshot the next call diffs against and restart the period min/max */
+  stat->pre_events= events;
+  stat->pre_squares= (uint64_t)stat->squares;
+  stat->pre_total_time= stat->total_time;
+  stat->pre_log_product= stat->log_product;
+  stat->period_min_time= (uint64_t)-1;
+  stat->period_max_time= 0;
+  stat->pre_get_miss= stat->get_miss;
+} /* ms_dump_format_stats */
diff --git a/src/bin/memaslap/ms_stats.h b/src/bin/memaslap/ms_stats.h
new file mode 100644
index 00000000..5ac88b3f
--- /dev/null
+++ b/src/bin/memaslap/ms_stats.h
@@ -0,0 +1,69 @@
+/*
+ * File: ms_stats.h
+ * Author: Mingqiang Zhuang
+ *
+ * Created on March 25, 2009
+ *
+ * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+#ifndef MS_STAT_H
+#define MS_STAT_H
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* statistic structure of response time */
+typedef struct
+{
+  char *name;                /* label printed by the dump functions */
+  uint64_t total_time;       /* sum of all recorded response times */
+  uint64_t min_time;         /* smallest recorded response time */
+  uint64_t max_time;         /* largest recorded response time */
+  uint64_t get_miss;         /* events recorded with get_miss set */
+  uint64_t dist[65];         /* histogram, indexed by ms_local_log2() (0..64) */
+  double squares;            /* sum of squared response times */
+  double log_product;        /* sum of log(response time), zero samples skipped */
+
+  uint64_t period_min_time;  /* minimum since the last ms_dump_format_stats() */
+  uint64_t period_max_time;  /* maximum since the last ms_dump_format_stats() */
+  uint64_t pre_get_miss;     /* get_miss at the last ms_dump_format_stats() */
+  uint64_t pre_events;       /* event count at the last ms_dump_format_stats() */
+  uint64_t pre_total_time;   /* total_time at the last ms_dump_format_stats() */
+  uint64_t pre_squares;      /* squares (converted to integer) at the last ms_dump_format_stats() */
+  double pre_log_product;    /* log_product at the last ms_dump_format_stats() */
+} ms_stat_t;
+
+/* initialize statistic */
+void ms_init_stats(ms_stat_t *stat, const char *name);
+
+
+/* record one event */
+void ms_record_event(ms_stat_t *stat, uint64_t time, int get_miss);
+
+
+/* dump the statistics */
+void ms_dump_stats(ms_stat_t *stat);
+
+
+/* dump the format statistics */
+void ms_dump_format_stats(ms_stat_t *stat,
+                          int run_time,
+                          int freq,
+                          int obj_size);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MS_STAT_H */
diff --git a/src/bin/memaslap/ms_task.c b/src/bin/memaslap/ms_task.c
new file mode 100644
index 00000000..f2cb8657
--- /dev/null
+++ b/src/bin/memaslap/ms_task.c
@@ -0,0 +1,1110 @@
+/*
+ * File: ms_task.c
+ * Author: Mingqiang Zhuang
+ *
+ * Created on February 10, 2009
+ *
+ * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+
+#include "mem_config.h"
+
+#if defined(HAVE_SYS_TIME_H)
+# include
+#endif
+
+#if defined(HAVE_TIME_H)
+# include
+#endif
+
+#include "ms_thread.h"
+#include "ms_setting.h"
+#include "ms_atomic.h"
+
+/* command distribution adjustment cycle */
+#define CMD_DISTR_ADJUST_CYCLE 1000
+#define DISADJUST_FACTOR 0.03 /**
+ * In one adjustment cycle, if undo set or get
+ * operations proportion is more than 3% , means
+ * there are too many new item or need more new
+ * item in the window. This factor shows it.
+ */
+
+/* get item from task window */
+static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
+static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
+static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
+
+
+/* select next operation to do */
+static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
+
+
+/* set and get speed estimate for controlling and adjustment */
+static bool ms_is_set_too_fast(ms_task_t *task);
+static bool ms_is_get_too_fast(ms_task_t *task);
+static void ms_kick_out_item(ms_task_item_t *item);
+
+
+/* miss rate adjustment */
+static bool ms_need_overwrite_item(ms_task_t *task);
+static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
+
+
+/* deal with data verification initialization */
+static void ms_task_data_verify_init(ms_task_t *task);
+static void ms_task_expire_verify_init(ms_task_t *task);
+
+
+/* select a new task to do */
+static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
+
+
+/* run the selected task */
+static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
+static void ms_update_stat_result(ms_conn_t *c);
+static void ms_update_multi_get_result(ms_conn_t *c);
+static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
+static void ms_update_task_result(ms_conn_t *c);
+static void ms_single_getset_task_sch(ms_conn_t *c);
+static void ms_multi_getset_task_sch(ms_conn_t *c);
+static void ms_send_signal(ms_sync_lock_t *sync_lock);
+static void ms_warmup_server(ms_conn_t *c);
+static int ms_run_getset_task(ms_conn_t *c);
+
+
+/**
+ * used to get the current operation item(object); this is the item
+ * pointer stored in the connection's current task (c->curr_task)
+ *
+ * @param c, pointer of the concurrency; it is dereferenced without a
+ *        NULL check
+ *
+ * @return ms_task_item_t*, current operating item
+ */
+static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
+{
+  return c->curr_task.item;
+}
+
+
+/**
+ * used to get the next item to do get operation
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return ms_task_item_t*, 
the pointer of the next item to do
+ *         get operation
+ */
+static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
+{
+  /* the set cursor has not advanced: use the first slot of the window */
+  if (c->set_cursor <= 0)
+  {
+    return &c->item_win[0];
+  }
+
+  /* cursor below the window size: random index below the cursor */
+  if (c->set_cursor < (uint32_t)c->win_size)
+  {
+    return &c->item_win[random() % (int64_t)c->set_cursor];
+  }
+
+  /* otherwise: random index over the whole window */
+  return &c->item_win[random() % c->win_size];
+} /* ms_get_next_get_item */
+
+
+/**
+ * used to get the next item to do set operation: the window slot
+ * addressed by the set cursor, wrapped to the window size
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return ms_task_item_t*, the pointer of the next item to do
+ *         set operation
+ */
+static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
+{
+  /**
+   * the cursor is not moved here; it is advanced by one only after
+   * a set command succeeded and stays put when the set failed.
+   */
+  return &c->item_win[(int64_t)c->set_cursor % c->win_size];
+}
+
+
+/**
+ * Pick an item for an overwrite. The choice is delegated to
+ * ms_get_next_get_item(), so the same selection rule as for a get
+ * operation applies.
+ *
+ * @param c, pointer of the concurrency
+ *
+ * @return ms_task_item_t*, the pointer of the previous item of
+ *         set operation
+ */
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
+{
+  return ms_get_next_get_item(c);
+} /* ms_get_random_overwrite_item */
+
+/**
+ * According to the proportion of operations(get or set), select
+ * an operation to do.
+ *
+ * @param c, pointer of the concurrency
+ * @param task, pointer of current task in the concurrency
+ */
+static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
+{
+  double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
+  double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
+
+  /**
+   * update cycle operation number if necessary: once either
+   * counter reaches zero, both get one more adjustment cycle worth
+   * of operations, split by the configured proportions.
+   */
+  if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
+  {
+    task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
+    task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
+  }
+
+  /**
+   * According to operation distribution to choose doing which
+   * operation. If it can't set new object to sever, just change
+   * to do get operation.
+   *
+   * The comparison get_opt * set_prop >= set_opt * get_prop is the
+   * cross-multiplied form of get_opt / set_opt >= get_prop / set_prop,
+   * which avoids dividing by a zero counter.
+   */
+  if ((set_prop > PROP_ERROR)
+      && ((double)task->get_opt * set_prop >= (double)task->set_opt
+          * get_prop))
+  {
+    task->cmd= CMD_SET;
+    task->item= ms_get_next_set_item(c);
+  }
+  else
+  {
+    task->cmd= CMD_GET;
+    task->item= ms_get_next_get_item(c);
+  }
+} /* ms_select_opt */
+
+
+/**
+ * used to judge whether the number of get operations done is
+ * more than expected number of get operations to do right now.
+ *
+ * Gets count as too fast when the done get/set ratio exceeds the
+ * configured one and the sets still open in this cycle exceed
+ * (set_prop / get_prop) * (1 + DISADJUST_FACTOR) times the open
+ * gets.
+ *
+ * @param task, pointer of current task in the concurrency
+ *
+ * @return bool, if get too fast, return true, else return false
+ */
+static bool ms_is_get_too_fast(ms_task_t *task)
+{
+  double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
+  double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
+
+  /* no get operation */
+  if (get_prop < PROP_ERROR)
+  {
+    return false;
+  }
+
+  /**
+   * Multiply in floating point and convert to int last, as
+   * ms_is_set_too_fast() does. Converting the bare ratio first
+   * truncated it (to 0 whenever set_prop < get_prop), which made
+   * the limit 0 and dropped the DISADJUST_FACTOR tolerance.
+   */
+  int max_undo_set= (int)((set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
+                          * (double)task->cycle_undo_get);
+
+  if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
+      && (task->cycle_undo_set > max_undo_set))
+  {
+    return true;
+  }
+
+  return false;
+} /* ms_is_get_too_fast */
+
+
+/**
+ * used to judge whether the number of set operations done is
+ * more than expected number of set operations to do right now.
+ * + * @param task, pointer of current task in the concurrency + * + * @return bool, if set too fast, return true, else return false + */ +static bool ms_is_set_too_fast(ms_task_t *task) +{ + double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop; + double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop; + + /* no set operation */ + if (set_prop < PROP_ERROR) + { + return false; + } + + /* If it does set operation too fast, skip some */ + int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR)) + * (double)task->cycle_undo_set); + + if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop) + && (task->cycle_undo_get > max_undo_get)) + { + return true; + } + + return false; +} /* ms_is_set_too_fast */ + + +/** + * kick out the old item in the window, and add a new item to + * overwrite the old item. When we don't want to do overwrite + * object, and the current item to do set operation is an old + * item, we could kick out the old item and add a new item. Then + * we can ensure we set new object every time. + * + * @param item, pointer of task item which includes the object + * information + */ +static void ms_kick_out_item(ms_task_item_t *item) +{ + /* allocate a new item */ + item->key_prefix= ms_get_key_prefix(); + + item->key_suffix_offset++; + item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */ + item->client_time= 0; +} /* ms_kick_out_item */ + + +/** + * used to judge whether we need overwrite object based on the + * options user specified + * + * @param task, pointer of current task in the concurrency + * + * @return bool, if need overwrite, return true, else return + * false + */ +static bool ms_need_overwrite_item(ms_task_t *task) +{ + ms_task_item_t *item= task->item; + + assert(item != NULL); + assert(task->cmd == CMD_SET); + + /** + * according to data overwrite percent to determine if do data + * overwrite. 
+ */ + if (task->overwrite_set < (double)task->set_opt + * ms_setting.overwrite_percent) + { + return true; + } + + return false; +} /* ms_need_overwirte_item */ + + +/** + * used to adjust operation. the function must be called after + * select operation. the function change get operation to set + * operation, or set operation to get operation based on the + * current case. + * + * @param c, pointer of the concurrency + * @param task, pointer of current task in the concurrency + * + * @return bool, if success, return true, else return false + */ +static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) +{ + ms_task_item_t *item= task->item; + + assert(item != NULL); + + if (task->cmd == CMD_SET) + { + /* If did set operation too fast, skip some */ + if (ms_is_set_too_fast(task)) + { + /* get the item instead */ + if (item->value_offset != INVALID_OFFSET) + { + task->cmd= CMD_GET; + return true; + } + } + + /* If the current item is not a new item, kick it out */ + if (item->value_offset != INVALID_OFFSET) + { + if (ms_need_overwrite_item(task)) + { + /* overwrite */ + task->overwrite_set++; + } + else + { + /* kick out the current item to do set operation */ + ms_kick_out_item(item); + } + } + else /* it's a new item */ + { + /* need overwrite */ + if (ms_need_overwrite_item(task)) + { + /** + * overwrite not use the item with current set cursor, revert + * set cursor. + */ + c->set_cursor--; + + item= ms_get_random_overwrite_item(c); + if (item->value_offset != INVALID_OFFSET) + { + task->item= item; + task->overwrite_set++; + } + else /* item is a new item */ + { + /* select the item to run, and cancel overwrite */ + task->item= item; + } + } + } + task->cmd= CMD_SET; + return true; + } + else + { + if (item->value_offset == INVALID_OFFSET) + { + task->cmd= CMD_SET; + return true; + } + + /** + * If It does get operation too fast, it will change the + * operation to set. 
+ */ + if (ms_is_get_too_fast(task)) + { + /* don't kick out the first item in the window */ + if (! ms_is_set_too_fast(task)) + { + ms_kick_out_item(item); + task->cmd= CMD_SET; + return true; + } + else + { + return false; + } + } + + assert(item->value_offset != INVALID_OFFSET); + + task->cmd= CMD_GET; + return true; + } +} /* ms_adjust_opt */ + + +/** + * used to initialize the task which need verify data. + * + * @param task, pointer of current task in the concurrency + */ +static void ms_task_data_verify_init(ms_task_t *task) +{ + ms_task_item_t *item= task->item; + + assert(item != NULL); + assert(task->cmd == CMD_GET); + + /** + * according to data verification percent to determine if do + * data verification. + */ + if (task->verified_get < (double)task->get_opt + * ms_setting.verify_percent) + { + /** + * currently it doesn't do verify, just increase the counter, + * and do verification next proper get command + */ + if ((task->item->value_offset != INVALID_OFFSET) + && (item->exp_time == 0)) + { + task->verify= true; + task->finish_verify= false; + task->verified_get++; + } + } +} /* ms_task_data_verify_init */ + + +/** + * used to initialize the task which need verify expire time. + * + * @param task, pointer of current task in the concurrency + */ +static void ms_task_expire_verify_init(ms_task_t *task) +{ + ms_task_item_t *item= task->item; + + assert(item != NULL); + assert(task->cmd == CMD_GET); + assert(item->exp_time > 0); + + task->verify= true; + task->finish_verify= false; +} /* ms_task_expire_verify_init */ + + +/** + * used to get one task, the function initializes the task + * structure. 
+ * + * @param c, pointer of the concurrency + * @param warmup, whether it need warmup + * + * @return ms_task_t*, pointer of current task in the + * concurrency + */ +static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup) +{ + ms_task_t *task= &c->curr_task; + + while (1) + { + task->verify= false; + task->finish_verify= true; + task->get_miss= true; + + if (warmup) + { + task->cmd= CMD_SET; + task->item= ms_get_next_set_item(c); + + return task; + } + + /* according to operation distribution to choose doing which operation */ + ms_select_opt(c, task); + + if (! ms_adjust_opt(c, task)) + { + continue; + } + + if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET)) + { + ms_task_data_verify_init(task); + } + + if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET) + && (task->item->exp_time > 0)) + { + ms_task_expire_verify_init(task); + } + + break; + } + + /** + * Only update get and delete counter, set counter will be + * updated after set operation successes. + */ + if (task->cmd == CMD_GET) + { + task->get_opt++; + task->cycle_undo_get--; + } + + return task; +} /* ms_get_task */ + + +/** + * send a signal to the main monitor thread + * + * @param sync_lock, pointer of the lock + */ +static void ms_send_signal(ms_sync_lock_t *sync_lock) +{ + pthread_mutex_lock(&sync_lock->lock); + sync_lock->count++; + pthread_cond_signal(&sync_lock->cond); + pthread_mutex_unlock(&sync_lock->lock); +} /* ms_send_signal */ + + +/** + * If user only want to do get operation, but there is no object + * in server , so we use this function to warmup the server, and + * set some objects to server. It runs at the beginning of task. + * + * @param c, pointer of the concurrency + */ +static void ms_warmup_server(ms_conn_t *c) +{ + ms_task_t *task; + ms_task_item_t *item; + + /** + * Extra one loop to get the last command returned state. + * Normally it gets the previous command returned state. 
+ */ + if ((c->remain_warmup_num >= 0) + && (c->remain_warmup_num != c->warmup_num)) + { + item= ms_get_cur_opt_item(c); + /* only update the set command result state for data verification */ + if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED)) + { + item->value_offset= item->key_suffix_offset; + /* set success, update counter */ + c->set_cursor++; + } + else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED) + { + printf("key: %" PRIx64 " didn't set success\n", item->key_prefix); + } + } + + /* the last time don't run a task */ + if (c->remain_warmup_num-- > 0) + { + /* operate next task item */ + task= ms_get_task(c, true); + item= task->item; + ms_mcd_set(c, item); + } + + /** + * finish warming up server, wait all connects initialize + * complete. Then all connects can start do task at the same + * time. + */ + if (c->remain_warmup_num == -1) + { + ms_send_signal(&ms_global.warmup_lock); + c->remain_warmup_num--; /* never run the if branch */ + } +} /* ms_warmup_server */ + + +/** + * dispatch single get and set task + * + * @param c, pointer of the concurrency + */ +static void ms_single_getset_task_sch(ms_conn_t *c) +{ + ms_task_t *task; + ms_task_item_t *item; + + /* the last time don't run a task */ + if (c->remain_exec_num-- > 0) + { + task= ms_get_task(c, false); + item= task->item; + if (task->cmd == CMD_SET) + { + ms_mcd_set(c, item); + } + else if (task->cmd == CMD_GET) + { + assert(task->cmd == CMD_GET); + ms_mcd_get(c, item); + } + } +} /* ms_single_getset_task_sch */ + + +/** + * dispatch multi-get and set task + * + * @param c, pointer of the concurrency + */ +static void ms_multi_getset_task_sch(ms_conn_t *c) +{ + ms_task_t *task; + ms_mlget_task_item_t *mlget_item; + + while (1) + { + if (c->remain_exec_num-- > 0) + { + task= ms_get_task(c, false); + if (task->cmd == CMD_SET) /* just do it */ + { + ms_mcd_set(c, task->item); + break; + } + else + { + assert(task->cmd == CMD_GET); + mlget_item= 
&c->mlget_task.mlget_item[c->mlget_task.mlget_num]; + mlget_item->item= task->item; + mlget_item->verify= task->verify; + mlget_item->finish_verify= task->finish_verify; + mlget_item->get_miss= task->get_miss; + c->mlget_task.mlget_num++; + + /* enough multi-get task items can be done */ + if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + ms_mcd_mlget(c); + break; + } + } + } + else + { + if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0)) + { + ms_mcd_mlget(c); + } + break; + } + } +} /* ms_multi_getset_task_sch */ + + +/** + * calculate the difference value of two time points + * + * @param start_time, the start time + * @param end_time, the end time + * + * @return uint64_t, the difference value between start_time and end_time in us + */ +int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time) +{ + int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec; + int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec; + + assert(endtime >= starttime); + + return endtime - starttime; +} /* ms_time_diff */ + + +/** + * after get the response from server for multi-get, the + * function update the state of the task and do data verify if + * necessary. 
+ * + * @param c, pointer of the concurrency + */ +static void ms_update_multi_get_result(ms_conn_t *c) +{ + ms_mlget_task_item_t *mlget_item; + ms_task_item_t *item; + char *orignval= NULL; + char *orignkey= NULL; + + if (c == NULL) + { + return; + } + assert(c != NULL); + + for (int i= 0; i < c->mlget_task.mlget_num; i++) + { + mlget_item= &c->mlget_task.mlget_item[i]; + item= mlget_item->item; + orignval= &ms_setting.char_block[item->value_offset]; + orignkey= &ms_setting.char_block[item->key_suffix_offset]; + + /* update get miss counter */ + if (mlget_item->get_miss) + { + atomic_add_size(&ms_stats.get_misses, 1); + } + + /* get nothing from server for this task item */ + if (mlget_item->verify && ! mlget_item->finish_verify) + { + /* verify expire time if necessary */ + if (item->exp_time > 0) + { + struct timeval curr_time; + gettimeofday(&curr_time, NULL); + + /* object doesn't expire but can't get it now */ + if (curr_time.tv_sec - item->client_time + < item->exp_time - EXPIRE_TIME_ERROR) + { + atomic_add_size(&ms_stats.unexp_unget, 1); + + if (ms_setting.verbose) + { + char set_time[64]; + char cur_time[64]; + strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&item->client_time)); + strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&curr_time.tv_sec)); + fprintf(stderr, + "\n\t<%d expire time verification failed, object " + "doesn't expire but can't get it now\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\tset time: %s current time: %s " + "diff time: %d expire time: %d\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data: \n", + c->sfd, + item->key_size, + item->key_prefix, + item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + set_time, + cur_time, + (int)(curr_time.tv_sec - item->client_time), + item->exp_time, + item->value_size, + item->value_size, + orignval); + fflush(stderr); + } + } + } + else + { + atomic_add_size(&ms_stats.vef_miss, 1); + + if (ms_setting.verbose) + { + fprintf(stderr, "\n<%d 
data verification failed\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data: \n", + c->sfd, item->key_size, item->key_prefix, + item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, item->value_size, item->value_size, orignval); + fflush(stderr); + } + } + } + } + c->mlget_task.mlget_num= 0; + c->mlget_task.value_index= INVALID_OFFSET; +} /* ms_update_multi_get_result */ + + +/** + * after get the response from server for single get, the + * function update the state of the task and do data verify if + * necessary. + * + * @param c, pointer of the concurrency + * @param item, pointer of task item which includes the object + * information + */ +static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item) +{ + char *orignval= NULL; + char *orignkey= NULL; + + if ((c == NULL) || (item == NULL)) + { + return; + } + assert(c != NULL); + assert(item != NULL); + + orignval= &ms_setting.char_block[item->value_offset]; + orignkey= &ms_setting.char_block[item->key_suffix_offset]; + + /* update get miss counter */ + if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss) + { + atomic_add_size(&ms_stats.get_misses, 1); + } + + /* get nothing from server for this task item */ + if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify + && ! 
c->curr_task.finish_verify) + { + /* verify expire time if necessary */ + if (item->exp_time > 0) + { + struct timeval curr_time; + gettimeofday(&curr_time, NULL); + + /* object doesn't expire but can't get it now */ + if (curr_time.tv_sec - item->client_time + < item->exp_time - EXPIRE_TIME_ERROR) + { + atomic_add_size(&ms_stats.unexp_unget, 1); + + if (ms_setting.verbose) + { + char set_time[64]; + char cur_time[64]; + strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&item->client_time)); + strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", + localtime(&curr_time.tv_sec)); + fprintf(stderr, + "\n\t<%d expire time verification failed, object " + "doesn't expire but can't get it now\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\tset time: %s current time: %s " + "diff time: %d expire time: %d\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data: \n", + c->sfd, + item->key_size, + item->key_prefix, + item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, + set_time, + cur_time, + (int)(curr_time.tv_sec - item->client_time), + item->exp_time, + item->value_size, + item->value_size, + orignval); + fflush(stderr); + } + } + } + else + { + atomic_add_size(&ms_stats.vef_miss, 1); + + if (ms_setting.verbose) + { + fprintf(stderr, "\n<%d data verification failed\n" + "\tkey len: %d\n" + "\tkey: %" PRIx64 " %.*s\n" + "\texpected data len: %d\n" + "\texpected data: %.*s\n" + "\treceived data: \n", + c->sfd, item->key_size, item->key_prefix, + item->key_size - (int)KEY_PREFIX_SIZE, + orignkey, item->value_size, item->value_size, orignval); + fflush(stderr); + } + } + } +} /* ms_update_single_get_result */ + + +/** + * after get the response from server for set the function + * update the state of the task and do data verify if necessary. 
+ *
+ * @param c, pointer of the concurrency
+ * @param item, pointer of task item which includes the object
+ *        information
+ */
+static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
+{
+  if ((c == NULL) || (item == NULL))
+  {
+    return;
+  }
+
+  /* state only changes when the previous command was a set that returned MCD_STORED */
+  if ((c->precmd.cmd != CMD_SET) || (c->precmd.retstat != MCD_STORED))
+  {
+    return;
+  }
+
+  if (item->value_offset != INVALID_OFFSET)
+  {
+    /* the item was set before: move on to the next value offset */
+    item->value_offset++;
+  }
+  else
+  {
+    /* first set: the value starts at the offset of the key suffix */
+    item->value_offset= item->key_suffix_offset;
+  }
+
+  /* bookkeeping for a successful set */
+  c->set_cursor++;
+  c->curr_task.set_opt++;
+  c->curr_task.cycle_undo_set--;
+} /* ms_update_set_result */
+
+
+/**
+ * update the response time result: the time between c->start_time
+ * and now is recorded, under the statistics mutex, in the set or
+ * get statistic (depending on the previous command) and always in
+ * the total statistic
+ *
+ * @param c, pointer of the concurrency
+ */
+static void ms_update_stat_result(ms_conn_t *c)
+{
+  bool missed= false;
+
+  if (c == NULL)
+  {
+    return;
+  }
+
+  gettimeofday(&c->end_time, NULL);
+  uint64_t elapsed= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
+
+  pthread_mutex_lock(&ms_statistic.stat_mutex);
+
+  if (c->precmd.cmd == CMD_SET)
+  {
+    ms_record_event(&ms_statistic.set_stat, elapsed, false);
+  }
+  else if (c->precmd.cmd == CMD_GET)
+  {
+    /* only a get can count as a miss */
+    if (c->curr_task.get_miss)
+    {
+      missed= true;
+    }
+    ms_record_event(&ms_statistic.get_stat, elapsed, missed);
+  }
+
+  ms_record_event(&ms_statistic.total_stat, elapsed, missed);
+  pthread_mutex_unlock(&ms_statistic.stat_mutex);
+} /* ms_update_stat_result */
+
+
+/**
+ * after get response from server for the current operation, and
+ * before doing the next operation, update the state of the
+ * current operation.
+ * + * @param c, pointer of the concurrency + */ +static void ms_update_task_result(ms_conn_t *c) +{ + ms_task_item_t *item; + + if (c == NULL) + { + return; + } + assert(c != NULL); + + item= ms_get_cur_opt_item(c); + if (item == NULL) + { + return; + } + assert(item != NULL); + + ms_update_set_result(c, item); + + if ((ms_setting.stat_freq > 0) + && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET))) + { + ms_update_stat_result(c); + } + + /* update multi-get task item */ + if (((ms_setting.mult_key_num > 1) + && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) + || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) + { + ms_update_multi_get_result(c); + } + else + { + ms_update_single_get_result(c, item); + } +} /* ms_update_task_result */ + + +/** + * run get and set operation + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +static int ms_run_getset_task(ms_conn_t *c) +{ + /** + * extra one loop to get the last command return state. get the + * last command return state. + */ + if ((c->remain_exec_num >= 0) + && (c->remain_exec_num != c->exec_num)) + { + ms_update_task_result(c); + } + + /* multi-get */ + if (ms_setting.mult_key_num > 1) + { + /* operate next task item */ + ms_multi_getset_task_sch(c); + } + else + { + /* operate next task item */ + ms_single_getset_task_sch(c); + } + + /* no task to do, exit */ + if ((c->remain_exec_num == -1) || ms_global.time_out) + { + return -1; + } + + return EXIT_SUCCESS; +} /* ms_run_getset_task */ + + +/** + * the state machine call the function to execute task. + * + * @param c, pointer of the concurrency + * + * @return int, if success, return EXIT_SUCCESS, else return -1 + */ +int ms_exec_task(struct conn *c) +{ + if (! 
ms_global.finish_warmup) + { + ms_warmup_server(c); + } + else + { + if (ms_run_getset_task(c) != 0) + { + return -1; + } + } + + return EXIT_SUCCESS; +} /* ms_exec_task */ diff --git a/src/bin/memaslap/ms_task.h b/src/bin/memaslap/ms_task.h new file mode 100644 index 00000000..c4917d11 --- /dev/null +++ b/src/bin/memaslap/ms_task.h @@ -0,0 +1,94 @@ +/* + * File: ms_task.h + * Author: Mingqiang Zhuang + * + * Created on February 10, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc. + * http://www.schoonerinfotech.com/ + * + */ +#ifndef MS_TASK_H +#define MS_TASK_H + +#include +#include +#if !defined(__cplusplus) +# include +#endif +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define UNIT_ITEMS_COUNT 1024 /* each window unit has 1024 items */ +#define KEY_PREFIX_SIZE (sizeof(uint64_t)) /* key prefix length: 8 bytes */ +#define INVALID_OFFSET (-1) /* invalid offset in the character table */ +#define FIXED_EXPIRE_TIME 60 /* default expire time is 60s */ +#define EXPIRE_TIME_ERROR 5 /* default expire time error is 5s */ + +/* information of a task item(object) */ +typedef struct task_item +{ + uint64_t key_prefix; /* prefix of the key, 8 bytes, binary */ + int key_size; /* key size */ + int key_suffix_offset; /* suffix offset in the global character table */ + + int value_size; /* data size */ + int value_offset; /* data offset in the global character table */ + + time_t client_time; /* the current client time */ + int exp_time; /* expire time */ +} ms_task_item_t; + +/* task item for multi-get */ +typedef struct mlget_task_item +{ + ms_task_item_t *item; /* task item */ + bool verify; /* whether verify data or not */ + bool finish_verify; /* whether finish data verify or not */ + bool get_miss; /* whether get miss or not */ +} ms_mlget_task_item_t; + +/* information of multi-get task */ +typedef struct mlget_task +{ + ms_mlget_task_item_t *mlget_item; /* multi-get task array */ + int mlget_num; /* how many tasks in mlget_task array */ 
+ int value_index; /* the nth value received by the connection, for multi-get */ +} ms_mlget_task_t; + +/* structure used to store the state of the running task */ +typedef struct task +{ + int cmd; /* command name */ + bool verify; /* whether to verify the data */ + bool finish_verify; /* whether data verification has finished */ + bool get_miss; /* whether the get was a miss */ + ms_task_item_t *item; /* task item */ + + /* counters for command distribution adjustment */ + uint64_t get_opt; /* number of total get operations */ + uint64_t set_opt; /* number of total set operations, not including the warmup set count */ + int cycle_undo_get; /* number of undone gets in an adjustment cycle */ + int cycle_undo_set; /* number of undone sets in an adjustment cycle */ + uint64_t verified_get; /* number of total verified get operations */ + uint64_t overwrite_set; /* number of total overwrite set operations */ +} ms_task_t; + +struct conn; + +/* the state machine calls this function to execute a task on connection c */ +int ms_exec_task(struct conn *c); + + +/* calculate the difference between two time points (NOTE(review): unit of the result is not visible here -- confirm in ms_task.c) */ +int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time); + + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_TASK_H */ diff --git a/src/bin/memaslap/ms_thread.c b/src/bin/memaslap/ms_thread.c new file mode 100644 index 00000000..f9f52bfb --- /dev/null +++ b/src/bin/memaslap/ms_thread.c @@ -0,0 +1,349 @@ +/* + * File: ms_thread.c + * Author: Mingqiang Zhuang + * + * Created on February 10, 2009 + * + * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+
+#include "mem_config.h"
+
+/* gettimeofday() and struct timeval */
+#if defined(HAVE_SYS_TIME_H)
+# include <sys/time.h>
+#endif
+
+#if defined(HAVE_TIME_H)
+# include <time.h>
+#endif
+
+#include "ms_thread.h"
+#include "ms_setting.h"
+#include "ms_atomic.h"
+
+/* global variable: key under which every worker stores its ms_thread_t */
+pthread_key_t ms_thread_key;
+
+/* array of thread context structure, each thread has a thread context structure */
+static ms_thread_ctx_t *ms_thread_ctx;
+
+/* functions */
+static void ms_set_current_time(void);
+static void ms_check_sock_timeout(void);
+static void ms_clock_handler(const int fd, const short which, void *arg);
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
+static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
+static void *ms_worker_libevent(void *arg);
+static void ms_create_worker(void *(*func)(void *), void *arg);
+
+
+/**
+ * Store the seconds part of gettimeofday() in the calling thread's
+ * curr_time. Time-sensitive callers can call it by hand, outside the
+ * normal every-1-second timer.
+ */
+static void ms_set_current_time(void)
+{
+  struct timeval now;
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+
+  gettimeofday(&now, NULL);
+  ms_thread->curr_time= (rel_time_t)now.tv_sec;
+} /* ms_set_current_time */
+
+
+/**
+ * Called by the every-1-second timer: walks the calling thread's
+ * connections and resets each UDP connection whose start time is more
+ * than SOCK_WAIT_TIMEOUT seconds before the thread's curr_time.
+ */
+static void ms_check_sock_timeout(void)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+
+  for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
+  {
+    ms_conn_t *c= &ms_thread->conn[i];
+    int time_diff;
+
+    /* only UDP connections are subject to the wait timeout */
+    if (!c->udp)
+    {
+      continue;
+    }
+
+    time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
+    if (time_diff <= SOCK_WAIT_TIMEOUT)
+    {
+      continue;
+    }
+
+    /* wait time out: calculate dropped packets count */
+    if (c->recvpkt > 0)
+    {
+      atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
+    }
+
+    atomic_add_size(&ms_stats.udp_timeout, 1);
+    ms_reset_conn(c, true);
+  }
+} /* ms_check_sock_timeout */
+
+
+/* if disconnect, the every-1-second timer will call
this function to reconnect */
+static void ms_reconn_thread_socks(void)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  uint32_t idx= 0;
+
+  while (idx < ms_thread->thread_ctx->nconns)
+  {
+    ms_reconn_socks(&ms_thread->conn[idx]);
+    idx++;
+  }
+} /* ms_reconn_thread_socks */
+
+
+/**
+ * Handler of the every-1-second timer. Refreshes the thread clock,
+ * checks UDP timeouts (on every call except the first), reconnects
+ * sockets and re-arms itself to fire again in one second.
+ *
+ * @param fd, the descriptors of the socket (unused)
+ * @param which, event flags (unused)
+ * @param arg, argument (unused)
+ */
+static void ms_clock_handler(const int fd, const short which, void *arg)
+{
+  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+  struct timeval one_second= {.tv_sec= 1, .tv_usec= 0};
+
+  UNUSED_ARGUMENT(fd);
+  UNUSED_ARGUMENT(which);
+  UNUSED_ARGUMENT(arg);
+
+  ms_set_current_time();
+
+  if (!ms_thread->initialized)
+  {
+    /* first call: clock_event has not been armed yet, nothing to delete */
+    ms_thread->initialized= true;
+  }
+  else
+  {
+    /* only delete the event if it's actually there. */
+    evtimer_del(&ms_thread->clock_event);
+    ms_check_sock_timeout();
+  }
+
+  ms_reconn_thread_socks();
+
+  /* re-arm the timer on this thread's event base */
+  evtimer_set(&ms_thread->clock_event, ms_clock_handler, NULL);
+  event_base_set(ms_thread->base, &ms_thread->clock_event);
+  evtimer_add(&ms_thread->clock_event, &one_second);
+} /* ms_clock_handler */
+
+
+/**
+ * used to bind thread to CPU if the system supports
+ *
+ * @param cpu, cpu index
+ *
+ * @return 0 on success or when HAVE_CPU_SET_T is not defined,
+ *         1 if sched_setaffinity() fails (a warning is printed)
+ */
+static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
+{
+#ifdef HAVE_CPU_SET_T
+  cpu_set_t mask;
+
+  CPU_ZERO(&mask);
+  CPU_SET(cpu, &mask);
+
+  if (sched_setaffinity(0, sizeof(cpu_set_t), &mask) == -1)
+  {
+    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
+    return 1;
+  }
+#else
+  UNUSED_ARGUMENT(cpu);
+#endif
+
+  return 0;
+} /* ms_set_thread_cpu_affinity */
+
+
+/**
+ * Set up a thread's information.
+ *
+ * Allocates the calling thread's ms_thread_t, registers it under
+ * ms_thread_key, creates the libevent base and sets up all of the
+ * thread's connections. Each failure message is printed by at most
+ * one thread.
+ *
+ * @param thread_ctx, pointer of the thread context structure
+ *
+ * @return if success, return EXIT_SUCCESS, else return -1
+ */
+static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
+{
+  /* shared by all threads so that an error is only reported once */
+  static ATOMIC uint32_t cnt= 0;
+
+  ms_thread_t *ms_thread= (ms_thread_t *)calloc(1, sizeof(*ms_thread));
+  if (ms_thread == NULL)
+  {
+    /* previously the NULL result was dereferenced below */
+    if (atomic_add_32_nv(&cnt, 1) == 0)
+    {
+      fprintf(stderr, "Can't allocate thread structure.\n");
+    }
+
+    return -1;
+  }
+  pthread_setspecific(ms_thread_key, (void *)ms_thread);
+
+  ms_thread->thread_ctx= thread_ctx;
+  ms_thread->nactive_conn= thread_ctx->nconns;
+  ms_thread->initialized= false;
+
+  gettimeofday(&ms_thread->startup_time, NULL);
+
+  ms_thread->base= event_init();
+  if (ms_thread->base == NULL)
+  {
+    if (atomic_add_32_nv(&cnt, 1) == 0)
+    {
+      fprintf(stderr, "Can't allocate event base.\n");
+    }
+
+    return -1;
+  }
+
+  /* calloc zero-fills the array and checks nconns * sizeof for overflow */
+  ms_thread->conn=
+    (ms_conn_t *)calloc((size_t)thread_ctx->nconns, sizeof(ms_conn_t));
+  if (ms_thread->conn == NULL)
+  {
+    if (atomic_add_32_nv(&cnt, 1) == 0)
+    {
+      fprintf(
+        stderr,
+        "Can't allocate concurrency structure for thread descriptors.\n");
+    }
+
+    return -1;
+  }
+
+  for (uint32_t i= 0; i < thread_ctx->nconns; i++)
+  {
+    ms_thread->conn[i].conn_idx= i;
+    if (ms_setup_conn(&ms_thread->conn[i]) != 0)
+    {
+      /* only output this error once */
+      if (atomic_add_32_nv(&cnt, 1) == 0)
+      {
+        fprintf(stderr, "Initializing connection failed.\n");
+      }
+
+      return -1;
+    }
+  }
+
+  return EXIT_SUCCESS;
+} /* ms_setup_thread */
+
+
+/**
+ * Worker thread: main event loop
+ *
+ * Binds the thread to a CPU when possible, sets the thread up (the
+ * whole process exits if that fails), starts the 1-second clock,
+ * signals ms_global.init_lock and then runs the libevent loop.
+ *
+ * @param arg, the pointer of argument (the thread's ms_thread_ctx_t)
+ *
+ * @return void*, always NULL
+ */
+static void *ms_worker_libevent(void *arg)
+{
+  ms_thread_t *ms_thread= NULL;
+  ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
+
+  /**
+   * If system has more than one cpu and supports set cpu
+   * affinity, try to bind each thread to a cpu core;
+   */
+  if (ms_setting.ncpu > 1)
+  {
+    ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
+  }
+
+  if (ms_setup_thread(thread_ctx) != 0)
+  {
+    exit(1);
+  }
+
+  /* each thread with a timer */
+  ms_clock_handler(0, 0, 0);
+
+  /* tell the waiter on init_lock that one more thread is ready */
+  pthread_mutex_lock(&ms_global.init_lock.lock);
+  ms_global.init_lock.count++;
+  pthread_cond_signal(&ms_global.init_lock.cond);
+  pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+  ms_thread= pthread_getspecific(ms_thread_key);
+  event_base_loop(ms_thread->base, 0);
+
+  return NULL;
+} /* ms_worker_libevent */
+
+
+/**
+ * Creates a worker thread. Exits the process if the thread cannot be
+ * created.
+ *
+ * @param func, the callback function
+ * @param arg, the argument to pass to the callback function
+ */
+static void ms_create_worker(void *(*func)(void *), void *arg)
+{
+  pthread_t thread;
+  pthread_attr_t attr;
+  int ret;
+
+  pthread_attr_init(&attr);
+
+  ret= pthread_create(&thread, &attr, func, arg);
+
+  /* the attribute object is only needed while the thread is created */
+  pthread_attr_destroy(&attr);
+
+  if (ret != 0)
+  {
+    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
+    exit(1);
+  }
+} /* ms_create_worker */
+
+
+/**
+ * initialize threads: allocates one context per thread, fills it from
+ * ms_setting, creates the thread-specific key and starts
+ * ms_setting.nthreads workers. Exits the process on failure.
+ *
+ * NOTE(review): assumes ms_setting.nconns and ms_setting.srv_cnt are
+ * non-zero (they are used as divisors) -- presumably validated when
+ * the settings are parsed; confirm.
+ */
+void ms_thread_init(void)
+{
+  /* calloc checks nthreads * sizeof for overflow */
+  ms_thread_ctx=
+    (ms_thread_ctx_t *)calloc(
+      (size_t)ms_setting.nthreads, sizeof(ms_thread_ctx_t));
+  if (ms_thread_ctx == NULL)
+  {
+    fprintf(stderr, "Can't allocate thread descriptors.\n");
+    exit(1);
+  }
+
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
+  {
+    ms_thread_ctx[i].thd_idx= i;
+    ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
+
+    /**
+     * If only one server, all the connections in all threads
+     * connects the same server. For support multi-servers, simple
+     * distribute thread to server.
+     */
+    ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
+    ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
+                                  / (int)ms_setting.nconns;
+    ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
+                                       / ms_setting.nconns;
+  }
+
+  if (pthread_key_create(&ms_thread_key, NULL))
+  {
+    fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
+    exit(1);
+  }
+  /* Create threads after we've done all the epoll setup. */
+  for (uint32_t i= 0; i < ms_setting.nthreads; i++)
+  {
+    ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
+  }
+} /* ms_thread_init */
+
+
+/* cleanup some resource of threads when all the threads exit */
+void ms_thread_cleanup(void)
+{
+  /* free(NULL) is a no-op; clear the pointer so it cannot be reused */
+  free(ms_thread_ctx);
+  ms_thread_ctx= NULL;
+  pthread_key_delete(ms_thread_key);
+} /* ms_thread_cleanup */
diff --git a/src/bin/memaslap/ms_thread.h b/src/bin/memaslap/ms_thread.h
new file mode 100644
index 00000000..8cb98bde
--- /dev/null
+++ b/src/bin/memaslap/ms_thread.h
@@ -0,0 +1,78 @@
+/*
+ * File: ms_thread.h
+ * Author: Mingqiang Zhuang
+ *
+ * Created on February 10, 2009
+ *
+ * (c) Copyright 2009, Schooner Information Technology, Inc.
+ * http://www.schoonerinfotech.com/
+ *
+ */
+
+/**
+ * Asynchronous memslap has the similar implementation of
+ * multi-threads with memcached. Asynchronous memslap creates
+ * one or more self-governed threads; each thread is bound with
+ * one CPU core if the system supports setting CPU core
+ * affinity. And every thread has private variables. There is
+ * less communication or some shared resources among all the
+ * threads. It can improve the performance because there are
+ * fewer locks and competition. In addition, each thread has a
+ * libevent to manage the events of network. Each thread has one
+ * or more self-governed concurrencies; each concurrency has one
+ * or more socket connections. All the concurrencies don't
+ * communicate with each other even though they are in the same
+ * thread.
+ */
+#ifndef MS_THREAD_H
+#define MS_THREAD_H
+
+#include
+#include "ms_conn.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/** Time relative to server start. Smaller than time_t on 64-bit systems.
*/ +typedef unsigned int rel_time_t; /* NOTE(review): ms_set_current_time() stores the tv_sec returned by gettimeofday() in this type -- confirm the "relative to server start" wording */ + +/* Used to store the context of each thread */ +typedef struct thread_ctx +{ + uint32_t thd_idx; /* the thread index */ + uint32_t nconns; /* how many connections are owned by the thread */ + uint32_t srv_idx; /* index of the server assigned to the thread */ + int tps_perconn; /* expected throughput per connection */ + int64_t exec_num_perconn; /* execute number per connection */ +} ms_thread_ctx_t; + +/* Used to store the private variables of each thread */ +typedef struct thread +{ + ms_conn_t *conn; /* conn array to store all the conn in the thread */ + uint32_t nactive_conn; /* how many connections are active */ + + ms_thread_ctx_t *thread_ctx; /* thread context from the caller */ + struct event_base *base; /* libevent handler created by this thread */ + + rel_time_t curr_time; /* current time in seconds, refreshed by the clock handler */ + struct event clock_event; /* clock event that fires once per second */ + bool initialized; /* whether clock_event has been initialized */ + + struct timeval startup_time; /* start time of the thread */ +} ms_thread_t; + +/* initialize threads: allocate the per-thread contexts and start ms_setting.nthreads workers */ +void ms_thread_init(void); + + +/* cleanup some resource of threads when all the threads exit: frees the thread context array and deletes the thread-specific key */ +void ms_thread_cleanup(void); + + +#ifdef __cplusplus +} +#endif + +#endif /* end of MS_THREAD_H */ -- 2.30.2