Merge Monty
[awesomized/libmemcached] / libhashkit / ketama.c
1 /* HashKit
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 */
8
9 #include "common.h"
10 #include <math.h>
11
/*
 * Derive one 32-bit hash value from the 16-byte signature of a key.
 *
 * key / key_length are handed to md5_signature(), which is expected to fill
 * the 16-byte buffer. alignment picks which group of four signature bytes is
 * used (byte offset alignment * 4), so values 0..3 stay inside the buffer.
 * The four bytes are combined with the lowest-addressed byte as the least
 * significant one.
 */
static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int alignment)
{
  unsigned char digest[16];
  const unsigned char *word;
  uint32_t hash;

  md5_signature((unsigned char*)key, key_length, digest);

  /* Start of the selected 4-byte group inside the signature. */
  word= digest + alignment * 4;

  hash= (uint32_t) word[0];
  hash|= (uint32_t) word[1] << 8;
  hash|= (uint32_t) word[2] << 16;
  hash|= (uint32_t) word[3] << 24;

  return hash;
}
22
23 static int continuum_points_cmp(const void *t1, const void *t2)
24 {
25 hashkit_continuum_point_st *ct1= (hashkit_continuum_point_st *)t1;
26 hashkit_continuum_point_st *ct2= (hashkit_continuum_point_st *)t2;
27
28 if (ct1->value == ct2->value)
29 return 0;
30 else if (ct1->value > ct2->value)
31 return 1;
32 else
33 return -1;
34 }
35
36 int update_continuum(hashkit_st *hashkit)
37 {
38 uint32_t count;
39 uint32_t continuum_index= 0;
40 uint32_t value;
41 uint32_t points_index;
42 uint32_t points_count= 0;
43 uint32_t points_per_server;
44 uint32_t points_per_hash;
45 uint64_t total_weight= 0;
46 uint32_t live_servers;
47 uint8_t *context;
48
49 if (hashkit->active_fn != NULL || hashkit->weight_fn != NULL)
50 {
51 live_servers= 0;
52
53 for (count= 0, context= hashkit->list; count < hashkit->list_size;
54 count++, context+= hashkit->context_size)
55 {
56 if (hashkit->active_fn != NULL)
57 {
58 if (hashkit->active_fn(context))
59 live_servers++;
60 else
61 continue;
62 }
63
64 if (hashkit->weight_fn != NULL)
65 total_weight+= hashkit->weight_fn(context);
66 }
67 }
68
69 if (hashkit->active_fn == NULL)
70 live_servers= (uint32_t)hashkit->list_size;
71
72 if (live_servers == 0)
73 return 0;
74
75 if (hashkit->weight_fn == NULL)
76 {
77 points_per_server= HASHKIT_POINTS_PER_NODE;
78 points_per_hash= 1;
79 }
80 else
81 {
82 points_per_server= HASHKIT_POINTS_PER_NODE_WEIGHTED;
83 points_per_hash= 4;
84 }
85
86 if (live_servers > hashkit->continuum_count)
87 {
88 hashkit_continuum_point_st *new_continuum;
89
90 new_continuum= realloc(hashkit->continuum,
91 sizeof(hashkit_continuum_point_st) *
92 (live_servers + HASHKIT_CONTINUUM_ADDITION) *
93 points_per_server);
94
95 if (new_continuum == NULL)
96 return ENOMEM;
97
98 hashkit->continuum= new_continuum;
99 hashkit->continuum_count= live_servers + HASHKIT_CONTINUUM_ADDITION;
100 }
101
102 for (count= 0, context= hashkit->list; count < hashkit->list_size;
103 count++, context+= hashkit->context_size)
104 {
105 if (hashkit->active_fn != NULL && hashkit->active_fn(context) == false)
106 continue;
107
108 if (hashkit->weight_fn != NULL)
109 {
110 float pct = (float)hashkit->weight_fn(context) / (float)total_weight;
111 points_per_server= (uint32_t) ((floorf((float) (pct * HASHKIT_POINTS_PER_NODE_WEIGHTED / 4 * (float)live_servers + 0.0000000001))) * 4);
112 }
113
114 for (points_index= 0;
115 points_index < points_per_server / points_per_hash;
116 points_index++)
117 {
118 char sort_host[HASHKIT_CONTINUUM_KEY_SIZE]= "";
119 size_t sort_host_length;
120
121 if (hashkit->continuum_key_fn == NULL)
122 {
123 sort_host_length= (size_t) snprintf(sort_host, HASHKIT_CONTINUUM_KEY_SIZE, "%u",
124 points_index);
125 }
126 else
127 {
128 sort_host_length= hashkit->continuum_key_fn(sort_host, HASHKIT_CONTINUUM_KEY_SIZE,
129 points_index, context);
130 }
131
132 if (hashkit->weight_fn == NULL)
133 {
134 if (hashkit->continuum_hash_fn == NULL)
135 value= hashkit_default(sort_host, sort_host_length);
136 else
137 value= hashkit->continuum_hash_fn(sort_host, sort_host_length);
138
139 hashkit->continuum[continuum_index].index= count;
140 hashkit->continuum[continuum_index++].value= value;
141 }
142 else
143 {
144 unsigned int i;
145 for (i = 0; i < points_per_hash; i++)
146 {
147 value= ketama_server_hash(sort_host, (uint32_t) sort_host_length, (int) i);
148 hashkit->continuum[continuum_index].index= count;
149 hashkit->continuum[continuum_index++].value= value;
150 }
151 }
152 }
153
154 points_count+= points_per_server;
155 }
156
157 hashkit->continuum_points_count= points_count;
158 qsort(hashkit->continuum, hashkit->continuum_points_count, sizeof(hashkit_continuum_point_st),
159 continuum_points_cmp);
160
161 return 0;
162 }