Add support for AUTO_EJECT_HOST
authorRobey Pointer <robey@twitter.com>
Sat, 2 May 2009 02:44:39 +0000 (19:44 -0700)
committerRobey Pointer <robey@twitter.com>
Sat, 2 May 2009 02:44:39 +0000 (19:44 -0700)
libmemcached/common.h
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_connect.c
libmemcached/memcached_constants.h
libmemcached/memcached_hash.c
libmemcached/memcached_hosts.c
libmemcached/memcached_io.c
libmemcached/memcached_quit.c
libmemcached/memcached_strerror.c
tests/function.c

index b9560966a5762e82a9e98cdbfd0644640b782b9d..5dc77a08a8a8e9e05c9c9d55f588137fe259b8a9 100644 (file)
@@ -77,7 +77,8 @@ typedef enum {
   MEM_BINARY_PROTOCOL= (1 << 12),
   MEM_HASH_WITH_PREFIX_KEY= (1 << 13),
   MEM_NOREPLY= (1 << 14),
-  MEM_USE_UDP= (1 << 15)
+  MEM_USE_UDP= (1 << 15),
+  MEM_AUTO_EJECT_HOSTS= (1 << 16)
 } memcached_flags;
 
 /* Hashing algo */
index 62fa65c0af3f6661faf139e8dc00e89a25991a80..aceab59b77bf2aac3b189019fb353012289d11ae 100644 (file)
@@ -117,6 +117,7 @@ struct memcached_st {
   uint32_t server_failure_limit;
   uint32_t io_msg_watermark;
   uint32_t io_bytes_watermark;
+  time_t next_distribution_rebuild;
 };
 
 
index 6bc00064da3155e69ca7aaf2ff4f0078e239e9ba..77c9bd3bd89bf3c1fe57ece24413186382ccd43f 100644 (file)
@@ -150,7 +150,10 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
     break;
   case MEMCACHED_BEHAVIOR_NOREPLY:
     set_behavior_flag(ptr, MEM_NOREPLY, data);
-    break;     
+    break;
+  case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
+    set_behavior_flag(ptr, MEM_AUTO_EJECT_HOSTS, data);
+    break;
   }
 
   return MEMCACHED_SUCCESS;
@@ -263,6 +266,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_NOREPLY:
     temp_flag= MEM_NOREPLY;
     break;
+  case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
+    temp_flag= MEM_AUTO_EJECT_HOSTS;
+    break;
   }
 
   WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */
index 8e0e4208e864f11556ed8aaa9927aa8b15400c11..fb79c80ad5d7296d900878a6e81406c80aa77a35 100644 (file)
@@ -121,7 +121,7 @@ static memcached_return set_socket_options(memcached_server_st *ptr)
   }
 
   /* For the moment, not getting a nonblocking mode will not be fatal */
