Encapsulate more of the cleanup logic.
[awesomized/libmemcached] / libmemcached / get.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39
40 /*
41 What happens if no servers exist?
42 */
43 char *memcached_get(memcached_st *ptr, const char *key,
44 size_t key_length,
45 size_t *value_length,
46 uint32_t *flags,
47 memcached_return_t *error)
48 {
49 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
50 flags, error);
51 }
52
53 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
54 const char *group_key,
55 size_t group_key_length,
56 const char * const *keys,
57 const size_t *key_length,
58 size_t number_of_keys,
59 bool mget_mode);
60
61 char *memcached_get_by_key(memcached_st *shell,
62 const char *group_key,
63 size_t group_key_length,
64 const char *key, size_t key_length,
65 size_t *value_length,
66 uint32_t *flags,
67 memcached_return_t *error)
68 {
69 Memcached* ptr= memcached2Memcached(shell);
70 memcached_return_t unused;
71 if (error == NULL)
72 {
73 error= &unused;
74 }
75
76 uint64_t query_id= 0;
77 if (ptr)
78 {
79 query_id= ptr->query_id;
80 }
81
82 /* Request the key */
83 *error= memcached_mget_by_key_real(ptr, group_key, group_key_length,
84 (const char * const *)&key, &key_length,
85 1, false);
86 if (ptr)
87 {
88 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
89 }
90
91 if (memcached_failed(*error))
92 {
93 if (ptr)
94 {
95 if (memcached_has_current_error(*ptr)) // Find the most accurate error
96 {
97 *error= memcached_last_error(ptr);
98 }
99 }
100
101 if (value_length)
102 {
103 *value_length= 0;
104 }
105
106 return NULL;
107 }
108
109 char *value= memcached_fetch(ptr, NULL, NULL,
110 value_length, flags, error);
111 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
112
113 /* This is for historical reasons */
114 if (*error == MEMCACHED_END)
115 {
116 *error= MEMCACHED_NOTFOUND;
117 }
118
119 if (value == NULL)
120 {
121 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND)
122 {
123 memcached_result_st key_failure_result;
124 memcached_result_st* result_ptr= memcached_result_create(ptr, &key_failure_result);
125 memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, result_ptr);
126
127 /* On all failure drop to returning NULL */
128 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
129 {
130 if (rc == MEMCACHED_BUFFERED)
131 {
132 uint64_t latch; /* We use latch to track the state of the original socket */
133 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
134 if (latch == 0)
135 {
136 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
137 }
138
139 rc= memcached_set(ptr, key, key_length,
140 (memcached_result_value(result_ptr)),
141 (memcached_result_length(result_ptr)),
142 0,
143 (memcached_result_flags(result_ptr)));
144
145 if (rc == MEMCACHED_BUFFERED and latch == 0)
146 {
147 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
148 }
149 }
150 else
151 {
152 rc= memcached_set(ptr, key, key_length,
153 (memcached_result_value(result_ptr)),
154 (memcached_result_length(result_ptr)),
155 0,
156 (memcached_result_flags(result_ptr)));
157 }
158
159 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
160 {
161 *error= rc;
162 *value_length= memcached_result_length(result_ptr);
163 *flags= memcached_result_flags(result_ptr);
164 char *result_value= memcached_string_take_value(&result_ptr->value);
165 memcached_result_free(result_ptr);
166
167 return result_value;
168 }
169 }
170
171 memcached_result_free(result_ptr);
172 }
173 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
174
175 return NULL;
176 }
177
178 return value;
179 }
180
181 memcached_return_t memcached_mget(memcached_st *ptr,
182 const char * const *keys,
183 const size_t *key_length,
184 size_t number_of_keys)
185 {
186 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
187 }
188
189 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
190 uint32_t master_server_key,
191 bool is_group_key_set,
192 const char * const *keys,
193 const size_t *key_length,
194 size_t number_of_keys,
195 bool mget_mode);
196
197 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
198 const char *group_key,
199 size_t group_key_length,
200 const char * const *keys,
201 const size_t *key_length,
202 size_t number_of_keys,
203 bool mget_mode)
204 {
205 bool failures_occured_in_sending= false;
206 const char *get_command= "get";
207 uint8_t get_command_length= 3;
208 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
209
210 memcached_return_t rc;
211 if (memcached_failed(rc= initialize_query(ptr, true)))
212 {
213 return rc;
214 }
215
216 if (memcached_is_udp(ptr))
217 {
218 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
219 }
220
221 LIBMEMCACHED_MEMCACHED_MGET_START();
222
223 if (number_of_keys == 0)
224 {
225 return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, memcached_literal_param("number_of_keys was zero"));
226 }
227
228 if (memcached_failed(memcached_key_test(*ptr, keys, key_length, number_of_keys)))
229 {
230 return memcached_last_error(ptr);
231 }
232
233 bool is_group_key_set= false;
234 if (group_key and group_key_length)
235 {
236 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
237 is_group_key_set= true;
238 }
239
240 /*
241 Here is where we pay for the non-block API. We need to remove any data sitting
242 in the queue before we start our get.
243
244 It might be optimum to bounce the connection if count > some number.
245 */
246 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
247 {
248 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, x);
249
250 if (instance->response_count())
251 {
252 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
253
254 if (ptr->flags.no_block)
255 {
256 memcached_io_write(instance);
257 }
258
259 while(instance->response_count())
260 {
261 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
262 }
263 }
264 }
265
266 if (memcached_is_binary(ptr))
267 {
268 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
269 key_length, number_of_keys, mget_mode);
270 }
271
272 if (ptr->flags.support_cas)
273 {
274 get_command= "gets";
275 get_command_length= 4;
276 }
277
278 /*
279 If a server fails we warn about errors and start all over with sending keys
280 to the server.
281 */
282 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
283 size_t hosts_connected= 0;
284 for (uint32_t x= 0; x < number_of_keys; x++)
285 {
286 uint32_t server_key;
287
288 if (is_group_key_set)
289 {
290 server_key= master_server_key;
291 }
292 else
293 {
294 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
295 }
296
297 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
298
299 libmemcached_io_vector_st vector[]=
300 {
301 { get_command, get_command_length },
302 { memcached_literal_param(" ") },
303 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
304 { keys[x], key_length[x] }
305 };
306
307
308 if (instance->response_count() == 0)
309 {
310 rc= memcached_connect(instance);
311
312 if (memcached_failed(rc))
313 {
314 memcached_set_error(*instance, rc, MEMCACHED_AT);
315 continue;
316 }
317 hosts_connected++;
318
319 if ((memcached_io_writev(instance, vector, 1, false)) == false)
320 {
321 failures_occured_in_sending= true;
322 continue;
323 }
324 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
325 memcached_instance_response_increment(instance);
326 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
327 }
328
329 {
330 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false)
331 {
332 memcached_instance_response_reset(instance);
333 failures_occured_in_sending= true;
334 continue;
335 }
336 }
337 }
338
339 if (hosts_connected == 0)
340 {
341 LIBMEMCACHED_MEMCACHED_MGET_END();
342
343 if (memcached_failed(rc))
344 {
345 return rc;
346 }
347
348 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
349 }
350
351
352 /*
353 Should we muddle on if some servers are dead?
354 */
355 bool success_happened= false;
356 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
357 {
358 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, x);
359
360 if (instance->response_count())
361 {
362 /* We need to do something about non-connnected hosts in the future */
363 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
364 {
365 failures_occured_in_sending= true;
366 }
367 else
368 {
369 success_happened= true;
370 }
371 }
372 }
373
374 LIBMEMCACHED_MEMCACHED_MGET_END();
375
376 if (failures_occured_in_sending and success_happened)
377 {
378 return MEMCACHED_SOME_ERRORS;
379 }
380
381 if (success_happened)
382 {
383 return MEMCACHED_SUCCESS;
384 }
385
386 return MEMCACHED_FAILURE; // Complete failure occurred
387 }
388
389 memcached_return_t memcached_mget_by_key(memcached_st *shell,
390 const char *group_key,
391 size_t group_key_length,
392 const char * const *keys,
393 const size_t *key_length,
394 size_t number_of_keys)
395 {
396 Memcached* ptr= memcached2Memcached(shell);
397 return memcached_mget_by_key_real(ptr, group_key, group_key_length, keys,
398 key_length, number_of_keys, true);
399 }
400
401 memcached_return_t memcached_mget_execute(memcached_st *ptr,
402 const char * const *keys,
403 const size_t *key_length,
404 size_t number_of_keys,
405 memcached_execute_fn *callback,
406 void *context,
407 unsigned int number_of_callbacks)
408 {
409 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
410 number_of_keys, callback,
411 context, number_of_callbacks);
412 }
413
414 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell,
415 const char *group_key,
416 size_t group_key_length,
417 const char * const *keys,
418 const size_t *key_length,
419 size_t number_of_keys,
420 memcached_execute_fn *callback,
421 void *context,
422 unsigned int number_of_callbacks)
423 {
424 Memcached* ptr= memcached2Memcached(shell);
425 memcached_return_t rc;
426 if (memcached_failed(rc= initialize_query(ptr, false)))
427 {
428 return rc;
429 }
430
431 if (memcached_is_udp(ptr))
432 {
433 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
434 }
435
436 if (memcached_is_binary(ptr) == false)
437 {
438 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
439 memcached_literal_param("ASCII protocol is not supported for memcached_mget_execute_by_key()"));
440 }
441
442 memcached_callback_st *original_callbacks= ptr->callbacks;
443 memcached_callback_st cb= {
444 callback,
445 context,
446 number_of_callbacks
447 };
448
449 ptr->callbacks= &cb;
450 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
451 key_length, number_of_keys);
452 ptr->callbacks= original_callbacks;
453 return rc;
454 }
455
456 static memcached_return_t simple_binary_mget(memcached_st *ptr,
457 uint32_t master_server_key,
458 bool is_group_key_set,
459 const char * const *keys,
460 const size_t *key_length,
461 size_t number_of_keys, bool mget_mode)
462 {
463 memcached_return_t rc= MEMCACHED_NOTFOUND;
464
465 bool flush= (number_of_keys == 1);
466
467 /*
468 If a server fails we warn about errors and start all over with sending keys
469 to the server.
470 */
471 for (uint32_t x= 0; x < number_of_keys; ++x)
472 {
473 uint32_t server_key;
474
475 if (is_group_key_set)
476 {
477 server_key= master_server_key;
478 }
479 else
480 {
481 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
482 }
483
484 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
485
486 if (instance->response_count() == 0)
487 {
488 rc= memcached_connect(instance);
489 if (memcached_failed(rc))
490 {
491 continue;
492 }
493 }
494
495 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
496 initialize_binary_request(instance, request.message.header);
497 if (mget_mode)
498 {
499 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
500 }
501 else
502 {
503 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
504 }
505
506 memcached_return_t vk;
507 vk= memcached_validate_key_length(key_length[x],
508 ptr->flags.binary_protocol);
509 if (vk != MEMCACHED_SUCCESS)
510 {
511 if (x > 0)
512 {
513 memcached_io_reset(instance);
514 }
515
516 return vk;
517 }
518
519 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
520 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
521 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->_namespace)));
522
523 libmemcached_io_vector_st vector[]=
524 {
525 { request.bytes, sizeof(request.bytes) },
526 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
527 { keys[x], key_length[x] }
528 };
529
530 if (memcached_io_writev(instance, vector, 3, flush) == false)
531 {
532 memcached_server_response_reset(instance);
533 rc= MEMCACHED_SOME_ERRORS;
534 continue;
535 }
536
537 /* We just want one pending response per server */
538 memcached_server_response_reset(instance);
539 memcached_server_response_increment(instance);
540 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
541 {
542 rc= MEMCACHED_SOME_ERRORS;
543 }
544 }
545
546 if (mget_mode)
547 {
548 /*
549 Send a noop command to flush the buffers
550 */
551 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
552 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
553 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
554
555 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
556 {
557 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, x);
558 initialize_binary_request(instance, request.message.header);
559
560 if (instance->response_count())
561 {
562 if (memcached_io_write(instance) == false)
563 {
564 memcached_instance_response_reset(instance);
565 memcached_io_reset(instance);
566 rc= MEMCACHED_SOME_ERRORS;
567 }
568
569 if (memcached_io_write(instance, request.bytes,
570 sizeof(request.bytes), true) == -1)
571 {
572 memcached_instance_response_reset(instance);
573 memcached_io_reset(instance);
574 rc= MEMCACHED_SOME_ERRORS;
575 }
576 }
577 }
578 }
579
580
581 return rc;
582 }
583
584 static memcached_return_t replication_binary_mget(memcached_st *ptr,
585 uint32_t* hash,
586 bool* dead_servers,
587 const char *const *keys,
588 const size_t *key_length,
589 size_t number_of_keys)
590 {
591 memcached_return_t rc= MEMCACHED_NOTFOUND;
592 uint32_t start= 0;
593 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
594
595 if (randomize_read)
596 {
597 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
598 }
599
600 /* Loop for each replica */
601 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
602 {
603 bool success= true;
604
605 for (uint32_t x= 0; x < number_of_keys; ++x)
606 {
607 if (hash[x] == memcached_server_count(ptr))
608 continue; /* Already successfully sent */
609
610 uint32_t server= hash[x] + replica;
611
612 /* In case of randomized reads */
613 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas)))
614 {
615 server+= start;
616 }
617
618 while (server >= memcached_server_count(ptr))
619 {
620 server -= memcached_server_count(ptr);
621 }
622
623 if (dead_servers[server])
624 {
625 continue;
626 }
627
628 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server);
629
630 if (instance->response_count() == 0)
631 {
632 rc= memcached_connect(instance);
633
634 if (memcached_failed(rc))
635 {
636 memcached_io_reset(instance);
637 dead_servers[server]= true;
638 success= false;
639 continue;
640 }
641 }
642
643 protocol_binary_request_getk request= {};
644 initialize_binary_request(instance, request.message.header);
645 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
646 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
647 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
648 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
649
650 /*
651 * We need to disable buffering to actually know that the request was
652 * successfully sent to the server (so that we should expect a result
653 * back). It would be nice to do this in buffered mode, but then it
654 * would be complex to handle all error situations if we got to send
655 * some of the messages, and then we failed on writing out some others
656 * and we used the callback interface from memcached_mget_execute so
657 * that we might have processed some of the responses etc. For now,
658 * just make sure we work _correctly_
659 */
660 libmemcached_io_vector_st vector[]=
661 {
662 { request.bytes, sizeof(request.bytes) },
663 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
664 { keys[x], key_length[x] }
665 };
666
667 if (memcached_io_writev(instance, vector, 3, true) == false)
668 {
669 memcached_io_reset(instance);
670 dead_servers[server]= true;
671 success= false;
672 continue;
673 }
674
675 memcached_server_response_increment(instance);
676 hash[x]= memcached_server_count(ptr);
677 }
678
679 if (success)
680 {
681 break;
682 }
683 }
684
685 return rc;
686 }
687
688 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
689 uint32_t master_server_key,
690 bool is_group_key_set,
691 const char * const *keys,
692 const size_t *key_length,
693 size_t number_of_keys,
694 bool mget_mode)
695 {
696 if (ptr->number_of_replicas == 0)
697 {
698 return simple_binary_mget(ptr, master_server_key, is_group_key_set,
699 keys, key_length, number_of_keys, mget_mode);
700 }
701
702 uint32_t* hash= libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
703 bool* dead_servers= libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
704
705 if (hash == NULL or dead_servers == NULL)
706 {
707 libmemcached_free(ptr, hash);
708 libmemcached_free(ptr, dead_servers);
709 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
710 }
711
712 if (is_group_key_set)
713 {
714 for (size_t x= 0; x < number_of_keys; x++)
715 {
716 hash[x]= master_server_key;
717 }
718 }
719 else
720 {
721 for (size_t x= 0; x < number_of_keys; x++)
722 {
723 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
724 }
725 }
726
727 memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
728 key_length, number_of_keys);
729
730 WATCHPOINT_IFERROR(rc);
731 libmemcached_free(ptr, hash);
732 libmemcached_free(ptr, dead_servers);
733
734 return MEMCACHED_SUCCESS;
735 }