e6db6fd64da5271376b47a4c0d5581450f98041a
[m6w6/libmemcached] / src / libmemcached / connect.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
18
19 #include <cassert>
20
21
22 static memcached_return_t set_hostinfo(memcached_instance_st *server) {
23 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
24 server->clear_addrinfo();
25
26 char str_port[MEMCACHED_NI_MAXSERV] = {0};
27 errno = 0;
28 int length = snprintf(str_port, MEMCACHED_NI_MAXSERV, "%u", uint32_t(server->port()));
29 if (length >= MEMCACHED_NI_MAXSERV or length <= 0 or errno) {
30 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
31 memcached_literal_param("snprintf(NI_MAXSERV)"));
32 }
33
34 struct addrinfo hints;
35 memset(&hints, 0, sizeof(struct addrinfo));
36
37 hints.ai_family = AF_UNSPEC;
38 if (memcached_is_udp(server->root)) {
39 hints.ai_protocol = IPPROTO_UDP;
40 hints.ai_socktype = SOCK_DGRAM;
41 } else {
42 hints.ai_socktype = SOCK_STREAM;
43 hints.ai_protocol = IPPROTO_TCP;
44 }
45
46 assert(server->address_info == NULL);
47 assert(server->address_info_next == NULL);
48 int errcode;
49 char hostname[MEMCACHED_NI_MAXHOST];
50 const char *addr;
51 char *p;
52
53 assert(server->hostname());
54 // drop [] from address, commonly used for IPv6
55 addr = server->hostname();
56 if (*addr == '[') {
57 strcpy(hostname, addr +1);
58 p = strchr(hostname, ']');
59 if (p) {
60 *p = 0;
61 addr = hostname;
62 }
63 }
64 switch (errcode = getaddrinfo(addr, str_port, &hints, &server->address_info)) {
65 case 0:
66 server->address_info_next = server->address_info;
67 server->state = MEMCACHED_SERVER_STATE_ADDRINFO;
68 break;
69
70 case EAI_AGAIN:
71 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
72 memcached_string_make_from_cstr(gai_strerror(errcode)));
73
74 case EAI_SYSTEM:
75 server->clear_addrinfo();
76 return memcached_set_errno(*server, errno, MEMCACHED_AT,
77 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
78
79 case EAI_BADFLAGS:
80 server->clear_addrinfo();
81 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
82 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
83
84 case EAI_MEMORY:
85 server->clear_addrinfo();
86 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
87 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
88
89 default: {
90 server->clear_addrinfo();
91 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT,
92 memcached_string_make_from_cstr(gai_strerror(errcode)));
93 }
94 }
95
96 return MEMCACHED_SUCCESS;
97 }
98
99 static inline void set_socket_nonblocking(memcached_instance_st *server) {
100 #if defined(_WIN32)
101 u_long arg = 1;
102 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR) {
103 memcached_set_errno(*server, get_socket_errno(), NULL);
104 }
105 #else
106 int flags;
107
108 if (SOCK_NONBLOCK == 0) {
109 do {
110 flags = fcntl(server->fd, F_GETFL, 0);
111 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
112
113 if (flags == -1) {
114 memcached_set_errno(*server, errno, NULL);
115 } else if ((flags & O_NONBLOCK) == 0) {
116 int rval;
117
118 do {
119 rval = fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
120 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
121
122 if (rval == -1) {
123 memcached_set_errno(*server, errno, NULL);
124 }
125 }
126 }
127 #endif
128 }
129
130 static bool set_socket_options(memcached_instance_st *server) {
131 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
132
133 #ifdef HAVE_FCNTL
134 // If SOCK_CLOEXEC exists then we don't need to call the following
135 if (SOCK_CLOEXEC == 0) {
136 if (FD_CLOEXEC) {
137 int flags;
138 do {
139 flags = fcntl(server->fd, F_GETFD, 0);
140 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
141
142 if (flags != -1) {
143 int rval;
144 do {
145 rval = fcntl(server->fd, F_SETFD, flags | FD_CLOEXEC);
146 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
147 // we currently ignore the case where rval is -1
148 }
149 }
150 }
151 #endif
152
153 if (memcached_is_udp(server->root)) {
154 return true;
155 }
156
157 #ifdef HAVE_SO_SNDTIMEO
158 if (server->root->snd_timeout > 0) {
159 struct timeval waittime;
160
161 waittime.tv_sec = server->root->snd_timeout / 1000000;
162 waittime.tv_usec = server->root->snd_timeout % 1000000;
163
164 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &waittime,
165 (socklen_t) sizeof(struct timeval));
166 (void) error;
167 assert(error == 0);
168 }
169 #endif
170
171 #ifdef HAVE_SO_RCVTIMEO
172 if (server->root->rcv_timeout > 0) {
173 struct timeval waittime;
174
175 waittime.tv_sec = server->root->rcv_timeout / 1000000;
176 waittime.tv_usec = server->root->rcv_timeout % 1000000;
177
178 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &waittime,
179 (socklen_t) sizeof(struct timeval));
180 (void) (error);
181 assert(error == 0);
182 }
183 #endif
184
185 #if defined(_WIN32)
186 #else
187 # if defined(SO_NOSIGPIPE)
188 if (SO_NOSIGPIPE) {
189 int set = 1;
190 int error = setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *) &set, sizeof(int));
191
192 assert(error == 0);
193
194 // This is not considered a fatal error
195 if (error == -1) {
196 # if 0
197 perror("setsockopt(SO_NOSIGPIPE)");
198 # endif
199 }
200 }
201 # endif // SO_NOSIGPIPE
202 #endif // _WIN32
203
204 if (server->root->flags.no_block) {
205 struct linger linger;
206
207 linger.l_onoff = 1;
208 linger.l_linger = 0; /* By default on close() just drop the socket */
209 int error = setsockopt(server->fd, SOL_SOCKET, SO_LINGER, (char *) &linger,
210 (socklen_t) sizeof(struct linger));
211 (void) (error);
212 assert(error == 0);
213 }
214
215 if (TCP_NODELAY) {
216 if (server->root->flags.tcp_nodelay) {
217 int flag = 1;
218
219 int error =
220 setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, (socklen_t) sizeof(int));
221 (void) (error);
222 assert(error == 0);
223 }
224 }
225
226 if (server->root->flags.tcp_keepalive) {
227 int flag = 1;
228
229 int error =
230 setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, (socklen_t) sizeof(int));
231 (void) (error);
232 assert(error == 0);
233 }
234
235 if (TCP_KEEPIDLE) {
236 if (server->root->tcp_keepidle > 0) {
237 int error = setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
238 (char *) &server->root->tcp_keepidle, (socklen_t) sizeof(int));
239 (void) (error);
240 assert(error == 0);
241 }
242 }
243
244 if (server->root->send_size > 0) {
245 int error = setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF, (char *) &server->root->send_size,
246 (socklen_t) sizeof(int));
247 (void) (error);
248 assert(error == 0);
249 }
250
251 if (server->root->recv_size > 0) {
252 int error = setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF, (char *) &server->root->recv_size,
253 (socklen_t) sizeof(int));
254 (void) (error);
255 assert(error == 0);
256 }
257
258 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
259 set_socket_nonblocking(server);
260
261 return true;
262 }
263
264 static memcached_return_t unix_socket_connect(memcached_instance_st *server) {
265 #ifndef _WIN32
266 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
267
268 do {
269 int type = SOCK_STREAM;
270 if (SOCK_CLOEXEC) {
271 type |= SOCK_CLOEXEC;
272 }
273
274 if (SOCK_NONBLOCK) {
275 type |= SOCK_NONBLOCK;
276 }
277
278 if ((server->fd = socket(AF_UNIX, type, 0)) == -1) {
279 return memcached_set_errno(*server, errno, NULL);
280 }
281
282 struct sockaddr_un servAddr;
283
284 memset(&servAddr, 0, sizeof(struct sockaddr_un));
285 servAddr.sun_family = AF_UNIX;
286 if (strlen(server->hostname()) >= sizeof(servAddr.sun_path)) {
287 return memcached_set_error(*server, MEMCACHED_FAIL_UNIX_SOCKET, MEMCACHED_AT);
288 }
289 strncpy(servAddr.sun_path, server->hostname(),
290 sizeof(servAddr.sun_path) - 1); /* Copy filename */
291
292 if (connect(server->fd, (struct sockaddr *) &servAddr, sizeof(servAddr)) == -1) {
293 switch (errno) {
294 case EINPROGRESS:
295 case EALREADY:
296 case EAGAIN:
297 server->events(POLLOUT);
298 break;
299
300 case EINTR:
301 server->reset_socket();
302 continue;
303
304 case EISCONN: /* We were spinning waiting on connect */
305 {
306 assert(0); // Programmer error
307 server->reset_socket();
308 continue;
309 }
310
311 default:
312 WATCHPOINT_ERRNO(errno);
313 server->reset_socket();
314 return memcached_set_errno(*server, errno, MEMCACHED_AT);
315 }
316 }
317 } while (0);
318 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
319
320 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
321
322 return MEMCACHED_SUCCESS;
323 #else
324 (void) server;
325 return MEMCACHED_NOT_SUPPORTED;
326 #endif
327 }
328
329 static memcached_return_t network_connect(memcached_instance_st *server) {
330 bool timeout_error_occured = false;
331
332 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
333 WATCHPOINT_ASSERT(server->cursor_active_ == 0);
334
335 /*
336 We want to check both of these because if address_info_next has been fully tried, we want to do
337 a new lookup to make sure we have picked up on any new DNS information.
338 */
339 if (server->address_info == NULL or server->address_info_next == NULL) {
340 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
341 server->address_info_next = NULL;
342 memcached_return_t rc = set_hostinfo(server);
343
344 if (memcached_failed(rc)) {
345 return rc;
346 }
347 }
348
349 assert(server->address_info_next);
350 assert(server->address_info);
351
352 /* Create the socket */
353 while (server->address_info_next and server->fd == INVALID_SOCKET) {
354 int type = server->address_info_next->ai_socktype;
355 if (SOCK_CLOEXEC) {
356 type |= SOCK_CLOEXEC;
357 }
358
359 if (SOCK_NONBLOCK) {
360 type |= SOCK_NONBLOCK;
361 }
362
363 server->fd =
364 socket(server->address_info_next->ai_family, type, server->address_info_next->ai_protocol);
365
366 if (int(server->fd) == SOCKET_ERROR) {
367 return memcached_set_errno(*server, get_socket_errno(), NULL);
368 }
369
370 if (set_socket_options(server) == false) {
371 server->reset_socket();
372 return MEMCACHED_CONNECTION_FAILURE;
373 }
374
375 /* connect to server */
376 if ((connect(server->fd, server->address_info_next->ai_addr,
377 server->address_info_next->ai_addrlen)
378 != SOCKET_ERROR))
379 {
380 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
381 return MEMCACHED_SUCCESS;
382 }
383
384 /* An error occurred */
385 int local_error = get_socket_errno();
386 switch (local_error) {
387 case ETIMEDOUT:
388 timeout_error_occured = true;
389 break;
390
391 #if EWOULDBLOCK != EAGAIN
392 case EWOULDBLOCK:
393 #endif
394 case EAGAIN:
395 case EINPROGRESS: // nonblocking mode - first return
396 case EALREADY: // nonblocking mode - subsequent returns
397 {
398 server->events(POLLOUT);
399 server->state = MEMCACHED_SERVER_STATE_IN_PROGRESS;
400 memcached_return_t rc = memcached_io_poll(server, IO_POLL_CONNECT, local_error);
401
402 if (memcached_success(rc)) {
403 server->state = MEMCACHED_SERVER_STATE_CONNECTED;
404 return MEMCACHED_SUCCESS;
405 }
406
407 // A timeout here is treated as an error, we will not retry
408 if (rc == MEMCACHED_TIMEOUT) {
409 timeout_error_occured = true;
410 }
411 } break;
412
413 case EISCONN: // we are connected :-)
414 WATCHPOINT_ASSERT(0); // This is a programmer's error
415 break;
416
417 case EINTR: // Special case, we retry ai_addr
418 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
419 server->reset_socket();
420 continue;
421
422 case ECONNREFUSED:
423 // Probably not running service
424
425 default:
426 memcached_set_errno(*server, local_error, MEMCACHED_AT);
427 break;
428 }
429
430 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
431 server->reset_socket();
432 server->address_info_next = server->address_info_next->ai_next;
433 }
434
435 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
436
437 if (timeout_error_occured) {
438 server->reset_socket();
439 }
440
441 WATCHPOINT_STRING("Never got a good file descriptor");
442
443 if (memcached_has_current_error(*server)) {
444 return memcached_instance_error_return(server);
445 }
446
447 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS) {
448 return memcached_set_error(
449 *server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
450 memcached_literal_param(
451 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
452 }
453
454 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE,
455 MEMCACHED_AT); /* The last error should be from connect() */
456 }
457
458 /*
459 backoff_handling()
460
461 Based on time/failure count fail the connect without trying. This prevents waiting in a state
462 where we get caught spending cycles just waiting.
463 */
464 static memcached_return_t backoff_handling(memcached_instance_st *server, bool &in_timeout) {
465 struct timeval curr_time;
466 bool _gettime_success = (gettimeofday(&curr_time, NULL) == 0);
467
468 /*
469 If we hit server_failure_limit then something is completely wrong about the server.
470
471 1) If autoeject is enabled we do that.
472 2) If not? We go into timeout again, there is much else to do :(
473 */
474 if (server->server_failure_counter >= server->root->server_failure_limit) {
475 /*
476 We just auto_eject if we hit this point
477 */
478 if (_is_auto_eject_host(server->root)) {
479 set_last_disconnected_host(server);
480
481 // Retry dead servers if requested
482 if (_gettime_success and server->root->dead_timeout > 0) {
483 server->next_retry = curr_time.tv_sec + server->root->dead_timeout;
484
485 // We only retry dead servers once before assuming failure again
486 server->server_failure_counter = server->root->server_failure_limit - 1;
487 }
488
489 memcached_return_t rc;
490 if (memcached_failed(rc = run_distribution((memcached_st *) server->root))) {
491 return memcached_set_error(
492 *server, rc, MEMCACHED_AT,
493 memcached_literal_param("Backoff handling failed during run_distribution"));
494 }
495
496 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
497 }
498
499 server->state = MEMCACHED_SERVER_STATE_IN_TIMEOUT;
500
501 // Sanity check/setting
502 if (server->next_retry == 0) {
503 server->next_retry = 1;
504 }
505 }
506
507 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT) {
508 /*
509 If next_retry is less then our current time, then we reset and try everything again.
510 */
511 if (_gettime_success and server->next_retry < curr_time.tv_sec) {
512 server->state = MEMCACHED_SERVER_STATE_NEW;
513 server->server_timeout_counter = 0;
514 } else {
515 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
516 }
517
518 in_timeout = true;
519 }
520
521 return MEMCACHED_SUCCESS;
522 }
523
524 static memcached_return_t _memcached_connect(memcached_instance_st *server,
525 const bool set_last_disconnected) {
526 assert(server);
527 if (server->fd != INVALID_SOCKET) {
528 return MEMCACHED_SUCCESS;
529 }
530
531 LIBMEMCACHED_MEMCACHED_CONNECT_START();
532
533 bool in_timeout = false;
534 memcached_return_t rc;
535 if (memcached_failed(rc = backoff_handling(server, in_timeout))) {
536 set_last_disconnected_host(server);
537 return rc;
538 }
539
540 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks
541 and memcached_is_udp(server->root))
542 {
543 return memcached_set_error(
544 *server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT,
545 memcached_literal_param("SASL is not supported for UDP connections"));
546 }
547
548 if (server->hostname()[0] == '/') {
549 server->type = MEMCACHED_CONNECTION_UNIX_SOCKET;
550 }
551
552 /* We need to clean up the multi startup piece */
553 switch (server->type) {
554 case MEMCACHED_CONNECTION_UDP:
555 case MEMCACHED_CONNECTION_TCP:
556 rc = network_connect(server);
557
558 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
559 if (LIBMEMCACHED_WITH_SASL_SUPPORT) {
560 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks) {
561 rc = memcached_sasl_authenticate_connection(server);
562 if (memcached_failed(rc) and server->fd != INVALID_SOCKET) {
563 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
564 server->reset_socket();
565 }
566 }
567 }
568 #endif
569 break;
570
571 case MEMCACHED_CONNECTION_UNIX_SOCKET:
572 rc = unix_socket_connect(server);
573 break;
574 }
575
576 if (memcached_success(rc)) {
577 server->mark_server_as_clean();
578 memcached_version_instance(server);
579 return rc;
580 } else if (set_last_disconnected) {
581 set_last_disconnected_host(server);
582 if (memcached_has_current_error(*server)) {
583 memcached_mark_server_for_timeout(server);
584 assert(memcached_failed(memcached_instance_error_return(server)));
585 } else {
586 memcached_set_error(*server, rc, MEMCACHED_AT);
587 memcached_mark_server_for_timeout(server);
588 }
589
590 LIBMEMCACHED_MEMCACHED_CONNECT_END();
591
592 if (in_timeout) {
593 char buffer[1024];
594 int snprintf_length =
595 snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname(), int(server->port()));
596 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT,
597 buffer, snprintf_length);
598 }
599 }
600
601 return rc;
602 }
603
604 memcached_return_t memcached_connect(memcached_instance_st *server) {
605 return _memcached_connect(server, true);
606 }