1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
/*
 * connect_poll() - wait for a pending connect() on server->fd to finish.
 *
 * Polls the descriptor for POLLOUT, passing server->root->connect_timeout as
 * the poll() timeout, and then reads SO_ERROR from the socket to find out how
 * the connect ended.
 *
 * Returns MEMCACHED_SUCCESS, or whatever memcached_set_error() /
 * memcached_set_errno() return after recording the failure on *server
 * (MEMCACHED_TIMEOUT and MEMCACHED_MEMORY_ALLOCATION_FAILURE among them).
 *
 * NOTE(review): this listing has lost lines -- braces, case labels and the
 * declarations of fds, loop_max and err are not visible -- so the branch that
 * owns each return below must be confirmed against the complete file.
 */
43 static memcached_return_t
connect_poll(memcached_server_st
*server
)
// Watch only this server's descriptor, and only for writability.
46 fds
[0].fd
= server
->fd
;
47 fds
[0].events
= POLLOUT
;
// A poll_timeout of 0 is reported as MEMCACHED_TIMEOUT without polling.
// NOTE(review): the guard reads poll_timeout, while poll() below is given
// connect_timeout -- confirm that the mismatch is intended.
51 if (server
->root
->poll_timeout
== 0)
53 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
// loop_max bounds the number of poll() attempts (its initial value is not
// visible in this listing).
56 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
58 int error
= poll(fds
, 1, server
->root
->connect_timeout
);
// Fetch the socket's pending error (SO_ERROR); the getsockopt() result itself
// is deliberately discarded.
64 socklen_t len
= sizeof (err
);
65 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
67 // We check the value to see what happened with the socket.
// Presumably taken when err == 0 -- the guarding condition is not visible.
70 return MEMCACHED_SUCCESS
;
// Report the SO_ERROR value as an errno-style failure.
73 return memcached_set_errno(*server
, err
, MEMCACHED_AT
);
// Presumably the poll() == 0 (timed out) case -- the label is not visible.
77 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
80 default: // A real error occurred and we need to completely bail
81 switch (get_socket_errno())
// The case labels that select the returns below are not visible here.
83 #ifdef TARGET_OS_LINUX
91 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
);
94 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
96 default: // This should not happen
// POLLERR reported: prefer the socket's own SO_ERROR and fall back to
// get_socket_errno() when SO_ERROR reads 0.
97 if (fds
[0].revents
& POLLERR
)
100 socklen_t len
= sizeof (err
);
101 (void)getsockopt(server
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
102 memcached_set_errno(*server
, (err
== 0) ? get_socket_errno() : err
, MEMCACHED_AT
);
// Presumably the else branch (no POLLERR): record get_socket_errno() directly.
106 memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
// Close the descriptor and put the server back into the NEW state.
109 assert_msg(server
->fd
!= INVALID_SOCKET
, "poll() was passed an invalid file descriptor");
110 (void)closesocket(server
->fd
);
111 server
->fd
= INVALID_SOCKET
;
112 server
->state
= MEMCACHED_SERVER_STATE_NEW
;
// NOTE(review): an errno was already recorded above, and closesocket() may
// have changed what get_socket_errno() reports by now -- confirm that this
// second memcached_set_errno() call is intended.
114 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
119 // This should only be possible from ERESTART or EINTR;
120 return memcached_set_errno(*server
, get_socket_errno(), MEMCACHED_AT
);
/*
 * set_hostinfo() - resolve server->hostname / server->port with getaddrinfo().
 *
 * Any address list already held in server->address_info is released first.
 * On success the new list is stored in server->address_info,
 * server->address_info_next is set to the same pointer, server->state becomes
 * MEMCACHED_SERVER_STATE_ADDRINFO and MEMCACHED_SUCCESS is returned.
 *
 * On a getaddrinfo() failure the code is translated into a memcached error
 * recorded on *server. MEMCACHED_FAILURE is returned, without recording
 * anything on *server, when the port cannot be formatted into str_port.
 *
 * NOTE(review): braces, case labels and the declaration of errcode are not
 * visible in this listing; the label attached to each return in the switch
 * must be confirmed against the complete file.
 */
123 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
// Release a previously resolved list and clear both pointers into it.
125 if (server
->address_info
)
127 freeaddrinfo(server
->address_info
);
128 server
->address_info
= NULL
;
129 server
->address_info_next
= NULL
;
// getaddrinfo() takes the service as text, so format the port as decimal.
132 char str_port
[NI_MAXSERV
];
133 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
// snprintf() signals truncation (length >= size) or an output error (< 0).
134 if (length
>= NI_MAXSERV
or length
< 0)
136 return MEMCACHED_FAILURE
;
139 struct addrinfo hints
;
140 memset(&hints
, 0, sizeof(struct addrinfo
));
// NOTE(review): the original line numbers skip on both sides of this
// assignment, so it may sit inside a preprocessor conditional that is not
// visible here -- confirm before assuming lookups are restricted to AF_INET.
143 hints
.ai_family
= AF_INET
;
// Request datagram/UDP or stream/TCP addresses to match the connection type.
145 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
147 hints
.ai_protocol
= IPPROTO_UDP
;
148 hints
.ai_socktype
= SOCK_DGRAM
;
152 hints
.ai_socktype
= SOCK_STREAM
;
153 hints
.ai_protocol
= IPPROTO_TCP
;
156 server
->address_info
= NULL
;
// errcode keeps the getaddrinfo() result so gai_strerror() can describe it.
158 switch(errcode
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
))
// Presumably the EAI_AGAIN case (label not visible): reported as a timeout
// carrying the gai_strerror() text.
164 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// The literal strings below name the getaddrinfo() code each return handles.
167 return memcached_set_errno(*server
, errno
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
170 return memcached_set_error(*server
, MEMCACHED_INVALID_ARGUMENTS
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
173 return memcached_set_error(*server
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
// Presumably the default case: any other code becomes a host lookup failure.
177 return memcached_set_error(*server
, MEMCACHED_HOST_LOOKUP_FAILURE
, MEMCACHED_AT
, memcached_string_make_from_cstr(gai_strerror(errcode
)));
// Success: start the candidate cursor at the head of the new list.
180 server
->address_info_next
= server
->address_info
;
181 server
->state
= MEMCACHED_SERVER_STATE_ADDRINFO
;
183 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking() - put server->fd into non-blocking mode.
 *
 * Two implementations are visible: one based on ioctlsocket(FIONBIO) and one
 * that uses fcntl() to add O_NONBLOCK to the descriptor flags. The
 * preprocessor lines that choose between them are not part of this listing
 * (presumably Windows versus POSIX -- confirm).
 *
 * Returns nothing; failures are recorded on *server with
 * memcached_set_errno().
 */
186 static inline void set_socket_nonblocking(memcached_server_st
*server
)
190 if (ioctlsocket(server
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
// NOTE(review): NULL is passed here where most other call sites in this file
// pass MEMCACHED_AT -- confirm that is intended.
192 memcached_set_errno(*server
, get_socket_errno(), NULL
);
// Read the current flags, retrying while fcntl() fails with EINTR or EAGAIN.
199 flags
= fcntl(server
->fd
, F_GETFL
, 0);
200 } while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
// Presumably reached when F_GETFL still failed (the condition is not visible).
204 memcached_set_errno(*server
, errno
, NULL
);
// Only issue F_SETFL when O_NONBLOCK is not already set.
206 else if ((flags
& O_NONBLOCK
) == 0)
// Same retry policy as for F_GETFL above.
212 rval
= fcntl(server
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
213 } while (rval
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
215 unlikely (rval
== -1)
217 memcached_set_errno(*server
, errno
, NULL
);
/*
 * set_socket_options() - apply the per-connection socket options configured
 * on server->root to the freshly created descriptor server->fd.
 *
 * Options visible in this listing: SO_SNDTIMEO, SO_RCVTIMEO, SO_NOSIGPIPE
 * (Apple / FreeBSD builds), SO_LINGER, TCP_NODELAY, SO_KEEPALIVE,
 * TCP_KEEPIDLE, SO_SNDBUF and SO_RCVBUF. The function finishes by calling
 * set_socket_nonblocking().
 *
 * Returns nothing. setsockopt() results are only checked through
 * WATCHPOINT_ASSERT(); no error is recorded on *server here.
 *
 * NOTE(review): braces, some declarations (flag, set, error) and possibly
 * some preprocessor guards are missing from this listing.
 */
223 static void set_socket_options(memcached_server_st
*server
)
225 assert_msg(server
->fd
!= -1, "invalid socket was passed to set_socket_options()");
// UDP is special-cased; the body of this branch is not visible (presumably an
// early return -- confirm).
227 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
// Optional send timeout. Only the tv_usec assignment is visible.
// NOTE(review): tv_usec is a microsecond field -- confirm snd_timeout uses
// that unit and stays below one second.
233 if (server
->root
->snd_timeout
)
236 struct timeval waittime
;
239 waittime
.tv_usec
= server
->root
->snd_timeout
;
241 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
242 &waittime
, (socklen_t
)sizeof(struct timeval
));
243 WATCHPOINT_ASSERT(error
== 0);
// Optional receive timeout, handled the same way as the send timeout.
248 if (server
->root
->rcv_timeout
)
251 struct timeval waittime
;
254 waittime
.tv_usec
= server
->root
->rcv_timeout
;
256 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
257 &waittime
, (socklen_t
)sizeof(struct timeval
));
258 WATCHPOINT_ASSERT(error
== 0);
// Apple and FreeBSD builds set SO_NOSIGPIPE on the socket.
263 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
266 int error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
268 // This is not considered a fatal error
271 WATCHPOINT_ERRNO(get_socket_errno());
272 perror("setsockopt(SO_NOSIGPIPE)");
// With flags.no_block set, configure SO_LINGER with l_linger = 0 (the
// l_onoff assignment is not visible in this listing).
277 if (server
->root
->flags
.no_block
)
280 struct linger linger
;
283 linger
.l_linger
= 0; /* By default on close() just drop the socket */
284 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_LINGER
,
285 &linger
, (socklen_t
)sizeof(struct linger
));
286 WATCHPOINT_ASSERT(error
== 0);
// Optional TCP_NODELAY (the value of flag is not visible here).
289 if (server
->root
->flags
.tcp_nodelay
)
294 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
295 &flag
, (socklen_t
)sizeof(int));
296 WATCHPOINT_ASSERT(error
== 0);
// Optional SO_KEEPALIVE.
299 if (server
->root
->flags
.tcp_keepalive
)
304 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
305 &flag
, (socklen_t
)sizeof(int));
306 WATCHPOINT_ASSERT(error
== 0);
// Optional TCP_KEEPIDLE, passed straight from root->tcp_keepidle.
// NOTE(review): the value is handed over as sizeof(int) bytes -- confirm the
// field really is int-sized.
310 if (server
->root
->tcp_keepidle
> 0)
314 error
= setsockopt(server
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
315 &server
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
316 WATCHPOINT_ASSERT(error
== 0);
// Optional kernel send buffer size.
320 if (server
->root
->send_size
> 0)
324 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_SNDBUF
,
325 &server
->root
->send_size
, (socklen_t
)sizeof(int));
326 WATCHPOINT_ASSERT(error
== 0);
// Optional kernel receive buffer size.
329 if (server
->root
->recv_size
> 0)
333 error
= setsockopt(server
->fd
, SOL_SOCKET
, SO_RCVBUF
,
334 &server
->root
->recv_size
, (socklen_t
)sizeof(int));
335 WATCHPOINT_ASSERT(error
== 0);
339 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
340 set_socket_nonblocking(server
);
/*
 * unix_socket_connect() - connect server->fd to the AF_UNIX stream socket
 * whose path is stored in server->hostname.
 *
 * Returns MEMCACHED_SUCCESS after setting server->state to
 * MEMCACHED_SERVER_STATE_CONNECTED, or MEMCACHED_CONNECTION_FAILURE when
 * socket() or connect() fails (errno is recorded on *server first).
 * The trailing MEMCACHED_NOT_SUPPORTED return presumably belongs to a
 * preprocessor branch for platforms without Unix sockets; that directive is
 * not visible in this listing -- confirm.
 *
 * NOTE(review): braces, several case labels and any loop around connect()
 * are missing from this listing.
 */
