1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
41 enum memcached_storage_action_t
{
51 static inline const char *storage_op_string(memcached_storage_action_t verb
)
/*
 * Translate a storage verb into a binary-protocol opcode.
 *
 * The visible return statements come in two groups: the quiet opcodes
 * (SETQ, ADDQ, REPLACEQ, APPENDQ, PREPENDQ) followed by their regular
 * counterparts. Presumably `reply` selects between the two groups -- the
 * branching statement itself is not visible here; TODO confirm.
 * In both groups CAS_OP falls through to the REPLACE opcode.
 */
77 static inline uint8_t get_com_code(const memcached_storage_action_t verb
, const bool reply
)
84 return PROTOCOL_BINARY_CMD_SETQ
;
87 return PROTOCOL_BINARY_CMD_ADDQ
;
89 case CAS_OP
: /* FALLTHROUGH */
91 return PROTOCOL_BINARY_CMD_REPLACEQ
;
94 return PROTOCOL_BINARY_CMD_APPENDQ
;
97 return PROTOCOL_BINARY_CMD_PREPENDQ
;
107 return PROTOCOL_BINARY_CMD_ADD
;
109 case CAS_OP
: /* FALLTHROUGH */
111 return PROTOCOL_BINARY_CMD_REPLACE
;
114 return PROTOCOL_BINARY_CMD_APPEND
;
117 return PROTOCOL_BINARY_CMD_PREPEND
;
/* Last visible return; presumably the fallback for verbs not matched above -- TODO confirm. */
120 return PROTOCOL_BINARY_CMD_SET
;
/*
 * Build a binary-protocol storage request, send it to `server`, optionally
 * re-send it to replica servers, and collect the result.
 *
 * Only part of the parameter list is visible here; the body also uses
 * `server_key`, `value`, `cas`, `flush` and `reply`.
 *
 * Returns MEMCACHED_WRITE_FAILURE when the initial write fails; otherwise
 * MEMCACHED_BUFFERED, MEMCACHED_SUCCESS, or the result of
 * memcached_response(), depending on conditions that are not fully visible
 * here.
 */
123 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
124 memcached_server_write_instance_st server
,
127 const size_t key_length
,
129 const size_t value_length
,
130 const time_t expiration
,
131 const uint32_t flags
,
135 memcached_storage_action_t verb
)
/* Zero-initialised request; by default the full request.bytes area is sent. */
137 protocol_binary_request_set request
= {};
138 size_t send_length
= sizeof(request
.bytes
);
140 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
141 request
.message
.header
.request
.opcode
= get_com_code(verb
, reply
);
/* The key length on the wire includes the namespace prefix. */
142 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
143 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
144 if (verb
== APPEND_OP
or verb
== PREPEND_OP
)
146 send_length
-= 8; /* append & prepend does not contain extras! */
/*
 * Presumably the else branch (the keyword is not visible here): other verbs
 * carry 8 bytes of extras -- flags and expiration, both in network order.
 */
150 request
.message
.header
.request
.extlen
= 8;
151 request
.message
.body
.flags
= htonl(flags
);
152 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
/* Body length = key (with namespace) + value + extras. */
155 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
156 request
.message
.header
.request
.extlen
));
160 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
/*
 * Scatter/gather list handed to memcached_vdo(); 5 entries are passed below,
 * though not every entry is visible here.
 */
163 libmemcached_io_vector_st vector
[]=
166 { request
.bytes
, send_length
},
167 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
169 { value
, value_length
}
172 /* write the header */
173 memcached_return_t rc
;
174 if ((rc
= memcached_vdo(server
, vector
, 5, flush
)) != MEMCACHED_SUCCESS
)
176 memcached_io_reset(server
);
/*
 * NOTE(review): memcached_send_ascii() records its error only when
 * memcached_has_error(ptr) == false; the condition here is not negated.
 * Confirm which polarity is intended.
 */
