2 * Copyright (C) 2006-2010 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Server IO, Not public!
12 #include <libmemcached/common.h>
/*
 * connect_poll: wait, via poll() for POLLOUT, on a connection attempt and
 * report how it ended.
 *
 * poll() is called with ptr->root->connect_timeout as its timeout inside a
 * loop bounded by loop_max.  The socket's pending error is read from
 * ptr->fd with getsockopt(SO_ERROR).
 *
 * Return values visible below: MEMCACHED_SUCCESS, MEMCACHED_TIMEOUT and
 * MEMCACHED_ERRNO (with ptr->cached_errno filled in).  On the error path
 * shown near the end, the socket is closed and ptr->fd is reset to
 * INVALID_SOCKET before returning.
 *
 * NOTE(review): braces, case labels and the local declarations (fds, err,
 * error, loop_max) are not visible in this listing; confirm which branch
 * each return belongs to against the full source.
 */
17 static memcached_return_t
connect_poll(memcached_server_st
*ptr
)
21 fds
[0].events
= POLLOUT
;
26 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
28 error
= poll(fds
, 1, ptr
->root
->connect_timeout
);
35 socklen_t len
= sizeof (err
);
36 (void)getsockopt(ptr
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
38 // We check the value to see what happened with the socket.
41 return MEMCACHED_SUCCESS
;
45 ptr
->cached_errno
= errno
;
47 return MEMCACHED_ERRNO
;
51 return MEMCACHED_TIMEOUT
;
52 default: // A real error occurred and we need to completely bail
53 WATCHPOINT_ERRNO(get_socket_errno());
54 switch (get_socket_errno())
56 #ifdef TARGET_OS_LINUX
// When poll() flagged POLLERR, prefer the socket's pending error (SO_ERROR);
// if that reads back as 0, fall back to get_socket_errno().
62 if (fds
[0].revents
& POLLERR
)
65 socklen_t len
= sizeof (err
);
66 (void)getsockopt(ptr
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
67 ptr
->cached_errno
= (err
== 0) ? get_socket_errno() : err
;
71 ptr
->cached_errno
= get_socket_errno();
// The descriptor is closed and invalidated before the error is reported.
74 (void)closesocket(ptr
->fd
);
75 ptr
->fd
= INVALID_SOCKET
;
77 return MEMCACHED_ERRNO
;
82 // This should only be possible from ERESTART or EINTR;
83 ptr
->cached_errno
= get_socket_errno();
85 return MEMCACHED_ERRNO
;
/*
 * set_hostinfo: resolve server->hostname / server->port with getaddrinfo()
 * and store the result list in server->address_info.
 *
 * The port is formatted as an unsigned decimal string into str_port;
 * MEMCACHED_FAILURE is returned if snprintf() fails or the result does not
 * fit in NI_MAXSERV bytes.  The hints select SOCK_DGRAM/IPPROTO_UDP for
 * MEMCACHED_CONNECTION_UDP servers and SOCK_STREAM/IPPROTO_TCP otherwise.
 *
 * An EAI_AGAIN result leads to a nanosleep(); other failures log the
 * hostname and gai_strerror() text and return
 * MEMCACHED_HOST_LOOKUP_FAILURE.  On success server->address_info_next is
 * pointed at the head of the list and MEMCACHED_SUCCESS is returned.
 *
 * NOTE(review): the retry loop head, the success test on `e`, and any
 * preprocessor guard around the AF_INET assignment are not visible in this
 * listing; confirm against the full source.
 */
88 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
90 struct addrinfo hints
;
91 char str_port
[NI_MAXSERV
];
93 assert(! server
->address_info
); // We cover the case where a programming mistake has been made.
// Any previously stored list is released and both cursors are cleared.
94 if (server
->address_info
)
96 freeaddrinfo(server
->address_info
);
97 server
->address_info
= NULL
;
98 server
->address_info_next
= NULL
;
101 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
// snprintf() returning >= the buffer size means truncation; < 0 means error.
102 if (length
>= NI_MAXSERV
|| length
< 0)
103 return MEMCACHED_FAILURE
;
105 memset(&hints
, 0, sizeof(hints
));
108 hints
.ai_family
= AF_INET
;
110 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
112 hints
.ai_protocol
= IPPROTO_UDP
;
113 hints
.ai_socktype
= SOCK_DGRAM
;
117 hints
.ai_socktype
= SOCK_STREAM
;
118 hints
.ai_protocol
= IPPROTO_TCP
;
124 int e
= getaddrinfo(server
->hostname
, str_port
, &hints
, &server
->address_info
);
130 else if (e
== EAI_AGAIN
)
133 struct timespec dream
, rem
;
138 nanosleep(&dream
, &rem
);
144 WATCHPOINT_STRING(server
->hostname
);
145 WATCHPOINT_STRING(gai_strerror(e
));
146 return MEMCACHED_HOST_LOOKUP_FAILURE
;
// Start iteration at the first resolved address.
150 server
->address_info_next
= server
->address_info
;
152 return MEMCACHED_SUCCESS
;
/*
 * set_socket_nonblocking: put ptr->fd into non-blocking mode.
 *
 * Two mechanisms appear below: ioctlsocket(FIONBIO) and
 * fcntl(F_GETFL / F_SETFL) with O_NONBLOCK.  F_SETFL is only issued when
 * O_NONBLOCK is not already present in the current flags.
 *
 * Returns MEMCACHED_SUCCESS, or MEMCACHED_CONNECTION_FAILURE with the
 * error code saved in ptr->cached_errno.
 *
 * NOTE(review): the preprocessor guard choosing between the two mechanisms
 * and the loop heads for the `while (...)` retry conditions are not visible
 * in this listing; they look like do/while retries on EINTR/EAGAIN --
 * confirm against the full source.
 */
155 static inline memcached_return_t
set_socket_nonblocking(memcached_server_st
*ptr
)
159 if (ioctlsocket(ptr
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
161 ptr
->cached_errno
= get_socket_errno();
162 return MEMCACHED_CONNECTION_FAILURE
;
// Read the current file status flags.
169 flags
= fcntl(ptr
->fd
, F_GETFL
, 0);
171 while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
173 unlikely (flags
== -1)
175 ptr
->cached_errno
= errno
;
176 return MEMCACHED_CONNECTION_FAILURE
;
// Only touch the flags when O_NONBLOCK is not set yet.
178 else if ((flags
& O_NONBLOCK
) == 0)
184 rval
= fcntl(ptr
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
186 while (rval
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
188 unlikely (rval
== -1)
190 ptr
->cached_errno
= errno
;
191 return MEMCACHED_CONNECTION_FAILURE
;
195 return MEMCACHED_SUCCESS
;
/*
 * set_socket_options: apply the socket options configured on ptr->root to
 * ptr->fd, then switch the socket to non-blocking mode.
 *
 * UDP connections return MEMCACHED_SUCCESS immediately, with no options
 * set.  Otherwise each option below is applied only when its setting on
 * ptr->root is non-zero / enabled:
 *   snd_timeout   -> SO_SNDTIMEO        rcv_timeout  -> SO_RCVTIMEO
 *   flags.no_block -> SO_LINGER         flags.tcp_nodelay -> TCP_NODELAY
 *   flags.tcp_keepalive -> SO_KEEPALIVE tcp_keepidle -> TCP_KEEPIDLE
 *   send_size     -> SO_SNDBUF          recv_size    -> SO_RCVBUF
 * SO_NOSIGPIPE is set under the platform guard shown below.
 *
 * The final result is whatever set_socket_nonblocking() returns;
 * MEMCACHED_FAILURE returns appear after each setsockopt() call.
 *
 * NOTE(review): the conditions guarding the MEMCACHED_FAILURE returns, the
 * declarations of `error`, `flag` and `set`, and several #ifdef guards are
 * not visible in this listing; confirm against the full source.
 */
198 static memcached_return_t
set_socket_options(memcached_server_st
*ptr
)
200 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
202 if (ptr
->type
== MEMCACHED_CONNECTION_UDP
)
203 return MEMCACHED_SUCCESS
;
206 if (ptr
->root
->snd_timeout
)
209 struct timeval waittime
;
// NOTE(review): the timeout is stored in tv_usec; the tv_sec assignment is
// not visible here -- confirm the unit of snd_timeout and that tv_sec is set.
212 waittime
.tv_usec
= ptr
->root
->snd_timeout
;
214 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
215 &waittime
, (socklen_t
)sizeof(struct timeval
));
216 WATCHPOINT_ASSERT(error
== 0);
218 return MEMCACHED_FAILURE
;
223 if (ptr
->root
->rcv_timeout
)
226 struct timeval waittime
;
// Same pattern as the send timeout above, for the receive direction.
229 waittime
.tv_usec
= ptr
->root
->rcv_timeout
;
231 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
232 &waittime
, (socklen_t
)sizeof(struct timeval
));
233 WATCHPOINT_ASSERT(error
== 0);
235 return MEMCACHED_FAILURE
;
240 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
243 int error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
245 // This is not considered a fatal error
248 WATCHPOINT_ERRNO(get_socket_errno());
249 perror("setsockopt(SO_NOSIGPIPE)");
254 if (ptr
->root
->flags
.no_block
)
257 struct linger linger
;
260 linger
.l_linger
= 0; /* By default on close() just drop the socket */
261 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_LINGER
,
262 &linger
, (socklen_t
)sizeof(struct linger
));
263 WATCHPOINT_ASSERT(error
== 0);
265 return MEMCACHED_FAILURE
;
268 if (ptr
->root
->flags
.tcp_nodelay
)
273 error
= setsockopt(ptr
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
274 &flag
, (socklen_t
)sizeof(int));
275 WATCHPOINT_ASSERT(error
== 0);
277 return MEMCACHED_FAILURE
;
280 if (ptr
->root
->flags
.tcp_keepalive
)
285 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
286 &flag
, (socklen_t
)sizeof(int));
287 WATCHPOINT_ASSERT(error
== 0);
289 return MEMCACHED_FAILURE
;
// NOTE(review): the three options below pass a pointer to a field of
// ptr->root with an option length of sizeof(int); confirm that
// tcp_keepidle, send_size and recv_size are int-sized.
293 if (ptr
->root
->tcp_keepidle
> 0)
297 error
= setsockopt(ptr
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
298 &ptr
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
299 WATCHPOINT_ASSERT(error
== 0);
301 return MEMCACHED_FAILURE
;
305 if (ptr
->root
->send_size
> 0)
309 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_SNDBUF
,
310 &ptr
->root
->send_size
, (socklen_t
)sizeof(int));
311 WATCHPOINT_ASSERT(error
== 0);
313 return MEMCACHED_FAILURE
;
316 if (ptr
->root
->recv_size
> 0)
320 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_RCVBUF
,
321 &ptr
->root
->recv_size
, (socklen_t
)sizeof(int));
322 WATCHPOINT_ASSERT(error
== 0);
324 return MEMCACHED_FAILURE
;
328 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
329 return set_socket_nonblocking(ptr
);
/*
 * unix_socket_connect: create an AF_UNIX stream socket in ptr->fd and
 * connect it to the path held in ptr->hostname.
 *
 * Returns:
 *   MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE  socket() failed
 *                                               (errno saved in cached_errno)
 *   MEMCACHED_ERRNO                             connect error path shown below
 *                                               (errno saved in cached_errno)
 *   MEMCACHED_SUCCESS                           connected
 *   MEMCACHED_NOT_SUPPORTED                     presumably the branch for
 *                                               builds without AF_UNIX
 *                                               support -- the guard is not
 *                                               visible here; confirm.
 *
 * NOTE(review): the connect() call head, the switch on errno and most case
 * labels are not visible in this listing.
 */
332 static memcached_return_t
unix_socket_connect(memcached_server_st
*ptr
)
335 struct sockaddr_un servAddr
;
337 WATCHPOINT_ASSERT(ptr
->fd
== -1);
339 if ((ptr
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
341 ptr
->cached_errno
= errno
;
342 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE
;
345 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
346 servAddr
.sun_family
= AF_UNIX
;
// NOTE(review): strncpy() writes no terminating NUL when the source is at
// least as long as the size given, so a hostname of sizeof(sun_path) bytes
// or more leaves sun_path unterminated (and silently truncated).
347 strncpy(servAddr
.sun_path
, ptr
->hostname
, sizeof(servAddr
.sun_path
)); /* Copy filename */
351 (struct sockaddr
*)&servAddr
,
352 sizeof(servAddr
)) < 0)
360 case EISCONN
: /* We were spinning waiting on connect */
363 WATCHPOINT_ERRNO(errno
);
364 ptr
->cached_errno
= errno
;
365 return MEMCACHED_ERRNO
;
369 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
371 return MEMCACHED_SUCCESS
;
374 return MEMCACHED_NOT_SUPPORTED
;
/*
 * network_connect: open a socket in ptr->fd and connect it to one of the
 * addresses resolved for the server.
 *
 * If ptr->address_info is not set, set_hostinfo() is called first.  The
 * loop then walks ptr->address_info_next while no socket has been
 * obtained: for UDP servers, non-AF_INET entries are skipped; a socket is
 * created for the entry, set_socket_options() is applied and connect() is
 * attempted.  EINPROGRESS / EALREADY hand over to connect_poll(), and a
 * MEMCACHED_TIMEOUT from it is remembered in timeout_error_occured.
 *
 * When the loop ends with ptr->fd == INVALID_SOCKET, ptr->next_retry is set
 * to now + ptr->root->retry_timeout (if a retry timeout is configured) and
 * MEMCACHED_TIMEOUT or MEMCACHED_ERRNO is returned.  Other returns visible
 * below: MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE when socket() fails,
 * and MEMCACHED_SUCCESS.
 *
 * NOTE(review): braces, break/continue statements, the handling of a failed
 * set_hostinfo() and the bodies of the success branches are not visible in
 * this listing; confirm the exact control flow against the full source.
 */
378 static memcached_return_t
network_connect(memcached_server_st
*ptr
)
380 bool timeout_error_occured
= false;
382 WATCHPOINT_ASSERT(ptr
->fd
== INVALID_SOCKET
);
383 WATCHPOINT_ASSERT(ptr
->cursor_active
== 0);
// Resolve the host lazily, only when no address list is cached yet.
385 if (! ptr
->address_info
)
387 memcached_return_t rc
= set_hostinfo(ptr
);
388 if (rc
!= MEMCACHED_SUCCESS
)
392 /* Create the socket */
393 while (ptr
->address_info_next
&& ptr
->fd
== INVALID_SOCKET
)
395 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
396 if (ptr
->type
== MEMCACHED_CONNECTION_UDP
&& ptr
->address_info_next
->ai_family
!= AF_INET
)
398 ptr
->address_info_next
= ptr
->address_info_next
->ai_next
;
402 if ((ptr
->fd
= socket(ptr
->address_info_next
->ai_family
,
403 ptr
->address_info_next
->ai_socktype
,
404 ptr
->address_info_next
->ai_protocol
)) < 0)
406 ptr
->cached_errno
= get_socket_errno();
407 WATCHPOINT_ERRNO(get_socket_errno());
408 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE
;
// NOTE(review): the result of set_socket_options() is deliberately
// discarded by the (void) cast, so option failures do not abort the connect.
411 (void)set_socket_options(ptr
);
413 /* connect to server */
414 if ((connect(ptr
->fd
, ptr
->address_info_next
->ai_addr
, ptr
->address_info_next
->ai_addrlen
) != SOCKET_ERROR
))
419 /* An error occurred */
420 ptr
->cached_errno
= get_socket_errno();
421 switch (ptr
->cached_errno
)
424 case EINPROGRESS
: // nonblocking mode - first return
425 case EALREADY
: // nonblocking mode - subsequent returns
427 memcached_return_t rc
;
428 rc
= connect_poll(ptr
);
// Remember a timeout so it can be reported instead of MEMCACHED_ERRNO.
430 if (rc
== MEMCACHED_TIMEOUT
)
431 timeout_error_occured
= true;
433 if (rc
== MEMCACHED_SUCCESS
)
437 case EISCONN
: // we are connected :-)
440 case EINTR
: // Special case, we retry ai_addr
441 (void)closesocket(ptr
->fd
);
442 ptr
->fd
= INVALID_SOCKET
;
// Drop the failed socket and advance to the next resolved address.
446 (void)closesocket(ptr
->fd
);
447 ptr
->fd
= INVALID_SOCKET
;
448 ptr
->address_info_next
= ptr
->address_info_next
->ai_next
;
453 if (ptr
->fd
== INVALID_SOCKET
)
455 WATCHPOINT_STRING("Never got a good file descriptor");
457 /* Failed to connect. schedule next retry */
458 if (ptr
->root
->retry_timeout
)
460 struct timeval next_time
;
462 if (gettimeofday(&next_time
, NULL
) == 0)
463 ptr
->next_retry
= next_time
.tv_sec
+ ptr
->root
->retry_timeout
;
466 if (timeout_error_occured
)
467 return MEMCACHED_TIMEOUT
;
469 return MEMCACHED_ERRNO
; /* The last error should be from connect() */
472 return MEMCACHED_SUCCESS
; /* Success path; the failure returns are above */
/*
 * set_last_disconnected_host: record `ptr` as the most recently
 * disconnected server on its owning memcached_st.
 *
 * The server's hostname, port and cached_errno are passed to the
 * WATCHPOINT_* macros.  Any server already stored in
 * root->last_disconnected_server is released with memcached_server_free(),
 * and the field is then set to the result of
 * memcached_server_clone(NULL, ptr).
 *
 * NOTE(review): the return value of memcached_server_clone() is stored
 * without a check in the lines visible here -- confirm whether it can
 * return NULL and whether callers tolerate that.
 */
475 void set_last_disconnected_host(memcached_server_write_instance_st ptr
)
// The cast yields a memcached_st * that the assignments below can write through.
478 memcached_st
*root
= (memcached_st
*)ptr
->root
;
481 WATCHPOINT_STRING(ptr
->hostname
);
482 WATCHPOINT_NUMBER(ptr
->port
);
483 WATCHPOINT_ERRNO(ptr
->cached_errno
);
485 if (root
->last_disconnected_server
)
486 memcached_server_free(root
->last_disconnected_server
);
487 root
->last_disconnected_server
= memcached_server_clone(NULL
, ptr
);
490 memcached_return_t
memcached_connect(memcached_server_write_instance_st ptr
)
492 memcached_return_t rc
= MEMCACHED_NO_SERVERS
;
494 if (ptr
->fd
!= INVALID_SOCKET
)
495 return MEMCACHED_SUCCESS
;
497 LIBMEMCACHED_MEMCACHED_CONNECT_START();
499 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
500 WATCHPOINT_ASSERT(ptr
->root
);
501 if (ptr
->root
->retry_timeout
&& ptr
->next_retry
)
503 struct timeval curr_time
;
505 gettimeofday(&curr_time
, NULL
);
507 // We should optimize this to remove the allocation if the server was
508 // the last server to die
509 if (ptr
->next_retry
> curr_time
.tv_sec
)
511 set_last_disconnected_host(ptr
);
513 return MEMCACHED_SERVER_MARKED_DEAD
;
517 // If we are over the counter failure, we just fail. Reject host only
518 // works if you have a set number of failures.
519 if (ptr
->root
->server_failure_limit
&& ptr
->server_failure_counter
>= ptr
->root
->server_failure_limit
)
521 set_last_disconnected_host(ptr
);
523 // @todo fix this by fixing behavior to no longer make use of
525 if (_is_auto_eject_host(ptr
->root
))
527 run_distribution((memcached_st
*)ptr
->root
);
530 return MEMCACHED_SERVER_MARKED_DEAD
;
533 /* We need to clean up the multi startup piece */
536 case MEMCACHED_CONNECTION_UNKNOWN
:
537 WATCHPOINT_ASSERT(0);
538 rc
= MEMCACHED_NOT_SUPPORTED
;
540 case MEMCACHED_CONNECTION_UDP
:
541 case MEMCACHED_CONNECTION_TCP
:
542 rc
= network_connect(ptr
);
543 #ifdef LIBMEMCACHED_WITH_SASL_SUPPORT
544 if (ptr
->fd
!= INVALID_SOCKET
&& ptr
->root
->sasl
.callbacks
)
546 rc
= memcached_sasl_authenticate_connection(ptr
);
547 if (rc
!= MEMCACHED_SUCCESS
)
549 (void)closesocket(ptr
->fd
);
550 ptr
->fd
= INVALID_SOCKET
;
555 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
556 rc
= unix_socket_connect(ptr
);
558 case MEMCACHED_CONNECTION_MAX
:
560 WATCHPOINT_ASSERT(0);
563 if (rc
== MEMCACHED_SUCCESS
)
565 ptr
->server_failure_counter
= 0;
570 ptr
->server_failure_counter
++;
572 set_last_disconnected_host(ptr
);
575 LIBMEMCACHED_MEMCACHED_CONNECT_END();