1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// connect_poll(): wait for a non-blocking connect() on server->fd to finish
// by poll()ing the socket for POLLOUT, and turn the outcome into a
// memcached_return_t (error details are recorded on *server).
//
// Returns:
//   MEMCACHED_TIMEOUT - immediately when root->poll_timeout is 0, or when
//                       the poll() wait expires.
//   MEMCACHED_SUCCESS - from the SO_ERROR check after the socket is reported
//                       ready (presumably when SO_ERROR is 0; that condition
//                       line is not visible in this excerpt).
//   other errors      - via memcached_set_error()/memcached_set_errno().
// On the unexpected-poll-error path the socket is closed, server->fd becomes
// INVALID_SOCKET and server->state is reset to MEMCACHED_SERVER_STATE_NEW.
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
// Watch only the server socket, for writability (connect completion).
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// A poll_timeout of 0 means "do not wait": report a timeout without polling.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// NOTE(review): loop_max's declaration is not visible here; because of the
// pre-decrement the body runs one time fewer than loop_max's initial value.
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// NOTE(review): the zero check above reads root->poll_timeout, but the wait
// below uses root->connect_timeout -- confirm the mix is intentional.
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
// Socket reported ready (case label not visible; presumably poll() == 1):
// read the deferred connect() result with getsockopt(SO_ERROR).
// NOTE(review): `err` is declared outside this excerpt; if getsockopt()
// fails, confirm `err` is initialized before memcached_set_errno() uses it.
66 socklen_t len
= sizeof (err
);
67 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
69 // We check the value to see what happened with the socket.
72 return MEMCACHED_SUCCESS
;
77 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// Wait expired (case label not visible; presumably poll() == 0).
81 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
84 default: // A real error occurred and we need to completely bail
85 switch (get_socket_errno())
87 #ifdef TARGET_OS_LINUX
// NOTE(review): the errno case labels selecting the two returns below are
// not visible in this excerpt.
95 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
100 default: // This should not happen
// If poll() flagged POLLERR, re-read SO_ERROR for a more specific cause.
101 if (fds
[0].revents
& POLLERR
)
104 socklen_t len
= sizeof(err
);
105 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
109 // This should never happen, if it does? Punt.
// Give up on this socket: remember the errno, close the descriptor, mark
// fd invalid and reset the server to MEMCACHED_SERVER_STATE_NEW.
116 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
118 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server
->fd
);
120 server
->fd
= INVALID_SOCKET
;
121 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
123 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Presumably reached after the while loop has exhausted loop_max.
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo(): (re)resolve server->hostname / server->port with
// getaddrinfo() and store the result list in server->address_info.
//
// Any previously resolved list is freed first. Resolution is restricted to
// AF_INET (IPv4). For UDP clients the hints request SOCK_DGRAM/IPPROTO_UDP,
// otherwise SOCK_STREAM/IPPROTO_TCP.
// Returns MEMCACHED_SUCCESS on success; address_info_next then points at the
// head of the new list and state becomes MEMCACHED_SERVER_STATE_ADDRINFO.
// Returns MEMCACHED_FAILURE if the port cannot be formatted. getaddrinfo()
// failures are mapped to memcached error codes recorded on *server.
132 memcached_return_t
set_hostinfo(memcached_server_st
*server
)
// Release any previously resolved address list and its iteration cursor.
134 if (server
->address_info
)
136 freeaddrinfo(server
->address_info
);
137 server
->address_info
= NULL
;
138 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string; treat an snprintf() error
// or truncation as a failure.
141 char str_port
[NI_MAXSERV
];
142 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
143 if (length
>= NI_MAXSERV
or length
< 0)
145 return MEMCACHED_FAILURE
;
148 struct addrinfo hints
;
149 memset(&hints
, 0, sizeof(struct addrinfo
));
// Only IPv4 addresses are requested.
152 hints
.ai_family
= AF_INET
;
154 if (memcached_is_udp(server
->root
))
156 hints
.ai_protocol
= IPPROTO_UDP
;
157 hints
.ai_socktype
= SOCK_DGRAM
;
161 hints
.ai_socktype
= SOCK_STREAM
;
162 hints
.ai_protocol
= IPPROTO_TCP
;
165 server
->address_info
= NULL
;
// Map getaddrinfo() failures onto memcached error codes.
// NOTE(review): the case labels are not visible in this excerpt. The first
// branch (reported as MEMCACHED_TIMEOUT, which network_connect() retries)
// is presumably EAI_AGAIN; the last is presumably the default -- confirm.
167 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
173 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// EAI_SYSTEM: the underlying cause is reported through errno.
176 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
179 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
182 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
186 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating from the head of the freshly resolved list.
189 server
->address_info_next
= server
->address_info
;
190 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
192 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// Two implementations are visible. One is an ioctlsocket(FIONBIO) path; the
// other is an fcntl(F_GETFL/F_SETFL) path. The preprocessor guard choosing
// between them is not visible in this excerpt (presumably Windows vs. POSIX).
// The fcntl calls are retried while they fail with EINTR or EAGAIN.
// Failures are only recorded on *server via memcached_set_errno(). The
// function returns void, so the caller is not told that the socket may still
// be blocking.
195 static inline void set_socket_nonblocking(memcached_server_st
*server
)
199 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
201 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current file status flags, retrying on transient errors.
208 flags
= fcntl(server
->fd
, F_GETFL
, 0);
209 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
213 memcached_set_errno(*server
, errno
, NULL
);
// Only issue F_SETFL when O_NONBLOCK is not already set. The other existing
// flags are preserved by OR-ing O_NONBLOCK into them.
215 else if ((flags
& O_NONBLOCK
) == 0)
221 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
222 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
226 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the socket options configured on server->root
// to server->fd, then switch the socket to non-blocking mode.
//
// Each setsockopt() result is only checked with WATCHPOINT_ASSERT (or, for
// SO_NOSIGPIPE, reported with perror()). A failing option therefore does not
// abort the connect.
232 static void set_socket_options(memcached_server_st
*server
)
234 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
// NOTE(review): the body of this UDP branch is not visible in this excerpt;
// presumably UDP sockets skip the options below -- confirm.
236 if (memcached_is_udp(server
->root
))
// Send timeout (SO_SNDTIMEO). Only the tv_usec assignment is visible, so
// snd_timeout is placed in the microseconds field.
// NOTE(review): confirm tv_sec is initialized. Also confirm snd_timeout
// stays below 1000000, since some platforms reject an out-of-range tv_usec.
242 if (server
->root
->snd_timeout
)
245 struct timeval waittime
;
248 waittime
.tv_usec
= server
->root
->snd_timeout
;
250 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
251 &waittime
, (socklen_t
)sizeof(struct timeval
));
252 WATCHPOINT_ASSERT(error
== 0);
// Receive timeout (SO_RCVTIMEO). The same tv_usec-only caveat applies.
257 if (server
->root
->rcv_timeout
)
260 struct timeval waittime
;
263 waittime
.tv_usec
= server
->root
->rcv_timeout
;
265 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
266 &waittime
, (socklen_t
)sizeof(struct timeval
));
267 WATCHPOINT_ASSERT(error
== 0);
// On Apple/FreeBSD, request SO_NOSIGPIPE so that writing to a disconnected
// peer does not deliver SIGPIPE.
272 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
275 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
277 // This is not considered a fatal error
280 WATCHPOINT_ERRNO(get_socket_errno());
281 perror("setsockopt(SO_NOSIGPIPE)");
// With flags.no_block, configure SO_LINGER with l_linger = 0.
// NOTE(review): the l_onoff assignment is not visible in this excerpt.
286 if (server
->root
->flags
.no_block
)
289 struct linger linger
;
292 linger
.l_linger
= 0; /* By default on close() just drop the socket */
293 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
294 &linger
, (socklen_t
)sizeof(struct linger
));
295 WATCHPOINT_ASSERT(error
== 0);
// Disable Nagle's algorithm when requested (flag's value is not visible
// here; presumably 1).
298 if (server
->root
->flags
.tcp_nodelay
)
303 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
304 &flag
, (socklen_t
)sizeof(int));
305 WATCHPOINT_ASSERT(error
== 0);
308 if (server
->root
->flags
.tcp_keepalive
)
313 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
314 &flag
, (socklen_t
)sizeof(int));
315 WATCHPOINT_ASSERT(error
== 0);
// The following three options pass the address of a root field together
// with sizeof(int). NOTE(review): this assumes tcp_keepidle, send_size and
// recv_size are int-sized -- TODO confirm their declared types.
319 if (server
->root
->tcp_keepidle
> 0)
323 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
324 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
325 WATCHPOINT_ASSERT(error
== 0);
329 if (server
->root
->send_size
> 0)
333 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
334 &server
->root
->send_size
, (socklen_t
)sizeof(int));
335 WATCHPOINT_ASSERT(error
== 0);
338 if (server
->root
->recv_size
> 0)
342 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
343 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
344 WATCHPOINT_ASSERT(error
== 0);
348 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
349 set_socket_nonblocking(server
);
// unix_socket_connect(): open an AF_UNIX stream socket in server->fd and
// connect it to the filesystem path stored in server->hostname.
//
// Returns:
// - MEMCACHED_SUCCESS on success; state becomes
//   MEMCACHED_SERVER_STATE_CONNECTED.
// - MEMCACHED_CONNECTION_FAILURE, with errno recorded on *server, when
//   socket() or connect() fails.
// - MEMCACHED_NOT_SUPPORTED from the alternative branch at the end. Its
//   preprocessor guard is not visible here; presumably it covers platforms
//   without Unix-domain sockets.
352 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
355 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
357 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
359 memcached_set_errno(*server
, errno
, NULL
);
360 return MEMCACHED_CONNECTION_FAILURE
;
363 struct sockaddr_un servAddr
;
365 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
366 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): if hostname is sizeof(sun_path) bytes or longer, strncpy()
// leaves sun_path without a terminating NUL and silently truncates the
// path. Consider rejecting over-long paths instead.
367 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
// connect() errors are dispatched on errno. The visible cases are EISCONN
// and a generic failure path; the other case labels are not visible here.
370 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
379 case EISCONN
: /* We were spinning waiting on connect */
381 WATCHPOINT_ASSERT(0); // Programmer error
// NOTE(review): the visible code on this failure path does not close
// server->fd or reset it to INVALID_SOCKET. Confirm the caller does so,
// otherwise the descriptor leaks.
386 WATCHPOINT_ERRNO(errno
);
387 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
388 return MEMCACHED_CONNECTION_FAILURE
;
392 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
394 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
396 return MEMCACHED_SUCCESS
;
399 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): connect server->fd over TCP or UDP.
//
// The hostname is resolved first if needed. The function then walks the
// getaddrinfo() list from address_info_next until one address yields a
// connected socket.
// Returns MEMCACHED_SUCCESS on success, with state set to
// MEMCACHED_SERVER_STATE_CONNECTED. Otherwise it returns an error: either the
// one already recorded on *server, MEMCACHED_TIMEOUT, or
// MEMCACHED_CONNECTION_FAILURE.
403 static memcached_return_t
network_connect(memcached_server_st
*server
)
405 bool timeout_error_occured
= false;
407 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
408 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
411 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
413 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
415 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
416 server
->address_info_next
= NULL
;
417 memcached_return_t rc
;
// Resolve the hostname. A set_hostinfo() result other than MEMCACHED_TIMEOUT
// ends the retry, and nanosleep() pauses between attempts. The retry bound
// and the sleep length are not visible in this excerpt.
421 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
427 struct timespec dream
, rem
;
432 nanosleep(&dream
, &rem
);
// A failed resolution is presumably returned to the caller here (the return
// statement is not visible).
436 if (memcached_failed(rc
))
// Addresses were resolved earlier but the cursor is unset: restart from the
// head of the list.
442 if (server
->address_info_next
== NULL
)
444 server
->address_info_next
= server
->address_info
;
445 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
448 /* Create the socket */
449 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
451 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
452 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
454 server
->address_info_next
= server
->address_info_next
->ai_next
;
// A socket() failure aborts the whole connect immediately, without trying
// the remaining addresses.
458 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
459 server
->address_info_next
->ai_socktype
,
460 server
->address_info_next
->ai_protocol
)) < 0)
462 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Apply the configured options. These include switching to non-blocking
// mode, which is why connect() below can report EINPROGRESS/EALREADY.
465 set_socket_options(server
);
467 /* connect to server */
468 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
470 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
471 return MEMCACHED_SUCCESS
;
474 /* An error occurred */
475 switch (get_socket_errno())
// NOTE(review): the case label(s) for this branch are not visible;
// presumably a connect-timeout errno -- confirm.
478 timeout_error_occured
= true;
482 case EINPROGRESS
: // nonblocking mode - first return
483 case EALREADY
: // nonblocking mode - subsequent returns
// Wait for the asynchronous connect to finish.
485 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
486 memcached_return_t rc
= connect_poll(server
);
488 if (memcached_success(rc
))
490 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
491 return MEMCACHED_SUCCESS
;
494 // A timeout here is treated as an error, we will not retry
495 if (rc
== MEMCACHED_TIMEOUT
)
497 timeout_error_occured
= true;
502 case EISCONN
: // we are connected :-)
503 WATCHPOINT_ASSERT(0); // This is a programmer's error
506 case EINTR
: // Special case, we retry ai_addr
// The cursor is not advanced in the visible lines, so the loop presumably
// (via `continue`) retries the same address with a new socket.
507 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
508 (void)closesocket(server
->fd
);
509 server
->fd
= INVALID_SOCKET
;
// Abandon this address: close the socket and advance to the next entry.
// NOTE(review): connect_poll()'s POLLERR path already closes server->fd and
// sets it to INVALID_SOCKET. If that outcome reaches this point, the assert
// fires and closesocket() receives INVALID_SOCKET -- confirm.
516 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
517 (void)closesocket(server
->fd
);
518 server
->fd
= INVALID_SOCKET
;
519 server
->address_info_next
= server
->address_info_next
->ai_next
;
// The address loop finished without a connected socket.
522 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// Defensive cleanup after a timeout (the assert above expects fd to be
// closed already).
524 if (timeout_error_occured
)
526 if (server
->fd
!= INVALID_SOCKET
)
528 (void)closesocket(server
->fd
);
529 server
->fd
= INVALID_SOCKET
;
533 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer a more specific error already recorded on the server, for example
// by connect_poll().
535 if (memcached_has_current_error(*server
))
537 return memcached_server_error_return(server
);
// Report a plain timeout only if no attempt reached the in-progress state.
540 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
542 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
545 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
552 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
553 we get caught spending cycles just waiting.
// backoff_handling(): decide whether a connect attempt may proceed, based on
// the server's failure counter and retry time.
//
// in_timeout: out-parameter (C++ reference). Its assignments are not visible
//             in this excerpt; presumably it is set when the server leaves
//             the timeout state -- confirm.
// Returns:
// - MEMCACHED_SERVER_MARKED_DEAD after auto-ejecting a server that reached
//   server_failure_limit.
// - MEMCACHED_SERVER_TEMPORARILY_DISABLED while the server is still waiting
//   to retry.
// - MEMCACHED_SUCCESS otherwise.
555 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
558 If we hit server_failure_limit then something is completely wrong about the server.
560 1) If autoeject is enabled we do that.
561 2) If not? We go into timeout again, there is much else to do :(
563 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
566 We just auto_eject if we hit this point
568 if (_is_auto_eject_host(server
->root
))
570 set_last_disconnected_host(server
);
571 run_distribution((memcached_st
*)server
->root
);
573 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
576 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
578 // Sanity check/setting
// NOTE(review): next_retry is compared with gettimeofday()'s absolute
// tv_sec below, so 1 lies far in the past. The server is therefore treated
// as immediately retryable -- confirm this is intended.
579 if (server
->next_retry
== 0)
581 server
->next_retry
= 1;
// Still in timeout: allow a new attempt only once next_retry (seconds, on
// the same scale as gettimeofday()'s tv_sec) has passed.
585 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
587 struct timeval curr_time
;
588 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
591 If next_retry is less then our current time, then we reset and try everything again.
593 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
595 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
// NOTE(review): the branch structure is not visible. Presumably this return
// is taken when next_retry has not passed yet or gettimeofday() failed.
599 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
605 return MEMCACHED_SUCCESS
;
// memcached_connect(): ensure that server has an open connection.
//
// Returns MEMCACHED_SUCCESS at once if server->fd is already open.
// Otherwise it:
// 1. consults backoff_handling();
// 2. rejects SASL over UDP;
// 3. dispatches on server->type. TCP/UDP go to network_connect(), followed
//    by SASL authentication when callbacks are configured. Unix sockets go
//    to unix_socket_connect().
// On success the server is passed to memcached_mark_server_as_clean(). On
// failure the error is recorded and memcached_mark_server_for_timeout() is
// called.
608 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
// Already connected: nothing to do.
610 if (server
->fd
!= INVALID_SOCKET
)
612 return MEMCACHED_SUCCESS
;
615 LIBMEMCACHED_MEMCACHED_CONNECT_START();
// Apply failure-count / retry-window backoff before touching the network.
// NOTE(review): the early return that follows a failure is not visible here.
617 bool in_timeout
= false;
618 memcached_return_t rc
;
619 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
621 set_last_disconnected_host(server
);
// SASL authentication is not supported over UDP; reject the combination.
625 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
627 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
630 /* We need to clean up the multi startup piece */
631 switch (server
->type
)
633 case MEMCACHED_CONNECTION_UDP
:
634 case MEMCACHED_CONNECTION_TCP
:
635 rc
= network_connect(server
);
// LIBMEMCACHED_WITH_SASL_SUPPORT is tested in a plain `if`; presumably it
// expands to a compile-time constant.
637 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
639 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
641 rc
= memcached_sasl_authenticate_connection(server
);
// Authentication failed: close the freshly opened socket.
642 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
644 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
645 (void)closesocket(server
->fd
);
646 server
->fd
= INVALID_SOCKET
;
652 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
653 rc
= unix_socket_connect(server
);
657 if (memcached_success(rc
))
659 memcached_mark_server_as_clean(server
);
663 set_last_disconnected_host(server
);
// Keep an error already recorded by the connect path. Otherwise (presumably
// the else branch below) record rc itself before scheduling the timeout.
664 if (memcached_has_current_error(*server
))
666 memcached_mark_server_for_timeout(server
);
667 assert(memcached_failed(memcached_server_error_return(server
)));
671 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
672 memcached_mark_server_for_timeout(server
);
675 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// NOTE(review): the condition guarding this return is not visible.
// Presumably it is taken when backoff_handling() set in_timeout, with rc
// returned otherwise -- confirm.
679 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);