X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=test%2Flib%2FCluster.cpp;h=de05a79769663e4801866c2c1d4ace41864233b8;hb=4debceb95ebab23b83c7ebcf153bb321d16b4d52;hp=3ecc6e9abea2d629009e3a54cf05e610bdef2e7c;hpb=5a18a901fcfc7b5bef004eb9110a2386fe87224a;p=awesomized%2Flibmemcached diff --git a/test/lib/Cluster.cpp b/test/lib/Cluster.cpp index 3ecc6e9a..de05a797 100644 --- a/test/lib/Cluster.cpp +++ b/test/lib/Cluster.cpp @@ -1,16 +1,20 @@ #include "Cluster.hpp" #include "Retry.hpp" +#include +#if HAVE_EXECUTION && HAVE_TBB +# include +#endif #include -Cluster::Cluster(Server serv, uint16_t cnt) +Cluster::Cluster(Server serv, size_t cnt) : count{cnt} , proto{move(serv)} { if (!count) { count = 1; } - for (int i = 0; i < count; ++i) { + for (size_t i = 0; i < count; ++i) { cluster.push_back(proto); } } @@ -28,9 +32,11 @@ bool Cluster::start() { bool started = true; for (auto &server : cluster) { - if (!startServer(server)) { + if (!server.start()) { started = false; + continue; } + pids[server.getPid()] = &server; } return started; @@ -49,52 +55,50 @@ void Cluster::stop(bool graceful) { } bool Cluster::isStopped() { - for (auto &server : cluster) { - if (server.getPid() && !server.tryWait()) { - return false; - } - } - return true; + return none_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.getPid() && !s.tryWait(); + }); } -bool Cluster::isListening() { - for (auto &server : cluster) { - Retry server_is_listening{[&] { - if (!server.isListening()) { - // zombie? - auto old_pid = server.getPid(); - if (server.tryWait()) { - cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n"; - pids.erase(old_pid); - // restart - startServer(server); - } - if (!server.isListening()) { - return false; - } - } - return true; - }}; - if (!server_is_listening()) { - return false; - } - } - - return true; +bool Cluster::isListening() const { + return all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](const Server &s) { + return s.isListening(); + }); } -bool Cluster::startServer(Server &server) { - if (server.start().has_value()) { +bool Cluster::ensureListening() { + if (!start()) { + return false; + } + auto listening = all_of( +#if HAVE_EXECUTION && HAVE_TBB + execution::par, +#endif + cluster.begin(), cluster.end(), [](Server &s) { + return s.ensureListening(); + }); + + pids.clear(); + for (auto &server : cluster) { pids[server.getPid()] = &server; - return true; } - return false; + + return listening; } void Cluster::wait() { siginfo_t inf; while (!isStopped()) { +#if HAVE_WAITID_NOWAIT if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) { perror("Cluster::wait waitid()"); return; @@ -104,6 +108,9 @@ void Cluster::wait() { if (server != pids.end()) { server->second->wait(); } +#else + this_thread::sleep_for(100ms); +#endif } }