1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 #define SOCK_CLOEXEC 0
/*
 * connect_poll() - wait for an in-progress connect() on server->fd to finish.
 *
 * The descriptor is polled for POLLOUT, using server->root->connect_timeout
 * as the poll() timeout.
 *
 * Outcomes visible in this listing:
 *  - MEMCACHED_TIMEOUT, without polling, when server->root->poll_timeout is 0;
 *  - MEMCACHED_TIMEOUT, after incrementing server->io_wait_count.timeouts,
 *    on the path that asserts poll() returned 0;
 *  - MEMCACHED_MEMORY_ALLOCATION_FAILURE from two dedicated returns in the
 *    poll() error handling;
 *  - an errno-based error, after the socket is closed, server->fd is set to
 *    INVALID_SOCKET and server->state is reset to MEMCACHED_SERVER_STATE_NEW;
 *  - MEMCACHED_SUCCESS or an errno-based error built from the SO_ERROR value
 *    when POLLERR, POLLHUP or POLLNVAL is flagged;
 *  - MEMCACHED_SUCCESS otherwise;
 *  - an errno-based error if the retry loop runs out of iterations.
 *
 * NOTE(review): this copy of the file is missing lines (braces, several
 * declarations and the case labels), so the exact nesting and the errno
 * values guarding each return must be confirmed against the complete source.
 * NOTE(review): the early-out tests poll_timeout while poll() is handed
 * connect_timeout - confirm the mismatch is intentional.
 */
49 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
// Only writability of the connecting socket is requested.
52 fds
[0].fd
= server
->fd
;
53 fds
[0].events
= POLLOUT
;
// A poll_timeout of zero is reported as a timeout without calling poll().
57 if (server
->root
->poll_timeout
== 0)
59 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
62 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// poll() results <= 0 (error or nothing ready) are handled first.
65 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
69 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
72 #ifdef TARGET_OS_LINUX
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
83 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
85 default: // This should not happen
// For an unexpected errno the pending socket error is queried via SO_ERROR.
86 if (fds
[0].revents
& POLLERR
)
89 socklen_t len
= sizeof(err
);
90 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
94 // This should never happen, if it does? Punt.
// Failure path: drop the socket and reset the instance state before
// reporting the cached errno.
101 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
102 (void)closesocket(server
->fd
);
103 server
->fd
= INVALID_SOCKET
;
104 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
106 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Reaching this point means poll() returned 0: count it and report a timeout.
109 assert(number_of
== 0);
111 server
->io_wait_count
.timeouts
++;
112 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// poll() reported an event: error-type events are resolved through SO_ERROR.
115 if (fds
[0].revents
& POLLERR
or
116 fds
[0].revents
& POLLHUP
or
117 fds
[0].revents
& POLLNVAL
)
120 socklen_t len
= sizeof (err
);
121 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
123 // We check the value to see what happened with the socket.
126 return MEMCACHED_SUCCESS
;
131 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
133 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
135 return MEMCACHED_SUCCESS
;
138 // This should only be possible from ERESTART or EINTR;
139 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
/*
 * set_hostinfo() - resolve server->hostname / server->port() with
 * getaddrinfo() and store the result list in server->address_info.
 *
 * Any previously stored address list is released with freeaddrinfo() first.
 * On the success path visible at the end of the function,
 * server->address_info_next is pointed at the head of the new list and
 * server->state becomes MEMCACHED_SERVER_STATE_ADDRINFO.
 *
 * Returns MEMCACHED_SUCCESS, MEMCACHED_FAILURE when the port cannot be
 * formatted into the NI_MAXSERV-sized buffer, or one of the errors recorded
 * via memcached_set_error()/memcached_set_errno() for getaddrinfo() failures
 * (MEMCACHED_TIMEOUT, MEMCACHED_INVALID_ARGUMENTS,
 * MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_HOST_LOOKUP_FAILURE, or an
 * errno-based error).
 *
 * Must not be called for UNIX-socket instances (asserted).
 *
 * NOTE(review): this copy of the file is missing lines (braces, the switch
 * case labels, the declaration of errcode and possibly preprocessor guards);
 * confirm which getaddrinfo() code maps to which return in the full source.
 */
