C++: double underscores are reserved
[m6w6/libmemcached] / src / libmemcached / get.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/random.hpp"
18
19 char *memcached_get(memcached_st *ptr, const char *key, size_t key_length, size_t *value_length,
20 uint32_t *flags, memcached_return_t *error) {
21 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length, flags, error);
22 }
23
24 static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_key,
25 size_t group_key_length, const char *const *keys,
26 const size_t *key_length, size_t number_of_keys,
27 const bool mget_mode);
28 char *memcached_get_by_key(memcached_st *shell, const char *group_key, size_t group_key_length,
29 const char *key, size_t key_length, size_t *value_length,
30 uint32_t *flags, memcached_return_t *error) {
31 Memcached *ptr = memcached2Memcached(shell);
32 memcached_return_t unused;
33 if (error == NULL) {
34 error = &unused;
35 }
36
37 uint64_t query_id = 0;
38 if (ptr) {
39 query_id = ptr->query_id;
40 }
41
42 /* Request the key */
43 *error = mget_by_key_real(ptr, group_key, group_key_length, (const char *const *) &key,
44 &key_length, 1, false);
45 if (ptr) {
46 assert_msg(ptr->query_id == query_id + 1,
47 "Programmer error, the query_id was not incremented.");
48 }
49
50 if (memcached_failed(*error)) {
51 if (ptr) {
52 if (memcached_has_current_error(*ptr)) // Find the most accurate error
53 {
54 *error = memcached_last_error(ptr);
55 }
56 }
57
58 if (value_length) {
59 *value_length = 0;
60 }
61
62 return NULL;
63 }
64
65 char *value = memcached_fetch(ptr, NULL, NULL, value_length, flags, error);
66 assert_msg(ptr->query_id == query_id + 1, "Programmer error, the query_id was not incremented.");
67
68 /* This is for historical reasons */
69 if (*error == MEMCACHED_END) {
70 *error = MEMCACHED_NOTFOUND;
71 }
72 if (value == NULL) {
73 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND) {
74 memcached_result_st key_failure_result;
75 memcached_result_st *result_ptr = memcached_result_create(ptr, &key_failure_result);
76 memcached_return_t rc = ptr->get_key_failure(ptr, key, key_length, result_ptr);
77
78 /* On all failure drop to returning NULL */
79 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
80 if (rc == MEMCACHED_BUFFERED) {
81 uint64_t latch; /* We use latch to track the state of the original socket */
82 latch = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
83 if (latch == 0) {
84 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
85 }
86
87 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
88 (memcached_result_length(result_ptr)), 0,
89 (memcached_result_flags(result_ptr)));
90
91 if (rc == MEMCACHED_BUFFERED and latch == 0) {
92 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
93 }
94 } else {
95 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
96 (memcached_result_length(result_ptr)), 0,
97 (memcached_result_flags(result_ptr)));
98 }
99
100 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
101 *error = rc;
102 if (value_length) {
103 *value_length = memcached_result_length(result_ptr);
104 }
105 if (flags) {
106 *flags = memcached_result_flags(result_ptr);
107 }
108 char *result_value = memcached_string_take_value(&result_ptr->value);
109 memcached_result_free(result_ptr);
110
111 return result_value;
112 }
113 }
114
115 memcached_result_free(result_ptr);
116 }
117 assert_msg(ptr->query_id == query_id + 1,
118 "Programmer error, the query_id was not incremented.");
119
120 return NULL;
121 }
122
123 return value;
124 }
125
126 memcached_return_t memcached_mget(memcached_st *ptr, const char *const *keys,
127 const size_t *key_length, size_t number_of_keys) {
128 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
132 const bool is_group_key_set, const char *const *keys,
133 const size_t *key_length, const size_t number_of_keys,
134 const bool mget_mode);
135
136 static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_key,
137 size_t group_key_length, const char *const *keys,
138 const size_t *key_length, size_t number_of_keys,
139 const bool mget_mode) {
140 bool failures_occured_in_sending = false;
141 const char *get_command = "get";
142 uint8_t get_command_length = 3;
143 unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
144
145 memcached_return_t rc;
146 if (memcached_failed(rc = initialize_query(ptr, true))) {
147 return rc;
148 }
149
150 if (memcached_is_udp(ptr)) {
151 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
152 }
153
154 LIBMEMCACHED_MEMCACHED_MGET_START();
155
156 if (number_of_keys == 0) {
157 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
158 memcached_literal_param("Numbers of keys provided was zero"));
159 }
160
161 if (memcached_failed((rc = memcached_key_test(*ptr, keys, key_length, number_of_keys)))) {
162 assert(memcached_last_error(ptr) == rc);
163
164 return rc;
165 }
166
167 bool is_group_key_set = false;
168 if (group_key and group_key_length) {
169 master_server_key =
170 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
171 is_group_key_set = true;
172 }
173
174 /*
175 Here is where we pay for the non-block API. We need to remove any data sitting
176 in the queue before we start our get.
177
178 It might be optimum to bounce the connection if count > some number.
179 */
180 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
181 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
182
183 if (instance->response_count()) {
184 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
185
186 if (ptr->flags.no_block || ptr->flags.buffer_requests) {
187 memcached_io_write(instance);
188 }
189
190 while (instance->response_count()) {
191 (void) memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
192 }
193 }
194 }
195
196 if (memcached_is_binary(ptr)) {
197 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys, key_length,
198 number_of_keys, mget_mode);
199 }
200
201 if (ptr->flags.support_cas) {
202 get_command = "gets";
203 get_command_length = 4;
204 }
205
206 /*
207 If a server fails we warn about errors and start all over with sending keys
208 to the server.
209 */
210 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
211 size_t hosts_connected = 0;
212 for (uint32_t x = 0; x < number_of_keys; x++) {
213 uint32_t server_key;
214
215 if (is_group_key_set) {
216 server_key = master_server_key;
217 } else {
218 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
219 }
220
221 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
222
223 libmemcached_io_vector_st vector[] = {
224 {get_command, get_command_length},
225 {memcached_literal_param(" ")},
226 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
227 {keys[x], key_length[x]}};
228
229 if (instance->response_count() == 0) {
230 rc = memcached_connect(instance);
231
232 if (memcached_failed(rc)) {
233 memcached_set_error(*instance, rc, MEMCACHED_AT);
234 continue;
235 }
236 hosts_connected++;
237
238 if ((memcached_io_writev(instance, vector, 1, false)) == false) {
239 failures_occured_in_sending = true;
240 continue;
241 }
242 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
243 memcached_instance_response_increment(instance);
244 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
245 }
246
247 {
248 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
249 memcached_instance_response_reset(instance);
250 failures_occured_in_sending = true;
251 continue;
252 }
253 }
254 }
255
256 if (hosts_connected == 0) {
257 LIBMEMCACHED_MEMCACHED_MGET_END();
258
259 if (memcached_failed(rc)) {
260 return rc;
261 }
262
263 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
264 }
265
266 /*
267 Should we muddle on if some servers are dead?
268 */
269 bool success_happened = false;
270 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
271 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
272
273 if (instance->response_count()) {
274 /* We need to do something about non-connnected hosts in the future */
275 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
276 failures_occured_in_sending = true;
277 } else {
278 success_happened = true;
279 }
280 }
281 }
282
283 LIBMEMCACHED_MEMCACHED_MGET_END();
284
285 if (failures_occured_in_sending and success_happened) {
286 return MEMCACHED_SOME_ERRORS;
287 }
288
289 if (success_happened) {
290 return MEMCACHED_SUCCESS;
291 }
292
293 return MEMCACHED_FAILURE; // Complete failure occurred
294 }
295
296 memcached_return_t memcached_mget_by_key(memcached_st *shell, const char *group_key,
297 size_t group_key_length, const char *const *keys,
298 const size_t *key_length, size_t number_of_keys) {
299 Memcached *ptr = memcached2Memcached(shell);
300 return mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys, true);
301 }
302
303 memcached_return_t memcached_mget_execute(memcached_st *ptr, const char *const *keys,
304 const size_t *key_length, size_t number_of_keys,
305 memcached_execute_fn *callback, void *context,
306 unsigned int number_of_callbacks) {
307 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length, number_of_keys, callback,
308 context, number_of_callbacks);
309 }
310
311 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell, const char *group_key,
312 size_t group_key_length, const char *const *keys,
313 const size_t *key_length, size_t number_of_keys,
314 memcached_execute_fn *callback, void *context,
315 unsigned int number_of_callbacks) {
316 Memcached *ptr = memcached2Memcached(shell);
317 memcached_return_t rc;
318 if (memcached_failed(rc = initialize_query(ptr, false))) {
319 return rc;
320 }
321
322 if (memcached_is_udp(ptr)) {
323 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
324 }
325
326 if (memcached_is_binary(ptr) == false) {
327 return memcached_set_error(
328 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
329 memcached_literal_param(
330 "ASCII protocol is not supported for memcached_mget_execute_by_key()"));
331 }
332
333 memcached_callback_st *original_callbacks = ptr->callbacks;
334 memcached_callback_st cb = {callback, context, number_of_callbacks};
335
336 ptr->callbacks = &cb;
337 rc = memcached_mget_by_key(ptr, group_key, group_key_length, keys, key_length, number_of_keys);
338 ptr->callbacks = original_callbacks;
339
340 return rc;
341 }
342
343 static memcached_return_t simple_binary_mget(memcached_st *ptr, const uint32_t master_server_key,
344 bool is_group_key_set, const char *const *keys,
345 const size_t *key_length, const size_t number_of_keys,
346 const bool mget_mode) {
347 memcached_return_t rc = MEMCACHED_NOTFOUND;
348
349 bool flush = (number_of_keys == 1);
350
351 if (memcached_failed(rc = memcached_key_test(*ptr, keys, key_length, number_of_keys))) {
352 return rc;
353 }
354
355 /*
356 If a server fails we warn about errors and start all over with sending keys
357 to the server.
358 */
359 for (uint32_t x = 0; x < number_of_keys; ++x) {
360 uint32_t server_key;
361
362 if (is_group_key_set) {
363 server_key = master_server_key;
364 } else {
365 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
366 }
367
368 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
369
370 if (instance->response_count() == 0) {
371 rc = memcached_connect(instance);
372 if (memcached_failed(rc)) {
373 continue;
374 }
375 }
376
377 protocol_binary_request_getk request = {}; //= {.bytes= {0}};
378 initialize_binary_request(instance, request.message.header);
379 if (mget_mode) {
380 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
381 } else {
382 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
383 }
384
385 #if 0
386 {
387 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
388 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
389 {
390 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
391
392 if (x > 0)
393 {
394 memcached_io_reset(instance);
395 }
396
397 return vk;
398 }
399 }
400 #endif
401
402 request.message.header.request.keylen =
403 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
404 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
405 request.message.header.request.bodylen =
406 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
407
408 libmemcached_io_vector_st vector[] = {
409 {request.bytes, sizeof(request.bytes)},
410 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
411 {keys[x], key_length[x]}};
412
413 if (memcached_io_writev(instance, vector, 3, flush) == false) {
414 memcached_server_response_reset(instance);
415 rc = MEMCACHED_SOME_ERRORS;
416 continue;
417 }
418
419 /* We just want one pending response per server */
420 memcached_server_response_reset(instance);
421 memcached_server_response_increment(instance);
422 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
423 {
424 rc = MEMCACHED_SOME_ERRORS;
425 }
426 }
427
428 if (mget_mode) {
429 /*
430 Send a noop command to flush the buffers
431 */
432 protocol_binary_request_noop request = {}; //= {.bytes= {0}};
433 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
434 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
435
436 for (uint32_t x = 0; x < memcached_server_count(ptr); ++x) {
437 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
438
439 if (instance->response_count()) {
440 initialize_binary_request(instance, request.message.header);
441 if ((memcached_io_write(instance) == false)
442 or (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
443 {
444 memcached_instance_response_reset(instance);
445 memcached_io_reset(instance);
446 rc = MEMCACHED_SOME_ERRORS;
447 }
448 }
449 }
450 }
451
452 return rc;
453 }
454
455 static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t *hash,
456 bool *dead_servers, const char *const *keys,
457 const size_t *key_length,
458 const size_t number_of_keys) {
459 memcached_return_t rc = MEMCACHED_NOTFOUND;
460 uint32_t start = 0;
461 uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
462
463 if (randomize_read) {
464 start = (uint32_t) random() % (uint32_t)(ptr->number_of_replicas + 1);
465 }
466
467 /* Loop for each replica */
468 for (uint32_t replica = 0; replica <= ptr->number_of_replicas; ++replica) {
469 bool success = true;
470
471 for (uint32_t x = 0; x < number_of_keys; ++x) {
472 if (hash[x] == memcached_server_count(ptr)) {
473 continue; /* Already successfully sent */
474 }
475
476 uint32_t server = hash[x] + replica;
477
478 /* In case of randomized reads */
479 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas))) {
480 server += start;
481 }
482
483 while (server >= memcached_server_count(ptr)) {
484 server -= memcached_server_count(ptr);
485 }
486
487 if (dead_servers[server]) {
488 continue;
489 }
490
491 memcached_instance_st *instance = memcached_instance_fetch(ptr, server);
492
493 if (instance->response_count() == 0) {
494 rc = memcached_connect(instance);
495
496 if (memcached_failed(rc)) {
497 memcached_io_reset(instance);
498 dead_servers[server] = true;
499 success = false;
500 continue;
501 }
502 }
503
504 protocol_binary_request_getk request = {};
505 initialize_binary_request(instance, request.message.header);
506 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
507 request.message.header.request.keylen =
508 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
509 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
510 request.message.header.request.bodylen =
511 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
512
513 /*
514 * We need to disable buffering to actually know that the request was
515 * successfully sent to the server (so that we should expect a result
516 * back). It would be nice to do this in buffered mode, but then it
517 * would be complex to handle all error situations if we got to send
518 * some of the messages, and then we failed on writing out some others
519 * and we used the callback interface from memcached_mget_execute so
520 * that we might have processed some of the responses etc. For now,
521 * just make sure we work _correctly_
522 */
523 libmemcached_io_vector_st vector[] = {
524 {request.bytes, sizeof(request.bytes)},
525 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
526 {keys[x], key_length[x]}};
527
528 if (memcached_io_writev(instance, vector, 3, true) == false) {
529 memcached_io_reset(instance);
530 dead_servers[server] = true;
531 success = false;
532 continue;
533 }
534
535 memcached_server_response_increment(instance);
536 hash[x] = memcached_server_count(ptr);
537 }
538
539 if (success) {
540 break;
541 }
542 }
543
544 return rc;
545 }
546
547 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
548 bool is_group_key_set, const char *const *keys,
549 const size_t *key_length, const size_t number_of_keys,
550 const bool mget_mode) {
551 if (ptr->number_of_replicas == 0) {
552 return simple_binary_mget(ptr, master_server_key, is_group_key_set, keys, key_length,
553 number_of_keys, mget_mode);
554 }
555
556 uint32_t *hash = libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
557 bool *dead_servers = libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
558
559 if (hash == NULL or dead_servers == NULL) {
560 libmemcached_free(ptr, hash);
561 libmemcached_free(ptr, dead_servers);
562 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
563 }
564
565 if (is_group_key_set) {
566 for (size_t x = 0; x < number_of_keys; x++) {
567 hash[x] = master_server_key;
568 }
569 } else {
570 for (size_t x = 0; x < number_of_keys; x++) {
571 hash[x] = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
572 }
573 }
574
575 memcached_return_t rc =
576 replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
577
578 WATCHPOINT_IFERROR(rc);
579 libmemcached_free(ptr, hash);
580 libmemcached_free(ptr, dead_servers);
581
582 return MEMCACHED_SUCCESS;
583 }