4dd88061fc682a14a5092cc10ad801fb5cc567e9
[m6w6/libmemcached] / src / libmemcached / get.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 char *memcached_get(memcached_st *ptr, const char *key, size_t key_length, size_t *value_length,
19 uint32_t *flags, memcached_return_t *error) {
20 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length, flags, error);
21 }
22
23 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
24 size_t group_key_length, const char *const *keys,
25 const size_t *key_length, size_t number_of_keys,
26 const bool mget_mode);
27 char *memcached_get_by_key(memcached_st *shell, const char *group_key, size_t group_key_length,
28 const char *key, size_t key_length, size_t *value_length,
29 uint32_t *flags, memcached_return_t *error) {
30 Memcached *ptr = memcached2Memcached(shell);
31 memcached_return_t unused;
32 if (error == NULL) {
33 error = &unused;
34 }
35
36 uint64_t query_id = 0;
37 if (ptr) {
38 query_id = ptr->query_id;
39 }
40
41 /* Request the key */
42 *error = __mget_by_key_real(ptr, group_key, group_key_length, (const char *const *) &key,
43 &key_length, 1, false);
44 if (ptr) {
45 assert_msg(ptr->query_id == query_id + 1,
46 "Programmer error, the query_id was not incremented.");
47 }
48
49 if (memcached_failed(*error)) {
50 if (ptr) {
51 if (memcached_has_current_error(*ptr)) // Find the most accurate error
52 {
53 *error = memcached_last_error(ptr);
54 }
55 }
56
57 if (value_length) {
58 *value_length = 0;
59 }
60
61 return NULL;
62 }
63
64 char *value = memcached_fetch(ptr, NULL, NULL, value_length, flags, error);
65 assert_msg(ptr->query_id == query_id + 1, "Programmer error, the query_id was not incremented.");
66
67 /* This is for historical reasons */
68 if (*error == MEMCACHED_END) {
69 *error = MEMCACHED_NOTFOUND;
70 }
71 if (value == NULL) {
72 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND) {
73 memcached_result_st key_failure_result;
74 memcached_result_st *result_ptr = memcached_result_create(ptr, &key_failure_result);
75 memcached_return_t rc = ptr->get_key_failure(ptr, key, key_length, result_ptr);
76
77 /* On all failure drop to returning NULL */
78 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
79 if (rc == MEMCACHED_BUFFERED) {
80 uint64_t latch; /* We use latch to track the state of the original socket */
81 latch = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
82 if (latch == 0) {
83 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
84 }
85
86 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
87 (memcached_result_length(result_ptr)), 0,
88 (memcached_result_flags(result_ptr)));
89
90 if (rc == MEMCACHED_BUFFERED and latch == 0) {
91 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
92 }
93 } else {
94 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
95 (memcached_result_length(result_ptr)), 0,
96 (memcached_result_flags(result_ptr)));
97 }
98
99 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
100 *error = rc;
101 if (value_length) {
102 *value_length = memcached_result_length(result_ptr);
103 }
104 if (flags) {
105 *flags = memcached_result_flags(result_ptr);
106 }
107 char *result_value = memcached_string_take_value(&result_ptr->value);
108 memcached_result_free(result_ptr);
109
110 return result_value;
111 }
112 }
113
114 memcached_result_free(result_ptr);
115 }
116 assert_msg(ptr->query_id == query_id + 1,
117 "Programmer error, the query_id was not incremented.");
118
119 return NULL;
120 }
121
122 return value;
123 }
124
125 memcached_return_t memcached_mget(memcached_st *ptr, const char *const *keys,
126 const size_t *key_length, size_t number_of_keys) {
127 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
128 }
129
130 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
131 const bool is_group_key_set, const char *const *keys,
132 const size_t *key_length, const size_t number_of_keys,
133 const bool mget_mode);
134
135 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
136 const size_t group_key_length, const char *const *keys,
137 const size_t *key_length, size_t number_of_keys,
138 const bool mget_mode) {
139 bool failures_occured_in_sending = false;
140 const char *get_command = "get";
141 uint8_t get_command_length = 3;
142 unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
143
144 memcached_return_t rc;
145 if (memcached_failed(rc = initialize_query(ptr, true))) {
146 return rc;
147 }
148
149 if (memcached_is_udp(ptr)) {
150 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
151 }
152
153 LIBMEMCACHED_MEMCACHED_MGET_START();
154
155 if (number_of_keys == 0) {
156 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
157 memcached_literal_param("Numbers of keys provided was zero"));
158 }
159
160 if (memcached_failed((rc = memcached_key_test(*ptr, keys, key_length, number_of_keys)))) {
161 assert(memcached_last_error(ptr) == rc);
162
163 return rc;
164 }
165
166 bool is_group_key_set = false;
167 if (group_key and group_key_length) {
168 master_server_key =
169 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
170 is_group_key_set = true;
171 }
172
173 /*
174 Here is where we pay for the non-block API. We need to remove any data sitting
175 in the queue before we start our get.
176
177 It might be optimum to bounce the connection if count > some number.
178 */
179 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
180 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
181
182 if (instance->response_count()) {
183 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
184
185 if (ptr->flags.no_block || ptr->flags.buffer_requests) {
186 memcached_io_write(instance);
187 }
188
189 while (instance->response_count()) {
190 (void) memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
191 }
192 }
193 }
194
195 if (memcached_is_binary(ptr)) {
196 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys, key_length,
197 number_of_keys, mget_mode);
198 }
199
200 if (ptr->flags.support_cas) {
201 get_command = "gets";
202 get_command_length = 4;
203 }
204
205 /*
206 If a server fails we warn about errors and start all over with sending keys
207 to the server.
208 */
209 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
210 size_t hosts_connected = 0;
211 for (uint32_t x = 0; x < number_of_keys; x++) {
212 uint32_t server_key;
213
214 if (is_group_key_set) {
215 server_key = master_server_key;
216 } else {
217 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
218 }
219
220 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
221
222 libmemcached_io_vector_st vector[] = {
223 {get_command, get_command_length},
224 {memcached_literal_param(" ")},
225 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
226 {keys[x], key_length[x]}};
227
228 if (instance->response_count() == 0) {
229 rc = memcached_connect(instance);
230
231 if (memcached_failed(rc)) {
232 memcached_set_error(*instance, rc, MEMCACHED_AT);
233 continue;
234 }
235 hosts_connected++;
236
237 if ((memcached_io_writev(instance, vector, 1, false)) == false) {
238 failures_occured_in_sending = true;
239 continue;
240 }
241 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
242 memcached_instance_response_increment(instance);
243 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
244 }
245
246 {
247 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
248 memcached_instance_response_reset(instance);
249 failures_occured_in_sending = true;
250 continue;
251 }
252 }
253 }
254
255 if (hosts_connected == 0) {
256 LIBMEMCACHED_MEMCACHED_MGET_END();
257
258 if (memcached_failed(rc)) {
259 return rc;
260 }
261
262 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
263 }
264
265 /*
266 Should we muddle on if some servers are dead?
267 */
268 bool success_happened = false;
269 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
270 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
271
272 if (instance->response_count()) {
273 /* We need to do something about non-connnected hosts in the future */
274 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
275 failures_occured_in_sending = true;
276 } else {
277 success_happened = true;
278 }
279 }
280 }
281
282 LIBMEMCACHED_MEMCACHED_MGET_END();
283
284 if (failures_occured_in_sending and success_happened) {
285 return MEMCACHED_SOME_ERRORS;
286 }
287
288 if (success_happened) {
289 return MEMCACHED_SUCCESS;
290 }
291
292 return MEMCACHED_FAILURE; // Complete failure occurred
293 }
294
295 memcached_return_t memcached_mget_by_key(memcached_st *shell, const char *group_key,
296 size_t group_key_length, const char *const *keys,
297 const size_t *key_length, size_t number_of_keys) {
298 Memcached *ptr = memcached2Memcached(shell);
299 return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys,
300 true);
301 }
302
303 memcached_return_t memcached_mget_execute(memcached_st *ptr, const char *const *keys,
304 const size_t *key_length, size_t number_of_keys,
305 memcached_execute_fn *callback, void *context,
306 unsigned int number_of_callbacks) {
307 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length, number_of_keys, callback,
308 context, number_of_callbacks);
309 }
310
311 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell, const char *group_key,
312 size_t group_key_length, const char *const *keys,
313 const size_t *key_length, size_t number_of_keys,
314 memcached_execute_fn *callback, void *context,
315 unsigned int number_of_callbacks) {
316 Memcached *ptr = memcached2Memcached(shell);
317 memcached_return_t rc;
318 if (memcached_failed(rc = initialize_query(ptr, false))) {
319 return rc;
320 }
321
322 if (memcached_is_udp(ptr)) {
323 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
324 }
325
326 if (memcached_is_binary(ptr) == false) {
327 return memcached_set_error(
328 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
329 memcached_literal_param(
330 "ASCII protocol is not supported for memcached_mget_execute_by_key()"));
331 }
332
333 memcached_callback_st *original_callbacks = ptr->callbacks;
334 memcached_callback_st cb = {callback, context, number_of_callbacks};
335
336 ptr->callbacks = &cb;
337 rc = memcached_mget_by_key(ptr, group_key, group_key_length, keys, key_length, number_of_keys);
338 ptr->callbacks = original_callbacks;
339
340 return rc;
341 }
342
343 static memcached_return_t simple_binary_mget(memcached_st *ptr, const uint32_t master_server_key,
344 bool is_group_key_set, const char *const *keys,
345 const size_t *key_length, const size_t number_of_keys,
346 const bool mget_mode) {
347 memcached_return_t rc = MEMCACHED_NOTFOUND;
348
349 bool flush = (number_of_keys == 1);
350
351 if (memcached_failed(rc = memcached_key_test(*ptr, keys, key_length, number_of_keys))) {
352 return rc;
353 }
354
355 /*
356 If a server fails we warn about errors and start all over with sending keys
357 to the server.
358 */
359 for (uint32_t x = 0; x < number_of_keys; ++x) {
360 uint32_t server_key;
361
362 if (is_group_key_set) {
363 server_key = master_server_key;
364 } else {
365 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
366 }
367
368 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
369
370 if (instance->response_count() == 0) {
371 rc = memcached_connect(instance);
372 if (memcached_failed(rc)) {
373 continue;
374 }
375 }
376
377 protocol_binary_request_getk request = {}; //= {.bytes= {0}};
378 initialize_binary_request(instance, request.message.header);
379 if (mget_mode) {
380 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
381 } else {
382 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
383 }
384
385 #if 0
386 {
387 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
388 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
389 {
390 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
391
392 if (x > 0)
393 {
394 memcached_io_reset(instance);
395 }
396
397 return vk;
398 }
399 }
400 #endif
401
402 request.message.header.request.keylen =
403 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
404 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
405 request.message.header.request.bodylen =
406 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
407
408 libmemcached_io_vector_st vector[] = {
409 {request.bytes, sizeof(request.bytes)},
410 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
411 {keys[x], key_length[x]}};
412
413 if (memcached_io_writev(instance, vector, 3, flush) == false) {
414 memcached_server_response_reset(instance);
415 rc = MEMCACHED_SOME_ERRORS;
416 continue;
417 }
418
419 /* We just want one pending response per server */
420 memcached_server_response_reset(instance);
421 memcached_server_response_increment(instance);
422 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
423 {
424 rc = MEMCACHED_SOME_ERRORS;
425 }
426 }
427
428 if (mget_mode) {
429 /*
430 Send a noop command to flush the buffers
431 */
432 protocol_binary_request_noop request = {}; //= {.bytes= {0}};
433 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
434 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
435
436 for (uint32_t x = 0; x < memcached_server_count(ptr); ++x) {
437 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
438
439 if (instance->response_count()) {
440 initialize_binary_request(instance, request.message.header);
441 if ((memcached_io_write(instance) == false)
442 or (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
443 {
444 memcached_instance_response_reset(instance);
445 memcached_io_reset(instance);
446 rc = MEMCACHED_SOME_ERRORS;
447 }
448 }
449 }
450 }
451
452 return rc;
453 }
454
455 static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t *hash,
456 bool *dead_servers, const char *const *keys,
457 const size_t *key_length,
458 const size_t number_of_keys) {
459 memcached_return_t rc = MEMCACHED_NOTFOUND;
460 uint32_t start = 0;
461 uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
462
463 if (randomize_read) {
464 start = (uint32_t) random() % (uint32_t)(ptr->number_of_replicas + 1);
465 }
466
467 /* Loop for each replica */
468 for (uint32_t replica = 0; replica <= ptr->number_of_replicas; ++replica) {
469 bool success = true;
470
471 for (uint32_t x = 0; x < number_of_keys; ++x) {
472 if (hash[x] == memcached_server_count(ptr)) {
473 continue; /* Already successfully sent */
474 }
475
476 uint32_t server = hash[x] + replica;
477
478 /* In case of randomized reads */
479 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas))) {
480 server += start;
481 }
482
483 while (server >= memcached_server_count(ptr)) {
484 server -= memcached_server_count(ptr);
485 }
486
487 if (dead_servers[server]) {
488 continue;
489 }
490
491 memcached_instance_st *instance = memcached_instance_fetch(ptr, server);
492
493 if (instance->response_count() == 0) {
494 rc = memcached_connect(instance);
495
496 if (memcached_failed(rc)) {
497 memcached_io_reset(instance);
498 dead_servers[server] = true;
499 success = false;
500 continue;
501 }
502 }
503
504 protocol_binary_request_getk request = {};
505 initialize_binary_request(instance, request.message.header);
506 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
507 request.message.header.request.keylen =
508 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
509 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
510 request.message.header.request.bodylen =
511 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
512
513 /*
514 * We need to disable buffering to actually know that the request was
515 * successfully sent to the server (so that we should expect a result
516 * back). It would be nice to do this in buffered mode, but then it
517 * would be complex to handle all error situations if we got to send
518 * some of the messages, and then we failed on writing out some others
519 * and we used the callback interface from memcached_mget_execute so
520 * that we might have processed some of the responses etc. For now,
521 * just make sure we work _correctly_
522 */
523 libmemcached_io_vector_st vector[] = {
524 {request.bytes, sizeof(request.bytes)},
525 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
526 {keys[x], key_length[x]}};
527
528 if (memcached_io_writev(instance, vector, 3, true) == false) {
529 memcached_io_reset(instance);
530 dead_servers[server] = true;
531 success = false;
532 continue;
533 }
534
535 memcached_server_response_increment(instance);
536 hash[x] = memcached_server_count(ptr);
537 }
538
539 if (success) {
540 break;
541 }
542 }
543
544 return rc;
545 }
546
547 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
548 bool is_group_key_set, const char *const *keys,
549 const size_t *key_length, const size_t number_of_keys,
550 const bool mget_mode) {
551 if (ptr->number_of_replicas == 0) {
552 return simple_binary_mget(ptr, master_server_key, is_group_key_set, keys, key_length,
553 number_of_keys, mget_mode);
554 }
555
556 uint32_t *hash = libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
557 bool *dead_servers = libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
558
559 if (hash == NULL or dead_servers == NULL) {
560 libmemcached_free(ptr, hash);
561 libmemcached_free(ptr, dead_servers);
562 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
563 }
564
565 if (is_group_key_set) {
566 for (size_t x = 0; x < number_of_keys; x++) {
567 hash[x] = master_server_key;
568 }
569 } else {
570 for (size_t x = 0; x < number_of_keys; x++) {
571 hash[x] = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
572 }
573 }
574
575 memcached_return_t rc =
576 replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
577
578 WATCHPOINT_IFERROR(rc);
579 libmemcached_free(ptr, hash);
580 libmemcached_free(ptr, dead_servers);
581
582 return MEMCACHED_SUCCESS;
583 }