2 * Copyright (C) 2006-2009 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Storage related functions, aka set, replace,..
12 #include <libmemcached/common.h>
/*
 * Verb selecting which storage command is issued.
 * The enumerators are not visible in this excerpt; code in this file uses
 * SET_OP, ADD_OP, REPLACE_OP, APPEND_OP, PREPEND_OP and CAS_OP as values
 * of this type.
 */
14 enum memcached_storage_action_t
{
/*
 * Returns the command text for a storage verb.
 * memcached_send_ascii() in this file uses the result as the leading text
 * of the request line it builds.
 * NOTE(review): the body is not visible in this excerpt, so the exact
 * strings (and whether they end in a separator) must be confirmed there.
 */
24 static inline const char *storage_op_string(memcached_storage_action_t verb
)
/*
 * Maps a storage verb to a binary-protocol opcode (PROTOCOL_BINARY_CMD_*).
 *
 * verb     storage action to translate
 * noreply  NOTE(review): presumably selects the Q-suffixed opcode group
 *          below; the test on it is not visible in this excerpt.
 *
 * In both groups CAS_OP falls through to the REPLACE return.
 */
50 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
/* First group of returns: the Q-suffixed opcodes. */
57 return PROTOCOL_BINARY_CMD_SETQ
;
60 return PROTOCOL_BINARY_CMD_ADDQ
;
62 case CAS_OP
: /* FALLTHROUGH */
64 return PROTOCOL_BINARY_CMD_REPLACEQ
;
67 return PROTOCOL_BINARY_CMD_APPENDQ
;
70 return PROTOCOL_BINARY_CMD_PREPENDQ
;
/* Second group of returns: the opcodes without the Q suffix. */
80 return PROTOCOL_BINARY_CMD_ADD
;
82 case CAS_OP
: /* FALLTHROUGH */
84 return PROTOCOL_BINARY_CMD_REPLACE
;
87 return PROTOCOL_BINARY_CMD_APPEND
;
90 return PROTOCOL_BINARY_CMD_PREPEND
;
93 return PROTOCOL_BINARY_CMD_SET
;
/*
 * Builds a binary-protocol storage request and sends it to `server`.
 *
 * ptr     client handle; supplies the namespace, number_of_replicas and
 *         error_messages read below
 * server  instance the request is written to
 * verb    storage action; translated to an opcode by get_com_code()
 *
 * The body also reads server_key, key_length, value, value_length,
 * expiration, flags, cas and flush; their declarations are on lines not
 * visible in this excerpt.
 *
 * Visible return values: MEMCACHED_WRITE_FAILURE (datagram too large for
 * UDP, or memcached_vdo() failed), MEMCACHED_BUFFERED, MEMCACHED_SUCCESS,
 * or the result of memcached_response().
 * NOTE(review): the conditions guarding the last three returns are not
 * visible here; presumably they depend on flush and noreply.
 */
96 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
97 memcached_server_write_instance_st server
,
107 memcached_storage_action_t verb
)
/* Zero-initialised request; send_length starts as the full header size. */
109 protocol_binary_request_set request
= {};
110 size_t send_length
= sizeof(request
.bytes
);
112 bool noreply
= server
->root
->flags
.no_reply
;
114 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
115 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
/* The key length on the wire includes the namespace prefix. */
116 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
117 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
118 if (verb
== APPEND_OP
or verb
== PREPEND_OP
)
120 send_length
-= 8; /* append & prepend does not contain extras! */
/*
 * NOTE(review): the next three assignments presumably belong to the else
 * branch of the append/prepend test; the branch keyword is not visible.
 */
124 request
.message
.header
.request
.extlen
= 8;
125 request
.message
.body
.flags
= htonl(flags
);
126 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
/* Body length = key + namespace + value + extras length. */
129 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
130 request
.message
.header
.request
.extlen
));
134 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
/*
 * UDP with an unflushed write: reject a command that cannot fit into one
 * datagram, and call memcached_io_write() when it would not fit next to
 * what is already buffered.
 * NOTE(review): cmd_size does not add the namespace length although the
 * namespace is part of the vector written below; confirm this is intended.
 */
