2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
21 static memcached_return_t
set_hostinfo(memcached_instance_st
*server
) {
22 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
23 assert(server
->hostname());
25 server
->clear_addrinfo();
27 char str_host
[MEMCACHED_NI_MAXHOST
] = {0}, str_port
[MEMCACHED_NI_MAXSERV
] = {0};
30 auto length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
31 if (length
<= 0 or errno
) {
32 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
33 memcached_literal_param("snprintf(NI_MAXSERV)"));
36 struct addrinfo hints
{};
37 hints
.ai_family
= AF_UNSPEC
;
38 if (memcached_is_udp(server
->root
)) {
39 hints
.ai_protocol
= IPPROTO_UDP
;
40 hints
.ai_socktype
= SOCK_DGRAM
;
42 hints
.ai_protocol
= IPPROTO_TCP
;
43 hints
.ai_socktype
= SOCK_STREAM
;
46 auto hostname
= server
->hostname();
47 if (*hostname
== '[') {
48 auto closing_bracket
= &hostname
[strlen(hostname
) - 1];
49 if (*closing_bracket
== ']') {
50 auto host_len
= closing_bracket
- hostname
- 1;
51 if (host_len
< MEMCACHED_NI_MAXHOST
) {
52 hostname
= strncpy(str_host
, hostname
+ 1, host_len
);
57 auto errcode
= getaddrinfo(hostname
, str_port
, &hints
, &server
->address_info
);
60 server
->address_info_next
= server
->address_info
;
61 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
65 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
66 memcached_string_make_from_cstr(gai_strerror(errcode
)));
69 server
->clear_addrinfo();
70 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
,
71 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
74 server
->clear_addrinfo();
75 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
,
76 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
79 server
->clear_addrinfo();
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
81 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
84 server
->clear_addrinfo();
85 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
,
86 memcached_string_make_from_cstr(gai_strerror(errcode
)));
90 return MEMCACHED_SUCCESS
;
93 static inline void set_socket_nonblocking(memcached_instance_st
*server
) {
96 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
) {
97 memcached_set_errno(*server
, get_socket_errno(), NULL
);
102 if (SOCK_NONBLOCK
== 0) {
104 flags
= fcntl(server
->fd
, F_GETFL
, 0);
105 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
108 memcached_set_errno(*server
, errno
, NULL
);
109 } else if ((flags
& O_NONBLOCK
) == 0) {
113 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
114 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
117 memcached_set_errno(*server
, errno
, NULL
);
124 static bool set_socket_options(memcached_instance_st
*server
) {
125 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
128 // If SOCK_CLOEXEC exists then we don't need to call the following
129 if (SOCK_CLOEXEC
== 0) {
133 flags
= fcntl(server
->fd
, F_GETFD
, 0);
134 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
139 rval
= fcntl(server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
140 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
141 // we currently ignore the case where rval is -1
147 if (memcached_is_udp(server
->root
)) {
151 #ifdef HAVE_SO_SNDTIMEO
152 if (server
->root
->snd_timeout
> 0) {
153 struct timeval waittime
;
155 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
156 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
158 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
, (char *) &waittime
,
159 (socklen_t
) sizeof(struct timeval
));
165 #ifdef HAVE_SO_RCVTIMEO
166 if (server
->root
->rcv_timeout
> 0) {
167 struct timeval waittime
;
169 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
170 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
172 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
, (char *) &waittime
,
173 (socklen_t
) sizeof(struct timeval
));
181 # if defined(SO_NOSIGPIPE)
184 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *) &set
, sizeof(int));
188 // This is not considered a fatal error
191 perror("setsockopt(SO_NOSIGPIPE)");
195 # endif // SO_NOSIGPIPE
198 if (server
->root
->flags
.no_block
) {
199 struct linger linger
;
202 linger
.l_linger
= 0; /* By default on close() just drop the socket */
203 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
, (char *) &linger
,
204 (socklen_t
) sizeof(struct linger
));
210 if (server
->root
->flags
.tcp_nodelay
) {
214 setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
, (char *) &flag
, (socklen_t
) sizeof(int));
220 if (server
->root
->flags
.tcp_keepalive
) {
224 setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
, (char *) &flag
, (socklen_t
) sizeof(int));
230 if (server
->root
->tcp_keepidle
> 0) {
231 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
232 (char *) &server
->root
->tcp_keepidle
, (socklen_t
) sizeof(int));
238 if (server
->root
->send_size
> 0) {
239 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
, (char *) &server
->root
->send_size
,
240 (socklen_t
) sizeof(int));
245 if (server
->root
->recv_size
> 0) {
246 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
, (char *) &server
->root
->recv_size
,
247 (socklen_t
) sizeof(int));
252 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
253 set_socket_nonblocking(server
);
258 static memcached_return_t
unix_socket_connect(memcached_instance_st
*server
) {
260 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
263 int type
= SOCK_STREAM
;
265 type
|= SOCK_CLOEXEC
;
269 type
|= SOCK_NONBLOCK
;
272 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1) {
273 return memcached_set_errno(*server
, errno
, NULL
);
276 struct sockaddr_un servAddr
;
278 memset(&servAddr
, 0, sizeof(struct sockaddr_un
));
279 servAddr
.sun_family
= AF_UNIX
;
280 if (strlen(server
->hostname()) >= sizeof(servAddr
.sun_path
)) {
281 return memcached_set_error(*server
, MEMCACHED_FAIL_UNIX_SOCKET
, MEMCACHED_AT
);
283 strncpy(servAddr
.sun_path
, server
->hostname(),
284 sizeof(servAddr
.sun_path
) - 1); /* Copy filename */
286 if (connect(server
->fd
, (struct sockaddr
*) &servAddr
, sizeof(servAddr
)) == -1) {
291 server
->events(POLLOUT
);
295 server
->reset_socket();
298 case EISCONN
: /* We were spinning waiting on connect */
300 assert(0); // Programmer error
301 server
->reset_socket();
306 WATCHPOINT_ERRNO(errno
);
307 server
->reset_socket();
308 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
312 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
314 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
316 return MEMCACHED_SUCCESS
;
319 return MEMCACHED_NOT_SUPPORTED
;
323 static memcached_return_t
network_connect(memcached_instance_st
*server
) {
324 bool timeout_error_occured
= false;
326 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
327 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
330 We want to check both of these because if address_info_next has been fully tried, we want to do
331 a new lookup to make sure we have picked up on any new DNS information.
333 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
) {
334 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
335 server
->address_info_next
= NULL
;
336 memcached_return_t rc
= set_hostinfo(server
);
338 if (memcached_failed(rc
)) {
343 assert(server
->address_info_next
);
344 assert(server
->address_info
);
346 /* Create the socket */
347 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
) {
348 int type
= server
->address_info_next
->ai_socktype
;
350 type
|= SOCK_CLOEXEC
;
354 type
|= SOCK_NONBLOCK
;
358 socket(server
->address_info_next
->ai_family
, type
, server
->address_info_next
->ai_protocol
);
360 if (int(server
->fd
) == SOCKET_ERROR
) {
361 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
364 if (set_socket_options(server
) == false) {
365 server
->reset_socket();
366 return MEMCACHED_CONNECTION_FAILURE
;
369 /* connect to server */
370 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
,
371 server
->address_info_next
->ai_addrlen
)
374 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
375 return MEMCACHED_SUCCESS
;
378 /* An error occurred */
379 int local_error
= get_socket_errno();
380 switch (local_error
) {
382 timeout_error_occured
= true;
385 #if EWOULDBLOCK != EAGAIN
389 case EINPROGRESS
: // nonblocking mode - first return
390 case EALREADY
: // nonblocking mode - subsequent returns
392 server
->events(POLLOUT
);
393 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
394 memcached_return_t rc
= memcached_io_poll(server
, IO_POLL_CONNECT
, local_error
);
396 if (memcached_success(rc
)) {
397 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
398 return MEMCACHED_SUCCESS
;
401 // A timeout here is treated as an error, we will not retry
402 if (rc
== MEMCACHED_TIMEOUT
) {
403 timeout_error_occured
= true;
407 case EISCONN
: // we are connected :-)
408 WATCHPOINT_ASSERT(0); // This is a programmer's error
411 case EINTR
: // Special case, we retry ai_addr
412 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
413 server
->reset_socket();
417 // Probably not running service
420 memcached_set_errno(*server
, local_error
, MEMCACHED_AT
);
424 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
425 server
->reset_socket();
426 server
->address_info_next
= server
->address_info_next
->ai_next
;
429 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
431 if (timeout_error_occured
) {
432 server
->reset_socket();
435 WATCHPOINT_STRING("Never got a good file descriptor");
437 if (memcached_has_current_error(*server
)) {
438 return memcached_instance_error_return(server
);
441 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
) {
442 return memcached_set_error(
443 *server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
444 memcached_literal_param(
445 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
448 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
,
449 MEMCACHED_AT
); /* The last error should be from connect() */
455 Based on time/failure count fail the connect without trying. This prevents waiting in a state
456 where we get caught spending cycles just waiting.
458 static memcached_return_t
backoff_handling(memcached_instance_st
*server
, bool &in_timeout
) {
459 struct timeval curr_time
;
460 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
463 If we hit server_failure_limit then something is completely wrong about the server.
465 1) If autoeject is enabled we do that.
466 2) If not? We go into timeout again, there is much else to do :(
468 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
) {
470 We just auto_eject if we hit this point
472 if (_is_auto_eject_host(server
->root
)) {
473 set_last_disconnected_host(server
);
475 // Retry dead servers if requested
476 if (_gettime_success
and server
->root
->dead_timeout
> 0) {
477 server
->next_retry
= curr_time
.tv_sec
+ server
->root
->dead_timeout
;
479 // We only retry dead servers once before assuming failure again
480 server
->server_failure_counter
= server
->root
->server_failure_limit
- 1;
483 memcached_return_t rc
;
484 if (memcached_failed(rc
= run_distribution((memcached_st
*) server
->root
))) {
485 return memcached_set_error(
486 *server
, rc
, MEMCACHED_AT
,
487 memcached_literal_param("Backoff handling failed during run_distribution"));
490 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
493 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
495 // Sanity check/setting
496 if (server
->next_retry
== 0) {
497 server
->next_retry
= 1;
501 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
) {
503 If next_retry is less then our current time, then we reset and try everything again.
505 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
) {
506 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
507 server
->server_timeout_counter
= 0;
509 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
515 return MEMCACHED_SUCCESS
;
518 static memcached_return_t
_memcached_connect(memcached_instance_st
*server
,
519 const bool set_last_disconnected
) {
521 if (server
->fd
!= INVALID_SOCKET
) {
522 return MEMCACHED_SUCCESS
;
525 LIBMEMCACHED_MEMCACHED_CONNECT_START();
527 bool in_timeout
= false;
528 memcached_return_t rc
;
529 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
))) {
530 set_last_disconnected_host(server
);
534 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
535 and memcached_is_udp(server
->root
))
537 return memcached_set_error(
538 *server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
,
539 memcached_literal_param("SASL is not supported for UDP connections"));
542 if (server
->hostname()[0] == '/') {
543 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
546 /* We need to clean up the multi startup piece */
547 switch (server
->type
) {
548 case MEMCACHED_CONNECTION_UDP
:
549 case MEMCACHED_CONNECTION_TCP
:
550 rc
= network_connect(server
);
552 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
553 if (LIBMEMCACHED_WITH_SASL_SUPPORT
) {
554 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
) {
555 rc
= memcached_sasl_authenticate_connection(server
);
556 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
) {
557 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
558 server
->reset_socket();
565 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
566 rc
= unix_socket_connect(server
);
570 if (memcached_success(rc
)) {
571 server
->mark_server_as_clean();
572 memcached_version_instance(server
);
574 } else if (set_last_disconnected
) {
575 set_last_disconnected_host(server
);
576 if (memcached_has_current_error(*server
)) {
577 memcached_mark_server_for_timeout(server
);
578 assert(memcached_failed(memcached_instance_error_return(server
)));
580 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
581 memcached_mark_server_for_timeout(server
);
584 LIBMEMCACHED_MEMCACHED_CONNECT_END();
588 int snprintf_length
=
589 snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
590 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
,
591 buffer
, snprintf_length
);
598 memcached_return_t
memcached_connect(memcached_instance_st
*server
) {
599 return _memcached_connect(server
, true);