p9y
[m6w6/libmemcached] / src / libmemcached / get.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/random.hpp"
18
19 char *memcached_get(memcached_st *ptr, const char *key, size_t key_length, size_t *value_length,
20 uint32_t *flags, memcached_return_t *error) {
21 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length, flags, error);
22 }
23
24 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
25 size_t group_key_length, const char *const *keys,
26 const size_t *key_length, size_t number_of_keys,
27 const bool mget_mode);
28 char *memcached_get_by_key(memcached_st *shell, const char *group_key, size_t group_key_length,
29 const char *key, size_t key_length, size_t *value_length,
30 uint32_t *flags, memcached_return_t *error) {
31 Memcached *ptr = memcached2Memcached(shell);
32 memcached_return_t unused;
33 if (error == NULL) {
34 error = &unused;
35 }
36
37 uint64_t query_id = 0;
38 if (ptr) {
39 query_id = ptr->query_id;
40 }
41
42 /* Request the key */
43 *error = __mget_by_key_real(ptr, group_key, group_key_length, (const char *const *) &key,
44 &key_length, 1, false);
45 if (ptr) {
46 assert_msg(ptr->query_id == query_id + 1,
47 "Programmer error, the query_id was not incremented.");
48 }
49
50 if (memcached_failed(*error)) {
51 if (ptr) {
52 if (memcached_has_current_error(*ptr)) // Find the most accurate error
53 {
54 *error = memcached_last_error(ptr);
55 }
56 }
57
58 if (value_length) {
59 *value_length = 0;
60 }
61
62 return NULL;
63 }
64
65 char *value = memcached_fetch(ptr, NULL, NULL, value_length, flags, error);
66 assert_msg(ptr->query_id == query_id + 1, "Programmer error, the query_id was not incremented.");
67
68 /* This is for historical reasons */
69 if (*error == MEMCACHED_END) {
70 *error = MEMCACHED_NOTFOUND;
71 }
72 if (value == NULL) {
73 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND) {
74 memcached_result_st key_failure_result;
75 memcached_result_st *result_ptr = memcached_result_create(ptr, &key_failure_result);
76 memcached_return_t rc = ptr->get_key_failure(ptr, key, key_length, result_ptr);
77
78 /* On all failure drop to returning NULL */
79 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
80 if (rc == MEMCACHED_BUFFERED) {
81 uint64_t latch; /* We use latch to track the state of the original socket */
82 latch = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
83 if (latch == 0) {
84 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
85 }
86
87 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
88 (memcached_result_length(result_ptr)), 0,
89 (memcached_result_flags(result_ptr)));
90
91 if (rc == MEMCACHED_BUFFERED and latch == 0) {
92 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
93 }
94 } else {
95 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
96 (memcached_result_length(result_ptr)), 0,
97 (memcached_result_flags(result_ptr)));
98 }
99
100 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
101 *error = rc;
102 if (value_length) {
103 *value_length = memcached_result_length(result_ptr);
104 }
105 if (flags) {
106 *flags = memcached_result_flags(result_ptr);
107 }
108 char *result_value = memcached_string_take_value(&result_ptr->value);
109 memcached_result_free(result_ptr);
110
111 return result_value;
112 }
113 }
114
115 memcached_result_free(result_ptr);
116 }
117 assert_msg(ptr->query_id == query_id + 1,
118 "Programmer error, the query_id was not incremented.");
119
120 return NULL;
121 }
122
123 return value;
124 }
125
126 memcached_return_t memcached_mget(memcached_st *ptr, const char *const *keys,
127 const size_t *key_length, size_t number_of_keys) {
128 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
132 const bool is_group_key_set, const char *const *keys,
133 const size_t *key_length, const size_t number_of_keys,
134 const bool mget_mode);
135
136 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
137 const size_t group_key_length, const char *const *keys,
138 const size_t *key_length, size_t number_of_keys,
139 const bool mget_mode) {
140 bool failures_occured_in_sending = false;
141 const char *get_command = "get";
142 uint8_t get_command_length = 3;
143 unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
144
145 memcached_return_t rc;
146 if (memcached_failed(rc = initialize_query(ptr, true))) {
147 return rc;
148 }
149
150 if (memcached_is_udp(ptr)) {
151 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
152 }
153
154 LIBMEMCACHED_MEMCACHED_MGET_START();
155
156 if (number_of_keys == 0) {
157 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
158 memcached_literal_param("Numbers of keys provided was zero"));
159 }
160
161 if (memcached_failed((rc = memcached_key_test(*ptr, keys, key_length, number_of_keys)))) {
162 assert(memcached_last_error(ptr) == rc);
163
164 return rc;
165 }
166
167 bool is_group_key_set = false;
168 if (group_key and group_key_length) {
169 master_server_key =
170 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
171 is_group_key_set = true;
172 }
173
174 /*
175 Here is where we pay for the non-block API. We need to remove any data sitting
176 in the queue before we start our get.
177
178 It might be optimum to bounce the connection if count > some number.
179 */
180 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
181 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
182
183 if (instance->response_count()) {
184 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
185
186 if (ptr->flags.no_block || ptr->flags.buffer_requests) {
187 memcached_io_write(instance);
188 }
189
190 while (instance->response_count()) {
191 (void) memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
192 }
193 }
194 }
195
196 if (memcached_is_binary(ptr)) {
197 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys, key_length,
198 number_of_keys, mget_mode);
199 }
200
201 if (ptr->flags.support_cas) {
202 get_command = "gets";
203 get_command_length = 4;
204 }
205
206 /*
207 If a server fails we warn about errors and start all over with sending keys
208 to the server.
209 */
210 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
211 size_t hosts_connected = 0;
212 for (uint32_t x = 0; x < number_of_keys; x++) {
213 uint32_t server_key;
214
215 if (is_group_key_set) {
216 server_key = master_server_key;
217 } else {
218 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
219 }
220
221 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
222
223 libmemcached_io_vector_st vector[] = {
224 {get_command, get_command_length},
225 {memcached_literal_param(" ")},
226 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
227 {keys[x], key_length[x]}};
228
229 if (instance->response_count() == 0) {
230 rc = memcached_connect(instance);
231
232 if (memcached_failed(rc)) {
233 memcached_set_error(*instance, rc, MEMCACHED_AT);
234 continue;
235 }
236 hosts_connected++;
237
238 if ((memcached_io_writev(instance, vector, 1, false)) == false) {
239 failures_occured_in_sending = true;
240 continue;
241 }
242 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
243 memcached_instance_response_increment(instance);
244 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
245 }
246
247 {
248 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
249 memcached_instance_response_reset(instance);
250 failures_occured_in_sending = true;
251 continue;
252 }
253 }
254 }
255
256 if (hosts_connected == 0) {
257 LIBMEMCACHED_MEMCACHED_MGET_END();
258
259 if (memcached_failed(rc)) {
260 return rc;
261 }
262
263 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
264 }
265
266 /*
267 Should we muddle on if some servers are dead?
268 */
269 bool success_happened = false;
270 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
271 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
272
273 if (instance->response_count()) {
274 /* We need to do something about non-connnected hosts in the future */
275 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
276 failures_occured_in_sending = true;
277 } else {
278 success_happened = true;
279 }
280 }
281 }
282
283 LIBMEMCACHED_MEMCACHED_MGET_END();
284
285 if (failures_occured_in_sending and success_happened) {
286 return MEMCACHED_SOME_ERRORS;
287 }
288
289 if (success_happened) {
290 return MEMCACHED_SUCCESS;
291 }
292
293 return MEMCACHED_FAILURE; // Complete failure occurred
294 }
295
296 memcached_return_t memcached_mget_by_key(memcached_st *shell, const char *group_key,
297 size_t group_key_length, const char *const *keys,
298 const size_t *key_length, size_t number_of_keys) {
299 Memcached *ptr = memcached2Memcached(shell);
300 return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys,
301 true);
302 }
303
304 memcached_return_t memcached_mget_execute(memcached_st *ptr, const char *const *keys,
305 const size_t *key_length, size_t number_of_keys,
306 memcached_execute_fn *callback, void *context,
307 unsigned int number_of_callbacks) {
308 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length, number_of_keys, callback,
309 context, number_of_callbacks);
310 }
311
312 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell, const char *group_key,
313 size_t group_key_length, const char *const *keys,
314 const size_t *key_length, size_t number_of_keys,
315 memcached_execute_fn *callback, void *context,
316 unsigned int number_of_callbacks) {
317 Memcached *ptr = memcached2Memcached(shell);
318 memcached_return_t rc;
319 if (memcached_failed(rc = initialize_query(ptr, false))) {
320 return rc;
321 }
322
323 if (memcached_is_udp(ptr)) {
324 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
325 }
326
327 if (memcached_is_binary(ptr) == false) {
328 return memcached_set_error(
329 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
330 memcached_literal_param(
331 "ASCII protocol is not supported for memcached_mget_execute_by_key()"));
332 }
333
334 memcached_callback_st *original_callbacks = ptr->callbacks;
335 memcached_callback_st cb = {callback, context, number_of_callbacks};
336
337 ptr->callbacks = &cb;
338 rc = memcached_mget_by_key(ptr, group_key, group_key_length, keys, key_length, number_of_keys);
339 ptr->callbacks = original_callbacks;
340
341 return rc;
342 }
343
344 static memcached_return_t simple_binary_mget(memcached_st *ptr, const uint32_t master_server_key,
345 bool is_group_key_set, const char *const *keys,
346 const size_t *key_length, const size_t number_of_keys,
347 const bool mget_mode) {
348 memcached_return_t rc = MEMCACHED_NOTFOUND;
349
350 bool flush = (number_of_keys == 1);
351
352 if (memcached_failed(rc = memcached_key_test(*ptr, keys, key_length, number_of_keys))) {
353 return rc;
354 }
355
356 /*
357 If a server fails we warn about errors and start all over with sending keys
358 to the server.
359 */
360 for (uint32_t x = 0; x < number_of_keys; ++x) {
361 uint32_t server_key;
362
363 if (is_group_key_set) {
364 server_key = master_server_key;
365 } else {
366 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
367 }
368
369 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
370
371 if (instance->response_count() == 0) {
372 rc = memcached_connect(instance);
373 if (memcached_failed(rc)) {
374 continue;
375 }
376 }
377
378 protocol_binary_request_getk request = {}; //= {.bytes= {0}};
379 initialize_binary_request(instance, request.message.header);
380 if (mget_mode) {
381 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
382 } else {
383 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
384 }
385
386 #if 0
387 {
388 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
389 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
390 {
391 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
392
393 if (x > 0)
394 {
395 memcached_io_reset(instance);
396 }
397
398 return vk;
399 }
400 }
401 #endif
402
403 request.message.header.request.keylen =
404 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
405 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
406 request.message.header.request.bodylen =
407 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
408
409 libmemcached_io_vector_st vector[] = {
410 {request.bytes, sizeof(request.bytes)},
411 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
412 {keys[x], key_length[x]}};
413
414 if (memcached_io_writev(instance, vector, 3, flush) == false) {
415 memcached_server_response_reset(instance);
416 rc = MEMCACHED_SOME_ERRORS;
417 continue;
418 }
419
420 /* We just want one pending response per server */
421 memcached_server_response_reset(instance);
422 memcached_server_response_increment(instance);
423 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
424 {
425 rc = MEMCACHED_SOME_ERRORS;
426 }
427 }
428
429 if (mget_mode) {
430 /*
431 Send a noop command to flush the buffers
432 */
433 protocol_binary_request_noop request = {}; //= {.bytes= {0}};
434 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
435 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
436
437 for (uint32_t x = 0; x < memcached_server_count(ptr); ++x) {
438 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
439
440 if (instance->response_count()) {
441 initialize_binary_request(instance, request.message.header);
442 if ((memcached_io_write(instance) == false)
443 or (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
444 {
445 memcached_instance_response_reset(instance);
446 memcached_io_reset(instance);
447 rc = MEMCACHED_SOME_ERRORS;
448 }
449 }
450 }
451 }
452
453 return rc;
454 }
455
456 static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t *hash,
457 bool *dead_servers, const char *const *keys,
458 const size_t *key_length,
459 const size_t number_of_keys) {
460 memcached_return_t rc = MEMCACHED_NOTFOUND;
461 uint32_t start = 0;
462 uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
463
464 if (randomize_read) {
465 start = (uint32_t) random() % (uint32_t)(ptr->number_of_replicas + 1);
466 }
467
468 /* Loop for each replica */
469 for (uint32_t replica = 0; replica <= ptr->number_of_replicas; ++replica) {
470 bool success = true;
471
472 for (uint32_t x = 0; x < number_of_keys; ++x) {
473 if (hash[x] == memcached_server_count(ptr)) {
474 continue; /* Already successfully sent */
475 }
476
477 uint32_t server = hash[x] + replica;
478
479 /* In case of randomized reads */
480 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas))) {
481 server += start;
482 }
483
484 while (server >= memcached_server_count(ptr)) {
485 server -= memcached_server_count(ptr);
486 }
487
488 if (dead_servers[server]) {
489 continue;
490 }
491
492 memcached_instance_st *instance = memcached_instance_fetch(ptr, server);
493
494 if (instance->response_count() == 0) {
495 rc = memcached_connect(instance);
496
497 if (memcached_failed(rc)) {
498 memcached_io_reset(instance);
499 dead_servers[server] = true;
500 success = false;
501 continue;
502 }
503 }
504
505 protocol_binary_request_getk request = {};
506 initialize_binary_request(instance, request.message.header);
507 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
508 request.message.header.request.keylen =
509 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
510 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
511 request.message.header.request.bodylen =
512 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
513
514 /*
515 * We need to disable buffering to actually know that the request was
516 * successfully sent to the server (so that we should expect a result
517 * back). It would be nice to do this in buffered mode, but then it
518 * would be complex to handle all error situations if we got to send
519 * some of the messages, and then we failed on writing out some others
520 * and we used the callback interface from memcached_mget_execute so
521 * that we might have processed some of the responses etc. For now,
522 * just make sure we work _correctly_
523 */
524 libmemcached_io_vector_st vector[] = {
525 {request.bytes, sizeof(request.bytes)},
526 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
527 {keys[x], key_length[x]}};
528
529 if (memcached_io_writev(instance, vector, 3, true) == false) {
530 memcached_io_reset(instance);
531 dead_servers[server] = true;
532 success = false;
533 continue;
534 }
535
536 memcached_server_response_increment(instance);
537 hash[x] = memcached_server_count(ptr);
538 }
539
540 if (success) {
541 break;
542 }
543 }
544
545 return rc;
546 }
547
548 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
549 bool is_group_key_set, const char *const *keys,
550 const size_t *key_length, const size_t number_of_keys,
551 const bool mget_mode) {
552 if (ptr->number_of_replicas == 0) {
553 return simple_binary_mget(ptr, master_server_key, is_group_key_set, keys, key_length,
554 number_of_keys, mget_mode);
555 }
556
557 uint32_t *hash = libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
558 bool *dead_servers = libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
559
560 if (hash == NULL or dead_servers == NULL) {
561 libmemcached_free(ptr, hash);
562 libmemcached_free(ptr, dead_servers);
563 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
564 }
565
566 if (is_group_key_set) {
567 for (size_t x = 0; x < number_of_keys; x++) {
568 hash[x] = master_server_key;
569 }
570 } else {
571 for (size_t x = 0; x < number_of_keys; x++) {
572 hash[x] = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
573 }
574 }
575
576 memcached_return_t rc =
577 replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
578
579 WATCHPOINT_IFERROR(rc);
580 libmemcached_free(ptr, hash);
581 libmemcached_free(ptr, dead_servers);
582
583 return MEMCACHED_SUCCESS;
584 }