Incomming code for additions in keys (see Changelog)
[awesomized/libmemcached] / libmemcached / memcached_hosts.c
1 #include "common.h"
2 #include <math.h>
3
4 /* Protoypes (static) */
5 static memcached_return server_add(memcached_st *ptr, const char *hostname,
6 unsigned int port,
7 uint32_t weight,
8 memcached_connection type);
9 memcached_return update_continuum(memcached_st *ptr);
10
11 static int compare_servers(const void *p1, const void *p2)
12 {
13 int return_value;
14 memcached_server_st *a= (memcached_server_st *)p1;
15 memcached_server_st *b= (memcached_server_st *)p2;
16
17 return_value= strcmp(a->hostname, b->hostname);
18
19 if (return_value == 0)
20 {
21 return_value= (int) (a->port - b->port);
22 }
23
24 return return_value;
25 }
26
27 static void sort_hosts(memcached_st *ptr)
28 {
29 if (ptr->number_of_hosts)
30 {
31 qsort(ptr->hosts, ptr->number_of_hosts, sizeof(memcached_server_st), compare_servers);
32 ptr->hosts[0].count= ptr->number_of_hosts;
33 }
34 }
35
36
37 memcached_return run_distribution(memcached_st *ptr)
38 {
39 switch (ptr->distribution)
40 {
41 case MEMCACHED_DISTRIBUTION_CONSISTENT:
42 case MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA:
43 return update_continuum(ptr);
44 case MEMCACHED_DISTRIBUTION_MODULA:
45 if (ptr->flags & MEM_USE_SORT_HOSTS)
46 sort_hosts(ptr);
47 break;
48 case MEMCACHED_DISTRIBUTION_RANDOM:
49 break;
50 default:
51 WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */
52 }
53
54 return MEMCACHED_SUCCESS;
55 }
56
57 void host_reset(memcached_st *ptr, memcached_server_st *host,
58 const char *hostname, unsigned int port, uint32_t weight,
59 memcached_connection type)
60 {
61 memset(host, 0, sizeof(memcached_server_st));
62 strncpy(host->hostname, hostname, MEMCACHED_MAX_HOST_LENGTH - 1);
63 host->root= ptr ? ptr : NULL;
64 host->port= port;
65 host->weight= weight;
66 host->fd= -1;
67 host->type= type;
68 host->read_ptr= host->read_buffer;
69 if (ptr)
70 host->next_retry= ptr->retry_timeout;
71 host->sockaddr_inited= MEMCACHED_NOT_ALLOCATED;
72 }
73
74 void server_list_free(memcached_st *ptr, memcached_server_st *servers)
75 {
76 unsigned int x;
77
78 if (servers == NULL)
79 return;
80
81 for (x= 0; x < servers->count; x++)
82 if (servers[x].address_info)
83 {
84 freeaddrinfo(servers[x].address_info);
85 servers[x].address_info= NULL;
86 }
87
88 if (ptr && ptr->call_free)
89 ptr->call_free(ptr, servers);
90 else
91 free(servers);
92 }
93
94 static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int alignment)
95 {
96 unsigned char results[16];
97
98 md5_signature((unsigned char*)key, key_length, results);
99 return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
100 | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
101 | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
102 | (results[0 + alignment * 4] & 0xFF);
103 }
104
105 static int continuum_item_cmp(const void *t1, const void *t2)
106 {
107 memcached_continuum_item_st *ct1= (memcached_continuum_item_st *)t1;
108 memcached_continuum_item_st *ct2= (memcached_continuum_item_st *)t2;
109
110 /* Why 153? Hmmm... */
111 WATCHPOINT_ASSERT(ct1->value != 153);
112 if (ct1->value == ct2->value)
113 return 0;
114 else if (ct1->value > ct2->value)
115 return 1;
116 else
117 return -1;
118 }
119
120 memcached_return update_continuum(memcached_st *ptr)
121 {
122 uint32_t index;
123 uint32_t host_index;
124 uint32_t continuum_index= 0;
125 uint32_t value;
126 memcached_server_st *list;
127 uint32_t pointer_counter= 0;
128 uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER;
129 uint32_t pointer_per_hash= 1;
130 uint64_t total_weight= 0;
131 uint32_t is_ketama_weighted= 0;
132 uint32_t points_per_server= 0;
133
134 is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED);
135 points_per_server= is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER;
136
137 if (ptr->number_of_hosts > ptr->continuum_count)
138 {
139 memcached_continuum_item_st *new_ptr;
140
141 if (ptr->call_realloc)
142 new_ptr= (memcached_continuum_item_st *)ptr->call_realloc(ptr, ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
143 else
144 new_ptr= (memcached_continuum_item_st *)realloc(ptr->continuum, sizeof(memcached_continuum_item_st) * (ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION) * points_per_server);
145
146 if (new_ptr == 0)
147 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
148
149 ptr->continuum= new_ptr;
150 ptr->continuum_count= ptr->number_of_hosts + MEMCACHED_CONTINUUM_ADDITION;
151 }
152
153 list = ptr->hosts;
154
155 if (is_ketama_weighted)
156 {
157 for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
158 {
159 if (list[host_index].weight == 0)
160 {
161 list[host_index].weight = 1;
162 }
163 total_weight += list[host_index].weight;
164 }
165 }
166
167 for (host_index = 0; host_index < ptr->number_of_hosts; ++host_index)
168 {
169 if (is_ketama_weighted)
170 {
171 float pct = (float)list[host_index].weight / (float)total_weight;
172 pointer_per_server= floorf(pct * MEMCACHED_POINTS_PER_SERVER_KETAMA / 4 * (float)(ptr->number_of_hosts) + 0.0000000001) * 4;
173 pointer_per_hash= 4;
174 #ifdef HAVE_DEBUG
175 printf("ketama_weighted:%s|%d|%llu|%u\n",
176 list[host_index].hostname,
177 list[host_index].port,
178 (unsigned long long)list[host_index].weight,
179 pointer_per_server);
180 #endif
181 }
182 for (index= 1; index <= pointer_per_server / pointer_per_hash; ++index)
183 {
184 char sort_host[MEMCACHED_MAX_HOST_SORT_LENGTH]= "";
185 size_t sort_host_length;
186
187 if (list[host_index].port == MEMCACHED_DEFAULT_PORT)
188 {
189 sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s-%d",
190 list[host_index].hostname, index - 1);
191
192 }
193 else
194 {
195 sort_host_length= snprintf(sort_host, MEMCACHED_MAX_HOST_SORT_LENGTH, "%s:%d-%d",
196 list[host_index].hostname, list[host_index].port, index - 1);
197 }
198 WATCHPOINT_ASSERT(sort_host_length);
199
200 if (is_ketama_weighted)
201 {
202 int i;
203 for (i = 0; i < pointer_per_hash; i++)
204 {
205 value= ketama_server_hash(sort_host, sort_host_length, i);
206 ptr->continuum[continuum_index].index= host_index;
207 ptr->continuum[continuum_index++].value= value;
208 }
209 }
210 else
211 {
212 value= generate_hash_value(sort_host, sort_host_length, ptr->hash_continuum);
213 ptr->continuum[continuum_index].index= host_index;
214 ptr->continuum[continuum_index++].value= value;
215 }
216 }
217 pointer_counter+= pointer_per_server;
218 }
219
220 WATCHPOINT_ASSERT(ptr);
221 WATCHPOINT_ASSERT(ptr->continuum);
222 WATCHPOINT_ASSERT(ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);
223 ptr->continuum_points_counter= pointer_counter;
224 qsort(ptr->continuum, ptr->continuum_points_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
225
226 #ifdef HAVE_DEBUG
227 for (index= 0; ptr->number_of_hosts && index < ((ptr->number_of_hosts * MEMCACHED_POINTS_PER_SERVER) - 1); index++)
228 {
229 WATCHPOINT_ASSERT(ptr->continuum[index].value <= ptr->continuum[index + 1].value);
230 }
231 #endif
232
233 return MEMCACHED_SUCCESS;
234 }
235
236
237 memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list)
238 {
239 unsigned int x;
240 uint16_t count;
241 memcached_server_st *new_host_list;
242
243 if (!list)
244 return MEMCACHED_SUCCESS;
245
246 count= list[0].count;
247 if (ptr->call_realloc)
248 new_host_list=
249 (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts,
250 sizeof(memcached_server_st) * (count + ptr->number_of_hosts));
251 else
252 new_host_list=
253 (memcached_server_st *)realloc(ptr->hosts,
254 sizeof(memcached_server_st) * (count + ptr->number_of_hosts));
255
256 if (!new_host_list)
257 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
258
259 ptr->hosts= new_host_list;
260
261 for (x= 0; x < count; x++)
262 {
263 WATCHPOINT_ASSERT(list[x].hostname[0] != 0);
264 host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], list[x].hostname,
265 list[x].port, list[x].weight, list[x].type);
266 ptr->number_of_hosts++;
267 }
268 ptr->hosts[0].count= ptr->number_of_hosts;
269
270 return run_distribution(ptr);
271 }
272
273 memcached_return memcached_server_add_unix_socket(memcached_st *ptr,
274 const char *filename)
275 {
276 return memcached_server_add_unix_socket_with_weight(ptr, filename, 0);
277 }
278
279 memcached_return memcached_server_add_unix_socket_with_weight(memcached_st *ptr,
280 const char *filename,
281 uint32_t weight)
282 {
283 if (!filename)
284 return MEMCACHED_FAILURE;
285
286 return server_add(ptr, filename, 0, weight, MEMCACHED_CONNECTION_UNIX_SOCKET);
287 }
288
289 memcached_return memcached_server_add_udp(memcached_st *ptr,
290 const char *hostname,
291 unsigned int port)
292 {
293 return memcached_server_add_udp_with_weight(ptr, hostname, port, 0);
294 }
295
296 memcached_return memcached_server_add_udp_with_weight(memcached_st *ptr,
297 const char *hostname,
298 unsigned int port,
299 uint32_t weight)
300 {
301 if (!port)
302 port= MEMCACHED_DEFAULT_PORT;
303
304 if (!hostname)
305 hostname= "localhost";
306
307 return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_UDP);
308 }
309
310 memcached_return memcached_server_add(memcached_st *ptr,
311 const char *hostname,
312 unsigned int port)
313 {
314 return memcached_server_add_with_weight(ptr, hostname, port, 0);
315 }
316
317 memcached_return memcached_server_add_with_weight(memcached_st *ptr,
318 const char *hostname,
319 unsigned int port,
320 uint32_t weight)
321 {
322 if (!port)
323 port= MEMCACHED_DEFAULT_PORT;
324
325 if (!hostname)
326 hostname= "localhost";
327
328 return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP);
329 }
330
331 static memcached_return server_add(memcached_st *ptr, const char *hostname,
332 unsigned int port,
333 uint32_t weight,
334 memcached_connection type)
335 {
336 memcached_server_st *new_host_list;
337
338 if (ptr->call_realloc)
339 new_host_list= (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts,
340 sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
341 else
342 new_host_list= (memcached_server_st *)realloc(ptr->hosts,
343 sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
344 if (new_host_list == NULL)
345 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
346
347 ptr->hosts= new_host_list;
348
349 host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, weight, type);
350 ptr->number_of_hosts++;
351 ptr->hosts[0].count= ptr->number_of_hosts;
352
353 return run_distribution(ptr);
354 }
355
356 memcached_return memcached_server_remove(memcached_server_st *st_ptr)
357 {
358 uint32_t x, index;
359 memcached_st *ptr= st_ptr->root;
360 memcached_server_st *list= ptr->hosts;
361
362 for (x= 0, index= 0; x < ptr->number_of_hosts; x++)
363 {
364 if (strncmp(list[x].hostname, st_ptr->hostname, MEMCACHED_MAX_HOST_LENGTH)!=0 || list[x].port != st_ptr->port)
365 {
366 memcpy(list+index, list+x, sizeof(memcached_server_st));
367 index++;
368 }
369 }
370 ptr->number_of_hosts= index;
371
372 if (st_ptr->address_info)
373 {
374 freeaddrinfo(st_ptr->address_info);
375 st_ptr->address_info= NULL;
376 }
377 run_distribution(ptr);
378
379 return MEMCACHED_SUCCESS;
380 }
381
382 memcached_server_st *memcached_server_list_append(memcached_server_st *ptr,
383 const char *hostname, unsigned int port,
384 memcached_return *error)
385 {
386 return memcached_server_list_append_with_weight(ptr, hostname, port, 0, error);
387 }
388
389 memcached_server_st *memcached_server_list_append_with_weight(memcached_server_st *ptr,
390 const char *hostname, unsigned int port,
391 uint32_t weight,
392 memcached_return *error)
393 {
394 unsigned int count;
395 memcached_server_st *new_host_list;
396
397 if (hostname == NULL || error == NULL)
398 return NULL;
399
400 if (!port)
401 port= MEMCACHED_DEFAULT_PORT;
402
403 /* Increment count for hosts */
404 count= 1;
405 if (ptr != NULL)
406 {
407 count+= ptr[0].count;
408 }
409
410 new_host_list= (memcached_server_st *)realloc(ptr, sizeof(memcached_server_st) * count);
411 if (!new_host_list)
412 {
413 *error= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
414 return NULL;
415 }
416
417 host_reset(NULL, &new_host_list[count-1], hostname, port, weight, MEMCACHED_CONNECTION_TCP);
418
419 /* Backwards compatibility hack */
420 new_host_list[0].count= count;
421
422 *error= MEMCACHED_SUCCESS;
423 return new_host_list;
424 }
425
426 unsigned int memcached_server_list_count(memcached_server_st *ptr)
427 {
428 if (ptr == NULL)
429 return 0;
430
431 return ptr[0].count;
432 }
433
434 void memcached_server_list_free(memcached_server_st *ptr)
435 {
436 server_list_free(NULL, ptr);
437 }