1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
/*
 * Fallback values for socket/TCP option macros: every macro below is defined
 * as 0. Later code in this file compares SOCK_NONBLOCK and SOCK_CLOEXEC
 * against 0 to decide whether the fcntl()-based fallback has to run.
 * NOTE(review): presumably each define sits behind an #ifndef guard that is
 * not visible in this listing -- confirm.
 */
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
/*
 * connect_poll() - wait for a pending connect on server->fd to complete.
 *
 * Polls the single descriptor server->fd for the event mask returned by
 * server->events(), passing root->connect_timeout as the poll() timeout.
 *
 * Outcomes visible in this function:
 *  - root->poll_timeout == 0: MEMCACHED_TIMEOUT is recorded on the instance
 *    and returned before any poll() call.
 *  - poll() reports no ready descriptor (number_of == 0 is asserted):
 *    io_wait_count.timeouts is incremented and MEMCACHED_TIMEOUT returned.
 *  - poll() error path: some errno values are reported as
 *    MEMCACHED_MEMORY_ALLOCATION_FAILURE; otherwise the socket is closed,
 *    fd is reset to INVALID_SOCKET, state to MEMCACHED_SERVER_STATE_NEW and
 *    the cached errno is reported through memcached_set_errno().
 *  - POLLERR, POLLHUP or POLLNVAL set in revents: SO_ERROR is read with
 *    getsockopt(); MEMCACHED_SUCCESS is one visible result of that branch,
 *    memcached_set_errno(err) the other.
 *  - Otherwise POLLIN or POLLOUT is asserted and MEMCACHED_SUCCESS returned.
 *  - If the retry loop runs out (loop_max), the current socket errno is
 *    reported through memcached_set_errno().
 *
 * NOTE(review): which errno values select which branch is not visible here
 * (the case labels are absent from this listing) -- confirm against the
 * full source.
 */
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
// NOTE(review): this guard tests poll_timeout, while the poll() call below
// passes connect_timeout -- confirm which setting is intended.
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
81 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
84 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
88 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
91 #ifdef TARGET_OS_LINUX
99 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
102 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
105 if (fds
[0].revents
& POLLERR
)
// Ask the socket itself for its pending error (SO_ERROR).
108 socklen_t len
= sizeof(err
);
109 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
113 // This should never happen, if it does? Punt.
// Error path: drop the socket and put the instance back into the NEW state
// before reporting the errno that was cached above.
120 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
121 (void)closesocket(server
->fd
);
122 server
->fd
= INVALID_SOCKET
;
123 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
125 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// poll() returned 0: nothing became ready, which is counted as a timeout.
128 assert(number_of
== 0);
130 server
->io_wait_count
.timeouts
++;
131 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// NOTE(review): original lines directly around this call are absent from this
// listing (possibly a preprocessor guard) -- confirm that it is compiled in.
135 server
->revents(fds
[0].revents
);
138 if (fds
[0].revents
& POLLERR
or
139 fds
[0].revents
& POLLHUP
or
140 fds
[0].revents
& POLLNVAL
)
143 socklen_t len
= sizeof (err
);
144 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
146 // We check the value to see what happened with the socket.
149 return MEMCACHED_SUCCESS
;
154 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
156 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
158 return MEMCACHED_SUCCESS
;
161 // This should only be possible from ERESTART or EINTR;
162 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
/*
 * set_hostinfo() - resolve server->hostname / server->port() with
 * getaddrinfo() and store the result list in server->address_info.
 *
 * Must not be used for unix-socket instances (asserted on server->type).
 * Any previously stored address_info list is released with freeaddrinfo()
 * and both address_info and address_info_next are cleared first.
 *
 * The port is formatted with snprintf("%u") into a buffer of
 * MEMCACHED_NI_MAXSERV bytes; a non-positive or truncated result is reported
 * as MEMCACHED_MEMORY_ALLOCATION_FAILURE ("snprintf(NI_MAXSERV)").
 *
 * Hints: the SOCK_DGRAM/IPPROTO_UDP pair is set under the
 * memcached_is_udp(server->root) test; the SOCK_STREAM/IPPROTO_TCP pair is
 * the other visible assignment.
 *
 * Results visible below:
 *  - success: address_info_next points at the head of address_info, state
 *    becomes MEMCACHED_SERVER_STATE_ADDRINFO, MEMCACHED_SUCCESS is returned.
 *  - failures: MEMCACHED_TIMEOUT or MEMCACHED_HOST_LOOKUP_FAILURE (both
 *    carrying the gai_strerror() text), an errno-based error tagged
 *    "getaddrinfo(EAI_SYSTEM)", MEMCACHED_INVALID_ARGUMENTS tagged
 *    "getaddrinfo(EAI_BADFLAGS)", or MEMCACHED_MEMORY_ALLOCATION_FAILURE
 *    tagged "getaddrinfo(EAI_MEMORY)". The branches that free address_info
 *    also reset both list pointers to NULL.
 *
 * NOTE(review): the case labels of the switch on the getaddrinfo() result
 * are absent from this listing; the mapping above is read from the message
 * literals -- confirm against the full source.
 */
