Incomming code for additions in keys (see Changelog)
author <brian@gir-3.local> <>
Mon, 3 Nov 2008 22:15:47 +0000 (03:45 +0530)
committer <brian@gir-3.local> <>
Mon, 3 Nov 2008 22:15:47 +0000 (03:45 +0530)
15 files changed:
ChangeLog
configure.ac
libmemcached/common.h
libmemcached/memcached.h
libmemcached/memcached_behavior.c
libmemcached/memcached_callback.c
libmemcached/memcached_constants.h
libmemcached/memcached_hash.c
libmemcached/memcached_hosts.c
libmemcached/memcached_io.c
libmemcached/memcached_parse.c
libmemcached/memcached_response.c
libmemcached/memcached_server.c
libmemcached/memcached_server.h
tests/function.c

index 4c885c9491e271cdf3a4a89140994fd309ff9e65..327602cb8d8942c2d7466ca846743b1549f7bd49 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,10 @@
 0.25
   * Jenkins HASH added.
   * Update of Murmur hash code
+  * Support explicit weights (Robey Pointer, Evan Weaver)
+  * Bugfix for ketama continuum (Robey Pointer)
+  * New behavior MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY (Robey Pointer)
+  * Don't ever call stats for weighting servers, because it is unstable.
 
 0.24 Tue Sep 16 02:59:03 PDT 2008 (never released)
   * Cleanup compile warnings.
index 0decc23cf147b021b8be43cdbda5af9b0791d1ff..5206a6a50005961db22c49258fc63e5e692b501d 100644 (file)
@@ -7,7 +7,7 @@ MEMCACHED_LIBRARY_NAME=libmemcached
 
 #release versioning
 MEMCACHED_MAJOR_VERSION=0
-MEMCACHED_MINOR_VERSION=24
+MEMCACHED_MINOR_VERSION=25
 MEMCACHED_MICRO_VERSION=0
 
 #API version
index b64af72331fc94ff3fb636dbd27cb8fa12214774..b8c8acf5279b84930e313157f22669b6c6a66b0d 100644 (file)
@@ -81,7 +81,8 @@ typedef enum {
   MEM_VERIFY_KEY= (1 << 10),
   /* 11 used for weighted ketama */
   MEM_KETAMA_WEIGHTED= (1 << 11),
-  MEM_BINARY_PROTOCOL= (1 << 12)
+  MEM_BINARY_PROTOCOL= (1 << 12),
+  MEM_HASH_WITH_PREFIX_KEY= (1 << 13)
 } memcached_flags;
 
 /* Hashing algo */
@@ -125,7 +126,7 @@ extern uint64_t ntohll(uint64_t);
 extern uint64_t htonll(uint64_t);
 
 void host_reset(memcached_st *ptr, memcached_server_st *host, 
-                const char *hostname, unsigned int port,
+                const char *hostname, unsigned int port, uint32_t weight,
                 memcached_connection type);
 
 #endif /* __COMMON_H__ */
index f12975f6e8d41face40dc792ea0151d61019f2f6..96a4f15e38cc40e10feb94cdf2439b3cfe474583 100644 (file)
@@ -39,7 +39,7 @@ struct memcached_continuum_item_st {
   uint32_t value;
 };
 
-#define LIBMEMCACHED_VERSION_STRING "0.24"
+#define LIBMEMCACHED_VERSION_STRING "0.25"
 
 struct memcached_stat_st {
   uint32_t pid;
@@ -143,12 +143,29 @@ memcached_return memcached_server_add_unix_socket(memcached_st *ptr,
                                                   const char *filename);
 memcached_return memcached_server_add(memcached_st *ptr, const char *hostname, 
                                       unsigned int port);
+
+memcached_return memcached_server_add_udp_with_weight(memcached_st *ptr, 
+                                                      const char *hostname,
+                                                      unsigned int port,
+                                                      uint32_t weight);
+memcached_return memcached_server_add_unix_socket_with_weight(memcached_st *ptr, 
+                                                              const char *filename,
+                                                              uint32_t weight);
+memcached_return memcached_server_add_with_weight(memcached_st *ptr, const char *hostname, 
+                                                  unsigned int port,
+                                                  uint32_t weight);
 void memcached_server_list_free(memcached_server_st *ptr);
 memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list);
 
 memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, 
