1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
/*
 * connect_poll() - wait for a pending connect on server->fd to complete.
 *
 * Polls server->fd for POLLOUT, passing server->root->connect_timeout to
 * poll(), and repeats while --loop_max stays non-zero.  SO_ERROR is read
 * back with getsockopt() to learn how the socket ended up.
 *
 * Returns MEMCACHED_SUCCESS, or an error recorded on *server through
 * memcached_set_error() / memcached_set_errno(); the codes visible here
 * are MEMCACHED_TIMEOUT, MEMCACHED_MEMORY_ALLOCATION_FAILURE and
 * errno-derived results.
 *
 * NOTE(review): this listing looks like a lossy extraction; braces, case
 * labels and the declarations of fds, loop_max and err are not visible, so
 * confirm the exact branch structure against the complete source.
 * NOTE(review): the early-out below tests root->poll_timeout while poll()
 * is given root->connect_timeout; confirm that this is intentional.
 */
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// A poll_timeout of zero is reported as a timeout without polling at all.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
66 socklen_t len
= sizeof (err
);
67 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
69 // We check the value to see what happened with the socket.
72 return MEMCACHED_SUCCESS
;
77 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
81 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
84 default: // A real error occurred and we need to completely bail
85 switch (get_socket_errno())
87 #ifdef TARGET_OS_LINUX
95 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
98 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
100 default: // This should not happen
101 if (fds
[0].revents
& POLLERR
)
104 socklen_t len
= sizeof(err
);
105 if (getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
) == 0)
109 // This should never happen, if it does? Punt.
116 int local_errno
= get_socket_errno(); // We cache in case closesocket() modifies errno
// Close the socket, mark the fd invalid and reset the server state to NEW
// before reporting the cached errno.
118 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server
->fd
);
120 server
->fd
= INVALID_SOCKET
;
121 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
123 return memcached_set_errno(*server
, local_errno
, MEMCACHED_AT
);
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
/*
 * set_hostinfo() - resolve server->hostname and server->port via getaddrinfo().
 *
 * Steps visible in this listing:
 *   - asserts the server is not a MEMCACHED_CONNECTION_UNIX_SOCKET;
 *   - frees any previous server->address_info and clears both
 *     address_info and address_info_next;
 *   - formats the port with snprintf() into an NI_MAXSERV sized buffer and
 *     returns MEMCACHED_FAILURE if that fails or would not fit;
 *   - zeroes an addrinfo hints struct and selects IPPROTO_UDP/SOCK_DGRAM
 *     or SOCK_STREAM/IPPROTO_TCP depending on memcached_is_udp();
 *   - stores the getaddrinfo() result in server->address_info and
 *     switches on its return code;
 *   - on success points address_info_next at the head of the result list
 *     and sets the state to MEMCACHED_SERVER_STATE_ADDRINFO.
 *
 * Returns MEMCACHED_SUCCESS, MEMCACHED_FAILURE, or an error recorded on
 * *server (MEMCACHED_TIMEOUT, an errno-derived code,
 * MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_MEMORY_ALLOCATION_FAILURE or
 * MEMCACHED_HOST_LOOKUP_FAILURE).
 *
 * NOTE(review): the case labels of the switch and the declaration of
 * errcode are not visible here.  The message strings suggest EAI_SYSTEM,
 * EAI_BADFLAGS and EAI_MEMORY are handled individually; which code maps to
 * MEMCACHED_TIMEOUT should be confirmed against the complete source.
 * NOTE(review): any preprocessor lines around the hints.ai_family
 * assignment are not visible; confirm whether it is compiled in.
 */
132 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
134 assert(server
->type
!= MEMCACHED_CONNECTION_UNIX_SOCKET
);
// Drop any result left over from an earlier lookup.
135 if (server
->address_info
)
137 freeaddrinfo(server
->address_info
);
138 server
->address_info
= NULL
;
139 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so render the port as text.
142 char str_port
[NI_MAXSERV
];
143 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
144 if (length
>= NI_MAXSERV
or length
< 0)
146 return MEMCACHED_FAILURE
;
149 struct addrinfo hints
;
150 memset(&hints
, 0, sizeof(struct addrinfo
));
153 hints
.ai_family
= AF_INET
;
155 if (memcached_is_udp(server
->root
))
157 hints
.ai_protocol
= IPPROTO_UDP
;
158 hints
.ai_socktype
= SOCK_DGRAM
;
162 hints
.ai_socktype
= SOCK_STREAM
;
163 hints
.ai_protocol
= IPPROTO_TCP
;
166 server
->address_info
= NULL
;
168 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
174 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
177 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
180 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
183 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
187 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Lookup succeeded: start iteration at the head of the result list.
190 server
->address_info_next
= server
->address_info
;
191 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
193 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking() - switch server->fd to non-blocking mode.
 *
 * Two code paths are visible:
 *   - ioctlsocket(FIONBIO), with a failure recorded via
 *     memcached_set_errno() using get_socket_errno();
 *   - fcntl(F_GETFL) followed, only when O_NONBLOCK is not already set, by
 *     fcntl(F_SETFL, flags | O_NONBLOCK).  Each fcntl() call is retried
 *     while it returns -1 with errno EINTR or EAGAIN, and a failure is
 *     recorded via memcached_set_errno() using errno.
 *
 * Nothing is returned; errors are only stored on *server.
 *
 * NOTE(review): presumably the ioctlsocket() path is the Windows build and
 * the fcntl() path is used elsewhere, but the preprocessor lines that
 * select between them are not visible in this listing; confirm.
 */