165 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
167 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Drop any result left over from an earlier lookup.
168 if (server
->address_info
)
170 freeaddrinfo(server
->address_info
);
171 server
->address_info
= NULL
;
172 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so render the port number.
175 char str_port
[MEMCACHED_NI_MAXSERV
];
176 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
177 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
179 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
180 memcached_literal_param("snprintf(NI_MAXSERV)"));
183 struct addrinfo hints
;
184 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): the original lines directly before and after this assignment
// are absent from this listing (possibly a preprocessor guard) -- confirm it
// is compiled in before relying on an IPv4-only lookup.
187 hints
.ai_family
= AF_INET
;
189 if (memcached_is_udp(server
->root
))
191 hints
.ai_protocol
= IPPROTO_UDP
;
192 hints
.ai_socktype
= SOCK_DGRAM
;
196 hints
.ai_socktype
= SOCK_STREAM
;
197 hints
.ai_protocol
= IPPROTO_TCP
;
200 assert(server
->address_info
== NULL
);
201 assert(server
->address_info_next
== NULL
);
203 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
209 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
212 if (server
->address_info
)
214 freeaddrinfo(server
->address_info
);
215 server
->address_info
= NULL
;
216 server
->address_info_next
= NULL
;
218 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
221 if (server
->address_info
)
223 freeaddrinfo(server
->address_info
);
224 server
->address_info
= NULL
;
225 server
->address_info_next
= NULL
;
227 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
230 if (server
->address_info
)
232 freeaddrinfo(server
->address_info
);
233 server
->address_info
= NULL
;
234 server
->address_info_next
= NULL
;
236 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
240 if (server
->address_info
)
242 freeaddrinfo(server
->address_info
);
243 server
->address_info
= NULL
;
244 server
->address_info_next
= NULL
;
246 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating at the head of the freshly resolved list.
249 server
->address_info_next
= server
->address_info
;
250 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
252 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking() - put server->fd into non-blocking mode.
 *
 * Two mechanisms are visible:
 *  - ioctlsocket(fd, FIONBIO, &arg), with a failure recorded through
 *    memcached_set_errno() using get_socket_errno();
 *  - when SOCK_NONBLOCK == 0, fcntl(F_GETFL) followed, if O_NONBLOCK is not
 *    already set, by fcntl(F_SETFL, flags | O_NONBLOCK). Both fcntl() calls
 *    are retried while they fail with EINTR or EAGAIN; a remaining failure
 *    is recorded through memcached_set_errno() using errno.
 *
 * Nothing is returned; errors are only recorded on the instance.
 *
 * NOTE(review): presumably the ioctlsocket() variant is selected by a
 * platform #ifdef that is not visible in this listing -- confirm.
 * NOTE(review): these memcached_set_errno() calls pass NULL where other call
 * sites in this file pass MEMCACHED_AT -- confirm that is intended.
 */