343 static memcached_return_t
unix_socket_connect(memcached_server_st
*server
)
346 WATCHPOINT_ASSERT(server
->fd
== -1);
348 if ((server
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
// NOTE(review): NULL is passed here where the connect() failure below passes
// MEMCACHED_AT.
350 memcached_set_errno(*server
, errno
, NULL
);
351 return MEMCACHED_CONNECTION_FAILURE
;
354 struct sockaddr_un servAddr
;
356 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
357 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() leaves sun_path without a terminating NUL when
// hostname is sizeof(sun_path) bytes or longer; the memset() above only
// guarantees termination for shorter names.
358 strncpy(servAddr
.sun_path
, server
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
361 if (connect(server
->fd
, (struct sockaddr
*)&servAddr
, sizeof(servAddr
)) < 0)
// The other errno cases of this switch are not visible in this listing.
370 case EISCONN
: /* We were spinning waiting on connect */
372 WATCHPOINT_ASSERT(0); // Programmer error
// Presumably the default case: record errno and report a connection failure.
// NOTE(review): the descriptor is not closed in the lines visible here.
377 WATCHPOINT_ERRNO(errno
);
378 memcached_set_errno(*server
, errno
, MEMCACHED_AT
);
379 return MEMCACHED_CONNECTION_FAILURE
;
383 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
385 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
387 return MEMCACHED_SUCCESS
;
390 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect() - open a TCP or UDP socket to the server.
 *
 * Resolves the host with set_hostinfo() when no address list is cached, then
 * walks server->address_info_next: for each candidate it creates a socket,
 * applies set_socket_options() and calls connect(). A connect that is still
 * in progress is completed through connect_poll().
 *
 * Returns MEMCACHED_SUCCESS once connected (server->state is set to
 * MEMCACHED_SERVER_STATE_CONNECTED). If no candidate connects, the next retry
 * time is scheduled when root->retry_timeout is set, and the function returns
 * the error already recorded on *server, or MEMCACHED_TIMEOUT, or
 * MEMCACHED_CONNECTION_FAILURE.
 *
 * NOTE(review): braces, case labels, break/continue statements and loop
 * headers are missing from this listing; the control flow described in the
 * comments below must be confirmed against the complete file.
 */
394 static memcached_return_t
network_connect(memcached_server_st
*server
)
396 bool timeout_error_occured
= false;
398 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
399 WATCHPOINT_ASSERT(server
->cursor_active
== 0);
// No cached address list yet: resolve the host first.
401 if (not server
->address_info
)
403 WATCHPOINT_ASSERT(server
->state
== MEMCACHED_SERVER_STATE_NEW
);
404 memcached_return_t rc
;
// Any result other than MEMCACHED_TIMEOUT presumably ends the lookup attempts
// (the enclosing loop and the statement in this branch are not visible).
408 if ((rc
= set_hostinfo(server
)) != MEMCACHED_TIMEOUT
)
// Sleep before the next lookup attempt; the values stored in dream are not
// visible in this listing.
414 struct timespec dream
, rem
;
419 nanosleep(&dream
, &rem
);
// Lookup failed: the statement taken here is not visible (presumably
// "return rc" -- confirm).
423 if (memcached_failed(rc
))
427 /* Create the socket */
428 while (server
->address_info_next
&& server
->fd
== INVALID_SOCKET
)
430 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
431 if (server
->type
== MEMCACHED_CONNECTION_UDP
&& server
->address_info_next
->ai_family
!= AF_INET
)
433 server
->address_info_next
= server
->address_info_next
->ai_next
;
// socket() failure aborts the whole attempt instead of trying the next address.
// NOTE(review): NULL is passed where other call sites pass MEMCACHED_AT.
437 if ((server
->fd
= socket(server
->address_info_next
->ai_family
,
438 server
->address_info_next
->ai_socktype
,
439 server
->address_info_next
->ai_protocol
)) < 0)
441 return memcached_set_errno(*server
, get_socket_errno(), NULL
);
444 set_socket_options(server
);
446 /* connect to server */
// connect() completed immediately.
447 if ((connect(server
->fd
, server
->address_info_next
->ai_addr
, server
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
449 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
450 return MEMCACHED_SUCCESS
;
453 /* An error occurred */
454 switch (get_socket_errno())
// The case label(s) for this assignment are not visible (presumably
// ETIMEDOUT -- confirm).
457 timeout_error_occured
= true;
461 case EINPROGRESS
: // nonblocking mode - first return
462 case EALREADY
: // nonblocking mode - subsequent returns
// Wait for the in-progress connect to complete.
464 server
->state
= MEMCACHED_SERVER_STATE_IN_PROGRESS
;
465 memcached_return_t rc
= connect_poll(server
);
467 if (memcached_success(rc
))
469 server
->state
= MEMCACHED_SERVER_STATE_CONNECTED
;
470 return MEMCACHED_SUCCESS
;
473 // A timeout here is treated as an error, we will not retry
474 if (rc
== MEMCACHED_TIMEOUT
)
476 timeout_error_occured
= true;
481 case EISCONN
: // we are connected :-)
482 WATCHPOINT_ASSERT(0); // This is a programmer's error
// EINTR: close the socket without advancing address_info_next; the statement
// that restarts the loop is not visible.
485 case EINTR
: // Special case, we retry ai_addr
486 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
487 (void)closesocket(server
->fd
);
488 server
->fd
= INVALID_SOCKET
;
// This candidate failed: close its socket and move on to the next address.
495 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
496 (void)closesocket(server
->fd
);
497 server
->fd
= INVALID_SOCKET
;
498 server
->address_info_next
= server
->address_info_next
->ai_next
;
501 WATCHPOINT_ASSERT(server
->fd
== INVALID_SOCKET
);
// After a timeout, make sure no descriptor is left open.
503 if (timeout_error_occured
)
505 if (server
->fd
!= INVALID_SOCKET
)
507 (void)closesocket(server
->fd
);
508 server
->fd
= INVALID_SOCKET
;
512 WATCHPOINT_STRING("Never got a good file descriptor");
513 /* Failed to connect. schedule next retry */
514 if (server
->root
->retry_timeout
)
516 struct timeval next_time
;
// next_retry is only updated when gettimeofday() succeeds.
518 if (gettimeofday(&next_time
, NULL
) == 0)
520 server
->next_retry
= next_time
.tv_sec
+ server
->root
->retry_timeout
;
// Prefer an error that was already recorded on the server.
524 if (memcached_has_current_error(*server
))
526 return memcached_server_error_return(server
);
// A timeout seen before the connection reached the IN_PROGRESS state.
529 if (timeout_error_occured
and server
->state
< MEMCACHED_SERVER_STATE_IN_PROGRESS
)
531 return memcached_set_error(*server
, MEMCACHED_TIMEOUT
, MEMCACHED_AT
);
534 return memcached_set_error(*server
, MEMCACHED_CONNECTION_FAILURE
, MEMCACHED_AT
); /* The last error should be from connect() */
/*
 * set_last_disconnected_host() - remember `self` as the most recently
 * disconnected server of its owning memcached_st.
 *
 * Frees the instance currently stored in root->last_disconnected_server and
 * replaces it with a new clone of `self` made by memcached_server_clone().
 *
 * NOTE(review): the clone result is stored without a check in the lines
 * visible here -- confirm how callers treat a failed clone.
 */
537 void set_last_disconnected_host(memcached_server_write_instance_st self
)
// The explicit cast presumably strips const from self->root -- confirm.
540 memcached_st
*root
= (memcached_st
*)self
->root
;
// Release the previous record before storing the new clone.
542 memcached_server_free(root
->last_disconnected_server
);
543 root
->last_disconnected_server
= memcached_server_clone(NULL
, self
);
/*
 * memcached_connect() - make sure `server` has an open connection.
 *
 * Returns MEMCACHED_SUCCESS at once when server->fd is already valid.
 * Returns MEMCACHED_SERVER_MARKED_DEAD (recorded on *server) while the server
 * is still inside its retry window, or when server_failure_counter has
 * reached root->server_failure_limit. Otherwise it dispatches on
 * server->type to network_connect() or unix_socket_connect(), running SASL
 * authentication after a network connect when it is enabled and configured.
 *
 * On success the failure counter and next_retry are reset to 0; on failure
 * the failure counter is incremented and the server is remembered through
 * set_last_disconnected_host().
 *
 * NOTE(review): braces, break statements, else keywords and the closing
 * lines of the function (including its final return) are not visible in this
 * listing.
 */
546 memcached_return_t
memcached_connect(memcached_server_write_instance_st server
)
548 memcached_return_t rc
= MEMCACHED_NO_SERVERS
;
// Already connected: nothing to do.
550 if (server
->fd
!= INVALID_SOCKET
)
552 return MEMCACHED_SUCCESS
;
555 LIBMEMCACHED_MEMCACHED_CONNECT_START();
557 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
558 WATCHPOINT_ASSERT(server
->root
);
559 if (server
->root
->retry_timeout
and server
->next_retry
)
561 struct timeval curr_time
;
// NOTE(review): the gettimeofday() result is not checked here.
563 gettimeofday(&curr_time
, NULL
);
565 // We should optimize this to remove the allocation if the server was
566 // the last server to die
// Still inside the retry window: refuse to reconnect yet.
567 if (server
->next_retry
> curr_time
.tv_sec
)
569 set_last_disconnected_host(server
);
571 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
575 // If we are over the counter failure, we just fail. Reject host only
576 // works if you have a set number of failures.
577 if (server
->root
->server_failure_limit
and server
->server_failure_counter
>= server
->root
->server_failure_limit
)
579 set_last_disconnected_host(server
);
581 // @todo fix this by fixing behavior to no longer make use of
// With auto-eject enabled, the distribution is re-run on the owning root.
583 if (_is_auto_eject_host(server
->root
))
585 run_distribution((memcached_st
*)server
->root
);
588 return memcached_set_error(*server
, MEMCACHED_SERVER_MARKED_DEAD
, MEMCACHED_AT
);
591 /* We need to clean up the multi startup piece */
592 switch (server
->type
)
594 case MEMCACHED_CONNECTION_UDP
:
595 case MEMCACHED_CONNECTION_TCP
:
596 rc
= network_connect(server
);
// SASL authentication runs only when support is compiled in, the socket is
// open and callbacks are configured.
597 if (LIBMEMCACHED_WITH_SASL_SUPPORT
)
599 if (server
->fd
!= INVALID_SOCKET
and server
->root
->sasl
.callbacks
)
601 rc
= memcached_sasl_authenticate_connection(server
);
// Authentication failed: drop the freshly opened socket.
602 if (memcached_failed(rc
) and server
->fd
!= INVALID_SOCKET
)
604 WATCHPOINT_ASSERT(server
->fd
!= INVALID_SOCKET
);
605 (void)closesocket(server
->fd
);
606 server
->fd
= INVALID_SOCKET
;
612 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
613 rc
= unix_socket_connect(server
);
// Success clears the failure bookkeeping.
617 if (memcached_success(rc
))
619 server
->server_failure_counter
= 0;
620 server
->next_retry
= 0;
// Failure with an error already recorded on the server.
622 else if (memcached_has_current_error(*server
))
624 server
->server_failure_counter
++;
625 set_last_disconnected_host(server
);
// Presumably the final else branch: record rc as the server's error first.
629 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
630 server
->server_failure_counter
++;
631 set_last_disconnected_host(server
);
634 LIBMEMCACHED_MEMCACHED_CONNECT_END();