178 if (memcached_has_error(ptr
))
180 memcached_set_error(*server
, rc
, MEMCACHED_AT
);
183 return MEMCACHED_WRITE_FAILURE
;
/*
 * Replication applies to SET only: the opcode is switched to the quiet SETQ
 * and the same vector is re-sent, without flushing, to further servers.
 */
186 if (verb
== SET_OP
and ptr
->number_of_replicas
> 0)
188 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
189 WATCHPOINT_STRING("replicating");
191 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
/* Presumably wraps server_key around at the end of the server list (the body is not visible here). */
194 if (server_key
== memcached_server_count(ptr
))
199 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
/* A failed replica write resets that replica's connection. */
201 if (memcached_vdo(instance
, vector
, 5, false) != MEMCACHED_SUCCESS
)
203 memcached_io_reset(instance
);
/* Presumably the non-failure path (the else is not visible here) -- TODO confirm. */
207 memcached_server_response_decrement(instance
);
214 return MEMCACHED_BUFFERED
;
217 // No reply always assumes success
220 return MEMCACHED_SUCCESS
;
/* Otherwise the server's response determines the result. */
223 return memcached_response(server
, NULL
, 0, NULL
);
/*
 * Send a storage command over the ASCII protocol and interpret the reply.
 *
 * The numeric fields (flags, expiration, value length, cas) are rendered as
 * text with a leading space, then handed to a single memcached_vdo() call
 * together with the verb string, namespace, value and line terminators.
 *
 * Only part of the parameter list is visible here; the body also uses
 * `value`, `cas`, `flush` and `reply`.
 */
226 static memcached_return_t
memcached_send_ascii(memcached_st
*ptr
,
227 memcached_server_write_instance_st instance
,
229 const size_t key_length
,
231 const size_t value_length
,
232 const time_t expiration
,
233 const uint32_t flags
,
237 const memcached_storage_action_t verb
)
239 char flags_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
240 int flags_buffer_length
= snprintf(flags_buffer
, sizeof(flags_buffer
), " %u", flags
);
/*
 * A negative or truncated snprintf() result is reported as
 * MEMCACHED_MEMORY_ALLOCATION_FAILURE; the same check follows each field.
 */
241 if (size_t(flags_buffer_length
) >= sizeof(flags_buffer
) or flags_buffer_length
< 0)
243 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
244 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
247 char expiration_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
248 int expiration_buffer_length
= snprintf(expiration_buffer
, sizeof(expiration_buffer
), " %llu", (unsigned long long)expiration
);
249 if (size_t(expiration_buffer_length
) >= sizeof(expiration_buffer
) or expiration_buffer_length
< 0)
251 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
252 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
255 char value_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
256 int value_buffer_length
= snprintf(value_buffer
, sizeof(value_buffer
), " %llu", (unsigned long long)value_length
);
257 if (size_t(value_buffer_length
) >= sizeof(value_buffer
) or value_buffer_length
< 0)
259 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
260 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
263 char cas_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
/*
 * The length starts at 0, so the cas field contributes nothing to the
 * command unless it is formatted below. The condition guarding that
 * formatting is not visible here.
 */
264 int cas_buffer_length
= 0;
267 cas_buffer_length
= snprintf(cas_buffer
, sizeof(cas_buffer
), " %llu", (unsigned long long)cas
);
268 if (size_t(cas_buffer_length
) >= sizeof(cas_buffer
) or cas_buffer_length
< 0)
270 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
271 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
/*
 * Pieces of the command in wire order; 12 entries are passed to
 * memcached_vdo() below, though not every entry is visible here.
 */