-  if (ptr->root->flags & MEM_NO_BLOCK)
+  if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
   {
     int flags;
 
@@ -185,15 +185,6 @@ static memcached_return network_connect(memcached_server_st *ptr)
   {
     struct addrinfo *use;
 
-    if (ptr->root->server_failure_limit != 0) 
-    {
-      if (ptr->server_failure_counter >= ptr->root->server_failure_limit) 
-      {
-          memcached_server_remove(ptr);
-          return MEMCACHED_FAILURE;
-      }
-    }
-
     if (!ptr->sockaddr_inited ||
         (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
     {
@@ -253,7 +244,7 @@ static memcached_return network_connect(memcached_server_st *ptr)
               int err;
               int len = sizeof (err);
               (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
-              ptr->cached_errno= errno;
+              ptr->cached_errno= (err == 0) ? errno : err;
             }
 
             (void)close(ptr->fd);
@@ -275,7 +266,7 @@ static memcached_return network_connect(memcached_server_st *ptr)
       if (ptr->fd != -1)
       {
         /* restore flags */ 
-        if (ptr->root->connect_timeout && (flags & O_NONBLOCK) == 0) 
+        if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0) 
           (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
 
         WATCHPOINT_ASSERT(ptr->cursor_active == 0);
@@ -297,6 +288,8 @@ static memcached_return network_connect(memcached_server_st *ptr)
         ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
     }
     ptr->server_failure_counter+= 1;
+    if (ptr->cached_errno == 0)
+      return MEMCACHED_TIMEOUT;
     return MEMCACHED_ERRNO; /* The last error should be from connect() */
   }
 
@@ -310,14 +303,29 @@ memcached_return memcached_connect(memcached_server_st *ptr)
   memcached_return rc= MEMCACHED_NO_SERVERS;
   LIBMEMCACHED_MEMCACHED_CONNECT_START();
 
-  if (ptr->root->retry_timeout)
+  /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
+  if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
   {
     struct timeval next_time;
 
     gettimeofday(&next_time, NULL);
+
+    /* if we've had too many consecutive errors on this server, mark it dead. */
+    if (ptr->server_failure_counter > ptr->root->server_failure_limit)
+    {
+      ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
+      ptr->server_failure_counter= 0;
+    }
+
     if (next_time.tv_sec < ptr->next_retry)
-      return MEMCACHED_TIMEOUT;
+    {
+      if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
+        run_distribution(ptr->root);
+
+      return MEMCACHED_SERVER_MARKED_DEAD;
+    }
   }
+
   /* We need to clean up the multi startup piece */
   switch (ptr->type)
   {
index 03c61871a8dc7cd7a0a57535b619e182add49857..9133a2626d14248f2432fcd03cb8eaafbde07d5e 100644 (file)
@@ -63,6 +63,7 @@ typedef enum {
   MEMCACHED_BUFFERED,
   MEMCACHED_BAD_KEY_PROVIDED,
   MEMCACHED_INVALID_HOST_PROTOCOL,
+  MEMCACHED_SERVER_MARKED_DEAD,
   MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */
 } memcached_return;
 
@@ -100,7 +101,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
   MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
   MEMCACHED_BEHAVIOR_NOREPLY,
-  MEMCACHED_BEHAVIOR_USE_UDP
+  MEMCACHED_BEHAVIOR_USE_UDP,
+  MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS
 } memcached_behavior;
 
 typedef enum {
index d2af51653be0e815511addc738d15f51aab22cf5..0527733a9b766503731a7054805664a75ee400f9 100644 (file)
@@ -179,6 +179,14 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_
 
   WATCHPOINT_ASSERT(hash);
 
+  if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS) && ptr->next_distribution_rebuild) {
+    struct timeval now;
+
+    if (gettimeofday(&now, NULL) == 0 &&
+        now.tv_sec > ptr->next_distribution_rebuild)
+      run_distribution(ptr);
+  }
+
   return dispatch_host(ptr, hash);
 }
 
index 2bcd189c15ab3fce95eba0970a039743b848af3b..c8e15c4c0351e798f9f632791285471d1c32cca1 100644 (file)
@@ -112,34 +112,63 @@ memcached_return update_continuum(memcached_st *ptr)
   uint32_t pointer_per_hash= 1;
   uint64_t total_weight= 0;
   uint32_t is_ketama_weighted= 0;
+  uint32_t is_auto_ejecting= 0;
   uint32_t points_per_server= 0;
+  uint32_t live_servers= 0;
+  struct timeval now;
+
+  if (gettimeofday(&now, NULL) != 0)
+  {
+    ptr->cached_errno = errno;
+    return MEMCACHED_ERRNO;
+  }
+
+  list = ptr->hosts;
+
+  /* count live servers (those without a retry delay set) */
+  is_auto_ejecting= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS);
+  if (is_auto_ejecting)
+  {
+    live_servers= 0;
+    ptr->next_distribution_rebuild= 0;
+    for (host_index= 0; host_index < ptr->number_of_hosts; ++host_index)
+    {
+      if (list[host_index].next_retry <= now.tv_sec)
+        live_servers++;
+      else
+      {
+        if (ptr->next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->next_distribution_rebuild)
+          ptr->next_distribution_rebuild= list[host_index].next_retry;
+      }
+    }
+  }
+  else
+    live_servers= ptr->number_of_hosts;
 
   is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
   points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER;
 
-  if (ptr->number_of_hosts == 0)
+  if (live_servers == 0)
     return MEMCACHED_SUCCESS;
 
-  if (ptr->number_of_hosts > ptr->continuum_count)
+  if (live_servers > ptr->continuum_count)
   {
     memcached_continuum_item_st *new_ptr;
 
     if (ptr->call_realloc)
       new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, 
-                                                                sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
+                                                                sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
     else
       new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, 
-                                                      sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
+                                                      sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
 
     if (new_ptr == 0)
       return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
 
     ptr->continuum= new_ptr;
-    ptr->continuum_count= ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION;
+    ptr->continuum_count= live_servers + MEMCACHED_CONTINUUM_ADDITION;
   }
 
-  list = ptr->hosts;
-
   if (is_ketama_weighted) 
   {
     for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
@@ -148,16 +177,20 @@ memcached_return update_continuum(memcached_st *ptr)
       {
         list[host_index].weight = 1;
       }
-      total_weight += list[host_index].weight;
+      if (!is_auto_ejecting || list[host_index].next_retry <= now.tv_sec)
+        total_weight += list[host_index].weight;
     }
   }
 
   for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
   {
+    if (is_auto_ejecting && list[host_index].next_retry > now.tv_sec)
+      continue;
+
     if (is_ketama_weighted) 
     {
         float pct = (float)list[host_index].weight / (float)total_weight;
-        pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4;
+        pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)live_servers + 0.0000000001) * 4;
         pointer_per_hash= 4;
 #ifdef HAVE_DEBUG
         printf("ketama_weighted:%s|%d|%llu|%u\n", 
@@ -212,7 +245,7 @@ memcached_return update_continuum(memcached_st *ptr)
   qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
 
 #ifdef HAVE_DEBUG
-  for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) 
+  for (index= 0; ptr->number_of_hosts && index < ((live_servers * MEMCACHED_POINTS_PER_SERVER) - 1); index++) 
   {
     WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value);
   }
