dddb68d9f733ceecf98da9ae9f3e5bca83bce323
[m6w6/libmemcached] / test / lib / Server.cpp
1 #include "Server.hpp"
2 #include "Retry.hpp"
3 #include "ForkAndExec.hpp"
4
5 #include <sys/wait.h>
6 #include <unistd.h>
7
8 Server::Server(string binary_, Server::argv_t args_)
9 : binary{move(binary_)}
10 , args{move(args_)}
11 {}
12
13 Server::~Server() {
14 stop();
15 wait();
16 if (pipe != -1) {
17 close(pipe);
18 }
19 if (holds_alternative<string>(socket_or_port)) {
20 unlink(get<string>(socket_or_port).c_str());
21 }
22 }
23
24 static inline string extractArg(const Server::arg_t &arg_cont, const string &func_arg) {
25 if (holds_alternative<string>(arg_cont)) {
26 return get<string>(arg_cont);
27 } else {
28 return get<Server::arg_func_t>(arg_cont)(func_arg);
29 }
30 }
31
32 static inline void pushArg(vector<char *> &arr, const string &arg) {
33 auto len = arg.size();
34 auto str = arg.data(), end = str + len + 1;
35 auto ptr = new char[len + 1];
36 copy(str, end, ptr);
37 arr.push_back(ptr);
38 }
39
40 optional<string> Server::handleArg(vector<char *> &arr, const string &arg, const arg_func_t &next_arg) {
41 pushArg(arr, arg);
42 if (arg == "-U" || arg == "--udp-port") {
43 auto port = next_arg(arg);
44 pushArg(arr, port);
45 pushArg(arr, "-p");
46 pushArg(arr, port);
47 socket_or_port = stoi(port);
48 return port;
49 } else if (arg == "-p" || arg == "--port") {
50 auto port = next_arg(arg);
51 pushArg(arr, port);
52 socket_or_port = stoi(port);
53 return port;
54 } else if (arg == "-s" || arg == "--unix-socket") {
55 auto sock = next_arg(arg);
56 pushArg(arr, sock);
57 socket_or_port = sock;
58 return sock;
59 } else if (arg == "-S" || arg == "--enable-sasl") {
60 sasl = true;
61 }
62 return {};
63 }
64
65 [[nodiscard]]
66 vector<char *> Server::createArgv() {
67 vector<char *> arr;
68
69 pushArg(arr, binary);
70 pushArg(arr, "-u");
71 pushArg(arr, "nobody");
72
73 for (auto it = args.cbegin(); it != args.cend(); ++it) {
74 if (holds_alternative<arg_t>(*it)) {
75 // a single argument
76 auto arg = extractArg(get<arg_t>(*it), binary);
77 handleArg(arr, arg, [&it](const string &arg_) {
78 return extractArg(get<arg_t>(*++it), arg_);
79 });
80 } else {
81 // an argument pair
82 auto &[one, two] = get<arg_pair_t>(*it);
83 auto arg_one = extractArg(one, binary);
84 auto arg_two = extractArg(two, arg_one);
85
86 auto next = handleArg(arr, arg_one, [&arg_two](const string &) {
87 return arg_two;
88 });
89
90 if (!next.has_value()) {
91 pushArg(arr, arg_two);
92 }
93 }
94 }
95
96 arr.push_back(nullptr);
97
98 return arr;
99 }
100
101 optional<Server::ChildProc> Server::start() {
102 if (!pid) {
103 auto argv = createArgv();
104 ForkAndExec fork_and_exec{binary.c_str(), argv.data()};
105
106 pipe = fork_and_exec.createPipe();
107 pid = fork_and_exec();
108
109 for (auto argp : argv) {
110 delete [] argp;
111 }
112
113 if (!pid) {
114 return {};
115 }
116 }
117 return ChildProc{pid, pipe};
118 }
119
120 bool Server::isListening() const {
121 MemcachedPtr memc;
122
123 if (holds_alternative<string>(socket_or_port)) {
124 if (memcached_server_add_unix_socket(*memc, get<string>(socket_or_port).c_str())) {
125 return false;
126 }
127 } else {
128 if (memcached_server_add(*memc, "localhost", get<int>(socket_or_port))) {
129 return false;
130 }
131 }
132
133 if (sasl) {
134 memcached_behavior_set(*memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
135 memcached_set_sasl_auth_data(*memc, "memcached", "memcached");
136 }
137
138 Malloced stat(memcached_stat(*memc, nullptr, nullptr));
139 if (!*stat || !stat->pid || stat->pid == -1) {
140 return false;
141 }
142 if (stat->pid != pid) {
143 cerr << "Another server is listening on " << socket_or_port
144 << " (expected pid " << pid << " found pid " << stat->pid << ")\n";
145 return false;
146 }
147
148 return true;
149 }
150
151 bool Server::ensureListening() {
152 if (!start()) {
153 return false;
154 }
155 return Retry{[this] {
156 again:
157 start();
158 if (!isListening()) {
159 auto old = pid;
160 if (tryWait()) {
161 cerr << "Collected zombie " << *this << "(old pid=" << old << ")\n";
162 goto again;
163 }
164 }
165 return isListening();
166 }}();
167 }
168
169 bool Server::stop() {
170 if (!pid) {
171 return true;
172 }
173 if (signalled[SIGTERM]) {
174 return signal(SIGKILL);
175 } else {
176 return signal(SIGTERM);
177 }
178 }
179
180 bool Server::signal(int signo) {
181 if (!pid) {
182 return false;
183 }
184 signalled[signo] += 1;
185 return 0 <= kill(pid, signo);
186 }
187
188 bool Server::check() {
189 return signal(0);
190 }
191
192 bool Server::wait(int flags) {
193 if (pid && pid == waitpid(pid, &status, flags)) {
194 if (drain().length() &&
195 output.rfind("Signal handled: Terminated", 0) != 0) {
196 cerr << "Output of " << *this << ":\n";
197
198 istringstream iss{output};
199 string line;
200
201 while (getline(iss, line)) {
202 cerr << " " << line << "\n";
203 }
204
205 if (output.back() != '\n') {
206 cerr << endl;
207 }
208 output.clear();
209 }
210 pid = 0;
211 if (pipe != -1) {
212 close(pipe);
213 pipe = -1;
214 }
215 return true;
216 }
217 return false;
218 }
219
220 bool Server::tryWait() {
221 return wait(WNOHANG);
222 }
223
224 Server::Server(const Server &s) {
225 binary = s.binary;
226 args = s.args;
227 socket_or_port = s.socket_or_port;
228 }
229
230 Server &Server::operator=(const Server &s) {
231 binary = s.binary;
232 args = s.args;
233 socket_or_port = s.socket_or_port;
234 return *this;
235 }
236
237 pid_t Server::getPid() const {
238 return pid;
239 }
240
241 const string &Server::getBinary() const {
242 return binary;
243 }
244
245 const Server::argv_t &Server::getArgs() const {
246 return args;
247 }
248
249 const socket_or_port_t &Server::getSocketOrPort() const {
250 return socket_or_port;
251 }
252
253 int Server::getPipe() const {
254 return pipe;
255 }
256
257 string &Server::drain() {
258 if (pipe != -1) {
259 again:
260 char read_buf[1<<12];
261 auto read_len = read(pipe, read_buf, sizeof(read_buf));
262
263 if (read_len > 0) {
264 output.append(read_buf, read_len);
265 goto again;
266 }
267 if (read_len == -1) {
268 switch (errno) {
269 case EINTR:
270 goto again;
271 default:
272 perror("Server::drain read()");
273 [[fallthrough]];
274 case EAGAIN:
275 #if EWOULDBLOCK != EAGAIN
276 case EWOULDBLOCK:
277 #endif
278 break;
279 }
280 }
281 }
282 return output;
283 }