1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
81 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
84 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
88 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
91 #ifdef TARGET_OS_LINUX
99 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
102 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
105 if (fds
[0].revents
& POLLERR
)
108 socklen_t len
= sizeof(err
);
109 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
113 // This should never happen, if it does? Punt.
120 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
121 server
->reset_socket();
122 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
124 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
127 assert(number_of
== 0);
129 server
->io_wait_count
.timeouts
++;
130 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
134 server
->revents(fds
[0].revents
);
137 if (fds
[0].revents
& POLLERR
or
138 fds
[0].revents
& POLLHUP
or
139 fds
[0].revents
& POLLNVAL
)
142 socklen_t len
= sizeof (err
);
143 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
145 // We check the value to see what happened wth the socket.
148 return MEMCACHED_SUCCESS
;
153 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
155 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
157 return MEMCACHED_SUCCESS
;
160 // This should only be possible from ERESTART or EINTR;
161 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
164 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
166 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
167 if (server
->address_info
)
169 freeaddrinfo(server
->address_info
);
170 server
->address_info
= NULL
;
171 server
->address_info_next
= NULL
;
174 char str_port
[MEMCACHED_NI_MAXSERV
];
175 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
176 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
178 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
179 memcached_literal_param("snprintf(NI_MAXSERV)"));
182 struct addrinfo hints
;
183 memset(&hints
, 0, sizeof(struct addrinfo
));
186 hints
.ai_family
= AF_INET
;
188 if (memcached_is_udp(server
->root
))
190 hints
.ai_protocol
= IPPROTO_UDP
;
191 hints
.ai_socktype
= SOCK_DGRAM
;
195 hints
.ai_socktype
= SOCK_STREAM
;
196 hints
.ai_protocol
= IPPROTO_TCP
;
199 assert(server
->address_info
== NULL
);
200 assert(server
->address_info_next
== NULL
);
202 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
208 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
211 if (server
->address_info
)
213 freeaddrinfo(server
->address_info
);
214 server
->address_info
= NULL
;
215 server
->address_info_next
= NULL
;
217 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
220 if (server
->address_info
)
222 freeaddrinfo(server
->address_info
);
223 server
->address_info
= NULL
;
224 server
->address_info_next
= NULL
;
226 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
229 if (server
->address_info
)
231 freeaddrinfo(server
->address_info
);
232 server
->address_info
= NULL
;
233 server
->address_info_next
= NULL
;
235 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
239 if (server
->address_info
)
241 freeaddrinfo(server
->address_info
);
242 server
->address_info
= NULL
;
243 server
->address_info_next
= NULL
;
245 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
248 server
->address_info_next
= server
->address_info
;
249 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
251 return MEMCACHED_SUCCESS
;
254 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
258 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
260 memcached_set_errno(*server
, get_socket_errno(), NULL
);
265 if (SOCK_NONBLOCK
== 0)
269 flags
= fcntl(server
->fd
, F_GETFL
, 0);
270 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
274 memcached_set_errno(*server
, errno
, NULL
);
276 else if ((flags
& O_NONBLOCK
) == 0)
282 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
283 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
287 memcached_set_errno(*server
, errno
, NULL
);
294 static bool set_socket_options(org::libmemcached::Instance
* server
)
296 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
299 // If SOCK_CLOEXEC exists then we don't need to call the following
300 if (SOCK_CLOEXEC
== 0)
307 flags
= fcntl(server
->fd
, F_GETFD
, 0);
308 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
315 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
316 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
317 // we currently ignore the case where rval is -1
323 if (memcached_is_udp(server
->root
))
329 if (server
->root
->snd_timeout
> 0)
331 struct timeval waittime
;
333 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
334 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
336 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
337 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
344 if (server
->root
->rcv_timeout
> 0)
346 struct timeval waittime
;
348 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
349 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
351 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
352 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
361 #if defined(SO_NOSIGPIPE)
365 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
369 // This is not considered a fatal error
373 perror("setsockopt(SO_NOSIGPIPE)");
377 #endif // SO_NOSIGPIPE
380 if (server
->root
->flags
.no_block
)
382 struct linger linger
;
385 linger
.l_linger
= 0; /* By default on close() just drop the socket */
386 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
387 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
394 if (server
->root
->flags
.tcp_nodelay
)
398 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
399 (char*)&flag
, (socklen_t
)sizeof(int));
405 if (server
->root
->flags
.tcp_keepalive
)
409 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
410 (char*)&flag
, (socklen_t
)sizeof(int));
417 if (server
->root
->tcp_keepidle
> 0)
419 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
420 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
426 if (server
->root
->send_size
> 0)
428 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
429 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
434 if (server
->root
->recv_size
> 0)
436 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
437 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
442 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
443 set_socket_nonblocking(server
);
448 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
451 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
454 int type
= SOCK_STREAM
;
462 type
|= SOCK_NONBLOCK
;
465 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) == -1)
467 return memcached_set_errno(*server
, errno
, NULL
);
470 struct sockaddr_un servAddr
;
472 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
473 servAddr
.sun_family
= AF_UNIX
;
474 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
476 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) == -1)
482 server
->events(POLLOUT
);
486 server
->reset_socket();
489 case EISCONN
: /* We were spinning waiting on connect */
491 assert(0); // Programmer error
492 server
->reset_socket();
497 WATCHPOINT_ERRNO(errno
);
498 server
->reset_socket();
499 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
503 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
505 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
507 return MEMCACHED_SUCCESS
;
510 return MEMCACHED_NOT_SUPPORTED
;
514 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
516 bool timeout_error_occured
= false;
518 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
519 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
522 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
524 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
526 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
527 server
->address_info_next
= NULL
;
528 memcached_return_t rc
= set_hostinfo(server
);
530 if (memcached_failed(rc
))
536 if (server
->address_info_next
== NULL
)
538 server
->address_info_next
= server
->address_info
;
539 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
542 /* Create the socket */
543 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
545 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
546 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
548 server
->address_info_next
= server
->address_info_next
->ai_next
;
552 int type
= server
->address_info_next
->ai_socktype
;
560 type
|= SOCK_NONBLOCK
;
563 server
->fd
= socket(server
->address_info_next
->ai_family
,
565 server
->address_info_next
->ai_protocol
);
567 if (int(server
->fd
) == SOCKET_ERROR
)
569 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
572 if (set_socket_options(server
) == false)
574 server
->reset_socket();
575 return MEMCACHED_CONNECTION_FAILURE
;
578 /* connect to server */
579 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
581 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
582 return MEMCACHED_SUCCESS
;
585 /* An error occurred */
586 switch (get_socket_errno())
589 timeout_error_occured
= true;
593 #if EWOULDBLOCK != EAGAIN
596 case EINPROGRESS
: // nonblocking mode - first return
597 case EALREADY
: // nonblocking mode - subsequent returns
599 server
->events(POLLOUT
);
600 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
601 memcached_return_t rc
= connect_poll(server
);
603 if (memcached_success(rc
))
605 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
606 return MEMCACHED_SUCCESS
;
609 // A timeout here is treated as an error, we will not retry
610 if (rc
== MEMCACHED_TIMEOUT
)
612 timeout_error_occured
= true;
617 case EISCONN
: // we are connected :-)
618 WATCHPOINT_ASSERT(0); // This is a programmer's error
621 case EINTR
: // Special case, we retry ai_addr
622 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
623 server
->reset_socket();
627 // Probably not running service
633 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
634 server
->reset_socket();
635 server
->address_info_next
= server
->address_info_next
->ai_next
;
638 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
640 if (timeout_error_occured
)
642 server
->reset_socket();
645 WATCHPOINT_STRING("Never got a good file descriptor");
647 if (memcached_has_current_error(*server
))
649 return memcached_instance_error_return(server
);
652 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
654 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
657 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
664 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
665 we get caught spending cycles just waiting.
667 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
669 struct timeval curr_time
;
670 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
673 If we hit server_failure_limit then something is completely wrong about the server.
675 1) If autoeject is enabled we do that.
676 2) If not? We go into timeout again, there is much else to do :(
678 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
681 We just auto_eject if we hit this point
683 if (_is_auto_eject_host(server
->root
))
685 set_last_disconnected_host(server
);
687 // Retry dead servers if requested
688 if (_gettime_success
and server
->root
->dead_timeout
> 0)
690 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
692 // We only retry dead servers once before assuming failure again
693 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
696 memcached_return_t rc
;
697 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
699 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
702 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
705 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
707 // Sanity check/setting
708 if (server
->next_retry
== 0)
710 server
->next_retry
= 1;
714 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
717 If next_retry is less then our current time, then we reset and try everything again.
719 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
721 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
725 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
731 return MEMCACHED_SUCCESS
;
734 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
737 if (server
->fd
!= INVALID_SOCKET
)
739 return MEMCACHED_SUCCESS
;
742 LIBMEMCACHED_MEMCACHED_CONNECT_START();
744 bool in_timeout
= false;
745 memcached_return_t rc
;
746 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
748 set_last_disconnected_host(server
);
752 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
754 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
757 if (server
->hostname
[0] == '/')
759 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
762 /* We need to clean up the multi startup piece */
763 switch (server
->type
)
765 case MEMCACHED_CONNECTION_UDP
:
766 case MEMCACHED_CONNECTION_TCP
:
767 rc
= network_connect(server
);
769 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
771 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
773 rc
= memcached_sasl_authenticate_connection(server
);
774 fprintf(stderr
, "%s:%d %s\n", __FILE__
, __LINE__
, memcached_strerror(NULL
, rc
));
775 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
777 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
778 server
->reset_socket();
784 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
785 rc
= unix_socket_connect(server
);
789 if (memcached_success(rc
))
791 server
->mark_server_as_clean();
792 memcached_version_instance(server
);
795 else if (set_last_disconnected
)
797 set_last_disconnected_host(server
);
798 if (memcached_has_current_error(*server
))
800 memcached_mark_server_for_timeout(server
);
801 assert(memcached_failed(memcached_instance_error_return(server
)));
805 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
806 memcached_mark_server_for_timeout(server
);
809 LIBMEMCACHED_MEMCACHED_CONNECT_END();
814 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
815 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
822 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
824 return _memcached_connect(server
, true);