Fix detangle, minor style cleanups.
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 enum memcached_storage_action_t {
42 SET_OP,
43 REPLACE_OP,
44 ADD_OP,
45 PREPEND_OP,
46 APPEND_OP,
47 CAS_OP
48 };
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
78 {
79 if (noreply)
80 {
81 switch (verb)
82 {
83 case SET_OP:
84 return PROTOCOL_BINARY_CMD_SETQ;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADDQ;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACEQ;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPENDQ;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPENDQ;
98 }
99 }
100
101 switch (verb)
102 {
103 case SET_OP:
104 break;
105
106 case ADD_OP:
107 return PROTOCOL_BINARY_CMD_ADD;
108
109 case CAS_OP: /* FALLTHROUGH */
110 case REPLACE_OP:
111 return PROTOCOL_BINARY_CMD_REPLACE;
112
113 case APPEND_OP:
114 return PROTOCOL_BINARY_CMD_APPEND;
115
116 case PREPEND_OP:
117 return PROTOCOL_BINARY_CMD_PREPEND;
118 }
119
120 return PROTOCOL_BINARY_CMD_SET;
121 }
122
123 static memcached_return_t memcached_send_binary(memcached_st *ptr,
124 memcached_server_write_instance_st server,
125 uint32_t server_key,
126 const char *key,
127 size_t key_length,
128 const char *value,
129 size_t value_length,
130 time_t expiration,
131 uint32_t flags,
132 uint64_t cas,
133 bool flush,
134 memcached_storage_action_t verb)
135 {
136 protocol_binary_request_set request= {};
137 size_t send_length= sizeof(request.bytes);
138
139 bool noreply= server->root->flags.no_reply;
140
141 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
142 request.message.header.request.opcode= get_com_code(verb, noreply);
143 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
144 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
145 if (verb == APPEND_OP or verb == PREPEND_OP)
146 {
147 send_length -= 8; /* append & prepend does not contain extras! */
148 }
149 else
150 {
151 request.message.header.request.extlen= 8;
152 request.message.body.flags= htonl(flags);
153 request.message.body.expiration= htonl((uint32_t)expiration);
154 }
155
156 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
157 request.message.header.request.extlen));
158
159 if (cas)
160 {
161 request.message.header.request.cas= memcached_htonll(cas);
162 }
163
164 if (server->root->flags.use_udp and flush == false)
165 {
166 size_t cmd_size= send_length + key_length + value_length;
167
168 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
169 {
170 return MEMCACHED_WRITE_FAILURE;
171 }
172 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
173 {
174 memcached_io_write(server, NULL, 0, true);
175 }
176 }
177
178 struct libmemcached_io_vector_st vector[]=
179 {
180 { request.bytes, send_length },
181 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
182 { key, key_length },
183 { value, value_length }
184 };
185
186 /* write the header */
187 memcached_return_t rc;
188 if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS)
189 {
190 memcached_io_reset(server);
191
192 if (ptr->error_messages == NULL)
193 {
194 memcached_set_error(*server, rc, MEMCACHED_AT);
195 }
196
197 return MEMCACHED_WRITE_FAILURE;
198 }
199
200 if (verb == SET_OP && ptr->number_of_replicas > 0)
201 {
202 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
203 WATCHPOINT_STRING("replicating");
204
205 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
206 {
207 ++server_key;
208 if (server_key == memcached_server_count(ptr))
209 {
210 server_key= 0;
211 }
212
213 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
214
215 if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS)
216 {
217 memcached_io_reset(instance);
218 }
219 else
220 {
221 memcached_server_response_decrement(instance);
222 }
223 }
224 }
225
226 if (flush == false)
227 {
228 return MEMCACHED_BUFFERED;
229 }
230
231 if (noreply)
232 {
233 return MEMCACHED_SUCCESS;
234 }
235
236 return memcached_response(server, NULL, 0, NULL);
237 }
238
239 static memcached_return_t memcached_send_ascii(memcached_st *ptr,
240 memcached_server_write_instance_st instance,
241 const char *key,
242 size_t key_length,
243 const char *value,
244 size_t value_length,
245 time_t expiration,
246 uint32_t flags,
247 uint64_t cas,
248 bool flush,
249 memcached_storage_action_t verb)
250 {
251 size_t write_length;
252 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
253
254 if (cas)
255 {
256 int check_length;
257 check_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
258 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
259 storage_op_string(verb),
260 memcached_print_array(ptr->_namespace),
261 (int)key_length, key, flags,
262 (unsigned long long)expiration, (unsigned long)value_length,
263 (unsigned long long)cas,
264 (ptr->flags.no_reply) ? " noreply" : "");
265 if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or check_length < 0)
266 {
267 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
268 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
269 }
270 write_length= check_length;
271 }
272 else
273 {
274 char *buffer_ptr= buffer;
275 const char *command= storage_op_string(verb);
276
277 /* Copy in the command, no space needed, we handle that in the command function*/
278 memcpy(buffer_ptr, command, strlen(command));
279
280 /* Copy in the key prefix, switch to the buffer_ptr */
281 buffer_ptr= (char *)memcpy((char *)(buffer_ptr + strlen(command)), (char *)memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace));
282
283 /* Copy in the key, adjust point if a key prefix was used. */
284 buffer_ptr= (char *)memcpy(buffer_ptr + memcached_array_size(ptr->_namespace),
285 key, key_length);
286 buffer_ptr+= key_length;
287 buffer_ptr[0]= ' ';
288 buffer_ptr++;
289
290 write_length= (size_t)(buffer_ptr - buffer);
291 int check_length= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE -(size_t)(buffer_ptr - buffer),
292 "%u %llu %lu%s\r\n",
293 flags,
294 (unsigned long long)expiration, (unsigned long)value_length,
295 ptr->flags.no_reply ? " noreply" : "");
296 if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -size_t(buffer_ptr - buffer) or check_length < 0)
297 {
298 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
299 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
300 }
301
302 write_length+= (size_t)check_length;
303 WATCHPOINT_ASSERT(write_length < MEMCACHED_DEFAULT_COMMAND_SIZE);
304 }
305
306 if (ptr->flags.use_udp)
307 {
308 size_t cmd_size= write_length + value_length +2;
309 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
310 {
311 return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
312 }
313
314 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
315 {
316 memcached_io_write(instance, NULL, 0, true);
317 }
318 }
319
320 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
321 {
322 return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
323 }
324
325 struct libmemcached_io_vector_st vector[]=
326 {
327 { buffer, write_length },
328 { value, value_length },
329 { memcached_literal_param("\r\n") }
330 };
331
332 if (memcached_is_udp(instance->root) and (write_length +value_length +memcached_literal_param_size("\r\n") +UDP_DATAGRAM_HEADER_LENGTH > MAX_UDP_DATAGRAM_LENGTH))
333 {
334 return memcached_set_error(*instance, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT, memcached_literal_param("UDP packet is too large"));
335 }
336
337 /* Send command header */
338 memcached_return_t rc= memcached_vdo(instance, vector, 3, flush);
339 if (rc == MEMCACHED_SUCCESS)
340 {
341 if (ptr->flags.no_reply and flush)
342 {
343 rc= MEMCACHED_SUCCESS;
344 }
345 else if (flush == false)
346 {
347 rc= MEMCACHED_BUFFERED;
348 }
349 else
350 {
351 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
352
353 if (rc == MEMCACHED_STORED)
354 {
355 rc= MEMCACHED_SUCCESS;
356 }
357 }
358 }
359
360 if (rc == MEMCACHED_WRITE_FAILURE)
361 {
362 memcached_io_reset(instance);
363 }
364
365 if (memcached_failed(rc) and ptr->error_messages == NULL)
366 {
367 memcached_set_error(*ptr, rc, MEMCACHED_AT);
368 }
369
370 return rc;
371 }
372
373 static inline memcached_return_t memcached_send(memcached_st *ptr,
374 const char *group_key, size_t group_key_length,
375 const char *key, size_t key_length,
376 const char *value, size_t value_length,
377 time_t expiration,
378 uint32_t flags,
379 uint64_t cas,
380 memcached_storage_action_t verb)
381 {
382 memcached_return_t rc;
383 if (memcached_failed(rc= initialize_query(ptr)))
384 {
385 return rc;
386 }
387
388 if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
389 {
390 return rc;
391 }
392
393 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
394 {
395 return MEMCACHED_BAD_KEY_PROVIDED;
396 }
397
398 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
399 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
400
401 WATCHPOINT_SET(instance->io_wait_count.read= 0);
402 WATCHPOINT_SET(instance->io_wait_count.write= 0);
403
404 bool flush= (bool) ((instance->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
405 if (ptr->flags.binary_protocol)
406 {
407 rc= memcached_send_binary(ptr, instance, server_key,
408 key, key_length,
409 value, value_length, expiration,
410 flags, cas, flush, verb);
411 }
412 else
413 {
414 rc= memcached_send_ascii(ptr, instance,
415 key, key_length,
416 value, value_length, expiration,
417 flags, cas, flush, verb);
418 }
419
420 return rc;
421 }
422
423
424 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
425 const char *value, size_t value_length,
426 time_t expiration,
427 uint32_t flags)
428 {
429 memcached_return_t rc;
430 LIBMEMCACHED_MEMCACHED_SET_START();
431 rc= memcached_send(ptr, key, key_length,
432 key, key_length, value, value_length,
433 expiration, flags, 0, SET_OP);
434 LIBMEMCACHED_MEMCACHED_SET_END();
435 return rc;
436 }
437
438 memcached_return_t memcached_add(memcached_st *ptr,
439 const char *key, size_t key_length,
440 const char *value, size_t value_length,
441 time_t expiration,
442 uint32_t flags)
443 {
444 memcached_return_t rc;
445 LIBMEMCACHED_MEMCACHED_ADD_START();
446 rc= memcached_send(ptr, key, key_length,
447 key, key_length, value, value_length,
448 expiration, flags, 0, ADD_OP);
449 LIBMEMCACHED_MEMCACHED_ADD_END();
450 return rc;
451 }
452
453 memcached_return_t memcached_replace(memcached_st *ptr,
454 const char *key, size_t key_length,
455 const char *value, size_t value_length,
456 time_t expiration,
457 uint32_t flags)
458 {
459 memcached_return_t rc;
460 LIBMEMCACHED_MEMCACHED_REPLACE_START();
461 rc= memcached_send(ptr, key, key_length,
462 key, key_length, value, value_length,
463 expiration, flags, 0, REPLACE_OP);
464 LIBMEMCACHED_MEMCACHED_REPLACE_END();
465 return rc;
466 }
467
468 memcached_return_t memcached_prepend(memcached_st *ptr,
469 const char *key, size_t key_length,
470 const char *value, size_t value_length,
471 time_t expiration,
472 uint32_t flags)
473 {
474 memcached_return_t rc;
475 rc= memcached_send(ptr, key, key_length,
476 key, key_length, value, value_length,
477 expiration, flags, 0, PREPEND_OP);
478 return rc;
479 }
480
481 memcached_return_t memcached_append(memcached_st *ptr,
482 const char *key, size_t key_length,
483 const char *value, size_t value_length,
484 time_t expiration,
485 uint32_t flags)
486 {
487 memcached_return_t rc;
488 rc= memcached_send(ptr, key, key_length,
489 key, key_length, value, value_length,
490 expiration, flags, 0, APPEND_OP);
491 return rc;
492 }
493
494 memcached_return_t memcached_cas(memcached_st *ptr,
495 const char *key, size_t key_length,
496 const char *value, size_t value_length,
497 time_t expiration,
498 uint32_t flags,
499 uint64_t cas)
500 {
501 memcached_return_t rc;
502 rc= memcached_send(ptr, key, key_length,
503 key, key_length, value, value_length,
504 expiration, flags, cas, CAS_OP);
505 return rc;
506 }
507
508 memcached_return_t memcached_set_by_key(memcached_st *ptr,
509 const char *group_key,
510 size_t group_key_length,
511 const char *key, size_t key_length,
512 const char *value, size_t value_length,
513 time_t expiration,
514 uint32_t flags)
515 {
516 memcached_return_t rc;
517 LIBMEMCACHED_MEMCACHED_SET_START();
518 rc= memcached_send(ptr, group_key, group_key_length,
519 key, key_length, value, value_length,
520 expiration, flags, 0, SET_OP);
521 LIBMEMCACHED_MEMCACHED_SET_END();
522 return rc;
523 }
524
525 memcached_return_t memcached_add_by_key(memcached_st *ptr,
526 const char *group_key, size_t group_key_length,
527 const char *key, size_t key_length,
528 const char *value, size_t value_length,
529 time_t expiration,
530 uint32_t flags)
531 {
532 memcached_return_t rc;
533 LIBMEMCACHED_MEMCACHED_ADD_START();
534 rc= memcached_send(ptr, group_key, group_key_length,
535 key, key_length, value, value_length,
536 expiration, flags, 0, ADD_OP);
537 LIBMEMCACHED_MEMCACHED_ADD_END();
538 return rc;
539 }
540
541 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
542 const char *group_key, size_t group_key_length,
543 const char *key, size_t key_length,
544 const char *value, size_t value_length,
545 time_t expiration,
546 uint32_t flags)
547 {
548 memcached_return_t rc;
549 LIBMEMCACHED_MEMCACHED_REPLACE_START();
550 rc= memcached_send(ptr, group_key, group_key_length,
551 key, key_length, value, value_length,
552 expiration, flags, 0, REPLACE_OP);
553 LIBMEMCACHED_MEMCACHED_REPLACE_END();
554 return rc;
555 }
556
557 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
558 const char *group_key, size_t group_key_length,
559 const char *key, size_t key_length,
560 const char *value, size_t value_length,
561 time_t expiration,
562 uint32_t flags)
563 {
564 memcached_return_t rc;
565 rc= memcached_send(ptr, group_key, group_key_length,
566 key, key_length, value, value_length,
567 expiration, flags, 0, PREPEND_OP);
568 return rc;
569 }
570
571 memcached_return_t memcached_append_by_key(memcached_st *ptr,
572 const char *group_key, size_t group_key_length,
573 const char *key, size_t key_length,
574 const char *value, size_t value_length,
575 time_t expiration,
576 uint32_t flags)
577 {
578 memcached_return_t rc;
579 rc= memcached_send(ptr, group_key, group_key_length,
580 key, key_length, value, value_length,
581 expiration, flags, 0, APPEND_OP);
582 return rc;
583 }
584
585 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
586 const char *group_key, size_t group_key_length,
587 const char *key, size_t key_length,
588 const char *value, size_t value_length,
589 time_t expiration,
590 uint32_t flags,
591 uint64_t cas)
592 {
593 memcached_return_t rc;
594 rc= memcached_send(ptr, group_key, group_key_length,
595 key, key_length, value, value_length,
596 expiration, flags, cas, CAS_OP);
597 return rc;
598 }
599