Latest of libtest.
[m6w6/libmemcached] / libmemcached / connect.cc
index 1c4ea913689ca54b513d7a8ded2660c9ca13100a..9d50e5c0dc05ecc1ad1f0ecd754077a1ff9eda15 100644 (file)
@@ -64,18 +64,21 @@ static memcached_return_t connect_poll(memcached_server_st *server)
       {
         int err;
         socklen_t len= sizeof (err);
-        (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
-
-        // We check the value to see what happened wth the socket.
-        if (err == 0)
+        if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
         {
-          return MEMCACHED_SUCCESS;
+          // We check the value to see what happened wth the socket.
+          if (err == 0)
+          {
+            return MEMCACHED_SUCCESS;
+          }
+          errno= err;
         }
 
         return memcached_set_errno(*server, err, MEMCACHED_AT);
       }
     case 0:
       {
+        server->io_wait_count.timeouts++;
         return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
       }
 
@@ -99,21 +102,26 @@ static memcached_return_t connect_poll(memcached_server_st *server)
         if (fds[0].revents & POLLERR)
         {
           int err;
-          socklen_t len= sizeof (err);
-          (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
-          memcached_set_errno(*server, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
-        }
-        else
-        {
-          memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
+          socklen_t len= sizeof(err);
+          if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+          {
+            if (err == 0)
+            {
+              // This should never happen, if it does? Punt.  
+              continue;
+            }
+            errno= err;
+          }
         }
 
+        int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
+
         assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
         (void)closesocket(server->fd);
         server->fd= INVALID_SOCKET;
         server->state= MEMCACHED_SERVER_STATE_NEW;
 
-        return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
+        return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
       }
     }
   }
@@ -124,6 +132,7 @@ static memcached_return_t connect_poll(memcached_server_st *server)
 
 static memcached_return_t set_hostinfo(memcached_server_st *server)
 {
+  assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
   if (server->address_info)
   {
     freeaddrinfo(server->address_info);
@@ -144,7 +153,7 @@ static memcached_return_t set_hostinfo(memcached_server_st *server)
 #if 0
   hints.ai_family= AF_INET;
 #endif
-  if (server->type == MEMCACHED_CONNECTION_UDP)
+  if (memcached_is_udp(server->root))
   {
     hints.ai_protocol= IPPROTO_UDP;
     hints.ai_socktype= SOCK_DGRAM;
@@ -212,9 +221,9 @@ static inline void set_socket_nonblocking(memcached_server_st *server)
     do
     {
       rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
-    } while (rval == -1 && (errno == EINTR || errno == EAGAIN));
+    } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
 
-    unlikely (rval == -1)
+    if (rval == -1)
     {
       memcached_set_errno(*server, errno, NULL);
     }
@@ -224,9 +233,9 @@ static inline void set_socket_nonblocking(memcached_server_st *server)
 
 static void set_socket_options(memcached_server_st *server)
 {
-  assert_msg(server->fd != -1, "invalid socket was passed to set_socket_options()");
+  assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
 
-  if (server->type == MEMCACHED_CONNECTION_UDP)
+  if (memcached_is_udp(server->root))
   {
     return;
   }
@@ -442,7 +451,7 @@ static memcached_return_t network_connect(memcached_server_st *server)
   while (server->address_info_next and server->fd == INVALID_SOCKET)
   {
     /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
-    if (server->type == MEMCACHED_CONNECTION_UDP && server->address_info_next->ai_family != AF_INET)
+    if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
     {
       server->address_info_next= server->address_info_next->ai_next;
       continue;
@@ -471,7 +480,10 @@ static memcached_return_t network_connect(memcached_server_st *server)
       timeout_error_occured= true;
       break;
 
+    case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
     case EWOULDBLOCK:
+#endif
     case EINPROGRESS: // nonblocking mode - first return
     case EALREADY: // nonblocking mode - subsequent returns
       {
@@ -547,6 +559,9 @@ static memcached_return_t network_connect(memcached_server_st *server)
 */
 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
 {
+  struct timeval curr_time;
+  bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
+
   /* 
     If we hit server_failure_limit then something is completely wrong about the server.
 
@@ -561,7 +576,21 @@ static memcached_return_t backoff_handling(memcached_server_write_instance_st se
     if (_is_auto_eject_host(server->root))
     {
       set_last_disconnected_host(server);
-      run_distribution((memcached_st *)server->root);
+
+      // Retry dead servers if requested
+      if (_gettime_success and server->root->dead_timeout > 0)
+      {
+        server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
+
+        // We only retry dead servers once before assuming failure again
+        server->server_failure_counter= server->root->server_failure_limit -1;
+      }
+
+      memcached_return_t rc;
+      if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
+      {
+        return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
+      }
 
       return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
     }
@@ -577,9 +606,6 @@ static memcached_return_t backoff_handling(memcached_server_write_instance_st se
 
   if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
   {
-    struct timeval curr_time;
-    bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
-
     /*
       If next_retry is less then our current time, then we reset and try everything again.
     */
@@ -598,7 +624,7 @@ static memcached_return_t backoff_handling(memcached_server_write_instance_st se
   return MEMCACHED_SUCCESS;
 }
 
-memcached_return_t memcached_connect(memcached_server_write_instance_st server)
+static memcached_return_t _memcached_connect(memcached_server_write_instance_st server, const bool set_last_disconnected)
 {
   if (server->fd != INVALID_SOCKET)
   {
@@ -615,6 +641,16 @@ memcached_return_t memcached_connect(memcached_server_write_instance_st server)
     return rc;
   }
 
+  if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
+  {
+    return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
+  }
+
+  if (server->hostname[0] == '/')
+  {
+    server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
+  }
+
   /* We need to clean up the multi startup piece */
   switch (server->type)
   {
@@ -647,25 +683,39 @@ memcached_return_t memcached_connect(memcached_server_write_instance_st server)
     memcached_mark_server_as_clean(server);
     return rc;
   }
-
-  set_last_disconnected_host(server);
-  if (memcached_has_current_error(*server))
-  {
-    memcached_mark_server_for_timeout(server);
-    assert(memcached_failed(memcached_server_error_return(server)));
-  }
-  else
+  else if (set_last_disconnected)
   {
-    memcached_set_error(*server, rc, MEMCACHED_AT);
-    memcached_mark_server_for_timeout(server);
-  }
+    set_last_disconnected_host(server);
+    if (memcached_has_current_error(*server))
+    {
+      memcached_mark_server_for_timeout(server);
+      assert(memcached_failed(memcached_server_error_return(server)));
+    }
+    else
+    {
+      memcached_set_error(*server, rc, MEMCACHED_AT);
+      memcached_mark_server_for_timeout(server);
+    }
 
-  LIBMEMCACHED_MEMCACHED_CONNECT_END();
+    LIBMEMCACHED_MEMCACHED_CONNECT_END();
 
-  if (in_timeout)
-  {
-    return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
+    if (in_timeout)
+    {
+      char buffer[1024];
+      int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname, int(server->port));
+      return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
+    }
   }
 
   return rc;
 }
+
+memcached_return_t memcached_connect_try(memcached_server_write_instance_st server)
+{
+  return _memcached_connect(server, false);
+}
+
+memcached_return_t memcached_connect(memcached_server_write_instance_st server)
+{
+  return _memcached_connect(server, true);
+}