255 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
259 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
261 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// SOCK_NONBLOCK == 0 is this file's fallback value for the macro, so the flag
// has to be applied with fcntl() instead.
266 if (SOCK_NONBLOCK
== 0)
270 flags
= fcntl(server
->fd
, F_GETFL
, 0);
271 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
275 memcached_set_errno(*server
, errno
, NULL
);
277 else if ((flags
& O_NONBLOCK
) == 0)
283 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
284 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
288 memcached_set_errno(*server
, errno
, NULL
);
/*
 * set_socket_options() - apply the handle's configured options to server->fd.
 *
 * Precondition (asserted): server->fd != INVALID_SOCKET.
 *
 * Steps visible below, in order:
 *  - when SOCK_CLOEXEC == 0: FD_CLOEXEC is set through fcntl(F_GETFD) /
 *    fcntl(F_SETFD), each retried on EINTR or EAGAIN; an F_SETFD failure is
 *    ignored (see the existing comment);
 *  - a memcached_is_udp(server->root) test (its body is not visible here);
 *  - root->snd_timeout > 0: SO_SNDTIMEO, with the value split into seconds
 *    (/ 1000000) and microseconds (% 1000000);
 *  - root->rcv_timeout > 0: SO_RCVTIMEO, converted the same way;
 *  - SO_NOSIGPIPE where defined; a failure is reported with perror() and is
 *    not treated as fatal;
 *  - root->flags.no_block: SO_LINGER with l_linger = 0;
 *  - root->flags.tcp_nodelay: TCP_NODELAY;
 *  - root->flags.tcp_keepalive: SO_KEEPALIVE;
 *  - root->tcp_keepidle > 0: TCP_KEEPIDLE;
 *  - root->send_size > 0: SO_SNDBUF;
 *  - root->recv_size > 0: SO_RCVBUF;
 *  - finally set_socket_nonblocking(server).
 *
 * Returns bool. NOTE(review): the return statements and the handling of the
 * individual setsockopt() results are absent from this listing -- confirm the
 * exact success/failure contract against the full source.
 */
295 static bool set_socket_options(org::libmemcached::Instance
* server
)
297 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
300 // If SOCK_CLOEXEC exists then we don't need to call the following
301 if (SOCK_CLOEXEC
== 0)
308 flags
= fcntl(server
->fd
, F_GETFD
, 0);
309 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
316 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
317 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
318 // we currently ignore the case where rval is -1
324 if (memcached_is_udp(server
->root
))
// Send timeout: snd_timeout is split into whole seconds and the remaining
// microseconds for struct timeval.
330 if (server
->root
->snd_timeout
> 0)
332 struct timeval waittime
;
334 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
335 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
337 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
338 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
// Receive timeout: same conversion as for the send timeout.
345 if (server
->root
->rcv_timeout
> 0)
347 struct timeval waittime
;
349 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
350 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
352 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
353 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
360 #if defined(SO_NOSIGPIPE)
364 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
368 // This is not considered a fatal error
372 perror("setsockopt(SO_NOSIGPIPE)");
378 if (server
->root
->flags
.no_block
)
380 struct linger linger
;
383 linger
.l_linger
= 0; /* By default on close() just drop the socket */
384 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
385 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
392 if (server
->root
->flags
.tcp_nodelay
)
396 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
397 (char*)&flag
, (socklen_t
)sizeof(int));
403 if (server
->root
->flags
.tcp_keepalive
)
407 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
408 (char*)&flag
, (socklen_t
)sizeof(int));
415 if (server
->root
->tcp_keepidle
> 0)
417 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
418 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Kernel buffer sizes are only overridden when a positive size is configured.
424 if (server
->root
->send_size
> 0)
426 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
427 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
432 if (server
->root
->recv_size
> 0)
434 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
435 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
440 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
441 set_socket_nonblocking(server
);
/*
 * unix_socket_connect() - connect server->fd to the AF_UNIX socket whose
 * path is server->hostname.
 *
 * Expects server->fd == INVALID_SOCKET on entry (WATCHPOINT_ASSERT).
 *
 * Visible flow:
 *  - an AF_UNIX socket of type SOCK_STREAM (SOCK_NONBLOCK may be OR'd in) is
 *    created; a socket() failure is returned through memcached_set_errno();
 *  - a zeroed sockaddr_un receives AF_UNIX and a copy of server->hostname,
 *    then connect() is called;
 *  - on a connect() failure the visible branches either request POLLOUT via
 *    server->events(), or close the socket and reset fd to INVALID_SOCKET;
 *    one of the closing branches returns the errno through
 *    memcached_set_errno(); EISCONN is treated as a programmer error
 *    (assert(0));
 *  - on the success path state becomes MEMCACHED_SERVER_STATE_CONNECTED and
 *    MEMCACHED_SUCCESS is returned.
 *
 * A MEMCACHED_NOT_SUPPORTED return is also present.
 * NOTE(review): presumably that return belongs to a platform #ifdef branch
 * that is not visible in this listing -- confirm.
 */
