Fix for case where key may not be checked.
[m6w6/libmemcached] / libmemcached / get.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39
40 /*
41 What happens if no servers exist?
42 */
43 char *memcached_get(memcached_st *ptr, const char *key,
44 size_t key_length,
45 size_t *value_length,
46 uint32_t *flags,
47 memcached_return_t *error)
48 {
49 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
50 flags, error);
51 }
52
53 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
54 const char *group_key,
55 size_t group_key_length,
56 const char * const *keys,
57 const size_t *key_length,
58 size_t number_of_keys,
59 const bool mget_mode);
60 char *memcached_get_by_key(memcached_st *shell,
61 const char *group_key,
62 size_t group_key_length,
63 const char *key, size_t key_length,
64 size_t *value_length,
65 uint32_t *flags,
66 memcached_return_t *error)
67 {
68 Memcached* ptr= memcached2Memcached(shell);
69 memcached_return_t unused;
70 if (error == NULL)
71 {
72 error= &unused;
73 }
74
75 uint64_t query_id= 0;
76 if (ptr)
77 {
78 query_id= ptr->query_id;
79 }
80
81 /* Request the key */
82 *error= __mget_by_key_real(ptr, group_key, group_key_length,
83 (const char * const *)&key, &key_length,
84 1, false);
85 if (ptr)
86 {
87 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
88 }
89
90 if (memcached_failed(*error))
91 {
92 if (ptr)
93 {
94 if (memcached_has_current_error(*ptr)) // Find the most accurate error
95 {
96 *error= memcached_last_error(ptr);
97 }
98 }
99
100 if (value_length)
101 {
102 *value_length= 0;
103 }
104
105 return NULL;
106 }
107
108 char *value= memcached_fetch(ptr, NULL, NULL,
109 value_length, flags, error);
110 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
111
112 /* This is for historical reasons */
113 if (*error == MEMCACHED_END)
114 {
115 *error= MEMCACHED_NOTFOUND;
116 }
117 if (value == NULL)
118 {
119 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND)
120 {
121 memcached_result_st key_failure_result;
122 memcached_result_st* result_ptr= memcached_result_create(ptr, &key_failure_result);
123 memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, result_ptr);
124
125 /* On all failure drop to returning NULL */
126 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
127 {
128 if (rc == MEMCACHED_BUFFERED)
129 {
130 uint64_t latch; /* We use latch to track the state of the original socket */
131 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
132 if (latch == 0)
133 {
134 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
135 }
136
137 rc= memcached_set(ptr, key, key_length,
138 (memcached_result_value(result_ptr)),
139 (memcached_result_length(result_ptr)),
140 0,
141 (memcached_result_flags(result_ptr)));
142
143 if (rc == MEMCACHED_BUFFERED and latch == 0)
144 {
145 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
146 }
147 }
148 else
149 {
150 rc= memcached_set(ptr, key, key_length,
151 (memcached_result_value(result_ptr)),
152 (memcached_result_length(result_ptr)),
153 0,
154 (memcached_result_flags(result_ptr)));
155 }
156
157 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
158 {
159 *error= rc;
160 *value_length= memcached_result_length(result_ptr);
161 *flags= memcached_result_flags(result_ptr);
162 char *result_value= memcached_string_take_value(&result_ptr->value);
163 memcached_result_free(result_ptr);
164
165 return result_value;
166 }
167 }
168
169 memcached_result_free(result_ptr);
170 }
171 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
172
173 return NULL;
174 }
175
176 return value;
177 }
178
179 memcached_return_t memcached_mget(memcached_st *ptr,
180 const char * const *keys,
181 const size_t *key_length,
182 size_t number_of_keys)
183 {
184 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
185 }
186
187 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
188 const uint32_t master_server_key,
189 const bool is_group_key_set,
190 const char * const *keys,
191 const size_t *key_length,
192 const size_t number_of_keys,
193 const bool mget_mode);
194
195 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
196 const char *group_key,
197 const size_t group_key_length,
198 const char * const *keys,
199 const size_t *key_length,
200 size_t number_of_keys,
201 const bool mget_mode)
202 {
203 bool failures_occured_in_sending= false;
204 const char *get_command= "get";
205 uint8_t get_command_length= 3;
206 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
207
208 memcached_return_t rc;
209 if (memcached_failed(rc= initialize_query(ptr, true)))
210 {
211 return rc;
212 }
213
214 if (memcached_is_udp(ptr))
215 {
216 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
217 }
218
219 LIBMEMCACHED_MEMCACHED_MGET_START();
220
221 if (number_of_keys == 0)
222 {
223 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Numbers of keys provided was zero"));
224 }
225
226 if (memcached_failed((rc= memcached_key_test(*ptr, keys, key_length, number_of_keys))))
227 {
228 assert(memcached_last_error(ptr) == rc);
229
230 return rc;
231 }
232
233 bool is_group_key_set= false;
234 if (group_key and group_key_length)
235 {
236 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
237 is_group_key_set= true;
238 }
239
240 /*
241 Here is where we pay for the non-block API. We need to remove any data sitting
242 in the queue before we start our get.
243
244 It might be optimum to bounce the connection if count > some number.
245 */
246 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
247 {
248 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
249
250 if (instance->response_count())
251 {
252 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
253
254 if (ptr->flags.no_block)
255 {
256 memcached_io_write(instance);
257 }
258
259 while(instance->response_count())
260 {
261 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
262 }
263 }
264 }
265
266 if (memcached_is_binary(ptr))
267 {
268 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
269 key_length, number_of_keys, mget_mode);
270 }
271
272 if (ptr->flags.support_cas)
273 {
274 get_command= "gets";
275 get_command_length= 4;
276 }
277
278 /*
279 If a server fails we warn about errors and start all over with sending keys
280 to the server.
281 */
282 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
283 size_t hosts_connected= 0;
284 for (uint32_t x= 0; x < number_of_keys; x++)
285 {
286 uint32_t server_key;
287
288 if (is_group_key_set)
289 {
290 server_key= master_server_key;
291 }
292 else
293 {
294 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
295 }
296
297 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
298
299 libmemcached_io_vector_st vector[]=
300 {
301 { get_command, get_command_length },
302 { memcached_literal_param(" ") },
303 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
304 { keys[x], key_length[x] }
305 };
306
307
308 if (instance->response_count() == 0)
309 {
310 rc= memcached_connect(instance);
311
312 if (memcached_failed(rc))
313 {
314 memcached_set_error(*instance, rc, MEMCACHED_AT);
315 continue;
316 }
317 hosts_connected++;
318
319 if ((memcached_io_writev(instance, vector, 1, false)) == false)
320 {
321 failures_occured_in_sending= true;
322 continue;
323 }
324 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
325 memcached_instance_response_increment(instance);
326 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
327 }
328
329 {
330 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false)
331 {
332 memcached_instance_response_reset(instance);
333 failures_occured_in_sending= true;
334 continue;
335 }
336 }
337 }
338
339 if (hosts_connected == 0)
340 {
341 LIBMEMCACHED_MEMCACHED_MGET_END();
342
343 if (memcached_failed(rc))
344 {
345 return rc;
346 }
347
348 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
349 }
350
351
352 /*
353 Should we muddle on if some servers are dead?
354 */
355 bool success_happened= false;
356 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
357 {
358 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
359
360 if (instance->response_count())
361 {
362 /* We need to do something about non-connnected hosts in the future */
363 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
364 {
365 failures_occured_in_sending= true;
366 }
367 else
368 {
369 success_happened= true;
370 }
371 }
372 }
373
374 LIBMEMCACHED_MEMCACHED_MGET_END();
375
376 if (failures_occured_in_sending and success_happened)
377 {
378 return MEMCACHED_SOME_ERRORS;
379 }
380
381 if (success_happened)
382 {
383 return MEMCACHED_SUCCESS;
384 }
385
386 return MEMCACHED_FAILURE; // Complete failure occurred
387 }
388
389 memcached_return_t memcached_mget_by_key(memcached_st *shell,
390 const char *group_key,
391 size_t group_key_length,
392 const char * const *keys,
393 const size_t *key_length,
394 size_t number_of_keys)
395 {
396 Memcached* ptr= memcached2Memcached(shell);
397 return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys, true);
398 }
399
400 memcached_return_t memcached_mget_execute(memcached_st *ptr,
401 const char * const *keys,
402 const size_t *key_length,
403 size_t number_of_keys,
404 memcached_execute_fn *callback,
405 void *context,
406 unsigned int number_of_callbacks)
407 {
408 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
409 number_of_keys, callback,
410 context, number_of_callbacks);
411 }
412
413 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell,
414 const char *group_key,
415 size_t group_key_length,
416 const char * const *keys,
417 const size_t *key_length,
418 size_t number_of_keys,
419 memcached_execute_fn *callback,
420 void *context,
421 unsigned int number_of_callbacks)
422 {
423 Memcached* ptr= memcached2Memcached(shell);
424 memcached_return_t rc;
425 if (memcached_failed(rc= initialize_query(ptr, false)))
426 {
427 return rc;
428 }
429
430 if (memcached_is_udp(ptr))
431 {
432 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
433 }
434
435 if (memcached_is_binary(ptr) == false)
436 {
437 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
438 memcached_literal_param("ASCII protocol is not supported for memcached_mget_execute_by_key()"));
439 }
440
441 memcached_callback_st *original_callbacks= ptr->callbacks;
442 memcached_callback_st cb= {
443 callback,
444 context,
445 number_of_callbacks
446 };
447
448 ptr->callbacks= &cb;
449 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
450 key_length, number_of_keys);
451 ptr->callbacks= original_callbacks;
452
453 return rc;
454 }
455
456 static memcached_return_t simple_binary_mget(memcached_st *ptr,
457 const uint32_t master_server_key,
458 bool is_group_key_set,
459 const char * const *keys,
460 const size_t *key_length,
461 const size_t number_of_keys, const bool mget_mode)
462 {
463 memcached_return_t rc= MEMCACHED_NOTFOUND;
464
465 bool flush= (number_of_keys == 1);
466
467 if (memcached_failed(rc= memcached_key_test(*ptr, keys, key_length, number_of_keys)))
468 {
469 return rc;
470 }
471
472 /*
473 If a server fails we warn about errors and start all over with sending keys
474 to the server.
475 */
476 for (uint32_t x= 0; x < number_of_keys; ++x)
477 {
478 uint32_t server_key;
479
480 if (is_group_key_set)
481 {
482 server_key= master_server_key;
483 }
484 else
485 {
486 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
487 }
488
489 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
490
491 if (instance->response_count() == 0)
492 {
493 rc= memcached_connect(instance);
494 if (memcached_failed(rc))
495 {
496 continue;
497 }
498 }
499
500 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
501 initialize_binary_request(instance, request.message.header);
502 if (mget_mode)
503 {
504 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
505 }
506 else
507 {
508 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
509 }
510
511 #if 0
512 {
513 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
514 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
515 {
516 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
517
518 if (x > 0)
519 {
520 memcached_io_reset(instance);
521 }
522
523 return vk;
524 }
525 }
526 #endif
527
528 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
529 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
530 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->_namespace)));
531
532 libmemcached_io_vector_st vector[]=
533 {
534 { request.bytes, sizeof(request.bytes) },
535 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
536 { keys[x], key_length[x] }
537 };
538
539 if (memcached_io_writev(instance, vector, 3, flush) == false)
540 {
541 memcached_server_response_reset(instance);
542 rc= MEMCACHED_SOME_ERRORS;
543 continue;
544 }
545
546 /* We just want one pending response per server */
547 memcached_server_response_reset(instance);
548 memcached_server_response_increment(instance);
549 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
550 {
551 rc= MEMCACHED_SOME_ERRORS;
552 }
553 }
554
555 if (mget_mode)
556 {
557 /*
558 Send a noop command to flush the buffers
559 */
560 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
561 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
562 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
563
564 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
565 {
566 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
567
568 if (instance->response_count())
569 {
570 initialize_binary_request(instance, request.message.header);
571 if ((memcached_io_write(instance) == false) or
572 (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
573 {
574 memcached_instance_response_reset(instance);
575 memcached_io_reset(instance);
576 rc= MEMCACHED_SOME_ERRORS;
577 }
578 }
579 }
580 }
581
582 return rc;
583 }
584
585 static memcached_return_t replication_binary_mget(memcached_st *ptr,
586 uint32_t* hash,
587 bool* dead_servers,
588 const char *const *keys,
589 const size_t *key_length,
590 const size_t number_of_keys)
591 {
592 memcached_return_t rc= MEMCACHED_NOTFOUND;
593 uint32_t start= 0;
594 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
595
596 if (randomize_read)
597 {
598 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
599 }
600
601 /* Loop for each replica */
602 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
603 {
604 bool success= true;
605
606 for (uint32_t x= 0; x < number_of_keys; ++x)
607 {
608 if (hash[x] == memcached_server_count(ptr))
609 {
610 continue; /* Already successfully sent */
611 }
612
613 uint32_t server= hash[x] +replica;
614
615 /* In case of randomized reads */
616 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas)))
617 {
618 server+= start;
619 }
620
621 while (server >= memcached_server_count(ptr))
622 {
623 server -= memcached_server_count(ptr);
624 }
625
626 if (dead_servers[server])
627 {
628 continue;
629 }
630
631 memcached_instance_st* instance= memcached_instance_fetch(ptr, server);
632
633 if (instance->response_count() == 0)
634 {
635 rc= memcached_connect(instance);
636
637 if (memcached_failed(rc))
638 {
639 memcached_io_reset(instance);
640 dead_servers[server]= true;
641 success= false;
642 continue;
643 }
644 }
645
646 protocol_binary_request_getk request= {};
647 initialize_binary_request(instance, request.message.header);
648 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
649 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
650 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
651 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
652
653 /*
654 * We need to disable buffering to actually know that the request was
655 * successfully sent to the server (so that we should expect a result
656 * back). It would be nice to do this in buffered mode, but then it
657 * would be complex to handle all error situations if we got to send
658 * some of the messages, and then we failed on writing out some others
659 * and we used the callback interface from memcached_mget_execute so
660 * that we might have processed some of the responses etc. For now,
661 * just make sure we work _correctly_
662 */
663 libmemcached_io_vector_st vector[]=
664 {
665 { request.bytes, sizeof(request.bytes) },
666 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
667 { keys[x], key_length[x] }
668 };
669
670 if (memcached_io_writev(instance, vector, 3, true) == false)
671 {
672 memcached_io_reset(instance);
673 dead_servers[server]= true;
674 success= false;
675 continue;
676 }
677
678 memcached_server_response_increment(instance);
679 hash[x]= memcached_server_count(ptr);
680 }
681
682 if (success)
683 {
684 break;
685 }
686 }
687
688 return rc;
689 }
690
691 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
692 const uint32_t master_server_key,
693 bool is_group_key_set,
694 const char * const *keys,
695 const size_t *key_length,
696 const size_t number_of_keys,
697 const bool mget_mode)
698 {
699 if (ptr->number_of_replicas == 0)
700 {
701 return simple_binary_mget(ptr, master_server_key, is_group_key_set,
702 keys, key_length, number_of_keys, mget_mode);
703 }
704
705 uint32_t* hash= libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
706 bool* dead_servers= libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
707
708 if (hash == NULL or dead_servers == NULL)
709 {
710 libmemcached_free(ptr, hash);
711 libmemcached_free(ptr, dead_servers);
712 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
713 }
714
715 if (is_group_key_set)
716 {
717 for (size_t x= 0; x < number_of_keys; x++)
718 {
719 hash[x]= master_server_key;
720 }
721 }
722 else
723 {
724 for (size_t x= 0; x < number_of_keys; x++)
725 {
726 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
727 }
728 }
729
730 memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
731 key_length, number_of_keys);
732
733 WATCHPOINT_IFERROR(rc);
734 libmemcached_free(ptr, hash);
735 libmemcached_free(ptr, dead_servers);
736
737 return MEMCACHED_SUCCESS;
738 }