1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 #define SOCK_CLOEXEC 0
/*
 * connect_poll(): wait with poll() for the socket in server->fd to become
 * writable. network_connect() calls this after connect() reported
 * EINPROGRESS or EALREADY.
 *
 * Outcomes visible in this listing:
 *  - poll_timeout == 0: MEMCACHED_TIMEOUT is returned without polling.
 *  - poll() returns <= 0: on the error path the socket errno is cached, some
 *    errno values map to MEMCACHED_MEMORY_ALLOCATION_FAILURE, and otherwise
 *    the socket is closed, fd becomes INVALID_SOCKET, state becomes
 *    MEMCACHED_SERVER_STATE_NEW and the errno is recorded. A result of 0 is
 *    counted in io_wait_count.timeouts and returns MEMCACHED_TIMEOUT.
 *  - POLLERR, POLLHUP or POLLNVAL set: SO_ERROR is queried; the visible
 *    returns are MEMCACHED_SUCCESS or memcached_set_errno() with err.
 *  - otherwise: MEMCACHED_SUCCESS.
 *
 * NOTE(review): local declarations (fds, loop_max, number_of, err), braces and
 * the case labels of the errno switch are not visible in this listing;
 * confirm the exact branch structure against the complete file.
 * NOTE(review): the zero check reads poll_timeout while poll() is given
 * connect_timeout; confirm that using two different settings is intended.
 */
49 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
52 fds
[0].fd
= server
->fd
;
53 fds
[0].events
= POLLOUT
;
// When poll_timeout is zero, report a timeout without calling poll().
57 if (server
->root
->poll_timeout
== 0)
59 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
62 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
65 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
69 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
72 #ifdef TARGET_OS_LINUX
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
83 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
85 default: // This should not happen
86 if (fds
[0].revents
& POLLERR
)
89 socklen_t len
= sizeof(err
);
90 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
94 // This should never happen, if it does? Punt.
// Give up on this descriptor: close it and put the instance back into the NEW state.
101 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
102 (void)closesocket(server
->fd
);
103 server
->fd
= INVALID_SOCKET
;
104 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
106 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// poll() returned 0: nothing became ready before the timeout expired.
109 assert(number_of
== 0);
111 server
->io_wait_count
.timeouts
++;
112 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// The descriptor reported an error condition; ask the socket for its pending error.
115 if (fds
[0].revents
& POLLERR
or
116 fds
[0].revents
& POLLHUP
or
117 fds
[0].revents
& POLLNVAL
)
120 socklen_t len
= sizeof (err
);
121 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
123 // We check the value to see what happened with the socket.
126 return MEMCACHED_SUCCESS
;
131 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
133 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
135 return MEMCACHED_SUCCESS
;
138 // This should only be possible from ERESTART or EINTR;
139 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
/*
 * set_hostinfo(): resolve server->hostname and server->port() with
 * getaddrinfo() and store the result list in server->address_info.
 *
 * Must not be called for MEMCACHED_CONNECTION_UNIX_SOCKET instances
 * (asserted). Any earlier lookup result is released with freeaddrinfo().
 *
 * Returns:
 *  - MEMCACHED_FAILURE when the port cannot be formatted into str_port;
 *  - an error recorded on the instance when getaddrinfo() fails. The visible
 *    mappings are MEMCACHED_TIMEOUT, the saved errno (message
 *    getaddrinfo(EAI_SYSTEM)), MEMCACHED_INVALID_ARGUMENTS,
 *    MEMCACHED_MEMORY_ALLOCATION_FAILURE and MEMCACHED_HOST_LOOKUP_FAILURE;
 *  - MEMCACHED_SUCCESS after address_info_next is pointed at the head of the
 *    list and state is set to MEMCACHED_SERVER_STATE_ADDRINFO.
 *
 * NOTE(review): the declaration of errcode and the case labels of the switch
 * are not visible in this listing; which getaddrinfo() code selects which
 * return must be confirmed against the complete file.
 * NOTE(review): preprocessor lines other than those shown are missing from
 * this listing, so it cannot be told from here whether the ai_family
 * assignment is actually compiled in; confirm before relying on it.
 */