446 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
449 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
452 int type
= SOCK_STREAM
;
460 type
|= SOCK_NONBLOCK
;
463 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) < 0)
465 return memcached_set_errno(*server
, errno
, NULL
);
468 struct sockaddr_un servAddr
;
470 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
471 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() bounded by the full size of sun_path leaves the
// path without a terminating NUL when hostname is that long or longer
// (shorter paths stay terminated thanks to the memset above) -- consider
// bounding the copy to sizeof(sun_path) - 1 or rejecting over-long paths.
472 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
474 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
480 server
->events(POLLOUT
);
484 (void)closesocket(server
->fd
);
485 server
->fd
= INVALID_SOCKET
;
488 case EISCONN
: /* We were spinning waiting on connect */
490 assert(0); // Programmer error
491 (void)closesocket(server
->fd
);
492 server
->fd
= INVALID_SOCKET
;
// NOTE(review): errno is read again after WATCHPOINT_ERRNO() and
// closesocket(), either of which may change it -- consider caching it first.
497 WATCHPOINT_ERRNO(errno
);
498 (void)closesocket(server
->fd
);
499 server
->fd
= INVALID_SOCKET
;
500 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
504 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
506 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
508 return MEMCACHED_SUCCESS
;
511 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect() - create a socket for the instance and connect it to one
 * of the addresses resolved for server->hostname.
 *
 * Expects server->fd == INVALID_SOCKET and cursor_active_ == 0 on entry
 * (WATCHPOINT_ASSERT).
 *
 * Visible flow:
 *  - if address_info or address_info_next is NULL, set_hostinfo() is called
 *    to resolve the host again; its result is checked with
 *    memcached_failed();
 *  - if address_info_next is still NULL it is pointed at the head of
 *    address_info and state becomes MEMCACHED_SERVER_STATE_ADDRINFO;
 *  - the loop runs while address_info_next is set and fd is INVALID_SOCKET:
 *      - in UDP mode an entry whose ai_family is not AF_INET is skipped;
 *      - socket() is called with the entry's ai_family / ai_protocol (type
 *        starts from ai_socktype and may have SOCK_NONBLOCK OR'd in); a
 *        failure is returned through memcached_set_errno();
 *      - set_socket_options() returning false closes the socket and returns
 *        MEMCACHED_CONNECTION_FAILURE;
 *      - connect() succeeding sets state to CONNECTED and returns
 *        MEMCACHED_SUCCESS;
 *      - EINPROGRESS / EALREADY: POLLOUT is requested, state becomes
 *        IN_PROGRESS and connect_poll() waits for completion; success
 *        returns MEMCACHED_SUCCESS, MEMCACHED_TIMEOUT sets
 *        timeout_error_occured;
 *      - other visible branches close the socket and reset fd; the last one
 *        also advances address_info_next to ai_next.
 *  - after the loop: an error already recorded on the instance is returned
 *    as is; otherwise MEMCACHED_TIMEOUT when a timeout was flagged and
 *    state < MEMCACHED_SERVER_STATE_IN_PROGRESS, else
 *    MEMCACHED_CONNECTION_FAILURE.
 *
 * NOTE(review): several case labels and break/continue statements of the
 * errno switch are absent from this listing -- confirm branch boundaries
 * against the full source.
 */