142 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
144 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Discard the result of any earlier lookup.
145 if (server
->address_info
)
147 freeaddrinfo(server
->address_info
);
148 server
->address_info
= NULL
;
149 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so the port is formatted first;
// a truncated or failed snprintf() is rejected.
152 char str_port
[NI_MAXSERV
];
153 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
154 if (length
>= NI_MAXSERV
or length
<= 0)
156 return MEMCACHED_FAILURE
;
159 struct addrinfo hints
;
160 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): the lines surrounding this assignment are missing from this
// listing - confirm whether it is actually compiled in.
163 hints
.ai_family
= AF_INET
;
// UDP roots ask for datagram/UDP results; the stream/TCP assignments below are
// presumably the else branch (the else line is not visible here).
165 if (memcached_is_udp(server
->root
))
167 hints
.ai_protocol
= IPPROTO_UDP
;
168 hints
.ai_socktype
= SOCK_DGRAM
;
172 hints
.ai_socktype
= SOCK_STREAM
;
173 hints
.ai_protocol
= IPPROTO_TCP
;
176 server
->address_info
= NULL
;
// Each getaddrinfo() failure is translated into a libmemcached error below.
178 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
184 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
187 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
190 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
193 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
197 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating from the head of the freshly resolved list.
200 server
->address_info_next
= server
->address_info
;
201 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
203 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking() - put server->fd into non-blocking mode.
 *
 * Two implementations are visible: one using ioctlsocket(FIONBIO) and one
 * using fcntl(F_GETFL / F_SETFL). The fcntl() calls are retried while they
 * fail with EINTR or EAGAIN, and F_SETFL is only issued when O_NONBLOCK is
 * not already set. Failures are recorded on the instance through
 * memcached_set_errno(); nothing is returned to the caller.
 *
 * NOTE(review): the preprocessor lines choosing between the two
 * implementations, and the conditions guarding the fcntl() error reports,
 * are missing from this listing - confirm against the full source.
 */
206 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
// ioctlsocket() variant.
210 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
212 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// fcntl() variant: read the current flags, retrying on EINTR/EAGAIN.
219 flags
= fcntl(server
->fd
, F_GETFL
, 0);
220 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
224 memcached_set_errno(*server
, errno
, NULL
);
// Only touch the descriptor when O_NONBLOCK is not set yet.
226 else if ((flags
& O_NONBLOCK
) == 0)
232 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
233 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
237 memcached_set_errno(*server
, errno
, NULL
);
/*
 * set_socket_options() - apply the socket options configured on
 * server->root to the already-created socket server->fd.
 *
 * Options visible in this listing, each applied only when its setting is
 * enabled / greater than zero:
 *   SO_SNDTIMEO  (root->snd_timeout, split as a microsecond count)
 *   SO_RCVTIMEO  (root->rcv_timeout, split as a microsecond count)
 *   SO_NOSIGPIPE (Apple / FreeBSD builds)
 *   SO_LINGER    (root->flags.no_block, l_linger = 0)
 *   TCP_NODELAY  (root->flags.tcp_nodelay)
 *   SO_KEEPALIVE (root->flags.tcp_keepalive)
 *   TCP_KEEPIDLE (root->tcp_keepidle)
 *   SO_SNDBUF    (root->send_size)
 *   SO_RCVBUF    (root->recv_size)
 * The socket is finally switched to non-blocking mode.
 *
 * The function returns nothing; server->fd must be a valid socket (asserted).
 *
 * NOTE(review): this copy of the file is missing lines, including the body
 * of the UDP check, any per-option #ifdef guards and whatever is done with
 * each setsockopt() result - confirm against the full source.
 */
