3d82edf52d6eb1dd622ddc58dee7d499d043ef26
[awesomized/libmemcached] / src / libmemcached / storage.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
/*
 * Storage verbs handled by this file.
 *
 * The enumerator order (and therefore the numeric values 0..5) is relied on
 * by memcached_send_meta(), which indexes its "SREPAS" mode table with the
 * verb. Do not reorder without updating that table.
 */
enum memcached_storage_action_t {
  SET_OP,
  REPLACE_OP,
  ADD_OP,
  PREPEND_OP,
  APPEND_OP,
  CAS_OP
};
19
20 /* Inline this */
21 static inline const char *storage_op_string(memcached_storage_action_t verb) {
22 switch (verb) {
23 case REPLACE_OP:
24 return "replace ";
25
26 case ADD_OP:
27 return "add ";
28
29 case PREPEND_OP:
30 return "prepend ";
31
32 case APPEND_OP:
33 return "append ";
34
35 case CAS_OP:
36 return "cas ";
37
38 case SET_OP:
39 break;
40 }
41
42 return "set ";
43 }
44
45 static inline bool can_by_encrypted(const memcached_storage_action_t verb) {
46 switch (verb) {
47 case SET_OP:
48 case ADD_OP:
49 case CAS_OP:
50 case REPLACE_OP:
51 return true;
52
53 case APPEND_OP:
54 case PREPEND_OP:
55 break;
56 }
57
58 return false;
59 }
60
61 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply) {
62 if (reply == false) {
63 switch (verb) {
64 case SET_OP:
65 return PROTOCOL_BINARY_CMD_SETQ;
66
67 case ADD_OP:
68 return PROTOCOL_BINARY_CMD_ADDQ;
69
70 case CAS_OP: /* FALLTHROUGH */
71 case REPLACE_OP:
72 return PROTOCOL_BINARY_CMD_REPLACEQ;
73
74 case APPEND_OP:
75 return PROTOCOL_BINARY_CMD_APPENDQ;
76
77 case PREPEND_OP:
78 return PROTOCOL_BINARY_CMD_PREPENDQ;
79 }
80 }
81
82 switch (verb) {
83 case SET_OP:
84 break;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADD;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACE;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPEND;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPEND;
98 }
99
100 return PROTOCOL_BINARY_CMD_SET;
101 }
102
103 static memcached_return_t memcached_send_binary(Memcached *ptr, memcached_instance_st *server,
104 uint32_t server_key, const char *key,
105 const size_t key_length, const char *value,
106 const size_t value_length, const time_t expiration,
107 const uint32_t flags, const uint64_t cas,
108 const bool flush, const bool reply,
109 memcached_storage_action_t verb) {
110 protocol_binary_request_set request = {};
111 size_t send_length = sizeof(request.bytes);
112
113 initialize_binary_request(server, request.message.header);
114
115 request.message.header.request.opcode = get_com_code(verb, reply);
116 request.message.header.request.keylen =
117 htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
118 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
119 if (verb == APPEND_OP or verb == PREPEND_OP) {
120 send_length -= 8; /* append & prepend does not contain extras! */
121 } else {
122 request.message.header.request.extlen = 8;
123 request.message.body.flags = htonl(flags);
124 request.message.body.expiration = htonl((uint32_t) expiration);
125 }
126
127 request.message.header.request.bodylen =
128 htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) + value_length
129 + request.message.header.request.extlen));
130
131 if (cas) {
132 request.message.header.request.cas = memcached_htonll(cas);
133 }
134
135 libmemcached_io_vector_st vector[] = {
136 {NULL, 0},
137 {request.bytes, send_length},
138 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
139 {key, key_length},
140 {value, value_length}};
141
142 /* write the header */
143 memcached_return_t rc;
144 if ((rc = memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS) {
145 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
146 return memcached_last_error(server->root);
147 }
148
149 if (verb == SET_OP and ptr->number_of_replicas > 0) {
150 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_SETQ;
151 WATCHPOINT_STRING("replicating");
152
153 for (uint32_t x = 0; x < ptr->number_of_replicas; x++) {
154 ++server_key;
155 if (server_key == memcached_server_count(ptr)) {
156 server_key = 0;
157 }
158
159 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
160
161 if (memcached_success(memcached_vdo(instance, vector, 5, false))) {
162 memcached_server_response_decrement(instance);
163 }
164 }
165 }
166
167 if (flush == false) {
168 return MEMCACHED_BUFFERED;
169 }
170
171 // No reply always assumes success
172 if (reply == false) {
173 return MEMCACHED_SUCCESS;
174 }
175
176 return memcached_response(server, NULL, 0, NULL);
177 }
178
179 static memcached_return_t
180 memcached_send_meta(memcached_st *ptr, memcached_instance_st *instance,
181 const char *key, size_t key_len,
182 const char *val, size_t val_len,
183 time_t expiration, uint32_t flags, uint64_t cas,
184 bool flush, memcached_storage_action_t verb) {
185 static const char modes[] = "SREPAS";
186 char fl_buf[32] = " F", cs_buf[32] = " C", ex_buf[32] = " T", sz_buf[32] = " S";
187 size_t io_num = 0, fl_len = strlen(fl_buf), cs_len = strlen(cs_buf), ex_len = strlen(ex_buf), sz_len = strlen(sz_buf);
188 libmemcached_io_vector_st io_vec[16] = {};
189
190 io_vec[io_num++] = {memcached_literal_param("ms ")};
191 io_vec[io_num++] = {memcached_array_string(ptr->_namespace),
192 memcached_array_size(ptr->_namespace)};
193 io_vec[io_num++] = {key, key_len};
194
195 if (verb != SET_OP) {
196 io_vec[io_num++] = {memcached_literal_param(" M")};
197 io_vec[io_num++] = {&modes[verb], 1};
198 }
199
200 if (!memcached_is_replying(ptr)) {
201 io_vec[io_num++] = { memcached_literal_param(" q")};
202 }
203
204 fl_len += snprintf(fl_buf + fl_len, sizeof(fl_buf) - fl_len, "%" PRIu32, flags);
205 io_vec[io_num++] = {fl_buf, fl_len};
206 if (expiration) {
207 ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%" PRIi64, (int64_t) expiration);
208 io_vec[io_num++] = {ex_buf, ex_len};
209 }
210 if (cas) {
211 cs_len += snprintf(cs_buf + cs_len, sizeof(cs_buf) - cs_len, "%" PRIu64, cas);
212 io_vec[io_num++] = {cs_buf, cs_len};
213 }
214
215 /* we have to send a data block even if it's empty, else memcached errors out with ITEM TOO BIG */
216 sz_len += snprintf(sz_buf + sz_len, sizeof(sz_buf) - sz_len, "%" PRIu64, (uint64_t) val_len);
217 io_vec[io_num++] = {sz_buf, sz_len};
218 io_vec[io_num++] = {memcached_literal_param("\r\n")};
219 io_vec[io_num++] = {val, val_len};
220 io_vec[io_num++] = {memcached_literal_param("\r\n")};
221
222 /* Send command header */
223 memcached_return_t rc = memcached_vdo(instance, io_vec, io_num, flush);
224
225 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
226 if (!memcached_is_replying(ptr)) {
227 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
228 }
229
230 if (!flush) {
231 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
232 }
233
234 if (rc == MEMCACHED_SUCCESS) {
235 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
236 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
237
238 if (rc == MEMCACHED_SUCCESS) {
239 return MEMCACHED_SUCCESS;
240 }
241 }
242
243 assert(memcached_failed(rc));
244
245 return rc;
246 }
247
248 static memcached_return_t
249 memcached_send_ascii(Memcached *ptr, memcached_instance_st *instance, const char *key,
250 const size_t key_length, const char *value, const size_t value_length,
251 const time_t expiration, const uint32_t flags, const uint64_t cas,
252 const bool flush, const bool reply, const memcached_storage_action_t verb) {
253 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
254 int flags_buffer_length = snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
255 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0) {
256 return memcached_set_error(
257 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
258 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
259 }
260
261 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
262 int expiration_buffer_length = snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu",
263 (unsigned long long) expiration);
264 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
265 {
266 return memcached_set_error(
267 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
268 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
269 }
270
271 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
272 int value_buffer_length =
273 snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long) value_length);
274 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0) {
275 return memcached_set_error(
276 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
277 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
278 }
279
280 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
281 int cas_buffer_length = 0;
282 if (cas) {
283 cas_buffer_length = snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long) cas);
284 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0) {
285 return memcached_set_error(
286 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
287 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
288 }
289 }
290
291 libmemcached_io_vector_st vector[] = {
292 {NULL, 0},
293 {storage_op_string(verb), strlen(storage_op_string(verb))},
294 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
295 {key, key_length},
296 {flags_buffer, size_t(flags_buffer_length)},
297 {expiration_buffer, size_t(expiration_buffer_length)},
298 {value_buffer, size_t(value_buffer_length)},
299 {cas_buffer, size_t(cas_buffer_length)},
300 {" noreply", reply ? 0 : memcached_literal_param_size(" noreply")},
301 {memcached_literal_param("\r\n")},
302 {value, value_length},
303 {memcached_literal_param("\r\n")}};
304
305 /* Send command header */
306 memcached_return_t rc = memcached_vdo(instance, vector, 12, flush);
307
308 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
309 if (reply == false) {
310 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
311 }
312
313 if (flush == false) {
314 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
315 }
316
317 if (rc == MEMCACHED_SUCCESS) {
318 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
319 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
320
321 if (rc == MEMCACHED_STORED) {
322 return MEMCACHED_SUCCESS;
323 }
324 }
325
326 assert(memcached_failed(rc));
327 #if 0
328 if (memcached_has_error(ptr) == false)
329 {
330 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
331 }
332 #endif
333
334 return rc;
335 }
336
337 static inline memcached_return_t
338 memcached_send(memcached_st *shell, const char *group_key, size_t group_key_length, const char *key,
339 size_t key_length, const char *value, size_t value_length, const time_t expiration,
340 const uint32_t flags, const uint64_t cas, memcached_storage_action_t verb) {
341 Memcached *ptr = memcached2Memcached(shell);
342 memcached_return_t rc;
343 if (memcached_failed(rc = initialize_query(ptr, true))) {
344 return rc;
345 }
346
347 if (memcached_failed(memcached_key_test(*ptr, (const char **) &key, &key_length, 1))) {
348 return memcached_last_error(ptr);
349 }
350
351 uint32_t server_key =
352 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
353 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
354
355 WATCHPOINT_SET(instance->io_wait_count.read = 0);
356 WATCHPOINT_SET(instance->io_wait_count.write = 0);
357
358 bool flush = true;
359 if (memcached_is_buffering(instance->root) and verb == SET_OP) {
360 flush = false;
361 }
362
363 bool reply = memcached_is_replying(ptr);
364
365 hashkit_string_st *destination = NULL;
366
367 if (memcached_is_encrypted(ptr)) {
368 if (can_by_encrypted(verb) == false) {
369 return memcached_set_error(
370 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
371 memcached_literal_param("Operation not allowed while encyrption is enabled"));
372 }
373
374 if ((destination = hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL) {
375 return rc;
376 }
377 value = hashkit_string_c_str(destination);
378 value_length = hashkit_string_length(destination);
379 }
380
381 if (memcached_is_binary(ptr)) {
382 rc = memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length,
383 expiration, flags, cas, flush, reply, verb);
384 } else if (memcached_is_meta(ptr)) {
385 rc = memcached_send_meta(ptr, instance, key, key_length, value, value_length, expiration,
386 flags, cas, flush, verb);
387 } else {
388 rc = memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration,
389 flags, cas, flush, reply, verb);
390 }
391
392 hashkit_string_free(destination);
393
394 return rc;
395 }
396
397 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
398 const char *value, size_t value_length, time_t expiration,
399 uint32_t flags) {
400 memcached_return_t rc;
401 LIBMEMCACHED_MEMCACHED_SET_START();
402 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
403 0, SET_OP);
404 LIBMEMCACHED_MEMCACHED_SET_END();
405 return rc;
406 }
407
408 memcached_return_t memcached_add(memcached_st *ptr, const char *key, size_t key_length,
409 const char *value, size_t value_length, time_t expiration,
410 uint32_t flags) {
411 memcached_return_t rc;
412 LIBMEMCACHED_MEMCACHED_ADD_START();
413 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
414 0, ADD_OP);
415
416 LIBMEMCACHED_MEMCACHED_ADD_END();
417 return rc;
418 }
419
420 memcached_return_t memcached_replace(memcached_st *ptr, const char *key, size_t key_length,
421 const char *value, size_t value_length, time_t expiration,
422 uint32_t flags) {
423 memcached_return_t rc;
424 LIBMEMCACHED_MEMCACHED_REPLACE_START();
425 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
426 0, REPLACE_OP);
427 LIBMEMCACHED_MEMCACHED_REPLACE_END();
428 return rc;
429 }
430
431 memcached_return_t memcached_prepend(memcached_st *ptr, const char *key, size_t key_length,
432 const char *value, size_t value_length, time_t expiration,
433 uint32_t flags) {
434 memcached_return_t rc;
435 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
436 0, PREPEND_OP);
437 return rc;
438 }
439
440 memcached_return_t memcached_append(memcached_st *ptr, const char *key, size_t key_length,
441 const char *value, size_t value_length, time_t expiration,
442 uint32_t flags) {
443 memcached_return_t rc;
444 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
445 0, APPEND_OP);
446 return rc;
447 }
448
449 memcached_return_t memcached_cas(memcached_st *ptr, const char *key, size_t key_length,
450 const char *value, size_t value_length, time_t expiration,
451 uint32_t flags, uint64_t cas) {
452 memcached_return_t rc;
453 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
454 cas, CAS_OP);
455 return rc;
456 }
457
458 memcached_return_t memcached_set_by_key(memcached_st *ptr, const char *group_key,
459 size_t group_key_length, const char *key, size_t key_length,
460 const char *value, size_t value_length, time_t expiration,
461 uint32_t flags) {
462 memcached_return_t rc;
463 LIBMEMCACHED_MEMCACHED_SET_START();
464 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
465 expiration, flags, 0, SET_OP);
466 LIBMEMCACHED_MEMCACHED_SET_END();
467 return rc;
468 }
469
470 memcached_return_t memcached_add_by_key(memcached_st *ptr, const char *group_key,
471 size_t group_key_length, const char *key, size_t key_length,
472 const char *value, size_t value_length, time_t expiration,
473 uint32_t flags) {
474 memcached_return_t rc;
475 LIBMEMCACHED_MEMCACHED_ADD_START();
476 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
477 expiration, flags, 0, ADD_OP);
478 LIBMEMCACHED_MEMCACHED_ADD_END();
479 return rc;
480 }
481
482 memcached_return_t memcached_replace_by_key(memcached_st *ptr, const char *group_key,
483 size_t group_key_length, const char *key,
484 size_t key_length, const char *value,
485 size_t value_length, time_t expiration,
486 uint32_t flags) {
487 memcached_return_t rc;
488 LIBMEMCACHED_MEMCACHED_REPLACE_START();
489 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
490 expiration, flags, 0, REPLACE_OP);
491 LIBMEMCACHED_MEMCACHED_REPLACE_END();
492 return rc;
493 }
494
495 memcached_return_t memcached_prepend_by_key(memcached_st *ptr, const char *group_key,
496 size_t group_key_length, const char *key,
497 size_t key_length, const char *value,
498 size_t value_length, time_t expiration,
499 uint32_t flags) {
500 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
501 expiration, flags, 0, PREPEND_OP);
502 }
503
504 memcached_return_t memcached_append_by_key(memcached_st *ptr, const char *group_key,
505 size_t group_key_length, const char *key,
506 size_t key_length, const char *value,
507 size_t value_length, time_t expiration, uint32_t flags) {
508 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
509 expiration, flags, 0, APPEND_OP);
510 }
511
512 memcached_return_t memcached_cas_by_key(memcached_st *ptr, const char *group_key,
513 size_t group_key_length, const char *key, size_t key_length,
514 const char *value, size_t value_length, time_t expiration,
515 uint32_t flags, uint64_t cas) {
516 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
517 expiration, flags, cas, CAS_OP);
518 }