void Cluster::stop() {
for (auto &server : cluster) {
+ server.drain();
// no cookies for memcached; TERM is just too slow
server.signal(SIGKILL);
}
// zombie?
auto old_pid = server.getPid();
if (server.tryWait()) {
- cerr << "zombie collected (old pid=" << old_pid << "): " << server << "\n";
+ cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
pids.erase(old_pid);
// restart
startServer(server);
}
bool Cluster::startServer(Server &server) {
- auto pid = server.start();
- if (pid.has_value()) {
- pids[*pid] = &server;
+ if (server.start().has_value()) {
+ pids[server.getPid()] = &server;
return true;
}
return false;
}
Connection &Connection::operator=(Connection &&conn) noexcept {
- Connection copy(forward<Connection>(conn));
+ Connection copy(move(conn));
copy.swap(*this);
return *this;
}
#include <unistd.h>
ForkAndExec::ForkAndExec(const char *binary_, char **argv_)
-: binary{binary_}
+: ready{-1, -1}
+, pipes{-1, -1}
+, binary{binary_}
, argv{argv_}
{
- if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) {
- int error = errno;
- perror("Server::start pipe2()");
- throw system_error(error, system_category());
- }
}
ForkAndExec::~ForkAndExec() {
- if (ready[0] != -1) {
- close(ready[0]);
- }
- if (ready[1] != -1) {
- close(ready[1]);
+ for (auto &pipe : {ready, pipes}) {
+ for (auto m : {READ, WRITE}) {
+ closePipe(pipe[m]);
+ }
}
}
-optional<pid_t> ForkAndExec::operator()() {
- if (ready[0] == -1) {
- return {};
+int ForkAndExec::createPipe() {
+ if (pipes[mode::READ] == -1) {
+ if(pipe2(pipes, O_NONBLOCK)) {
+ perror("ForkAndExec pipe()");
+ return -1;
+ }
}
- if (ready[1] != -1) {
- close(ready[1]);
- ready[1] = -1;
+
+ return pipes[mode::READ];
+}
+
+pid_t ForkAndExec::operator()() {
+ if (!prepareExecReadyPipe()) {
+ return 0;
}
switch (pid_t pid = fork()) {
case 0:
+ closePipe(pipes[mode::READ]);
+ prepareOutputPipe();
execvp(binary, argv);
- [[fallthrough]];
+ perror("ForkAndExec exec()");
+ _exit(EXIT_FAILURE);
+
case -1:
- perror("fork() && exec()");
- return {};
+ perror("ForkAndExec fork()");
+ return 0;
default:
- pollfd fd{ready[0], 0, 0};
- if (1 > poll(&fd, 1, 5000)) {
- cerr << "exec() timed out" << endl;
- }
+ pipes[mode::READ] = -1;
+ closePipe(pipes[mode::WRITE]);
+ pollExecReadyPipe();
return pid;
}
}
+
+bool ForkAndExec::prepareExecReadyPipe() {
+ if (ready[mode::READ] == -1) {
+ if (pipe2(ready, O_CLOEXEC | O_NONBLOCK)) {
+ perror("ForkAndExec pipe()");
+ return false;
+ }
+ closePipe(ready[mode::WRITE]);
+ }
+
+ return true;
+}
+
+void ForkAndExec::prepareOutputPipe() {
+ if (pipes[mode::WRITE] != -1) {
+ if (0 > dup2(pipes[mode::WRITE], STDERR_FILENO) ||
+ 0 > dup2(pipes[mode::WRITE], STDOUT_FILENO)) {
+ perror("ForkAndExec dup()");
+ }
+ }
+}
+
+void ForkAndExec::closePipe(int &fd) {
+ if (fd != -1) {
+ close(fd);
+ fd = -1;
+ }
+}
+
+void ForkAndExec::pollExecReadyPipe() {
+ pollfd fd{ready[mode::READ], 0, 0};
+ if (1 > poll(&fd, 1, 5000)) {
+ cerr << "exec() timed out" << endl;
+ }
+}
class ForkAndExec {
public:
- enum { READ, WRITE } pipe;
+ enum mode { READ, WRITE };
ForkAndExec(const char *binary, char **argv);
~ForkAndExec();
ForkAndExec(ForkAndExec &&) = default;
ForkAndExec &operator = (ForkAndExec &&) = default;
- optional<pid_t> operator () ();
+ [[nodiscard]]
+ int createPipe();
+ pid_t operator () ();
private:
int ready[2], pipes[2];
const char *binary;
char **argv;
+
+ bool prepareExecReadyPipe();
+ void prepareOutputPipe();
+ void closePipe(int &fd);
+ void pollExecReadyPipe();
};
}
MemcachedCluster::MemcachedCluster(Cluster &&cluster_)
-: cluster{forward<Cluster>(cluster_)}
+: cluster{move(cluster_)}
{
init();
}
Server::~Server() {
stop();
wait();
+ if (pipe != -1) {
+ close(pipe);
+ }
if (holds_alternative<string>(socket_or_port)) {
unlink(get<string>(socket_or_port).c_str());
}
vector<char *> arr;
pushArg(arr, binary);
- pushArg(arr, "-v");
+ //pushArg(arr, "-v");
for (auto it = args.cbegin(); it != args.cend(); ++it) {
if (holds_alternative<arg_t>(*it)) {
return arr;
}
-optional<pid_t> Server::start() {
- if (pid) {
- return pid;
- }
+optional<Server::ChildProc> Server::start() {
+ if (!pid) {
+ auto argv = createArgv();
+ ForkAndExec fork_and_exec{binary.c_str(), argv.data()};
- auto argv = createArgv();
- auto child = ForkAndExec{binary.c_str(), argv.data()}();
+ pipe = fork_and_exec.createPipe();
+ pid = fork_and_exec();
- for (auto argp : argv) {
- delete [] argp;
- }
+ for (auto argp : argv) {
+ delete [] argp;
+ }
- if (child.has_value()) {
- pid = child.value();
+ if (!pid) {
+ return {};
+ }
}
-
- return child;
+ return ChildProc{pid, pipe};
}
bool Server::isListening() {
bool Server::wait(int flags) {
if (pid && pid == waitpid(pid, &status, flags)) {
+ if (drain().length()) {
+ cerr << "Ouput of " << *this << ":\n" << output << endl;
+ output.clear();
+ }
pid = 0;
+ if (pipe != -1) {
+ close(pipe);
+ pipe = -1;
+ }
return true;
}
return false;
return socket_or_port;
}
+int Server::getPipe() const {
+ return pipe;
+}
+
+string &Server::drain() {
+ if (pipe != -1) {
+ again:
+ char read_buf[1<<12];
+ auto read_len = read(pipe, read_buf, sizeof(read_buf));
+
+ if (read_len > 0) {
+ output.append(read_buf, read_len);
+ goto again;
+ }
+ if (read_len == -1) {
+ switch (errno) {
+ case EINTR:
+ goto again;
+ default:
+ perror("Server::drain read()");
+ [[fallthrough]];
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ }
+ }
+ }
+ return output;
+}
binary = exchange(s.binary, "false");
args = exchange(s.args, {});
pid = exchange(s.pid, 0);
+ pipe = exchange(s.pipe, -1);
status = exchange(s.status, 0);
signalled = exchange(s.signalled, {});
socket_or_port = exchange(s.socket_or_port, {});
+ output = exchange(s.output, {});
return *this;
};
pid_t getPid() const;
-
+ int getPipe() const;
const string &getBinary() const;
-
const argv_t &getArgs() const;
-
const socket_or_port_t &getSocketOrPort() const;
- optional<pid_t> start();
+ struct ChildProc {
+ pid_t pid;
+ int pipe;
+ ChildProc(pid_t pid_, int pipe_)
+ : pid{pid_}
+ , pipe{pipe_}
+ {
+ }
+ };
+ optional<ChildProc> start();
bool stop();
bool signal(int signo = SIGTERM);
bool wait(int flags = 0);
bool tryWait();
+ string &drain();
+
private:
string binary;
argv_t args;
pid_t pid = 0;
+ int pipe = -1;
int status = 0;
unordered_map<int, unsigned> signalled;
socket_or_port_t socket_or_port = 11211;
+ string output;
[[nodiscard]]
vector<char *> createArgv();
class MemcachedPtr {
public:
- memcached_st memc;
+ memcached_st *memc;
explicit
MemcachedPtr(memcached_st *memc_) {
- memset(&memc, 0, sizeof(memc));
- REQUIRE(memcached_clone(&memc, memc_));
+ memc = memc_;
}
MemcachedPtr()
- : MemcachedPtr(nullptr)
+ : MemcachedPtr(memcached_create(nullptr))
{}
~MemcachedPtr() {
- memcached_free(&memc);
+ memcached_free(memc);
}
memcached_st *operator * () {
- return &memc;
+ return memc;
}
};
hashkit_st st, *hp = hashkit_create(nullptr);
Hashkit stack;
Hashkit *heap = new Hashkit;
+ auto newed = unique_ptr<Hashkit>(heap);
REQUIRE(hashkit_create(&st));
REQUIRE(hp);
REQUIRE(HASHKIT_SUCCESS == stack.set_function(h));
REQUIRE(HASHKIT_SUCCESS == hashkit_set_function(&st, h));
- DYNAMIC_SECTION("can digest hash function: " << libhashkit_string_hash(h)) {
- auto n = 0;
-
- for (auto i : input) {
- CHECK(output[f][n] == stack.digest(S(i)));
- CHECK(output[f][n] == hashkit_digest(&st, S(i)));
- CHECK(output[f][n] == libhashkit_digest(S(i), h));
- ++n;
- }
+ auto n = 0;
+ for (auto i : input) {
+ CHECK(output[f][n] == stack.digest(S(i)));
+ CHECK(output[f][n] == hashkit_digest(&st, S(i)));
+ CHECK(output[f][n] == libhashkit_digest(S(i), h));
+ ++n;
}
}
}
REQUIRE_FALSE(hashkit_compare(&st, hp));
}
- delete heap;
hashkit_free(&st);
hashkit_free(hp);
}
auto memc = &test.memc;
SECTION("accepts encoding key") {
- MemcachedPtr copy(memc);
+ MemcachedPtr copy(memcached_clone(nullptr, memc));
REQUIRE_SUCCESS(memcached_set_encoding_key(memc, S(__func__)));
REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
SECTION("gets encoded value") {
- check(memc, ©.memc, INITIAL_VAL);
+ check(memc, copy.memc, INITIAL_VAL);
}
SECTION("cloned gets encoded value") {
- MemcachedPtr dupe(memc);
+ MemcachedPtr dupe(memcached_clone(nullptr, memc));
- check(&dupe.memc, ©.memc, INITIAL_VAL);
+ check(dupe.memc, copy.memc, INITIAL_VAL);
}
}
REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
REQUIRE_RC(MEMCACHED_NOTSTORED, memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0));
- check(memc, ©.memc, INITIAL_VAL);
+ check(memc, copy.memc, INITIAL_VAL);
test.flush();
REQUIRE_SUCCESS(memcached_add(memc, TEST_KEY, REPLACED_VAL, 0, 0));
SECTION("gets encoded value") {
- check(memc, ©.memc, REPLACED_VAL);
+ check(memc, copy.memc, REPLACED_VAL);
}
}
SECTION("replaces encoded value") {
REQUIRE_SUCCESS(memcached_set(memc, TEST_KEY, INITIAL_VAL, 0, 0));
- check(memc, ©.memc, INITIAL_VAL);
+ check(memc, copy.memc, INITIAL_VAL);
REQUIRE_SUCCESS(memcached_replace(memc, TEST_KEY, REPLACED_VAL, 0, 0));
SECTION("gets encoded value") {
- check(memc, ©.memc, REPLACED_VAL);
+ check(memc, copy.memc, REPLACED_VAL);
}
}
LOOPED_SECTION(tests) {
auto memc = &test.memc;
uint64_t binary = GENERATE(0, 1);
- void *prefix = GENERATE(as<void *>{}, nullptr, "namespace:");
+ char *prefix = GENERATE(as<char *>{}, "", "namespace:");
REQUIRE_SUCCESS(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary));
- if (prefix) {
+ if (*prefix) {
REQUIRE_SUCCESS(memcached_callback_set(memc, MEMCACHED_CALLBACK_NAMESPACE, prefix));
}