prepare v1.1.4
[awesomized/libmemcached] / test / lib / Cluster.cpp
#include "Cluster.hpp"
#include "Retry.hpp"

#include <algorithm>
#include <cerrno>
#if HAVE_EXECUTION && HAVE_TBB
# include <execution>
#endif
#include <sys/wait.h>
9
10 Cluster::Cluster(Server serv, size_t cnt)
11 : count{cnt}
12 , proto{move(serv)}
13 {
14 if (!count) {
15 count = 1;
16 }
17 for (size_t i = 0; i < count; ++i) {
18 cluster.push_back(proto);
19 }
20 }
21
22 Cluster::~Cluster() {
23 stop();
24 wait();
25 }
26
27 const vector<Server> &Cluster::getServers() const {
28 return cluster;
29 }
30
31 bool Cluster::start() {
32 bool started = true;
33
34 for (auto &server : cluster) {
35 if (!server.start()) {
36 started = false;
37 continue;
38 }
39 pids[server.getPid()] = &server;
40 }
41
42 return started;
43 }
44
45 void Cluster::stop(bool graceful) {
46 for (auto &server : cluster) {
47 server.drain();
48 if (graceful) {
49 server.stop();
50 } else {
51 // no cookies for memcached; TERM is just too slow
52 server.signal(SIGKILL);
53 }
54 }
55 }
56
57 bool Cluster::isStopped() {
58 return none_of(
59 #if HAVE_EXECUTION && HAVE_TBB
60 execution::par,
61 #endif
62 cluster.begin(), cluster.end(), [](Server &s) {
63 return s.getPid() && !s.tryWait();
64 });
65 }
66
67 bool Cluster::isListening() const {
68 return all_of(
69 #if HAVE_EXECUTION && HAVE_TBB
70 execution::par,
71 #endif
72 cluster.begin(), cluster.end(), [](const Server &s) {
73 return s.isListening();
74 });
75 }
76
77 bool Cluster::ensureListening() {
78 if (!start()) {
79 return false;
80 }
81 auto listening = all_of(
82 #if HAVE_EXECUTION && HAVE_TBB
83 execution::par,
84 #endif
85 cluster.begin(), cluster.end(), [](Server &s) {
86 return s.ensureListening();
87 });
88
89 pids.clear();
90 for (auto &server : cluster) {
91 pids[server.getPid()] = &server;
92 }
93
94 return listening;
95 }
96
97 void Cluster::wait() {
98 siginfo_t inf;
99
100 while (!isStopped()) {
101 #if HAVE_WAITID_NOWAIT
102 if (waitid(P_ALL, 0, &inf, WEXITED | WNOWAIT)) {
103 perror("Cluster::wait waitid()");
104 return;
105 }
106
107 auto server = pids.find(inf.si_pid);
108 if (server != pids.end()) {
109 server->second->wait();
110 }
111 #else
112 this_thread::sleep_for(100ms);
113 #endif
114 }
115 }
116
117 bool Cluster::restart() {
118 stop();
119 wait();
120 return start();
121 }