196 static inline void set_socket_nonblocking(memcached_server_st
*server
)
200 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
202 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current file status flags, retrying on EINTR/EAGAIN.
209 flags
= fcntl(server
->fd
, F_GETFL
, 0);
210 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
214 memcached_set_errno(*server
, errno
, NULL
);
// Only touch the flags when O_NONBLOCK is not already set.
216 else if ((flags
& O_NONBLOCK
) == 0)
222 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
223 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
227 memcached_set_errno(*server
, errno
, NULL
);
/*
 * set_socket_options() - apply the configured socket options to server->fd.
 *
 * Requires a valid socket (asserted).  Options visible in this listing,
 * each applied with setsockopt() only when its setting on server->root is
 * enabled or non-zero:
 *   - snd_timeout        -> SO_SNDTIMEO (value stored in tv_usec)
 *   - rcv_timeout        -> SO_RCVTIMEO (value stored in tv_usec)
 *   - Apple / FreeBSD    -> SO_NOSIGPIPE (failure is not fatal)
 *   - flags.no_block     -> SO_LINGER with l_linger = 0
 *   - flags.tcp_nodelay  -> TCP_NODELAY
 *   - flags.tcp_keepalive-> SO_KEEPALIVE
 *   - tcp_keepidle > 0   -> TCP_KEEPIDLE
 *   - send_size > 0      -> SO_SNDBUF
 *   - recv_size > 0      -> SO_RCVBUF
 * Finally the socket is made non-blocking via set_socket_nonblocking().
 *
 * setsockopt() results are only checked through WATCHPOINT_ASSERT /
 * WATCHPOINT_ERRNO; nothing is returned to the caller.
 *
 * NOTE(review): the body of the memcached_is_udp() branch, several local
 * declarations and some field assignments (for example the other members
 * of waittime and linger) are not visible in this listing; presumably UDP
 * sockets return early without these options; confirm.
 */
