src/libmemcached: apply clang-format
[m6w6/libmemcached] / src / libmemcached / get.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 char *memcached_get(memcached_st *ptr, const char *key, size_t key_length, size_t *value_length,
19 uint32_t *flags, memcached_return_t *error) {
20 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length, flags, error);
21 }
22
23 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
24 size_t group_key_length, const char *const *keys,
25 const size_t *key_length, size_t number_of_keys,
26 const bool mget_mode);
27 char *memcached_get_by_key(memcached_st *shell, const char *group_key, size_t group_key_length,
28 const char *key, size_t key_length, size_t *value_length,
29 uint32_t *flags, memcached_return_t *error) {
30 Memcached *ptr = memcached2Memcached(shell);
31 memcached_return_t unused;
32 if (error == NULL) {
33 error = &unused;
34 }
35
36 uint64_t query_id = 0;
37 if (ptr) {
38 query_id = ptr->query_id;
39 }
40
41 /* Request the key */
42 *error = __mget_by_key_real(ptr, group_key, group_key_length, (const char *const *) &key,
43 &key_length, 1, false);
44 if (ptr) {
45 assert_msg(ptr->query_id == query_id + 1,
46 "Programmer error, the query_id was not incremented.");
47 }
48
49 if (memcached_failed(*error)) {
50 if (ptr) {
51 if (memcached_has_current_error(*ptr)) // Find the most accurate error
52 {
53 *error = memcached_last_error(ptr);
54 }
55 }
56
57 if (value_length) {
58 *value_length = 0;
59 }
60
61 return NULL;
62 }
63
64 char *value = memcached_fetch(ptr, NULL, NULL, value_length, flags, error);
65 assert_msg(ptr->query_id == query_id + 1, "Programmer error, the query_id was not incremented.");
66
67 /* This is for historical reasons */
68 if (*error == MEMCACHED_END) {
69 *error = MEMCACHED_NOTFOUND;
70 }
71 if (value == NULL) {
72 if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND) {
73 memcached_result_st key_failure_result;
74 memcached_result_st *result_ptr = memcached_result_create(ptr, &key_failure_result);
75 memcached_return_t rc = ptr->get_key_failure(ptr, key, key_length, result_ptr);
76
77 /* On all failure drop to returning NULL */
78 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
79 if (rc == MEMCACHED_BUFFERED) {
80 uint64_t latch; /* We use latch to track the state of the original socket */
81 latch = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
82 if (latch == 0) {
83 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
84 }
85
86 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
87 (memcached_result_length(result_ptr)), 0,
88 (memcached_result_flags(result_ptr)));
89
90 if (rc == MEMCACHED_BUFFERED and latch == 0) {
91 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
92 }
93 } else {
94 rc = memcached_set(ptr, key, key_length, (memcached_result_value(result_ptr)),
95 (memcached_result_length(result_ptr)), 0,
96 (memcached_result_flags(result_ptr)));
97 }
98
99 if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED) {
100 *error = rc;
101 *value_length = memcached_result_length(result_ptr);
102 *flags = memcached_result_flags(result_ptr);
103 char *result_value = memcached_string_take_value(&result_ptr->value);
104 memcached_result_free(result_ptr);
105
106 return result_value;
107 }
108 }
109
110 memcached_result_free(result_ptr);
111 }
112 assert_msg(ptr->query_id == query_id + 1,
113 "Programmer error, the query_id was not incremented.");
114
115 return NULL;
116 }
117
118 return value;
119 }
120
121 memcached_return_t memcached_mget(memcached_st *ptr, const char *const *keys,
122 const size_t *key_length, size_t number_of_keys) {
123 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
124 }
125
126 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
127 const bool is_group_key_set, const char *const *keys,
128 const size_t *key_length, const size_t number_of_keys,
129 const bool mget_mode);
130
131 static memcached_return_t __mget_by_key_real(memcached_st *ptr, const char *group_key,
132 const size_t group_key_length, const char *const *keys,
133 const size_t *key_length, size_t number_of_keys,
134 const bool mget_mode) {
135 bool failures_occured_in_sending = false;
136 const char *get_command = "get";
137 uint8_t get_command_length = 3;
138 unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
139
140 memcached_return_t rc;
141 if (memcached_failed(rc = initialize_query(ptr, true))) {
142 return rc;
143 }
144
145 if (memcached_is_udp(ptr)) {
146 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
147 }
148
149 LIBMEMCACHED_MEMCACHED_MGET_START();
150
151 if (number_of_keys == 0) {
152 return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
153 memcached_literal_param("Numbers of keys provided was zero"));
154 }
155
156 if (memcached_failed((rc = memcached_key_test(*ptr, keys, key_length, number_of_keys)))) {
157 assert(memcached_last_error(ptr) == rc);
158
159 return rc;
160 }
161
162 bool is_group_key_set = false;
163 if (group_key and group_key_length) {
164 master_server_key =
165 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
166 is_group_key_set = true;
167 }
168
169 /*
170 Here is where we pay for the non-block API. We need to remove any data sitting
171 in the queue before we start our get.
172
173 It might be optimum to bounce the connection if count > some number.
174 */
175 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
176 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
177
178 if (instance->response_count()) {
179 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
180
181 if (ptr->flags.no_block) {
182 memcached_io_write(instance);
183 }
184
185 while (instance->response_count()) {
186 (void) memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
187 }
188 }
189 }
190
191 if (memcached_is_binary(ptr)) {
192 return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys, key_length,
193 number_of_keys, mget_mode);
194 }
195
196 if (ptr->flags.support_cas) {
197 get_command = "gets";
198 get_command_length = 4;
199 }
200
201 /*
202 If a server fails we warn about errors and start all over with sending keys
203 to the server.
204 */
205 WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
206 size_t hosts_connected = 0;
207 for (uint32_t x = 0; x < number_of_keys; x++) {
208 uint32_t server_key;
209
210 if (is_group_key_set) {
211 server_key = master_server_key;
212 } else {
213 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
214 }
215
216 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
217
218 libmemcached_io_vector_st vector[] = {
219 {get_command, get_command_length},
220 {memcached_literal_param(" ")},
221 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
222 {keys[x], key_length[x]}};
223
224 if (instance->response_count() == 0) {
225 rc = memcached_connect(instance);
226
227 if (memcached_failed(rc)) {
228 memcached_set_error(*instance, rc, MEMCACHED_AT);
229 continue;
230 }
231 hosts_connected++;
232
233 if ((memcached_io_writev(instance, vector, 1, false)) == false) {
234 failures_occured_in_sending = true;
235 continue;
236 }
237 WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
238 memcached_instance_response_increment(instance);
239 WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
240 }
241
242 {
243 if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
244 memcached_instance_response_reset(instance);
245 failures_occured_in_sending = true;
246 continue;
247 }
248 }
249 }
250
251 if (hosts_connected == 0) {
252 LIBMEMCACHED_MEMCACHED_MGET_END();
253
254 if (memcached_failed(rc)) {
255 return rc;
256 }
257
258 return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
259 }
260
261 /*
262 Should we muddle on if some servers are dead?
263 */
264 bool success_happened = false;
265 for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
266 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
267
268 if (instance->response_count()) {
269 /* We need to do something about non-connnected hosts in the future */
270 if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
271 failures_occured_in_sending = true;
272 } else {
273 success_happened = true;
274 }
275 }
276 }
277
278 LIBMEMCACHED_MEMCACHED_MGET_END();
279
280 if (failures_occured_in_sending and success_happened) {
281 return MEMCACHED_SOME_ERRORS;
282 }
283
284 if (success_happened) {
285 return MEMCACHED_SUCCESS;
286 }
287
288 return MEMCACHED_FAILURE; // Complete failure occurred
289 }
290
291 memcached_return_t memcached_mget_by_key(memcached_st *shell, const char *group_key,
292 size_t group_key_length, const char *const *keys,
293 const size_t *key_length, size_t number_of_keys) {
294 Memcached *ptr = memcached2Memcached(shell);
295 return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys,
296 true);
297 }
298
299 memcached_return_t memcached_mget_execute(memcached_st *ptr, const char *const *keys,
300 const size_t *key_length, size_t number_of_keys,
301 memcached_execute_fn *callback, void *context,
302 unsigned int number_of_callbacks) {
303 return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length, number_of_keys, callback,
304 context, number_of_callbacks);
305 }
306
307 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell, const char *group_key,
308 size_t group_key_length, const char *const *keys,
309 const size_t *key_length, size_t number_of_keys,
310 memcached_execute_fn *callback, void *context,
311 unsigned int number_of_callbacks) {
312 Memcached *ptr = memcached2Memcached(shell);
313 memcached_return_t rc;
314 if (memcached_failed(rc = initialize_query(ptr, false))) {
315 return rc;
316 }
317
318 if (memcached_is_udp(ptr)) {
319 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
320 }
321
322 if (memcached_is_binary(ptr) == false) {
323 return memcached_set_error(
324 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
325 memcached_literal_param(
326 "ASCII protocol is not supported for memcached_mget_execute_by_key()"));
327 }
328
329 memcached_callback_st *original_callbacks = ptr->callbacks;
330 memcached_callback_st cb = {callback, context, number_of_callbacks};
331
332 ptr->callbacks = &cb;
333 rc = memcached_mget_by_key(ptr, group_key, group_key_length, keys, key_length, number_of_keys);
334 ptr->callbacks = original_callbacks;
335
336 return rc;
337 }
338
339 static memcached_return_t simple_binary_mget(memcached_st *ptr, const uint32_t master_server_key,
340 bool is_group_key_set, const char *const *keys,
341 const size_t *key_length, const size_t number_of_keys,
342 const bool mget_mode) {
343 memcached_return_t rc = MEMCACHED_NOTFOUND;
344
345 bool flush = (number_of_keys == 1);
346
347 if (memcached_failed(rc = memcached_key_test(*ptr, keys, key_length, number_of_keys))) {
348 return rc;
349 }
350
351 /*
352 If a server fails we warn about errors and start all over with sending keys
353 to the server.
354 */
355 for (uint32_t x = 0; x < number_of_keys; ++x) {
356 uint32_t server_key;
357
358 if (is_group_key_set) {
359 server_key = master_server_key;
360 } else {
361 server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
362 }
363
364 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
365
366 if (instance->response_count() == 0) {
367 rc = memcached_connect(instance);
368 if (memcached_failed(rc)) {
369 continue;
370 }
371 }
372
373 protocol_binary_request_getk request = {}; //= {.bytes= {0}};
374 initialize_binary_request(instance, request.message.header);
375 if (mget_mode) {
376 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
377 } else {
378 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
379 }
380
381 #if 0
382 {
383 memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
384 if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
385 {
386 memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
387
388 if (x > 0)
389 {
390 memcached_io_reset(instance);
391 }
392
393 return vk;
394 }
395 }
396 #endif
397
398 request.message.header.request.keylen =
399 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
400 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
401 request.message.header.request.bodylen =
402 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
403
404 libmemcached_io_vector_st vector[] = {
405 {request.bytes, sizeof(request.bytes)},
406 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
407 {keys[x], key_length[x]}};
408
409 if (memcached_io_writev(instance, vector, 3, flush) == false) {
410 memcached_server_response_reset(instance);
411 rc = MEMCACHED_SOME_ERRORS;
412 continue;
413 }
414
415 /* We just want one pending response per server */
416 memcached_server_response_reset(instance);
417 memcached_server_response_increment(instance);
418 if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
419 {
420 rc = MEMCACHED_SOME_ERRORS;
421 }
422 }
423
424 if (mget_mode) {
425 /*
426 Send a noop command to flush the buffers
427 */
428 protocol_binary_request_noop request = {}; //= {.bytes= {0}};
429 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
430 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
431
432 for (uint32_t x = 0; x < memcached_server_count(ptr); ++x) {
433 memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
434
435 if (instance->response_count()) {
436 initialize_binary_request(instance, request.message.header);
437 if ((memcached_io_write(instance) == false)
438 or (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
439 {
440 memcached_instance_response_reset(instance);
441 memcached_io_reset(instance);
442 rc = MEMCACHED_SOME_ERRORS;
443 }
444 }
445 }
446 }
447
448 return rc;
449 }
450
451 static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t *hash,
452 bool *dead_servers, const char *const *keys,
453 const size_t *key_length,
454 const size_t number_of_keys) {
455 memcached_return_t rc = MEMCACHED_NOTFOUND;
456 uint32_t start = 0;
457 uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
458
459 if (randomize_read) {
460 start = (uint32_t) random() % (uint32_t)(ptr->number_of_replicas + 1);
461 }
462
463 /* Loop for each replica */
464 for (uint32_t replica = 0; replica <= ptr->number_of_replicas; ++replica) {
465 bool success = true;
466
467 for (uint32_t x = 0; x < number_of_keys; ++x) {
468 if (hash[x] == memcached_server_count(ptr)) {
469 continue; /* Already successfully sent */
470 }
471
472 uint32_t server = hash[x] + replica;
473
474 /* In case of randomized reads */
475 if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas))) {
476 server += start;
477 }
478
479 while (server >= memcached_server_count(ptr)) {
480 server -= memcached_server_count(ptr);
481 }
482
483 if (dead_servers[server]) {
484 continue;
485 }
486
487 memcached_instance_st *instance = memcached_instance_fetch(ptr, server);
488
489 if (instance->response_count() == 0) {
490 rc = memcached_connect(instance);
491
492 if (memcached_failed(rc)) {
493 memcached_io_reset(instance);
494 dead_servers[server] = true;
495 success = false;
496 continue;
497 }
498 }
499
500 protocol_binary_request_getk request = {};
501 initialize_binary_request(instance, request.message.header);
502 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_GETK;
503 request.message.header.request.keylen =
504 htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
505 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
506 request.message.header.request.bodylen =
507 htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
508
509 /*
510 * We need to disable buffering to actually know that the request was
511 * successfully sent to the server (so that we should expect a result
512 * back). It would be nice to do this in buffered mode, but then it
513 * would be complex to handle all error situations if we got to send
514 * some of the messages, and then we failed on writing out some others
515 * and we used the callback interface from memcached_mget_execute so
516 * that we might have processed some of the responses etc. For now,
517 * just make sure we work _correctly_
518 */
519 libmemcached_io_vector_st vector[] = {
520 {request.bytes, sizeof(request.bytes)},
521 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
522 {keys[x], key_length[x]}};
523
524 if (memcached_io_writev(instance, vector, 3, true) == false) {
525 memcached_io_reset(instance);
526 dead_servers[server] = true;
527 success = false;
528 continue;
529 }
530
531 memcached_server_response_increment(instance);
532 hash[x] = memcached_server_count(ptr);
533 }
534
535 if (success) {
536 break;
537 }
538 }
539
540 return rc;
541 }
542
543 static memcached_return_t binary_mget_by_key(memcached_st *ptr, const uint32_t master_server_key,
544 bool is_group_key_set, const char *const *keys,
545 const size_t *key_length, const size_t number_of_keys,
546 const bool mget_mode) {
547 if (ptr->number_of_replicas == 0) {
548 return simple_binary_mget(ptr, master_server_key, is_group_key_set, keys, key_length,
549 number_of_keys, mget_mode);
550 }
551
552 uint32_t *hash = libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
553 bool *dead_servers = libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
554
555 if (hash == NULL or dead_servers == NULL) {
556 libmemcached_free(ptr, hash);
557 libmemcached_free(ptr, dead_servers);
558 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
559 }
560
561 if (is_group_key_set) {
562 for (size_t x = 0; x < number_of_keys; x++) {
563 hash[x] = master_server_key;
564 }
565 } else {
566 for (size_t x = 0; x < number_of_keys; x++) {
567 hash[x] = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
568 }
569 }
570
571 memcached_return_t rc =
572 replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
573
574 WATCHPOINT_IFERROR(rc);
575 libmemcached_free(ptr, hash);
576 libmemcached_free(ptr, dead_servers);
577
578 return MEMCACHED_SUCCESS;
579 }