1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
41 enum memcached_storage_action_t
{
51 static inline const char *storage_op_string(memcached_storage_action_t verb
)
77 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
84 return PROTOCOL_BINARY_CMD_SETQ
;
87 return PROTOCOL_BINARY_CMD_ADDQ
;
89 case CAS_OP
: /* FALLTHROUGH */
91 return PROTOCOL_BINARY_CMD_REPLACEQ
;
94 return PROTOCOL_BINARY_CMD_APPENDQ
;
97 return PROTOCOL_BINARY_CMD_PREPENDQ
;
107 return PROTOCOL_BINARY_CMD_ADD
;
109 case CAS_OP
: /* FALLTHROUGH */
111 return PROTOCOL_BINARY_CMD_REPLACE
;
114 return PROTOCOL_BINARY_CMD_APPEND
;
117 return PROTOCOL_BINARY_CMD_PREPEND
;
120 return PROTOCOL_BINARY_CMD_SET
;
123 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
124 memcached_server_write_instance_st server
,
134 memcached_storage_action_t verb
)
136 protocol_binary_request_set request
= {};
137 size_t send_length
= sizeof(request
.bytes
);
139 bool noreply
= server
->root
->flags
.no_reply
;
141 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
142 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
143 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
144 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
145 if (verb
== APPEND_OP
or verb
== PREPEND_OP
)
147 send_length
-= 8; /* append & prepend does not contain extras! */
151 request
.message
.header
.request
.extlen
= 8;
152 request
.message
.body
.flags
= htonl(flags
);
153 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
156 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
157 request
.message
.header
.request
.extlen
));
161 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
164 if (server
->root
->flags
.use_udp
and flush
== false)
166 size_t cmd_size
= send_length
+ key_length
+ value_length
;
168 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
170 return MEMCACHED_WRITE_FAILURE
;
172 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
174 memcached_io_write(server
, NULL
, 0, true);
178 struct libmemcached_io_vector_st vector
[]=
180 { request
.bytes
, send_length
},
181 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
183 { value
, value_length
}
186 /* write the header */
187 memcached_return_t rc
;
188 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
190 memcached_io_reset(server
);
192 if (ptr
->error_messages
== NULL
)
194 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
197 return MEMCACHED_WRITE_FAILURE
;
200 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
202 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
203 WATCHPOINT_STRING("replicating");
205 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
208 if (server_key
== memcached_server_count(ptr
))
213 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
215 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
217 memcached_io_reset(instance
);
221 memcached_server_response_decrement(instance
);
228 return MEMCACHED_BUFFERED
;
233 return MEMCACHED_SUCCESS
;
236 return memcached_response(server
, NULL
, 0, NULL
);
239 static memcached_return_t
memcached_send_ascii(memcached_st
*ptr
,
240 memcached_server_write_instance_st instance
,
249 memcached_storage_action_t verb
)
252 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
257 check_length
= snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
258 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
259 storage_op_string(verb
),
260 memcached_print_array(ptr
->_namespace
),
261 (int)key_length
, key
, flags
,
262 (unsigned long long)expiration
, (unsigned long)value_length
,
263 (unsigned long long)cas
,
264 (ptr
->flags
.no_reply
) ? " noreply" : "");
265 if (check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
or check_length
< 0)
267 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
268 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
270 write_length
= check_length
;
274 char *buffer_ptr
= buffer
;
275 const char *command
= storage_op_string(verb
);
277 /* Copy in the command, no space needed, we handle that in the command function*/
278 memcpy(buffer_ptr
, command
, strlen(command
));
280 /* Copy in the key prefix, switch to the buffer_ptr */
281 buffer_ptr
= (char *)memcpy((char *)(buffer_ptr
+ strlen(command
)), (char *)memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
));
283 /* Copy in the key, adjust point if a key prefix was used. */
284 buffer_ptr
= (char *)memcpy(buffer_ptr
+ memcached_array_size(ptr
->_namespace
),
286 buffer_ptr
+= key_length
;
290 write_length
= (size_t)(buffer_ptr
- buffer
);
291 int check_length
= snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
),
294 (unsigned long long)expiration
, (unsigned long)value_length
,
295 ptr
->flags
.no_reply
? " noreply" : "");
296 if ((size_t)check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
-size_t(buffer_ptr
- buffer
) or check_length
< 0)
298 return memcached_set_error(*ptr
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
299 memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
302 write_length
+= (size_t)check_length
;
303 WATCHPOINT_ASSERT(write_length
< MEMCACHED_DEFAULT_COMMAND_SIZE
);
306 if (ptr
->flags
.use_udp
)
308 size_t cmd_size
= write_length
+ value_length
+2;
309 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
311 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
314 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
316 memcached_io_write(instance
, NULL
, 0, true);
320 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
322 return memcached_set_error(*ptr
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
);
325 struct libmemcached_io_vector_st vector
[]=
327 { buffer
, write_length
},
328 { value
, value_length
},
329 { memcached_literal_param("\r\n") }
332 if (memcached_is_udp(instance
->root
) and (write_length
+value_length
+memcached_literal_param_size("\r\n") +UDP_DATAGRAM_HEADER_LENGTH
> MAX_UDP_DATAGRAM_LENGTH
))
334 return memcached_set_error(*instance
, MEMCACHED_WRITE_FAILURE
, MEMCACHED_AT
, memcached_literal_param("UDP packet is too large"));
337 /* Send command header */
338 memcached_return_t rc
= memcached_vdo(instance
, vector
, 3, flush
);
339 if (rc
== MEMCACHED_SUCCESS
)
341 if (ptr
->flags
.no_reply
and flush
)
343 rc
= MEMCACHED_SUCCESS
;
345 else if (flush
== false)
347 rc
= MEMCACHED_BUFFERED
;
351 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
353 if (rc
== MEMCACHED_STORED
)
355 rc
= MEMCACHED_SUCCESS
;
360 if (rc
== MEMCACHED_WRITE_FAILURE
)
362 memcached_io_reset(instance
);
365 if (memcached_failed(rc
) and ptr
->error_messages
== NULL
)
367 memcached_set_error(*ptr
, rc
, MEMCACHED_AT
);
373 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
374 const char *group_key
, size_t group_key_length
,
375 const char *key
, size_t key_length
,
376 const char *value
, size_t value_length
,
380 memcached_storage_action_t verb
)
382 memcached_return_t rc
;
383 if (memcached_failed(rc
= initialize_query(ptr
)))
388 if (memcached_failed(rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
)))
393 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
395 return MEMCACHED_BAD_KEY_PROVIDED
;
398 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
399 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
401 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
402 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
404 bool flush
= (bool) ((instance
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
405 if (ptr
->flags
.binary_protocol
)
407 rc
= memcached_send_binary(ptr
, instance
, server_key
,
409 value
, value_length
, expiration
,
410 flags
, cas
, flush
, verb
);
414 rc
= memcached_send_ascii(ptr
, instance
,
416 value
, value_length
, expiration
,
417 flags
, cas
, flush
, verb
);
424 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
425 const char *value
, size_t value_length
,
429 memcached_return_t rc
;
430 LIBMEMCACHED_MEMCACHED_SET_START();
431 rc
= memcached_send(ptr
, key
, key_length
,
432 key
, key_length
, value
, value_length
,
433 expiration
, flags
, 0, SET_OP
);
434 LIBMEMCACHED_MEMCACHED_SET_END();
438 memcached_return_t
memcached_add(memcached_st
*ptr
,
439 const char *key
, size_t key_length
,
440 const char *value
, size_t value_length
,
444 memcached_return_t rc
;
445 LIBMEMCACHED_MEMCACHED_ADD_START();
446 rc
= memcached_send(ptr
, key
, key_length
,
447 key
, key_length
, value
, value_length
,
448 expiration
, flags
, 0, ADD_OP
);
449 LIBMEMCACHED_MEMCACHED_ADD_END();
453 memcached_return_t
memcached_replace(memcached_st
*ptr
,
454 const char *key
, size_t key_length
,
455 const char *value
, size_t value_length
,
459 memcached_return_t rc
;
460 LIBMEMCACHED_MEMCACHED_REPLACE_START();
461 rc
= memcached_send(ptr
, key
, key_length
,
462 key
, key_length
, value
, value_length
,
463 expiration
, flags
, 0, REPLACE_OP
);
464 LIBMEMCACHED_MEMCACHED_REPLACE_END();
468 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
469 const char *key
, size_t key_length
,
470 const char *value
, size_t value_length
,
474 memcached_return_t rc
;
475 rc
= memcached_send(ptr
, key
, key_length
,
476 key
, key_length
, value
, value_length
,
477 expiration
, flags
, 0, PREPEND_OP
);
481 memcached_return_t
memcached_append(memcached_st
*ptr
,
482 const char *key
, size_t key_length
,
483 const char *value
, size_t value_length
,
487 memcached_return_t rc
;
488 rc
= memcached_send(ptr
, key
, key_length
,
489 key
, key_length
, value
, value_length
,
490 expiration
, flags
, 0, APPEND_OP
);
494 memcached_return_t
memcached_cas(memcached_st
*ptr
,
495 const char *key
, size_t key_length
,
496 const char *value
, size_t value_length
,
501 memcached_return_t rc
;
502 rc
= memcached_send(ptr
, key
, key_length
,
503 key
, key_length
, value
, value_length
,
504 expiration
, flags
, cas
, CAS_OP
);
508 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
509 const char *group_key
,
510 size_t group_key_length
,
511 const char *key
, size_t key_length
,
512 const char *value
, size_t value_length
,
516 memcached_return_t rc
;
517 LIBMEMCACHED_MEMCACHED_SET_START();
518 rc
= memcached_send(ptr
, group_key
, group_key_length
,
519 key
, key_length
, value
, value_length
,
520 expiration
, flags
, 0, SET_OP
);
521 LIBMEMCACHED_MEMCACHED_SET_END();
525 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
526 const char *group_key
, size_t group_key_length
,
527 const char *key
, size_t key_length
,
528 const char *value
, size_t value_length
,
532 memcached_return_t rc
;
533 LIBMEMCACHED_MEMCACHED_ADD_START();
534 rc
= memcached_send(ptr
, group_key
, group_key_length
,
535 key
, key_length
, value
, value_length
,
536 expiration
, flags
, 0, ADD_OP
);
537 LIBMEMCACHED_MEMCACHED_ADD_END();
541 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
542 const char *group_key
, size_t group_key_length
,
543 const char *key
, size_t key_length
,
544 const char *value
, size_t value_length
,
548 memcached_return_t rc
;
549 LIBMEMCACHED_MEMCACHED_REPLACE_START();
550 rc
= memcached_send(ptr
, group_key
, group_key_length
,
551 key
, key_length
, value
, value_length
,
552 expiration
, flags
, 0, REPLACE_OP
);
553 LIBMEMCACHED_MEMCACHED_REPLACE_END();
557 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
558 const char *group_key
, size_t group_key_length
,
559 const char *key
, size_t key_length
,
560 const char *value
, size_t value_length
,
564 memcached_return_t rc
;
565 rc
= memcached_send(ptr
, group_key
, group_key_length
,
566 key
, key_length
, value
, value_length
,
567 expiration
, flags
, 0, PREPEND_OP
);
571 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
572 const char *group_key
, size_t group_key_length
,
573 const char *key
, size_t key_length
,
574 const char *value
, size_t value_length
,
578 memcached_return_t rc
;
579 rc
= memcached_send(ptr
, group_key
, group_key_length
,
580 key
, key_length
, value
, value_length
,
581 expiration
, flags
, 0, APPEND_OP
);
585 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
586 const char *group_key
, size_t group_key_length
,
587 const char *key
, size_t key_length
,
588 const char *value
, size_t value_length
,
593 memcached_return_t rc
;
594 rc
= memcached_send(ptr
, group_key
, group_key_length
,
595 key
, key_length
, value
, value_length
,
596 expiration
, flags
, cas
, CAS_OP
);