1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// connect_poll(): wait, via poll() on server->fd with POLLOUT, for a pending
// connect to finish.
//
// Returns (as far as the visible code shows):
//  - MEMCACHED_TIMEOUT right away when server->root->poll_timeout is 0, and
//    also (after incrementing server->io_wait_count.timeouts) on the path
//    guarded by assert(number_of == 0);
//  - MEMCACHED_MEMORY_ALLOCATION_FAILURE on two of the poll() error paths;
//  - memcached_set_errno(local_errno) after closing the socket and resetting
//    server->fd to INVALID_SOCKET and server->state to
//    MEMCACHED_SERVER_STATE_NEW;
//  - MEMCACHED_SUCCESS or memcached_set_errno(err) after querying SO_ERROR
//    when revents carries POLLERR, POLLHUP or POLLNVAL;
//  - MEMCACHED_SUCCESS otherwise.
// If the --loop_max loop runs out, the current socket errno is reported.
//
// NOTE(review): the declarations of fds, loop_max, number_of and err, and the
// case labels of the errno switch, are not visible in this listing -- confirm
// against the full file before relying on the summary above.
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// NOTE(review): this guard tests poll_timeout, but the poll() call below is
// given connect_timeout -- confirm which setting is meant to govern the wait.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
61 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
65 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
68 #ifdef TARGET_OS_LINUX
76 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
// NOTE(review): the path below reports MEMCACHED_MEMORY_ALLOCATION_FAILURE
// although its message describes RLIMIT_NOFILE / an invalid timeout --
// confirm that this error code is intended.
79 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
81 default: // This should not happen
82 if (fds
[0].revents
& POLLERR
)
85 socklen_t len
= sizeof(err
);
86 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
90 // This should never happen, if it does? Punt.
// The socket is discarded here: closed, fd invalidated, state reset to NEW,
// and the errno cached before closesocket() is what gets reported.
97 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
98 (void)closesocket(server
->fd
);
99 server
->fd
= INVALID_SOCKET
;
100 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
102 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
105 assert(number_of
== 0);
107 server
->io_wait_count
.timeouts
++;
108 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
111 if (fds
[0].revents
& POLLERR
or
112 fds
[0].revents
& POLLHUP
or
113 fds
[0].revents
& POLLNVAL
)
116 socklen_t len
= sizeof (err
);
117 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
119 // We check the value to see what happened with the socket.
122 return MEMCACHED_SUCCESS
;
127 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
129 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
131 return MEMCACHED_SUCCESS
;
134 // This should only be possible from ERESTART or EINTR;
135 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo(): resolve server->hostname / server->port with getaddrinfo()
// and store the result list in server->address_info.
//
// Visible steps:
//  - asserts the server is not a UNIX-socket connection;
//  - frees any previous address_info and clears address_info /
//    address_info_next;
//  - formats the port into str_port (NI_MAXSERV bytes) and returns
//    MEMCACHED_FAILURE if snprintf() failed or truncated;
//  - fills a zeroed addrinfo hint with the UDP (SOCK_DGRAM/IPPROTO_UDP) or
//    TCP (SOCK_STREAM/IPPROTO_TCP) pair, depending on
//    memcached_is_udp(server->root);
//  - maps getaddrinfo() failures to MEMCACHED_TIMEOUT, an errno-based error,
//    MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_MEMORY_ALLOCATION_FAILURE or
//    MEMCACHED_HOST_LOOKUP_FAILURE;
//  - on success points address_info_next at the head of the list, sets
//    server->state to MEMCACHED_SERVER_STATE_ADDRINFO and returns
//    MEMCACHED_SUCCESS.
//
// NOTE(review): the case labels of the getaddrinfo() switch and the
// declaration of errcode are not visible in this listing; the mapping of
// individual EAI_* codes (in particular which one becomes MEMCACHED_TIMEOUT)
// must be confirmed against the full file.
138 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
140 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
141 if (server
->address_info
)
143 freeaddrinfo(server
->address_info
);
144 server
->address_info
= NULL
;
145 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so the numeric port is
// rendered into a local buffer first.
148 char str_port
[NI_MAXSERV
];
149 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
150 if (length
>= NI_MAXSERV
or length
< 0)
152 return MEMCACHED_FAILURE
;
155 struct addrinfo hints
;
156 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): whether this AF_INET restriction is active or sits inside a
// disabled preprocessor section cannot be told from this listing -- confirm.
159 hints
.ai_family
= AF_INET
;
161 if (memcached_is_udp(server
->root
))
163 hints
.ai_protocol
= IPPROTO_UDP
;
164 hints
.ai_socktype
= SOCK_DGRAM
;
168 hints
.ai_socktype
= SOCK_STREAM
;
169 hints
.ai_protocol
= IPPROTO_TCP
;
172 server
->address_info
= NULL
;
174 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
180 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
183 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
186 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
189 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
193 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating from the first resolved address.
196 server
->address_info_next
= server
->address_info
;
197 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
199 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// Two variants are visible: one using ioctlsocket(FIONBIO) and one using
// fcntl(F_GETFL) followed by fcntl(F_SETFL, flags | O_NONBLOCK) when
// O_NONBLOCK is not already set. Both fcntl() calls are retried while they
// fail with EINTR or EAGAIN.
//
// The function returns void: a failure is only recorded on the server through
// memcached_set_errno(), so callers cannot see it from the return value.
//
// NOTE(review): the preprocessor condition selecting between the two variants
// and the declarations of arg, flags and rval are not visible in this
// listing. The error calls pass NULL where other calls in this file pass
// MEMCACHED_AT -- confirm that is intentional.
202 static inline void set_socket_nonblocking(memcached_server_st
*server
)
206 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
208 memcached_set_errno(*server
, get_socket_errno(), NULL
);
215 flags
= fcntl(server
->fd
, F_GETFL
, 0);
216 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
220 memcached_set_errno(*server
, errno
, NULL
);
222 else if ((flags
& O_NONBLOCK
) == 0)
228 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
229 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
233 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the per-connection socket options configured on
// server->root to server->fd, then switch the socket to non-blocking mode.
//
// Options visible in this listing, each applied only when its setting is
// enabled / non-zero:
//  - SO_SNDTIMEO from snd_timeout and SO_RCVTIMEO from rcv_timeout;
//  - SO_NOSIGPIPE (inside a __MACH__/__APPLE__/__FreeBSD__ guard);
//  - SO_LINGER with l_linger = 0 when flags.no_block is set;
//  - TCP_NODELAY when flags.tcp_nodelay is set;
//  - SO_KEEPALIVE when flags.tcp_keepalive is set;
//  - TCP_KEEPIDLE from tcp_keepidle;
//  - SO_SNDBUF from send_size and SO_RCVBUF from recv_size.
//
// NOTE(review): what happens after the memcached_is_udp() test, how the
// `error` results of setsockopt() are handled, and the declarations of set
// and flag are not visible in this listing -- confirm against the full file.
239 static void set_socket_options(memcached_server_st
*server
)
241 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
243 if (memcached_is_udp(server
->root
))
249 if (server
->root
->snd_timeout
)
251 struct timeval waittime
;
// NOTE(review): only the tv_usec assignment is visible. If snd_timeout can be
// 1,000,000 microseconds or more, the value has to be split into
// tv_sec/tv_usec for setsockopt() to accept it -- confirm how tv_sec is set.
254 waittime
.tv_usec
= server
->root
->snd_timeout
;
256 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
257 &waittime
, (socklen_t
)sizeof(struct timeval
));
263 if (server
->root
->rcv_timeout
)
265 struct timeval waittime
;
// NOTE(review): same concern as for snd_timeout -- the raw rcv_timeout is
// stored in tv_usec with no visible normalisation.
268 waittime
.tv_usec
= server
->root
->rcv_timeout
;
270 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
271 &waittime
, (socklen_t
)sizeof(struct timeval
));
277 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
280 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
282 // This is not considered a fatal error
// NOTE(review): perror() writes to stderr from inside library code -- confirm
// this is acceptable for embedding applications.
285 WATCHPOINT_ERRNO(get_socket_errno());
286 perror("setsockopt(SO_NOSIGPIPE)");
291 if (server
->root
->flags
.no_block
)
293 struct linger linger
;
296 linger
.l_linger
= 0; /* By default on close() just drop the socket */
297 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
298 &linger
, (socklen_t
)sizeof(struct linger
));
302 if (server
->root
->flags
.tcp_nodelay
)
306 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
307 &flag
, (socklen_t
)sizeof(int));
311 if (server
->root
->flags
.tcp_keepalive
)
315 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
316 &flag
, (socklen_t
)sizeof(int));
321 if (server
->root
->tcp_keepidle
> 0)
323 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
324 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
329 if (server
->root
->send_size
> 0)
331 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
332 &server
->root
->send_size
, (socklen_t
)sizeof(int));
336 if (server
->root
->recv_size
> 0)
338 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
339 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
344 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
345 set_socket_nonblocking(server
);
// unix_socket_connect(): open an AF_UNIX / SOCK_STREAM socket and connect it
// to the path stored in server->hostname.
//
// Visible outcomes:
//  - socket() failure: errno is recorded on the server and
//    MEMCACHED_CONNECTION_FAILURE is returned;
//  - connect() failure on the visible error path: errno is recorded and
//    MEMCACHED_CONNECTION_FAILURE is returned;
//  - success: server->state becomes MEMCACHED_SERVER_STATE_CONNECTED and
//    MEMCACHED_SUCCESS is returned;
//  - a trailing `return MEMCACHED_NOT_SUPPORTED` exists, presumably for
//    builds without UNIX-socket support (the guarding condition is not
//    visible -- confirm).
//
// NOTE(review): most case labels of the connect() errno switch, and any
// retry loop around connect(), are not visible in this listing.
348 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
351 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// NOTE(review): the result is tested with `< 0` rather than against
// INVALID_SOCKET -- confirm this is fine for every socket type in use.
353 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
355 memcached_set_errno(*server
, errno
, NULL
);
356 return MEMCACHED_CONNECTION_FAILURE
;
359 struct sockaddr_un servAddr
;
361 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
362 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() with the full size of sun_path does not write a
// terminating NUL when hostname is at least that long -- confirm hostname is
// bounded elsewhere, or that truncation/unterminated paths are acceptable.
363 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
366 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
375 case EISCONN
: /* We were spinning waiting on connect */
377 assert(0); // Programmer error
// NOTE(review): errno is read again after WATCHPOINT_ERRNO(); if that macro
// can call anything that modifies errno the recorded value may differ.
// No closesocket() is visible on this failure path -- confirm the descriptor
// is released by the caller.
382 WATCHPOINT_ERRNO(errno
);
383 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
384 return MEMCACHED_CONNECTION_FAILURE
;
388 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
390 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
392 return MEMCACHED_SUCCESS
;
395 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): establish a TCP or UDP connection for `server`, trying
// the addresses in server->address_info one after another.
//
// Visible flow:
//  1. If address_info or address_info_next is NULL, a fresh lookup is done
//     through set_hostinfo(); the result is compared with MEMCACHED_TIMEOUT
//     and a nanosleep() is visible next to that test (presumably a bounded
//     retry of timed-out lookups -- the loop itself is not visible, confirm).
//  2. If address_info_next is still NULL it is reset to the list head and the
//     state set to MEMCACHED_SERVER_STATE_ADDRINFO.
//  3. While there is a candidate address and no open socket: non-AF_INET
//     entries are skipped in UDP mode, a socket is created, options are
//     applied with set_socket_options(), and connect() is attempted.
//       - immediate success: state = CONNECTED, return MEMCACHED_SUCCESS;
//       - EINPROGRESS / EALREADY: state = IN_PROGRESS and connect_poll()
//         decides; its MEMCACHED_TIMEOUT sets timeout_error_occured;
//       - EINTR: the socket is closed and (per the original comment) the
//         same address is retried;
//       - other visible path: the socket is closed and the next address is
//         selected.
//  4. After the loop, an existing server error is returned if present;
//     otherwise MEMCACHED_TIMEOUT (timeout seen and state below IN_PROGRESS)
//     or MEMCACHED_CONNECTION_FAILURE.
//
// NOTE(review): several case labels, break/continue statements and loop
// headers are not visible in this listing; the per-errno routing described
// above must be confirmed against the full file.
399 static memcached_return_t
network_connect(memcached_server_st
*server
)
401 bool timeout_error_occured
= false;
403 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
404 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
407 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
409 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
411 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
412 server
->address_info_next
= NULL
;
413 memcached_return_t rc
;
417 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
423 struct timespec dream
, rem
;
428 nanosleep(&dream
, &rem
);
432 if (memcached_failed(rc
))
438 if (server
->address_info_next
== NULL
)
440 server
->address_info_next
= server
->address_info
;
441 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
444 /* Create the socket */
445 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
447 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
448 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
450 server
->address_info_next
= server
->address_info_next
->ai_next
;
// NOTE(review): socket() is tested with `< 0`, while the rest of this
// function compares server->fd against INVALID_SOCKET -- confirm both agree
// for every socket type in use.
454 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
455 server
->address_info_next
->ai_socktype
,
456 server
->address_info_next
->ai_protocol
)) < 0)
458 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
461 set_socket_options(server
);
463 /* connect to server */
464 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
466 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
467 return MEMCACHED_SUCCESS
;
470 /* An error occurred */
471 switch (get_socket_errno())
474 timeout_error_occured
= true;
478 #if EWOULDBLOCK != EAGAIN
481 case EINPROGRESS
: // nonblocking mode - first return
482 case EALREADY
: // nonblocking mode - subsequent returns
484 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
485 memcached_return_t rc
= connect_poll(server
);
487 if (memcached_success(rc
))
489 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
490 return MEMCACHED_SUCCESS
;
493 // A timeout here is treated as an error, we will not retry
494 if (rc
== MEMCACHED_TIMEOUT
)
496 timeout_error_occured
= true;
501 case EISCONN
: // we are connected :-)
502 WATCHPOINT_ASSERT(0); // This is a programmer's error
505 case EINTR
: // Special case, we retry ai_addr
506 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
507 (void)closesocket(server
->fd
);
508 server
->fd
= INVALID_SOCKET
;
// Remaining visible path: drop this socket and advance to the next address.
515 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
516 (void)closesocket(server
->fd
);
517 server
->fd
= INVALID_SOCKET
;
518 server
->address_info_next
= server
->address_info_next
->ai_next
;
521 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
523 if (timeout_error_occured
)
525 if (server
->fd
!= INVALID_SOCKET
)
527 (void)closesocket(server
->fd
);
528 server
->fd
= INVALID_SOCKET
;
532 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error already recorded on the server over the generic codes below.
534 if (memcached_has_current_error(*server
))
536 return memcached_server_error_return(server
);
539 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
541 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
544 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
551 Based on elapsed time and failure count, fail the connect without trying. This keeps us from getting
552 stuck in a state where we spend cycles just waiting.
// backoff_handling(): decide, before any connect attempt, whether `server`
// should be tried at all.
//
// Visible behaviour:
//  - the current time is read with gettimeofday(); _gettime_success records
//    whether that worked;
//  - when server_failure_counter has reached root->server_failure_limit:
//      * with auto-eject enabled the host is recorded as last disconnected,
//        next_retry / server_failure_counter are primed when dead_timeout > 0
//        so that a dead server gets one more try later, run_distribution() is
//        re-run, and MEMCACHED_SERVER_MARKED_DEAD (or the run_distribution()
//        failure) is returned;
//      * on the other visible path the state becomes
//        MEMCACHED_SERVER_STATE_IN_TIMEOUT and a zero next_retry is bumped
//        to 1;
//  - while in MEMCACHED_SERVER_STATE_IN_TIMEOUT: if next_retry lies before
//    the current second the state is reset to MEMCACHED_SERVER_STATE_NEW;
//    a return of MEMCACHED_SERVER_TEMPORARILY_DISABLED is also visible in
//    this section (its exact condition is not visible);
//  - otherwise MEMCACHED_SUCCESS.
//
// NOTE(review): no assignment to the in_timeout out-parameter is visible in
// this listing, and the else/brace structure is missing -- confirm both
// against the full file.
554 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
556 struct timeval curr_time
;
557 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
560 If we hit server_failure_limit then something is completely wrong about the server.
562 1) If autoeject is enabled we do that.
563 2) If not? We go into timeout again, there is much else to do :(
565 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
568 We just auto_eject if we hit this point
570 if (_is_auto_eject_host(server
->root
))
572 set_last_disconnected_host(server
);
574 // Retry dead servers if requested
575 if (_gettime_success
and server
->root
->dead_timeout
> 0)
577 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
579 // We only retry dead servers once before assuming failure again
580 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
583 memcached_return_t rc
;
584 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
586 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
589 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
592 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
594 // Sanity check/setting
595 if (server
->next_retry
== 0)
597 server
->next_retry
= 1;
601 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
604 If next_retry is less then our current time, then we reset and try everything again.
606 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
608 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
612 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
618 return MEMCACHED_SUCCESS
;
// _memcached_connect(): shared implementation behind memcached_connect() and
// memcached_connect_try().
//
// Parameters:
//   server                 - the server instance to connect.
//   set_last_disconnected  - when true, a failed attempt records the host via
//                            set_last_disconnected_host() and marks the
//                            server for timeout.
//
// Visible flow:
//  - returns MEMCACHED_SUCCESS at once if server->fd is already valid;
//  - runs backoff_handling(); on failure the host is recorded as last
//    disconnected;
//  - rejects SASL combined with UDP with MEMCACHED_INVALID_HOST_PROTOCOL;
//  - a hostname starting with '/' switches server->type to
//    MEMCACHED_CONNECTION_UNIX_SOCKET;
//  - UDP/TCP go through network_connect(), followed by SASL authentication
//    when enabled (the socket is closed again if authentication fails);
//    UNIX sockets go through unix_socket_connect();
//  - success marks the server clean; failure handling depends on
//    set_last_disconnected as described above;
//  - a final path formats "host:port" into buffer and returns
//    MEMCACHED_SERVER_TEMPORARILY_DISABLED with that text.
//
// NOTE(review): the return statements after backoff_handling(), the condition
// guarding the final "host:port" path, and the declaration of buffer are not
// visible in this listing -- confirm against the full file.
621 static memcached_return_t
_memcached_connect(memcached_server_write_instance_st server
, const bool set_last_disconnected
)
623 if (server
->fd
!= INVALID_SOCKET
)
625 return MEMCACHED_SUCCESS
;
628 LIBMEMCACHED_MEMCACHED_CONNECT_START();
630 bool in_timeout
= false;
631 memcached_return_t rc
;
632 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
634 set_last_disconnected_host(server
);
638 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
640 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A leading '/' in the hostname is treated as a filesystem path.
643 if (server
->hostname
[0] == '/')
645 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
648 /* We need to clean up the multi startup piece */
649 switch (server
->type
)
651 case MEMCACHED_CONNECTION_UDP
:
652 case MEMCACHED_CONNECTION_TCP
:
653 rc
= network_connect(server
);
655 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
657 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
659 rc
= memcached_sasl_authenticate_connection(server
);
660 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
662 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
663 (void)closesocket(server
->fd
);
664 server
->fd
= INVALID_SOCKET
;
670 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
671 rc
= unix_socket_connect(server
);
675 if (memcached_success(rc
))
677 memcached_mark_server_as_clean(server
);
680 else if (set_last_disconnected
)
682 set_last_disconnected_host(server
);
683 if (memcached_has_current_error(*server
))
685 memcached_mark_server_for_timeout(server
);
686 assert(memcached_failed(memcached_server_error_return(server
)));
690 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
691 memcached_mark_server_for_timeout(server
);
694 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// NOTE(review): snprintf() returns the length the full text would have had;
// if "host:port" does not fit in buffer, snprintf_length exceeds what was
// actually written -- confirm buffer is large enough or clamp the length.
699 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port
));
700 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// memcached_connect_try(): connect `server` through _memcached_connect() with
// set_last_disconnected = false, so a failed attempt takes none of the
// failure-bookkeeping steps that flag enables there.
// Returns whatever _memcached_connect() returns.
707 memcached_return_t
memcached_connect_try(memcached_server_write_instance_st server
)
709 return _memcached_connect(server
, false);
// memcached_connect(): connect `server` through _memcached_connect() with
// set_last_disconnected = true, so a failed attempt records the host as last
// disconnected and marks the server for timeout.
// Returns whatever _memcached_connect() returns.
712 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
714 return _memcached_connect(server
, true);