c3c625cf04eb225069a08d4b5f732ea68132eade
[awesomized/libmemcached] / test / lib / Cluster.cpp
#include "Cluster.hpp"
#include "Retry.hpp"

#include <sys/wait.h>

#include <cerrno>
5
6 Cluster::Cluster(Server serv, uint16_t cnt)
7 : count{cnt}
8 , proto{move(serv)}
9 {
10 if (!cnt) {
11 count = thread::hardware_concurrency()/2 ?: 4;
12 }
13 reset();
14 }
15
16 Cluster::~Cluster() {
17 stop();
18 wait();
19 }
20
21 const vector<Server> &Cluster::getServers() const {
22 return cluster;
23 }
24
25 void Cluster::reset() {
26 pids.clear();
27 cluster.clear();
28 for (int i = 0; i < count; ++i) {
29 cluster.push_back(proto);
30 }
31 }
32
33 bool Cluster::start() {
34 bool started = true;
35
36 for (auto &server : cluster) {
37 if (!startServer(server)) {
38 started = false;
39 }
40 }
41
42 return started;
43 }
44
45 void Cluster::stop() {
46 for (auto &server : cluster) {
47 server.drain();
48 // no cookies for memcached; TERM is just too slow
49 server.signal(SIGKILL);
50 }
51 }
52
53 bool Cluster::isStopped() {
54 for (auto &server : cluster) {
55 if (server.getPid() && !server.tryWait()) {
56 return false;
57 }
58 }
59 return true;
60 }
61
62 bool Cluster::isListening() {
63 for (auto &server : cluster) {
64 Retry server_is_listening{[&] {
65 if (!server.isListening()) {
66 // zombie?
67 auto old_pid = server.getPid();
68 if (server.tryWait()) {
69 cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
70 pids.erase(old_pid);
71 // restart
72 startServer(server);
73 }
74 if (!server.isListening()) {
75 return false;
76 }
77 }
78 return true;
79 }};
80 if (!server_is_listening()) {
81 return false;
82 }
83 }
84
85 return true;
86 }
87
88 bool Cluster::startServer(Server &server) {
89 if (server.start().has_value()) {
90 pids[server.getPid()] = &server;
91 return true;
92 }
93 return false;
94 }
95
96 void Cluster::wait() {
97 siginfo_t inf;
98
99 while (!isStopped()) {
100 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
101 perror("Cluster::wait waitid()");
102 return;
103 }
104
105 auto server = pids.find(inf.si_pid);
106 if (server != pids.end()) {
107 server->second->wait();
108 }
109 }
110 }