-                                             const char *hostname, unsigned int port, 
-                                             memcached_return *error);
+                                                  const char *hostname, 
+                                                  unsigned int port, 
+                                                  memcached_return *error);
+memcached_server_st *memcached_server_list_append_with_weight(memcached_server_st *ptr, 
+                                                              const char *hostname, 
+                                                              unsigned int port, 
+                                                              uint32_t weight,
+                                                              memcached_return *error);
 unsigned int memcached_server_list_count(memcached_server_st *ptr);
 memcached_server_st *memcached_servers_parse(char *server_strings);
 
index 0cd42efc64fee5fc41054316a4a50a53f7eaaf4a..5f68844041dc4f33be96988c6a63f0c06d337731 100644 (file)
@@ -1,4 +1,5 @@
 #include "common.h" 
+#include <time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/tcp.h>
@@ -58,6 +59,10 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_DISTRIBUTION:
     {
       ptr->distribution= (memcached_server_distribution)(data);
+      if (ptr->distribution == MEMCACHED_DISTRIBUTION_RANDOM)
+      {
+        srandom(time(NULL));
+      }
       run_distribution(ptr);
       break;
     }
@@ -124,6 +129,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
     break;
   case MEMCACHED_BEHAVIOR_USER_DATA:
     return MEMCACHED_FAILURE;
+  case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
+    set_behavior_flag(ptr, MEM_HASH_WITH_PREFIX_KEY, data);
+    break;
   }
 
   return MEMCACHED_SUCCESS;
@@ -229,6 +237,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
     }
   case MEMCACHED_BEHAVIOR_USER_DATA:
     return MEMCACHED_FAILURE;
+  case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
+    temp_flag= MEM_HASH_WITH_PREFIX_KEY;
+    break;
   }
 
   WATCHPOINT_ASSERT(temp_flag); /* Programming mistake if it gets this far */