142 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
144 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Drop any previous lookup result before resolving again.
145 if (server
->address_info
)
147 freeaddrinfo(server
->address_info
);
148 server
->address_info
= NULL
;
149 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so the port number is formatted first.
152 char str_port
[NI_MAXSERV
];
153 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
154 if (length
>= NI_MAXSERV
or length
<= 0)
156 return MEMCACHED_FAILURE
;
159 struct addrinfo hints
;
160 memset(&hints
, 0, sizeof(struct addrinfo
));
163 hints
.ai_family
= AF_INET
;
// Ask for datagram/UDP or stream/TCP results depending on the client configuration.
165 if (memcached_is_udp(server
->root
))
167 hints
.ai_protocol
= IPPROTO_UDP
;
168 hints
.ai_socktype
= SOCK_DGRAM
;
172 hints
.ai_socktype
= SOCK_STREAM
;
173 hints
.ai_protocol
= IPPROTO_TCP
;
176 server
->address_info
= NULL
;
178 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
184 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
187 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
190 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
193 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
197 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Lookup succeeded: start iterating from the first returned address.
200 server
->address_info_next
= server
->address_info
;
201 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
203 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking(): switch server->fd to non-blocking mode.
 *
 * Two mechanisms are visible: ioctlsocket() with FIONBIO, and fcntl() with
 * F_GETFL followed by F_SETFL adding O_NONBLOCK (only when the flag is not
 * already set). Each fcntl() call is repeated while it fails with EINTR or
 * EAGAIN.
 *
 * The function returns nothing; failures are only recorded on the instance
 * through memcached_set_errno().
 *
 * NOTE(review): the conditional that selects between the ioctlsocket() and
 * fcntl() variants, and the declarations of arg, flags and rval, are not
 * visible in this listing; presumably a platform #ifdef - confirm.
 */
206 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
210 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
212 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current file status flags, retrying on EINTR or EAGAIN.
219 flags
= fcntl(server
->fd
, F_GETFL
, 0);
220 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
224 memcached_set_errno(*server
, errno
, NULL
);
// Only issue F_SETFL when O_NONBLOCK is not set yet.
226 else if ((flags
& O_NONBLOCK
) == 0)
232 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
233 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
237 memcached_set_errno(*server
, errno
, NULL
);
/*
 * set_socket_options(): apply the socket options configured on server->root
 * to the already created socket server->fd (asserted to be valid).
 *
 * Options visible in this listing, each guarded by its own setting:
 *  - SO_SNDTIMEO / SO_RCVTIMEO from snd_timeout / rcv_timeout;
 *  - SO_NOSIGPIPE on Apple and FreeBSD builds;
 *  - SO_LINGER with l_linger = 0 when flags.no_block is set;
 *  - TCP_NODELAY when flags.tcp_nodelay is set;
 *  - SO_KEEPALIVE when flags.tcp_keepalive is set;
 *  - TCP_KEEPIDLE when tcp_keepidle > 0;
 *  - SO_SNDBUF / SO_RCVBUF when send_size / recv_size > 0.
 * Finally the socket is switched to non-blocking mode.
 *
 * The function returns nothing. How the individual setsockopt() results
 * (the local error variables) are handled is not visible in this listing.
 *
 * NOTE(review): what happens when memcached_is_udp() is true is not visible
 * here; confirm against the complete file.
 * NOTE(review): the whole snd_timeout / rcv_timeout value is stored in
 * tv_usec and no tv_sec assignment is visible. If these settings can be
 * 1000000 microseconds or more the timeval would be out of range; confirm
 * the units and consider splitting into seconds and microseconds.
 */
