Fixes for Fedora 17.
[m6w6/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
/*
  Storage verbs handled by the send routines in this file.
  The trailing comments give the ASCII command word each verb maps to
  (see storage_op_string()).
*/
enum memcached_storage_action_t {
  SET_OP,      /* "set "     */
  REPLACE_OP,  /* "replace " */
  ADD_OP,      /* "add "     */
  PREPEND_OP,  /* "prepend " */
  APPEND_OP,   /* "append "  */
  CAS_OP       /* "cas "     */
};
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t can_by_encrypted(const memcached_storage_action_t verb)
78 {
79 switch (verb)
80 {
81 case SET_OP:
82 case ADD_OP:
83 case CAS_OP:
84 case REPLACE_OP:
85 return true;
86
87 case APPEND_OP:
88 case PREPEND_OP:
89 break;
90 }
91
92 return false;
93 }
94
95 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
96 {
97 if (reply == false)
98 {
99 switch (verb)
100 {
101 case SET_OP:
102 return PROTOCOL_BINARY_CMD_SETQ;
103
104 case ADD_OP:
105 return PROTOCOL_BINARY_CMD_ADDQ;
106
107 case CAS_OP: /* FALLTHROUGH */
108 case REPLACE_OP:
109 return PROTOCOL_BINARY_CMD_REPLACEQ;
110
111 case APPEND_OP:
112 return PROTOCOL_BINARY_CMD_APPENDQ;
113
114 case PREPEND_OP:
115 return PROTOCOL_BINARY_CMD_PREPENDQ;
116 }
117 }
118
119 switch (verb)
120 {
121 case SET_OP:
122 break;
123
124 case ADD_OP:
125 return PROTOCOL_BINARY_CMD_ADD;
126
127 case CAS_OP: /* FALLTHROUGH */
128 case REPLACE_OP:
129 return PROTOCOL_BINARY_CMD_REPLACE;
130
131 case APPEND_OP:
132 return PROTOCOL_BINARY_CMD_APPEND;
133
134 case PREPEND_OP:
135 return PROTOCOL_BINARY_CMD_PREPEND;
136 }
137
138 return PROTOCOL_BINARY_CMD_SET;
139 }
140
141 static memcached_return_t memcached_send_binary(memcached_st *ptr,
142 memcached_server_write_instance_st server,
143 uint32_t server_key,
144 const char *key,
145 const size_t key_length,
146 const char *value,
147 const size_t value_length,
148 const time_t expiration,
149 const uint32_t flags,
150 const uint64_t cas,
151 const bool flush,
152 const bool reply,
153 memcached_storage_action_t verb)
154 {
155 protocol_binary_request_set request= {};
156 size_t send_length= sizeof(request.bytes);
157
158 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
159 request.message.header.request.opcode= get_com_code(verb, reply);
160 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
161 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
162 if (verb == APPEND_OP or verb == PREPEND_OP)
163 {
164 send_length -= 8; /* append & prepend does not contain extras! */
165 }
166 else
167 {
168 request.message.header.request.extlen= 8;
169 request.message.body.flags= htonl(flags);
170 request.message.body.expiration= htonl((uint32_t)expiration);
171 }
172
173 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
174 request.message.header.request.extlen));
175
176 if (cas)
177 {
178 request.message.header.request.cas= memcached_htonll(cas);
179 }
180
181 libmemcached_io_vector_st vector[]=
182 {
183 { NULL, 0 },
184 { request.bytes, send_length },
185 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
186 { key, key_length },
187 { value, value_length }
188 };
189
190 /* write the header */
191 memcached_return_t rc;
192 if ((rc= memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS)
193 {
194 memcached_io_reset(server);
195
196 #if 0
197 if (memcached_has_error(ptr))
198 {
199 memcached_set_error(*server, rc, MEMCACHED_AT);
200 }
201 #endif
202
203 return MEMCACHED_WRITE_FAILURE;
204 }
205
206 if (verb == SET_OP and ptr->number_of_replicas > 0)
207 {
208 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
209 WATCHPOINT_STRING("replicating");
210
211 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
212 {
213 ++server_key;
214 if (server_key == memcached_server_count(ptr))
215 {
216 server_key= 0;
217 }
218
219 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
220
221 if (memcached_vdo(instance, vector, 5, false) != MEMCACHED_SUCCESS)
222 {
223 memcached_io_reset(instance);
224 }
225 else
226 {
227 memcached_server_response_decrement(instance);
228 }
229 }
230 }
231
232 if (flush == false)
233 {
234 return MEMCACHED_BUFFERED;
235 }
236
237 // No reply always assumes success
238 if (reply == false)
239 {
240 return MEMCACHED_SUCCESS;
241 }
242
243 return memcached_response(server, NULL, 0, NULL);
244 }
245
246 static memcached_return_t memcached_send_ascii(memcached_st *ptr,
247 memcached_server_write_instance_st instance,
248 const char *key,
249 const size_t key_length,
250 const char *value,
251 const size_t value_length,
252 const time_t expiration,
253 const uint32_t flags,
254 const uint64_t cas,
255 const bool flush,
256 const bool reply,
257 const memcached_storage_action_t verb)
258 {
259 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
260 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
261 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
262 {
263 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
264 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
265 }
266
267 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
268 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
269 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
270 {
271 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
272 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
273 }
274
275 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
276 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
277 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
278 {
279 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
280 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
281 }
282
283 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
284 int cas_buffer_length= 0;
285 if (cas)
286 {
287 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
288 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
289 {
290 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
291 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
292 }
293 }
294
295 libmemcached_io_vector_st vector[]=
296 {
297 { NULL, 0 },
298 { storage_op_string(verb), strlen(storage_op_string(verb))},
299 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
300 { key, key_length },
301 { flags_buffer, size_t(flags_buffer_length) },
302 { expiration_buffer, size_t(expiration_buffer_length) },
303 { value_buffer, size_t(value_buffer_length) },
304 { cas_buffer, size_t(cas_buffer_length) },
305 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
306 { memcached_literal_param("\r\n") },
307 { value, value_length },
308 { memcached_literal_param("\r\n") }
309 };
310
311 /* Send command header */
312 memcached_return_t rc= memcached_vdo(instance, vector, 12, flush);
313
314 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
315 if (reply == false)
316 {
317 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
318 }
319
320 if (flush == false)
321 {
322 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
323 }
324
325 if (rc == MEMCACHED_SUCCESS)
326 {
327 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
328 rc= memcached_response(instance, buffer, sizeof(buffer), NULL);
329
330 if (rc == MEMCACHED_STORED)
331 {
332 return MEMCACHED_SUCCESS;
333 }
334 }
335
336 if (rc == MEMCACHED_WRITE_FAILURE)
337 {
338 memcached_io_reset(instance);
339 }
340
341 assert(memcached_failed(rc));
342 #if 0
343 if (memcached_has_error(ptr) == false)
344 {
345 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
346 }
347 #endif
348
349 return rc;
350 }
351
352 static inline memcached_return_t memcached_send(memcached_st *ptr,
353 const char *group_key, size_t group_key_length,
354 const char *key, size_t key_length,
355 const char *value, size_t value_length,
356 const time_t expiration,
357 const uint32_t flags,
358 const uint64_t cas,
359 memcached_storage_action_t verb)
360 {
361 memcached_return_t rc;
362 if (memcached_failed(rc= initialize_query(ptr, true)))
363 {
364 return rc;
365 }
366
367 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
368 {
369 return memcached_last_error(ptr);
370 }
371
372 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
373 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
374
375 WATCHPOINT_SET(instance->io_wait_count.read= 0);
376 WATCHPOINT_SET(instance->io_wait_count.write= 0);
377
378
379 bool flush= true;
380 if (memcached_is_buffering(instance->root) and verb == SET_OP)
381 {
382 flush= false;
383 }
384
385 bool reply= memcached_is_replying(ptr);
386
387 hashkit_string_st* destination= NULL;
388
389 if (memcached_is_encrypted(ptr))
390 {
391 if (can_by_encrypted(verb) == false)
392 {
393 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
394 memcached_literal_param("Operation not allowed while encyrption is enabled"));
395 }
396
397 if ((destination= hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL)
398 {
399 return rc;
400 }
401 value= hashkit_string_c_str(destination);
402 value_length= hashkit_string_length(destination);
403 }
404
405 if (memcached_is_binary(ptr))
406 {
407 rc= memcached_send_binary(ptr, instance, server_key,
408 key, key_length,
409 value, value_length, expiration,
410 flags, cas, flush, reply, verb);
411 }
412 else
413 {
414 rc= memcached_send_ascii(ptr, instance,
415 key, key_length,
416 value, value_length, expiration,
417 flags, cas, flush, reply, verb);
418 }
419
420 hashkit_string_free(destination);
421
422 return rc;
423 }
424
425
426 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
427 const char *value, size_t value_length,
428 time_t expiration,
429 uint32_t flags)
430 {
431 memcached_return_t rc;
432 LIBMEMCACHED_MEMCACHED_SET_START();
433 rc= memcached_send(ptr, key, key_length,
434 key, key_length, value, value_length,
435 expiration, flags, 0, SET_OP);
436 LIBMEMCACHED_MEMCACHED_SET_END();
437 return rc;
438 }
439
440 memcached_return_t memcached_add(memcached_st *ptr,
441 const char *key, size_t key_length,
442 const char *value, size_t value_length,
443 time_t expiration,
444 uint32_t flags)
445 {
446 memcached_return_t rc;
447 LIBMEMCACHED_MEMCACHED_ADD_START();
448 rc= memcached_send(ptr, key, key_length,
449 key, key_length, value, value_length,
450 expiration, flags, 0, ADD_OP);
451
452 LIBMEMCACHED_MEMCACHED_ADD_END();
453 return rc;
454 }
455
456 memcached_return_t memcached_replace(memcached_st *ptr,
457 const char *key, size_t key_length,
458 const char *value, size_t value_length,
459 time_t expiration,
460 uint32_t flags)
461 {
462 memcached_return_t rc;
463 LIBMEMCACHED_MEMCACHED_REPLACE_START();
464 rc= memcached_send(ptr, key, key_length,
465 key, key_length, value, value_length,
466 expiration, flags, 0, REPLACE_OP);
467 LIBMEMCACHED_MEMCACHED_REPLACE_END();
468 return rc;
469 }
470
471 memcached_return_t memcached_prepend(memcached_st *ptr,
472 const char *key, size_t key_length,
473 const char *value, size_t value_length,
474 time_t expiration,
475 uint32_t flags)
476 {
477 memcached_return_t rc;
478 rc= memcached_send(ptr, key, key_length,
479 key, key_length, value, value_length,
480 expiration, flags, 0, PREPEND_OP);
481 return rc;
482 }
483
484 memcached_return_t memcached_append(memcached_st *ptr,
485 const char *key, size_t key_length,
486 const char *value, size_t value_length,
487 time_t expiration,
488 uint32_t flags)
489 {
490 memcached_return_t rc;
491 rc= memcached_send(ptr, key, key_length,
492 key, key_length, value, value_length,
493 expiration, flags, 0, APPEND_OP);
494 return rc;
495 }
496
497 memcached_return_t memcached_cas(memcached_st *ptr,
498 const char *key, size_t key_length,
499 const char *value, size_t value_length,
500 time_t expiration,
501 uint32_t flags,
502 uint64_t cas)
503 {
504 memcached_return_t rc;
505 rc= memcached_send(ptr, key, key_length,
506 key, key_length, value, value_length,
507 expiration, flags, cas, CAS_OP);
508 return rc;
509 }
510
511 memcached_return_t memcached_set_by_key(memcached_st *ptr,
512 const char *group_key,
513 size_t group_key_length,
514 const char *key, size_t key_length,
515 const char *value, size_t value_length,
516 time_t expiration,
517 uint32_t flags)
518 {
519 memcached_return_t rc;
520 LIBMEMCACHED_MEMCACHED_SET_START();
521 rc= memcached_send(ptr, group_key, group_key_length,
522 key, key_length, value, value_length,
523 expiration, flags, 0, SET_OP);
524 LIBMEMCACHED_MEMCACHED_SET_END();
525 return rc;
526 }
527
528 memcached_return_t memcached_add_by_key(memcached_st *ptr,
529 const char *group_key, size_t group_key_length,
530 const char *key, size_t key_length,
531 const char *value, size_t value_length,
532 time_t expiration,
533 uint32_t flags)
534 {
535 memcached_return_t rc;
536 LIBMEMCACHED_MEMCACHED_ADD_START();
537 rc= memcached_send(ptr, group_key, group_key_length,
538 key, key_length, value, value_length,
539 expiration, flags, 0, ADD_OP);
540 LIBMEMCACHED_MEMCACHED_ADD_END();
541 return rc;
542 }
543
544 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
545 const char *group_key, size_t group_key_length,
546 const char *key, size_t key_length,
547 const char *value, size_t value_length,
548 time_t expiration,
549 uint32_t flags)
550 {
551 memcached_return_t rc;
552 LIBMEMCACHED_MEMCACHED_REPLACE_START();
553 rc= memcached_send(ptr, group_key, group_key_length,
554 key, key_length, value, value_length,
555 expiration, flags, 0, REPLACE_OP);
556 LIBMEMCACHED_MEMCACHED_REPLACE_END();
557 return rc;
558 }
559
560 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
561 const char *group_key, size_t group_key_length,
562 const char *key, size_t key_length,
563 const char *value, size_t value_length,
564 time_t expiration,
565 uint32_t flags)
566 {
567 return memcached_send(ptr, group_key, group_key_length,
568 key, key_length, value, value_length,
569 expiration, flags, 0, PREPEND_OP);
570 }
571
572 memcached_return_t memcached_append_by_key(memcached_st *ptr,
573 const char *group_key, size_t group_key_length,
574 const char *key, size_t key_length,
575 const char *value, size_t value_length,
576 time_t expiration,
577 uint32_t flags)
578 {
579 return memcached_send(ptr, group_key, group_key_length,
580 key, key_length, value, value_length,
581 expiration, flags, 0, APPEND_OP);
582 }
583
584 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
585 const char *group_key, size_t group_key_length,
586 const char *key, size_t key_length,
587 const char *value, size_t value_length,
588 time_t expiration,
589 uint32_t flags,
590 uint64_t cas)
591 {
592 return memcached_send(ptr, group_key, group_key_length,
593 key, key_length, value, value_length,
594 expiration, flags, cas, CAS_OP);
595 }
596