2 * Copyright (C) 2006-2009 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Storage related functions, aka set, replace,..
12 #include <libmemcached/common.h>
14 enum memcached_storage_action_t
{
24 static inline const char *storage_op_string(memcached_storage_action_t verb
)
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
47 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
48 memcached_server_write_instance_st server
,
57 memcached_storage_action_t verb
);
59 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
60 const char *group_key
, size_t group_key_length
,
61 const char *key
, size_t key_length
,
62 const char *value
, size_t value_length
,
66 memcached_storage_action_t verb
)
70 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
72 WATCHPOINT_ASSERT(!(value
== NULL
&& value_length
> 0));
74 memcached_return_t rc
;
75 if (memcached_failed(rc
= initialize_query(ptr
)))
80 if (memcached_failed(rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
)))
85 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
87 return MEMCACHED_BAD_KEY_PROVIDED
;
90 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
91 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
93 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
94 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
96 if (ptr
->flags
.binary_protocol
)
98 rc
= memcached_send_binary(ptr
, instance
, server_key
,
100 value
, value_length
, expiration
,
102 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
103 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
111 check_length
= snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
112 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
113 storage_op_string(verb
),
114 memcached_print_array(ptr
->_namespace
),
115 (int)key_length
, key
, flags
,
116 (unsigned long long)expiration
, (unsigned long)value_length
,
117 (unsigned long long)cas
,
118 (ptr
->flags
.no_reply
) ? " noreply" : "");
119 if (check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
|| check_length
< 0)
121 rc
= memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
122 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
123 memcached_io_reset(instance
);
127 write_length
= check_length
;
131 char *buffer_ptr
= buffer
;
132 const char *command
= storage_op_string(verb
);
134 /* Copy in the command, no space needed, we handle that in the command function*/
135 memcpy(buffer_ptr
, command
, strlen(command
));
137 /* Copy in the key prefix, switch to the buffer_ptr */
138 buffer_ptr
= (char *)memcpy((char *)(buffer_ptr
+ strlen(command
)), (char *)memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
));
140 /* Copy in the key, adjust point if a key prefix was used. */
141 buffer_ptr
= (char *)memcpy(buffer_ptr
+ memcached_array_size(ptr
->_namespace
),
143 buffer_ptr
+= key_length
;
147 write_length
= (size_t)(buffer_ptr
- buffer
);
148 int check_length
= snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
),
151 (unsigned long long)expiration
, (unsigned long)value_length
,
152 ptr
->flags
.no_reply
? " noreply" : "");
153 if ((size_t)check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
-size_t(buffer_ptr
- buffer
) || check_length
< 0)
155 rc
= memcached_set_error(*ptr
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
156 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
157 memcached_io_reset(instance
);
162 write_length
+= (size_t)check_length
;
163 WATCHPOINT_ASSERT(write_length
< MEMCACHED_DEFAULT_COMMAND_SIZE
);
166 if (ptr
->flags
.use_udp
&& ptr
->flags
.buffer_requests
)
168 size_t cmd_size
= write_length
+ value_length
+2;
169 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
170 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
172 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
173 memcached_io_write(instance
, NULL
, 0, true);
176 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
178 rc
= memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
182 struct libmemcached_io_vector_st vector
[]=
184 { write_length
, buffer
},
185 { value_length
, value
},
189 if (ptr
->flags
.buffer_requests
&& verb
== SET_OP
)
198 /* Send command header */
199 rc
= memcached_vdo(instance
, vector
, 3, to_write
);
200 if (rc
== MEMCACHED_SUCCESS
)
203 if (ptr
->flags
.no_reply
)
205 rc
= (to_write
== false) ? MEMCACHED_BUFFERED
: MEMCACHED_SUCCESS
;
207 else if (to_write
== false)
209 rc
= MEMCACHED_BUFFERED
;
213 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
215 if (rc
== MEMCACHED_STORED
)
216 rc
= MEMCACHED_SUCCESS
;
221 if (rc
== MEMCACHED_WRITE_FAILURE
)
222 memcached_io_reset(instance
);
225 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
226 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
232 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
233 const char *value
, size_t value_length
,
237 memcached_return_t rc
;
238 LIBMEMCACHED_MEMCACHED_SET_START();
239 rc
= memcached_send(ptr
, key
, key_length
,
240 key
, key_length
, value
, value_length
,
241 expiration
, flags
, 0, SET_OP
);
242 LIBMEMCACHED_MEMCACHED_SET_END();
246 memcached_return_t
memcached_add(memcached_st
*ptr
,
247 const char *key
, size_t key_length
,
248 const char *value
, size_t value_length
,
252 memcached_return_t rc
;
253 LIBMEMCACHED_MEMCACHED_ADD_START();
254 rc
= memcached_send(ptr
, key
, key_length
,
255 key
, key_length
, value
, value_length
,
256 expiration
, flags
, 0, ADD_OP
);
257 LIBMEMCACHED_MEMCACHED_ADD_END();
261 memcached_return_t
memcached_replace(memcached_st
*ptr
,
262 const char *key
, size_t key_length
,
263 const char *value
, size_t value_length
,
267 memcached_return_t rc
;
268 LIBMEMCACHED_MEMCACHED_REPLACE_START();
269 rc
= memcached_send(ptr
, key
, key_length
,
270 key
, key_length
, value
, value_length
,
271 expiration
, flags
, 0, REPLACE_OP
);
272 LIBMEMCACHED_MEMCACHED_REPLACE_END();
276 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
277 const char *key
, size_t key_length
,
278 const char *value
, size_t value_length
,
282 memcached_return_t rc
;
283 rc
= memcached_send(ptr
, key
, key_length
,
284 key
, key_length
, value
, value_length
,
285 expiration
, flags
, 0, PREPEND_OP
);
289 memcached_return_t
memcached_append(memcached_st
*ptr
,
290 const char *key
, size_t key_length
,
291 const char *value
, size_t value_length
,
295 memcached_return_t rc
;
296 rc
= memcached_send(ptr
, key
, key_length
,
297 key
, key_length
, value
, value_length
,
298 expiration
, flags
, 0, APPEND_OP
);
302 memcached_return_t
memcached_cas(memcached_st
*ptr
,
303 const char *key
, size_t key_length
,
304 const char *value
, size_t value_length
,
309 memcached_return_t rc
;
310 rc
= memcached_send(ptr
, key
, key_length
,
311 key
, key_length
, value
, value_length
,
312 expiration
, flags
, cas
, CAS_OP
);
316 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
317 const char *group_key
,
318 size_t group_key_length
,
319 const char *key
, size_t key_length
,
320 const char *value
, size_t value_length
,
324 memcached_return_t rc
;
325 LIBMEMCACHED_MEMCACHED_SET_START();
326 rc
= memcached_send(ptr
, group_key
, group_key_length
,
327 key
, key_length
, value
, value_length
,
328 expiration
, flags
, 0, SET_OP
);
329 LIBMEMCACHED_MEMCACHED_SET_END();
333 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
334 const char *group_key
, size_t group_key_length
,
335 const char *key
, size_t key_length
,
336 const char *value
, size_t value_length
,
340 memcached_return_t rc
;
341 LIBMEMCACHED_MEMCACHED_ADD_START();
342 rc
= memcached_send(ptr
, group_key
, group_key_length
,
343 key
, key_length
, value
, value_length
,
344 expiration
, flags
, 0, ADD_OP
);
345 LIBMEMCACHED_MEMCACHED_ADD_END();
349 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
350 const char *group_key
, size_t group_key_length
,
351 const char *key
, size_t key_length
,
352 const char *value
, size_t value_length
,
356 memcached_return_t rc
;
357 LIBMEMCACHED_MEMCACHED_REPLACE_START();
358 rc
= memcached_send(ptr
, group_key
, group_key_length
,
359 key
, key_length
, value
, value_length
,
360 expiration
, flags
, 0, REPLACE_OP
);
361 LIBMEMCACHED_MEMCACHED_REPLACE_END();
365 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
366 const char *group_key
, size_t group_key_length
,
367 const char *key
, size_t key_length
,
368 const char *value
, size_t value_length
,
372 memcached_return_t rc
;
373 rc
= memcached_send(ptr
, group_key
, group_key_length
,
374 key
, key_length
, value
, value_length
,
375 expiration
, flags
, 0, PREPEND_OP
);
379 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
380 const char *group_key
, size_t group_key_length
,
381 const char *key
, size_t key_length
,
382 const char *value
, size_t value_length
,
386 memcached_return_t rc
;
387 rc
= memcached_send(ptr
, group_key
, group_key_length
,
388 key
, key_length
, value
, value_length
,
389 expiration
, flags
, 0, APPEND_OP
);
393 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
394 const char *group_key
, size_t group_key_length
,
395 const char *key
, size_t key_length
,
396 const char *value
, size_t value_length
,
401 memcached_return_t rc
;
402 rc
= memcached_send(ptr
, group_key
, group_key_length
,
403 key
, key_length
, value
, value_length
,
404 expiration
, flags
, cas
, CAS_OP
);
408 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
410 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
411 * be used uninitialized in this function. FAIL */
418 ret
=PROTOCOL_BINARY_CMD_SETQ
;
421 ret
=PROTOCOL_BINARY_CMD_ADDQ
;
423 case CAS_OP
: /* FALLTHROUGH */
425 ret
=PROTOCOL_BINARY_CMD_REPLACEQ
;
428 ret
=PROTOCOL_BINARY_CMD_APPENDQ
;
431 ret
=PROTOCOL_BINARY_CMD_PREPENDQ
;
434 WATCHPOINT_ASSERT(verb
);
441 ret
=PROTOCOL_BINARY_CMD_SET
;
444 ret
=PROTOCOL_BINARY_CMD_ADD
;
446 case CAS_OP
: /* FALLTHROUGH */
448 ret
=PROTOCOL_BINARY_CMD_REPLACE
;
451 ret
=PROTOCOL_BINARY_CMD_APPEND
;
454 ret
=PROTOCOL_BINARY_CMD_PREPEND
;
457 WATCHPOINT_ASSERT(verb
);
466 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
467 memcached_server_write_instance_st server
,
476 memcached_storage_action_t verb
)
479 protocol_binary_request_set request
= {};
480 size_t send_length
= sizeof(request
.bytes
);
482 bool noreply
= server
->root
->flags
.no_reply
;
484 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
485 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
486 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
487 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
488 if (verb
== APPEND_OP
|| verb
== PREPEND_OP
)
489 send_length
-= 8; /* append & prepend does not contain extras! */
492 request
.message
.header
.request
.extlen
= 8;
493 request
.message
.body
.flags
= htonl(flags
);
494 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
497 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
498 request
.message
.header
.request
.extlen
));
501 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
503 flush
= (bool) ((server
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
505 if (server
->root
->flags
.use_udp
&& ! flush
)
507 size_t cmd_size
= send_length
+ key_length
+ value_length
;
509 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
511 return MEMCACHED_WRITE_FAILURE
;
513 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
515 memcached_io_write(server
, NULL
, 0, true);
519 struct libmemcached_io_vector_st vector
[]=
521 { send_length
, request
.bytes
},
522 { memcached_array_size(ptr
->_namespace
), memcached_array_string(ptr
->_namespace
) },
524 { value_length
, value
}
527 /* write the header */
528 memcached_return_t rc
;
529 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
531 memcached_io_reset(server
);
532 return (rc
== MEMCACHED_SUCCESS
) ? MEMCACHED_WRITE_FAILURE
: rc
;
535 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
537 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
538 WATCHPOINT_STRING("replicating");
540 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
542 memcached_server_write_instance_st instance
;
545 if (server_key
== memcached_server_count(ptr
))
548 instance
= memcached_server_instance_fetch(ptr
, server_key
);
550 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
552 memcached_io_reset(instance
);
556 memcached_server_response_decrement(instance
);
563 return MEMCACHED_BUFFERED
;
568 return MEMCACHED_SUCCESS
;
571 return memcached_response(server
, NULL
, 0, NULL
);