#include "Cluster.hpp"
#include "Retry.hpp"
+#include <algorithm>
#include <sys/wait.h>
-Cluster::Cluster(Server serv, uint16_t cnt)
+Cluster::Cluster(Server serv, size_t cnt)
: count{cnt}
, proto{move(serv)}
{
if (!count) {
count = 1;
}
- for (int i = 0; i < count; ++i) {
+ for (size_t i = 0; i < count; ++i) {
cluster.push_back(proto);
}
}
bool started = true;
for (auto &server : cluster) {
- if (!startServer(server)) {
+ if (!server.start()) {
started = false;
+ continue;
}
+ pids[server.getPid()] = &server;
}
return started;
}
bool Cluster::isStopped() {
- for (auto &server : cluster) {
- if (server.getPid() && !server.tryWait()) {
- return false;
- }
- }
- return true;
+ return none_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.getPid() && !s.tryWait();
+ });
}
-bool Cluster::isListening() {
- for (auto &server : cluster) {
- Retry server_is_listening{[&] {
- if (!server.isListening()) {
- // zombie?
- auto old_pid = server.getPid();
- if (server.tryWait()) {
- cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
- pids.erase(old_pid);
- // restart
- startServer(server);
- }
- if (!server.isListening()) {
- return false;
- }
- }
- return true;
- }};
- if (!server_is_listening()) {
- return false;
- }
- }
-
- return true;
+bool Cluster::isListening() const {
+ return all_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.cbegin(), cluster.cend(), [](const Server &s) {
+ return s.isListening();
+ });
}
-bool Cluster::startServer(Server &server) {
- if (server.start().has_value()) {
+bool Cluster::ensureListening() {
+ if (!start()) {
+ return false;
+ }
+ auto listening = all_of(
+#if HAVE_EXECUTION
+ execution::par,
+#endif
+ cluster.begin(), cluster.end(), [](Server &s) {
+ return s.ensureListening();
+ });
+
+ pids.clear();
+ for (auto &server : cluster) {
pids[server.getPid()] = &server;
- return true;
}
- return false;
+
+ return listening;
}
void Cluster::wait() {