46ae15a953042d71954f9fd423645e81a1b15922
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 enum memcached_storage_action_t {
42 SET_OP,
43 REPLACE_OP,
44 ADD_OP,
45 PREPEND_OP,
46 APPEND_OP,
47 CAS_OP
48 };
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t can_by_encrypted(const memcached_storage_action_t verb)
78 {
79 switch (verb)
80 {
81 case SET_OP:
82 case ADD_OP:
83 case CAS_OP:
84 case REPLACE_OP:
85 return true;
86
87 case APPEND_OP:
88 case PREPEND_OP:
89 break;
90 }
91
92 return false;
93 }
94
95 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
96 {
97 if (reply == false)
98 {
99 switch (verb)
100 {
101 case SET_OP:
102 return PROTOCOL_BINARY_CMD_SETQ;
103
104 case ADD_OP:
105 return PROTOCOL_BINARY_CMD_ADDQ;
106
107 case CAS_OP: /* FALLTHROUGH */
108 case REPLACE_OP:
109 return PROTOCOL_BINARY_CMD_REPLACEQ;
110
111 case APPEND_OP:
112 return PROTOCOL_BINARY_CMD_APPENDQ;
113
114 case PREPEND_OP:
115 return PROTOCOL_BINARY_CMD_PREPENDQ;
116 }
117 }
118
119 switch (verb)
120 {
121 case SET_OP:
122 break;
123
124 case ADD_OP:
125 return PROTOCOL_BINARY_CMD_ADD;
126
127 case CAS_OP: /* FALLTHROUGH */
128 case REPLACE_OP:
129 return PROTOCOL_BINARY_CMD_REPLACE;
130
131 case APPEND_OP:
132 return PROTOCOL_BINARY_CMD_APPEND;
133
134 case PREPEND_OP:
135 return PROTOCOL_BINARY_CMD_PREPEND;
136 }
137
138 return PROTOCOL_BINARY_CMD_SET;
139 }
140
141 static memcached_return_t memcached_send_binary(Memcached *ptr,
142 memcached_instance_st* server,
143 uint32_t server_key,
144 const char *key,
145 const size_t key_length,
146 const char *value,
147 const size_t value_length,
148 const time_t expiration,
149 const uint32_t flags,
150 const uint64_t cas,
151 const bool flush,
152 const bool reply,
153 memcached_storage_action_t verb)
154 {
155 protocol_binary_request_set request= {};
156 size_t send_length= sizeof(request.bytes);
157
158 initialize_binary_request(server, request.message.header);
159
160 request.message.header.request.opcode= get_com_code(verb, reply);
161 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
162 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
163 if (verb == APPEND_OP or verb == PREPEND_OP)
164 {
165 send_length -= 8; /* append & prepend does not contain extras! */
166 }
167 else
168 {
169 request.message.header.request.extlen= 8;
170 request.message.body.flags= htonl(flags);
171 request.message.body.expiration= htonl((uint32_t)expiration);
172 }
173
174 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
175 request.message.header.request.extlen));
176
177 if (cas)
178 {
179 request.message.header.request.cas= memcached_htonll(cas);
180 }
181
182 libmemcached_io_vector_st vector[]=
183 {
184 { NULL, 0 },
185 { request.bytes, send_length },
186 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
187 { key, key_length },
188 { value, value_length }
189 };
190
191 /* write the header */
192 memcached_return_t rc;
193 if ((rc= memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS)
194 {
195 memcached_io_reset(server);
196
197 #if 0
198 if (memcached_has_error(ptr))
199 {
200 memcached_set_error(*server, rc, MEMCACHED_AT);
201 }
202 #endif
203
204 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
205 return memcached_last_error(server->root);
206 }
207
208 if (verb == SET_OP and ptr->number_of_replicas > 0)
209 {
210 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
211 WATCHPOINT_STRING("replicating");
212
213 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
214 {
215 ++server_key;
216 if (server_key == memcached_server_count(ptr))
217 {
218 server_key= 0;
219 }
220
221 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
222
223 if (memcached_vdo(instance, vector, 5, false) != MEMCACHED_SUCCESS)
224 {
225 memcached_io_reset(instance);
226 }
227 else
228 {
229 memcached_server_response_decrement(instance);
230 }
231 }
232 }
233
234 if (flush == false)
235 {
236 return MEMCACHED_BUFFERED;
237 }
238
239 // No reply always assumes success
240 if (reply == false)
241 {
242 return MEMCACHED_SUCCESS;
243 }
244
245 return memcached_response(server, NULL, 0, NULL);
246 }
247
248 static memcached_return_t memcached_send_ascii(Memcached *ptr,
249 memcached_instance_st* instance,
250 const char *key,
251 const size_t key_length,
252 const char *value,
253 const size_t value_length,
254 const time_t expiration,
255 const uint32_t flags,
256 const uint64_t cas,
257 const bool flush,
258 const bool reply,
259 const memcached_storage_action_t verb)
260 {
261 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
262 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
263 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
264 {
265 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
266 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
267 }
268
269 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
270 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
271 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
272 {
273 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
274 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
275 }
276
277 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
278 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
279 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
280 {
281 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
282 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
283 }
284
285 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
286 int cas_buffer_length= 0;
287 if (cas)
288 {
289 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
290 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
291 {
292 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
293 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
294 }
295 }
296
297 libmemcached_io_vector_st vector[]=
298 {
299 { NULL, 0 },
300 { storage_op_string(verb), strlen(storage_op_string(verb))},
301 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
302 { key, key_length },
303 { flags_buffer, size_t(flags_buffer_length) },
304 { expiration_buffer, size_t(expiration_buffer_length) },
305 { value_buffer, size_t(value_buffer_length) },
306 { cas_buffer, size_t(cas_buffer_length) },
307 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
308 { memcached_literal_param("\r\n") },
309 { value, value_length },
310 { memcached_literal_param("\r\n") }
311 };
312
313 /* Send command header */
314 memcached_return_t rc= memcached_vdo(instance, vector, 12, flush);
315
316 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
317 if (reply == false)
318 {
319 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
320 }
321
322 if (flush == false)
323 {
324 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
325 }
326
327 if (rc == MEMCACHED_SUCCESS)
328 {
329 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
330 rc= memcached_response(instance, buffer, sizeof(buffer), NULL);
331
332 if (rc == MEMCACHED_STORED)
333 {
334 return MEMCACHED_SUCCESS;
335 }
336 }
337
338 if (rc == MEMCACHED_WRITE_FAILURE)
339 {
340 memcached_io_reset(instance);
341 }
342
343 assert(memcached_failed(rc));
344 #if 0
345 if (memcached_has_error(ptr) == false)
346 {
347 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
348 }
349 #endif
350
351 return rc;
352 }
353
354 static inline memcached_return_t memcached_send(memcached_st *shell,
355 const char *group_key, size_t group_key_length,
356 const char *key, size_t key_length,
357 const char *value, size_t value_length,
358 const time_t expiration,
359 const uint32_t flags,
360 const uint64_t cas,
361 memcached_storage_action_t verb)
362 {
363 Memcached* ptr= memcached2Memcached(shell);
364 memcached_return_t rc;
365 if (memcached_failed(rc= initialize_query(ptr, true)))
366 {
367 return rc;
368 }
369
370 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
371 {
372 return memcached_last_error(ptr);
373 }
374
375 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
376 memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
377
378 WATCHPOINT_SET(instance->io_wait_count.read= 0);
379 WATCHPOINT_SET(instance->io_wait_count.write= 0);
380
381 bool flush= true;
382 if (memcached_is_buffering(instance->root) and verb == SET_OP)
383 {
384 flush= false;
385 }
386
387 bool reply= memcached_is_replying(ptr);
388
389 hashkit_string_st* destination= NULL;
390
391 if (memcached_is_encrypted(ptr))
392 {
393 if (can_by_encrypted(verb) == false)
394 {
395 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
396 memcached_literal_param("Operation not allowed while encyrption is enabled"));
397 }
398
399 if ((destination= hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL)
400 {
401 return rc;
402 }
403 value= hashkit_string_c_str(destination);
404 value_length= hashkit_string_length(destination);
405 }
406
407 if (memcached_is_binary(ptr))
408 {
409 rc= memcached_send_binary(ptr, instance, server_key,
410 key, key_length,
411 value, value_length, expiration,
412 flags, cas, flush, reply, verb);
413 }
414 else
415 {
416 rc= memcached_send_ascii(ptr, instance,
417 key, key_length,
418 value, value_length, expiration,
419 flags, cas, flush, reply, verb);
420 }
421
422 hashkit_string_free(destination);
423
424 return rc;
425 }
426
427
428 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
429 const char *value, size_t value_length,
430 time_t expiration,
431 uint32_t flags)
432 {
433 memcached_return_t rc;
434 LIBMEMCACHED_MEMCACHED_SET_START();
435 rc= memcached_send(ptr, key, key_length,
436 key, key_length, value, value_length,
437 expiration, flags, 0, SET_OP);
438 LIBMEMCACHED_MEMCACHED_SET_END();
439 return rc;
440 }
441
442 memcached_return_t memcached_add(memcached_st *ptr,
443 const char *key, size_t key_length,
444 const char *value, size_t value_length,
445 time_t expiration,
446 uint32_t flags)
447 {
448 memcached_return_t rc;
449 LIBMEMCACHED_MEMCACHED_ADD_START();
450 rc= memcached_send(ptr, key, key_length,
451 key, key_length, value, value_length,
452 expiration, flags, 0, ADD_OP);
453
454 LIBMEMCACHED_MEMCACHED_ADD_END();
455 return rc;
456 }
457
458 memcached_return_t memcached_replace(memcached_st *ptr,
459 const char *key, size_t key_length,
460 const char *value, size_t value_length,
461 time_t expiration,
462 uint32_t flags)
463 {
464 memcached_return_t rc;
465 LIBMEMCACHED_MEMCACHED_REPLACE_START();
466 rc= memcached_send(ptr, key, key_length,
467 key, key_length, value, value_length,
468 expiration, flags, 0, REPLACE_OP);
469 LIBMEMCACHED_MEMCACHED_REPLACE_END();
470 return rc;
471 }
472
473 memcached_return_t memcached_prepend(memcached_st *ptr,
474 const char *key, size_t key_length,
475 const char *value, size_t value_length,
476 time_t expiration,
477 uint32_t flags)
478 {
479 memcached_return_t rc;
480 rc= memcached_send(ptr, key, key_length,
481 key, key_length, value, value_length,
482 expiration, flags, 0, PREPEND_OP);
483 return rc;
484 }
485
486 memcached_return_t memcached_append(memcached_st *ptr,
487 const char *key, size_t key_length,
488 const char *value, size_t value_length,
489 time_t expiration,
490 uint32_t flags)
491 {
492 memcached_return_t rc;
493 rc= memcached_send(ptr, key, key_length,
494 key, key_length, value, value_length,
495 expiration, flags, 0, APPEND_OP);
496 return rc;
497 }
498
499 memcached_return_t memcached_cas(memcached_st *ptr,
500 const char *key, size_t key_length,
501 const char *value, size_t value_length,
502 time_t expiration,
503 uint32_t flags,
504 uint64_t cas)
505 {
506 memcached_return_t rc;
507 rc= memcached_send(ptr, key, key_length,
508 key, key_length, value, value_length,
509 expiration, flags, cas, CAS_OP);
510 return rc;
511 }
512
513 memcached_return_t memcached_set_by_key(memcached_st *ptr,
514 const char *group_key,
515 size_t group_key_length,
516 const char *key, size_t key_length,
517 const char *value, size_t value_length,
518 time_t expiration,
519 uint32_t flags)
520 {
521 memcached_return_t rc;
522 LIBMEMCACHED_MEMCACHED_SET_START();
523 rc= memcached_send(ptr, group_key, group_key_length,
524 key, key_length, value, value_length,
525 expiration, flags, 0, SET_OP);
526 LIBMEMCACHED_MEMCACHED_SET_END();
527 return rc;
528 }
529
530 memcached_return_t memcached_add_by_key(memcached_st *ptr,
531 const char *group_key, size_t group_key_length,
532 const char *key, size_t key_length,
533 const char *value, size_t value_length,
534 time_t expiration,
535 uint32_t flags)
536 {
537 memcached_return_t rc;
538 LIBMEMCACHED_MEMCACHED_ADD_START();
539 rc= memcached_send(ptr, group_key, group_key_length,
540 key, key_length, value, value_length,
541 expiration, flags, 0, ADD_OP);
542 LIBMEMCACHED_MEMCACHED_ADD_END();
543 return rc;
544 }
545
546 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
547 const char *group_key, size_t group_key_length,
548 const char *key, size_t key_length,
549 const char *value, size_t value_length,
550 time_t expiration,
551 uint32_t flags)
552 {
553 memcached_return_t rc;
554 LIBMEMCACHED_MEMCACHED_REPLACE_START();
555 rc= memcached_send(ptr, group_key, group_key_length,
556 key, key_length, value, value_length,
557 expiration, flags, 0, REPLACE_OP);
558 LIBMEMCACHED_MEMCACHED_REPLACE_END();
559 return rc;
560 }
561
562 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
563 const char *group_key, size_t group_key_length,
564 const char *key, size_t key_length,
565 const char *value, size_t value_length,
566 time_t expiration,
567 uint32_t flags)
568 {
569 return memcached_send(ptr, group_key, group_key_length,
570 key, key_length, value, value_length,
571 expiration, flags, 0, PREPEND_OP);
572 }
573
574 memcached_return_t memcached_append_by_key(memcached_st *ptr,
575 const char *group_key, size_t group_key_length,
576 const char *key, size_t key_length,
577 const char *value, size_t value_length,
578 time_t expiration,
579 uint32_t flags)
580 {
581 return memcached_send(ptr, group_key, group_key_length,
582 key, key_length, value, value_length,
583 expiration, flags, 0, APPEND_OP);
584 }
585
586 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
587 const char *group_key, size_t group_key_length,
588 const char *key, size_t key_length,
589 const char *value, size_t value_length,
590 time_t expiration,
591 uint32_t flags,
592 uint64_t cas)
593 {
594 return memcached_send(ptr, group_key, group_key_length,
595 key, key_length, value, value_length,
596 expiration, flags, cas, CAS_OP);
597 }
598