#include "common.h"
+#include <netdb.h>
#include <poll.h>
#include <sys/time.h>
-static memcached_return set_hostinfo(memcached_server_st *server)
+static memcached_return_t set_hostinfo(memcached_server_st *server)
{
struct addrinfo *ai;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
- hints.ai_family= AF_INET;
+ // hints.ai_family= AF_INET;
if (server->type == MEMCACHED_CONNECTION_UDP)
{
hints.ai_protocol= IPPROTO_UDP;
}
if (server->address_info)
+ {
freeaddrinfo(server->address_info);
+ server->address_info= NULL;
+ }
server->address_info= ai;
return MEMCACHED_SUCCESS;
}
-static memcached_return set_socket_options(memcached_server_st *ptr)
+static memcached_return_t set_socket_options(memcached_server_st *ptr)
{
+ WATCHPOINT_ASSERT(ptr->fd != -1);
+
if (ptr->type == MEMCACHED_CONNECTION_UDP)
return MEMCACHED_SUCCESS;
+#ifdef HAVE_SNDTIMEO
if (ptr->root->snd_timeout)
{
int error;
waittime.tv_sec= 0;
waittime.tv_usec= ptr->root->snd_timeout;
- error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
&waittime, (socklen_t)sizeof(struct timeval));
WATCHPOINT_ASSERT(error == 0);
}
+#endif
+#ifdef HAVE_RCVTIMEO
if (ptr->root->rcv_timeout)
{
int error;
waittime.tv_sec= 0;
waittime.tv_usec= ptr->root->rcv_timeout;
- error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
&waittime, (socklen_t)sizeof(struct timeval));
WATCHPOINT_ASSERT(error == 0);
}
+#endif
+ if (ptr->root->flags.no_block)
{
int error;
struct linger linger;
- linger.l_onoff= 1;
- linger.l_linger= MEMCACHED_DEFAULT_TIMEOUT;
- error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
+ linger.l_onoff= 1;
+ linger.l_linger= 0; /* By default on close() just drop the socket */
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
&linger, (socklen_t)sizeof(struct linger));
WATCHPOINT_ASSERT(error == 0);
}
- if (ptr->root->flags & MEM_TCP_NODELAY)
+ if (ptr->root->flags.tcp_nodelay)
{
int flag= 1;
int error;
- error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
+ error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
&flag, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
{
int error;
- error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
&ptr->root->send_size, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
{
int error;
- error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
&ptr->root->recv_size, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
- /* For the moment, not getting a nonblocking mode will not be fatal */
- if (ptr->root->flags & MEM_NO_BLOCK)
- {
- int flags;
+ /* libmemcached will always use nonblocking IO to avoid write deadlocks */
+ int flags;
+ do
flags= fcntl(ptr->fd, F_GETFL, 0);
- unlikely (flags != -1)
- {
- (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
- }
+ while (flags == -1 && (errno == EINTR || errno == EAGAIN));
+
+ unlikely (flags == -1)
+ return MEMCACHED_CONNECTION_FAILURE;
+ else if ((flags & O_NONBLOCK) == 0)
+ {
+ int rval;
+
+ do
+ rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
+ while (rval == -1 && (errno == EINTR || errno == EAGAIN));
+
+ unlikely (rval == -1)
+ return MEMCACHED_CONNECTION_FAILURE;
}
return MEMCACHED_SUCCESS;
}
-static memcached_return unix_socket_connect(memcached_server_st *ptr)
+static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
{
struct sockaddr_un servAddr;
socklen_t addrlen;
servAddr.sun_family= AF_UNIX;
strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
- addrlen= strlen(servAddr.sun_path) + sizeof(servAddr.sun_family);
+ addrlen= (socklen_t) (strlen(servAddr.sun_path) + sizeof(servAddr.sun_family));
test_connect:
- if (connect(ptr->fd,
+ if (connect(ptr->fd,
(struct sockaddr *)&servAddr,
sizeof(servAddr)) < 0)
{
- switch (errno) {
+ switch (errno)
+ {
case EINPROGRESS:
case EALREADY:
case EINTR:
}
}
}
+
+ WATCHPOINT_ASSERT(ptr->fd != -1);
return MEMCACHED_SUCCESS;
}
-static memcached_return network_connect(memcached_server_st *ptr)
+static memcached_return_t network_connect(memcached_server_st *ptr)
{
if (ptr->fd == -1)
{
struct addrinfo *use;
- if(ptr->root->server_failure_limit != 0) {
- if(ptr->server_failure_counter >= ptr->root->server_failure_limit) {
- memcached_server_remove(ptr);
- }
- }
- /* Old connection junk still is in the structure */
- WATCHPOINT_ASSERT(ptr->cursor_active == 0);
-
- if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
- (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
+ if (!ptr->sockaddr_inited ||
+ (!(ptr->root->flags.use_cache_lookups)))
{
- memcached_return rc;
+ memcached_return_t rc;
rc= set_hostinfo(ptr);
if (rc != MEMCACHED_SUCCESS)
return rc;
- ptr->sockaddr_inited= MEMCACHED_ALLOCATED;
+ ptr->sockaddr_inited= true;
}
use= ptr->address_info;
/* Create the socket */
while (use != NULL)
{
- if ((ptr->fd= socket(use->ai_family,
- use->ai_socktype,
+ /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
+ if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
+ {
+ use= use->ai_next;
+ continue;
+ }
+
+ if ((ptr->fd= socket(use->ai_family,
+ use->ai_socktype,
use->ai_protocol)) < 0)
{
ptr->cached_errno= errno;
(void)set_socket_options(ptr);
/* connect to server */
-test_connect:
- if (connect(ptr->fd,
- use->ai_addr,
- use->ai_addrlen) < 0)
+ while (ptr->fd != -1 &&
+ connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
{
- switch (errno) {
- /* We are spinning waiting on connect */
- case EALREADY:
- case EINPROGRESS:
+ ptr->cached_errno= errno;
+ if (errno == EINPROGRESS || /* nonblocking mode - first return, */
+ errno == EALREADY) /* nonblocking mode - subsequent returns */
+ {
+ struct pollfd fds[1];
+ fds[0].fd = ptr->fd;
+ fds[0].events = POLLOUT;
+ int error= poll(fds, 1, ptr->root->connect_timeout);
+
+ if (error != 1 || fds[0].revents & POLLERR)
{
- struct pollfd fds[1];
- int error;
-
- memset(&fds, 0, sizeof(struct pollfd));
- fds[0].fd= ptr->fd;
- fds[0].events= POLLOUT | POLLERR;
- error= poll(fds, 1, ptr->root->connect_timeout);
-
- if (error == 0)
+ if (fds[0].revents & POLLERR)
{
- goto handle_retry;
+ int err;
+ socklen_t len = sizeof (err);
+ (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
+ ptr->cached_errno= (err == 0) ? errno : err;
}
- else if (error != 1 || fds[0].revents & POLLERR)
- {
- ptr->cached_errno= errno;
- WATCHPOINT_ERRNO(ptr->cached_errno);
- WATCHPOINT_NUMBER(ptr->root->connect_timeout);
- close(ptr->fd);
- ptr->fd= -1;
- if (ptr->address_info)
- {
- freeaddrinfo(ptr->address_info);
- ptr->address_info= NULL;
- }
-
- if (ptr->root->retry_timeout)
- {
- struct timeval next_time;
- gettimeofday(&next_time, NULL);
- ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
+ (void)close(ptr->fd);
+ ptr->fd= -1;
}
- ptr->server_failure_counter+= 1;
- return MEMCACHED_ERRNO;
- }
-
- break;
- }
- /* We are spinning waiting on connect */
- case EINTR:
- goto test_connect;
- case EISCONN: /* We were spinning waiting on connect */
+ }
+ else if (errno == EISCONN) /* we are connected :-) */
+ {
break;
- default:
-handle_retry:
- ptr->cached_errno= errno;
- close(ptr->fd);
+ }
+ else if (errno != EINTR)
+ {
+ (void)close(ptr->fd);
ptr->fd= -1;
- if (ptr->root->retry_timeout)
- {
- struct timeval next_time;
-
- gettimeofday(&next_time, NULL);
- ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
- }
+ break;
}
}
- else
+
+ if (ptr->fd != -1)
{
WATCHPOINT_ASSERT(ptr->cursor_active == 0);
ptr->server_failure_counter= 0;
}
}
- if (ptr->fd == -1) {
- ptr->server_failure_counter+= 1;
+ if (ptr->fd == -1)
+ {
+ /* Failed to connect. schedule next retry */
+ if (ptr->root->retry_timeout)
+ {
+ struct timeval next_time;
+
+ if (gettimeofday(&next_time, NULL) == 0)
+ ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
+ }
+ ptr->server_failure_counter++;
+ if (ptr->cached_errno == 0)
+ return MEMCACHED_TIMEOUT;
+
return MEMCACHED_ERRNO; /* The last error should be from connect() */
}
}
-memcached_return memcached_connect(memcached_server_st *ptr)
+memcached_return_t memcached_connect(memcached_server_st *ptr)
{
- memcached_return rc= MEMCACHED_NO_SERVERS;
+ memcached_return_t rc= MEMCACHED_NO_SERVERS;
LIBMEMCACHED_MEMCACHED_CONNECT_START();
- if (ptr->root->retry_timeout)
+ /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
+ WATCHPOINT_ASSERT(ptr->root);
+ if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
{
- struct timeval next_time;
+ struct timeval curr_time;
- gettimeofday(&next_time, NULL);
- if (next_time.tv_sec < ptr->next_retry)
- return MEMCACHED_TIMEOUT;
+ gettimeofday(&curr_time, NULL);
+
+ /* if we've had too many consecutive errors on this server, mark it dead. */
+ if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
+ {
+ ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
+ ptr->server_failure_counter= 0;
+ }
+
+ if (curr_time.tv_sec < ptr->next_retry)
+ {
+ if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
+ run_distribution(ptr->root);
+
+ ptr->root->last_disconnected_server = ptr;
+ return MEMCACHED_SERVER_MARKED_DEAD;
+ }
}
+
/* We need to clean up the multi startup piece */
switch (ptr->type)
{
WATCHPOINT_ASSERT(0);
}
+ unlikely ( rc != MEMCACHED_SUCCESS) ptr->root->last_disconnected_server = ptr;
+
LIBMEMCACHED_MEMCACHED_CONNECT_END();
return rc;