libmemcached: add MEMCACHED_BEHAVIOR_META_PROTOCOL
[awesomized/libmemcached] / src / libmemcached / get.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/random.hpp"
18
19 char *memcached_get(memcached_st *ptr, const char *key, size_t key_length, size_t *value_length,
20 uint32_t *flags, memcached_return_t *error) {
21 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length, flags, error);
22 }
23
24 static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_key,
25 size_t group_key_length, const char *const *keys,
26 const size_t *key_length, size_t number_of_keys,
27 const bool mget_mode);
28 char *memcached_get_by_key(memcached_st *shell, const char *group_key, size_t group_key_length,
29 const char *key, size_t key_length, size_t *value_length,
30 uint32_t *flags, memcached_return_t *error) {
31 Memcached *ptr = memcached2Memcached(shell);
32 memcached_return_t unused;
33 if (error == NULL) {
34 error = &unused;
35 }
36
37 uint64_t query_id = 0;
38 if (ptr) {
39 query_id = ptr->query_id;
40 }
41
42 /* Request the key */
43 *error = mget_by_key_real(ptr, group_key, group_key_length, (const char *const *) &key,
44 &key_length, 1, false);
45 if (ptr) {
46 assert_msg(ptr->query_id == query_id + 1,
47 "Programmer error, the query_id was not incremented.");
48 }
49
50 if (memcached_failed(*error)) {
51 if (ptr) {
52 if (memcached_has_current_error(*ptr)) // Find the most accurate error
53 {
54 *error = memcached_last_error(ptr);
55 }
56 }
57
58 if (value_length) {
59 *value_length = 0;
60 }
61
62 return NULL;
63 }
64
65 char *value = memcached_fetch(ptr, NULL, NULL, value_length, flags, error);
66 assert_msg(ptr->query_id == query_id + 1, "Programmer error, the query_id was not incremented.");
67
68 /* This is for historical reasons */
69 if (*error == MEMCACHED_END) {
70 *error = MEMCACHED_NOTFOUND;
71 }
72 if (value == NULL) {
73 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND) {
74 memcached_result_st key_failure_result;
75 memcached_result_st *result_ptr = memcached_result_create(ptr, &key_failure_result);
76 memcached_return_t rc = ptr->get_key_failure(ptr, key, key_length, result_ptr);
77
78 /* On all failure drop to returning NULL */
79 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
80 if (rc == MEMCACHED_BUFFERED) {
81 uint64_t latch; /* We use latch to track the state of the original socket */
82 latch = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
83 if (latch == 0) {
84 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
85 }
86
87 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
88 (memcached_result_length(result_ptr)), 0,
89 (memcached_result_flags(result_ptr)));
90
91 if (rc == MEMCACHED_BUFFERED and latch == 0) {
92 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
93 }
94 } else {
95 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
96 (memcached_result_length(result_ptr)), 0,
97 (memcached_result_flags(result_ptr)));
98 }
99
100 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
101 *error = rc;
102 if (value_length) {
103 *value_length = memcached_result_length(result_ptr);
104 }
105 if (flags) {
106 *flags = memcached_result_flags(result_ptr);
107 }
108 char *result_value = memcached_string_take_value(&result_ptr->value);
109 memcached_result_free(result_ptr);
110
111 return result_value;
112 }
113 }
114
115 memcached_result_free(result_ptr);
116 }
117 assert_msg(ptr->query_id == query_id + 1,
118 "Programmer error, the query_id was not incremented.");
119
120 return NULL;
121 }
122
123 return value;
124 }
125
126 memcached_return_t memcached_mget(memcached_st *ptr, const char *const *keys,
127 const size_t *key_length, size_t number_of_keys) {
128 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
129 }
130
131 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
132 const bool is_group_key_set, const char *const *keys,
133 const size_t *key_length, const size_t number_of_keys,
134 const bool mget_mode);
135
136 static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_key,
137 size_t group_key_length, const char *const *keys,
138 const size_t *key_length, size_t number_of_keys,
139 const bool mget_mode) {
140 bool failures_occurred_in_sending = false, success_happened = false;
141 const char *get_command = "get";
142 uint8_t get_command_length = 3;
143 unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
144
145 memcached_return_t rc;
146 if (memcached_failed(rc = initialize_query(ptr, true))) {
147 return rc;
148 }
149
150 if (memcached_is_udp(ptr)) {
151 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
152 }
153
154 LIBMEMCACHED_MEMCACHED_MGET_START();
155
156 if (number_of_keys == 0) {
157 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
158 memcached_literal_param("Numbers of keys provided was zero"));
159 }
160
161 if (memcached_failed((rc = memcached_key_test(*ptr, keys, key_length, number_of_keys)))) {
162 assert(memcached_last_error(ptr) == rc);
163
164 return rc;
165 }
166
167 bool is_group_key_set = false;
168 if (group_key and group_key_length) {
169 master_server_key =
170 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
171 is_group_key_set = true;
172 }
173
174 /*
175 Here is where we pay for the non-block API. We need to remove any data sitting
176 in the queue before we start our get.
177
178 It might be optimum to bounce the connection if count > some number.
179 */
180 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
181 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
182
183 if (instance->response_count()) {
184 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
185
186 if (ptr->flags.no_block || ptr->flags.buffer_requests) {
187 memcached_io_write(instance);
188 }
189
190 while (instance->response_count()) {
191 (void) memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
192 }
193 }
194 }
195
196 if (memcached_is_binary(ptr)) {
197 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys, key_length,
198 number_of_keys, mget_mode);
199 }
200
201 if (memcached_is_meta(ptr)) {
202 size_t hosts_connected = 0;
203
204 for (uint32_t x = 0; x < number_of_keys; ++x) {
205 auto io_num = 0;
206 libmemcached_io_vector_st io_vec[8] = {};
207
208 io_vec[io_num++] = {memcached_literal_param("mg ")};
209 io_vec[io_num++] = {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)};
210 io_vec[io_num++] = {keys[x], key_length[x]};
211 if (memcached_is_cas(ptr)) {
212 io_vec[io_num++] = { memcached_literal_param(" c")};
213 }
214 io_vec[io_num++] = {memcached_literal_param(" k t f v\r\n")};
215
216 uint32_t server_key;
217 if (is_group_key_set) {
218 server_key = master_server_key;
219 } else {
220 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
221 }
222
223 auto *instance = memcached_instance_fetch(ptr, server_key);
224 if (!instance->response_count()) {
225 rc = memcached_connect(instance);
226
227 if (memcached_failed(rc)) {
228 memcached_set_error(*instance, rc, MEMCACHED_AT);
229 continue;
230 }
231 hosts_connected++;
232 }
233 if (!memcached_io_writev(instance, io_vec, io_num, false)) {
234 failures_occurred_in_sending = true;
235 continue;
236 }
237 memcached_instance_response_increment(instance);
238 }
239
240 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
241 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
242
243 if (instance->response_count()) {
244 /* We need to do something about non-connected hosts in the future */
245 if (!memcached_io_write(instance)) {
246 failures_occurred_in_sending = true;
247 } else {
248 success_happened = true;
249 }
250 }
251 }
252 } else {
253 if (ptr->flags.support_cas) {
254 get_command = "gets";
255 get_command_length = 4;
256 }
257
258 /*
259 If a server fails we warn about errors and start all over with sending keys
260 to the server.
261 */
262 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
263 size_t hosts_connected = 0;
264 for (uint32_t x = 0; x < number_of_keys; x++) {
265 uint32_t server_key;
266
267 if (is_group_key_set) {
268 server_key = master_server_key;
269 } else {
270 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
271 }
272
273 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
274
275 libmemcached_io_vector_st vector[] = {
276 {get_command, get_command_length},
277 {memcached_literal_param(" ")},
278 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
279 {keys[x], key_length[x]}};
280
281 if (instance->response_count() == 0) {
282 rc = memcached_connect(instance);
283
284 if (memcached_failed(rc)) {
285 memcached_set_error(*instance, rc, MEMCACHED_AT);
286 continue;
287 }
288 hosts_connected++;
289
290 if ((memcached_io_writev(instance, vector, 1, false)) == false) {
291 failures_occurred_in_sending = true;
292 continue;
293 }
294 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
295 memcached_instance_response_increment(instance);
296 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
297 }
298
299 {
300 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
301 memcached_instance_response_reset(instance);
302 failures_occurred_in_sending = true;
303 continue;
304 }
305 }
306 }
307
308 if (hosts_connected == 0) {
309 LIBMEMCACHED_MEMCACHED_MGET_END();
310
311 if (memcached_failed(rc)) {
312 return rc;
313 }
314
315 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
316 }
317
318 /*
319 Should we muddle on if some servers are dead?
320 */
321 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
322 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
323
324 if (instance->response_count()) {
325 /* We need to do something about non-connnected hosts in the future */
326 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
327 failures_occurred_in_sending = true;
328 } else {
329 success_happened = true;
330 }
331 }
332 }
333 }
334
335 LIBMEMCACHED_MEMCACHED_MGET_END();
336
337 if (failures_occurred_in_sending and success_happened) {
338 return MEMCACHED_SOME_ERRORS;
339 }
340
341 if (success_happened) {
342 return MEMCACHED_SUCCESS;
343 }
344
345 return MEMCACHED_FAILURE; // Complete failure occurred
346 }
347
348 memcached_return_t memcached_mget_by_key(memcached_st *shell, const char *group_key,
349 size_t group_key_length, const char *const *keys,
350 const size_t *key_length, size_t number_of_keys) {
351 Memcached *ptr = memcached2Memcached(shell);
352 return mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys, true);
353 }
354
355 memcached_return_t memcached_mget_execute(memcached_st *ptr, const char *const *keys,
356 const size_t *key_length, size_t number_of_keys,
357 memcached_execute_fn *callback, void *context,
358 unsigned int number_of_callbacks) {
359 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length, number_of_keys, callback,
360 context, number_of_callbacks);
361 }
362
363 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell, const char *group_key,
364 size_t group_key_length, const char *const *keys,
365 const size_t *key_length, size_t number_of_keys,
366 memcached_execute_fn *callback, void *context,
367 unsigned int number_of_callbacks) {
368 Memcached *ptr = memcached2Memcached(shell);
369 memcached_return_t rc;
370 if (memcached_failed(rc = initialize_query(ptr, false))) {
371 return rc;
372 }
373
374 if (memcached_is_udp(ptr)) {
375 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
376 }
377
378 if (memcached_is_binary(ptr) == false) {
379 return memcached_set_error(
380 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
381 memcached_literal_param(
382 "ASCII protocol is not supported for memcached_mget_execute_by_key()"));
383 }
384
385 memcached_callback_st *original_callbacks = ptr->callbacks;
386 memcached_callback_st cb = {callback, context, number_of_callbacks};
387
388 ptr->callbacks = &cb;
389 rc = memcached_mget_by_key(ptr, group_key, group_key_length, keys, key_length, number_of_keys);
390 ptr->callbacks = original_callbacks;
391
392 return rc;
393 }
394
395 static memcached_return_t simple_binary_mget(memcached_st *ptr, const uint32_t master_server_key,
396 bool is_group_key_set, const char *const *keys,
397 const size_t *key_length, const size_t number_of_keys,
398 const bool mget_mode) {
399 memcached_return_t rc = MEMCACHED_NOTFOUND;
400
401 bool flush = (number_of_keys == 1);
402
403 if (memcached_failed(rc = memcached_key_test(*ptr, keys, key_length, number_of_keys))) {
404 return rc;
405 }
406
407 /*
408 If a server fails we warn about errors and start all over with sending keys
409 to the server.
410 */
411 for (uint32_t x = 0; x < number_of_keys; ++x) {
412 uint32_t server_key;
413
414 if (is_group_key_set) {
415 server_key = master_server_key;
416 } else {
417 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
418 }
419
420 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
421
422 if (instance->response_count() == 0) {
423 rc = memcached_connect(instance);
424 if (memcached_failed(rc)) {
425 continue;
426 }
427 }
428
429 protocol_binary_request_getk request = {}; //= {.bytes= {0}};
430 initialize_binary_request(instance, request.message.header);
431 if (mget_mode) {
432 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
433 } else {
434 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
435 }
436
437 #if 0
438 {
439 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
440 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
441 {
442 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
443
444 if (x > 0)
445 {
446 memcached_io_reset(instance);
447 }
448
449 return vk;
450 }
451 }
452 #endif
453
454 request.message.header.request.keylen =
455 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
456 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
457 request.message.header.request.bodylen =
458 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
459
460 libmemcached_io_vector_st vector[] = {
461 {request.bytes, sizeof(request.bytes)},
462 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
463 {keys[x], key_length[x]}};
464
465 if (memcached_io_writev(instance, vector, 3, flush) == false) {
466 memcached_server_response_reset(instance);
467 rc = MEMCACHED_SOME_ERRORS;
468 continue;
469 }
470
471 /* We just want one pending response per server */
472 memcached_server_response_reset(instance);
473 memcached_server_response_increment(instance);
474 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
475 {
476 rc = MEMCACHED_SOME_ERRORS;
477 }
478 }
479
480 if (mget_mode) {
481 /*
482 Send a noop command to flush the buffers
483 */
484 protocol_binary_request_noop request = {}; //= {.bytes= {0}};
485 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
486 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
487
488 for (uint32_t x = 0; x < memcached_server_count(ptr); ++x) {
489 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
490
491 if (instance->response_count()) {
492 initialize_binary_request(instance, request.message.header);
493 if ((memcached_io_write(instance) == false)
494 or (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
495 {
496 memcached_instance_response_reset(instance);
497 memcached_io_reset(instance);
498 rc = MEMCACHED_SOME_ERRORS;
499 }
500 }
501 }
502 }
503
504 return rc;
505 }
506
507 static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t *hash,
508 bool *dead_servers, const char *const *keys,
509 const size_t *key_length,
510 const size_t number_of_keys) {
511 memcached_return_t rc = MEMCACHED_NOTFOUND;
512 uint32_t start = 0;
513 uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
514
515 if (randomize_read) {
516 start = (uint32_t) random() % (uint32_t)(ptr->number_of_replicas + 1);
517 }
518
519 /* Loop for each replica */
520 for (uint32_t replica = 0; replica <= ptr->number_of_replicas; ++replica) {
521 bool success = true;
522
523 for (uint32_t x = 0; x < number_of_keys; ++x) {
524 if (hash[x] == memcached_server_count(ptr)) {
525 continue; /* Already successfully sent */
526 }
527
528 uint32_t server = hash[x] + replica;
529
530 /* In case of randomized reads */
531 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas))) {
532 server += start;
533 }
534
535 while (server >= memcached_server_count(ptr)) {
536 server -= memcached_server_count(ptr);
537 }
538
539 if (dead_servers[server]) {
540 continue;
541 }
542
543 memcached_instance_st *instance = memcached_instance_fetch(ptr, server);
544
545 if (instance->response_count() == 0) {
546 rc = memcached_connect(instance);
547
548 if (memcached_failed(rc)) {
549 memcached_io_reset(instance);
550 dead_servers[server] = true;
551 success = false;
552 continue;
553 }
554 }
555
556 protocol_binary_request_getk request = {};
557 initialize_binary_request(instance, request.message.header);
558 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
559 request.message.header.request.keylen =
560 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
561 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
562 request.message.header.request.bodylen =
563 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
564
565 /*
566 * We need to disable buffering to actually know that the request was
567 * successfully sent to the server (so that we should expect a result
568 * back). It would be nice to do this in buffered mode, but then it
569 * would be complex to handle all error situations if we got to send
570 * some of the messages, and then we failed on writing out some others
571 * and we used the callback interface from memcached_mget_execute so
572 * that we might have processed some of the responses etc. For now,
573 * just make sure we work _correctly_
574 */
575 libmemcached_io_vector_st vector[] = {
576 {request.bytes, sizeof(request.bytes)},
577 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
578 {keys[x], key_length[x]}};
579
580 if (memcached_io_writev(instance, vector, 3, true) == false) {
581 memcached_io_reset(instance);
582 dead_servers[server] = true;
583 success = false;
584 continue;
585 }
586
587 memcached_server_response_increment(instance);
588 hash[x] = memcached_server_count(ptr);
589 }
590
591 if (success) {
592 break;
593 }
594 }
595
596 return rc;
597 }
598
599 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
600 bool is_group_key_set, const char *const *keys,
601 const size_t *key_length, const size_t number_of_keys,
602 const bool mget_mode) {
603 if (ptr->number_of_replicas == 0) {
604 return simple_binary_mget(ptr, master_server_key, is_group_key_set, keys, key_length,
605 number_of_keys, mget_mode);
606 }
607
608 uint32_t *hash = libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
609 bool *dead_servers = libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
610
611 if (hash == NULL or dead_servers == NULL) {
612 libmemcached_free(ptr, hash);
613 libmemcached_free(ptr, dead_servers);
614 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
615 }
616
617 if (is_group_key_set) {
618 for (size_t x = 0; x < number_of_keys; x++) {
619 hash[x] = master_server_key;
620 }
621 } else {
622 for (size_t x = 0; x < number_of_keys; x++) {
623 hash[x] = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
624 }
625 }
626
627 memcached_return_t rc =
628 replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
629
630 WATCHPOINT_IFERROR(rc);
631 libmemcached_free(ptr, hash);
632 libmemcached_free(ptr, dead_servers);
633
634 return MEMCACHED_SUCCESS;
635 }