243 static void set_socket_options(org::libmemcached::Instance
* server
)
245 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
247 if (memcached_is_udp(server
->root
))
// Send timeout.
253 if (server
->root
->snd_timeout
)
255 struct timeval waittime
;
258 waittime
.tv_usec
= server
->root
->snd_timeout
;
260 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
261 &waittime
, (socklen_t
)sizeof(struct timeval
));
// Receive timeout.
267 if (server
->root
->rcv_timeout
)
269 struct timeval waittime
;
272 waittime
.tv_usec
= server
->root
->rcv_timeout
;
274 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
275 &waittime
, (socklen_t
)sizeof(struct timeval
));
281 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
284 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
286 // This is not considered a fatal error
289 WATCHPOINT_ERRNO(get_socket_errno());
290 perror("setsockopt(SO_NOSIGPIPE)");
295 if (server
->root
->flags
.no_block
)
297 struct linger linger
;
300 linger
.l_linger
= 0; /* By default on close() just drop the socket */
301 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
302 &linger
, (socklen_t
)sizeof(struct linger
));
306 if (server
->root
->flags
.tcp_nodelay
)
310 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
311 &flag
, (socklen_t
)sizeof(int));
315 if (server
->root
->flags
.tcp_keepalive
)
319 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
320 &flag
, (socklen_t
)sizeof(int));
325 if (server
->root
->tcp_keepidle
> 0)
327 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
328 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Kernel send / receive buffer sizes.
333 if (server
->root
->send_size
> 0)
335 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
336 &server
->root
->send_size
, (socklen_t
)sizeof(int));
340 if (server
->root
->recv_size
> 0)
342 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
343 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
348 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
349 set_socket_nonblocking(server
);
/*
 * unix_socket_connect(): create an AF_UNIX stream socket in server->fd and
 * connect it to the filesystem path held in server->hostname.
 *
 * Returns:
 *  - MEMCACHED_CONNECTION_FAILURE when socket() fails, or when connect()
 *    fails on the visible error path (errno is recorded on the instance);
 *  - MEMCACHED_SUCCESS after state is set to MEMCACHED_SERVER_STATE_CONNECTED;
 *  - MEMCACHED_NOT_SUPPORTED from the final return, presumably an alternate
 *    platform branch whose preprocessor lines are not visible - confirm.
 *
 * NOTE(review): the loop and errno switch around connect(), including every
 * case label other than EISCONN, are not visible in this listing.
 * NOTE(review): strncpy() is given the full sizeof(sun_path). A hostname of
 * that length or longer leaves sun_path without a terminating NUL even
 * though the struct was zeroed first; consider copying at most
 * sizeof(sun_path) - 1 bytes or rejecting over-long paths.
 */
352 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
355 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
357 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
359 memcached_set_errno(*server
, errno
, NULL
);
360 return MEMCACHED_CONNECTION_FAILURE
;
363 struct sockaddr_un servAddr
;
// Zero the whole address first so unused bytes of sun_path are NUL.
365 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
366 servAddr
.sun_family
= AF_UNIX
;
367 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
370 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
379 case EISCONN
: /* We were spinning waiting on connect */
381 assert(0); // Programmer error
386 WATCHPOINT_ERRNO(errno
);
387 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
388 return MEMCACHED_CONNECTION_FAILURE
;
392 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
394 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
396 return MEMCACHED_SUCCESS
;
399 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect(): create a socket for a TCP or UDP server and connect it,
 * trying the addresses in server->address_info one after another.
 *
 * Steps visible in this listing:
 *  1. If address_info or address_info_next is NULL, set_hostinfo() is called
 *     to resolve the host again. Results equal to MEMCACHED_TIMEOUT are
 *     treated differently from other results and nanosleep() is used between
 *     attempts; a failed lookup is checked with memcached_failed().
 *  2. If address_info_next is still NULL it is reset to the list head and
 *     state becomes MEMCACHED_SERVER_STATE_ADDRINFO.
 *  3. While there is a candidate address and fd is INVALID_SOCKET: in UDP
 *     mode entries that are not AF_INET are skipped; a socket is created
 *     (failure returns the socket errno at once); FD_CLOEXEC is set with
 *     fcntl() when HAVE_SOCK_CLOEXEC == 0; set_socket_options() is applied;
 *     connect() is attempted.
 *       - connect() succeeds: state CONNECTED, return MEMCACHED_SUCCESS.
 *       - EINPROGRESS / EALREADY: state IN_PROGRESS, then connect_poll();
 *         success returns MEMCACHED_SUCCESS, MEMCACHED_TIMEOUT sets the
 *         timeout flag.
 *       - EINTR: the socket is closed so the same address is retried.
 *       - otherwise the socket is closed and the next address is selected.
 *  4. After the loop: the socket is closed if a timeout was flagged; an error
 *     already recorded on the instance is returned; else MEMCACHED_TIMEOUT
 *     when a timeout was flagged and state is below IN_PROGRESS; else
 *     MEMCACHED_CONNECTION_FAILURE.
 *
 * NOTE(review): braces, several case labels, the bounds of the lookup retry
 * loop and the nanosleep() interval are not visible in this listing; confirm
 * the exact control flow against the complete file.
 * NOTE(review): the result of the fcntl(F_SETFD) loop does not appear to be
 * checked in the visible code - confirm.
 */