index 5e36ddf6cfc04d57dbfd248535758a54e0718ab1..d1c042429b4d5fd4569a98a649f0ede6f2ac957f 100644 (file)
@@ -21,7 +21,7 @@ memcached_return memcached_callback_set(memcached_st *ptr,
       {
         size_t key_length= strlen(key);
 
-        if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+        if (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)
         {
           return MEMCACHED_BAD_KEY_PROVIDED;
         }
index fb8305f46b2583c1dab8551babfe567ebb1013e3..5fea02089d51791791ded4bfa3c53e40c8ecb2dc 100644 (file)
@@ -19,12 +19,13 @@ extern "C" {
 #define MEMCACHED_MAX_BUFFER 8196
 #define MEMCACHED_MAX_HOST_LENGTH 64
 #define MEMCACHED_MAX_HOST_SORT_LENGTH 86 /* Used for Ketama */
-#define MEMCACHED_POINTS_PER_SERVER 100 
+#define MEMCACHED_POINTS_PER_SERVER 100
+#define MEMCACHED_POINTS_PER_SERVER_KETAMA 160
 #define MEMCACHED_CONTINUUM_SIZE MEMCACHED_POINTS_PER_SERVER*100 /* This would then set max hosts to 100 */ 
 #define MEMCACHED_STRIDE 4
 #define MEMCACHED_DEFAULT_TIMEOUT 1000
 #define MEMCACHED_CONTINUUM_ADDITION 10 /* How many extra slots we should build for in the continuum */
-#define MEMCACHED_PREFIX_KEY_MAX_SIZE 12
+#define MEMCACHED_PREFIX_KEY_MAX_SIZE 128
 
 typedef enum {
   MEMCACHED_SUCCESS,
@@ -66,7 +67,8 @@ typedef enum {
 typedef enum {
   MEMCACHED_DISTRIBUTION_MODULA,
   MEMCACHED_DISTRIBUTION_CONSISTENT,
-  MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA
+  MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA,
+  MEMCACHED_DISTRIBUTION_RANDOM
 } memcached_server_distribution;
 
 typedef enum {
@@ -93,7 +95,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_RCV_TIMEOUT,
   MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT,
   MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
-  MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK
+  MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
+  MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY
 } memcached_behavior;
 
 typedef enum {
index 901f2c486c8d5645e7f5dc229d986ee1f572357f..ff7d0e39dc2732b49b92db7fd42ccf9969169431 100644 (file)
@@ -121,37 +121,25 @@ static uint32_t dispatch_host(memcached_st *ptr, uint32_t hash)
       hash= hash;
       memcached_continuum_item_st *begin, *end, *left, *right, *middle;
       begin= left= ptr->continuum;
-      end= right= ptr->continuum + (num - 1);
+      end= right= ptr->continuum + num;
 
-      while (1)
+      while (left < right)
       {
-        memcached_continuum_item_st *rmiddle;
-
-        middle = left + (right - left) / 2;
-
-        if (middle==end)
-          return begin->index;
-
-        if (middle==begin)
-          return end->index;
-
-        rmiddle = middle+1;
-
-        if (hash<rmiddle->value && hash>=middle->value)
-          return middle->index;
-
+        middle= left + (right - left) / 2;
         if (middle->value < hash)
-          left = middle + 1;
-        else if (middle->value > hash)
-          right = middle - 1;
-
-        if (left>right)
-          return left->index;
+          left= middle + 1;
+        else
+          right= middle;
       }
+      if (right > end)
+        right= begin;
+      return right->index;
     } 
     break;
   case MEMCACHED_DISTRIBUTION_MODULA:
     return hash % ptr->number_of_hosts;
+  case MEMCACHED_DISTRIBUTION_RANDOM:
+    return random() % ptr->number_of_hosts;
   default:
     WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
     return hash % ptr->number_of_hosts;
@@ -174,7 +162,19 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_
   if (ptr->number_of_hosts == 1)
     return 0;
 
-  hash= generate_hash(ptr, key, key_length);
+  if (ptr->flags & MEM_HASH_WITH_PREFIX_KEY)
+  {
+    int temp_len= ptr->prefix_key_length + key_length;
+    char *temp= (char *)malloc(temp_len);
+    strncpy(temp, ptr->prefix_key, ptr->prefix_key_length);
+    strncpy(temp + ptr->prefix_key_length, key, key_length);
+    hash= generate_hash(ptr, temp, temp_len);
+    free(temp);
+  }
+  else
+  {
+    hash= generate_hash(ptr, key, key_length);
+  }
 
   WATCHPOINT_ASSERT(hash);
 
index f7a7dfcfe38a925ad332c45f3172373d17661f5e..79e88701f63b66871b7753dea54fa84c9019ce79 100644 (file)
@@ -4,6 +4,7 @@
 /* Protoypes (static) */
 static memcached_return server_add(memcached_st *ptr, const char *hostname, 
                                    unsigned int port,
+                                   uint32_t weight,
                                    memcached_connection type);
 memcached_return update_continuum(memcached_st *ptr);
 
@@ -44,6 +45,8 @@ memcached_return run_distribution(memcached_st *ptr)
     if (ptr->flags & MEM_USE_SORT_HOSTS)
       sort_hosts(ptr);
     break;
+  case MEMCACHED_DISTRIBUTION_RANDOM:
+    break;
   default:
     WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
   }
@@ -52,13 +55,14 @@ memcached_return run_distribution(memcached_st *ptr)
 }
 
 void host_reset(memcached_st *ptr, memcached_server_st *host, 
-                const char *hostname, unsigned int port,
+                const char *hostname, unsigned int port, uint32_t weight,
                 memcached_connection type)
 {
   memset(host,  0, sizeof(memcached_server_st));
   strncpy(host->hostname, hostname, MEMCACHED_MAX_HOST_LENGTH - 1);
   host->root= ptr ? ptr : NULL;
   host->port= port;
+  host->weight= weight;
   host->fd= -1;
   host->type= type;
   host->read_ptr= host->read_buffer;
@@ -87,6 +91,17 @@ void server_list_free(memcached_st *ptr, memcached_server_st *servers)
     free(servers);
 }
 
+static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int alignment)
+{
+  unsigned char results[16];
+  
+  md5_signature((unsigned char*)key, key_length, results);
+  return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
+    | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
+    | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
+    | (results[0 + alignment * 4] & 0xFF);
+}
+
 static int continuum_item_cmp(const void *t1, const void *t2)
 {
   memcached_continuum_item_st *ct1= (memcached_continuum_item_st *)t1;
@@ -111,19 +126,22 @@ memcached_return update_continuum(memcached_st *ptr)
   memcached_server_st *list;
   uint32_t pointer_counter= 0;
   uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER;
-  memcached_return rc;
-  uint64_t total_mem_bytes= 0;
-  memcached_stat_st *stat_p= NULL;
+  uint32_t pointer_per_hash= 1;
+  uint64_t total_weight= 0;
   uint32_t is_ketama_weighted= 0;
+  uint32_t points_per_server= 0;
+
+  is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER;
 
   if (ptr->number_of_hosts > ptr->continuum_count)
   {
     memcached_continuum_item_st *new_ptr;
 
     if (ptr->call_realloc)
-      new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * MEMCACHED_POINTS_PER_SERVER);
+      new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
     else
-      new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * MEMCACHED_POINTS_PER_SERVER);
+      new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
 
     if (new_ptr == 0)
       return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
@@ -134,58 +152,79 @@ memcached_return update_continuum(memcached_st *ptr)
 
   list = ptr->hosts;
 
-  is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
-  if(is_ketama_weighted) 
+  if (is_ketama_weighted) 
   {
-    stat_p = memcached_stat(ptr, NULL, &rc);
     for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
     {
-      list[host_index].limit_maxbytes= (stat_p + host_index)->limit_maxbytes;
-      total_mem_bytes += (stat_p + host_index)->limit_maxbytes;
+      if (list[host_index].weight == 0)
+      {
+        list[host_index].weight = 1;
+      }
+      total_weight += list[host_index].weight;
     }
   }
 
   for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index) 
   {
-    if(is_ketama_weighted) 
+    if (is_ketama_weighted) 
     {
-        float pct = (float)list[host_index].limit_maxbytes/ (float)total_mem_bytes;
-        pointer_per_server= floorf( pct * MEMCACHED_POINTS_PER_SERVER * (float)(ptr->number_of_hosts));
+        float pct = (float)list[host_index].weight / (float)total_weight;
+        pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4;
+        pointer_per_hash= 4;
 #ifdef HAVE_DEBUG
         printf("ketama_weighted:%s|%d|%llu|%u\n", 
                list[host_index].hostname, 
                list[host_index].port,  
-               (unsigned long long)list[host_index].limit_maxbytes
+               (unsigned long long)list[host_index].weight
                pointer_per_server);
 #endif
     }
-    for(index= 1; index <= pointer_per_server; ++index) 
+    for (index= 1; index <= pointer_per_server / pointer_per_hash; ++index) 
     {
       char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
       size_t sort_host_length;
 
-      sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", 
-                                 list[host_index].hostname, list[host_index].port, index);
+      if (list[host_index].port == MEMCACHED_DEFAULT_PORT)
+      {
+        sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s-%d", 
+                                   list[host_index].hostname, index - 1);
+
+      }
+      else
+      {
+        sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d", 
+                                   list[host_index].hostname, list[host_index].port, index - 1);
+      }
       WATCHPOINT_ASSERT(sort_host_length);