index 43fb509e64c18f44f8dd6352d78e21511b8fc2be..3f184e068624fbbde7c876dd5554a96f61860855 100644 (file)
@@ -116,16 +116,11 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
           switch (errno)
           {
           case EAGAIN:
-          case EINTR: 
-            {
-              memcached_return rc;
-
-              rc= io_wait(ptr, MEM_READ);
+          case EINTR:
+            if (io_wait(ptr, MEM_READ) == MEMCACHED_SUCCESS)
+              continue;
+          /* fall through */
 
-              if (rc == MEMCACHED_SUCCESS)
-                continue;
-            }
-          /* fall trough */
           default:
             {
               memcached_quit_server(ptr, 1);
@@ -177,6 +172,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
     }
   }
 
+  ptr->server_failure_counter= 0;
   return (size_t)(buffer_ptr - (char*)buffer);
 }
 
index 80b9314ac593a2350c28ae925579d42b13566fac..c93cb1883ac17dbadfe818a02fe43caf3dbe926c 100644 (file)
@@ -49,6 +49,8 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death)
     ptr->read_ptr= ptr->read_buffer;
     memcached_server_response_reset(ptr);
   }
+
+  ptr->server_failure_counter++;
 }
 
 void memcached_quit(memcached_st *ptr)
index 72e51fdabd2135d80ae2951788abcc531829c3e8..4b626713c60ad69be3d375859c0913e3d894d1b2 100644 (file)
@@ -72,6 +72,8 @@ char *memcached_strerror(memcached_st *ptr __attribute__((unused)), memcached_re
     return "A BAD KEY WAS PROVIDED/CHARACTERS OUT OF RANGE";
   case MEMCACHED_INVALID_HOST_PROTOCOL:
     return "THE HOST TRANSPORT PROTOCOL DOES NOT MATCH THAT OF THE CLIENT";
+  case MEMCACHED_SERVER_MARKED_DEAD:
+    return "SERVER IS MARKED DEAD";
   case MEMCACHED_MAXIMUM_RETURN:
     return "Gibberish returned!";
   default:
index 1f0c404b5e0a760a6fa7489c01a396398a734db5..182aa8c35bce03f86f12fe71b5e38767d354e797 100644 (file)
@@ -2407,6 +2407,73 @@ test_return user_supplied_bug18(memcached_st *trash)
   return 0;
 }
 