403 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
405 bool timeout_error_occured
= false;
407 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
408 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
411 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
413 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
415 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
416 server
->address_info_next
= NULL
;
417 memcached_return_t rc
;
421 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
427 struct timespec dream
, rem
;
432 nanosleep(&dream
, &rem
);
436 if (memcached_failed(rc
))
442 if (server
->address_info_next
== NULL
)
444 server
->address_info_next
= server
->address_info
;
445 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
448 /* Create the socket */
449 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
451 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
452 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
454 server
->address_info_next
= server
->address_info_next
->ai_next
;
458 int type
= server
->address_info_next
->ai_socktype
;
459 if (HAVE_SOCK_CLOEXEC
)
464 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
466 server
->address_info_next
->ai_protocol
)) < 0)
468 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Without SOCK_CLOEXEC support the close-on-exec flag is set with a separate fcntl() call.
471 if (HAVE_SOCK_CLOEXEC
== 0)
477 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
478 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
482 set_socket_options(server
);
484 /* connect to server */
485 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
487 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
488 return MEMCACHED_SUCCESS
;
491 /* An error occurred */
492 switch (get_socket_errno())
495 timeout_error_occured
= true;
499 #if EWOULDBLOCK != EAGAIN
502 case EINPROGRESS
: // nonblocking mode - first return
503 case EALREADY
: // nonblocking mode - subsequent returns
505 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
506 memcached_return_t rc
= connect_poll(server
);
508 if (memcached_success(rc
))
510 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
511 return MEMCACHED_SUCCESS
;
514 // A timeout here is treated as an error, we will not retry
515 if (rc
== MEMCACHED_TIMEOUT
)
517 timeout_error_occured
= true;
522 case EISCONN
: // we are connected :-)
523 WATCHPOINT_ASSERT(0); // This is a programmer's error
526 case EINTR
: // Special case, we retry ai_addr
527 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
528 (void)closesocket(server
->fd
);
529 server
->fd
= INVALID_SOCKET
;
// This address failed: close the socket and move on to the next entry in the list.
536 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
537 (void)closesocket(server
->fd
);
538 server
->fd
= INVALID_SOCKET
;
539 server
->address_info_next
= server
->address_info_next
->ai_next
;
542 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
544 if (timeout_error_occured
)
546 if (server
->fd
!= INVALID_SOCKET
)
548 (void)closesocket(server
->fd
);
549 server
->fd
= INVALID_SOCKET
;
553 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that was already recorded on the instance over a generic one.
555 if (memcached_has_current_error(*server
))
557 return memcached_instance_error_return(server
);
560 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
562 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
565 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
572 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
573 we get caught spending cycles just waiting.
/*
 * backoff_handling(): decide, before a connection attempt, whether this
 * server should be tried at all, based on its failure counter and retry time.
 *
 * Behaviour visible in this listing:
 *  - The current time is read with gettimeofday(); time based decisions are
 *    only taken when that call succeeded.
 *  - When server_failure_counter has reached root->server_failure_limit:
 *      * with auto eject enabled, the host is recorded as last disconnected;
 *        if dead_timeout > 0, next_retry is set to now + dead_timeout and the
 *        failure counter is set to limit - 1; the distribution is rebuilt
 *        with run_distribution(), and either its error or
 *        MEMCACHED_SERVER_MARKED_DEAD is returned;
 *      * the state is set to MEMCACHED_SERVER_STATE_IN_TIMEOUT and a zero
 *        next_retry is replaced by 1 (this appears to be the branch taken
 *        without auto eject - confirm).
 *  - When the state is IN_TIMEOUT: if next_retry is earlier than the current
 *    time the state is reset to MEMCACHED_SERVER_STATE_NEW; a return of
 *    MEMCACHED_SERVER_TEMPORARILY_DISABLED is also visible in this part.
 *  - Otherwise MEMCACHED_SUCCESS.
 *
 * Parameters:
 *   server      instance whose counters and state are inspected and updated.
 *   in_timeout  output flag owned by the caller.
 *
 * NOTE(review): braces and else lines are not visible in this listing, and
 * no assignment to in_timeout is visible either; confirm the exact nesting
 * and where in_timeout is written against the complete file.
 */
