testing: UDP test
[awesomized/libmemcached] / test / lib / Cluster.cpp
1 #include "Cluster.hpp"
2 #include "Retry.hpp"
3
4 #include <sys/wait.h>
5
6 Cluster::Cluster(Server serv, uint16_t cnt)
7 : count{cnt}
8 , proto{move(serv)}
9 {
10 if (count < 4) {
11 count = stoi(getenv_else("MEMCACHED_CLUSTER", "4"));
12 }
13 if (!count) {
14 count = 1;
15 }
16 reset();
17 }
18
19 Cluster::~Cluster() {
20 stop();
21 wait();
22 }
23
24 const vector<Server> &Cluster::getServers() const {
25 return cluster;
26 }
27
28 void Cluster::reset() {
29 pids.clear();
30 cluster.clear();
31 for (int i = 0; i < count; ++i) {
32 cluster.push_back(proto);
33 }
34 }
35
36 bool Cluster::start() {
37 bool started = true;
38
39 for (auto &server : cluster) {
40 if (!startServer(server)) {
41 started = false;
42 }
43 }
44
45 return started;
46 }
47
48 void Cluster::stop() {
49 for (auto &server : cluster) {
50 server.drain();
51 // no cookies for memcached; TERM is just too slow
52 server.signal(SIGKILL);
53 }
54 }
55
56 bool Cluster::isStopped() {
57 for (auto &server : cluster) {
58 if (server.getPid() && !server.tryWait()) {
59 return false;
60 }
61 }
62 return true;
63 }
64
65 bool Cluster::isListening() {
66 for (auto &server : cluster) {
67 Retry server_is_listening{[&] {
68 if (!server.isListening()) {
69 // zombie?
70 auto old_pid = server.getPid();
71 if (server.tryWait()) {
72 cerr << "Collected zombie " << server << "(old pid=" << old_pid << ")\n";
73 pids.erase(old_pid);
74 // restart
75 startServer(server);
76 }
77 if (!server.isListening()) {
78 return false;
79 }
80 }
81 return true;
82 }};
83 if (!server_is_listening()) {
84 return false;
85 }
86 }
87
88 return true;
89 }
90
91 bool Cluster::startServer(Server &server) {
92 if (server.start().has_value()) {
93 pids[server.getPid()] = &server;
94 return true;
95 }
96 return false;
97 }
98
99 void Cluster::wait() {
100 siginfo_t inf;
101
102 while (!isStopped()) {
103 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
104 perror("Cluster::wait waitid()");
105 return;
106 }
107
108 auto server = pids.find(inf.si_pid);
109 if (server != pids.end()) {
110 server->second->wait();
111 }
112 }
113 }