1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
46 #define SOCK_CLOEXEC 0
49 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
52 fds
[0].fd
= server
->fd
;
53 fds
[0].events
= POLLOUT
;
57 if (server
->root
->poll_timeout
== 0)
59 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
62 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
65 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
69 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
72 #ifdef TARGET_OS_LINUX
80 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
83 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
85 default: // This should not happen
86 if (fds
[0].revents
& POLLERR
)
89 socklen_t len
= sizeof(err
);
90 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
94 // This should never happen, if it does? Punt.
101 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
102 (void)closesocket(server
->fd
);
103 server
->fd
= INVALID_SOCKET
;
104 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
106 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
109 assert(number_of
== 0);
111 server
->io_wait_count
.timeouts
++;
112 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
115 if (fds
[0].revents
& POLLERR
or
116 fds
[0].revents
& POLLHUP
or
117 fds
[0].revents
& POLLNVAL
)
120 socklen_t len
= sizeof (err
);
121 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
123 // We check the value to see what happened wth the socket.
126 return MEMCACHED_SUCCESS
;
131 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
133 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
135 return MEMCACHED_SUCCESS
;
138 // This should only be possible from ERESTART or EINTR;
139 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
142 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
144 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
145 if (server
->address_info
)
147 freeaddrinfo(server
->address_info
);
148 server
->address_info
= NULL
;
149 server
->address_info_next
= NULL
;
152 char str_port
[NI_MAXSERV
];
153 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", uint32_t(server
->port()));
154 if (length
>= NI_MAXSERV
or length
<= 0)
156 return MEMCACHED_FAILURE
;
159 struct addrinfo hints
;
160 memset(&hints
, 0, sizeof(struct addrinfo
));
163 hints
.ai_family
= AF_INET
;
165 if (memcached_is_udp(server
->root
))
167 hints
.ai_protocol
= IPPROTO_UDP
;
168 hints
.ai_socktype
= SOCK_DGRAM
;
172 hints
.ai_socktype
= SOCK_STREAM
;
173 hints
.ai_protocol
= IPPROTO_TCP
;
176 server
->address_info
= NULL
;
178 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
184 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
187 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
190 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
193 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
197 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
200 server
->address_info_next
= server
->address_info
;
201 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
203 return MEMCACHED_SUCCESS
;
206 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
210 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
212 memcached_set_errno(*server
, get_socket_errno(), NULL
);
219 flags
= fcntl(server
->fd
, F_GETFL
, 0);
220 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
224 memcached_set_errno(*server
, errno
, NULL
);
226 else if ((flags
& O_NONBLOCK
) == 0)
232 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
233 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
237 memcached_set_errno(*server
, errno
, NULL
);
243 static void set_socket_options(org::libmemcached::Instance
* server
)
245 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
247 if (memcached_is_udp(server
->root
))
253 if (server
->root
->snd_timeout
)
255 struct timeval waittime
;
258 waittime
.tv_usec
= server
->root
->snd_timeout
;
260 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
261 &waittime
, (socklen_t
)sizeof(struct timeval
));
268 if (server
->root
->rcv_timeout
)
270 struct timeval waittime
;
273 waittime
.tv_usec
= server
->root
->rcv_timeout
;
275 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
276 &waittime
, (socklen_t
)sizeof(struct timeval
));
283 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
286 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
288 // This is not considered a fatal error
291 WATCHPOINT_ERRNO(get_socket_errno());
292 perror("setsockopt(SO_NOSIGPIPE)");
297 if (server
->root
->flags
.no_block
)
299 struct linger linger
;
302 linger
.l_linger
= 0; /* By default on close() just drop the socket */
303 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
304 &linger
, (socklen_t
)sizeof(struct linger
));
309 if (server
->root
->flags
.tcp_nodelay
)
313 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
314 &flag
, (socklen_t
)sizeof(int));
319 if (server
->root
->flags
.tcp_keepalive
)
323 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
324 &flag
, (socklen_t
)sizeof(int));
330 if (server
->root
->tcp_keepidle
> 0)
332 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
333 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
339 if (server
->root
->send_size
> 0)
341 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
342 &server
->root
->send_size
, (socklen_t
)sizeof(int));
347 if (server
->root
->recv_size
> 0)
349 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
350 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
356 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
357 set_socket_nonblocking(server
);
360 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
363 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
365 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
367 memcached_set_errno(*server
, errno
, NULL
);
368 return MEMCACHED_CONNECTION_FAILURE
;
371 struct sockaddr_un servAddr
;
373 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
374 servAddr
.sun_family
= AF_UNIX
;
375 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
378 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
387 case EISCONN
: /* We were spinning waiting on connect */
389 assert(0); // Programmer error
394 WATCHPOINT_ERRNO(errno
);
395 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
396 return MEMCACHED_CONNECTION_FAILURE
;
400 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
402 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
404 return MEMCACHED_SUCCESS
;
407 return MEMCACHED_NOT_SUPPORTED
;
411 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
413 bool timeout_error_occured
= false;
415 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
416 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
419 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
421 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
423 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
424 server
->address_info_next
= NULL
;
425 memcached_return_t rc
;
429 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
435 struct timespec dream
, rem
;
440 nanosleep(&dream
, &rem
);
444 if (memcached_failed(rc
))
450 if (server
->address_info_next
== NULL
)
452 server
->address_info_next
= server
->address_info
;
453 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
456 /* Create the socket */
457 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
459 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
460 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
462 server
->address_info_next
= server
->address_info_next
->ai_next
;
466 int type
= server
->address_info_next
->ai_socktype
;
467 if (HAVE_SOCK_CLOEXEC
)
472 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
474 server
->address_info_next
->ai_protocol
)) < 0)
476 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
479 if (HAVE_SOCK_CLOEXEC
== 0)
485 rval
= fcntl (server
->fd
, F_SETFD
, FD_CLOEXEC
);
486 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
490 set_socket_options(server
);
492 /* connect to server */
493 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
495 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
496 return MEMCACHED_SUCCESS
;
499 /* An error occurred */
500 switch (get_socket_errno())
503 timeout_error_occured
= true;
507 #if EWOULDBLOCK != EAGAIN
510 case EINPROGRESS
: // nonblocking mode - first return
511 case EALREADY
: // nonblocking mode - subsequent returns
513 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
514 memcached_return_t rc
= connect_poll(server
);
516 if (memcached_success(rc
))
518 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
519 return MEMCACHED_SUCCESS
;
522 // A timeout here is treated as an error, we will not retry
523 if (rc
== MEMCACHED_TIMEOUT
)
525 timeout_error_occured
= true;
530 case EISCONN
: // we are connected :-)
531 WATCHPOINT_ASSERT(0); // This is a programmer's error
534 case EINTR
: // Special case, we retry ai_addr
535 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
536 (void)closesocket(server
->fd
);
537 server
->fd
= INVALID_SOCKET
;
544 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
545 (void)closesocket(server
->fd
);
546 server
->fd
= INVALID_SOCKET
;
547 server
->address_info_next
= server
->address_info_next
->ai_next
;
550 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
552 if (timeout_error_occured
)
554 if (server
->fd
!= INVALID_SOCKET
)
556 (void)closesocket(server
->fd
);
557 server
->fd
= INVALID_SOCKET
;
561 WATCHPOINT_STRING("Never got a good file descriptor");
563 if (memcached_has_current_error(*server
))
565 return memcached_instance_error_return(server
);
568 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
570 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
573 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
580 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
581 we get caught spending cycles just waiting.
583 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
585 struct timeval curr_time
;
586 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
589 If we hit server_failure_limit then something is completely wrong about the server.
591 1) If autoeject is enabled we do that.
592 2) If not? We go into timeout again, there is much else to do :(
594 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
597 We just auto_eject if we hit this point
599 if (_is_auto_eject_host(server
->root
))
601 set_last_disconnected_host(server
);
603 // Retry dead servers if requested
604 if (_gettime_success
and server
->root
->dead_timeout
> 0)
606 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
608 // We only retry dead servers once before assuming failure again
609 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
612 memcached_return_t rc
;
613 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
615 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
618 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
621 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
623 // Sanity check/setting
624 if (server
->next_retry
== 0)
626 server
->next_retry
= 1;
630 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
633 If next_retry is less then our current time, then we reset and try everything again.
635 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
637 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
641 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
647 return MEMCACHED_SUCCESS
;
650 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
653 if (server
->fd
!= INVALID_SOCKET
)
655 return MEMCACHED_SUCCESS
;
658 LIBMEMCACHED_MEMCACHED_CONNECT_START();
660 bool in_timeout
= false;
661 memcached_return_t rc
;
662 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
664 set_last_disconnected_host(server
);
668 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
670 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
673 if (server
->hostname
[0] == '/')
675 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
678 /* We need to clean up the multi startup piece */
679 switch (server
->type
)
681 case MEMCACHED_CONNECTION_UDP
:
682 case MEMCACHED_CONNECTION_TCP
:
683 rc
= network_connect(server
);
685 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
687 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
689 rc
= memcached_sasl_authenticate_connection(server
);
690 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
692 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
693 (void)closesocket(server
->fd
);
694 server
->fd
= INVALID_SOCKET
;
700 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
701 rc
= unix_socket_connect(server
);
705 if (memcached_success(rc
))
707 server
->mark_server_as_clean();
708 memcached_version_instance(server
);
711 else if (set_last_disconnected
)
713 set_last_disconnected_host(server
);
714 if (memcached_has_current_error(*server
))
716 memcached_mark_server_for_timeout(server
);
717 assert(memcached_failed(memcached_instance_error_return(server
)));
721 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
722 memcached_mark_server_for_timeout(server
);
725 LIBMEMCACHED_MEMCACHED_CONNECT_END();
730 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
731 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
738 memcached_return_t
memcached_connect_try(org::libmemcached::Instance
* server
)
740 return _memcached_connect(server
, false);
743 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
745 return _memcached_connect(server
, true);