1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// Fallback values of 0 for socket option constants. Code further down in
// this file compares SOCK_NONBLOCK and SOCK_CLOEXEC against 0 to decide
// whether the fcntl() based path has to be used instead.
// NOTE(review): the preprocessor guards around these defines are not visible
// in this listing; presumably each one applies only when the platform does
// not provide the constant - confirm.
44 # define SOCK_CLOEXEC 0
48 # define SOCK_NONBLOCK 0
56 # define SO_NOSIGPIPE 0
60 # define TCP_NODELAY 0
64 # define TCP_KEEPIDLE 0
// Wait, via poll(), for the socket in server->fd to report the events
// requested by server->events(), and translate the outcome into a
// memcached_return_t.
// NOTE(review): this listing is lossy (braces, case labels and the
// declarations of fds, loop_max, number_of and err are not visible), so the
// notes below describe only the statements that can actually be seen.
67 static memcached_return_t
connect_poll(org::libmemcached::Instance
* server
)
// Single pollfd entry describing the socket of this instance.
70 fds
[0].fd
= server
->fd
;
71 fds
[0].events
= server
->events();
// A poll_timeout of zero is reported as MEMCACHED_TIMEOUT without polling.
// NOTE(review): the poll() call below waits on connect_timeout rather than
// poll_timeout - confirm that the two settings are meant to differ here.
76 if (server
->root
->poll_timeout
== 0)
78 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// Bounded retry loop: loop_max is pre-decremented on every pass.
81 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
// A result <= 0 covers both a poll() failure (negative) and a timeout (zero).
84 if ((number_of
= poll(fds
, 1, server
->root
->connect_timeout
)) <= 0)
88 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
91 #ifdef TARGET_OS_LINUX
// Both visible returns below report MEMCACHED_MEMORY_ALLOCATION_FAILURE; the
// case labels that select them are not visible in this listing.
99 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
102 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
104 default: // This should not happen
// When the socket flags POLLERR, ask it for its pending error via SO_ERROR.
105 if (fds
[0].revents
& POLLERR
)
108 socklen_t len
= sizeof(err
);
109 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
113 // This should never happen, if it does? Punt.
// Drop the socket and put the instance back into the NEW state before the
// cached errno value is reported to the caller.
120 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
121 (void)closesocket(server
->fd
);
122 server
->fd
= INVALID_SOCKET
;
123 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
125 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Zero ready descriptors: count the timeout on the instance and report it.
128 assert(number_of
== 0);
130 server
->io_wait_count
.timeouts
++;
131 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// Hand the returned event mask back to the instance.
135 server
->revents(fds
[0].revents
);
// Error, hangup or invalid-descriptor events: query SO_ERROR to learn why.
138 if (fds
[0].revents
& POLLERR
or
139 fds
[0].revents
& POLLHUP
or
140 fds
[0].revents
& POLLNVAL
)
143 socklen_t len
= sizeof (err
);
144 if (getsockopt(fds
[0].fd
, SOL_SOCKET
, SO_ERROR
, (char*)&err
, &len
) == 0)
146 // We check the value to see what happened with the socket.
// NOTE(review): the condition guarding this success return is not visible;
// presumably it is taken when err is zero - confirm.
149 return MEMCACHED_SUCCESS
;
154 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// No error style event was reported, so the socket must be readable or writable.
156 assert(fds
[0].revents
& POLLIN
or fds
[0].revents
& POLLOUT
);
158 return MEMCACHED_SUCCESS
;
161 // This should only be possible from ERESTART or EINTR;
162 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// Resolve server->hostname and server->port() with getaddrinfo() and store
// the result list in server->address_info. On success address_info_next
// points at the head of the list and the state becomes
// MEMCACHED_SERVER_STATE_ADDRINFO; on failure the list is released and an
// error is recorded on the instance.
// NOTE(review): braces, case labels and the declaration of errcode are not
// visible in this listing.
165 static memcached_return_t
set_hostinfo(org::libmemcached::Instance
* server
)
167 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Release any list left over from an earlier lookup.
168 if (server
->address_info
)
170 freeaddrinfo(server
->address_info
);
171 server
->address_info
= NULL
;
172 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as text, so the port is formatted first.
175 char str_port
[MEMCACHED_NI_MAXSERV
];
176 int length
= snprintf(str_port
, MEMCACHED_NI_MAXSERV
, "%u", uint32_t(server
->port()));
// Reject both truncation (length >= buffer size) and a failed or empty format.
177 if (length
>= MEMCACHED_NI_MAXSERV
or length
<= 0)
179 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
180 memcached_literal_param("snprintf(NI_MAXSERV)"));
183 struct addrinfo hints
;
184 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): preprocessor lines are mostly missing from this listing, so
// it cannot be determined here whether this assignment is compiled in - confirm.
187 hints
.ai_family
= AF_INET
;
// Datagram/UDP hints for UDP instances, stream/TCP hints otherwise.
189 if (memcached_is_udp(server
->root
))
191 hints
.ai_protocol
= IPPROTO_UDP
;
192 hints
.ai_socktype
= SOCK_DGRAM
;
196 hints
.ai_socktype
= SOCK_STREAM
;
197 hints
.ai_protocol
= IPPROTO_TCP
;
200 assert(server
->address_info
== NULL
);
201 assert(server
->address_info_next
== NULL
);
// Every failure branch below frees whatever getaddrinfo() may have stored
// before returning its specific error code.
203 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
// Branch reported as MEMCACHED_TIMEOUT with the gai_strerror() text.
// NOTE(review): case label not visible; presumably EAI_AGAIN - confirm.
209 if (server
->address_info
)
211 freeaddrinfo(server
->address_info
);
212 server
->address_info
= NULL
;
213 server
->address_info_next
= NULL
;
215 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Branch reported through errno, labelled EAI_SYSTEM in its message.
218 if (server
->address_info
)
220 freeaddrinfo(server
->address_info
);
221 server
->address_info
= NULL
;
222 server
->address_info_next
= NULL
;
224 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
// Branch reported as MEMCACHED_INVALID_ARGUMENTS, labelled EAI_BADFLAGS.
227 if (server
->address_info
)
229 freeaddrinfo(server
->address_info
);
230 server
->address_info
= NULL
;
231 server
->address_info_next
= NULL
;
233 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
// Branch reported as MEMCACHED_MEMORY_ALLOCATION_FAILURE, labelled EAI_MEMORY.
236 if (server
->address_info
)
238 freeaddrinfo(server
->address_info
);
239 server
->address_info
= NULL
;
240 server
->address_info_next
= NULL
;
242 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Branch reported as MEMCACHED_HOST_LOOKUP_FAILURE with the gai_strerror()
// text. NOTE(review): presumably the default case - confirm.
246 if (server
->address_info
)
248 freeaddrinfo(server
->address_info
);
249 server
->address_info
= NULL
;
250 server
->address_info_next
= NULL
;
252 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating from the head of the freshly resolved list.
255 server
->address_info_next
= server
->address_info
;
256 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
258 return MEMCACHED_SUCCESS
;
// Put server->fd into non-blocking mode. Failures are recorded on the
// instance with memcached_set_errno(); no status is returned to the caller.
// NOTE(review): two code paths are visible (ioctlsocket and fcntl);
// presumably they are selected by a platform conditional that is not visible
// in this listing - confirm.
261 static inline void set_socket_nonblocking(org::libmemcached::Instance
* server
)
// ioctlsocket() path: FIONBIO with arg (declaration of arg not visible).
265 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
267 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// fcntl() path: only taken when SOCK_NONBLOCK evaluates to 0.
272 if (SOCK_NONBLOCK
== 0)
// Read the current file status flags, retrying on EINTR or EAGAIN.
276 flags
= fcntl(server
->fd
, F_GETFL
, 0);
277 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
281 memcached_set_errno(*server
, errno
, NULL
);
// O_NONBLOCK is only written when it is not already present in the flags.
283 else if ((flags
& O_NONBLOCK
) == 0)
289 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
290 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
294 memcached_set_errno(*server
, errno
, NULL
);
// Apply the socket options configured on server->root to server->fd and
// finish by switching the socket to non-blocking mode.
// NOTE(review): the handling of each setsockopt() result and the return
// statements of this function are not visible in this listing.
301 static bool set_socket_options(org::libmemcached::Instance
* server
)
303 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
306 // If SOCK_CLOEXEC exists then we don't need to call the following
307 if (SOCK_CLOEXEC
== 0)
// Read the descriptor flags, retrying on EINTR or EAGAIN.
314 flags
= fcntl(server
->fd
, F_GETFD
, 0);
315 } while (flags
== -1 and (errno
== EINTR
or errno
== EAGAIN
));
// Add FD_CLOEXEC to the descriptor flags, with the same retry rule.
322 rval
= fcntl (server
->fd
, F_SETFD
, flags
| FD_CLOEXEC
);
323 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
324 // we currently ignore the case where rval is -1
// NOTE(review): the body of this UDP check is not visible; presumably the
// function returns early for UDP instances - confirm.
330 if (memcached_is_udp(server
->root
))
// Send timeout: the division and remainder by 1000000 split the configured
// value into whole seconds and microseconds.
336 if (server
->root
->snd_timeout
> 0)
338 struct timeval waittime
;
340 waittime
.tv_sec
= server
->root
->snd_timeout
/ 1000000;
341 waittime
.tv_usec
= server
->root
->snd_timeout
% 1000000;
343 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
344 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
// Receive timeout: same seconds and microseconds split as above.
351 if (server
->root
->rcv_timeout
> 0)
353 struct timeval waittime
;
355 waittime
.tv_sec
= server
->root
->rcv_timeout
/ 1000000;
356 waittime
.tv_usec
= server
->root
->rcv_timeout
% 1000000;
358 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
359 (char*)&waittime
, (socklen_t
)sizeof(struct timeval
));
// SO_NOSIGPIPE is only attempted where the macro is defined.
366 #if defined(SO_NOSIGPIPE)
370 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
374 // This is not considered a fatal error
378 perror("setsockopt(SO_NOSIGPIPE)");
// no_block flag: configure SO_LINGER with a linger time of zero.
384 if (server
->root
->flags
.no_block
)
386 struct linger linger
;
389 linger
.l_linger
= 0; /* By default on close() just drop the socket */
390 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
391 (char*)&linger
, (socklen_t
)sizeof(struct linger
));
// tcp_nodelay flag: set TCP_NODELAY (declaration of flag not visible).
398 if (server
->root
->flags
.tcp_nodelay
)
402 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
403 (char*)&flag
, (socklen_t
)sizeof(int));
// tcp_keepalive flag: set SO_KEEPALIVE.
409 if (server
->root
->flags
.tcp_keepalive
)
413 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
414 (char*)&flag
, (socklen_t
)sizeof(int));
// A positive tcp_keepidle is passed straight through as TCP_KEEPIDLE.
421 if (server
->root
->tcp_keepidle
> 0)
423 int error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
424 (char*)&server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
// Positive send_size and recv_size values become SO_SNDBUF and SO_RCVBUF.
430 if (server
->root
->send_size
> 0)
432 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
433 (char*)&server
->root
->send_size
, (socklen_t
)sizeof(int));
438 if (server
->root
->recv_size
> 0)
440 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
441 (char*)&server
->root
->recv_size
, (socklen_t
)sizeof(int));
446 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
447 set_socket_nonblocking(server
);
// Create an AF_UNIX stream socket and connect it to the path held in
// server->hostname. On success the state becomes
// MEMCACHED_SERVER_STATE_CONNECTED.
// NOTE(review): the switch on the connect() error, most case labels and the
// preprocessor conditionals are not visible in this listing.
452 static memcached_return_t
unix_socket_connect(org::libmemcached::Instance
* server
)
455 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
458 int type
= SOCK_STREAM
;
// SOCK_NONBLOCK is OR-ed into the socket type (guarding condition not visible).
466 type
|= SOCK_NONBLOCK
;
469 if ((server
->fd
= socket(AF_UNIX
, type
, 0)) < 0)
471 return memcached_set_errno(*server
, errno
, NULL
);
474 struct sockaddr_un servAddr
;
476 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
477 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() with the full size of sun_path writes no
// terminating NUL when hostname is that long or longer, and the memset above
// cannot help in that case - confirm that callers bound the path length.
478 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
480 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
// Branch that keeps the socket and registers interest in POLLOUT.
486 server
->events(POLLOUT
);
// Branch that closes the socket and marks the descriptor invalid.
490 (void)closesocket(server
->fd
);
491 server
->fd
= INVALID_SOCKET
;
494 case EISCONN
: /* We were spinning waiting on connect */
496 assert(0); // Programmer error
497 (void)closesocket(server
->fd
);
498 server
->fd
= INVALID_SOCKET
;
// Remaining errors: close the socket and report errno.
// NOTE(review): errno is read after closesocket(); connect_poll() in this
// file caches errno first because closesocket() may modify it - confirm.
503 WATCHPOINT_ERRNO(errno
);
504 (void)closesocket(server
->fd
);
505 server
->fd
= INVALID_SOCKET
;
506 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
510 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
512 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
514 return MEMCACHED_SUCCESS
;
// NOTE(review): presumably the alternative branch of a platform conditional
// that is not visible in this listing - confirm.
517 return MEMCACHED_NOT_SUPPORTED
;
// Connect a TCP or UDP instance: resolve the host when needed, then walk the
// address list creating a socket, applying options and calling connect()
// until one address succeeds or the list is exhausted.
// NOTE(review): braces, several case labels, break/continue statements and
// the second argument of socket() are not visible in this listing.
521 static memcached_return_t
network_connect(org::libmemcached::Instance
* server
)
523 bool timeout_error_occured
= false;
525 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
526 WATCHPOINT_ASSERT(server
->cursor_active_
== 0);
529 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
531 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
533 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
534 server
->address_info_next
= NULL
;
535 memcached_return_t rc
= set_hostinfo(server
);
// NOTE(review): the body of this failure check is not visible; presumably rc
// is returned to the caller - confirm.
537 if (memcached_failed(rc
))
// Restart iteration from the head of the list when no position is set.
543 if (server
->address_info_next
== NULL
)
545 server
->address_info_next
= server
->address_info
;
546 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
549 /* Create the socket */
550 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
552 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
553 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
555 server
->address_info_next
= server
->address_info_next
->ai_next
;
559 int type
= server
->address_info_next
->ai_socktype
;
// SOCK_NONBLOCK is OR-ed into the socket type (guarding condition not visible).
567 type
|= SOCK_NONBLOCK
;
570 server
->fd
= socket(server
->address_info_next
->ai_family
,
572 server
->address_info_next
->ai_protocol
);
574 if (int(server
->fd
) == SOCKET_ERROR
)
576 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
// NOTE(review): the visible lines close the socket here without resetting
// server->fd to INVALID_SOCKET, unlike the other close sites in this
// function, and _memcached_connect() returns success as soon as fd is not
// INVALID_SOCKET - confirm whether a reset line is simply missing here.
579 if (set_socket_options(server
) == false)
581 (void)closesocket(server
->fd
);
582 return MEMCACHED_CONNECTION_FAILURE
;
585 /* connect to server */
586 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
588 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
589 return MEMCACHED_SUCCESS
;
592 /* An error occurred */
593 switch (get_socket_errno())
// Branch that only records that a timeout happened (case label not visible).
596 timeout_error_occured
= true;
600 #if EWOULDBLOCK != EAGAIN
603 case EINPROGRESS
: // nonblocking mode - first return
604 case EALREADY
: // nonblocking mode - subsequent returns
// The connect is still pending: wait for writability via connect_poll().
606 server
->events(POLLOUT
);
607 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
608 memcached_return_t rc
= connect_poll(server
);
610 if (memcached_success(rc
))
612 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
613 return MEMCACHED_SUCCESS
;
616 // A timeout here is treated as an error, we will not retry
617 if (rc
== MEMCACHED_TIMEOUT
)
619 timeout_error_occured
= true;
624 case EISCONN
: // we are connected :-)
625 WATCHPOINT_ASSERT(0); // This is a programmer's error
// EINTR closes the socket but leaves address_info_next where it is.
628 case EINTR
: // Special case, we retry ai_addr
629 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
630 (void)closesocket(server
->fd
);
631 server
->fd
= INVALID_SOCKET
;
635 // Probably not running service
// Close the socket and advance to the next resolved address.
641 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
642 (void)closesocket(server
->fd
);
643 server
->fd
= INVALID_SOCKET
;
644 server
->address_info_next
= server
->address_info_next
->ai_next
;
647 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// After a timeout, make sure no descriptor is left open.
649 if (timeout_error_occured
)
651 if (server
->fd
!= INVALID_SOCKET
)
653 (void)closesocket(server
->fd
);
654 server
->fd
= INVALID_SOCKET
;
658 WATCHPOINT_STRING("Never got a good file descriptor");
// An error already recorded on the instance takes precedence.
660 if (memcached_has_current_error(*server
))
662 return memcached_instance_error_return(server
);
// A timeout is only reported as such while the state is still below IN_PROGRESS.
665 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
667 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
670 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
677 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
678 we get caught spending cycles just waiting.
// Decide, from the failure counter and the retry time of the instance,
// whether a connect attempt should be made at all. Returns MEMCACHED_SUCCESS
// when the caller may go ahead, otherwise an error recorded on the instance.
// NOTE(review): no write to the in_timeout reference parameter is visible in
// this listing; braces and else branches are not visible either.
680 static memcached_return_t
backoff_handling(org::libmemcached::Instance
* server
, bool& in_timeout
)
682 struct timeval curr_time
;
// Remember whether the clock could be read; the time based decisions below
// are skipped when it could not.
683 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
686 If we hit server_failure_limit then something is completely wrong about the server.
688 1) If autoeject is enabled we do that.
689 2) If not? We go into timeout again, there is much else to do :(
691 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
694 We just auto_eject if we hit this point
696 if (_is_auto_eject_host(server
->root
))
698 set_last_disconnected_host(server
);
700 // Retry dead servers if requested
701 if (_gettime_success
and server
->root
->dead_timeout
> 0)
703 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
705 // We only retry dead servers once before assuming failure again
706 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
// Rebuild the key distribution now that this host has been ejected.
709 memcached_return_t rc
;
710 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
712 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
715 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
// Without auto eject the instance is put into the IN_TIMEOUT state.
718 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
720 // Sanity check/setting
721 if (server
->next_retry
== 0)
723 server
->next_retry
= 1;
727 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
730 If next_retry is less then our current time, then we reset and try everything again.
732 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
734 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
// NOTE(review): presumably the else branch of the retry time check above
// (still inside the timeout window) - confirm.
738 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
744 return MEMCACHED_SUCCESS
;
// Connection entry point shared by the public wrapper: run the backoff
// check, dispatch on the connection type, optionally authenticate with SASL
// and update the bookkeeping of the instance according to the outcome.
// set_last_disconnected controls whether a failed attempt is recorded through
// set_last_disconnected_host().
// NOTE(review): braces, else branches, the declaration of buffer and the
// final return statements are not visible in this listing.
747 static memcached_return_t
_memcached_connect(org::libmemcached::Instance
* server
, const bool set_last_disconnected
)
// A descriptor that is already valid is treated as an existing connection.
750 if (server
->fd
!= INVALID_SOCKET
)
752 return MEMCACHED_SUCCESS
;
755 LIBMEMCACHED_MEMCACHED_CONNECT_START();
757 bool in_timeout
= false;
758 memcached_return_t rc
;
// NOTE(review): only the set_last_disconnected_host() call of this failure
// branch is visible; presumably rc is returned afterwards - confirm.
759 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
761 set_last_disconnected_host(server
);
// SASL callbacks combined with UDP are rejected up front.
765 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
767 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname beginning with a slash is treated as a unix socket path.
770 if (server
->hostname
[0] == '/')
772 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
775 /* We need to clean up the multi startup piece */
776 switch (server
->type
)
778 case MEMCACHED_CONNECTION_UDP
:
779 case MEMCACHED_CONNECTION_TCP
:
780 rc
= network_connect(server
);
782 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
// Authenticate only when a socket exists and SASL callbacks are configured.
784 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
786 rc
= memcached_sasl_authenticate_connection(server
);
// NOTE(review): this writes the authentication result to stderr on every
// SASL connect, with no visible condition; it looks like leftover debugging
// output - confirm whether it is intended.
787 fprintf(stderr
, "%s:%d %s\n", __FILE__
, __LINE__
, memcached_strerror(NULL
, rc
));
// A failed authentication closes the socket again.
788 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
790 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
791 (void)closesocket(server
->fd
);
792 server
->fd
= INVALID_SOCKET
;
798 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
799 rc
= unix_socket_connect(server
);
// Success: clear the failure bookkeeping and query the server version.
803 if (memcached_success(rc
))
805 server
->mark_server_as_clean();
806 memcached_version_instance(server
);
// Failure with set_last_disconnected: remember the host and mark the
// instance for timeout, recording rc first when no error is stored yet.
809 else if (set_last_disconnected
)
811 set_last_disconnected_host(server
);
812 if (memcached_has_current_error(*server
))
814 memcached_mark_server_for_timeout(server
);
815 assert(memcached_failed(memcached_instance_error_return(server
)));
819 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
820 memcached_mark_server_for_timeout(server
);
823 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// Report the instance as temporarily disabled, with host and port as detail.
// NOTE(review): the condition selecting this return is not visible;
// presumably it depends on in_timeout - confirm. The snprintf() result is
// used as the message length and would exceed the buffer size if the text
// were truncated - confirm that buffer is large enough.
828 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port()));
829 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// Public connect entry point: forwards to _memcached_connect() with
// set_last_disconnected set to true, so that a failed attempt is recorded
// against the host.
836 memcached_return_t
memcached_connect(org::libmemcached::Instance
* server
)
838 return _memcached_connect(server
, true);