eacc04f0f69b3fc767003e10a05f9d8594c6d5dd
[awesomized/libmemcached] / src / libmemcached / storage.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 enum memcached_storage_action_t { SET_OP, REPLACE_OP, ADD_OP, PREPEND_OP, APPEND_OP, CAS_OP };
19
20 /* Inline this */
21 static inline const char *storage_op_string(memcached_storage_action_t verb) {
22 switch (verb) {
23 case REPLACE_OP:
24 return "replace ";
25
26 case ADD_OP:
27 return "add ";
28
29 case PREPEND_OP:
30 return "prepend ";
31
32 case APPEND_OP:
33 return "append ";
34
35 case CAS_OP:
36 return "cas ";
37
38 case SET_OP:
39 break;
40 }
41
42 return "set ";
43 }
44
45 static inline bool can_be_encrypted(const memcached_storage_action_t verb) {
46 switch (verb) {
47 case SET_OP:
48 case ADD_OP:
49 case CAS_OP:
50 case REPLACE_OP:
51 return true;
52
53 case APPEND_OP:
54 case PREPEND_OP:
55 break;
56 }
57
58 return false;
59 }
60
61 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply) {
62 if (reply == false) {
63 switch (verb) {
64 case SET_OP:
65 return PROTOCOL_BINARY_CMD_SETQ;
66
67 case ADD_OP:
68 return PROTOCOL_BINARY_CMD_ADDQ;
69
70 case CAS_OP: /* FALLTHROUGH */
71 case REPLACE_OP:
72 return PROTOCOL_BINARY_CMD_REPLACEQ;
73
74 case APPEND_OP:
75 return PROTOCOL_BINARY_CMD_APPENDQ;
76
77 case PREPEND_OP:
78 return PROTOCOL_BINARY_CMD_PREPENDQ;
79 }
80 }
81
82 switch (verb) {
83 case SET_OP:
84 break;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADD;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACE;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPEND;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPEND;
98 }
99
100 return PROTOCOL_BINARY_CMD_SET;
101 }
102
103 static memcached_return_t memcached_send_binary(Memcached *ptr, memcached_instance_st *server,
104 uint32_t server_key, const char *key,
105 const size_t key_length, const char *value,
106 const size_t value_length, const time_t expiration,
107 const uint32_t flags, const uint64_t cas,
108 const bool flush, const bool reply,
109 memcached_storage_action_t verb) {
110 protocol_binary_request_set request = {};
111 size_t send_length = sizeof(request.bytes);
112
113 initialize_binary_request(server, request.message.header);
114
115 request.message.header.request.opcode = get_com_code(verb, reply);
116 request.message.header.request.keylen =
117 htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
118 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
119 if (verb == APPEND_OP or verb == PREPEND_OP) {
120 send_length -= 8; /* append & prepend does not contain extras! */
121 } else {
122 request.message.header.request.extlen = 8;
123 request.message.body.flags = htonl(flags);
124 request.message.body.expiration = htonl((uint32_t) expiration);
125 }
126
127 request.message.header.request.bodylen =
128 htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) + value_length
129 + request.message.header.request.extlen));
130
131 if (cas) {
132 request.message.header.request.cas = memcached_htonll(cas);
133 }
134
135 libmemcached_io_vector_st vector[] = {
136 {NULL, 0},
137 {request.bytes, send_length},
138 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
139 {key, key_length},
140 {value, value_length}};
141
142 /* write the header */
143 memcached_return_t rc;
144 if ((rc = memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS) {
145 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
146 return memcached_last_error(server->root);
147 }
148
149 if (verb == SET_OP and ptr->number_of_replicas > 0) {
150 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_SETQ;
151 WATCHPOINT_STRING("replicating");
152
153 for (uint32_t x = 0; x < ptr->number_of_replicas; x++) {
154 ++server_key;
155 if (server_key == memcached_server_count(ptr)) {
156 server_key = 0;
157 }
158
159 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
160
161 if (memcached_success(memcached_vdo(instance, vector, 5, false))) {
162 memcached_server_response_decrement(instance);
163 }
164 }
165 }
166
167 if (flush == false) {
168 return MEMCACHED_BUFFERED;
169 }
170
171 // No reply always assumes success
172 if (reply == false) {
173 return MEMCACHED_SUCCESS;
174 }
175
176 return memcached_response(server, NULL, 0, NULL);
177 }
178
179 static memcached_return_t
180 memcached_send_ascii(Memcached *ptr, memcached_instance_st *instance, const char *key,
181 const size_t key_length, const char *value, const size_t value_length,
182 const time_t expiration, const uint32_t flags, const uint64_t cas,
183 const bool flush, const bool reply, const memcached_storage_action_t verb) {
184 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
185 int flags_buffer_length = snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
186 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0) {
187 return memcached_set_error(
188 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
189 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
190 }
191
192 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
193 int expiration_buffer_length = snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu",
194 (unsigned long long) expiration);
195 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
196 {
197 return memcached_set_error(
198 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
199 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
200 }
201
202 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
203 int value_buffer_length =
204 snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long) value_length);
205 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0) {
206 return memcached_set_error(
207 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
208 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
209 }
210
211 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
212 int cas_buffer_length = 0;
213 if (cas) {
214 cas_buffer_length = snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long) cas);
215 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0) {
216 return memcached_set_error(
217 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
218 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
219 }
220 }
221
222 libmemcached_io_vector_st vector[] = {
223 {NULL, 0},
224 {storage_op_string(verb), strlen(storage_op_string(verb))},
225 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
226 {key, key_length},
227 {flags_buffer, size_t(flags_buffer_length)},
228 {expiration_buffer, size_t(expiration_buffer_length)},
229 {value_buffer, size_t(value_buffer_length)},
230 {cas_buffer, size_t(cas_buffer_length)},
231 {" noreply", reply ? 0 : memcached_literal_param_size(" noreply")},
232 {memcached_literal_param("\r\n")},
233 {value, value_length},
234 {memcached_literal_param("\r\n")}};
235
236 /* Send command header */
237 memcached_return_t rc = memcached_vdo(instance, vector, 12, flush);
238
239 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
240 if (reply == false) {
241 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
242 }
243
244 if (flush == false) {
245 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
246 }
247
248 if (rc == MEMCACHED_SUCCESS) {
249 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
250 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
251
252 if (rc == MEMCACHED_STORED) {
253 return MEMCACHED_SUCCESS;
254 }
255 }
256
257 assert(memcached_failed(rc));
258 #if 0
259 if (memcached_has_error(ptr) == false)
260 {
261 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
262 }
263 #endif
264
265 return rc;
266 }
267
268 static inline memcached_return_t
269 memcached_send(memcached_st *shell, const char *group_key, size_t group_key_length, const char *key,
270 size_t key_length, const char *value, size_t value_length, const time_t expiration,
271 const uint32_t flags, const uint64_t cas, memcached_storage_action_t verb) {
272 Memcached *ptr = memcached2Memcached(shell);
273 memcached_return_t rc;
274 if (memcached_failed(rc = initialize_query(ptr, true))) {
275 return rc;
276 }
277
278 if (memcached_failed(memcached_key_test(*ptr, (const char **) &key, &key_length, 1))) {
279 return memcached_last_error(ptr);
280 }
281
282 uint32_t server_key =
283 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
284 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
285
286 WATCHPOINT_SET(instance->io_wait_count.read = 0);
287 WATCHPOINT_SET(instance->io_wait_count.write = 0);
288
289 bool flush = true;
290 if (memcached_is_buffering(instance->root) and verb == SET_OP) {
291 flush = false;
292 }
293
294 bool reply = memcached_is_replying(ptr);
295
296 hashkit_string_st *destination = NULL;
297
298 if (memcached_is_encrypted(ptr)) {
299 if (can_be_encrypted(verb) == false) {
300 return memcached_set_error(
301 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
302 memcached_literal_param("Operation not allowed while encyrption is enabled"));
303 }
304
305 if ((destination = hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL) {
306 return rc;
307 }
308 value = hashkit_string_c_str(destination);
309 value_length = hashkit_string_length(destination);
310 }
311
312 if (memcached_is_binary(ptr)) {
313 rc = memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length,
314 expiration, flags, cas, flush, reply, verb);
315 } else {
316 rc = memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration,
317 flags, cas, flush, reply, verb);
318 }
319
320 hashkit_string_free(destination);
321
322 return rc;
323 }
324
325 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
326 const char *value, size_t value_length, time_t expiration,
327 uint32_t flags) {
328 memcached_return_t rc;
329 LIBMEMCACHED_MEMCACHED_SET_START();
330 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
331 0, SET_OP);
332 LIBMEMCACHED_MEMCACHED_SET_END();
333 return rc;
334 }
335
336 memcached_return_t memcached_add(memcached_st *ptr, const char *key, size_t key_length,
337 const char *value, size_t value_length, time_t expiration,
338 uint32_t flags) {
339 memcached_return_t rc;
340 LIBMEMCACHED_MEMCACHED_ADD_START();
341 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
342 0, ADD_OP);
343
344 LIBMEMCACHED_MEMCACHED_ADD_END();
345 return rc;
346 }
347
348 memcached_return_t memcached_replace(memcached_st *ptr, const char *key, size_t key_length,
349 const char *value, size_t value_length, time_t expiration,
350 uint32_t flags) {
351 memcached_return_t rc;
352 LIBMEMCACHED_MEMCACHED_REPLACE_START();
353 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
354 0, REPLACE_OP);
355 LIBMEMCACHED_MEMCACHED_REPLACE_END();
356 return rc;
357 }
358
359 memcached_return_t memcached_prepend(memcached_st *ptr, const char *key, size_t key_length,
360 const char *value, size_t value_length, time_t expiration,
361 uint32_t flags) {
362 memcached_return_t rc;
363 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
364 0, PREPEND_OP);
365 return rc;
366 }
367
368 memcached_return_t memcached_append(memcached_st *ptr, const char *key, size_t key_length,
369 const char *value, size_t value_length, time_t expiration,
370 uint32_t flags) {
371 memcached_return_t rc;
372 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
373 0, APPEND_OP);
374 return rc;
375 }
376
377 memcached_return_t memcached_cas(memcached_st *ptr, const char *key, size_t key_length,
378 const char *value, size_t value_length, time_t expiration,
379 uint32_t flags, uint64_t cas) {
380 memcached_return_t rc;
381 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
382 cas, CAS_OP);
383 return rc;
384 }
385
386 memcached_return_t memcached_set_by_key(memcached_st *ptr, const char *group_key,
387 size_t group_key_length, const char *key, size_t key_length,
388 const char *value, size_t value_length, time_t expiration,
389 uint32_t flags) {
390 memcached_return_t rc;
391 LIBMEMCACHED_MEMCACHED_SET_START();
392 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
393 expiration, flags, 0, SET_OP);
394 LIBMEMCACHED_MEMCACHED_SET_END();
395 return rc;
396 }
397
398 memcached_return_t memcached_add_by_key(memcached_st *ptr, const char *group_key,
399 size_t group_key_length, const char *key, size_t key_length,
400 const char *value, size_t value_length, time_t expiration,
401 uint32_t flags) {
402 memcached_return_t rc;
403 LIBMEMCACHED_MEMCACHED_ADD_START();
404 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
405 expiration, flags, 0, ADD_OP);
406 LIBMEMCACHED_MEMCACHED_ADD_END();
407 return rc;
408 }
409
410 memcached_return_t memcached_replace_by_key(memcached_st *ptr, const char *group_key,
411 size_t group_key_length, const char *key,
412 size_t key_length, const char *value,
413 size_t value_length, time_t expiration,
414 uint32_t flags) {
415 memcached_return_t rc;
416 LIBMEMCACHED_MEMCACHED_REPLACE_START();
417 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
418 expiration, flags, 0, REPLACE_OP);
419 LIBMEMCACHED_MEMCACHED_REPLACE_END();
420 return rc;
421 }
422
423 memcached_return_t memcached_prepend_by_key(memcached_st *ptr, const char *group_key,
424 size_t group_key_length, const char *key,
425 size_t key_length, const char *value,
426 size_t value_length, time_t expiration,
427 uint32_t flags) {
428 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
429 expiration, flags, 0, PREPEND_OP);
430 }
431
432 memcached_return_t memcached_append_by_key(memcached_st *ptr, const char *group_key,
433 size_t group_key_length, const char *key,
434 size_t key_length, const char *value,
435 size_t value_length, time_t expiration, uint32_t flags) {
436 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
437 expiration, flags, 0, APPEND_OP);
438 }
439
440 memcached_return_t memcached_cas_by_key(memcached_st *ptr, const char *group_key,
441 size_t group_key_length, const char *key, size_t key_length,
442 const char *value, size_t value_length, time_t expiration,
443 uint32_t flags, uint64_t cas) {
444 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
445 expiration, flags, cas, CAS_OP);
446 }