#include "Cluster.hpp"
+#include "Retry.hpp"
#include <sys/wait.h>
+Cluster::Cluster(Server serv, uint16_t cnt)
+: count{cnt}
+, proto{move(serv)}
+{
+ if (!cnt) {
+ count = thread::hardware_concurrency()/2 ?: 4;
+ }
+ reset();
+}
+
+Cluster::~Cluster() {
+ stop();
+ wait();
+}
+
+const vector<Server> &Cluster::getServers() const {
+ return cluster;
+}
+
void Cluster::reset() {
pids.clear();
cluster.clear();
bool started = true;
for (auto &server : cluster) {
- auto pid = server.start();
- if (started &= pid.has_value()) {
- pids[*pid] = &server;
+ if (!startServer(server)) {
+ started = false;
}
}
void Cluster::stop() {
for (auto &server : cluster) {
- server.stop();
+ server.drain();
+ // no cookies for memcached; TERM is just too slow
+ server.signal(SIGKILL);
}
}
return true;
}
-bool Cluster::isListening(int max_timeout) {
- vector<WaitForConn::conn_t> conns;
-
+bool Cluster::isListening() {
for (auto &server : cluster) {
- auto conn = server.createSocket();
- if (!conn) {
+ Retry server_is_listening{[&] {
+ if (!server.isListening()) {
+ // zombie?
+ auto old_pid = server.getPid();
+ if (server.tryWait()) {
+ cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
+ pids.erase(old_pid);
+ // restart
+ startServer(server);
+ }
+ if (!server.isListening()) {
+ return false;
+ }
+ }
+ return true;
+ }};
+ if (!server_is_listening()) {
return false;
}
- conns.emplace_back(conn.value());
}
- WaitForConn wait_for_conn{
- std::move(conns),
- Poll{POLLOUT, 2, max_timeout}
- };
- return wait_for_conn();
+ return true;
+}
+
+bool Cluster::startServer(Server &server) {
+ if (server.start().has_value()) {
+ pids[server.getPid()] = &server;
+ return true;
+ }
+ return false;
}
void Cluster::wait() {
return;
}
- if (pids[inf.si_pid]) {
- pids[inf.si_pid]->wait();
+ auto server = pids.find(inf.si_pid);
+ if (server != pids.end()) {
+ server->second->wait();
}
}
}