Merge up for version.
[awesomized/libmemcached] / libmemcached / storage.cc
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Storage related functions, aka set, replace,..
9 *
10 */
11
12 #include <libmemcached/common.h>
13
14 enum memcached_storage_action_t {
15 SET_OP,
16 REPLACE_OP,
17 ADD_OP,
18 PREPEND_OP,
19 APPEND_OP,
20 CAS_OP
21 };
22
23 /* Inline this */
24 static inline const char *storage_op_string(memcached_storage_action_t verb)
25 {
26 switch (verb)
27 {
28 case SET_OP:
29 return "set ";
30 case REPLACE_OP:
31 return "replace ";
32 case ADD_OP:
33 return "add ";
34 case PREPEND_OP:
35 return "prepend ";
36 case APPEND_OP:
37 return "append ";
38 case CAS_OP:
39 return "cas ";
40 default:
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
42 }
43
44 /* NOTREACHED */
45 }
46
47 static memcached_return_t memcached_send_binary(memcached_st *ptr,
48 memcached_server_write_instance_st server,
49 uint32_t server_key,
50 const char *key,
51 size_t key_length,
52 const char *value,
53 size_t value_length,
54 time_t expiration,
55 uint32_t flags,
56 uint64_t cas,
57 memcached_storage_action_t verb);
58
59 static inline memcached_return_t memcached_send(memcached_st *ptr,
60 const char *group_key, size_t group_key_length,
61 const char *key, size_t key_length,
62 const char *value, size_t value_length,
63 time_t expiration,
64 uint32_t flags,
65 uint64_t cas,
66 memcached_storage_action_t verb)
67 {
68 bool to_write;
69 size_t write_length;
70 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
71
72 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
73
74 memcached_return_t rc;
75 if (memcached_failed(rc= initialize_query(ptr)))
76 {
77 return rc;
78 }
79
80 if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
81 {
82 return rc;
83 }
84
85 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
86 {
87 return MEMCACHED_BAD_KEY_PROVIDED;
88 }
89
90 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
91 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
92
93 WATCHPOINT_SET(instance->io_wait_count.read= 0);
94 WATCHPOINT_SET(instance->io_wait_count.write= 0);
95
96 if (ptr->flags.binary_protocol)
97 {
98 rc= memcached_send_binary(ptr, instance, server_key,
99 key, key_length,
100 value, value_length, expiration,
101 flags, cas, verb);
102 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
103 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
104 }
105 else
106 {
107
108 if (cas)
109 {
110 int check_length;
111 check_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
112 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
113 storage_op_string(verb),
114 memcached_print_array(ptr->_namespace),
115 (int)key_length, key, flags,
116 (unsigned long long)expiration, (unsigned long)value_length,
117 (unsigned long long)cas,
118 (ptr->flags.no_reply) ? " noreply" : "");
119 if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE || check_length < 0)
120 {
121 rc= memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
122 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
123 memcached_io_reset(instance);
124
125 return rc;
126 }
127 write_length= check_length;
128 }
129 else
130 {
131 char *buffer_ptr= buffer;
132 const char *command= storage_op_string(verb);
133
134 /* Copy in the command, no space needed, we handle that in the command function*/
135 memcpy(buffer_ptr, command, strlen(command));
136
137 /* Copy in the key prefix, switch to the buffer_ptr */
138 buffer_ptr= (char *)memcpy((char *)(buffer_ptr + strlen(command)), (char *)memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace));
139
140 /* Copy in the key, adjust point if a key prefix was used. */
141 buffer_ptr= (char *)memcpy(buffer_ptr + memcached_array_size(ptr->_namespace),
142 key, key_length);
143 buffer_ptr+= key_length;
144 buffer_ptr[0]= ' ';
145 buffer_ptr++;
146
147 write_length= (size_t)(buffer_ptr - buffer);
148 int check_length= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE -(size_t)(buffer_ptr - buffer),
149 "%u %llu %lu%s\r\n",
150 flags,
151 (unsigned long long)expiration, (unsigned long)value_length,
152 ptr->flags.no_reply ? " noreply" : "");
153 if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -size_t(buffer_ptr - buffer) || check_length < 0)
154 {
155 rc= memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
156 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
157 memcached_io_reset(instance);
158
159 return rc;
160 }
161
162 write_length+= (size_t)check_length;
163 WATCHPOINT_ASSERT(write_length < MEMCACHED_DEFAULT_COMMAND_SIZE);
164 }
165
166 if (ptr->flags.use_udp && ptr->flags.buffer_requests)
167 {
168 size_t cmd_size= write_length + value_length +2;
169 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
170 return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
171
172 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
173 memcached_io_write(instance, NULL, 0, true);
174 }
175
176 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
177 {
178 rc= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
179 }
180 else
181 {
182 struct libmemcached_io_vector_st vector[]=
183 {
184 { write_length, buffer },
185 { value_length, value },
186 { 2, "\r\n" }
187 };
188
189 if (ptr->flags.buffer_requests && verb == SET_OP)
190 {
191 to_write= false;
192 }
193 else
194 {
195 to_write= true;
196 }
197
198 /* Send command header */
199 rc= memcached_vdo(instance, vector, 3, to_write);
200 if (rc == MEMCACHED_SUCCESS)
201 {
202
203 if (ptr->flags.no_reply)
204 {
205 rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
206 }
207 else if (to_write == false)
208 {
209 rc= MEMCACHED_BUFFERED;
210 }
211 else
212 {
213 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
214
215 if (rc == MEMCACHED_STORED)
216 rc= MEMCACHED_SUCCESS;
217 }
218 }
219 }
220
221 if (rc == MEMCACHED_WRITE_FAILURE)
222 memcached_io_reset(instance);
223 }
224
225 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
226 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
227
228 return rc;
229 }
230
231
232 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
233 const char *value, size_t value_length,
234 time_t expiration,
235 uint32_t flags)
236 {
237 memcached_return_t rc;
238 LIBMEMCACHED_MEMCACHED_SET_START();
239 rc= memcached_send(ptr, key, key_length,
240 key, key_length, value, value_length,
241 expiration, flags, 0, SET_OP);
242 LIBMEMCACHED_MEMCACHED_SET_END();
243 return rc;
244 }
245
246 memcached_return_t memcached_add(memcached_st *ptr,
247 const char *key, size_t key_length,
248 const char *value, size_t value_length,
249 time_t expiration,
250 uint32_t flags)
251 {
252 memcached_return_t rc;
253 LIBMEMCACHED_MEMCACHED_ADD_START();
254 rc= memcached_send(ptr, key, key_length,
255 key, key_length, value, value_length,
256 expiration, flags, 0, ADD_OP);
257 LIBMEMCACHED_MEMCACHED_ADD_END();
258 return rc;
259 }
260
261 memcached_return_t memcached_replace(memcached_st *ptr,
262 const char *key, size_t key_length,
263 const char *value, size_t value_length,
264 time_t expiration,
265 uint32_t flags)
266 {
267 memcached_return_t rc;
268 LIBMEMCACHED_MEMCACHED_REPLACE_START();
269 rc= memcached_send(ptr, key, key_length,
270 key, key_length, value, value_length,
271 expiration, flags, 0, REPLACE_OP);
272 LIBMEMCACHED_MEMCACHED_REPLACE_END();
273 return rc;
274 }
275
276 memcached_return_t memcached_prepend(memcached_st *ptr,
277 const char *key, size_t key_length,
278 const char *value, size_t value_length,
279 time_t expiration,
280 uint32_t flags)
281 {
282 memcached_return_t rc;
283 rc= memcached_send(ptr, key, key_length,
284 key, key_length, value, value_length,
285 expiration, flags, 0, PREPEND_OP);
286 return rc;
287 }
288
289 memcached_return_t memcached_append(memcached_st *ptr,
290 const char *key, size_t key_length,
291 const char *value, size_t value_length,
292 time_t expiration,
293 uint32_t flags)
294 {
295 memcached_return_t rc;
296 rc= memcached_send(ptr, key, key_length,
297 key, key_length, value, value_length,
298 expiration, flags, 0, APPEND_OP);
299 return rc;
300 }
301
302 memcached_return_t memcached_cas(memcached_st *ptr,
303 const char *key, size_t key_length,
304 const char *value, size_t value_length,
305 time_t expiration,
306 uint32_t flags,
307 uint64_t cas)
308 {
309 memcached_return_t rc;
310 rc= memcached_send(ptr, key, key_length,
311 key, key_length, value, value_length,
312 expiration, flags, cas, CAS_OP);
313 return rc;
314 }
315
316 memcached_return_t memcached_set_by_key(memcached_st *ptr,
317 const char *group_key,
318 size_t group_key_length,
319 const char *key, size_t key_length,
320 const char *value, size_t value_length,
321 time_t expiration,
322 uint32_t flags)
323 {
324 memcached_return_t rc;
325 LIBMEMCACHED_MEMCACHED_SET_START();
326 rc= memcached_send(ptr, group_key, group_key_length,
327 key, key_length, value, value_length,
328 expiration, flags, 0, SET_OP);
329 LIBMEMCACHED_MEMCACHED_SET_END();
330 return rc;
331 }
332
333 memcached_return_t memcached_add_by_key(memcached_st *ptr,
334 const char *group_key, size_t group_key_length,
335 const char *key, size_t key_length,
336 const char *value, size_t value_length,
337 time_t expiration,
338 uint32_t flags)
339 {
340 memcached_return_t rc;
341 LIBMEMCACHED_MEMCACHED_ADD_START();
342 rc= memcached_send(ptr, group_key, group_key_length,
343 key, key_length, value, value_length,
344 expiration, flags, 0, ADD_OP);
345 LIBMEMCACHED_MEMCACHED_ADD_END();
346 return rc;
347 }
348
349 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
350 const char *group_key, size_t group_key_length,
351 const char *key, size_t key_length,
352 const char *value, size_t value_length,
353 time_t expiration,
354 uint32_t flags)
355 {
356 memcached_return_t rc;
357 LIBMEMCACHED_MEMCACHED_REPLACE_START();
358 rc= memcached_send(ptr, group_key, group_key_length,
359 key, key_length, value, value_length,
360 expiration, flags, 0, REPLACE_OP);
361 LIBMEMCACHED_MEMCACHED_REPLACE_END();
362 return rc;
363 }
364
365 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
366 const char *group_key, size_t group_key_length,
367 const char *key, size_t key_length,
368 const char *value, size_t value_length,
369 time_t expiration,
370 uint32_t flags)
371 {
372 memcached_return_t rc;
373 rc= memcached_send(ptr, group_key, group_key_length,
374 key, key_length, value, value_length,
375 expiration, flags, 0, PREPEND_OP);
376 return rc;
377 }
378
379 memcached_return_t memcached_append_by_key(memcached_st *ptr,
380 const char *group_key, size_t group_key_length,
381 const char *key, size_t key_length,
382 const char *value, size_t value_length,
383 time_t expiration,
384 uint32_t flags)
385 {
386 memcached_return_t rc;
387 rc= memcached_send(ptr, group_key, group_key_length,
388 key, key_length, value, value_length,
389 expiration, flags, 0, APPEND_OP);
390 return rc;
391 }
392
393 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
394 const char *group_key, size_t group_key_length,
395 const char *key, size_t key_length,
396 const char *value, size_t value_length,
397 time_t expiration,
398 uint32_t flags,
399 uint64_t cas)
400 {
401 memcached_return_t rc;
402 rc= memcached_send(ptr, group_key, group_key_length,
403 key, key_length, value, value_length,
404 expiration, flags, cas, CAS_OP);
405 return rc;
406 }
407
408 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
409 {
410 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
411 * be used uninitialized in this function. FAIL */
412 uint8_t ret= 0;
413
414 if (noreply)
415 switch (verb)
416 {
417 case SET_OP:
418 ret=PROTOCOL_BINARY_CMD_SETQ;
419 break;
420 case ADD_OP:
421 ret=PROTOCOL_BINARY_CMD_ADDQ;
422 break;
423 case CAS_OP: /* FALLTHROUGH */
424 case REPLACE_OP:
425 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
426 break;
427 case APPEND_OP:
428 ret=PROTOCOL_BINARY_CMD_APPENDQ;
429 break;
430 case PREPEND_OP:
431 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
432 break;
433 default:
434 WATCHPOINT_ASSERT(verb);
435 break;
436 }
437 else
438 switch (verb)
439 {
440 case SET_OP:
441 ret=PROTOCOL_BINARY_CMD_SET;
442 break;
443 case ADD_OP:
444 ret=PROTOCOL_BINARY_CMD_ADD;
445 break;
446 case CAS_OP: /* FALLTHROUGH */
447 case REPLACE_OP:
448 ret=PROTOCOL_BINARY_CMD_REPLACE;
449 break;
450 case APPEND_OP:
451 ret=PROTOCOL_BINARY_CMD_APPEND;
452 break;
453 case PREPEND_OP:
454 ret=PROTOCOL_BINARY_CMD_PREPEND;
455 break;
456 default:
457 WATCHPOINT_ASSERT(verb);
458 break;
459 }
460
461 return ret;
462 }
463
464
465
466 static memcached_return_t memcached_send_binary(memcached_st *ptr,
467 memcached_server_write_instance_st server,
468 uint32_t server_key,
469 const char *key,
470 size_t key_length,
471 const char *value,
472 size_t value_length,
473 time_t expiration,
474 uint32_t flags,
475 uint64_t cas,
476 memcached_storage_action_t verb)
477 {
478 bool flush;
479 protocol_binary_request_set request= {};
480 size_t send_length= sizeof(request.bytes);
481
482 bool noreply= server->root->flags.no_reply;
483
484 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
485 request.message.header.request.opcode= get_com_code(verb, noreply);
486 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
487 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
488 if (verb == APPEND_OP || verb == PREPEND_OP)
489 send_length -= 8; /* append & prepend does not contain extras! */
490 else
491 {
492 request.message.header.request.extlen= 8;
493 request.message.body.flags= htonl(flags);
494 request.message.body.expiration= htonl((uint32_t)expiration);
495 }
496
497 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
498 request.message.header.request.extlen));
499
500 if (cas)
501 request.message.header.request.cas= memcached_htonll(cas);
502
503 flush= (bool) ((server->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
504
505 if (server->root->flags.use_udp && ! flush)
506 {
507 size_t cmd_size= send_length + key_length + value_length;
508
509 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
510 {
511 return MEMCACHED_WRITE_FAILURE;
512 }
513 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
514 {
515 memcached_io_write(server, NULL, 0, true);
516 }
517 }
518
519 struct libmemcached_io_vector_st vector[]=
520 {
521 { send_length, request.bytes },
522 { memcached_array_size(ptr->_namespace), memcached_array_string(ptr->_namespace) },
523 { key_length, key },
524 { value_length, value }
525 };
526
527 /* write the header */
528 memcached_return_t rc;
529 if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS)
530 {
531 memcached_io_reset(server);
532 return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
533 }
534
535 if (verb == SET_OP && ptr->number_of_replicas > 0)
536 {
537 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
538 WATCHPOINT_STRING("replicating");
539
540 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
541 {
542 memcached_server_write_instance_st instance;
543
544 ++server_key;
545 if (server_key == memcached_server_count(ptr))
546 server_key= 0;
547
548 instance= memcached_server_instance_fetch(ptr, server_key);
549
550 if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS)
551 {
552 memcached_io_reset(instance);
553 }
554 else
555 {
556 memcached_server_response_decrement(instance);
557 }
558 }
559 }
560
561 if (flush == false)
562 {
563 return MEMCACHED_BUFFERED;
564 }
565
566 if (noreply)
567 {
568 return MEMCACHED_SUCCESS;
569 }
570
571 return memcached_response(server, NULL, 0, NULL);
572 }
573