2 * Copyright (C) 2006-2009 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Storage related functions, aka set, replace,..
12 #include <libmemcached/common.h>
14 enum memcached_storage_action_t
{
24 static inline const char *storage_op_string(memcached_storage_action_t verb
)
50 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
52 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
53 * be used uninitialized in this function. FAIL */
60 ret
=PROTOCOL_BINARY_CMD_SETQ
;
63 ret
=PROTOCOL_BINARY_CMD_ADDQ
;
65 case CAS_OP
: /* FALLTHROUGH */
67 ret
=PROTOCOL_BINARY_CMD_REPLACEQ
;
70 ret
=PROTOCOL_BINARY_CMD_APPENDQ
;
73 ret
=PROTOCOL_BINARY_CMD_PREPENDQ
;
76 WATCHPOINT_ASSERT(verb
);
83 ret
=PROTOCOL_BINARY_CMD_SET
;
86 ret
=PROTOCOL_BINARY_CMD_ADD
;
88 case CAS_OP
: /* FALLTHROUGH */
90 ret
=PROTOCOL_BINARY_CMD_REPLACE
;
93 ret
=PROTOCOL_BINARY_CMD_APPEND
;
96 ret
=PROTOCOL_BINARY_CMD_PREPEND
;
99 WATCHPOINT_ASSERT(verb
);
106 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
107 memcached_server_write_instance_st server
,
116 memcached_storage_action_t verb
)
119 protocol_binary_request_set request
= {};
120 size_t send_length
= sizeof(request
.bytes
);
122 bool noreply
= server
->root
->flags
.no_reply
;
124 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
125 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
126 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
127 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
128 if (verb
== APPEND_OP
|| verb
== PREPEND_OP
)
129 send_length
-= 8; /* append & prepend does not contain extras! */
132 request
.message
.header
.request
.extlen
= 8;
133 request
.message
.body
.flags
= htonl(flags
);
134 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
137 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
138 request
.message
.header
.request
.extlen
));
141 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
143 flush
= (bool) ((server
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
145 if (server
->root
->flags
.use_udp
&& ! flush
)
147 size_t cmd_size
= send_length
+ key_length
+ value_length
;
149 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
151 return MEMCACHED_WRITE_FAILURE
;
153 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
155 memcached_io_write(server
, NULL
, 0, true);
159 struct libmemcached_io_vector_st vector
[]=
161 { send_length
, request
.bytes
},
162 { memcached_array_size(ptr
->_namespace
), memcached_array_string(ptr
->_namespace
) },
164 { value_length
, value
}
167 /* write the header */
168 memcached_return_t rc
;
169 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
171 memcached_io_reset(server
);
172 return (rc
== MEMCACHED_SUCCESS
) ? MEMCACHED_WRITE_FAILURE
: rc
;
175 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
177 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
178 WATCHPOINT_STRING("replicating");
180 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
182 memcached_server_write_instance_st instance
;
185 if (server_key
== memcached_server_count(ptr
))
188 instance
= memcached_server_instance_fetch(ptr
, server_key
);
190 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
192 memcached_io_reset(instance
);
196 memcached_server_response_decrement(instance
);
203 return MEMCACHED_BUFFERED
;
208 return MEMCACHED_SUCCESS
;
211 return memcached_response(server
, NULL
, 0, NULL
);
214 static memcached_return_t
memcached_send_ascii(memcached_st
*ptr
,
215 memcached_server_write_instance_st instance
,
223 memcached_storage_action_t verb
)
227 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
228 memcached_return_t rc
;
233 check_length
= snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
234 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
235 storage_op_string(verb
),
236 memcached_print_array(ptr
->_namespace
),
237 (int)key_length
, key
, flags
,
238 (unsigned long long)expiration
, (unsigned long)value_length
,
239 (unsigned long long)cas
,
240 (ptr
->flags
.no_reply
) ? " noreply" : "");
241 if (check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
|| check_length
< 0)
243 rc
= memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
244 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
245 memcached_io_reset(instance
);
249 write_length
= check_length
;
253 char *buffer_ptr
= buffer
;
254 const char *command
= storage_op_string(verb
);
256 /* Copy in the command, no space needed, we handle that in the command function*/
257 memcpy(buffer_ptr
, command
, strlen(command
));
259 /* Copy in the key prefix, switch to the buffer_ptr */
260 buffer_ptr
= (char *)memcpy((char *)(buffer_ptr
+ strlen(command
)), (char *)memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
));
262 /* Copy in the key, adjust point if a key prefix was used. */
263 buffer_ptr
= (char *)memcpy(buffer_ptr
+ memcached_array_size(ptr
->_namespace
),
265 buffer_ptr
+= key_length
;
269 write_length
= (size_t)(buffer_ptr
- buffer
);
270 int check_length
= snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
),
273 (unsigned long long)expiration
, (unsigned long)value_length
,
274 ptr
->flags
.no_reply
? " noreply" : "");
275 if ((size_t)check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
-size_t(buffer_ptr
- buffer
) || check_length
< 0)
277 rc
= memcached_set_error(*ptr
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
278 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
279 memcached_io_reset(instance
);
284 write_length
+= (size_t)check_length
;
285 WATCHPOINT_ASSERT(write_length
< MEMCACHED_DEFAULT_COMMAND_SIZE
);
288 if (ptr
->flags
.use_udp
&& ptr
->flags
.buffer_requests
)
290 size_t cmd_size
= write_length
+ value_length
+2;
291 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
292 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
294 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
295 memcached_io_write(instance
, NULL
, 0, true);
298 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
300 rc
= memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
304 struct libmemcached_io_vector_st vector
[]=
306 { write_length
, buffer
},
307 { value_length
, value
},
311 if (ptr
->flags
.buffer_requests
&& verb
== SET_OP
)
320 /* Send command header */
321 rc
= memcached_vdo(instance
, vector
, 3, to_write
);
322 if (rc
== MEMCACHED_SUCCESS
)
325 if (ptr
->flags
.no_reply
)
327 rc
= (to_write
== false) ? MEMCACHED_BUFFERED
: MEMCACHED_SUCCESS
;
329 else if (to_write
== false)
331 rc
= MEMCACHED_BUFFERED
;
335 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
337 if (rc
== MEMCACHED_STORED
)
338 rc
= MEMCACHED_SUCCESS
;
343 if (rc
== MEMCACHED_WRITE_FAILURE
)
344 memcached_io_reset(instance
);
349 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
350 const char *group_key
, size_t group_key_length
,
351 const char *key
, size_t key_length
,
352 const char *value
, size_t value_length
,
356 memcached_storage_action_t verb
)
358 WATCHPOINT_ASSERT(!(value
== NULL
&& value_length
> 0));
360 memcached_return_t rc
;
361 if (memcached_failed(rc
= initialize_query(ptr
)))
366 if (memcached_failed(rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
)))
371 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
373 return MEMCACHED_BAD_KEY_PROVIDED
;
376 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
377 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
379 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
380 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
382 if (ptr
->flags
.binary_protocol
)
384 rc
= memcached_send_binary(ptr
, instance
, server_key
,
386 value
, value_length
, expiration
,
391 rc
= memcached_send_ascii(ptr
, instance
,
393 value
, value_length
, expiration
,
401 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
402 const char *value
, size_t value_length
,
406 memcached_return_t rc
;
407 LIBMEMCACHED_MEMCACHED_SET_START();
408 rc
= memcached_send(ptr
, key
, key_length
,
409 key
, key_length
, value
, value_length
,
410 expiration
, flags
, 0, SET_OP
);
411 LIBMEMCACHED_MEMCACHED_SET_END();
415 memcached_return_t
memcached_add(memcached_st
*ptr
,
416 const char *key
, size_t key_length
,
417 const char *value
, size_t value_length
,
421 memcached_return_t rc
;
422 LIBMEMCACHED_MEMCACHED_ADD_START();
423 rc
= memcached_send(ptr
, key
, key_length
,
424 key
, key_length
, value
, value_length
,
425 expiration
, flags
, 0, ADD_OP
);
426 LIBMEMCACHED_MEMCACHED_ADD_END();
430 memcached_return_t
memcached_replace(memcached_st
*ptr
,
431 const char *key
, size_t key_length
,
432 const char *value
, size_t value_length
,
436 memcached_return_t rc
;
437 LIBMEMCACHED_MEMCACHED_REPLACE_START();
438 rc
= memcached_send(ptr
, key
, key_length
,
439 key
, key_length
, value
, value_length
,
440 expiration
, flags
, 0, REPLACE_OP
);
441 LIBMEMCACHED_MEMCACHED_REPLACE_END();
445 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
446 const char *key
, size_t key_length
,
447 const char *value
, size_t value_length
,
451 memcached_return_t rc
;
452 rc
= memcached_send(ptr
, key
, key_length
,
453 key
, key_length
, value
, value_length
,
454 expiration
, flags
, 0, PREPEND_OP
);
458 memcached_return_t
memcached_append(memcached_st
*ptr
,
459 const char *key
, size_t key_length
,
460 const char *value
, size_t value_length
,
464 memcached_return_t rc
;
465 rc
= memcached_send(ptr
, key
, key_length
,
466 key
, key_length
, value
, value_length
,
467 expiration
, flags
, 0, APPEND_OP
);
471 memcached_return_t
memcached_cas(memcached_st
*ptr
,
472 const char *key
, size_t key_length
,
473 const char *value
, size_t value_length
,
478 memcached_return_t rc
;
479 rc
= memcached_send(ptr
, key
, key_length
,
480 key
, key_length
, value
, value_length
,
481 expiration
, flags
, cas
, CAS_OP
);
485 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
486 const char *group_key
,
487 size_t group_key_length
,
488 const char *key
, size_t key_length
,
489 const char *value
, size_t value_length
,
493 memcached_return_t rc
;
494 LIBMEMCACHED_MEMCACHED_SET_START();
495 rc
= memcached_send(ptr
, group_key
, group_key_length
,
496 key
, key_length
, value
, value_length
,
497 expiration
, flags
, 0, SET_OP
);
498 LIBMEMCACHED_MEMCACHED_SET_END();
502 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
503 const char *group_key
, size_t group_key_length
,
504 const char *key
, size_t key_length
,
505 const char *value
, size_t value_length
,
509 memcached_return_t rc
;
510 LIBMEMCACHED_MEMCACHED_ADD_START();
511 rc
= memcached_send(ptr
, group_key
, group_key_length
,
512 key
, key_length
, value
, value_length
,
513 expiration
, flags
, 0, ADD_OP
);
514 LIBMEMCACHED_MEMCACHED_ADD_END();
518 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
519 const char *group_key
, size_t group_key_length
,
520 const char *key
, size_t key_length
,
521 const char *value
, size_t value_length
,
525 memcached_return_t rc
;
526 LIBMEMCACHED_MEMCACHED_REPLACE_START();
527 rc
= memcached_send(ptr
, group_key
, group_key_length
,
528 key
, key_length
, value
, value_length
,
529 expiration
, flags
, 0, REPLACE_OP
);
530 LIBMEMCACHED_MEMCACHED_REPLACE_END();
534 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
535 const char *group_key
, size_t group_key_length
,
536 const char *key
, size_t key_length
,
537 const char *value
, size_t value_length
,
541 memcached_return_t rc
;
542 rc
= memcached_send(ptr
, group_key
, group_key_length
,
543 key
, key_length
, value
, value_length
,
544 expiration
, flags
, 0, PREPEND_OP
);
548 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
549 const char *group_key
, size_t group_key_length
,
550 const char *key
, size_t key_length
,
551 const char *value
, size_t value_length
,
555 memcached_return_t rc
;
556 rc
= memcached_send(ptr
, group_key
, group_key_length
,
557 key
, key_length
, value
, value_length
,
558 expiration
, flags
, 0, APPEND_OP
);
562 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
563 const char *group_key
, size_t group_key_length
,
564 const char *key
, size_t key_length
,
565 const char *value
, size_t value_length
,
570 memcached_return_t rc
;
571 rc
= memcached_send(ptr
, group_key
, group_key_length
,
572 key
, key_length
, value
, value_length
,
573 expiration
, flags
, cas
, CAS_OP
);