1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 #define SOCK_CLOEXEC 0
49 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
52 fds
[0].fd
= server
->fd
;
53 fds
[0].events
= POLLOUT
;
57 if (server
->root
->poll_timeout
== 0)
59 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
62 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
65 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
69 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
72 #ifdef TARGET_OS_LINUX
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
83 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
85 default: // This should not happen
86 if (fds
[0].revents
& POLLERR
)
89 socklen_t len
= sizeof(err
);
90 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
94 // This should never happen, if it does? Punt.
101 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
102 (void)closesocket(server
->fd
);
103 server
->fd
= INVALID_SOCKET
;
104 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
106 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
109 assert(number_of
== 0);
111 server
->io_wait_count
.timeouts
++;
112 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
115 if (fds
[0].revents
& POLLERR
or
116 fds
[0].revents
& POLLHUP
or
117 fds
[0].revents
& POLLNVAL
)
120 socklen_t len
= sizeof (err
);
121 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
123 // We check the value to see what happened wth the socket.
126 return MEMCACHED_SUCCESS
;
131 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
133 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
135 return MEMCACHED_SUCCESS
;
138 // This should only be possible from ERESTART or EINTR;
139 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
142 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
144 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
145 if (server
->address_info
)
147 freeaddrinfo(server
->address_info
);
148 server
->address_info
= NULL
;
149 server
->address_info_next
= NULL
;
152 char str_port
[NI_MAXSERV
];
153 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
154 if (length
>= NI_MAXSERV
or length
<= 0)
156 return MEMCACHED_FAILURE
;
159 struct addrinfo hints
;
160 memset(&hints
, 0, sizeof(struct addrinfo
));
163 hints
.ai_family
= AF_INET
;
165 if (memcached_is_udp(server
->root
))
167 hints
.ai_protocol
= IPPROTO_UDP
;
168 hints
.ai_socktype
= SOCK_DGRAM
;
172 hints
.ai_socktype
= SOCK_STREAM
;
173 hints
.ai_protocol
= IPPROTO_TCP
;
176 server
->address_info
= NULL
;
178 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
184 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
187 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
190 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
193 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
197 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
200 server
->address_info_next
= server
->address_info
;
201 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
203 return MEMCACHED_SUCCESS
;
206 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
210 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
212 memcached_set_errno(*server
, get_socket_errno(), NULL
);
219 flags
= fcntl(server
->fd
, F_GETFL
, 0);
220 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
224 memcached_set_errno(*server
, errno
, NULL
);
226 else if ((flags
& O_NONBLOCK
) == 0)
232 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
233 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
237 memcached_set_errno(*server
, errno
, NULL
);
243 static void set_socket_options(org::libmemcached::Instance
* server
)
245 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
247 if (memcached_is_udp(server
->root
))
253 if (server
->root
->snd_timeout
> 0)
255 struct timeval waittime
;
257 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
258 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
260 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
261 &waittime
, (socklen_t
)sizeof(struct timeval
));
267 if (server
->root
->rcv_timeout
> 0)
269 struct timeval waittime
;
271 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
272 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
274 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
275 &waittime
, (socklen_t
)sizeof(struct timeval
));
282 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
285 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
287 // This is not considered a fatal error
290 WATCHPOINT_ERRNO(get_socket_errno());
291 perror("setsockopt(SO_NOSIGPIPE)");
296 if (server
->root
->flags
.no_block
)
298 struct linger linger
;
301 linger
.l_linger
= 0; /* By default on close() just drop the socket */
302 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
303 &linger
, (socklen_t
)sizeof(struct linger
));
308 if (server
->root
->flags
.tcp_nodelay
)
312 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
313 &flag
, (socklen_t
)sizeof(int));
318 if (server
->root
->flags
.tcp_keepalive
)
322 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
323 &flag
, (socklen_t
)sizeof(int));
329 if (server
->root
->tcp_keepidle
> 0)
331 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
332 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
338 if (server
->root
->send_size
> 0)
340 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
341 &server
->root
->send_size
, (socklen_t
)sizeof(int));
346 if (server
->root
->recv_size
> 0)
348 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
349 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
355 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
356 set_socket_nonblocking(server
);
359 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
362 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
364 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
366 memcached_set_errno(*server
, errno
, NULL
);
367 return MEMCACHED_CONNECTION_FAILURE
;
370 struct sockaddr_un servAddr
;
372 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
373 servAddr
.sun_family
= AF_UNIX
;
374 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
377 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
386 case EISCONN
: /* We were spinning waiting on connect */
388 assert(0); // Programmer error
393 WATCHPOINT_ERRNO(errno
);
394 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
395 return MEMCACHED_CONNECTION_FAILURE
;
399 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
401 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
403 return MEMCACHED_SUCCESS
;
406 return MEMCACHED_NOT_SUPPORTED
;
410 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
412 bool timeout_error_occured
= false;
414 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
415 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
418 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
420 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
422 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
423 server
->address_info_next
= NULL
;
424 memcached_return_t rc
;
428 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
434 struct timespec dream
, rem
;
439 nanosleep(&dream
, &rem
);
443 if (memcached_failed(rc
))
449 if (server
->address_info_next
== NULL
)
451 server
->address_info_next
= server
->address_info
;
452 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
455 /* Create the socket */
456 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
458 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
459 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
461 server
->address_info_next
= server
->address_info_next
->ai_next
;
465 int type
= server
->address_info_next
->ai_socktype
;
466 if (HAVE_SOCK_CLOEXEC
)
471 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
473 server
->address_info_next
->ai_protocol
)) < 0)
475 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
478 if (HAVE_SOCK_CLOEXEC
== 0)
484 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
485 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
489 set_socket_options(server
);
491 /* connect to server */
492 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
494 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
495 return MEMCACHED_SUCCESS
;
498 /* An error occurred */
499 switch (get_socket_errno())
502 timeout_error_occured
= true;
506 #if EWOULDBLOCK != EAGAIN
509 case EINPROGRESS
: // nonblocking mode - first return
510 case EALREADY
: // nonblocking mode - subsequent returns
512 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
513 memcached_return_t rc
= connect_poll(server
);
515 if (memcached_success(rc
))
517 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
518 return MEMCACHED_SUCCESS
;
521 // A timeout here is treated as an error, we will not retry
522 if (rc
== MEMCACHED_TIMEOUT
)
524 timeout_error_occured
= true;
529 case EISCONN
: // we are connected :-)
530 WATCHPOINT_ASSERT(0); // This is a programmer's error
533 case EINTR
: // Special case, we retry ai_addr
534 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
535 (void)closesocket(server
->fd
);
536 server
->fd
= INVALID_SOCKET
;
543 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
544 (void)closesocket(server
->fd
);
545 server
->fd
= INVALID_SOCKET
;
546 server
->address_info_next
= server
->address_info_next
->ai_next
;
549 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
551 if (timeout_error_occured
)
553 if (server
->fd
!= INVALID_SOCKET
)
555 (void)closesocket(server
->fd
);
556 server
->fd
= INVALID_SOCKET
;
560 WATCHPOINT_STRING("Never got a good file descriptor");
562 if (memcached_has_current_error(*server
))
564 return memcached_instance_error_return(server
);
567 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
569 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
572 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
579 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
580 we get caught spending cycles just waiting.
582 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
584 struct timeval curr_time
;
585 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
588 If we hit server_failure_limit then something is completely wrong about the server.
590 1) If autoeject is enabled we do that.
591 2) If not? We go into timeout again, there is much else to do :(
593 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
596 We just auto_eject if we hit this point
598 if (_is_auto_eject_host(server
->root
))
600 set_last_disconnected_host(server
);
602 // Retry dead servers if requested
603 if (_gettime_success
and server
->root
->dead_timeout
> 0)
605 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
607 // We only retry dead servers once before assuming failure again
608 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
611 memcached_return_t rc
;
612 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
614 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
617 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
620 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
622 // Sanity check/setting
623 if (server
->next_retry
== 0)
625 server
->next_retry
= 1;
629 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
632 If next_retry is less then our current time, then we reset and try everything again.
634 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
636 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
640 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
646 return MEMCACHED_SUCCESS
;
649 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
652 if (server
->fd
!= INVALID_SOCKET
)
654 return MEMCACHED_SUCCESS
;
657 LIBMEMCACHED_MEMCACHED_CONNECT_START();
659 bool in_timeout
= false;
660 memcached_return_t rc
;
661 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
663 set_last_disconnected_host(server
);
667 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
669 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
672 if (server
->hostname
[0] == '/')
674 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
677 /* We need to clean up the multi startup piece */
678 switch (server
->type
)
680 case MEMCACHED_CONNECTION_UDP
:
681 case MEMCACHED_CONNECTION_TCP
:
682 rc
= network_connect(server
);
684 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
686 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
688 rc
= memcached_sasl_authenticate_connection(server
);
689 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
691 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
692 (void)closesocket(server
->fd
);
693 server
->fd
= INVALID_SOCKET
;
699 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
700 rc
= unix_socket_connect(server
);
704 if (memcached_success(rc
))
706 server
->mark_server_as_clean();
707 memcached_version_instance(server
);
710 else if (set_last_disconnected
)
712 set_last_disconnected_host(server
);
713 if (memcached_has_current_error(*server
))
715 memcached_mark_server_for_timeout(server
);
716 assert(memcached_failed(memcached_instance_error_return(server
)));
720 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
721 memcached_mark_server_for_timeout(server
);
724 LIBMEMCACHED_MEMCACHED_CONNECT_END();
729 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
730 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
737 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
739 if (server
and server
->root
and server
->root
->state
.is_parsing
)
741 return MEMCACHED_SUCCESS
;
744 return _memcached_connect(server
, false);
747 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
749 return _memcached_connect(server
, true);