Pushing weighted ketama code.
author <brian@localhost.localdomain> <>
Wed, 23 Jul 2008 19:16:31 +0000 (12:16 -0700)
committer <brian@localhost.localdomain> <>
Wed, 23 Jul 2008 19:16:31 +0000 (12:16 -0700)
13 files changed:
ChangeLog
THANKS
docs/memcached_behavior.pod
libmemcached/common.h
libmemcached/memcached.c
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_hash.c
libmemcached/memcached_hosts.c
libmemcached/memcached_server.h
tests/function.c
tests/server.c

index c8e04e6fed66b8c68d3682265de26378015ae076..6500f2a26433a255d787214591059b1820569f39 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,7 @@
 0.23
   * Added strings.h header for Solaris 9
   * Solaris 64bit fix.
+  * Support for weighted Ketama from Yin Chen.
 
 0.22 Mon Jul 14 09:24:11 PDT 2008
   * Fix where master key was no being checked for "bad key"
diff --git a/THANKS b/THANKS
index bf6975fa094ca7d89abac0096e420a158531a813..9221e3a622c15f2111cff5ff804490493f8e96d9 100644 (file)
--- a/THANKS
+++ b/THANKS
@@ -9,4 +9,4 @@ Kevin Dalley - Bug Fixes
 Patrick Galbraith - work on C++ interface
 Ross McFarland - Idea for sorting servers.
 Marcelo Fernandez - TCP/IP timeout pieces
-Yin Chen - Ketama support
+Yin Chen - Ketama support/weighted support
index c33b2af59a4d9330997eb22efc05aabbd9cf99d0..f717984d850be918b9defcd23dc9d9c474688552 100755 (executable)
@@ -77,6 +77,16 @@ Support CAS operations (this is not enabled by default at this point in the serv
 Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA
 and the hash to MEMCACHED_HASH_MD5.
 
+=item MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED
+
+Sets the default distribution to MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA with the weighted support.
+and the hash to MEMCACHED_HASH_MD5.
+
+=item MEMCACHED_BEHAVIOR_KETAMA_HASH
+
+Sets the hashing algorithm for host mapping on continuum. The value can be set
+to either MEMCACHED_HASH_DEFAULT, MEMCACHED_HASH_MD5, MEMCACHED_HASH_CRC, MEMCACHED_HASH_FNV1_64, MEMCACHED_HASH_FNV1A_64, MEMCACHED_HASH_FNV1_32, and MEMCACHED_HASH_FNV1A_32. 
+
 =item MEMCACHED_BEHAVIOR_POLL_TIMEOUT
 
 Modify the timeout value that is used by poll(). The default value is -1. An signed int pointer must be passed to memcached_behavior_set() to change this value. For memcached_behavior_get() a signed int value will be cast and returned as the unsigned long long.
index 8cef723a850e1aa2fceeb2f45f99587166cc4226..0c9b87ce3f3478eb8e7984307c7583770276d33f 100644 (file)
@@ -78,6 +78,8 @@ typedef enum {
   MEM_BUFFER_REQUESTS= (1 << 8),
   MEM_USE_SORT_HOSTS= (1 << 9),
   MEM_VERIFY_KEY= (1 << 10),
+  /* 11 used for weighted ketama */
+  MEM_KETAMA_WEIGHTED= (1 << 11),
 } memcached_flags;
 
 /* Hashing algo */
index 2efcc7d5f713042f3401e68612e11b7b6887df18..dcf1a4a0c6bf73cf2ce8199b29b79ad47320fe6d 100644 (file)
@@ -103,6 +103,7 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *ptr)
   new_clone->retry_timeout= ptr->retry_timeout;
   new_clone->distribution= ptr->distribution;
   new_clone->hash= ptr->hash;
+  new_clone->hash_continuum= ptr->hash_continuum;
   new_clone->user_data= ptr->user_data;
 
   new_clone->on_clone= ptr->on_clone;
