flush [ci skip]
[awesomized/libmemcached] / testing / lib / Cluster.cpp
index 8a60149568d36832909a8241f920c91f91bdcf8b..5d2ab42a7eb3e5f42fbf18078b3099e086d99bb1 100644 (file)
@@ -2,6 +2,25 @@
 
 #include <sys/wait.h>
 
+Cluster::Cluster(Server &&serv, uint16_t cnt)
+: count{cnt}
+, proto{forward<Server>(serv)}
+{
+  if (!cnt) {
+    count = thread::hardware_concurrency()/2 ?: 4;
+  }
+  reset();
+}
+
+Cluster::~Cluster() {
+  stop();
+  wait();
+}
+
+const vector<Server> &Cluster::getServers() const {
+  return cluster;
+}
+
 void Cluster::reset() {
   pids.clear();
   cluster.clear();
@@ -15,8 +34,10 @@ bool Cluster::start() {
 
   for (auto &server : cluster) {
     auto pid = server.start();
-    if (started &= pid.has_value()) {
+    if (pid.has_value()) {
       pids[*pid] = &server;
+    } else {
+      started = false;
     }
   }
 
@@ -25,7 +46,8 @@ bool Cluster::start() {
 
 void Cluster::stop() {
   for (auto &server : cluster) {
-    server.stop();
+    // no cookies for memcached; TERM is just too slow
+    server.signal(SIGKILL);
   }
 }
 
@@ -38,22 +60,23 @@ bool Cluster::isStopped() {
   return true;
 }
 
-bool Cluster::isListening(int max_timeout) {
-  vector<WaitForConn::conn_t> conns;
-
+bool Cluster::isListening() {
   for (auto &server : cluster) {
-    auto conn = server.createSocket();
-    if (!conn) {
-      return false;
+    if (!server.isListening()) {
+      // zombie?
+      auto old_pid = server.getPid();
+      if (server.tryWait()) {
+        pids.erase(old_pid);
+        auto pid = server.start();
+        if (pid.has_value()) {
+          pids[*pid] = &server;
+        }
+      }
+      return server.isListening();
     }
-    conns.emplace_back(conn.value());
   }
 
-  WaitForConn wait_for_conn{
-    std::move(conns),
-    Poll{POLLOUT, 2, max_timeout}
-  };
-  return wait_for_conn();
+  return true;
 }
 
 void Cluster::wait() {
@@ -65,8 +88,10 @@ void Cluster::wait() {
       return;
     }
 
-    if (pids[inf.si_pid]) {
-      pids[inf.si_pid]->wait();
+    auto server = pids.find(inf.si_pid);
+    if (server != pids.end()) {
+      server->second->wait();
     }
   }
 }
+