1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include <libmemcached/common.h>
/*
 * Verb selecting which storage command a request performs.  The enumerators
 * referenced elsewhere in this file are SET_OP, ADD_OP, REPLACE_OP, APPEND_OP,
 * PREPEND_OP and CAS_OP.
 * NOTE(review): the enumerator list itself is not among the visible lines --
 * confirm the names and their order against the full file.
 */
41 enum memcached_storage_action_t
{
/*
 * Returns the C string associated with a storage verb.
 * memcached_send_ascii() puts this string, together with its strlen(), into
 * the I/O vector it writes to the server.
 * NOTE(review): the function body is not among the visible lines -- confirm
 * the exact strings returned for each verb against the full file.
 */
51 static inline const char *storage_op_string(memcached_storage_action_t verb
)
/*
 * Tells the caller whether `verb` may be used while encryption is enabled.
 * memcached_send() compares the result with `false` and, on a match, fails the
 * operation with MEMCACHED_NOT_SUPPORTED.
 * NOTE(review): the function body is not among the visible lines -- which
 * verbs are accepted must be confirmed against the full file.
 */
77 static inline uint8_t can_by_encrypted(const memcached_storage_action_t verb
)
/*
 * Picks the binary-protocol opcode for a storage verb.
 *
 * Two groups of return statements are visible.  The first returns the "Q"
 * opcodes (SETQ, ADDQ, REPLACEQ, APPENDQ, PREPENDQ); the second returns the
 * plain opcodes (ADD, REPLACE, APPEND, PREPEND) and ends with SET.  In both
 * groups a `case CAS_OP` label marked FALLTHROUGH sits directly ahead of the
 * REPLACEQ / REPLACE return.
 *
 * NOTE(review): the condition that separates the two groups and most of the
 * case labels are not among the visible lines.  Presumably `reply` selects
 * between the groups and each return belongs to the verb of the same name --
 * confirm against the full file.
 */
95 static inline uint8_t get_com_code(const memcached_storage_action_t verb
, const bool reply
)
/* First group: the "Q" opcodes. */
102 return PROTOCOL_BINARY_CMD_SETQ
;
105 return PROTOCOL_BINARY_CMD_ADDQ
;
107 case CAS_OP
: /* FALLTHROUGH */
109 return PROTOCOL_BINARY_CMD_REPLACEQ
;
112 return PROTOCOL_BINARY_CMD_APPENDQ
;
115 return PROTOCOL_BINARY_CMD_PREPENDQ
;
/* Second group: the plain opcodes. */
125 return PROTOCOL_BINARY_CMD_ADD
;
127 case CAS_OP
: /* FALLTHROUGH */
129 return PROTOCOL_BINARY_CMD_REPLACE
;
132 return PROTOCOL_BINARY_CMD_APPEND
;
135 return PROTOCOL_BINARY_CMD_PREPEND
;
/* Last return statement of the function. */
138 return PROTOCOL_BINARY_CMD_SET
;
/*
 * Builds one binary-protocol storage request and writes it to `server`.
 *
 * What the visible code does:
 *  - zero-initialises a protocol_binary_request_set and lets
 *    initialize_binary_request() prepare its header;
 *  - sets the opcode from get_com_code(verb, reply), the key length to
 *    key_length plus the namespace size (narrowed to uint16_t), and the
 *    datatype to PROTOCOL_BINARY_RAW_BYTES;
 *  - for APPEND_OP / PREPEND_OP shortens send_length by 8, because those
 *    requests carry no extras;
 *  - stores extlen = 8 plus flags and expiration in network byte order;
 *  - sets bodylen to key + namespace + value + extlen;
 *  - writes the request with memcached_vdo() using a five-element vector;
 *  - when verb is SET_OP and number_of_replicas is non-zero, switches the
 *    opcode to SETQ and writes the same vector to further instances.
 *
 * Visible return paths: memcached_last_error(server->root) when the first
 * write does not return MEMCACHED_SUCCESS, MEMCACHED_BUFFERED,
 * MEMCACHED_SUCCESS, or the result of memcached_response().
 *
 * NOTE(review): server_key, key, value, cas, flush and reply are used below
 * but their declarations are not among the visible lines, and neither are
 * several braces and conditions.  Hedged remarks below mark those spots.
 */
141 static memcached_return_t
memcached_send_binary(Memcached
*ptr
,
142 memcached_instance_st
* server
,
145 const size_t key_length
,
147 const size_t value_length
,
148 const time_t expiration
,
149 const uint32_t flags
,
153 memcached_storage_action_t verb
)
/* send_length starts as the full size of the request's byte image. */
155 protocol_binary_request_set request
= {};
156 size_t send_length
= sizeof(request
.bytes
);
158 initialize_binary_request(server
, request
.message
.header
);
160 request
.message
.header
.request
.opcode
= get_com_code(verb
, reply
);
/* The key on the wire is namespace + key, so both lengths are added. */
161 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->_namespace
)));
162 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
163 if (verb
== APPEND_OP
or verb
== PREPEND_OP
)
165 send_length
-= 8; /* append & prepend does not contain extras! */
/* NOTE(review): presumably the three assignments below form the `else`
 * branch of the test above (the `else` line is not visible) -- confirm. */
