#include "Cluster.hpp"
#include "Retry.hpp"
+#include <algorithm>
+#if HAVE_EXECUTION && HAVE_TBB
+# include <execution>
+#endif
#include <sys/wait.h>
-Cluster::Cluster(Server serv, uint16_t cnt)
+Cluster::Cluster(Server serv, size_t cnt)
: count{cnt}
, proto{move(serv)}
{
- if (count < 4) {
- count = stoi(getenv_else("MEMCACHED_CLUSTER", "4"));
- }
if (!count) {
count = 1;
}
- reset();
+ for (size_t i = 0; i < count; ++i) {
+ cluster.push_back(proto);
+ }
}
Cluster::~Cluster() {
return cluster;
}
-void Cluster::reset() {
- pids.clear();
- cluster.clear();
- for (int i = 0; i < count; ++i) {
- cluster.push_back(proto);
- }
-}
-
bool Cluster::start() {
bool started = true;
for (auto &server : cluster) {
- if (!startServer(server)) {
+ if (!server.start()) {
started = false;
+ continue;
}
+ pids[server.getPid()] = &server;
}
return started;
}
-void Cluster::stop() {
+void Cluster::stop(bool graceful) {
for (auto &server : cluster) {
server.drain();
- // no cookies for memcached; TERM is just too slow
- server.signal(SIGKILL);
+ if (graceful) {
+ server.stop();
+ } else {
+ // no cookies for memcached; TERM is just too slow
+ server.signal(SIGKILL);
+ }
}
}
bool Cluster::isStopped() {
- for (auto &server : cluster) {
- if (server.getPid() && !server.tryWait()) {
- return false;
- }
- }
- return true;
+ return none_of(
+#if HAVE_EXECUTION && HAVE_TBB
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.getPid() && !s.tryWait();
+ });
}
-bool Cluster::isListening() {
- for (auto &server : cluster) {
- Retry server_is_listening{[&] {
- if (!server.isListening()) {
- // zombie?
- auto old_pid = server.getPid();
- if (server.tryWait()) {
- cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
- pids.erase(old_pid);
- // restart
- startServer(server);
- }
- if (!server.isListening()) {
- return false;
- }
- }
- return true;
- }};
- if (!server_is_listening()) {
- return false;
- }
- }
-
- return true;
+bool Cluster::isListening() const {
+ return all_of(
+#if HAVE_EXECUTION && HAVE_TBB
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](const Server &s) {
+ return s.isListening();
+ });
}
-bool Cluster::startServer(Server &server) {
- if (server.start().has_value()) {
+bool Cluster::ensureListening() {
+ if (!start()) {
+ return false;
+ }
+ auto listening = all_of(
+#if HAVE_EXECUTION && HAVE_TBB
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.ensureListening();
+ });
+
+ pids.clear();
+ for (auto &server : cluster) {
pids[server.getPid()] = &server;
- return true;
}
- return false;
+
+ return listening;
}
void Cluster::wait() {
siginfo_t inf;
while (!isStopped()) {
+#if HAVE_WAITID_NOWAIT
if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
perror("Cluster::wait waitid()");
return;
if (server != pids.end()) {
server->second->wait();
}
+#else
+ this_thread::sleep_for(100ms);
+#endif
}
}
+
+bool Cluster::restart() {
+ stop();
+ wait();
+ return start();
+}