515 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
517 bool timeout_error_occured
= false;
519 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
520 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
523 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
525 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
527 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
528 server
->address_info_next
= NULL
;
529 memcached_return_t rc
= set_hostinfo(server
);
531 if (memcached_failed(rc
))
537 if (server
->address_info_next
== NULL
)
539 server
->address_info_next
= server
->address_info
;
540 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
543 /* Create the socket */
544 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
546 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
547 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
549 server
->address_info_next
= server
->address_info_next
->ai_next
;
553 int type
= server
->address_info_next
->ai_socktype
;
561 type
|= SOCK_NONBLOCK
;
564 server
->fd
= socket(server
->address_info_next
->ai_family
,
566 server
->address_info_next
->ai_protocol
);
568 if (int(server
->fd
) == SOCKET_ERROR
)
570 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
573 if (set_socket_options(server
) == false)
// NOTE(review): fd is closed here but not reset to INVALID_SOCKET before
// returning, unlike the other close paths in this function -- confirm that
// callers cannot mistake the stale descriptor for an open connection.
575 (void)closesocket(server
->fd
);
576 return MEMCACHED_CONNECTION_FAILURE
;
579 /* connect to server */
580 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
582 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
583 return MEMCACHED_SUCCESS
;
586 /* An error occurred */
587 switch (get_socket_errno())
590 timeout_error_occured
= true;
594 #if EWOULDBLOCK != EAGAIN
597 case EINPROGRESS
: // nonblocking mode - first return
598 case EALREADY
: // nonblocking mode - subsequent returns
// The connect is still in flight: wait for the socket to become writable.
600 server
->events(POLLOUT
);
601 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
602 memcached_return_t rc
= connect_poll(server
);
604 if (memcached_success(rc
))
606 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
607 return MEMCACHED_SUCCESS
;
610 // A timeout here is treated as an error, we will not retry
611 if (rc
== MEMCACHED_TIMEOUT
)
613 timeout_error_occured
= true;
618 case EISCONN
: // we are connected :-)
619 WATCHPOINT_ASSERT(0); // This is a programmer's error
622 case EINTR
: // Special case, we retry ai_addr
623 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
624 (void)closesocket(server
->fd
);
625 server
->fd
= INVALID_SOCKET
;
629 // Probably not running service
// Give up on this address: drop the socket and move on to the next entry.
635 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
636 (void)closesocket(server
->fd
);
637 server
->fd
= INVALID_SOCKET
;
638 server
->address_info_next
= server
->address_info_next
->ai_next
;
641 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
643 if (timeout_error_occured
)
645 if (server
->fd
!= INVALID_SOCKET
)
647 (void)closesocket(server
->fd
);
648 server
->fd
= INVALID_SOCKET
;
652 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that was already recorded on the instance.
654 if (memcached_has_current_error(*server
))
656 return memcached_instance_error_return(server
);
659 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
661 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
664 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
671 Based on time/failure count, fail the connect without trying. This prevents waiting in a state where
672 we get caught spending cycles just waiting.
/*
 * backoff_handling() - decide, from the instance's failure counter and retry
 * time, whether a connect attempt should be made.
 *
 * server      instance being connected.
 * in_timeout  bool reference supplied by the caller. NOTE(review): no write
 *             to it is visible in this listing -- confirm against the full
 *             source.
 *
 * The current time is read once with gettimeofday(); whether that call
 * succeeded is remembered in _gettime_success and gates every use of
 * curr_time below.
 *
 * Visible behaviour:
 *  - server_failure_counter >= root->server_failure_limit:
 *      - with auto-eject enabled (_is_auto_eject_host): the instance is
 *        recorded with set_last_disconnected_host(); if the clock read
 *        succeeded and root->dead_timeout > 0, next_retry is set to now +
 *        dead_timeout and the counter to limit - 1; run_distribution() is
 *        invoked and its failure is reported ("Backoff handling failed
 *        during run_distribution"); MEMCACHED_SERVER_MARKED_DEAD is
 *        returned;
 *      - state is set to MEMCACHED_SERVER_STATE_IN_TIMEOUT and a next_retry
 *        of 0 is bumped to 1.
 *  - state == MEMCACHED_SERVER_STATE_IN_TIMEOUT: if the clock read succeeded
 *    and next_retry < now, state is reset to MEMCACHED_SERVER_STATE_NEW; a
 *    MEMCACHED_SERVER_TEMPORARILY_DISABLED return is the other visible
 *    outcome of this branch.
 *  - otherwise MEMCACHED_SUCCESS.
 *
 * NOTE(review): braces and else lines are absent from this listing, so the
 * nesting described above is inferred from statement order -- confirm.
 */
