4fcdb47bcbc358e94664f4dc3fe07fc19706dbbd
[m6w6/libmemcached] / test / lib / Cluster.cpp
1 #include "Cluster.hpp"
2 #include "Retry.hpp"
3
4 #include <sys/wait.h>
5
6 Cluster::Cluster(Server serv, uint16_t cnt)
7 : count{cnt}
8 , proto{move(serv)}
9 {
10 if (count < 4) {
11 count = stoi(getenv_else("MEMCACHED_CLUSTER", "4"));
12 }
13 if (!count) {
14 count = 1;
15 }
16 for (int i = 0; i < count; ++i) {
17 cluster.push_back(proto);
18 }
19 }
20
21 Cluster::Cluster(vector<Server> servers)
22 : count{servers.size()}
23 , cluster{move(servers)}
24 {
25
26 }
27
28 Cluster::~Cluster() {
29 stop();
30 wait();
31 }
32
33 const vector<Server> &Cluster::getServers() const {
34 return cluster;
35 }
36
37 bool Cluster::start() {
38 bool started = true;
39
40 for (auto &server : cluster) {
41 if (!startServer(server)) {
42 started = false;
43 }
44 }
45
46 return started;
47 }
48
49 void Cluster::stop() {
50 for (auto &server : cluster) {
51 server.drain();
52 // no cookies for memcached; TERM is just too slow
53 server.signal(SIGKILL);
54 }
55 }
56
57 bool Cluster::isStopped() {
58 for (auto &server : cluster) {
59 if (server.getPid() && !server.tryWait()) {
60 return false;
61 }
62 }
63 return true;
64 }
65
66 bool Cluster::isListening() {
67 for (auto &server : cluster) {
68 Retry server_is_listening{[&] {
69 if (!server.isListening()) {
70 // zombie?
71 auto old_pid = server.getPid();
72 if (server.tryWait()) {
73 cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
74 pids.erase(old_pid);
75 // restart
76 startServer(server);
77 }
78 if (!server.isListening()) {
79 return false;
80 }
81 }
82 return true;
83 }};
84 if (!server_is_listening()) {
85 return false;
86 }
87 }
88
89 return true;
90 }
91
92 bool Cluster::startServer(Server &server) {
93 if (server.start().has_value()) {
94 pids[server.getPid()] = &server;
95 return true;
96 }
97 return false;
98 }
99
100 void Cluster::wait() {
101 siginfo_t inf;
102
103 while (!isStopped()) {
104 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
105 perror("Cluster::wait waitid()");
106 return;
107 }
108
109 auto server = pids.find(inf.si_pid);
110 if (server != pids.end()) {
111 server->second->wait();
112 }
113 }
114 }
115
116 bool Cluster::restart() {
117 stop();
118 wait();
119 return start();
120 }