-      value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
-      ptr->continuum[continuum_index].index= host_index;
-      ptr->continuum[continuum_index++].value= value;
+
+      if (is_ketama_weighted)
+      {
+        int i;
+        for (i = 0; i < pointer_per_hash; i++)
+        {
+          value= ketama_server_hash(sort_host, sort_host_length, i);
+          ptr->continuum[continuum_index].index= host_index;
+          ptr->continuum[continuum_index++].value= value;
+        }
+      }
+      else
+      {
+        value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
+        ptr->continuum[continuum_index].index= host_index;
+        ptr->continuum[continuum_index++].value= value;
+      }
     }
     pointer_counter+= pointer_per_server;
   }
 
   WATCHPOINT_ASSERT(ptr);
   WATCHPOINT_ASSERT(ptr->continuum);
-  WATCHPOINT_ASSERT(ptr->number_of_hosts);
   WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
   ptr->continuum_points_counter= pointer_counter;
   qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
 
-  if (stat_p)
-    memcached_stat_free(NULL, stat_p);
-
 #ifdef HAVE_DEBUG
-  for (index= 0; index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) 
+  for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++) 
   {
     WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value);
   }
@@ -205,7 +244,6 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l
     return MEMCACHED_SUCCESS;
 
   count= list[0].count;
-
   if (ptr->call_realloc)
     new_host_list= 
       (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts, 
@@ -219,12 +257,12 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l
     return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
 
   ptr->hosts= new_host_list;
-                                   
+
   for (x= 0; x < count; x++)
   {
     WATCHPOINT_ASSERT(list[x].hostname[0] != 0);
     host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], list[x].hostname, 
-               list[x].port, list[x].type);
+               list[x].port, list[x].weight, list[x].type);
     ptr->number_of_hosts++;
   }
   ptr->hosts[0].count= ptr->number_of_hosts;
