46b6319c564e16a8b5c902064e7df5bcb9e06239
[awesomized/libmemcached] / libmemcached / get.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Get functions for libmemcached
9 *
10 */
11
12 #include "common.h"
13
14 /*
15 What happens if no servers exist?
16 */
17 char *memcached_get(memcached_st *ptr, const char *key,
18 size_t key_length,
19 size_t *value_length,
20 uint32_t *flags,
21 memcached_return_t *error)
22 {
23 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
24 flags, error);
25 }
26
27 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
28 const char *group_key,
29 size_t group_key_length,
30 const char * const *keys,
31 const size_t *key_length,
32 size_t number_of_keys,
33 bool mget_mode);
34
35 char *memcached_get_by_key(memcached_st *ptr,
36 const char *group_key,
37 size_t group_key_length,
38 const char *key, size_t key_length,
39 size_t *value_length,
40 uint32_t *flags,
41 memcached_return_t *error)
42 {
43 char *value;
44 size_t dummy_length;
45 uint32_t dummy_flags;
46 memcached_return_t dummy_error;
47
48 unlikely (ptr->flags.use_udp)
49 {
50 *error= MEMCACHED_NOT_SUPPORTED;
51 return NULL;
52 }
53
54 /* Request the key */
55 *error= memcached_mget_by_key_real(ptr, group_key, group_key_length,
56 (const char * const *)&key,
57 &key_length, 1, false);
58
59 value= memcached_fetch(ptr, NULL, NULL,
60 value_length, flags, error);
61 /* This is for historical reasons */
62 if (*error == MEMCACHED_END)
63 *error= MEMCACHED_NOTFOUND;
64
65 if (value == NULL)
66 {
67 if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
68 {
69 memcached_return_t rc;
70
71 memcached_result_reset(&ptr->result);
72 rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
73
74 /* On all failure drop to returning NULL */
75 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
76 {
77 if (rc == MEMCACHED_BUFFERED)
78 {
79 uint64_t latch; /* We use latch to track the state of the original socket */
80 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
81 if (latch == 0)
82 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
83
84 rc= memcached_set(ptr, key, key_length,
85 (memcached_result_value(&ptr->result)),
86 (memcached_result_length(&ptr->result)),
87 0,
88 (memcached_result_flags(&ptr->result)));
89
90 if (rc == MEMCACHED_BUFFERED && latch == 0)
91 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
92 }
93 else
94 {
95 rc= memcached_set(ptr, key, key_length,
96 (memcached_result_value(&ptr->result)),
97 (memcached_result_length(&ptr->result)),
98 0,
99 (memcached_result_flags(&ptr->result)));
100 }
101
102 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
103 {
104 *error= rc;
105 *value_length= memcached_result_length(&ptr->result);
106 *flags= memcached_result_flags(&ptr->result);
107 return memcached_string_c_copy(&ptr->result.value);
108 }
109 }
110 }
111
112 return NULL;
113 }
114
115 (void)memcached_fetch(ptr, NULL, NULL,
116 &dummy_length, &dummy_flags,
117 &dummy_error);
118 WATCHPOINT_ASSERT(dummy_length == 0);
119
120 return value;
121 }
122
123 memcached_return_t memcached_mget(memcached_st *ptr,
124 const char * const *keys,
125 const size_t *key_length,
126 size_t number_of_keys)
127 {
128 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
132 uint32_t master_server_key,
133 bool is_group_key_set,
134 const char * const *keys,
135 const size_t *key_length,
136 size_t number_of_keys,
137 bool mget_mode);
138
139 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
140 const char *group_key,
141 size_t group_key_length,
142 const char * const *keys,
143 const size_t *key_length,
144 size_t number_of_keys,
145 bool mget_mode)
146 {
147 bool failures_occured_in_sending= false;
148 const char *get_command= "get ";
149 uint8_t get_command_length= 4;
150 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
151 bool is_group_key_set= false;
152
153 memcached_return_t rc;
154 if ((rc= initialize_query(ptr)) != MEMCACHED_SUCCESS)
155 {
156 return rc;
157 }
158
159 unlikely (ptr->flags.use_udp)
160 return MEMCACHED_NOT_SUPPORTED;
161
162 LIBMEMCACHED_MEMCACHED_MGET_START();
163
164 if (number_of_keys == 0)
165 return MEMCACHED_NOTFOUND;
166
167 if (ptr->flags.verify_key && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
168 return MEMCACHED_BAD_KEY_PROVIDED;
169
170 if (group_key && group_key_length)
171 {
172 if (ptr->flags.verify_key && (memcached_key_test((const char * const *)&group_key, &group_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
173 return MEMCACHED_BAD_KEY_PROVIDED;
174 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
175 is_group_key_set= true;
176 }
177
178 /*
179 Here is where we pay for the non-block API. We need to remove any data sitting
180 in the queue before we start our get.
181
182 It might be optimum to bounce the connection if count > some number.
183 */
184 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
185 {
186 memcached_server_write_instance_st instance=
187 memcached_server_instance_fetch(ptr, x);
188
189 if (memcached_server_response_count(instance))
190 {
191 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
192
193 if (ptr->flags.no_block)
194 (void)memcached_io_write(instance, NULL, 0, true);
195
196 while(memcached_server_response_count(instance))
197 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
198 }
199 }
200
201 if (ptr->flags.binary_protocol)
202 {
203 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
204 key_length, number_of_keys, mget_mode);
205 }
206
207 if (ptr->flags.support_cas)
208 {
209 get_command= "gets ";
210 get_command_length= 5;
211 }
212
213 /*
214 If a server fails we warn about errors and start all over with sending keys
215 to the server.
216 */
217 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
218 size_t hosts_connected= 0;
219 for (uint32_t x= 0; x < number_of_keys; x++)
220 {
221 memcached_server_write_instance_st instance;
222 uint32_t server_key;
223
224 if (is_group_key_set)
225 {
226 server_key= master_server_key;
227 }
228 else
229 {
230 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
231 }
232
233 instance= memcached_server_instance_fetch(ptr, server_key);
234
235 struct libmemcached_io_vector_st vector[]=
236 {
237 { .length= get_command_length, .buffer= get_command },
238 { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) },
239 { .length= key_length[x], .buffer= keys[x] },
240 { .length= 1, .buffer= " " }
241 };
242
243
244 if (memcached_server_response_count(instance) == 0)
245 {
246 rc= memcached_connect(instance);
247
248 if (rc != MEMCACHED_SUCCESS)
249 {
250 continue;
251 }
252 hosts_connected++;
253
254 if ((memcached_io_writev(instance, vector, 4, false)) == -1)
255 {
256 failures_occured_in_sending= true;
257 continue;
258 }
259 WATCHPOINT_ASSERT(instance->cursor_active == 0);
260 memcached_server_response_increment(instance);
261 WATCHPOINT_ASSERT(instance->cursor_active == 1);
262 }
263 else
264 {
265 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
266 {
267 memcached_server_response_reset(instance);
268 failures_occured_in_sending= true;
269 continue;
270 }
271 }
272 }
273
274 if (hosts_connected == 0)
275 {
276 LIBMEMCACHED_MEMCACHED_MGET_END();
277
278 if (rc != MEMCACHED_SUCCESS)
279 return rc;
280
281 return MEMCACHED_NO_SERVERS;
282 }
283
284
285 /*
286 Should we muddle on if some servers are dead?
287 */
288 bool success_happened= false;
289 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
290 {
291 memcached_server_write_instance_st instance=
292 memcached_server_instance_fetch(ptr, x);
293
294 if (memcached_server_response_count(instance))
295 {
296 /* We need to do something about non-connnected hosts in the future */
297 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
298 {
299 failures_occured_in_sending= true;
300 }
301 else
302 {
303 success_happened= true;
304 }
305 }
306 }
307
308 LIBMEMCACHED_MEMCACHED_MGET_END();
309
310 if (failures_occured_in_sending && success_happened)
311 return MEMCACHED_SOME_ERRORS;
312
313 if (success_happened)
314 return MEMCACHED_SUCCESS;
315
316 return MEMCACHED_FAILURE;
317 }
318
319 memcached_return_t memcached_mget_by_key(memcached_st *ptr,
320 const char *group_key,
321 size_t group_key_length,
322 const char * const *keys,
323 const size_t *key_length,
324 size_t number_of_keys)
325 {
326 return memcached_mget_by_key_real(ptr, group_key, group_key_length, keys,
327 key_length, number_of_keys, true);
328 }
329
330 memcached_return_t memcached_mget_execute(memcached_st *ptr,
331 const char * const *keys,
332 const size_t *key_length,
333 size_t number_of_keys,
334 memcached_execute_fn *callback,
335 void *context,
336 unsigned int number_of_callbacks)
337 {
338 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
339 number_of_keys, callback,
340 context, number_of_callbacks);
341 }
342
343 memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr,
344 const char *group_key,
345 size_t group_key_length,
346 const char * const *keys,
347 const size_t *key_length,
348 size_t number_of_keys,
349 memcached_execute_fn *callback,
350 void *context,
351 unsigned int number_of_callbacks)
352 {
353 if ((ptr->flags.binary_protocol) == 0)
354 return MEMCACHED_NOT_SUPPORTED;
355
356 memcached_return_t rc;
357 memcached_callback_st *original_callbacks= ptr->callbacks;
358 memcached_callback_st cb= {
359 .callback= callback,
360 .context= context,
361 .number_of_callback= number_of_callbacks
362 };
363
364 ptr->callbacks= &cb;
365 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
366 key_length, number_of_keys);
367 ptr->callbacks= original_callbacks;
368 return rc;
369 }
370
371 static memcached_return_t simple_binary_mget(memcached_st *ptr,
372 uint32_t master_server_key,
373 bool is_group_key_set,
374 const char * const *keys,
375 const size_t *key_length,
376 size_t number_of_keys, bool mget_mode)
377 {
378 memcached_return_t rc= MEMCACHED_NOTFOUND;
379
380 bool flush= (number_of_keys == 1);
381
382 /*
383 If a server fails we warn about errors and start all over with sending keys
384 to the server.
385 */
386 for (uint32_t x= 0; x < number_of_keys; ++x)
387 {
388 uint32_t server_key;
389 memcached_server_write_instance_st instance;
390
391 if (is_group_key_set)
392 {
393 server_key= master_server_key;
394 }
395 else
396 {
397 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
398 }
399
400 instance= memcached_server_instance_fetch(ptr, server_key);
401
402 if (memcached_server_response_count(instance) == 0)
403 {
404 rc= memcached_connect(instance);
405 if (rc != MEMCACHED_SUCCESS)
406 continue;
407 }
408
409 protocol_binary_request_getk request= {.bytes= {0}};
410 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
411 if (mget_mode)
412 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
413 else
414 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
415
416 memcached_return_t vk;
417 vk= memcached_validate_key_length(key_length[x],
418 ptr->flags.binary_protocol);
419 unlikely (vk != MEMCACHED_SUCCESS)
420 {
421 if (x > 0)
422 {
423 memcached_io_reset(instance);
424 }
425
426 return vk;
427 }
428
429 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
430 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
431 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->prefix_key)));
432
433 struct libmemcached_io_vector_st vector[]=
434 {
435 { .length= sizeof(request.bytes), .buffer= request.bytes },
436 { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) },
437 { .length= key_length[x], .buffer= keys[x] }
438 };
439
440 if (memcached_io_writev(instance, vector, 3, flush) == -1)
441 {
442 memcached_server_response_reset(instance);
443 rc= MEMCACHED_SOME_ERRORS;
444 continue;
445 }
446
447 /* We just want one pending response per server */
448 memcached_server_response_reset(instance);
449 memcached_server_response_increment(instance);
450 if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
451 {
452 rc= MEMCACHED_SOME_ERRORS;
453 }
454 }
455
456 if (mget_mode)
457 {
458 /*
459 * Send a noop command to flush the buffers
460 */
461 protocol_binary_request_noop request= {.bytes= {0}};
462 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
463 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
464 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
465
466 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
467 {
468 memcached_server_write_instance_st instance=
469 memcached_server_instance_fetch(ptr, x);
470
471 if (memcached_server_response_count(instance))
472 {
473 if (memcached_io_write(instance, NULL, 0, true) == -1)
474 {
475 memcached_server_response_reset(instance);
476 memcached_io_reset(instance);
477 rc= MEMCACHED_SOME_ERRORS;
478 }
479
480 if (memcached_io_write(instance, request.bytes,
481 sizeof(request.bytes), true) == -1)
482 {
483 memcached_server_response_reset(instance);
484 memcached_io_reset(instance);
485 rc= MEMCACHED_SOME_ERRORS;
486 }
487 }
488 }
489 }
490
491
492 return rc;
493 }
494
495 static memcached_return_t replication_binary_mget(memcached_st *ptr,
496 uint32_t* hash,
497 bool* dead_servers,
498 const char *const *keys,
499 const size_t *key_length,
500 size_t number_of_keys)
501 {
502 memcached_return_t rc= MEMCACHED_NOTFOUND;
503 uint32_t start= 0;
504 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
505
506 if (randomize_read)
507 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
508
509 /* Loop for each replica */
510 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
511 {
512 bool success= true;
513
514 for (uint32_t x= 0; x < number_of_keys; ++x)
515 {
516 memcached_server_write_instance_st instance;
517
518 if (hash[x] == memcached_server_count(ptr))
519 continue; /* Already successfully sent */
520
521 uint32_t server= hash[x] + replica;
522
523 /* In case of randomized reads */
524 if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
525 server += start;
526
527 while (server >= memcached_server_count(ptr))
528 server -= memcached_server_count(ptr);
529
530 if (dead_servers[server])
531 continue;
532
533 instance= memcached_server_instance_fetch(ptr, server);
534
535 if (memcached_server_response_count(instance) == 0)
536 {
537 rc= memcached_connect(instance);
538 if (rc != MEMCACHED_SUCCESS)
539 {
540 memcached_io_reset(instance);
541 dead_servers[server]= true;
542 success= false;
543 continue;
544 }
545 }
546
547 protocol_binary_request_getk request= {
548 .message.header.request= {
549 .magic= PROTOCOL_BINARY_REQ,
550 .opcode= PROTOCOL_BINARY_CMD_GETK,
551 .keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key))),
552 .datatype= PROTOCOL_BINARY_RAW_BYTES,
553 .bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->prefix_key)))
554 }
555 };
556
557 /*
558 * We need to disable buffering to actually know that the request was
559 * successfully sent to the server (so that we should expect a result
560 * back). It would be nice to do this in buffered mode, but then it
561 * would be complex to handle all error situations if we got to send
562 * some of the messages, and then we failed on writing out some others
563 * and we used the callback interface from memcached_mget_execute so
564 * that we might have processed some of the responses etc. For now,
565 * just make sure we work _correctly_
566 */
567 struct libmemcached_io_vector_st vector[]=
568 {
569 { .length= sizeof(request.bytes), .buffer= request.bytes },
570 { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) },
571 { .length= key_length[x], .buffer= keys[x] }
572 };
573
574 if (memcached_io_writev(instance, vector, 3, true) == -1)
575 {
576 memcached_io_reset(instance);
577 dead_servers[server]= true;
578 success= false;
579 continue;
580 }
581
582 memcached_server_response_increment(instance);
583 hash[x]= memcached_server_count(ptr);
584 }
585
586 if (success)
587 break;
588 }
589
590 return rc;
591 }
592
593 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
594 uint32_t master_server_key,
595 bool is_group_key_set,
596 const char * const *keys,
597 const size_t *key_length,
598 size_t number_of_keys,
599 bool mget_mode)
600 {
601 memcached_return_t rc;
602
603 if (ptr->number_of_replicas == 0)
604 {
605 rc= simple_binary_mget(ptr, master_server_key, is_group_key_set,
606 keys, key_length, number_of_keys, mget_mode);
607 }
608 else
609 {
610 uint32_t* hash;
611 bool* dead_servers;
612
613 hash= libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys);
614 dead_servers= libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool));
615
616 if (hash == NULL || dead_servers == NULL)
617 {
618 libmemcached_free(ptr, hash);
619 libmemcached_free(ptr, dead_servers);
620 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
621 }
622
623 if (is_group_key_set)
624 {
625 for (size_t x= 0; x < number_of_keys; x++)
626 {
627 hash[x]= master_server_key;
628 }
629 }
630 else
631 {
632 for (size_t x= 0; x < number_of_keys; x++)
633 {
634 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
635 }
636 }
637
638 rc= replication_binary_mget(ptr, hash, dead_servers, keys,
639 key_length, number_of_keys);
640
641 libmemcached_free(ptr, hash);
642 libmemcached_free(ptr, dead_servers);
643
644 return MEMCACHED_SUCCESS;
645 }
646
647 return rc;
648 }