WIP
[m6w6/libmemcached] / test / lib / Server.cpp
1 #include "Server.hpp"
2 #include "Retry.hpp"
3 #include "ForkAndExec.hpp"
4
5 #include <sys/wait.h>
6 #if HAVE_UNISTD_H
7 # include <unistd.h>
8 #endif
9
10 Server::Server(string binary_, Server::argv_t args_)
11 : binary{move(binary_)}
12 , args{move(args_)}
13 {}
14
15 Server::~Server() {
16 stop();
17 wait();
18 if (pipe != -1) {
19 close(pipe);
20 }
21 if (holds_alternative<string>(socket_or_port)) {
22 unlink(get<string>(socket_or_port).c_str());
23 }
24 }
25
26 static inline string extractArg(const Server::arg_t &arg_cont, const string &func_arg) {
27 if (holds_alternative<string>(arg_cont)) {
28 return get<string>(arg_cont);
29 } else {
30 return get<Server::arg_func_t>(arg_cont)(func_arg);
31 }
32 }
33
34 static inline void pushArg(vector<char *> &arr, const string &arg) {
35 auto len = arg.size();
36 auto str = arg.data(), end = str + len + 1;
37 auto ptr = new char[len + 1];
38 copy(str, end, ptr);
39 arr.push_back(ptr);
40 }
41
42 optional<string> Server::handleArg(vector<char *> &arr, const string &arg, const arg_func_t &next_arg) {
43 pushArg(arr, arg);
44 if (arg == "-U" || arg == "--udp-port") {
45 auto port = next_arg(arg);
46 pushArg(arr, port);
47 pushArg(arr, "-p");
48 pushArg(arr, port);
49 socket_or_port = stoi(port);
50 return port;
51 } else if (arg == "-p" || arg == "--port") {
52 auto port = next_arg(arg);
53 pushArg(arr, port);
54 socket_or_port = stoi(port);
55 return port;
56 } else if (arg == "-s" || arg == "--unix-socket") {
57 auto sock = next_arg(arg);
58 pushArg(arr, sock);
59 socket_or_port = sock;
60 return sock;
61 } else if (arg == "-S" || arg == "--enable-sasl") {
62 sasl = true;
63 }
64 return {};
65 }
66
67 [[nodiscard]]
68 vector<char *> Server::createArgv() {
69 vector<char *> arr;
70
71 pushArg(arr, binary);
72 pushArg(arr, "-u");
73 pushArg(arr, "nobody");
74
75 for (auto it = args.cbegin(); it != args.cend(); ++it) {
76 if (holds_alternative<arg_t>(*it)) {
77 // a single argument
78 auto arg = extractArg(get<arg_t>(*it), binary);
79 handleArg(arr, arg, [&it](const string &arg_) {
80 return extractArg(get<arg_t>(*++it), arg_);
81 });
82 } else {
83 // an argument pair
84 auto &[one, two] = get<arg_pair_t>(*it);
85 auto arg_one = extractArg(one, binary);
86 auto arg_two = extractArg(two, arg_one);
87
88 auto next = handleArg(arr, arg_one, [&arg_two](const string &) {
89 return arg_two;
90 });
91
92 if (!next.has_value()) {
93 pushArg(arr, arg_two);
94 }
95 }
96 }
97
98 arr.push_back(nullptr);
99
100 return arr;
101 }
102
103 optional<Server::ChildProc> Server::start() {
104 if (!pid) {
105 auto argv = createArgv();
106 ForkAndExec fork_and_exec{binary.c_str(), argv.data()};
107
108 pipe = fork_and_exec.createPipe();
109 pid = fork_and_exec();
110
111 for (auto argp : argv) {
112 delete [] argp;
113 }
114
115 if (!pid) {
116 return {};
117 }
118 }
119 return ChildProc{pid, pipe};
120 }
121
122 bool Server::isListening() const {
123 MemcachedPtr memc;
124
125 if (holds_alternative<string>(socket_or_port)) {
126 if (memcached_server_add_unix_socket(*memc, get<string>(socket_or_port).c_str())) {
127 return false;
128 }
129 } else {
130 if (memcached_server_add(*memc, "localhost", get<int>(socket_or_port))) {
131 return false;
132 }
133 }
134
135 if (sasl) {
136 memcached_behavior_set(*memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
137 memcached_set_sasl_auth_data(*memc, "memcached", "memcached");
138 }
139
140 Malloced stat(memcached_stat(*memc, nullptr, nullptr));
141 if (!*stat || !stat->pid || stat->pid == -1) {
142 return false;
143 }
144 if (stat->pid != pid) {
145 cerr << "Another server is listening on " << socket_or_port
146 << " (expected pid " << pid << " found pid " << stat->pid << ")\n";
147 return false;
148 }
149
150 return true;
151 }
152
153 bool Server::ensureListening() {
154 if (!start()) {
155 return false;
156 }
157 return Retry{[this] {
158 again:
159 start();
160 if (!isListening()) {
161 auto old = pid;
162 if (tryWait()) {
163 cerr << "Collected zombie " << *this << "(old pid=" << old << ")\n";
164 goto again;
165 }
166 }
167 return isListening();
168 }}();
169 }
170
171 bool Server::stop() {
172 if (!pid) {
173 return true;
174 }
175 if (signalled[SIGTERM]) {
176 return signal(SIGKILL);
177 } else {
178 return signal(SIGTERM);
179 }
180 }
181
182 bool Server::signal(int signo) {
183 if (!pid) {
184 return false;
185 }
186 signalled[signo] += 1;
187 return 0 <= kill(pid, signo);
188 }
189
190 bool Server::check() {
191 return signal(0);
192 }
193
194 bool Server::wait(int flags) {
195 if (pid && pid == waitpid(pid, &status, flags)) {
196 if (drain().length() &&
197 output.rfind("Signal handled: Terminated", 0) != 0) {
198 cerr << "Output of " << *this << ":\n";
199
200 istringstream iss{output};
201 string line;
202
203 while (getline(iss, line)) {
204 cerr << " " << line << "\n";
205 }
206
207 if (output.back() != '\n') {
208 cerr << endl;
209 }
210 output.clear();
211 }
212 pid = 0;
213 if (pipe != -1) {
214 close(pipe);
215 pipe = -1;
216 }
217 return true;
218 }
219 return false;
220 }
221
222 bool Server::tryWait() {
223 return wait(WNOHANG);
224 }
225
226 Server::Server(const Server &s) {
227 binary = s.binary;
228 args = s.args;
229 socket_or_port = s.socket_or_port;
230 }
231
232 Server &Server::operator=(const Server &s) {
233 binary = s.binary;
234 args = s.args;
235 socket_or_port = s.socket_or_port;
236 return *this;
237 }
238
239 pid_t Server::getPid() const {
240 return pid;
241 }
242
243 const string &Server::getBinary() const {
244 return binary;
245 }
246
247 const Server::argv_t &Server::getArgs() const {
248 return args;
249 }
250
251 const socket_or_port_t &Server::getSocketOrPort() const {
252 return socket_or_port;
253 }
254
255 int Server::getPipe() const {
256 return pipe;
257 }
258
259 string &Server::drain() {
260 if (pipe != -1) {
261 again:
262 char read_buf[1<<12];
263 auto read_len = read(pipe, read_buf, sizeof(read_buf));
264
265 if (read_len > 0) {
266 output.append(read_buf, read_len);
267 goto again;
268 }
269 if (read_len == -1) {
270 switch (errno) {
271 case EINTR:
272 goto again;
273 default:
274 perror("Server::drain read()");
275 [[fallthrough]];
276 case EAGAIN:
277 #if EWOULDBLOCK != EAGAIN
278 case EWOULDBLOCK:
279 #endif
280 break;
281 }
282 }
283 }
284 return output;
285 }