index d1daf775362af71375de732e55a7bf2f7dbfffeb..911b04447d96d0f286881f75b833e6e1aad91175 100644 (file)
@@ -93,6 +93,8 @@ struct memcached_st {
   memcached_trigger_delete_key delete_trigger;
   char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
   size_t prefix_key_length;
+  memcached_hash hash_continuum;
+  uint32_t continuum_points_counter;
 };
 
 
index e89473cff8997d6f19d40110c9604398143373c4..c001c4616d7c588fdc37f8d561f1920fd6919ad9 100644 (file)
@@ -58,9 +58,21 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
       run_distribution(ptr);
       break;
     }
+  case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
+    {
+      ptr->hash= MEMCACHED_HASH_MD5;
+      ptr->distribution= MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA;
+      set_behavior_flag(ptr, MEM_KETAMA_WEIGHTED, data);
+      run_distribution(ptr);
+      break;
+    }
   case MEMCACHED_BEHAVIOR_HASH:
     ptr->hash= (memcached_hash)(data);
     break;
+  case MEMCACHED_BEHAVIOR_KETAMA_HASH:
+    ptr->hash_continuum= (memcached_hash)(data);
+    run_distribution(ptr);
+    break;
   case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS:
     set_behavior_flag(ptr, MEM_USE_CACHE_LOOKUPS, data);
     memcached_quit(ptr);
@@ -124,12 +136,17 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_VERIFY_KEY:
     temp_flag= MEM_VERIFY_KEY;
     break;
+  case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
+    temp_flag= MEM_KETAMA_WEIGHTED;
+    break;
   case MEMCACHED_BEHAVIOR_DISTRIBUTION:
     return ptr->distribution;
   case MEMCACHED_BEHAVIOR_KETAMA:
-    return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA  && ptr->hash == MEMCACHED_HASH_MD5 ) ? 1 : 0;
+    return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA) ? 1 : 0;
   case MEMCACHED_BEHAVIOR_HASH:
     return ptr->hash;
+  case MEMCACHED_BEHAVIOR_KETAMA_HASH:
+    return ptr->hash_continuum;
   case MEMCACHED_BEHAVIOR_SORT_HOSTS:
     temp_flag= MEM_USE_SORT_HOSTS;
     break;
index fd2f2b4d8f45032967ddd34a1f0b22878479a40a..69c7666b8abdd9220ce4ffce4197e0514c7c0f16 100644 (file)
@@ -86,6 +86,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_VERIFY_KEY,
   MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
   MEMCACHED_BEHAVIOR_RETRY_TIMEOUT,
+  MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED,
+  MEMCACHED_BEHAVIOR_KETAMA_HASH,
 } memcached_behavior;
 
 typedef enum {
index 5bb0fcee903f7a173ea04600d17f25c76e8121c0..75a1537a81e4547f67e8912261331e6dcec0e887 100644 (file)
@@ -11,18 +11,12 @@ static uint32_t FNV_32_PRIME= 16777619;
 static uint32_t internal_generate_hash(const char *key, size_t key_length);
 static uint32_t internal_generate_md5(const char *key, size_t key_length);
 
-uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length)
+uint32_t generate_hash_value(const char *key, size_t key_length, memcached_hash hash_algorithm)
 {
   uint32_t hash= 1; /* Just here to remove compile warning */
   uint32_t x= 0;
 
-
-  WATCHPOINT_ASSERT(ptr->number_of_hosts);
-
-  if (ptr->number_of_hosts == 1)
-    return 0;
-
-  switch (ptr->hash)
+  switch (hash_algorithm)
   {
   case MEMCACHED_HASH_DEFAULT:
     hash= internal_generate_hash(key, key_length);
@@ -91,7 +85,20 @@ uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length)
       break;
     }
   }
+  return hash;
+}
+
+uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length)
+{
+  uint32_t hash= 1; /* Just here to remove compile warning */
+
+
+  WATCHPOINT_ASSERT(ptr->number_of_hosts);
 
+  if (ptr->number_of_hosts == 1)
+    return 0;
+
+  hash= generate_hash_value(key, key_length, ptr->hash);
   WATCHPOINT_ASSERT(hash);
   return hash;
 }