243 static void set_socket_options(org::libmemcached::Instance
* server
)
245 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
// NOTE(review): the body of this UDP check is not visible in this listing.
247 if (memcached_is_udp(server
->root
))
// Send timeout: whole seconds and remaining microseconds of snd_timeout.
253 if (server
->root
->snd_timeout
> 0)
255 struct timeval waittime
;
257 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
258 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
260 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
261 &waittime
, (socklen_t
)sizeof(struct timeval
));
// Receive timeout, converted the same way from rcv_timeout.
268 if (server
->root
->rcv_timeout
> 0)
270 struct timeval waittime
;
272 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
273 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
275 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
276 &waittime
, (socklen_t
)sizeof(struct timeval
));
283 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
286 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
290 // This is not considered a fatal error
294 perror("setsockopt(SO_NOSIGPIPE)");
300 if (server
->root
->flags
.no_block
)
302 struct linger linger
;
305 linger
.l_linger
= 0; /* By default on close() just drop the socket */
306 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
307 &linger
, (socklen_t
)sizeof(struct linger
));
312 if (server
->root
->flags
.tcp_nodelay
)
316 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
317 &flag
, (socklen_t
)sizeof(int));
322 if (server
->root
->flags
.tcp_keepalive
)
326 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
327 &flag
, (socklen_t
)sizeof(int));
333 if (server
->root
->tcp_keepidle
> 0)
335 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
336 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Kernel send / receive buffer sizes.
342 if (server
->root
->send_size
> 0)
344 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
345 &server
->root
->send_size
, (socklen_t
)sizeof(int));
350 if (server
->root
->recv_size
> 0)
352 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
353 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
359 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
360 set_socket_nonblocking(server
);
/*
 * unix_socket_connect() - connect server->fd to the UNIX-domain socket whose
 * path is stored in server->hostname.
 *
 * Creates an AF_UNIX / SOCK_STREAM socket, fills a sockaddr_un with the
 * path and calls connect(). On success server->state becomes
 * MEMCACHED_SERVER_STATE_CONNECTED and MEMCACHED_SUCCESS is returned.
 * When socket() fails, or on the connect() failure path shown below, errno
 * is recorded on the instance and MEMCACHED_CONNECTION_FAILURE is returned.
 * A trailing return of MEMCACHED_NOT_SUPPORTED is also present; it is
 * presumably the build without UNIX-socket support (guard not visible).
 *
 * NOTE(review): this copy of the file is missing lines (braces, the errno
 * switch and most of its case labels, preprocessor guards) - confirm the
 * retry behaviour of the connect() error handling against the full source.
 * NOTE(review): strncpy() is given the full size of sun_path, so a path of
 * that length or longer is copied without a terminating NUL.
 */
363 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
366 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
368 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
370 memcached_set_errno(*server
, errno
, NULL
);
371 return MEMCACHED_CONNECTION_FAILURE
;
// Build the socket address from the configured path.
374 struct sockaddr_un servAddr
;
376 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
377 servAddr
.sun_family
= AF_UNIX
;
378 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
381 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
390 case EISCONN
: /* We were spinning waiting on connect */
392 assert(0); // Programmer error
397 WATCHPOINT_ERRNO(errno
);
398 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
399 return MEMCACHED_CONNECTION_FAILURE
;
403 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
405 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
407 return MEMCACHED_SUCCESS
;
410 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect() - open a TCP/UDP socket to the server by walking the
 * resolved address list (server->address_info_next).
 *
 * Steps visible in this listing:
 *  1. If there is no address list, or it has been exhausted, the host is
 *     looked up again through set_hostinfo(); a MEMCACHED_TIMEOUT result
 *     leads to a nanosleep() before what is presumably another attempt
 *     (loop lines not visible).
 *  2. For each candidate address while no socket is open: non-AF_INET
 *     entries are skipped for UDP, a socket is created, FD_CLOEXEC is set
 *     through fcntl() when HAVE_SOCK_CLOEXEC is 0, set_socket_options() is
 *     applied and connect() is called.
 *  3. connect() errors are dispatched on get_socket_errno(): an in-progress
 *     connect is completed with connect_poll(); EINTR closes the socket so
 *     the same address is retried; the remaining close-and-advance code
 *     moves on to ai_next.
 *  4. After the loop, a still-open socket is closed if a timeout was seen,
 *     and the function returns the error already recorded on the instance,
 *     MEMCACHED_TIMEOUT, or MEMCACHED_CONNECTION_FAILURE.
 *
 * Returns MEMCACHED_SUCCESS with server->state set to
 * MEMCACHED_SERVER_STATE_CONNECTED once connect() or connect_poll() succeeds.
 *
 * NOTE(review): this copy of the file is missing lines (braces, several
 * case labels, break/continue statements, the socket type argument) -
 * confirm the control flow between the cases against the full source.
 */
