From c0b779f4045f4858701b3741af805414bc066717 Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Thu, 10 Sep 2020 12:28:32 +0200 Subject: [PATCH 1/1] flush --- testing/lib/Cluster.cpp | 8 +-- testing/lib/Connection.cpp | 2 +- testing/lib/ForkAndExec.cpp | 89 +++++++++++++++++------- testing/lib/ForkAndExec.hpp | 11 ++- testing/lib/MemcachedCluster.cpp | 2 +- testing/lib/Server.cpp | 70 +++++++++++++++---- testing/lib/Server.hpp | 21 ++++-- testing/lib/common.hpp | 11 ++- testing/tests/hashkit/basic.cpp | 17 ++--- testing/tests/memcached/encoding_key.cpp | 16 ++--- testing/tests/memcached/inc_dec.cpp | 4 +- 11 files changed, 175 insertions(+), 76 deletions(-) diff --git a/testing/lib/Cluster.cpp b/testing/lib/Cluster.cpp index 4c6ea52c..c3c625cf 100644 --- a/testing/lib/Cluster.cpp +++ b/testing/lib/Cluster.cpp @@ -44,6 +44,7 @@ bool Cluster::start() { void Cluster::stop() { for (auto &server : cluster) { + server.drain(); // no cookies for memcached; TERM is just too slow server.signal(SIGKILL); } @@ -65,7 +66,7 @@ bool Cluster::isListening() { // zombie? auto old_pid = server.getPid(); if (server.tryWait()) { - cerr << "zombie collected (old pid=" << old_pid << "): " << server << "\n"; + cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n"; pids.erase(old_pid); // restart startServer(server); @@ -85,9 +86,8 @@ bool Cluster::isListening() { } bool Cluster::startServer(Server &server) { - auto pid = server.start(); - if (pid.has_value()) { - pids[*pid] = &server; + if (server.start().has_value()) { + pids[server.getPid()] = &server; return true; } return false; diff --git a/testing/lib/Connection.cpp b/testing/lib/Connection.cpp index c4e01d97..72c3b50e 100644 --- a/testing/lib/Connection.cpp +++ b/testing/lib/Connection.cpp @@ -81,7 +81,7 @@ Connection::Connection(Connection &&conn) noexcept { } Connection &Connection::operator=(Connection &&conn) noexcept { - Connection copy(forward(conn)); + Connection copy(move(conn)); copy.swap(*this); return *this; } diff --git a/testing/lib/ForkAndExec.cpp b/testing/lib/ForkAndExec.cpp index 271af306..5274dba9 100644 --- a/testing/lib/ForkAndExec.cpp +++ b/testing/lib/ForkAndExec.cpp @@ -8,47 +8,88 @@ #include ForkAndExec::ForkAndExec(const char *binary_, char **argv_) -: binary{binary_} +: ready{-1, -1} +, pipes{-1, -1} +, binary{binary_} , argv{argv_} { - if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) { - int error = errno; - perror("Server::start pipe2()"); - throw system_error(error, system_category()); - } } ForkAndExec::~ForkAndExec() { - if (ready[0] != -1) { - close(ready[0]); - } - if (ready[1] != -1) { - close(ready[1]); + for (auto &pipe : {ready, pipes}) { + for (auto m : {READ, WRITE}) { + closePipe(pipe[m]); + } } } -optional ForkAndExec::operator()() { - if (ready[0] == -1) { - return {}; +int ForkAndExec::createPipe() { + if (pipes[mode::READ] == -1) { + if(pipe2(pipes, O_NONBLOCK)) { + perror("ForkAndExec pipe()"); + return -1; + } } - if (ready[1] != -1) { - close(ready[1]); - ready[1] = -1; + + return pipes[mode::READ]; +} + +pid_t ForkAndExec::operator()() { + if (!prepareExecReadyPipe()) { + return 0; } switch (pid_t pid = fork()) { case 0: + closePipe(pipes[mode::READ]); + prepareOutputPipe(); execvp(binary, argv); - [[fallthrough]]; + perror("ForkAndExec exec()"); + _exit(EXIT_FAILURE); + case -1: - perror("fork() && exec()"); - return {}; + perror("ForkAndExec fork()"); + return 0; default: - pollfd fd{ready[0], 0, 0}; - if (1 > poll(&fd, 1, 5000)) { - cerr << "exec() timed out" << endl; - } + pipes[mode::READ] = -1; + closePipe(pipes[mode::WRITE]); + pollExecReadyPipe(); return pid; } } + +bool ForkAndExec::prepareExecReadyPipe() { + if (ready[mode::READ] == -1) { + if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) { + perror("ForkAndExec pipe()"); + return false; + } + closePipe(ready[mode::WRITE]); + } + + return true; +} + +void ForkAndExec::prepareOutputPipe() { + if (pipes[mode::WRITE] != -1) { + if (0 > dup2(pipes[mode::WRITE], STDERR_FILENO) || + 0 > dup2(pipes[mode::WRITE], STDOUT_FILENO)) { + perror("ForkAndExec dup()"); + } + } +} + +void ForkAndExec::closePipe(int &fd) { + if (fd != -1) { + close(fd); + fd = -1; + } +} + +void ForkAndExec::pollExecReadyPipe() { + pollfd fd{ready[mode::READ], 0, 0}; + if (1 > poll(&fd, 1, 5000)) { + cerr << "exec() timed out" << endl; + } +} diff --git a/testing/lib/ForkAndExec.hpp b/testing/lib/ForkAndExec.hpp index ff70abb9..ad7b4151 100644 --- a/testing/lib/ForkAndExec.hpp +++ b/testing/lib/ForkAndExec.hpp @@ -4,7 +4,7 @@ class ForkAndExec { public: - enum { READ, WRITE } pipe; + enum mode { READ, WRITE }; ForkAndExec(const char *binary, char **argv); ~ForkAndExec(); @@ -14,10 +14,17 @@ public: ForkAndExec(ForkAndExec &&) = default; ForkAndExec &operator = (ForkAndExec &&) = default; - optional operator () (); + [[nodiscard]] + int createPipe(); + pid_t operator () (); private: int ready[2], pipes[2]; const char *binary; char **argv; + + bool prepareExecReadyPipe(); + void prepareOutputPipe(); + void closePipe(int &fd); + void pollExecReadyPipe(); }; diff --git a/testing/lib/MemcachedCluster.cpp b/testing/lib/MemcachedCluster.cpp index 9da89a96..6b9af3e3 100644 --- a/testing/lib/MemcachedCluster.cpp +++ b/testing/lib/MemcachedCluster.cpp @@ -42,7 +42,7 @@ MemcachedCluster::MemcachedCluster() } MemcachedCluster::MemcachedCluster(Cluster &&cluster_) -: cluster{forward(cluster_)} +: cluster{move(cluster_)} { init(); } diff --git a/testing/lib/Server.cpp b/testing/lib/Server.cpp index e5ef5ead..481a9fd6 100644 --- a/testing/lib/Server.cpp +++ b/testing/lib/Server.cpp @@ -13,6 +13,9 @@ Server::Server(string binary_, Server::argv_t args_) Server::~Server() { stop(); wait(); + if (pipe != -1) { + close(pipe); + } if (holds_alternative(socket_or_port)) { unlink(get(socket_or_port).c_str()); } @@ -57,7 +60,7 @@ vector Server::createArgv() { vector arr; pushArg(arr, binary); - pushArg(arr, "-v"); + //pushArg(arr, "-v"); for (auto it = args.cbegin(); it != args.cend(); ++it) { if (holds_alternative(*it)) { @@ -87,23 +90,23 @@ vector Server::createArgv() { return arr; } -optional Server::start() { - if (pid) { - return pid; - } +optional Server::start() { + if (!pid) { + auto argv = createArgv(); + ForkAndExec fork_and_exec{binary.c_str(), argv.data()}; - auto argv = createArgv(); - auto child = ForkAndExec{binary.c_str(), argv.data()}(); + pipe = fork_and_exec.createPipe(); + pid = fork_and_exec(); - for (auto argp : argv) { - delete [] argp; - } + for (auto argp : argv) { + delete [] argp; + } - if (child.has_value()) { - pid = child.value(); + if (!pid) { + return {}; + } } - - return child; + return ChildProc{pid, pipe}; } bool Server::isListening() { @@ -140,7 +143,15 @@ bool Server::check() { bool Server::wait(int flags) { if (pid && pid == waitpid(pid, &status, flags)) { + if (drain().length()) { + cerr << "Ouput of " << *this << ":\n" << output << endl; + output.clear(); + } pid = 0; + if (pipe != -1) { + close(pipe); + pipe = -1; + } return true; } return false; @@ -179,3 +190,34 @@ const socket_or_port_t &Server::getSocketOrPort() const { return socket_or_port; } +int Server::getPipe() const { + return pipe; +} + +string &Server::drain() { + if (pipe != -1) { + again: + char read_buf[1<<12]; + auto read_len = read(pipe, read_buf, sizeof(read_buf)); + + if (read_len > 0) { + output.append(read_buf, read_len); + goto again; + } + if (read_len == -1) { + switch (errno) { + case EINTR: + goto again; + default: + perror("Server::drain read()"); + [[fallthrough]]; + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + break; + } + } + } + return output; +} diff --git a/testing/lib/Server.hpp b/testing/lib/Server.hpp index 49f75cb0..fb1b8af1 100644 --- a/testing/lib/Server.hpp +++ b/testing/lib/Server.hpp @@ -29,21 +29,30 @@ public: binary = exchange(s.binary, "false"); args = exchange(s.args, {}); pid = exchange(s.pid, 0); + pipe = exchange(s.pipe, -1); status = exchange(s.status, 0); signalled = exchange(s.signalled, {}); socket_or_port = exchange(s.socket_or_port, {}); + output = exchange(s.output, {}); return *this; }; pid_t getPid() const; - + int getPipe() const; const string &getBinary() const; - const argv_t &getArgs() const; - const socket_or_port_t &getSocketOrPort() const; - optional start(); + struct ChildProc { + pid_t pid; + int pipe; + ChildProc(pid_t pid_, int pipe_) + : pid{pid_} + , pipe{pipe_} + { + } + }; + optional start(); bool stop(); bool signal(int signo = SIGTERM); @@ -52,14 +61,18 @@ public: bool wait(int flags = 0); bool tryWait(); + string &drain(); + private: string binary; argv_t args; pid_t pid = 0; + int pipe = -1; int status = 0; unordered_map signalled; socket_or_port_t socket_or_port = 11211; + string output; [[nodiscard]] vector createArgv(); diff --git a/testing/lib/common.hpp b/testing/lib/common.hpp index c1ef43ae..9f957822 100644 --- a/testing/lib/common.hpp +++ b/testing/lib/common.hpp @@ -57,21 +57,20 @@ inline memcached_return_t fetch_all_results(memcached_st *memc, unsigned int &ke class MemcachedPtr { public: - memcached_st memc; + memcached_st *memc; explicit MemcachedPtr(memcached_st *memc_) { - memset(&memc, 0, sizeof(memc)); - REQUIRE(memcached_clone(&memc, memc_)); + memc = memc_; } MemcachedPtr() - : MemcachedPtr(nullptr) + : MemcachedPtr(memcached_create(nullptr)) {} ~MemcachedPtr() { - memcached_free(&memc); + memcached_free(memc); } memcached_st *operator * () { - return &memc; + return memc; } }; diff --git a/testing/tests/hashkit/basic.cpp b/testing/tests/hashkit/basic.cpp index 42dda292..9dcec525 100644 --- a/testing/tests/hashkit/basic.cpp +++ b/testing/tests/hashkit/basic.cpp @@ -7,6 +7,7 @@ TEST_CASE("hashkit") { hashkit_st st, *hp = hashkit_create(nullptr); Hashkit stack; Hashkit *heap = new Hashkit; + auto newed = unique_ptr(heap); REQUIRE(hashkit_create(&st)); REQUIRE(hp); @@ -57,15 +58,12 @@ TEST_CASE("hashkit") { REQUIRE(HASHKIT_SUCCESS == stack.set_function(h)); REQUIRE(HASHKIT_SUCCESS == hashkit_set_function(&st, h)); - DYNAMIC_SECTION("can digest hash function: " << libhashkit_string_hash(h)) { - auto n = 0; - - for (auto i : input) { - CHECK(output[f][n] == stack.digest(S(i))); - CHECK(output[f][n] == hashkit_digest(&st, S(i))); - CHECK(output[f][n] == libhashkit_digest(S(i), h)); - ++n; - } + auto n = 0; + for (auto i : input) { + CHECK(output[f][n] == stack.digest(S(i))); + CHECK(output[f][n] == hashkit_digest(&st, S(i))); + CHECK(output[f][n] == libhashkit_digest(S(i), h)); + ++n; } } } @@ -81,7 +79,6 @@ TEST_CASE("hashkit") { REQUIRE_FALSE(hashkit_compare(&st, hp)); } - delete heap; hashkit_free(&st); hashkit_free(hp); } diff --git a/testing/tests/memcached/encoding_key.cpp b/testing/tests/memcached/encoding_key.cpp index 825bfa69..c4616ce4 100644 --- a/testing/tests/memcached/encoding_key.cpp +++ b/testing/tests/memcached/encoding_key.cpp @@ -29,7 +29,7 @@ TEST_CASE("memcached encoding_key") { auto memc = &test.memc; SECTION("accepts encoding key") { - MemcachedPtr copy(memc); + MemcachedPtr copy(memcached_clone(nullptr, memc)); REQUIRE_SUCCESS(memcached_set_encoding_key(memc, S(__func__))); @@ -37,13 +37,13 @@ TEST_CASE("memcached encoding_key") { REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0)); SECTION("gets encoded value") { - check(memc, ©.memc, INITIAL_VAL); + check(memc, copy.memc, INITIAL_VAL); } SECTION("cloned gets encoded value") { - MemcachedPtr dupe(memc); + MemcachedPtr dupe(memcached_clone(nullptr, memc)); - check(&dupe.memc, ©.memc, INITIAL_VAL); + check(dupe.memc, copy.memc, INITIAL_VAL); } } @@ -52,26 +52,26 @@ TEST_CASE("memcached encoding_key") { REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0)); REQUIRE_RC(MEMCACHED_NOTSTORED, memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0)); - check(memc, ©.memc, INITIAL_VAL); + check(memc, copy.memc, INITIAL_VAL); test.flush(); REQUIRE_SUCCESS(memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0)); SECTION("gets encoded value") { - check(memc, ©.memc, REPLACED_VAL); + check(memc, copy.memc, REPLACED_VAL); } } SECTION("replaces encoded value") { REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0)); - check(memc, ©.memc, INITIAL_VAL); + check(memc, copy.memc, INITIAL_VAL); REQUIRE_SUCCESS(memcached_replace(memc, TEST_KEY, REPLACED_VAL, 0, 0)); SECTION("gets encoded value") { - check(memc, ©.memc, REPLACED_VAL); + check(memc, copy.memc, REPLACED_VAL); } } diff --git a/testing/tests/memcached/inc_dec.cpp b/testing/tests/memcached/inc_dec.cpp index e7dfb4a0..dff068b6 100644 --- a/testing/tests/memcached/inc_dec.cpp +++ b/testing/tests/memcached/inc_dec.cpp @@ -9,10 +9,10 @@ TEST_CASE("memcached inc_dec") { LOOPED_SECTION(tests) { auto memc = &test.memc; uint64_t binary = GENERATE(0, 1); - void *prefix = GENERATE(as{}, nullptr, "namespace:"); + char *prefix = GENERATE(as{}, "", "namespace:"); REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary)); - if (prefix) { + if (*prefix) { REQUIRE_SUCCESS(memcached_callback_set(memc, MEMCACHED_CALLBACK_NAMESPACE, prefix)); } -- 2.30.2