@@ -103,7 +110,7 @@ unsigned int dispatch_host(memcached_st *ptr, uint32_t hash)
   case MEMCACHED_DISTRIBUTION_CONSISTENT:
   case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
     {
-      uint32_t num= ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER;
+      uint32_t num= ptr->continuum_points_counter;
       WATCHPOINT_ASSERT(ptr->continuum);
 
       hash= hash;
@@ -156,6 +163,7 @@ unsigned int dispatch_host(memcached_st *ptr, uint32_t hash)
 uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_length)
 {
   uint32_t hash= 1; /* Just here to remove compile warning */
+  uint32_t result= 1;
 
   WATCHPOINT_ASSERT(ptr->number_of_hosts);
 
@@ -165,6 +173,7 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_
   hash = generate_hash(ptr, key, key_length);
 
   WATCHPOINT_ASSERT(hash);
+
   return dispatch_host(ptr, hash);
 }
 
index d75da5d2d6114410ffaf1519e0469cef2b113821..55d7a49df403ea736e905a43f3576748c4069482 100644 (file)
@@ -1,4 +1,5 @@
 #include "common.h"
+#include <math.h>
 
 /* Protoypes (static) */
 static memcached_return server_add(memcached_st *ptr, char *hostname, 
@@ -107,6 +108,12 @@ memcached_return update_continuum(memcached_st *ptr)
   uint32_t continuum_index= 0;
   uint32_t value;
   memcached_server_st *list;
+  uint32_t pointer_counter= 0;
+  uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER;
+  memcached_return rc;
+  uint64_t total_mem_bytes= 0;
+  memcached_stat_st *stat_p= NULL;
+  uint32_t is_ketama_weighted= 0;
 
   if (ptr->number_of_hosts > ptr->continuum_count)
   {
@@ -125,9 +132,29 @@ memcached_return update_continuum(memcached_st *ptr)
   }
 
   list = ptr->hosts;
+
+  is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  if(is_ketama_weighted) 
+  {
+    stat_p = memcached_stat(ptr, NULL, &rc);
+    for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
+    {
+      list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes;
+      total_mem_bytes += (stat_p + host_index)->limit_maxbytes;
+    }
+  }
+
   for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
   {
-    for(index= 1; index <= MEMCACHED_POINTS_PER_SERVER; ++index) 
+    if(is_ketama_weighted) 
+    {
+        float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes;
+        pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts));
+#ifdef HAVE_DEBUG
+        printf("ketama_weighted:%s|%d|%llu|%u\n", list[host_index].hostname, list[host_index].port,  list[host_index].limit_maxbytes, pointer_per_server);
+#endif
+    }
+    for(index= 1; index <= pointer_per_server; ++index) 
     {
       char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
       size_t sort_host_length;
@@ -135,14 +162,19 @@ memcached_return update_continuum(memcached_st *ptr)
       sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", 
                                  list[host_index].hostname, list[host_index].port, index);
       WATCHPOINT_ASSERT(sort_host_length);
-      value= generate_hash(ptr, sort_host, sort_host_length);
+      value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
       ptr->continuum[continuum_index].index= host_index;
       ptr->continuum[continuum_index++].value= value;
     }
+    pointer_counter+= pointer_per_server;
   }
 
   WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
   qsort(ptr->continuum, ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER, sizeof(memcached_continuum_item_st), continuum_item_cmp);
