X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;ds=sidebyside;f=testing%2Flib%2FCluster.cpp;h=c3c625cf04eb225069a08d4b5f732ea68132eade;hb=cbaa9f46519f9b085e28db5f44247d23ca7ec5a4;hp=8a60149568d36832909a8241f920c91f91bdcf8b;hpb=5e54d17fc535a901f384fcbf2cfd420f3a2e7a81;p=awesomized%2Flibmemcached diff --git a/testing/lib/Cluster.cpp b/testing/lib/Cluster.cpp index 8a601495..c3c625cf 100644 --- a/testing/lib/Cluster.cpp +++ b/testing/lib/Cluster.cpp @@ -1,7 +1,27 @@ #include "Cluster.hpp" +#include "Retry.hpp" #include +Cluster::Cluster(Server serv, uint16_t cnt) +: count{cnt} +, proto{move(serv)} +{ + if (!cnt) { + count = thread::hardware_concurrency()/2 ?: 4; + } + reset(); +} + +Cluster::~Cluster() { + stop(); + wait(); +} + +const vector &Cluster::getServers() const { + return cluster; +} + void Cluster::reset() { pids.clear(); cluster.clear(); @@ -14,9 +34,8 @@ bool Cluster::start() { bool started = true; for (auto &server : cluster) { - auto pid = server.start(); - if (started &= pid.has_value()) { - pids[*pid] = &server; + if (!startServer(server)) { + started = false; } } @@ -25,7 +44,9 @@ bool Cluster::start() { void Cluster::stop() { for (auto &server : cluster) { - server.stop(); + server.drain(); + // no cookies for memcached; TERM is just too slow + server.signal(SIGKILL); } } @@ -38,22 +59,38 @@ bool Cluster::isStopped() { return true; } -bool Cluster::isListening(int max_timeout) { - vector conns; - +bool Cluster::isListening() { for (auto &server : cluster) { - auto conn = server.createSocket(); - if (!conn) { + Retry server_is_listening{[&] { + if (!server.isListening()) { + // zombie? + auto old_pid = server.getPid(); + if (server.tryWait()) { + cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n"; + pids.erase(old_pid); + // restart + startServer(server); + } + if (!server.isListening()) { + return false; + } + } + return true; + }}; + if (!server_is_listening()) { return false; } - conns.emplace_back(conn.value()); } - WaitForConn wait_for_conn{ - std::move(conns), - Poll{POLLOUT, 2, max_timeout} - }; - return wait_for_conn(); + return true; +} + +bool Cluster::startServer(Server &server) { + if (server.start().has_value()) { + pids[server.getPid()] = &server; + return true; + } + return false; } void Cluster::wait() { @@ -65,8 +102,9 @@ void Cluster::wait() { return; } - if (pids[inf.si_pid]) { - pids[inf.si_pid]->wait(); + auto server = pids.find(inf.si_pid); + if (server != pids.end()) { + server->second->wait(); } } }