Merge in namespace fixes for binary protocol.
[awesomized/libmemcached] / libmemcached / get.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39 #include <cassert>
40
41 /*
42 What happens if no servers exist?
43 */
44 char *memcached_get(memcached_st *ptr, const char *key,
45 size_t key_length,
46 size_t *value_length,
47 uint32_t *flags,
48 memcached_return_t *error)
49 {
50 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
51 flags, error);
52 }
53
/*
  Forward declaration: memcached_get_by_key() below calls this helper before
  its definition appears later in this file. The last argument (mget_mode) is
  passed as false by memcached_get_by_key() and as true by
  memcached_mget_by_key().
*/
54 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
55 const char *group_key,
56 size_t group_key_length,
57 const char * const *keys,
58 const size_t *key_length,
59 size_t number_of_keys,
60 bool mget_mode);
61
62 char *memcached_get_by_key(memcached_st *ptr,
63 const char *group_key,
64 size_t group_key_length,
65 const char *key, size_t key_length,
66 size_t *value_length,
67 uint32_t *flags,
68 memcached_return_t *error)
69 {
70 unlikely (ptr->flags.use_udp)
71 {
72 if (value_length)
73 *value_length= 0;
74
75 *error= memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
76 return NULL;
77 }
78
79 uint64_t query_id= ptr->query_id;
80 (void)query_id;
81
82 /* Request the key */
83 *error= memcached_mget_by_key_real(ptr, group_key, group_key_length,
84 (const char * const *)&key,
85 &key_length, 1, false);
86 assert(ptr->query_id == query_id +1);
87
88
89 if (memcached_failed(*error))
90 {
91 if (memcached_has_current_error(*ptr)) // Find the most accurate error
92 {
93 *error= memcached_last_error(ptr);
94 }
95
96 if (value_length)
97 *value_length= 0;
98
99 return NULL;
100 }
101
102 char *value= memcached_fetch(ptr, NULL, NULL,
103 value_length, flags, error);
104 assert(ptr->query_id == query_id +1);
105
106 /* This is for historical reasons */
107 if (*error == MEMCACHED_END)
108 *error= MEMCACHED_NOTFOUND;
109
110 if (value == NULL)
111 {
112 if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
113 {
114 memcached_result_reset(&ptr->result);
115 memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
116
117 /* On all failure drop to returning NULL */
118 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
119 {
120 if (rc == MEMCACHED_BUFFERED)
121 {
122 uint64_t latch; /* We use latch to track the state of the original socket */
123 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
124 if (latch == 0)
125 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
126
127 rc= memcached_set(ptr, key, key_length,
128 (memcached_result_value(&ptr->result)),
129 (memcached_result_length(&ptr->result)),
130 0,
131 (memcached_result_flags(&ptr->result)));
132
133 if (rc == MEMCACHED_BUFFERED && latch == 0)
134 {
135 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
136 }
137 }
138 else
139 {
140 rc= memcached_set(ptr, key, key_length,
141 (memcached_result_value(&ptr->result)),
142 (memcached_result_length(&ptr->result)),
143 0,
144 (memcached_result_flags(&ptr->result)));
145 }
146
147 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
148 {
149 *error= rc;
150 *value_length= memcached_result_length(&ptr->result);
151 *flags= memcached_result_flags(&ptr->result);
152 return memcached_string_take_value(&ptr->result.value);
153 }
154 }
155 }
156 assert(ptr->query_id == query_id +1);
157
158 return NULL;
159 }
160
161 size_t dummy_length;
162 uint32_t dummy_flags;
163 memcached_return_t dummy_error;
164
165 char *dummy_value= memcached_fetch(ptr, NULL, NULL,
166 &dummy_length, &dummy_flags,
167 &dummy_error);
168 WATCHPOINT_ASSERT(dummy_length == 0);
169 WATCHPOINT_ASSERT(dummy_value == 0);
170 assert(ptr->query_id == query_id +1);
171
172 return value;
173 }
174
175 memcached_return_t memcached_mget(memcached_st *ptr,
176 const char * const *keys,
177 const size_t *key_length,
178 size_t number_of_keys)
179 {
180 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
181 }
182
/*
  Forward declaration: memcached_mget_by_key_real() below hands the request
  to this helper when ptr->flags.binary_protocol is set; the definition
  appears later in this file.
*/
183 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
184 uint32_t master_server_key,
185 bool is_group_key_set,
186 const char * const *keys,
187 const size_t *key_length,
188 size_t number_of_keys,
189 bool mget_mode);
190
/*
  Shared implementation behind memcached_get_by_key() and
  memcached_mget_by_key().

  Steps, in order:
    1. initialize_query(); reject UDP handles, an empty key list, and keys or
       a group key that fail memcached_key_test().
    2. Read and discard every response still pending on any server.
    3. If ptr->flags.binary_protocol is set, delegate to binary_mget_by_key()
       and return its result.
    4. Otherwise write "get " (or "gets " when ptr->flags.support_cas is set)
       followed by the prefixed keys to the server each key maps to, then
       finish every started request with "\r\n".

  group_key / group_key_length  When both are non-zero, every key is sent to
                                the single server the group key hashes to.
  mget_mode                     Only forwarded to binary_mget_by_key(); the
                                text-protocol path does not read it.

  Returns:
    - the failing code from initialize_query()
    - MEMCACHED_NOT_SUPPORTED (UDP), MEMCACHED_NOTFOUND (number_of_keys == 0),
      MEMCACHED_BAD_KEY_PROVIDED (bad key or group key), all recorded through
      memcached_set_error()
    - whatever binary_mget_by_key() returns (binary protocol)
    - if no connect succeeded: the last connect failure, or
      MEMCACHED_NO_SERVERS when rc does not hold a failure
    - MEMCACHED_SOME_ERRORS when some writes failed and some terminators were
      sent, MEMCACHED_SUCCESS when at least one terminator was sent with no
      recorded write failure, MEMCACHED_FAILURE otherwise
*/
191 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
192 const char *group_key,
193 size_t group_key_length,
194 const char * const *keys,
195 const size_t *key_length,
196 size_t number_of_keys,
197 bool mget_mode)
198 {
199 bool failures_occured_in_sending= false;
200 const char *get_command= "get ";
201 uint8_t get_command_length= 4;
202 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
203
204 memcached_return_t rc;
205 if (memcached_failed(rc= initialize_query(ptr)))
206 {
207 return rc;
208 }
209
210 unlikely (ptr->flags.use_udp)
211 {
212 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
213 }
214
215 LIBMEMCACHED_MEMCACHED_MGET_START();
216
217 if (number_of_keys == 0)
218 {
219 return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, memcached_literal_param("number_of_keys was zero"));
220 }
221
222 if (memcached_failed(memcached_key_test(*ptr, keys, key_length, number_of_keys)))
223 {
224 return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, memcached_literal_param("A bad key value was provided"));
225 }
226
/* A non-empty group key pins every key to the one server it hashes to. */
227 bool is_group_key_set= false;
228 if (group_key && group_key_length)
229 {
230 if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1)))
231 {
232 return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, memcached_literal_param("A bad group key was provided."));
233 }
234
235 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
236 is_group_key_set= true;
237 }
238
239 /*
240 Here is where we pay for the non-block API. We need to remove any data sitting
241 in the queue before we start our get.
242
243 It might be optimum to bounce the connection if count > some number.
244 */
245 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
246 {
247 memcached_server_write_instance_st instance=
248 memcached_server_instance_fetch(ptr, x);
249
250 if (memcached_server_response_count(instance))
251 {
252 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
253
254 if (ptr->flags.no_block)
255 (void)memcached_io_write(instance, NULL, 0, true);
256
/* Results of the drained responses are ignored. */
257 while(memcached_server_response_count(instance))
258 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
259 }
260 }
261
/* The binary protocol builds its requests elsewhere; the text protocol continues below. */
262 if (ptr->flags.binary_protocol)
263 {
264 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
265 key_length, number_of_keys, mget_mode);
266 }
267
268 if (ptr->flags.support_cas)
269 {
270 get_command= "gets ";
271 get_command_length= 5;
272 }
273
274 /*
275 If a server fails we warn about errors and start all over with sending keys
276 to the server.
277 */
278 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
/* Counts successful memcached_connect() calls made in the loop below. */
279 size_t hosts_connected= 0;
280 for (uint32_t x= 0; x < number_of_keys; x++)
281 {
282 memcached_server_write_instance_st instance;
283 uint32_t server_key;
284
285 if (is_group_key_set)
286 {
287 server_key= master_server_key;
288 }
289 else
290 {
291 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
292 }
293
294 instance= memcached_server_instance_fetch(ptr, server_key);
295
/* Pieces of the request, in order: command word, key prefix, key, separating space. */
296 struct libmemcached_io_vector_st vector[]=
297 {
298 { get_command_length, get_command },
299 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
300 { key_length[x], keys[x] },
301 { 1, " " }
302 };
303
304
/*
  No response pending on this server yet: connect, then send the command
  word together with this key and mark one response as expected.
*/
305 if (memcached_server_response_count(instance) == 0)
306 {
307 rc= memcached_connect(instance);
308
309 if (memcached_failed(rc))
310 {
311 memcached_set_error(*instance, rc, MEMCACHED_AT);
312 continue;
313 }
314 hosts_connected++;
315
316 if ((memcached_io_writev(instance, vector, 4, false)) == -1)
317 {
318 failures_occured_in_sending= true;
319 continue;
320 }
321 WATCHPOINT_ASSERT(instance->cursor_active == 0);
322 memcached_server_response_increment(instance);
323 WATCHPOINT_ASSERT(instance->cursor_active == 1);
324 }
325 else
326 {
/* A request is already open on this server: skip the command word and append only prefix, key and space. */
327 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
328 {
329 memcached_server_response_reset(instance);
330 failures_occured_in_sending= true;
331 continue;
332 }
333 }
334 }
335
336 if (hosts_connected == 0)
337 {
338 LIBMEMCACHED_MEMCACHED_MGET_END();
339
340 if (memcached_failed(rc))
341 return rc;
342
343 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
344 }
345
346
347 /*
348 Should we muddle on if some servers are dead?
349 */
/* Finish the request with "\r\n" on every server that has a response pending. */
350 bool success_happened= false;
351 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
352 {
353 memcached_server_write_instance_st instance=
354 memcached_server_instance_fetch(ptr, x);
355
356 if (memcached_server_response_count(instance))
357 {
358 /* We need to do something about non-connected hosts in the future */
359 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
360 {
361 failures_occured_in_sending= true;
362 }
363 else
364 {
365 success_happened= true;
366 }
367 }
368 }
369
370 LIBMEMCACHED_MEMCACHED_MGET_END();
371
372 if (failures_occured_in_sending && success_happened)
373 {
374 return MEMCACHED_SOME_ERRORS;
375 }
376
377 if (success_happened)
378 return MEMCACHED_SUCCESS;
379
380 return MEMCACHED_FAILURE; // Complete failure occurred
381 }
382
383 memcached_return_t memcached_mget_by_key(memcached_st *ptr,
384 const char *group_key,
385 size_t group_key_length,
386 const char * const *keys,
387 const size_t *key_length,
388 size_t number_of_keys)
389 {
390 return memcached_mget_by_key_real(ptr, group_key, group_key_length, keys,
391 key_length, number_of_keys, true);
392 }
393
394 memcached_return_t memcached_mget_execute(memcached_st *ptr,
395 const char * const *keys,
396 const size_t *key_length,
397 size_t number_of_keys,
398 memcached_execute_fn *callback,
399 void *context,
400 unsigned int number_of_callbacks)
401 {
402 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
403 number_of_keys, callback,
404 context, number_of_callbacks);
405 }
406
407 memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr,
408 const char *group_key,
409 size_t group_key_length,
410 const char * const *keys,
411 const size_t *key_length,
412 size_t number_of_keys,
413 memcached_execute_fn *callback,
414 void *context,
415 unsigned int number_of_callbacks)
416 {
417 if ((ptr->flags.binary_protocol) == 0)
418 return MEMCACHED_NOT_SUPPORTED;
419
420 memcached_return_t rc;
421 memcached_callback_st *original_callbacks= ptr->callbacks;
422 memcached_callback_st cb= {
423 callback,
424 context,
425 number_of_callbacks
426 };
427
428 ptr->callbacks= &cb;
429 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
430 key_length, number_of_keys);
431 ptr->callbacks= original_callbacks;
432 return rc;
433 }
434
/*
  Binary-protocol multi-get used when no replicas are configured (called from
  binary_mget_by_key() when ptr->number_of_replicas == 0).

  For every key a GETK request (GETKQ when mget_mode is true) is written to
  the server selected by master_server_key (if is_group_key_set) or by hashing
  the key. When mget_mode is true, a NOOP request is afterwards written to
  every server that has a response pending.

  Returns:
    - the code from memcached_validate_key_length() as soon as one key fails
      that check (remaining keys are not sent)
    - MEMCACHED_SOME_ERRORS if the last thing to set rc was a failed write or
      a failed memcached_flush_buffers(); a later memcached_connect() call
      overwrites it
    - otherwise the result of the most recent memcached_connect() call, or the
      initial MEMCACHED_NOTFOUND if no connect was attempted
*/
435 static memcached_return_t simple_binary_mget(memcached_st *ptr,
436 uint32_t master_server_key,
437 bool is_group_key_set,
438 const char * const *keys,
439 const size_t *key_length,
440 size_t number_of_keys, bool mget_mode)
441 {
442 memcached_return_t rc= MEMCACHED_NOTFOUND;
443
/* Passed as the last argument of memcached_io_writev(): true only for a single-key request. */
444 bool flush= (number_of_keys == 1);
445
446 /*
447 If a server fails we warn about errors and start all over with sending keys
448 to the server.
449 */
450 for (uint32_t x= 0; x < number_of_keys; ++x)
451 {
452 uint32_t server_key;
453
454 if (is_group_key_set)
455 {
456 server_key= master_server_key;
457 }
458 else
459 {
460 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
461 }
462
463 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
464
/* Connect only when nothing is pending on this server; a failed connect skips this key. */
465 if (memcached_server_response_count(instance) == 0)
466 {
467 rc= memcached_connect(instance);
468 if (memcached_failed(rc))
469 continue;
470 }
471
472 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
473 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
474 if (mget_mode)
475 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
476 else
477 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
478
/* Note: the length checked here is key_length[x] alone, without the prefix that is added to keylen below. */
479 memcached_return_t vk;
480 vk= memcached_validate_key_length(key_length[x],
481 ptr->flags.binary_protocol);
482 unlikely (vk != MEMCACHED_SUCCESS)
483 {
484 if (x > 0)
485 {
486 memcached_io_reset(instance);
487 }
488
489 return vk;
490 }
491
/* keylen and bodylen both carry prefix length + key length, in network byte order. */
492 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
493 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
494 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->prefix_key)));
495
/* Wire layout of one request: header bytes, key prefix, key. */
496 struct libmemcached_io_vector_st vector[]=
497 {
498 { sizeof(request.bytes), request.bytes },
499 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
500 { key_length[x], keys[x] }
501 };
502
503 if (memcached_io_writev(instance, vector, 3, flush) == -1)
504 {
505 memcached_server_response_reset(instance);
506 rc= MEMCACHED_SOME_ERRORS;
507 continue;
508 }
509
510 /* We just want one pending response per server */
511 memcached_server_response_reset(instance);
512 memcached_server_response_increment(instance);
/* Buffers are flushed once, when the key index equals ptr->io_key_prefetch (never for index 0). */
513 if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
514 {
515 rc= MEMCACHED_SOME_ERRORS;
516 }
517 }
518
519 if (mget_mode)
520 {
521 /*
522 Send a noop command to flush the buffers
523 */
524 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
525 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
526 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
527 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
528
529 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
530 {
531 memcached_server_write_instance_st instance=
532 memcached_server_instance_fetch(ptr, x);
533
534 if (memcached_server_response_count(instance))
535 {
/* An empty write is issued first; the NOOP write below is attempted even if this one fails. */
536 if (memcached_io_write(instance, NULL, 0, true) == -1)
537 {
538 memcached_server_response_reset(instance);
539 memcached_io_reset(instance);
540 rc= MEMCACHED_SOME_ERRORS;
541 }
542
543 if (memcached_io_write(instance, request.bytes,
544 sizeof(request.bytes), true) == -1)
545 {
546 memcached_server_response_reset(instance);
547 memcached_io_reset(instance);
548 rc= MEMCACHED_SOME_ERRORS;
549 }
550 }
551 }
552 }
553
554
555 return rc;
556 }
557
/*
  Binary-protocol multi-get used when replicas are configured (called from
  binary_mget_by_key()).

  hash          One entry per key holding the index of the key's primary
                server. Modified in place: an entry is overwritten with
                memcached_server_count(ptr) once the key's request has been
                written, and that value is what marks a key as done.
  dead_servers  One flag per server, set to true here when connecting or
                writing to that server fails; flagged servers are skipped.

  Makes one pass per replica (0 .. ptr->number_of_replicas) and stops after
  the first pass in which no connect or write failed. In each pass every
  still-unsent key is tried on server (hash[x] + replica), wrapped around the
  server count, optionally shifted by a random start offset when
  MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ is enabled.

  Returns the result of the most recent memcached_connect() call, or the
  initial MEMCACHED_NOTFOUND if no connect was attempted. Write failures are
  recorded in dead_servers but do not change the return value.
*/
558 static memcached_return_t replication_binary_mget(memcached_st *ptr,
559 uint32_t* hash,
560 bool* dead_servers,
561 const char *const *keys,
562 const size_t *key_length,
563 size_t number_of_keys)
564 {
565 memcached_return_t rc= MEMCACHED_NOTFOUND;
566 uint32_t start= 0;
567 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
568
/* Random offset in the range 0 .. number_of_replicas. */
569 if (randomize_read)
570 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
571
572 /* Loop for each replica */
573 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
574 {
575 bool success= true;
576
577 for (uint32_t x= 0; x < number_of_keys; ++x)
578 {
579 if (hash[x] == memcached_server_count(ptr))
580 continue; /* Already successfully sent */
581
582 uint32_t server= hash[x] + replica;
583
584 /* In case of randomized reads */
/* The offset is applied only while the shifted index stays within this key's replica range. */
585 if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
586 server += start;
587
/* Wrap the index back into the valid server range. */
588 while (server >= memcached_server_count(ptr))
589 server -= memcached_server_count(ptr);
590
/* Skipping a dead server leaves 'success' untouched; the key stays unsent for the next pass. */
591 if (dead_servers[server])
592 continue;
593
594 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server);
595
596 if (memcached_server_response_count(instance) == 0)
597 {
598 rc= memcached_connect(instance);
599 if (memcached_failed(rc))
600 {
601 memcached_io_reset(instance);
602 dead_servers[server]= true;
603 success= false;
604 continue;
605 }
606 }
607
/* Always GETK here; keylen and bodylen both carry prefix length + key length. */
608 protocol_binary_request_getk request= {};
609 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
610 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
611 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
612 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
613 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
614
615 /*
616 * We need to disable buffering to actually know that the request was
617 * successfully sent to the server (so that we should expect a result
618 * back). It would be nice to do this in buffered mode, but then it
619 * would be complex to handle all error situations if we got to send
620 * some of the messages, and then we failed on writing out some others
621 * and we used the callback interface from memcached_mget_execute so
622 * that we might have processed some of the responses etc. For now,
623 * just make sure we work _correctly_
624 */
625 struct libmemcached_io_vector_st vector[]=
626 {
627 { sizeof(request.bytes), request.bytes },
628 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
629 { key_length[x], keys[x] }
630 };
631
632 if (memcached_io_writev(instance, vector, 3, true) == -1)
633 {
634 memcached_io_reset(instance);
635 dead_servers[server]= true;
636 success= false;
637 continue;
638 }
639
/* Request is out: expect one more response and mark this key as sent. */
640 memcached_server_response_increment(instance);
641 hash[x]= memcached_server_count(ptr);
642 }
643
644 if (success)
645 break;
646 }
647
648 return rc;
649 }
650
651 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
652 uint32_t master_server_key,
653 bool is_group_key_set,
654 const char * const *keys,
655 const size_t *key_length,
656 size_t number_of_keys,
657 bool mget_mode)
658 {
659 if (ptr->number_of_replicas == 0)
660 {
661 return simple_binary_mget(ptr, master_server_key, is_group_key_set,
662 keys, key_length, number_of_keys, mget_mode);
663 }
664
665 uint32_t* hash= static_cast<uint32_t*>(libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys));
666 bool* dead_servers= static_cast<bool*>(libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool)));
667
668 if (hash == NULL || dead_servers == NULL)
669 {
670 libmemcached_free(ptr, hash);
671 libmemcached_free(ptr, dead_servers);
672 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
673 }
674
675 if (is_group_key_set)
676 {
677 for (size_t x= 0; x < number_of_keys; x++)
678 {
679 hash[x]= master_server_key;
680 }
681 }
682 else
683 {
684 for (size_t x= 0; x < number_of_keys; x++)
685 {
686 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
687 }
688 }
689
690 memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
691 key_length, number_of_keys);
692
693 WATCHPOINT_IFERROR(rc);
694 libmemcached_free(ptr, hash);
695 libmemcached_free(ptr, dead_servers);
696
697 return MEMCACHED_SUCCESS;
698 }