1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 # define SOCK_CLOEXEC 0
50 # define SOCK_NONBLOCK 0
// connect_poll(): wait for a pending connect() on server->fd to resolve.
//
// The descriptor is registered for POLLOUT and poll() is called with
// server->root->connect_timeout inside a retry loop bounded by --loop_max.
//
// Outcomes visible in this function:
//   - MEMCACHED_TIMEOUT when server->root->poll_timeout is 0 (no poll() is
//     attempted), and on the path that follows assert(number_of == 0), where
//     server->io_wait_count.timeouts is incremented first.
//   - MEMCACHED_MEMORY_ALLOCATION_FAILURE from two of the poll() error
//     branches (one carries the RLIMIT_NOFILE message).
//   - On the default poll() error branch the socket is closed, server->fd
//     becomes INVALID_SOCKET, the state is reset to MEMCACHED_SERVER_STATE_NEW
//     and the cached errno is recorded through memcached_set_errno().
//   - MEMCACHED_SUCCESS once poll() reports the descriptor and no socket error
//     is pending.
//   - If the loop runs out of attempts, the current socket errno is recorded.
//
// NOTE(review): the early-out tests poll_timeout, but the value handed to
// poll() is connect_timeout -- confirm that the two different fields are
// intended.
// NOTE(review): the errno values that select each error branch are not
// visible in this view; verify against the full source before relying on them.
57 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
// Watch the connecting socket for writability.
60 fds
[0].fd
= server
->fd
;
61 fds
[0].events
= POLLOUT
;
// A poll timeout of zero is reported as a timeout without calling poll().
65 if (server
->root
->poll_timeout
== 0)
67 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
70 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// poll() result <= 0 means either an error or no ready descriptor.
73 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
77 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
80 #ifdef TARGET_OS_LINUX
88 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
91 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
93 default: // This should not happen
94 if (fds
[0].revents
& POLLERR
)
// Ask the socket itself for the pending error (SO_ERROR).
97 socklen_t len
= sizeof(err
);
98 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
102 // This should never happen, if it does? Punt.
// Unrecoverable poll() failure: drop the socket and reset the server state.
109 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
110 (void)closesocket(server
->fd
);
111 server
->fd
= INVALID_SOCKET
;
112 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
114 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Not an error, so poll() must have returned 0: count and report a timeout.
117 assert(number_of
== 0);
119 server
->io_wait_count
.timeouts
++;
120 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// The descriptor was reported; check for error/hangup/invalid conditions.
123 if (fds
[0].revents
& POLLERR
or
124 fds
[0].revents
& POLLHUP
or
125 fds
[0].revents
& POLLNVAL
)
128 socklen_t len
= sizeof (err
);
129 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
131 // We check the value to see what happened with the socket.
134 return MEMCACHED_SUCCESS
;
139 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
141 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
143 return MEMCACHED_SUCCESS
;
146 // This should only be possible from ERESTART or EINTR;
147 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo(): resolve server->hostname / server->port() with getaddrinfo()
// and store the result list in server->address_info.
//
// Must not be called for MEMCACHED_CONNECTION_UNIX_SOCKET servers (asserted).
// Any previous address_info list is freed first. The socket type/protocol
// hints are UDP/SOCK_DGRAM when memcached_is_udp(server->root) is true and
// TCP/SOCK_STREAM otherwise.
//
// Returns:
//   - MEMCACHED_FAILURE if the port cannot be formatted into NI_MAXSERV bytes.
//   - On each visible getaddrinfo() failure branch, address_info is freed and
//     cleared, then an error is recorded: MEMCACHED_TIMEOUT (with the
//     gai_strerror() text), an errno-based error for EAI_SYSTEM,
//     MEMCACHED_INVALID_ARGUMENTS for EAI_BADFLAGS,
//     MEMCACHED_MEMORY_ALLOCATION_FAILURE for EAI_MEMORY, and
//     MEMCACHED_HOST_LOOKUP_FAILURE (with the gai_strerror() text).
//   - MEMCACHED_SUCCESS after address_info_next is pointed at the head of the
//     list and the state becomes MEMCACHED_SERVER_STATE_ADDRINFO.
//
// NOTE(review): the case labels of the switch are not visible in this view;
// the mapping above is taken from the message literals where present.
150 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
152 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Discard any previous lookup result before resolving again.
153 if (server
->address_info
)
155 freeaddrinfo(server
->address_info
);
156 server
->address_info
= NULL
;
157 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so format the port number.
160 char str_port
[NI_MAXSERV
];
161 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
// Reject truncated or failed formatting.
162 if (length
>= NI_MAXSERV
or length
<= 0)
164 return MEMCACHED_FAILURE
;
167 struct addrinfo hints
;
168 memset(&hints
, 0, sizeof(struct addrinfo
));
171 hints
.ai_family
= AF_INET
;
// Pick datagram or stream hints to match the configured transport.
173 if (memcached_is_udp(server
->root
))
175 hints
.ai_protocol
= IPPROTO_UDP
;
176 hints
.ai_socktype
= SOCK_DGRAM
;
180 hints
.ai_socktype
= SOCK_STREAM
;
181 hints
.ai_protocol
= IPPROTO_TCP
;
184 assert(server
->address_info
== NULL
);
185 assert(server
->address_info_next
== NULL
);
// Dispatch on the getaddrinfo() result code.
187 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
// Each failure branch below releases any partial result before reporting.
193 if (server
->address_info
)
195 freeaddrinfo(server
->address_info
);
196 server
->address_info
= NULL
;
197 server
->address_info_next
= NULL
;
199 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
202 if (server
->address_info
)
204 freeaddrinfo(server
->address_info
);
205 server
->address_info
= NULL
;
206 server
->address_info_next
= NULL
;
208 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
211 if (server
->address_info
)
213 freeaddrinfo(server
->address_info
);
214 server
->address_info
= NULL
;
215 server
->address_info_next
= NULL
;
217 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
220 if (server
->address_info
)
222 freeaddrinfo(server
->address_info
);
223 server
->address_info
= NULL
;
224 server
->address_info_next
= NULL
;
226 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
230 if (server
->address_info
)
232 freeaddrinfo(server
->address_info
);
233 server
->address_info
= NULL
;
234 server
->address_info_next
= NULL
;
236 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating from the first resolved address.
239 server
->address_info_next
= server
->address_info
;
240 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
242 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// Two mechanisms are visible: ioctlsocket(FIONBIO), and an fcntl()
// F_GETFL / F_SETFL sequence that adds O_NONBLOCK. The fcntl() sequence is
// only entered when SOCK_NONBLOCK == 0 and only issues F_SETFL when
// O_NONBLOCK is not already set. Both fcntl() calls are retried while they
// fail with EINTR or EAGAIN.
//
// Nothing is returned; failures are recorded on the instance through
// memcached_set_errno().
// NOTE(review): which platform selects the ioctlsocket() variant is decided
// by preprocessor lines not visible here -- presumably Windows; confirm.
245 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
249 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
251 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// When SOCK_NONBLOCK is 0 the flag has to be applied through fcntl().
256 if (SOCK_NONBLOCK
== 0)
// Read the current file status flags, retrying on EINTR/EAGAIN.
260 flags
= fcntl(server
->fd
, F_GETFL
, 0);
261 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
265 memcached_set_errno(*server
, errno
, NULL
);
// Only touch the flags if O_NONBLOCK is not set yet.
267 else if ((flags
& O_NONBLOCK
) == 0)
273 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
274 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
278 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the per-connection socket options configured on
// server->root to server->fd, then switch the socket to non-blocking mode.
//
// Requires server->fd != INVALID_SOCKET (asserted). Options visible here:
//   - SO_SNDTIMEO / SO_RCVTIMEO when snd_timeout / rcv_timeout > 0; the value
//     is split into seconds and microseconds using a divisor of 1000000.
//   - SO_NOSIGPIPE on Apple/Mach and FreeBSD builds; the neighbouring comment
//     says failure is not fatal, and perror() is called.
//   - SO_LINGER with l_linger = 0 when flags.no_block is set.
//   - TCP_NODELAY when flags.tcp_nodelay is set.
//   - SO_KEEPALIVE when flags.tcp_keepalive is set.
//   - TCP_KEEPIDLE when tcp_keepidle > 0.
//   - SO_SNDBUF / SO_RCVBUF when send_size / recv_size > 0.
//
// NOTE(review): what happens after each setsockopt() result is stored in
// `error`, and what the UDP check at the top does, is not visible in this
// view -- confirm against the full source.
285 static void set_socket_options(org::libmemcached::Instance
* server
)
287 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
289 if (memcached_is_udp(server
->root
))
// Send timeout: snd_timeout is split into tv_sec / tv_usec.
295 if (server
->root
->snd_timeout
> 0)
297 struct timeval waittime
;
299 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
300 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
302 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
303 &waittime
, (socklen_t
)sizeof(struct timeval
));
// Receive timeout: same conversion as the send timeout.
310 if (server
->root
->rcv_timeout
> 0)
312 struct timeval waittime
;
314 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
315 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
317 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
318 &waittime
, (socklen_t
)sizeof(struct timeval
));
325 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
328 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
332 // This is not considered a fatal error
336 perror("setsockopt(SO_NOSIGPIPE)");
// In no_block mode a linger time of zero is configured.
342 if (server
->root
->flags
.no_block
)
344 struct linger linger
;
347 linger
.l_linger
= 0; /* By default on close() just drop the socket */
348 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
349 &linger
, (socklen_t
)sizeof(struct linger
));
354 if (server
->root
->flags
.tcp_nodelay
)
358 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
359 &flag
, (socklen_t
)sizeof(int));
364 if (server
->root
->flags
.tcp_keepalive
)
368 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
369 &flag
, (socklen_t
)sizeof(int));
375 if (server
->root
->tcp_keepidle
> 0)
377 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
378 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Kernel send/receive buffer sizes, only when explicitly configured.
384 if (server
->root
->send_size
> 0)
386 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
387 &server
->root
->send_size
, (socklen_t
)sizeof(int));
392 if (server
->root
->recv_size
> 0)
394 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
395 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
400 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
401 set_socket_nonblocking(server
);
// unix_socket_connect(): connect to a memcached server over an AF_UNIX stream
// socket whose path is server->hostname.
//
// Expects server->fd == INVALID_SOCKET on entry (WATCHPOINT_ASSERT).
// Returns:
//   - MEMCACHED_CONNECTION_FAILURE if socket() fails, or on the visible
//     connect() error path; errno is recorded on the instance in both cases.
//   - MEMCACHED_SUCCESS after the state is set to
//     MEMCACHED_SERVER_STATE_CONNECTED.
//   - MEMCACHED_NOT_SUPPORTED from the trailing return -- presumably the
//     build variant without UNIX socket support; the selecting preprocessor
//     lines are not visible here, confirm.
//
// NOTE(review): strncpy() is given the full sizeof(sun_path). The buffer is
// zeroed beforehand, but a hostname at least that long would leave sun_path
// without a terminating NUL -- verify that callers bound the path length.
404 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
407 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
409 int type
= SOCK_STREAM
;
417 type
|= SOCK_NONBLOCK
;
420 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) < 0)
422 memcached_set_errno(*server
, errno
, NULL
);
423 return MEMCACHED_CONNECTION_FAILURE
;
// Build the AF_UNIX address from the configured path.
426 struct sockaddr_un servAddr
;
428 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
429 servAddr
.sun_family
= AF_UNIX
;
430 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
433 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
442 case EISCONN
: /* We were spinning waiting on connect */
444 assert(0); // Programmer error
449 WATCHPOINT_ERRNO(errno
);
450 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
451 return MEMCACHED_CONNECTION_FAILURE
;
455 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
457 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
459 return MEMCACHED_SUCCESS
;
462 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): establish a TCP/UDP connection by walking the resolved
// address list of the server.
//
// Flow visible in this function:
//   1. If address_info or address_info_next is NULL, set_hostinfo() is called
//      to (re)resolve the host.
//   2. While an address remains and no socket is open: UDP mode skips
//      non-AF_INET entries; otherwise a socket is created, FD_CLOEXEC is
//      applied through fcntl() when SOCK_CLOEXEC == 0, set_socket_options()
//      is applied and connect() is attempted.
//   3. A connect() that does not return SOCKET_ERROR marks the server
//      MEMCACHED_SERVER_STATE_CONNECTED and returns MEMCACHED_SUCCESS.
//   4. EINPROGRESS / EALREADY move the state to
//      MEMCACHED_SERVER_STATE_IN_PROGRESS and defer to connect_poll(); its
//      success also returns MEMCACHED_SUCCESS, and its MEMCACHED_TIMEOUT sets
//      timeout_error_occured.
//   5. EINTR closes the socket so the same address is retried (per the
//      existing comment); the remaining visible path closes the socket and
//      advances to ai_next.
//   6. After the loop: an error already stored on the instance is returned as
//      is; otherwise MEMCACHED_TIMEOUT when a timeout occurred and the state
//      is below IN_PROGRESS, else MEMCACHED_CONNECTION_FAILURE.
//
// A failing socket() call returns immediately with the socket errno recorded.
// NOTE(review): several case labels and break/continue statements are not
// visible in this view; the exact fallthrough between branches should be
// confirmed against the full source.
466 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
468 bool timeout_error_occured
= false;
470 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
471 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
474 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
476 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
478 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
479 server
->address_info_next
= NULL
;
480 memcached_return_t rc
= set_hostinfo(server
);
482 if (memcached_failed(rc
))
// Start from the head of the address list when no cursor is set.
488 if (server
->address_info_next
== NULL
)
490 server
->address_info_next
= server
->address_info
;
491 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
494 /* Create the socket */
495 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
497 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
498 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
500 server
->address_info_next
= server
->address_info_next
->ai_next
;
504 int type
= server
->address_info_next
->ai_socktype
;
512 type
|= SOCK_NONBLOCK
;
515 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
517 server
->address_info_next
->ai_protocol
)) < 0)
519 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
522 // If SOCK_CLOEXEC exists then we don't need to call the following
523 if (SOCK_CLOEXEC
== 0)
// Set close-on-exec by hand, retrying on EINTR/EAGAIN.
530 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
531 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
535 set_socket_options(server
);
537 /* connect to server */
538 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
540 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
541 return MEMCACHED_SUCCESS
;
544 /* An error occurred */
545 switch (get_socket_errno())
548 timeout_error_occured
= true;
552 #if EWOULDBLOCK != EAGAIN
555 case EINPROGRESS
: // nonblocking mode - first return
556 case EALREADY
: // nonblocking mode - subsequent returns
// The connect is still pending: wait for it in connect_poll().
558 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
559 memcached_return_t rc
= connect_poll(server
);
561 if (memcached_success(rc
))
563 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
564 return MEMCACHED_SUCCESS
;
567 // A timeout here is treated as an error, we will not retry
568 if (rc
== MEMCACHED_TIMEOUT
)
570 timeout_error_occured
= true;
575 case EISCONN
: // we are connected :-)
576 WATCHPOINT_ASSERT(0); // This is a programmer's error
579 case EINTR
: // Special case, we retry ai_addr
580 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
581 (void)closesocket(server
->fd
);
582 server
->fd
= INVALID_SOCKET
;
// This address failed: release the socket and move to the next candidate.
589 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
590 (void)closesocket(server
->fd
);
591 server
->fd
= INVALID_SOCKET
;
592 server
->address_info_next
= server
->address_info_next
->ai_next
;
595 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// After a timeout make sure no descriptor is left open.
597 if (timeout_error_occured
)
599 if (server
->fd
!= INVALID_SOCKET
)
601 (void)closesocket(server
->fd
);
602 server
->fd
= INVALID_SOCKET
;
606 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that has already been recorded on the instance.
608 if (memcached_has_current_error(*server
))
610 return memcached_instance_error_return(server
);
613 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
615 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
618 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
625 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
626 we get caught spending cycles just waiting.
// backoff_handling(): decide, from the failure counter and retry time of the
// server, whether a connect attempt should be made at all.
//
// Visible behaviour:
//   - The current time is sampled with gettimeofday(); the time-based steps
//     below only run when that call succeeded.
//   - When server_failure_counter >= root->server_failure_limit:
//       * with auto-eject enabled, the server is recorded as the last
//         disconnected host; if dead_timeout > 0, next_retry is scheduled
//         and the failure counter is set to limit - 1 so that a dead server
//         gets a single retry; run_distribution() is invoked, and the
//         function returns either its error or MEMCACHED_SERVER_MARKED_DEAD.
//       * the state becomes MEMCACHED_SERVER_STATE_IN_TIMEOUT and a zero
//         next_retry is bumped to 1.
//   - While in MEMCACHED_SERVER_STATE_IN_TIMEOUT: if next_retry is earlier
//     than the current time the state is reset to MEMCACHED_SERVER_STATE_NEW,
//     and a MEMCACHED_SERVER_TEMPORARILY_DISABLED error return is visible on
//     the other path.
//   - MEMCACHED_SUCCESS when a connect attempt may proceed.
//
// NOTE(review): no assignment to the in_timeout out-parameter is visible in
// this view -- confirm where it is set in the full source.
628 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
630 struct timeval curr_time
;
631 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
634 If we hit server_failure_limit then something is completely wrong about the server.
636 1) If autoeject is enabled we do that.
637 2) If not? We go into timeout again, there is much else to do :(
639 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
642 We just auto_eject if we hit this point
644 if (_is_auto_eject_host(server
->root
))
646 set_last_disconnected_host(server
);
648 // Retry dead servers if requested
649 if (_gettime_success
and server
->root
->dead_timeout
> 0)
651 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
653 // We only retry dead servers once before assuming failure again
654 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
// Rebuild the key distribution now that this server is being ejected.
657 memcached_return_t rc
;
658 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
660 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
663 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
666 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
668 // Sanity check/setting
669 if (server
->next_retry
== 0)
671 server
->next_retry
= 1;
675 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
678 If next_retry is less then our current time, then we reset and try everything again.
680 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
682 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
686 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
692 return MEMCACHED_SUCCESS
;
// _memcached_connect(): shared implementation behind memcached_connect() and
// memcached_connect_try().
//
// Parameters:
//   server                - instance to connect.
//   set_last_disconnected - when true, a failed attempt records the server as
//                           the last disconnected host and marks it for
//                           timeout.
//
// Visible behaviour:
//   - Returns MEMCACHED_SUCCESS immediately if server->fd is already valid.
//   - Runs backoff_handling(); on failure the server is recorded as the last
//     disconnected host.
//   - Rejects SASL combined with UDP with MEMCACHED_INVALID_HOST_PROTOCOL.
//   - A hostname beginning with '/' switches the connection type to
//     MEMCACHED_CONNECTION_UNIX_SOCKET.
//   - UDP/TCP go through network_connect(), optionally followed by SASL
//     authentication (a failed authentication closes the socket); UNIX
//     sockets go through unix_socket_connect().
//   - On success the server is marked clean and memcached_version_instance()
//     is called.
//   - The final visible return formats "host:port" and reports
//     MEMCACHED_SERVER_TEMPORARILY_DISABLED.
//
// NOTE(review): the condition guarding that final return and the other
// return statements of this function are not visible in this view -- confirm
// against the full source.
695 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
698 if (server
->fd
!= INVALID_SOCKET
)
700 return MEMCACHED_SUCCESS
;
703 LIBMEMCACHED_MEMCACHED_CONNECT_START();
705 bool in_timeout
= false;
706 memcached_return_t rc
;
707 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
709 set_last_disconnected_host(server
);
713 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
715 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A leading '/' in the hostname is taken to be a UNIX socket path.
718 if (server
->hostname
[0] == '/')
720 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
723 /* We need to clean up the multi startup piece */
724 switch (server
->type
)
726 case MEMCACHED_CONNECTION_UDP
:
727 case MEMCACHED_CONNECTION_TCP
:
728 rc
= network_connect(server
);
730 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
732 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
734 rc
= memcached_sasl_authenticate_connection(server
);
// A connection that failed authentication is not kept open.
735 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
737 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
738 (void)closesocket(server
->fd
);
739 server
->fd
= INVALID_SOCKET
;
745 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
746 rc
= unix_socket_connect(server
);
750 if (memcached_success(rc
))
752 server
->mark_server_as_clean();
753 memcached_version_instance(server
);
756 else if (set_last_disconnected
)
758 set_last_disconnected_host(server
);
759 if (memcached_has_current_error(*server
))
761 memcached_mark_server_for_timeout(server
);
762 assert(memcached_failed(memcached_instance_error_return(server
)));
766 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
767 memcached_mark_server_for_timeout(server
);
770 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// Report the disabled server as "host:port".
775 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
776 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// memcached_connect_try(): attempt a connection without recording the server
// as the last disconnected host on failure (set_last_disconnected = false).
//
// Returns MEMCACHED_SUCCESS without connecting while
// server->root->state.is_parsing is set; otherwise returns the result of
// _memcached_connect().
//
// NOTE(review): the null checks on server / server->root only guard the
// parsing shortcut; a null server would still be passed on to
// _memcached_connect(), which reads server->fd -- confirm callers never pass
// null.
783 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
785 if (server
and server
->root
and server
->root
->state
.is_parsing
)
787 return MEMCACHED_SUCCESS
;
790 return _memcached_connect(server
, false);
// memcached_connect(): connect the given server instance, asking
// _memcached_connect() to record it as the last disconnected host on failure
// (set_last_disconnected = true). Returns the result of _memcached_connect().
793 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
795 return _memcached_connect(server
, true);