flush
[awesomized/libmemcached] / testing / lib / Cluster.cpp
1 #include "Cluster.hpp"
2 #include "Retry.hpp"
3
4 #include <sys/wait.h>
5
6 Cluster::Cluster(Server serv, uint16_t cnt)
7 : count{cnt}
8 , proto{move(serv)}
9 {
10 if (!cnt) {
11 count = thread::hardware_concurrency()/2 ?: 4;
12 }
13 reset();
14 }
15
16 Cluster::~Cluster() {
17 stop();
18 wait();
19 }
20
21 const vector<Server> &Cluster::getServers() const {
22 return cluster;
23 }
24
25 void Cluster::reset() {
26 pids.clear();
27 cluster.clear();
28 for (int i = 0; i < count; ++i) {
29 cluster.push_back(proto);
30 }
31 }
32
33 bool Cluster::start() {
34 bool started = true;
35
36 for (auto &server : cluster) {
37 if (!startServer(server)) {
38 started = false;
39 }
40 }
41
42 return started;
43 }
44
45 void Cluster::stop() {
46 for (auto &server : cluster) {
47 // no cookies for memcached; TERM is just too slow
48 server.signal(SIGKILL);
49 }
50 }
51
52 bool Cluster::isStopped() {
53 for (auto &server : cluster) {
54 if (server.getPid() && !server.tryWait()) {
55 return false;
56 }
57 }
58 return true;
59 }
60
61 bool Cluster::isListening() {
62 for (auto &server : cluster) {
63 Retry server_is_listening{[&] {
64 if (!server.isListening()) {
65 // zombie?
66 auto old_pid = server.getPid();
67 if (server.tryWait()) {
68 cerr << "zombie collected (old pid=" << old_pid << "): " << server << "\n";
69 pids.erase(old_pid);
70 // restart
71 startServer(server);
72 }
73 if (!server.isListening()) {
74 return false;
75 }
76 }
77 return true;
78 }};
79 if (!server_is_listening()) {
80 return false;
81 }
82 }
83
84 return true;
85 }
86
87 bool Cluster::startServer(Server &server) {
88 auto pid = server.start();
89 if (pid.has_value()) {
90 pids[*pid] = &server;
91 return true;
92 }
93 return false;
94 }
95
96 void Cluster::wait() {
97 siginfo_t inf;
98
99 while (!isStopped()) {
100 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
101 perror("Cluster::wait waitid()");
102 return;
103 }
104
105 auto server = pids.find(inf.si_pid);
106 if (server != pids.end()) {
107 server->second->wait();
108 }
109 }
110 }