2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
// connect_poll(): poll() the socket in server->fd after a connect() attempt and
// translate the outcome into a memcached_return_t.
//   server            instance whose fd and events() mask are polled; failures are
//                     recorded on it via memcached_set_error()/memcached_set_errno().
//   connection_error  errno-style code from the connect() attempt; the SO_ERROR
//                     lookups guarded below are skipped when it is EALREADY.
// Returns MEMCACHED_SUCCESS or the value produced by memcached_set_error()/
// memcached_set_errno().
// NOTE(review): the declarations of fds, loop_max, number_of and err, the case
// labels of the switch and several guards/closing braces are not visible here.
22 static memcached_return_t
connect_poll(memcached_instance_st
*server
, const int connection_error
) {
24 fds
[0].fd
= server
->fd
;
25 fds
[0].events
= server
->events();
// A connect_timeout of zero is rejected up front with MEMCACHED_TIMEOUT.
30 if (server
->root
->connect_timeout
== 0) {
31 return memcached_set_error(
32 *server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
33 memcached_literal_param("The time to wait for a connection to be established was set to "
34 "zero which produces a timeout to every call to poll()."));
// Bounded retry loop: it ends once --loop_max reaches zero.
37 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// Poll the single descriptor; connect_timeout is handed to poll() as its timeout.
40 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) == SOCKET_ERROR
) {
41 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
42 switch (local_errno
) {
// Both returns below map the poll() failure to MEMCACHED_MEMORY_ALLOCATION_FAILURE.
51 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
54 return memcached_set_error(
55 *server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
56 memcached_literal_param(
57 "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
59 default: // This should not happen
// After the switch: drop the socket, reset the state to NEW and report local_errno.
63 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
64 server
->reset_socket();
65 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
67 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Presumably the path where poll() timed out (see the "(number_of == 0)" message
// below): unless connection_error is EALREADY, ask the socket for a pending error.
71 if (connection_error
!= EALREADY
) {
73 socklen_t len
= sizeof(err
);
74 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
75 return memcached_set_errno(
76 *server
, errno
, MEMCACHED_AT
,
77 memcached_literal_param(
78 "getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
81 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
83 return memcached_set_errno(
84 *server
, err
, MEMCACHED_AT
,
85 memcached_literal_param("getsockopt() found the error from poll() after connect() "
86 "returned EINPROGRESS."));
// Generic timeout result for this path.
90 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
91 memcached_literal_param("(number_of == 0)"));
// Exactly one descriptor was polled, so exactly one is expected to be ready.
94 assert(number_of
== 1);
// Error, hang-up or invalid-descriptor event: read the pending error via SO_ERROR.
96 if (fds
[0].revents
& POLLERR
or fds
[0].revents
& POLLHUP
or fds
[0].revents
& POLLNVAL
) {
98 socklen_t len
= sizeof(err
);
99 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
100 return memcached_set_errno(
101 *server
, errno
, MEMCACHED_AT
,
102 memcached_literal_param(
103 "getsockopt() errored while looking up error state from poll()"));
106 // We check the value to see what happened with the socket.
107 if (err
== 0) // Should not happen
109 return MEMCACHED_SUCCESS
;
113 return memcached_set_errno(
114 *server
, err
, MEMCACHED_AT
,
115 memcached_literal_param("getsockopt() found the error from poll() during connect."));
// With the error events handled above, the socket is expected to be writable.
117 assert(fds
[0].revents
& POLLOUT
);
// Writable socket: unless connection_error is EALREADY, SO_ERROR is consulted to
// learn how the connect() ended.
119 if (fds
[0].revents
& POLLOUT
and connection_error
!= EALREADY
) {
121 socklen_t len
= sizeof(err
);
122 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char *) &err
, &len
) == -1) {
123 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
127 return MEMCACHED_SUCCESS
;
130 return memcached_set_errno(
131 *server
, err
, MEMCACHED_AT
,
132 memcached_literal_param(
133 "getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
136 break; // We only have the loop setup for errno types that require restart
// Fall-through result once the loop is left: report connection_error itself.
139 // This should only be possible from ERESTART or EINTR;
140 return memcached_set_errno(*server
, connection_error
, MEMCACHED_AT
,
141 memcached_literal_param("connect_poll() was exhausted"));
// set_hostinfo(): resolve server->hostname() and server->port() with getaddrinfo()
// and store the resulting list in server->address_info.
//   server  instance to resolve; asserted not to be a unix socket connection. Any
//           previous lookup is discarded first via clear_addrinfo().
// Returns MEMCACHED_SUCCESS, or the value of memcached_set_error()/
// memcached_set_errno() describing the lookup failure.
// NOTE(review): the declaration of errcode and the switch's case labels are not
// visible here; the messages name EAI_SYSTEM, EAI_BADFLAGS and EAI_MEMORY.
144 static memcached_return_t
set_hostinfo(memcached_instance_st
*server
) {
145 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
146 server
->clear_addrinfo();
// The port is rendered as a decimal string; it is passed to getaddrinfo() as the
// service argument further down.
148 char str_port
[MEMCACHED_NI_MAXSERV
] = {0};
150 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
// Truncated or failed formatting is rejected; the condition also tests errno.
// NOTE(review): no reset of errno before snprintf() is visible here - confirm.
151 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0 or errno
) {
152 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
153 memcached_literal_param("snprintf(NI_MAXSERV)"));
// Lookup hints: any address family; the socket type/protocol follow the transport.
156 struct addrinfo hints
;
157 memset(&hints
, 0, sizeof(struct addrinfo
));
159 hints
.ai_family
= AF_UNSPEC
;
// UDP instances ask for datagram sockets; the second pair of assignments selects
// TCP stream sockets.
160 if (memcached_is_udp(server
->root
)) {
161 hints
.ai_protocol
= IPPROTO_UDP
;
162 hints
.ai_socktype
= SOCK_DGRAM
;
164 hints
.ai_socktype
= SOCK_STREAM
;
165 hints
.ai_protocol
= IPPROTO_TCP
;
// No stale result list may be left over, and a hostname must be present.
168 assert(server
->address_info
== NULL
);
169 assert(server
->address_info_next
== NULL
);
171 assert(server
->hostname());
172 switch (errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
)) {
// Start iterating at the head of the list and record that address info is available.
174 server
->address_info_next
= server
->address_info
;
175 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
// Reported as MEMCACHED_TIMEOUT, carrying the gai_strerror() text.
179 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
180 memcached_string_make_from_cstr(gai_strerror(errcode
)));
// The remaining failure paths release the address list before reporting.
183 server
->clear_addrinfo();
184 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
,
185 memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
188 server
->clear_addrinfo();
189 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
,
190 memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
193 server
->clear_addrinfo();
194 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
195 memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Reported as MEMCACHED_HOST_LOOKUP_FAILURE with the gai_strerror() text.
198 server
->clear_addrinfo();
199 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
,
200 memcached_string_make_from_cstr(gai_strerror(errcode
)));
204 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): switch server->fd to non-blocking mode.
// Two mechanisms are visible: ioctlsocket(FIONBIO) and fcntl() with O_NONBLOCK.
// Nothing is returned; failures are only recorded through memcached_set_errno().
// NOTE(review): the declarations of arg, flags and rval, the "do {" openers and the
// guards choosing between the two mechanisms are not visible here.
207 static inline void set_socket_nonblocking(memcached_instance_st
*server
) {
210 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
) {
211 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// The fcntl() path only runs when SOCK_NONBLOCK is 0.
216 if (SOCK_NONBLOCK
== 0) {
// Read the current file status flags, retrying while errno is EINTR or EAGAIN.
218 flags
= fcntl(server
->fd
, F_GETFL
, 0);
219 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
222 memcached_set_errno(*server
, errno
, NULL
);
// O_NONBLOCK is only written when it is not already part of the flags.
223 } else if ((flags
& O_NONBLOCK
) == 0) {
227 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
228 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
231 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the socket options configured on server->root to
// server->fd and finally switch the socket to non-blocking mode.
//   server  instance whose fd must already be a valid socket (asserted).
// Declared to return bool; network_connect() treats false as a failure.
// NOTE(review): the return statements, the declarations of flags, rval, set and
// flag, and the checks on the individual "error" results are not visible here.
238 static bool set_socket_options(memcached_instance_st
*server
) {
239 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
242 // If SOCK_CLOEXEC exists then we don't need to call the following
243 if (SOCK_CLOEXEC
== 0) {
// Read the descriptor flags, retrying while errno is EINTR or EAGAIN.
247 flags
= fcntl(server
->fd
, F_GETFD
, 0);
248 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
// Add FD_CLOEXEC with the same retry rule.
253 rval
= fcntl(server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
254 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
255 // we currently ignore the case where rval is -1
// UDP branch; its body is not visible here.
261 if (memcached_is_udp(server
->root
)) {
// Send timeout: snd_timeout is split at 1000000 into tv_sec/tv_usec, i.e. it is
// handled as a microsecond count.
265 #ifdef HAVE_SO_SNDTIMEO
266 if (server
->root
->snd_timeout
> 0) {
267 struct timeval waittime
;
269 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
270 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
272 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
, (char *) &waittime
,
273 (socklen_t
) sizeof(struct timeval
));
// Receive timeout: same conversion for rcv_timeout.
279 #ifdef HAVE_SO_RCVTIMEO
280 if (server
->root
->rcv_timeout
> 0) {
281 struct timeval waittime
;
283 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
284 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
286 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
, (char *) &waittime
,
287 (socklen_t
) sizeof(struct timeval
));
// Where SO_NOSIGPIPE is defined it is set on the socket.
295 # if defined(SO_NOSIGPIPE)
298 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *) &set
, sizeof(int));
302 // This is not considered a fatal error
305 perror("setsockopt(SO_NOSIGPIPE)");
309 # endif // SO_NOSIGPIPE
// no_block: SO_LINGER is configured with l_linger = 0.
312 if (server
->root
->flags
.no_block
) {
313 struct linger linger
;
316 linger
.l_linger
= 0; /* By default on close() just drop the socket */
317 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
, (char *) &linger
,
318 (socklen_t
) sizeof(struct linger
));
// tcp_nodelay: TCP_NODELAY is set; the setsockopt() result is not captured.
324 if (server
->root
->flags
.tcp_nodelay
) {
328 setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
, (char *) &flag
, (socklen_t
) sizeof(int));
// tcp_keepalive: SO_KEEPALIVE is set; the setsockopt() result is not captured.
334 if (server
->root
->flags
.tcp_keepalive
) {
338 setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
, (char *) &flag
, (socklen_t
) sizeof(int));
// A positive tcp_keepidle is passed on as TCP_KEEPIDLE.
344 if (server
->root
->tcp_keepidle
> 0) {
345 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
346 (char *) &server
->root
->tcp_keepidle
, (socklen_t
) sizeof(int));
// Positive send_size / recv_size values become SO_SNDBUF / SO_RCVBUF.
352 if (server
->root
->send_size
> 0) {
353 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
, (char *) &server
->root
->send_size
,
354 (socklen_t
) sizeof(int));
359 if (server
->root
->recv_size
> 0) {
360 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
, (char *) &server
->root
->recv_size
,
361 (socklen_t
) sizeof(int));
366 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
367 set_socket_nonblocking(server
);
// unix_socket_connect(): connect server to the unix domain socket whose path is
// server->hostname().
//   server  instance that must not hold a socket yet (WATCHPOINT_ASSERT).
// Returns MEMCACHED_SUCCESS with state MEMCACHED_SERVER_STATE_CONNECTED, or an
// error from memcached_set_errno()/memcached_set_error(). The trailing
// MEMCACHED_NOT_SUPPORTED return is presumably the variant for builds without
// unix socket support - NOTE(review): the preprocessor guards are not visible.
// NOTE(review): the switch over the connect() errno (labels, breaks, loop) is
// only partly visible here.
372 static memcached_return_t
unix_socket_connect(memcached_instance_st
*server
) {
374 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// Stream socket; SOCK_CLOEXEC and SOCK_NONBLOCK are OR'ed into the type.
377 int type
= SOCK_STREAM
;
379 type
|= SOCK_CLOEXEC
;
383 type
|= SOCK_NONBLOCK
;
386 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1) {
387 return memcached_set_errno(*server
, errno
, NULL
);
// The address starts zeroed, so the bounded copy below stays NUL-terminated.
390 struct sockaddr_un servAddr
;
392 memset(&servAddr
, 0, sizeof(struct sockaddr_un
));
393 servAddr
.sun_family
= AF_UNIX
;
// The path must fit into sun_path together with its terminating NUL.
// NOTE(review): server->fd was opened above and no reset_socket() is visible on
// this return; _memcached_connect() treats fd != INVALID_SOCKET as connected.
394 if (strlen(server
->hostname()) >= sizeof(servAddr
.sun_path
)) {
395 return memcached_set_error(*server
, MEMCACHED_FAIL_UNIX_SOCKET
, MEMCACHED_AT
);
397 strncpy(servAddr
.sun_path
, server
->hostname(),
398 sizeof(servAddr
.sun_path
) - 1); /* Copy filename */
// connect() failed: the handling below depends on errno.
400 if (connect(server
->fd
, (struct sockaddr
*) &servAddr
, sizeof(servAddr
)) == -1) {
// Register interest in writability on the instance.
405 server
->events(POLLOUT
);
409 server
->reset_socket();
412 case EISCONN
: /* We were spinning waiting on connect */
414 assert(0); // Programmer error
415 server
->reset_socket();
// Remaining failure path: close the socket and report errno.
// NOTE(review): errno is read after reset_socket(); confirm it cannot be clobbered.
420 WATCHPOINT_ERRNO(errno
);
421 server
->reset_socket();
422 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
426 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
428 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
430 return MEMCACHED_SUCCESS
;
433 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): connect server over the network by walking the address list
// produced by set_hostinfo().
//   server  instance without a socket (WATCHPOINT_ASSERT); on success it holds the
//           connected fd and state MEMCACHED_SERVER_STATE_CONNECTED.
// Returns MEMCACHED_SUCCESS, an error already recorded on the instance,
// MEMCACHED_TIMEOUT, or MEMCACHED_CONNECTION_FAILURE.
// NOTE(review): several case labels, breaks, the comparison on the connect()
// result and the assignment of the socket() result are not visible here.
437 static memcached_return_t
network_connect(memcached_instance_st
*server
) {
438 bool timeout_error_occured
= false;
440 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
441 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
444 We want to check both of these because if address_info_next has been fully tried, we want to do
445 a new lookup to make sure we have picked up on any new DNS information.
// (Re)resolve when there is no list or its cursor has run off the end.
447 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
) {
448 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
449 server
->address_info_next
= NULL
;
450 memcached_return_t rc
= set_hostinfo(server
);
452 if (memcached_failed(rc
)) {
457 assert(server
->address_info_next
);
458 assert(server
->address_info
);
460 /* Create the socket */
// Keep trying candidates while addresses remain and no socket is held.
461 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
) {
462 int type
= server
->address_info_next
->ai_socktype
;
464 type
|= SOCK_CLOEXEC
;
468 type
|= SOCK_NONBLOCK
;
// NOTE(review): the assignment target of this socket() call is not visible; the
// check that follows reads server->fd.
472 socket(server
->address_info_next
->ai_family
, type
, server
->address_info_next
->ai_protocol
);
474 if (int(server
->fd
) == SOCKET_ERROR
) {
475 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Option setup failed: close the socket and give up.
478 if (set_socket_options(server
) == false) {
479 server
->reset_socket();
480 return MEMCACHED_CONNECTION_FAILURE
;
483 /* connect to server */
484 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
,
485 server
->address_info_next
->ai_addrlen
)
488 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
489 return MEMCACHED_SUCCESS
;
492 /* An error occurred */
493 int local_error
= get_socket_errno();
494 switch (local_error
) {
496 timeout_error_occured
= true;
499 #if EWOULDBLOCK != EAGAIN
// Non-blocking connect still under way: wait for the outcome via connect_poll().
503 case EINPROGRESS
: // nonblocking mode - first return
504 case EALREADY
: // nonblocking mode - subsequent returns
506 server
->events(POLLOUT
);
507 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
508 memcached_return_t rc
= connect_poll(server
, local_error
);
510 if (memcached_success(rc
)) {
511 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
512 return MEMCACHED_SUCCESS
;
515 // A timeout here is treated as an error, we will not retry
516 if (rc
== MEMCACHED_TIMEOUT
) {
517 timeout_error_occured
= true;
521 case EISCONN
: // we are connected :-)
522 WATCHPOINT_ASSERT(0); // This is a programmer's error
// Interrupted connect(): the socket is dropped so the loop can run again.
525 case EINTR
: // Special case, we retry ai_addr
526 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
527 server
->reset_socket();
531 // Probably not running service
534 memcached_set_errno(*server
, local_error
, MEMCACHED_AT
);
// Drop this socket and advance to the next resolved address.
538 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
539 server
->reset_socket();
540 server
->address_info_next
= server
->address_info_next
->ai_next
;
// From here on no candidate produced a usable connection.
543 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
545 if (timeout_error_occured
) {
546 server
->reset_socket();
549 WATCHPOINT_STRING("Never got a good file descriptor");
// An error already recorded on the instance takes precedence.
551 if (memcached_has_current_error(*server
)) {
552 return memcached_instance_error_return(server
);
// A timeout seen before the connect reached the in-progress state.
555 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
) {
556 return memcached_set_error(
557 *server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
558 memcached_literal_param(
559 "if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
562 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
,
563 MEMCACHED_AT
); /* The last error should be from connect() */
569 Based on time/failure count fail the connect without trying. This prevents waiting in a state
570 where we get caught spending cycles just waiting.
// backoff_handling(): decide from the failure counter and the retry time whether
// a connect should be attempted at all.
//   server      instance being connected; its state, next_retry and counters may
//               be updated here.
//   in_timeout  flag passed by reference from the caller. NOTE(review): no
//               assignment to it is visible here.
// Returns MEMCACHED_SUCCESS, or the value of memcached_set_error()
// (MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_SERVER_TEMPORARILY_DISABLED, or the
// failure code of run_distribution()).
// NOTE(review): comment delimiters, else branches and closing braces are not
// visible here.
572 static memcached_return_t
backoff_handling(memcached_instance_st
*server
, bool &in_timeout
) {
573 struct timeval curr_time
;
// Remember whether the clock could be read; the time-based decisions below
// require it.
574 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
577 If we hit server_failure_limit then something is completely wrong about the server.
579 1) If autoeject is enabled we do that.
580 2) If not? We go into timeout again, there is much else to do :(
582 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
) {
584 We just auto_eject if we hit this point
586 if (_is_auto_eject_host(server
->root
)) {
587 set_last_disconnected_host(server
);
589 // Retry dead servers if requested
590 if (_gettime_success
and server
->root
->dead_timeout
> 0) {
// The next attempt is scheduled dead_timeout seconds after the current time.
591 server
->next_retry
= curr_time
.tv_sec
+ server
->root
->dead_timeout
;
593 // We only retry dead servers once before assuming failure again
594 server
->server_failure_counter
= server
->root
->server_failure_limit
- 1;
// run_distribution() is invoked on the root; its failure code is reported as-is.
597 memcached_return_t rc
;
598 if (memcached_failed(rc
= run_distribution((memcached_st
*) server
->root
))) {
599 return memcached_set_error(
600 *server
, rc
, MEMCACHED_AT
,
601 memcached_literal_param("Backoff handling failed during run_distribution"));
604 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
// Put the server into the timeout state.
607 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
609 // Sanity check/setting
610 if (server
->next_retry
== 0) {
611 server
->next_retry
= 1;
615 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
) {
617 If next_retry is less then our current time, then we reset and try everything again.
619 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
) {
620 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
621 server
->server_timeout_counter
= 0;
623 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
629 return MEMCACHED_SUCCESS
;
// _memcached_connect(): connect server unless it already holds a socket.
//   server                 instance to connect.
//   set_last_disconnected  when true, a failed connect records the instance via
//                          set_last_disconnected_host() and marks it for timeout.
// Returns MEMCACHED_SUCCESS when a socket is already present; other results come
// from backoff_handling(), the transport-specific connect and, optionally, SASL
// authentication.
// NOTE(review): the declaration of buffer, the guard in front of the final
// "host:port" error, several breaks/returns and closing braces are not visible.
632 static memcached_return_t
_memcached_connect(memcached_instance_st
*server
,
633 const bool set_last_disconnected
) {
// An instance that already holds a socket counts as connected.
635 if (server
->fd
!= INVALID_SOCKET
) {
636 return MEMCACHED_SUCCESS
;
639 LIBMEMCACHED_MEMCACHED_CONNECT_START();
// Ask the backoff logic first whether a connect may be attempted.
641 bool in_timeout
= false;
642 memcached_return_t rc
;
643 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
))) {
644 set_last_disconnected_host(server
);
// SASL callbacks together with UDP are rejected as an invalid combination.
648 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
649 and memcached_is_udp(server
->root
))
651 return memcached_set_error(
652 *server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
,
653 memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname starting with '/' is treated as a unix socket path.
656 if (server
->hostname()[0] == '/') {
657 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
660 /* We need to clean up the multi startup piece */
661 switch (server
->type
) {
662 case MEMCACHED_CONNECTION_UDP
:
663 case MEMCACHED_CONNECTION_TCP
:
664 rc
= network_connect(server
);
// With SASL support and callbacks configured, authenticate the new connection; a
// failed authentication closes the socket again.
666 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
667 if (LIBMEMCACHED_WITH_SASL_SUPPORT
) {
668 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
) {
669 rc
= memcached_sasl_authenticate_connection(server
);
670 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
) {
671 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
672 server
->reset_socket();
679 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
680 rc
= unix_socket_connect(server
);
// Success: mark the instance clean and run memcached_version_instance() on it.
684 if (memcached_success(rc
)) {
685 server
->mark_server_as_clean();
686 memcached_version_instance(server
);
// Failure with set_last_disconnected: record the host and mark it for timeout; rc
// is stored as the instance error on the second path shown below.
688 } else if (set_last_disconnected
) {
689 set_last_disconnected_host(server
);
690 if (memcached_has_current_error(*server
)) {
691 memcached_mark_server_for_timeout(server
);
692 assert(memcached_failed(memcached_instance_error_return(server
)));
694 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
695 memcached_mark_server_for_timeout(server
);
698 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// "host:port" becomes the message of the MEMCACHED_SERVER_TEMPORARILY_DISABLED error.
702 int snprintf_length
=
703 snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
704 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
,
705 buffer
, snprintf_length
);
// memcached_connect(): connect server through _memcached_connect() with
// set_last_disconnected enabled, and return its result.
712 memcached_return_t
memcached_connect(memcached_instance_st
*server
) {
713 return _memcached_connect(server
, true);