/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
 *  Copyright (C) 2006-2010 Brian Aker All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are
 *  met:
 *
 *      * Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *
 *      * Redistributions in binary form must reproduce the above
 *  copyright notice, this list of conditions and the following disclaimer
 *  in the documentation and/or other materials provided with the
 *  distribution.
 *
 *      * The names of its contributors may not be used to endorse or
 *  promote products derived from this software without specific prior
 *  written permission.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

39 #include <libmemcached/common.h>
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
66 socklen_t len
= sizeof (err
);
67 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
69 // We check the value to see what happened wth the socket.
72 return MEMCACHED_SUCCESS
;
75 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
79 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
82 default: // A real error occurred and we need to completely bail
83 switch (get_socket_errno())
85 #ifdef TARGET_OS_LINUX
93 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
96 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
98 default: // This should not happen
99 if (fds
[0].revents
& POLLERR
)
102 socklen_t len
= sizeof (err
);
103 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
104 memcached_set_errno(*server
, (err
== 0) ? get_socket_errno() : err
, MEMCACHED_AT
);
108 memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
111 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
112 (void)closesocket(server
->fd
);
113 server
->fd
= INVALID_SOCKET
;
114 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
116 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
121 // This should only be possible from ERESTART or EINTR;
122 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
125 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
127 if (server
->address_info
)
129 freeaddrinfo(server
->address_info
);
130 server
->address_info
= NULL
;
131 server
->address_info_next
= NULL
;
134 char str_port
[NI_MAXSERV
];
135 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
136 if (length
>= NI_MAXSERV
or length
< 0)
138 return MEMCACHED_FAILURE
;
141 struct addrinfo hints
;
142 memset(&hints
, 0, sizeof(struct addrinfo
));
145 hints
.ai_family
= AF_INET
;
147 if (memcached_is_udp(server
->root
))
149 hints
.ai_protocol
= IPPROTO_UDP
;
150 hints
.ai_socktype
= SOCK_DGRAM
;
154 hints
.ai_socktype
= SOCK_STREAM
;
155 hints
.ai_protocol
= IPPROTO_TCP
;
158 server
->address_info
= NULL
;
160 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
166 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
169 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
172 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
175 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
179 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
182 server
->address_info_next
= server
->address_info
;
183 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
185 return MEMCACHED_SUCCESS
;
188 static inline void set_socket_nonblocking(memcached_server_st
*server
)
192 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
194 memcached_set_errno(*server
, get_socket_errno(), NULL
);
201 flags
= fcntl(server
->fd
, F_GETFL
, 0);
202 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
206 memcached_set_errno(*server
, errno
, NULL
);
208 else if ((flags
& O_NONBLOCK
) == 0)
214 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
215 } while (rval
== -1 && (errno
== EINTR
or errno
== EAGAIN
));
219 memcached_set_errno(*server
, errno
, NULL
);
225 static void set_socket_options(memcached_server_st
*server
)
227 assert_msg(server
->fd
!= -1, "invalid socket was passed to set_socket_options()");
229 if (memcached_is_udp(server
->root
))
235 if (server
->root
->snd_timeout
)
238 struct timeval waittime
;
241 waittime
.tv_usec
= server
->root
->snd_timeout
;
243 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
244 &waittime
, (socklen_t
)sizeof(struct timeval
));
245 WATCHPOINT_ASSERT(error
== 0);
250 if (server
->root
->rcv_timeout
)
253 struct timeval waittime
;
256 waittime
.tv_usec
= server
->root
->rcv_timeout
;
258 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
259 &waittime
, (socklen_t
)sizeof(struct timeval
));
260 WATCHPOINT_ASSERT(error
== 0);
265 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
268 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
270 // This is not considered a fatal error
273 WATCHPOINT_ERRNO(get_socket_errno());
274 perror("setsockopt(SO_NOSIGPIPE)");
279 if (server
->root
->flags
.no_block
)
282 struct linger linger
;
285 linger
.l_linger
= 0; /* By default on close() just drop the socket */
286 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
287 &linger
, (socklen_t
)sizeof(struct linger
));
288 WATCHPOINT_ASSERT(error
== 0);
291 if (server
->root
->flags
.tcp_nodelay
)
296 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
297 &flag
, (socklen_t
)sizeof(int));
298 WATCHPOINT_ASSERT(error
== 0);
301 if (server
->root
->flags
.tcp_keepalive
)
306 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
307 &flag
, (socklen_t
)sizeof(int));
308 WATCHPOINT_ASSERT(error
== 0);
312 if (server
->root
->tcp_keepidle
> 0)
316 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
317 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
318 WATCHPOINT_ASSERT(error
== 0);
322 if (server
->root
->send_size
> 0)
326 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
327 &server
->root
->send_size
, (socklen_t
)sizeof(int));
328 WATCHPOINT_ASSERT(error
== 0);
331 if (server
->root
->recv_size
> 0)
335 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
336 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
337 WATCHPOINT_ASSERT(error
== 0);
341 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
342 set_socket_nonblocking(server
);
345 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
348 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
350 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
352 memcached_set_errno(*server
, errno
, NULL
);
353 return MEMCACHED_CONNECTION_FAILURE
;
356 struct sockaddr_un servAddr
;
358 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
359 servAddr
.sun_family
= AF_UNIX
;
360 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
363 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
372 case EISCONN
: /* We were spinning waiting on connect */
374 WATCHPOINT_ASSERT(0); // Programmer error
379 WATCHPOINT_ERRNO(errno
);
380 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
381 return MEMCACHED_CONNECTION_FAILURE
;
385 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
387 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
389 return MEMCACHED_SUCCESS
;
392 return MEMCACHED_NOT_SUPPORTED
;
396 static memcached_return_t
network_connect(memcached_server_st
*server
)
398 bool timeout_error_occured
= false;
400 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
401 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
404 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
406 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
408 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
409 server
->address_info_next
= NULL
;
410 memcached_return_t rc
;
414 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
420 struct timespec dream
, rem
;
425 nanosleep(&dream
, &rem
);
429 if (memcached_failed(rc
))
435 if (server
->address_info_next
== NULL
)
437 server
->address_info_next
= server
->address_info
;
438 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
441 /* Create the socket */
442 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
444 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
445 if (memcached_is_udp(server
->root
) and server
->address_info_next
->ai_family
!= AF_INET
)
447 server
->address_info_next
= server
->address_info_next
->ai_next
;
451 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
452 server
->address_info_next
->ai_socktype
,
453 server
->address_info_next
->ai_protocol
)) < 0)
455 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
458 set_socket_options(server
);
460 /* connect to server */
461 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
463 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
464 return MEMCACHED_SUCCESS
;
467 /* An error occurred */
468 switch (get_socket_errno())
471 timeout_error_occured
= true;
475 case EINPROGRESS
: // nonblocking mode - first return
476 case EALREADY
: // nonblocking mode - subsequent returns
478 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
479 memcached_return_t rc
= connect_poll(server
);
481 if (memcached_success(rc
))
483 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
484 return MEMCACHED_SUCCESS
;
487 // A timeout here is treated as an error, we will not retry
488 if (rc
== MEMCACHED_TIMEOUT
)
490 timeout_error_occured
= true;
495 case EISCONN
: // we are connected :-)
496 WATCHPOINT_ASSERT(0); // This is a programmer's error
499 case EINTR
: // Special case, we retry ai_addr
500 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
501 (void)closesocket(server
->fd
);
502 server
->fd
= INVALID_SOCKET
;
509 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
510 (void)closesocket(server
->fd
);
511 server
->fd
= INVALID_SOCKET
;
512 server
->address_info_next
= server
->address_info_next
->ai_next
;
515 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
517 if (timeout_error_occured
)
519 if (server
->fd
!= INVALID_SOCKET
)
521 (void)closesocket(server
->fd
);
522 server
->fd
= INVALID_SOCKET
;
526 WATCHPOINT_STRING("Never got a good file descriptor");
528 if (memcached_has_current_error(*server
))
530 return memcached_server_error_return(server
);
533 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
535 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
538 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
545 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
546 we get caught spending cycles just waiting.
548 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
551 If we hit server_failure_limit then something is completely wrong about the server.
553 1) If autoeject is enabled we do that.
554 2) If not? We go into timeout again, there is much else to do :(
556 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
559 We just auto_eject if we hit this point
561 if (_is_auto_eject_host(server
->root
))
563 set_last_disconnected_host(server
);
564 run_distribution((memcached_st
*)server
->root
);
566 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
569 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
571 // Sanity check/setting
572 if (server
->next_retry
== 0)
574 server
->next_retry
= 1;
578 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
580 struct timeval curr_time
;
581 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
584 If next_retry is less then our current time, then we reset and try everything again.
586 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
588 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
592 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
598 return MEMCACHED_SUCCESS
;
601 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
603 if (server
->fd
!= INVALID_SOCKET
)
605 return MEMCACHED_SUCCESS
;
608 LIBMEMCACHED_MEMCACHED_CONNECT_START();
610 bool in_timeout
= false;
611 memcached_return_t rc
;
612 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
614 set_last_disconnected_host(server
);
618 if (LIBMEMCACHED_WITH_SASL_SUPPORT
and server
->root
->sasl
.callbacks
and memcached_is_udp(server
->root
))
620 return memcached_set_error(*server
, MEMCACHED_INVALID_HOST_PROTOCOL
, MEMCACHED_AT
, memcached_literal_param("SASL is not supported for UDP connections"));
623 /* We need to clean up the multi startup piece */
624 switch (server
->type
)
626 case MEMCACHED_CONNECTION_UDP
:
627 case MEMCACHED_CONNECTION_TCP
:
628 rc
= network_connect(server
);
630 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
632 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
634 rc
= memcached_sasl_authenticate_connection(server
);
635 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
637 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
638 (void)closesocket(server
->fd
);
639 server
->fd
= INVALID_SOCKET
;
645 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
646 rc
= unix_socket_connect(server
);
650 if (memcached_success(rc
))
652 memcached_mark_server_as_clean(server
);
656 set_last_disconnected_host(server
);
657 if (memcached_has_current_error(*server
))
659 memcached_mark_server_for_timeout(server
);
660 assert(memcached_failed(memcached_server_error_return(server
)));
664 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
665 memcached_mark_server_for_timeout(server
);
668 LIBMEMCACHED_MEMCACHED_CONNECT_END();
672 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);