233 static void set_socket_options(memcached_server_st
*server
)
235 assert_msg(server
->fd
!= INVALID_SOCKET
, "invalid socket was passed to set_socket_options()");
237 if (memcached_is_udp(server
->root
))
// Send timeout.
243 if (server
->root
->snd_timeout
)
246 struct timeval waittime
;
249 waittime
.tv_usec
= server
->root
->snd_timeout
;
251 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
252 &waittime
, (socklen_t
)sizeof(struct timeval
));
253 WATCHPOINT_ASSERT(error
== 0);
// Receive timeout.
258 if (server
->root
->rcv_timeout
)
261 struct timeval waittime
;
264 waittime
.tv_usec
= server
->root
->rcv_timeout
;
266 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
267 &waittime
, (socklen_t
)sizeof(struct timeval
));
268 WATCHPOINT_ASSERT(error
== 0);
273 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
276 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
278 // This is not considered a fatal error
281 WATCHPOINT_ERRNO(get_socket_errno());
282 perror("setsockopt(SO_NOSIGPIPE)");
287 if (server
->root
->flags
.no_block
)
290 struct linger linger
;
293 linger
.l_linger
= 0; /* By default on close() just drop the socket */
294 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
295 &linger
, (socklen_t
)sizeof(struct linger
));
296 WATCHPOINT_ASSERT(error
== 0);
299 if (server
->root
->flags
.tcp_nodelay
)
304 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
305 &flag
, (socklen_t
)sizeof(int));
306 WATCHPOINT_ASSERT(error
== 0);
309 if (server
->root
->flags
.tcp_keepalive
)
314 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
315 &flag
, (socklen_t
)sizeof(int));
316 WATCHPOINT_ASSERT(error
== 0);
320 if (server
->root
->tcp_keepidle
> 0)
324 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
325 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
326 WATCHPOINT_ASSERT(error
== 0);
// Kernel send / receive buffer sizes.
330 if (server
->root
->send_size
> 0)
334 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
335 &server
->root
->send_size
, (socklen_t
)sizeof(int));
336 WATCHPOINT_ASSERT(error
== 0);
339 if (server
->root
->recv_size
> 0)
343 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
344 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
345 WATCHPOINT_ASSERT(error
== 0);
349 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
350 set_socket_nonblocking(server
);
/*
 * unix_socket_connect() - connect to a memcached server over a Unix domain
 * socket whose path is stored in server->hostname.
 *
 * Steps visible in this listing:
 *   - expects server->fd to be INVALID_SOCKET on entry (WATCHPOINT_ASSERT);
 *   - creates an AF_UNIX / SOCK_STREAM socket; on failure records errno on
 *     *server and returns MEMCACHED_CONNECTION_FAILURE;
 *   - zeroes a sockaddr_un, sets sun_family and copies the hostname into
 *     sun_path;
 *   - calls connect(); on an unhandled error records errno and returns
 *     MEMCACHED_CONNECTION_FAILURE;
 *   - on success sets the state to MEMCACHED_SERVER_STATE_CONNECTED and
 *     returns MEMCACHED_SUCCESS.
 *
 * A trailing return of MEMCACHED_NOT_SUPPORTED is also visible.
 *
 * NOTE(review): the errno switch after connect() and any platform guards
 * are only partly visible; presumably MEMCACHED_NOT_SUPPORTED is the body
 * used on platforms without Unix sockets; confirm.
 * NOTE(review): strncpy() is given the full sizeof(sun_path), so a
 * hostname of that length or longer leaves sun_path without a terminating
 * NUL even though the struct was zeroed first; consider copying at most
 * sizeof(sun_path) - 1 bytes or rejecting over-long paths.
 */
353 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
356 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
358 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
360 memcached_set_errno(*server
, errno
, NULL
);
361 return MEMCACHED_CONNECTION_FAILURE
;
364 struct sockaddr_un servAddr
;
366 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
367 servAddr
.sun_family
= AF_UNIX
;
368 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
371 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
380 case EISCONN
: /* We were spinning waiting on connect */
382 WATCHPOINT_ASSERT(0); // Programmer error
387 WATCHPOINT_ERRNO(errno
);
388 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
389 return MEMCACHED_CONNECTION_FAILURE
;
393 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
395 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
397 return MEMCACHED_SUCCESS
;
400 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect() - resolve the server address if needed and try to
 * connect to each candidate address in turn.
 *
 * Steps visible in this listing:
 *   - expects server->fd == INVALID_SOCKET and cursor_active == 0 on entry
 *     (WATCHPOINT_ASSERT);
 *   - when address_info or address_info_next is NULL, calls set_hostinfo();
 *     a nanosleep() is visible after the comparison of its result against
 *     MEMCACHED_TIMEOUT;
 *   - when address_info_next is still NULL, rewinds it to address_info and
 *     sets the state to MEMCACHED_SERVER_STATE_ADDRINFO;
 *   - loops while there is a candidate address and no open socket: skips
 *     non-AF_INET entries in UDP mode, creates the socket, applies
 *     set_socket_options() and calls connect();
 *   - an immediate connect() success sets MEMCACHED_SERVER_STATE_CONNECTED
 *     and returns MEMCACHED_SUCCESS;
 *   - for EINPROGRESS / EALREADY the state becomes
 *     MEMCACHED_SERVER_STATE_IN_PROGRESS and connect_poll() decides the
 *     outcome; a MEMCACHED_TIMEOUT from it sets timeout_error_occured;
 *   - for EINTR the socket is closed so that the same address is retried;
 *   - otherwise the socket is closed and the next address is selected.
 *
 * After the loop an open socket is closed when a timeout occurred.  The
 * function then returns the error already stored on the server if there is
 * one, MEMCACHED_TIMEOUT when a timeout occurred and the state is below
 * MEMCACHED_SERVER_STATE_IN_PROGRESS, and MEMCACHED_CONNECTION_FAILURE
 * otherwise.
 *
 * NOTE(review): braces, several case labels and the loop around
 * set_hostinfo() are not visible in this listing; presumably the
 * nanosleep() is a delay between lookup retries; confirm against the
 * complete source.
 */
