Abstraction (which will save us merge hell with 1.2).
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
/*
  Storage verbs understood by memcached_send(). The enumerators keep their
  implicit values (SET_OP == 0 ... CAS_OP == 5).
*/
enum memcached_storage_action_t
{
  SET_OP,     /* sent as "set " / PROTOCOL_BINARY_CMD_SET[Q] */
  REPLACE_OP, /* sent as "replace " / PROTOCOL_BINARY_CMD_REPLACE[Q] */
  ADD_OP,     /* sent as "add " / PROTOCOL_BINARY_CMD_ADD[Q] */
  PREPEND_OP, /* sent as "prepend " / PROTOCOL_BINARY_CMD_PREPEND[Q] */
  APPEND_OP,  /* sent as "append " / PROTOCOL_BINARY_CMD_APPEND[Q] */
  CAS_OP      /* sent as "cas " in ASCII; reuses the REPLACE opcodes in binary */
};
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t can_by_encrypted(const memcached_storage_action_t verb)
78 {
79 switch (verb)
80 {
81 case SET_OP:
82 case ADD_OP:
83 case CAS_OP:
84 case REPLACE_OP:
85 return true;
86
87 case APPEND_OP:
88 case PREPEND_OP:
89 break;
90 }
91
92 return false;
93 }
94
95 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
96 {
97 if (reply == false)
98 {
99 switch (verb)
100 {
101 case SET_OP:
102 return PROTOCOL_BINARY_CMD_SETQ;
103
104 case ADD_OP:
105 return PROTOCOL_BINARY_CMD_ADDQ;
106
107 case CAS_OP: /* FALLTHROUGH */
108 case REPLACE_OP:
109 return PROTOCOL_BINARY_CMD_REPLACEQ;
110
111 case APPEND_OP:
112 return PROTOCOL_BINARY_CMD_APPENDQ;
113
114 case PREPEND_OP:
115 return PROTOCOL_BINARY_CMD_PREPENDQ;
116 }
117 }
118
119 switch (verb)
120 {
121 case SET_OP:
122 break;
123
124 case ADD_OP:
125 return PROTOCOL_BINARY_CMD_ADD;
126
127 case CAS_OP: /* FALLTHROUGH */
128 case REPLACE_OP:
129 return PROTOCOL_BINARY_CMD_REPLACE;
130
131 case APPEND_OP:
132 return PROTOCOL_BINARY_CMD_APPEND;
133
134 case PREPEND_OP:
135 return PROTOCOL_BINARY_CMD_PREPEND;
136 }
137
138 return PROTOCOL_BINARY_CMD_SET;
139 }
140
141 static memcached_return_t memcached_send_binary(Memcached *ptr,
142 org::libmemcached::Instance* server,
143 uint32_t server_key,
144 const char *key,
145 const size_t key_length,
146 const char *value,
147 const size_t value_length,
148 const time_t expiration,
149 const uint32_t flags,
150 const uint64_t cas,
151 const bool flush,
152 const bool reply,
153 memcached_storage_action_t verb)
154 {
155 protocol_binary_request_set request= {};
156 size_t send_length= sizeof(request.bytes);
157
158 initialize_binary_request(server, request.message.header);
159
160 request.message.header.request.opcode= get_com_code(verb, reply);
161 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
162 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
163 if (verb == APPEND_OP or verb == PREPEND_OP)
164 {
165 send_length -= 8; /* append & prepend does not contain extras! */
166 }
167 else
168 {
169 request.message.header.request.extlen= 8;
170 request.message.body.flags= htonl(flags);
171 request.message.body.expiration= htonl((uint32_t)expiration);
172 }
173
174 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
175 request.message.header.request.extlen));
176
177 if (cas)
178 {
179 request.message.header.request.cas= memcached_htonll(cas);
180 }
181
182 libmemcached_io_vector_st vector[]=
183 {
184 { NULL, 0 },
185 { request.bytes, send_length },
186 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
187 { key, key_length },
188 { value, value_length }
189 };
190
191 /* write the header */
192 memcached_return_t rc;
193 if ((rc= memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS)
194 {
195 memcached_io_reset(server);
196
197 #if 0
198 if (memcached_has_error(ptr))
199 {
200 memcached_set_error(*server, rc, MEMCACHED_AT);
201 }
202 #endif
203
204 assert(memcached_last_error(server->root) != MEMCACHED_SUCCESS);
205 return memcached_last_error(server->root);
206 }
207
208 if (verb == SET_OP and ptr->number_of_replicas > 0)
209 {
210 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
211 WATCHPOINT_STRING("replicating");
212
213 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
214 {
215 ++server_key;
216 if (server_key == memcached_server_count(ptr))
217 {
218 server_key= 0;
219 }
220
221 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
222
223 if (memcached_vdo(instance, vector, 5, false) != MEMCACHED_SUCCESS)
224 {
225 memcached_io_reset(instance);
226 }
227 else
228 {
229 memcached_server_response_decrement(instance);
230 }
231 }
232 }
233
234 if (flush == false)
235 {
236 return MEMCACHED_BUFFERED;
237 }
238
239 // No reply always assumes success
240 if (reply == false)
241 {
242 return MEMCACHED_SUCCESS;
243 }
244
245 return memcached_response(server, NULL, 0, NULL);
246 }
247
248 static memcached_return_t memcached_send_ascii(Memcached *ptr,
249 org::libmemcached::Instance* instance,
250 const char *key,
251 const size_t key_length,
252 const char *value,
253 const size_t value_length,
254 const time_t expiration,
255 const uint32_t flags,
256 const uint64_t cas,
257 const bool flush,
258 const bool reply,
259 const memcached_storage_action_t verb)
260 {
261 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
262 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
263 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
264 {
265 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
266 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
267 }
268
269 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
270 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
271 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
272 {
273 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
274 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
275 }
276
277 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
278 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
279 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
280 {
281 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
282 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
283 }
284
285 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
286 int cas_buffer_length= 0;
287 if (cas)
288 {
289 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
290 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
291 {
292 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
293 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
294 }
295 }
296
297 libmemcached_io_vector_st vector[]=
298 {
299 { NULL, 0 },
300 { storage_op_string(verb), strlen(storage_op_string(verb))},
301 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
302 { key, key_length },
303 { flags_buffer, size_t(flags_buffer_length) },
304 { expiration_buffer, size_t(expiration_buffer_length) },
305 { value_buffer, size_t(value_buffer_length) },
306 { cas_buffer, size_t(cas_buffer_length) },
307 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
308 { memcached_literal_param("\r\n") },
309 { value, value_length },
310 { memcached_literal_param("\r\n") }
311 };
312
313 /* Send command header */
314 memcached_return_t rc= memcached_vdo(instance, vector, 12, flush);
315
316 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
317 if (reply == false)
318 {
319 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
320 }
321
322 if (flush == false)
323 {
324 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
325 }
326
327 if (rc == MEMCACHED_SUCCESS)
328 {
329 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
330 rc= memcached_response(instance, buffer, sizeof(buffer), NULL);
331
332 if (rc == MEMCACHED_STORED)
333 {
334 return MEMCACHED_SUCCESS;
335 }
336 }
337
338 if (rc == MEMCACHED_WRITE_FAILURE)
339 {
340 memcached_io_reset(instance);
341 }
342
343 assert(memcached_failed(rc));
344 #if 0
345 if (memcached_has_error(ptr) == false)
346 {
347 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
348 }
349 #endif
350
351 return rc;
352 }
353
354 static inline memcached_return_t memcached_send(memcached_st *shell,
355 const char *group_key, size_t group_key_length,
356 const char *key, size_t key_length,
357 const char *value, size_t value_length,
358 const time_t expiration,
359 const uint32_t flags,
360 const uint64_t cas,
361 memcached_storage_action_t verb)
362 {
363 Memcached* ptr= memcached2Memcached(shell);
364 memcached_return_t rc;
365 if (memcached_failed(rc= initialize_query(ptr, true)))
366 {
367 return rc;
368 }
369
370 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
371 {
372 return memcached_last_error(ptr);
373 }
374
375 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
376 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
377
378 WATCHPOINT_SET(instance->io_wait_count.read= 0);
379 WATCHPOINT_SET(instance->io_wait_count.write= 0);
380
381
382 bool flush= true;
383 if (memcached_is_buffering(instance->root) and verb == SET_OP)
384 {
385 flush= false;
386 }
387
388 bool reply= memcached_is_replying(ptr);
389
390 hashkit_string_st* destination= NULL;
391
392 if (memcached_is_encrypted(ptr))
393 {
394 if (can_by_encrypted(verb) == false)
395 {
396 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
397 memcached_literal_param("Operation not allowed while encyrption is enabled"));
398 }
399
400 if ((destination= hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL)
401 {
402 return rc;
403 }
404 value= hashkit_string_c_str(destination);
405 value_length= hashkit_string_length(destination);
406 }
407
408 if (memcached_is_binary(ptr))
409 {
410 rc= memcached_send_binary(ptr, instance, server_key,
411 key, key_length,
412 value, value_length, expiration,
413 flags, cas, flush, reply, verb);
414 }
415 else
416 {
417 rc= memcached_send_ascii(ptr, instance,
418 key, key_length,
419 value, value_length, expiration,
420 flags, cas, flush, reply, verb);
421 }
422
423 hashkit_string_free(destination);
424
425 return rc;
426 }
427
428
429 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
430 const char *value, size_t value_length,
431 time_t expiration,
432 uint32_t flags)
433 {
434 memcached_return_t rc;
435 LIBMEMCACHED_MEMCACHED_SET_START();
436 rc= memcached_send(ptr, key, key_length,
437 key, key_length, value, value_length,
438 expiration, flags, 0, SET_OP);
439 LIBMEMCACHED_MEMCACHED_SET_END();
440 return rc;
441 }
442
443 memcached_return_t memcached_add(memcached_st *ptr,
444 const char *key, size_t key_length,
445 const char *value, size_t value_length,
446 time_t expiration,
447 uint32_t flags)
448 {
449 memcached_return_t rc;
450 LIBMEMCACHED_MEMCACHED_ADD_START();
451 rc= memcached_send(ptr, key, key_length,
452 key, key_length, value, value_length,
453 expiration, flags, 0, ADD_OP);
454
455 LIBMEMCACHED_MEMCACHED_ADD_END();
456 return rc;
457 }
458
459 memcached_return_t memcached_replace(memcached_st *ptr,
460 const char *key, size_t key_length,
461 const char *value, size_t value_length,
462 time_t expiration,
463 uint32_t flags)
464 {
465 memcached_return_t rc;
466 LIBMEMCACHED_MEMCACHED_REPLACE_START();
467 rc= memcached_send(ptr, key, key_length,
468 key, key_length, value, value_length,
469 expiration, flags, 0, REPLACE_OP);
470 LIBMEMCACHED_MEMCACHED_REPLACE_END();
471 return rc;
472 }
473
474 memcached_return_t memcached_prepend(memcached_st *ptr,
475 const char *key, size_t key_length,
476 const char *value, size_t value_length,
477 time_t expiration,
478 uint32_t flags)
479 {
480 memcached_return_t rc;
481 rc= memcached_send(ptr, key, key_length,
482 key, key_length, value, value_length,
483 expiration, flags, 0, PREPEND_OP);
484 return rc;
485 }
486
487 memcached_return_t memcached_append(memcached_st *ptr,
488 const char *key, size_t key_length,
489 const char *value, size_t value_length,
490 time_t expiration,
491 uint32_t flags)
492 {
493 memcached_return_t rc;
494 rc= memcached_send(ptr, key, key_length,
495 key, key_length, value, value_length,
496 expiration, flags, 0, APPEND_OP);
497 return rc;
498 }
499
500 memcached_return_t memcached_cas(memcached_st *ptr,
501 const char *key, size_t key_length,
502 const char *value, size_t value_length,
503 time_t expiration,
504 uint32_t flags,
505 uint64_t cas)
506 {
507 memcached_return_t rc;
508 rc= memcached_send(ptr, key, key_length,
509 key, key_length, value, value_length,
510 expiration, flags, cas, CAS_OP);
511 return rc;
512 }
513
514 memcached_return_t memcached_set_by_key(memcached_st *ptr,
515 const char *group_key,
516 size_t group_key_length,
517 const char *key, size_t key_length,
518 const char *value, size_t value_length,
519 time_t expiration,
520 uint32_t flags)
521 {
522 memcached_return_t rc;
523 LIBMEMCACHED_MEMCACHED_SET_START();
524 rc= memcached_send(ptr, group_key, group_key_length,
525 key, key_length, value, value_length,
526 expiration, flags, 0, SET_OP);
527 LIBMEMCACHED_MEMCACHED_SET_END();
528 return rc;
529 }
530
531 memcached_return_t memcached_add_by_key(memcached_st *ptr,
532 const char *group_key, size_t group_key_length,
533 const char *key, size_t key_length,
534 const char *value, size_t value_length,
535 time_t expiration,
536 uint32_t flags)
537 {
538 memcached_return_t rc;
539 LIBMEMCACHED_MEMCACHED_ADD_START();
540 rc= memcached_send(ptr, group_key, group_key_length,
541 key, key_length, value, value_length,
542 expiration, flags, 0, ADD_OP);
543 LIBMEMCACHED_MEMCACHED_ADD_END();
544 return rc;
545 }
546
547 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
548 const char *group_key, size_t group_key_length,
549 const char *key, size_t key_length,
550 const char *value, size_t value_length,
551 time_t expiration,
552 uint32_t flags)
553 {
554 memcached_return_t rc;
555 LIBMEMCACHED_MEMCACHED_REPLACE_START();
556 rc= memcached_send(ptr, group_key, group_key_length,
557 key, key_length, value, value_length,
558 expiration, flags, 0, REPLACE_OP);
559 LIBMEMCACHED_MEMCACHED_REPLACE_END();
560 return rc;
561 }
562
563 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
564 const char *group_key, size_t group_key_length,
565 const char *key, size_t key_length,
566 const char *value, size_t value_length,
567 time_t expiration,
568 uint32_t flags)
569 {
570 return memcached_send(ptr, group_key, group_key_length,
571 key, key_length, value, value_length,
572 expiration, flags, 0, PREPEND_OP);
573 }
574
575 memcached_return_t memcached_append_by_key(memcached_st *ptr,
576 const char *group_key, size_t group_key_length,
577 const char *key, size_t key_length,
578 const char *value, size_t value_length,
579 time_t expiration,
580 uint32_t flags)
581 {
582 return memcached_send(ptr, group_key, group_key_length,
583 key, key_length, value, value_length,
584 expiration, flags, 0, APPEND_OP);
585 }
586
587 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
588 const char *group_key, size_t group_key_length,
589 const char *key, size_t key_length,
590 const char *value, size_t value_length,
591 time_t expiration,
592 uint32_t flags,
593 uint64_t cas)
594 {
595 return memcached_send(ptr, group_key, group_key_length,
596 key, key_length, value, value_length,
597 expiration, flags, cas, CAS_OP);
598 }
599