575 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
577 struct timeval curr_time
;
578 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
581 If we hit server_failure_limit then something is completely wrong about the server.
583 1) If autoeject is enabled we do that.
584 2) If not? We go into timeout again, there is much else to do :(
586 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
589 We just auto_eject if we hit this point
591 if (_is_auto_eject_host(server
->root
))
593 set_last_disconnected_host(server
);
595 // Retry dead servers if requested
596 if (_gettime_success
and server
->root
->dead_timeout
> 0)
598 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
600 // We only retry dead servers once before assuming failure again
601 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
604 memcached_return_t rc
;
605 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
607 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
610 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
613 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
615 // Sanity check/setting
616 if (server
->next_retry
== 0)
618 server
->next_retry
= 1;
622 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
625 If next_retry is less then our current time, then we reset and try everything again.
627 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
629 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
633 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
639 return MEMCACHED_SUCCESS
;
/*
 * _memcached_connect(): shared implementation behind memcached_connect() and
 * memcached_connect_try().
 *
 * Parameters:
 *   server                 instance to connect.
 *   set_last_disconnected  when true, a failed connection attempt also
 *                          records this host through
 *                          set_last_disconnected_host().
 *
 * Flow visible in this listing:
 *  - Returns MEMCACHED_SUCCESS at once when server->fd is already valid.
 *  - Runs backoff_handling(); on failure the host is recorded as last
 *    disconnected.
 *  - Rejects the combination of SASL callbacks and UDP with
 *    MEMCACHED_INVALID_HOST_PROTOCOL.
 *  - A hostname starting with a slash switches the type to
 *    MEMCACHED_CONNECTION_UNIX_SOCKET.
 *  - UDP and TCP go through network_connect(); with SASL support and
 *    callbacks configured the connection is then authenticated, and the
 *    socket is closed again if authentication fails. Unix sockets go through
 *    unix_socket_connect().
 *  - On success the server is marked clean and
 *    memcached_version_instance() is called.
 *  - On failure with set_last_disconnected, the host is recorded and the
 *    server is marked for timeout; when no error was recorded yet, rc is
 *    stored on the instance first.
 *  - A final path formats hostname:port into buffer and returns
 *    MEMCACHED_SERVER_TEMPORARILY_DISABLED.
 *
 * NOTE(review): the return after the backoff failure, the condition guarding
 * the final MEMCACHED_SERVER_TEMPORARILY_DISABLED path, the ordinary return
 * of rc and the declaration of buffer are not visible in this listing;
 * confirm against the complete file.
 */
642 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
645 if (server
->fd
!= INVALID_SOCKET
)
647 return MEMCACHED_SUCCESS
;
650 LIBMEMCACHED_MEMCACHED_CONNECT_START();
652 bool in_timeout
= false;
653 memcached_return_t rc
;
654 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
656 set_last_disconnected_host(server
);
660 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
662 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname that begins with a slash is treated as a unix domain socket path.
665 if (server
->hostname
[0] == '/')
667 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
670 /* We need to clean up the multi startup piece */
671 switch (server
->type
)
673 case MEMCACHED_CONNECTION_UDP
:
674 case MEMCACHED_CONNECTION_TCP
:
675 rc
= network_connect(server
);
677 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
679 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
681 rc
= memcached_sasl_authenticate_connection(server
);
// Authentication failed: do not keep the unauthenticated socket open.
682 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
684 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
685 (void)closesocket(server
->fd
);
686 server
->fd
= INVALID_SOCKET
;
692 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
693 rc
= unix_socket_connect(server
);
697 if (memcached_success(rc
))
699 server
->mark_server_as_clean();
700 memcached_version_instance(server
);
703 else if (set_last_disconnected
)
705 set_last_disconnected_host(server
);
706 if (memcached_has_current_error(*server
))
708 memcached_mark_server_for_timeout(server
);
709 assert(memcached_failed(memcached_instance_error_return(server
)));
713 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
714 memcached_mark_server_for_timeout(server
);
717 LIBMEMCACHED_MEMCACHED_CONNECT_END();
722 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
723 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
730 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
732 return _memcached_connect(server
, false);
735 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
737 return _memcached_connect(server
, true);