1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= POLLOUT
;
75 if (server
->root
->poll_timeout
== 0)
77 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
80 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
83 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
87 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
90 #ifdef TARGET_OS_LINUX
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
101 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
103 default: // This should not happen
104 if (fds
[0].revents
& POLLERR
)
107 socklen_t len
= sizeof(err
);
108 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
112 // This should never happen, if it does? Punt.
119 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
120 (void)closesocket(server
->fd
);
121 server
->fd
= INVALID_SOCKET
;
122 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
124 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
127 assert(number_of
== 0);
129 server
->io_wait_count
.timeouts
++;
130 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
133 if (fds
[0].revents
& POLLERR
or
134 fds
[0].revents
& POLLHUP
or
135 fds
[0].revents
& POLLNVAL
)
138 socklen_t len
= sizeof (err
);
139 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
141 // We check the value to see what happened wth the socket.
144 return MEMCACHED_SUCCESS
;
149 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
151 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
153 return MEMCACHED_SUCCESS
;
156 // This should only be possible from ERESTART or EINTR;
157 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
160 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
162 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
163 if (server
->address_info
)
165 freeaddrinfo(server
->address_info
);
166 server
->address_info
= NULL
;
167 server
->address_info_next
= NULL
;
170 char str_port
[MEMCACHED_NI_MAXSERV
];
171 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
172 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
174 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
175 memcached_literal_param("snprintf(NI_MAXSERV)"));
178 struct addrinfo hints
;
179 memset(&hints
, 0, sizeof(struct addrinfo
));
182 hints
.ai_family
= AF_INET
;
184 if (memcached_is_udp(server
->root
))
186 hints
.ai_protocol
= IPPROTO_UDP
;
187 hints
.ai_socktype
= SOCK_DGRAM
;
191 hints
.ai_socktype
= SOCK_STREAM
;
192 hints
.ai_protocol
= IPPROTO_TCP
;
195 assert(server
->address_info
== NULL
);
196 assert(server
->address_info_next
== NULL
);
198 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
204 if (server
->address_info
)
206 freeaddrinfo(server
->address_info
);
207 server
->address_info
= NULL
;
208 server
->address_info_next
= NULL
;
210 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
213 if (server
->address_info
)
215 freeaddrinfo(server
->address_info
);
216 server
->address_info
= NULL
;
217 server
->address_info_next
= NULL
;
219 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
222 if (server
->address_info
)
224 freeaddrinfo(server
->address_info
);
225 server
->address_info
= NULL
;
226 server
->address_info_next
= NULL
;
228 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
231 if (server
->address_info
)
233 freeaddrinfo(server
->address_info
);
234 server
->address_info
= NULL
;
235 server
->address_info_next
= NULL
;
237 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
241 if (server
->address_info
)
243 freeaddrinfo(server
->address_info
);
244 server
->address_info
= NULL
;
245 server
->address_info_next
= NULL
;
247 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
250 server
->address_info_next
= server
->address_info
;
251 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
253 return MEMCACHED_SUCCESS
;
256 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
260 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
262 memcached_set_errno(*server
, get_socket_errno(), NULL
);
267 if (SOCK_NONBLOCK
== 0)
271 flags
= fcntl(server
->fd
, F_GETFL
, 0);
272 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
276 memcached_set_errno(*server
, errno
, NULL
);
278 else if ((flags
& O_NONBLOCK
) == 0)
284 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
285 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
289 memcached_set_errno(*server
, errno
, NULL
);
296 static bool set_socket_options(org::libmemcached::Instance
* server
)
298 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
301 // If SOCK_CLOEXEC exists then we don't need to call the following
302 if (SOCK_CLOEXEC
== 0)
309 flags
= fcntl(server
->fd
, F_GETFD
, 0);
310 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
317 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
318 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
319 // we currently ignore the case where rval is -1
325 if (memcached_is_udp(server
->root
))
331 if (server
->root
->snd_timeout
> 0)
333 struct timeval waittime
;
335 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
336 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
338 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
339 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
346 if (server
->root
->rcv_timeout
> 0)
348 struct timeval waittime
;
350 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
351 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
353 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
354 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
361 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
365 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
369 // This is not considered a fatal error
373 perror("setsockopt(SO_NOSIGPIPE)");
379 if (server
->root
->flags
.no_block
)
381 struct linger linger
;
384 linger
.l_linger
= 0; /* By default on close() just drop the socket */
385 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
386 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
393 if (server
->root
->flags
.tcp_nodelay
)
397 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
398 (char*)&flag
, (socklen_t
)sizeof(int));
404 if (server
->root
->flags
.tcp_keepalive
)
408 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
409 (char*)&flag
, (socklen_t
)sizeof(int));
416 if (server
->root
->tcp_keepidle
> 0)
418 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
419 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
425 if (server
->root
->send_size
> 0)
427 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
428 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
433 if (server
->root
->recv_size
> 0)
435 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
436 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
441 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
442 set_socket_nonblocking(server
);
447 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
450 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
452 int type
= SOCK_STREAM
;
460 type
|= SOCK_NONBLOCK
;
463 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) < 0)
465 memcached_set_errno(*server
, errno
, NULL
);
466 return MEMCACHED_CONNECTION_FAILURE
;
469 struct sockaddr_un servAddr
;
471 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
472 servAddr
.sun_family
= AF_UNIX
;
473 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
476 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
485 case EISCONN
: /* We were spinning waiting on connect */
487 assert(0); // Programmer error
492 WATCHPOINT_ERRNO(errno
);
493 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
494 return MEMCACHED_CONNECTION_FAILURE
;
498 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
500 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
502 return MEMCACHED_SUCCESS
;
505 return MEMCACHED_NOT_SUPPORTED
;
509 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
511 bool timeout_error_occured
= false;
513 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
514 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
517 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
519 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
521 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
522 server
->address_info_next
= NULL
;
523 memcached_return_t rc
= set_hostinfo(server
);
525 if (memcached_failed(rc
))
531 if (server
->address_info_next
== NULL
)
533 server
->address_info_next
= server
->address_info
;
534 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
537 /* Create the socket */
538 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
540 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
541 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
543 server
->address_info_next
= server
->address_info_next
->ai_next
;
547 int type
= server
->address_info_next
->ai_socktype
;
555 type
|= SOCK_NONBLOCK
;
558 server
->fd
= socket(server
->address_info_next
->ai_family
,
560 server
->address_info_next
->ai_protocol
);
562 if (int(server
->fd
) == SOCKET_ERROR
)
564 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
567 if (set_socket_options(server
) == false)
569 (void)closesocket(server
->fd
);
570 return MEMCACHED_CONNECTION_FAILURE
;
573 /* connect to server */
574 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
576 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
577 return MEMCACHED_SUCCESS
;
580 /* An error occurred */
581 switch (get_socket_errno())
584 timeout_error_occured
= true;
588 #if EWOULDBLOCK != EAGAIN
591 case EINPROGRESS
: // nonblocking mode - first return
592 case EALREADY
: // nonblocking mode - subsequent returns
594 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
595 memcached_return_t rc
= connect_poll(server
);
597 if (memcached_success(rc
))
599 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
600 return MEMCACHED_SUCCESS
;
603 // A timeout here is treated as an error, we will not retry
604 if (rc
== MEMCACHED_TIMEOUT
)
606 timeout_error_occured
= true;
611 case EISCONN
: // we are connected :-)
612 WATCHPOINT_ASSERT(0); // This is a programmer's error
615 case EINTR
: // Special case, we retry ai_addr
616 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
617 (void)closesocket(server
->fd
);
618 server
->fd
= INVALID_SOCKET
;
625 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
626 (void)closesocket(server
->fd
);
627 server
->fd
= INVALID_SOCKET
;
628 server
->address_info_next
= server
->address_info_next
->ai_next
;
631 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
633 if (timeout_error_occured
)
635 if (server
->fd
!= INVALID_SOCKET
)
637 (void)closesocket(server
->fd
);
638 server
->fd
= INVALID_SOCKET
;
642 WATCHPOINT_STRING("Never got a good file descriptor");
644 if (memcached_has_current_error(*server
))
646 return memcached_instance_error_return(server
);
649 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
651 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
654 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
661 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
662 we get caught spending cycles just waiting.
664 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
666 struct timeval curr_time
;
667 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
670 If we hit server_failure_limit then something is completely wrong about the server.
672 1) If autoeject is enabled we do that.
673 2) If not? We go into timeout again, there is much else to do :(
675 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
678 We just auto_eject if we hit this point
680 if (_is_auto_eject_host(server
->root
))
682 set_last_disconnected_host(server
);
684 // Retry dead servers if requested
685 if (_gettime_success
and server
->root
->dead_timeout
> 0)
687 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
689 // We only retry dead servers once before assuming failure again
690 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
693 memcached_return_t rc
;
694 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
696 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
699 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
702 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
704 // Sanity check/setting
705 if (server
->next_retry
== 0)
707 server
->next_retry
= 1;
711 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
714 If next_retry is less then our current time, then we reset and try everything again.
716 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
718 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
722 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
728 return MEMCACHED_SUCCESS
;
731 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
734 if (server
->fd
!= INVALID_SOCKET
)
736 return MEMCACHED_SUCCESS
;
739 LIBMEMCACHED_MEMCACHED_CONNECT_START();
741 bool in_timeout
= false;
742 memcached_return_t rc
;
743 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
745 set_last_disconnected_host(server
);
749 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
751 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
754 if (server
->hostname
[0] == '/')
756 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
759 /* We need to clean up the multi startup piece */
760 switch (server
->type
)
762 case MEMCACHED_CONNECTION_UDP
:
763 case MEMCACHED_CONNECTION_TCP
:
764 rc
= network_connect(server
);
766 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
768 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
770 rc
= memcached_sasl_authenticate_connection(server
);
771 fprintf(stderr
, "%s:%d %s\n", __FILE__
, __LINE__
, memcached_strerror(NULL
, rc
));
772 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
774 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
775 (void)closesocket(server
->fd
);
776 server
->fd
= INVALID_SOCKET
;
782 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
783 rc
= unix_socket_connect(server
);
787 if (memcached_success(rc
))
789 server
->mark_server_as_clean();
790 memcached_version_instance(server
);
793 else if (set_last_disconnected
)
795 set_last_disconnected_host(server
);
796 if (memcached_has_current_error(*server
))
798 memcached_mark_server_for_timeout(server
);
799 assert(memcached_failed(memcached_instance_error_return(server
)));
803 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
804 memcached_mark_server_for_timeout(server
);
807 LIBMEMCACHED_MEMCACHED_CONNECT_END();
812 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
813 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
820 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
822 if (server
and server
->root
and server
->root
->state
.is_parsing
)
824 return MEMCACHED_SUCCESS
;
827 return _memcached_connect(server
, false);
830 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
832 return _memcached_connect(server
, true);