Merge in code for C++ compiling of libmemcached.
[m6w6/libmemcached] / libmemcached / get.cc
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Get functions for libmemcached
9 *
10 */
11
12 #include "common.h"
13
14 /*
15 What happens if no servers exist?
16 */
17 char *memcached_get(memcached_st *ptr, const char *key,
18 size_t key_length,
19 size_t *value_length,
20 uint32_t *flags,
21 memcached_return_t *error)
22 {
23 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
24 flags, error);
25 }
26
27 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
28 const char *group_key,
29 size_t group_key_length,
30 const char * const *keys,
31 const size_t *key_length,
32 size_t number_of_keys,
33 bool mget_mode);
34
35 char *memcached_get_by_key(memcached_st *ptr,
36 const char *group_key,
37 size_t group_key_length,
38 const char *key, size_t key_length,
39 size_t *value_length,
40 uint32_t *flags,
41 memcached_return_t *error)
42 {
43 char *value;
44 size_t dummy_length;
45 uint32_t dummy_flags;
46 memcached_return_t dummy_error;
47
48 unlikely (ptr->flags.use_udp)
49 {
50 *error= MEMCACHED_NOT_SUPPORTED;
51 return NULL;
52 }
53
54 /* Request the key */
55 *error= memcached_mget_by_key_real(ptr, group_key, group_key_length,
56 (const char * const *)&key,
57 &key_length, 1, false);
58
59 value= memcached_fetch(ptr, NULL, NULL,
60 value_length, flags, error);
61 /* This is for historical reasons */
62 if (*error == MEMCACHED_END)
63 *error= MEMCACHED_NOTFOUND;
64
65 if (value == NULL)
66 {
67 if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
68 {
69 memcached_return_t rc;
70
71 memcached_result_reset(&ptr->result);
72 rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
73
74 /* On all failure drop to returning NULL */
75 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
76 {
77 if (rc == MEMCACHED_BUFFERED)
78 {
79 uint64_t latch; /* We use latch to track the state of the original socket */
80 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
81 if (latch == 0)
82 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
83
84 rc= memcached_set(ptr, key, key_length,
85 (memcached_result_value(&ptr->result)),
86 (memcached_result_length(&ptr->result)),
87 0,
88 (memcached_result_flags(&ptr->result)));
89
90 if (rc == MEMCACHED_BUFFERED && latch == 0)
91 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
92 }
93 else
94 {
95 rc= memcached_set(ptr, key, key_length,
96 (memcached_result_value(&ptr->result)),
97 (memcached_result_length(&ptr->result)),
98 0,
99 (memcached_result_flags(&ptr->result)));
100 }
101
102 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
103 {
104 *error= rc;
105 *value_length= memcached_result_length(&ptr->result);
106 *flags= memcached_result_flags(&ptr->result);
107 return memcached_string_c_copy(&ptr->result.value);
108 }
109 }
110 }
111
112 return NULL;
113 }
114
115 (void)memcached_fetch(ptr, NULL, NULL,
116 &dummy_length, &dummy_flags,
117 &dummy_error);
118 WATCHPOINT_ASSERT(dummy_length == 0);
119
120 return value;
121 }
122
123 memcached_return_t memcached_mget(memcached_st *ptr,
124 const char * const *keys,
125 const size_t *key_length,
126 size_t number_of_keys)
127 {
128 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
132 uint32_t master_server_key,
133 bool is_group_key_set,
134 const char * const *keys,
135 const size_t *key_length,
136 size_t number_of_keys,
137 bool mget_mode);
138
139 static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr,
140 const char *group_key,
141 size_t group_key_length,
142 const char * const *keys,
143 const size_t *key_length,
144 size_t number_of_keys,
145 bool mget_mode)
146 {
147 bool failures_occured_in_sending= false;
148 const char *get_command= "get ";
149 uint8_t get_command_length= 4;
150 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
151 bool is_group_key_set= false;
152
153 memcached_return_t rc;
154 if (memcached_failed(rc= initialize_query(ptr)))
155 {
156 return rc;
157 }
158
159 unlikely (ptr->flags.use_udp)
160 return MEMCACHED_NOT_SUPPORTED;
161
162 LIBMEMCACHED_MEMCACHED_MGET_START();
163
164 if (number_of_keys == 0)
165 return MEMCACHED_NOTFOUND;
166
167 if (ptr->flags.verify_key && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
168 {
169 return MEMCACHED_BAD_KEY_PROVIDED;
170 }
171
172 if (group_key && group_key_length)
173 {
174 if (ptr->flags.verify_key and (memcached_key_test((const char * const *)&group_key, &group_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
175 return MEMCACHED_BAD_KEY_PROVIDED;
176
177 master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
178 is_group_key_set= true;
179 }
180
181 /*
182 Here is where we pay for the non-block API. We need to remove any data sitting
183 in the queue before we start our get.
184
185 It might be optimum to bounce the connection if count > some number.
186 */
187 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
188 {
189 memcached_server_write_instance_st instance=
190 memcached_server_instance_fetch(ptr, x);
191
192 if (memcached_server_response_count(instance))
193 {
194 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
195
196 if (ptr->flags.no_block)
197 (void)memcached_io_write(instance, NULL, 0, true);
198
199 while(memcached_server_response_count(instance))
200 (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
201 }
202 }
203
204 if (ptr->flags.binary_protocol)
205 {
206 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
207 key_length, number_of_keys, mget_mode);
208 }
209
210 if (ptr->flags.support_cas)
211 {
212 get_command= "gets ";
213 get_command_length= 5;
214 }
215
216 /*
217 If a server fails we warn about errors and start all over with sending keys
218 to the server.
219 */
220 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
221 size_t hosts_connected= 0;
222 for (uint32_t x= 0; x < number_of_keys; x++)
223 {
224 memcached_server_write_instance_st instance;
225 uint32_t server_key;
226
227 if (is_group_key_set)
228 {
229 server_key= master_server_key;
230 }
231 else
232 {
233 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
234 }
235
236 instance= memcached_server_instance_fetch(ptr, server_key);
237
238 struct libmemcached_io_vector_st vector[]=
239 {
240 { get_command_length, get_command },
241 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
242 { key_length[x], keys[x] },
243 { 1, " " }
244 };
245
246
247 if (memcached_server_response_count(instance) == 0)
248 {
249 rc= memcached_connect(instance);
250
251 if (rc != MEMCACHED_SUCCESS)
252 {
253 continue;
254 }
255 hosts_connected++;
256
257 if ((memcached_io_writev(instance, vector, 4, false)) == -1)
258 {
259 failures_occured_in_sending= true;
260 continue;
261 }
262 WATCHPOINT_ASSERT(instance->cursor_active == 0);
263 memcached_server_response_increment(instance);
264 WATCHPOINT_ASSERT(instance->cursor_active == 1);
265 }
266 else
267 {
268 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
269 {
270 memcached_server_response_reset(instance);
271 failures_occured_in_sending= true;
272 continue;
273 }
274 }
275 }
276
277 if (hosts_connected == 0)
278 {
279 LIBMEMCACHED_MEMCACHED_MGET_END();
280
281 if (rc != MEMCACHED_SUCCESS)
282 return rc;
283
284 return MEMCACHED_NO_SERVERS;
285 }
286
287
288 /*
289 Should we muddle on if some servers are dead?
290 */
291 bool success_happened= false;
292 for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
293 {
294 memcached_server_write_instance_st instance=
295 memcached_server_instance_fetch(ptr, x);
296
297 if (memcached_server_response_count(instance))
298 {
299 /* We need to do something about non-connnected hosts in the future */
300 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
301 {
302 failures_occured_in_sending= true;
303 }
304 else
305 {
306 success_happened= true;
307 }
308 }
309 }
310
311 LIBMEMCACHED_MEMCACHED_MGET_END();
312
313 if (failures_occured_in_sending && success_happened)
314 return MEMCACHED_SOME_ERRORS;
315
316 if (success_happened)
317 return MEMCACHED_SUCCESS;
318
319 return MEMCACHED_FAILURE;
320 }
321
322 memcached_return_t memcached_mget_by_key(memcached_st *ptr,
323 const char *group_key,
324 size_t group_key_length,
325 const char * const *keys,
326 const size_t *key_length,
327 size_t number_of_keys)
328 {
329 return memcached_mget_by_key_real(ptr, group_key, group_key_length, keys,
330 key_length, number_of_keys, true);
331 }
332
333 memcached_return_t memcached_mget_execute(memcached_st *ptr,
334 const char * const *keys,
335 const size_t *key_length,
336 size_t number_of_keys,
337 memcached_execute_fn *callback,
338 void *context,
339 unsigned int number_of_callbacks)
340 {
341 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
342 number_of_keys, callback,
343 context, number_of_callbacks);
344 }
345
346 memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr,
347 const char *group_key,
348 size_t group_key_length,
349 const char * const *keys,
350 const size_t *key_length,
351 size_t number_of_keys,
352 memcached_execute_fn *callback,
353 void *context,
354 unsigned int number_of_callbacks)
355 {
356 if ((ptr->flags.binary_protocol) == 0)
357 return MEMCACHED_NOT_SUPPORTED;
358
359 memcached_return_t rc;
360 memcached_callback_st *original_callbacks= ptr->callbacks;
361 memcached_callback_st cb= {
362 callback,
363 context,
364 number_of_callbacks
365 };
366
367 ptr->callbacks= &cb;
368 rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
369 key_length, number_of_keys);
370 ptr->callbacks= original_callbacks;
371 return rc;
372 }
373
374 static memcached_return_t simple_binary_mget(memcached_st *ptr,
375 uint32_t master_server_key,
376 bool is_group_key_set,
377 const char * const *keys,
378 const size_t *key_length,
379 size_t number_of_keys, bool mget_mode)
380 {
381 memcached_return_t rc= MEMCACHED_NOTFOUND;
382
383 bool flush= (number_of_keys == 1);
384
385 /*
386 If a server fails we warn about errors and start all over with sending keys
387 to the server.
388 */
389 for (uint32_t x= 0; x < number_of_keys; ++x)
390 {
391 uint32_t server_key;
392 memcached_server_write_instance_st instance;
393
394 if (is_group_key_set)
395 {
396 server_key= master_server_key;
397 }
398 else
399 {
400 server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
401 }
402
403 instance= memcached_server_instance_fetch(ptr, server_key);
404
405 if (memcached_server_response_count(instance) == 0)
406 {
407 rc= memcached_connect(instance);
408 if (rc != MEMCACHED_SUCCESS)
409 continue;
410 }
411
412 protocol_binary_request_getk request= { }; //= {.bytes= {0}};
413 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
414 if (mget_mode)
415 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
416 else
417 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
418
419 memcached_return_t vk;
420 vk= memcached_validate_key_length(key_length[x],
421 ptr->flags.binary_protocol);
422 unlikely (vk != MEMCACHED_SUCCESS)
423 {
424 if (x > 0)
425 {
426 memcached_io_reset(instance);
427 }
428
429 return vk;
430 }
431
432 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
433 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
434 request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->prefix_key)));
435
436 struct libmemcached_io_vector_st vector[]=
437 {
438 { sizeof(request.bytes), request.bytes },
439 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
440 { key_length[x], keys[x] }
441 };
442
443 if (memcached_io_writev(instance, vector, 3, flush) == -1)
444 {
445 memcached_server_response_reset(instance);
446 rc= MEMCACHED_SOME_ERRORS;
447 continue;
448 }
449
450 /* We just want one pending response per server */
451 memcached_server_response_reset(instance);
452 memcached_server_response_increment(instance);
453 if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
454 {
455 rc= MEMCACHED_SOME_ERRORS;
456 }
457 }
458
459 if (mget_mode)
460 {
461 /*
462 Send a noop command to flush the buffers
463 */
464 protocol_binary_request_noop request= {}; //= {.bytes= {0}};
465 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
466 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
467 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
468
469 for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
470 {
471 memcached_server_write_instance_st instance=
472 memcached_server_instance_fetch(ptr, x);
473
474 if (memcached_server_response_count(instance))
475 {
476 if (memcached_io_write(instance, NULL, 0, true) == -1)
477 {
478 memcached_server_response_reset(instance);
479 memcached_io_reset(instance);
480 rc= MEMCACHED_SOME_ERRORS;
481 }
482
483 if (memcached_io_write(instance, request.bytes,
484 sizeof(request.bytes), true) == -1)
485 {
486 memcached_server_response_reset(instance);
487 memcached_io_reset(instance);
488 rc= MEMCACHED_SOME_ERRORS;
489 }
490 }
491 }
492 }
493
494
495 return rc;
496 }
497
498 static memcached_return_t replication_binary_mget(memcached_st *ptr,
499 uint32_t* hash,
500 bool* dead_servers,
501 const char *const *keys,
502 const size_t *key_length,
503 size_t number_of_keys)
504 {
505 memcached_return_t rc= MEMCACHED_NOTFOUND;
506 uint32_t start= 0;
507 uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
508
509 if (randomize_read)
510 start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
511
512 /* Loop for each replica */
513 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
514 {
515 bool success= true;
516
517 for (uint32_t x= 0; x < number_of_keys; ++x)
518 {
519 memcached_server_write_instance_st instance;
520
521 if (hash[x] == memcached_server_count(ptr))
522 continue; /* Already successfully sent */
523
524 uint32_t server= hash[x] + replica;
525
526 /* In case of randomized reads */
527 if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
528 server += start;
529
530 while (server >= memcached_server_count(ptr))
531 server -= memcached_server_count(ptr);
532
533 if (dead_servers[server])
534 continue;
535
536 instance= memcached_server_instance_fetch(ptr, server);
537
538 if (memcached_server_response_count(instance) == 0)
539 {
540 rc= memcached_connect(instance);
541 if (rc != MEMCACHED_SUCCESS)
542 {
543 memcached_io_reset(instance);
544 dead_servers[server]= true;
545 success= false;
546 continue;
547 }
548 }
549
550 protocol_binary_request_getk request= {};
551 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
552 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
553 request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
554 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
555 request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->prefix_key)));
556
557 /*
558 * We need to disable buffering to actually know that the request was
559 * successfully sent to the server (so that we should expect a result
560 * back). It would be nice to do this in buffered mode, but then it
561 * would be complex to handle all error situations if we got to send
562 * some of the messages, and then we failed on writing out some others
563 * and we used the callback interface from memcached_mget_execute so
564 * that we might have processed some of the responses etc. For now,
565 * just make sure we work _correctly_
566 */
567 struct libmemcached_io_vector_st vector[]=
568 {
569 { sizeof(request.bytes), request.bytes },
570 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
571 { key_length[x], keys[x] }
572 };
573
574 if (memcached_io_writev(instance, vector, 3, true) == -1)
575 {
576 memcached_io_reset(instance);
577 dead_servers[server]= true;
578 success= false;
579 continue;
580 }
581
582 memcached_server_response_increment(instance);
583 hash[x]= memcached_server_count(ptr);
584 }
585
586 if (success)
587 break;
588 }
589
590 return rc;
591 }
592
593 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
594 uint32_t master_server_key,
595 bool is_group_key_set,
596 const char * const *keys,
597 const size_t *key_length,
598 size_t number_of_keys,
599 bool mget_mode)
600 {
601 memcached_return_t rc;
602
603 if (ptr->number_of_replicas == 0)
604 {
605 rc= simple_binary_mget(ptr, master_server_key, is_group_key_set,
606 keys, key_length, number_of_keys, mget_mode);
607 }
608 else
609 {
610 uint32_t* hash;
611 bool* dead_servers;
612
613 hash= static_cast<uint32_t*>(libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys));
614 dead_servers= static_cast<bool*>(libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool)));
615
616 if (hash == NULL || dead_servers == NULL)
617 {
618 libmemcached_free(ptr, hash);
619 libmemcached_free(ptr, dead_servers);
620 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
621 }
622
623 if (is_group_key_set)
624 {
625 for (size_t x= 0; x < number_of_keys; x++)
626 {
627 hash[x]= master_server_key;
628 }
629 }
630 else
631 {
632 for (size_t x= 0; x < number_of_keys; x++)
633 {
634 hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
635 }
636 }
637
638 rc= replication_binary_mget(ptr, hash, dead_servers, keys,
639 key_length, number_of_keys);
640
641 libmemcached_free(ptr, hash);
642 libmemcached_free(ptr, dead_servers);
643
644 return MEMCACHED_SUCCESS;
645 }
646
647 return rc;
648 }