+test_return auto_eject_hosts(memcached_st *trash)
+{
+  memcached_return rc;
+  memcached_st *memc= memcached_create(NULL);
+  assert(memc);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  uint64_t value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  assert(value == 1);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH);
+  assert(value == MEMCACHED_HASH_MD5);
+
+    /* server should be removed when in delay */
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS);
+  assert(value == 1);
+
+  memcached_server_st *server_pool;
+  server_pool = memcached_servers_parse("10.0.1.1:11211 600,10.0.1.2:11211 300,10.0.1.3:11211 200,10.0.1.4:11211 350,10.0.1.5:11211 1000,10.0.1.6:11211 800,10.0.1.7:11211 950,10.0.1.8:11211 100");
+  memcached_server_push(memc, server_pool);
+
+  /* verify that the server list was parsed okay. */
+  assert(memc->number_of_hosts == 8);
+  assert(strcmp(server_pool[0].hostname, "10.0.1.1") == 0);
+  assert(server_pool[0].port == 11211);
+  assert(server_pool[0].weight == 600);
+  assert(strcmp(server_pool[2].hostname, "10.0.1.3") == 0);
+  assert(server_pool[2].port == 11211);
+  assert(server_pool[2].weight == 200);
+  assert(strcmp(server_pool[7].hostname, "10.0.1.8") == 0);
+  assert(server_pool[7].port == 11211);
+  assert(server_pool[7].weight == 100);
+
+  memc->hosts[2].next_retry = time(NULL) + 15;
+  memc->next_distribution_rebuild= time(NULL) - 1;
+
+  for (int x= 0; x < 99; x++)
+  {
+    uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key));
+    assert(server_idx != 2);
+  }
+
+  /* and re-added when it's back. */
+  memc->hosts[2].next_retry = time(NULL) - 1;
+  memc->next_distribution_rebuild= time(NULL) - 1;
+  run_distribution(memc);
+  for (int x= 0; x < 99; x++)
+  {
+    uint32_t server_idx = memcached_generate_hash(memc, test_cases[x].key, strlen(test_cases[x].key));
+    char *hostname = memc->hosts[server_idx].hostname;
+    assert(strcmp(hostname, test_cases[x].server) == 0);
+  }
+
+  memcached_server_list_free(server_pool);
+  memcached_free(memc);
+
+  return TEST_SUCCESS;
+}
+
 static test_return  result_static(memcached_st *memc)
 {
   memcached_result_st result;
@@ -3816,6 +3883,11 @@ test_st hsieh_availability[] ={
   {0, 0, 0}
 };
 
+test_st ketama_auto_eject_hosts[] ={
+  {"auto_eject_hosts", 1, auto_eject_hosts },
+  {0, 0, 0}
+};
+
 collection_st collection[] ={
   {"hsieh_availability",0,0,hsieh_availability},
   {"udp_setup", init_udp, 0, udp_setup_server_tests},
@@ -3835,6 +3907,7 @@ collection_st collection[] ={
   {"fnv1_32", pre_hash_fnv1_32, 0, tests},
   {"fnv1a_32", pre_hash_fnv1a_32, 0, tests},
   {"ketama", pre_behavior_ketama, 0, tests},
+  {"ketama_auto_eject_hosts", pre_behavior_ketama, 0, ketama_auto_eject_hosts},
   {"unix_socket", pre_unix_socket, 0, tests},
   {"unix_socket_nodelay", pre_nodelay, 0, tests},
   {"poll_timeout", poll_timeout, 0, tests},