+
+  ptr->continuum_points_counter= pointer_counter;
+  memcached_stat_free(NULL, stat_p);
+
 #ifdef HAVE_DEBUG
   for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) 
   {
index f0eff961b51c277556e4d7215249fe526eecb543..c566ddbee3f33f86c49b4f2965d49086331618d1 100644 (file)
@@ -34,6 +34,7 @@ struct memcached_server_st {
   uint16_t count;
   time_t next_retry;
   memcached_st *root;
+  uint64_t limit_maxbytes;
 };
 
 #define memcached_server_count(A) (A)->number_of_hosts
index 59e1f2b94a65c8415a1cb3964b8b88da4bb63b4f..d83b9729efdb7283401ca9fd48e0a0eba9c8ec50 100644 (file)
@@ -24,8 +24,9 @@
 
 #include "test.h"
 
-#define GLOBAL_COUNT 100000
-#define GLOBAL2_COUNT 1000
+#define GLOBAL_COUNT 10000
+#define GLOBAL2_COUNT 100
+#define SERVERS_TO_CREATE 5
 static uint32_t global_count;
 
 static pairs_st *global_pairs;
@@ -2237,6 +2238,23 @@ test_return generate_data(memcached_st *memc)
   return 0;
 }
 
+test_return generate_data_with_stats(memcached_st *memc)
+{
+  memcached_stat_st *stat_p= NULL;
+  memcached_return rc;
+  int host_index= 0;
+  execute_set(memc, global_pairs, global_count);
+
+  //TODO: hosts used size stats
+  stat_p = memcached_stat(memc, NULL, &rc);
+  for (host_index = 0; host_index < SERVERS_TO_CREATE; ++host_index)
+  {
+      printf("\nserver %d|%s|%d bytes: %lld\n", host_index, (memc->hosts)[host_index].hostname, (memc->hosts)[host_index].port, (stat_p + host_index)->bytes);
+  }
+
+
+  return 0;
+}
 test_return generate_buffer_data(memcached_st *memc)
 {
   int latch= 0;
@@ -2521,6 +2539,24 @@ memcached_return pre_behavior_ketama(memcached_st *memc)
   return MEMCACHED_SUCCESS;
 }
 
+memcached_return pre_behavior_ketama_weighted(memcached_st *memc)
+{
+  memcached_return rc;
+  uint64_t value;
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  assert(value == 1);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH);
+  assert(value == MEMCACHED_HASH_MD5);
+  return MEMCACHED_SUCCESS;
+}
 void my_free(memcached_st *ptr, void *mem)
 {
   free(mem);
@@ -2862,6 +2898,14 @@ test_st consistent_tests[] ={
   {0, 0, 0}
 };
 
+test_st consistent_weighted_tests[] ={
+  {"generate_pairs", 1, generate_pairs },
+  {"generate_data", 1, generate_data_with_stats },
+  {"get_read", 0, get_read_count },
+  {"cleanup", 1, cleanup_pairs },
+  {0, 0, 0}
+};
+
 collection_st collection[] ={
   {"block", 0, 0, tests},
   {"nonblock", pre_nonblock, 0, tests},
@@ -2896,6 +2940,7 @@ collection_st collection[] ={
   {"generate_nonblock", pre_nonblock, 0, generate_tests},
   {"consistent_not", 0, 0, consistent_tests},
   {"consistent_ketama", pre_behavior_ketama, 0, consistent_tests},
+  {"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests},
   {0, 0, 0, 0}
 };
 
index 1248981ba1acd63fd92d679daa1554c9ae7f2eae..29f6699ddd42b06008413bd7ba324519ba487de4 100644 (file)
@@ -37,10 +37,20 @@ void server_startup(server_startup_st *construct)
         int count;
         int status;
 
-        if (construct->udp)
-          sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE);
-        else
-          sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE);
+        if (construct->udp){
+          if(x == 0) {
+            sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u -m 128", x, x+ TEST_PORT_BASE);
+          } else {
+            sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -U %u", x, x+ TEST_PORT_BASE);
+          }
+        }
+        else{
+          if(x == 0) {
+            sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u -m 128", x, x+ TEST_PORT_BASE);
+          } else {
+            sprintf(buffer, "memcached -d -P /tmp/%umemc.pid -t 1 -p %u", x, x+ TEST_PORT_BASE);
+          }
+        }
         status= system(buffer);
         count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE);
         end_ptr+= count;