1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include "libmemcached/common.h"
// Option constants defined as 0. Code later in this file compares SOCK_CLOEXEC and
// SOCK_NONBLOCK against 0 to decide whether fcntl() must be used instead.
// NOTE(review): the preprocessor guards around these defines are not visible in this
// listing; presumably each is a fallback for platforms lacking the option - confirm.
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
// connect_poll(): wait, using poll(), for a connect() on server->fd to complete.
//   server           - instance whose fd, events() and root timeouts are used; errors are
//                      recorded on it through memcached_set_error()/memcached_set_errno().
//   connection_error - errno value reported for the connect() attempt; it is compared with
//                      EINPROGRESS below and reported if the retry loop is exhausted.
// Returns MEMCACHED_SUCCESS, or the result of recording an error on the server.
// NOTE(review): this listing omits some of the function's lines (braces, local
// declarations, case labels); the comments describe only what is visible.
67 static memcached_return_t
connect_poll(memcached_instance_st
* server
, const int connection_error
)
// Poll exactly one descriptor: the server socket, for the events the instance asks for.
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
// A poll_timeout of zero is reported as MEMCACHED_TIMEOUT without calling poll().
// NOTE(review): this guard tests poll_timeout, while poll() below is passed
// connect_timeout - confirm that the two different fields are intended.
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
79 memcached_literal_param("The time to wait for a connection to be established was set to zero which produces a timeout to every call to poll()."));
82 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// poll() returning -1 means the call itself failed; its errno is cached right away.
85 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) == -1)
87 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
// NOTE(review): the case labels selecting the two returns below are not visible here.
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
101 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
102 memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
// Otherwise: drop the socket, put the server back into the NEW state and report the
// cached errno.
108 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
109 server
->reset_socket();
110 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
112 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// NOTE(review): the enclosing condition is not visible; the "(number_of == 0)" message
// below suggests this is the branch where poll() reported no ready descriptors.
117 if (connection_error
== EINPROGRESS
)
// Ask the socket for its pending error through SO_ERROR.
// NOTE(review): the failure path reads errno directly, whereas the poll() failure above
// uses get_socket_errno() - confirm whether that difference is intended.
120 socklen_t len
= sizeof(err
);
121 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
123 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
126 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
129 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
133 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_literal_param("(number_of == 0)"));
// Only one descriptor was polled, so exactly one must be ready from here on.
136 assert (number_of
== 1);
// Error-type revents (POLLERR, POLLHUP, POLLNVAL): read SO_ERROR to learn the cause.
138 if (fds
[0].revents
& POLLERR
or
139 fds
[0].revents
& POLLHUP
or
140 fds
[0].revents
& POLLNVAL
)
143 socklen_t len
= sizeof (err
);
144 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
146 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() errored while looking up error state from poll()"));
149 // We check the value to see what happened with the socket.
150 if (err
== 0) // Should not happen
152 return MEMCACHED_SUCCESS
;
156 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() during connect."));
158 assert(fds
[0].revents
& POLLOUT
);
// Writable after connect() reported EINPROGRESS: SO_ERROR is read again before the
// connection is declared good.
160 if (fds
[0].revents
& POLLOUT
and connection_error
== EINPROGRESS
)
163 socklen_t len
= sizeof(err
);
164 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
166 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
// NOTE(review): the test on err that chooses between the next two returns is not visible.
171 return MEMCACHED_SUCCESS
;
174 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
177 break; // We only have the loop setup for errno types that require restart
180 // This should only be possible from ERESTART or EINTR;
181 return memcached_set_errno(*server
, connection_error
, MEMCACHED_AT
, memcached_literal_param("connect_poll() was exhausted"));
// set_hostinfo(): resolve server->hostname() / server->port() with getaddrinfo() and store
// the result list in server->address_info.
//   server - instance to resolve; must not be a MEMCACHED_CONNECTION_UNIX_SOCKET (asserted).
// On success address_info_next points at the head of the list, the state becomes
// MEMCACHED_SERVER_STATE_ADDRINFO and MEMCACHED_SUCCESS is returned. On failure an error
// is recorded on the server and returned.
// NOTE(review): the case labels of the switch are not visible in this listing.
184 static memcached_return_t
set_hostinfo(memcached_instance_st
* server
)
186 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Discard any previous lookup before starting a new one.
187 server
->clear_addrinfo();
// The port is rendered as a decimal string because getaddrinfo() takes it as text.
189 char str_port
[MEMCACHED_NI_MAXSERV
]= { 0 };
191 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
// NOTE(review): errno != 0 is tested here, but no reset of errno before snprintf() is
// visible in this listing - confirm a stale errno cannot cause a false failure.
192 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0 or errno
!= 0)
194 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
195 memcached_literal_param("snprintf(NI_MAXSERV)"));
// Hints: any address family; UDP/datagram or TCP/stream depending on memcached_is_udp().
198 struct addrinfo hints
;
199 memset(&hints
, 0, sizeof(struct addrinfo
));
201 hints
.ai_family
= AF_UNSPEC
;
202 if (memcached_is_udp(server
->root
))
204 hints
.ai_protocol
= IPPROTO_UDP
;
205 hints
.ai_socktype
= SOCK_DGRAM
;
209 hints
.ai_socktype
= SOCK_STREAM
;
210 hints
.ai_protocol
= IPPROTO_TCP
;
213 assert(server
->address_info
== NULL
);
214 assert(server
->address_info_next
== NULL
);
216 assert(server
->hostname());
// Dispatch on the getaddrinfo() result code.
217 switch(errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
))
// Success: iteration over candidate addresses starts at the head of the list.
220 server
->address_info_next
= server
->address_info
;
221 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
// Reported as MEMCACHED_TIMEOUT with the gai_strerror() text.
// NOTE(review): the case label for this return is not visible.
225 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Every remaining failure clears the address info again before reporting.
228 server
->clear_addrinfo();
229 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
232 server
->clear_addrinfo();
233 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
236 server
->clear_addrinfo();
237 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Other failures are reported as MEMCACHED_HOST_LOOKUP_FAILURE with gai_strerror() text.
241 server
->clear_addrinfo();
242 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
246 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//   server - instance owning the socket; failures are recorded on it with
//            memcached_set_errno(). Nothing is returned.
// Two variants are visible: ioctlsocket(FIONBIO) and fcntl(F_GETFL / F_SETFL).
// NOTE(review): the preprocessor conditional choosing between them is not visible in this
// listing; presumably Windows versus POSIX - confirm.
249 static inline void set_socket_nonblocking(memcached_instance_st
* server
)
253 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
255 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// fcntl() path is taken only when SOCK_NONBLOCK is 0.
260 if (SOCK_NONBLOCK
== 0)
// Read the current file status flags, retrying while fcntl() fails with EINTR or EAGAIN.
264 flags
= fcntl(server
->fd
, F_GETFL
, 0);
265 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
269 memcached_set_errno(*server
, errno
, NULL
);
// O_NONBLOCK is written only when it is not already set.
271 else if ((flags
& O_NONBLOCK
) == 0)
277 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
278 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
282 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the options configured on server->root to server->fd.
//   server - instance whose socket (must not be INVALID_SOCKET, asserted) is configured.
// Visible steps, in order: FD_CLOEXEC through fcntl() when SOCK_CLOEXEC is 0, SO_SNDTIMEO,
// SO_RCVTIMEO, SO_NOSIGPIPE, SO_LINGER, TCP_NODELAY, SO_KEEPALIVE, TCP_KEEPIDLE, SO_SNDBUF,
// SO_RCVBUF, and finally non-blocking mode.
// Returns bool. NOTE(review): the return statements and most checks of the setsockopt()
// results are not visible in this listing.
289 static bool set_socket_options(memcached_instance_st
* server
)
291 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
294 // If SOCK_CLOEXEC exists then we don't need to call the following
295 if (SOCK_CLOEXEC
== 0)
// Read the descriptor flags, retrying while fcntl() fails with EINTR or EAGAIN.
302 flags
= fcntl(server
->fd
, F_GETFD
, 0);
303 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
// Add FD_CLOEXEC to the flags just read.
310 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
311 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
312 // we currently ignore the case where rval is -1
// NOTE(review): the body of this UDP branch is not visible in this listing.
318 if (memcached_is_udp(server
->root
))
// snd_timeout is split into whole seconds and a microsecond remainder for the timeval.
324 if (server
->root
->snd_timeout
> 0)
326 struct timeval waittime
;
328 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
329 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
331 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
332 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
// rcv_timeout is converted the same way for SO_RCVTIMEO.
339 if (server
->root
->rcv_timeout
> 0)
341 struct timeval waittime
;
343 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
344 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
346 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
347 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
356 # if defined(SO_NOSIGPIPE)
360 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
364 // This is not considered a fatal error
368 perror("setsockopt(SO_NOSIGPIPE)");
372 # endif // SO_NOSIGPIPE
// SO_LINGER is configured only when the no_block flag is set.
375 if (server
->root
->flags
.no_block
)
377 struct linger linger
;
380 linger
.l_linger
= 0; /* By default on close() just drop the socket */
381 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
382 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
389 if (server
->root
->flags
.tcp_nodelay
)
393 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
394 (char*)&flag
, (socklen_t
)sizeof(int));
400 if (server
->root
->flags
.tcp_keepalive
)
404 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
405 (char*)&flag
, (socklen_t
)sizeof(int));
// The configured tcp_keepidle value is passed straight to setsockopt().
412 if (server
->root
->tcp_keepidle
> 0)
414 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
415 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Send and receive buffer sizes are applied only when a positive size is configured.
421 if (server
->root
->send_size
> 0)
423 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
424 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
429 if (server
->root
->recv_size
> 0)
431 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
432 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
437 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
438 set_socket_nonblocking(server
);
// unix_socket_connect(): create an AF_UNIX stream socket and connect() it to the path
// held in server->hostname().
//   server - instance to connect; its fd is expected to be INVALID_SOCKET on entry.
// Returns MEMCACHED_SUCCESS with the state set to MEMCACHED_SERVER_STATE_CONNECTED,
// MEMCACHED_UNIX_SOCKET_PATH_TOO_BIG when the path does not fit in sun_path, or an
// errno-based error recorded on the server.
// NOTE(review): a trailing "return MEMCACHED_NOT_SUPPORTED" is also visible; presumably it
// belongs to a build without unix socket support, but the conditional is not shown here.
443 static memcached_return_t
unix_socket_connect(memcached_instance_st
* server
)
446 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// SOCK_NONBLOCK is OR-ed into the socket type when it is non-zero.
449 int type
= SOCK_STREAM
;
450 if (SOCK_CLOEXEC
!= 0)
455 if (SOCK_NONBLOCK
!= 0)
457 type
|= SOCK_NONBLOCK
;
460 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1)
462 return memcached_set_errno(*server
, errno
, NULL
);
465 struct sockaddr_un servAddr
;
467 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
468 servAddr
.sun_family
= AF_UNIX
;
// Reject a path that would not fit, terminating NUL included, in sun_path.
469 if (strlen(server
->hostname()) >= sizeof(servAddr
.sun_path
)) {
470 return memcached_set_error(*server
, MEMCACHED_UNIX_SOCKET_PATH_TOO_BIG
, MEMCACHED_AT
);
// servAddr was zeroed above and the length was checked, so the copy stays NUL-terminated.
472 strncpy(servAddr
.sun_path
, server
->hostname(), sizeof(servAddr
.sun_path
)-1); /* Copy filename */
// NOTE(review): the switch on the connect() error and most of its case labels are not
// visible in this listing.
474 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) == -1)
481 server
->events(POLLOUT
);
485 server
->reset_socket();
488 case EISCONN
: /* We were spinning waiting on connect */
490 assert(0); // Programmer error
491 server
->reset_socket();
// Remaining errors: drop the socket and report errno.
496 WATCHPOINT_ERRNO(errno
);
497 server
->reset_socket();
498 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
502 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
504 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
506 return MEMCACHED_SUCCESS
;
509 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): walk the resolved addresses starting at server->address_info_next,
// creating a socket for each candidate and attempting connect() until one succeeds.
//   server - instance to connect; its fd is expected to be INVALID_SOCKET on entry.
// Returns MEMCACHED_SUCCESS once connected (state MEMCACHED_SERVER_STATE_CONNECTED).
// Otherwise it returns the error already recorded on the server, MEMCACHED_TIMEOUT, or
// MEMCACHED_CONNECTION_FAILURE.
// NOTE(review): the switch on the connect() error and several case labels are not visible
// in this listing.
513 static memcached_return_t
network_connect(memcached_instance_st
* server
)
515 bool timeout_error_occured
= false;
517 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
518 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
521 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
523 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
525 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
526 server
->address_info_next
= NULL
;
527 memcached_return_t rc
= set_hostinfo(server
);
529 if (memcached_failed(rc
))
535 assert(server
->address_info_next
);
536 assert(server
->address_info
);
538 /* Create the socket */
539 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
// Start from the candidate's socket type; SOCK_NONBLOCK is OR-ed in when it is non-zero.
541 int type
= server
->address_info_next
->ai_socktype
;
542 if (SOCK_CLOEXEC
!= 0)
547 if (SOCK_NONBLOCK
!= 0)
549 type
|= SOCK_NONBLOCK
;
552 server
->fd
= socket(server
->address_info_next
->ai_family
,
554 server
->address_info_next
->ai_protocol
);
556 if (int(server
->fd
) == SOCKET_ERROR
)
558 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// A socket that cannot be configured is dropped and the attempt fails.
561 if (set_socket_options(server
) == false)
563 server
->reset_socket();
564 return MEMCACHED_CONNECTION_FAILURE
;
567 /* connect to server */
568 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
570 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
571 return MEMCACHED_SUCCESS
;
574 /* An error occurred */
575 int local_error
= get_socket_errno();
579 timeout_error_occured
= true;
582 #if EWOULDBLOCK != EAGAIN
585 case EINPROGRESS
: // nonblocking mode - first return
586 case EALREADY
: // nonblocking mode - subsequent returns
// The connect is still in flight: wait for it to finish with connect_poll().
588 server
->events(POLLOUT
);
589 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
590 memcached_return_t rc
= connect_poll(server
, local_error
);
592 if (memcached_success(rc
))
594 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
595 return MEMCACHED_SUCCESS
;
598 // A timeout here is treated as an error, we will not retry
599 if (rc
== MEMCACHED_TIMEOUT
)
601 timeout_error_occured
= true;
606 case EISCONN
: // we are connected :-)
607 WATCHPOINT_ASSERT(0); // This is a programmer's error
610 case EINTR
: // Special case, we retry ai_addr
611 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
612 server
->reset_socket();
616 // Probably not running service
619 memcached_set_errno(*server
, local_error
, MEMCACHED_AT
);
// This candidate failed: drop its socket and move on to the next resolved address.
623 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
624 server
->reset_socket();
625 server
->address_info_next
= server
->address_info_next
->ai_next
;
628 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
630 if (timeout_error_occured
)
632 server
->reset_socket();
635 WATCHPOINT_STRING("Never got a good file descriptor");
// An error already recorded on the server takes precedence over the generic results below.
637 if (memcached_has_current_error(*server
))
639 return memcached_instance_error_return(server
);
642 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
644 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
645 memcached_literal_param("if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
648 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
655 Based on the time/failure count, fail the connect without trying. This prevents getting stuck in a state where
656 we get caught spending cycles just waiting.
// backoff_handling(): decide, from the failure counter and retry time, whether a connect
// attempt should be made at all.
//   server     - instance whose server_failure_counter, next_retry and state are examined
//                and updated.
//   in_timeout - reference parameter. NOTE(review): no assignment to it is visible in this
//                listing - confirm where it is set.
// Returns MEMCACHED_SUCCESS when the caller may go on to connect, otherwise
// MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_SERVER_TEMPORARILY_DISABLED, or the failure
// reported by run_distribution().
658 static memcached_return_t
backoff_handling(memcached_instance_st
* server
, bool& in_timeout
)
// The current time is used only if gettimeofday() succeeded.
660 struct timeval curr_time
;
661 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
664 If we hit server_failure_limit then something is completely wrong about the server.
666 1) If autoeject is enabled we do that.
667 2) If not? We go into timeout again, there is much else to do :(
669 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
672 We just auto_eject if we hit this point
674 if (_is_auto_eject_host(server
->root
))
676 set_last_disconnected_host(server
);
678 // Retry dead servers if requested
679 if (_gettime_success
and server
->root
->dead_timeout
> 0)
681 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
683 // We only retry dead servers once before assuming failure again
684 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
// Re-run the distribution on the root; a failure there is reported on this server.
687 memcached_return_t rc
;
688 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
690 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
693 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
696 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
698 // Sanity check/setting
699 if (server
->next_retry
== 0)
701 server
->next_retry
= 1;
705 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
708 If next_retry is less then our current time, then we reset and try everything again.
710 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
712 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
713 server
->server_timeout_counter
= 0;
717 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
723 return MEMCACHED_SUCCESS
;
// _memcached_connect(): connect the instance unless it already has a socket.
//   server                - instance to connect.
//   set_last_disconnected - when true, a failed connect records the server through
//                           set_last_disconnected_host().
// Returns MEMCACHED_SUCCESS immediately when server->fd is not INVALID_SOCKET. Otherwise
// it runs backoff_handling(), then connects through network_connect() or
// unix_socket_connect() depending on server->type.
// NOTE(review): several returns and the final in_timeout condition are not visible in
// this listing.
726 static memcached_return_t
_memcached_connect(memcached_instance_st
* server
, const bool set_last_disconnected
)
729 if (server
->fd
!= INVALID_SOCKET
)
731 return MEMCACHED_SUCCESS
;
734 LIBMEMCACHED_MEMCACHED_CONNECT_START();
736 bool in_timeout
= false;
737 memcached_return_t rc
;
738 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
740 set_last_disconnected_host(server
);
// SASL callbacks combined with UDP are rejected up front.
744 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
746 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname beginning with '/' is treated as a unix socket path.
749 if (server
->hostname()[0] == '/')
751 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
754 /* We need to clean up the multi startup piece */
755 switch (server
->type
)
757 case MEMCACHED_CONNECTION_UDP
:
758 case MEMCACHED_CONNECTION_TCP
:
759 rc
= network_connect(server
);
761 #if defined(LIBMEMCACHED_WITH_SASL_SUPPORT)
762 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
// With SASL callbacks configured, authenticate the fresh connection; a failed
// authentication drops the socket again.
764 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
766 rc
= memcached_sasl_authenticate_connection(server
);
767 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
769 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
770 server
->reset_socket();
777 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
778 rc
= unix_socket_connect(server
);
// Success: mark the server clean and call memcached_version_instance() on it.
782 if (memcached_success(rc
))
784 server
->mark_server_as_clean();
785 memcached_version_instance(server
);
// Failure with set_last_disconnected: remember the host and mark it for timeout.
788 else if (set_last_disconnected
)
790 set_last_disconnected_host(server
);
791 if (memcached_has_current_error(*server
))
793 memcached_mark_server_for_timeout(server
);
794 assert(memcached_failed(memcached_instance_error_return(server
)));
798 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
799 memcached_mark_server_for_timeout(server
);
802 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// The error text is "hostname:port".
// NOTE(review): snprintf_length is passed on as the message length. snprintf() returns
// the length the full output would have had, so a truncated write would make this larger
// than the buffer. The declaration of buffer is not visible here - confirm its size
// always covers hostname plus port.
807 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
808 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// memcached_connect(): connect entry point without the static qualifier; forwards to
// _memcached_connect() with set_last_disconnected = true and returns its result.
//   server - instance to connect.
815 memcached_return_t
memcached_connect(memcached_instance_st
* server
)
817 return _memcached_connect(server
, true);