@@ -232,17 +270,33 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l
   return run_distribution(ptr);
 }
 
-memcached_return memcached_server_add_unix_socket(memcached_st *ptr, const char *filename)
+memcached_return memcached_server_add_unix_socket(memcached_st *ptr, 
+                                                  const char *filename)
+{
+  return memcached_server_add_unix_socket_with_weight(ptr, filename, 0);
+}
+
+memcached_return memcached_server_add_unix_socket_with_weight(memcached_st *ptr, 
+                                                              const char *filename, 
+                                                              uint32_t weight)
 {
   if (!filename)
     return MEMCACHED_FAILURE;
 
-  return server_add(ptr, filename, 0, MEMCACHED_CONNECTION_UNIX_SOCKET);
+  return server_add(ptr, filename, 0, weight, MEMCACHED_CONNECTION_UNIX_SOCKET);
 }
 
 memcached_return memcached_server_add_udp(memcached_st *ptr, 
                                           const char *hostname,
                                           unsigned int port)
+{
+  return memcached_server_add_udp_with_weight(ptr, hostname, port, 0);
+}
+
+memcached_return memcached_server_add_udp_with_weight(memcached_st *ptr, 
+                                                      const char *hostname,
+                                                      unsigned int port,
+                                                      uint32_t weight)
 {
   if (!port)
     port= MEMCACHED_DEFAULT_PORT; 
@@ -250,12 +304,20 @@ memcached_return memcached_server_add_udp(memcached_st *ptr,
   if (!hostname)
     hostname= "localhost"; 
 
-  return server_add(ptr, hostname, port, MEMCACHED_CONNECTION_UDP);
+  return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_UDP);
 }
 
 memcached_return memcached_server_add(memcached_st *ptr, 
                                       const char *hostname, 
                                       unsigned int port)
+{
+  return memcached_server_add_with_weight(ptr, hostname, port, 0);
+}
+
+memcached_return memcached_server_add_with_weight(memcached_st *ptr, 
+                                                  const char *hostname, 
+                                                  unsigned int port,
+                                                  uint32_t weight)
 {
   if (!port)
     port= MEMCACHED_DEFAULT_PORT; 
@@ -263,11 +325,12 @@ memcached_return memcached_server_add(memcached_st *ptr,
   if (!hostname)
     hostname= "localhost"; 
 
-  return server_add(ptr, hostname, port, MEMCACHED_CONNECTION_TCP);
+  return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP);
 }
 
 static memcached_return server_add(memcached_st *ptr, const char *hostname, 
                                    unsigned int port,
+                                   uint32_t weight,
                                    memcached_connection type)
 {
   memcached_server_st *new_host_list;
@@ -283,7 +346,7 @@ static memcached_return server_add(memcached_st *ptr, const char *hostname,
 
   ptr->hosts= new_host_list;
 
-  host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, type);
+  host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, weight, type);
   ptr->number_of_hosts++;
   ptr->hosts[0].count= ptr->number_of_hosts;
 
@@ -317,8 +380,16 @@ memcached_return memcached_server_remove(memcached_server_st *st_ptr)
 }
 
 memcached_server_st *memcached_server_list_append(memcached_server_st *ptr, 
-                                                  const char *hostname, unsigned int port, 
+                                                  const char *hostname, unsigned int port,
                                                   memcached_return *error)
+{
+  return memcached_server_list_append_with_weight(ptr, hostname, port, 0, error);
+}
+
+memcached_server_st *memcached_server_list_append_with_weight(memcached_server_st *ptr, 
+                                                              const char *hostname, unsigned int port,
+                                                              uint32_t weight, 
+                                                              memcached_return *error)
 {
   unsigned int count;
   memcached_server_st *new_host_list;
@@ -343,7 +414,7 @@ memcached_server_st *memcached_server_list_append(memcached_server_st *ptr,
     return NULL;
   }
 
-  host_reset(NULL, &new_host_list[count-1], hostname, port, MEMCACHED_CONNECTION_TCP);
+  host_reset(NULL, &new_host_list[count-1], hostname, port, weight, MEMCACHED_CONNECTION_TCP);
 
   /* Backwards compatibility hack */
   new_host_list[0].count= count;
index ddca197260f818de70c5e04e6e60a553479f1b8e..da845912fc7de05c745b743f137ff2ca1520cdd2 100644 (file)
@@ -232,7 +232,10 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
 #endif
 
   r= close(ptr->fd);
-  WATCHPOINT_ASSERT(r == 0);
+#ifdef HAVE_DEBUG
+  if (r != 0)
+    WATCHPOINT_ERRNO(errno);
+#endif
 
   return MEMCACHED_SUCCESS;
 }
