1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
// connect_poll(): wait for a connect() that is already in progress on
// server->fd to complete. The descriptor is polled for POLLOUT; once it is
// reported ready, SO_ERROR is read to learn whether the connection actually
// succeeded. network_connect() calls this after connect() fails with
// EINPROGRESS or EALREADY.
//
// Returns MEMCACHED_SUCCESS when SO_ERROR reads back as 0, MEMCACHED_TIMEOUT
// when waiting is disabled or poll() times out, and otherwise an error code
// recorded on *server through memcached_set_error()/memcached_set_errno().
// NOTE(review): the declarations of fds, loop_max and err are not visible in
// this view.
45 static memcached_return_t
connect_poll(memcached_server_st
*server
)
48 fds
[0].fd
= server
->fd
;
49 fds
[0].events
= POLLOUT
;
// A poll_timeout of 0 is treated as "do not wait": the connect is reported
// as timed out immediately.
// NOTE(review): this guard reads poll_timeout, but poll() below is given
// connect_timeout; confirm which setting is meant to bound this wait.
53 if (server
->root
->poll_timeout
== 0)
55 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// The loop runs while --loop_max is non-zero, which bounds the number of
// poll() attempts.
58 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
60 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
// Presumably the "descriptor ready" outcome of poll() (the case label is not
// visible here): read the pending socket error to see how connect() ended.
// NOTE(review): getsockopt()'s return value is discarded; if it fails, err
// keeps whatever value it had before, so confirm err is initialized.
66 socklen_t len
= sizeof (err
);
67 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
69 // We check the value to see what happened with the socket.
// SO_ERROR == 0: the connection is established.
72 return MEMCACHED_SUCCESS
;
// Any other SO_ERROR value is recorded on the server as an errno-style error.
75 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// Presumably poll() returned 0: nothing became ready before the timeout.
79 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
82 default: // A real error occurred and we need to completely bail
83 switch (get_socket_errno())
// The retry cases (ERESTART on Linux and EINTR, per the loop comment above)
// are not fully visible in this view.
85 #ifdef TARGET_OS_LINUX
// Some poll() errno values are reported as MEMCACHED_MEMORY_ALLOCATION_FAILURE;
// their case labels are not visible here.
93 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
// NOTE(review): the message below describes RLIMIT_NOFILE or an invalid
// timeout, yet the returned code is MEMCACHED_MEMORY_ALLOCATION_FAILURE;
// confirm that is the intended code.
96 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
98 default: // This should not happen
// If poll() flagged POLLERR, prefer the socket's own SO_ERROR, falling back to
// the poll() errno when SO_ERROR is 0. Otherwise record the poll() errno.
99 if (fds
[0].revents
& POLLERR
)
102 socklen_t len
= sizeof (err
);
103 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
104 memcached_set_errno(*server
, (err
== 0) ? get_socket_errno() : err
, MEMCACHED_AT
);
108 memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// After an unexpected poll() failure the descriptor is closed and the server
// is reset to MEMCACHED_SERVER_STATE_NEW.
// NOTE(review): closesocket() may overwrite errno before the final
// get_socket_errno() below; the error recorded just above is the reliable one.
111 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
112 (void)closesocket(server
->fd
);
113 server
->fd
= INVALID_SOCKET
;
114 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
116 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// Presumably reached once the retry budget in loop_max is exhausted.
121 // This should only be possible from ERESTART or EINTR;
122 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// set_hostinfo(): resolve server->hostname and server->port with
// getaddrinfo() and store the resulting list in server->address_info.
//
// Any previously resolved list is freed first. On success, address_info_next
// is pointed at the head of the new list (the cursor network_connect() walks),
// and the server moves to MEMCACHED_SERVER_STATE_ADDRINFO.
//
// Returns MEMCACHED_SUCCESS, MEMCACHED_FAILURE if the port cannot be
// formatted, or an error recorded on *server that reflects the getaddrinfo()
// failure.
// NOTE(review): the declaration of errcode is not visible in this view.
125 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
// Release the results of an earlier lookup so they are not leaked.
127 if (server
->address_info
)
129 freeaddrinfo(server
->address_info
);
130 server
->address_info
= NULL
;
131 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as a string, so render the numeric port.
// A negative or truncated snprintf() result is treated as a failure.
134 char str_port
[NI_MAXSERV
];
135 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
136 if (length
>= NI_MAXSERV
or length
< 0)
138 return MEMCACHED_FAILURE
;
// Zero the hints so every field not set below stays unspecified.
141 struct addrinfo hints
;
142 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): restricting ai_family to AF_INET limits results to IPv4, yet
// network_connect() still filters out non-AF_INET entries for UDP. The
// original lines around this assignment are not visible here; confirm whether
// it is conditionally compiled.
145 hints
.ai_family
= AF_INET
;
// Request a datagram/UDP socket for UDP servers and a stream/TCP socket
// otherwise.
147 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
149 hints
.ai_protocol
= IPPROTO_UDP
;
150 hints
.ai_socktype
= SOCK_DGRAM
;
154 hints
.ai_socktype
= SOCK_STREAM
;
155 hints
.ai_protocol
= IPPROTO_TCP
;
// Reset before the lookup, presumably so a failed getaddrinfo() leaves
// address_info NULL.
158 server
->address_info
= NULL
;
// Map getaddrinfo() failures onto memcached error codes. The case labels are
// not visible in this view. The literal messages name EAI_SYSTEM, EAI_BADFLAGS
// and EAI_MEMORY; the MEMCACHED_TIMEOUT return is presumably the transient
// EAI_AGAIN case.
160 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
166 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
169 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
172 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
175 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Presumably the default case: any other getaddrinfo() error is reported as a
// host lookup failure, with gai_strerror() text as the message.
179 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start iterating the new result list from its first entry.
182 server
->address_info_next
= server
->address_info
;
183 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
185 return MEMCACHED_SUCCESS
;
// set_socket_nonblocking(): put server->fd into non-blocking mode.
//
// The function is void, so failures are not returned. They are only recorded
// on *server via memcached_set_errno(), with NULL as the location argument.
// NOTE(review): set_socket_options() calls this as its last step and does not
// check for a recorded error afterwards, so a socket left in blocking mode
// can go unnoticed.
188 static inline void set_socket_nonblocking(memcached_server_st
*server
)
// ioctlsocket()/FIONBIO path. This is presumably the Windows build; the
// surrounding preprocessor lines and the declaration of arg are not visible.
192 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
194 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// fcntl() path: read the current file-status flags, retrying while fcntl()
// fails with EINTR or EAGAIN.
201 flags
= fcntl(server
->fd
, F_GETFL
, 0);
202 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
// Presumably reached when F_GETFL still returns -1 after retrying: record
// errno and leave the flags untouched.
206 memcached_set_errno(*server
, errno
, NULL
);
// Issue F_SETFL only when O_NONBLOCK is not already set. The existing flags
// are kept and O_NONBLOCK is OR-ed in, again retrying on EINTR/EAGAIN.
208 else if ((flags
& O_NONBLOCK
) == 0)
214 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
215 } while (rval
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
217 unlikely (rval
== -1)
219 memcached_set_errno(*server
, errno
, NULL
);
// set_socket_options(): apply the socket options configured on server->root
// to server->fd, then switch the socket to non-blocking mode.
//
// Each setsockopt() result is checked only with WATCHPOINT_ASSERT (presumably
// a debug-build check) and is otherwise ignored, so applying options is
// best-effort.
// NOTE(review): the declarations of error, set and flag are not visible here.
225 static void set_socket_options(memcached_server_st
*server
)
227 assert_msg(server
->fd
!= -1, "invalid socket was passed to set_socket_options()");
// The body of this UDP branch is not visible in this view. It is presumably
// an early return, so the options below apply only to stream sockets; confirm.
229 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
// Send timeout from snd_timeout. The value is stored in tv_usec, i.e. it is
// treated as microseconds; any tv_sec assignment is not visible here.
// NOTE(review): if snd_timeout can reach 1000000 or more, tv_usec is out of
// range and setsockopt() may reject it. Consider splitting the value into
// tv_sec and tv_usec.
235 if (server
->root
->snd_timeout
)
238 struct timeval waittime
;
241 waittime
.tv_usec
= server
->root
->snd_timeout
;
243 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
244 &waittime
, (socklen_t
)sizeof(struct timeval
));
245 WATCHPOINT_ASSERT(error
== 0);
// Receive timeout from rcv_timeout. It is also stored in tv_usec and has the
// same out-of-range risk as the send timeout.
250 if (server
->root
->rcv_timeout
)
253 struct timeval waittime
;
256 waittime
.tv_usec
= server
->root
->rcv_timeout
;
258 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
259 &waittime
, (socklen_t
)sizeof(struct timeval
));
260 WATCHPOINT_ASSERT(error
== 0);
// On macOS/FreeBSD, request SO_NOSIGPIPE on the socket. A failure is only
// reported through WATCHPOINT_ERRNO and perror().
265 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
268 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
270 // This is not considered a fatal error
273 WATCHPOINT_ERRNO(get_socket_errno());
274 perror("setsockopt(SO_NOSIGPIPE)");
// When no_block is set, configure SO_LINGER with l_linger = 0. Any l_onoff
// assignment is on a line not visible here.
279 if (server
->root
->flags
.no_block
)
282 struct linger linger
;
285 linger
.l_linger
= 0; /* By default on close() just drop the socket */
286 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
287 &linger
, (socklen_t
)sizeof(struct linger
));
288 WATCHPOINT_ASSERT(error
== 0);
// TCP_NODELAY when the tcp_nodelay flag is set.
291 if (server
->root
->flags
.tcp_nodelay
)
296 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
297 &flag
, (socklen_t
)sizeof(int));
298 WATCHPOINT_ASSERT(error
== 0);
// SO_KEEPALIVE when the tcp_keepalive flag is set.
301 if (server
->root
->flags
.tcp_keepalive
)
306 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
307 &flag
, (socklen_t
)sizeof(int));
308 WATCHPOINT_ASSERT(error
== 0);
// TCP_KEEPIDLE from tcp_keepidle, applied only when it is positive.
// NOTE(review): here and for SO_SNDBUF/SO_RCVBUF below, the field's address is
// passed with a length of sizeof(int). Confirm tcp_keepidle, send_size and
// recv_size are declared as int.
312 if (server
->root
->tcp_keepidle
> 0)
316 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
317 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
318 WATCHPOINT_ASSERT(error
== 0);
// Send buffer size, applied only when send_size is positive.
322 if (server
->root
->send_size
> 0)
326 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
327 &server
->root
->send_size
, (socklen_t
)sizeof(int));
328 WATCHPOINT_ASSERT(error
== 0);
// Receive buffer size, applied only when recv_size is positive.
331 if (server
->root
->recv_size
> 0)
335 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
336 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
337 WATCHPOINT_ASSERT(error
== 0);
341 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
342 set_socket_nonblocking(server
);
// unix_socket_connect(): open an AF_UNIX stream socket and connect it to the
// filesystem path held in server->hostname.
//
// Returns:
//   - MEMCACHED_SUCCESS, with server->state set to CONNECTED;
//   - MEMCACHED_CONNECTION_FAILURE, with errno recorded on *server;
//   - MEMCACHED_NOT_SUPPORTED, from a branch whose guarding preprocessor
//     lines are not visible here (presumably platforms without Unix-domain
//     sockets).
345 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
348 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// socket() failure: record errno and give up.
350 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
352 memcached_set_errno(*server
, errno
, NULL
);
353 return MEMCACHED_CONNECTION_FAILURE
;
// Build the destination address. The memset supplies sun_path's terminating
// NUL for paths shorter than the buffer.
// NOTE(review): when hostname is at least sizeof(sun_path) bytes long,
// strncpy() fills the whole buffer with no NUL terminator and the path is
// silently truncated. Consider rejecting over-long paths instead.
356 struct sockaddr_un servAddr
;
358 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
359 servAddr
.sun_family
= AF_UNIX
;
360 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
// Only part of the connect() error handling is visible in this view.
363 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
372 case EISCONN
: /* We were spinning waiting on connect */
374 WATCHPOINT_ASSERT(0); // Programmer error
// Remaining connect() errors: record errno and fail.
// NOTE(review): this path returns without closing server->fd or resetting it
// to INVALID_SOCKET. memcached_connect() returns MEMCACHED_SUCCESS early
// whenever fd != INVALID_SOCKET, so confirm that a caller cleans up the
// descriptor.
379 WATCHPOINT_ERRNO(errno
);
380 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
381 return MEMCACHED_CONNECTION_FAILURE
;
385 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
387 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
389 return MEMCACHED_SUCCESS
;
392 return MEMCACHED_NOT_SUPPORTED
;
// network_connect(): connect server->fd to the first usable address in the
// getaddrinfo() result list, resolving the host first when needed. Serves
// both TCP and UDP servers.
//
// Each candidate address gets a fresh socket configured by
// set_socket_options(). A connect() reported as in progress is completed
// with connect_poll().
//
// Returns MEMCACHED_SUCCESS once connected. Otherwise returns the error
// already recorded on *server, MEMCACHED_TIMEOUT, or
// MEMCACHED_CONNECTION_FAILURE.
396 static memcached_return_t
network_connect(memcached_server_st
*server
)
398 bool timeout_error_occured
= false;
400 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
401 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
// Resolve when there is no address list, or when every entry has been tried.
// set_hostinfo() is retried with a nanosleep() pause while it returns
// MEMCACHED_TIMEOUT. The loop header and the sleep duration are not visible.
403 if (server
->address_info
== NULL
or server
->address_info_next
== NULL
)
405 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
406 memcached_return_t rc
;
410 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
416 struct timespec dream
, rem
;
421 nanosleep(&dream
, &rem
);
// Any other resolution failure ends the attempt (the return is not visible).
425 if (memcached_failed(rc
))
431 /* Create the socket */
// Try candidates until one yields a connected descriptor or the list runs out.
432 while (server
->address_info_next
and server
->fd
== INVALID_SOCKET
)
434 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
435 if (server
->type
== MEMCACHED_CONNECTION_UDP
&& server
->address_info_next
->ai_family
!= AF_INET
)
437 server
->address_info_next
= server
->address_info_next
->ai_next
;
// NOTE(review): a socket() failure returns immediately instead of moving on
// to the next address, and passes NULL rather than MEMCACHED_AT as the
// location.
441 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
442 server
->address_info_next
->ai_socktype
,
443 server
->address_info_next
->ai_protocol
)) < 0)
445 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
448 set_socket_options(server
);
450 /* connect to server */
451 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
453 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
454 return MEMCACHED_SUCCESS
;
457 /* An error occurred */
458 switch (get_socket_errno())
// The case label for this assignment is not visible (presumably a timeout
// errno).
461 timeout_error_occured
= true;
// The connect() is still in progress on the non-blocking socket: wait for it
// with connect_poll().
465 case EINPROGRESS
: // nonblocking mode - first return
466 case EALREADY
: // nonblocking mode - subsequent returns
468 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
469 memcached_return_t rc
= connect_poll(server
);
471 if (memcached_success(rc
))
473 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
474 return MEMCACHED_SUCCESS
;
477 // A timeout here is treated as an error, we will not retry
478 if (rc
== MEMCACHED_TIMEOUT
)
480 timeout_error_occured
= true;
// NOTE(review): on some of its error paths, connect_poll() itself closes
// server->fd and sets it to INVALID_SOCKET. If control then reaches the
// generic cleanup below (which asserts fd != INVALID_SOCKET and closes fd),
// the socket would be closed twice; confirm.
485 case EISCONN
: // we are connected :-)
486 WATCHPOINT_ASSERT(0); // This is a programmer's error
489 case EINTR
: // Special case, we retry ai_addr
// Close the interrupted socket without advancing address_info_next, so the
// same address is presumably tried again on the next loop iteration.
490 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
491 (void)closesocket(server
->fd
);
492 server
->fd
= INVALID_SOCKET
;
// Generic failure for this address: close the socket and move on to the next
// entry in the list.
499 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
500 (void)closesocket(server
->fd
);
501 server
->fd
= INVALID_SOCKET
;
502 server
->address_info_next
= server
->address_info_next
->ai_next
;
// Reaching this point means no candidate address produced a connection.
505 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// Defensive close. The assertion above expects fd to be INVALID_SOCKET already.
507 if (timeout_error_occured
)
509 if (server
->fd
!= INVALID_SOCKET
)
511 (void)closesocket(server
->fd
);
512 server
->fd
= INVALID_SOCKET
;
516 WATCHPOINT_STRING("Never got a good file descriptor");
// Prefer any more specific error that is already recorded on the server.
518 if (memcached_has_current_error(*server
))
520 return memcached_server_error_return(server
);
// NOTE(review): this comparison relies on the numeric ordering of the
// server-state enum values.
523 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
525 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
528 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
535 Based on the time and failure count, fail the connect without trying. This prevents us from
536 getting caught in a state where we spend cycles just waiting.
// backoff_handling(): decide, from the server's failure count and retry time,
// whether a connect attempt should be made at all.
//
// Returns:
//   - MEMCACHED_SERVER_MARKED_DEAD when the failure limit is reached and
//     auto-eject is enabled;
//   - MEMCACHED_SERVER_TEMPORARILY_DISABLED while the server is still inside
//     its retry timeout;
//   - MEMCACHED_SUCCESS when the caller may proceed.
// NOTE(review): in_timeout is passed by reference, but no assignment to it is
// visible in this view; confirm where it is set.
538 static memcached_return_t
backoff_handling(memcached_server_write_instance_st server
, bool& in_timeout
)
541 If we hit server_failure_limit then something is completely wrong about the server.
543 1) If autoeject is enabled we do that.
544 2) If not? We go into timeout again, there is much else to do :(
546 if (server
->server_failure_counter
>= server
->root
->server_failure_limit
)
549 We just auto_eject if we hit this point
// Auto-eject path: record this host as the last disconnected one and rerun
// run_distribution() before reporting the server dead.
// NOTE(review): the explicit cast suggests server->root is a const pointer;
// confirm that run_distribution() is allowed to modify it.
551 if (_is_auto_eject_host(server
->root
))
553 set_last_disconnected_host(server
);
554 run_distribution((memcached_st
*)server
->root
);
556 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
// Without auto-eject, the server is put into timeout instead.
559 server
->state
= MEMCACHED_SERVER_STATE_IN_TIMEOUT
;
561 // Sanity check/setting
// Make sure next_retry is non-zero (0 is replaced by 1).
562 if (server
->next_retry
== 0)
564 server
->next_retry
= 1;
// While in timeout, compare next_retry with the current wall-clock seconds.
568 if (server
->state
== MEMCACHED_SERVER_STATE_IN_TIMEOUT
)
570 struct timeval curr_time
;
571 bool _gettime_success
= (gettimeofday(&curr_time
, NULL
) == 0);
574 If next_retry is less then our current time, then we reset and try everything again.
576 if (_gettime_success
and server
->next_retry
< curr_time
.tv_sec
)
// The retry time has passed: reset to NEW so the next attempt starts over.
578 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
// Otherwise (presumably including when gettimeofday() fails), the server
// remains disabled for now.
582 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);
588 return MEMCACHED_SUCCESS
;
591 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
593 if (server
->fd
!= INVALID_SOCKET
)
595 return MEMCACHED_SUCCESS
;
598 LIBMEMCACHED_MEMCACHED_CONNECT_START();
600 bool in_timeout
= false;
601 memcached_return_t rc
;
602 if (memcached_failed(rc
= backoff_handling(server
, in_timeout
)))
604 set_last_disconnected_host(server
);
608 /* We need to clean up the multi startup piece */
609 switch (server
->type
)
611 case MEMCACHED_CONNECTION_UDP
:
612 case MEMCACHED_CONNECTION_TCP
:
613 rc
= network_connect(server
);
615 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
617 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
619 rc
= memcached_sasl_authenticate_connection(server
);
620 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
622 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
623 (void)closesocket(server
->fd
);
624 server
->fd
= INVALID_SOCKET
;
630 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
631 rc
= unix_socket_connect(server
);
635 if (memcached_success(rc
))
637 memcached_mark_server_as_clean(server
);
641 set_last_disconnected_host(server
);
642 if (memcached_has_current_error(*server
))
644 memcached_mark_server_for_timeout(server
);
645 assert(memcached_failed(memcached_server_error_return(server
)));
649 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
650 memcached_mark_server_for_timeout(server
);
653 LIBMEMCACHED_MEMCACHED_CONNECT_END();
657 return memcached_set_error(*server
, MEMCACHED_SERVER_TEMPORARILY_DISABLED
, MEMCACHED_AT
);