start using a .clang-format code style
[awesomized/libmemcached] / src / libmemcached / get.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include "libmemcached/common.h"
39
40 char *memcached_get(memcached_st *ptr, const char *key,
41 size_t key_length,
42 size_t *value_length,
43 uint32_t *flags,
44 memcached_return_t *error)
45 {
46 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
47 flags, error);
48 }
49
50 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
51 const char *group_key,
52 size_t group_key_length,
53 const char * const *keys,
54 const size_t *key_length,
55 size_t number_of_keys,
56 const bool mget_mode);
57 char *memcached_get_by_key(memcached_st *shell,
58 const char *group_key,
59 size_t group_key_length,
60 const char *key, size_t key_length,
61 size_t *value_length,
62 uint32_t *flags,
63 memcached_return_t *error)
64 {
65 Memcached* ptr= memcached2Memcached(shell);
66 memcached_return_t unused;
67 if (error == NULL)
68 {
69 error= &unused;
70 }
71
72 uint64_t query_id= 0;
73 if (ptr)
74 {
75 query_id= ptr->query_id;
76 }
77
78 /* Request the key */
79 *error= __mget_by_key_real(ptr, group_key, group_key_length,
80 (const char * const *)&key, &key_length,
81 1, false);
82 if (ptr)
83 {
84 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
85 }
86
87 if (memcached_failed(*error))
88 {
89 if (ptr)
90 {
91 if (memcached_has_current_error(*ptr)) // Find the most accurate error
92 {
93 *error= memcached_last_error(ptr);
94 }
95 }
96
97 if (value_length)
98 {
99 *value_length= 0;
100 }
101
102 return NULL;
103 }
104
105 char *value= memcached_fetch(ptr, NULL, NULL,
106 value_length, flags, error);
107 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
108
109 /* This is for historical reasons */
110 if (*error == MEMCACHED_END)
111 {
112 *error= MEMCACHED_NOTFOUND;
113 }
114 if (value == NULL)
115 {
116 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND)
117 {
118 memcached_result_st key_failure_result;
119 memcached_result_st* result_ptr= memcached_result_create(ptr, &key_failure_result);
120 memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, result_ptr);
121
122 /* On all failure drop to returning NULL */
123 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
124 {
125 if (rc == MEMCACHED_BUFFERED)
126 {
127 uint64_t latch; /* We use latch to track the state of the original socket */
128 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
129 if (latch == 0)
130 {
131 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
132 }
133
134 rc= memcached_set(ptr, key, key_length,
135 (memcached_result_value(result_ptr)),
136 (memcached_result_length(result_ptr)),
137 0,
138 (memcached_result_flags(result_ptr)));
139
140 if (rc == MEMCACHED_BUFFERED and latch == 0)
141 {
142 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
143 }
144 }
145 else
146 {
147 rc= memcached_set(ptr, key, key_length,
148 (memcached_result_value(result_ptr)),
149 (memcached_result_length(result_ptr)),
150 0,
151 (memcached_result_flags(result_ptr)));
152 }
153
154 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
155 {
156 *error= rc;
157 *value_length= memcached_result_length(result_ptr);
158 *flags= memcached_result_flags(result_ptr);
159 char *result_value= memcached_string_take_value(&result_ptr->value);
160 memcached_result_free(result_ptr);
161
162 return result_value;
163 }
164 }
165
166 memcached_result_free(result_ptr);
167 }
168 assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
169
170 return NULL;
171 }
172
173 return value;
174 }
175
176 memcached_return_t memcached_mget(memcached_st *ptr,
177 const char * const *keys,
178 const size_t *key_length,
179 size_t number_of_keys)
180 {
181 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
182 }
183
184 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
185 const uint32_t master_server_key,
186 const bool is_group_key_set,
187 const char * const *keys,
188 const size_t *key_length,
189 const size_t number_of_keys,
190 const bool mget_mode);
191
192 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
193 const char *group_key,
194 const size_t group_key_length,
195 const char * const *keys,
196 const size_t *key_length,
197 size_t number_of_keys,
198 const bool mget_mode)
199 {
200 bool failures_occured_in_sending= false;
201 const char *get_command= "get";
202 uint8_t get_command_length= 3;
203 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
204
205 memcached_return_t rc;
206 if (memcached_failed(rc= initialize_query(ptr, true)))
207 {
208 return rc;
209 }
210
211 if (memcached_is_udp(ptr))
212 {
213 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
214 }
215
216 LIBMEMCACHED_MEMCACHED_MGET_START();
217
218 if (number_of_keys == 0)
219 {
220 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Numbers of keys provided was zero"));
221 }
222
223 if (memcached_failed((rc= memcached_key_test(*ptr, keys, key_length, number_of_keys))))
224 {
225 assert(memcached_last_error(ptr) == rc);
226
227 return rc;
228 }
229
230 bool is_group_key_set= false;
231 if (group_key and group_key_length)
232 {
233 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
234 is_group_key_set= true;
235 }
236
237 /*
238 Here is where we pay for the non-block API. We need to remove any data sitting
239 in the queue before we start our get.
240
241 It might be optimum to bounce the connection if count > some number.
242 */
243 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
244 {
245 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
246
247 if (instance->response_count())
248 {
249 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
250
251 if (ptr->flags.no_block)
252 {
253 memcached_io_write(instance);
254 }
255
256 while(instance->response_count())
257 {
258 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
259 }
260 }
261 }
262
263 if (memcached_is_binary(ptr))
264 {
265 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
266 key_length, number_of_keys, mget_mode);
267 }
268
269 if (ptr->flags.support_cas)
270 {
271 get_command= "gets";
272 get_command_length= 4;
273 }
274
275 /*
276 If a server fails we warn about errors and start all over with sending keys
277 to the server.
278 */
279 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
280 size_t hosts_connected= 0;
281 for (uint32_t x= 0; x < number_of_keys; x++)
282 {
283 uint32_t server_key;
284
285 if (is_group_key_set)
286 {
287 server_key= master_server_key;
288 }
289 else
290 {
291 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
292 }
293
294 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
295
296 libmemcached_io_vector_st vector[]=
297 {
298 { get_command, get_command_length },
299 { memcached_literal_param(" ") },
300 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
301 { keys[x], key_length[x] }
302 };
303
304
305 if (instance->response_count() == 0)
306 {
307 rc= memcached_connect(instance);
308
309 if (memcached_failed(rc))
310 {
311 memcached_set_error(*instance, rc, MEMCACHED_AT);
312 continue;
313 }
314 hosts_connected++;
315
316 if ((memcached_io_writev(instance, vector, 1, false)) == false)
317 {
318 failures_occured_in_sending= true;
319 continue;
320 }
321 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
322 memcached_instance_response_increment(instance);
323 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
324 }
325
326 {
327 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false)
328 {
329 memcached_instance_response_reset(instance);
330 failures_occured_in_sending= true;
331 continue;
332 }
333 }
334 }
335
336 if (hosts_connected == 0)
337 {
338 LIBMEMCACHED_MEMCACHED_MGET_END();
339
340 if (memcached_failed(rc))
341 {
342 return rc;
343 }
344
345 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
346 }
347
348
349 /*
350 Should we muddle on if some servers are dead?
351 */
352 bool success_happened= false;
353 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
354 {
355 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
356
357 if (instance->response_count())
358 {
359 /* We need to do something about non-connnected hosts in the future */
360 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
361 {
362 failures_occured_in_sending= true;
363 }
364 else
365 {
366 success_happened= true;
367 }
368 }
369 }
370
371 LIBMEMCACHED_MEMCACHED_MGET_END();
372
373 if (failures_occured_in_sending and success_happened)
374 {
375 return MEMCACHED_SOME_ERRORS;
376 }
377
378 if (success_happened)
379 {
380 return MEMCACHED_SUCCESS;
381 }
382
383 return MEMCACHED_FAILURE; // Complete failure occurred
384 }
385
386 memcached_return_t memcached_mget_by_key(memcached_st *shell,
387 const char *group_key,
388 size_t group_key_length,
389 const char * const *keys,
390 const size_t *key_length,
391 size_t number_of_keys)
392 {
393 Memcached* ptr= memcached2Memcached(shell);
394 return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys, true);
395 }
396
397 memcached_return_t memcached_mget_execute(memcached_st *ptr,
398 const char * const *keys,
399 const size_t *key_length,
400 size_t number_of_keys,
401 memcached_execute_fn *callback,
402 void *context,
403 unsigned int number_of_callbacks)
404 {
405 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
406 number_of_keys, callback,
407 context, number_of_callbacks);
408 }
409
410 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell,
411 const char *group_key,
412 size_t group_key_length,
413 const char * const *keys,
414 const size_t *key_length,
415 size_t number_of_keys,
416 memcached_execute_fn *callback,
417 void *context,
418 unsigned int number_of_callbacks)
419 {
420 Memcached* ptr= memcached2Memcached(shell);
421 memcached_return_t rc;
422 if (memcached_failed(rc= initialize_query(ptr, false)))
423 {
424 return rc;
425 }
426
427 if (memcached_is_udp(ptr))
428 {
429 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
430 }
431
432 if (memcached_is_binary(ptr) == false)
433 {
434 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
435 memcached_literal_param("ASCII protocol is not supported for memcached_mget_execute_by_key()"));
436 }
437
438 memcached_callback_st *original_callbacks= ptr->callbacks;
439 memcached_callback_st cb= {
440 callback,
441 context,
442 number_of_callbacks
443 };
444
445 ptr->callbacks= &cb;
446 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
447 key_length, number_of_keys);
448 ptr->callbacks= original_callbacks;
449
450 return rc;
451 }
452
453 static memcached_return_t simple_binary_mget(memcached_st *ptr,
454 const uint32_t master_server_key,
455 bool is_group_key_set,
456 const char * const *keys,
457 const size_t *key_length,
458 const size_t number_of_keys, const bool mget_mode)
459 {
460 memcached_return_t rc= MEMCACHED_NOTFOUND;
461
462 bool flush= (number_of_keys == 1);
463
464 if (memcached_failed(rc= memcached_key_test(*ptr, keys, key_length, number_of_keys)))
465 {
466 return rc;
467 }
468
469 /*
470 If a server fails we warn about errors and start all over with sending keys
471 to the server.
472 */
473 for (uint32_t x= 0; x < number_of_keys; ++x)
474 {
475 uint32_t server_key;
476
477 if (is_group_key_set)
478 {
479 server_key= master_server_key;
480 }
481 else
482 {
483 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
484 }
485
486 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
487
488 if (instance->response_count() == 0)
489 {
490 rc= memcached_connect(instance);
491 if (memcached_failed(rc))
492 {
493 continue;
494 }
495 }
496
497 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
498 initialize_binary_request(instance, request.message.header);
499 if (mget_mode)
500 {
501 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
502 }
503 else
504 {
505 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
506 }
507
508 #if 0
509 {
510 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
511 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
512 {
513 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
514
515 if (x > 0)
516 {
517 memcached_io_reset(instance);
518 }
519
520 return vk;
521 }
522 }
523 #endif
524
525 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
526 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
527 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->_namespace)));
528
529 libmemcached_io_vector_st vector[]=
530 {
531 { request.bytes, sizeof(request.bytes) },
532 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
533 { keys[x], key_length[x] }
534 };
535
536 if (memcached_io_writev(instance, vector, 3, flush) == false)
537 {
538 memcached_server_response_reset(instance);
539 rc= MEMCACHED_SOME_ERRORS;
540 continue;
541 }
542
543 /* We just want one pending response per server */
544 memcached_server_response_reset(instance);
545 memcached_server_response_increment(instance);
546 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
547 {
548 rc= MEMCACHED_SOME_ERRORS;
549 }
550 }
551
552 if (mget_mode)
553 {
554 /*
555 Send a noop command to flush the buffers
556 */
557 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
558 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
559 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
560
561 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
562 {
563 memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
564
565 if (instance->response_count())
566 {
567 initialize_binary_request(instance, request.message.header);
568 if ((memcached_io_write(instance) == false) or
569 (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
570 {
571 memcached_instance_response_reset(instance);
572 memcached_io_reset(instance);
573 rc= MEMCACHED_SOME_ERRORS;
574 }
575 }
576 }
577 }
578
579 return rc;
580 }
581
582 static memcached_return_t replication_binary_mget(memcached_st *ptr,
583 uint32_t* hash,
584 bool* dead_servers,
585 const char *const *keys,
586 const size_t *key_length,
587 const size_t number_of_keys)
588 {
589 memcached_return_t rc= MEMCACHED_NOTFOUND;
590 uint32_t start= 0;
591 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
592
593 if (randomize_read)
594 {
595 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
596 }
597
598 /* Loop for each replica */
599 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
600 {
601 bool success= true;
602
603 for (uint32_t x= 0; x < number_of_keys; ++x)
604 {
605 if (hash[x] == memcached_server_count(ptr))
606 {
607 continue; /* Already successfully sent */
608 }
609
610 uint32_t server= hash[x] +replica;
611
612 /* In case of randomized reads */
613 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas)))
614 {
615 server+= start;
616 }
617
618 while (server >= memcached_server_count(ptr))
619 {
620 server -= memcached_server_count(ptr);
621 }
622
623 if (dead_servers[server])
624 {
625 continue;
626 }
627
628 memcached_instance_st* instance= memcached_instance_fetch(ptr, server);
629
630 if (instance->response_count() == 0)
631 {
632 rc= memcached_connect(instance);
633
634 if (memcached_failed(rc))
635 {
636 memcached_io_reset(instance);
637 dead_servers[server]= true;
638 success= false;
639 continue;
640 }
641 }
642
643 protocol_binary_request_getk request= {};
644 initialize_binary_request(instance, request.message.header);
645 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
646 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
647 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
648 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
649
650 /*
651 * We need to disable buffering to actually know that the request was
652 * successfully sent to the server (so that we should expect a result
653 * back). It would be nice to do this in buffered mode, but then it
654 * would be complex to handle all error situations if we got to send
655 * some of the messages, and then we failed on writing out some others
656 * and we used the callback interface from memcached_mget_execute so
657 * that we might have processed some of the responses etc. For now,
658 * just make sure we work _correctly_
659 */
660 libmemcached_io_vector_st vector[]=
661 {
662 { request.bytes, sizeof(request.bytes) },
663 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
664 { keys[x], key_length[x] }
665 };
666
667 if (memcached_io_writev(instance, vector, 3, true) == false)
668 {
669 memcached_io_reset(instance);
670 dead_servers[server]= true;
671 success= false;
672 continue;
673 }
674
675 memcached_server_response_increment(instance);
676 hash[x]= memcached_server_count(ptr);
677 }
678
679 if (success)
680 {
681 break;
682 }
683 }
684
685 return rc;
686 }
687
688 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
689 const uint32_t master_server_key,
690 bool is_group_key_set,
691 const char * const *keys,
692 const size_t *key_length,
693 const size_t number_of_keys,
694 const bool mget_mode)
695 {
696 if (ptr->number_of_replicas == 0)
697 {
698 return simple_binary_mget(ptr, master_server_key, is_group_key_set,
699 keys, key_length, number_of_keys, mget_mode);
700 }
701
702 uint32_t* hash= libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
703 bool* dead_servers= libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
704
705 if (hash == NULL or dead_servers == NULL)
706 {
707 libmemcached_free(ptr, hash);
708 libmemcached_free(ptr, dead_servers);
709 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
710 }
711
712 if (is_group_key_set)
713 {
714 for (size_t x= 0; x < number_of_keys; x++)
715 {
716 hash[x]= master_server_key;
717 }
718 }
719 else
720 {
721 for (size_t x= 0; x < number_of_keys; x++)
722 {
723 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
724 }
725 }
726
727 memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
728 key_length, number_of_keys);
729
730 WATCHPOINT_IFERROR(rc);
731 libmemcached_free(ptr, hash);
732 libmemcached_free(ptr, dead_servers);
733
734 return MEMCACHED_SUCCESS;
735 }