414 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
416 bool timeout_error_occured
= false;
418 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
419 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
422 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
424 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
426 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
427 server
->address_info_next
= NULL
;
428 memcached_return_t rc
;
// Any lookup result other than MEMCACHED_TIMEOUT takes this branch (body not
// visible here); a timeout falls through to the nanosleep() below.
432 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
438 struct timespec dream
, rem
;
443 nanosleep(&dream
, &rem
);
447 if (memcached_failed(rc
))
// Restart iteration from the head of the address list when needed.
453 if (server
->address_info_next
== NULL
)
455 server
->address_info_next
= server
->address_info
;
456 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
459 /* Create the socket */
460 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
462 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
463 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
465 server
->address_info_next
= server
->address_info_next
->ai_next
;
469 int type
= server
->address_info_next
->ai_socktype
;
// NOTE(review): the statement controlled by this test is not visible here.
470 if (HAVE_SOCK_CLOEXEC
)
475 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
477 server
->address_info_next
->ai_protocol
)) < 0)
479 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Without SOCK_CLOEXEC support the close-on-exec flag is set via fcntl(),
// retrying on EINTR/EAGAIN.
482 if (HAVE_SOCK_CLOEXEC
== 0)
488 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
489 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
493 set_socket_options(server
);
495 /* connect to server */
496 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
498 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
499 return MEMCACHED_SUCCESS
;
502 /* An error occurred */
503 switch (get_socket_errno())
// NOTE(review): the case label guarding this assignment is not visible here.
506 timeout_error_occured
= true;
510 #if EWOULDBLOCK != EAGAIN
513 case EINPROGRESS
: // nonblocking mode - first return
514 case EALREADY
: // nonblocking mode - subsequent returns
516 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
517 memcached_return_t rc
= connect_poll(server
);
519 if (memcached_success(rc
))
521 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
522 return MEMCACHED_SUCCESS
;
525 // A timeout here is treated as an error, we will not retry
526 if (rc
== MEMCACHED_TIMEOUT
)
528 timeout_error_occured
= true;
533 case EISCONN
: // we are connected :-)
534 WATCHPOINT_ASSERT(0); // This is a programmer's error
537 case EINTR
: // Special case, we retry ai_addr
538 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
539 (void)closesocket(server
->fd
);
540 server
->fd
= INVALID_SOCKET
;
// Close the failed socket and advance to the next resolved address.
547 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
548 (void)closesocket(server
->fd
);
549 server
->fd
= INVALID_SOCKET
;
550 server
->address_info_next
= server
->address_info_next
->ai_next
;
553 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// After a timeout, make sure no half-open socket is left behind.
555 if (timeout_error_occured
)
557 if (server
->fd
!= INVALID_SOCKET
)
559 (void)closesocket(server
->fd
);
560 server
->fd
= INVALID_SOCKET
;
564 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that has already been recorded on the instance.
566 if (memcached_has_current_error(*server
))
568 return memcached_instance_error_return(server
);
571 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
573 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
576 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
583 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
584 we get caught spending cycles just waiting.
/*
 * backoff_handling() - decide, from the failure counter and the retry
 * timestamp, whether a connection attempt should be made at all.
 *
 * Behaviour visible in this listing:
 *  - When server->server_failure_counter has reached
 *    root->server_failure_limit and auto-eject is enabled, the host is
 *    recorded as last disconnected, a dead-server retry may be scheduled
 *    (next_retry = now + root->dead_timeout, failure counter set to
 *    limit - 1), run_distribution() is invoked and
 *    MEMCACHED_SERVER_MARKED_DEAD is returned (or the run_distribution()
 *    error).
 *  - server->state is set to MEMCACHED_SERVER_STATE_IN_TIMEOUT and a zero
 *    next_retry is bumped to 1.
 *  - While the state is IN_TIMEOUT: once next_retry lies before the current
 *    time the state is reset to MEMCACHED_SERVER_STATE_NEW; a return of
 *    MEMCACHED_SERVER_TEMPORARILY_DISABLED is also present (its guarding
 *    line is not visible here).
 *  - MEMCACHED_SUCCESS otherwise.
 *
 * in_timeout is passed by reference; no write to it is visible in this
 * listing.
 *
 * NOTE(review): this copy of the file is missing lines (braces, else
 * branches, comment delimiters) - confirm the nesting of the statements
 * below against the full source.
 */
