1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// Zero-valued definitions for socket option and flag constants used further
// down in this file. Code below compares SOCK_CLOEXEC and SOCK_NONBLOCK against
// 0 to decide whether the fcntl() based setup has to run.
// NOTE(review): presumably each define is a fallback for platforms that lack
// the constant -- confirm against the surrounding preprocessor guards.
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
// Waits with poll() for the outcome of a connect() attempt on server->fd.
//
// @param server            instance supplying the descriptor (server->fd), the
//                          events to poll for (server->events()) and the
//                          timeouts stored on server->root
// @param connection_error  errno-style code from the earlier connect() attempt;
//                          it is compared against EINPROGRESS before SO_ERROR
//                          is read, and reported if the retry loop runs out
//
// @return MEMCACHED_SUCCESS, or an error recorded on the instance through
//         memcached_set_error()/memcached_set_errno() (MEMCACHED_TIMEOUT,
//         MEMCACHED_MEMORY_ALLOCATION_FAILURE, or an errno-derived code).
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
, const int connection_error
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
// NOTE(review): this guard reads poll_timeout, while the poll() call below is
// given connect_timeout -- confirm which setting is meant to govern the wait.
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
79 memcached_literal_param("The time to wait for a connection to be established was set to zero, which means it will always timeout (MEMCACHED_TIMEOUT)."));
// loop_max bounds the number of poll() attempts.
82 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
85 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) == -1)
87 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
101 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
102 memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
// Unhandled poll() failure: drop the socket, put the instance back into the NEW
// state and report the errno value cached above.
108 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
109 server
->reset_socket();
110 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
112 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// For a connect() that reported EINPROGRESS, ask the socket for its pending
// error (SO_ERROR) before falling back to the generic timeout below.
117 if (connection_error
== EINPROGRESS
)
120 socklen_t len
= sizeof(err
);
121 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
123 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
126 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
129 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
133 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_literal_param("(number_of == 0)"));
136 assert (number_of
== 1);
// Error-type revents: fetch the real socket error through SO_ERROR.
138 if (fds
[0].revents
& POLLERR
or
139 fds
[0].revents
& POLLHUP
or
140 fds
[0].revents
& POLLNVAL
)
143 socklen_t len
= sizeof (err
);
144 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
146 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() errored while looking up error state from poll()"));
149 // We check the value to see what happened with the socket.
150 if (err
== 0) // Should not happen
152 return MEMCACHED_SUCCESS
;
156 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() during connect."));
158 assert(fds
[0].revents
& POLLOUT
);
// Socket is writable after an EINPROGRESS connect(): SO_ERROR decides between
// success and the deferred connect() error.
160 if (fds
[0].revents
& POLLOUT
and connection_error
== EINPROGRESS
)
163 socklen_t len
= sizeof(err
);
164 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
166 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
171 return MEMCACHED_SUCCESS
;
174 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
177 break; // We only have the loop setup for errno types that require restart
180 // This should only be possible from ERESTART or EINTR;
181 return memcached_set_errno(*server
, connection_error
, MEMCACHED_AT
, memcached_literal_param("connect_poll() was exhausted"));
// Resolves server->hostname() and server->port() with getaddrinfo() and stores
// the resulting list in server->address_info.
//
// Any earlier lookup is dropped first through clear_addrinfo(). The port is
// formatted into a MEMCACHED_NI_MAXSERV sized buffer; a snprintf() result that
// is <= 0 or does not fit is reported as MEMCACHED_MEMORY_ALLOCATION_FAILURE.
// The hints request AF_INET; datagram/UDP hints are set under
// memcached_is_udp(server->root), stream/TCP hints are the alternative.
//
// @param server  instance to resolve; asserted not to be a UNIX socket
//                connection and asserted to have a hostname
// @return MEMCACHED_SUCCESS, or the error recorded on the instance for the
//         getaddrinfo() outcome (MEMCACHED_TIMEOUT, an errno-derived code,
//         MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_MEMORY_ALLOCATION_FAILURE or
//         MEMCACHED_HOST_LOOKUP_FAILURE).
184 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
186 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
187 server
->clear_addrinfo();
// getaddrinfo() takes the service as a string, so the numeric port is printed
// into a local buffer first.
189 char str_port
[MEMCACHED_NI_MAXSERV
];
190 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
191 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
193 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
194 memcached_literal_param("snprintf(NI_MAXSERV)"));
197 struct addrinfo hints
;
198 memset(&hints
, 0, sizeof(struct addrinfo
));
200 hints
.ai_family
= AF_INET
;
201 if (memcached_is_udp(server
->root
))
203 hints
.ai_protocol
= IPPROTO_UDP
;
204 hints
.ai_socktype
= SOCK_DGRAM
;
208 hints
.ai_socktype
= SOCK_STREAM
;
209 hints
.ai_protocol
= IPPROTO_TCP
;
212 assert(server
->address_info
== NULL
);
213 assert(server
->address_info_next
== NULL
);
215 assert(server
->hostname());
216 switch(errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
))
// Point the iteration cursor at the head of the freshly resolved list.
219 server
->address_info_next
= server
->address_info
;
220 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
224 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// The remaining failure paths release the address list before reporting.
227 server
->clear_addrinfo();
228 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
231 server
->clear_addrinfo();
232 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
235 server
->clear_addrinfo();
236 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
240 server
->clear_addrinfo();
241 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
245 return MEMCACHED_SUCCESS
;
// Puts server->fd into non-blocking mode.
//
// Two mechanisms appear here: ioctlsocket(FIONBIO), and fcntl(F_GETFL/F_SETFL)
// adding O_NONBLOCK. The fcntl() path runs only when SOCK_NONBLOCK == 0, and
// F_SETFL is issued only when O_NONBLOCK is not already set. Both fcntl()
// calls are retried while they fail with EINTR or EAGAIN.
//
// Failures are recorded on the instance with memcached_set_errno(); nothing is
// returned to the caller.
248 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
252 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
254 memcached_set_errno(*server
, get_socket_errno(), NULL
);
259 if (SOCK_NONBLOCK
== 0)
// Read the current file status flags, retrying on transient failures.
263 flags
= fcntl(server
->fd
, F_GETFL
, 0);
264 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
268 memcached_set_errno(*server
, errno
, NULL
);
270 else if ((flags
& O_NONBLOCK
) == 0)
// Write the flags back with O_NONBLOCK added, retrying on transient failures.
276 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
277 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
281 memcached_set_errno(*server
, errno
, NULL
);
// Applies the per-connection socket options configured on server->root to
// server->fd, then switches the socket to non-blocking mode.
//
// Options touched, each guarded by the corresponding setting:
//   - FD_CLOEXEC via fcntl() when SOCK_CLOEXEC == 0
//   - SO_SNDTIMEO / SO_RCVTIMEO when snd_timeout / rcv_timeout > 0; the value is
//     split with / 1000000 and % 1000000 into tv_sec and tv_usec, i.e. it is
//     treated as microseconds
//   - SO_NOSIGPIPE (a failure is only reported through perror())
//   - SO_LINGER with l_linger = 0 when flags.no_block is set
//   - TCP_NODELAY when flags.tcp_nodelay is set
//   - SO_KEEPALIVE when flags.tcp_keepalive is set
//   - TCP_KEEPIDLE when tcp_keepidle > 0
//   - SO_SNDBUF / SO_RCVBUF when send_size / recv_size > 0
//
// @param server  instance whose fd must already be a valid socket (asserted)
// @return bool; network_connect() treats a false result as a failure.
288 static bool set_socket_options(org::libmemcached::Instance
* server
)
290 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
293 // If SOCK_CLOEXEC exists then we don't need to call the following
294 if (SOCK_CLOEXEC
== 0)
// Fetch the descriptor flags, retrying on EINTR/EAGAIN.
301 flags
= fcntl(server
->fd
, F_GETFD
, 0);
302 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
// Store the descriptor flags back with FD_CLOEXEC added.
309 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
310 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
311 // we currently ignore the case where rval is -1
317 if (memcached_is_udp(server
->root
))
323 if (server
->root
->snd_timeout
> 0)
325 struct timeval waittime
;
327 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
328 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
330 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
331 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
338 if (server
->root
->rcv_timeout
> 0)
340 struct timeval waittime
;
342 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
343 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
345 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
346 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
355 # if defined(SO_NOSIGPIPE)
359 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
363 // This is not considered a fatal error
367 perror("setsockopt(SO_NOSIGPIPE)");
371 # endif // SO_NOSIGPIPE
374 if (server
->root
->flags
.no_block
)
376 struct linger linger
;
379 linger
.l_linger
= 0; /* By default on close() just drop the socket */
380 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
381 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
388 if (server
->root
->flags
.tcp_nodelay
)
392 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
393 (char*)&flag
, (socklen_t
)sizeof(int));
399 if (server
->root
->flags
.tcp_keepalive
)
403 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
404 (char*)&flag
, (socklen_t
)sizeof(int));
411 if (server
->root
->tcp_keepidle
> 0)
413 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
414 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
420 if (server
->root
->send_size
> 0)
422 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
423 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
428 if (server
->root
->recv_size
> 0)
430 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
431 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
436 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
437 set_socket_nonblocking(server
);
// Connects the instance to a UNIX domain socket whose path is
// server->hostname().
//
// A SOCK_STREAM socket (with SOCK_NONBLOCK or'ed into the type) is created in
// the AF_UNIX family, the path is copied into sockaddr_un::sun_path and
// connect() is issued. On success the state becomes
// MEMCACHED_SERVER_STATE_CONNECTED.
//
// @param server  instance to connect; its fd is expected to be INVALID_SOCKET
//                on entry
// @return MEMCACHED_SUCCESS once connected, an errno-derived error recorded on
//         the instance when socket()/connect() fail, or
//         MEMCACHED_NOT_SUPPORTED (presumably for builds without UNIX socket
//         support -- confirm against the preprocessor guards).
442 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
445 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
448 int type
= SOCK_STREAM
;
456 type
|= SOCK_NONBLOCK
;
459 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1)
461 return memcached_set_errno(*server
, errno
, NULL
);
464 struct sockaddr_un servAddr
;
466 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
467 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() is given the full size of sun_path, so a hostname of
// that length or longer leaves sun_path without a terminating NUL.
468 strncpy(servAddr
.sun_path
, server
->hostname(), sizeof(servAddr
.sun_path
)); /* Copy filename */
470 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) == -1)
476 server
->events(POLLOUT
);
480 server
->reset_socket();
483 case EISCONN
: /* We were spinning waiting on connect */
485 assert(0); // Programmer error
486 server
->reset_socket();
// NOTE(review): errno is read again after reset_socket(); connect_poll() caches
// the value first because closing the socket may modify errno -- confirm that
// the value reported here is still the one from connect().
491 WATCHPOINT_ERRNO(errno
);
492 server
->reset_socket();
493 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
497 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
499 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
501 return MEMCACHED_SUCCESS
;
504 return MEMCACHED_NOT_SUPPORTED
;
// Establishes a TCP or UDP connection for the instance by walking the
// getaddrinfo() result list.
//
// Outline of the visible steps:
//   1. When address_info or address_info_next is NULL, set_hostinfo() performs
//      a fresh lookup.
//   2. While there is a candidate address and no open socket: create a socket
//      for the candidate, apply set_socket_options(), and call connect().
//   3. EINPROGRESS/EALREADY hand over to connect_poll(); EINTR drops the socket
//      and retries the same ai_addr; other failures drop the socket and advance
//      to ai_next.
//   4. After the loop the function reports the error already recorded on the
//      instance, MEMCACHED_TIMEOUT, or MEMCACHED_CONNECTION_FAILURE.
//
// @param server  instance to connect; fd is expected to be INVALID_SOCKET and
//                cursor_active_ to be 0 on entry
// @return MEMCACHED_SUCCESS with state MEMCACHED_SERVER_STATE_CONNECTED, or a
//         failure code as described in step 4.
508 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
510 bool timeout_error_occured
= false;
512 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
513 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
516 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
518 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
520 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
521 server
->address_info_next
= NULL
;
522 memcached_return_t rc
= set_hostinfo(server
);
524 if (memcached_failed(rc
))
530 assert(server
->address_info_next
);
531 assert(server
->address_info
);
533 /* Create the socket */
534 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
536 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
537 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
539 server
->address_info_next
= server
->address_info_next
->ai_next
;
543 int type
= server
->address_info_next
->ai_socktype
;
551 type
|= SOCK_NONBLOCK
;
554 server
->fd
= socket(server
->address_info_next
->ai_family
,
556 server
->address_info_next
->ai_protocol
);
558 if (int(server
->fd
) == SOCKET_ERROR
)
560 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
563 if (set_socket_options(server
) == false)
565 server
->reset_socket();
566 return MEMCACHED_CONNECTION_FAILURE
;
569 /* connect to server */
570 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
572 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
573 return MEMCACHED_SUCCESS
;
576 /* An error occurred */
577 int local_error
= get_socket_errno();
581 timeout_error_occured
= true;
585 #if EWOULDBLOCK != EAGAIN
588 case EINPROGRESS
: // nonblocking mode - first return
589 case EALREADY
: // nonblocking mode - subsequent returns
// The connect is still pending: wait for writability and let connect_poll()
// decide the outcome.
591 server
->events(POLLOUT
);
592 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
593 memcached_return_t rc
= connect_poll(server
, local_error
);
595 if (memcached_success(rc
))
597 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
598 return MEMCACHED_SUCCESS
;
601 // A timeout here is treated as an error, we will not retry
602 if (rc
== MEMCACHED_TIMEOUT
)
604 timeout_error_occured
= true;
609 case EISCONN
: // we are connected :-)
610 WATCHPOINT_ASSERT(0); // This is a programmer's error
613 case EINTR
: // Special case, we retry ai_addr
614 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
615 server
->reset_socket();
619 // Probably not running service
// Give up on this candidate: close the socket and move to the next address.
625 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
626 server
->reset_socket();
627 server
->address_info_next
= server
->address_info_next
->ai_next
;
630 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
632 if (timeout_error_occured
)
634 server
->reset_socket();
637 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that was already recorded on the instance over the generic
// codes below.
639 if (memcached_has_current_error(*server
))
641 return memcached_instance_error_return(server
);
644 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
646 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
647 memcached_literal_param("if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
650 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
657 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
658 we get caught spending cycles just waiting.
// Decides, from the failure counter and retry timestamps, whether a connection
// attempt should be made at all.
//
// - When server_failure_counter has reached root->server_failure_limit and
//   auto-eject is enabled, the host is recorded as last disconnected, an
//   optional dead_timeout retry is scheduled, run_distribution() is invoked and
//   MEMCACHED_SERVER_MARKED_DEAD is reported; the instance is otherwise placed
//   in MEMCACHED_SERVER_STATE_IN_TIMEOUT.
// - While in MEMCACHED_SERVER_STATE_IN_TIMEOUT, the state returns to
//   MEMCACHED_SERVER_STATE_NEW once next_retry is earlier than the current
//   time; otherwise MEMCACHED_SERVER_TEMPORARILY_DISABLED is reported.
//
// @param server      instance being evaluated
// @param in_timeout  flag owned by the caller, passed by reference
//                    NOTE(review): presumably set while the server is still
//                    inside its retry window -- confirm
// @return MEMCACHED_SUCCESS when a connect may be attempted, otherwise the
//         error recorded on the instance.
660 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
662 struct timeval curr_time
;
// The time based decisions below are skipped when gettimeofday() fails.
663 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
666 If we hit server_failure_limit then something is completely wrong about the server.
668 1) If autoeject is enabled we do that.
669 2) If not? We go into timeout again, there is much else to do :(
671 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
674 We just auto_eject if we hit this point
676 if (_is_auto_eject_host(server
->root
))
678 set_last_disconnected_host(server
);
680 // Retry dead servers if requested
681 if (_gettime_success
and server
->root
->dead_timeout
> 0)
683 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
685 // We only retry dead servers once before assuming failure again
686 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
689 memcached_return_t rc
;
690 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
692 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
695 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
698 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
700 // Sanity check/setting
701 if (server
->next_retry
== 0)
703 server
->next_retry
= 1;
707 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
710 If next_retry is less then our current time, then we reset and try everything again.
712 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
714 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
718 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
724 return MEMCACHED_SUCCESS
;
// Connects the instance if it does not already hold an open socket.
//
// Steps visible here: return early when server->fd is already valid; consult
// backoff_handling(); reject SASL combined with UDP; treat a hostname starting
// with '/' as a UNIX socket path; dispatch to network_connect() or
// unix_socket_connect() by connection type; for TCP/UDP with SASL callbacks
// configured, authenticate and drop the socket if that fails. On success the
// server is marked clean and memcached_version_instance() is called; on
// failure the server is marked for timeout.
//
// @param server                 instance to connect
// @param set_last_disconnected  when true, a failed attempt also records the
//                               instance via set_last_disconnected_host()
// @return MEMCACHED_SUCCESS, or a failure code; one path reports
//         MEMCACHED_SERVER_TEMPORARILY_DISABLED with a "host:port" message.
727 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
730 if (server
->fd
!= INVALID_SOCKET
)
732 return MEMCACHED_SUCCESS
;
735 LIBMEMCACHED_MEMCACHED_CONNECT_START();
737 bool in_timeout
= false;
738 memcached_return_t rc
;
739 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
741 set_last_disconnected_host(server
);
745 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
747 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A leading '/' in the hostname is taken to mean a filesystem path, so the
// connection type is switched to UNIX socket.
750 if (server
->hostname()[0] == '/')
752 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
755 /* We need to clean up the multi startup piece */
756 switch (server
->type
)
758 case MEMCACHED_CONNECTION_UDP
:
759 case MEMCACHED_CONNECTION_TCP
:
760 rc
= network_connect(server
);
762 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
764 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
766 rc
= memcached_sasl_authenticate_connection(server
);
767 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
769 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
770 server
->reset_socket();
776 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
777 rc
= unix_socket_connect(server
);
781 if (memcached_success(rc
))
783 server
->mark_server_as_clean();
784 memcached_version_instance(server
);
787 else if (set_last_disconnected
)
789 set_last_disconnected_host(server
);
790 if (memcached_has_current_error(*server
))
792 memcached_mark_server_for_timeout(server
);
793 assert(memcached_failed(memcached_instance_error_return(server
)));
797 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
798 memcached_mark_server_for_timeout(server
);
801 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// Build a "host:port" message for the temporarily-disabled error.
806 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
807 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// Connects the given instance by delegating to _memcached_connect() with
// set_last_disconnected set to true.
//
// @param server  instance to connect
// @return the memcached_return_t produced by _memcached_connect().
814 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
816 return _memcached_connect(server
, true);