1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// Socket flag and option macros defined as 0 in this file. With a value of 0,
// OR-ing SOCK_NONBLOCK into a socket type changes nothing, and the
// "SOCK_CLOEXEC == 0" / "SOCK_NONBLOCK == 0" tests further down select the
// fcntl() code paths.
// NOTE(review): presumably each define is a guarded fallback for platforms that
// lack the real macro - confirm against the surrounding preprocessor guards.
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
// connect_poll(): wait, via poll(), for a pending connect() on server->fd to
// resolve and translate the outcome into a memcached_return_t.
//
// server           - instance whose fd and events() are handed to poll().
// connection_error - errno value from the connect() attempt (network_connect()
//                    passes get_socket_errno()); EINPROGRESS enables the extra
//                    SO_ERROR lookups below.
//
// Returns MEMCACHED_SUCCESS, or an error recorded on *server through
// memcached_set_error() / memcached_set_errno().
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
, const int connection_error
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
// A poll_timeout of zero is reported as MEMCACHED_TIMEOUT.
// NOTE(review): this test reads poll_timeout while the poll() call below is
// given connect_timeout - confirm the mismatch is intended.
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
,
79 memcached_literal_param("The time to wait for a connection to be established was set to zero, which means it will always timeout (MEMCACHED_TIMEOUT)."));
// loop_max bounds the number of poll() attempts.
82 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
85 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) == -1)
87 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
90 #ifdef TARGET_OS_LINUX
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
101 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
102 memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
107 if (fds
[0].revents
& POLLERR
)
110 socklen_t len
= sizeof(err
);
111 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
115 // This should never happen, if it does? Punt.
// Drop the socket and put the instance back into the NEW state before the
// cached errno is reported.
124 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
125 server
->reset_socket();
126 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
128 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// connect() had reported EINPROGRESS: ask the socket for its pending error
// through SO_ERROR.
133 if (connection_error
== EINPROGRESS
)
136 socklen_t len
= sizeof(err
);
137 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
139 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
142 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
145 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
149 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// Pass the revents reported by poll() on to the instance.
153 server
->revents(fds
[0].revents
);
156 assert (number_of
== 1);
// Error, hang-up or invalid-descriptor condition: read SO_ERROR to learn the
// cause.
158 if (fds
[0].revents
& POLLERR
or
159 fds
[0].revents
& POLLHUP
or
160 fds
[0].revents
& POLLNVAL
)
163 socklen_t len
= sizeof (err
);
164 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
166 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getsockopt() errored while looking up error state from poll()"));
169 // We check the value to see what happened with the socket.
170 if (err
== 0) // Should not happen
172 return MEMCACHED_SUCCESS
;
176 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() during connect."));
178 assert(fds
[0].revents
& POLLOUT
);
// Writable after EINPROGRESS: SO_ERROR is consulted before success or failure
// is reported.
180 if (fds
[0].revents
& POLLOUT
and connection_error
== EINPROGRESS
)
183 socklen_t len
= sizeof(err
);
184 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == -1)
186 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
191 return MEMCACHED_SUCCESS
;
194 return memcached_set_errno(*server
, err
, MEMCACHED_AT
, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
197 break; // We only have the loop setup for errno types that require restart
200 // This should only be possible from ERESTART or EINTR;
201 return memcached_set_errno(*server
, connection_error
, MEMCACHED_AT
, memcached_literal_param("connect_poll() was exhausted"));
// set_hostinfo(): resolve server->hostname() and server->port() with
// getaddrinfo() and store the result list in server->address_info.
//
// Must not be called for MEMCACHED_CONNECTION_UNIX_SOCKET instances (asserted).
// Any previous lookup is discarded first through clear_addrinfo().
//
// After a successful lookup address_info_next points at the head of the list
// and server->state becomes MEMCACHED_SERVER_STATE_ADDRINFO. Lookup failures
// are recorded on *server as MEMCACHED_TIMEOUT, an errno based error,
// MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_MEMORY_ALLOCATION_FAILURE or
// MEMCACHED_HOST_LOOKUP_FAILURE.
204 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
206 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
207 server
->clear_addrinfo();
// The port is handed to getaddrinfo() as a decimal string.
209 char str_port
[MEMCACHED_NI_MAXSERV
];
210 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
// Truncated or failed formatting is reported as
// MEMCACHED_MEMORY_ALLOCATION_FAILURE.
211 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
213 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
214 memcached_literal_param("snprintf(NI_MAXSERV)"));
217 struct addrinfo hints
;
218 memset(&hints
, 0, sizeof(struct addrinfo
));
// Only AF_INET results are requested; socket type and protocol follow the UDP
// setting of the root.
220 hints
.ai_family
= AF_INET
;
221 if (memcached_is_udp(server
->root
))
223 hints
.ai_protocol
= IPPROTO_UDP
;
224 hints
.ai_socktype
= SOCK_DGRAM
;
228 hints
.ai_socktype
= SOCK_STREAM
;
229 hints
.ai_protocol
= IPPROTO_TCP
;
232 assert(server
->address_info
== NULL
);
233 assert(server
->address_info_next
== NULL
);
235 assert(server
->hostname());
// The getaddrinfo() return code selects one of the outcomes below.
236 switch(errcode
= getaddrinfo(server
->hostname(), str_port
, &hints
, &server
->address_info
))
239 server
->address_info_next
= server
->address_info
;
240 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
244 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
247 server
->clear_addrinfo();
248 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
251 server
->clear_addrinfo();
252 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
255 server
->clear_addrinfo();
256 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Remaining failures carry the gai_strerror() text for errcode.
260 server
->clear_addrinfo();
261 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
265 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// Two mechanisms appear below: ioctlsocket(FIONBIO), and fcntl() with
// O_NONBLOCK, the latter only when SOCK_NONBLOCK == 0.
// NOTE(review): presumably the choice between the two is made per platform at
// compile time - confirm.
//
// Failures are recorded on *server with memcached_set_errno(). The function
// returns void, so the caller gets no direct failure indication.
268 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
272 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
274 memcached_set_errno(*server
, get_socket_errno(), NULL
);
279 if (SOCK_NONBLOCK
== 0)
// F_GETFL is retried while it fails with EINTR or EAGAIN.
283 flags
= fcntl(server
->fd
, F_GETFL
, 0);
284 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
288 memcached_set_errno(*server
, errno
, NULL
);
// F_SETFL is only issued when O_NONBLOCK is not already present in the flags.
290 else if ((flags
& O_NONBLOCK
) == 0)
296 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
297 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
301 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the root's socket configuration to server->fd.
//
// Steps that appear below, in order: FD_CLOEXEC through fcntl() when
// SOCK_CLOEXEC == 0, SO_SNDTIMEO and SO_RCVTIMEO, SO_NOSIGPIPE, SO_LINGER
// (flags.no_block), TCP_NODELAY, SO_KEEPALIVE, TCP_KEEPIDLE, SO_SNDBUF,
// SO_RCVBUF and finally set_socket_nonblocking().
//
// Returns bool; network_connect() treats a false result as
// MEMCACHED_CONNECTION_FAILURE.
308 static bool set_socket_options(org::libmemcached::Instance
* server
)
310 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
313 // If SOCK_CLOEXEC exists then we don't need to call the following
314 if (SOCK_CLOEXEC
== 0)
// Both fcntl() calls are retried while they fail with EINTR or EAGAIN.
321 flags
= fcntl(server
->fd
, F_GETFD
, 0);
322 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
329 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
330 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
331 // we currently ignore the case where rval is -1
// NOTE(review): UDP instances are tested for here; presumably the options
// below are skipped for them - confirm.
337 if (memcached_is_udp(server
->root
))
343 if (server
->root
->snd_timeout
> 0)
345 struct timeval waittime
;
// snd_timeout is split into whole seconds and a remainder using a divisor of
// 1000000.
347 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
348 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
350 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
351 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
358 if (server
->root
->rcv_timeout
> 0)
360 struct timeval waittime
;
// rcv_timeout is converted the same way as snd_timeout above.
362 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
363 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
365 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
366 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
375 # if defined(SO_NOSIGPIPE)
379 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
383 // This is not considered a fatal error
387 perror("setsockopt(SO_NOSIGPIPE)");
391 # endif // SO_NOSIGPIPE
394 if (server
->root
->flags
.no_block
)
396 struct linger linger
;
399 linger
.l_linger
= 0; /* By default on close() just drop the socket */
400 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
401 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
408 if (server
->root
->flags
.tcp_nodelay
)
412 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
413 (char*)&flag
, (socklen_t
)sizeof(int));
419 if (server
->root
->flags
.tcp_keepalive
)
423 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
424 (char*)&flag
, (socklen_t
)sizeof(int));
// The three options below pass the root's field itself as the option value,
// with an option length of sizeof(int).
431 if (server
->root
->tcp_keepidle
> 0)
433 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
434 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
440 if (server
->root
->send_size
> 0)
442 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
443 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
448 if (server
->root
->recv_size
> 0)
450 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
451 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
456 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
457 set_socket_nonblocking(server
);
// unix_socket_connect(): create an AF_UNIX stream socket and connect it to the
// path returned by server->hostname().
//
// Expects server->fd to be INVALID_SOCKET on entry (WATCHPOINT_ASSERT). On
// success server->state becomes MEMCACHED_SERVER_STATE_CONNECTED and
// MEMCACHED_SUCCESS is returned; socket() and connect() failures are recorded
// on *server with memcached_set_errno().
// NOTE(review): the trailing MEMCACHED_NOT_SUPPORTED return is presumably the
// build-time fallback for platforms without UNIX sockets - confirm.
462 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
465 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
468 int type
= SOCK_STREAM
;
476 type
|= SOCK_NONBLOCK
;
479 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1)
481 return memcached_set_errno(*server
, errno
, NULL
);
484 struct sockaddr_un servAddr
;
486 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
487 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() with the full sizeof(sun_path) leaves sun_path
// without a terminating NUL when hostname() is at least that long - confirm
// that the path length is validated before this point.
488 strncpy(servAddr
.sun_path
, server
->hostname(), sizeof(servAddr
.sun_path
)); /* Copy filename */
490 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) == -1)
496 server
->events(POLLOUT
);
500 server
->reset_socket();
503 case EISCONN
: /* We were spinning waiting on connect */
505 assert(0); // Programmer error
506 server
->reset_socket();
// NOTE(review): errno is read after reset_socket() here, whereas connect_poll()
// caches the error code first in case closing the socket modifies it - confirm
// that reset_socket() preserves errno.
511 WATCHPOINT_ERRNO(errno
);
512 server
->reset_socket();
513 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
517 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
519 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
521 return MEMCACHED_SUCCESS
;
524 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): open a socket to the server and connect it.
//
// When address_info or address_info_next is NULL, set_hostinfo() performs a
// fresh lookup first. The function then walks address_info_next: for each
// entry it creates a socket, applies set_socket_options() and calls connect().
// EINPROGRESS / EALREADY results are completed through connect_poll().
//
// Returns MEMCACHED_SUCCESS with server->state set to
// MEMCACHED_SERVER_STATE_CONNECTED. Otherwise returns the error already
// recorded on the instance, MEMCACHED_TIMEOUT or MEMCACHED_CONNECTION_FAILURE.
528 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
530 bool timeout_error_occured
= false;
532 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
533 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
536 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
538 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
540 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
541 server
->address_info_next
= NULL
;
542 memcached_return_t rc
= set_hostinfo(server
);
544 if (memcached_failed(rc
))
550 assert(server
->address_info_next
);
551 assert(server
->address_info
);
553 /* Create the socket */
// The loop ends once a socket is held in server->fd or the address list is
// exhausted.
554 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
556 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
557 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
559 server
->address_info_next
= server
->address_info_next
->ai_next
;
563 int type
= server
->address_info_next
->ai_socktype
;
571 type
|= SOCK_NONBLOCK
;
574 server
->fd
= socket(server
->address_info_next
->ai_family
,
576 server
->address_info_next
->ai_protocol
);
578 if (int(server
->fd
) == SOCKET_ERROR
)
580 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// A socket whose options cannot be applied is dropped and the attempt is
// reported as a connection failure.
583 if (set_socket_options(server
) == false)
585 server
->reset_socket();
586 return MEMCACHED_CONNECTION_FAILURE
;
589 /* connect to server */
590 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
592 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
593 return MEMCACHED_SUCCESS
;
596 /* An error occurred */
597 int local_error
= get_socket_errno();
601 timeout_error_occured
= true;
605 #if EWOULDBLOCK != EAGAIN
608 case EINPROGRESS
: // nonblocking mode - first return
609 case EALREADY
: // nonblocking mode - subsequent returns
// The connection is still being established: wait for it in connect_poll().
611 server
->events(POLLOUT
);
612 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
613 memcached_return_t rc
= connect_poll(server
, local_error
);
615 if (memcached_success(rc
))
617 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
618 return MEMCACHED_SUCCESS
;
621 // A timeout here is treated as an error, we will not retry
622 if (rc
== MEMCACHED_TIMEOUT
)
624 timeout_error_occured
= true;
629 case EISCONN
: // we are connected :-)
630 WATCHPOINT_ASSERT(0); // This is a programmer's error
633 case EINTR
: // Special case, we retry ai_addr
634 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
635 server
->reset_socket();
639 // Probably not running service
// Drop this socket and move on to the next address in the list.
645 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
646 server
->reset_socket();
647 server
->address_info_next
= server
->address_info_next
->ai_next
;
650 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
652 if (timeout_error_occured
)
654 server
->reset_socket();
657 WATCHPOINT_STRING("Never got a good file descriptor");
// An error already recorded on the instance takes precedence over the generic
// results below.
659 if (memcached_has_current_error(*server
))
661 return memcached_instance_error_return(server
);
664 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
666 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
669 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
676 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
677 we get caught spending cycles just waiting.
// backoff_handling(): decide, before a connect attempt, whether this server
// should be tried at all.
//
// server     - instance whose failure counter, state and next_retry are
//              examined and updated.
// in_timeout - bool reference supplied by the caller, which initialises it to
//              false. NOTE(review): presumably set when the server is still in
//              its timeout window - confirm.
//
// Behaviour shown by the statements below:
//  * When server_failure_counter has reached root->server_failure_limit and
//    auto-eject is enabled: the host is recorded as last disconnected,
//    next_retry may be scheduled at now + dead_timeout, the distribution is
//    re-run and MEMCACHED_SERVER_MARKED_DEAD is returned.
//  * The state is moved to MEMCACHED_SERVER_STATE_IN_TIMEOUT and a zero
//    next_retry is replaced by 1.
//  * While in MEMCACHED_SERVER_STATE_IN_TIMEOUT: once next_retry lies before
//    the current time the state returns to MEMCACHED_SERVER_STATE_NEW; a
//    MEMCACHED_SERVER_TEMPORARILY_DISABLED error is also produced on this path.
// Returns MEMCACHED_SUCCESS when none of the error paths is taken.
679 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
681 struct timeval curr_time
;
// The time based decisions below are only taken when gettimeofday() succeeded.
682 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
685 If we hit server_failure_limit then something is completely wrong about the server.
687 1) If autoeject is enabled we do that.
688 2) If not? We go into timeout again, there is much else to do :(
690 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
693 We just auto_eject if we hit this point
695 if (_is_auto_eject_host(server
->root
))
697 set_last_disconnected_host(server
);
699 // Retry dead servers if requested
700 if (_gettime_success
and server
->root
->dead_timeout
> 0)
702 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
704 // We only retry dead servers once before assuming failure again
705 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
708 memcached_return_t rc
;
709 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
711 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
714 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
717 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
719 // Sanity check/setting
720 if (server
->next_retry
== 0)
722 server
->next_retry
= 1;
726 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
729 If next_retry is less then our current time, then we reset and try everything again.
731 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
733 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
737 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
743 return MEMCACHED_SUCCESS
;
// _memcached_connect(): connect a server instance unless it already holds a
// socket.
//
// server                - instance to connect.
// set_last_disconnected - when true, a failed attempt records the host through
//                         set_last_disconnected_host() and marks the server for
//                         timeout.
//
// Flow shown by the statements below: return MEMCACHED_SUCCESS at once when
// server->fd is valid; run backoff_handling(); reject SASL combined with UDP;
// treat a hostname starting with '/' as a UNIX socket path; dispatch on
// server->type to network_connect() (followed by SASL authentication when
// enabled and callbacks are set) or unix_socket_connect(); on success mark the
// server clean and call memcached_version_instance().
746 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
749 if (server
->fd
!= INVALID_SOCKET
)
751 return MEMCACHED_SUCCESS
;
754 LIBMEMCACHED_MEMCACHED_CONNECT_START();
756 bool in_timeout
= false;
757 memcached_return_t rc
;
758 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
760 set_last_disconnected_host(server
);
764 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
766 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname that begins with '/' overrides the configured connection type.
769 if (server
->hostname()[0] == '/')
771 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
774 /* We need to clean up the multi startup piece */
775 switch (server
->type
)
777 case MEMCACHED_CONNECTION_UDP
:
778 case MEMCACHED_CONNECTION_TCP
:
779 rc
= network_connect(server
);
781 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
783 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
785 rc
= memcached_sasl_authenticate_connection(server
);
// A connection that fails authentication is closed again.
786 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
788 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
789 server
->reset_socket();
795 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
796 rc
= unix_socket_connect(server
);
800 if (memcached_success(rc
))
802 server
->mark_server_as_clean();
803 memcached_version_instance(server
);
806 else if (set_last_disconnected
)
808 set_last_disconnected_host(server
);
// When the instance already carries an error it is kept; the rc is recorded as
// the error by the memcached_set_error() call further down.
809 if (memcached_has_current_error(*server
))
811 memcached_mark_server_for_timeout(server
);
812 assert(memcached_failed(memcached_instance_error_return(server
)));
816 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
817 memcached_mark_server_for_timeout(server
);
820 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// The "host:port" text becomes the message of the
// MEMCACHED_SERVER_TEMPORARILY_DISABLED error.
// NOTE(review): snprintf() returns the length the full text would have had;
// confirm that buffer is large enough that snprintf_length cannot exceed
// sizeof(buffer).
825 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname(), int(server
->port()));
826 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// memcached_connect(): connect the given server instance; forwards to
// _memcached_connect() with set_last_disconnected set to true and returns its
// result.
833 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
835 return _memcached_connect(server
, true);