0b96fbf3e5749840bc117fbb71bf12615354c9fd
[awesomized/libmemcached] / src / libmemcached / storage.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
/*
 * Storage verbs handled by this file.
 *
 * The numeric values are significant: memcached_send_meta() uses the verb as
 * an index into its mode table "SREPAS", so the order must not change.
 */
enum memcached_storage_action_t {
  SET_OP = 0,
  REPLACE_OP = 1,
  ADD_OP = 2,
  PREPEND_OP = 3,
  APPEND_OP = 4,
  CAS_OP = 5
};
19
20 /* Inline this */
21 static inline const char *storage_op_string(memcached_storage_action_t verb) {
22 switch (verb) {
23 case REPLACE_OP:
24 return "replace ";
25
26 case ADD_OP:
27 return "add ";
28
29 case PREPEND_OP:
30 return "prepend ";
31
32 case APPEND_OP:
33 return "append ";
34
35 case CAS_OP:
36 return "cas ";
37
38 case SET_OP:
39 break;
40 }
41
42 return "set ";
43 }
44
45 static inline bool can_by_encrypted(const memcached_storage_action_t verb) {
46 switch (verb) {
47 case SET_OP:
48 case ADD_OP:
49 case CAS_OP:
50 case REPLACE_OP:
51 return true;
52
53 case APPEND_OP:
54 case PREPEND_OP:
55 break;
56 }
57
58 return false;
59 }
60
61 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply) {
62 if (reply == false) {
63 switch (verb) {
64 case SET_OP:
65 return PROTOCOL_BINARY_CMD_SETQ;
66
67 case ADD_OP:
68 return PROTOCOL_BINARY_CMD_ADDQ;
69
70 case CAS_OP: /* FALLTHROUGH */
71 case REPLACE_OP:
72 return PROTOCOL_BINARY_CMD_REPLACEQ;
73
74 case APPEND_OP:
75 return PROTOCOL_BINARY_CMD_APPENDQ;
76
77 case PREPEND_OP:
78 return PROTOCOL_BINARY_CMD_PREPENDQ;
79 }
80 }
81
82 switch (verb) {
83 case SET_OP:
84 break;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADD;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACE;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPEND;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPEND;
98 }
99
100 return PROTOCOL_BINARY_CMD_SET;
101 }
102
103 static memcached_return_t memcached_send_binary(Memcached *ptr, memcached_instance_st *server,
104 uint32_t server_key, const char *key,
105 const size_t key_length, const char *value,
106 const size_t value_length, const time_t expiration,
107 const uint32_t flags, const uint64_t cas,
108 const bool flush, const bool reply,
109 memcached_storage_action_t verb) {
110 protocol_binary_request_set request = {};
111 size_t send_length = sizeof(request.bytes);
112
113 initialize_binary_request(server, request.message.header);
114
115 request.message.header.request.opcode = get_com_code(verb, reply);
116 request.message.header.request.keylen =
117 htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
118 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
119 if (verb == APPEND_OP or verb == PREPEND_OP) {
120 send_length -= 8; /* append & prepend does not contain extras! */
121 } else {
122 request.message.header.request.extlen = 8;
123 request.message.body.flags = htonl(flags);
124 request.message.body.expiration = htonl((uint32_t) expiration);
125 }
126
127 request.message.header.request.bodylen =
128 htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) + value_length
129 + request.message.header.request.extlen));
130
131 if (cas) {
132 request.message.header.request.cas = memcached_htonll(cas);
133 }
134
135 libmemcached_io_vector_st vector[] = {
136 {NULL, 0},
137 {request.bytes, send_length},
138 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
139 {key, key_length},
140 {value, value_length}};
141
142 /* write the header */
143 memcached_return_t rc;
144 if ((rc = memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS) {
145 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
146 return memcached_last_error(server->root);
147 }
148
149 if (verb == SET_OP and ptr->number_of_replicas > 0) {
150 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_SETQ;
151 WATCHPOINT_STRING("replicating");
152
153 for (uint32_t x = 0; x < ptr->number_of_replicas; x++) {
154 ++server_key;
155 if (server_key == memcached_server_count(ptr)) {
156 server_key = 0;
157 }
158
159 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
160
161 if (memcached_success(memcached_vdo(instance, vector, 5, false))) {
162 memcached_server_response_decrement(instance);
163 }
164 }
165 }
166
167 if (flush == false) {
168 return MEMCACHED_BUFFERED;
169 }
170
171 // No reply always assumes success
172 if (reply == false) {
173 return MEMCACHED_SUCCESS;
174 }
175
176 return memcached_response(server, NULL, 0, NULL);
177 }
178
179 static memcached_return_t
180 memcached_send_meta(memcached_st *ptr, memcached_instance_st *instance,
181 const char *key, size_t key_len,
182 const char *val, size_t val_len,
183 time_t expiration, uint32_t flags, uint64_t cas,
184 bool flush, memcached_storage_action_t verb) {
185 static const char modes[] = "SREPAS";
186 char fl_buf[32] = " F", cs_buf[32] = " C", ex_buf[32] = " T", sz_buf[32] = " ";
187 size_t io_num = 0, fl_len = strlen(fl_buf), cs_len = strlen(cs_buf), ex_len = strlen(ex_buf), sz_len = strlen(sz_buf);
188 libmemcached_io_vector_st io_vec[16] = {};
189
190 io_vec[io_num++] = {memcached_literal_param("ms ")};
191 io_vec[io_num++] = {memcached_array_string(ptr->_namespace),
192 memcached_array_size(ptr->_namespace)};
193 io_vec[io_num++] = {key, key_len};
194
195 sz_len += snprintf(sz_buf + sz_len, sizeof(sz_buf) - sz_len, "%" PRIu64, (uint64_t) val_len);
196 io_vec[io_num++] = {sz_buf, sz_len};
197
198 if (verb != SET_OP) {
199 io_vec[io_num++] = {memcached_literal_param(" M")};
200 io_vec[io_num++] = {&modes[verb], 1};
201 }
202
203 if (!memcached_is_replying(ptr)) {
204 io_vec[io_num++] = { memcached_literal_param(" q")};
205 }
206
207 fl_len += snprintf(fl_buf + fl_len, sizeof(fl_buf) - fl_len, "%" PRIu32, flags);
208 io_vec[io_num++] = {fl_buf, fl_len};
209 if (expiration) {
210 ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%" PRIi64, (int64_t) expiration);
211 io_vec[io_num++] = {ex_buf, ex_len};
212 }
213 if (cas) {
214 cs_len += snprintf(cs_buf + cs_len, sizeof(cs_buf) - cs_len, "%" PRIu64, cas);
215 io_vec[io_num++] = {cs_buf, cs_len};
216 }
217
218 /* we have to send a data block even if it's empty, else memcached errors out with ITEM TOO BIG */
219 io_vec[io_num++] = {memcached_literal_param("\r\n")};
220 io_vec[io_num++] = {val, val_len};
221 io_vec[io_num++] = {memcached_literal_param("\r\n")};
222
223 /* Send command header */
224 memcached_return_t rc = memcached_vdo(instance, io_vec, io_num, flush);
225
226 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
227 if (!memcached_is_replying(ptr)) {
228 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
229 }
230
231 if (!flush) {
232 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
233 }
234
235 if (rc == MEMCACHED_SUCCESS) {
236 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
237 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
238
239 if (rc == MEMCACHED_SUCCESS) {
240 return MEMCACHED_SUCCESS;
241 }
242 }
243
244 assert(memcached_failed(rc));
245
246 return rc;
247 }
248
249 static memcached_return_t
250 memcached_send_ascii(Memcached *ptr, memcached_instance_st *instance, const char *key,
251 const size_t key_length, const char *value, const size_t value_length,
252 const time_t expiration, const uint32_t flags, const uint64_t cas,
253 const bool flush, const bool reply, const memcached_storage_action_t verb) {
254 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
255 int flags_buffer_length = snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
256 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0) {
257 return memcached_set_error(
258 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
259 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
260 }
261
262 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
263 int expiration_buffer_length = snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu",
264 (unsigned long long) expiration);
265 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
266 {
267 return memcached_set_error(
268 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
269 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
270 }
271
272 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
273 int value_buffer_length =
274 snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long) value_length);
275 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0) {
276 return memcached_set_error(
277 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
278 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
279 }
280
281 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
282 int cas_buffer_length = 0;
283 if (cas) {
284 cas_buffer_length = snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long) cas);
285 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0) {
286 return memcached_set_error(
287 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
288 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
289 }
290 }
291
292 libmemcached_io_vector_st vector[] = {
293 {NULL, 0},
294 {storage_op_string(verb), strlen(storage_op_string(verb))},
295 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
296 {key, key_length},
297 {flags_buffer, size_t(flags_buffer_length)},
298 {expiration_buffer, size_t(expiration_buffer_length)},
299 {value_buffer, size_t(value_buffer_length)},
300 {cas_buffer, size_t(cas_buffer_length)},
301 {" noreply", reply ? 0 : memcached_literal_param_size(" noreply")},
302 {memcached_literal_param("\r\n")},
303 {value, value_length},
304 {memcached_literal_param("\r\n")}};
305
306 /* Send command header */
307 memcached_return_t rc = memcached_vdo(instance, vector, 12, flush);
308
309 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
310 if (reply == false) {
311 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
312 }
313
314 if (flush == false) {
315 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
316 }
317
318 if (rc == MEMCACHED_SUCCESS) {
319 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
320 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
321
322 if (rc == MEMCACHED_STORED) {
323 return MEMCACHED_SUCCESS;
324 }
325 }
326
327 assert(memcached_failed(rc));
328 #if 0
329 if (memcached_has_error(ptr) == false)
330 {
331 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
332 }
333 #endif
334
335 return rc;
336 }
337
338 static inline memcached_return_t
339 memcached_send(memcached_st *shell, const char *group_key, size_t group_key_length, const char *key,
340 size_t key_length, const char *value, size_t value_length, const time_t expiration,
341 const uint32_t flags, const uint64_t cas, memcached_storage_action_t verb) {
342 Memcached *ptr = memcached2Memcached(shell);
343 memcached_return_t rc;
344 if (memcached_failed(rc = initialize_query(ptr, true))) {
345 return rc;
346 }
347
348 if (memcached_failed(memcached_key_test(*ptr, (const char **) &key, &key_length, 1))) {
349 return memcached_last_error(ptr);
350 }
351
352 uint32_t server_key =
353 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
354 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
355
356 WATCHPOINT_SET(instance->io_wait_count.read = 0);
357 WATCHPOINT_SET(instance->io_wait_count.write = 0);
358
359 bool flush = true;
360 if (memcached_is_buffering(instance->root) and verb == SET_OP) {
361 flush = false;
362 }
363
364 bool reply = memcached_is_replying(ptr);
365
366 hashkit_string_st *destination = NULL;
367
368 if (memcached_is_encrypted(ptr)) {
369 if (can_by_encrypted(verb) == false) {
370 return memcached_set_error(
371 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
372 memcached_literal_param("Operation not allowed while encyrption is enabled"));
373 }
374
375 if ((destination = hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL) {
376 return rc;
377 }
378 value = hashkit_string_c_str(destination);
379 value_length = hashkit_string_length(destination);
380 }
381
382 if (memcached_is_binary(ptr)) {
383 rc = memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length,
384 expiration, flags, cas, flush, reply, verb);
385 } else if (memcached_is_meta(ptr)) {
386 rc = memcached_send_meta(ptr, instance, key, key_length, value, value_length, expiration,
387 flags, cas, flush, verb);
388 } else {
389 rc = memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration,
390 flags, cas, flush, reply, verb);
391 }
392
393 hashkit_string_free(destination);
394
395 return rc;
396 }
397
398 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
399 const char *value, size_t value_length, time_t expiration,
400 uint32_t flags) {
401 memcached_return_t rc;
402 LIBMEMCACHED_MEMCACHED_SET_START();
403 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
404 0, SET_OP);
405 LIBMEMCACHED_MEMCACHED_SET_END();
406 return rc;
407 }
408
409 memcached_return_t memcached_add(memcached_st *ptr, const char *key, size_t key_length,
410 const char *value, size_t value_length, time_t expiration,
411 uint32_t flags) {
412 memcached_return_t rc;
413 LIBMEMCACHED_MEMCACHED_ADD_START();
414 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
415 0, ADD_OP);
416
417 LIBMEMCACHED_MEMCACHED_ADD_END();
418 return rc;
419 }
420
421 memcached_return_t memcached_replace(memcached_st *ptr, const char *key, size_t key_length,
422 const char *value, size_t value_length, time_t expiration,
423 uint32_t flags) {
424 memcached_return_t rc;
425 LIBMEMCACHED_MEMCACHED_REPLACE_START();
426 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
427 0, REPLACE_OP);
428 LIBMEMCACHED_MEMCACHED_REPLACE_END();
429 return rc;
430 }
431
432 memcached_return_t memcached_prepend(memcached_st *ptr, const char *key, size_t key_length,
433 const char *value, size_t value_length, time_t expiration,
434 uint32_t flags) {
435 memcached_return_t rc;
436 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
437 0, PREPEND_OP);
438 return rc;
439 }
440
441 memcached_return_t memcached_append(memcached_st *ptr, const char *key, size_t key_length,
442 const char *value, size_t value_length, time_t expiration,
443 uint32_t flags) {
444 memcached_return_t rc;
445 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
446 0, APPEND_OP);
447 return rc;
448 }
449
450 memcached_return_t memcached_cas(memcached_st *ptr, const char *key, size_t key_length,
451 const char *value, size_t value_length, time_t expiration,
452 uint32_t flags, uint64_t cas) {
453 memcached_return_t rc;
454 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
455 cas, CAS_OP);
456 return rc;
457 }
458
459 memcached_return_t memcached_set_by_key(memcached_st *ptr, const char *group_key,
460 size_t group_key_length, const char *key, size_t key_length,
461 const char *value, size_t value_length, time_t expiration,
462 uint32_t flags) {
463 memcached_return_t rc;
464 LIBMEMCACHED_MEMCACHED_SET_START();
465 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
466 expiration, flags, 0, SET_OP);
467 LIBMEMCACHED_MEMCACHED_SET_END();
468 return rc;
469 }
470
471 memcached_return_t memcached_add_by_key(memcached_st *ptr, const char *group_key,
472 size_t group_key_length, const char *key, size_t key_length,
473 const char *value, size_t value_length, time_t expiration,
474 uint32_t flags) {
475 memcached_return_t rc;
476 LIBMEMCACHED_MEMCACHED_ADD_START();
477 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
478 expiration, flags, 0, ADD_OP);
479 LIBMEMCACHED_MEMCACHED_ADD_END();
480 return rc;
481 }
482
483 memcached_return_t memcached_replace_by_key(memcached_st *ptr, const char *group_key,
484 size_t group_key_length, const char *key,
485 size_t key_length, const char *value,
486 size_t value_length, time_t expiration,
487 uint32_t flags) {
488 memcached_return_t rc;
489 LIBMEMCACHED_MEMCACHED_REPLACE_START();
490 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
491 expiration, flags, 0, REPLACE_OP);
492 LIBMEMCACHED_MEMCACHED_REPLACE_END();
493 return rc;
494 }
495
496 memcached_return_t memcached_prepend_by_key(memcached_st *ptr, const char *group_key,
497 size_t group_key_length, const char *key,
498 size_t key_length, const char *value,
499 size_t value_length, time_t expiration,
500 uint32_t flags) {
501 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
502 expiration, flags, 0, PREPEND_OP);
503 }
504
505 memcached_return_t memcached_append_by_key(memcached_st *ptr, const char *group_key,
506 size_t group_key_length, const char *key,
507 size_t key_length, const char *value,
508 size_t value_length, time_t expiration, uint32_t flags) {
509 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
510 expiration, flags, 0, APPEND_OP);
511 }
512
513 memcached_return_t memcached_cas_by_key(memcached_st *ptr, const char *group_key,
514 size_t group_key_length, const char *key, size_t key_length,
515 const char *value, size_t value_length, time_t expiration,
516 uint32_t flags, uint64_t cas) {
517 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
518 expiration, flags, cas, CAS_OP);
519 }