1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
55 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
58 fds
[0].fd
= server
->fd
;
59 fds
[0].events
= POLLOUT
;
63 if (server
->root
->poll_timeout
== 0)
65 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
68 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
71 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
75 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
78 #ifdef TARGET_OS_LINUX
86 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
89 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
91 default: // This should not happen
92 if (fds
[0].revents
& POLLERR
)
95 socklen_t len
= sizeof(err
);
96 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
100 // This should never happen, if it does? Punt.
107 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
108 (void)closesocket(server
->fd
);
109 server
->fd
= INVALID_SOCKET
;
110 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
112 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
115 assert(number_of
== 0);
117 server
->io_wait_count
.timeouts
++;
118 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
121 if (fds
[0].revents
& POLLERR
or
122 fds
[0].revents
& POLLHUP
or
123 fds
[0].revents
& POLLNVAL
)
126 socklen_t len
= sizeof (err
);
127 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
129 // We check the value to see what happened wth the socket.
132 return MEMCACHED_SUCCESS
;
137 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
139 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
141 return MEMCACHED_SUCCESS
;
144 // This should only be possible from ERESTART or EINTR;
145 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
148 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
150 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
151 if (server
->address_info
)
153 freeaddrinfo(server
->address_info
);
154 server
->address_info
= NULL
;
155 server
->address_info_next
= NULL
;
158 char str_port
[NI_MAXSERV
];
159 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
160 if (length
>= NI_MAXSERV
or length
<= 0)
162 return MEMCACHED_FAILURE
;
165 struct addrinfo hints
;
166 memset(&hints
, 0, sizeof(struct addrinfo
));
169 hints
.ai_family
= AF_INET
;
171 if (memcached_is_udp(server
->root
))
173 hints
.ai_protocol
= IPPROTO_UDP
;
174 hints
.ai_socktype
= SOCK_DGRAM
;
178 hints
.ai_socktype
= SOCK_STREAM
;
179 hints
.ai_protocol
= IPPROTO_TCP
;
182 assert(server
->address_info
== NULL
);
183 assert(server
->address_info_next
== NULL
);
185 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
191 if (server
->address_info
)
193 freeaddrinfo(server
->address_info
);
194 server
->address_info
= NULL
;
195 server
->address_info_next
= NULL
;
197 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
200 if (server
->address_info
)
202 freeaddrinfo(server
->address_info
);
203 server
->address_info
= NULL
;
204 server
->address_info_next
= NULL
;
206 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
209 if (server
->address_info
)
211 freeaddrinfo(server
->address_info
);
212 server
->address_info
= NULL
;
213 server
->address_info_next
= NULL
;
215 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
218 if (server
->address_info
)
220 freeaddrinfo(server
->address_info
);
221 server
->address_info
= NULL
;
222 server
->address_info_next
= NULL
;
224 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
228 if (server
->address_info
)
230 freeaddrinfo(server
->address_info
);
231 server
->address_info
= NULL
;
232 server
->address_info_next
= NULL
;
234 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
237 server
->address_info_next
= server
->address_info
;
238 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
240 return MEMCACHED_SUCCESS
;
243 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
247 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
249 memcached_set_errno(*server
, get_socket_errno(), NULL
);
254 if (SOCK_NONBLOCK
== 0)
258 flags
= fcntl(server
->fd
, F_GETFL
, 0);
259 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
263 memcached_set_errno(*server
, errno
, NULL
);
265 else if ((flags
& O_NONBLOCK
) == 0)
271 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
272 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
276 memcached_set_errno(*server
, errno
, NULL
);
283 static void set_socket_options(org::libmemcached::Instance
* server
)
285 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
287 if (memcached_is_udp(server
->root
))
293 if (server
->root
->snd_timeout
> 0)
295 struct timeval waittime
;
297 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
298 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
300 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
301 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
308 if (server
->root
->rcv_timeout
> 0)
310 struct timeval waittime
;
312 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
313 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
315 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
316 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
323 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
326 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
330 // This is not considered a fatal error
334 perror("setsockopt(SO_NOSIGPIPE)");
340 if (server
->root
->flags
.no_block
)
342 struct linger linger
;
345 linger
.l_linger
= 0; /* By default on close() just drop the socket */
346 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
347 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
352 if (server
->root
->flags
.tcp_nodelay
)
356 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
357 (char*)&flag
, (socklen_t
)sizeof(int));
362 if (server
->root
->flags
.tcp_keepalive
)
366 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
367 (char*)&flag
, (socklen_t
)sizeof(int));
373 if (server
->root
->tcp_keepidle
> 0)
375 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
376 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
382 if (server
->root
->send_size
> 0)
384 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
385 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
390 if (server
->root
->recv_size
> 0)
392 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
393 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
398 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
399 set_socket_nonblocking(server
);
402 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
405 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
407 int type
= SOCK_STREAM
;
415 type
|= SOCK_NONBLOCK
;
418 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) < 0)
420 memcached_set_errno(*server
, errno
, NULL
);
421 return MEMCACHED_CONNECTION_FAILURE
;
424 struct sockaddr_un servAddr
;
426 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
427 servAddr
.sun_family
= AF_UNIX
;
428 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
431 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
440 case EISCONN
: /* We were spinning waiting on connect */
442 assert(0); // Programmer error
447 WATCHPOINT_ERRNO(errno
);
448 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
449 return MEMCACHED_CONNECTION_FAILURE
;
453 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
455 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
457 return MEMCACHED_SUCCESS
;
460 return MEMCACHED_NOT_SUPPORTED
;
464 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
466 bool timeout_error_occured
= false;
468 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
469 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
472 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
474 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
476 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
477 server
->address_info_next
= NULL
;
478 memcached_return_t rc
= set_hostinfo(server
);
480 if (memcached_failed(rc
))
486 if (server
->address_info_next
== NULL
)
488 server
->address_info_next
= server
->address_info
;
489 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
492 /* Create the socket */
493 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
495 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
496 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
498 server
->address_info_next
= server
->address_info_next
->ai_next
;
502 int type
= server
->address_info_next
->ai_socktype
;
510 type
|= SOCK_NONBLOCK
;
513 server
->fd
= socket(server
->address_info_next
->ai_family
,
515 server
->address_info_next
->ai_protocol
);
517 if (int(server
->fd
) == SOCKET_ERROR
)
519 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
523 // If SOCK_CLOEXEC exists then we don't need to call the following
524 if (SOCK_CLOEXEC
== 0)
531 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
532 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
537 set_socket_options(server
);
539 /* connect to server */
540 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
542 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
543 return MEMCACHED_SUCCESS
;
546 /* An error occurred */
547 switch (get_socket_errno())
550 timeout_error_occured
= true;
554 #if EWOULDBLOCK != EAGAIN
557 case EINPROGRESS
: // nonblocking mode - first return
558 case EALREADY
: // nonblocking mode - subsequent returns
560 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
561 memcached_return_t rc
= connect_poll(server
);
563 if (memcached_success(rc
))
565 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
566 return MEMCACHED_SUCCESS
;
569 // A timeout here is treated as an error, we will not retry
570 if (rc
== MEMCACHED_TIMEOUT
)
572 timeout_error_occured
= true;
577 case EISCONN
: // we are connected :-)
578 WATCHPOINT_ASSERT(0); // This is a programmer's error
581 case EINTR
: // Special case, we retry ai_addr
582 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
583 (void)closesocket(server
->fd
);
584 server
->fd
= INVALID_SOCKET
;
591 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
592 (void)closesocket(server
->fd
);
593 server
->fd
= INVALID_SOCKET
;
594 server
->address_info_next
= server
->address_info_next
->ai_next
;
597 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
599 if (timeout_error_occured
)
601 if (server
->fd
!= INVALID_SOCKET
)
603 (void)closesocket(server
->fd
);
604 server
->fd
= INVALID_SOCKET
;
608 WATCHPOINT_STRING("Never got a good file descriptor");
610 if (memcached_has_current_error(*server
))
612 return memcached_instance_error_return(server
);
615 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
617 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
620 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
627 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
628 we get caught spending cycles just waiting.
630 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
632 struct timeval curr_time
;
633 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
636 If we hit server_failure_limit then something is completely wrong about the server.
638 1) If autoeject is enabled we do that.
639 2) If not? We go into timeout again, there is much else to do :(
641 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
644 We just auto_eject if we hit this point
646 if (_is_auto_eject_host(server
->root
))
648 set_last_disconnected_host(server
);
650 // Retry dead servers if requested
651 if (_gettime_success
and server
->root
->dead_timeout
> 0)
653 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
655 // We only retry dead servers once before assuming failure again
656 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
659 memcached_return_t rc
;
660 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
662 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
665 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
668 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
670 // Sanity check/setting
671 if (server
->next_retry
== 0)
673 server
->next_retry
= 1;
677 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
680 If next_retry is less then our current time, then we reset and try everything again.
682 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
684 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
688 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
694 return MEMCACHED_SUCCESS
;
697 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
700 if (server
->fd
!= INVALID_SOCKET
)
702 return MEMCACHED_SUCCESS
;
705 LIBMEMCACHED_MEMCACHED_CONNECT_START();
707 bool in_timeout
= false;
708 memcached_return_t rc
;
709 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
711 set_last_disconnected_host(server
);
715 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
717 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
720 if (server
->hostname
[0] == '/')
722 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
725 /* We need to clean up the multi startup piece */
726 switch (server
->type
)
728 case MEMCACHED_CONNECTION_UDP
:
729 case MEMCACHED_CONNECTION_TCP
:
730 rc
= network_connect(server
);
732 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
734 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
736 rc
= memcached_sasl_authenticate_connection(server
);
737 fprintf(stderr
, "%s:%d %s\n", __FILE__
, __LINE__
, memcached_strerror(NULL
, rc
));
738 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
740 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
741 (void)closesocket(server
->fd
);
742 server
->fd
= INVALID_SOCKET
;
748 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
749 rc
= unix_socket_connect(server
);
753 if (memcached_success(rc
))
755 server
->mark_server_as_clean();
756 memcached_version_instance(server
);
759 else if (set_last_disconnected
)
761 set_last_disconnected_host(server
);
762 if (memcached_has_current_error(*server
))
764 memcached_mark_server_for_timeout(server
);
765 assert(memcached_failed(memcached_instance_error_return(server
)));
769 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
770 memcached_mark_server_for_timeout(server
);
773 LIBMEMCACHED_MEMCACHED_CONNECT_END();
778 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
779 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
786 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
788 if (server
and server
->root
and server
->root
->state
.is_parsing
)
790 return MEMCACHED_SUCCESS
;
793 return _memcached_connect(server
, false);
796 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
798 return _memcached_connect(server
, true);