137 if (server
->root
->flags
.use_udp
and flush
== false)
139 size_t cmd_size
= send_length
+ key_length
+ value_length
;
141 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
143 return MEMCACHED_WRITE_FAILURE
;
145 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
147 memcached_io_write(server
, NULL
, 0, true);
/*
 * Pieces of the request in wire order. memcached_vdo() below is told there
 * are 4 entries; one entry (original line 155) is not visible here.
 */
151 struct libmemcached_io_vector_st vector
[]=
153 { request
.bytes
, send_length
},
154 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
156 { value
, value_length
}
159 /* write the header */
160 memcached_return_t rc
;
161 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
163 memcached_io_reset(server
);
/* Record the error only when none is stored on the client handle yet. */
165 if (ptr
->error_messages
== NULL
)
167 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
170 return MEMCACHED_WRITE_FAILURE
;
/*
 * Replication of SET: the same vector is re-sent with the opcode switched
 * to SETQ, once per configured replica, always with flush == false.
 */
173 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
175 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
176 WATCHPOINT_STRING("replicating");
178 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
/* NOTE(review): presumably wraps server_key around the server list; the
   increment and reset statements are not visible here. */
181 if (server_key
== memcached_server_count(ptr
))
186 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
188 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
190 memcached_io_reset(instance
);
/* NOTE(review): presumably the else branch of the vdo check above. */
194 memcached_server_response_decrement(instance
);
201 return MEMCACHED_BUFFERED
;
206 return MEMCACHED_SUCCESS
;
209 return memcached_response(server
, NULL
, 0, NULL
);
/*
 * Builds an ASCII-protocol storage request line in a stack buffer and
 * sends it, followed by the value and a trailing "\r\n", to `instance`.
 *
 * ptr       client handle; supplies the namespace and the no_reply,
 *           use_udp and buffer_requests flags read below
 * instance  server instance the request is written to
 * verb      storage action; turned into text by storage_op_string()
 *
 * The body also reads key, key_length, value, value_length, expiration,
 * flags, cas and flush; their declarations are on lines not visible in
 * this excerpt.
 *
 * Visible error returns: MEMCACHED_MEMORY_ALLOCATION_FAILURE when the
 * request line does not fit in MEMCACHED_DEFAULT_COMMAND_SIZE, and
 * MEMCACHED_WRITE_FAILURE for oversized UDP commands.
 * NOTE(review): the final return statement is not visible; presumably rc.
 */
212 static memcached_return_t
memcached_send_ascii(memcached_st
*ptr
,
213 memcached_server_write_instance_st instance
,
222 memcached_storage_action_t verb
)
225 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
/*
 * First construction: one snprintf() whose format carries an extra %llu
 * field fed from cas.
 * NOTE(review): the condition choosing this path is not visible here.
 * NOTE(review): the format puts a space after the command %s, while the
 * comment further down says the command string needs no added space;
 * confirm the two agree.
 */
230 check_length
= snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
231 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
232 storage_op_string(verb
),
233 memcached_print_array(ptr
->_namespace
),
234 (int)key_length
, key
, flags
,
235 (unsigned long long)expiration
, (unsigned long)value_length
,
236 (unsigned long long)cas
,
237 (ptr
->flags
.no_reply
) ? " noreply" : "");
/* Truncation or an snprintf error; this path records it on *instance. */
238 if (check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
or check_length
< 0)
240 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
241 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
243 write_length
= check_length
;
/*
 * Second construction: command, namespace and key are copied with
 * memcpy(), then snprintf() formats the remaining fields after them.
 */
