flush
authorMichael Wallner <mike@php.net>
Thu, 10 Sep 2020 10:28:32 +0000 (12:28 +0200)
committerMichael Wallner <mike@php.net>
Thu, 10 Sep 2020 10:28:32 +0000 (12:28 +0200)
testing/lib/Cluster.cpp
testing/lib/Connection.cpp
testing/lib/ForkAndExec.cpp
testing/lib/ForkAndExec.hpp
testing/lib/MemcachedCluster.cpp
testing/lib/Server.cpp
testing/lib/Server.hpp
testing/lib/common.hpp
testing/tests/hashkit/basic.cpp
testing/tests/memcached/encoding_key.cpp
testing/tests/memcached/inc_dec.cpp

index 4c6ea52c258f49c256d4987d8ce5336666bc64c3..c3c625cf04eb225069a08d4b5f732ea68132eade 100644 (file)
@@ -44,6 +44,7 @@ bool Cluster::start() {
 
 void Cluster::stop() {
   for (auto &server : cluster) {
+    server.drain();
     // no cookies for memcached; TERM is just too slow
     server.signal(SIGKILL);
   }
@@ -65,7 +66,7 @@ bool Cluster::isListening() {
         // zombie?
         auto old_pid = server.getPid();
         if (server.tryWait()) {
-          cerr << "zombie collected (old pid=" << old_pid << "): " << server << "\n";
+          cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
           pids.erase(old_pid);
           // restart
           startServer(server);
@@ -85,9 +86,8 @@ bool Cluster::isListening() {
 }
 
 bool Cluster::startServer(Server &server) {
-  auto pid = server.start();
-  if (pid.has_value()) {
-    pids[*pid] = &server;
+  if (server.start().has_value()) {
+    pids[server.getPid()] = &server;
     return true;
   }
   return false;
index c4e01d97d503fe0cb1fd74218f1c060b34302b56..72c3b50ea57065af252b92ca8a620a9cbd12ee5e 100644 (file)
@@ -81,7 +81,7 @@ Connection::Connection(Connection &&conn) noexcept {
 }
 
 Connection &Connection::operator=(Connection &&conn) noexcept {
-  Connection copy(forward<Connection>(conn));
+  Connection copy(move(conn));
   copy.swap(*this);
   return *this;
 }
index 271af306681f5e1c86abf03feda7dc22b2c864d2..5274dba99dc48745c8b5c1fecae37f6d48607668 100644 (file)
@@ -8,47 +8,88 @@
 #include <unistd.h>
 
 ForkAndExec::ForkAndExec(const char *binary_, char **argv_)
-: binary{binary_}
+: ready{-1, -1}
+, pipes{-1, -1}
+, binary{binary_}
 , argv{argv_}
 {
-  if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) {
-    int error = errno;
-    perror("Server::start pipe2()");
-    throw system_error(error, system_category());
-  }
 }
 
 ForkAndExec::~ForkAndExec() {
-  if (ready[0] != -1) {
-    close(ready[0]);
-  }
-  if (ready[1] != -1) {
-    close(ready[1]);
+  for (auto &pipe : {ready, pipes}) {
+    for (auto m : {READ, WRITE}) {
+      closePipe(pipe[m]);
+    }
   }
 }
 
-optional<pid_t> ForkAndExec::operator()()  {
-  if (ready[0] == -1) {
-    return {};
+int ForkAndExec::createPipe() {
+  if (pipes[mode::READ] == -1) {
+    if(pipe2(pipes, O_NONBLOCK)) {
+      perror("ForkAndExec pipe()");
+      return -1;
+    }
   }
-  if (ready[1] != -1) {
-    close(ready[1]);
-    ready[1] = -1;
+
+  return pipes[mode::READ];
+}
+
+pid_t ForkAndExec::operator()()  {
+  if (!prepareExecReadyPipe()) {
+    return 0;
   }
 
   switch (pid_t pid = fork()) {
     case 0:
+      closePipe(pipes[mode::READ]);
+      prepareOutputPipe();
       execvp(binary, argv);
-      [[fallthrough]];
+      perror("ForkAndExec exec()");
+      _exit(EXIT_FAILURE);
+
     case -1:
-      perror("fork() && exec()");
-      return {};
+      perror("ForkAndExec fork()");
+      return 0;
 
     default:
-      pollfd fd{ready[0], 0, 0};
-      if (1 > poll(&fd, 1, 5000)) {
-        cerr << "exec() timed out" << endl;
-      }
+      pipes[mode::READ] = -1;
+      closePipe(pipes[mode::WRITE]);
+      pollExecReadyPipe();
       return pid;
   }
 }
+
+bool ForkAndExec::prepareExecReadyPipe() {
+  if (ready[mode::READ] == -1) {
+    if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) {
+      perror("ForkAndExec pipe()");
+      return false;
+    }
+    closePipe(ready[mode::WRITE]);
+  }
+
+  return true;
+}
+
+void ForkAndExec::prepareOutputPipe() {
+  if (pipes[mode::WRITE] != -1) {
+    if (0 > dup2(pipes[mode::WRITE], STDERR_FILENO) ||
+        0 > dup2(pipes[mode::WRITE], STDOUT_FILENO)) {
+      perror("ForkAndExec dup()");
+    }
+  }
+}
+
+void ForkAndExec::closePipe(int &fd) {
+  if (fd != -1) {
+    close(fd);
+    fd = -1;
+  }
+}
+
+void ForkAndExec::pollExecReadyPipe() {
+  pollfd fd{ready[mode::READ], 0, 0};
+  if (1 > poll(&fd, 1, 5000)) {
+    cerr << "exec() timed out" << endl;
+  }
+}
index ff70abb98f20e40825b37496681d85ad08529f7b..ad7b415150c38ec04b911c749263ec21cc7be194 100644 (file)
@@ -4,7 +4,7 @@
 
 class ForkAndExec {
 public:
-  enum { READ, WRITE } pipe;
+  enum mode { READ, WRITE };
 
   ForkAndExec(const char *binary, char **argv);
   ~ForkAndExec();
@@ -14,10 +14,17 @@ public:
   ForkAndExec(ForkAndExec &&) = default;
   ForkAndExec &operator = (ForkAndExec &&) = default;
 
-  optional<pid_t> operator () ();
+  [[nodiscard]]
+  int createPipe();
+  pid_t operator () ();
 
 private:
   int ready[2], pipes[2];
   const char *binary;
   char **argv;
+
+  bool prepareExecReadyPipe();
+  void prepareOutputPipe();
+  void closePipe(int &fd);
+  void pollExecReadyPipe();
 };
index 9da89a960075b13b9df86397b062e4b08719506d..6b9af3e39e4672909f7152a87a159fc8a1c79ec6 100644 (file)
@@ -42,7 +42,7 @@ MemcachedCluster::MemcachedCluster()
 }
 
 MemcachedCluster::MemcachedCluster(Cluster &&cluster_)
-: cluster{forward<Cluster>(cluster_)}
+: cluster{move(cluster_)}
 {
   init();
 }
index e5ef5ead68b12e4ddafb4e510941763f750c226d..481a9fd6eb582c3299eb1abfdebeb45f4cb2cf20 100644 (file)
@@ -13,6 +13,9 @@ Server::Server(string binary_, Server::argv_t args_)
 Server::~Server() {
   stop();
   wait();
+  if (pipe != -1) {
+    close(pipe);
+  }
   if (holds_alternative<string>(socket_or_port)) {
     unlink(get<string>(socket_or_port).c_str());
   }
@@ -57,7 +60,7 @@ vector<char *> Server::createArgv()  {
   vector<char *> arr;
 
   pushArg(arr, binary);
-  pushArg(arr, "-v");
+  //pushArg(arr, "-v");
 
   for (auto it = args.cbegin(); it != args.cend(); ++it) {
     if (holds_alternative<arg_t>(*it)) {
@@ -87,23 +90,23 @@ vector<char *> Server::createArgv()  {
   return arr;
 }
 
-optional<pid_t> Server::start() {
-  if (pid) {
-    return pid;
-  }
+optional<Server::ChildProc> Server::start() {
+  if (!pid) {
+    auto argv = createArgv();
+    ForkAndExec fork_and_exec{binary.c_str(), argv.data()};
 
-  auto argv = createArgv();
-  auto child = ForkAndExec{binary.c_str(), argv.data()}();
+    pipe = fork_and_exec.createPipe();
+    pid = fork_and_exec();
 
-  for (auto argp : argv) {
-    delete [] argp;
-  }
+    for (auto argp : argv) {
+      delete [] argp;
+    }
 
-  if (child.has_value()) {
-    pid = child.value();
+    if (!pid) {
+      return {};
+    }
   }
-
-  return child;
+  return ChildProc{pid, pipe};
 }
 
 bool Server::isListening() {
@@ -140,7 +143,15 @@ bool Server::check() {
 
 bool Server::wait(int flags) {
   if (pid && pid == waitpid(pid, &status, flags)) {
+    if (drain().length()) {
+      cerr << "Ouput of " << *this << ":\n" << output << endl;
+      output.clear();
+    }
     pid = 0;
+    if (pipe != -1) {
+      close(pipe);
+      pipe = -1;
+    }
     return true;
   }
   return false;
@@ -179,3 +190,34 @@ const socket_or_port_t &Server::getSocketOrPort() const {
   return socket_or_port;
 }
 
+int Server::getPipe() const {
+  return pipe;
+}
+
+string &Server::drain() {
+  if (pipe != -1) {
+    again:
+    char read_buf[1<<12];
+    auto read_len = read(pipe, read_buf, sizeof(read_buf));
+
+    if (read_len > 0) {
+      output.append(read_buf, read_len);
+      goto again;
+    }
+    if (read_len == -1) {
+      switch (errno) {
+      case EINTR:
+        goto again;
+      default:
+        perror("Server::drain read()");
+        [[fallthrough]];
+      case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+      case EWOULDBLOCK:
+#endif
+        break;
+      }
+    }
+  }
+  return output;
+}
index 49f75cb0d0c1089f95ffa9ae22d443a281df74c9..fb1b8af184c04e443b60eae06404eb264dbde8a7 100644 (file)
@@ -29,21 +29,30 @@ public:
     binary = exchange(s.binary, "false");
     args = exchange(s.args, {});
     pid = exchange(s.pid, 0);
+    pipe = exchange(s.pipe, -1);
     status = exchange(s.status, 0);
     signalled = exchange(s.signalled, {});
     socket_or_port = exchange(s.socket_or_port, {});
+    output = exchange(s.output, {});
     return *this;
   };
 
   pid_t getPid() const;
-
+  int getPipe() const;
   const string &getBinary() const;
-
   const argv_t &getArgs() const;
-
   const socket_or_port_t &getSocketOrPort() const;
 
-  optional<pid_t> start();
+  struct ChildProc {
+    pid_t pid;
+    int pipe;
+    ChildProc(pid_t pid_, int pipe_)
+    : pid{pid_}
+    , pipe{pipe_}
+    {
+    }
+  };
+  optional<ChildProc> start();
   bool stop();
 
   bool signal(int signo = SIGTERM);
@@ -52,14 +61,18 @@ public:
 
   bool wait(int flags = 0);
   bool tryWait();
+  string &drain();
+
 
 private:
   string binary;
   argv_t args;
   pid_t pid = 0;
+  int pipe = -1;
   int status = 0;
   unordered_map<int, unsigned> signalled;
   socket_or_port_t socket_or_port = 11211;
+  string output;
 
   [[nodiscard]]
   vector<char *> createArgv();
index c1ef43ae9f27394b6acceb79b13e7196a0bca221..9f9578224d48581a798a6b717c9d600b35ea4641 100644 (file)
@@ -57,21 +57,20 @@ inline memcached_return_t fetch_all_results(memcached_st *memc, unsigned int &ke
 
 class MemcachedPtr {
 public:
-  memcached_st memc;
+  memcached_st *memc;
 
   explicit
   MemcachedPtr(memcached_st *memc_) {
-    memset(&memc, 0, sizeof(memc));
-    REQUIRE(memcached_clone(&memc, memc_));
+    memc = memc_;
   }
   MemcachedPtr()
-  : MemcachedPtr(nullptr)
+  : MemcachedPtr(memcached_create(nullptr))
   {}
   ~MemcachedPtr() {
-    memcached_free(&memc);
+    memcached_free(memc);
   }
   memcached_st *operator * () {
-    return &memc;
+    return memc;
   }
 };
 
index 42dda2920d1bdc8a12ba1da0e801823145b7a737..9dcec525247bf2520bd8427652967aee09fa1377 100644 (file)
@@ -7,6 +7,7 @@ TEST_CASE("hashkit") {
   hashkit_st st, *hp = hashkit_create(nullptr);
   Hashkit stack;
   Hashkit *heap = new Hashkit;
+  auto newed = unique_ptr<Hashkit>(heap);
 
   REQUIRE(hashkit_create(&st));
   REQUIRE(hp);
@@ -57,15 +58,12 @@ TEST_CASE("hashkit") {
       REQUIRE(HASHKIT_SUCCESS == stack.set_function(h));
       REQUIRE(HASHKIT_SUCCESS == hashkit_set_function(&st, h));
 
-      DYNAMIC_SECTION("can digest hash function: " << libhashkit_string_hash(h)) {
-        auto n = 0;
-
-        for (auto i : input) {
-          CHECK(output[f][n] == stack.digest(S(i)));
-          CHECK(output[f][n] == hashkit_digest(&st, S(i)));
-          CHECK(output[f][n] == libhashkit_digest(S(i), h));
-          ++n;
-        }
+      auto n = 0;
+      for (auto i : input) {
+        CHECK(output[f][n] == stack.digest(S(i)));
+        CHECK(output[f][n] == hashkit_digest(&st, S(i)));
+        CHECK(output[f][n] == libhashkit_digest(S(i), h));
+        ++n;
       }
     }
   }
@@ -81,7 +79,6 @@ TEST_CASE("hashkit") {
     REQUIRE_FALSE(hashkit_compare(&st, hp));
   }
 
-  delete heap;
   hashkit_free(&st);
   hashkit_free(hp);
 }
index 825bfa69b590c91c2d2e3ef65de11d8e3ab627d3..c4616ce4f17496397268a1773811ed2aff77962c 100644 (file)
@@ -29,7 +29,7 @@ TEST_CASE("memcached encoding_key") {
     auto memc = &test.memc;
 
     SECTION("accepts encoding key") {
-      MemcachedPtr copy(memc);
+      MemcachedPtr copy(memcached_clone(nullptr, memc));
 
       REQUIRE_SUCCESS(memcached_set_encoding_key(memc, S(__func__)));
 
@@ -37,13 +37,13 @@ TEST_CASE("memcached encoding_key") {
         REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
 
         SECTION("gets encoded value") {
-          check(memc, &copy.memc, INITIAL_VAL);
+          check(memc, copy.memc, INITIAL_VAL);
         }
 
         SECTION("cloned gets encoded value") {
-          MemcachedPtr dupe(memc);
+          MemcachedPtr dupe(memcached_clone(nullptr, memc));
 
-          check(&dupe.memc, &copy.memc, INITIAL_VAL);
+          check(dupe.memc, copy.memc, INITIAL_VAL);
         }
       }
 
@@ -52,26 +52,26 @@ TEST_CASE("memcached encoding_key") {
         REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
         REQUIRE_RC(MEMCACHED_NOTSTORED, memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0));
 
-        check(memc, &copy.memc, INITIAL_VAL);
+        check(memc, copy.memc, INITIAL_VAL);
 
         test.flush();
 
         REQUIRE_SUCCESS(memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0));
 
         SECTION("gets encoded value") {
-          check(memc, &copy.memc, REPLACED_VAL);
+          check(memc, copy.memc, REPLACED_VAL);
         }
       }
 
       SECTION("replaces encoded value") {
         REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
 
-        check(memc, &copy.memc, INITIAL_VAL);
+        check(memc, copy.memc, INITIAL_VAL);
 
         REQUIRE_SUCCESS(memcached_replace(memc, TEST_KEY, REPLACED_VAL, 0, 0));
 
         SECTION("gets encoded value") {
-          check(memc, &copy.memc, REPLACED_VAL);
+          check(memc, copy.memc, REPLACED_VAL);
         }
       }
 
index e7dfb4a01811032fb4d5edcaf4652f1179f2bdc5..dff068b64dba09144fb563aeebac521869c5965e 100644 (file)
@@ -9,10 +9,10 @@ TEST_CASE("memcached inc_dec") {
   LOOPED_SECTION(tests) {
     auto memc = &test.memc;
     uint64_t binary = GENERATE(0, 1);
-    void *prefix = GENERATE(as<void *>{}, nullptr, "namespace:");
+    char *prefix = GENERATE(as<char *>{}, "", "namespace:");
 
     REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary));
-    if (prefix) {
+    if (*prefix) {
       REQUIRE_SUCCESS(memcached_callback_set(memc, MEMCACHED_CALLBACK_NAMESPACE, prefix));
     }