set(ENV{INVALID_CONFIGURATION} 1)
endif()
-
check_decl(pipe2 unistd.h)
check_decl(SOCK_NONBLOCK sys/socket.h)
check_decl(SOCK_CLOEXEC sys/socket.h)
+check_header(execution)
file(GLOB_RECURSE TESTING_SRC RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *.cpp)
set(TESTING_ROOT ${CMAKE_CURRENT_BINARY_DIR})
#cmakedefine HAVE_PIPE2 1
#cmakedefine HAVE_SOCK_NONBLOCK 1
#cmakedefine HAVE_SOCK_CLOEXEC 1
+#cmakedefine HAVE_EXECUTION 1
#cmakedefine TESTING_ROOT "@TESTING_ROOT@"
#cmakedefine MEMCACHED_BINARY getenv_else("MEMCACHED_BINARY", "@MEMCACHED_BINARY@")
#include "Cluster.hpp"
#include "Retry.hpp"
+#include <algorithm>
#include <sys/wait.h>
-Cluster::Cluster(Server serv, uint16_t cnt)
+Cluster::Cluster(Server serv, size_t cnt)
: count{cnt}
, proto{move(serv)}
{
if (!count) {
count = 1;
}
- for (int i = 0; i < count; ++i) {
+ for (size_t i = 0; i < count; ++i) {
cluster.push_back(proto);
}
}
bool started = true;
for (auto &server : cluster) {
- if (!startServer(server)) {
+ if (!server.start()) {
started = false;
+ continue;
}
+ pids[server.getPid()] = &server;
}
return started;
}
bool Cluster::isStopped() {
- for (auto &server : cluster) {
- if (server.getPid() && !server.tryWait()) {
- return false;
- }
- }
- return true;
+ return none_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.getPid() && !s.tryWait();
+ });
}
-bool Cluster::isListening() {
- for (auto &server : cluster) {
- Retry server_is_listening{[&] {
- if (!server.isListening()) {
- // zombie?
- auto old_pid = server.getPid();
- if (server.tryWait()) {
- cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
- pids.erase(old_pid);
- // restart
- startServer(server);
- }
- if (!server.isListening()) {
- return false;
- }
- }
- return true;
- }};
- if (!server_is_listening()) {
- return false;
- }
- }
-
- return true;
+bool Cluster::isListening() const {
+ return all_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.cbegin(), cluster.cend(), [](const Server &s) {
+ return s.isListening();
+ });
}
-bool Cluster::startServer(Server &server) {
- if (server.start().has_value()) {
+bool Cluster::ensureListening() {
+ if (!start()) {
+ return false;
+ }
+ auto listening = all_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.ensureListening();
+ });
+
+ pids.clear();
+ for (auto &server : cluster) {
pids[server.getPid()] = &server;
- return true;
}
- return false;
+
+ return listening;
}
void Cluster::wait() {
class Cluster {
public:
- explicit Cluster(Server serv, uint16_t cnt = 3);
+ explicit Cluster(Server serv, size_t cnt = 3);
~Cluster();
Cluster(const Cluster &c) = delete;
bool start();
void stop(bool graceful = false);
bool isStopped();
- bool isListening();
+ bool isListening() const;
+ bool ensureListening();
void wait();
bool restart();
Server proto;
vector<Server> cluster;
map<pid_t, Server *> pids;
-
- bool startServer(Server &server);
};
void MemcachedCluster::init() {
REQUIRE(cluster.start());
- while (!isListening()) {
+ while (!cluster.ensureListening()) {
cluster.restart();
}
const auto &victim = servers[random_num(0UL, servers.size() - 1)];
::kill(victim.getPid(), SIGKILL);
}
-
-bool MemcachedCluster::isListening() {
- return Retry{[this]() {return cluster.isListening();}}();
-}
void enableBuffering(bool enable = true);
void enableReplication();
void flush();
- bool isListening();
static MemcachedCluster mixed();
static MemcachedCluster network();
return ChildProc{pid, pipe};
}
-bool Server::isListening() {
+bool Server::isListening() const {
MemcachedPtr memc;
if (holds_alternative<string>(socket_or_port)) {
}
bool Server::ensureListening() {
+ if (!start()) {
+ return false;
+ }
return Retry{[this] {
again:
start();
if (!isListening()) {
- if (tryWait()){
+ auto old = pid;
+ if (tryWait()) {
+ cerr << "Collected zombie " << *this << "(old pid=" << old << ")\n";
goto again;
}
}
bool Server::wait(int flags) {
if (pid && pid == waitpid(pid, &status, flags)) {
- if (drain().length() && output != "Signal handled: Terminated.\n") {
+ if (drain().length() &&
+ output.rfind("Signal handled: Terminated", 0) != 0) {
cerr << "Output of " << *this << ":\n";
istringstream iss{output};
bool signal(int signo = SIGTERM);
bool check();
- bool isListening();
+ bool isListening() const;
bool ensureListening();
bool wait(int flags = 0);
}
static inline void setup_signals() {
- struct sigaction sa;
+ cout << " - Setting up signals ... ";
+ struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_flags = SA_NOCLDSTOP | SA_RESTART | SA_SIGINFO | SA_NODEFER;
sa.sa_sigaction = sigchld;
if (0 > sigaction(SIGCHLD, &sa, nullptr)) {
perror("sigaction(CHLD)");
+ } else {
+ cout << "done\n";
}
}
static inline void setup_asan(char **argv) {
const auto set = getenv("ASAN_OPTIONS");
+ cout << " - Setting up ASAN ... ";
+
if (!set || !*set) {
SET_ENV_EX(asan, "ASAN_OPTIONS", ASAN_OPTIONS, 0);
+ cout << "re-exec\n";
execvp(argv[0], argv);
perror("exec()");
}
+ cout << "done\n";
}
#else
# define setup_asan(a) (void) a
#if LIBMEMCACHED_WITH_SASL_SUPPORT
static inline void setup_sasl() {
+ cout << " - Setting up SASL ... ";
+
SET_ENV_EX(sasl_pwdb, "MEMCACHED_SASL_PWDB", LIBMEMCACHED_WITH_SASL_PWDB, 0);
SET_ENV_EX(sasl_conf, "SASL_CONF_PATH", LIBMEMCACHED_WITH_SASL_CONF, 0);
+
+ cout << "done\n";
}
#else
# define setup_sasl()
#endif
-int setup(int &, char ***argv) {
+static inline void setup_random() {
+ cout << " - Setting up RNG ... ";
+
random_setup();
+ cout << "done\n";
+}
+
+int setup(int &, char ***argv) {
+ cout << "Starting " << **argv << " (pid=" << getpid() << ") ... \n";
+
setup_signals();
+ setup_random();
setup_asan(*argv);
setup_sasl();
SECTION("starts and listens") {
REQUIRE(server.start().has_value());
-
- Retry server_is_listening{[&server] {
- return server.isListening();
- }};
- REQUIRE(server_is_listening());
+ REQUIRE(server.ensureListening());
+ REQUIRE(server.isListening());
+ REQUIRE(server.check());
SECTION("stops") {
SECTION("stopped") {
REQUIRE_FALSE(server.check());
+ REQUIRE_FALSE(server.isListening());
}
}
}
SECTION("starts and listens") {
REQUIRE(cluster.start());
-
- Retry cluster_is_listening{[&cluster] {
- return cluster.isListening();
- }};
- REQUIRE(cluster_is_listening());
+ REQUIRE(cluster.ensureListening());
+ REQUIRE(cluster.isListening());
SECTION("stops") {
SECTION("stopped") {
REQUIRE(cluster.isStopped());
+ REQUIRE_FALSE(cluster.isListening());
}
}
}
REQUIRE_RC(MEMCACHED_SERVER_TEMPORARILY_DISABLED, memcached_set(memc, S("foo"), nullptr, 0, 0, 0));
REQUIRE(test.cluster.start());
- REQUIRE(test.isListening());
+ REQUIRE(test.cluster.ensureListening());
Retry recovers{[memc]{
return MEMCACHED_SUCCESS == memcached_set(memc, S("foo"), nullptr, 0, 0, 0);