X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=test%2Flib%2FCluster.cpp;h=de05a79769663e4801866c2c1d4ace41864233b8;hb=24f797294aca130e70317003f4ee42540429ec63;hp=a4f8a3ae3c150fa4bbdcd3c40df313680941e53f;hpb=83cd358dea85840d916b6a69e8b0b20fe7a4e4e0;p=awesomized%2Flibmemcached diff --git a/test/lib/Cluster.cpp b/test/lib/Cluster.cpp index a4f8a3ae..de05a797 100644 --- a/test/lib/Cluster.cpp +++ b/test/lib/Cluster.cpp @@ -1,16 +1,22 @@ #include "Cluster.hpp" #include "Retry.hpp" +#include +#if HAVE_EXECUTION && HAVE_TBB +# include +#endif #include -Cluster::Cluster(Server serv, uint16_t cnt) +Cluster::Cluster(Server serv, size_t cnt) : count{cnt} , proto{move(serv)} { - if (count < 4) { - count = 4; + if (!count) { + count = 1; + } + for (size_t i = 0; i < count; ++i) { + cluster.push_back(proto); } - reset(); } Cluster::~Cluster() { @@ -22,81 +28,77 @@ const vector &Cluster::getServers() const { return cluster; } -void Cluster::reset() { - pids.clear(); - cluster.clear(); - for (int i = 0; i < count; ++i) { - cluster.push_back(proto); - } -} - bool Cluster::start() { bool started = true; for (auto &server : cluster) { - if (!startServer(server)) { + if (!server.start()) { started = false; + continue; } + pids[server.getPid()] = &server; } return started; } -void Cluster::stop() { +void Cluster::stop(bool graceful) { for (auto &server : cluster) { server.drain(); - // no cookies for memcached; TERM is just too slow - server.signal(SIGKILL); + if (graceful) { + server.stop(); + } else { + // no cookies for memcached; TERM is just too slow + server.signal(SIGKILL); + } } } bool Cluster::isStopped() { - for (auto &server : cluster) { - if (server.getPid() && !server.tryWait()) { - return false; - } - } - return true; + return none_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.getPid() && !s.tryWait(); + }); } -bool Cluster::isListening() { - for (auto &server : cluster) { - Retry server_is_listening{[&] { - if (!server.isListening()) { - // zombie? - auto old_pid = server.getPid(); - if (server.tryWait()) { - cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n"; - pids.erase(old_pid); - // restart - startServer(server); - } - if (!server.isListening()) { - return false; - } - } - return true; - }}; - if (!server_is_listening()) { - return false; - } - } - - return true; +bool Cluster::isListening() const { + return all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](const Server &s) { + return s.isListening(); + }); } -bool Cluster::startServer(Server &server) { - if (server.start().has_value()) { +bool Cluster::ensureListening() { + if (!start()) { + return false; + } + auto listening = all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.ensureListening(); + }); + + pids.clear(); + for (auto &server : cluster) { pids[server.getPid()] = &server; - return true; } - return false; + + return listening; } void Cluster::wait() { siginfo_t inf; while (!isStopped()) { +#if HAVE_WAITID_NOWAIT if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) { perror("Cluster::wait waitid()"); return; @@ -106,5 +108,14 @@ void Cluster::wait() { if (server != pids.end()) { server->second->wait(); } +#else + this_thread::sleep_for(100ms); +#endif } } + +bool Cluster::restart() { + stop(); + wait(); + return start(); +}