404 static memcached_return_t
network_connect(memcached_server_st
*server
)
406 bool timeout_error_occured
= false;
408 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
409 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
412 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
414 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
416 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
417 server
->address_info_next
= NULL
;
418 memcached_return_t rc
;
422 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
428 struct timespec dream
, rem
;
433 nanosleep(&dream
, &rem
);
437 if (memcached_failed(rc
))
443 if (server
->address_info_next
== NULL
)
445 server
->address_info_next
= server
->address_info
;
446 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
449 /* Create the socket */
450 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
452 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
453 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
455 server
->address_info_next
= server
->address_info_next
->ai_next
;
459 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
460 server
->address_info_next
->ai_socktype
,
461 server
->address_info_next
->ai_protocol
)) < 0)
463 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
466 set_socket_options(server
);
468 /* connect to server */
469 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
471 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
472 return MEMCACHED_SUCCESS
;
475 /* An error occurred */
476 switch (get_socket_errno())
479 timeout_error_occured
= true;
483 #if EWOULDBLOCK != EAGAIN
486 case EINPROGRESS
: // nonblocking mode - first return
487 case EALREADY
: // nonblocking mode - subsequent returns
489 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
490 memcached_return_t rc
= connect_poll(server
);
492 if (memcached_success(rc
))
494 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
495 return MEMCACHED_SUCCESS
;
498 // A timeout here is treated as an error, we will not retry
499 if (rc
== MEMCACHED_TIMEOUT
)
501 timeout_error_occured
= true;
506 case EISCONN
: // we are connected :-)
507 WATCHPOINT_ASSERT(0); // This is a programmer's error
510 case EINTR
: // Special case, we retry ai_addr
511 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
512 (void)closesocket(server
->fd
);
513 server
->fd
= INVALID_SOCKET
;
// Give up on this address: close the socket and move to the next entry.
520 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
521 (void)closesocket(server
->fd
);
522 server
->fd
= INVALID_SOCKET
;
523 server
->address_info_next
= server
->address_info_next
->ai_next
;
526 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// After a timeout make sure no half-open socket is left behind.
528 if (timeout_error_occured
)
530 if (server
->fd
!= INVALID_SOCKET
)
532 (void)closesocket(server
->fd
);
533 server
->fd
= INVALID_SOCKET
;
537 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer an error that has already been recorded on the server.
539 if (memcached_has_current_error(*server
))
541 return memcached_server_error_return(server
);
544 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
546 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
549 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
556 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
557 we get caught spending cycles just waiting.
/*
 * backoff_handling() - decide whether a connect attempt should be made at
 * all, based on the failure count and retry time stored on the server.
 *
 * Behaviour visible in this listing:
 *   - the current time is read with gettimeofday(); its success is
 *     remembered in _gettime_success;
 *   - when server_failure_counter has reached root->server_failure_limit
 *     and auto-eject is enabled: the server is recorded as the last
 *     disconnected host, next_retry is set to now + root->dead_timeout
 *     (only if the time is known and dead_timeout > 0), the failure
 *     counter is set to limit - 1, run_distribution() is invoked, and the
 *     function returns either the run_distribution() error or
 *     MEMCACHED_SERVER_MARKED_DEAD;
 *   - the state is set to MEMCACHED_SERVER_STATE_IN_TIMEOUT and a
 *     next_retry of 0 is bumped to 1;
 *   - while in MEMCACHED_SERVER_STATE_IN_TIMEOUT: if next_retry lies
 *     before the current time the state is reset to
 *     MEMCACHED_SERVER_STATE_NEW, otherwise
 *     MEMCACHED_SERVER_TEMPORARILY_DISABLED is returned;
 *   - otherwise MEMCACHED_SUCCESS is returned.
 *
 * NOTE(review): braces and else lines are not visible here, so it cannot
 * be told from this listing which branch the IN_TIMEOUT assignment belongs
 * to (presumably the non-auto-eject case); confirm.
 * NOTE(review): no assignment to the in_timeout out-parameter is visible
 * in this listing; confirm where it is set.
 */