275 libmemcached_io_vector_st vector
[]=
278 { storage_op_string(verb
), strlen(storage_op_string(verb
))},
279 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
281 { flags_buffer
, flags_buffer_length
},
282 { expiration_buffer
, expiration_buffer_length
},
283 { value_buffer
, value_buffer_length
},
284 { cas_buffer
, cas_buffer_length
},
/* " noreply" gets length 0 (i.e. is omitted) when a reply is wanted. */
285 { " noreply", reply
? 0 : memcached_literal_param_size(" noreply") },
286 { memcached_literal_param("\r\n") },
287 { value
, value_length
},
288 { memcached_literal_param("\r\n") }
291 /* Send command header */
292 memcached_return_t rc
= memcached_vdo(instance
, vector
, 12, flush
);
/*
 * After a successful write the function returns MEMCACHED_BUFFERED or
 * MEMCACHED_SUCCESS, or reads the server's response; the conditions choosing
 * between these are not visible here.
 */
293 if (rc
== MEMCACHED_SUCCESS
)
297 return MEMCACHED_BUFFERED
;
302 return MEMCACHED_SUCCESS
;
305 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
306 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
/* A STORED response is reported to the caller as MEMCACHED_SUCCESS. */
308 if (rc
== MEMCACHED_STORED
)
310 return MEMCACHED_SUCCESS
;
314 if (rc
== MEMCACHED_WRITE_FAILURE
)
316 memcached_io_reset(instance
);
/* Only failures reach this point; record rc on ptr unless an error is already set. */
319 assert(memcached_failed(rc
));
320 if (memcached_has_error(ptr
) == false)
322 return memcached_set_error(*ptr
, rc
, MEMCACHED_AT
);
/*
 * Common entry point for all storage verbs: validates the key, picks the
 * target server from the group key, and dispatches to the binary or ASCII
 * sender according to memcached_is_binary(ptr).
 *
 * Only part of the parameter list is visible here; the body also uses `cas`
 * and `flush`.
 */
328 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
329 const char *group_key
, size_t group_key_length
,
330 const char *key
, size_t key_length
,
331 const char *value
, size_t value_length
,
332 const time_t expiration
,
333 const uint32_t flags
,
335 memcached_storage_action_t verb
)
337 memcached_return_t rc
;
338 if (memcached_failed(rc
= initialize_query(ptr
, true)))
343 if (memcached_failed(rc
= memcached_validate_key_length(key_length
, memcached_is_binary(ptr
))))
/* A key rejected by memcached_key_test() is reported as MEMCACHED_BAD_KEY_PROVIDED. */
348 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
350 return MEMCACHED_BAD_KEY_PROVIDED
;
/* The server is selected by hashing the group key, not the item key. */
353 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
354 memcached_server_write_instance_st instance
= memcached_server_instance_fetch(ptr
, server_key
);
356 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
357 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
/*
 * Buffered SETs get special handling; the statement controlled by this
 * condition is not visible here.
 */
361 if (memcached_is_buffering(instance
->root
) and verb
== SET_OP
)
366 bool reply
= memcached_is_replying(ptr
);
/* Binary protocol takes the first call; otherwise the ASCII sender is used. */
368 if (memcached_is_binary(ptr
))
370 return memcached_send_binary(ptr
, instance
, server_key
,
372 value
, value_length
, expiration
,
373 flags
, cas
, flush
, reply
, verb
);
376 return memcached_send_ascii(ptr
, instance
,
378 value
, value_length
, expiration
,
379 flags
, cas
, flush
, reply
, verb
);
/*
 * Store `value` under `key` using SET_OP.
 *
 * The key is also passed as the group key, and cas is passed as 0.
 * The call is bracketed by the LIBMEMCACHED_MEMCACHED_SET_START/END macros.
 * The remaining parameters (`expiration`, `flags`) and the return statement
 * are not visible here.
 */
383 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
384 const char *value
, size_t value_length
,
388 memcached_return_t rc
;
389 LIBMEMCACHED_MEMCACHED_SET_START();
390 rc
= memcached_send(ptr
, key
, key_length
,
391 key
, key_length
, value
, value_length
,
392 expiration
, flags
, 0, SET_OP
);
393 LIBMEMCACHED_MEMCACHED_SET_END();
/*
 * Store `value` under `key` using ADD_OP.
 *
 * The key is also passed as the group key, and cas is passed as 0.
 * The call is bracketed by the LIBMEMCACHED_MEMCACHED_ADD_START/END macros.
 * The remaining parameters (`expiration`, `flags`) and the return statement
 * are not visible here.
 */
