1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
81 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
84 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
88 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
91 #ifdef TARGET_OS_LINUX
99 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
102 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
105 if (fds
[0].revents
& POLLERR
)
108 socklen_t len
= sizeof(err
);
109 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
113 // This should never happen, if it does? Punt.
120 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
121 server
->reset_socket();
122 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
124 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
127 assert(number_of
== 0);
129 server
->io_wait_count
.timeouts
++;
130 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
134 server
->revents(fds
[0].revents
);
137 if (fds
[0].revents
& POLLERR
or
138 fds
[0].revents
& POLLHUP
or
139 fds
[0].revents
& POLLNVAL
)
142 socklen_t len
= sizeof (err
);
143 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
145 // We check the value to see what happened wth the socket.
148 return MEMCACHED_SUCCESS
;
153 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
155 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
157 return MEMCACHED_SUCCESS
;
160 // This should only be possible from ERESTART or EINTR;
161 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
164 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
166 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
167 server
->clear_addrinfo();
169 char str_port
[MEMCACHED_NI_MAXSERV
];
170 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
171 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
173 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
174 memcached_literal_param("snprintf(NI_MAXSERV)"));
177 struct addrinfo hints
;
178 memset(&hints
, 0, sizeof(struct addrinfo
));
180 hints
.ai_family
= AF_INET
;
181 if (memcached_is_udp(server
->root
))
183 hints
.ai_protocol
= IPPROTO_UDP
;
184 hints
.ai_socktype
= SOCK_DGRAM
;
188 hints
.ai_socktype
= SOCK_STREAM
;
189 hints
.ai_protocol
= IPPROTO_TCP
;
192 assert(server
->address_info
== NULL
);
193 assert(server
->address_info_next
== NULL
);
195 assert(server
->hostname());
196 switch(errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
))
199 server
->address_info_next
= server
->address_info
;
200 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
204 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
207 server
->clear_addrinfo();
208 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
211 server
->clear_addrinfo();
212 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
215 server
->clear_addrinfo();
216 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
220 server
->clear_addrinfo();
221 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
225 return MEMCACHED_SUCCESS
;
228 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
232 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
234 memcached_set_errno(*server
, get_socket_errno(), NULL
);
239 if (SOCK_NONBLOCK
== 0)
243 flags
= fcntl(server
->fd
, F_GETFL
, 0);
244 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
248 memcached_set_errno(*server
, errno
, NULL
);
250 else if ((flags
& O_NONBLOCK
) == 0)
256 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
257 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
261 memcached_set_errno(*server
, errno
, NULL
);
268 static bool set_socket_options(org::libmemcached::Instance
* server
)
270 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
273 // If SOCK_CLOEXEC exists then we don't need to call the following
274 if (SOCK_CLOEXEC
== 0)
281 flags
= fcntl(server
->fd
, F_GETFD
, 0);
282 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
289 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
290 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
291 // we currently ignore the case where rval is -1
297 if (memcached_is_udp(server
->root
))
303 if (server
->root
->snd_timeout
> 0)
305 struct timeval waittime
;
307 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
308 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
310 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
311 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
318 if (server
->root
->rcv_timeout
> 0)
320 struct timeval waittime
;
322 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
323 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
325 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
326 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
335 #if defined(SO_NOSIGPIPE)
339 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
343 // This is not considered a fatal error
347 perror("setsockopt(SO_NOSIGPIPE)");
351 #endif // SO_NOSIGPIPE
354 if (server
->root
->flags
.no_block
)
356 struct linger linger
;
359 linger
.l_linger
= 0; /* By default on close() just drop the socket */
360 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
361 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
368 if (server
->root
->flags
.tcp_nodelay
)
372 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
373 (char*)&flag
, (socklen_t
)sizeof(int));
379 if (server
->root
->flags
.tcp_keepalive
)
383 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
384 (char*)&flag
, (socklen_t
)sizeof(int));
391 if (server
->root
->tcp_keepidle
> 0)
393 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
394 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
400 if (server
->root
->send_size
> 0)
402 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
403 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
408 if (server
->root
->recv_size
> 0)
410 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
411 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
416 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
417 set_socket_nonblocking(server
);
422 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
425 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
428 int type
= SOCK_STREAM
;
436 type
|= SOCK_NONBLOCK
;
439 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1)
441 return memcached_set_errno(*server
, errno
, NULL
);
444 struct sockaddr_un servAddr
;
446 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
447 servAddr
.sun_family
= AF_UNIX
;
448 strncpy(servAddr
.sun_path
, server
->hostname(), sizeof(servAddr
.sun_path
)); /* Copy filename */
450 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) == -1)
456 server
->events(POLLOUT
);
460 server
->reset_socket();
463 case EISCONN
: /* We were spinning waiting on connect */
465 assert(0); // Programmer error
466 server
->reset_socket();
471 WATCHPOINT_ERRNO(errno
);
472 server
->reset_socket();
473 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
477 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
479 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
481 return MEMCACHED_SUCCESS
;
484 return MEMCACHED_NOT_SUPPORTED
;
488 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
490 bool timeout_error_occured
= false;
492 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
493 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
496 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
498 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
500 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
501 server
->address_info_next
= NULL
;
502 memcached_return_t rc
= set_hostinfo(server
);
504 if (memcached_failed(rc
))
510 assert(server
->address_info_next
);
511 assert(server
->address_info
);
513 /* Create the socket */
514 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
516 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
517 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
519 server
->address_info_next
= server
->address_info_next
->ai_next
;
523 int type
= server
->address_info_next
->ai_socktype
;
531 type
|= SOCK_NONBLOCK
;
534 server
->fd
= socket(server
->address_info_next
->ai_family
,
536 server
->address_info_next
->ai_protocol
);
538 if (int(server
->fd
) == SOCKET_ERROR
)
540 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
543 if (set_socket_options(server
) == false)
545 server
->reset_socket();
546 return MEMCACHED_CONNECTION_FAILURE
;
549 /* connect to server */
550 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
552 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
553 return MEMCACHED_SUCCESS
;
556 /* An error occurred */
557 switch (get_socket_errno())
560 timeout_error_occured
= true;
564 #if EWOULDBLOCK != EAGAIN
567 case EINPROGRESS
: // nonblocking mode - first return
568 case EALREADY
: // nonblocking mode - subsequent returns
570 server
->events(POLLOUT
);
571 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
572 memcached_return_t rc
= connect_poll(server
);
574 if (memcached_success(rc
))
576 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
577 return MEMCACHED_SUCCESS
;
580 // A timeout here is treated as an error, we will not retry
581 if (rc
== MEMCACHED_TIMEOUT
)
583 timeout_error_occured
= true;
588 case EISCONN
: // we are connected :-)
589 WATCHPOINT_ASSERT(0); // This is a programmer's error
592 case EINTR
: // Special case, we retry ai_addr
593 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
594 server
->reset_socket();
598 // Probably not running service
604 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
605 server
->reset_socket();
606 server
->address_info_next
= server
->address_info_next
->ai_next
;
609 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
611 if (timeout_error_occured
)
613 server
->reset_socket();
616 WATCHPOINT_STRING("Never got a good file descriptor");
618 if (memcached_has_current_error(*server
))
620 return memcached_instance_error_return(server
);
623 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
625 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
628 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
635 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
636 we get caught spending cycles just waiting.
638 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
640 struct timeval curr_time
;
641 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
644 If we hit server_failure_limit then something is completely wrong about the server.
646 1) If autoeject is enabled we do that.
647 2) If not? We go into timeout again, there is much else to do :(
649 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
652 We just auto_eject if we hit this point
654 if (_is_auto_eject_host(server
->root
))
656 set_last_disconnected_host(server
);
658 // Retry dead servers if requested
659 if (_gettime_success
and server
->root
->dead_timeout
> 0)
661 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
663 // We only retry dead servers once before assuming failure again
664 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
667 memcached_return_t rc
;
668 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
670 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
673 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
676 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
678 // Sanity check/setting
679 if (server
->next_retry
== 0)
681 server
->next_retry
= 1;
685 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
688 If next_retry is less then our current time, then we reset and try everything again.
690 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
692 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
696 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
702 return MEMCACHED_SUCCESS
;
705 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
708 if (server
->fd
!= INVALID_SOCKET
)
710 return MEMCACHED_SUCCESS
;
713 LIBMEMCACHED_MEMCACHED_CONNECT_START();
715 bool in_timeout
= false;
716 memcached_return_t rc
;
717 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
719 set_last_disconnected_host(server
);
723 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
725 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
728 if (server
->hostname()[0] == '/')
730 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
733 /* We need to clean up the multi startup piece */
734 switch (server
->type
)
736 case MEMCACHED_CONNECTION_UDP
:
737 case MEMCACHED_CONNECTION_TCP
:
738 rc
= network_connect(server
);
740 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
742 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
744 rc
= memcached_sasl_authenticate_connection(server
);
745 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
747 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
748 server
->reset_socket();
754 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
755 rc
= unix_socket_connect(server
);
759 if (memcached_success(rc
))
761 server
->mark_server_as_clean();
762 memcached_version_instance(server
);
765 else if (set_last_disconnected
)
767 set_last_disconnected_host(server
);
768 if (memcached_has_current_error(*server
))
770 memcached_mark_server_for_timeout(server
);
771 assert(memcached_failed(memcached_instance_error_return(server
)));
775 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
776 memcached_mark_server_for_timeout(server
);
779 LIBMEMCACHED_MEMCACHED_CONNECT_END();
784 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
785 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
792 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
794 return _memcached_connect(server
, true);