gh-actions: fix workflow
[awesomized/libmemcached] / test / lib / Cluster.cpp
index ee7fcbf24f409596fd5f6d404970f7abeea9002e..de05a79769663e4801866c2c1d4ace41864233b8 100644 (file)
@@ -1,19 +1,22 @@
 #include "Cluster.hpp"
 #include "Retry.hpp"
 
+#include <algorithm>
+#if HAVE_EXECUTION && HAVE_TBB
+#  include <execution>
+#endif
 #include <sys/wait.h>
 
-Cluster::Cluster(Server serv, uint16_t cnt)
+Cluster::Cluster(Server serv, size_t cnt)
 : count{cnt}
 , proto{move(serv)}
 {
-  if (!cnt) {
-    count = thread::hardware_concurrency()/2;
-    if (count < 4) {
-      count = 4;
-    }
+  if (!count) {
+    count = 1;
+  }
+  for (size_t i = 0; i < count; ++i) {
+    cluster.push_back(proto);
   }
-  reset();
 }
 
 Cluster::~Cluster() {
@@ -25,81 +28,77 @@ const vector<Server> &Cluster::getServers() const {
   return cluster;
 }
 
-void Cluster::reset() {
-  pids.clear();
-  cluster.clear();
-  for (int i = 0; i < count; ++i) {
-    cluster.push_back(proto);
-  }
-}
-
 bool Cluster::start() {
   bool started = true;
 
   for (auto &server : cluster) {
-    if (!startServer(server)) {
+    if (!server.start()) {
       started = false;
+      continue;
     }
+    pids[server.getPid()] = &server;
   }
 
   return started;
 }
 
-void Cluster::stop() {
+void Cluster::stop(bool graceful) {
   for (auto &server : cluster) {
     server.drain();
-    // no cookies for memcached; TERM is just too slow
-    server.signal(SIGKILL);
+    if (graceful) {
+      server.stop();
+    } else {
+      // no cookies for memcached; TERM is just too slow
+      server.signal(SIGKILL);
+    }
   }
 }
 
 bool Cluster::isStopped() {
-  for (auto &server : cluster) {
-    if (server.getPid() && !server.tryWait()) {
-      return false;
-    }
-  }
-  return true;
+  return none_of(
+#if HAVE_EXECUTION && HAVE_TBB
+    execution::par,
+#endif
+      cluster.begin(), cluster.end(), [](Server &s) {
+    return s.getPid() && !s.tryWait();
+  });
 }
 
-bool Cluster::isListening() {
-  for (auto &server : cluster) {
-    Retry server_is_listening{[&] {
-      if (!server.isListening()) {
-        // zombie?
-        auto old_pid = server.getPid();
-        if (server.tryWait()) {
-          cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
-          pids.erase(old_pid);
-          // restart
-          startServer(server);
-        }
-        if (!server.isListening()) {
-          return false;
-        }
-      }
-      return true;
-    }};
-    if (!server_is_listening()) {
-      return false;
-    }
-  }
-
-  return true;
+bool Cluster::isListening() const {
+  return all_of(
+#if HAVE_EXECUTION && HAVE_TBB
+    execution::par,
+#endif
+      cluster.begin(), cluster.end(), [](const Server &s) {
+    return s.isListening();
+  });
 }
 
-bool Cluster::startServer(Server &server) {
-  if (server.start().has_value()) {
+bool Cluster::ensureListening() {
+  if (!start()) {
+    return false;
+  }
+  auto listening = all_of(
+#if HAVE_EXECUTION && HAVE_TBB
+    execution::par,
+#endif
+  cluster.begin(), cluster.end(), [](Server &s) {
+    return s.ensureListening();
+  });
+
+  pids.clear();
+  for (auto &server : cluster) {
     pids[server.getPid()] = &server;
-    return true;
   }
-  return false;
+
+  return listening;
 }
 
 void Cluster::wait() {
   siginfo_t inf;
 
   while (!isStopped()) {
+#if HAVE_WAITID_NOWAIT
     if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
       perror("Cluster::wait waitid()");
       return;
@@ -109,5 +108,14 @@ void Cluster::wait() {
     if (server != pids.end()) {
       server->second->wait();
     }
+#else
+    this_thread::sleep_for(100ms);
+#endif
   }
 }
+
+bool Cluster::restart() {
+  stop();
+  wait();
+  return start();
+}