247 char *buffer_ptr
= buffer
;
248 const char *command
= storage_op_string(verb
);
250 /* Copy in the command, no space needed, we handle that in the command function*/
251 memcpy(buffer_ptr
, command
, strlen(command
));
253 /* Copy in the key prefix, switch to the buffer_ptr */
254 buffer_ptr
= (char *)memcpy((char *)(buffer_ptr
+ strlen(command
)), (char *)memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
));
256 /* Copy in the key, adjust point if a key prefix was used. */
257 buffer_ptr
= (char *)memcpy(buffer_ptr
+ memcached_array_size(ptr
->_namespace
),
259 buffer_ptr
+= key_length
;
/* Bytes written so far; snprintf() gets only the space left after them. */
263 write_length
= (size_t)(buffer_ptr
- buffer
);
264 int check_length
= snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
),
267 (unsigned long long)expiration
, (unsigned long)value_length
,
268 ptr
->flags
.no_reply
? " noreply" : "");
/* Same failure as the first path, but recorded on *ptr, not *instance. */
269 if ((size_t)check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
-size_t(buffer_ptr
- buffer
) or check_length
< 0)
271 return memcached_set_error(*ptr
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
272 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
275 write_length
+= (size_t)check_length
;
276 WATCHPOINT_ASSERT(write_length
< MEMCACHED_DEFAULT_COMMAND_SIZE
);
/*
 * Buffered UDP: the command is the request line plus the value plus 2
 * more bytes (the "\r\n" appended in the vector below). It must fit in one
 * datagram; memcached_io_write() is called first when it would not fit
 * next to what is already buffered.
 */
279 if (ptr
->flags
.use_udp
and ptr
->flags
.buffer_requests
)
281 size_t cmd_size
= write_length
+ value_length
+2;
282 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
284 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
287 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
289 memcached_io_write(instance
, NULL
, 0, true);
293 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
295 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
/* Wire order: request line, value bytes, terminating "\r\n". */
298 struct libmemcached_io_vector_st vector
[]=
300 { buffer
, write_length
},
301 { value
, value_length
},
302 { memcached_literal_param("\r\n") }
305 if (memcached_is_udp(instance
->root
) and (write_length
+value_length
+memcached_literal_param_size("\r\n") +UDP_DATAGRAM_HEADER_LENGTH
> MAX_UDP_DATAGRAM_LENGTH
))
307 return memcached_set_error(*instance
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
, memcached_literal_param("UDP packet is too large"));
310 /* Send command header */
311 memcached_return_t rc
= memcached_vdo(instance
, vector
, 3, flush
);
/*
 * After a successful write: no_reply with flush yields SUCCESS, an
 * unflushed write yields BUFFERED, otherwise the server response is read
 * and MEMCACHED_STORED is mapped to MEMCACHED_SUCCESS.
 */
312 if (rc
== MEMCACHED_SUCCESS
)
314 if (ptr
->flags
.no_reply
and flush
)
316 rc
= MEMCACHED_SUCCESS
;
318 else if (flush
== false)
320 rc
= MEMCACHED_BUFFERED
;
324 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
326 if (rc
== MEMCACHED_STORED
)
328 rc
= MEMCACHED_SUCCESS
;
333 if (rc
== MEMCACHED_WRITE_FAILURE
)
335 memcached_io_reset(instance
);
/* Record the failure only when no error message is stored on ptr yet. */
338 if (memcached_failed(rc
) and ptr
->error_messages
== NULL
)
340 memcached_set_error(*ptr
, rc
, MEMCACHED_AT
);
/*
 * Common entry point behind the public storage functions in this file:
 * validates the request, picks a server from the group key and dispatches
 * to the binary or ASCII sender.
 *
 * ptr                          client handle
 * group_key, group_key_length  hashed to choose the server
 * key, key_length              item key; length and content are validated
 * value, value_length          payload forwarded to the sender
 * verb                         storage action to perform
 *
 * expiration, flags and cas are forwarded to the senders; their
 * declarations are on lines not visible in this excerpt.
 *
 * Returns MEMCACHED_BAD_KEY_PROVIDED when memcached_key_test() fails.
 * NOTE(review): the bodies of the first two failure checks and the final
 * return are not visible; presumably each returns rc.
 */
346 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
347 const char *group_key
, size_t group_key_length
,
348 const char *key
, size_t key_length
,
349 const char *value
, size_t value_length
,
353 memcached_storage_action_t verb
)
355 memcached_return_t rc
;
356 if (memcached_failed(rc
= initialize_query(ptr
)))
361 if (memcached_failed(rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
)))
366 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
368 return MEMCACHED_BAD_KEY_PROVIDED
;
/* The group key, not the item key, decides which server is used. */
371 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
372 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
374 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
375 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
/* flush is false only for SET_OP with buffer_requests on; else true. */
377 bool flush
= (bool) ((instance
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
378 if (ptr
->flags
.binary_protocol
)
380 rc
= memcached_send_binary(ptr
, instance
, server_key
,
382 value
, value_length
, expiration
,
383 flags
, cas
, flush
, verb
);
/* NOTE(review): presumably the else branch of the protocol test above. */
387 rc
= memcached_send_ascii(ptr
, instance
,
389 value
, value_length
, expiration
,
390 flags
, cas
, flush
, verb
);
/*
 * Issues a SET_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and 0 for cas. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_SET_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
397 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
398 const char *value
, size_t value_length
,
402 memcached_return_t rc
;
403 LIBMEMCACHED_MEMCACHED_SET_START();
404 rc
= memcached_send(ptr
, key
, key_length
,
405 key
, key_length
, value
, value_length
,
406 expiration
, flags
, 0, SET_OP
);
407 LIBMEMCACHED_MEMCACHED_SET_END();
/*
 * Issues an ADD_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and 0 for cas. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_ADD_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
411 memcached_return_t
memcached_add(memcached_st
*ptr
,
412 const char *key
, size_t key_length
,
413 const char *value
, size_t value_length
,
417 memcached_return_t rc
;
418 LIBMEMCACHED_MEMCACHED_ADD_START();
419 rc
= memcached_send(ptr
, key
, key_length
,
420 key
, key_length
, value
, value_length
,
421 expiration
, flags
, 0, ADD_OP
);
422 LIBMEMCACHED_MEMCACHED_ADD_END();
/*
 * Issues a REPLACE_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and 0 for cas. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_REPLACE_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
426 memcached_return_t
memcached_replace(memcached_st
*ptr
,
427 const char *key
, size_t key_length
,
428 const char *value
, size_t value_length
,
432 memcached_return_t rc
;
433 LIBMEMCACHED_MEMCACHED_REPLACE_START();
434 rc
= memcached_send(ptr
, key
, key_length
,
435 key
, key_length
, value
, value_length
,
436 expiration
, flags
, 0, REPLACE_OP
);
437 LIBMEMCACHED_MEMCACHED_REPLACE_END();
/*
 * Issues a PREPEND_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and 0 for cas.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
441 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
442 const char *key
, size_t key_length
,
443 const char *value
, size_t value_length
,
447 memcached_return_t rc
;
448 rc
= memcached_send(ptr
, key
, key_length
,
449 key
, key_length
, value
, value_length
,
450 expiration
, flags
, 0, PREPEND_OP
);
/*
 * Issues an APPEND_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and 0 for cas.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
454 memcached_return_t
memcached_append(memcached_st
*ptr
,
455 const char *key
, size_t key_length
,
456 const char *value
, size_t value_length
,
460 memcached_return_t rc
;
461 rc
= memcached_send(ptr
, key
, key_length
,
462 key
, key_length
, value
, value_length
,
463 expiration
, flags
, 0, APPEND_OP
);
/*
 * Issues a CAS_OP storage request for key.
 * Delegates to memcached_send(), passing key/key_length as the group key
 * too and forwarding the caller's cas value.
 * NOTE(review): the declarations of expiration, flags and cas and the
 * return statement are not visible in this excerpt; presumably rc is
 * returned.
 */
467 memcached_return_t
memcached_cas(memcached_st
*ptr
,
468 const char *key
, size_t key_length
,
469 const char *value
, size_t value_length
,
474 memcached_return_t rc
;
475 rc
= memcached_send(ptr
, key
, key_length
,
476 key
, key_length
, value
, value_length
,
477 expiration
, flags
, cas
, CAS_OP
);
/*
 * Issues a SET_OP storage request for key, with the server chosen from a
 * separate group key.
 * Delegates to memcached_send() with 0 for cas. The call is bracketed by
 * the LIBMEMCACHED_MEMCACHED_SET_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
481 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
482 const char *group_key
,
483 size_t group_key_length
,
484 const char *key
, size_t key_length
,
485 const char *value
, size_t value_length
,
489 memcached_return_t rc
;
490 LIBMEMCACHED_MEMCACHED_SET_START();
491 rc
= memcached_send(ptr
, group_key
, group_key_length
,
492 key
, key_length
, value
, value_length
,
493 expiration
, flags
, 0, SET_OP
);
494 LIBMEMCACHED_MEMCACHED_SET_END();
/*
 * Issues an ADD_OP storage request for key, with the server chosen from a
 * separate group key.
 * Delegates to memcached_send() with 0 for cas. The call is bracketed by
 * the LIBMEMCACHED_MEMCACHED_ADD_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
498 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
499 const char *group_key
, size_t group_key_length
,
500 const char *key
, size_t key_length
,
501 const char *value
, size_t value_length
,
505 memcached_return_t rc
;
506 LIBMEMCACHED_MEMCACHED_ADD_START();
507 rc
= memcached_send(ptr
, group_key
, group_key_length
,
508 key
, key_length
, value
, value_length
,
509 expiration
, flags
, 0, ADD_OP
);
510 LIBMEMCACHED_MEMCACHED_ADD_END();
/*
 * Issues a REPLACE_OP storage request for key, with the server chosen
 * from a separate group key.
 * Delegates to memcached_send() with 0 for cas. The call is bracketed by
 * the LIBMEMCACHED_MEMCACHED_REPLACE_START/END macros.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
514 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
515 const char *group_key
, size_t group_key_length
,
516 const char *key
, size_t key_length
,
517 const char *value
, size_t value_length
,
521 memcached_return_t rc
;
522 LIBMEMCACHED_MEMCACHED_REPLACE_START();
523 rc
= memcached_send(ptr
, group_key
, group_key_length
,
524 key
, key_length
, value
, value_length
,
525 expiration
, flags
, 0, REPLACE_OP
);
526 LIBMEMCACHED_MEMCACHED_REPLACE_END();
/*
 * Issues a PREPEND_OP storage request for key, with the server chosen
 * from a separate group key.
 * Delegates to memcached_send() with 0 for cas.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
530 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
531 const char *group_key
, size_t group_key_length
,
532 const char *key
, size_t key_length
,
533 const char *value
, size_t value_length
,
537 memcached_return_t rc
;
538 rc
= memcached_send(ptr
, group_key
, group_key_length
,
539 key
, key_length
, value
, value_length
,
540 expiration
, flags
, 0, PREPEND_OP
);
/*
 * Issues an APPEND_OP storage request for key, with the server chosen
 * from a separate group key.
 * Delegates to memcached_send() with 0 for cas.
 * NOTE(review): the declarations of expiration and flags and the return
 * statement are not visible in this excerpt; presumably rc is returned.
 */
544 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
545 const char *group_key
, size_t group_key_length
,
546 const char *key
, size_t key_length
,
547 const char *value
, size_t value_length
,
551 memcached_return_t rc
;
552 rc
= memcached_send(ptr
, group_key
, group_key_length
,
553 key
, key_length
, value
, value_length
,
554 expiration
, flags
, 0, APPEND_OP
);
/*
 * Issues a CAS_OP storage request for key, with the server chosen from a
 * separate group key.
 * Delegates to memcached_send(), forwarding the caller's cas value.
 * NOTE(review): the declarations of expiration, flags and cas and the
 * return statement are not visible in this excerpt; presumably rc is
 * returned.
 */
558 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
559 const char *group_key
, size_t group_key_length
,
560 const char *key
, size_t key_length
,
561 const char *value
, size_t value_length
,
566 memcached_return_t rc
;
567 rc
= memcached_send(ptr
, group_key
, group_key_length
,
568 key
, key_length
, value
, value_length
,
569 expiration
, flags
, cas
, CAS_OP
);