1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// connect_poll(): wait for an in-progress non-blocking connect() on
// server->fd to complete.
//
// Polls server->fd for POLLOUT with a timeout of
// server->root->connect_timeout, looping while --loop_max is non-zero
// (loop_max's initial value is not visible in this excerpt).
//
// Returns:
//   MEMCACHED_SUCCESS - presumably when getsockopt(SO_ERROR) reports no
//                       pending error (the exact test is not visible here).
//   MEMCACHED_TIMEOUT - immediately when server->root->poll_timeout == 0,
//                       and on the branch that looks like poll() == 0.
//   other errors      - recorded on the server through
//                       memcached_set_error() / memcached_set_errno().
//
// Side effect: on the unexpected poll() failure path the socket is closed,
// server->fd becomes INVALID_SOCKET and server->state is reset to
// MEMCACHED_SERVER_STATE_NEW.
//
// NOTE(review): the early-out tests poll_timeout, but poll() is given
// connect_timeout; confirm that mixing the two settings is intended.
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// A poll_timeout of 0 is reported as an immediate timeout.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
// Presumably the poll() > 0 case: read the deferred connect() result
// from SO_ERROR.
66 socklen_t len
= sizeof (err
);
67 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
69 // We check the value to see what happened with the socket.
72 return MEMCACHED_SUCCESS
;
// Presumably reached when SO_ERROR held a non-zero value: report it.
77 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// Presumably the poll() == 0 case: not writable within connect_timeout.
81 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
84 default: // A real error occurred and we need to completely bail
// The errno case labels of this inner switch are mostly not visible here;
// both visible branches map the failure to
// MEMCACHED_MEMORY_ALLOCATION_FAILURE, the second one citing RLIMIT_NOFILE
// or an invalid OS X timeout value.
85 switch (get_socket_errno())
87 #ifdef TARGET_OS_LINUX
95 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
100 default: // This should not happen
// If poll() flagged POLLERR, try to fetch the socket's pending error.
101 if (fds
[0].revents
& POLLERR
)
104 socklen_t len
= sizeof(err
);
105 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
109 // This should never happen, if it does? Punt.
// Give up on this socket: close it and reset the server to
// MEMCACHED_SERVER_STATE_NEW so a later attempt starts from scratch.
116 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
118 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server
->fd
);
120 server
->fd
= INVALID_SOCKET
;
121 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
123 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
// Reached only after loop_max iterations have been used up.
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo(): resolve server->hostname / server->port with
// getaddrinfo() and store the resulting list in server->address_info.
//
// Any previously resolved list is freed first. The hints request
// SOCK_DGRAM/IPPROTO_UDP when the owning memcached_st is in UDP mode and
// SOCK_STREAM/IPPROTO_TCP otherwise. ai_family is set to AF_INET; any
// preprocessor guard around that assignment is not visible in this excerpt.
//
// On success: address_info_next points at the head of the new list,
// state becomes MEMCACHED_SERVER_STATE_ADDRINFO, and MEMCACHED_SUCCESS is
// returned.
// On failure: MEMCACHED_FAILURE if the port cannot be formatted; otherwise
// the getaddrinfo() error is mapped onto a memcached error recorded on the
// server (MEMCACHED_TIMEOUT, the errno for EAI_SYSTEM,
// MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_MEMORY_ALLOCATION_FAILURE or
// MEMCACHED_HOST_LOOKUP_FAILURE).
//
// Must not be called for Unix-domain socket servers (asserted below).
132 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
134 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Discard the result of any earlier lookup.
135 if (server
->address_info
)
137 freeaddrinfo(server
->address_info
);
138 server
->address_info
= NULL
;
139 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service (port) as a string.
142 char str_port
[NI_MAXSERV
];
143 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
// Treat snprintf() failure or truncation as an error.
144 if (length
>= NI_MAXSERV
or length
< 0)
146 return MEMCACHED_FAILURE
;
149 struct addrinfo hints
;
150 memset(&hints
, 0, sizeof(struct addrinfo
));
153 hints
.ai_family
= AF_INET
;
155 if (memcached_is_udp(server
->root
))
157 hints
.ai_protocol
= IPPROTO_UDP
;
158 hints
.ai_socktype
= SOCK_DGRAM
;
162 hints
.ai_socktype
= SOCK_STREAM
;
163 hints
.ai_protocol
= IPPROTO_TCP
;
166 server
->address_info
= NULL
;
// Translate getaddrinfo() failures; most case labels are not visible here.
// errcode's declaration is also outside the visible lines.
168 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
// Case label not visible (presumably a transient failure such as
// EAI_AGAIN). Reported as MEMCACHED_TIMEOUT, which network_connect()
// retries.
174 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
177 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
180 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
183 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Any other getaddrinfo() error.
187 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Start iterating over the freshly resolved address list.
190 server
->address_info_next
= server
->address_info
;
191 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
193 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// Two implementations are visible; presumably a platform #ifdef (not shown
// in this excerpt) selects between them:
//   - ioctlsocket(FIONBIO) (Winsock-style API);
//   - fcntl(): read the flags with F_GETFL and, if O_NONBLOCK is not yet
//     set, add it with F_SETFL. Each fcntl() call is retried while it
//     fails with EINTR or EAGAIN.
//
// Failures are only recorded on the server with memcached_set_errno(). The
// function returns void, so the caller is not told that the socket may
// still be blocking.
// NOTE(review): these calls pass NULL instead of MEMCACHED_AT as the
// location argument.
196 static inline void set_socket_nonblocking(memcached_server_st
*server
)
200 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
202 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current descriptor flags, retrying on EINTR/EAGAIN.
209 flags
= fcntl(server
->fd
, F_GETFL
, 0);
210 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
214 memcached_set_errno(*server
, errno
, NULL
);
// Only issue F_SETFL when O_NONBLOCK is not already set.
216 else if ((flags
& O_NONBLOCK
) == 0)
222 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
223 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
227 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the owning memcached_st's socket settings to
// a freshly created server->fd, then switch the socket to non-blocking
// mode.
//
// Each option is applied only when its setting is enabled or non-zero:
// SO_SNDTIMEO, SO_RCVTIMEO, SO_NOSIGPIPE (macOS/FreeBSD builds only),
// SO_LINGER (when flags.no_block is set), TCP_NODELAY, SO_KEEPALIVE,
// TCP_KEEPIDLE, SO_SNDBUF and SO_RCVBUF.
//
// setsockopt() results are only checked through WATCHPOINT_ASSERT(), so a
// failed option is not reported to the caller here.
// The body of the UDP branch near the top is not visible in this excerpt;
// presumably UDP sockets skip some or all of these options. TODO confirm.
233 static void set_socket_options(memcached_server_st
*server
)
235 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
237 if (memcached_is_udp(server
->root
))
243 if (server
->root
->snd_timeout
)
// NOTE(review): the whole snd_timeout value is stored in tv_usec (any
// tv_sec assignment is not visible here). Values >= 1000000 fall outside
// the valid tv_usec range, and setsockopt() may reject them (Linux returns
// EDOM). Consider splitting the value into tv_sec and tv_usec.
246 struct timeval waittime
;
249 waittime
.tv_usec
= server
->root
->snd_timeout
;
251 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
252 &waittime
, (socklen_t
)sizeof(struct timeval
));
253 WATCHPOINT_ASSERT(error
== 0);
258 if (server
->root
->rcv_timeout
)
// NOTE(review): same tv_usec range concern as snd_timeout above.
261 struct timeval waittime
;
264 waittime
.tv_usec
= server
->root
->rcv_timeout
;
266 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
267 &waittime
, (socklen_t
)sizeof(struct timeval
));
268 WATCHPOINT_ASSERT(error
== 0);
// macOS/FreeBSD: request SO_NOSIGPIPE; a failure is only logged.
273 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
276 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
278 // This is not considered a fatal error
281 WATCHPOINT_ERRNO(get_socket_errno());
282 perror("setsockopt(SO_NOSIGPIPE)");
// With no_block enabled, configure SO_LINGER with l_linger = 0 (the
// l_onoff assignment is not visible in this excerpt).
287 if (server
->root
->flags
.no_block
)
290 struct linger linger
;
293 linger
.l_linger
= 0; /* By default on close() just drop the socket */
294 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
295 &linger
, (socklen_t
)sizeof(struct linger
));
296 WATCHPOINT_ASSERT(error
== 0);
299 if (server
->root
->flags
.tcp_nodelay
)
304 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
305 &flag
, (socklen_t
)sizeof(int));
306 WATCHPOINT_ASSERT(error
== 0);
309 if (server
->root
->flags
.tcp_keepalive
)
314 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
315 &flag
, (socklen_t
)sizeof(int));
316 WATCHPOINT_ASSERT(error
== 0);
320 if (server
->root
->tcp_keepidle
> 0)
324 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
325 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
326 WATCHPOINT_ASSERT(error
== 0);
// Kernel send/receive buffer sizes, applied only when configured (> 0).
330 if (server
->root
->send_size
> 0)
334 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
335 &server
->root
->send_size
, (socklen_t
)sizeof(int));
336 WATCHPOINT_ASSERT(error
== 0);
339 if (server
->root
->recv_size
> 0)
343 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
344 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
345 WATCHPOINT_ASSERT(error
== 0);
349 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
350 set_socket_nonblocking(server
);
// unix_socket_connect(): connect to a memcached server that listens on a
// Unix-domain stream socket. server->hostname holds the filesystem path.
//
// On success, server->fd holds the connected socket,
// server->state == MEMCACHED_SERVER_STATE_CONNECTED, and MEMCACHED_SUCCESS
// is returned. On failure, the errno is recorded on the server and
// MEMCACHED_CONNECTION_FAILURE is returned.
// MEMCACHED_NOT_SUPPORTED is returned on the final branch; presumably this
// is the build without Unix-domain socket support (the preprocessor guard
// is not visible in this excerpt).
//
// No call to set_socket_options() appears on this path, unlike
// network_connect().
353 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
356 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
358 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
360 memcached_set_errno(*server
, errno
, NULL
);
361 return MEMCACHED_CONNECTION_FAILURE
;
364 struct sockaddr_un servAddr
;
366 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
367 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() leaves sun_path without a terminating NUL when
// hostname is sizeof(sun_path) bytes or longer, and silently truncates the
// path. Shorter paths are terminated only because of the memset() above.
// Consider rejecting over-long paths explicitly.
368 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
// Only some connect() errno cases are visible: EISCONN is treated as a
// programmer error, and the final (presumably default) case records the
// errno and fails.
371 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
380 case EISCONN
: /* We were spinning waiting on connect */
382 WATCHPOINT_ASSERT(0); // Programmer error
// NOTE(review): server->fd is not closed on this failure path in the
// visible code. If nothing else closes it, a later _memcached_connect()
// would see a valid fd and return MEMCACHED_SUCCESS. Confirm that the
// caller resets server->fd.
387 WATCHPOINT_ERRNO(errno
);
388 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
389 return MEMCACHED_CONNECTION_FAILURE
;
393 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
395 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
397 return MEMCACHED_SUCCESS
;
400 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): establish a TCP or UDP connection to the server.
//
// Resolution: set_hostinfo() is called when there is no address list, or
// when the previous list has been fully tried.
//
// Connection: the function walks server->address_info_next. For each
// address it creates a socket, configures it with set_socket_options(),
// and calls connect(). In UDP mode, non-IPv4 entries are skipped.
//   - EINPROGRESS/EALREADY: the non-blocking connect is completed with
//     connect_poll().
//   - EINTR: the same address is retried.
//   - Other errors (presumably the default case): the socket is closed and
//     the next address is tried.
//
// Returns MEMCACHED_SUCCESS with server->state set to
// MEMCACHED_SERVER_STATE_CONNECTED. Otherwise it returns, in order of
// precedence:
//   - the error already recorded on the server, if any;
//   - MEMCACHED_TIMEOUT, if a timeout was seen while state was still below
//     MEMCACHED_SERVER_STATE_IN_PROGRESS;
//   - MEMCACHED_CONNECTION_FAILURE.
404 static memcached_return_t
network_connect(memcached_server_st
*server
)
406 bool timeout_error_occured
= false;
408 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
409 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
412 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
414 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
416 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
417 server
->address_info_next
= NULL
;
418 memcached_return_t rc
;
// set_hostinfo() is retried after a nanosleep() while it reports
// MEMCACHED_TIMEOUT. The loop structure and the sleep duration are not
// visible in this excerpt.
422 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
428 struct timespec dream
, rem
;
433 nanosleep(&dream
, &rem
);
// Any other resolution failure ends the attempt (the body is not visible).
437 if (memcached_failed(rc
))
// Restart from the head of an existing address list.
443 if (server
->address_info_next
== NULL
)
445 server
->address_info_next
= server
->address_info
;
446 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
449 /* Create the socket */
450 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
452 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
453 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
455 server
->address_info_next
= server
->address_info_next
->ai_next
;
459 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
460 server
->address_info_next
->ai_socktype
,
461 server
->address_info_next
->ai_protocol
)) < 0)
463 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
466 set_socket_options(server
);
468 /* connect to server */
469 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
471 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
472 return MEMCACHED_SUCCESS
;
475 /* An error occurred */
476 switch (get_socket_errno())
// Case label not visible; this branch only records that a timeout occurred.
479 timeout_error_occured
= true;
483 case EINPROGRESS
: // nonblocking mode - first return
484 case EALREADY
: // nonblocking mode - subsequent returns
486 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
487 memcached_return_t rc
= connect_poll(server
);
489 if (memcached_success(rc
))
491 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
492 return MEMCACHED_SUCCESS
;
495 // A timeout here is treated as an error, we will not retry
496 if (rc
== MEMCACHED_TIMEOUT
)
498 timeout_error_occured
= true;
503 case EISCONN
: // we are connected :-)
504 WATCHPOINT_ASSERT(0); // This is a programmer's error
507 case EINTR
: // Special case, we retry ai_addr
508 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
509 (void)closesocket(server
->fd
);
510 server
->fd
= INVALID_SOCKET
;
// Presumably the default case: drop this socket and try the next address.
517 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
518 (void)closesocket(server
->fd
);
519 server
->fd
= INVALID_SOCKET
;
520 server
->address_info_next
= server
->address_info_next
->ai_next
;
523 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// Defensive cleanup: fd was just asserted to be INVALID_SOCKET, so this
// close only matters if WATCHPOINT_ASSERT is compiled out and the
// invariant is broken.
525 if (timeout_error_occured
)
527 if (server
->fd
!= INVALID_SOCKET
)
529 (void)closesocket(server
->fd
);
530 server
->fd
= INVALID_SOCKET
;
534 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error already recorded on the server (e.g. by connect_poll()).
536 if (memcached_has_current_error(*server
))
538 return memcached_server_error_return(server
);
541 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
543 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
546 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
553 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
554 we get caught spending cycles just waiting.
// backoff_handling(): before a connection attempt, decide whether the
// server should be skipped because of earlier failures.
//
// When server_failure_counter has reached root->server_failure_limit:
//   - With auto-eject enabled:
//       * the host is recorded as the last disconnected host;
//       * if dead_timeout > 0, next_retry is set to now + dead_timeout
//         seconds and the failure counter is set to limit - 1, so the dead
//         server is retried only once;
//       * the distribution is recomputed with run_distribution();
//       * MEMCACHED_SERVER_MARKED_DEAD is returned.
//   - Otherwise: the server is put into MEMCACHED_SERVER_STATE_IN_TIMEOUT,
//     and next_retry is forced to be non-zero.
//
// While the server is IN_TIMEOUT:
//   - once next_retry is earlier than the current time, the state is reset
//     to MEMCACHED_SERVER_STATE_NEW so the connect may proceed;
//   - presumably MEMCACHED_SERVER_TEMPORARILY_DISABLED is returned
//     otherwise (the surrounding else is not visible in this excerpt).
//
// Returns MEMCACHED_SUCCESS when the connect may proceed.
//
// in_timeout is an output parameter; no assignment to it is visible in
// this excerpt. If gettimeofday() fails, neither time-based retry path
// above is taken.
556 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
558 struct timeval curr_time
;
559 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
562 If we hit server_failure_limit then something is completely wrong about the server.
564 1) If autoeject is enabled we do that.
565 2) If not? We go into timeout again, there is much else to do :(
567 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
570 We just auto_eject if we hit this point
572 if (_is_auto_eject_host(server
->root
))
574 set_last_disconnected_host(server
);
576 // Retry dead servers if requested
577 if (_gettime_success
and server
->root
->dead_timeout
> 0)
579 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
581 // We only retry dead servers once before assuming failure again
582 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
// Rebuild the key distribution now that this host is ejected.
585 memcached_return_t rc
;
586 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
588 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
591 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
594 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
596 // Sanity check/setting
597 if (server
->next_retry
== 0)
599 server
->next_retry
= 1;
603 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
// i.e. once the retry deadline (whole seconds) has passed, allow a new
// attempt.
606 If next_retry is less then our current time, then we reset and try everything again.
608 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
610 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
614 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
620 return MEMCACHED_SUCCESS
;
// _memcached_connect(): connect server unless it already has a socket.
//
// Steps:
//   1. Consult backoff_handling().
//   2. Reject SASL over UDP.
//   3. Treat a hostname starting with '/' as a Unix-domain socket path.
//   4. Dispatch on server->type:
//        - TCP/UDP: network_connect(), followed by SASL authentication
//          when SASL support and callbacks are present;
//        - Unix socket: unix_socket_connect().
//
// On success the server is marked clean. On failure, and only when
// set_last_disconnected is true:
//   - the host is recorded as the last disconnected host;
//   - the server is marked for timeout;
//   - rc is first recorded as the server's error if no error is present.
//
// The trailing block reports MEMCACHED_SERVER_TEMPORARILY_DISABLED with a
// "host:port" message. Its guarding condition (presumably the in_timeout
// flag filled in by backoff_handling()) and the final return are not
// visible in this excerpt.
623 static memcached_return_t
_memcached_connect(memcached_server_write_instance_st server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
625 if (server
->fd
!= INVALID_SOCKET
)
627 return MEMCACHED_SUCCESS
;
630 LIBMEMCACHED_MEMCACHED_CONNECT_START();
632 bool in_timeout
= false;
633 memcached_return_t rc
;
// Backoff refused the attempt: remember this host (the return that
// presumably follows is not visible).
634 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
636 set_last_disconnected_host(server
);
// SASL authentication is not supported over UDP.
640 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
642 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A leading '/' means hostname is the path of a Unix-domain socket.
645 if (server
->hostname
[0] == '/')
647 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
650 /* We need to clean up the multi startup piece */
651 switch (server
->type
)
653 case MEMCACHED_CONNECTION_UDP
:
654 case MEMCACHED_CONNECTION_TCP
:
655 rc
= network_connect(server
);
657 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
659 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
661 rc
= memcached_sasl_authenticate_connection(server
);
// Drop the socket if SASL authentication failed.
662 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
664 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
665 (void)closesocket(server
->fd
);
666 server
->fd
= INVALID_SOCKET
;
672 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
673 rc
= unix_socket_connect(server
);
677 if (memcached_success(rc
))
679 memcached_mark_server_as_clean(server
);
682 else if (set_last_disconnected
)
684 set_last_disconnected_host(server
);
685 if (memcached_has_current_error(*server
))
687 memcached_mark_server_for_timeout(server
);
688 assert(memcached_failed(memcached_server_error_return(server
)));
// No error recorded yet: record rc before marking the server.
692 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
693 memcached_mark_server_for_timeout(server
);
696 LIBMEMCACHED_MEMCACHED_CONNECT_END();
// NOTE(review): snprintf() returns the untruncated length. If "host:port"
// does not fit in buffer (its declaration is not visible), snprintf_length
// would exceed the text actually stored, or be negative on error.
// Consider clamping it before passing it to memcached_set_error().
701 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port
));
702 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
// memcached_connect_try(): connect server, passing
// set_last_disconnected = false to _memcached_connect(). A failure
// therefore does not record the host as the last disconnected host, and
// does not mark it for timeout.
709 memcached_return_t
memcached_connect_try(memcached_server_write_instance_st server
)
711 return _memcached_connect(server
, false);
// memcached_connect(): connect server, passing
// set_last_disconnected = true to _memcached_connect(). On failure the
// host is recorded as the last disconnected host and marked for timeout.
714 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
716 return _memcached_connect(server
, true);