From ef66fe65395361ec9656531413af6e26676e1f47 Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Tue, 15 Sep 2020 11:27:17 +0200 Subject: [PATCH] cmake: reorder src/bin --- src/bin/CMakeLists.txt | 25 +- src/bin/common/CMakeLists.txt | 10 + src/bin/{ => common}/client_options.h | 0 src/bin/{ => common}/execute.cc | 0 src/bin/{ => common}/execute.h | 0 src/bin/{ => common}/generator.cc | 0 src/bin/{ => common}/generator.h | 0 src/bin/{ => common}/utilities.cc | 0 src/bin/{ => common}/utilities.h | 0 src/bin/memcapable.sh | 2 - src/bin/memcat.sh | 3 - src/bin/memcp.sh | 2 - src/bin/memdump.sh | 2 - src/bin/memerror.sh | 2 - src/bin/memexist.sh | 3 - src/bin/memflush.sh | 2 - src/bin/memparse.sh | 2 - src/bin/memping.sh | 2 - src/bin/memrm.sh | 3 - src/bin/memslap.sh | 2 - src/bin/memstat.sh | 2 - src/bin/memtouch.sh | 3 - src/bin/ms_atomic.h | 114 - src/bin/ms_conn.c | 3427 ------------------------- src/bin/ms_conn.h | 241 -- src/bin/ms_memslap.h | 133 - src/bin/ms_setting.c | 1068 -------- src/bin/ms_setting.h | 181 -- src/bin/ms_sigsegv.c | 128 - src/bin/ms_sigsegv.h | 34 - src/bin/ms_stats.c | 303 --- src/bin/ms_stats.h | 69 - src/bin/ms_task.c | 1110 -------- src/bin/ms_task.h | 94 - src/bin/ms_thread.c | 349 --- src/bin/ms_thread.h | 78 - 36 files changed, 20 insertions(+), 7374 deletions(-) create mode 100644 src/bin/common/CMakeLists.txt rename src/bin/{ => common}/client_options.h (100%) rename src/bin/{ => common}/execute.cc (100%) rename src/bin/{ => common}/execute.h (100%) rename src/bin/{ => common}/generator.cc (100%) rename src/bin/{ => common}/generator.h (100%) rename src/bin/{ => common}/utilities.cc (100%) rename src/bin/{ => common}/utilities.h (100%) delete mode 100755 src/bin/memcapable.sh delete mode 100755 src/bin/memcat.sh delete mode 100755 src/bin/memcp.sh delete mode 100755 src/bin/memdump.sh delete mode 100755 src/bin/memerror.sh delete mode 100755 src/bin/memexist.sh delete mode 100755 src/bin/memflush.sh delete mode 100755 src/bin/memparse.sh delete mode 100755 src/bin/memping.sh delete mode 100755 src/bin/memrm.sh delete mode 100755 src/bin/memslap.sh delete mode 100755 src/bin/memstat.sh delete mode 100755 src/bin/memtouch.sh delete mode 100644 src/bin/ms_atomic.h delete mode 100644 src/bin/ms_conn.c delete mode 100644 src/bin/ms_conn.h delete mode 100644 src/bin/ms_memslap.h delete mode 100644 src/bin/ms_setting.c delete mode 100644 src/bin/ms_setting.h delete mode 100644 src/bin/ms_sigsegv.c delete mode 100644 src/bin/ms_sigsegv.h delete mode 100644 src/bin/ms_stats.c delete mode 100644 src/bin/ms_stats.h delete mode 100644 src/bin/ms_task.c delete mode 100644 src/bin/ms_task.h delete mode 100644 src/bin/ms_thread.c delete mode 100644 src/bin/ms_thread.h diff --git a/src/bin/CMakeLists.txt b/src/bin/CMakeLists.txt index 5e08ac64..0894b247 100644 --- a/src/bin/CMakeLists.txt +++ b/src/bin/CMakeLists.txt @@ -1,19 +1,9 @@ - -add_library(libclient_utilities STATIC utilities.cc generator.cc execute.cc) -add_library(client_utilities ALIAS libclient_utilities) -target_link_libraries(libclient_utilities PRIVATE libmemcachedinternal) -target_include_directories(libclient_utilities PUBLIC - ${CMAKE_SOURCE_DIR}/include - ${CMAKE_BINARY_DIR}/include - ${CMAKE_SOURCE_DIR}/src - ${CMAKE_BINARY_DIR}/src - ${CMAKE_BINARY_DIR}) - +add_subdirectory(common) foreach(CLIENT IN LISTS CLIENTS) add_executable(${CLIENT} ${CLIENT}.cc) target_include_directories(${CLIENT} PRIVATE ..) - target_link_libraries(${CLIENT} PRIVATE libclient_utilities) + target_link_libraries(${CLIENT} PRIVATE libclient_common) if(CMAKE_INSTALL_RPATH) set_target_properties(${CLIENT} PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}/../${CMAKE_INSTALL_LIBDIR}) @@ -37,9 +27,14 @@ target_link_libraries(memslap PRIVATE Threads::Threads) if(ENABLE_MEMASLAP) if(LIBEVENT AND HAVE_C_STDATOMIC) add_executable(memaslap memaslap.c - ms_conn.c ms_setting.c ms_sigsegv.c ms_stats.c ms_task.c ms_thread.c) - target_include_directories(memaslap PRIVATE ${LIBEVENT_INCLUDEDIR}) - target_link_libraries(memaslap PRIVATE libclient_utilities ${LIBEVENT_LIBRARIES} Threads::Threads) + memaslap/ms_conn.c + memaslap/ms_setting.c + memaslap/ms_sigsegv.c + memaslap/ms_stats.c + memaslap/ms_task.c + memaslap/ms_thread.c) + target_include_directories(memaslap PRIVATE memaslap ${LIBEVENT_INCLUDEDIR}) + target_link_libraries(memaslap PRIVATE libclient_common ${LIBEVENT_LIBRARIES} Threads::Threads) if(CMAKE_INSTALL_RPATH) set_target_properties(${CLIENT} PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}/../${CMAKE_INSTALL_LIBDIR}) diff --git a/src/bin/common/CMakeLists.txt b/src/bin/common/CMakeLists.txt new file mode 100644 index 00000000..c452049a --- /dev/null +++ b/src/bin/common/CMakeLists.txt @@ -0,0 +1,10 @@ +add_library(libclient_common STATIC utilities.cc generator.cc execute.cc) +add_library(client_common ALIAS libclient_common) +target_link_libraries(libclient_common PRIVATE libmemcachedinternal) +target_include_directories(libclient_common PUBLIC + . + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/include + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_BINARY_DIR}/src + ${CMAKE_BINARY_DIR}) diff --git a/src/bin/client_options.h b/src/bin/common/client_options.h similarity index 100% rename from src/bin/client_options.h rename to src/bin/common/client_options.h diff --git a/src/bin/execute.cc b/src/bin/common/execute.cc similarity index 100% rename from src/bin/execute.cc rename to src/bin/common/execute.cc diff --git a/src/bin/execute.h b/src/bin/common/execute.h similarity index 100% rename from src/bin/execute.h rename to src/bin/common/execute.h diff --git a/src/bin/generator.cc b/src/bin/common/generator.cc similarity index 100% rename from src/bin/generator.cc rename to src/bin/common/generator.cc diff --git a/src/bin/generator.h b/src/bin/common/generator.h similarity index 100% rename from src/bin/generator.h rename to src/bin/common/generator.h diff --git a/src/bin/utilities.cc b/src/bin/common/utilities.cc similarity index 100% rename from src/bin/utilities.cc rename to src/bin/common/utilities.cc diff --git a/src/bin/utilities.h b/src/bin/common/utilities.h similarity index 100% rename from src/bin/utilities.h rename to src/bin/common/utilities.h diff --git a/src/bin/memcapable.sh b/src/bin/memcapable.sh deleted file mode 100755 index 78fe1b4d..00000000 --- a/src/bin/memcapable.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memcapable -vh localhost diff --git a/src/bin/memcat.sh b/src/bin/memcat.sh deleted file mode 100755 index fbdc2122..00000000 --- a/src/bin/memcat.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -src/bin/memcp.sh -src/bin/memcat --servers=localhost -v mem.testdata diff --git a/src/bin/memcp.sh b/src/bin/memcp.sh deleted file mode 100755 index c173d46a..00000000 --- a/src/bin/memcp.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memcp -v --servers localhost mem.testdata diff --git a/src/bin/memdump.sh b/src/bin/memdump.sh deleted file mode 100755 index ee71c555..00000000 --- a/src/bin/memdump.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -src/bin/memdump -v --servers localhost diff --git a/src/bin/memerror.sh b/src/bin/memerror.sh deleted file mode 100755 index 70bf7cea..00000000 --- a/src/bin/memerror.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memerror 0 1 2 3 diff --git a/src/bin/memexist.sh b/src/bin/memexist.sh deleted file mode 100755 index 9cbaf75b..00000000 --- a/src/bin/memexist.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -src/bin/memcp.sh -src/bin/memexist -v --servers localhost mem.testdata diff --git a/src/bin/memflush.sh b/src/bin/memflush.sh deleted file mode 100755 index 770d8442..00000000 --- a/src/bin/memflush.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memflush -v --servers localhost diff --git a/src/bin/memparse.sh b/src/bin/memparse.sh deleted file mode 100755 index 8d7433e5..00000000 --- a/src/bin/memparse.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memparse --server=localhost:11211/?1 --server=127.0.0.1:11211/?2 diff --git a/src/bin/memping.sh b/src/bin/memping.sh deleted file mode 100755 index 09f09b8c..00000000 --- a/src/bin/memping.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memping -v --servers localhost diff --git a/src/bin/memrm.sh b/src/bin/memrm.sh deleted file mode 100755 index 40b63d96..00000000 --- a/src/bin/memrm.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -src/bin/memcp.sh -src/bin/memrm -v --servers localhost mem.testdata diff --git a/src/bin/memslap.sh b/src/bin/memslap.sh deleted file mode 100755 index e80cc111..00000000 --- a/src/bin/memslap.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -src/bin/memslap -v --servers localhost diff --git a/src/bin/memstat.sh b/src/bin/memstat.sh deleted file mode 100755 index cadaf362..00000000 --- a/src/bin/memstat.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -src/bin/memstat -v --servers localhost diff --git a/src/bin/memtouch.sh b/src/bin/memtouch.sh deleted file mode 100755 index a082eb52..00000000 --- a/src/bin/memtouch.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -src/bin/memcp.sh -src/bin/memtouch -v --servers localhost mem.testdata diff --git a/src/bin/ms_atomic.h b/src/bin/ms_atomic.h deleted file mode 100644 index 6ec990c0..00000000 --- a/src/bin/ms_atomic.h +++ /dev/null @@ -1,114 +0,0 @@ -/* LibMemcached - * Copyright (C) 2006-2009 Brian Aker - * All rights reserved. - * - * Use and distribution licensed under the BSD license. See - * the COPYING file in the parent directory for full text. - * - * Summary: - * - */ - -#ifndef CLIENTS_MS_ATOMIC_H -#define CLIENTS_MS_ATOMIC_H - -#if HAVE_C_STDATOMIC -# define ATOMIC _Atomic -#else -# define ATOMIC volatile -#endif - -#if defined(__SUNPRO_C) -# define _KERNEL -# include -# if SIZEOF_SIZE_T == 8 -# define atomic_add_size(X, Y) atomic_add_64((X), (Y)) -# define atomic_add_size_nv(X, Y) atomic_add_64((X), (Y)) -# define atomic_dec_size(X, Y) atomic_add_64((X), (Y)) -# define atomic_dec_size_nv(X, Y) atomic_add_64((X), (Y)) -# else -# define atomic_add_size(X, Y) atomic_add_32((X), (Y)) -# define atomic_add_size_nv(X, Y) atomic_add_32((X), (Y)) -# define atomic_dec_size(X, Y) atomic_add_32((X), (Y)) -# define atomic_dec_size_nv(X, Y) atomic_add_32((X), (Y)) -# endif -# undef _KERNEL -#elif HAVE_GCC_ATOMIC_BUILTINS -# define atomic_add_8(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_16(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_32(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_size(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_dec_8(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_16(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_32(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_size(X) __sync_fetch_and_sub((X), 1) -/* The same as above, but these return the new value instead of void */ -# define atomic_add_8_nv(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_16_nv(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_32_nv(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_add_size_nv(X, Y) __sync_fetch_and_add((X), (Y)) -# define atomic_dec_8_nv(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_16_nv(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_32_nv(X) __sync_fetch_and_sub((X), 1) -# define atomic_dec_size_nv(X) __sync_fetch_and_sub((X), 1) -#elif HAVE_C_STDATOMIC -# include -# define atomic_add_8(X, Y) atomic_fetch_add(X, Y) -# define atomic_add_16(X, Y) atomic_fetch_add(X, Y) -# define atomic_add_32(X, Y) atomic_fetch_add(X, Y) -# define atomic_add_size(X, Y) atomic_fetch_add(X, Y) -# define atomic_dec_8(X) atomic_fetch_sub(X, 1) -# define atomic_dec_16(X) atomic_fetch_sub(X, 1) -# define atomic_dec_32(X) atomic_fetch_sub(X, 1) -# define atomic_dec_size(X) atomic_fetch_sub(X, 1) -/* The same as above, but these return the new value instead of void */ -# define ATOMIC_ADD_FETCH_DECL(T) \ -static inline T atomic_add_fetch_##T(ATOMIC T *ptr, T add) { \ - T des, cur = atomic_load(ptr); \ - do { \ - des = cur + add; \ - } while(!atomic_compare_exchange_weak(ptr, &cur, des)); \ - return des; \ -} -# define ATOMIC_SUB_FETCH_DECL(T) \ -T atomic_sub_fetch_##T(ATOMIC T *ptr) { \ - T des, cur = atomic_load(ptr); \ - do { \ - des = cur - 1; \ - } while(!atomic_compare_exchange_weak(ptr, &cur, des)); \ - return des; \ -} -ATOMIC_ADD_FETCH_DECL(uint8_t) -# define atomic_add_8_nv(X, Y) atomic_add_fetch_uint8_t(X, Y) -ATOMIC_ADD_FETCH_DECL(uint16_t) -# define atomic_add_16_nv(X, Y) atomic_add_fetch_uint16_t(X, Y) -ATOMIC_ADD_FETCH_DECL(uint32_t) -# define atomic_add_32_nv(X, Y) atomic_add_fetch_uint32_t(X, Y) -ATOMIC_ADD_FETCH_DECL(size_t) -# define atomic_add_size_nv(X, Y) atomic_add_fetch_size_t(X, Y) -# define atomic_dec_8_nv(X) atomic_sub_fetch(X, Y) -# define atomic_dec_16_nv(X) atomic_sub_fetch(X, Y) -# define atomic_dec_32_nv(X) atomic_sub_fetch(X, Y) -# define atomic_dec_size_nv(X) atomic_sub_fetch(X, Y) -#else -#warning "Atomic operators not found so memslap will not work correctly" -# define atomic_add_8(X, Y) 0 -# define atomic_add_16(X, Y) 0 -# define atomic_add_32(X, Y) 0 -# define atomic_add_size(X, Y) 0 -# define atomic_dec_8(X) 0 -# define atomic_dec_16(X) 0 -# define atomic_dec_32(X) 0 -# define atomic_dec_size(X) 0 -/* The same as above, but these return the new value instead of void */ -# define atomic_add_8_nv(X, Y) 0 -# define atomic_add_16_nv(X, Y) 0 -# define atomic_add_32_nv(X, Y) 0 -# define atomic_add_size_nv(X, Y) 0 -# define atomic_dec_8_nv(X) 0 -# define atomic_dec_16_nv(X) 0 -# define atomic_dec_32_nv(X) 0 -# define atomic_dec_size_nv(X) 0 -#endif /* defined(__SUNPRO_C) */ - -#endif /* CLIENTS_MS_ATOMIC_H */ diff --git a/src/bin/ms_conn.c b/src/bin/ms_conn.c deleted file mode 100644 index 71353d5b..00000000 --- a/src/bin/ms_conn.c +++ /dev/null @@ -1,3427 +0,0 @@ -/* - * File: ms_conn.c - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -#include "mem_config.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#if defined(HAVE_ARPA_INET_H) -# include -#endif - -#if defined(HAVE_SYS_TIME_H) -# include -#endif - -#if defined(HAVE_TIME_H) -# include -#endif - -#include "ms_setting.h" -#include "ms_thread.h" -#include "ms_atomic.h" - -#ifdef linux -/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to - * optimize the conversion functions, but the prototypes generate warnings - * from gcc. The conversion methods isn't the bottleneck for my app, so - * just remove the warnings by undef'ing the optimization .. - */ -#undef ntohs -#undef ntohl -#undef htons -#undef htonl -#endif - -/* for network write */ -#define TRANSMIT_COMPLETE 0 -#define TRANSMIT_INCOMPLETE 1 -#define TRANSMIT_SOFT_ERROR 2 -#define TRANSMIT_HARD_ERROR 3 - -/* for generating key */ -#define KEY_PREFIX_BASE 0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */ -#define KEY_PREFIX_MASK 0x1010101010101010 - -/* For parse the value length return by server */ -#define KEY_TOKEN 1 -#define VALUELEN_TOKEN 3 - -/* global increasing counter, to ensure the key prefix unique */ -static uint64_t key_prefix_seq= KEY_PREFIX_BASE; - -/* global increasing counter, generating request id for UDP */ -static ATOMIC uint32_t udp_request_id= 0; - -extern pthread_key_t ms_thread_key; - -/* generate upd request id */ -static uint32_t ms_get_udp_request_id(void); - - -/* connect initialize */ -static void ms_task_init(ms_conn_t *c); -static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp); -static int ms_conn_sock_init(ms_conn_t *c); -static int ms_conn_event_init(ms_conn_t *c); -static int ms_conn_init(ms_conn_t *c, - const int init_state, - const int read_buffer_size, - const bool is_udp); -static void ms_warmup_num_init(ms_conn_t *c); -static int ms_item_win_init(ms_conn_t *c); - - -/* connection close */ -void ms_conn_free(ms_conn_t *c); -static void ms_conn_close(ms_conn_t *c); - - -/* create network connection */ -static int ms_new_socket(struct addrinfo *ai); -static void ms_maximize_sndbuf(const int sfd); -static int ms_network_connect(ms_conn_t *c, - char *srv_host_name, - const int srv_port, - const bool is_udp, - int *ret_sfd); -static int ms_reconn(ms_conn_t *c); - - -/* read and parse */ -static int ms_tokenize_command(char *command, - token_t *tokens, - const int max_tokens); -static int ms_ascii_process_line(ms_conn_t *c, char *command); -static int ms_try_read_line(ms_conn_t *c); -static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes); -static int ms_udp_read(ms_conn_t *c, char *buf, int len); -static int ms_try_read_network(ms_conn_t *c); -static void ms_verify_value(ms_conn_t *c, - ms_mlget_task_item_t *mlget_item, - char *value, - int vlen); -static void ms_ascii_complete_nread(ms_conn_t *c); -static void ms_bin_complete_nread(ms_conn_t *c); -static void ms_complete_nread(ms_conn_t *c); - - -/* send functions */ -static int ms_add_msghdr(ms_conn_t *c); -static int ms_ensure_iov_space(ms_conn_t *c); -static int ms_add_iov(ms_conn_t *c, const void *buf, int len); -static int ms_build_udp_headers(ms_conn_t *c); -static int ms_transmit(ms_conn_t *c); - - -/* status adjustment */ -static void ms_conn_shrink(ms_conn_t *c); -static void ms_conn_set_state(ms_conn_t *c, int state); -static bool ms_update_event(ms_conn_t *c, const int new_flags); -static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd); -static uint32_t ms_get_next_sock_index(ms_conn_t *c); -static int ms_update_conn_sock_event(ms_conn_t *c); -static bool ms_need_yield(ms_conn_t *c); -static void ms_update_start_time(ms_conn_t *c); - - -/* main loop */ -static void ms_drive_machine(ms_conn_t *c); -void ms_event_handler(const int fd, const short which, void *arg); - - -/* ascii protocol */ -static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item); -static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item); -static int ms_build_ascii_write_buf_mlget(ms_conn_t *c); - - -/* binary protocol */ -static int ms_bin_process_response(ms_conn_t *c); -static void ms_add_bin_header(ms_conn_t *c, - uint8_t opcode, - uint8_t hdr_len, - uint16_t key_len, - uint32_t body_len); -static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item); -static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item); -static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item); -static int ms_build_bin_write_buf_mlget(ms_conn_t *c); - - -/** - * each key has two parts, prefix and suffix. The suffix is a - * string random get form the character table. The prefix is a - * uint64_t variable. And the prefix must be unique. we use the - * prefix to identify a key. And the prefix can't include - * character ' ' '\r' '\n' '\0'. - * - * @return uint64_t - */ -uint64_t ms_get_key_prefix(void) -{ - uint64_t key_prefix; - - pthread_mutex_lock(&ms_global.seq_mutex); - key_prefix_seq|= KEY_PREFIX_MASK; - key_prefix= key_prefix_seq; - key_prefix_seq++; - pthread_mutex_unlock(&ms_global.seq_mutex); - - return key_prefix; -} /* ms_get_key_prefix */ - - -/** - * get an unique udp request id - * - * @return an unique UDP request id - */ -static uint32_t ms_get_udp_request_id(void) -{ - return atomic_add_32_nv(&udp_request_id, 1); -} - - -/** - * initialize current task structure - * - * @param c, pointer of the concurrency - */ -static void ms_task_init(ms_conn_t *c) -{ - c->curr_task.cmd= CMD_NULL; - c->curr_task.item= 0; - c->curr_task.verify= false; - c->curr_task.finish_verify= true; - c->curr_task.get_miss= true; - - c->curr_task.get_opt= 0; - c->curr_task.set_opt= 0; - c->curr_task.cycle_undo_get= 0; - c->curr_task.cycle_undo_set= 0; - c->curr_task.verified_get= 0; - c->curr_task.overwrite_set= 0; -} /* ms_task_init */ - - -/** - * initialize udp for the connection structure - * - * @param c, pointer of the concurrency - * @param is_udp, whether it's udp - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp) -{ - c->hdrbuf= 0; - c->rudpbuf= 0; - c->udppkt= 0; - - c->rudpsize= UDP_DATA_BUFFER_SIZE; - c->hdrsize= 0; - - c->rudpbytes= 0; - c->packets= 0; - c->recvpkt= 0; - c->pktcurr= 0; - c->ordcurr= 0; - - c->udp= is_udp; - - if (c->udp || (! c->udp && ms_setting.facebook_test)) - { - c->rudpbuf= (char *)malloc((size_t)c->rudpsize); - c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t)); - - if ((c->rudpbuf == NULL) || (c->udppkt == NULL)) - { - if (c->rudpbuf != NULL) - free(c->rudpbuf); - if (c->udppkt != NULL) - free(c->udppkt); - fprintf(stderr, "malloc()\n"); - return -1; - } - memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t)); - } - - return EXIT_SUCCESS; -} /* ms_conn_udp_init */ - - -/** - * initialize the connection structure - * - * @param c, pointer of the concurrency - * @param init_state, (conn_read, conn_write, conn_closing) - * @param read_buffer_size - * @param is_udp, whether it's udp - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_conn_init(ms_conn_t *c, - const int init_state, - const int read_buffer_size, - const bool is_udp) -{ - assert(c != NULL); - - c->rbuf= c->wbuf= 0; - c->iov= 0; - c->msglist= 0; - - c->rsize= read_buffer_size; - c->wsize= WRITE_BUFFER_SIZE; - c->iovsize= IOV_LIST_INITIAL; - c->msgsize= MSG_LIST_INITIAL; - - /* for replication, each connection need connect all the server */ - if (ms_setting.rep_write_srv > 0) - { - c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; - } - else - { - c->total_sfds= ms_setting.sock_per_conn; - } - c->alive_sfds= 0; - - c->rbuf= (char *)malloc((size_t)c->rsize); - c->wbuf= (char *)malloc((size_t)c->wsize); - c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize); - c->msglist= (struct msghdr *)malloc( - sizeof(struct msghdr) * (size_t)c->msgsize); - if (ms_setting.mult_key_num > 1) - { - c->mlget_task.mlget_item= (ms_mlget_task_item_t *) - malloc( - sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num); - } - c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int)); - - if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL) - || (c->msglist == NULL) || (c->tcpsfd == NULL) - || ((ms_setting.mult_key_num > 1) - && (c->mlget_task.mlget_item == NULL))) - { - if (c->rbuf != NULL) - free(c->rbuf); - if (c->wbuf != NULL) - free(c->wbuf); - if (c->iov != NULL) - free(c->iov); - if (c->msglist != NULL) - free(c->msglist); - if (c->mlget_task.mlget_item != NULL) - free(c->mlget_task.mlget_item); - if (c->tcpsfd != NULL) - free(c->tcpsfd); - fprintf(stderr, "malloc()\n"); - return -1; - } - - c->state= init_state; - c->rvbytes= 0; - c->rbytes= 0; - c->rcurr= c->rbuf; - c->wcurr= c->wbuf; - c->iovused= 0; - c->msgcurr= 0; - c->msgused= 0; - c->cur_idx= c->total_sfds; /* default index is a invalid value */ - - c->ctnwrite= false; - c->readval= false; - c->change_sfd= false; - - c->precmd.cmd= c->currcmd.cmd= CMD_NULL; - c->precmd.isfinish= true; /* default the previous command finished */ - c->currcmd.isfinish= false; - c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE; - c->precmd.key_prefix= c->currcmd.key_prefix= 0; - - c->mlget_task.mlget_num= 0; - c->mlget_task.value_index= -1; /* default invalid value */ - - if (ms_setting.binary_prot_) - { - c->protocol= binary_prot; - } - else - { - c->protocol= ascii_prot; - } - - /* initialize udp */ - if (ms_conn_udp_init(c, is_udp) != 0) - { - return -1; - } - - /* initialize task */ - ms_task_init(c); - - if (! (ms_setting.facebook_test && is_udp)) - { - atomic_add_32(&ms_stats.active_conns, 1); - } - - return EXIT_SUCCESS; -} /* ms_conn_init */ - - -/** - * when doing 100% get operation, it could preset some objects - * to warmup the server. this function is used to initialize the - * number of the objects to preset. - * - * @param c, pointer of the concurrency - */ -static void ms_warmup_num_init(ms_conn_t *c) -{ - /* no set operation, preset all the items in the window */ - if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) - { - c->warmup_num= c->win_size; - c->remain_warmup_num= c->warmup_num; - } - else - { - c->warmup_num= 0; - c->remain_warmup_num= c->warmup_num; - } -} /* ms_warmup_num_init */ - - -/** - * each connection has an item window, this function initialize - * the window. The window is used to generate task. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_item_win_init(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int exp_cnt= 0; - - c->win_size= (int)ms_setting.win_size; - c->set_cursor= 0; - c->exec_num= ms_thread->thread_ctx->exec_num_perconn; - c->remain_exec_num= c->exec_num; - - c->item_win= (ms_task_item_t *)malloc( - sizeof(ms_task_item_t) * (size_t)c->win_size); - if (c->item_win == NULL) - { - fprintf(stderr, "Can't allocate task item array for conn.\n"); - return -1; - } - memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size); - - for (int i= 0; i < c->win_size; i++) - { - c->item_win[i].key_size= (int)ms_setting.distr[i].key_size; - c->item_win[i].key_prefix= ms_get_key_prefix(); - c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset; - c->item_win[i].value_size= (int)ms_setting.distr[i].value_size; - c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */ - c->item_win[i].client_time= 0; - - /* set expire time base on the proportion */ - if (exp_cnt < ms_setting.exp_ver_per * i) - { - c->item_win[i].exp_time= FIXED_EXPIRE_TIME; - exp_cnt++; - } - else - { - c->item_win[i].exp_time= 0; - } - } - - ms_warmup_num_init(c); - - return EXIT_SUCCESS; -} /* ms_item_win_init */ - - -/** - * each connection structure can include one or more sock - * handlers. this function create these socks and connect the - * server(s). - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_conn_sock_init(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - uint32_t i; - int ret_sfd; - uint32_t srv_idx= 0; - - assert(c != NULL); - assert(c->tcpsfd != NULL); - - for (i= 0; i < c->total_sfds; i++) - { - ret_sfd= 0; - if (ms_setting.rep_write_srv > 0) - { - /* for replication, each connection need connect all the server */ - srv_idx= i % ms_setting.srv_cnt; - } - else - { - /* all the connections in a thread connects the same server */ - srv_idx= ms_thread->thread_ctx->srv_idx; - } - - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &ret_sfd) != 0) - { - break; - } - - if (i == 0) - { - c->sfd= ret_sfd; - } - - if (! ms_setting.udp) - { - c->tcpsfd[i]= ret_sfd; - } - - c->alive_sfds++; - } - - /* initialize udp sock handler if necessary */ - if (ms_setting.facebook_test) - { - ret_sfd= 0; - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - true, &ret_sfd) != 0) - { - c->udpsfd= 0; - } - else - { - c->udpsfd= ret_sfd; - } - } - - if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0))) - { - if (ms_setting.udp) - { - close(c->sfd); - } - else - { - for (uint32_t j= 0; j < i; j++) - { - close(c->tcpsfd[j]); - } - } - - if (c->udpsfd != 0) - { - close(c->udpsfd); - } - - return -1; - } - - return EXIT_SUCCESS; -} /* ms_conn_sock_init */ - - -/** - * each connection is managed by libevent, this function - * initialize the event of the connection structure. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_conn_event_init(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - short event_flags= EV_WRITE | EV_PERSIST; - - event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); - event_base_set(ms_thread->base, &c->event); - c->ev_flags= event_flags; - - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_conn_event_init */ - - -/** - * setup a connection, each connection structure of each - * thread must call this function to initialize. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_setup_conn(ms_conn_t *c) -{ - if (ms_item_win_init(c) != 0) - { - return -1; - } - - if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0) - { - return -1; - } - - if (ms_conn_sock_init(c) != 0) - { - return -1; - } - - if (ms_conn_event_init(c) != 0) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_setup_conn */ - - -/** - * Frees a connection. - * - * @param c, pointer of the concurrency - */ -void ms_conn_free(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - if (c != NULL) - { - if (c->hdrbuf != NULL) - free(c->hdrbuf); - if (c->msglist != NULL) - free(c->msglist); - if (c->rbuf != NULL) - free(c->rbuf); - if (c->wbuf != NULL) - free(c->wbuf); - if (c->iov != NULL) - free(c->iov); - if (c->mlget_task.mlget_item != NULL) - free(c->mlget_task.mlget_item); - if (c->rudpbuf != NULL) - free(c->rudpbuf); - if (c->udppkt != NULL) - free(c->udppkt); - if (c->item_win != NULL) - free(c->item_win); - if (c->tcpsfd != NULL) - free(c->tcpsfd); - - if (--ms_thread->nactive_conn == 0) - { - free(ms_thread->conn); - } - } -} /* ms_conn_free */ - - -/** - * close a connection - * - * @param c, pointer of the concurrency - */ -static void ms_conn_close(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - assert(c != NULL); - - /* delete the event, the socket and the connection */ - event_del(&c->event); - - for (uint32_t i= 0; i < c->total_sfds; i++) - { - if (c->tcpsfd[i] > 0) - { - close(c->tcpsfd[i]); - } - } - c->sfd= 0; - - if (ms_setting.facebook_test) - { - close(c->udpsfd); - } - - atomic_dec_32(&ms_stats.active_conns); - - ms_conn_free(c); - - if (ms_setting.run_time == 0) - { - pthread_mutex_lock(&ms_global.run_lock.lock); - ms_global.run_lock.count++; - pthread_cond_signal(&ms_global.run_lock.cond); - pthread_mutex_unlock(&ms_global.run_lock.lock); - } - - if (ms_thread->nactive_conn == 0) - { - pthread_exit(NULL); - } -} /* ms_conn_close */ - - -/** - * create a new sock - * - * @param ai, server address information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_new_socket(struct addrinfo *ai) -{ - int sfd; - - if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) - { - fprintf(stderr, "socket() error: %s.\n", strerror(errno)); - return -1; - } - - return sfd; -} /* ms_new_socket */ - - -/** - * Sets a socket's send buffer size to the maximum allowed by the system. - * - * @param sfd, file descriptor of socket - */ -static void ms_maximize_sndbuf(const int sfd) -{ - socklen_t intsize= sizeof(int); - unsigned int last_good= 0; - unsigned int min, max, avg; - unsigned int old_size; - - /* Start with the default size. */ - if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) - { - fprintf(stderr, "getsockopt(SO_SNDBUF)\n"); - return; - } - - /* Binary-search for the real maximum. */ - min= old_size; - max= MAX_SENDBUF_SIZE; - - while (min <= max) - { - avg= ((unsigned int)(min + max)) / 2; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) - { - last_good= avg; - min= avg + 1; - } - else - { - max= avg - 1; - } - } - (void)last_good; -} /* ms_maximize_sndbuf */ - - -/** - * socket connects the server - * - * @param c, pointer of the concurrency - * @param srv_host_name, the host name of the server - * @param srv_port, port of server - * @param is_udp, whether it's udp - * @param ret_sfd, the connected socket file descriptor - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_network_connect(ms_conn_t *c, - char *srv_host_name, - const int srv_port, - const bool is_udp, - int *ret_sfd) -{ - int sfd; - struct linger ling= - { - 0, 0 - }; - struct addrinfo *ai; - struct addrinfo *next; - struct addrinfo hints; - char port_buf[NI_MAXSERV]; - int error; - int success= 0; - - int flags= 1; - - /* - * the memset call clears nonstandard fields in some impementations - * that otherwise mess things up. - */ - memset(&hints, 0, sizeof(hints)); -#ifdef AI_ADDRCONFIG - hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG; -#else - hints.ai_flags= AI_PASSIVE; -#endif /* AI_ADDRCONFIG */ - if (is_udp) - { - hints.ai_protocol= IPPROTO_UDP; - hints.ai_socktype= SOCK_DGRAM; - hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */ - } - else - { - hints.ai_family= AF_UNSPEC; - hints.ai_protocol= IPPROTO_TCP; - hints.ai_socktype= SOCK_STREAM; - } - - snprintf(port_buf, NI_MAXSERV, "%d", srv_port); - error= getaddrinfo(srv_host_name, port_buf, &hints, &ai); - if (error != 0) - { - if (error != EAI_SYSTEM) - fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error)); - else - perror("getaddrinfo()\n"); - - return -1; - } - - for (next= ai; next; next= next->ai_next) - { - if ((sfd= ms_new_socket(next)) == -1) - { - freeaddrinfo(ai); - return -1; - } - - setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); - if (is_udp) - { - ms_maximize_sndbuf(sfd); - } - else - { - setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, - sizeof(flags)); - setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); - setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, - sizeof(flags)); - } - - if (is_udp) - { - c->srv_recv_addr_size= sizeof(struct sockaddr); - memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size); - } - else - { - if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1) - { - close(sfd); - freeaddrinfo(ai); - return -1; - } - } - - if (((flags= fcntl(sfd, F_GETFL, 0)) < 0) - || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)) - { - fprintf(stderr, "setting O_NONBLOCK\n"); - close(sfd); - freeaddrinfo(ai); - return -1; - } - - if (ret_sfd != NULL) - { - *ret_sfd= sfd; - } - - success++; - } - - freeaddrinfo(ai); - - /* Return zero if we detected no errors in starting up connections */ - return success == 0; -} /* ms_network_connect */ - - -/** - * reconnect a disconnected sock - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_reconn(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - uint32_t srv_idx= 0; - uint32_t srv_conn_cnt= 0; - - if (ms_setting.rep_write_srv > 0) - { - srv_idx= c->cur_idx % ms_setting.srv_cnt; - srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; - } - else - { - srv_idx= ms_thread->thread_ctx->srv_idx; - srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; - } - - /* close the old socket handler */ - close(c->sfd); - c->tcpsfd[c->cur_idx]= 0; - - if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1) - % srv_conn_cnt == 0) - { - gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL); - fprintf(stderr, "Server %s:%d disconnect\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port); - } - - if (ms_setting.rep_write_srv > 0) - { - uint32_t i= 0; - - for (i= 0; i < c->total_sfds; i++) - { - if (c->tcpsfd[i] != 0) - { - break; - } - } - - /* all socks disconnect */ - if (i == c->total_sfds) - { - return -1; - } - } - else - { - do - { - /* reconnect success, break the loop */ - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &c->sfd) == 0) - { - c->tcpsfd[c->cur_idx]= c->sfd; - if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % (uint32_t)srv_conn_cnt == 0) - { - gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); - int reconn_time= - (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec - - ms_setting.servers[srv_idx].disconn_time - .tv_sec); - fprintf(stderr, "Server %s:%d reconnect after %ds\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, reconn_time); - } - break; - } - - if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) - { - /* wait a second and reconnect */ - sleep(1); - } - } - while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); - } - - if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) - { - c->sfd= 0; - c->alive_sfds--; - } - - return EXIT_SUCCESS; -} /* ms_reconn */ - - -/** - * reconnect several disconnected socks in the connection - * structure, the ever-1-second timer of the thread will check - * whether some socks in the connections disconnect. if - * disconnect, reconnect the sock. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_reconn_socks(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - uint32_t srv_idx= 0; - int ret_sfd= 0; - uint32_t srv_conn_cnt= 0; - struct timeval cur_time; - - assert(c != NULL); - - if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds)) - { - return EXIT_SUCCESS; - } - - for (uint32_t i= 0; i < c->total_sfds; i++) - { - if (c->tcpsfd[i] == 0) - { - gettimeofday(&cur_time, NULL); - - /** - * For failover test of replication, reconnect the socks after - * it disconnects more than 5 seconds, Otherwise memslap will - * block at connect() function and the work threads can't work - * in this interval. - */ - if (cur_time.tv_sec - - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5) - { - break; - } - - if (ms_setting.rep_write_srv > 0) - { - srv_idx= i % ms_setting.srv_cnt; - srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns; - } - else - { - srv_idx= ms_thread->thread_ctx->srv_idx; - srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt; - } - - if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, - ms_setting.udp, &ret_sfd) == 0) - { - c->tcpsfd[i]= ret_sfd; - c->alive_sfds++; - - if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1) - % (uint32_t)srv_conn_cnt == 0) - { - gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL); - int reconn_time= - (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec - - ms_setting.servers[srv_idx].disconn_time - .tv_sec); - fprintf(stderr, "Server %s:%d reconnect after %ds\n", - ms_setting.servers[srv_idx].srv_host_name, - ms_setting.servers[srv_idx].srv_port, reconn_time); - } - } - } - } - - return EXIT_SUCCESS; -} /* ms_reconn_socks */ - - -/** - * Tokenize the command string by replacing whitespace with '\0' and update - * the token array tokens with pointer to start of each token and length. - * Returns total number of tokens. The last valid token is the terminal - * token (value points to the first unprocessed character of the string and - * length zero). - * - * Usage example: - * - * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) { - * for(int ix = 0; tokens[ix].length != 0; ix++) { - * ... - * } - * ncommand = tokens[ix].value - command; - * command = tokens[ix].value; - * } - * - * @param command, the command string to token - * @param tokens, array to store tokens - * @param max_tokens, maximum tokens number - * - * @return int, the number of tokens - */ -static int ms_tokenize_command(char *command, - token_t *tokens, - const int max_tokens) -{ - char *s, *e; - int ntokens= 0; - - assert(command != NULL && tokens != NULL && max_tokens > 1); - - for (s= e= command; ntokens < max_tokens - 1; ++e) - { - if (*e == ' ') - { - if (s != e) - { - tokens[ntokens].value= s; - tokens[ntokens].length= (size_t)(e - s); - ntokens++; - *e= '\0'; - } - s= e + 1; - } - else if (*e == '\0') - { - if (s != e) - { - tokens[ntokens].value= s; - tokens[ntokens].length= (size_t)(e - s); - ntokens++; - } - - break; /* string end */ - } - } - - return ntokens; -} /* ms_tokenize_command */ - - -/** - * parse the response of server. - * - * @param c, pointer of the concurrency - * @param command, the string responded by server - * - * @return int, if the command completed return EXIT_SUCCESS, else return - * -1 - */ -static int ms_ascii_process_line(ms_conn_t *c, char *command) -{ - int ret= 0; - int64_t value_len; - char *buffer= command; - - assert(c != NULL); - - /** - * for command get, we store the returned value into local buffer - * then continue in ms_complete_nread(). - */ - - switch (buffer[0]) - { - case 'V': /* VALUE || VERSION */ - if (buffer[1] == 'A') /* VALUE */ - { - token_t tokens[MAX_TOKENS]; - ms_tokenize_command(command, tokens, MAX_TOKENS); - errno= 0; - value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10); - if (errno != 0) - { - printf("<%d ERROR %s\n", c->sfd, strerror(errno)); - } - c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value; - - /* - * We read the \r\n into the string since not doing so is more - * cycles then the waster of memory to do so. - * - * We are null terminating through, which will most likely make - * some people lazy about using the return length. - */ - c->rvbytes= (int)(value_len + 2); - c->readval= true; - ret= -1; - } - - break; - - case 'O': /* OK */ - c->currcmd.retstat= MCD_SUCCESS; - break; - - case 'S': /* STORED STATS SERVER_ERROR */ - if (buffer[2] == 'A') /* STORED STATS */ - { /* STATS*/ - c->currcmd.retstat= MCD_STAT; - } - else if (buffer[1] == 'E') - { - /* SERVER_ERROR */ - printf("<%d %s\n", c->sfd, buffer); - - c->currcmd.retstat= MCD_SERVER_ERROR; - } - else if (buffer[1] == 'T') - { - /* STORED */ - c->currcmd.retstat= MCD_STORED; - } - else - { - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - } - break; - - case 'D': /* DELETED DATA */ - if (buffer[1] == 'E') - { - c->currcmd.retstat= MCD_DELETED; - } - else - { - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - } - - break; - - case 'N': /* NOT_FOUND NOT_STORED*/ - if (buffer[4] == 'F') - { - c->currcmd.retstat= MCD_NOTFOUND; - } - else if (buffer[4] == 'S') - { - printf("<%d %s\n", c->sfd, buffer); - c->currcmd.retstat= MCD_NOTSTORED; - } - else - { - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - } - break; - - case 'E': /* PROTOCOL ERROR or END */ - if (buffer[1] == 'N') - { - /* END */ - c->currcmd.retstat= MCD_END; - } - else if (buffer[1] == 'R') - { - printf("<%d ERROR\n", c->sfd); - c->currcmd.retstat= MCD_PROTOCOL_ERROR; - } - else if (buffer[1] == 'X') - { - c->currcmd.retstat= MCD_DATA_EXISTS; - printf("<%d %s\n", c->sfd, buffer); - } - else - { - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - } - break; - - case 'C': /* CLIENT ERROR */ - printf("<%d %s\n", c->sfd, buffer); - c->currcmd.retstat= MCD_CLIENT_ERROR; - break; - - default: - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - break; - } /* switch */ - - return ret; -} /* ms_ascii_process_line */ - - -/** - * after one operation completes, reset the concurrency - * - * @param c, pointer of the concurrency - * @param timeout, whether it's timeout - */ -void ms_reset_conn(ms_conn_t *c, bool timeout) -{ - assert(c != NULL); - - if (c->udp) - { - if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET)) - { - memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets); - } - - c->packets= 0; - c->recvpkt= 0; - c->pktcurr= 0; - c->ordcurr= 0; - c->rudpbytes= 0; - } - c->currcmd.isfinish= true; - c->ctnwrite= false; - c->rbytes= 0; - c->rcurr= c->rbuf; - c->msgcurr = 0; - c->msgused = 0; - c->iovused = 0; - ms_conn_set_state(c, conn_write); - memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ - - if (timeout) - { - ms_drive_machine(c); - } -} /* ms_reset_conn */ - - -/** - * if we have a complete line in the buffer, process it. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_try_read_line(ms_conn_t *c) -{ - if (c->protocol == binary_prot) - { - /* Do we have the complete packet header? */ - if ((uint64_t)c->rbytes < sizeof(c->binary_header)) - { - /* need more data! */ - return EXIT_SUCCESS; - } - else - { -#ifdef NEED_ALIGN - if (((long)(c->rcurr)) % 8 != 0) - { - /* must realign input buffer */ - memmove(c->rbuf, c->rcurr, c->rbytes); - c->rcurr= c->rbuf; - if (settings.verbose) - { - fprintf(stderr, "%d: Realign input buffer.\n", c->sfd); - } - } -#endif - protocol_binary_response_header *rsp; - rsp= (protocol_binary_response_header *)c->rcurr; - - c->binary_header= *rsp; - c->binary_header.response.extlen= rsp->response.extlen; - c->binary_header.response.keylen= ntohs(rsp->response.keylen); - c->binary_header.response.bodylen= ntohl(rsp->response.bodylen); - c->binary_header.response.status= ntohs(rsp->response.status); - - if (c->binary_header.response.magic != PROTOCOL_BINARY_RES) - { - fprintf(stderr, "Invalid magic: %x\n", - c->binary_header.response.magic); - ms_conn_set_state(c, conn_closing); - return EXIT_SUCCESS; - } - - /* process this complete response */ - if (ms_bin_process_response(c) == 0) - { - /* current operation completed */ - ms_reset_conn(c, false); - return -1; - } - else - { - c->rbytes-= (int32_t)sizeof(c->binary_header); - c->rcurr+= sizeof(c->binary_header); - } - } - } - else - { - char *el, *cont; - - assert(c != NULL); - assert(c->rcurr <= (c->rbuf + c->rsize)); - - if (c->rbytes == 0) - return EXIT_SUCCESS; - - el= memchr(c->rcurr, '\n', (size_t)c->rbytes); - if (! el) - return EXIT_SUCCESS; - - cont= el + 1; - if (((el - c->rcurr) > 1) && (*(el - 1) == '\r')) - { - el--; - } - *el= '\0'; - - assert(cont <= (c->rcurr + c->rbytes)); - - /* process this complete line */ - if (ms_ascii_process_line(c, c->rcurr) == 0) - { - /* current operation completed */ - ms_reset_conn(c, false); - return -1; - } - else - { - /* current operation didn't complete */ - c->rbytes-= (int32_t)(cont - c->rcurr); - c->rcurr= cont; - } - - assert(c->rcurr <= (c->rbuf + c->rsize)); - } - - return -1; -} /* ms_try_read_line */ - - -/** - * because the packet of UDP can't ensure the order, the - * function is used to sort the received udp packet. - * - * @param c, pointer of the concurrency - * @param buf, the buffer to store the ordered packages data - * @param rbytes, the maximum capacity of the buffer - * - * @return int, if success, return the copy bytes, else return - * -1 - */ -static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes) -{ - int len= 0; - int wbytes= 0; - uint16_t req_id= 0; - uint16_t seq_num= 0; - uint16_t packets= 0; - unsigned char *header= NULL; - - /* no enough data */ - assert(c != NULL); - assert(buf != NULL); - assert(c->rudpbytes >= UDP_HEADER_SIZE); - - /* calculate received packets count */ - if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE) - { - /* the last packet has some data */ - c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1; - } - else - { - c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE; - } - - /* get the total packets count if necessary */ - if (c->packets == 0) - { - c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf); - } - - /* build the ordered packet array */ - for (int i= c->pktcurr; i < c->recvpkt; i++) - { - header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE; - req_id= (uint16_t)HEADER_TO_REQID(header); - assert(req_id == c->request_id % (1 << 16)); - - packets= (uint16_t)HEADER_TO_PACKETS(header); - assert(c->packets == HEADER_TO_PACKETS(header)); - - seq_num= (uint16_t)HEADER_TO_SEQNUM(header); - c->udppkt[seq_num].header= header; - c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE; - - if (i == c->recvpkt - 1) - { - /* last received packet */ - if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0) - { - c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; - c->pktcurr++; - } - else - { - c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE - - UDP_HEADER_SIZE; - } - } - else - { - c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE; - c->pktcurr++; - } - } - - for (int i= c->ordcurr; i < c->recvpkt; i++) - { - /* there is some data to copy */ - if ((c->udppkt[i].data != NULL) - && (c->udppkt[i].copybytes < c->udppkt[i].rbytes)) - { - header= c->udppkt[i].header; - len= c->udppkt[i].rbytes - c->udppkt[i].copybytes; - if (len > rbytes - wbytes) - { - len= rbytes - wbytes; - } - - assert(len <= rbytes - wbytes); - assert(i == HEADER_TO_SEQNUM(header)); - - memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes, - (size_t)len); - wbytes+= len; - c->udppkt[i].copybytes+= len; - - if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes) - && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) - { - /* finish copying all the data of this packet, next */ - c->ordcurr++; - } - - /* last received packet, and finish copying all the data */ - if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1) - && (c->udppkt[i].copybytes == c->udppkt[i].rbytes)) - { - break; - } - - /* no space to copy data */ - if (wbytes >= rbytes) - { - break; - } - - /* it doesn't finish reading all the data of the packet from network */ - if ((i != c->recvpkt - 1) - && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)) - { - break; - } - } - else - { - /* no data to copy */ - break; - } - } - (void)packets; - - return wbytes == 0 ? -1 : wbytes; -} /* ms_sort_udp_packet */ - - -/** - * encapsulate upd read like tcp read - * - * @param c, pointer of the concurrency - * @param buf, read buffer - * @param len, length to read - * - * @return int, if success, return the read bytes, else return - * -1 - */ -static int ms_udp_read(ms_conn_t *c, char *buf, int len) -{ - int res= 0; - int avail= 0; - int rbytes= 0; - int copybytes= 0; - - assert(c->udp); - - while (1) - { - if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize) - { - char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2); - if (! new_rbuf) - { - fprintf(stderr, "Couldn't realloc input buffer.\n"); - c->rudpbytes= 0; /* ignore what we read */ - return -1; - } - c->rudpbuf= new_rbuf; - c->rudpsize*= 2; - } - - avail= c->rudpsize - c->rudpbytes; - /* UDP each time read a packet, 1400 bytes */ - res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail); - - if (res > 0) - { - atomic_add_size(&ms_stats.bytes_read, res); - c->rudpbytes+= res; - rbytes+= res; - if (res == avail) - { - continue; - } - else - { - break; - } - } - - if (res == 0) - { - /* "connection" closed */ - return res; - } - - if (res == -1) - { - /* no data to read */ - return res; - } - } - - /* copy data to read buffer */ - if (rbytes > 0) - { - copybytes= ms_sort_udp_packet(c, buf, len); - } - - if (copybytes == -1) - { - atomic_add_size(&ms_stats.pkt_disorder, 1); - } - - return copybytes; -} /* ms_udp_read */ - - -/* - * read from network as much as we can, handle buffer overflow and connection - * close. - * before reading, move the remaining incomplete fragment of a command - * (if any) to the beginning of the buffer. - * return EXIT_SUCCESS if there's nothing to read on the first read. - */ - -/** - * read from network as much as we can, handle buffer overflow and connection - * close. before reading, move the remaining incomplete fragment of a command - * (if any) to the beginning of the buffer. - * - * @param c, pointer of the concurrency - * - * @return int, - * return EXIT_SUCCESS if there's nothing to read on the first read. - * return EXIT_FAILURE if get data - * return -1 if error happens - */ -static int ms_try_read_network(ms_conn_t *c) -{ - int gotdata= 0; - int res; - int64_t avail; - - assert(c != NULL); - - if ((c->rcurr != c->rbuf) - && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf)) - || (c->readval && (c->rcurr - c->rbuf > c->rbytes)))) - { - if (c->rbytes != 0) /* otherwise there's nothing to copy */ - memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); - c->rcurr= c->rbuf; - } - - while (1) - { - if (c->rbytes >= c->rsize) - { - char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2); - if (! new_rbuf) - { - fprintf(stderr, "Couldn't realloc input buffer.\n"); - c->rbytes= 0; /* ignore what we read */ - return -1; - } - c->rcurr= c->rbuf= new_rbuf; - c->rsize*= 2; - } - - avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf); - if (avail == 0) - { - break; - } - - if (c->udp) - { - res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail); - } - else - { - res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail); - } - - if (res > 0) - { - if (! c->udp) - { - atomic_add_size(&ms_stats.bytes_read, res); - } - gotdata= 1; - c->rbytes+= res; - if (res == avail) - { - continue; - } - else - { - break; - } - } - if (res == 0) - { - /* connection closed */ - ms_conn_set_state(c, conn_closing); - return -1; - } - if (res == -1) - { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - break; - /* Should close on unhandled errors. */ - ms_conn_set_state(c, conn_closing); - return -1; - } - } - - return gotdata; -} /* ms_try_read_network */ - - -/** - * after get the object from server, verify the value if - * necessary. - * - * @param c, pointer of the concurrency - * @param mlget_item, pointer of mulit-get task item structure - * @param value, received value string - * @param vlen, received value string length - */ -static void ms_verify_value(ms_conn_t *c, - ms_mlget_task_item_t *mlget_item, - char *value, - int vlen) -{ - if (c->curr_task.verify) - { - assert(c->curr_task.item->value_offset != INVALID_OFFSET); - char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset]; - char *orignkey= - &ms_setting.char_block[c->curr_task.item->key_suffix_offset]; - - /* verify expire time if necessary */ - if (c->curr_task.item->exp_time > 0) - { - struct timeval curr_time; - gettimeofday(&curr_time, NULL); - - /* object expired but get it now */ - if (curr_time.tv_sec - c->curr_task.item->client_time - > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR) - { - atomic_add_size(&ms_stats.exp_get, 1); - - if (ms_setting.verbose) - { - char set_time[64]; - char cur_time[64]; - strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&c->curr_task.item->client_time)); - strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&curr_time.tv_sec)); - fprintf(stderr, - "\n<%d expire time verification failed, " - "object expired but get it now\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64 " %.*s\n" - "\tset time: %s current time: %s " - "diff time: %d expire time: %d\n" - "\texpected data: \n" - "\treceived data len: %d\n" - "\treceived data: %.*s\n", - c->sfd, - c->curr_task.item->key_size, - c->curr_task.item->key_prefix, - c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, - set_time, - cur_time, - (int)(curr_time.tv_sec - c->curr_task.item->client_time), - c->curr_task.item->exp_time, - vlen, - vlen, - value); - fflush(stderr); - } - } - } - else - { - if ((c->curr_task.item->value_size != vlen) - || (memcmp(orignval, value, (size_t)vlen) != 0)) - { - atomic_add_size(&ms_stats.vef_failed, 1); - - if (ms_setting.verbose) - { - fprintf(stderr, - "\n<%d data verification failed\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64" %.*s\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data len: %d\n" - "\treceived data: %.*s\n", - c->sfd, - c->curr_task.item->key_size, - c->curr_task.item->key_prefix, - c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, - c->curr_task.item->value_size, - c->curr_task.item->value_size, - orignval, - vlen, - vlen, - value); - fflush(stderr); - } - } - } - - c->curr_task.finish_verify= true; - - if (mlget_item != NULL) - { - mlget_item->finish_verify= true; - } - } -} /* ms_verify_value */ - - -/** - * For ASCII protocol, after store the data into the local - * buffer, run this function to handle the data. - * - * @param c, pointer of the concurrency - */ -static void ms_ascii_complete_nread(ms_conn_t *c) -{ - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_prot); - if (c->rvbytes > 2) - { - assert( - c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r'); - } - - /* multi-get */ - ms_mlget_task_item_t *mlget_item= NULL; - if (((ms_setting.mult_key_num > 1) - && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) - || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) - { - c->mlget_task.value_index++; - mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; - - if (mlget_item->item->key_prefix == c->currcmd.key_prefix) - { - c->curr_task.item= mlget_item->item; - c->curr_task.verify= mlget_item->verify; - c->curr_task.finish_verify= mlget_item->finish_verify; - mlget_item->get_miss= false; - } - else - { - /* Try to find the task item in multi-get task array */ - for (int i= 0; i < c->mlget_task.mlget_num; i++) - { - mlget_item= &c->mlget_task.mlget_item[i]; - if (mlget_item->item->key_prefix == c->currcmd.key_prefix) - { - c->curr_task.item= mlget_item->item; - c->curr_task.verify= mlget_item->verify; - c->curr_task.finish_verify= mlget_item->finish_verify; - mlget_item->get_miss= false; - - break; - } - } - } - } - - ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2); - - c->curr_task.get_miss= false; - c->rbytes-= c->rvbytes; - c->rcurr= c->rcurr + c->rvbytes; - assert(c->rcurr <= (c->rbuf + c->rsize)); - c->readval= false; - c->rvbytes= 0; -} /* ms_ascii_complete_nread */ - - -/** - * For binary protocol, after store the data into the local - * buffer, run this function to handle the data. - * - * @param c, pointer of the concurrency - */ -static void ms_bin_complete_nread(ms_conn_t *c) -{ - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == binary_prot); - - int extlen= c->binary_header.response.extlen; - int keylen= c->binary_header.response.keylen; - uint8_t opcode= c->binary_header.response.opcode; - - /* not get command or not include value, just return */ - if (((opcode != PROTOCOL_BINARY_CMD_GET) - && (opcode != PROTOCOL_BINARY_CMD_GETQ)) - || (c->rvbytes <= extlen + keylen)) - { - /* get miss */ - if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET) - { - c->currcmd.retstat= MCD_END; - c->curr_task.get_miss= true; - } - - c->readval= false; - c->rvbytes= 0; - ms_reset_conn(c, false); - return; - } - - /* multi-get */ - ms_mlget_task_item_t *mlget_item= NULL; - if (((ms_setting.mult_key_num > 1) - && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) - || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) - { - c->mlget_task.value_index++; - mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index]; - - c->curr_task.item= mlget_item->item; - c->curr_task.verify= mlget_item->verify; - c->curr_task.finish_verify= mlget_item->finish_verify; - mlget_item->get_miss= false; - } - - ms_verify_value(c, - mlget_item, - c->rcurr + extlen + keylen, - c->rvbytes - extlen - keylen); - - c->currcmd.retstat= MCD_END; - c->curr_task.get_miss= false; - c->rbytes-= c->rvbytes; - c->rcurr= c->rcurr + c->rvbytes; - assert(c->rcurr <= (c->rbuf + c->rsize)); - c->readval= false; - c->rvbytes= 0; - - if (ms_setting.mult_key_num > 1) - { - /* multi-get have check all the item */ - if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1) - { - ms_reset_conn(c, false); - } - } - else - { - /* single get */ - ms_reset_conn(c, false); - } -} /* ms_bin_complete_nread */ - - -/** - * we get here after reading the value of get commands. - * - * @param c, pointer of the concurrency - */ -static void ms_complete_nread(ms_conn_t *c) -{ - assert(c != NULL); - assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_prot - || c->protocol == binary_prot); - - if (c->protocol == binary_prot) - { - ms_bin_complete_nread(c); - } - else - { - ms_ascii_complete_nread(c); - } -} /* ms_complete_nread */ - - -/** - * Adds a message header to a connection. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_add_msghdr(ms_conn_t *c) -{ - struct msghdr *msg; - - assert(c != NULL); - - if (c->msgsize == c->msgused) - { - msg= - realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr)); - if (! msg) - return -1; - - c->msglist= msg; - c->msgsize*= 2; - } - - msg= c->msglist + c->msgused; - - /** - * this wipes msg_iovlen, msg_control, msg_controllen, and - * msg_flags, the last 3 of which aren't defined on solaris: - */ - memset(msg, 0, sizeof(struct msghdr)); - - msg->msg_iov= &c->iov[c->iovused]; - - if (c->udp && (c->srv_recv_addr_size > 0)) - { - msg->msg_name= &c->srv_recv_addr; - msg->msg_namelen= c->srv_recv_addr_size; - } - - c->msgbytes= 0; - c->msgused++; - - if (c->udp) - { - /* Leave room for the UDP header, which we'll fill in later. */ - return ms_add_iov(c, NULL, UDP_HEADER_SIZE); - } - - return EXIT_SUCCESS; -} /* ms_add_msghdr */ - - -/** - * Ensures that there is room for another structure iovec in a connection's - * iov list. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_ensure_iov_space(ms_conn_t *c) -{ - assert(c != NULL); - - if (c->iovused >= c->iovsize) - { - int i, iovnum; - struct iovec *new_iov= (struct iovec *)realloc(c->iov, - ((size_t)c->iovsize - * 2) - * sizeof(struct iovec)); - if (! new_iov) - return -1; - - c->iov= new_iov; - c->iovsize*= 2; - - /* Point all the msghdr structures at the new list. */ - for (i= 0, iovnum= 0; i < c->msgused; i++) - { - c->msglist[i].msg_iov= &c->iov[iovnum]; - iovnum+= (int)c->msglist[i].msg_iovlen; - } - } - - return EXIT_SUCCESS; -} /* ms_ensure_iov_space */ - - -/** - * Adds data to the list of pending data that will be written out to a - * connection. - * - * @param c, pointer of the concurrency - * @param buf, the buffer includes data to send - * @param len, the data length in the buffer - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_add_iov(ms_conn_t *c, const void *buf, int len) -{ - struct msghdr *m; - int leftover; - bool limit_to_mtu; - - assert(c != NULL); - - do - { - m= &c->msglist[c->msgused - 1]; - - /* - * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes. - */ - limit_to_mtu= c->udp; - -#ifdef IOV_MAX - /* We may need to start a new msghdr if this one is full. */ - if ((m->msg_iovlen == IOV_MAX) - || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE))) - { - ms_add_msghdr(c); - m= &c->msglist[c->msgused - 1]; - } -#endif - - if (ms_ensure_iov_space(c) != 0) - return -1; - - /* If the fragment is too big to fit in the datagram, split it up */ - if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE)) - { - leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE; - len-= leftover; - } - else - { - leftover= 0; - } - - m= &c->msglist[c->msgused - 1]; - m->msg_iov[m->msg_iovlen].iov_base= (void *)buf; - m->msg_iov[m->msg_iovlen].iov_len= (size_t)len; - - c->msgbytes+= len; - c->iovused++; - m->msg_iovlen++; - - buf= ((char *)buf) + len; - len= leftover; - } - while (leftover > 0); - - return EXIT_SUCCESS; -} /* ms_add_iov */ - - -/** - * Constructs a set of UDP headers and attaches them to the outgoing messages. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_udp_headers(ms_conn_t *c) -{ - int i; - unsigned char *hdr; - - assert(c != NULL); - - c->request_id= ms_get_udp_request_id(); - - if (c->msgused > c->hdrsize) - { - void *new_hdrbuf; - if (c->hdrbuf) - new_hdrbuf= realloc(c->hdrbuf, - (size_t)c->msgused * 2 * UDP_HEADER_SIZE); - else - new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE); - if (! new_hdrbuf) - return -1; - - c->hdrbuf= (unsigned char *)new_hdrbuf; - c->hdrsize= c->msgused * 2; - } - - /* If this is a multi-packet request, drop it. */ - if (c->udp && (c->msgused > 1)) - { - fprintf(stderr, "multi-packet request for UDP not supported.\n"); - return -1; - } - - hdr= c->hdrbuf; - for (i= 0; i < c->msgused; i++) - { - c->msglist[i].msg_iov[0].iov_base= (void *)hdr; - c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE; - *hdr++= (unsigned char)(c->request_id / 256); - *hdr++= (unsigned char)(c->request_id % 256); - *hdr++= (unsigned char)(i / 256); - *hdr++= (unsigned char)(i % 256); - *hdr++= (unsigned char)(c->msgused / 256); - *hdr++= (unsigned char)(c->msgused % 256); - *hdr++= (unsigned char)1; /* support facebook memcached */ - *hdr++= (unsigned char)0; - assert(hdr == - ((unsigned char *)c->msglist[i].msg_iov[0].iov_base - + UDP_HEADER_SIZE)); - } - - return EXIT_SUCCESS; -} /* ms_build_udp_headers */ - - -/** - * Transmit the next chunk of data from our list of msgbuf structures. - * - * @param c, pointer of the concurrency - * - * @return TRANSMIT_COMPLETE All done writing. - * TRANSMIT_INCOMPLETE More data remaining to write. - * TRANSMIT_SOFT_ERROR Can't write any more right now. - * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) - */ -static int ms_transmit(ms_conn_t *c) -{ - assert(c != NULL); - - if ((c->msgcurr < c->msgused) - && (c->msglist[c->msgcurr].msg_iovlen == 0)) - { - /* Finished writing the current msg; advance to the next. */ - c->msgcurr++; - } - - if (c->msgcurr < c->msgused) - { - ssize_t res; - struct msghdr *m= &c->msglist[c->msgcurr]; - - res= sendmsg(c->sfd, m, 0); - if (res > 0) - { - atomic_add_size(&ms_stats.bytes_written, res); - - /* We've written some of the data. Remove the completed - * iovec entries from the list of pending writes. */ - while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len) - { - res-= (ssize_t)m->msg_iov->iov_len; - m->msg_iovlen--; - m->msg_iov++; - } - - /* Might have written just part of the last iovec entry; - * adjust it so the next write will do the rest. */ - if (res > 0) - { - m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res); - m->msg_iov->iov_len-= (size_t)res; - } - return TRANSMIT_INCOMPLETE; - } - if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) - { - if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - return TRANSMIT_HARD_ERROR; - } - return TRANSMIT_SOFT_ERROR; - } - - /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, - * we have a real error, on which we close the connection */ - fprintf(stderr, "Failed to write, and not due to blocking.\n"); - - ms_conn_set_state(c, conn_closing); - return TRANSMIT_HARD_ERROR; - } - else - { - return TRANSMIT_COMPLETE; - } -} /* ms_transmit */ - - -/** - * Shrinks a connection's buffers if they're too big. This prevents - * periodic large "mget" response from server chewing lots of client - * memory. - * - * This should only be called in between requests since it can wipe output - * buffers! - * - * @param c, pointer of the concurrency - */ -static void ms_conn_shrink(ms_conn_t *c) -{ - assert(c != NULL); - - if (c->udp) - return; - - if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE)) - { - char *newbuf; - - if (c->rcurr != c->rbuf) - memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); - - newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); - - if (newbuf) - { - c->rbuf= newbuf; - c->rsize= DATA_BUFFER_SIZE; - } - c->rcurr= c->rbuf; - } - - if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT) - && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE)) - { - char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2); - if (new_rbuf) - { - c->rudpbuf= new_rbuf; - c->rudpsize= UDP_DATA_BUFFER_SIZE; - } - /* TODO check error condition? */ - } - - if (c->msgsize > MSG_LIST_HIGHWAT) - { - struct msghdr *newbuf= (struct msghdr *)realloc( - (void *)c->msglist, - MSG_LIST_INITIAL - * sizeof(c->msglist[0])); - if (newbuf) - { - c->msglist= newbuf; - c->msgsize= MSG_LIST_INITIAL; - } - /* TODO check error condition? */ - } - - if (c->iovsize > IOV_LIST_HIGHWAT) - { - struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov, - IOV_LIST_INITIAL - * sizeof(c->iov[0])); - if (newbuf) - { - c->iov= newbuf; - c->iovsize= IOV_LIST_INITIAL; - } - /* TODO check return value */ - } -} /* ms_conn_shrink */ - - -/** - * Sets a connection's current state in the state machine. Any special - * processing that needs to happen on certain state transitions can - * happen here. - * - * @param c, pointer of the concurrency - * @param state, connection state - */ -static void ms_conn_set_state(ms_conn_t *c, int state) -{ - assert(c != NULL); - - if (state != c->state) - { - if (state == conn_read) - { - ms_conn_shrink(c); - } - c->state= state; - } -} /* ms_conn_set_state */ - - -/** - * update the event if socks change state. for example: when - * change the listen scoket read event to sock write event, or - * change socket handler, we could call this function. - * - * @param c, pointer of the concurrency - * @param new_flags, new event flags - * - * @return bool, if success, return true, else return false - */ -static bool ms_update_event(ms_conn_t *c, const int new_flags) -{ - assert(c != NULL); - - struct event_base *base= c->event.ev_base; - if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0) - && (! ms_setting.facebook_test || (c->total_sfds == 1))) - { - return true; - } - - if (event_del(&c->event) == -1) - { - /* try to delete the event again */ - if (event_del(&c->event) == -1) - { - return false; - } - } - - event_set(&c->event, - c->sfd, - (short)new_flags, - ms_event_handler, - (void *)c); - event_base_set(base, &c->event); - c->ev_flags= (short)new_flags; - - if (event_add(&c->event, NULL) == -1) - { - return false; - } - - return true; -} /* ms_update_event */ - - -/** - * If user want to get the expected throughput, we could limit - * the performance of memslap. we could give up some work and - * just wait a short time. The function is used to check this - * case. - * - * @param c, pointer of the concurrency - * - * @return bool, if success, return true, else return false - */ -static bool ms_need_yield(ms_conn_t *c) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - int64_t tps= 0; - int64_t time_diff= 0; - struct timeval curr_time; - ms_task_t *task= &c->curr_task; - - if (ms_setting.expected_tps > 0) - { - gettimeofday(&curr_time, NULL); - time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time); - tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000); - - /* current throughput is greater than expected throughput */ - if (tps > ms_thread->thread_ctx->tps_perconn) - { - return true; - } - } - - return false; -} /* ms_need_yield */ - - -/** - * used to update the start time of each operation - * - * @param c, pointer of the concurrency - */ -static void ms_update_start_time(ms_conn_t *c) -{ - ms_task_item_t *item= c->curr_task.item; - - if ((ms_setting.stat_freq > 0) || c->udp - || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))) - { - gettimeofday(&c->start_time, NULL); - if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)) - { - /* record the current time */ - item->client_time= c->start_time.tv_sec; - } - } -} /* ms_update_start_time */ - - -/** - * run the state machine - * - * @param c, pointer of the concurrency - */ -static void ms_drive_machine(ms_conn_t *c) -{ - bool stop= false; - - assert(c != NULL); - - while (! stop) - { - switch (c->state) - { - case conn_read: - if (c->readval) - { - if (c->rbytes >= c->rvbytes) - { - ms_complete_nread(c); - break; - } - } - else - { - if (ms_try_read_line(c) != 0) - { - break; - } - } - - if (ms_try_read_network(c) != 0) - { - break; - } - - /* doesn't read all the response data, wait event wake up */ - if (! c->currcmd.isfinish) - { - if (! ms_update_event(c, EV_READ | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop= true; - break; - } - - /* we have no command line and no data to read from network, next write */ - ms_conn_set_state(c, conn_write); - memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ - - break; - - case conn_write: - if (! c->ctnwrite && ms_need_yield(c)) - { - usleep(10); - - if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop= true; - break; - } - - if (! c->ctnwrite && (ms_exec_task(c) != 0)) - { - ms_conn_set_state(c, conn_closing); - break; - } - - /* record the start time before starting to send data if necessary */ - if (! c->ctnwrite || (c->change_sfd && c->ctnwrite)) - { - if (c->change_sfd) - { - c->change_sfd= false; - } - ms_update_start_time(c); - } - - /* change sfd if necessary */ - if (c->change_sfd) - { - c->ctnwrite= true; - stop= true; - break; - } - - /* execute task until nothing need be written to network */ - if (! c->ctnwrite && (c->msgcurr == c->msgused)) - { - if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - stop= true; - break; - } - - switch (ms_transmit(c)) - { - case TRANSMIT_COMPLETE: - /* we have no data to write to network, next wait repose */ - if (! ms_update_event(c, EV_READ | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - c->ctnwrite= false; - break; - } - ms_conn_set_state(c, conn_read); - c->ctnwrite= false; - stop= true; - break; - - case TRANSMIT_INCOMPLETE: - c->ctnwrite= true; - break; /* Continue in state machine. */ - - case TRANSMIT_HARD_ERROR: - c->ctnwrite= false; - break; - - case TRANSMIT_SOFT_ERROR: - c->ctnwrite= true; - stop= true; - break; - - default: - break; - } /* switch */ - - break; - - case conn_closing: - /* recovery mode, need reconnect if connection close */ - if (ms_setting.reconnect && (! ms_global.time_out - || ((ms_setting.run_time == 0) - && (c->remain_exec_num > 0)))) - { - if (ms_reconn(c) != 0) - { - ms_conn_close(c); - stop= true; - break; - } - - ms_reset_conn(c, false); - - if (c->total_sfds == 1) - { - if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - break; - } - } - - break; - } - else - { - ms_conn_close(c); - stop= true; - break; - } - - default: - assert(0); - } /* switch */ - } -} /* ms_drive_machine */ - - -/** - * the event handler of each thread - * - * @param fd, the file descriptor of socket - * @param which, event flag - * @param arg, argument - */ -void ms_event_handler(const int fd, const short which, void *arg) -{ - ms_conn_t *c= (ms_conn_t *)arg; - - assert(c != NULL); - - c->which= which; - - /* sanity */ - if (fd != c->sfd) - { - fprintf(stderr, - "Catastrophic: event fd: %d doesn't match conn fd: %d\n", - fd, - c->sfd); - ms_conn_close(c); - exit(1); - } - assert(fd == c->sfd); - - ms_drive_machine(c); - - /* wait for next event */ -} /* ms_event_handler */ - - -/** - * get the next socket descriptor index to run for replication - * - * @param c, pointer of the concurrency - * @param cmd, command(get or set ) - * - * @return int, if success, return the index, else return EXIT_SUCCESS - */ -static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd) -{ - uint32_t sock_index= 0; - uint32_t i= 0; - - if (c->total_sfds == 1) - { - return EXIT_SUCCESS; - } - - if (ms_setting.rep_write_srv == 0) - { - return sock_index; - } - - do - { - if (cmd == CMD_SET) - { - for (i= 0; i < ms_setting.rep_write_srv; i++) - { - if (c->tcpsfd[i] > 0) - { - break; - } - } - - if (i == ms_setting.rep_write_srv) - { - /* random get one replication server to read */ - sock_index= (uint32_t)random() % c->total_sfds; - } - else - { - /* random get one replication writing server to write */ - sock_index= (uint32_t)random() % ms_setting.rep_write_srv; - } - } - else if (cmd == CMD_GET) - { - /* random get one replication server to read */ - sock_index= (uint32_t)random() % c->total_sfds; - } - } - while (c->tcpsfd[sock_index] == 0); - - return sock_index; -} /* ms_get_rep_sock_index */ - - -/** - * get the next socket descriptor index to run - * - * @param c, pointer of the concurrency - * - * @return int, return the index - */ -static uint32_t ms_get_next_sock_index(ms_conn_t *c) -{ - uint32_t sock_index= 0; - - do - { - sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx; - } - while (c->tcpsfd[sock_index] == 0); - - return sock_index; -} /* ms_get_next_sock_index */ - - -/** - * update socket event of the connections - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_update_conn_sock_event(ms_conn_t *c) -{ - assert(c != NULL); - - switch (c->currcmd.cmd) - { - case CMD_SET: - if (ms_setting.facebook_test && c->udp) - { - c->sfd= c->tcpsfd[0]; - c->udp= false; - c->change_sfd= true; - } - break; - - case CMD_GET: - if (ms_setting.facebook_test && ! c->udp) - { - c->sfd= c->udpsfd; - c->udp= true; - c->change_sfd= true; - } - break; - - default: - break; - } /* switch */ - - if (! c->udp && (c->total_sfds > 1)) - { - if (c->cur_idx != c->total_sfds) - { - if (ms_setting.rep_write_srv == 0) - { - c->cur_idx= ms_get_next_sock_index(c); - } - else - { - c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd); - } - } - else - { - /* must select the first sock of the connection at the beginning */ - c->cur_idx= 0; - } - - c->sfd= c->tcpsfd[c->cur_idx]; - assert(c->sfd != 0); - c->change_sfd= true; - } - - if (c->change_sfd) - { - if (! ms_update_event(c, EV_WRITE | EV_PERSIST)) - { - fprintf(stderr, "Couldn't update event.\n"); - ms_conn_set_state(c, conn_closing); - return -1; - } - } - - return EXIT_SUCCESS; -} /* ms_update_conn_sock_event */ - - -/** - * for ASCII protocol, this function build the set command - * string and send the command. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item) -{ - int value_offset; - int write_len; - char *buffer= c->wbuf; - - write_len= snprintf(buffer, - c->wsize, - " %u %d %d\r\n", - 0, - item->exp_time, - item->value_size); - - if (write_len > c->wsize || write_len < 0) - { - /* ought to be always enough. just fail for simplicity */ - fprintf(stderr, "output command line too long.\n"); - return -1; - } - - if (item->value_offset == INVALID_OFFSET) - { - value_offset= item->key_suffix_offset; - } - else - { - value_offset= item->value_offset; - } - - if ((ms_add_iov(c, "set ", 4) != 0) - || (ms_add_iov(c, (char *)&item->key_prefix, - (int)KEY_PREFIX_SIZE) != 0) - || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0) - || (ms_add_iov(c, buffer, write_len) != 0) - || (ms_add_iov(c, &ms_setting.char_block[value_offset], - item->value_size) != 0) - || (ms_add_iov(c, "\r\n", 2) != 0) - || (c->udp && (ms_build_udp_headers(c) != 0))) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_build_ascii_write_buf_set */ - - -/** - * used to send set command to server - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item) -{ - assert(c != NULL); - - c->currcmd.cmd= CMD_SET; - c->currcmd.isfinish= false; - c->currcmd.retstat= MCD_FAILURE; - - if (ms_update_conn_sock_event(c) != 0) - { - return -1; - } - - c->msgcurr= 0; - c->msgused= 0; - c->iovused= 0; - if (ms_add_msghdr(c) != 0) - { - fprintf(stderr, "Out of memory preparing request."); - return -1; - } - - /* binary protocol */ - if (c->protocol == binary_prot) - { - if (ms_build_bin_write_buf_set(c, item) != 0) - { - return -1; - } - } - else - { - if (ms_build_ascii_write_buf_set(c, item) != 0) - { - return -1; - } - } - - atomic_add_size(&ms_stats.obj_bytes, - item->key_size + item->value_size); - atomic_add_size(&ms_stats.cmd_set, 1); - - return EXIT_SUCCESS; -} /* ms_mcd_set */ - - -/** - * for ASCII protocol, this function build the get command - * string and send the command. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) -{ - if ((ms_add_iov(c, "get ", 4) != 0) - || (ms_add_iov(c, (char *)&item->key_prefix, - (int)KEY_PREFIX_SIZE) != 0) - || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0) - || (ms_add_iov(c, "\r\n", 2) != 0) - || (c->udp && (ms_build_udp_headers(c) != 0))) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_build_ascii_write_buf_get */ - - -/** - * used to send the get command to server - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) -{ - assert(c != NULL); - - c->currcmd.cmd= CMD_GET; - c->currcmd.isfinish= false; - c->currcmd.retstat= MCD_FAILURE; - - if (ms_update_conn_sock_event(c) != 0) - { - return -1; - } - - c->msgcurr= 0; - c->msgused= 0; - c->iovused= 0; - if (ms_add_msghdr(c) != 0) - { - fprintf(stderr, "Out of memory preparing request."); - return -1; - } - - /* binary protocol */ - if (c->protocol == binary_prot) - { - if (ms_build_bin_write_buf_get(c, item) != 0) - { - return -1; - } - } - else - { - if (ms_build_ascii_write_buf_get(c, item) != 0) - { - return -1; - } - } - - atomic_add_size(&ms_stats.cmd_get, 1); - - return EXIT_SUCCESS; -} /* ms_mcd_get */ - - -/** - * for ASCII protocol, this function build the multi-get command - * string and send the command. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_ascii_write_buf_mlget(ms_conn_t *c) -{ - ms_task_item_t *item; - - if (ms_add_iov(c, "get", 3) != 0) - { - return -1; - } - - for (int i= 0; i < c->mlget_task.mlget_num; i++) - { - item= c->mlget_task.mlget_item[i].item; - assert(item != NULL); - if ((ms_add_iov(c, " ", 1) != 0) - || (ms_add_iov(c, (char *)&item->key_prefix, - (int)KEY_PREFIX_SIZE) != 0) - || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE) != 0)) - { - return -1; - } - } - - if ((ms_add_iov(c, "\r\n", 2) != 0) - || (c->udp && (ms_build_udp_headers(c) != 0))) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_build_ascii_write_buf_mlget */ - - -/** - * used to send the multi-get command to server - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_mcd_mlget(ms_conn_t *c) -{ - ms_task_item_t *item; - - assert(c != NULL); - assert(c->mlget_task.mlget_num >= 1); - - c->currcmd.cmd= CMD_GET; - c->currcmd.isfinish= false; - c->currcmd.retstat= MCD_FAILURE; - - if (ms_update_conn_sock_event(c) != 0) - { - return -1; - } - - c->msgcurr= 0; - c->msgused= 0; - c->iovused= 0; - if (ms_add_msghdr(c) != 0) - { - fprintf(stderr, "Out of memory preparing request."); - return -1; - } - - /* binary protocol */ - if (c->protocol == binary_prot) - { - if (ms_build_bin_write_buf_mlget(c) != 0) - { - return -1; - } - } - else - { - if (ms_build_ascii_write_buf_mlget(c) != 0) - { - return -1; - } - } - - /* decrease operation time of each item */ - for (int i= 0; i < c->mlget_task.mlget_num; i++) - { - item= c->mlget_task.mlget_item[i].item; - atomic_add_size(&ms_stats.cmd_get, 1); - } - - (void)item; - - return EXIT_SUCCESS; -} /* ms_mcd_mlget */ - - -/** - * binary protocol support - */ - -/** - * for binary protocol, parse the response of server - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_bin_process_response(ms_conn_t *c) -{ - const char *errstr= NULL; - - assert(c != NULL); - - uint32_t bodylen= c->binary_header.response.bodylen; - uint8_t opcode= c->binary_header.response.opcode; - uint16_t status= c->binary_header.response.status; - - if (bodylen > 0) - { - c->rvbytes= (int32_t)bodylen; - c->readval= true; - return EXIT_FAILURE; - } - else - { - switch (status) - { - case PROTOCOL_BINARY_RESPONSE_SUCCESS: - if (opcode == PROTOCOL_BINARY_CMD_SET) - { - c->currcmd.retstat= MCD_STORED; - } - else if (opcode == PROTOCOL_BINARY_CMD_DELETE) - { - c->currcmd.retstat= MCD_DELETED; - } - else if (opcode == PROTOCOL_BINARY_CMD_GET) - { - c->currcmd.retstat= MCD_END; - } - break; - - case PROTOCOL_BINARY_RESPONSE_ENOMEM: - errstr= "Out of memory"; - c->currcmd.retstat= MCD_SERVER_ERROR; - break; - - case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: - errstr= "Unknown command"; - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - break; - - case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: - errstr= "Not found"; - c->currcmd.retstat= MCD_NOTFOUND; - break; - - case PROTOCOL_BINARY_RESPONSE_EINVAL: - errstr= "Invalid arguments"; - c->currcmd.retstat= MCD_PROTOCOL_ERROR; - break; - - case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: - errstr= "Data exists for key."; - break; - - case PROTOCOL_BINARY_RESPONSE_E2BIG: - errstr= "Too large."; - c->currcmd.retstat= MCD_SERVER_ERROR; - break; - - case PROTOCOL_BINARY_RESPONSE_NOT_STORED: - errstr= "Not stored."; - c->currcmd.retstat= MCD_NOTSTORED; - break; - - default: - errstr= "Unknown error"; - c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE; - break; - } /* switch */ - - if (errstr != NULL) - { - fprintf(stderr, "%s\n", errstr); - } - } - - return EXIT_SUCCESS; -} /* ms_bin_process_response */ - - -/* build binary header and add the header to the buffer to send */ - -/** - * build binary header and add the header to the buffer to send - * - * @param c, pointer of the concurrency - * @param opcode, operation code - * @param hdr_len, length of header - * @param key_len, length of key - * @param body_len. length of body - */ -static void ms_add_bin_header(ms_conn_t *c, - uint8_t opcode, - uint8_t hdr_len, - uint16_t key_len, - uint32_t body_len) -{ - protocol_binary_request_header *header; - - assert(c != NULL); - - header= (protocol_binary_request_header *)c->wcurr; - - header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ; - header->request.opcode= (uint8_t)opcode; - header->request.keylen= htons(key_len); - - header->request.extlen= (uint8_t)hdr_len; - header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->request.vbucket= 0; - - header->request.bodylen= htonl(body_len); - header->request.opaque= 0; - header->request.cas= 0; - - ms_add_iov(c, c->wcurr, sizeof(header->request)); -} /* ms_add_bin_header */ - - -/** - * add the key to the socket write buffer array - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - */ -static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item) -{ - ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE); - ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset], - item->key_size - (int)KEY_PREFIX_SIZE); -} - - -/** - * for binary protocol, this function build the set command - * and add the command to send buffer array. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item) -{ - assert(c->wbuf == c->wcurr); - - int value_offset; - protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr; - uint16_t keylen= (uint16_t)item->key_size; - uint32_t bodylen= (uint32_t)sizeof(rep->message.body) - + (uint32_t)keylen + (uint32_t)item->value_size; - - ms_add_bin_header(c, - PROTOCOL_BINARY_CMD_SET, - sizeof(rep->message.body), - keylen, - bodylen); - rep->message.body.flags= 0; - rep->message.body.expiration= htonl((uint32_t)item->exp_time); - ms_add_iov(c, &rep->message.body, sizeof(rep->message.body)); - ms_add_key_to_iov(c, item); - - if (item->value_offset == INVALID_OFFSET) - { - value_offset= item->key_suffix_offset; - } - else - { - value_offset= item->value_offset; - } - ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size); - - return EXIT_SUCCESS; -} /* ms_build_bin_write_buf_set */ - - -/** - * for binary protocol, this function build the get command and - * add the command to send buffer array. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item) -{ - assert(c->wbuf == c->wcurr); - - ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size, - (uint32_t)item->key_size); - ms_add_key_to_iov(c, item); - - return EXIT_SUCCESS; -} /* ms_build_bin_write_buf_get */ - - -/** - * for binary protocol, this function build the multi-get - * command and add the command to send buffer array. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_build_bin_write_buf_mlget(ms_conn_t *c) -{ - ms_task_item_t *item; - - assert(c->wbuf == c->wcurr); - - for (int i= 0; i < c->mlget_task.mlget_num; i++) - { - item= c->mlget_task.mlget_item[i].item; - assert(item != NULL); - - ms_add_bin_header(c, - PROTOCOL_BINARY_CMD_GET, - 0, - (uint16_t)item->key_size, - (uint32_t)item->key_size); - ms_add_key_to_iov(c, item); - c->wcurr+= sizeof(protocol_binary_request_get); - } - - c->wcurr= c->wbuf; - - return EXIT_SUCCESS; -} /* ms_build_bin_write_buf_mlget */ diff --git a/src/bin/ms_conn.h b/src/bin/ms_conn.h deleted file mode 100644 index d0530a7c..00000000 --- a/src/bin/ms_conn.h +++ /dev/null @@ -1,241 +0,0 @@ -/* - * File: ms_conn.h - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_CONN_H -#define MS_CONN_H - -#include -#include -#include -#include - -#include "ms_task.h" -#include "libmemcachedprotocol-0.0/binary.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define DATA_BUFFER_SIZE (1024 * 1024 + 2048) /* read buffer, 1M + 2k, enough for the max value(1M) */ -#define WRITE_BUFFER_SIZE (32 * 1024) /* write buffer, 32k */ -#define UDP_DATA_BUFFER_SIZE (1 * 1024 * 1024) /* read buffer for UDP, 1M */ -#define UDP_MAX_PAYLOAD_SIZE 1400 /* server limit UDP payload size */ -#define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */ -#define UDP_HEADER_SIZE 8 /* UDP header size */ -#define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */ -#define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */ -#define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */ - -/* Initial size of the sendmsg() scatter/gather array. */ -#define IOV_LIST_INITIAL 400 - -/* Initial number of sendmsg() argument structures to allocate. */ -#define MSG_LIST_INITIAL 10 - -/* High water marks for buffer shrinking */ -#define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE) -#define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE) -#define IOV_LIST_HIGHWAT 600 -#define MSG_LIST_HIGHWAT 100 - -/* parse udp header */ -#define HEADER_TO_REQID(ptr) ((uint16_t)*ptr * 256 \ - + (uint16_t)*(ptr + 1)) -#define HEADER_TO_SEQNUM(ptr) ((uint16_t)*(ptr \ - + 2) * 256 \ - + (uint16_t)*(ptr + 3)) -#define HEADER_TO_PACKETS(ptr) ((uint16_t)*(ptr \ - + 4) * 256 \ - + (uint16_t)*(ptr + 5)) - -/* states of connection */ -enum conn_states -{ - conn_read, /* reading in a command line */ - conn_write, /* writing out a simple response */ - conn_closing /* closing this connection */ -}; - -/* returned states of memcached command */ -enum mcd_ret -{ - MCD_SUCCESS, /* command success */ - MCD_FAILURE, /* command failure */ - MCD_UNKNOWN_READ_FAILURE, /* unknown read failure */ - MCD_PROTOCOL_ERROR, /* protocol error */ - MCD_CLIENT_ERROR, /* client error, wrong command */ - MCD_SERVER_ERROR, /* server error, server run command failed */ - MCD_DATA_EXISTS, /* object is existent in server */ - MCD_NOTSTORED, /* server doesn't set the object successfully */ - MCD_STORED, /* server set the object successfully */ - MCD_NOTFOUND, /* server not find the object */ - MCD_END, /* end of the response of get command */ - MCD_DELETED, /* server delete the object successfully */ - MCD_STAT /* response of stats command */ -}; - -/* used to store the current or previous running command state */ -typedef struct cmdstat -{ - int cmd; /* command name */ - int retstat; /* return state of this command */ - bool isfinish; /* if it read all the response data */ - uint64_t key_prefix; /* key prefix */ -} ms_cmdstat_t; - -/* udp packet structure */ -typedef struct udppkt -{ - uint8_t *header; /* udp header of the packet */ - char *data; /* udp data of the packet */ - int rbytes; /* number of data in the packet */ - int copybytes; /* number of copied data in the packet */ -} ms_udppkt_t; - -/* three protocols supported */ -enum protocol -{ - ascii_prot = 3, /* ASCII protocol */ - binary_prot /* binary protocol */ -}; - -/** - * concurrency structure - * - * Each thread has a libevent to manage the events of network. - * Each thread has one or more self-governed concurrencies; - * each concurrency has one or more socket connections. This - * concurrency structure includes all the private variables of - * the concurrency. - */ -typedef struct conn -{ - uint32_t conn_idx; /* connection index in the thread */ - int sfd; /* current tcp sock handler of the connection structure */ - int udpsfd; /* current udp sock handler of the connection structure*/ - int state; /* state of the connection */ - struct event event; /* event for libevent */ - short ev_flags; /* event flag for libevent */ - short which; /* which events were just triggered */ - bool change_sfd; /* whether change sfd */ - - int *tcpsfd; /* TCP sock array */ - uint32_t total_sfds; /* how many socks in the tcpsfd array */ - uint32_t alive_sfds; /* alive socks */ - uint32_t cur_idx; /* current sock index in tcpsfd array */ - - ms_cmdstat_t precmd; /* previous command state */ - ms_cmdstat_t currcmd; /* current command state */ - - char *rbuf; /* buffer to read commands into */ - char *rcurr; /* but if we parsed some already, this is where we stopped */ - int rsize; /* total allocated size of rbuf */ - int rbytes; /* how much data, starting from rcur, do we have unparsed */ - - bool readval; /* read value state, read known data size */ - int rvbytes; /* total value size need to read */ - - char *wbuf; /* buffer to write commands out */ - char *wcurr; /* for multi-get, where we stopped */ - int wsize; /* total allocated size of wbuf */ - bool ctnwrite; /* continue to write */ - - /* data for the mwrite state */ - struct iovec *iov; - int iovsize; /* number of elements allocated in iov[] */ - int iovused; /* number of elements used in iov[] */ - - struct msghdr *msglist; - int msgsize; /* number of elements allocated in msglist[] */ - int msgused; /* number of elements used in msglist[] */ - int msgcurr; /* element in msglist[] being transmitted now */ - int msgbytes; /* number of bytes in current msg */ - - /* data for UDP clients */ - bool udp; /* is this is a UDP "connection" */ - uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */ - uint8_t *hdrbuf; /* udp packet headers */ - int hdrsize; /* number of headers' worth of space is allocated */ - struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */ - socklen_t srv_recv_addr_size; - - /* udp read buffer */ - char *rudpbuf; /* buffer to read commands into for udp */ - int rudpsize; /* total allocated size of rudpbuf */ - int rudpbytes; /* how much data, starting from rudpbuf */ - - /* order udp packet */ - ms_udppkt_t *udppkt; /* the offset of udp packet in rudpbuf */ - int packets; /* number of total packets need to read */ - int recvpkt; /* number of received packets */ - int pktcurr; /* current packet in rudpbuf being ordered */ - int ordcurr; /* current ordered packet */ - - ms_task_item_t *item_win; /* task sequence */ - int win_size; /* current task window size */ - uint64_t set_cursor; /* current set item index in the item window */ - ms_task_t curr_task; /* current running task */ - ms_mlget_task_t mlget_task; /* multi-get task */ - - int warmup_num; /* to run how many warm up operations*/ - int remain_warmup_num; /* left how many warm up operations to run */ - int64_t exec_num; /* to run how many task operations */ - int64_t remain_exec_num; /* how many remained task operations to run */ - - /* response time statistic and time out control */ - struct timeval start_time; /* start time of current operation(s) */ - struct timeval end_time; /* end time of current operation(s) */ - - /* Binary protocol stuff */ - protocol_binary_response_header binary_header; /* local temporary binary header */ - enum protocol protocol; /* which protocol this connection speaks */ -} ms_conn_t; - -/* used to generate the key prefix */ -uint64_t ms_get_key_prefix(void); - - -/** - * setup a connection, each connection structure of each - * thread must call this function to initialize. - */ -int ms_setup_conn(ms_conn_t *c); - - -/* after one operation completes, reset the connection */ -void ms_reset_conn(ms_conn_t *c, bool timeout); - - -/** - * reconnect several disconnected socks in the connection - * structure, the ever-1-second timer of the thread will check - * whether some socks in the connections disconnect. if - * disconnect, reconnect the sock. - */ -int ms_reconn_socks(ms_conn_t *c); - - -/* used to send set command to server */ -int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); - - -/* used to send the get command to server */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); - - -/* used to send the multi-get command to server */ -int ms_mcd_mlget(ms_conn_t *c); - - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_CONN_H */ diff --git a/src/bin/ms_memslap.h b/src/bin/ms_memslap.h deleted file mode 100644 index 72af301a..00000000 --- a/src/bin/ms_memslap.h +++ /dev/null @@ -1,133 +0,0 @@ -/* - * File: ms_memslap.h - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_MEMSLAP_H -#define MS_MEMSLAP_H - -#include -#include -#include -#include -#include -#include -#include -#include -#if !defined(__cplusplus) -# include -#endif -#include - -#include "ms_stats.h" -#include "ms_atomic.h" - -#ifdef __cplusplus -extern "C" { -#endif - -/* command line option */ -typedef enum -{ - OPT_VERSION= 'V', - OPT_HELP= 'h', - OPT_UDP= 'U', - OPT_SERVERS= 's', - OPT_EXECUTE_NUMBER= 'x', - OPT_THREAD_NUMBER= 'T', - OPT_CONCURRENCY= 'c', - OPT_FIXED_LTH= 'X', - OPT_VERIFY= 'v', - OPT_GETS_DIVISION= 'd', - OPT_TIME= 't', - OPT_CONFIG_CMD= 'F', - OPT_WINDOW_SIZE= 'w', - OPT_EXPIRE= 'e', - OPT_STAT_FREQ= 'S', - OPT_RECONNECT= 'R', - OPT_VERBOSE= 'b', - OPT_FACEBOOK_TEST= 'a', - OPT_SOCK_PER_CONN= 'n', - OPT_BINARY_PROTOCOL= 'B', - OPT_OVERWRITE= 'o', - OPT_TPS= 'P', - OPT_REP_WRITE_SRV= 'p' -} ms_options_t; - -/* global statistic of response time */ -typedef struct statistic -{ - pthread_mutex_t stat_mutex; /* synchronize the following members */ - - ms_stat_t get_stat; /* statistics of get command */ - ms_stat_t set_stat; /* statistics of set command */ - ms_stat_t total_stat; /* statistics of both get and set commands */ -} ms_statistic_t; - -/* global status statistic structure */ -typedef struct stats -{ - ATOMIC uint32_t active_conns; /* active connections */ - ATOMIC size_t bytes_read; /* read bytes */ - ATOMIC size_t bytes_written; /* written bytes */ - ATOMIC size_t obj_bytes; /* object bytes */ - ATOMIC size_t pre_cmd_get; /* previous total get command count */ - ATOMIC size_t pre_cmd_set; /* previous total set command count */ - ATOMIC size_t cmd_get; /* current total get command count */ - ATOMIC size_t cmd_set; /* current total set command count */ - ATOMIC size_t get_misses; /* total objects of get miss */ - ATOMIC size_t vef_miss; /* total objects of verification miss */ - ATOMIC size_t vef_failed; /* total objects of verification failed */ - ATOMIC size_t unexp_unget; /* total objects which is unexpired but not get */ - ATOMIC size_t exp_get; /* total objects which is expired but get */ - ATOMIC size_t pkt_disorder; /* disorder packages of UDP */ - ATOMIC size_t pkt_drop; /* packages dropped of UDP */ - ATOMIC size_t udp_timeout; /* how many times timeout of UDP happens */ -} ms_stats_t; - -/* lock adapter */ -typedef struct sync_lock -{ - uint32_t count; - pthread_mutex_t lock; - pthread_cond_t cond; -} ms_sync_lock_t; - -/* global variable structure */ -typedef struct global -{ - /* synchronize lock */ - ms_sync_lock_t init_lock; - ms_sync_lock_t warmup_lock; - ms_sync_lock_t run_lock; - - /* mutex for outputing error log synchronously when memslap crashes */ - pthread_mutex_t quit_mutex; - - /* mutex for generating key prefix */ - pthread_mutex_t seq_mutex; - - /* global synchronous flags for slap mode */ - bool finish_warmup; - bool time_out; -} ms_global_t; - -/* global structure */ -extern ms_global_t ms_global; - -/* global stats information structure */ -extern ms_stats_t ms_stats; - -/* global statistic structure */ -extern ms_statistic_t ms_statistic; - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_MEMSLAP_H */ diff --git a/src/bin/ms_setting.c b/src/bin/ms_setting.c deleted file mode 100644 index 6cb367a3..00000000 --- a/src/bin/ms_setting.c +++ /dev/null @@ -1,1068 +0,0 @@ -/* - * File: ms_setting.c - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -#include "mem_config.h" - -#include "libmemcached/memcached.h" - -#include -#include -#include -#include -#include -#include -#include - - - -#include "ms_setting.h" -#include "ms_conn.h" - -#define MAX_EXEC_NUM 0x4000000000000000 /* 1 << 62 */ -#define ADDR_ALIGN(addr) ((addr + 15) & ~(16 - 1)) /* 16 bytes aligned */ -#define RAND_CHAR_SIZE (10 * 1024 * 1024) /* 10M character table */ -#define RESERVED_RAND_CHAR_SIZE (2 * 1024 * 1024) /* reserved 2M to avoid pointer sloping over */ - -#define DEFAULT_CONFIG_NAME ".memslap.cnf" - -#define DEFAULT_THREADS_NUM 1 /* default start one thread */ -#define DEFAULT_CONNS_NUM 16 /* default each thread with 16 connections */ -#define DEFAULT_EXE_NUM 0 /* default execute number is 0 */ -#define DEFAULT_VERIFY_RATE 0.0 /* default it doesn't do data verification */ -#define DEFAULT_OVERWRITE_RATE 0.0 /* default it doesn't do overwrite */ -#define DEFAULT_DIV 1 /* default it runs single get */ -#define DEFAULT_RUN_TIME 600 /* default run time 10 minutes */ -#define DEFAULT_WINDOW_SIZE (10 * UNIT_ITEMS_COUNT) /* default window size is 10k */ -#define DEFAULT_SOCK_PER_CONN 1 /* default socks per connection is 1 */ - -/* Use this for string generation */ -#define CHAR_COUNT 64 /* number of characters used to generate character table */ -const char ALPHANUMBERICS[]= - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.-"; - -ms_setting_st ms_setting; /* store the settings specified by user */ - - -/* read setting from configuration file */ -static void ms_get_serverlist(char *str); -static uint32_t ms_get_cpu_count(void); -ms_conf_type_t ms_get_conf_type(char *line); -static int ms_is_line_data(char *line); -static int ms_read_is_data(char *line, ssize_t nread); -static void ms_no_config_file(void); -static void ms_parse_cfg_file(char *cfg_file); - - -/* initialize setting structure */ -static void ms_init_random_block(void); -static void ms_calc_avg_size(void); -static int ms_shuffle_distr(ms_distr_t *distr, int length); -static void ms_build_distr(void); -static void ms_print_setting(void); -static void ms_setting_slapmode_init_pre(void); -static void ms_setting_slapmode_init_post(void); - -#if !defined(HAVE_GETLINE) -#include -static ssize_t getline (char **line, size_t *line_size, FILE *fp) -{ - char delim= '\n'; - ssize_t result= 0; - size_t cur_len= 0; - - if (line == NULL || line_size == NULL || fp == NULL) - { - errno = EINVAL; - return -1; - } - - if (*line == NULL || *line_size == 0) - { - char *new_line; - *line_size = 120; - new_line= (char *) realloc (*line, *line_size); - if (new_line == NULL) - { - result= -1; - return result; - } - *line= new_line; - } - - for (;;) - { - int i= getc(fp); - if (i == EOF) - { - result = -1; - break; - } - - /* Make enough space for len+1 (for final NUL) bytes. */ - if (cur_len + 1 >= *line_size) - { - size_t needed_max= - SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX; - size_t needed= (2 * (*line_size)) + 1; - char *new_line; - - if (needed_max < needed) - needed= needed_max; - if (cur_len + 1 >= needed) - { - result= -1; - errno= EOVERFLOW; - return result; - } - - new_line= (char *)realloc(*line, needed); - if (new_line == NULL) - { - result= -1; - return result; - } - - *line= new_line; - *line_size= needed; - } - - (*line)[cur_len]= (char)i; - cur_len++; - - if (i == delim) - break; - } - (*line)[cur_len] = '\0'; - if (cur_len != 0) - return (ssize_t)cur_len; - return result; -} -#endif - -/** - * parse the server list string, and build the servers - * information structure array. this function is used to parse - * the command line options specified by user. - * - * @param str, the string of server list - */ -static void ms_get_serverlist(char *str) -{ - ms_mcd_server_t *srvs= NULL; - - /** - * Servers list format is like this. For example: - * "localhost:11108, localhost:11109" - */ - memcached_server_st *server_pool; - server_pool = memcached_servers_parse(str); - - for (uint32_t loop= 0; loop < memcached_server_list_count(server_pool); loop++) - { - assert(ms_setting.srv_cnt < ms_setting.total_srv_cnt); - strcpy(ms_setting.servers[ms_setting.srv_cnt].srv_host_name, server_pool[loop].hostname); - ms_setting.servers[ms_setting.srv_cnt].srv_port= server_pool[loop].port; - ms_setting.servers[ms_setting.srv_cnt].disconn_cnt= 0; - ms_setting.servers[ms_setting.srv_cnt].reconn_cnt= 0; - ms_setting.srv_cnt++; - - if (ms_setting.srv_cnt >= ms_setting.total_srv_cnt) - { - srvs= (ms_mcd_server_t *)realloc( ms_setting.servers, - (size_t)ms_setting.total_srv_cnt * sizeof(ms_mcd_server_t) * 2); - if (srvs == NULL) - { - fprintf(stderr, "Can't reallocate servers structure.\n"); - exit(1); - } - ms_setting.servers= srvs; - ms_setting.total_srv_cnt*= 2; - } - } - - memcached_server_free(server_pool); -} /* ms_get_serverlist */ - - -/** - * used to get the CPU count of the current system - * - * @return return the cpu count if get, else return EXIT_FAILURE - */ -static uint32_t ms_get_cpu_count() -{ -#ifdef HAVE__SC_NPROCESSORS_ONLN - return sysconf(_SC_NPROCESSORS_CONF); - -#else -# ifdef HAVE_CPU_SET_T - int cpu_count= 0; - cpu_set_t cpu_set; - - sched_getaffinity(0, sizeof(cpu_set_t), &cpu_set); - - for (int i= 0; i < (sizeof(cpu_set_t) * 8); i++) - { - if (CPU_ISSET(i, &cpu_set)) - { - cpu_count++; - } - } - - return cpu_count; - -# endif -#endif - - /* the system with one cpu at least */ - return EXIT_FAILURE; -} /* ms_get_cpu_count */ - - -/** - * used to get the configure type based on the type string read - * from the configuration file. - * - * @param line, string of one line - * - * @return ms_conf_type_t - */ -ms_conf_type_t ms_get_conf_type(char *line) -{ - if (! memcmp(line, "key", strlen("key"))) - { - return CONF_KEY; - } - else if (! memcmp(line, "value", strlen("value"))) - { - return CONF_VALUE; - } - else if (! memcmp(line, "cmd", strlen("cmd"))) - { - return CONF_CMD; - } - else - { - return CONF_NULL; - } -} /* ms_get_conf_type */ - - -/** - * judge whether the line is a line with useful data. used to - * parse the configuration file. - * - * @param line, string of one line - * - * @return if success, return EXIT_FAILURE, else return EXIT_SUCCESS - */ -static int ms_is_line_data(char *line) -{ - assert(line != NULL); - - char *begin_ptr= line; - - while (isspace(*begin_ptr)) - { - begin_ptr++; - } - if ((begin_ptr[0] == '\0') || (begin_ptr[0] == '#')) - return EXIT_SUCCESS; - - return EXIT_FAILURE; -} /* ms_is_line_data */ - - -/** - * function to bypass blank line and comments - * - * @param line, string of one line - * @param nread, length of the line - * - * @return if it's EOF or not line data, return EXIT_SUCCESS, else return EXIT_FAILURE - */ -static int ms_read_is_data(char *line, ssize_t nread) -{ - if ((nread == EOF) || ! ms_is_line_data(line)) - return EXIT_SUCCESS; - - return EXIT_FAILURE; -} /* ms_read_is_data */ - - -/** - * if no configuration file, use this function to create the default - * configuration file. - */ -static void ms_no_config_file() -{ - char userpath[PATH_MAX]; - struct passwd *usr= NULL; - FILE *fd; - - usr= getpwuid(getuid()); - - snprintf(userpath, PATH_MAX, "%s/%s", usr->pw_dir, DEFAULT_CONFIG_NAME); - - if (access (userpath, F_OK | R_OK) == 0) - goto exit; - - fd= fopen(userpath, "w+"); - - if (fd == NULL) - { - fprintf(stderr, "Could not create default configure file %s\n", userpath); - perror(strerror(errno)); - exit(1); - } - fprintf(fd, "%s", DEFAULT_CONGIF_STR); - fclose(fd); - -exit: - ms_setting.cfg_file= strdup(userpath); -} /* ms_no_config_file */ - - -/** - * parse the configuration file - * - * @param cfg_file, the configuration file name - */ -static void ms_parse_cfg_file(char *cfg_file) -{ - FILE *f; - size_t start_len, end_len; - double proportion; - char *line= NULL; - size_t read_len; - ssize_t nread; - int cmd_type; - ms_conf_type_t conf_type; - int end_of_file= 0; - ms_key_distr_t *key_distr= NULL; - ms_value_distr_t *val_distr= NULL; - - if (cfg_file == NULL) - { - ms_no_config_file(); - cfg_file= ms_setting.cfg_file; - } - - /*read key value configure file*/ - if ((f= fopen(cfg_file, "r")) == NULL) - { - fprintf(stderr, "Can not open file: '%s'.\n", cfg_file); - exit(1); - } - - while (1) - { - if ((((nread= getline(&line, &read_len, f)) == 1) - || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ - continue; - - if (nread == EOF) - { - fprintf(stderr, "Bad configuration file, no configuration find.\n"); - exit(1); - } - conf_type= ms_get_conf_type(line); - break; - } - - while (! end_of_file) - { - switch (conf_type) - { - case CONF_KEY: - while (1) - { - if ((((nread= getline(&line, &read_len, f)) == 1) - || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ - continue; - - if (nread != EOF) - { - if (sscanf(line, "%zu %zu %lf ", &start_len, - &end_len, &proportion) != 3) - { - conf_type= ms_get_conf_type(line); - break; - } - ms_setting.key_distr[ms_setting.key_rng_cnt].start_len= start_len; - ms_setting.key_distr[ms_setting.key_rng_cnt].end_len= end_len; - ms_setting.key_distr[ms_setting.key_rng_cnt].key_prop= proportion; - ms_setting.key_rng_cnt++; - - if (ms_setting.key_rng_cnt >= ms_setting.total_key_rng_cnt) - { - key_distr= (ms_key_distr_t *)realloc( - ms_setting.key_distr, - (size_t)ms_setting. - total_key_rng_cnt * sizeof(ms_key_distr_t) * 2); - if (key_distr == NULL) - { - fprintf(stderr, - "Can't reallocate key distribution structure.\n"); - exit(1); - } - ms_setting.key_distr= key_distr; - ms_setting.total_key_rng_cnt*= 2; - } - continue; - } - end_of_file= 1; - break; - } - break; - - case CONF_VALUE: - while (1) - { - if ((((nread= getline(&line, &read_len, f)) == 1) - || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ - continue; - - if (nread != EOF) - { - if (sscanf(line, "%zu %zu %lf", &start_len, &end_len, - &proportion) != 3) - { - conf_type= ms_get_conf_type(line); - break; - } - ms_setting.value_distr[ms_setting.val_rng_cnt].start_len= - start_len; - ms_setting.value_distr[ms_setting.val_rng_cnt].end_len= end_len; - ms_setting.value_distr[ms_setting.val_rng_cnt].value_prop= - proportion; - ms_setting.val_rng_cnt++; - - if (ms_setting.val_rng_cnt >= ms_setting.total_val_rng_cnt) - { - val_distr= (ms_value_distr_t *)realloc( - ms_setting.value_distr, - (size_t)ms_setting. - total_val_rng_cnt * sizeof(ms_value_distr_t) * 2); - if (val_distr == NULL) - { - fprintf(stderr, - "Can't reallocate key distribution structure.\n"); - exit(1); - } - ms_setting.value_distr= val_distr; - ms_setting.total_val_rng_cnt*= 2; - } - continue; - } - end_of_file= 1; - break; - } - break; - - case CONF_CMD: - while (1) - { - if ((((nread= getline(&line, &read_len, f)) == 1) - || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ - continue; - - if (nread != EOF) - { - if (sscanf(line, "%d %lf", &cmd_type, &proportion) != 2) - { - conf_type= ms_get_conf_type(line); - break; - } - if (cmd_type >= CMD_NULL) - { - continue; - } - ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_type= - cmd_type; - ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_prop= - proportion; - ms_setting.cmd_used_count++; - continue; - } - end_of_file= 1; - break; - } - - case CONF_NULL: - while (1) - { - if ((((nread= getline(&line, &read_len, f)) == 1) - || ! ms_read_is_data(line, nread)) && (nread != EOF)) /* bypass blank line */ - continue; - - if (nread != EOF) - { - if ((conf_type= ms_get_conf_type(line)) != CONF_NULL) - { - break; - } - continue; - } - end_of_file= 1; - break; - } - break; - - default: - assert(0); - break; - } /* switch */ - } - - fclose(f); - - if (line != NULL) - { - free(line); - } -} /* ms_parse_cfg_file */ - - -/* calculate the average size of key and value */ -static void ms_calc_avg_size() -{ - double avg_val_size= 0.0; - double avg_key_size= 0.0; - double val_pro= 0.0; - double key_pro= 0.0; - double averge_len= 0.0; - size_t start_len= 0; - size_t end_len= 0; - - for (int j= 0; j < ms_setting.val_rng_cnt; j++) - { - val_pro= ms_setting.value_distr[j].value_prop; - start_len= ms_setting.value_distr[j].start_len; - end_len= ms_setting.value_distr[j].end_len; - - averge_len= val_pro * ((double)(start_len + end_len)) / 2; - avg_val_size+= averge_len; - } - - for (int j= 0; j < ms_setting.key_rng_cnt; j++) - { - key_pro= ms_setting.key_distr[j].key_prop; - start_len= ms_setting.key_distr[j].start_len; - end_len= ms_setting.key_distr[j].end_len; - - averge_len= key_pro * ((double)(start_len + end_len)) / 2; - avg_key_size+= averge_len; - } - - ms_setting.avg_val_size= (size_t)avg_val_size; - ms_setting.avg_key_size= (size_t)avg_key_size; -} /* ms_calc_avg_size */ - - -/** - * used to shuffle key and value distribution array to ensure - * (key, value) pair with different set. - * - * @param distr, pointer of distribution structure array - * @param length, length of the array - * - * @return always return EXIT_SUCCESS - */ -static int ms_shuffle_distr(ms_distr_t *distr, int length) -{ - int i, j; - int tmp_offset; - size_t tmp_size; - int64_t rnd; - - for (i= 0; i < length; i++) - { - rnd= random(); - j= (int)(rnd % (length - i)) + i; - - switch (rnd % 3) - { - case 0: - tmp_size= distr[j].key_size; - distr[j].key_size= distr[i].key_size; - distr[i].key_size= tmp_size; - break; - - case 1: - tmp_offset= distr[j].key_offset; - distr[j].key_offset= distr[i].key_offset; - distr[i].key_offset= tmp_offset; - break; - - case 2: - tmp_size= distr[j].value_size; - distr[j].value_size= distr[i].value_size; - distr[i].value_size= tmp_size; - break; - - default: - break; - } /* switch */ - } - - return EXIT_SUCCESS; -} /* ms_shuffle_distr */ - - -/** - * according to the key and value distribution, to build the - * (key, value) pair distribution. the (key, value) pair - * distribution array is global, each connection set or get - * object keeping this distribution, for the final result, we - * can reach the expected key and value distribution. - */ -static void ms_build_distr() -{ - int offset= 0; - int end= 0; - int key_cnt= 0; - int value_cnt= 0; - size_t average_len= 0; - size_t diff_len= 0; - size_t start_len= 0; - size_t end_len= 0; - int rnd= 0; - ms_distr_t *distr= NULL; - int units= (int)ms_setting.win_size / UNIT_ITEMS_COUNT; - - /* calculate average value size and key size */ - ms_calc_avg_size(); - - ms_setting.char_blk_size= RAND_CHAR_SIZE; - int key_scope_size= - (int)((ms_setting.char_blk_size - RESERVED_RAND_CHAR_SIZE) - / UNIT_ITEMS_COUNT); - - ms_setting.distr= (ms_distr_t *)malloc( - sizeof(ms_distr_t) * ms_setting.win_size); - if (ms_setting.distr == NULL) - { - fprintf(stderr, "Can't allocate distribution array."); - exit(1); - } - - /** - * character block is divided by how many different key - * size, each different key size has the same size character - * range. - */ - for (int m= 0; m < units; m++) - { - for (int i= 0; i < UNIT_ITEMS_COUNT; i++) - { - ms_setting.distr[m * UNIT_ITEMS_COUNT + i].key_offset= - ADDR_ALIGN(key_scope_size * i); - } - } - - /* initialize key size distribution */ - for (int m= 0; m < units; m++) - { - for (int j= 0; j < ms_setting.key_rng_cnt; j++) - { - key_cnt= (int)(UNIT_ITEMS_COUNT * ms_setting.key_distr[j].key_prop); - start_len= ms_setting.key_distr[j].start_len; - end_len= ms_setting.key_distr[j].end_len; - if ((start_len < MIN_KEY_SIZE) || (end_len < MIN_KEY_SIZE)) - { - fprintf(stderr, "key length must be greater than 16 bytes.\n"); - exit(1); - } - - if (! ms_setting.binary_prot_ - && ((start_len > MAX_KEY_SIZE) || (end_len > MAX_KEY_SIZE))) - { - fprintf(stderr, "key length must be less than 250 bytes.\n"); - exit(1); - } - - average_len= (start_len + end_len) / 2; - diff_len= (end_len - start_len) / 2; - for (int k= 0; k < key_cnt; k++) - { - if (offset >= (m + 1) * UNIT_ITEMS_COUNT) - { - break; - } - rnd= (int)random(); - if (k % 2 == 0) - { - ms_setting.distr[offset].key_size= - (diff_len == 0) ? average_len : - average_len + (size_t)rnd - % diff_len; - } - else - { - ms_setting.distr[offset].key_size= - (diff_len == 0) ? average_len : - average_len - (size_t)rnd - % diff_len; - } - offset++; - } - } - - if (offset < (m + 1) * UNIT_ITEMS_COUNT) - { - end= (m + 1) * UNIT_ITEMS_COUNT - offset; - for (int i= 0; i < end; i++) - { - ms_setting.distr[offset].key_size= ms_setting.avg_key_size; - offset++; - } - } - } - offset= 0; - - /* initialize value distribution */ - if (ms_setting.fixed_value_size != 0) - { - for (int i= 0; i < units * UNIT_ITEMS_COUNT; i++) - { - ms_setting.distr[i].value_size= ms_setting.fixed_value_size; - } - } - else - { - for (int m= 0; m < units; m++) - { - for (int j= 0; j < ms_setting.val_rng_cnt; j++) - { - value_cnt= - (int)(UNIT_ITEMS_COUNT * ms_setting.value_distr[j].value_prop); - start_len= ms_setting.value_distr[j].start_len; - end_len= ms_setting.value_distr[j].end_len; - if ((start_len <= 0) || (end_len <= 0)) - { - fprintf(stderr, "value length must be greater than 0 bytes.\n"); - exit(1); - } - - if ((start_len > MAX_VALUE_SIZE) || (end_len > MAX_VALUE_SIZE)) - { - fprintf(stderr, "key length must be less than or equal to 1M.\n"); - exit(1); - } - - average_len= (start_len + end_len) / 2; - diff_len= (end_len - start_len) / 2; - for (int k= 0; k < value_cnt; k++) - { - if (offset >= (m + 1) * UNIT_ITEMS_COUNT) - { - break; - } - rnd= (int)random(); - if (k % 2 == 0) - { - ms_setting.distr[offset].value_size= - (diff_len == 0) ? average_len : - average_len - + (size_t)rnd % diff_len; - } - else - { - ms_setting.distr[offset].value_size= - (diff_len == 0) ? average_len : - average_len - - (size_t)rnd % diff_len; - } - offset++; - } - } - - if (offset < (m + 1) * UNIT_ITEMS_COUNT) - { - end= (m + 1) * UNIT_ITEMS_COUNT - offset; - for (int i= 0; i < end; i++) - { - ms_setting.distr[offset++].value_size= ms_setting.avg_val_size; - } - } - } - } - - /* shuffle distribution */ - for (int i= 0; i < units; i++) - { - distr= &ms_setting.distr[i * UNIT_ITEMS_COUNT]; - for (int j= 0; j < 4; j++) - { - ms_shuffle_distr(distr, UNIT_ITEMS_COUNT); - } - } -} /* ms_build_distr */ - - -/** - * used to initialize the global character block. The character - * block is used to generate the suffix of the key and value. we - * only store a pointer in the character block for each key - * suffix or value string. It can save much memory to store key - * or value string. - */ -static void ms_init_random_block() -{ - char *ptr= NULL; - - assert(ms_setting.char_blk_size > 0); - - ms_setting.char_block= (char *)malloc(ms_setting.char_blk_size); - if (ms_setting.char_block == NULL) - { - fprintf(stderr, "Can't allocate global char block."); - exit(1); - } - ptr= ms_setting.char_block; - - for (int i= 0; (size_t)i < ms_setting.char_blk_size; i++) - { - *(ptr++)= ALPHANUMBERICS[random() % CHAR_COUNT]; - } -} /* ms_init_random_block */ - - -/** - * after initialization, call this function to output the main - * configuration user specified. - */ -static void ms_print_setting() -{ - fprintf(stdout, "servers : %s\n", ms_setting.srv_str); - fprintf(stdout, "threads count: %d\n", ms_setting.nthreads); - fprintf(stdout, "concurrency: %d\n", ms_setting.nconns); - if (ms_setting.run_time > 0) - { - fprintf(stdout, "run time: %ds\n", ms_setting.run_time); - } - else - { - fprintf(stdout, "execute number: %" PRId64 "\n", ms_setting.exec_num); - } - fprintf(stdout, "windows size: %" PRId64 "k\n", - (int64_t)(ms_setting.win_size / 1024)); - fprintf(stdout, "set proportion: set_prop=%.2f\n", - ms_setting.cmd_distr[CMD_SET].cmd_prop); - fprintf(stdout, "get proportion: get_prop=%.2f\n", - ms_setting.cmd_distr[CMD_GET].cmd_prop); - fflush(stdout); -} /* ms_print_setting */ - - -/** - * previous part of slap mode initialization of setting structure - */ -static void ms_setting_slapmode_init_pre() -{ - ms_setting.exec_num= DEFAULT_EXE_NUM; - ms_setting.verify_percent= DEFAULT_VERIFY_RATE; - ms_setting.exp_ver_per= DEFAULT_VERIFY_RATE; - ms_setting.overwrite_percent= DEFAULT_OVERWRITE_RATE; - ms_setting.mult_key_num= DEFAULT_DIV; - ms_setting.fixed_value_size= 0; - ms_setting.win_size= DEFAULT_WINDOW_SIZE; - ms_setting.udp= false; - ms_setting.reconnect= false; - ms_setting.verbose= false; - ms_setting.facebook_test= false; - ms_setting.binary_prot_= false; - ms_setting.stat_freq= 0; - ms_setting.srv_str= NULL; - ms_setting.cfg_file= NULL; - ms_setting.sock_per_conn= DEFAULT_SOCK_PER_CONN; - ms_setting.expected_tps= 0; - ms_setting.rep_write_srv= 0; -} /* ms_setting_slapmode_init_pre */ - - -/** - * previous part of initialization of setting structure - */ -void ms_setting_init_pre() -{ - memset(&ms_setting, 0, sizeof(ms_setting)); - - /* common initialize */ - ms_setting.ncpu= ms_get_cpu_count(); - ms_setting.nthreads= DEFAULT_THREADS_NUM; - ms_setting.nconns= DEFAULT_CONNS_NUM; - ms_setting.run_time= DEFAULT_RUN_TIME; - ms_setting.total_srv_cnt= MCD_SRVS_NUM_INIT; - ms_setting.servers= (ms_mcd_server_t *)malloc( - (size_t)ms_setting.total_srv_cnt - * sizeof(ms_mcd_server_t)); - if (ms_setting.servers == NULL) - { - fprintf(stderr, "Can't allocate servers structure.\n"); - exit(1); - } - - ms_setting_slapmode_init_pre(); -} /* ms_setting_init_pre */ - - -/** - * post part of slap mode initialization of setting structure - */ -static void ms_setting_slapmode_init_post() -{ - ms_setting.total_key_rng_cnt= KEY_RANGE_COUNT_INIT; - ms_setting.key_distr= - (ms_key_distr_t *)malloc((size_t)ms_setting.total_key_rng_cnt * sizeof(ms_key_distr_t)); - - if (ms_setting.key_distr == NULL) - { - fprintf(stderr, "Can't allocate key distribution structure.\n"); - exit(1); - } - - ms_setting.total_val_rng_cnt= VALUE_RANGE_COUNT_INIT; - - ms_setting.value_distr= - (ms_value_distr_t *)malloc((size_t)ms_setting.total_val_rng_cnt * sizeof( ms_value_distr_t)); - - if (ms_setting.value_distr == NULL) - { - fprintf(stderr, "Can't allocate value distribution structure.\n"); - exit(1); - } - - ms_parse_cfg_file(ms_setting.cfg_file); - - /* run time mode */ - if ((ms_setting.exec_num == 0) && (ms_setting.run_time != 0)) - { - ms_setting.exec_num= (int64_t)MAX_EXEC_NUM; - } - else - { - /* execute number mode */ - ms_setting.run_time= 0; - } - - if (ms_setting.rep_write_srv > 0) - { - /* for replication test, need enable reconnect feature */ - ms_setting.reconnect= true; - } - - if (ms_setting.facebook_test && (ms_setting.mult_key_num < 2)) - { - fprintf(stderr, "facebook test must work with multi-get, " - "please specify multi-get key number " - "with '--division' option.\n"); - exit(1); - } - - if (ms_setting.facebook_test && ms_setting.udp) - { - fprintf(stderr, "facebook test couldn't work with UDP.\n"); - exit(1); - } - - if (ms_setting.udp && (ms_setting.sock_per_conn > 1)) - { - fprintf(stderr, "UDP doesn't support multi-socks " - "in one connection structure.\n"); - exit(1); - } - - if ((ms_setting.rep_write_srv > 0) && (ms_setting.srv_cnt < 2)) - { - fprintf(stderr, "Please specify 2 servers at least for replication\n"); - exit(1); - } - - if ((ms_setting.rep_write_srv > 0) - && (ms_setting.srv_cnt < ms_setting.rep_write_srv)) - { - fprintf(stderr, "Servers to do replication writing " - "is larger than the total servers\n"); - exit(1); - } - - if (ms_setting.udp && (ms_setting.rep_write_srv > 0)) - { - fprintf(stderr, "UDP doesn't support replication.\n"); - exit(1); - } - - if (ms_setting.facebook_test && (ms_setting.rep_write_srv > 0)) - { - fprintf(stderr, "facebook test couldn't work with replication.\n"); - exit(1); - } - - ms_build_distr(); - - /* initialize global character block */ - ms_init_random_block(); - ms_print_setting(); -} /* ms_setting_slapmode_init_post */ - - -/** - * post part of initialization of setting structure - */ -void ms_setting_init_post() -{ - ms_get_serverlist(ms_setting.srv_str); - ms_setting_slapmode_init_post(); -} - - -/** - * clean up the global setting structure - */ -void ms_setting_cleanup() -{ - if (ms_setting.distr != NULL) - { - free(ms_setting.distr); - } - - if (ms_setting.char_block != NULL) - { - free(ms_setting.char_block); - } - - if (ms_setting.srv_str != NULL) - { - free(ms_setting.srv_str); - } - - if (ms_setting.cfg_file != NULL) - { - free(ms_setting.cfg_file); - } - - if (ms_setting.servers != NULL) - { - free(ms_setting.servers); - } - - if (ms_setting.key_distr != NULL) - { - free(ms_setting.key_distr); - } - - if (ms_setting.value_distr != NULL) - { - free(ms_setting.value_distr); - } -} /* ms_setting_cleanup */ diff --git a/src/bin/ms_setting.h b/src/bin/ms_setting.h deleted file mode 100644 index 9db956c9..00000000 --- a/src/bin/ms_setting.h +++ /dev/null @@ -1,181 +0,0 @@ -/* - * File: ms_setting.h - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_SETTING_H -#define MS_SETTING_H - -#include "ms_memslap.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define MCD_SRVS_NUM_INIT 8 -#define MCD_HOST_LENGTH 64 -#define KEY_RANGE_COUNT_INIT 8 -#define VALUE_RANGE_COUNT_INIT 8 -#define PROP_ERROR 0.001 - -#define MIN_KEY_SIZE 16 -#define MAX_KEY_SIZE 250 -#define MAX_VALUE_SIZE (1024 * 1024) - -/* the content of the configuration file for memslap running without configuration file */ -#define DEFAULT_CONGIF_STR \ - "key\n" \ - "64 64 1\n" \ - "value\n" \ - "1024 1024 1\n" \ - "cmd\n" \ - "0 0.1\n" \ - "1 0.9" - -/* Used to parse the value length return by server and path string */ -typedef struct token_s -{ - char *value; - size_t length; -} token_t; - -#define MAX_TOKENS 10 - -/* server information */ -typedef struct mcd_server -{ - char srv_host_name[MCD_HOST_LENGTH]; /* host name of server */ - int srv_port; /* server port */ - - /* for calculating how long the server disconnects */ - ATOMIC uint32_t disconn_cnt; /* number of disconnections count */ - ATOMIC uint32_t reconn_cnt; /* number of reconnections count */ - struct timeval disconn_time; /* start time of disconnection */ - struct timeval reconn_time; /* end time of reconnection */ -} ms_mcd_server_t; - -/* information of an item distribution including key and value */ -typedef struct distr -{ - size_t key_size; /* size of key */ - int key_offset; /* offset of one key in character block */ - size_t value_size; /* size of value */ -} ms_distr_t; - -/* information of key distribution */ -typedef struct key_distr -{ - size_t start_len; /* start of the key length range */ - size_t end_len; /* end of the key length range */ - double key_prop; /* key proportion */ -} ms_key_distr_t; - -/* information of value distribution */ -typedef struct value_distr -{ - size_t start_len; /* start of the value length range */ - size_t end_len; /* end of the value length range */ - double value_prop; /* value proportion */ -} ms_value_distr_t; - -/* memcached command types */ -typedef enum cmd_type -{ - CMD_SET, - CMD_GET, - CMD_NULL -} ms_cmd_type_t; - -/* types in the configuration file */ -typedef enum conf_type -{ - CONF_KEY, - CONF_VALUE, - CONF_CMD, - CONF_NULL -} ms_conf_type_t; - -/* information of command distribution */ -typedef struct cmd_distr -{ - ms_cmd_type_t cmd_type; /* command type */ - double cmd_prop; /* proportion of the command */ -} ms_cmd_distr_t; - -/* global setting structure */ -typedef struct setting -{ - uint32_t ncpu; /* cpu count of this system */ - uint32_t nthreads; /* total thread count, must equal or less than cpu cores */ - uint32_t nconns; /* total conn count, must multiply by total thread count */ - int64_t exec_num; /* total execute number */ - int run_time; /* total run time */ - - uint32_t char_blk_size; /* global character block size */ - char *char_block; /* global character block with random character */ - ms_distr_t *distr; /* distribution from configure file */ - - char *srv_str; /* string includes servers information */ - char *cfg_file; /* configure file name */ - - ms_mcd_server_t *servers; /* servers array */ - uint32_t total_srv_cnt; /* total servers count of the servers array */ - uint32_t srv_cnt; /* servers count */ - - ms_key_distr_t *key_distr; /* array of key distribution */ - int total_key_rng_cnt; /* total key range count of the array */ - int key_rng_cnt; /* actual key range count */ - - ms_value_distr_t *value_distr; /* array of value distribution */ - int total_val_rng_cnt; /* total value range count of the array */ - int val_rng_cnt; /* actual value range count */ - - ms_cmd_distr_t cmd_distr[CMD_NULL]; /* total we have CMD_NULL commands */ - int cmd_used_count; /* supported command count */ - - size_t fixed_value_size; /* fixed value size */ - size_t avg_val_size; /* average value size */ - size_t avg_key_size; /* average value size */ - - double verify_percent; /* percent of data verification */ - double exp_ver_per; /* percent of data verification with expire time */ - double overwrite_percent; /* percent of overwrite */ - int mult_key_num; /* number of keys used by multi-get once */ - size_t win_size; /* item window size per connection */ - bool udp; /* whether or not use UDP */ - int stat_freq; /* statistic frequency second */ - bool reconnect; /* whether it reconnect when connection close */ - bool verbose; /* whether it outputs detailed information when verification */ - bool facebook_test; /* facebook test, TCP set and multi-get with UDP */ - uint32_t sock_per_conn; /* number of socks per connection structure */ - bool binary_prot_; /* whether it use binary protocol */ - int expected_tps; /* expected throughput */ - uint32_t rep_write_srv; /* which servers are used to do replication writing */ -} ms_setting_st; - -extern ms_setting_st ms_setting; - -/* previous part of initialization of setting structure */ -void ms_setting_init_pre(void); - - -/* post part of initialization of setting structure */ -void ms_setting_init_post(void); - - -/* clean up the global setting structure */ -void ms_setting_cleanup(void); - - -#define UNUSED_ARGUMENT(x) (void)x - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_SETTING_H */ diff --git a/src/bin/ms_sigsegv.c b/src/bin/ms_sigsegv.c deleted file mode 100644 index 303381f4..00000000 --- a/src/bin/ms_sigsegv.c +++ /dev/null @@ -1,128 +0,0 @@ -/* - * File: ms_sigsegv.c - * Author: Mingqiang Zhuang - * - * Created on March 15, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - * Rewrite of stack dump: - * Copyright (C) 2009 Sun Microsystems - * Author Trond Norbye - */ - -#include "mem_config.h" - -#include -#include -#include -#include -#include - -#include "ms_memslap.h" -#include "ms_setting.h" - -/* prototypes */ -int ms_setup_sigsegv(void); -int ms_setup_sigpipe(void); -int ms_setup_sigint(void); - - -/* signal seg reaches, this function will run */ -static void ms_signal_segv(int signum, siginfo_t *info, void *ptr) -{ - UNUSED_ARGUMENT(signum); - UNUSED_ARGUMENT(info); - UNUSED_ARGUMENT(ptr); - - pthread_mutex_lock(&ms_global.quit_mutex); - fprintf(stderr, "Segmentation fault occurred.\nStack trace:\n"); -#if 0 - pandora_print_callstack(stderr); -#endif - fprintf(stderr, "End of stack trace\n"); - pthread_mutex_unlock(&ms_global.quit_mutex); - abort(); -} - -/* signal int reaches, this function will run */ -static void ms_signal_int(int signum, siginfo_t *info, void *ptr) -{ - UNUSED_ARGUMENT(signum); - UNUSED_ARGUMENT(info); - UNUSED_ARGUMENT(ptr); - - pthread_mutex_lock(&ms_global.quit_mutex); - fprintf(stderr, "SIGINT handled.\n"); - pthread_mutex_unlock(&ms_global.quit_mutex); - exit(1); -} /* ms_signal_int */ - - -/** - * redirect signal seg - * - * @return if success, return EXIT_SUCCESS, else return -1 - */ -int ms_setup_sigsegv(void) -{ - struct sigaction action; - - memset(&action, 0, sizeof(action)); - action.sa_sigaction= ms_signal_segv; - action.sa_flags= SA_SIGINFO; - if (sigaction(SIGSEGV, &action, NULL) < 0) - { - perror("sigaction"); - return EXIT_SUCCESS; - } - - return -1; -} /* ms_setup_sigsegv */ - - -/** - * redirect signal pipe - * - * @return if success, return EXIT_SUCCESS, else return -1 - */ -int ms_setup_sigpipe(void) -{ - /* ignore the SIGPIPE signal */ - signal(SIGPIPE, SIG_IGN); - - return -1; -} /* ms_setup_sigpipe */ - - -/** - * redirect signal int - * - * @return if success, return EXIT_SUCCESS, else return -1 - */ -int ms_setup_sigint(void) -{ - struct sigaction action_3; - - memset(&action_3, 0, sizeof(action_3)); - action_3.sa_sigaction= ms_signal_int; - action_3.sa_flags= SA_SIGINFO; - if (sigaction(SIGINT, &action_3, NULL) < 0) - { - perror("sigaction"); - return EXIT_SUCCESS; - } - - return -1; -} /* ms_setup_sigint */ - - -#ifndef SIGSEGV_NO_AUTO_INIT -static void __attribute((constructor)) ms_init(void) -{ - ms_setup_sigsegv(); - ms_setup_sigpipe(); - ms_setup_sigint(); -} -#endif diff --git a/src/bin/ms_sigsegv.h b/src/bin/ms_sigsegv.h deleted file mode 100644 index 7990ff67..00000000 --- a/src/bin/ms_sigsegv.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * File: ms_sigsegv.h - * Author: Mingqiang Zhuang - * - * Created on March 15, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_SIGSEGV_H -#define MS_SIGSEGV_H - -#ifdef __cplusplus -extern "C" { -#endif - -/* redirect signal seg */ -int ms_setup_sigsegv(void); - - -/* redirect signal pipe */ -int ms_setup_sigpipe(void); - - -/* redirect signal int */ -int ms_setup_sigint(void); - - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_SIGSEGV_H */ diff --git a/src/bin/ms_stats.c b/src/bin/ms_stats.c deleted file mode 100644 index 086fb3ef..00000000 --- a/src/bin/ms_stats.c +++ /dev/null @@ -1,303 +0,0 @@ -/* - * File: ms_stats.h - * Author: Mingqiang Zhuang - * - * Created on March 25, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -#include "mem_config.h" - -#include -#include "ms_stats.h" - -#define array_size(x) (sizeof(x) / sizeof((x)[0])) - -static int ms_local_log2(uint64_t value); -static uint64_t ms_get_events(ms_stat_t *stat); - - -/** - * get the index of local log2 array - * - * @param value - * - * @return return the index of local log2 array - */ -static int ms_local_log2(uint64_t value) -{ - int result= 0; - - while (result <= 63 && ((uint64_t)1 << result) < value) - { - result++; - } - - return result; -} /* ms_local_log2 */ - - -/** - * initialize statistic structure - * - * @param stat, pointer of the statistic structure - * @param name, name of the statistic - */ -void ms_init_stats(ms_stat_t *stat, const char *name) -{ - memset(stat, 0, sizeof(*stat)); - - stat->name= (char *)name; - stat->min_time= (uint64_t)-1; - stat->max_time= 0; - stat->period_min_time= (uint64_t)-1; - stat->period_max_time= 0; - stat->log_product= 0; - stat->total_time= 0; - stat->pre_total_time= 0; - stat->squares= 0; - stat->pre_squares= 0; - stat->pre_events= 0; - stat->pre_log_product= 0; - stat->get_miss= 0; - stat->pre_get_miss= 0; -} /* ms_init_stats */ - - -/** - * record one event - * - * @param stat, pointer of the statistic structure - * @param total_time, response time of the command - * @param get_miss, whether it gets miss - */ -void ms_record_event(ms_stat_t *stat, uint64_t total_time, int get_miss) -{ - stat->total_time+= total_time; - - if (total_time < stat->min_time) - { - stat->min_time= total_time; - } - - if (total_time > stat->max_time) - { - stat->max_time= total_time; - } - - if (total_time < stat->period_min_time) - { - stat->period_min_time= total_time; - } - - if (total_time > stat->period_max_time) - { - stat->period_max_time= total_time; - } - - if (get_miss) - { - stat->get_miss++; - } - - stat->dist[ms_local_log2(total_time)]++; - stat->squares+= (double)(total_time * total_time); - - if (total_time != 0) - { - stat->log_product+= log((double)total_time); - } -} /* ms_record_event */ - - -/** - * get the events count - * - * @param stat, pointer of the statistic structure - * - * @return total events recorded - */ -static uint64_t ms_get_events(ms_stat_t *stat) -{ - uint64_t events= 0; - - for (uint32_t i= 0; i < array_size(stat->dist); i++) - { - events+= stat->dist[i]; - } - - return events; -} /* ms_get_events */ - - -/** - * dump the statistics - * - * @param stat, pointer of the statistic structure - */ -void ms_dump_stats(ms_stat_t *stat) -{ - uint64_t events= 0; - int max_non_zero= 0; - int min_non_zero= 0; - double average= 0; - - for (uint32_t i= 0; i < array_size(stat->dist); i++) - { - events+= stat->dist[i]; - if (stat->dist[i] != 0) - { - max_non_zero= (int)i; - } - } - - if (events == 0) - { - return; - } - average= (double)(stat->total_time / events); - - printf("%s Statistics (%lld events)\n", stat->name, (long long)events); - printf(" Min: %8lld\n", (long long)stat->min_time); - printf(" Max: %8lld\n", (long long)stat->max_time); - printf(" Avg: %8lld\n", (long long)(stat->total_time / events)); - printf(" Geo: %8.2lf\n", exp(stat->log_product / (double)events)); - - if (events > 1) - { - printf(" Std: %8.2lf\n", - sqrt((stat->squares - (double)events * average - * average) / ((double)events - 1))); - } - printf(" Log2 Dist:"); - - for (int i= 0; i <= max_non_zero - 4; i+= 4) - { - if ((stat->dist[i + 0] != 0) - || (stat->dist[i + 1] != 0) - || (stat->dist[i + 2] != 0) - || (stat->dist[i + 3] != 0)) - { - min_non_zero= i; - break; - } - } - - for (int i= min_non_zero; i <= max_non_zero; i++) - { - if ((i % 4) == 0) - { - printf("\n %2d:", (int)i); - } - printf(" %6" PRIu64 , stat->dist[i]); - } - - printf("\n\n"); -} /* ms_dump_stats */ - - -/** - * dump the format statistics - * - * @param stat, pointer of the statistic structure - * @param run_time, the total run time - * @param freq, statistic frequency - * @param obj_size, average object size - */ -void ms_dump_format_stats(ms_stat_t *stat, - int run_time, - int freq, - int obj_size) -{ - uint64_t events= 0; - double global_average= 0; - uint64_t global_tps= 0; - double global_rate= 0; - double global_std= 0; - double global_log= 0; - - double period_average= 0; - uint64_t period_tps= 0; - double period_rate= 0; - double period_std= 0; - double period_log= 0; - - if ((events= ms_get_events(stat)) == 0) - { - return; - } - - global_average= (double)(stat->total_time / events); - global_tps= events / (uint64_t)run_time; - global_rate= (double)events * obj_size / 1024 / 1024 / run_time; - global_std= sqrt((stat->squares - (double)events * global_average - * global_average) / (double)(events - 1)); - global_log= exp(stat->log_product / (double)events); - - uint64_t diff_time= stat->total_time - stat->pre_total_time; - uint64_t diff_events= events - stat->pre_events; - if (diff_events >= 1) - { - period_average= (double)(diff_time / diff_events); - period_tps= diff_events / (uint64_t)freq; - period_rate= (double)diff_events * obj_size / 1024 / 1024 / freq; - double diff_squares= (double)stat->squares - (double)stat->pre_squares; - period_std= sqrt((diff_squares - (double)diff_events * period_average - * period_average) / (double)(diff_events - 1)); - double diff_log_product= stat->log_product - stat->pre_log_product; - period_log= exp(diff_log_product / (double)diff_events); - } - - printf("%s Statistics\n", stat->name); - printf("%-8s %-8s %-12s %-12s %-10s %-10s %-8s %-10s %-10s %-10s %-10s\n", - "Type", - "Time(s)", - "Ops", - "TPS(ops/s)", - "Net(M/s)", - "Get_miss", - "Min(us)", - "Max(us)", - "Avg(us)", - "Std_dev", - "Geo_dist"); - - printf( - "%-8s %-8d %-12llu %-12lld %-10.1f %-10lld %-8lld %-10lld %-10lld %-10.2f %.2f\n", - "Period", - freq, - (long long)diff_events, - (long long)period_tps, - global_rate, - (long long)(stat->get_miss - stat->pre_get_miss), - (long long)stat->period_min_time, - (long long)stat->period_max_time, - (long long)period_average, - period_std, - period_log); - - printf( - "%-8s %-8d %-12llu %-12lld %-10.1f %-10lld %-8lld %-10lld %-10lld %-10.2f %.2f\n\n", - "Global", - run_time, - (long long)events, - (long long)global_tps, - period_rate, - (long long)stat->get_miss, - (long long)stat->min_time, - (long long)stat->max_time, - (long long)global_average, - global_std, - global_log); - - stat->pre_events= events; - stat->pre_squares= (uint64_t)stat->squares; - stat->pre_total_time= stat->total_time; - stat->pre_log_product= stat->log_product; - stat->period_min_time= (uint64_t)-1; - stat->period_max_time= 0; - stat->pre_get_miss= stat->get_miss; -} /* ms_dump_format_stats */ diff --git a/src/bin/ms_stats.h b/src/bin/ms_stats.h deleted file mode 100644 index 5ac88b3f..00000000 --- a/src/bin/ms_stats.h +++ /dev/null @@ -1,69 +0,0 @@ -/* - * File: ms_stats.h - * Author: Mingqiang Zhuang - * - * Created on March 25, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_STAT_H -#define MS_STAT_H - -#include -#include -#include -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -/* statistic structure of response time */ -typedef struct -{ - char *name; - uint64_t total_time; - uint64_t min_time; - uint64_t max_time; - uint64_t get_miss; - uint64_t dist[65]; - double squares; - double log_product; - - uint64_t period_min_time; - uint64_t period_max_time; - uint64_t pre_get_miss; - uint64_t pre_events; - uint64_t pre_total_time; - uint64_t pre_squares; - double pre_log_product; -} ms_stat_t; - -/* initialize statistic */ -void ms_init_stats(ms_stat_t *stat, const char *name); - - -/* record one event */ -void ms_record_event(ms_stat_t *stat, uint64_t time, int get_miss); - - -/* dump the statistics */ -void ms_dump_stats(ms_stat_t *stat); - - -/* dump the format statistics */ -void ms_dump_format_stats(ms_stat_t *stat, - int run_time, - int freq, - int obj_size); - - -#ifdef __cplusplus -} -#endif - -#endif /* MS_STAT_H */ diff --git a/src/bin/ms_task.c b/src/bin/ms_task.c deleted file mode 100644 index f2cb8657..00000000 --- a/src/bin/ms_task.c +++ /dev/null @@ -1,1110 +0,0 @@ -/* - * File: ms_task.c - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -#include "mem_config.h" - -#if defined(HAVE_SYS_TIME_H) -# include -#endif - -#if defined(HAVE_TIME_H) -# include -#endif - -#include "ms_thread.h" -#include "ms_setting.h" -#include "ms_atomic.h" - -/* command distribution adjustment cycle */ -#define CMD_DISTR_ADJUST_CYCLE 1000 -#define DISADJUST_FACTOR 0.03 /** - * In one adjustment cycle, if undo set or get - * operations proportion is more than 3% , means - * there are too many new item or need more new - * item in the window. This factor shows it. - */ - -/* get item from task window */ -static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c); -static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c); -static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c); -static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c); - - -/* select next operation to do */ -static void ms_select_opt(ms_conn_t *c, ms_task_t *task); - - -/* set and get speed estimate for controlling and adjustment */ -static bool ms_is_set_too_fast(ms_task_t *task); -static bool ms_is_get_too_fast(ms_task_t *task); -static void ms_kick_out_item(ms_task_item_t *item); - - -/* miss rate adjustment */ -static bool ms_need_overwrite_item(ms_task_t *task); -static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task); - - -/* deal with data verification initialization */ -static void ms_task_data_verify_init(ms_task_t *task); -static void ms_task_expire_verify_init(ms_task_t *task); - - -/* select a new task to do */ -static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup); - - -/* run the selected task */ -static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item); -static void ms_update_stat_result(ms_conn_t *c); -static void ms_update_multi_get_result(ms_conn_t *c); -static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item); -static void ms_update_task_result(ms_conn_t *c); -static void ms_single_getset_task_sch(ms_conn_t *c); -static void ms_multi_getset_task_sch(ms_conn_t *c); -static void ms_send_signal(ms_sync_lock_t *sync_lock); -static void ms_warmup_server(ms_conn_t *c); -static int ms_run_getset_task(ms_conn_t *c); - - -/** - * used to get the current operation item(object) - * - * @param c, pointer of the concurrency - * - * @return ms_task_item_t*, current operating item - */ -static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c) -{ - return c->curr_task.item; -} - - -/** - * used to get the next item to do get operation - * - * @param c, pointer of the concurrency - * - * @return ms_task_item_t*, the pointer of the next item to do - * get operation - */ -static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c) -{ - ms_task_item_t *item= NULL; - - if (c->set_cursor <= 0) - { - /* the first item in the window */ - item= &c->item_win[0]; - } - else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size) - { - /* random get one item set before */ - item= &c->item_win[random() % (int64_t)c->set_cursor]; - } - else - { - /* random get one item from the window */ - item= &c->item_win[random() % c->win_size]; - } - - return item; -} /* ms_get_next_get_item */ - - -/** - * used to get the next item to do set operation - * - * @param c, pointer of the concurrency - * - * @return ms_task_item_t*, the pointer of the next item to do - * set operation - */ -static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c) -{ - /** - * when a set command successes, the cursor will plus 1. If set - * fails, the cursor doesn't change. it isn't necessary to - * increase the cursor here. - */ - return &c->item_win[(int64_t)c->set_cursor % c->win_size]; -} - - -/** - * If we need do overwrite, we could select a item set before. - * This function is used to get a item set before to do - * overwrite. - * - * @param c, pointer of the concurrency - * - * @return ms_task_item_t*, the pointer of the previous item of - * set operation - */ -static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c) -{ - return ms_get_next_get_item(c); -} /* ms_get_random_overwrite_item */ - -/** - * According to the proportion of operations(get or set), select - * an operation to do. - * - * @param c, pointer of the concurrency - * @param task, pointer of current task in the concurrency - */ -static void ms_select_opt(ms_conn_t *c, ms_task_t *task) -{ - double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop; - double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop; - - /* update cycle operation number if necessary */ - if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0)) - { - task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop); - task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop); - } - - /** - * According to operation distribution to choose doing which - * operation. If it can't set new object to sever, just change - * to do get operation. - */ - if ((set_prop > PROP_ERROR) - && ((double)task->get_opt * set_prop >= (double)task->set_opt - * get_prop)) - { - task->cmd= CMD_SET; - task->item= ms_get_next_set_item(c); - } - else - { - task->cmd= CMD_GET; - task->item= ms_get_next_get_item(c); - } -} /* ms_select_opt */ - - -/** - * used to judge whether the number of get operations done is - * more than expected number of get operations to do right now. - * - * @param task, pointer of current task in the concurrency - * - * @return bool, if get too fast, return true, else return false - */ -static bool ms_is_get_too_fast(ms_task_t *task) -{ - double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop; - double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop; - - /* no get operation */ - if (get_prop < PROP_ERROR) - { - return false; - } - - int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR)) - * task->cycle_undo_get; - - if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop) - && (task->cycle_undo_set > max_undo_set)) - { - return true; - } - - return false; -} /* ms_is_get_too_fast */ - - -/** - * used to judge whether the number of set operations done is - * more than expected number of set operations to do right now. - * - * @param task, pointer of current task in the concurrency - * - * @return bool, if set too fast, return true, else return false - */ -static bool ms_is_set_too_fast(ms_task_t *task) -{ - double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop; - double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop; - - /* no set operation */ - if (set_prop < PROP_ERROR) - { - return false; - } - - /* If it does set operation too fast, skip some */ - int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR)) - * (double)task->cycle_undo_set); - - if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop) - && (task->cycle_undo_get > max_undo_get)) - { - return true; - } - - return false; -} /* ms_is_set_too_fast */ - - -/** - * kick out the old item in the window, and add a new item to - * overwrite the old item. When we don't want to do overwrite - * object, and the current item to do set operation is an old - * item, we could kick out the old item and add a new item. Then - * we can ensure we set new object every time. - * - * @param item, pointer of task item which includes the object - * information - */ -static void ms_kick_out_item(ms_task_item_t *item) -{ - /* allocate a new item */ - item->key_prefix= ms_get_key_prefix(); - - item->key_suffix_offset++; - item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */ - item->client_time= 0; -} /* ms_kick_out_item */ - - -/** - * used to judge whether we need overwrite object based on the - * options user specified - * - * @param task, pointer of current task in the concurrency - * - * @return bool, if need overwrite, return true, else return - * false - */ -static bool ms_need_overwrite_item(ms_task_t *task) -{ - ms_task_item_t *item= task->item; - - assert(item != NULL); - assert(task->cmd == CMD_SET); - - /** - * according to data overwrite percent to determine if do data - * overwrite. - */ - if (task->overwrite_set < (double)task->set_opt - * ms_setting.overwrite_percent) - { - return true; - } - - return false; -} /* ms_need_overwirte_item */ - - -/** - * used to adjust operation. the function must be called after - * select operation. the function change get operation to set - * operation, or set operation to get operation based on the - * current case. - * - * @param c, pointer of the concurrency - * @param task, pointer of current task in the concurrency - * - * @return bool, if success, return true, else return false - */ -static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) -{ - ms_task_item_t *item= task->item; - - assert(item != NULL); - - if (task->cmd == CMD_SET) - { - /* If did set operation too fast, skip some */ - if (ms_is_set_too_fast(task)) - { - /* get the item instead */ - if (item->value_offset != INVALID_OFFSET) - { - task->cmd= CMD_GET; - return true; - } - } - - /* If the current item is not a new item, kick it out */ - if (item->value_offset != INVALID_OFFSET) - { - if (ms_need_overwrite_item(task)) - { - /* overwrite */ - task->overwrite_set++; - } - else - { - /* kick out the current item to do set operation */ - ms_kick_out_item(item); - } - } - else /* it's a new item */ - { - /* need overwrite */ - if (ms_need_overwrite_item(task)) - { - /** - * overwrite not use the item with current set cursor, revert - * set cursor. - */ - c->set_cursor--; - - item= ms_get_random_overwrite_item(c); - if (item->value_offset != INVALID_OFFSET) - { - task->item= item; - task->overwrite_set++; - } - else /* item is a new item */ - { - /* select the item to run, and cancel overwrite */ - task->item= item; - } - } - } - task->cmd= CMD_SET; - return true; - } - else - { - if (item->value_offset == INVALID_OFFSET) - { - task->cmd= CMD_SET; - return true; - } - - /** - * If It does get operation too fast, it will change the - * operation to set. - */ - if (ms_is_get_too_fast(task)) - { - /* don't kick out the first item in the window */ - if (! ms_is_set_too_fast(task)) - { - ms_kick_out_item(item); - task->cmd= CMD_SET; - return true; - } - else - { - return false; - } - } - - assert(item->value_offset != INVALID_OFFSET); - - task->cmd= CMD_GET; - return true; - } -} /* ms_adjust_opt */ - - -/** - * used to initialize the task which need verify data. - * - * @param task, pointer of current task in the concurrency - */ -static void ms_task_data_verify_init(ms_task_t *task) -{ - ms_task_item_t *item= task->item; - - assert(item != NULL); - assert(task->cmd == CMD_GET); - - /** - * according to data verification percent to determine if do - * data verification. - */ - if (task->verified_get < (double)task->get_opt - * ms_setting.verify_percent) - { - /** - * currently it doesn't do verify, just increase the counter, - * and do verification next proper get command - */ - if ((task->item->value_offset != INVALID_OFFSET) - && (item->exp_time == 0)) - { - task->verify= true; - task->finish_verify= false; - task->verified_get++; - } - } -} /* ms_task_data_verify_init */ - - -/** - * used to initialize the task which need verify expire time. - * - * @param task, pointer of current task in the concurrency - */ -static void ms_task_expire_verify_init(ms_task_t *task) -{ - ms_task_item_t *item= task->item; - - assert(item != NULL); - assert(task->cmd == CMD_GET); - assert(item->exp_time > 0); - - task->verify= true; - task->finish_verify= false; -} /* ms_task_expire_verify_init */ - - -/** - * used to get one task, the function initializes the task - * structure. - * - * @param c, pointer of the concurrency - * @param warmup, whether it need warmup - * - * @return ms_task_t*, pointer of current task in the - * concurrency - */ -static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup) -{ - ms_task_t *task= &c->curr_task; - - while (1) - { - task->verify= false; - task->finish_verify= true; - task->get_miss= true; - - if (warmup) - { - task->cmd= CMD_SET; - task->item= ms_get_next_set_item(c); - - return task; - } - - /* according to operation distribution to choose doing which operation */ - ms_select_opt(c, task); - - if (! ms_adjust_opt(c, task)) - { - continue; - } - - if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET)) - { - ms_task_data_verify_init(task); - } - - if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET) - && (task->item->exp_time > 0)) - { - ms_task_expire_verify_init(task); - } - - break; - } - - /** - * Only update get and delete counter, set counter will be - * updated after set operation successes. - */ - if (task->cmd == CMD_GET) - { - task->get_opt++; - task->cycle_undo_get--; - } - - return task; -} /* ms_get_task */ - - -/** - * send a signal to the main monitor thread - * - * @param sync_lock, pointer of the lock - */ -static void ms_send_signal(ms_sync_lock_t *sync_lock) -{ - pthread_mutex_lock(&sync_lock->lock); - sync_lock->count++; - pthread_cond_signal(&sync_lock->cond); - pthread_mutex_unlock(&sync_lock->lock); -} /* ms_send_signal */ - - -/** - * If user only want to do get operation, but there is no object - * in server , so we use this function to warmup the server, and - * set some objects to server. It runs at the beginning of task. - * - * @param c, pointer of the concurrency - */ -static void ms_warmup_server(ms_conn_t *c) -{ - ms_task_t *task; - ms_task_item_t *item; - - /** - * Extra one loop to get the last command returned state. - * Normally it gets the previous command returned state. - */ - if ((c->remain_warmup_num >= 0) - && (c->remain_warmup_num != c->warmup_num)) - { - item= ms_get_cur_opt_item(c); - /* only update the set command result state for data verification */ - if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED)) - { - item->value_offset= item->key_suffix_offset; - /* set success, update counter */ - c->set_cursor++; - } - else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED) - { - printf("key: %" PRIx64 " didn't set success\n", item->key_prefix); - } - } - - /* the last time don't run a task */ - if (c->remain_warmup_num-- > 0) - { - /* operate next task item */ - task= ms_get_task(c, true); - item= task->item; - ms_mcd_set(c, item); - } - - /** - * finish warming up server, wait all connects initialize - * complete. Then all connects can start do task at the same - * time. - */ - if (c->remain_warmup_num == -1) - { - ms_send_signal(&ms_global.warmup_lock); - c->remain_warmup_num--; /* never run the if branch */ - } -} /* ms_warmup_server */ - - -/** - * dispatch single get and set task - * - * @param c, pointer of the concurrency - */ -static void ms_single_getset_task_sch(ms_conn_t *c) -{ - ms_task_t *task; - ms_task_item_t *item; - - /* the last time don't run a task */ - if (c->remain_exec_num-- > 0) - { - task= ms_get_task(c, false); - item= task->item; - if (task->cmd == CMD_SET) - { - ms_mcd_set(c, item); - } - else if (task->cmd == CMD_GET) - { - assert(task->cmd == CMD_GET); - ms_mcd_get(c, item); - } - } -} /* ms_single_getset_task_sch */ - - -/** - * dispatch multi-get and set task - * - * @param c, pointer of the concurrency - */ -static void ms_multi_getset_task_sch(ms_conn_t *c) -{ - ms_task_t *task; - ms_mlget_task_item_t *mlget_item; - - while (1) - { - if (c->remain_exec_num-- > 0) - { - task= ms_get_task(c, false); - if (task->cmd == CMD_SET) /* just do it */ - { - ms_mcd_set(c, task->item); - break; - } - else - { - assert(task->cmd == CMD_GET); - mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num]; - mlget_item->item= task->item; - mlget_item->verify= task->verify; - mlget_item->finish_verify= task->finish_verify; - mlget_item->get_miss= task->get_miss; - c->mlget_task.mlget_num++; - - /* enough multi-get task items can be done */ - if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num) - || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) - { - ms_mcd_mlget(c); - break; - } - } - } - else - { - if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0)) - { - ms_mcd_mlget(c); - } - break; - } - } -} /* ms_multi_getset_task_sch */ - - -/** - * calculate the difference value of two time points - * - * @param start_time, the start time - * @param end_time, the end time - * - * @return uint64_t, the difference value between start_time and end_time in us - */ -int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time) -{ - int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec; - int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec; - - assert(endtime >= starttime); - - return endtime - starttime; -} /* ms_time_diff */ - - -/** - * after get the response from server for multi-get, the - * function update the state of the task and do data verify if - * necessary. - * - * @param c, pointer of the concurrency - */ -static void ms_update_multi_get_result(ms_conn_t *c) -{ - ms_mlget_task_item_t *mlget_item; - ms_task_item_t *item; - char *orignval= NULL; - char *orignkey= NULL; - - if (c == NULL) - { - return; - } - assert(c != NULL); - - for (int i= 0; i < c->mlget_task.mlget_num; i++) - { - mlget_item= &c->mlget_task.mlget_item[i]; - item= mlget_item->item; - orignval= &ms_setting.char_block[item->value_offset]; - orignkey= &ms_setting.char_block[item->key_suffix_offset]; - - /* update get miss counter */ - if (mlget_item->get_miss) - { - atomic_add_size(&ms_stats.get_misses, 1); - } - - /* get nothing from server for this task item */ - if (mlget_item->verify && ! mlget_item->finish_verify) - { - /* verify expire time if necessary */ - if (item->exp_time > 0) - { - struct timeval curr_time; - gettimeofday(&curr_time, NULL); - - /* object doesn't expire but can't get it now */ - if (curr_time.tv_sec - item->client_time - < item->exp_time - EXPIRE_TIME_ERROR) - { - atomic_add_size(&ms_stats.unexp_unget, 1); - - if (ms_setting.verbose) - { - char set_time[64]; - char cur_time[64]; - strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&item->client_time)); - strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&curr_time.tv_sec)); - fprintf(stderr, - "\n\t<%d expire time verification failed, object " - "doesn't expire but can't get it now\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64 " %.*s\n" - "\tset time: %s current time: %s " - "diff time: %d expire time: %d\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data: \n", - c->sfd, - item->key_size, - item->key_prefix, - item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, - set_time, - cur_time, - (int)(curr_time.tv_sec - item->client_time), - item->exp_time, - item->value_size, - item->value_size, - orignval); - fflush(stderr); - } - } - } - else - { - atomic_add_size(&ms_stats.vef_miss, 1); - - if (ms_setting.verbose) - { - fprintf(stderr, "\n<%d data verification failed\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64 " %.*s\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data: \n", - c->sfd, item->key_size, item->key_prefix, - item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, item->value_size, item->value_size, orignval); - fflush(stderr); - } - } - } - } - c->mlget_task.mlget_num= 0; - c->mlget_task.value_index= INVALID_OFFSET; -} /* ms_update_multi_get_result */ - - -/** - * after get the response from server for single get, the - * function update the state of the task and do data verify if - * necessary. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - */ -static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item) -{ - char *orignval= NULL; - char *orignkey= NULL; - - if ((c == NULL) || (item == NULL)) - { - return; - } - assert(c != NULL); - assert(item != NULL); - - orignval= &ms_setting.char_block[item->value_offset]; - orignkey= &ms_setting.char_block[item->key_suffix_offset]; - - /* update get miss counter */ - if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss) - { - atomic_add_size(&ms_stats.get_misses, 1); - } - - /* get nothing from server for this task item */ - if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify - && ! c->curr_task.finish_verify) - { - /* verify expire time if necessary */ - if (item->exp_time > 0) - { - struct timeval curr_time; - gettimeofday(&curr_time, NULL); - - /* object doesn't expire but can't get it now */ - if (curr_time.tv_sec - item->client_time - < item->exp_time - EXPIRE_TIME_ERROR) - { - atomic_add_size(&ms_stats.unexp_unget, 1); - - if (ms_setting.verbose) - { - char set_time[64]; - char cur_time[64]; - strftime(set_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&item->client_time)); - strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S", - localtime(&curr_time.tv_sec)); - fprintf(stderr, - "\n\t<%d expire time verification failed, object " - "doesn't expire but can't get it now\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64 " %.*s\n" - "\tset time: %s current time: %s " - "diff time: %d expire time: %d\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data: \n", - c->sfd, - item->key_size, - item->key_prefix, - item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, - set_time, - cur_time, - (int)(curr_time.tv_sec - item->client_time), - item->exp_time, - item->value_size, - item->value_size, - orignval); - fflush(stderr); - } - } - } - else - { - atomic_add_size(&ms_stats.vef_miss, 1); - - if (ms_setting.verbose) - { - fprintf(stderr, "\n<%d data verification failed\n" - "\tkey len: %d\n" - "\tkey: %" PRIx64 " %.*s\n" - "\texpected data len: %d\n" - "\texpected data: %.*s\n" - "\treceived data: \n", - c->sfd, item->key_size, item->key_prefix, - item->key_size - (int)KEY_PREFIX_SIZE, - orignkey, item->value_size, item->value_size, orignval); - fflush(stderr); - } - } - } -} /* ms_update_single_get_result */ - - -/** - * after get the response from server for set the function - * update the state of the task and do data verify if necessary. - * - * @param c, pointer of the concurrency - * @param item, pointer of task item which includes the object - * information - */ -static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item) -{ - if ((c == NULL) || (item == NULL)) - { - return; - } - assert(c != NULL); - assert(item != NULL); - - if (c->precmd.cmd == CMD_SET) - { - switch (c->precmd.retstat) - { - case MCD_STORED: - if (item->value_offset == INVALID_OFFSET) - { - /* first set with the same offset of key suffix */ - item->value_offset= item->key_suffix_offset; - } - else - { - /* not first set, just increase the value offset */ - item->value_offset+= 1; - } - - /* set successes, update counter */ - c->set_cursor++; - c->curr_task.set_opt++; - c->curr_task.cycle_undo_set--; - break; - - case MCD_SERVER_ERROR: - default: - break; - } /* switch */ - } -} /* ms_update_set_result */ - - -/** - * update the response time result - * - * @param c, pointer of the concurrency - */ -static void ms_update_stat_result(ms_conn_t *c) -{ - bool get_miss= false; - - if (c == NULL) - { - return; - } - assert(c != NULL); - - gettimeofday(&c->end_time, NULL); - uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time); - - pthread_mutex_lock(&ms_statistic.stat_mutex); - - switch (c->precmd.cmd) - { - case CMD_SET: - ms_record_event(&ms_statistic.set_stat, time_diff, false); - break; - - case CMD_GET: - if (c->curr_task.get_miss) - { - get_miss= true; - } - ms_record_event(&ms_statistic.get_stat, time_diff, get_miss); - break; - - default: - break; - } /* switch */ - - ms_record_event(&ms_statistic.total_stat, time_diff, get_miss); - pthread_mutex_unlock(&ms_statistic.stat_mutex); -} /* ms_update_stat_result */ - - -/** - * after get response from server for the current operation, and - * before doing the next operation, update the state of the - * current operation. - * - * @param c, pointer of the concurrency - */ -static void ms_update_task_result(ms_conn_t *c) -{ - ms_task_item_t *item; - - if (c == NULL) - { - return; - } - assert(c != NULL); - - item= ms_get_cur_opt_item(c); - if (item == NULL) - { - return; - } - assert(item != NULL); - - ms_update_set_result(c, item); - - if ((ms_setting.stat_freq > 0) - && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET))) - { - ms_update_stat_result(c); - } - - /* update multi-get task item */ - if (((ms_setting.mult_key_num > 1) - && (c->mlget_task.mlget_num >= ms_setting.mult_key_num)) - || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0))) - { - ms_update_multi_get_result(c); - } - else - { - ms_update_single_get_result(c, item); - } -} /* ms_update_task_result */ - - -/** - * run get and set operation - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_run_getset_task(ms_conn_t *c) -{ - /** - * extra one loop to get the last command return state. get the - * last command return state. - */ - if ((c->remain_exec_num >= 0) - && (c->remain_exec_num != c->exec_num)) - { - ms_update_task_result(c); - } - - /* multi-get */ - if (ms_setting.mult_key_num > 1) - { - /* operate next task item */ - ms_multi_getset_task_sch(c); - } - else - { - /* operate next task item */ - ms_single_getset_task_sch(c); - } - - /* no task to do, exit */ - if ((c->remain_exec_num == -1) || ms_global.time_out) - { - return -1; - } - - return EXIT_SUCCESS; -} /* ms_run_getset_task */ - - -/** - * the state machine call the function to execute task. - * - * @param c, pointer of the concurrency - * - * @return int, if success, return EXIT_SUCCESS, else return -1 - */ -int ms_exec_task(struct conn *c) -{ - if (! ms_global.finish_warmup) - { - ms_warmup_server(c); - } - else - { - if (ms_run_getset_task(c) != 0) - { - return -1; - } - } - - return EXIT_SUCCESS; -} /* ms_exec_task */ diff --git a/src/bin/ms_task.h b/src/bin/ms_task.h deleted file mode 100644 index c4917d11..00000000 --- a/src/bin/ms_task.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * File: ms_task.h - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ -#ifndef MS_TASK_H -#define MS_TASK_H - -#include -#include -#if !defined(__cplusplus) -# include -#endif -#include - -#ifdef __cplusplus -extern "C" { -#endif - -#define UNIT_ITEMS_COUNT 1024 /* each window unit has 1024 items */ -#define KEY_PREFIX_SIZE (sizeof(uint64_t)) /* key prefix length: 8 bytes */ -#define INVALID_OFFSET (-1) /* invalid offset in the character table */ -#define FIXED_EXPIRE_TIME 60 /* default expire time is 60s */ -#define EXPIRE_TIME_ERROR 5 /* default expire time error is 5s */ - -/* information of a task item(object) */ -typedef struct task_item -{ - uint64_t key_prefix; /* prefix of the key, 8 bytes, binary */ - int key_size; /* key size */ - int key_suffix_offset; /* suffix offset in the global character table */ - - int value_size; /* data size */ - int value_offset; /* data offset in the global character table */ - - time_t client_time; /* the current client time */ - int exp_time; /* expire time */ -} ms_task_item_t; - -/* task item for multi-get */ -typedef struct mlget_task_item -{ - ms_task_item_t *item; /* task item */ - bool verify; /* whether verify data or not */ - bool finish_verify; /* whether finish data verify or not */ - bool get_miss; /* whether get miss or not */ -} ms_mlget_task_item_t; - -/* information of multi-get task */ -typedef struct mlget_task -{ - ms_mlget_task_item_t *mlget_item; /* multi-get task array */ - int mlget_num; /* how many tasks in mlget_task array */ - int value_index; /* the nth value received by the connect, for multi-get */ -} ms_mlget_task_t; - -/* structure used to store the state of the running task */ -typedef struct task -{ - int cmd; /* command name */ - bool verify; /* whether verify data or not */ - bool finish_verify; /* whether finish data verify or not */ - bool get_miss; /* whether get miss or not */ - ms_task_item_t *item; /* task item */ - - /* counter for command distribution adjustment */ - uint64_t get_opt; /* number of total get operations */ - uint64_t set_opt; /* number of total set operations, no including warmup set count */ - int cycle_undo_get; /* number of undo get in an adjustment cycle */ - int cycle_undo_set; /* number of undo set in an adjustment cycle */ - uint64_t verified_get; /* number of total verified get operations */ - uint64_t overwrite_set; /* number of total overwrite set operations */ -} ms_task_t; - -struct conn; - -/* the state machine call the function to execute task.*/ -int ms_exec_task(struct conn *c); - - -/* calculate the difference value of two time points */ -int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time); - - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_TASK_H */ diff --git a/src/bin/ms_thread.c b/src/bin/ms_thread.c deleted file mode 100644 index f9f52bfb..00000000 --- a/src/bin/ms_thread.c +++ /dev/null @@ -1,349 +0,0 @@ -/* - * File: ms_thread.c - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -#include "mem_config.h" - -#if defined(HAVE_SYS_TIME_H) -# include -#endif - -#if defined(HAVE_TIME_H) -# include -#endif - -#include "ms_thread.h" -#include "ms_setting.h" -#include "ms_atomic.h" - -/* global variable */ -pthread_key_t ms_thread_key; - -/* array of thread context structure, each thread has a thread context structure */ -static ms_thread_ctx_t *ms_thread_ctx; - -/* functions */ -static void ms_set_current_time(void); -static void ms_check_sock_timeout(void); -static void ms_clock_handler(const int fd, const short which, void *arg); -static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu); -static int ms_setup_thread(ms_thread_ctx_t *thread_ctx); -static void *ms_worker_libevent(void *arg); -static void ms_create_worker(void *(*func)(void *), void *arg); - - -/** - * time-sensitive callers can call it by hand with this, - * outside the normal ever-1-second timer - */ -static void ms_set_current_time() -{ - struct timeval timer; - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - - gettimeofday(&timer, NULL); - ms_thread->curr_time= (rel_time_t)timer.tv_sec; -} /* ms_set_current_time */ - - -/** - * used to check whether UDP of command are waiting timeout - * by the ever-1-second timer - */ -static void ms_check_sock_timeout(void) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - ms_conn_t *c= NULL; - int time_diff= 0; - - for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++) - { - c= &ms_thread->conn[i]; - - if (c->udp) - { - time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec); - - /* wait time out */ - if (time_diff > SOCK_WAIT_TIMEOUT) - { - /* calculate dropped packets count */ - if (c->recvpkt > 0) - { - atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt); - } - - atomic_add_size(&ms_stats.udp_timeout, 1); - ms_reset_conn(c, true); - } - } - } -} /* ms_check_sock_timeout */ - - -/* if disconnect, the ever-1-second timer will call this function to reconnect */ -static void ms_reconn_thread_socks(void) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++) - { - ms_reconn_socks(&ms_thread->conn[i]); - } -} /* ms_reconn_thread_socks */ - - -/** - * the handler of the ever-1-second timer - * - * @param fd, the descriptors of the socket - * @param which, event flags - * @param arg, argument - */ -static void ms_clock_handler(const int fd, const short which, void *arg) -{ - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - struct timeval t= - { - .tv_sec= 1, .tv_usec= 0 - }; - - UNUSED_ARGUMENT(fd); - UNUSED_ARGUMENT(which); - UNUSED_ARGUMENT(arg); - - ms_set_current_time(); - - if (ms_thread->initialized) - { - /* only delete the event if it's actually there. */ - evtimer_del(&ms_thread->clock_event); - ms_check_sock_timeout(); - } - else - { - ms_thread->initialized= true; - } - - ms_reconn_thread_socks(); - - evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0); - event_base_set(ms_thread->base, &ms_thread->clock_event); - evtimer_add(&ms_thread->clock_event, &t); -} /* ms_clock_handler */ - - -/** - * used to bind thread to CPU if the system supports - * - * @param cpu, cpu index - * - * @return if success, return EXIT_SUCCESS, else return -1 - */ -static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu) -{ - uint32_t ret= 0; - -#ifdef HAVE_CPU_SET_T - cpu_set_t cpu_set; - CPU_ZERO(&cpu_set); - CPU_SET(cpu, &cpu_set); - - if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) - { - fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n"); - ret= 1; - } -#else - UNUSED_ARGUMENT(cpu); -#endif - - return ret; -} /* ms_set_thread_cpu_affinity */ - - -/** - * Set up a thread's information. - * - * @param thread_ctx, pointer of the thread context structure - * - * @return if success, return EXIT_SUCCESS, else return -1 - */ -static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) -{ - - ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1); - pthread_setspecific(ms_thread_key, (void *)ms_thread); - - ms_thread->thread_ctx= thread_ctx; - ms_thread->nactive_conn= thread_ctx->nconns; - ms_thread->initialized= false; - static ATOMIC uint32_t cnt= 0; - - gettimeofday(&ms_thread->startup_time, NULL); - - ms_thread->base= event_init(); - if (ms_thread->base == NULL) - { - if (atomic_add_32_nv(&cnt, 1) == 0) - { - fprintf(stderr, "Can't allocate event base.\n"); - } - - return -1; - } - - ms_thread->conn= - (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t)); - if (ms_thread->conn == NULL) - { - if (atomic_add_32_nv(&cnt, 1) == 0) - { - fprintf( - stderr, - "Can't allocate concurrency structure for thread descriptors."); - } - - return -1; - } - memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t)); - - for (uint32_t i= 0; i < thread_ctx->nconns; i++) - { - ms_thread->conn[i].conn_idx= i; - if (ms_setup_conn(&ms_thread->conn[i]) != 0) - { - /* only output this error once */ - if (atomic_add_32_nv(&cnt, 1) == 0) - { - fprintf(stderr, "Initializing connection failed.\n"); - } - - return -1; - } - } - - return EXIT_SUCCESS; -} /* ms_setup_thread */ - - -/** - * Worker thread: main event loop - * - * @param arg, the pointer of argument - * - * @return void* - */ -static void *ms_worker_libevent(void *arg) -{ - ms_thread_t *ms_thread= NULL; - ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg; - - /** - * If system has more than one cpu and supports set cpu - * affinity, try to bind each thread to a cpu core; - */ - if (ms_setting.ncpu > 1) - { - ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu); - } - - if (ms_setup_thread(thread_ctx) != 0) - { - exit(1); - } - - /* each thread with a timer */ - ms_clock_handler(0, 0, 0); - - pthread_mutex_lock(&ms_global.init_lock.lock); - ms_global.init_lock.count++; - pthread_cond_signal(&ms_global.init_lock.cond); - pthread_mutex_unlock(&ms_global.init_lock.lock); - - ms_thread= pthread_getspecific(ms_thread_key); - event_base_loop(ms_thread->base, 0); - - return NULL; -} /* ms_worker_libevent */ - - -/** - * Creates a worker thread. - * - * @param func, the callback function - * @param arg, the argument to pass to the callback function - */ -static void ms_create_worker(void *(*func)(void *), void *arg) -{ - pthread_t thread; - pthread_attr_t attr; - int ret; - - pthread_attr_init(&attr); - - if ((ret= pthread_create(&thread, &attr, func, arg)) != 0) - { - fprintf(stderr, "Can't create thread: %s.\n", strerror(ret)); - exit(1); - } -} /* ms_create_worker */ - - -/* initialize threads */ -void ms_thread_init() -{ - ms_thread_ctx= - (ms_thread_ctx_t *)malloc( - sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads); - if (ms_thread_ctx == NULL) - { - fprintf(stderr, "Can't allocate thread descriptors."); - exit(1); - } - - for (uint32_t i= 0; i < ms_setting.nthreads; i++) - { - ms_thread_ctx[i].thd_idx= i; - ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads; - - /** - * If only one server, all the connections in all threads - * connects the same server. For support multi-servers, simple - * distribute thread to server. - */ - ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt; - ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps - / (int)ms_setting.nconns; - ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num - / ms_setting.nconns; - } - - if (pthread_key_create(&ms_thread_key, NULL)) - { - fprintf(stderr, "Can't create pthread keys. Major malfunction!\n"); - exit(1); - } - /* Create threads after we've done all the epoll setup. */ - for (uint32_t i= 0; i < ms_setting.nthreads; i++) - { - ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]); - } -} /* ms_thread_init */ - - -/* cleanup some resource of threads when all the threads exit */ -void ms_thread_cleanup() -{ - if (ms_thread_ctx != NULL) - { - free(ms_thread_ctx); - } - pthread_key_delete(ms_thread_key); -} /* ms_thread_cleanup */ diff --git a/src/bin/ms_thread.h b/src/bin/ms_thread.h deleted file mode 100644 index 8cb98bde..00000000 --- a/src/bin/ms_thread.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * File: ms_thread.h - * Author: Mingqiang Zhuang - * - * Created on February 10, 2009 - * - * (c) Copyright 2009, Schooner Information Technology, Inc. - * http://www.schoonerinfotech.com/ - * - */ - -/** - * Asynchronous memslap has the similar implementation of - * multi-threads with memcached. Asynchronous memslap creates - * one or more self-governed threads; each thread is bound with - * one CPU core if the system supports setting CPU core - * affinity. And every thread has private variables. There is - * less communication or some shared resources among all the - * threads. It can improve the performance because there are - * fewer locks and competition. In addition, each thread has a - * libevent to manage the events of network. Each thread has one - * or more self-governed concurrencies; each concurrency has one - * or more socket connections. All the concurrencies don't - * communicate with each other even though they are in the same - * thread. - */ -#ifndef MS_THREAD_H -#define MS_THREAD_H - -#include -#include "ms_conn.h" - -#ifdef __cplusplus -extern "C" { -#endif - -/** Time relative to server start. Smaller than time_t on 64-bit systems. */ -typedef unsigned int rel_time_t; - -/* Used to store the context of each thread */ -typedef struct thread_ctx -{ - uint32_t thd_idx; /* the thread index */ - uint32_t nconns; /* how many connections included by the thread */ - uint32_t srv_idx; /* index of the thread */ - int tps_perconn; /* expected throughput per connection */ - int64_t exec_num_perconn; /* execute number per connection */ -} ms_thread_ctx_t; - -/* Used to store the private variables of each thread */ -typedef struct thread -{ - ms_conn_t *conn; /* conn array to store all the conn in the thread */ - uint32_t nactive_conn; /* how many connects are active */ - - ms_thread_ctx_t *thread_ctx; /* thread context from the caller */ - struct event_base *base; /* libevent handler created by this thread */ - - rel_time_t curr_time; /* current time */ - struct event clock_event; /* clock event to time each one second */ - bool initialized; /* whether clock_event has been initialized */ - - struct timeval startup_time; /* start time of the thread */ -} ms_thread_t; - -/* initialize threads */ -void ms_thread_init(void); - - -/* cleanup some resource of threads when all the threads exit */ -void ms_thread_cleanup(void); - - -#ifdef __cplusplus -} -#endif - -#endif /* end of MS_THREAD_H */ -- 2.30.2