559 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
561 struct timeval curr_time
;
562 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
565 If we hit server_failure_limit then something is completely wrong about the server.
567 1) If autoeject is enabled we do that.
568 2) If not? We go into timeout again, there is much else to do :(
570 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
573 We just auto_eject if we hit this point
575 if (_is_auto_eject_host(server
->root
))
577 set_last_disconnected_host(server
);
579 // Retry dead servers if requested
580 if (_gettime_success
and server
->root
->dead_timeout
> 0)
582 server
->next_retry
= curr_time
.tv_sec
+server
->root
->dead_timeout
;
584 // We only retry dead servers once before assuming failure again
585 server
->server_failure_counter
= server
->root
->server_failure_limit
-1;
588 memcached_return_t rc
;
589 if (memcached_failed(rc
= run_distribution((memcached_st
*)server
->root
)))
591 return memcached_set_error(*server
, rc
, MEMCACHED_AT
, memcached_literal_param("Backoff handling failed during run_distribution"));
594 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
597 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
599 // Sanity check/setting
600 if (server
->next_retry
== 0)
602 server
->next_retry
= 1;
606 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
609 If next_retry is less then our current time, then we reset and try everything again.
611 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
613 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
617 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
623 return MEMCACHED_SUCCESS
;
/*
 * _memcached_connect() - shared implementation behind memcached_connect()
 * and memcached_connect_try().
 *
 * server                 the server instance to connect.
 * set_last_disconnected  when true, a failed attempt records the server as
 *                        the last disconnected host and marks it for
 *                        timeout.
 *
 * Behaviour visible in this listing:
 *   - returns MEMCACHED_SUCCESS at once when server->fd is already valid;
 *   - runs backoff_handling(); on failure the server is recorded as the
 *     last disconnected host;
 *   - rejects SASL callbacks combined with UDP using
 *     MEMCACHED_INVALID_HOST_PROTOCOL;
 *   - a hostname starting with a slash switches the type to
 *     MEMCACHED_CONNECTION_UNIX_SOCKET;
 *   - UDP and TCP go through network_connect(), followed by
 *     memcached_sasl_authenticate_connection() when SASL support is built
 *     in and callbacks are set; a failed authentication closes the socket;
 *   - Unix sockets go through unix_socket_connect();
 *   - on success the server is marked clean; on failure with
 *     set_last_disconnected the server is marked for timeout, and rc is
 *     stored via memcached_set_error() on one of the visible paths;
 *   - a host:port string is formatted and returned together with
 *     MEMCACHED_SERVER_TEMPORARILY_DISABLED.
 *
 * NOTE(review): braces, else lines, the declaration of buffer, the
 * condition guarding the final MEMCACHED_SERVER_TEMPORARILY_DISABLED return
 * (presumably in_timeout) and the final return of rc are not visible in
 * this listing; confirm against the complete source.
 */
626 static memcached_return_t
_memcached_connect(memcached_server_write_instance_st server
, const bool set_last_disconnected
)
// Already connected: nothing to do.
628 if (server
->fd
!= INVALID_SOCKET
)
630 return MEMCACHED_SUCCESS
;
633 LIBMEMCACHED_MEMCACHED_CONNECT_START();
635 bool in_timeout
= false;
636 memcached_return_t rc
;
637 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
639 set_last_disconnected_host(server
);
643 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
645 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
// A hostname that begins with a slash is treated as a Unix socket path.
648 if (server
->hostname
[0] == '/')
650 server
->type
= MEMCACHED_CONNECTION_UNIX_SOCKET
;
653 /* We need to clean up the multi startup piece */
654 switch (server
->type
)
656 case MEMCACHED_CONNECTION_UDP
:
657 case MEMCACHED_CONNECTION_TCP
:
658 rc
= network_connect(server
);
660 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
662 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
664 rc
= memcached_sasl_authenticate_connection(server
);
// Authentication failed: do not keep the socket open.
665 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
667 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
668 (void)closesocket(server
->fd
);
669 server
->fd
= INVALID_SOCKET
;
675 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
676 rc
= unix_socket_connect(server
);
680 if (memcached_success(rc
))
682 memcached_mark_server_as_clean(server
);
685 else if (set_last_disconnected
)
687 set_last_disconnected_host(server
);
688 if (memcached_has_current_error(*server
))
690 memcached_mark_server_for_timeout(server
);
691 assert(memcached_failed(memcached_server_error_return(server
)));
695 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
696 memcached_mark_server_for_timeout(server
);
699 LIBMEMCACHED_MEMCACHED_CONNECT_END();
704 int snprintf_length
= snprintf(buffer
, sizeof(buffer
), "%s:%d", server
->hostname
, int(server
->port
));
705 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
, buffer
, snprintf_length
);
/*
 * memcached_connect_try() - connect to the given server by delegating to
 * _memcached_connect() with set_last_disconnected = false, and return its
 * result unchanged.
 */
712 memcached_return_t
memcached_connect_try(memcached_server_write_instance_st server
)
714 return _memcached_connect(server
, false);
/*
 * memcached_connect() - connect to the given server by delegating to
 * _memcached_connect() with set_last_disconnected = true, and return its
 * result unchanged.
 */
717 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
719 return _memcached_connect(server
, true);