169 request
.message
.header
.request
.extlen
= 8;
170 request
.message
.body
.flags
= htonl(flags
);
171 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
/* extlen is read back here, so the body length matches whatever was set. */
174 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->_namespace
) + value_length
+
175 request
.message
.header
.request
.extlen
));
/* NOTE(review): a guard around this assignment may exist on a line that is
 * not visible -- confirm. */
179 request
.message
.header
.request
.cas
= memcached_htonll(cas
);
/* Three vector elements are visible: request bytes, namespace and value.
 * memcached_vdo() is called with a count of 5, so two further elements are
 * declared on lines that are not visible. */
182 libmemcached_io_vector_st vector
[]=
185 { request
.bytes
, send_length
},
186 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
188 { value
, value_length
}
191 /* write the header */
192 memcached_return_t rc
;
193 if ((rc
= memcached_vdo(server
, vector
, 5, flush
)) != MEMCACHED_SUCCESS
)
/* On a failed write the error recorded on the root handle is returned; the
 * assert documents the expectation that one has been recorded. */
195 assert(memcached_last_error(server
->root
) != MEMCACHED_SUCCESS
);
196 return memcached_last_error(server
->root
);
/* Replication: only for SET_OP and only when replicas are configured. */
199 if (verb
== SET_OP
and ptr
->number_of_replicas
> 0)
201 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
202 WATCHPOINT_STRING("replicating");
204 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
/* NOTE(review): presumably server_key is advanced and wrapped to the first
 * server here; the increment and reset lines are not visible -- confirm. */
207 if (server_key
== memcached_server_count(ptr
))
212 memcached_instance_st
* instance
= memcached_instance_fetch(ptr
, server_key
);
/* Replica writes pass `false` as the last argument of memcached_vdo(); on
 * success the instance's pending-response count is decremented. */
214 if (memcached_success(memcached_vdo(instance
, vector
, 5, false)))
216 memcached_server_response_decrement(instance
);
/* NOTE(review): the conditions guarding the next two returns are not
 * visible -- confirm against the full file. */
223 return MEMCACHED_BUFFERED
;
226 // No reply always assumes success
229 return MEMCACHED_SUCCESS
;
232 return memcached_response(server
, NULL
, 0, NULL
);
/*
 * Builds one ASCII-protocol storage command and writes it to `instance`.
 *
 * What the visible code does:
 *  - formats flags (" %u"), expiration, value_length and cas (all " %llu")
 *    into stack buffers of MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH + 1 bytes;
 *  - after every snprintf() treats a negative result or a result that does not
 *    fit the buffer as an error and returns memcached_set_error() with
 *    MEMCACHED_MEMORY_ALLOCATION_FAILURE;
 *  - assembles an I/O vector from the verb string, the namespace, the number
 *    buffers, an optional " noreply", "\r\n", the value and a final "\r\n";
 *  - writes the vector with memcached_vdo() using a count of 12.
 *
 * Visible return paths: MEMCACHED_SUCCESS or rc, MEMCACHED_BUFFERED or rc,
 * MEMCACHED_SUCCESS when the response is MEMCACHED_STORED, and the value of
 * memcached_set_error(*ptr, rc, ...) when ptr has no error recorded.
 *
 * NOTE(review): key, cas, flush and reply are used below but their
 * declarations are not among the visible lines, and neither are several
 * braces and conditions.  Hedged remarks below mark those spots.
 */
