1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// connect_poll: wait for a non-blocking connect() on server->fd to complete.
//
// Polls the socket for POLLOUT (writable) and then reads SO_ERROR to learn
// how the connect ended. Failures are recorded on the server through
// memcached_set_error() or memcached_set_errno(), and that call's result is
// what gets returned.
//   - root->poll_timeout == 0: MEMCACHED_TIMEOUT without polling at all.
//   - the branch that increments io_wait_count.timeouts (presumably poll()
//     returning 0; the case label is not visible): MEMCACHED_TIMEOUT.
//   - an unrecoverable poll() error: the socket is closed, server->fd is
//     reset to INVALID_SOCKET and server->state goes back to
//     MEMCACHED_SERVER_STATE_NEW.
//
// NOTE(review): this extract is missing lines (the declarations of fds,
// loop_max and err, the case labels of both switch statements, and all
// braces), so the comments below describe only the visible statements.
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
// Watch the connecting socket for writability.
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// A zero poll_timeout is treated as an immediate timeout.
// NOTE(review): this guard tests poll_timeout, but the poll() call below
// waits on connect_timeout; confirm which setting should govern this wait.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// Bounded retry loop; the initial value of loop_max is not visible here.
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
// Descriptor reported ready: ask the socket how the connect ended.
// NOTE(review): if getsockopt() fails, err reaches memcached_set_errno()
// below without having been written here; confirm it is initialized where
// it is declared (that line is not visible in this extract).
66 socklen_t len
= sizeof (err
);
67 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
69 // We check the value to see what happened with the socket.
// Presumably reached when SO_ERROR reports 0 (the test is not visible).
72 return MEMCACHED_SUCCESS
;
77 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// Timeout branch: count it on the server and report MEMCACHED_TIMEOUT.
81 server
->io_wait_count
.timeouts
++;
82 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
85 default: // A real error occurred and we need to completely bail
86 switch (get_socket_errno())
88 #ifdef TARGET_OS_LINUX
// Resource-type poll() failures are reported as
// MEMCACHED_MEMORY_ALLOCATION_FAILURE (the case labels are not visible).
96 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
99 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
101 default: // This should not happen
// If poll() flagged POLLERR, read SO_ERROR to see what the socket reports.
102 if (fds
[0].revents
& POLLERR
)
105 socklen_t len
= sizeof(err
);
106 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
110 // This should never happen, if it does? Punt.
// Give up on this socket: close it so a later attempt starts from scratch.
117 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
119 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
120 (void)closesocket(server
->fd
);
121 server
->fd
= INVALID_SOCKET
;
122 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
124 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Reached only once the while loop above stops (loop_max exhausted).
129 // This should only be possible from ERESTART or EINTR;
130 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo: resolve server->hostname and server->port with getaddrinfo().
//
// Any previously cached address list is freed first. On success the new list
// is stored in server->address_info, address_info_next points at its head,
// server->state becomes MEMCACHED_SERVER_STATE_ADDRINFO, and
// MEMCACHED_SUCCESS is returned. Returns MEMCACHED_FAILURE if the port cannot
// be formatted. getaddrinfo() failures are mapped to memcached error codes
// and recorded on the server. Must not be called for UNIX-domain sockets
// (asserted on entry).
//
// NOTE(review): the errcode declaration, the else keyword of the UDP test and
// the case labels of the getaddrinfo() switch are not visible in this extract.
133 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
135 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Drop any stale lookup result so that this call's answer replaces it.
136 if (server
->address_info
)
138 freeaddrinfo(server
->address_info
);
139 server
->address_info
= NULL
;
140 server
->address_info_next
= NULL
;
// Format the port as a decimal service string; an encoding error or a
// truncated result is treated as failure.
143 char str_port
[NI_MAXSERV
];
144 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
145 if (length
>= NI_MAXSERV
or length
< 0)
147 return MEMCACHED_FAILURE
;
150 struct addrinfo hints
;
151 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): if this assignment is compiled in, only IPv4 results come
// back, which would make the IPv4-only UDP filter in network_connect
// redundant; confirm whether it is active.
154 hints
.ai_family
= AF_INET
;
// UDP: datagram socket.
156 if (memcached_is_udp(server
->root
))
158 hints
.ai_protocol
= IPPROTO_UDP
;
159 hints
.ai_socktype
= SOCK_DGRAM
;
// TCP: stream socket (presumably the else branch; the keyword is not visible).
163 hints
.ai_socktype
= SOCK_STREAM
;
164 hints
.ai_protocol
= IPPROTO_TCP
;
167 server
->address_info
= NULL
;
169 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
// Transient failure (label not visible, presumably EAI_AGAIN) is reported as
// MEMCACHED_TIMEOUT; network_connect sleeps and calls this function again
// when it sees that code.
175 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// EAI_SYSTEM: the underlying cause is in errno.
178 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
181 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
184 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Any other failure: a host lookup failure carrying the gai_strerror() text.
188 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Start walking the candidate addresses from the first result.
191 server
->address_info_next
= server
->address_info
;
192 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
194 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking: switch server->fd into non-blocking mode.
//
// The function returns void. Failures are only recorded on the server with
// memcached_set_errno() (passing NULL where other calls pass MEMCACHED_AT),
// and the caller in set_socket_options does not check for them.
// Two implementations are visible: one using ioctlsocket(FIONBIO) and one
// using fcntl(F_GETFL/F_SETFL). The preprocessor guard that chooses between
// them, and the declarations of arg, flags and rval, are not visible in this
// extract (presumably ioctlsocket is the Windows path; confirm).
197 static inline void set_socket_nonblocking(memcached_server_st
*server
)
201 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
203 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current flags, retrying while the call reports EINTR or EAGAIN.
210 flags
= fcntl(server
->fd
, F_GETFL
, 0);
211 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
215 memcached_set_errno(*server
, errno
, NULL
);
// Only call F_SETFL when O_NONBLOCK is not already set.
217 else if ((flags
& O_NONBLOCK
) == 0)
223 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
224 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
228 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options: apply the configured socket options from server->root
// to the newly created socket server->fd, then make it non-blocking.
//
// Each option is applied only when it is configured: SO_SNDTIMEO and
// SO_RCVTIMEO from snd_timeout and rcv_timeout, SO_NOSIGPIPE on Apple and
// FreeBSD, SO_LINGER when flags.no_block is set, TCP_NODELAY, SO_KEEPALIVE,
// TCP_KEEPIDLE, SO_SNDBUF and SO_RCVBUF. setsockopt() results are checked
// only with WATCHPOINT_ASSERT (and perror for SO_NOSIGPIPE). The function
// returns void, so no option failure reaches the caller.
//
// NOTE(review): the body of the UDP branch, the declarations of error, set and
// flag, and the preprocessor guards around the timeout blocks are not visible
// in this extract.
234 static void set_socket_options(memcached_server_st
*server
)
236 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
// UDP sockets: the branch body is not visible (presumably an early return
// that skips the TCP-oriented options below).
238 if (memcached_is_udp(server
->root
))
// Send timeout: the configured value goes straight into tv_usec; no visible
// line sets tv_sec.
// NOTE(review): if snd_timeout can be 1000000 or more, tv_usec is out of
// range and setsockopt() may reject it (Linux returns EDOM); consider
// splitting the value into seconds and microseconds.
244 if (server
->root
->snd_timeout
)
247 struct timeval waittime
;
250 waittime
.tv_usec
= server
->root
->snd_timeout
;
252 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
253 &waittime
, (socklen_t
)sizeof(struct timeval
));
254 WATCHPOINT_ASSERT(error
== 0);
// Receive timeout: same tv_usec handling and range caveat as the send timeout.
259 if (server
->root
->rcv_timeout
)
262 struct timeval waittime
;
265 waittime
.tv_usec
= server
->root
->rcv_timeout
;
267 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
268 &waittime
, (socklen_t
)sizeof(struct timeval
));
269 WATCHPOINT_ASSERT(error
== 0);
// Suppress SIGPIPE on platforms that provide SO_NOSIGPIPE; failure is only
// reported through WATCHPOINT_ERRNO and perror.
274 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
277 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
279 // This is not considered a fatal error
282 WATCHPOINT_ERRNO(get_socket_errno());
283 perror("setsockopt(SO_NOSIGPIPE)");
// Linger with a zero timeout, only in no_block mode (the l_onoff assignment
// is not visible).
288 if (server
->root
->flags
.no_block
)
291 struct linger linger
;
294 linger
.l_linger
= 0; /* By default on close() just drop the socket */
295 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
296 &linger
, (socklen_t
)sizeof(struct linger
));
297 WATCHPOINT_ASSERT(error
== 0);
// TCP_NODELAY when tcp_nodelay is requested (the value of flag is not visible).
300 if (server
->root
->flags
.tcp_nodelay
)
305 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
306 &flag
, (socklen_t
)sizeof(int));
307 WATCHPOINT_ASSERT(error
== 0);
310 if (server
->root
->flags
.tcp_keepalive
)
315 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
316 &flag
, (socklen_t
)sizeof(int));
317 WATCHPOINT_ASSERT(error
== 0);
// NOTE(review): tcp_keepidle, send_size and recv_size are passed by address
// with sizeof(int); confirm these fields are int-sized.
321 if (server
->root
->tcp_keepidle
> 0)
325 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
326 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
327 WATCHPOINT_ASSERT(error
== 0);
331 if (server
->root
->send_size
> 0)
335 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
336 &server
->root
->send_size
, (socklen_t
)sizeof(int));
337 WATCHPOINT_ASSERT(error
== 0);
340 if (server
->root
->recv_size
> 0)
344 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
345 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
346 WATCHPOINT_ASSERT(error
== 0);
350 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
351 set_socket_nonblocking(server
);
// unix_socket_connect: connect server to the UNIX-domain socket whose path is
// server->hostname.
//
// Returns MEMCACHED_CONNECTION_FAILURE (with errno recorded on the server)
// when socket() or connect() fails. On success server->state becomes
// MEMCACHED_SERVER_STATE_CONNECTED and MEMCACHED_SUCCESS is returned. The
// trailing MEMCACHED_NOT_SUPPORTED return presumably belongs to a
// platform-conditional branch whose #if and #else lines are not visible here.
//
// NOTE(review): the switch header and several case labels in the connect()
// error path are also not visible in this extract.
354 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
357 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
359 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
361 memcached_set_errno(*server
, errno
, NULL
);
362 return MEMCACHED_CONNECTION_FAILURE
;
365 struct sockaddr_un servAddr
;
367 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
368 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy with the full sizeof(sun_path) leaves sun_path
// without a terminating NUL when hostname is that long or longer, and a
// longer path is silently truncated; consider rejecting paths that do not fit.
369 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
372 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
381 case EISCONN
: /* We were spinning waiting on connect */
383 WATCHPOINT_ASSERT(0); // Programmer error
// Any other connect() error is recorded and reported as a connection failure.
// NOTE(review): server->fd is not closed on this path, and no close is visible
// in the UNIX-socket path of _memcached_connect either; confirm the socket is
// released somewhere.
388 WATCHPOINT_ERRNO(errno
);
389 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
390 return MEMCACHED_CONNECTION_FAILURE
;
394 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
396 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
398 return MEMCACHED_SUCCESS
;
401 return MEMCACHED_NOT_SUPPORTED
;
// network_connect: open a TCP or UDP socket to server and connect it.
//
// Calls set_hostinfo() when there is no cached address list or the cached
// list has been fully tried. It then walks server->address_info_next: for
// each candidate it creates a socket, applies set_socket_options() and calls
// connect(). A non-blocking connect still in progress is finished with
// connect_poll(). On success server->state becomes
// MEMCACHED_SERVER_STATE_CONNECTED and MEMCACHED_SUCCESS is returned.
// When no address works, the result depends on what was seen:
//   - an error already recorded on the server is returned first;
//   - otherwise MEMCACHED_TIMEOUT, if a timeout was seen while the state was
//     still below MEMCACHED_SERVER_STATE_IN_PROGRESS;
//   - otherwise MEMCACHED_CONNECTION_FAILURE.
//
// NOTE(review): this extract is missing lines (the set_hostinfo retry loop
// header, several case labels, break and continue statements, and braces), so
// the exact control flow between the visible statements cannot be confirmed.
405 static memcached_return_t
network_connect(memcached_server_st
*server
)
407 bool timeout_error_occured
= false;
409 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
410 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
// The next line is comment text whose delimiters are missing in this extract.
413 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
415 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
417 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
418 server
->address_info_next
= NULL
;
419 memcached_return_t rc
;
// set_hostinfo() reports a transient lookup failure as MEMCACHED_TIMEOUT; in
// that case a short nanosleep() precedes another attempt. The loop header and
// the sleep duration are not visible.
423 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
429 struct timespec dream
, rem
;
434 nanosleep(&dream
, &rem
);
438 if (memcached_failed(rc
))
// Restart from the first resolved address once the list has been exhausted.
444 if (server
->address_info_next
== NULL
)
446 server
->address_info_next
= server
->address_info
;
447 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
450 /* Create the socket */
451 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
453 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
454 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
456 server
->address_info_next
= server
->address_info_next
->ai_next
;
// NOTE(review): a socket() failure returns at once instead of trying the next
// candidate address.
460 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
461 server
->address_info_next
->ai_socktype
,
462 server
->address_info_next
->ai_protocol
)) < 0)
464 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
467 set_socket_options(server
);
469 /* connect to server */
470 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
472 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
473 return MEMCACHED_SUCCESS
;
476 /* An error occurred */
477 switch (get_socket_errno())
// The case label for this assignment is not visible (presumably ETIMEDOUT).
480 timeout_error_occured
= true;
484 #if EWOULDBLOCK != EAGAIN
// Connect still in progress: wait for it with connect_poll().
487 case EINPROGRESS
: // nonblocking mode - first return
488 case EALREADY
: // nonblocking mode - subsequent returns
490 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
491 memcached_return_t rc
= connect_poll(server
);
493 if (memcached_success(rc
))
495 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
496 return MEMCACHED_SUCCESS
;
499 // A timeout here is treated as an error, we will not retry
500 if (rc
== MEMCACHED_TIMEOUT
)
502 timeout_error_occured
= true;
507 case EISCONN
: // we are connected :-)
508 WATCHPOINT_ASSERT(0); // This is a programmer's error
// EINTR: close the socket without advancing address_info_next, so the loop
// presumably tries the same address again.
511 case EINTR
: // Special case, we retry ai_addr
512 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
513 (void)closesocket(server
->fd
);
514 server
->fd
= INVALID_SOCKET
;
// Any other error (label not visible, presumably default): close the socket
// and move on to the next resolved address.
521 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
522 (void)closesocket(server
->fd
);
523 server
->fd
= INVALID_SOCKET
;
524 server
->address_info_next
= server
->address_info_next
->ai_next
;
527 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// NOTE(review): the assertion above expects fd to be INVALID_SOCKET already,
// so this close appears to be defensive only.
529 if (timeout_error_occured
)
531 if (server
->fd
!= INVALID_SOCKET
)
533 (void)closesocket(server
->fd
);
534 server
->fd
= INVALID_SOCKET
;
538 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer a more specific error already recorded on the server (for example by
// connect_poll()).
540 if (memcached_has_current_error(*server
))
542 return memcached_server_error_return(server
);
// A timeout seen before any connect reached MEMCACHED_SERVER_STATE_IN_PROGRESS
// is reported as MEMCACHED_TIMEOUT.
545 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
547 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
550 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
557 /* Based on the time and failure count, fail the connect without trying. This
   prevents us from getting stuck in a state where we spend cycles just waiting. */
// backoff_handling: decide, before any connect attempt, whether server may be
// tried at all.
//
// When server_failure_counter has reached root->server_failure_limit:
//   - with auto-eject enabled, the host is recorded as last disconnected, an
//     optional dead_timeout retry time is scheduled, the failure counter is
//     set to one below the limit, the distribution is rebuilt with
//     run_distribution(), and MEMCACHED_SERVER_MARKED_DEAD is returned;
//   - otherwise the server is put into MEMCACHED_SERVER_STATE_IN_TIMEOUT.
// A server in MEMCACHED_SERVER_STATE_IN_TIMEOUT is reset to
// MEMCACHED_SERVER_STATE_NEW once next_retry has passed. Otherwise it is
// reported as MEMCACHED_SERVER_TEMPORARILY_DISABLED. Both time-based checks
// require gettimeofday() to have succeeded. MEMCACHED_SUCCESS means the
// connect may proceed.
//
// in_timeout is an output parameter (taken by reference). The statement that
// assigns it is not visible in this extract. Braces and else keywords are also
// missing, so the nesting described above is inferred from statement order;
// confirm it against the full source.
560 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
562 struct timeval curr_time
;
563 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
// The next three lines are comment text whose delimiters are missing here.
566 If we hit server_failure_limit then something is completely wrong about the server.
568 1) If autoeject is enabled we do that.
569 2) If not? We go into timeout again, there is much else to do :(
571 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
574 We just auto_eject if we hit this point
576 if (_is_auto_eject_host(server
->root
))
578 set_last_disconnected_host(server
);
580 // Retry dead servers if requested
581 if (_gettime_success
and server
->root
->dead_timeout
> 0)
583 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
585 // We only retry dead servers once before assuming failure again
586 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
// Rebuild the distribution now that this host is being ejected.
589 memcached_return_t rc
;
590 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
592 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
595 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
598 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
// A server in timeout always ends up with a non-zero next_retry.
600 // Sanity check/setting
601 if (server
->next_retry
== 0)
603 server
->next_retry
= 1;
607 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
610 If next_retry is less then our current time, then we reset and try everything again.
612 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
614 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
// Still inside the retry window (presumably the else branch): refuse to connect.
618 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
624 return MEMCACHED_SUCCESS
;
// _memcached_connect: connect server unless it is already connected.
//
// Returns MEMCACHED_SUCCESS at once when server->fd is already valid.
// Otherwise it:
//   - runs backoff_handling();
//   - rejects SASL over UDP;
//   - treats a hostname that starts with '/' as a UNIX-domain socket path;
//   - dispatches to network_connect() for TCP/UDP (followed by SASL
//     authentication when SASL callbacks are configured) or to
//     unix_socket_connect().
// On success the server is marked clean. On a connect failure, and only when
// set_last_disconnected is true, the host is recorded as last disconnected and
// the server is marked for timeout. The work is bracketed by the
// LIBMEMCACHED_MEMCACHED_CONNECT_START and _END probes.
//
// NOTE(review): some lines are not visible in this extract: the return after
// a backoff failure, the final return on the success/failure path, the
// declaration of buffer, and the condition guarding the
// TEMPORARILY_DISABLED return at the end (presumably in_timeout).
627 static memcached_return_t
_memcached_connect(memcached_server_write_instance_st server
, const bool set_last_disconnected
)
629 if (server
->fd
!= INVALID_SOCKET
)
631 return MEMCACHED_SUCCESS
;
634 LIBMEMCACHED_MEMCACHED_CONNECT_START();
636 bool in_timeout
= false;
637 memcached_return_t rc
;
// A backoff refusal records the host as last disconnected regardless of
// set_last_disconnected.
638 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
640 set_last_disconnected_host(server
);
644 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
646 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname beginning with a slash is a UNIX-domain socket path.
649 if (server
->hostname
[0] == '/')
651 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
654 /* We need to clean up the multi startup piece */
655 switch (server
->type
)
657 case MEMCACHED_CONNECTION_UDP
:
658 case MEMCACHED_CONNECTION_TCP
:
659 rc
= network_connect(server
);
// Authenticate a freshly opened socket. On failure the socket is closed so a
// later call starts over from an unconnected state.
661 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
663 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
665 rc
= memcached_sasl_authenticate_connection(server
);
666 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
668 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
669 (void)closesocket(server
->fd
);
670 server
->fd
= INVALID_SOCKET
;
676 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
677 rc
= unix_socket_connect(server
);
681 if (memcached_success(rc
))
683 memcached_mark_server_as_clean(server
);
686 else if (set_last_disconnected
)
688 set_last_disconnected_host(server
);
689 if (memcached_has_current_error(*server
))
691 memcached_mark_server_for_timeout(server
);
692 assert(memcached_failed(memcached_server_error_return(server
)));
// No error recorded yet (presumably the else branch): record rc itself.
696 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
697 memcached_mark_server_for_timeout(server
);
700 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// Report the server as temporarily disabled, naming it as host:port.
// NOTE(review): snprintf_length is passed on as the message length. If the
// text was truncated (snprintf returns the untruncated length) or snprintf
// failed (negative result), it does not match the bytes in buffer. Clamp it to
// the buffer size before use.
705 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port
));
706 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
713 memcached_return_t
memcached_connect_try(memcached_server_write_instance_st server
)
715 return _memcached_connect(server
, false);
// memcached_connect: connect server and record any failure.
//
// A thin wrapper over _memcached_connect() with set_last_disconnected = true.
// A failed connect therefore records the host as last disconnected and marks
// the server for timeout.
718 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
720 return _memcached_connect(server
, true);