586 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
// The time is read once; later checks depend on this call having succeeded.
588 struct timeval curr_time
;
589 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
592 If we hit server_failure_limit then something is completely wrong about the server.
594 1) If autoeject is enabled we do that.
595 2) If not? We go into timeout again, there is much else to do :(
597 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
600 We just auto_eject if we hit this point
602 if (_is_auto_eject_host(server
->root
))
604 set_last_disconnected_host(server
);
606 // Retry dead servers if requested
607 if (_gettime_success
and server
->root
->dead_timeout
> 0)
609 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
611 // We only retry dead servers once before assuming failure again
612 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
615 memcached_return_t rc
;
616 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
618 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
621 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
624 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
626 // Sanity check/setting
627 if (server
->next_retry
== 0)
629 server
->next_retry
= 1;
633 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
636 If next_retry is less then our current time, then we reset and try everything again.
638 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
640 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
644 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
650 return MEMCACHED_SUCCESS
;
/*
 * _memcached_connect() - shared implementation behind memcached_connect()
 * and memcached_connect_try().
 *
 * Behaviour visible in this listing:
 *  - Returns MEMCACHED_SUCCESS immediately when server->fd is already a
 *    valid socket.
 *  - Runs backoff_handling(); on failure the host is recorded as last
 *    disconnected.
 *  - Rejects SASL combined with UDP with MEMCACHED_INVALID_HOST_PROTOCOL.
 *  - A hostname beginning with '/' switches server->type to
 *    MEMCACHED_CONNECTION_UNIX_SOCKET.
 *  - UDP/TCP instances go through network_connect(), followed by SASL
 *    authentication when callbacks are configured (the socket is closed if
 *    authentication fails); UNIX-socket instances go through
 *    unix_socket_connect().
 *  - On success the instance is marked clean and its version is queried.
 *  - On failure, when set_last_disconnected is true, the host is recorded as
 *    last disconnected and marked for timeout.
 *  - A MEMCACHED_SERVER_TEMPORARILY_DISABLED error carrying "host:port" is
 *    built at the end; its guarding condition is not visible here.
 *
 * @param server                 instance to connect.
 * @param set_last_disconnected  enables the failure bookkeeping described
 *                               above.
 *
 * NOTE(review): this copy of the file is missing lines (braces, break
 * statements, some returns and conditions) - confirm the exact flow
 * against the full source.
 */
653 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
656 if (server
->fd
!= INVALID_SOCKET
)
658 return MEMCACHED_SUCCESS
;
661 LIBMEMCACHED_MEMCACHED_CONNECT_START();
663 bool in_timeout
= false;
664 memcached_return_t rc
;
665 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
667 set_last_disconnected_host(server
);
671 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
673 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A leading '/' in the hostname selects the UNIX-socket transport.
676 if (server
->hostname
[0] == '/')
678 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
681 /* We need to clean up the multi startup piece */
682 switch (server
->type
)
684 case MEMCACHED_CONNECTION_UDP
:
685 case MEMCACHED_CONNECTION_TCP
:
686 rc
= network_connect(server
);
688 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
690 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
692 rc
= memcached_sasl_authenticate_connection(server
);
// A failed authentication must not leave the socket open.
693 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
695 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
696 (void)closesocket(server
->fd
);
697 server
->fd
= INVALID_SOCKET
;
703 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
704 rc
= unix_socket_connect(server
);
708 if (memcached_success(rc
))
710 server
->mark_server_as_clean();
711 memcached_version_instance(server
);
714 else if (set_last_disconnected
)
716 set_last_disconnected_host(server
);
717 if (memcached_has_current_error(*server
))
719 memcached_mark_server_for_timeout(server
);
720 assert(memcached_failed(memcached_instance_error_return(server
)));
// Presumably the branch taken when no error was recorded yet (else line not
// visible): record rc, then mark the server for timeout.
724 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
725 memcached_mark_server_for_timeout(server
);
728 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// The "host:port" text becomes the message of the temporarily-disabled error.
733 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
734 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
741 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
743 if (server
and server
->root
and server
->root
->state
.is_parsing
)
745 return MEMCACHED_SUCCESS
;
748 return _memcached_connect(server
, false);
751 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
753 return _memcached_connect(server
, true);