0700c0558ecc025451dbbfa096c8ebd79dcae3cd
[m6w6/libmemcached] / src / libmemcached / storage.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
/**
 * Storage verbs handled by this file. Each value selects the ASCII command
 * word and the binary opcode used when the request is put on the wire.
 */
enum memcached_storage_action_t {
  SET_OP,     /* ASCII "set "     */
  REPLACE_OP, /* ASCII "replace " */
  ADD_OP,     /* ASCII "add "     */
  PREPEND_OP, /* ASCII "prepend " */
  APPEND_OP,  /* ASCII "append "  */
  CAS_OP      /* ASCII "cas "     */
};
19
20 /* Inline this */
21 static inline const char *storage_op_string(memcached_storage_action_t verb) {
22 switch (verb) {
23 case REPLACE_OP: return "replace ";
24
25 case ADD_OP: return "add ";
26
27 case PREPEND_OP: return "prepend ";
28
29 case APPEND_OP: return "append ";
30
31 case CAS_OP: return "cas ";
32
33 case SET_OP: break;
34 }
35
36 return "set ";
37 }
38
39 static inline bool can_by_encrypted(const memcached_storage_action_t verb) {
40 switch (verb) {
41 case SET_OP:
42 case ADD_OP:
43 case CAS_OP:
44 case REPLACE_OP: return true;
45
46 case APPEND_OP:
47 case PREPEND_OP: break;
48 }
49
50 return false;
51 }
52
53 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply) {
54 if (reply == false) {
55 switch (verb) {
56 case SET_OP: return PROTOCOL_BINARY_CMD_SETQ;
57
58 case ADD_OP: return PROTOCOL_BINARY_CMD_ADDQ;
59
60 case CAS_OP: /* FALLTHROUGH */
61 case REPLACE_OP: return PROTOCOL_BINARY_CMD_REPLACEQ;
62
63 case APPEND_OP: return PROTOCOL_BINARY_CMD_APPENDQ;
64
65 case PREPEND_OP: return PROTOCOL_BINARY_CMD_PREPENDQ;
66 }
67 }
68
69 switch (verb) {
70 case SET_OP: break;
71
72 case ADD_OP: return PROTOCOL_BINARY_CMD_ADD;
73
74 case CAS_OP: /* FALLTHROUGH */
75 case REPLACE_OP: return PROTOCOL_BINARY_CMD_REPLACE;
76
77 case APPEND_OP: return PROTOCOL_BINARY_CMD_APPEND;
78
79 case PREPEND_OP: return PROTOCOL_BINARY_CMD_PREPEND;
80 }
81
82 return PROTOCOL_BINARY_CMD_SET;
83 }
84
85 static memcached_return_t memcached_send_binary(Memcached *ptr, memcached_instance_st *server,
86 uint32_t server_key, const char *key,
87 const size_t key_length, const char *value,
88 const size_t value_length, const time_t expiration,
89 const uint32_t flags, const uint64_t cas,
90 const bool flush, const bool reply,
91 memcached_storage_action_t verb) {
92 protocol_binary_request_set request = {};
93 size_t send_length = sizeof(request.bytes);
94
95 initialize_binary_request(server, request.message.header);
96
97 request.message.header.request.opcode = get_com_code(verb, reply);
98 request.message.header.request.keylen =
99 htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
100 request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
101 if (verb == APPEND_OP or verb == PREPEND_OP) {
102 send_length -= 8; /* append & prepend does not contain extras! */
103 } else {
104 request.message.header.request.extlen = 8;
105 request.message.body.flags = htonl(flags);
106 request.message.body.expiration = htonl((uint32_t) expiration);
107 }
108
109 request.message.header.request.bodylen =
110 htonl((uint32_t)(key_length + memcached_array_size(ptr->_namespace) + value_length
111 + request.message.header.request.extlen));
112
113 if (cas) {
114 request.message.header.request.cas = memcached_htonll(cas);
115 }
116
117 libmemcached_io_vector_st vector[] = {
118 {NULL, 0},
119 {request.bytes, send_length},
120 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
121 {key, key_length},
122 {value, value_length}};
123
124 /* write the header */
125 memcached_return_t rc;
126 if ((rc = memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS) {
127 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
128 return memcached_last_error(server->root);
129 }
130
131 if (verb == SET_OP and ptr->number_of_replicas > 0) {
132 request.message.header.request.opcode = PROTOCOL_BINARY_CMD_SETQ;
133 WATCHPOINT_STRING("replicating");
134
135 for (uint32_t x = 0; x < ptr->number_of_replicas; x++) {
136 ++server_key;
137 if (server_key == memcached_server_count(ptr)) {
138 server_key = 0;
139 }
140
141 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
142
143 if (memcached_success(memcached_vdo(instance, vector, 5, false))) {
144 memcached_server_response_decrement(instance);
145 }
146 }
147 }
148
149 if (flush == false) {
150 return MEMCACHED_BUFFERED;
151 }
152
153 // No reply always assumes success
154 if (reply == false) {
155 return MEMCACHED_SUCCESS;
156 }
157
158 return memcached_response(server, NULL, 0, NULL);
159 }
160
161 static memcached_return_t
162 memcached_send_ascii(Memcached *ptr, memcached_instance_st *instance, const char *key,
163 const size_t key_length, const char *value, const size_t value_length,
164 const time_t expiration, const uint32_t flags, const uint64_t cas,
165 const bool flush, const bool reply, const memcached_storage_action_t verb) {
166 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
167 int flags_buffer_length = snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
168 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0) {
169 return memcached_set_error(
170 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
171 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
172 }
173
174 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
175 int expiration_buffer_length = snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu",
176 (unsigned long long) expiration);
177 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
178 {
179 return memcached_set_error(
180 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
181 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
182 }
183
184 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
185 int value_buffer_length =
186 snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long) value_length);
187 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0) {
188 return memcached_set_error(
189 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
190 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
191 }
192
193 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1];
194 int cas_buffer_length = 0;
195 if (cas) {
196 cas_buffer_length = snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long) cas);
197 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0) {
198 return memcached_set_error(
199 *instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
200 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
201 }
202 }
203
204 libmemcached_io_vector_st vector[] = {
205 {NULL, 0},
206 {storage_op_string(verb), strlen(storage_op_string(verb))},
207 {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
208 {key, key_length},
209 {flags_buffer, size_t(flags_buffer_length)},
210 {expiration_buffer, size_t(expiration_buffer_length)},
211 {value_buffer, size_t(value_buffer_length)},
212 {cas_buffer, size_t(cas_buffer_length)},
213 {" noreply", reply ? 0 : memcached_literal_param_size(" noreply")},
214 {memcached_literal_param("\r\n")},
215 {value, value_length},
216 {memcached_literal_param("\r\n")}};
217
218 /* Send command header */
219 memcached_return_t rc = memcached_vdo(instance, vector, 12, flush);
220
221 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
222 if (reply == false) {
223 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
224 }
225
226 if (flush == false) {
227 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
228 }
229
230 if (rc == MEMCACHED_SUCCESS) {
231 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
232 rc = memcached_response(instance, buffer, sizeof(buffer), NULL);
233
234 if (rc == MEMCACHED_STORED) {
235 return MEMCACHED_SUCCESS;
236 }
237 }
238
239 assert(memcached_failed(rc));
240 #if 0
241 if (memcached_has_error(ptr) == false)
242 {
243 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
244 }
245 #endif
246
247 return rc;
248 }
249
250 static inline memcached_return_t
251 memcached_send(memcached_st *shell, const char *group_key, size_t group_key_length, const char *key,
252 size_t key_length, const char *value, size_t value_length, const time_t expiration,
253 const uint32_t flags, const uint64_t cas, memcached_storage_action_t verb) {
254 Memcached *ptr = memcached2Memcached(shell);
255 memcached_return_t rc;
256 if (memcached_failed(rc = initialize_query(ptr, true))) {
257 return rc;
258 }
259
260 if (memcached_failed(memcached_key_test(*ptr, (const char **) &key, &key_length, 1))) {
261 return memcached_last_error(ptr);
262 }
263
264 uint32_t server_key =
265 memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
266 memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
267
268 WATCHPOINT_SET(instance->io_wait_count.read = 0);
269 WATCHPOINT_SET(instance->io_wait_count.write = 0);
270
271 bool flush = true;
272 if (memcached_is_buffering(instance->root) and verb == SET_OP) {
273 flush = false;
274 }
275
276 bool reply = memcached_is_replying(ptr);
277
278 hashkit_string_st *destination = NULL;
279
280 if (memcached_is_encrypted(ptr)) {
281 if (can_by_encrypted(verb) == false) {
282 return memcached_set_error(
283 *ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
284 memcached_literal_param("Operation not allowed while encyrption is enabled"));
285 }
286
287 if ((destination = hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL) {
288 return rc;
289 }
290 value = hashkit_string_c_str(destination);
291 value_length = hashkit_string_length(destination);
292 }
293
294 if (memcached_is_binary(ptr)) {
295 rc = memcached_send_binary(ptr, instance, server_key, key, key_length, value, value_length,
296 expiration, flags, cas, flush, reply, verb);
297 } else {
298 rc = memcached_send_ascii(ptr, instance, key, key_length, value, value_length, expiration,
299 flags, cas, flush, reply, verb);
300 }
301
302 hashkit_string_free(destination);
303
304 return rc;
305 }
306
307 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
308 const char *value, size_t value_length, time_t expiration,
309 uint32_t flags) {
310 memcached_return_t rc;
311 LIBMEMCACHED_MEMCACHED_SET_START();
312 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
313 0, SET_OP);
314 LIBMEMCACHED_MEMCACHED_SET_END();
315 return rc;
316 }
317
318 memcached_return_t memcached_add(memcached_st *ptr, const char *key, size_t key_length,
319 const char *value, size_t value_length, time_t expiration,
320 uint32_t flags) {
321 memcached_return_t rc;
322 LIBMEMCACHED_MEMCACHED_ADD_START();
323 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
324 0, ADD_OP);
325
326 LIBMEMCACHED_MEMCACHED_ADD_END();
327 return rc;
328 }
329
330 memcached_return_t memcached_replace(memcached_st *ptr, const char *key, size_t key_length,
331 const char *value, size_t value_length, time_t expiration,
332 uint32_t flags) {
333 memcached_return_t rc;
334 LIBMEMCACHED_MEMCACHED_REPLACE_START();
335 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
336 0, REPLACE_OP);
337 LIBMEMCACHED_MEMCACHED_REPLACE_END();
338 return rc;
339 }
340
341 memcached_return_t memcached_prepend(memcached_st *ptr, const char *key, size_t key_length,
342 const char *value, size_t value_length, time_t expiration,
343 uint32_t flags) {
344 memcached_return_t rc;
345 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
346 0, PREPEND_OP);
347 return rc;
348 }
349
350 memcached_return_t memcached_append(memcached_st *ptr, const char *key, size_t key_length,
351 const char *value, size_t value_length, time_t expiration,
352 uint32_t flags) {
353 memcached_return_t rc;
354 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
355 0, APPEND_OP);
356 return rc;
357 }
358
359 memcached_return_t memcached_cas(memcached_st *ptr, const char *key, size_t key_length,
360 const char *value, size_t value_length, time_t expiration,
361 uint32_t flags, uint64_t cas) {
362 memcached_return_t rc;
363 rc = memcached_send(ptr, key, key_length, key, key_length, value, value_length, expiration, flags,
364 cas, CAS_OP);
365 return rc;
366 }
367
368 memcached_return_t memcached_set_by_key(memcached_st *ptr, const char *group_key,
369 size_t group_key_length, const char *key, size_t key_length,
370 const char *value, size_t value_length, time_t expiration,
371 uint32_t flags) {
372 memcached_return_t rc;
373 LIBMEMCACHED_MEMCACHED_SET_START();
374 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
375 expiration, flags, 0, SET_OP);
376 LIBMEMCACHED_MEMCACHED_SET_END();
377 return rc;
378 }
379
380 memcached_return_t memcached_add_by_key(memcached_st *ptr, const char *group_key,
381 size_t group_key_length, const char *key, size_t key_length,
382 const char *value, size_t value_length, time_t expiration,
383 uint32_t flags) {
384 memcached_return_t rc;
385 LIBMEMCACHED_MEMCACHED_ADD_START();
386 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
387 expiration, flags, 0, ADD_OP);
388 LIBMEMCACHED_MEMCACHED_ADD_END();
389 return rc;
390 }
391
392 memcached_return_t memcached_replace_by_key(memcached_st *ptr, const char *group_key,
393 size_t group_key_length, const char *key,
394 size_t key_length, const char *value,
395 size_t value_length, time_t expiration,
396 uint32_t flags) {
397 memcached_return_t rc;
398 LIBMEMCACHED_MEMCACHED_REPLACE_START();
399 rc = memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
400 expiration, flags, 0, REPLACE_OP);
401 LIBMEMCACHED_MEMCACHED_REPLACE_END();
402 return rc;
403 }
404
405 memcached_return_t memcached_prepend_by_key(memcached_st *ptr, const char *group_key,
406 size_t group_key_length, const char *key,
407 size_t key_length, const char *value,
408 size_t value_length, time_t expiration,
409 uint32_t flags) {
410 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
411 expiration, flags, 0, PREPEND_OP);
412 }
413
414 memcached_return_t memcached_append_by_key(memcached_st *ptr, const char *group_key,
415 size_t group_key_length, const char *key,
416 size_t key_length, const char *value,
417 size_t value_length, time_t expiration, uint32_t flags) {
418 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
419 expiration, flags, 0, APPEND_OP);
420 }
421
422 memcached_return_t memcached_cas_by_key(memcached_st *ptr, const char *group_key,
423 size_t group_key_length, const char *key, size_t key_length,
424 const char *value, size_t value_length, time_t expiration,
425 uint32_t flags, uint64_t cas) {
426 return memcached_send(ptr, group_key, group_key_length, key, key_length, value, value_length,
427 expiration, flags, cas, CAS_OP);
428 }