739e349d1be6535d14c4f59fd3dfe0e007f8d6a1
[awesomized/libmemcached] / src / libmemcached / connect.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
18
19 #include <cassert>
20
21 static memcached_return_t set_hostinfo(memcached_instance_st *server) {
22 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
23 assert(server->hostname());
24
25 server->clear_addrinfo();
26
27 char str_host[MEMCACHED_NI_MAXHOST] = {0}, str_port[MEMCACHED_NI_MAXSERV] = {0};
28 errno = 0;
29
30 auto length = snprintf(str_port, MEMCACHED_NI_MAXSERV, "%u", uint32_t(server->port()));
31 if (length <= 0 or errno) {
32 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
33 memcached_literal_param("snprintf(NI_MAXSERV)"));
34 }
35
36 struct addrinfo hints{};
37 hints.ai_family = AF_UNSPEC;
38 if (memcached_is_udp(server->root)) {
39 hints.ai_protocol = IPPROTO_UDP;
40 hints.ai_socktype = SOCK_DGRAM;
41 } else {
42 hints.ai_protocol = IPPROTO_TCP;
43 hints.ai_socktype = SOCK_STREAM;
44 }
45
46 auto hostname = server->hostname();
47 if (*hostname == '[') {
48 auto closing_bracket = &hostname[strlen(hostname) - 1];
49 if (*closing_bracket == ']') {
50 auto host_len = closing_bracket - hostname - 1;
51 if (host_len < MEMCACHED_NI_MAXHOST) {
52 hostname = strncpy(str_host, hostname + 1, host_len);
53 }
54 }
55 }
56
57 auto errcode = getaddrinfo(hostname, str_port, &hints, &server->address_info);
58 switch (errcode) {
59 case 0:
60 server->address_info_next = server->address_info;
61 server->state = MEMCACHED_SERVER_STATE_ADDRINFO;
62 break;
63
64 case EAI_AGAIN:
65 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
66 memcached_string_make_from_cstr(gai_strerror(errcode)));
67
68 case EAI_SYSTEM:
69 server->clear_addrinfo();
70 return memcached_set_errno(*server, errno, MEMCACHED_AT,
71 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
72
73 case EAI_BADFLAGS:
74 server->clear_addrinfo();
75 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
76 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
77
78 case EAI_MEMORY:
79 server->clear_addrinfo();
80 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
81 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
82
83 default: {
84 server->clear_addrinfo();
85 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT,
86 memcached_string_make_from_cstr(gai_strerror(errcode)));
87 }
88 }
89
90 return MEMCACHED_SUCCESS;
91 }
92
93 static inline void set_socket_nonblocking(memcached_instance_st *server) {
94 #if defined(_WIN32)
95 u_long arg = 1;
96 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR) {
97 memcached_set_errno(*server, get_socket_errno(), NULL);
98 }
99 #else
100 int flags;
101
102 if (SOCK_NONBLOCK == 0) {
103 do {
104 flags = fcntl(server->fd, F_GETFL, 0);
105 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
106
107 if (flags == -1) {
108 memcached_set_errno(*server, errno, NULL);
109 } else if ((flags & O_NONBLOCK) == 0) {
110 int rval;
111
112 do {
113 rval = fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
114 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
115
116 if (rval == -1) {
117 memcached_set_errno(*server, errno, NULL);
118 }
119 }
120 }
121 #endif
122 }
123
124 static bool set_socket_options(memcached_instance_st *server) {
125 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
126
127 #ifdef HAVE_FCNTL
128 // If SOCK_CLOEXEC exists then we don't need to call the following
129 if (SOCK_CLOEXEC == 0) {
130 if (FD_CLOEXEC) {
131 int flags;
132 do {
133 flags = fcntl(server->fd, F_GETFD, 0);
134 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
135
136 if (flags != -1) {
137 int rval;
138 do {
139 rval = fcntl(server->fd, F_SETFD, flags | FD_CLOEXEC);
140 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
141 // we currently ignore the case where rval is -1
142 }
143 }
144 }
145 #endif
146
147 if (memcached_is_udp(server->root)) {
148 return true;
149 }
150
151 #ifdef HAVE_SO_SNDTIMEO
152 if (server->root->snd_timeout > 0) {
153 struct timeval waittime;
154
155 waittime.tv_sec = server->root->snd_timeout / 1000000;
156 waittime.tv_usec = server->root->snd_timeout % 1000000;
157
158 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &waittime,
159 (socklen_t) sizeof(struct timeval));
160 (void) error;
161 assert(error == 0);
162 }
163 #endif
164
165 #ifdef HAVE_SO_RCVTIMEO
166 if (server->root->rcv_timeout > 0) {
167 struct timeval waittime;
168
169 waittime.tv_sec = server->root->rcv_timeout / 1000000;
170 waittime.tv_usec = server->root->rcv_timeout % 1000000;
171
172 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &waittime,
173 (socklen_t) sizeof(struct timeval));
174 (void) (error);
175 assert(error == 0);
176 }
177 #endif
178
179 #if defined(_WIN32)
180 #else
181 # if defined(SO_NOSIGPIPE)
182 if (SO_NOSIGPIPE) {
183 int set = 1;
184 int error = setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *) &set, sizeof(int));
185
186 assert(error == 0);
187
188 // This is not considered a fatal error
189 if (error == -1) {
190 # if 0
191 perror("setsockopt(SO_NOSIGPIPE)");
192 # endif
193 }
194 }
195 # endif // SO_NOSIGPIPE
196 #endif // _WIN32
197
198 if (server->root->flags.no_block) {
199 struct linger linger;
200
201 linger.l_onoff = 1;
202 linger.l_linger = 0; /* By default on close() just drop the socket */
203 int error = setsockopt(server->fd, SOL_SOCKET, SO_LINGER, (char *) &linger,
204 (socklen_t) sizeof(struct linger));
205 (void) (error);
206 assert(error == 0);
207 }
208
209 if (TCP_NODELAY) {
210 if (server->root->flags.tcp_nodelay) {
211 int flag = 1;
212
213 int error =
214 setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, (socklen_t) sizeof(int));
215 (void) (error);
216 assert(error == 0);
217 }
218 }
219
220 if (server->root->flags.tcp_keepalive) {
221 int flag = 1;
222
223 int error =
224 setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, (socklen_t) sizeof(int));
225 (void) (error);
226 assert(error == 0);
227 }
228
229 if (TCP_KEEPIDLE) {
230 if (server->root->tcp_keepidle > 0) {
231 int error = setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
232 (char *) &server->root->tcp_keepidle, (socklen_t) sizeof(int));
233 (void) (error);
234 assert(error == 0);
235 }
236 }
237
238 if (server->root->send_size > 0) {
239 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF, (char *) &server->root->send_size,
240 (socklen_t) sizeof(int));
241 (void) (error);
242 assert(error == 0);
243 }
244
245 if (server->root->recv_size > 0) {
246 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF, (char *) &server->root->recv_size,
247 (socklen_t) sizeof(int));
248 (void) (error);
249 assert(error == 0);
250 }
251
252 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
253 set_socket_nonblocking(server);
254
255 return true;
256 }
257
258 static memcached_return_t unix_socket_connect(memcached_instance_st *server) {
259 #ifndef _WIN32
260 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
261
262 do {
263 int type = SOCK_STREAM;
264 if (SOCK_CLOEXEC) {
265 type |= SOCK_CLOEXEC;
266 }
267
268 if (SOCK_NONBLOCK) {
269 type |= SOCK_NONBLOCK;
270 }
271
272 if ((server->fd = socket(AF_UNIX, type, 0)) == -1) {
273 return memcached_set_errno(*server, errno, NULL);
274 }
275
276 struct sockaddr_un servAddr;
277
278 memset(&servAddr, 0, sizeof(struct sockaddr_un));
279 servAddr.sun_family = AF_UNIX;
280 if (strlen(server->hostname()) >= sizeof(servAddr.sun_path)) {
281 return memcached_set_error(*server, MEMCACHED_FAIL_UNIX_SOCKET, MEMCACHED_AT);
282 }
283 strncpy(servAddr.sun_path, server->hostname(),
284 sizeof(servAddr.sun_path) - 1); /* Copy filename */
285
286 if (connect(server->fd, (struct sockaddr *) &servAddr, sizeof(servAddr)) == -1) {
287 switch (errno) {
288 case EINPROGRESS:
289 case EALREADY:
290 case EAGAIN:
291 server->events(POLLOUT);
292 break;
293
294 case EINTR:
295 server->reset_socket();
296 continue;
297
298 case EISCONN: /* We were spinning waiting on connect */
299 {
300 assert(0); // Programmer error
301 server->reset_socket();
302 continue;
303 }
304
305 default:
306 WATCHPOINT_ERRNO(errno);
307 server->reset_socket();
308 return memcached_set_errno(*server, errno, MEMCACHED_AT);
309 }
310 }
311 } while (0);
312 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
313
314 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
315
316 return MEMCACHED_SUCCESS;
317 #else
318 (void) server;
319 return MEMCACHED_NOT_SUPPORTED;
320 #endif
321 }
322
323 static memcached_return_t network_connect(memcached_instance_st *server) {
324 bool timeout_error_occured = false;
325
326 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
327 WATCHPOINT_ASSERT(server->cursor_active_ == 0);
328
329 /*
330 We want to check both of these because if address_info_next has been fully tried, we want to do
331 a new lookup to make sure we have picked up on any new DNS information.
332 */
333 if (server->address_info == NULL or server->address_info_next == NULL) {
334 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
335 server->address_info_next = NULL;
336 memcached_return_t rc = set_hostinfo(server);
337
338 if (memcached_failed(rc)) {
339 return rc;
340 }
341 }
342
343 assert(server->address_info_next);
344 assert(server->address_info);
345
346 /* Create the socket */
347 while (server->address_info_next and server->fd == INVALID_SOCKET) {
348 int type = server->address_info_next->ai_socktype;
349 if (SOCK_CLOEXEC) {
350 type |= SOCK_CLOEXEC;
351 }
352
353 if (SOCK_NONBLOCK) {
354 type |= SOCK_NONBLOCK;
355 }
356
357 server->fd =
358 socket(server->address_info_next->ai_family, type, server->address_info_next->ai_protocol);
359
360 if (int(server->fd) == SOCKET_ERROR) {
361 return memcached_set_errno(*server, get_socket_errno(), NULL);
362 }
363
364 if (set_socket_options(server) == false) {
365 server->reset_socket();
366 return MEMCACHED_CONNECTION_FAILURE;
367 }
368
369 /* connect to server */
370 if ((connect(server->fd, server->address_info_next->ai_addr,
371 server->address_info_next->ai_addrlen)
372 != SOCKET_ERROR))
373 {
374 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
375 return MEMCACHED_SUCCESS;
376 }
377
378 /* An error occurred */
379 int local_error = get_socket_errno();
380 switch (local_error) {
381 case ETIMEDOUT:
382 timeout_error_occured = true;
383 break;
384
385 #if EWOULDBLOCK != EAGAIN
386 case EWOULDBLOCK:
387 #endif
388 case EAGAIN:
389 case EINPROGRESS: // nonblocking mode - first return
390 case EALREADY: // nonblocking mode - subsequent returns
391 {
392 server->events(POLLOUT);
393 server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS;
394 memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error);
395
396 if (memcached_success(rc)) {
397 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
398 return MEMCACHED_SUCCESS;
399 }
400
401 // A timeout here is treated as an error, we will not retry
402 if (rc == MEMCACHED_TIMEOUT) {
403 timeout_error_occured = true;
404 }
405 } break;
406
407 case EISCONN: // we are connected :-)
408 WATCHPOINT_ASSERT(0); // This is a programmer's error
409 break;
410
411 case EINTR: // Special case, we retry ai_addr
412 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
413 server->reset_socket();
414 continue;
415
416 case ECONNREFUSED:
417 // Probably not running service
418
419 default:
420 memcached_set_errno(*server, local_error, MEMCACHED_AT);
421 break;
422 }
423
424 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
425 server->reset_socket();
426 server->address_info_next = server->address_info_next->ai_next;
427 }
428
429 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
430
431 if (timeout_error_occured) {
432 server->reset_socket();
433 }
434
435 WATCHPOINT_STRING("Never got a good file descriptor");
436
437 if (memcached_has_current_error(*server)) {
438 return memcached_instance_error_return(server);
439 }
440
441 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS) {
442 return memcached_set_error(
443 *server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
444 memcached_literal_param(
445 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
446 }
447
448 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE,
449 MEMCACHED_AT); /* The last error should be from connect() */
450 }
451
452 /*
453 backoff_handling()
454
455 Based on time/failure count fail the connect without trying. This prevents waiting in a state
456 where we get caught spending cycles just waiting.
457 */
458 static memcached_return_t backoff_handling(memcached_instance_st *server, bool &in_timeout) {
459 struct timeval curr_time;
460 bool _gettime_success = (gettimeofday(&curr_time, NULL) == 0);
461
462 /*
463 If we hit server_failure_limit then something is completely wrong about the server.
464
465 1) If autoeject is enabled we do that.
466 2) If not? We go into timeout again, there is much else to do :(
467 */
468 if (server->server_failure_counter >= server->root->server_failure_limit) {
469 /*
470 We just auto_eject if we hit this point
471 */
472 if (_is_auto_eject_host(server->root)) {
473 set_last_disconnected_host(server);
474
475 // Retry dead servers if requested
476 if (_gettime_success and server->root->dead_timeout > 0) {
477 server->next_retry = curr_time.tv_sec + server->root->dead_timeout;
478
479 // We only retry dead servers once before assuming failure again
480 server->server_failure_counter = server->root->server_failure_limit - 1;
481 }
482
483 memcached_return_t rc;
484 if (memcached_failed(rc = run_distribution((memcached_st *) server->root))) {
485 return memcached_set_error(
486 *server, rc, MEMCACHED_AT,
487 memcached_literal_param("Backoff handling failed during run_distribution"));
488 }
489
490 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
491 }
492
493 server->state = MEMCACHED_SERVER_STATE_IN_TIMEOUT;
494
495 // Sanity check/setting
496 if (server->next_retry == 0) {
497 server->next_retry = 1;
498 }
499 }
500
501 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT) {
502 /*
503 If next_retry is less then our current time, then we reset and try everything again.
504 */
505 if (_gettime_success and server->next_retry < curr_time.tv_sec) {
506 server->state = MEMCACHED_SERVER_STATE_NEW;
507 server->server_timeout_counter = 0;
508 } else {
509 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
510 }
511
512 in_timeout = true;
513 }
514
515 return MEMCACHED_SUCCESS;
516 }
517
518 static memcached_return_t _memcached_connect(memcached_instance_st *server,
519 const bool set_last_disconnected) {
520 assert(server);
521 if (server->fd != INVALID_SOCKET) {
522 return MEMCACHED_SUCCESS;
523 }
524
525 LIBMEMCACHED_MEMCACHED_CONNECT_START();
526
527 bool in_timeout = false;
528 memcached_return_t rc;
529 if (memcached_failed(rc = backoff_handling(server, in_timeout))) {
530 set_last_disconnected_host(server);
531 return rc;
532 }
533
534 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks
535 and memcached_is_udp(server->root))
536 {
537 return memcached_set_error(
538 *server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT,
539 memcached_literal_param("SASL is not supported for UDP connections"));
540 }
541
542 if (server->hostname()[0] == '/') {
543 server->type = MEMCACHED_CONNECTION_UNIX_SOCKET;
544 }
545
546 /* We need to clean up the multi startup piece */
547 switch (server->type) {
548 case MEMCACHED_CONNECTION_UDP:
549 case MEMCACHED_CONNECTION_TCP:
550 rc = network_connect(server);
551
552 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
553 if (LIBMEMCACHED_WITH_SASL_SUPPORT) {
554 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks) {
555 rc = memcached_sasl_authenticate_connection(server);
556 if (memcached_failed(rc) and server->fd != INVALID_SOCKET) {
557 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
558 server->reset_socket();
559 }
560 }
561 }
562 #endif
563 break;
564
565 case MEMCACHED_CONNECTION_UNIX_SOCKET:
566 rc = unix_socket_connect(server);
567 break;
568 }
569
570 if (memcached_success(rc)) {
571 server->mark_server_as_clean();
572 memcached_version_instance(server);
573 return rc;
574 } else if (set_last_disconnected) {
575 set_last_disconnected_host(server);
576 if (memcached_has_current_error(*server)) {
577 memcached_mark_server_for_timeout(server);
578 assert(memcached_failed(memcached_instance_error_return(server)));
579 } else {
580 memcached_set_error(*server, rc, MEMCACHED_AT);
581 memcached_mark_server_for_timeout(server);
582 }
583
584 LIBMEMCACHED_MEMCACHED_CONNECT_END();
585
586 if (in_timeout) {
587 char buffer[1024];
588 int snprintf_length =
589 snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname(), int(server->port()));
590 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT,
591 buffer, snprintf_length);
592 }
593 }
594
595 return rc;
596 }
597
598 memcached_return_t memcached_connect(memcached_instance_st *server) {
599 return _memcached_connect(server, true);
600 }