235 static memcached_return_t
memcached_send_ascii(Memcached
*ptr
,
236 memcached_instance_st
* instance
,
238 const size_t key_length
,
240 const size_t value_length
,
241 const time_t expiration
,
242 const uint32_t flags
,
246 const memcached_storage_action_t verb
)
/* Each number is formatted with a leading space so the buffers can be
 * concatenated directly in the vector below. */
248 char flags_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
249 int flags_buffer_length
= snprintf(flags_buffer
, sizeof(flags_buffer
), " %u", flags
);
250 if (size_t(flags_buffer_length
) >= sizeof(flags_buffer
) or flags_buffer_length
< 0)
252 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
253 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
256 char expiration_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
257 int expiration_buffer_length
= snprintf(expiration_buffer
, sizeof(expiration_buffer
), " %llu", (unsigned long long)expiration
);
258 if (size_t(expiration_buffer_length
) >= sizeof(expiration_buffer
) or expiration_buffer_length
< 0)
260 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
261 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
264 char value_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
265 int value_buffer_length
= snprintf(value_buffer
, sizeof(value_buffer
), " %llu", (unsigned long long)value_length
);
266 if (size_t(value_buffer_length
) >= sizeof(value_buffer
) or value_buffer_length
< 0)
268 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
269 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
/* cas_buffer_length starts at 0, so the cas element of the vector is empty
 * unless the snprintf() below runs.
 * NOTE(review): the condition guarding that snprintf() is not visible --
 * confirm when the cas value is emitted. */
