X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=test%2Flib%2FCluster.cpp;h=de05a79769663e4801866c2c1d4ace41864233b8;hb=844026ddb563959b8327bff40b976c41644fa2d2;hp=54de9398564e25c5cba32f328fe56299448b7bcd;hpb=fd66e60622e8e139753f62454cfdd5be662e39a0;p=awesomized%2Flibmemcached diff --git a/test/lib/Cluster.cpp b/test/lib/Cluster.cpp index 54de9398..de05a797 100644 --- a/test/lib/Cluster.cpp +++ b/test/lib/Cluster.cpp @@ -1,30 +1,24 @@ #include "Cluster.hpp" #include "Retry.hpp" +#include +#if HAVE_EXECUTION && HAVE_TBB +# include +#endif #include -Cluster::Cluster(Server serv, uint16_t cnt) +Cluster::Cluster(Server serv, size_t cnt) : count{cnt} , proto{move(serv)} { - if (count < 4) { - count = stoi(getenv_else("MEMCACHED_CLUSTER", "4")); - } if (!count) { count = 1; } - for (int i = 0; i < count; ++i) { + for (size_t i = 0; i < count; ++i) { cluster.push_back(proto); } } -Cluster::Cluster(vector servers) -: count{servers.size()} -, cluster{move(servers)} -{ - -} - Cluster::~Cluster() { stop(); wait(); @@ -38,9 +32,11 @@ bool Cluster::start() { bool started = true; for (auto &server : cluster) { - if (!startServer(server)) { + if (!server.start()) { started = false; + continue; } + pids[server.getPid()] = &server; } return started; @@ -59,52 +55,50 @@ void Cluster::stop(bool graceful) { } bool Cluster::isStopped() { - for (auto &server : cluster) { - if (server.getPid() && !server.tryWait()) { - return false; - } - } - return true; + return none_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.getPid() && !s.tryWait(); + }); } -bool Cluster::isListening() { - for (auto &server : cluster) { - Retry server_is_listening{[&] { - if (!server.isListening()) { - // zombie? - auto old_pid = server.getPid(); - if (server.tryWait()) { - cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n"; - pids.erase(old_pid); - // restart - startServer(server); - } - if (!server.isListening()) { - return false; - } - } - return true; - }}; - if (!server_is_listening()) { - return false; - } - } - - return true; +bool Cluster::isListening() const { + return all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](const Server &s) { + return s.isListening(); + }); } -bool Cluster::startServer(Server &server) { - if (server.start().has_value()) { +bool Cluster::ensureListening() { + if (!start()) { + return false; + } + auto listening = all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.ensureListening(); + }); + + pids.clear(); + for (auto &server : cluster) { pids[server.getPid()] = &server; - return true; } - return false; + + return listening; } void Cluster::wait() { siginfo_t inf; while (!isStopped()) { +#if HAVE_WAITID_NOWAIT if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) { perror("Cluster::wait waitid()"); return; @@ -114,6 +108,9 @@ void Cluster::wait() { if (server != pids.end()) { server->second->wait(); } +#else + this_thread::sleep_for(100ms); +#endif } }