2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
16 #include "libmemcached/common.h"
21 # define SOCK_CLOEXEC 0
25 # define SOCK_NONBLOCK 0
33 # define SO_NOSIGPIPE 0
37 # define TCP_NODELAY 0
41 # define TCP_KEEPIDLE 0
// connect_poll: waits via poll() for the connection attempt on server->fd to
// resolve and reports the outcome as a memcached_return_t.
//
//   server            instance whose fd and events() mask are polled; errors
//                     are recorded on it through memcached_set_error() /
//                     memcached_set_errno().
//   connection_error  error code the caller obtained from connect(); paths
//                     below compare it against EINPROGRESS, and it is the
//                     errno reported when the retry loop is exhausted.
//
// Returns MEMCACHED_SUCCESS when the socket reports no pending SO_ERROR, and
// otherwise whatever the memcached_set_error()/memcached_set_errno() call on
// the failing path returns.
//
// NOTE(review): this listing does not show every line of the function (the
// declarations of fds, loop_max, number_of and err, the case labels of the
// switch and some conditions are absent) -- confirm details against the
// complete file.
44 static memcached_return_t
connect_poll(memcached_instance_st
*server
, const int connection_error
) {
46 fds
[0].fd
= server
->fd
;
47 fds
[0].events
= server
->events();
// A zero poll_timeout is rejected up front with MEMCACHED_TIMEOUT.
// NOTE(review): the poll() call below is given connect_timeout, not
// poll_timeout -- confirm which setting this guard is meant to cover.
52 if (server
->root
->poll_timeout
== 0) {
53 return memcached_set_error(
54 *server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
55 memcached_literal_param("The time to wait for a connection to be established was set to "
56 "zero which produces a timeout to every call to poll()."));
// Bounded retry loop: the counter is pre-decremented on every pass.
59 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// poll() failure path: errno is captured once and then classified.
62 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) == -1) {
63 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
64 switch (local_errno
) {
72 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
75 return memcached_set_error(
76 *server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
77 memcached_literal_param(
78 "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
80 default: // This should not happen
// Unclassified poll() error: drop the socket, put the instance back into the
// NEW state and report the cached errno.
84 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
85 server
->reset_socket();
86 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
88 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// NOTE(review): the enclosing condition is not visible in this listing; the
// "(number_of == 0)" message further down suggests this is the branch taken
// when poll() timed out. For an EINPROGRESS connect the socket is asked for
// its pending error first.
92 if (connection_error
== EINPROGRESS
) {
94 socklen_t len
= sizeof(err
);
95 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
96 return memcached_set_errno(
97 *server
, errno
, MEMCACHED_AT
,
98 memcached_literal_param(
99 "getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
102 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
104 return memcached_set_errno(
105 *server
, err
, MEMCACHED_AT
,
106 memcached_literal_param("getsockopt() found the error from poll() after connect() "
107 "returned EINPROGRESS."));
111 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
112 memcached_literal_param("(number_of == 0)"));
// From here on exactly one descriptor is expected to have reported events.
115 assert(number_of
== 1);
// poll() flagged an error/hang-up/invalid condition: read the socket's
// pending error through SO_ERROR to find out what happened.
117 if (fds
[0].revents
& POLLERR
or fds
[0].revents
& POLLHUP
or fds
[0].revents
& POLLNVAL
) {
119 socklen_t len
= sizeof(err
);
120 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
121 return memcached_set_errno(
122 *server
, errno
, MEMCACHED_AT
,
123 memcached_literal_param(
124 "getsockopt() errored while looking up error state from poll()"));
127 // We check the value to see what happened with the socket.
128 if (err
== 0) // Should not happen
130 return MEMCACHED_SUCCESS
;
134 return memcached_set_errno(
135 *server
, err
, MEMCACHED_AT
,
136 memcached_literal_param("getsockopt() found the error from poll() during connect."));
138 assert(fds
[0].revents
& POLLOUT
);
// Socket became writable after a connect() that returned EINPROGRESS:
// SO_ERROR is queried to learn whether the connect really succeeded.
// NOTE(review): the condition guarding the MEMCACHED_SUCCESS return below is
// not visible in this listing (presumably err == 0) -- confirm.
140 if (fds
[0].revents
& POLLOUT
and connection_error
== EINPROGRESS
) {
142 socklen_t len
= sizeof(err
);
143 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
144 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
148 return MEMCACHED_SUCCESS
;
151 return memcached_set_errno(
152 *server
, err
, MEMCACHED_AT
,
153 memcached_literal_param(
154 "getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
157 break; // We only have the loop setup for errno types that require restart
160 // This should only be possible from ERESTART or EINTR;
161 return memcached_set_errno(*server
, connection_error
, MEMCACHED_AT
,
162 memcached_literal_param("connect_poll() was exhausted"));
// set_hostinfo: resolves server->hostname() and server->port() with
// getaddrinfo() and stores the result list in server->address_info.
//
// Must not be called for UNIX-socket instances (asserted). Any previously
// stored address info is cleared first. The lookup hints request any address
// family (AF_UNSPEC) and either UDP/SOCK_DGRAM or TCP/SOCK_STREAM depending
// on memcached_is_udp(server->root).
//
// Visible outcomes: address_info_next is pointed at the head of the list and
// the state becomes MEMCACHED_SERVER_STATE_ADDRINFO, followed by
// MEMCACHED_SUCCESS; the failure paths return the value produced by
// memcached_set_error()/memcached_set_errno().
//
// NOTE(review): the case labels of the switch and the declaration of errcode
// are not visible in this listing; which getaddrinfo() code selects which
// branch is inferred only from the error strings -- confirm against the
// complete file.
165 static memcached_return_t
set_hostinfo(memcached_instance_st
*server
) {
166 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
167 server
->clear_addrinfo();
// getaddrinfo() takes the service as a string, so the port is formatted
// into a zero-initialised buffer.
169 char str_port
[MEMCACHED_NI_MAXSERV
] = {0};
171 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
// Truncation, a non-positive length or a non-zero errno is treated as
// failure.
// NOTE(review): whether errno is cleared before the snprintf() call is not
// visible here; a stale errno would make this check fail spuriously --
// confirm.
172 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0 or errno
!= 0) {
173 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
174 memcached_literal_param("snprintf(NI_MAXSERV)"));
177 struct addrinfo hints
;
178 memset(&hints
, 0, sizeof(struct addrinfo
));
180 hints
.ai_family
= AF_UNSPEC
;
181 if (memcached_is_udp(server
->root
)) {
182 hints
.ai_protocol
= IPPROTO_UDP
;
183 hints
.ai_socktype
= SOCK_DGRAM
;
185 hints
.ai_socktype
= SOCK_STREAM
;
186 hints
.ai_protocol
= IPPROTO_TCP
;
// clear_addrinfo() above is expected to have left both pointers NULL.
189 assert(server
->address_info
== NULL
);
190 assert(server
->address_info_next
== NULL
);
192 assert(server
->hostname());
// The getaddrinfo() result code is translated into a library return value.
193 switch (errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
)) {
// Lookup succeeded: iteration starts at the head of the returned list.
195 server
->address_info_next
= server
->address_info
;
196 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
// Reported as MEMCACHED_TIMEOUT together with the resolver's own message.
// NOTE(review): the selecting case label is not visible (presumably a
// temporary-failure code such as EAI_AGAIN) -- confirm.
200 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
201 memcached_string_make_from_cstr(gai_strerror(errcode
)));
// The remaining failure branches clear the address info before reporting.
204 server
->clear_addrinfo();
205 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
,
206 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
209 server
->clear_addrinfo();
210 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
,
211 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
214 server
->clear_addrinfo();
215 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
216 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
219 server
->clear_addrinfo();
220 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
,
221 memcached_string_make_from_cstr(gai_strerror(errcode
)));
225 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking: switches server->fd to non-blocking mode.
//
// Two mechanisms appear below: ioctlsocket(FIONBIO) and fcntl() with
// O_NONBLOCK. Failures are recorded on the instance with
// memcached_set_errno() but are not returned to the caller (the function is
// void).
//
// NOTE(review): the preprocessor guards selecting between the two paths and
// the declarations of arg, flags and rval are not visible in this listing
// (the ioctlsocket() path is presumably the Windows build) -- confirm.
228 static inline void set_socket_nonblocking(memcached_instance_st
*server
) {
231 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
) {
232 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// fcntl() is only needed when SOCK_NONBLOCK is 0; when it is non-zero the
// socket() callers in this file already OR it into the socket type.
237 if (SOCK_NONBLOCK
== 0) {
// Read the current file status flags, retrying while the call fails with
// EINTR or EAGAIN.
239 flags
= fcntl(server
->fd
, F_GETFL
, 0);
240 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
243 memcached_set_errno(*server
, errno
, NULL
);
// Only issue F_SETFL when O_NONBLOCK is not already set.
244 } else if ((flags
& O_NONBLOCK
) == 0) {
248 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
249 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
252 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options: applies the options configured on server->root to the
// freshly created socket server->fd, then makes the socket non-blocking.
//
// Options handled in the visible code: FD_CLOEXEC, SO_SNDTIMEO, SO_RCVTIMEO,
// SO_NOSIGPIPE, SO_LINGER, TCP_NODELAY, SO_KEEPALIVE, TCP_KEEPIDLE,
// SO_SNDBUF and SO_RCVBUF.
//
// Returns bool; network_connect() in this file treats a false result as a
// connection failure.
//
// NOTE(review): the return statements, the handling of the individual
// setsockopt() results, several #endif/#if guards and the declarations of
// flags, rval, set and flag are not visible in this listing -- confirm
// against the complete file.
259 static bool set_socket_options(memcached_instance_st
*server
) {
260 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
263 // If SOCK_CLOEXEC exists then we don't need to call the following
264 if (SOCK_CLOEXEC
== 0) {
265 if (FD_CLOEXEC
!= 0) {
// Read the descriptor flags, then set FD_CLOEXEC on top of them; both
// fcntl() calls are retried while they fail with EINTR or EAGAIN.
268 flags
= fcntl(server
->fd
, F_GETFD
, 0);
269 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
274 rval
= fcntl(server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
275 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
276 // we currently ignore the case where rval is -1
// NOTE(review): the body of the UDP branch is not visible in this listing.
282 if (memcached_is_udp(server
->root
)) {
286 #ifdef HAVE_SO_SNDTIMEO
// Send timeout: snd_timeout is split into whole seconds and the remaining
// microseconds, i.e. the configured value is handled as microseconds.
287 if (server
->root
->snd_timeout
> 0) {
288 struct timeval waittime
;
290 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
291 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
293 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
, (char *) &waittime
,
294 (socklen_t
) sizeof(struct timeval
));
300 #ifdef HAVE_SO_RCVTIMEO
// Receive timeout: same seconds/microseconds split as for the send timeout.
301 if (server
->root
->rcv_timeout
> 0) {
302 struct timeval waittime
;
304 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
305 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
307 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
, (char *) &waittime
,
308 (socklen_t
) sizeof(struct timeval
));
316 # if defined(SO_NOSIGPIPE)
319 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *) &set
, sizeof(int));
323 // This is not considered a fatal error
326 perror("setsockopt(SO_NOSIGPIPE)");
330 # endif // SO_NOSIGPIPE
// With the no_block flag set, SO_LINGER is configured with a zero linger
// time.
// NOTE(review): the assignment of linger.l_onoff is not visible here.
333 if (server
->root
->flags
.no_block
) {
334 struct linger linger
;
337 linger
.l_linger
= 0; /* By default on close() just drop the socket */
338 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
, (char *) &linger
,
339 (socklen_t
) sizeof(struct linger
));
345 if (server
->root
->flags
.tcp_nodelay
) {
349 setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
, (char *) &flag
, (socklen_t
) sizeof(int));
355 if (server
->root
->flags
.tcp_keepalive
) {
359 setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
, (char *) &flag
, (socklen_t
) sizeof(int));
// The option value is passed straight from root->tcp_keepidle.
365 if (server
->root
->tcp_keepidle
> 0) {
366 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
367 (char *) &server
->root
->tcp_keepidle
, (socklen_t
) sizeof(int));
// Kernel send/receive buffer sizes are only set when configured (> 0).
373 if (server
->root
->send_size
> 0) {
374 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
, (char *) &server
->root
->send_size
,
375 (socklen_t
) sizeof(int));
380 if (server
->root
->recv_size
> 0) {
381 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
, (char *) &server
->root
->recv_size
,
382 (socklen_t
) sizeof(int));
387 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
388 set_socket_nonblocking(server
);
// unix_socket_connect: creates an AF_UNIX stream socket in server->fd and
// connects it to the path held in server->hostname().
//
// Returns MEMCACHED_SUCCESS after setting the state to
// MEMCACHED_SERVER_STATE_CONNECTED, the value produced by
// memcached_set_errno()/memcached_set_error() on failure, or
// MEMCACHED_NOT_SUPPORTED on the alternative path at the end.
//
// NOTE(review): the loop construct that the `continue` statements refer to,
// the switch header with its first case labels, and the preprocessor guard
// around the MEMCACHED_NOT_SUPPORTED return are not visible in this listing
// -- confirm against the complete file.
393 static memcached_return_t
unix_socket_connect(memcached_instance_st
*server
) {
395 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// SOCK_CLOEXEC / SOCK_NONBLOCK are folded into the socket type when they
// are non-zero.
398 int type
= SOCK_STREAM
;
399 if (SOCK_CLOEXEC
!= 0) {
400 type
|= SOCK_CLOEXEC
;
403 if (SOCK_NONBLOCK
!= 0) {
404 type
|= SOCK_NONBLOCK
;
407 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1) {
408 return memcached_set_errno(*server
, errno
, NULL
);
411 struct sockaddr_un servAddr
;
413 memset(&servAddr
, 0, sizeof(struct sockaddr_un
));
414 servAddr
.sun_family
= AF_UNIX
;
// A path that does not fit into sun_path (including its terminator) is
// rejected.
// NOTE(review): no reset_socket() is visible before this return, so the
// socket opened above appears to stay in server->fd -- confirm it is not
// leaked or later mistaken for an established connection.
415 if (strlen(server
->hostname()) >= sizeof(servAddr
.sun_path
)) {
416 return memcached_set_error(*server
, MEMCACHED_FAIL_UNIX_SOCKET
, MEMCACHED_AT
);
// The buffer was zeroed by memset() and at most size-1 bytes are copied,
// so sun_path stays NUL-terminated.
418 strncpy(servAddr
.sun_path
, server
->hostname(),
419 sizeof(servAddr
.sun_path
) - 1); /* Copy filename */
421 if (connect(server
->fd
, (struct sockaddr
*) &servAddr
, sizeof(servAddr
)) == -1) {
// Connect still pending: register interest in writability.
425 case EAGAIN
: server
->events(POLLOUT
); break;
427 case EINTR
: server
->reset_socket(); continue;
429 case EISCONN
: /* We were spinning waiting on connect */
431 assert(0); // Programmer error
432 server
->reset_socket();
// NOTE(review): errno is read again after reset_socket(); connect_poll() in
// this file caches errno because closing a socket may modify it -- confirm
// the value reported here cannot be clobbered.
437 WATCHPOINT_ERRNO(errno
);
438 server
->reset_socket();
439 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
443 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
445 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
447 return MEMCACHED_SUCCESS
;
450 return MEMCACHED_NOT_SUPPORTED
;
// network_connect: establishes a TCP/UDP connection for the instance.
//
// Resolves the host through set_hostinfo() when no usable address info is
// stored, then walks the address list via address_info_next: for each entry
// a socket is created, configured with set_socket_options() and connected.
// A connect() that is still in progress is completed through connect_poll().
//
// Returns MEMCACHED_SUCCESS once connected. Otherwise it returns, in this
// order of precedence after the loop: the error already recorded on the
// instance, MEMCACHED_TIMEOUT, or MEMCACHED_CONNECTION_FAILURE. socket() and
// set_socket_options() failures return immediately.
//
// NOTE(review): several lines are absent from this listing (the assignment
// target of socket(), the comparison completing the connect() condition,
// some case labels and the handling of a failed set_hostinfo()) -- confirm
// against the complete file.
454 static memcached_return_t
network_connect(memcached_instance_st
*server
) {
455 bool timeout_error_occured
= false;
457 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
458 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
461 We want to check both of these because if address_info_next has been fully tried, we want to do
462 a new lookup to make sure we have picked up on any new DNS information.
464 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
) {
465 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
466 server
->address_info_next
= NULL
;
467 memcached_return_t rc
= set_hostinfo(server
);
469 if (memcached_failed(rc
)) {
474 assert(server
->address_info_next
);
475 assert(server
->address_info
);
477 /* Create the socket */
// Loop while addresses remain and no socket is currently held.
478 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
) {
479 int type
= server
->address_info_next
->ai_socktype
;
480 if (SOCK_CLOEXEC
!= 0) {
481 type
|= SOCK_CLOEXEC
;
484 if (SOCK_NONBLOCK
!= 0) {
485 type
|= SOCK_NONBLOCK
;
489 socket(server
->address_info_next
->ai_family
, type
, server
->address_info_next
->ai_protocol
);
// A failing socket() call ends the whole attempt instead of moving on to
// the next address.
491 if (int(server
->fd
) == SOCKET_ERROR
) {
492 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
495 if (set_socket_options(server
) == false) {
496 server
->reset_socket();
497 return MEMCACHED_CONNECTION_FAILURE
;
500 /* connect to server */
501 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
,
502 server
->address_info_next
->ai_addrlen
)
505 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
506 return MEMCACHED_SUCCESS
;
509 /* An error occurred */
510 int local_error
= get_socket_errno();
511 switch (local_error
) {
512 case ETIMEDOUT
: timeout_error_occured
= true; break;
514 #if EWOULDBLOCK != EAGAIN
517 case EINPROGRESS
: // nonblocking mode - first return
518 case EALREADY
: // nonblocking mode - subsequent returns
// Non-blocking connect still pending: wait for writability and let
// connect_poll() decide the outcome.
520 server
->events(POLLOUT
);
521 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
522 memcached_return_t rc
= connect_poll(server
, local_error
);
524 if (memcached_success(rc
)) {
525 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
526 return MEMCACHED_SUCCESS
;
529 // A timeout here is treated as an error, we will not retry
530 if (rc
== MEMCACHED_TIMEOUT
) {
531 timeout_error_occured
= true;
535 case EISCONN
: // we are connected :-)
536 WATCHPOINT_ASSERT(0); // This is a programmer's error
539 case EINTR
: // Special case, we retry ai_addr
540 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
541 server
->reset_socket();
545 // Probably not running service
547 default: memcached_set_errno(*server
, local_error
, MEMCACHED_AT
); break;
// This address failed: close the socket and advance to the next entry.
550 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
551 server
->reset_socket();
552 server
->address_info_next
= server
->address_info_next
->ai_next
;
555 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
557 if (timeout_error_occured
) {
558 server
->reset_socket();
561 WATCHPOINT_STRING("Never got a good file descriptor");
// An error already recorded on the instance takes precedence over the
// generic results below.
563 if (memcached_has_current_error(*server
)) {
564 return memcached_instance_error_return(server
);
567 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
) {
568 return memcached_set_error(
569 *server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
570 memcached_literal_param(
571 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
574 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
,
575 MEMCACHED_AT
); /* The last error should be from connect() */
581 Based on time/failure count, fail the connect without trying. This prevents us from getting
582 stuck in a state where we spend cycles just waiting.
// backoff_handling: decides, before any connect is attempted, whether this
// server may be tried at all.
//
//   server      instance whose failure counter, state and next_retry are
//               inspected and updated.
//   in_timeout  output flag passed by reference.
//               NOTE(review): the line assigning it is not visible in this
//               listing -- confirm when it is set.
//
// Returns MEMCACHED_SUCCESS when a connect may proceed,
// MEMCACHED_SERVER_MARKED_DEAD when the failure limit was reached on an
// auto-eject host, MEMCACHED_SERVER_TEMPORARILY_DISABLED from the
// in-timeout branch, or the error produced when run_distribution() fails.
584 static memcached_return_t
backoff_handling(memcached_instance_st
*server
, bool &in_timeout
) {
// The current time is sampled once; time-based decisions below are skipped
// when gettimeofday() failed.
585 struct timeval curr_time
;
586 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
589 If we hit server_failure_limit then something is completely wrong about the server.
591 1) If autoeject is enabled we do that.
592 2) If not? We go into timeout again, there is much else to do :(
594 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
) {
596 We just auto_eject if we hit this point
598 if (_is_auto_eject_host(server
->root
)) {
599 set_last_disconnected_host(server
);
601 // Retry dead servers if requested
602 if (_gettime_success
and server
->root
->dead_timeout
> 0) {
603 server
->next_retry
= curr_time
.tv_sec
+ server
->root
->dead_timeout
;
605 // We only retry dead servers once before assuming failure again
606 server
->server_failure_counter
= server
->root
->server_failure_limit
- 1;
609 memcached_return_t rc
;
610 if (memcached_failed(rc
= run_distribution((memcached_st
*) server
->root
))) {
611 return memcached_set_error(
612 *server
, rc
, MEMCACHED_AT
,
613 memcached_literal_param("Backoff handling failed during run_distribution"));
616 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
619 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
621 // Sanity check/setting
622 if (server
->next_retry
== 0) {
623 server
->next_retry
= 1;
627 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
) {
629 If next_retry is less then our current time, then we reset and try everything again.
631 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
) {
632 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
633 server
->server_timeout_counter
= 0;
635 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
641 return MEMCACHED_SUCCESS
;
// _memcached_connect: connects the instance unless it already holds a
// socket.
//
//   server                 instance to connect.
//   set_last_disconnected  when true, a failed attempt records the host via
//                          set_last_disconnected_host() and marks the server
//                          for timeout.
//
// Sequence: backoff_handling() gate, rejection of SASL combined with UDP,
// dispatch on server->type to network_connect() or unix_socket_connect(),
// optional SASL authentication, then bookkeeping for success or failure.
//
// NOTE(review): the final return of rc, the condition guarding the
// "host:port" MEMCACHED_SERVER_TEMPORARILY_DISABLED return (presumably
// in_timeout), the declaration of buffer and what happens when
// backoff_handling() fails are not fully visible in this listing -- confirm
// against the complete file.
644 static memcached_return_t
_memcached_connect(memcached_instance_st
*server
,
645 const bool set_last_disconnected
) {
// Already holding a socket: nothing to do.
647 if (server
->fd
!= INVALID_SOCKET
) {
648 return MEMCACHED_SUCCESS
;
651 LIBMEMCACHED_MEMCACHED_CONNECT_START();
653 bool in_timeout
= false;
654 memcached_return_t rc
;
655 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
))) {
656 set_last_disconnected_host(server
);
// SASL callbacks together with UDP are rejected as an invalid protocol
// combination.
660 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
661 and memcached_is_udp(server
->root
))
663 return memcached_set_error(
664 *server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
,
665 memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname starting with '/' is treated as a UNIX socket path.
668 if (server
->hostname()[0] == '/') {
669 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
672 /* We need to clean up the multi startup piece */
673 switch (server
->type
) {
674 case MEMCACHED_CONNECTION_UDP
:
675 case MEMCACHED_CONNECTION_TCP
: rc
= network_connect(server
);
677 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
678 if (LIBMEMCACHED_WITH_SASL_SUPPORT
) {
// Authenticate when a socket was obtained and SASL callbacks are set; the
// socket is dropped again if authentication fails.
679 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
) {
680 rc
= memcached_sasl_authenticate_connection(server
);
681 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
) {
682 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
683 server
->reset_socket();
690 case MEMCACHED_CONNECTION_UNIX_SOCKET
: rc
= unix_socket_connect(server
); break;
// Success: mark the server clean and query its version. On failure, and
// only when requested, record the host and mark the server for timeout.
693 if (memcached_success(rc
)) {
694 server
->mark_server_as_clean();
695 memcached_version_instance(server
);
697 } else if (set_last_disconnected
) {
698 set_last_disconnected_host(server
);
699 if (memcached_has_current_error(*server
)) {
700 memcached_mark_server_for_timeout(server
);
701 assert(memcached_failed(memcached_instance_error_return(server
)));
703 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
704 memcached_mark_server_for_timeout(server
);
707 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// The error message for this return is the server's "host:port" string.
711 int snprintf_length
=
712 snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
713 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
,
714 buffer
, snprintf_length
);
// memcached_connect: non-static entry point for connecting an instance.
// Forwards to _memcached_connect() with set_last_disconnected = true, so a
// failed attempt records the host as last disconnected and marks the server
// for timeout. Returns whatever _memcached_connect() returns.
721 memcached_return_t
memcached_connect(memcached_instance_st
*server
) {
722 return _memcached_connect(server
, true);