272 char cas_buffer
[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH
+1];
273 int cas_buffer_length
= 0;
276 cas_buffer_length
= snprintf(cas_buffer
, sizeof(cas_buffer
), " %llu", (unsigned long long)cas
);
277 if (size_t(cas_buffer_length
) >= sizeof(cas_buffer
) or cas_buffer_length
< 0)
279 return memcached_set_error(*instance
, MEMCACHED_MEMORY_ALLOCATION_FAILURE
, MEMCACHED_AT
,
280 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
/* Ten vector elements are visible; memcached_vdo() is called with a count of
 * 12, so two further elements are declared on lines that are not visible.
 * The " noreply" element gets length 0 when `reply` is true. */
284 libmemcached_io_vector_st vector
[]=
287 { storage_op_string(verb
), strlen(storage_op_string(verb
))},
288 { memcached_array_string(ptr
->_namespace
), memcached_array_size(ptr
->_namespace
) },
290 { flags_buffer
, size_t(flags_buffer_length
) },
291 { expiration_buffer
, size_t(expiration_buffer_length
) },
292 { value_buffer
, size_t(value_buffer_length
) },
293 { cas_buffer
, size_t(cas_buffer_length
) },
294 { " noreply", reply
? 0 : memcached_literal_param_size(" noreply") },
295 { memcached_literal_param("\r\n") },
296 { value
, value_length
},
297 { memcached_literal_param("\r\n") }
300 /* Send command header */
301 memcached_return_t rc
= memcached_vdo(instance
, vector
, 12, flush
);
/* NOTE(review): the conditions guarding the next two returns are not
 * visible -- confirm against the full file. */
303 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
306 return memcached_success(rc
) ? MEMCACHED_SUCCESS
: rc
;
311 return memcached_success(rc
) ? MEMCACHED_BUFFERED
: rc
;
/* Only a successful write is followed by reading the server's response. */
314 if (rc
== MEMCACHED_SUCCESS
)
316 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
317 rc
= memcached_response(instance
, buffer
, sizeof(buffer
), NULL
);
/* MEMCACHED_STORED is reported to the caller as MEMCACHED_SUCCESS. */
319 if (rc
== MEMCACHED_STORED
)
321 return MEMCACHED_SUCCESS
;
/* From here on rc is expected to be a failure code. */
325 assert(memcached_failed(rc
));
/* NOTE(review): it cannot be told from the visible lines whether this branch
 * is compiled in, nor what the function returns after it -- confirm. */
327 if (memcached_has_error(ptr
) == false)
329 return memcached_set_error(*ptr
, rc
, MEMCACHED_AT
);
/*
 * Common entry point behind every public storage call in this file.
 *
 * What the visible code does:
 *  - converts `shell` to a Memcached pointer and runs initialize_query();
 *  - validates the key with memcached_key_test() and returns
 *    memcached_last_error(ptr) when that fails;
 *  - hashes the group key to a server index and fetches that instance;
 *  - reads the reply setting from memcached_is_replying();
 *  - when encryption is enabled, rejects verbs for which can_by_encrypted()
 *    compares equal to false (MEMCACHED_NOT_SUPPORTED) and otherwise replaces
 *    value / value_length with the output of hashkit_encrypt();
 *  - dispatches to memcached_send_binary() or memcached_send_ascii()
 *    depending on memcached_is_binary();
 *  - releases the encryption buffer with hashkit_string_free().
 *
 * Change in this revision: the MEMCACHED_NOT_SUPPORTED message spelled
 * "encryption" as "encyrption"; only that string literal was corrected.
 *
 * NOTE(review): `cas` and `flush` are used below but their declarations are
 * not among the visible lines, and neither are the bodies of the failure
 * branches or the final return -- confirm against the full file.
 */
336 static inline memcached_return_t
memcached_send(memcached_st
*shell
,
337 const char *group_key
, size_t group_key_length
,
338 const char *key
, size_t key_length
,
339 const char *value
, size_t value_length
,
340 const time_t expiration
,
341 const uint32_t flags
,
343 memcached_storage_action_t verb
)
345 Memcached
* ptr
= memcached2Memcached(shell
);
346 memcached_return_t rc
;
347 if (memcached_failed(rc
= initialize_query(ptr
, true)))
/* key and key_length are passed by address to the key test. */
352 if (memcached_failed(memcached_key_test(*ptr
, (const char **)&key
, &key_length
, 1)))
354 return memcached_last_error(ptr
);
/* The server is chosen from the group key, not from the item key. */
357 uint32_t server_key
= memcached_generate_hash_with_redistribution(ptr
, group_key
, group_key_length
);
358 memcached_instance_st
* instance
= memcached_instance_fetch(ptr
, server_key
);
/* The assignments zeroing the I/O wait counters are wrapped in
 * WATCHPOINT_SET. */
360 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
361 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
/* NOTE(review): the body of this test is not visible; presumably it adjusts
 * `flush` for buffered SET_OP requests -- confirm. */
364 if (memcached_is_buffering(instance
->root
) and verb
== SET_OP
)
369 bool reply
= memcached_is_replying(ptr
);
/* `destination` stays NULL unless encryption produces a buffer; it is handed
 * to hashkit_string_free() at the end either way. */
371 hashkit_string_st
* destination
= NULL
;
373 if (memcached_is_encrypted(ptr
))
375 if (can_by_encrypted(verb
) == false)
377 return memcached_set_error(*ptr
, MEMCACHED_NOT_SUPPORTED
, MEMCACHED_AT
,
378 memcached_literal_param("Operation not allowed while encryption is enabled"));
/* NOTE(review): what is returned when hashkit_encrypt() yields NULL is not
 * visible -- confirm that the failure is reported as an error. */
381 if ((destination
= hashkit_encrypt(&ptr
->hashkit
, value
, value_length
)) == NULL
)
/* From here on the encrypted bytes are what gets sent. */
385 value
= hashkit_string_c_str(destination
);
386 value_length
= hashkit_string_length(destination
);
389 if (memcached_is_binary(ptr
))
391 rc
= memcached_send_binary(ptr
, instance
, server_key
,
393 value
, value_length
, expiration
,
394 flags
, cas
, flush
, reply
, verb
);
398 rc
= memcached_send_ascii(ptr
, instance
,
400 value
, value_length
, expiration
,
401 flags
, cas
, flush
, reply
, verb
);
404 hashkit_string_free(destination
);
410 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
411 const char *value
, size_t value_length
,
415 memcached_return_t rc
;
416 LIBMEMCACHED_MEMCACHED_SET_START();
417 rc
= memcached_send(ptr
, key
, key_length
,
418 key
, key_length
, value
, value_length
,
419 expiration
, flags
, 0, SET_OP
);
420 LIBMEMCACHED_MEMCACHED_SET_END();
424 memcached_return_t
memcached_add(memcached_st
*ptr
,
425 const char *key
, size_t key_length
,
426 const char *value
, size_t value_length
,
430 memcached_return_t rc
;
431 LIBMEMCACHED_MEMCACHED_ADD_START();
432 rc
= memcached_send(ptr
, key
, key_length
,
433 key
, key_length
, value
, value_length
,
434 expiration
, flags
, 0, ADD_OP
);
436 LIBMEMCACHED_MEMCACHED_ADD_END();
440 memcached_return_t
memcached_replace(memcached_st
*ptr
,
441 const char *key
, size_t key_length
,
442 const char *value
, size_t value_length
,
446 memcached_return_t rc
;
447 LIBMEMCACHED_MEMCACHED_REPLACE_START();
448 rc
= memcached_send(ptr
, key
, key_length
,
449 key
, key_length
, value
, value_length
,
450 expiration
, flags
, 0, REPLACE_OP
);
451 LIBMEMCACHED_MEMCACHED_REPLACE_END();
455 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
456 const char *key
, size_t key_length
,
457 const char *value
, size_t value_length
,
461 memcached_return_t rc
;
462 rc
= memcached_send(ptr
, key
, key_length
,
463 key
, key_length
, value
, value_length
,
464 expiration
, flags
, 0, PREPEND_OP
);
468 memcached_return_t
memcached_append(memcached_st
*ptr
,
469 const char *key
, size_t key_length
,
470 const char *value
, size_t value_length
,
474 memcached_return_t rc
;
475 rc
= memcached_send(ptr
, key
, key_length
,
476 key
, key_length
, value
, value_length
,
477 expiration
, flags
, 0, APPEND_OP
);
481 memcached_return_t
memcached_cas(memcached_st
*ptr
,
482 const char *key
, size_t key_length
,
483 const char *value
, size_t value_length
,
488 memcached_return_t rc
;
489 rc
= memcached_send(ptr
, key
, key_length
,
490 key
, key_length
, value
, value_length
,
491 expiration
, flags
, cas
, CAS_OP
);
495 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
496 const char *group_key
,
497 size_t group_key_length
,
498 const char *key
, size_t key_length
,
499 const char *value
, size_t value_length
,
503 memcached_return_t rc
;
504 LIBMEMCACHED_MEMCACHED_SET_START();
505 rc
= memcached_send(ptr
, group_key
, group_key_length
,
506 key
, key_length
, value
, value_length
,
507 expiration
, flags
, 0, SET_OP
);
508 LIBMEMCACHED_MEMCACHED_SET_END();
512 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
513 const char *group_key
, size_t group_key_length
,
514 const char *key
, size_t key_length
,
515 const char *value
, size_t value_length
,
519 memcached_return_t rc
;
520 LIBMEMCACHED_MEMCACHED_ADD_START();
521 rc
= memcached_send(ptr
, group_key
, group_key_length
,
522 key
, key_length
, value
, value_length
,
523 expiration
, flags
, 0, ADD_OP
);
524 LIBMEMCACHED_MEMCACHED_ADD_END();
528 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
529 const char *group_key
, size_t group_key_length
,
530 const char *key
, size_t key_length
,
531 const char *value
, size_t value_length
,
535 memcached_return_t rc
;
536 LIBMEMCACHED_MEMCACHED_REPLACE_START();
537 rc
= memcached_send(ptr
, group_key
, group_key_length
,
538 key
, key_length
, value
, value_length
,
539 expiration
, flags
, 0, REPLACE_OP
);
540 LIBMEMCACHED_MEMCACHED_REPLACE_END();
544 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
545 const char *group_key
, size_t group_key_length
,
546 const char *key
, size_t key_length
,
547 const char *value
, size_t value_length
,
551 return memcached_send(ptr
, group_key
, group_key_length
,
552 key
, key_length
, value
, value_length
,
553 expiration
, flags
, 0, PREPEND_OP
);
556 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
557 const char *group_key
, size_t group_key_length
,
558 const char *key
, size_t key_length
,
559 const char *value
, size_t value_length
,
563 return memcached_send(ptr
, group_key
, group_key_length
,
564 key
, key_length
, value
, value_length
,
565 expiration
, flags
, 0, APPEND_OP
);
568 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
569 const char *group_key
, size_t group_key_length
,
570 const char *key
, size_t key_length
,
571 const char *value
, size_t value_length
,
576 return memcached_send(ptr
, group_key
, group_key_length
,
577 key
, key_length
, value
, value_length
,
578 expiration
, flags
, cas
, CAS_OP
);