Updating for simplified IO flush.
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
/*
  Storage verbs handled by this file.

  storage_op_string() turns a verb into its text command word and
  get_com_code() turns it into a binary protocol opcode.
*/
enum memcached_storage_action_t
{
  SET_OP= 0,     /* "set " */
  REPLACE_OP= 1, /* "replace " */
  ADD_OP= 2,     /* "add " */
  PREPEND_OP= 3, /* "prepend " */
  APPEND_OP= 4,  /* "append " */
  CAS_OP= 5      /* "cas " */
};
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
78 {
79 if (noreply)
80 {
81 switch (verb)
82 {
83 case SET_OP:
84 return PROTOCOL_BINARY_CMD_SETQ;
85
86 case ADD_OP:
87 return PROTOCOL_BINARY_CMD_ADDQ;
88
89 case CAS_OP: /* FALLTHROUGH */
90 case REPLACE_OP:
91 return PROTOCOL_BINARY_CMD_REPLACEQ;
92
93 case APPEND_OP:
94 return PROTOCOL_BINARY_CMD_APPENDQ;
95
96 case PREPEND_OP:
97 return PROTOCOL_BINARY_CMD_PREPENDQ;
98 }
99 }
100
101 switch (verb)
102 {
103 case SET_OP:
104 break;
105
106 case ADD_OP:
107 return PROTOCOL_BINARY_CMD_ADD;
108
109 case CAS_OP: /* FALLTHROUGH */
110 case REPLACE_OP:
111 return PROTOCOL_BINARY_CMD_REPLACE;
112
113 case APPEND_OP:
114 return PROTOCOL_BINARY_CMD_APPEND;
115
116 case PREPEND_OP:
117 return PROTOCOL_BINARY_CMD_PREPEND;
118 }
119
120 return PROTOCOL_BINARY_CMD_SET;
121 }
122
123 static memcached_return_t memcached_send_binary(memcached_st *ptr,
124 memcached_server_write_instance_st server,
125 uint32_t server_key,
126 const char *key,
127 size_t key_length,
128 const char *value,
129 size_t value_length,
130 time_t expiration,
131 uint32_t flags,
132 uint64_t cas,
133 bool flush,
134 memcached_storage_action_t verb)
135 {
136 protocol_binary_request_set request= {};
137 size_t send_length= sizeof(request.bytes);
138
139 bool noreply= server->root->flags.no_reply;
140
141 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
142 request.message.header.request.opcode= get_com_code(verb, noreply);
143 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
144 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
145 if (verb == APPEND_OP or verb == PREPEND_OP)
146 {
147 send_length -= 8; /* append & prepend does not contain extras! */
148 }
149 else
150 {
151 request.message.header.request.extlen= 8;
152 request.message.body.flags= htonl(flags);
153 request.message.body.expiration= htonl((uint32_t)expiration);
154 }
155
156 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
157 request.message.header.request.extlen));
158
159 if (cas)
160 {
161 request.message.header.request.cas= memcached_htonll(cas);
162 }
163
164 if (server->root->flags.use_udp and flush == false)
165 {
166 size_t cmd_size= send_length + key_length + value_length;
167
168 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
169 {
170 return MEMCACHED_WRITE_FAILURE;
171 }
172 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
173 {
174 memcached_io_write(server);
175 }
176 }
177
178 struct libmemcached_io_vector_st vector[]=
179 {
180 { request.bytes, send_length },
181 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
182 { key, key_length },
183 { value, value_length }
184 };
185
186 /* write the header */
187 memcached_return_t rc;
188 if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS)
189 {
190 memcached_io_reset(server);
191
192 if (ptr->error_messages == NULL)
193 {
194 memcached_set_error(*server, rc, MEMCACHED_AT);
195 }
196
197 return MEMCACHED_WRITE_FAILURE;
198 }
199
200 if (verb == SET_OP && ptr->number_of_replicas > 0)
201 {
202 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
203 WATCHPOINT_STRING("replicating");
204
205 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
206 {
207 ++server_key;
208 if (server_key == memcached_server_count(ptr))
209 {
210 server_key= 0;
211 }
212
213 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
214
215 if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS)
216 {
217 memcached_io_reset(instance);
218 }
219 else
220 {
221 memcached_server_response_decrement(instance);
222 }
223 }
224 }
225
226 if (flush == false)
227 {
228 return MEMCACHED_BUFFERED;
229 }
230
231 if (noreply)
232 {
233 return MEMCACHED_SUCCESS;
234 }
235
236 return memcached_response(server, NULL, 0, NULL);
237 }
238
239 static memcached_return_t memcached_send_ascii(memcached_st *ptr,
240 memcached_server_write_instance_st instance,
241 const char *key,
242 const size_t key_length,
243 const char *value,
244 const size_t value_length,
245 const time_t expiration,
246 const uint32_t flags,
247 const uint64_t cas,
248 const bool flush,
249 const memcached_storage_action_t verb)
250 {
251 // Invert the logic to make it simpler to read the code
252 bool reply= (ptr->flags.no_reply) ? false : true;
253
254 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
255 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
256 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
257 {
258 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
259 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
260 }
261
262 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
263 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
264 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
265 {
266 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
267 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
268 }
269
270 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
271 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
272 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
273 {
274 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
275 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
276 }
277
278 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
279 int cas_buffer_length= 0;
280 if (cas)
281 {
282 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
283 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
284 {
285 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
286 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
287 }
288 }
289
290 struct libmemcached_io_vector_st vector[]=
291 {
292 { storage_op_string(verb), strlen(storage_op_string(verb))},
293 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
294 { key, key_length },
295 { flags_buffer, flags_buffer_length },
296 { expiration_buffer, expiration_buffer_length },
297 { value_buffer, value_buffer_length },
298 { cas_buffer, cas_buffer_length },
299 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
300 { memcached_literal_param("\r\n") },
301 { value, value_length },
302 { memcached_literal_param("\r\n") }
303 };
304
305 if (memcached_is_udp(instance->root))
306 {
307 size_t write_length= io_vector_total_size(vector, 11);
308
309 size_t cmd_size= write_length + value_length +2;
310 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
311 {
312 return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
313 }
314
315 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
316 {
317 memcached_io_write(instance);
318 }
319 }
320
321 /* Send command header */
322 memcached_return_t rc= memcached_vdo(instance, vector, 11, flush);
323 if (rc == MEMCACHED_SUCCESS)
324 {
325 if (ptr->flags.no_reply and flush)
326 {
327 rc= MEMCACHED_SUCCESS;
328 }
329 else if (flush == false)
330 {
331 rc= MEMCACHED_BUFFERED;
332 }
333 else
334 {
335 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
336 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
337
338 if (rc == MEMCACHED_STORED)
339 {
340 rc= MEMCACHED_SUCCESS;
341 }
342 }
343 }
344
345 if (rc == MEMCACHED_WRITE_FAILURE)
346 {
347 memcached_io_reset(instance);
348 }
349
350 if (memcached_failed(rc) and ptr->error_messages == NULL)
351 {
352 memcached_set_error(*ptr, rc, MEMCACHED_AT);
353 }
354
355 return rc;
356 }
357
358 static inline memcached_return_t memcached_send(memcached_st *ptr,
359 const char *group_key, size_t group_key_length,
360 const char *key, size_t key_length,
361 const char *value, size_t value_length,
362 const time_t expiration,
363 const uint32_t flags,
364 const uint64_t cas,
365 memcached_storage_action_t verb)
366 {
367 memcached_return_t rc;
368 if (memcached_failed(rc= initialize_query(ptr)))
369 {
370 return rc;
371 }
372
373 if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
374 {
375 return rc;
376 }
377
378 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
379 {
380 return MEMCACHED_BAD_KEY_PROVIDED;
381 }
382
383 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
384 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
385
386 WATCHPOINT_SET(instance->io_wait_count.read= 0);
387 WATCHPOINT_SET(instance->io_wait_count.write= 0);
388
389 bool flush= (bool) ((instance->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
390 if (ptr->flags.binary_protocol)
391 {
392 rc= memcached_send_binary(ptr, instance, server_key,
393 key, key_length,
394 value, value_length, expiration,
395 flags, cas, flush, verb);
396 }
397 else
398 {
399 rc= memcached_send_ascii(ptr, instance,
400 key, key_length,
401 value, value_length, expiration,
402 flags, cas, flush, verb);
403 }
404
405 return rc;
406 }
407
408
409 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
410 const char *value, size_t value_length,
411 time_t expiration,
412 uint32_t flags)
413 {
414 memcached_return_t rc;
415 LIBMEMCACHED_MEMCACHED_SET_START();
416 rc= memcached_send(ptr, key, key_length,
417 key, key_length, value, value_length,
418 expiration, flags, 0, SET_OP);
419 LIBMEMCACHED_MEMCACHED_SET_END();
420 return rc;
421 }
422
423 memcached_return_t memcached_add(memcached_st *ptr,
424 const char *key, size_t key_length,
425 const char *value, size_t value_length,
426 time_t expiration,
427 uint32_t flags)
428 {
429 memcached_return_t rc;
430 LIBMEMCACHED_MEMCACHED_ADD_START();
431 rc= memcached_send(ptr, key, key_length,
432 key, key_length, value, value_length,
433 expiration, flags, 0, ADD_OP);
434 LIBMEMCACHED_MEMCACHED_ADD_END();
435 return rc;
436 }
437
438 memcached_return_t memcached_replace(memcached_st *ptr,
439 const char *key, size_t key_length,
440 const char *value, size_t value_length,
441 time_t expiration,
442 uint32_t flags)
443 {
444 memcached_return_t rc;
445 LIBMEMCACHED_MEMCACHED_REPLACE_START();
446 rc= memcached_send(ptr, key, key_length,
447 key, key_length, value, value_length,
448 expiration, flags, 0, REPLACE_OP);
449 LIBMEMCACHED_MEMCACHED_REPLACE_END();
450 return rc;
451 }
452
453 memcached_return_t memcached_prepend(memcached_st *ptr,
454 const char *key, size_t key_length,
455 const char *value, size_t value_length,
456 time_t expiration,
457 uint32_t flags)
458 {
459 memcached_return_t rc;
460 rc= memcached_send(ptr, key, key_length,
461 key, key_length, value, value_length,
462 expiration, flags, 0, PREPEND_OP);
463 return rc;
464 }
465
466 memcached_return_t memcached_append(memcached_st *ptr,
467 const char *key, size_t key_length,
468 const char *value, size_t value_length,
469 time_t expiration,
470 uint32_t flags)
471 {
472 memcached_return_t rc;
473 rc= memcached_send(ptr, key, key_length,
474 key, key_length, value, value_length,
475 expiration, flags, 0, APPEND_OP);
476 return rc;
477 }
478
479 memcached_return_t memcached_cas(memcached_st *ptr,
480 const char *key, size_t key_length,
481 const char *value, size_t value_length,
482 time_t expiration,
483 uint32_t flags,
484 uint64_t cas)
485 {
486 memcached_return_t rc;
487 rc= memcached_send(ptr, key, key_length,
488 key, key_length, value, value_length,
489 expiration, flags, cas, CAS_OP);
490 return rc;
491 }
492
493 memcached_return_t memcached_set_by_key(memcached_st *ptr,
494 const char *group_key,
495 size_t group_key_length,
496 const char *key, size_t key_length,
497 const char *value, size_t value_length,
498 time_t expiration,
499 uint32_t flags)
500 {
501 memcached_return_t rc;
502 LIBMEMCACHED_MEMCACHED_SET_START();
503 rc= memcached_send(ptr, group_key, group_key_length,
504 key, key_length, value, value_length,
505 expiration, flags, 0, SET_OP);
506 LIBMEMCACHED_MEMCACHED_SET_END();
507 return rc;
508 }
509
510 memcached_return_t memcached_add_by_key(memcached_st *ptr,
511 const char *group_key, size_t group_key_length,
512 const char *key, size_t key_length,
513 const char *value, size_t value_length,
514 time_t expiration,
515 uint32_t flags)
516 {
517 memcached_return_t rc;
518 LIBMEMCACHED_MEMCACHED_ADD_START();
519 rc= memcached_send(ptr, group_key, group_key_length,
520 key, key_length, value, value_length,
521 expiration, flags, 0, ADD_OP);
522 LIBMEMCACHED_MEMCACHED_ADD_END();
523 return rc;
524 }
525
526 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
527 const char *group_key, size_t group_key_length,
528 const char *key, size_t key_length,
529 const char *value, size_t value_length,
530 time_t expiration,
531 uint32_t flags)
532 {
533 memcached_return_t rc;
534 LIBMEMCACHED_MEMCACHED_REPLACE_START();
535 rc= memcached_send(ptr, group_key, group_key_length,
536 key, key_length, value, value_length,
537 expiration, flags, 0, REPLACE_OP);
538 LIBMEMCACHED_MEMCACHED_REPLACE_END();
539 return rc;
540 }
541
542 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
543 const char *group_key, size_t group_key_length,
544 const char *key, size_t key_length,
545 const char *value, size_t value_length,
546 time_t expiration,
547 uint32_t flags)
548 {
549 memcached_return_t rc;
550 rc= memcached_send(ptr, group_key, group_key_length,
551 key, key_length, value, value_length,
552 expiration, flags, 0, PREPEND_OP);
553 return rc;
554 }
555
556 memcached_return_t memcached_append_by_key(memcached_st *ptr,
557 const char *group_key, size_t group_key_length,
558 const char *key, size_t key_length,
559 const char *value, size_t value_length,
560 time_t expiration,
561 uint32_t flags)
562 {
563 memcached_return_t rc;
564 rc= memcached_send(ptr, group_key, group_key_length,
565 key, key_length, value, value_length,
566 expiration, flags, 0, APPEND_OP);
567 return rc;
568 }
569
570 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
571 const char *group_key, size_t group_key_length,
572 const char *key, size_t key_length,
573 const char *value, size_t value_length,
574 time_t expiration,
575 uint32_t flags,
576 uint64_t cas)
577 {
578 memcached_return_t rc;
579 rc= memcached_send(ptr, group_key, group_key_length,
580 key, key_length, value, value_length,
581 expiration, flags, cas, CAS_OP);
582 return rc;
583 }
584