X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_connect.c;h=24328f4c4bb7c244b2bc358faeef953317af7207;hb=fa11f4a5d1a9c92eda13f8a5aa11dcff542b8815;hp=7edcce8565925da6cdc63a53e4ab161823eb45ed;hpb=1c1fa16311aeaad3cb84bf6da3c403b9963ddc44;p=m6w6%2Flibmemcached diff --git a/libmemcached/memcached_connect.c b/libmemcached/memcached_connect.c index 7edcce85..24328f4c 100644 --- a/libmemcached/memcached_connect.c +++ b/libmemcached/memcached_connect.c @@ -1,7 +1,9 @@ #include "common.h" +#include #include +#include -static memcached_return set_hostinfo(memcached_server_st *server) +static memcached_return_t set_hostinfo(memcached_server_st *server) { struct addrinfo *ai; struct addrinfo hints; @@ -12,7 +14,7 @@ static memcached_return set_hostinfo(memcached_server_st *server) memset(&hints, 0, sizeof(hints)); - hints.ai_family= AF_INET; + // hints.ai_family= AF_INET; if (server->type == MEMCACHED_CONNECTION_UDP) { hints.ai_protocol= IPPROTO_UDP; @@ -33,47 +35,70 @@ static memcached_return set_hostinfo(memcached_server_st *server) } if (server->address_info) + { freeaddrinfo(server->address_info); + server->address_info= NULL; + } server->address_info= ai; return MEMCACHED_SUCCESS; } -static memcached_return set_socket_options(memcached_server_st *ptr) +static memcached_return_t set_socket_options(memcached_server_st *ptr) { + WATCHPOINT_ASSERT(ptr->fd != -1); + if (ptr->type == MEMCACHED_CONNECTION_UDP) return MEMCACHED_SUCCESS; - if (ptr->root->flags & MEM_NO_BLOCK) +#ifdef HAVE_SNDTIMEO + if (ptr->root->snd_timeout) { int error; - struct linger linger; struct timeval waittime; - waittime.tv_sec= 10; - waittime.tv_usec= 0; + waittime.tv_sec= 0; + waittime.tv_usec= ptr->root->snd_timeout; - linger.l_onoff= 1; - linger.l_linger= MEMCACHED_DEFAULT_TIMEOUT; - error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER, - &linger, (socklen_t)sizeof(struct linger)); + error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO, + &waittime, (socklen_t)sizeof(struct timeval)); WATCHPOINT_ASSERT(error == 0); + } +#endif - error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO, +#ifdef HAVE_RCVTIMEO + if (ptr->root->rcv_timeout) + { + int error; + struct timeval waittime; + + waittime.tv_sec= 0; + waittime.tv_usec= ptr->root->rcv_timeout; + + error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)); WATCHPOINT_ASSERT(error == 0); + } +#endif - error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO, - &waittime, (socklen_t)sizeof(struct timeval)); + if (ptr->root->flags.no_block) + { + int error; + struct linger linger; + + linger.l_onoff= 1; + linger.l_linger= 0; /* By default on close() just drop the socket */ + error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER, + &linger, (socklen_t)sizeof(struct linger)); WATCHPOINT_ASSERT(error == 0); } - if (ptr->root->flags & MEM_TCP_NODELAY) + if (ptr->root->flags.tcp_nodelay) { int flag= 1; int error; - error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY, + error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY, &flag, (socklen_t)sizeof(int)); WATCHPOINT_ASSERT(error == 0); } @@ -82,7 +107,7 @@ static memcached_return set_socket_options(memcached_server_st *ptr) { int error; - error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF, + error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF, &ptr->root->send_size, (socklen_t)sizeof(int)); WATCHPOINT_ASSERT(error == 0); } @@ -91,27 +116,36 @@ static memcached_return set_socket_options(memcached_server_st *ptr) { int error; - error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF, + error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF, &ptr->root->recv_size, (socklen_t)sizeof(int)); WATCHPOINT_ASSERT(error == 0); } - /* For the moment, not getting a nonblocking mode will not be fatal */ - if (ptr->root->flags & MEM_NO_BLOCK) - { - int flags; + /* libmemcached will always use nonblocking IO to avoid write deadlocks */ + int flags; + do flags= fcntl(ptr->fd, F_GETFL, 0); - unlikely (flags != -1) - { - (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK); - } + while (flags == -1 && (errno == EINTR || errno == EAGAIN)); + + unlikely (flags == -1) + return MEMCACHED_CONNECTION_FAILURE; + else if ((flags & O_NONBLOCK) == 0) + { + int rval; + + do + rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK); + while (rval == -1 && (errno == EINTR || errno == EAGAIN)); + + unlikely (rval == -1) + return MEMCACHED_CONNECTION_FAILURE; } return MEMCACHED_SUCCESS; } -static memcached_return unix_socket_connect(memcached_server_st *ptr) +static memcached_return_t unix_socket_connect(memcached_server_st *ptr) { struct sockaddr_un servAddr; socklen_t addrlen; @@ -128,14 +162,15 @@ static memcached_return unix_socket_connect(memcached_server_st *ptr) servAddr.sun_family= AF_UNIX; strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */ - addrlen= strlen(servAddr.sun_path) + sizeof(servAddr.sun_family); + addrlen= (socklen_t) (strlen(servAddr.sun_path) + sizeof(servAddr.sun_family)); test_connect: - if (connect(ptr->fd, + if (connect(ptr->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) { - switch (errno) { + switch (errno) + { case EINPROGRESS: case EALREADY: case EINTR: @@ -149,35 +184,41 @@ test_connect: } } } + + WATCHPOINT_ASSERT(ptr->fd != -1); return MEMCACHED_SUCCESS; } -static memcached_return network_connect(memcached_server_st *ptr) +static memcached_return_t network_connect(memcached_server_st *ptr) { if (ptr->fd == -1) { struct addrinfo *use; - /* Old connection junk still is in the structure */ - WATCHPOINT_ASSERT(ptr->cursor_active == 0); - - if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED || - (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS))) + if (!ptr->sockaddr_inited || + (!(ptr->root->flags.use_cache_lookups))) { - memcached_return rc; + memcached_return_t rc; rc= set_hostinfo(ptr); if (rc != MEMCACHED_SUCCESS) return rc; - ptr->sockaddr_inited= MEMCACHED_ALLOCATED; + ptr->sockaddr_inited= true; } use= ptr->address_info; /* Create the socket */ while (use != NULL) { - if ((ptr->fd= socket(use->ai_family, - use->ai_socktype, + /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */ + if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET) + { + use= use->ai_next; + continue; + } + + if ((ptr->fd= socket(use->ai_family, + use->ai_socktype, use->ai_protocol)) < 0) { ptr->cached_errno= errno; @@ -188,58 +229,48 @@ static memcached_return network_connect(memcached_server_st *ptr) (void)set_socket_options(ptr); /* connect to server */ -test_connect: - if (connect(ptr->fd, - use->ai_addr, - use->ai_addrlen) < 0) + while (ptr->fd != -1 && + connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0) { - switch (errno) { - /* We are spinning waiting on connect */ - case EALREADY: - case EINPROGRESS: + ptr->cached_errno= errno; + if (errno == EINPROGRESS || /* nonblocking mode - first return, */ + errno == EALREADY) /* nonblocking mode - subsequent returns */ + { + struct pollfd fds[1]; + fds[0].fd = ptr->fd; + fds[0].events = POLLOUT; + int error= poll(fds, 1, ptr->root->connect_timeout); + + if (error != 1 || fds[0].revents & POLLERR) { - struct pollfd fds[1]; - int error; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->fd; - fds[0].events= POLLOUT | POLLERR; - error= poll(fds, 1, ptr->root->connect_timeout); - - if (error != 1) + if (fds[0].revents & POLLERR) { - ptr->cached_errno= errno; - WATCHPOINT_ERRNO(ptr->cached_errno); - WATCHPOINT_NUMBER(ptr->root->connect_timeout); - close(ptr->fd); - ptr->fd= -1; - return MEMCACHED_ERRNO; + int err; + socklen_t len = sizeof (err); + (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); + ptr->cached_errno= (err == 0) ? errno : err; } - break; + (void)close(ptr->fd); + ptr->fd= -1; } - /* We are spinning waiting on connect */ - case EINTR: - goto test_connect; - case EISCONN: /* We were spinning waiting on connect */ + } + else if (errno == EISCONN) /* we are connected :-) */ + { break; - default: - ptr->cached_errno= errno; - WATCHPOINT_ERRNO(ptr->cached_errno); - close(ptr->fd); + } + else if (errno != EINTR) + { + (void)close(ptr->fd); ptr->fd= -1; - if (ptr->root->retry_timeout) - { - struct timeval next_time; - - gettimeofday(&next_time, NULL); - ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; - } + break; } } - else + + if (ptr->fd != -1) { WATCHPOINT_ASSERT(ptr->cursor_active == 0); + ptr->server_failure_counter= 0; return MEMCACHED_SUCCESS; } use = use->ai_next; @@ -247,25 +278,57 @@ test_connect: } if (ptr->fd == -1) + { + /* Failed to connect. schedule next retry */ + if (ptr->root->retry_timeout) + { + struct timeval next_time; + + if (gettimeofday(&next_time, NULL) == 0) + ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; + } + ptr->server_failure_counter++; + if (ptr->cached_errno == 0) + return MEMCACHED_TIMEOUT; + return MEMCACHED_ERRNO; /* The last error should be from connect() */ + } + ptr->server_failure_counter= 0; return MEMCACHED_SUCCESS; /* The last error should be from connect() */ } -memcached_return memcached_connect(memcached_server_st *ptr) +memcached_return_t memcached_connect(memcached_server_st *ptr) { - memcached_return rc= MEMCACHED_NO_SERVERS; + memcached_return_t rc= MEMCACHED_NO_SERVERS; LIBMEMCACHED_MEMCACHED_CONNECT_START(); - if (ptr->root->retry_timeout) + /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */ + WATCHPOINT_ASSERT(ptr->root); + if (ptr->root->retry_timeout && ptr->root->server_failure_limit) { - struct timeval next_time; + struct timeval curr_time; - gettimeofday(&next_time, NULL); - if (next_time.tv_sec < ptr->next_retry) - return MEMCACHED_TIMEOUT; + gettimeofday(&curr_time, NULL); + + /* if we've had too many consecutive errors on this server, mark it dead. */ + if (ptr->server_failure_counter >= ptr->root->server_failure_limit) + { + ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout; + ptr->server_failure_counter= 0; + } + + if (curr_time.tv_sec < ptr->next_retry) + { + if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS)) + run_distribution(ptr->root); + + ptr->root->last_disconnected_server = ptr; + return MEMCACHED_SERVER_MARKED_DEAD; + } } + /* We need to clean up the multi startup piece */ switch (ptr->type) { @@ -284,8 +347,7 @@ memcached_return memcached_connect(memcached_server_st *ptr) WATCHPOINT_ASSERT(0); } - if (rc != MEMCACHED_SUCCESS) - WATCHPOINT_ERROR(rc); + unlikely ( rc != MEMCACHED_SUCCESS) ptr->root->last_disconnected_server = ptr; LIBMEMCACHED_MEMCACHED_CONNECT_END();