397 memcached_return_t
memcached_add(memcached_st
*ptr
,
398 const char *key
, size_t key_length
,
399 const char *value
, size_t value_length
,
403 memcached_return_t rc
;
404 LIBMEMCACHED_MEMCACHED_ADD_START();
405 rc
= memcached_send(ptr
, key
, key_length
,
406 key
, key_length
, value
, value_length
,
407 expiration
, flags
, 0, ADD_OP
);
/* A NOTSTORED or DATA_EXISTS result is additionally recorded as an error on ptr. */
409 if (rc
== MEMCACHED_NOTSTORED
or rc
== MEMCACHED_DATA_EXISTS
)
411 memcached_set_error(*ptr
, rc
, MEMCACHED_AT
);
413 LIBMEMCACHED_MEMCACHED_ADD_END();
/*
 * Store `value` under `key` using REPLACE_OP.
 *
 * The key is also passed as the group key, and cas is passed as 0.
 * The call is bracketed by the LIBMEMCACHED_MEMCACHED_REPLACE_START/END
 * macros. The remaining parameters (`expiration`, `flags`) and the return
 * statement are not visible here.
 */
417 memcached_return_t
memcached_replace(memcached_st
*ptr
,
418 const char *key
, size_t key_length
,
419 const char *value
, size_t value_length
,
423 memcached_return_t rc
;
424 LIBMEMCACHED_MEMCACHED_REPLACE_START();
425 rc
= memcached_send(ptr
, key
, key_length
,
426 key
, key_length
, value
, value_length
,
427 expiration
, flags
, 0, REPLACE_OP
);
428 LIBMEMCACHED_MEMCACHED_REPLACE_END();
/*
 * Send `value` for `key` using PREPEND_OP.
 *
 * The key is also passed as the group key, and cas is passed as 0.
 * The remaining parameters (`expiration`, `flags`) and the return statement
 * are not visible here.
 */
432 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
433 const char *key
, size_t key_length
,
434 const char *value
, size_t value_length
,
438 memcached_return_t rc
;
439 rc
= memcached_send(ptr
, key
, key_length
,
440 key
, key_length
, value
, value_length
,
441 expiration
, flags
, 0, PREPEND_OP
);
/*
 * Send `value` for `key` using APPEND_OP.
 *
 * The key is also passed as the group key, and cas is passed as 0.
 * The remaining parameters (`expiration`, `flags`) and the return statement
 * are not visible here.
 */
445 memcached_return_t
memcached_append(memcached_st
*ptr
,
446 const char *key
, size_t key_length
,
447 const char *value
, size_t value_length
,
451 memcached_return_t rc
;
452 rc
= memcached_send(ptr
, key
, key_length
,
453 key
, key_length
, value
, value_length
,
454 expiration
, flags
, 0, APPEND_OP
);
/*
 * Store `value` under `key` using CAS_OP, forwarding the caller's `cas`
 * value to memcached_send().
 *
 * The key is also passed as the group key. The remaining parameters
 * (`expiration`, `flags`, `cas`) and the return statement are not visible
 * here.
 */
458 memcached_return_t
memcached_cas(memcached_st
*ptr
,
459 const char *key
, size_t key_length
,
460 const char *value
, size_t value_length
,
465 memcached_return_t rc
;
466 rc
= memcached_send(ptr
, key
, key_length
,
467 key
, key_length
, value
, value_length
,
468 expiration
, flags
, cas
, CAS_OP
);
/*
 * Store `value` under `key` using SET_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * cas is passed as 0. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_SET_START/END macros. The remaining parameters
 * (`expiration`, `flags`) and the return statement are not visible here.
 */
472 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
473 const char *group_key
,
474 size_t group_key_length
,
475 const char *key
, size_t key_length
,
476 const char *value
, size_t value_length
,
480 memcached_return_t rc
;
481 LIBMEMCACHED_MEMCACHED_SET_START();
482 rc
= memcached_send(ptr
, group_key
, group_key_length
,
483 key
, key_length
, value
, value_length
,
484 expiration
, flags
, 0, SET_OP
);
485 LIBMEMCACHED_MEMCACHED_SET_END();
/*
 * Store `value` under `key` using ADD_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * cas is passed as 0. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_ADD_START/END macros. The remaining parameters
 * (`expiration`, `flags`) and the return statement are not visible here.
 *
 * NOTE(review): memcached_add() also records NOTSTORED / DATA_EXISTS via
 * memcached_set_error(); no such call is visible in this variant -- confirm
 * whether the difference is intended.
 */
489 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
490 const char *group_key
, size_t group_key_length
,
491 const char *key
, size_t key_length
,
492 const char *value
, size_t value_length
,
496 memcached_return_t rc
;
497 LIBMEMCACHED_MEMCACHED_ADD_START();
498 rc
= memcached_send(ptr
, group_key
, group_key_length
,
499 key
, key_length
, value
, value_length
,
500 expiration
, flags
, 0, ADD_OP
);
501 LIBMEMCACHED_MEMCACHED_ADD_END();
/*
 * Store `value` under `key` using REPLACE_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * cas is passed as 0. The call is bracketed by the
 * LIBMEMCACHED_MEMCACHED_REPLACE_START/END macros. The remaining parameters
 * (`expiration`, `flags`) and the return statement are not visible here.
 */
505 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
506 const char *group_key
, size_t group_key_length
,
507 const char *key
, size_t key_length
,
508 const char *value
, size_t value_length
,
512 memcached_return_t rc
;
513 LIBMEMCACHED_MEMCACHED_REPLACE_START();
514 rc
= memcached_send(ptr
, group_key
, group_key_length
,
515 key
, key_length
, value
, value_length
,
516 expiration
, flags
, 0, REPLACE_OP
);
517 LIBMEMCACHED_MEMCACHED_REPLACE_END();
/*
 * Send `value` for `key` using PREPEND_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * cas is passed as 0 and the result of memcached_send() is returned
 * directly. The remaining parameters (`expiration`, `flags`) are not
 * visible here.
 */
521 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
522 const char *group_key
, size_t group_key_length
,
523 const char *key
, size_t key_length
,
524 const char *value
, size_t value_length
,
528 return memcached_send(ptr
, group_key
, group_key_length
,
529 key
, key_length
, value
, value_length
,
530 expiration
, flags
, 0, PREPEND_OP
);
/*
 * Send `value` for `key` using APPEND_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * cas is passed as 0 and the result of memcached_send() is returned
 * directly. The remaining parameters (`expiration`, `flags`) are not
 * visible here.
 */
533 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
534 const char *group_key
, size_t group_key_length
,
535 const char *key
, size_t key_length
,
536 const char *value
, size_t value_length
,
540 return memcached_send(ptr
, group_key
, group_key_length
,
541 key
, key_length
, value
, value_length
,
542 expiration
, flags
, 0, APPEND_OP
);
/*
 * Store `value` under `key` using CAS_OP, with a separate `group_key`
 * passed to memcached_send() ahead of the item key.
 *
 * The caller's `cas` value is forwarded and the result of memcached_send()
 * is returned directly. The remaining parameters (`expiration`, `flags`,
 * `cas`) are not visible here.
 */
545 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
546 const char *group_key
, size_t group_key_length
,
547 const char *key
, size_t key_length
,
548 const char *value
, size_t value_length
,
553 return memcached_send(ptr
, group_key
, group_key_length
,
554 key
, key_length
, value
, value_length
,
555 expiration
, flags
, cas
, CAS_OP
);