674 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
676 struct timeval curr_time
;
677 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
680 If we hit server_failure_limit then something is completely wrong about the server.
682 1) If autoeject is enabled we do that.
683 2) If not? We go into timeout again, there is much else to do :(
685 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
688 We just auto_eject if we hit this point
690 if (_is_auto_eject_host(server
->root
))
692 set_last_disconnected_host(server
);
694 // Retry dead servers if requested
695 if (_gettime_success
and server
->root
->dead_timeout
> 0)
697 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
699 // We only retry dead servers once before assuming failure again
700 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
703 memcached_return_t rc
;
704 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
706 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
709 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
712 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
714 // Sanity check/setting
715 if (server
->next_retry
== 0)
717 server
->next_retry
= 1;
721 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
724 If next_retry is less then our current time, then we reset and try everything again.
726 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
728 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
732 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
738 return MEMCACHED_SUCCESS
;
/*
 * _memcached_connect() - establish the connection for one server instance.
 *
 * server                 instance to connect.
 * set_last_disconnected  when true, a failed connect records the instance
 *                        with set_last_disconnected_host().
 *
 * Visible flow:
 *  - returns MEMCACHED_SUCCESS at once when server->fd is already a valid
 *    socket;
 *  - runs backoff_handling(); on failure the instance is recorded with
 *    set_last_disconnected_host();
 *  - SASL callbacks configured together with UDP are rejected with
 *    MEMCACHED_INVALID_HOST_PROTOCOL;
 *  - a hostname starting with '/' switches server->type to
 *    MEMCACHED_CONNECTION_UNIX_SOCKET;
 *  - UDP/TCP instances go through network_connect(), followed (when SASL
 *    support is built in and callbacks are set) by
 *    memcached_sasl_authenticate_connection(); a failed authentication
 *    closes the socket and resets fd;
 *  - unix-socket instances go through unix_socket_connect();
 *  - on success the instance is marked clean and
 *    memcached_version_instance() is called;
 *  - on failure with set_last_disconnected set, the instance is recorded as
 *    last disconnected and marked for timeout; when no error is recorded on
 *    the instance yet, rc is recorded first;
 *  - a final path formats "hostname:port" into buffer and returns
 *    MEMCACHED_SERVER_TEMPORARILY_DISABLED with that text.
 *
 * NOTE(review): the condition guarding that final path and the ordinary
 * return of rc are absent from this listing -- confirm against the full
 * source.
 */
741 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
744 if (server
->fd
!= INVALID_SOCKET
)
746 return MEMCACHED_SUCCESS
;
749 LIBMEMCACHED_MEMCACHED_CONNECT_START();
751 bool in_timeout
= false;
752 memcached_return_t rc
;
753 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
755 set_last_disconnected_host(server
);
759 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
761 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname that starts with '/' is taken to be a unix socket path.
764 if (server
->hostname
[0] == '/')
766 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
769 /* We need to clean up the multi startup piece */
770 switch (server
->type
)
772 case MEMCACHED_CONNECTION_UDP
:
773 case MEMCACHED_CONNECTION_TCP
:
774 rc
= network_connect(server
);
776 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
778 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
780 rc
= memcached_sasl_authenticate_connection(server
);
// NOTE(review): unconditional write to stderr after every SASL
// authentication attempt; looks like leftover debugging -- confirm.
781 fprintf(stderr
, "%s:%d %s\n", __FILE__
, __LINE__
, memcached_strerror(NULL
, rc
));
782 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
784 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
785 (void)closesocket(server
->fd
);
786 server
->fd
= INVALID_SOCKET
;
792 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
793 rc
= unix_socket_connect(server
);
797 if (memcached_success(rc
))
799 server
->mark_server_as_clean();
800 memcached_version_instance(server
);
803 else if (set_last_disconnected
)
805 set_last_disconnected_host(server
);
806 if (memcached_has_current_error(*server
))
808 memcached_mark_server_for_timeout(server
);
809 assert(memcached_failed(memcached_instance_error_return(server
)));
813 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
814 memcached_mark_server_for_timeout(server
);
817 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// Build the "host:port" text attached to the TEMPORARILY_DISABLED error.
822 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
823 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
/*
 * memcached_connect() - connect the given server instance.
 *
 * Forwards to _memcached_connect() with set_last_disconnected = true and
 * returns its result.
 */
830 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
832 return _memcached_connect(server
, true);