@@ -334,7 +337,8 @@ static ssize_t io_flush(memcached_server_st *ptr,
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
-  WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
+  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+  // ptr->write_buffer_offset);
   ptr->write_buffer_offset= 0;
 
   return return_length;
index fc6a4cb1ecb19e08892afc84a024eac263118c4d..d3d364a0bd43cd7ac1c21091966def75520104c5 100644 (file)
@@ -12,6 +12,7 @@ memcached_server_st *memcached_servers_parse(char *server_strings)
 {
   char *string;
   unsigned int port;
+  uint32_t weight;
   char *begin_ptr;
   char *end_ptr;
   memcached_server_st *servers= NULL;
@@ -26,8 +27,9 @@ memcached_server_st *memcached_servers_parse(char *server_strings)
        string= index(begin_ptr, ','))
   {
     char buffer[HUGE_STRING_LEN];
-    char *ptr;
+    char *ptr, *ptr2;
     port= 0;
+    weight= 0;
 
     if (string)
     {
@@ -52,9 +54,18 @@ memcached_server_st *memcached_servers_parse(char *server_strings)
       ptr++;
 
       port= strtoul(ptr, (char **)NULL, 10);
+
+      ptr2= index(ptr, ' ');
+      if (! ptr2)
+        ptr2= index(ptr, ':');
+      if (ptr2)
+      {
+        ptr2++;
+        weight = strtoul(ptr2, (char **)NULL, 10);
+      }
     }
 
-    servers= memcached_server_list_append(servers, buffer, port, &rc);
+    servers= memcached_server_list_append_with_weight(servers, buffer, port, weight, &rc);
 
     if (isspace(*begin_ptr))
       begin_ptr++;
index b103b34a68a4579e68b728e43a77cea0489c8e62..764dbdf0c61a62ea2cde6587337111c273151f3e 100644 (file)
@@ -53,7 +53,7 @@ memcached_return memcached_response(memcached_server_st *ptr,
       ssize_t read_length;
 
       read_length= memcached_io_read(ptr, buffer_ptr, 1);
-      WATCHPOINT_ASSERT(isgraph(*buffer_ptr) || isspace(*buffer_ptr));
+      WATCHPOINT_ASSERT(*buffer_ptr != '\0');
 
       if (read_length != 1)
       {
index 1ee60101a8d32f07937ffdae1d9b04ebcf765210..fd022355ab9b7a001bc27221dc7e41f14566456f 100644 (file)
@@ -75,7 +75,7 @@ memcached_server_st *memcached_server_clone(memcached_server_st *clone, memcache
   new_clone->root= ptr->root;
 
   host_reset(new_clone->root, new_clone, 
-             ptr->hostname, ptr->port,
+             ptr->hostname, ptr->port, ptr->weight,
              ptr->type);
 
   return new_clone;
index d5ed2692528530737497ffbd29cdf200b668beb6..c2bdcdb136420da7379c79dcfad1b0d52fd51b88 100644 (file)
@@ -38,6 +38,7 @@ struct memcached_server_st {
   uint64_t limit_maxbytes;
   uint32_t server_failure_counter;
   uint32_t io_bytes_sent; /* # bytes sent since last read */
+  uint32_t weight;
 };
 
 #define memcached_server_count(A) (A)->number_of_hosts
index 4483232c6b6666a24e80af0d8f577ea70453d32c..4ec247f6dd2e022d341f5ed5b7d8c937464a1646 100644 (file)
@@ -48,13 +48,13 @@ static test_return  server_list_null_test(memcached_st *ptr __attribute__((unuse
   memcached_server_st *server_list;
   memcached_return rc;
 
-  server_list= memcached_server_list_append(NULL, NULL, 0, NULL);
+  server_list= memcached_server_list_append_with_weight(NULL, NULL, 0, 0, NULL);
   assert(server_list == NULL);
 
-  server_list= memcached_server_list_append(NULL, "localhost", 0, NULL);
+  server_list= memcached_server_list_append_with_weight(NULL, "localhost", 0, 0, NULL);
   assert(server_list == NULL);
 
-  server_list= memcached_server_list_append(NULL, NULL, 0, &rc);
+  server_list= memcached_server_list_append_with_weight(NULL, NULL, 0, 0, &rc);
   assert(server_list == NULL);
 
   return 0;
@@ -88,7 +88,7 @@ static test_return  server_sort_test(memcached_st *ptr __attribute__((unused)))
   for (x= 0; x < TEST_PORT_COUNT; x++)
   {
     test_ports[x]= random() % 64000;
-    rc= memcached_server_add(local_memc, "localhost", test_ports[x]);
+    rc= memcached_server_add_with_weight(local_memc, "localhost", test_ports[x], 0);
     assert(local_memc->number_of_hosts == x + 1);
     assert(local_memc->hosts[0].count == x+1);
     assert(rc == MEMCACHED_SUCCESS);
@@ -115,11 +115,11 @@ static test_return  server_sort2_test(memcached_st *ptr __attribute__((unused)))
   rc= memcached_behavior_set(local_memc, MEMCACHED_BEHAVIOR_SORT_HOSTS, 1);
   assert(rc == MEMCACHED_SUCCESS);
 
-  rc= memcached_server_add(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43043);
+  rc= memcached_server_add_with_weight(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43043, 0);
   assert(rc == MEMCACHED_SUCCESS);
   assert(local_memc->hosts[0].port == 43043);
 
-  rc= memcached_server_add(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43042);
+  rc= memcached_server_add_with_weight(local_memc, "MEMCACHED_BEHAVIOR_SORT_HOSTS", 43042, 0);
   assert(rc == MEMCACHED_SUCCESS);
   assert(local_memc->hosts[0].port == 43042);
   assert(local_memc->hosts[1].port == 43043);
@@ -159,7 +159,7 @@ static test_return  server_unsort_test(memcached_st *ptr __attribute__((unused))
   for (x= 0; x < TEST_PORT_COUNT; x++)
   {
     test_ports[x]= random() % 64000;
-    rc= memcached_server_add(local_memc, "localhost", test_ports[x]);
+    rc= memcached_server_add_with_weight(local_memc, "localhost", test_ports[x], 0);
     assert(local_memc->number_of_hosts == x+1);
     assert(local_memc->hosts[0].count == x+1);
     assert(rc == MEMCACHED_SUCCESS);
@@ -234,7 +234,7 @@ static test_return  connection_test(memcached_st *memc)
 {
   memcached_return rc;
 
-  rc= memcached_server_add(memc, "localhost", 0);
+  rc= memcached_server_add_with_weight(memc, "localhost", 0, 0);
   assert(rc == MEMCACHED_SUCCESS);
 
   return 0;
@@ -1238,7 +1238,7 @@ static test_return  add_host_test(memcached_st *memc)
   memcached_return rc;
   char servername[]= "0.example.com";
 
-  servers= memcached_server_list_append(NULL, servername, 400, &rc);
+  servers= memcached_server_list_append_with_weight(NULL, servername, 400, 0, &rc);
   assert(servers);
   assert(1 == memcached_server_list_count(servers));
 
@@ -1247,7 +1247,7 @@ static test_return  add_host_test(memcached_st *memc)
     char buffer[SMALL_STRING_LEN];
 
     snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
-    servers= memcached_server_list_append(servers, buffer, 401
+    servers= memcached_server_list_append_with_weight(servers, buffer, 401, 0
                                      &rc);
     assert(rc == MEMCACHED_SUCCESS);
     assert(x == memcached_server_list_count(servers));
@@ -2123,6 +2123,54 @@ static test_return  user_supplied_bug17(memcached_st *memc)
     return 0;
 }
 
+#include "ketama_test_cases.h"
+test_return user_supplied_bug18(memcached_st *memc)
+{
+  memcached_return rc;
+  int value;
+  int i;
+  memcached_server_st *server_pool;
+    
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
+  assert(value == 1);
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH, MEMCACHED_HASH_MD5);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_KETAMA_HASH);
+  assert(value == MEMCACHED_HASH_MD5);
+
+  while (memc->number_of_hosts > 0)
+  {
+    memcached_server_remove(memc->hosts);
+  }
+  server_pool = memcached_servers_parse("10.0.1.1:11211 600,10.0.1.2:11211 300,10.0.1.3:11211 200,10.0.1.4:11211 350,10.0.1.5:11211 1000,10.0.1.6:11211 800,10.0.1.7:11211 950,10.0.1.8:11211 100");
+  memcached_server_push(memc, server_pool);
+  
+  /* verify that the server list was parsed okay. */
+  assert(memc->number_of_hosts == 8);
+  assert(strcmp(server_pool[0].hostname, "10.0.1.1") == 0);
+  assert(server_pool[0].port == 11211);
+  assert(server_pool[0].weight == 600);
+  assert(strcmp(server_pool[2].hostname, "10.0.1.3") == 0);
+  assert(server_pool[2].port == 11211);
+  assert(server_pool[2].weight == 200);
+  assert(strcmp(server_pool[7].hostname, "10.0.1.8") == 0);
+  assert(server_pool[7].port == 11211);
+  assert(server_pool[7].weight == 100);
+  
+  /* verify the standard ketama set. */
+  for (i= 0; i < 99; i++)
+  {
+    uint32_t server_idx = memcached_generate_hash(memc, test_cases[i].key, strlen(test_cases[i].key));
+    char *hostname = memc->hosts[server_idx].hostname;
+    assert(strcmp(hostname, test_cases[i].server) == 0);
+  }
+  return 0;
+}
 
 static test_return  result_static(memcached_st *memc)
 {
@@ -2325,7 +2373,7 @@ static test_return  get_read_count(memcached_st *memc)
   clone= memcached_clone(NULL, memc);
   assert(clone);
 
-  memcached_server_add(clone, "localhost", 6666);
+  memcached_server_add_with_weight(clone, "localhost", 6666, 0);
 
   {
     char *return_value;
@@ -2487,7 +2535,7 @@ static test_return  add_host_test1(memcached_st *memc)
   char servername[]= "0.example.com";
   memcached_server_st *servers;
 
-  servers= memcached_server_list_append(NULL, servername, 400, &rc);
+  servers= memcached_server_list_append_with_weight(NULL, servername, 400, 0, &rc);
   assert(servers);
   assert(1 == memcached_server_list_count(servers));
 
@@ -2496,7 +2544,7 @@ static test_return  add_host_test1(memcached_st *memc)
     char buffer[SMALL_STRING_LEN];
 
     snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
-    servers= memcached_server_list_append(servers, buffer, 401
+    servers= memcached_server_list_append_with_weight(servers, buffer, 401, 0
                                      &rc);
     assert(rc == MEMCACHED_SUCCESS);
     assert(x == memcached_server_list_count(servers));
@@ -2653,7 +2701,7 @@ static void *my_realloc(memcached_st *ptr __attribute__((unused)), void *mem, co
   return realloc(mem, size);
 }
 
-static memcached_return  set_prefix(memcached_st *memc)
+static memcached_return set_prefix(memcached_st *memc)
 {
   memcached_return rc;
   const char *key= "mine";
@@ -2697,9 +2745,11 @@ static memcached_return  set_prefix(memcached_st *memc)
     assert(value == NULL);
 
     /* Test a long key for failure */
+    /* TODO, extend test to determine based on setting, what result should be */
     long_key= "Thisismorethentheallottednumberofcharacters";
     rc= memcached_callback_set(memc, MEMCACHED_CALLBACK_PREFIX_KEY, long_key);
-    assert(rc == MEMCACHED_BAD_KEY_PROVIDED);
+    //assert(rc == MEMCACHED_BAD_KEY_PROVIDED);
+    assert(rc == MEMCACHED_SUCCESS);
 
     /* Now test a key with spaces (which will fail from long key, since bad key is not set) */
     long_key= "This is more then the allotted number of characters";
@@ -2813,7 +2863,7 @@ static memcached_return  pre_unix_socket(memcached_st *memc)
   if (stat("/tmp/memcached.socket", &buf))
     return MEMCACHED_FAILURE;
 
-  rc= memcached_server_add_unix_socket(memc, "/tmp/memcached.socket");
+  rc= memcached_server_add_unix_socket_with_weight(memc, "/tmp/memcached.socket", 0);
 
   return rc;
 }
@@ -2941,6 +2991,7 @@ test_st user_tests[] ={
   {"user_supplied_bug15", 1, user_supplied_bug15 },
   {"user_supplied_bug16", 1, user_supplied_bug16 },
   {"user_supplied_bug17", 1, user_supplied_bug17 },
+  {"user_supplied_bug18", 1, user_supplied_bug18 },
   {0, 0, 0}
 };