testing: freebsd [travis skip]
[m6w6/libmemcached] / test / lib / Cluster.cpp
1 #include "Cluster.hpp"
2 #include "Retry.hpp"
3
4 #include <sys/wait.h>
5
6 Cluster::Cluster(Server serv, uint16_t cnt)
7 : count{cnt}
8 , proto{move(serv)}
9 {
10 if (count < 4) {
11 count = stoi(getenv_else("MEMCACHED_CLUSTER", "4"));
12 }
13 if (!count) {
14 count = 1;
15 }
16 for (int i = 0; i < count; ++i) {
17 cluster.push_back(proto);
18 }
19 }
20
21 Cluster::Cluster(vector<Server> servers)
22 : count{servers.size()}
23 , cluster{move(servers)}
24 {
25
26 }
27
28 Cluster::~Cluster() {
29 stop();
30 wait();
31 }
32
33 const vector<Server> &Cluster::getServers() const {
34 return cluster;
35 }
36
37 bool Cluster::start() {
38 bool started = true;
39
40 for (auto &server : cluster) {
41 if (!startServer(server)) {
42 started = false;
43 }
44 }
45
46 return started;
47 }
48
49 void Cluster::stop(bool graceful) {
50 for (auto &server : cluster) {
51 server.drain();
52 if (graceful) {
53 server.stop();
54 } else {
55 // no cookies for memcached; TERM is just too slow
56 server.signal(SIGKILL);
57 }
58 }
59 }
60
61 bool Cluster::isStopped() {
62 for (auto &server : cluster) {
63 if (server.getPid() && !server.tryWait()) {
64 return false;
65 }
66 }
67 return true;
68 }
69
70 bool Cluster::isListening() {
71 for (auto &server : cluster) {
72 Retry server_is_listening{[&] {
73 if (!server.isListening()) {
74 // zombie?
75 auto old_pid = server.getPid();
76 if (server.tryWait()) {
77 cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
78 pids.erase(old_pid);
79 // restart
80 startServer(server);
81 }
82 if (!server.isListening()) {
83 return false;
84 }
85 }
86 return true;
87 }};
88 if (!server_is_listening()) {
89 return false;
90 }
91 }
92
93 return true;
94 }
95
96 bool Cluster::startServer(Server &server) {
97 if (server.start().has_value()) {
98 pids[server.getPid()] = &server;
99 return true;
100 }
101 return false;
102 }
103
104 void Cluster::wait() {
105 siginfo_t inf;
106
107 while (!isStopped()) {
108 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
109 perror("Cluster::wait waitid()");
110 return;
111 }
112
113 auto server = pids.find(inf.si_pid);
114 if (server != pids.end()) {
115 server->second->wait();
116 }
117 }
118 }
119
120 bool Cluster::restart() {
121 stop();
122 wait();
123 return start();
124 }