90f1b21756a5930299ba405a446af4d79c0d7498
[awesomized/libmemcached] / src / libmemcached / connect.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
18
19 #include <cassert>
20
21
22 static memcached_return_t set_hostinfo(memcached_instance_st *server) {
23 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
24 server->clear_addrinfo();
25
26 char str_port[MEMCACHED_NI_MAXSERV] = {0};
27 errno = 0;
28 int length = snprintf(str_port, MEMCACHED_NI_MAXSERV, "%u", uint32_t(server->port()));
29 if (length >= MEMCACHED_NI_MAXSERV or length <= 0 or errno) {
30 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
31 memcached_literal_param("snprintf(NI_MAXSERV)"));
32 }
33
34 struct addrinfo hints;
35 memset(&hints, 0, sizeof(struct addrinfo));
36
37 hints.ai_family = AF_UNSPEC;
38 if (memcached_is_udp(server->root)) {
39 hints.ai_protocol = IPPROTO_UDP;
40 hints.ai_socktype = SOCK_DGRAM;
41 } else {
42 hints.ai_socktype = SOCK_STREAM;
43 hints.ai_protocol = IPPROTO_TCP;
44 }
45
46 assert(server->address_info == NULL);
47 assert(server->address_info_next == NULL);
48 int errcode;
49 assert(server->hostname());
50 switch (errcode = getaddrinfo(server->hostname(), str_port, &hints, &server->address_info)) {
51 case 0:
52 server->address_info_next = server->address_info;
53 server->state = MEMCACHED_SERVER_STATE_ADDRINFO;
54 break;
55
56 case EAI_AGAIN:
57 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
58 memcached_string_make_from_cstr(gai_strerror(errcode)));
59
60 case EAI_SYSTEM:
61 server->clear_addrinfo();
62 return memcached_set_errno(*server, errno, MEMCACHED_AT,
63 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
64
65 case EAI_BADFLAGS:
66 server->clear_addrinfo();
67 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
68 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
69
70 case EAI_MEMORY:
71 server->clear_addrinfo();
72 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
73 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
74
75 default: {
76 server->clear_addrinfo();
77 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT,
78 memcached_string_make_from_cstr(gai_strerror(errcode)));
79 }
80 }
81
82 return MEMCACHED_SUCCESS;
83 }
84
85 static inline void set_socket_nonblocking(memcached_instance_st *server) {
86 #if defined(_WIN32)
87 u_long arg = 1;
88 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR) {
89 memcached_set_errno(*server, get_socket_errno(), NULL);
90 }
91 #else
92 int flags;
93
94 if (SOCK_NONBLOCK == 0) {
95 do {
96 flags = fcntl(server->fd, F_GETFL, 0);
97 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
98
99 if (flags == -1) {
100 memcached_set_errno(*server, errno, NULL);
101 } else if ((flags & O_NONBLOCK) == 0) {
102 int rval;
103
104 do {
105 rval = fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
106 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
107
108 if (rval == -1) {
109 memcached_set_errno(*server, errno, NULL);
110 }
111 }
112 }
113 #endif
114 }
115
116 static bool set_socket_options(memcached_instance_st *server) {
117 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
118
119 #ifdef HAVE_FCNTL
120 // If SOCK_CLOEXEC exists then we don't need to call the following
121 if (SOCK_CLOEXEC == 0) {
122 if (FD_CLOEXEC) {
123 int flags;
124 do {
125 flags = fcntl(server->fd, F_GETFD, 0);
126 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
127
128 if (flags != -1) {
129 int rval;
130 do {
131 rval = fcntl(server->fd, F_SETFD, flags | FD_CLOEXEC);
132 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
133 // we currently ignore the case where rval is -1
134 }
135 }
136 }
137 #endif
138
139 if (memcached_is_udp(server->root)) {
140 return true;
141 }
142
143 #ifdef HAVE_SO_SNDTIMEO
144 if (server->root->snd_timeout > 0) {
145 struct timeval waittime;
146
147 waittime.tv_sec = server->root->snd_timeout / 1000000;
148 waittime.tv_usec = server->root->snd_timeout % 1000000;
149
150 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &waittime,
151 (socklen_t) sizeof(struct timeval));
152 (void) error;
153 assert(error == 0);
154 }
155 #endif
156
157 #ifdef HAVE_SO_RCVTIMEO
158 if (server->root->rcv_timeout > 0) {
159 struct timeval waittime;
160
161 waittime.tv_sec = server->root->rcv_timeout / 1000000;
162 waittime.tv_usec = server->root->rcv_timeout % 1000000;
163
164 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &waittime,
165 (socklen_t) sizeof(struct timeval));
166 (void) (error);
167 assert(error == 0);
168 }
169 #endif
170
171 #if defined(_WIN32)
172 #else
173 # if defined(SO_NOSIGPIPE)
174 if (SO_NOSIGPIPE) {
175 int set = 1;
176 int error = setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *) &set, sizeof(int));
177
178 assert(error == 0);
179
180 // This is not considered a fatal error
181 if (error == -1) {
182 # if 0
183 perror("setsockopt(SO_NOSIGPIPE)");
184 # endif
185 }
186 }
187 # endif // SO_NOSIGPIPE
188 #endif // _WIN32
189
190 if (server->root->flags.no_block) {
191 struct linger linger;
192
193 linger.l_onoff = 1;
194 linger.l_linger = 0; /* By default on close() just drop the socket */
195 int error = setsockopt(server->fd, SOL_SOCKET, SO_LINGER, (char *) &linger,
196 (socklen_t) sizeof(struct linger));
197 (void) (error);
198 assert(error == 0);
199 }
200
201 if (TCP_NODELAY) {
202 if (server->root->flags.tcp_nodelay) {
203 int flag = 1;
204
205 int error =
206 setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, (socklen_t) sizeof(int));
207 (void) (error);
208 assert(error == 0);
209 }
210 }
211
212 if (server->root->flags.tcp_keepalive) {
213 int flag = 1;
214
215 int error =
216 setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, (socklen_t) sizeof(int));
217 (void) (error);
218 assert(error == 0);
219 }
220
221 if (TCP_KEEPIDLE) {
222 if (server->root->tcp_keepidle > 0) {
223 int error = setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
224 (char *) &server->root->tcp_keepidle, (socklen_t) sizeof(int));
225 (void) (error);
226 assert(error == 0);
227 }
228 }
229
230 if (server->root->send_size > 0) {
231 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF, (char *) &server->root->send_size,
232 (socklen_t) sizeof(int));
233 (void) (error);
234 assert(error == 0);
235 }
236
237 if (server->root->recv_size > 0) {
238 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF, (char *) &server->root->recv_size,
239 (socklen_t) sizeof(int));
240 (void) (error);
241 assert(error == 0);
242 }
243
244 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
245 set_socket_nonblocking(server);
246
247 return true;
248 }
249
250 static memcached_return_t unix_socket_connect(memcached_instance_st *server) {
251 #ifndef _WIN32
252 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
253
254 do {
255 int type = SOCK_STREAM;
256 if (SOCK_CLOEXEC) {
257 type |= SOCK_CLOEXEC;
258 }
259
260 if (SOCK_NONBLOCK) {
261 type |= SOCK_NONBLOCK;
262 }
263
264 if ((server->fd = socket(AF_UNIX, type, 0)) == -1) {
265 return memcached_set_errno(*server, errno, NULL);
266 }
267
268 struct sockaddr_un servAddr;
269
270 memset(&servAddr, 0, sizeof(struct sockaddr_un));
271 servAddr.sun_family = AF_UNIX;
272 if (strlen(server->hostname()) >= sizeof(servAddr.sun_path)) {
273 return memcached_set_error(*server, MEMCACHED_FAIL_UNIX_SOCKET, MEMCACHED_AT);
274 }
275 strncpy(servAddr.sun_path, server->hostname(),
276 sizeof(servAddr.sun_path) - 1); /* Copy filename */
277
278 if (connect(server->fd, (struct sockaddr *) &servAddr, sizeof(servAddr)) == -1) {
279 switch (errno) {
280 case EINPROGRESS:
281 case EALREADY:
282 case EAGAIN:
283 server->events(POLLOUT);
284 break;
285
286 case EINTR:
287 server->reset_socket();
288 continue;
289
290 case EISCONN: /* We were spinning waiting on connect */
291 {
292 assert(0); // Programmer error
293 server->reset_socket();
294 continue;
295 }
296
297 default:
298 WATCHPOINT_ERRNO(errno);
299 server->reset_socket();
300 return memcached_set_errno(*server, errno, MEMCACHED_AT);
301 }
302 }
303 } while (0);
304 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
305
306 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
307
308 return MEMCACHED_SUCCESS;
309 #else
310 (void) server;
311 return MEMCACHED_NOT_SUPPORTED;
312 #endif
313 }
314
315 static memcached_return_t network_connect(memcached_instance_st *server) {
316 bool timeout_error_occured = false;
317
318 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
319 WATCHPOINT_ASSERT(server->cursor_active_ == 0);
320
321 /*
322 We want to check both of these because if address_info_next has been fully tried, we want to do
323 a new lookup to make sure we have picked up on any new DNS information.
324 */
325 if (server->address_info == NULL or server->address_info_next == NULL) {
326 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
327 server->address_info_next = NULL;
328 memcached_return_t rc = set_hostinfo(server);
329
330 if (memcached_failed(rc)) {
331 return rc;
332 }
333 }
334
335 assert(server->address_info_next);
336 assert(server->address_info);
337
338 /* Create the socket */
339 while (server->address_info_next and server->fd == INVALID_SOCKET) {
340 int type = server->address_info_next->ai_socktype;
341 if (SOCK_CLOEXEC) {
342 type |= SOCK_CLOEXEC;
343 }
344
345 if (SOCK_NONBLOCK) {
346 type |= SOCK_NONBLOCK;
347 }
348
349 server->fd =
350 socket(server->address_info_next->ai_family, type, server->address_info_next->ai_protocol);
351
352 if (int(server->fd) == SOCKET_ERROR) {
353 return memcached_set_errno(*server, get_socket_errno(), NULL);
354 }
355
356 if (set_socket_options(server) == false) {
357 server->reset_socket();
358 return MEMCACHED_CONNECTION_FAILURE;
359 }
360
361 /* connect to server */
362 if ((connect(server->fd, server->address_info_next->ai_addr,
363 server->address_info_next->ai_addrlen)
364 != SOCKET_ERROR))
365 {
366 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
367 return MEMCACHED_SUCCESS;
368 }
369
370 /* An error occurred */
371 int local_error = get_socket_errno();
372 switch (local_error) {
373 case ETIMEDOUT:
374 timeout_error_occured = true;
375 break;
376
377 #if EWOULDBLOCK != EAGAIN
378 case EWOULDBLOCK:
379 #endif
380 case EAGAIN:
381 case EINPROGRESS: // nonblocking mode - first return
382 case EALREADY: // nonblocking mode - subsequent returns
383 {
384 server->events(POLLOUT);
385 server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS;
386 memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error);
387
388 if (memcached_success(rc)) {
389 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
390 return MEMCACHED_SUCCESS;
391 }
392
393 // A timeout here is treated as an error, we will not retry
394 if (rc == MEMCACHED_TIMEOUT) {
395 timeout_error_occured = true;
396 }
397 } break;
398
399 case EISCONN: // we are connected :-)
400 WATCHPOINT_ASSERT(0); // This is a programmer's error
401 break;
402
403 case EINTR: // Special case, we retry ai_addr
404 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
405 server->reset_socket();
406 continue;
407
408 case ECONNREFUSED:
409 // Probably not running service
410
411 default:
412 memcached_set_errno(*server, local_error, MEMCACHED_AT);
413 break;
414 }
415
416 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
417 server->reset_socket();
418 server->address_info_next = server->address_info_next->ai_next;
419 }
420
421 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
422
423 if (timeout_error_occured) {
424 server->reset_socket();
425 }
426
427 WATCHPOINT_STRING("Never got a good file descriptor");
428
429 if (memcached_has_current_error(*server)) {
430 return memcached_instance_error_return(server);
431 }
432
433 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS) {
434 return memcached_set_error(
435 *server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
436 memcached_literal_param(
437 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
438 }
439
440 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE,
441 MEMCACHED_AT); /* The last error should be from connect() */
442 }
443
444 /*
445 backoff_handling()
446
447 Based on time/failure count fail the connect without trying. This prevents waiting in a state
448 where we get caught spending cycles just waiting.
449 */
450 static memcached_return_t backoff_handling(memcached_instance_st *server, bool &in_timeout) {
451 struct timeval curr_time;
452 bool _gettime_success = (gettimeofday(&curr_time, NULL) == 0);
453
454 /*
455 If we hit server_failure_limit then something is completely wrong about the server.
456
457 1) If autoeject is enabled we do that.
458 2) If not? We go into timeout again, there is much else to do :(
459 */
460 if (server->server_failure_counter >= server->root->server_failure_limit) {
461 /*
462 We just auto_eject if we hit this point
463 */
464 if (_is_auto_eject_host(server->root)) {
465 set_last_disconnected_host(server);
466
467 // Retry dead servers if requested
468 if (_gettime_success and server->root->dead_timeout > 0) {
469 server->next_retry = curr_time.tv_sec + server->root->dead_timeout;
470
471 // We only retry dead servers once before assuming failure again
472 server->server_failure_counter = server->root->server_failure_limit - 1;
473 }
474
475 memcached_return_t rc;
476 if (memcached_failed(rc = run_distribution((memcached_st *) server->root))) {
477 return memcached_set_error(
478 *server, rc, MEMCACHED_AT,
479 memcached_literal_param("Backoff handling failed during run_distribution"));
480 }
481
482 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
483 }
484
485 server->state = MEMCACHED_SERVER_STATE_IN_TIMEOUT;
486
487 // Sanity check/setting
488 if (server->next_retry == 0) {
489 server->next_retry = 1;
490 }
491 }
492
493 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT) {
494 /*
495 If next_retry is less then our current time, then we reset and try everything again.
496 */
497 if (_gettime_success and server->next_retry < curr_time.tv_sec) {
498 server->state = MEMCACHED_SERVER_STATE_NEW;
499 server->server_timeout_counter = 0;
500 } else {
501 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
502 }
503
504 in_timeout = true;
505 }
506
507 return MEMCACHED_SUCCESS;
508 }
509
510 static memcached_return_t _memcached_connect(memcached_instance_st *server,
511 const bool set_last_disconnected) {
512 assert(server);
513 if (server->fd != INVALID_SOCKET) {
514 return MEMCACHED_SUCCESS;
515 }
516
517 LIBMEMCACHED_MEMCACHED_CONNECT_START();
518
519 bool in_timeout = false;
520 memcached_return_t rc;
521 if (memcached_failed(rc = backoff_handling(server, in_timeout))) {
522 set_last_disconnected_host(server);
523 return rc;
524 }
525
526 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks
527 and memcached_is_udp(server->root))
528 {
529 return memcached_set_error(
530 *server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT,
531 memcached_literal_param("SASL is not supported for UDP connections"));
532 }
533
534 if (server->hostname()[0] == '/') {
535 server->type = MEMCACHED_CONNECTION_UNIX_SOCKET;
536 }
537
538 /* We need to clean up the multi startup piece */
539 switch (server->type) {
540 case MEMCACHED_CONNECTION_UDP:
541 case MEMCACHED_CONNECTION_TCP:
542 rc = network_connect(server);
543
544 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
545 if (LIBMEMCACHED_WITH_SASL_SUPPORT) {
546 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks) {
547 rc = memcached_sasl_authenticate_connection(server);
548 if (memcached_failed(rc) and server->fd != INVALID_SOCKET) {
549 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
550 server->reset_socket();
551 }
552 }
553 }
554 #endif
555 break;
556
557 case MEMCACHED_CONNECTION_UNIX_SOCKET:
558 rc = unix_socket_connect(server);
559 break;
560 }
561
562 if (memcached_success(rc)) {
563 server->mark_server_as_clean();
564 memcached_version_instance(server);
565 return rc;
566 } else if (set_last_disconnected) {
567 set_last_disconnected_host(server);
568 if (memcached_has_current_error(*server)) {
569 memcached_mark_server_for_timeout(server);
570 assert(memcached_failed(memcached_instance_error_return(server)));
571 } else {
572 memcached_set_error(*server, rc, MEMCACHED_AT);
573 memcached_mark_server_for_timeout(server);
574 }
575
576 LIBMEMCACHED_MEMCACHED_CONNECT_END();
577
578 if (in_timeout) {
579 char buffer[1024];
580 int snprintf_length =
581 snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname(), int(server->port()));
582 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT,
583 buffer, snprintf_length);
584 }
585 }
586
587 return rc;
588 }
589
590 memcached_return_t memcached_connect(memcached_instance_st *server) {
591 return _memcached_connect(server, true);
592 }