1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 #define SOCK_CLOEXEC 0
49 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
52 fds
[0].fd
= server
->fd
;
53 fds
[0].events
= POLLOUT
;
57 if (server
->root
->poll_timeout
== 0)
59 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
62 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
65 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
69 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
72 #ifdef TARGET_OS_LINUX
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
83 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
85 default: // This should not happen
86 if (fds
[0].revents
& POLLERR
)
89 socklen_t len
= sizeof(err
);
90 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
94 // This should never happen, if it does? Punt.
101 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
102 (void)closesocket(server
->fd
);
103 server
->fd
= INVALID_SOCKET
;
104 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
106 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
109 assert(number_of
== 0);
111 server
->io_wait_count
.timeouts
++;
112 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
115 if (fds
[0].revents
& POLLERR
or
116 fds
[0].revents
& POLLHUP
or
117 fds
[0].revents
& POLLNVAL
)
120 socklen_t len
= sizeof (err
);
121 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
123 // We check the value to see what happened wth the socket.
126 return MEMCACHED_SUCCESS
;
131 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
133 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
135 return MEMCACHED_SUCCESS
;
138 // This should only be possible from ERESTART or EINTR;
139 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
142 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
144 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
145 if (server
->address_info
)
147 freeaddrinfo(server
->address_info
);
148 server
->address_info
= NULL
;
149 server
->address_info_next
= NULL
;
152 char str_port
[NI_MAXSERV
];
153 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
154 if (length
>= NI_MAXSERV
or length
<= 0)
156 return MEMCACHED_FAILURE
;
159 struct addrinfo hints
;
160 memset(&hints
, 0, sizeof(struct addrinfo
));
163 hints
.ai_family
= AF_INET
;
165 if (memcached_is_udp(server
->root
))
167 hints
.ai_protocol
= IPPROTO_UDP
;
168 hints
.ai_socktype
= SOCK_DGRAM
;
172 hints
.ai_socktype
= SOCK_STREAM
;
173 hints
.ai_protocol
= IPPROTO_TCP
;
176 assert(server
->address_info
== NULL
);
177 assert(server
->address_info_next
== NULL
);
179 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
185 if (server
->address_info
)
187 freeaddrinfo(server
->address_info
);
188 server
->address_info
= NULL
;
189 server
->address_info_next
= NULL
;
191 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
194 if (server
->address_info
)
196 freeaddrinfo(server
->address_info
);
197 server
->address_info
= NULL
;
198 server
->address_info_next
= NULL
;
200 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
203 if (server
->address_info
)
205 freeaddrinfo(server
->address_info
);
206 server
->address_info
= NULL
;
207 server
->address_info_next
= NULL
;
209 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
212 if (server
->address_info
)
214 freeaddrinfo(server
->address_info
);
215 server
->address_info
= NULL
;
216 server
->address_info_next
= NULL
;
218 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
222 if (server
->address_info
)
224 freeaddrinfo(server
->address_info
);
225 server
->address_info
= NULL
;
226 server
->address_info_next
= NULL
;
228 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
231 server
->address_info_next
= server
->address_info
;
232 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
234 return MEMCACHED_SUCCESS
;
237 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
241 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
243 memcached_set_errno(*server
, get_socket_errno(), NULL
);
250 flags
= fcntl(server
->fd
, F_GETFL
, 0);
251 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
255 memcached_set_errno(*server
, errno
, NULL
);
257 else if ((flags
& O_NONBLOCK
) == 0)
263 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
264 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
268 memcached_set_errno(*server
, errno
, NULL
);
274 static void set_socket_options(org::libmemcached::Instance
* server
)
276 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
278 if (memcached_is_udp(server
->root
))
284 if (server
->root
->snd_timeout
> 0)
286 struct timeval waittime
;
288 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
289 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
291 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
292 &waittime
, (socklen_t
)sizeof(struct timeval
));
299 if (server
->root
->rcv_timeout
> 0)
301 struct timeval waittime
;
303 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
304 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
306 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
307 &waittime
, (socklen_t
)sizeof(struct timeval
));
314 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
317 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
321 // This is not considered a fatal error
325 perror("setsockopt(SO_NOSIGPIPE)");
331 if (server
->root
->flags
.no_block
)
333 struct linger linger
;
336 linger
.l_linger
= 0; /* By default on close() just drop the socket */
337 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
338 &linger
, (socklen_t
)sizeof(struct linger
));
343 if (server
->root
->flags
.tcp_nodelay
)
347 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
348 &flag
, (socklen_t
)sizeof(int));
353 if (server
->root
->flags
.tcp_keepalive
)
357 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
358 &flag
, (socklen_t
)sizeof(int));
364 if (server
->root
->tcp_keepidle
> 0)
366 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
367 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
373 if (server
->root
->send_size
> 0)
375 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
376 &server
->root
->send_size
, (socklen_t
)sizeof(int));
381 if (server
->root
->recv_size
> 0)
383 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
384 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
390 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
391 set_socket_nonblocking(server
);
394 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
397 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
399 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
401 memcached_set_errno(*server
, errno
, NULL
);
402 return MEMCACHED_CONNECTION_FAILURE
;
405 struct sockaddr_un servAddr
;
407 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
408 servAddr
.sun_family
= AF_UNIX
;
409 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
412 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
421 case EISCONN
: /* We were spinning waiting on connect */
423 assert(0); // Programmer error
428 WATCHPOINT_ERRNO(errno
);
429 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
430 return MEMCACHED_CONNECTION_FAILURE
;
434 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
436 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
438 return MEMCACHED_SUCCESS
;
441 return MEMCACHED_NOT_SUPPORTED
;
445 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
447 bool timeout_error_occured
= false;
449 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
450 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
453 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
455 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
457 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
458 server
->address_info_next
= NULL
;
459 memcached_return_t rc
= set_hostinfo(server
);
461 if (memcached_failed(rc
))
467 if (server
->address_info_next
== NULL
)
469 server
->address_info_next
= server
->address_info
;
470 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
473 /* Create the socket */
474 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
476 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
477 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
479 server
->address_info_next
= server
->address_info_next
->ai_next
;
483 int type
= server
->address_info_next
->ai_socktype
;
484 if (HAVE_SOCK_CLOEXEC
)
489 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
491 server
->address_info_next
->ai_protocol
)) < 0)
493 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
496 if (HAVE_SOCK_CLOEXEC
== 0)
502 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
503 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
507 set_socket_options(server
);
509 /* connect to server */
510 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
512 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
513 return MEMCACHED_SUCCESS
;
516 /* An error occurred */
517 switch (get_socket_errno())
520 timeout_error_occured
= true;
524 #if EWOULDBLOCK != EAGAIN
527 case EINPROGRESS
: // nonblocking mode - first return
528 case EALREADY
: // nonblocking mode - subsequent returns
530 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
531 memcached_return_t rc
= connect_poll(server
);
533 if (memcached_success(rc
))
535 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
536 return MEMCACHED_SUCCESS
;
539 // A timeout here is treated as an error, we will not retry
540 if (rc
== MEMCACHED_TIMEOUT
)
542 timeout_error_occured
= true;
547 case EISCONN
: // we are connected :-)
548 WATCHPOINT_ASSERT(0); // This is a programmer's error
551 case EINTR
: // Special case, we retry ai_addr
552 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
553 (void)closesocket(server
->fd
);
554 server
->fd
= INVALID_SOCKET
;
561 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
562 (void)closesocket(server
->fd
);
563 server
->fd
= INVALID_SOCKET
;
564 server
->address_info_next
= server
->address_info_next
->ai_next
;
567 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
569 if (timeout_error_occured
)
571 if (server
->fd
!= INVALID_SOCKET
)
573 (void)closesocket(server
->fd
);
574 server
->fd
= INVALID_SOCKET
;
578 WATCHPOINT_STRING("Never got a good file descriptor");
580 if (memcached_has_current_error(*server
))
582 return memcached_instance_error_return(server
);
585 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
587 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
590 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
597 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
598 we get caught spending cycles just waiting.
600 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
602 struct timeval curr_time
;
603 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
606 If we hit server_failure_limit then something is completely wrong about the server.
608 1) If autoeject is enabled we do that.
609 2) If not? We go into timeout again, there is much else to do :(
611 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
614 We just auto_eject if we hit this point
616 if (_is_auto_eject_host(server
->root
))
618 set_last_disconnected_host(server
);
620 // Retry dead servers if requested
621 if (_gettime_success
and server
->root
->dead_timeout
> 0)
623 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
625 // We only retry dead servers once before assuming failure again
626 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
629 memcached_return_t rc
;
630 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
632 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
635 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
638 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
640 // Sanity check/setting
641 if (server
->next_retry
== 0)
643 server
->next_retry
= 1;
647 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
650 If next_retry is less then our current time, then we reset and try everything again.
652 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
654 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
658 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
664 return MEMCACHED_SUCCESS
;
667 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
670 if (server
->fd
!= INVALID_SOCKET
)
672 return MEMCACHED_SUCCESS
;
675 LIBMEMCACHED_MEMCACHED_CONNECT_START();
677 bool in_timeout
= false;
678 memcached_return_t rc
;
679 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
681 set_last_disconnected_host(server
);
685 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
687 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
690 if (server
->hostname
[0] == '/')
692 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
695 /* We need to clean up the multi startup piece */
696 switch (server
->type
)
698 case MEMCACHED_CONNECTION_UDP
:
699 case MEMCACHED_CONNECTION_TCP
:
700 rc
= network_connect(server
);
702 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
704 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
706 rc
= memcached_sasl_authenticate_connection(server
);
707 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
709 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
710 (void)closesocket(server
->fd
);
711 server
->fd
= INVALID_SOCKET
;
717 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
718 rc
= unix_socket_connect(server
);
722 if (memcached_success(rc
))
724 server
->mark_server_as_clean();
725 memcached_version_instance(server
);
728 else if (set_last_disconnected
)
730 set_last_disconnected_host(server
);
731 if (memcached_has_current_error(*server
))
733 memcached_mark_server_for_timeout(server
);
734 assert(memcached_failed(memcached_instance_error_return(server
)));
738 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
739 memcached_mark_server_for_timeout(server
);
742 LIBMEMCACHED_MEMCACHED_CONNECT_END();
747 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
748 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
755 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
757 if (server
and server
->root
and server
->root
->state
.is_parsing
)
759 return MEMCACHED_SUCCESS
;
762 return _memcached_connect(server
, false);
765 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
767 return _memcached_connect(server
, true);