Merge from build trunk.
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
/*
  Storage verbs handled by this file. The helpers below translate each
  enumerator into an ASCII command word and a binary protocol opcode.
  The numeric values are spelled out; they match the implicit numbering
  of the declaration order.
*/
enum memcached_storage_action_t {
  SET_OP= 0,
  REPLACE_OP= 1,
  ADD_OP= 2,
  PREPEND_OP= 3,
  APPEND_OP= 4,
  CAS_OP= 5
};
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
78 {
79 if (reply == false)
80 {
81 switch (verb)
82 {
83 case SET_OP:
84 return PROTOCOL_BINARY_CMD_SETQ;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADDQ;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACEQ;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPENDQ;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPENDQ;
98 }
99 }
100
101 switch (verb)
102 {
103 case SET_OP:
104 break;
105
106 case ADD_OP:
107 return PROTOCOL_BINARY_CMD_ADD;
108
109 case CAS_OP: /* FALLTHROUGH */
110 case REPLACE_OP:
111 return PROTOCOL_BINARY_CMD_REPLACE;
112
113 case APPEND_OP:
114 return PROTOCOL_BINARY_CMD_APPEND;
115
116 case PREPEND_OP:
117 return PROTOCOL_BINARY_CMD_PREPEND;
118 }
119
120 return PROTOCOL_BINARY_CMD_SET;
121 }
122
123 static memcached_return_t memcached_send_binary(memcached_st *ptr,
124 memcached_server_write_instance_st server,
125 uint32_t server_key,
126 const char *key,
127 const size_t key_length,
128 const char *value,
129 const size_t value_length,
130 const time_t expiration,
131 const uint32_t flags,
132 const uint64_t cas,
133 const bool flush,
134 const bool reply,
135 memcached_storage_action_t verb)
136 {
137 protocol_binary_request_set request= {};
138 size_t send_length= sizeof(request.bytes);
139
140 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
141 request.message.header.request.opcode= get_com_code(verb, reply);
142 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
143 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
144 if (verb == APPEND_OP or verb == PREPEND_OP)
145 {
146 send_length -= 8; /* append & prepend does not contain extras! */
147 }
148 else
149 {
150 request.message.header.request.extlen= 8;
151 request.message.body.flags= htonl(flags);
152 request.message.body.expiration= htonl((uint32_t)expiration);
153 }
154
155 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
156 request.message.header.request.extlen));
157
158 if (cas)
159 {
160 request.message.header.request.cas= memcached_htonll(cas);
161 }
162
163 libmemcached_io_vector_st vector[]=
164 {
165 { NULL, 0 },
166 { request.bytes, send_length },
167 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
168 { key, key_length },
169 { value, value_length }
170 };
171
172 /* write the header */
173 memcached_return_t rc;
174 if ((rc= memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS)
175 {
176 memcached_io_reset(server);
177
178 if (memcached_has_error(ptr))
179 {
180 memcached_set_error(*server, rc, MEMCACHED_AT);
181 }
182
183 return MEMCACHED_WRITE_FAILURE;
184 }
185
186 if (verb == SET_OP and ptr->number_of_replicas > 0)
187 {
188 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
189 WATCHPOINT_STRING("replicating");
190
191 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
192 {
193 ++server_key;
194 if (server_key == memcached_server_count(ptr))
195 {
196 server_key= 0;
197 }
198
199 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
200
201 if (memcached_vdo(instance, vector, 5, false) != MEMCACHED_SUCCESS)
202 {
203 memcached_io_reset(instance);
204 }
205 else
206 {
207 memcached_server_response_decrement(instance);
208 }
209 }
210 }
211
212 if (flush == false)
213 {
214 return MEMCACHED_BUFFERED;
215 }
216
217 // No reply always assumes success
218 if (reply == false)
219 {
220 return MEMCACHED_SUCCESS;
221 }
222
223 return memcached_response(server, NULL, 0, NULL);
224 }
225
226 static memcached_return_t memcached_send_ascii(memcached_st *ptr,
227 memcached_server_write_instance_st instance,
228 const char *key,
229 const size_t key_length,
230 const char *value,
231 const size_t value_length,
232 const time_t expiration,
233 const uint32_t flags,
234 const uint64_t cas,
235 const bool flush,
236 const bool reply,
237 const memcached_storage_action_t verb)
238 {
239 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
240 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
241 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
242 {
243 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
244 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
245 }
246
247 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
248 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
249 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
250 {
251 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
252 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
253 }
254
255 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
256 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
257 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
258 {
259 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
260 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
261 }
262
263 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
264 int cas_buffer_length= 0;
265 if (cas)
266 {
267 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
268 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
269 {
270 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
271 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
272 }
273 }
274
275 libmemcached_io_vector_st vector[]=
276 {
277 { NULL, 0 },
278 { storage_op_string(verb), strlen(storage_op_string(verb))},
279 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
280 { key, key_length },
281 { flags_buffer, flags_buffer_length },
282 { expiration_buffer, expiration_buffer_length },
283 { value_buffer, value_buffer_length },
284 { cas_buffer, cas_buffer_length },
285 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
286 { memcached_literal_param("\r\n") },
287 { value, value_length },
288 { memcached_literal_param("\r\n") }
289 };
290
291 /* Send command header */
292 memcached_return_t rc= memcached_vdo(instance, vector, 12, flush);
293
294 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
295 if (reply == false)
296 {
297 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
298 }
299
300 if (flush == false)
301 {
302 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
303 }
304
305 if (rc == MEMCACHED_SUCCESS)
306 {
307 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
308 rc= memcached_response(instance, buffer, sizeof(buffer), NULL);
309
310 if (rc == MEMCACHED_STORED)
311 {
312 return MEMCACHED_SUCCESS;
313 }
314 }
315
316 if (rc == MEMCACHED_WRITE_FAILURE)
317 {
318 memcached_io_reset(instance);
319 }
320
321 assert(memcached_failed(rc));
322 if (memcached_has_error(ptr) == false)
323 {
324 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
325 }
326
327 return rc;
328 }
329
330 static inline memcached_return_t memcached_send(memcached_st *ptr,
331 const char *group_key, size_t group_key_length,
332 const char *key, size_t key_length,
333 const char *value, size_t value_length,
334 const time_t expiration,
335 const uint32_t flags,
336 const uint64_t cas,
337 memcached_storage_action_t verb)
338 {
339 memcached_return_t rc;
340 if (memcached_failed(rc= initialize_query(ptr, true)))
341 {
342 return rc;
343 }
344
345 if (memcached_failed(rc= memcached_validate_key_length(key_length, memcached_is_binary(ptr))))
346 {
347 return rc;
348 }
349
350 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
351 {
352 return MEMCACHED_BAD_KEY_PROVIDED;
353 }
354
355 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
356 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
357
358 WATCHPOINT_SET(instance->io_wait_count.read= 0);
359 WATCHPOINT_SET(instance->io_wait_count.write= 0);
360
361
362 bool flush= true;
363 if (memcached_is_buffering(instance->root) and verb == SET_OP)
364 {
365 flush= false;
366 }
367
368 bool reply= memcached_is_replying(ptr);
369
370 if (memcached_is_binary(ptr))
371 {
372 return memcached_send_binary(ptr, instance, server_key,
373 key, key_length,
374 value, value_length, expiration,
375 flags, cas, flush, reply, verb);
376 }
377
378 return memcached_send_ascii(ptr, instance,
379 key, key_length,
380 value, value_length, expiration,
381 flags, cas, flush, reply, verb);
382 }
383
384
385 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
386 const char *value, size_t value_length,
387 time_t expiration,
388 uint32_t flags)
389 {
390 memcached_return_t rc;
391 LIBMEMCACHED_MEMCACHED_SET_START();
392 rc= memcached_send(ptr, key, key_length,
393 key, key_length, value, value_length,
394 expiration, flags, 0, SET_OP);
395 LIBMEMCACHED_MEMCACHED_SET_END();
396 return rc;
397 }
398
399 memcached_return_t memcached_add(memcached_st *ptr,
400 const char *key, size_t key_length,
401 const char *value, size_t value_length,
402 time_t expiration,
403 uint32_t flags)
404 {
405 memcached_return_t rc;
406 LIBMEMCACHED_MEMCACHED_ADD_START();
407 rc= memcached_send(ptr, key, key_length,
408 key, key_length, value, value_length,
409 expiration, flags, 0, ADD_OP);
410
411 if (rc == MEMCACHED_NOTSTORED or rc == MEMCACHED_DATA_EXISTS)
412 {
413 memcached_set_error(*ptr, rc, MEMCACHED_AT);
414 }
415 LIBMEMCACHED_MEMCACHED_ADD_END();
416 return rc;
417 }
418
419 memcached_return_t memcached_replace(memcached_st *ptr,
420 const char *key, size_t key_length,
421 const char *value, size_t value_length,
422 time_t expiration,
423 uint32_t flags)
424 {
425 memcached_return_t rc;
426 LIBMEMCACHED_MEMCACHED_REPLACE_START();
427 rc= memcached_send(ptr, key, key_length,
428 key, key_length, value, value_length,
429 expiration, flags, 0, REPLACE_OP);
430 LIBMEMCACHED_MEMCACHED_REPLACE_END();
431 return rc;
432 }
433
434 memcached_return_t memcached_prepend(memcached_st *ptr,
435 const char *key, size_t key_length,
436 const char *value, size_t value_length,
437 time_t expiration,
438 uint32_t flags)
439 {
440 memcached_return_t rc;
441 rc= memcached_send(ptr, key, key_length,
442 key, key_length, value, value_length,
443 expiration, flags, 0, PREPEND_OP);
444 return rc;
445 }
446
447 memcached_return_t memcached_append(memcached_st *ptr,
448 const char *key, size_t key_length,
449 const char *value, size_t value_length,
450 time_t expiration,
451 uint32_t flags)
452 {
453 memcached_return_t rc;
454 rc= memcached_send(ptr, key, key_length,
455 key, key_length, value, value_length,
456 expiration, flags, 0, APPEND_OP);
457 return rc;
458 }
459
460 memcached_return_t memcached_cas(memcached_st *ptr,
461 const char *key, size_t key_length,
462 const char *value, size_t value_length,
463 time_t expiration,
464 uint32_t flags,
465 uint64_t cas)
466 {
467 memcached_return_t rc;
468 rc= memcached_send(ptr, key, key_length,
469 key, key_length, value, value_length,
470 expiration, flags, cas, CAS_OP);
471 return rc;
472 }
473
474 memcached_return_t memcached_set_by_key(memcached_st *ptr,
475 const char *group_key,
476 size_t group_key_length,
477 const char *key, size_t key_length,
478 const char *value, size_t value_length,
479 time_t expiration,
480 uint32_t flags)
481 {
482 memcached_return_t rc;
483 LIBMEMCACHED_MEMCACHED_SET_START();
484 rc= memcached_send(ptr, group_key, group_key_length,
485 key, key_length, value, value_length,
486 expiration, flags, 0, SET_OP);
487 LIBMEMCACHED_MEMCACHED_SET_END();
488 return rc;
489 }
490
491 memcached_return_t memcached_add_by_key(memcached_st *ptr,
492 const char *group_key, size_t group_key_length,
493 const char *key, size_t key_length,
494 const char *value, size_t value_length,
495 time_t expiration,
496 uint32_t flags)
497 {
498 memcached_return_t rc;
499 LIBMEMCACHED_MEMCACHED_ADD_START();
500 rc= memcached_send(ptr, group_key, group_key_length,
501 key, key_length, value, value_length,
502 expiration, flags, 0, ADD_OP);
503 LIBMEMCACHED_MEMCACHED_ADD_END();
504 return rc;
505 }
506
507 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
508 const char *group_key, size_t group_key_length,
509 const char *key, size_t key_length,
510 const char *value, size_t value_length,
511 time_t expiration,
512 uint32_t flags)
513 {
514 memcached_return_t rc;
515 LIBMEMCACHED_MEMCACHED_REPLACE_START();
516 rc= memcached_send(ptr, group_key, group_key_length,
517 key, key_length, value, value_length,
518 expiration, flags, 0, REPLACE_OP);
519 LIBMEMCACHED_MEMCACHED_REPLACE_END();
520 return rc;
521 }
522
523 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
524 const char *group_key, size_t group_key_length,
525 const char *key, size_t key_length,
526 const char *value, size_t value_length,
527 time_t expiration,
528 uint32_t flags)
529 {
530 return memcached_send(ptr, group_key, group_key_length,
531 key, key_length, value, value_length,
532 expiration, flags, 0, PREPEND_OP);
533 }
534
535 memcached_return_t memcached_append_by_key(memcached_st *ptr,
536 const char *group_key, size_t group_key_length,
537 const char *key, size_t key_length,
538 const char *value, size_t value_length,
539 time_t expiration,
540 uint32_t flags)
541 {
542 return memcached_send(ptr, group_key, group_key_length,
543 key, key_length, value, value_length,
544 expiration, flags, 0, APPEND_OP);
545 }
546
547 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
548 const char *group_key, size_t group_key_length,
549 const char *key, size_t key_length,
550 const char *value, size_t value_length,
551 time_t expiration,
552 uint32_t flags,
553 uint64_t cas)
554 {
555 return memcached_send(ptr, group_key, group_key_length,
556 key, key_length, value, value_length,
557 expiration, flags, cas, CAS_OP);
558 }
559