2 * Copyright (C) 2006-2010 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Server IO, Not public!
16 static memcached_return_t
connect_poll(memcached_server_st
*ptr
)
20 fds
[0].events
= POLLOUT
;
22 int timeout
= ptr
->root
->connect_timeout
;
23 if (ptr
->root
->flags
.no_block
== true)
29 while (--loop_max
) // Should only loop on cases of ERESTART or EINTR
31 error
= poll(fds
, 1, timeout
);
38 socklen_t len
= sizeof (err
);
39 (void)getsockopt(ptr
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
41 // We check the value to see what happened wth the socket.
44 return MEMCACHED_SUCCESS
;
48 ptr
->cached_errno
= errno
;
50 return MEMCACHED_ERRNO
;
54 return MEMCACHED_TIMEOUT
;
55 default: // A real error occurred and we need to completely bail
56 WATCHPOINT_ERRNO(get_socket_errno());
57 switch (get_socket_errno())
59 #ifdef TARGET_OS_LINUX
65 if (fds
[0].revents
& POLLERR
)
68 socklen_t len
= sizeof (err
);
69 (void)getsockopt(ptr
->fd
, SOL_SOCKET
, SO_ERROR
, &err
, &len
);
70 ptr
->cached_errno
= (err
== 0) ? get_socket_errno() : err
;
74 ptr
->cached_errno
= get_socket_errno();
77 (void)closesocket(ptr
->fd
);
78 ptr
->fd
= INVALID_SOCKET
;
80 return MEMCACHED_ERRNO
;
85 // This should only be possible from ERESTART or EINTR;
86 ptr
->cached_errno
= get_socket_errno();
88 return MEMCACHED_ERRNO
;
91 static memcached_return_t
set_hostinfo(memcached_server_st
*server
)
94 struct addrinfo hints
;
95 char str_port
[NI_MAXSERV
];
98 int length
= snprintf(str_port
, NI_MAXSERV
, "%u", (uint32_t)server
->port
);
99 if (length
>= NI_MAXSERV
|| length
< 0)
100 return MEMCACHED_FAILURE
;
102 memset(&hints
, 0, sizeof(hints
));
104 // hints.ai_family= AF_INET;
105 if (server
->type
== MEMCACHED_CONNECTION_UDP
)
107 hints
.ai_protocol
= IPPROTO_UDP
;
108 hints
.ai_socktype
= SOCK_DGRAM
;
112 hints
.ai_socktype
= SOCK_STREAM
;
113 hints
.ai_protocol
= IPPROTO_TCP
;
118 int e
= getaddrinfo(server
->hostname
, str_port
, &hints
, &ai
);
124 else if (e
== EAI_AGAIN
)
127 struct timespec dream
, rem
;
132 nanosleep(&dream
, &rem
);
138 WATCHPOINT_STRING(server
->hostname
);
139 WATCHPOINT_STRING(gai_strerror(e
));
140 return MEMCACHED_HOST_LOOKUP_FAILURE
;
144 if (server
->address_info
)
146 freeaddrinfo(server
->address_info
);
147 server
->address_info
= NULL
;
149 server
->address_info
= ai
;
151 return MEMCACHED_SUCCESS
;
154 static inline memcached_return_t
set_socket_nonblocking(memcached_server_st
*ptr
)
158 if (ioctlsocket(ptr
->fd
, FIONBIO
, &arg
) == SOCKET_ERROR
)
160 ptr
->cached_errno
= get_socket_errno();
161 return MEMCACHED_CONNECTION_FAILURE
;
167 flags
= fcntl(ptr
->fd
, F_GETFL
, 0);
168 while (flags
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
170 unlikely (flags
== -1)
172 ptr
->cached_errno
= errno
;
173 return MEMCACHED_CONNECTION_FAILURE
;
175 else if ((flags
& O_NONBLOCK
) == 0)
180 rval
= fcntl(ptr
->fd
, F_SETFL
, flags
| O_NONBLOCK
);
181 while (rval
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
183 unlikely (rval
== -1)
185 ptr
->cached_errno
= errno
;
186 return MEMCACHED_CONNECTION_FAILURE
;
190 return MEMCACHED_SUCCESS
;
193 static memcached_return_t
set_socket_options(memcached_server_st
*ptr
)
195 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
197 if (ptr
->type
== MEMCACHED_CONNECTION_UDP
)
198 return MEMCACHED_SUCCESS
;
201 if (ptr
->root
->snd_timeout
)
204 struct timeval waittime
;
207 waittime
.tv_usec
= ptr
->root
->snd_timeout
;
209 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_SNDTIMEO
,
210 &waittime
, (socklen_t
)sizeof(struct timeval
));
211 WATCHPOINT_ASSERT(error
== 0);
213 return MEMCACHED_FAILURE
;
218 if (ptr
->root
->rcv_timeout
)
221 struct timeval waittime
;
224 waittime
.tv_usec
= ptr
->root
->rcv_timeout
;
226 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_RCVTIMEO
,
227 &waittime
, (socklen_t
)sizeof(struct timeval
));
228 WATCHPOINT_ASSERT(error
== 0);
230 return MEMCACHED_FAILURE
;
235 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
238 int error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void *)&set
, sizeof(int));
240 // This is not considered a fatal error
243 WATCHPOINT_ERRNO(get_socket_errno());
244 perror("setsockopt(SO_NOSIGPIPE)");
249 if (ptr
->root
->flags
.no_block
)
252 struct linger linger
;
255 linger
.l_linger
= 0; /* By default on close() just drop the socket */
256 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_LINGER
,
257 &linger
, (socklen_t
)sizeof(struct linger
));
258 WATCHPOINT_ASSERT(error
== 0);
260 return MEMCACHED_FAILURE
;
263 if (ptr
->root
->flags
.tcp_nodelay
)
268 error
= setsockopt(ptr
->fd
, IPPROTO_TCP
, TCP_NODELAY
,
269 &flag
, (socklen_t
)sizeof(int));
270 WATCHPOINT_ASSERT(error
== 0);
272 return MEMCACHED_FAILURE
;
275 if (ptr
->root
->flags
.tcp_keepalive
)
280 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_KEEPALIVE
,
281 &flag
, (socklen_t
)sizeof(int));
282 WATCHPOINT_ASSERT(error
== 0);
284 return MEMCACHED_FAILURE
;
288 if (ptr
->root
->tcp_keepidle
> 0)
292 error
= setsockopt(ptr
->fd
, IPPROTO_TCP
, TCP_KEEPIDLE
,
293 &ptr
->root
->tcp_keepidle
, (socklen_t
)sizeof(int));
294 WATCHPOINT_ASSERT(error
== 0);
296 return MEMCACHED_FAILURE
;
300 if (ptr
->root
->send_size
> 0)
304 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_SNDBUF
,
305 &ptr
->root
->send_size
, (socklen_t
)sizeof(int));
306 WATCHPOINT_ASSERT(error
== 0);
308 return MEMCACHED_FAILURE
;
311 if (ptr
->root
->recv_size
> 0)
315 error
= setsockopt(ptr
->fd
, SOL_SOCKET
, SO_RCVBUF
,
316 &ptr
->root
->recv_size
, (socklen_t
)sizeof(int));
317 WATCHPOINT_ASSERT(error
== 0);
319 return MEMCACHED_FAILURE
;
323 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
324 return set_socket_nonblocking(ptr
);
327 static memcached_return_t
unix_socket_connect(memcached_server_st
*ptr
)
330 struct sockaddr_un servAddr
;
332 WATCHPOINT_ASSERT(ptr
->fd
== -1);
334 if ((ptr
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) < 0)
336 ptr
->cached_errno
= errno
;
337 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE
;
340 memset(&servAddr
, 0, sizeof (struct sockaddr_un
));
341 servAddr
.sun_family
= AF_UNIX
;
342 strcpy(servAddr
.sun_path
, ptr
->hostname
); /* Copy filename */
346 (struct sockaddr
*)&servAddr
,
347 sizeof(servAddr
)) < 0)
355 case EISCONN
: /* We were spinning waiting on connect */
358 WATCHPOINT_ERRNO(errno
);
359 ptr
->cached_errno
= errno
;
360 return MEMCACHED_ERRNO
;
364 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
366 return MEMCACHED_SUCCESS
;
369 return MEMCACHED_NOT_SUPPORTED
;
373 static memcached_return_t
network_connect(memcached_server_st
*ptr
)
375 bool timeout_error_occured
= false;
378 WATCHPOINT_ASSERT(ptr
->fd
== INVALID_SOCKET
);
379 WATCHPOINT_ASSERT(ptr
->cursor_active
== 0);
381 if (! ptr
->options
.sockaddr_inited
|| (!(ptr
->root
->flags
.use_cache_lookups
)))
383 memcached_return_t rc
;
385 rc
= set_hostinfo(ptr
);
386 if (rc
!= MEMCACHED_SUCCESS
)
388 ptr
->options
.sockaddr_inited
= true;
391 struct addrinfo
*use
= ptr
->address_info
;
392 /* Create the socket */
395 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
396 if (ptr
->type
== MEMCACHED_CONNECTION_UDP
&& use
->ai_family
!= AF_INET
)
402 if ((ptr
->fd
= socket(use
->ai_family
,
404 use
->ai_protocol
)) < 0)
406 ptr
->cached_errno
= get_socket_errno();
407 WATCHPOINT_ERRNO(get_socket_errno());
408 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE
;
411 (void)set_socket_options(ptr
);
413 /* connect to server */
414 if ((connect(ptr
->fd
, use
->ai_addr
, use
->ai_addrlen
) != SOCKET_ERROR
))
419 /* An error occurred */
420 ptr
->cached_errno
= get_socket_errno();
421 if (ptr
->cached_errno
== EWOULDBLOCK
||
422 ptr
->cached_errno
== EINPROGRESS
|| /* nonblocking mode - first return, */
423 ptr
->cached_errno
== EALREADY
) /* nonblocking mode - subsequent returns */
425 memcached_return_t rc
;
426 rc
= connect_poll(ptr
);
428 if (rc
== MEMCACHED_TIMEOUT
)
429 timeout_error_occured
= true;
431 if (rc
== MEMCACHED_SUCCESS
)
434 else if (get_socket_errno() == EISCONN
) /* we are connected :-) */
438 else if (get_socket_errno() == EINTR
) // Special case, we retry ai_addr
440 (void)closesocket(ptr
->fd
);
441 ptr
->fd
= INVALID_SOCKET
;
445 (void)closesocket(ptr
->fd
);
446 ptr
->fd
= INVALID_SOCKET
;
450 if (ptr
->fd
== INVALID_SOCKET
)
452 WATCHPOINT_STRING("Never got a good file descriptor");
454 /* Failed to connect. schedule next retry */
455 if (ptr
->root
->retry_timeout
)
457 struct timeval next_time
;
459 if (gettimeofday(&next_time
, NULL
) == 0)
460 ptr
->next_retry
= next_time
.tv_sec
+ ptr
->root
->retry_timeout
;
463 if (timeout_error_occured
)
464 return MEMCACHED_TIMEOUT
;
466 return MEMCACHED_ERRNO
; /* The last error should be from connect() */
469 return MEMCACHED_SUCCESS
; /* The last error should be from connect() */
472 void set_last_disconnected_host(memcached_server_write_instance_st ptr
)
475 memcached_st
*root
= (memcached_st
*)ptr
->root
;
478 WATCHPOINT_STRING(ptr
->hostname
);
479 WATCHPOINT_NUMBER(ptr
->port
);
480 WATCHPOINT_ERRNO(ptr
->cached_errno
);
482 if (root
->last_disconnected_server
)
483 memcached_server_free(root
->last_disconnected_server
);
484 root
->last_disconnected_server
= memcached_server_clone(NULL
, ptr
);
487 memcached_return_t
memcached_connect(memcached_server_write_instance_st ptr
)
489 memcached_return_t rc
= MEMCACHED_NO_SERVERS
;
491 if (ptr
->fd
!= INVALID_SOCKET
)
492 return MEMCACHED_SUCCESS
;
494 LIBMEMCACHED_MEMCACHED_CONNECT_START();
496 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
497 WATCHPOINT_ASSERT(ptr
->root
);
498 if (ptr
->root
->retry_timeout
&& ptr
->next_retry
)
500 struct timeval curr_time
;
502 gettimeofday(&curr_time
, NULL
);
504 // We should optimize this to remove the allocation if the server was
505 // the last server to die
506 if (ptr
->next_retry
> curr_time
.tv_sec
)
508 set_last_disconnected_host(ptr
);
510 return MEMCACHED_SERVER_MARKED_DEAD
;
514 // If we are over the counter failure, we just fail. Reject host only
515 // works if you have a set number of failures.
516 if (ptr
->root
->server_failure_limit
&& ptr
->server_failure_counter
>= ptr
->root
->server_failure_limit
)
518 set_last_disconnected_host(ptr
);
520 // @todo fix this by fixing behavior to no longer make use of
522 if (_is_auto_eject_host(ptr
->root
))
524 run_distribution((memcached_st
*)ptr
->root
);
527 return MEMCACHED_SERVER_MARKED_DEAD
;
530 /* We need to clean up the multi startup piece */
533 case MEMCACHED_CONNECTION_UNKNOWN
:
534 WATCHPOINT_ASSERT(0);
535 rc
= MEMCACHED_NOT_SUPPORTED
;
537 case MEMCACHED_CONNECTION_UDP
:
538 case MEMCACHED_CONNECTION_TCP
:
539 rc
= network_connect(ptr
);
540 #ifdef LIBMEMCACHED_WITH_SASL_SUPPORT
541 if (ptr
->fd
!= INVALID_SOCKET
&& ptr
->root
->sasl
.callbacks
)
543 rc
= memcached_sasl_authenticate_connection(ptr
);
544 if (rc
!= MEMCACHED_SUCCESS
)
546 (void)closesocket(ptr
->fd
);
547 ptr
->fd
= INVALID_SOCKET
;
552 case MEMCACHED_CONNECTION_UNIX_SOCKET
:
553 rc
= unix_socket_connect(ptr
);
555 case MEMCACHED_CONNECTION_MAX
:
557 WATCHPOINT_ASSERT(0);
560 if (rc
== MEMCACHED_SUCCESS
)
562 ptr
->server_failure_counter
= 0;
567 ptr
->server_failure_counter
++;
569 set_last_disconnected_host(ptr
);
572 LIBMEMCACHED_MEMCACHED_CONNECT_END();