Cleanup internal naming convention.
[m6w6/libmemcached] / libmemcached / get.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39 #include <cassert>
40
41 /*
42 What happens if no servers exist?
43 */
44 char *memcached_get(memcached_st *ptr, const char *key,
45 size_t key_length,
46 size_t *value_length,
47 uint32_t *flags,
48 memcached_return_t *error)
49 {
50 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
51 flags, error);
52 }
53
54 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
55 const char *group_key,
56 size_t group_key_length,
57 const char * const *keys,
58 const size_t *key_length,
59 size_t number_of_keys,
60 bool mget_mode);
61
62 char *memcached_get_by_key(memcached_st *ptr,
63 const char *group_key,
64 size_t group_key_length,
65 const char *key, size_t key_length,
66 size_t *value_length,
67 uint32_t *flags,
68 memcached_return_t *error)
69 {
70 memcached_return_t unused;
71 if (error == NULL)
72 error= &unused;
73
74 unlikely (ptr->flags.use_udp)
75 {
76 if (value_length)
77 *value_length= 0;
78
79 *error= memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
80 return NULL;
81 }
82
83 uint64_t query_id= ptr->query_id;
84 (void)query_id;
85
86 /* Request the key */
87 *error= memcached_mget_by_key_real(ptr, group_key, group_key_length,
88 (const char * const *)&key, &key_length,
89 1, false);
90 assert(ptr->query_id == query_id +1);
91
92
93 if (memcached_failed(*error))
94 {
95 if (memcached_has_current_error(*ptr)) // Find the most accurate error
96 {
97 *error= memcached_last_error(ptr);
98 }
99
100 if (value_length)
101 *value_length= 0;
102
103 return NULL;
104 }
105
106 char *value= memcached_fetch(ptr, NULL, NULL,
107 value_length, flags, error);
108 assert(ptr->query_id == query_id +1);
109
110 /* This is for historical reasons */
111 if (*error == MEMCACHED_END)
112 *error= MEMCACHED_NOTFOUND;
113
114 if (value == NULL)
115 {
116 if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
117 {
118 memcached_result_reset(&ptr->result);
119 memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
120
121 /* On all failure drop to returning NULL */
122 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
123 {
124 if (rc == MEMCACHED_BUFFERED)
125 {
126 uint64_t latch; /* We use latch to track the state of the original socket */
127 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
128 if (latch == 0)
129 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
130
131 rc= memcached_set(ptr, key, key_length,
132 (memcached_result_value(&ptr->result)),
133 (memcached_result_length(&ptr->result)),
134 0,
135 (memcached_result_flags(&ptr->result)));
136
137 if (rc == MEMCACHED_BUFFERED && latch == 0)
138 {
139 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
140 }
141 }
142 else
143 {
144 rc= memcached_set(ptr, key, key_length,
145 (memcached_result_value(&ptr->result)),
146 (memcached_result_length(&ptr->result)),
147 0,
148 (memcached_result_flags(&ptr->result)));
149 }
150
151 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
152 {
153 *error= rc;
154 *value_length= memcached_result_length(&ptr->result);
155 *flags= memcached_result_flags(&ptr->result);
156 return memcached_string_take_value(&ptr->result.value);
157 }
158 }
159 }
160 assert(ptr->query_id == query_id +1);
161
162 return NULL;
163 }
164
165 size_t dummy_length;
166 uint32_t dummy_flags;
167 memcached_return_t dummy_error;
168
169 char *dummy_value= memcached_fetch(ptr, NULL, NULL,
170 &dummy_length, &dummy_flags,
171 &dummy_error);
172 WATCHPOINT_ASSERT(dummy_length == 0);
173 WATCHPOINT_ASSERT(dummy_value == 0);
174 assert(ptr->query_id == query_id +1);
175
176 return value;
177 }
178
179 memcached_return_t memcached_mget(memcached_st *ptr,
180 const char * const *keys,
181 const size_t *key_length,
182 size_t number_of_keys)
183 {
184 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
185 }
186
187 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
188 uint32_t master_server_key,
189 bool is_group_key_set,
190 const char * const *keys,
191 const size_t *key_length,
192 size_t number_of_keys,
193 bool mget_mode);
194
195 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
196 const char *group_key,
197 size_t group_key_length,
198 const char * const *keys,
199 const size_t *key_length,
200 size_t number_of_keys,
201 bool mget_mode)
202 {
203 bool failures_occured_in_sending= false;
204 const char *get_command= "get ";
205 uint8_t get_command_length= 4;
206 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
207
208 memcached_return_t rc;
209 if (memcached_failed(rc= initialize_query(ptr)))
210 {
211 return rc;
212 }
213
214 unlikely (ptr->flags.use_udp)
215 {
216 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
217 }
218
219 LIBMEMCACHED_MEMCACHED_MGET_START();
220
221 if (number_of_keys == 0)
222 {
223 return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, memcached_literal_param("number_of_keys was zero"));
224 }
225
226 if (memcached_failed(memcached_key_test(*ptr, keys, key_length, number_of_keys)))
227 {
228 return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, memcached_literal_param("A bad key value was provided"));
229 }
230
231 bool is_group_key_set= false;
232 if (group_key && group_key_length)
233 {
234 if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1)))
235 {
236 return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, memcached_literal_param("A bad group key was provided."));
237 }
238
239 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
240 is_group_key_set= true;
241 }
242
243 /*
244 Here is where we pay for the non-block API. We need to remove any data sitting
245 in the queue before we start our get.
246
247 It might be optimum to bounce the connection if count > some number.
248 */
249 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
250 {
251 memcached_server_write_instance_st instance=
252 memcached_server_instance_fetch(ptr, x);
253
254 if (memcached_server_response_count(instance))
255 {
256 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
257
258 if (ptr->flags.no_block)
259 (void)memcached_io_write(instance, NULL, 0, true);
260
261 while(memcached_server_response_count(instance))
262 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
263 }
264 }
265
266 if (ptr->flags.binary_protocol)
267 {
268 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
269 key_length, number_of_keys, mget_mode);
270 }
271
272 if (ptr->flags.support_cas)
273 {
274 get_command= "gets ";
275 get_command_length= 5;
276 }
277
278 /*
279 If a server fails we warn about errors and start all over with sending keys
280 to the server.
281 */
282 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
283 size_t hosts_connected= 0;
284 for (uint32_t x= 0; x < number_of_keys; x++)
285 {
286 memcached_server_write_instance_st instance;
287 uint32_t server_key;
288
289 if (is_group_key_set)
290 {
291 server_key= master_server_key;
292 }
293 else
294 {
295 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
296 }
297
298 instance= memcached_server_instance_fetch(ptr, server_key);
299
300 struct libmemcached_io_vector_st vector[]=
301 {
302 { get_command_length, get_command },
303 { memcached_array_size(ptr->_namespace), memcached_array_string(ptr->_namespace) },
304 { key_length[x], keys[x] },
305 { 1, " " }
306 };
307
308
309 if (memcached_server_response_count(instance) == 0)
310 {
311 rc= memcached_connect(instance);
312
313 if (memcached_failed(rc))
314 {
315 memcached_set_error(*instance, rc, MEMCACHED_AT);
316 continue;
317 }
318 hosts_connected++;
319
320 if ((memcached_io_writev(instance, vector, 4, false)) == -1)
321 {
322 failures_occured_in_sending= true;
323 continue;
324 }
325 WATCHPOINT_ASSERT(instance->cursor_active == 0);
326 memcached_server_response_increment(instance);
327 WATCHPOINT_ASSERT(instance->cursor_active == 1);
328 }
329 else
330 {
331 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
332 {
333 memcached_server_response_reset(instance);
334 failures_occured_in_sending= true;
335 continue;
336 }
337 }
338 }
339
340 if (hosts_connected == 0)
341 {
342 LIBMEMCACHED_MEMCACHED_MGET_END();
343
344 if (memcached_failed(rc))
345 return rc;
346
347 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
348 }
349
350
351 /*
352 Should we muddle on if some servers are dead?
353 */
354 bool success_happened= false;
355 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
356 {
357 memcached_server_write_instance_st instance=
358 memcached_server_instance_fetch(ptr, x);
359
360 if (memcached_server_response_count(instance))
361 {
362 /* We need to do something about non-connnected hosts in the future */
363 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
364 {
365 failures_occured_in_sending= true;
366 }
367 else
368 {
369 success_happened= true;
370 }
371 }
372 }
373
374 LIBMEMCACHED_MEMCACHED_MGET_END();
375
376 if (failures_occured_in_sending && success_happened)
377 {
378 return MEMCACHED_SOME_ERRORS;
379 }
380
381 if (success_happened)
382 return MEMCACHED_SUCCESS;
383
384 return MEMCACHED_FAILURE; // Complete failure occurred
385 }
386
387 memcached_return_t memcached_mget_by_key(memcached_st *ptr,
388 const char *group_key,
389 size_t group_key_length,
390 const char * const *keys,
391 const size_t *key_length,
392 size_t number_of_keys)
393 {
394 return memcached_mget_by_key_real(ptr, group_key, group_key_length, keys,
395 key_length, number_of_keys, true);
396 }
397
398 memcached_return_t memcached_mget_execute(memcached_st *ptr,
399 const char * const *keys,
400 const size_t *key_length,
401 size_t number_of_keys,
402 memcached_execute_fn *callback,
403 void *context,
404 unsigned int number_of_callbacks)
405 {
406 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
407 number_of_keys, callback,
408 context, number_of_callbacks);
409 }
410
411 memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr,
412 const char *group_key,
413 size_t group_key_length,
414 const char * const *keys,
415 const size_t *key_length,
416 size_t number_of_keys,
417 memcached_execute_fn *callback,
418 void *context,
419 unsigned int number_of_callbacks)
420 {
421 if ((ptr->flags.binary_protocol) == 0)
422 return MEMCACHED_NOT_SUPPORTED;
423
424 memcached_return_t rc;
425 memcached_callback_st *original_callbacks= ptr->callbacks;
426 memcached_callback_st cb= {
427 callback,
428 context,
429 number_of_callbacks
430 };
431
432 ptr->callbacks= &cb;
433 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
434 key_length, number_of_keys);
435 ptr->callbacks= original_callbacks;
436 return rc;
437 }
438
439 static memcached_return_t simple_binary_mget(memcached_st *ptr,
440 uint32_t master_server_key,
441 bool is_group_key_set,
442 const char * const *keys,
443 const size_t *key_length,
444 size_t number_of_keys, bool mget_mode)
445 {
446 memcached_return_t rc= MEMCACHED_NOTFOUND;
447
448 bool flush= (number_of_keys == 1);
449
450 /*
451 If a server fails we warn about errors and start all over with sending keys
452 to the server.
453 */
454 for (uint32_t x= 0; x < number_of_keys; ++x)
455 {
456 uint32_t server_key;
457
458 if (is_group_key_set)
459 {
460 server_key= master_server_key;
461 }
462 else
463 {
464 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
465 }
466
467 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
468
469 if (memcached_server_response_count(instance) == 0)
470 {
471 rc= memcached_connect(instance);
472 if (memcached_failed(rc))
473 continue;
474 }
475
476 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
477 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
478 if (mget_mode)
479 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
480 else
481 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
482
483 memcached_return_t vk;
484 vk= memcached_validate_key_length(key_length[x],
485 ptr->flags.binary_protocol);
486 unlikely (vk != MEMCACHED_SUCCESS)
487 {
488 if (x > 0)
489 {
490 memcached_io_reset(instance);
491 }
492
493 return vk;
494 }
495
496 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
497 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
498 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->_namespace)));
499
500 struct libmemcached_io_vector_st vector[]=
501 {
502 { sizeof(request.bytes), request.bytes },
503 { memcached_array_size(ptr->_namespace), memcached_array_string(ptr->_namespace) },
504 { key_length[x], keys[x] }
505 };
506
507 if (memcached_io_writev(instance, vector, 3, flush) == -1)
508 {
509 memcached_server_response_reset(instance);
510 rc= MEMCACHED_SOME_ERRORS;
511 continue;
512 }
513
514 /* We just want one pending response per server */
515 memcached_server_response_reset(instance);
516 memcached_server_response_increment(instance);
517 if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
518 {
519 rc= MEMCACHED_SOME_ERRORS;
520 }
521 }
522
523 if (mget_mode)
524 {
525 /*
526 Send a noop command to flush the buffers
527 */
528 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
529 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
530 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
531 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
532
533 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
534 {
535 memcached_server_write_instance_st instance=
536 memcached_server_instance_fetch(ptr, x);
537
538 if (memcached_server_response_count(instance))
539 {
540 if (memcached_io_write(instance, NULL, 0, true) == -1)
541 {
542 memcached_server_response_reset(instance);
543 memcached_io_reset(instance);
544 rc= MEMCACHED_SOME_ERRORS;
545 }
546
547 if (memcached_io_write(instance, request.bytes,
548 sizeof(request.bytes), true) == -1)
549 {
550 memcached_server_response_reset(instance);
551 memcached_io_reset(instance);
552 rc= MEMCACHED_SOME_ERRORS;
553 }
554 }
555 }
556 }
557
558
559 return rc;
560 }
561
562 static memcached_return_t replication_binary_mget(memcached_st *ptr,
563 uint32_t* hash,
564 bool* dead_servers,
565 const char *const *keys,
566 const size_t *key_length,
567 size_t number_of_keys)
568 {
569 memcached_return_t rc= MEMCACHED_NOTFOUND;
570 uint32_t start= 0;
571 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
572
573 if (randomize_read)
574 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
575
576 /* Loop for each replica */
577 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
578 {
579 bool success= true;
580
581 for (uint32_t x= 0; x < number_of_keys; ++x)
582 {
583 if (hash[x] == memcached_server_count(ptr))
584 continue; /* Already successfully sent */
585
586 uint32_t server= hash[x] + replica;
587
588 /* In case of randomized reads */
589 if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
590 server += start;
591
592 while (server >= memcached_server_count(ptr))
593 server -= memcached_server_count(ptr);
594
595 if (dead_servers[server])
596 continue;
597
598 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server);
599
600 if (memcached_server_response_count(instance) == 0)
601 {
602 rc= memcached_connect(instance);
603 if (memcached_failed(rc))
604 {
605 memcached_io_reset(instance);
606 dead_servers[server]= true;
607 success= false;
608 continue;
609 }
610 }
611
612 protocol_binary_request_getk request= {};
613 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
614 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
615 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
616 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
617 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
618
619 /*
620 * We need to disable buffering to actually know that the request was
621 * successfully sent to the server (so that we should expect a result
622 * back). It would be nice to do this in buffered mode, but then it
623 * would be complex to handle all error situations if we got to send
624 * some of the messages, and then we failed on writing out some others
625 * and we used the callback interface from memcached_mget_execute so
626 * that we might have processed some of the responses etc. For now,
627 * just make sure we work _correctly_
628 */
629 struct libmemcached_io_vector_st vector[]=
630 {
631 { sizeof(request.bytes), request.bytes },
632 { memcached_array_size(ptr->_namespace), memcached_array_string(ptr->_namespace) },
633 { key_length[x], keys[x] }
634 };
635
636 if (memcached_io_writev(instance, vector, 3, true) == -1)
637 {
638 memcached_io_reset(instance);
639 dead_servers[server]= true;
640 success= false;
641 continue;
642 }
643
644 memcached_server_response_increment(instance);
645 hash[x]= memcached_server_count(ptr);
646 }
647
648 if (success)
649 break;
650 }
651
652 return rc;
653 }
654
655 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
656 uint32_t master_server_key,
657 bool is_group_key_set,
658 const char * const *keys,
659 const size_t *key_length,
660 size_t number_of_keys,
661 bool mget_mode)
662 {
663 if (ptr->number_of_replicas == 0)
664 {
665 return simple_binary_mget(ptr, master_server_key, is_group_key_set,
666 keys, key_length, number_of_keys, mget_mode);
667 }
668
669 uint32_t* hash= static_cast<uint32_t*>(libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys));
670 bool* dead_servers= static_cast<bool*>(libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool)));
671
672 if (hash == NULL || dead_servers == NULL)
673 {
674 libmemcached_free(ptr, hash);
675 libmemcached_free(ptr, dead_servers);
676 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
677 }
678
679 if (is_group_key_set)
680 {
681 for (size_t x= 0; x < number_of_keys; x++)
682 {
683 hash[x]= master_server_key;
684 }
685 }
686 else
687 {
688 for (size_t x= 0; x < number_of_keys; x++)
689 {
690 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
691 }
692 }
693
694 memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
695 key_length, number_of_keys);
696
697 WATCHPOINT_IFERROR(rc);
698 libmemcached_free(ptr, hash);
699 libmemcached_free(ptr, dead_servers);
700
701 return MEMCACHED_SUCCESS;
702 }