2 * Copyright (C) 2006-2009 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Storage related functions, aka set, replace,..
21 } memcached_storage_action_t
;
24 static inline const char *storage_op_string(memcached_storage_action_t verb
)
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
47 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
48 memcached_server_write_instance_st server
,
57 memcached_storage_action_t verb
);
59 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
60 const char *master_key
, size_t master_key_length
,
61 const char *key
, size_t key_length
,
62 const char *value
, size_t value_length
,
66 memcached_storage_action_t verb
)
70 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
72 memcached_server_write_instance_st instance
;
74 WATCHPOINT_ASSERT(!(value
== NULL
&& value_length
> 0));
76 memcached_return_t rc
;
77 if ((rc
= initialize_query(ptr
)) != MEMCACHED_SUCCESS
)
82 rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
);
83 unlikely (rc
!= MEMCACHED_SUCCESS
)
86 if (ptr
->flags
.verify_key
&& (memcached_key_test((const char **)&key
, &key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
87 return MEMCACHED_BAD_KEY_PROVIDED
;
89 server_key
= memcached_generate_hash_with_redistribution(ptr
, master_key
, master_key_length
);
90 instance
= memcached_server_instance_fetch(ptr
, server_key
);
92 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
93 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
95 if (ptr
->flags
.binary_protocol
)
97 rc
= memcached_send_binary(ptr
, instance
, server_key
,
99 value
, value_length
, expiration
,
101 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
102 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
110 check_length
= snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
111 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
112 storage_op_string(verb
),
113 memcached_print_array(ptr
->prefix_key
),
114 (int)key_length
, key
, flags
,
115 (unsigned long long)expiration
, (unsigned long)value_length
,
116 (unsigned long long)cas
,
117 (ptr
->flags
.no_reply
) ? " noreply" : "");
118 if (check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
|| check_length
< 0)
120 rc
= MEMCACHED_WRITE_FAILURE
;
121 memcached_io_reset(instance
);
125 write_length
= check_length
;
129 char *buffer_ptr
= buffer
;
130 const char *command
= storage_op_string(verb
);
132 /* Copy in the command, no space needed, we handle that in the command function*/
133 memcpy(buffer_ptr
, command
, strlen(command
));
135 /* Copy in the key prefix, switch to the buffer_ptr */
136 buffer_ptr
= memcpy((buffer_ptr
+ strlen(command
)), memcached_array_string(ptr
->prefix_key
), memcached_array_size(ptr
->prefix_key
));
138 /* Copy in the key, adjust point if a key prefix was used. */
139 buffer_ptr
= memcpy(buffer_ptr
+ memcached_array_size(ptr
->prefix_key
),
141 buffer_ptr
+= key_length
;
145 write_length
= (size_t)(buffer_ptr
- buffer
);
147 check_length
= snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
),
150 (unsigned long long)expiration
, (unsigned long)value_length
,
151 ptr
->flags
.no_reply
? " noreply" : "");
152 if ((size_t)check_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
-(size_t)(buffer_ptr
- buffer
) || check_length
< 0)
154 rc
= MEMCACHED_WRITE_FAILURE
;
155 memcached_io_reset(instance
);
160 write_length
+= (size_t)check_length
;
161 WATCHPOINT_ASSERT(write_length
< MEMCACHED_DEFAULT_COMMAND_SIZE
);
164 if (ptr
->flags
.use_udp
&& ptr
->flags
.buffer_requests
)
166 size_t cmd_size
= write_length
+ value_length
+ 2;
167 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
168 return MEMCACHED_WRITE_FAILURE
;
169 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
170 memcached_io_write(instance
, NULL
, 0, true);
173 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
175 rc
= MEMCACHED_WRITE_FAILURE
;
179 struct libmemcached_io_vector_st vector
[]=
181 { .length
= write_length
, .buffer
= buffer
},
182 { .length
= value_length
, .buffer
= value
},
183 { .length
= 2, .buffer
= "\r\n" }
186 if (ptr
->flags
.buffer_requests
&& verb
== SET_OP
)
195 /* Send command header */
196 rc
= memcached_vdo(instance
, vector
, 3, to_write
);
197 if (rc
== MEMCACHED_SUCCESS
)
200 if (ptr
->flags
.no_reply
)
202 rc
= (to_write
== false) ? MEMCACHED_BUFFERED
: MEMCACHED_SUCCESS
;
204 else if (to_write
== false)
206 rc
= MEMCACHED_BUFFERED
;
210 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
212 if (rc
== MEMCACHED_STORED
)
213 rc
= MEMCACHED_SUCCESS
;
218 if (rc
== MEMCACHED_WRITE_FAILURE
)
219 memcached_io_reset(instance
);
222 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
223 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
229 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
230 const char *value
, size_t value_length
,
234 memcached_return_t rc
;
235 LIBMEMCACHED_MEMCACHED_SET_START();
236 rc
= memcached_send(ptr
, key
, key_length
,
237 key
, key_length
, value
, value_length
,
238 expiration
, flags
, 0, SET_OP
);
239 LIBMEMCACHED_MEMCACHED_SET_END();
243 memcached_return_t
memcached_add(memcached_st
*ptr
,
244 const char *key
, size_t key_length
,
245 const char *value
, size_t value_length
,
249 memcached_return_t rc
;
250 LIBMEMCACHED_MEMCACHED_ADD_START();
251 rc
= memcached_send(ptr
, key
, key_length
,
252 key
, key_length
, value
, value_length
,
253 expiration
, flags
, 0, ADD_OP
);
254 LIBMEMCACHED_MEMCACHED_ADD_END();
258 memcached_return_t
memcached_replace(memcached_st
*ptr
,
259 const char *key
, size_t key_length
,
260 const char *value
, size_t value_length
,
264 memcached_return_t rc
;
265 LIBMEMCACHED_MEMCACHED_REPLACE_START();
266 rc
= memcached_send(ptr
, key
, key_length
,
267 key
, key_length
, value
, value_length
,
268 expiration
, flags
, 0, REPLACE_OP
);
269 LIBMEMCACHED_MEMCACHED_REPLACE_END();
273 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
274 const char *key
, size_t key_length
,
275 const char *value
, size_t value_length
,
279 memcached_return_t rc
;
280 rc
= memcached_send(ptr
, key
, key_length
,
281 key
, key_length
, value
, value_length
,
282 expiration
, flags
, 0, PREPEND_OP
);
286 memcached_return_t
memcached_append(memcached_st
*ptr
,
287 const char *key
, size_t key_length
,
288 const char *value
, size_t value_length
,
292 memcached_return_t rc
;
293 rc
= memcached_send(ptr
, key
, key_length
,
294 key
, key_length
, value
, value_length
,
295 expiration
, flags
, 0, APPEND_OP
);
299 memcached_return_t
memcached_cas(memcached_st
*ptr
,
300 const char *key
, size_t key_length
,
301 const char *value
, size_t value_length
,
306 memcached_return_t rc
;
307 rc
= memcached_send(ptr
, key
, key_length
,
308 key
, key_length
, value
, value_length
,
309 expiration
, flags
, cas
, CAS_OP
);
313 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
314 const char *master_key
,
315 size_t master_key_length
,
316 const char *key
, size_t key_length
,
317 const char *value
, size_t value_length
,
321 memcached_return_t rc
;
322 LIBMEMCACHED_MEMCACHED_SET_START();
323 rc
= memcached_send(ptr
, master_key
, master_key_length
,
324 key
, key_length
, value
, value_length
,
325 expiration
, flags
, 0, SET_OP
);
326 LIBMEMCACHED_MEMCACHED_SET_END();
330 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
331 const char *master_key
, size_t master_key_length
,
332 const char *key
, size_t key_length
,
333 const char *value
, size_t value_length
,
337 memcached_return_t rc
;
338 LIBMEMCACHED_MEMCACHED_ADD_START();
339 rc
= memcached_send(ptr
, master_key
, master_key_length
,
340 key
, key_length
, value
, value_length
,
341 expiration
, flags
, 0, ADD_OP
);
342 LIBMEMCACHED_MEMCACHED_ADD_END();
346 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
347 const char *master_key
, size_t master_key_length
,
348 const char *key
, size_t key_length
,
349 const char *value
, size_t value_length
,
353 memcached_return_t rc
;
354 LIBMEMCACHED_MEMCACHED_REPLACE_START();
355 rc
= memcached_send(ptr
, master_key
, master_key_length
,
356 key
, key_length
, value
, value_length
,
357 expiration
, flags
, 0, REPLACE_OP
);
358 LIBMEMCACHED_MEMCACHED_REPLACE_END();
362 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
363 const char *master_key
, size_t master_key_length
,
364 const char *key
, size_t key_length
,
365 const char *value
, size_t value_length
,
369 memcached_return_t rc
;
370 rc
= memcached_send(ptr
, master_key
, master_key_length
,
371 key
, key_length
, value
, value_length
,
372 expiration
, flags
, 0, PREPEND_OP
);
376 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
377 const char *master_key
, size_t master_key_length
,
378 const char *key
, size_t key_length
,
379 const char *value
, size_t value_length
,
383 memcached_return_t rc
;
384 rc
= memcached_send(ptr
, master_key
, master_key_length
,
385 key
, key_length
, value
, value_length
,
386 expiration
, flags
, 0, APPEND_OP
);
390 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
391 const char *master_key
, size_t master_key_length
,
392 const char *key
, size_t key_length
,
393 const char *value
, size_t value_length
,
398 memcached_return_t rc
;
399 rc
= memcached_send(ptr
, master_key
, master_key_length
,
400 key
, key_length
, value
, value_length
,
401 expiration
, flags
, cas
, CAS_OP
);
405 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
407 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
408 * be used uninitialized in this function. FAIL */
415 ret
=PROTOCOL_BINARY_CMD_SETQ
;
418 ret
=PROTOCOL_BINARY_CMD_ADDQ
;
420 case CAS_OP
: /* FALLTHROUGH */
422 ret
=PROTOCOL_BINARY_CMD_REPLACEQ
;
425 ret
=PROTOCOL_BINARY_CMD_APPENDQ
;
428 ret
=PROTOCOL_BINARY_CMD_PREPENDQ
;
431 WATCHPOINT_ASSERT(verb
);
438 ret
=PROTOCOL_BINARY_CMD_SET
;
441 ret
=PROTOCOL_BINARY_CMD_ADD
;
443 case CAS_OP
: /* FALLTHROUGH */
445 ret
=PROTOCOL_BINARY_CMD_REPLACE
;
448 ret
=PROTOCOL_BINARY_CMD_APPEND
;
451 ret
=PROTOCOL_BINARY_CMD_PREPEND
;
454 WATCHPOINT_ASSERT(verb
);
463 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
464 memcached_server_write_instance_st server
,
473 memcached_storage_action_t verb
)
476 protocol_binary_request_set request
= {.bytes
= {0}};
477 size_t send_length
= sizeof(request
.bytes
);
479 bool noreply
= server
->root
->flags
.no_reply
;
481 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
482 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
483 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ memcached_array_size(ptr
->prefix_key
)));
484 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
485 if (verb
== APPEND_OP
|| verb
== PREPEND_OP
)
486 send_length
-= 8; /* append & prepend does not contain extras! */
489 request
.message
.header
.request
.extlen
= 8;
490 request
.message
.body
.flags
= htonl(flags
);
491 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
494 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ memcached_array_size(ptr
->prefix_key
) + value_length
+
495 request
.message
.header
.request
.extlen
));
498 request
.message
.header
.request
.cas
= htonll(cas
);
500 flush
= (bool) ((server
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
502 if (server
->root
->flags
.use_udp
&& ! flush
)
504 size_t cmd_size
= send_length
+ key_length
+ value_length
;
506 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
508 return MEMCACHED_WRITE_FAILURE
;
510 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
512 memcached_io_write(server
, NULL
, 0, true);
516 struct libmemcached_io_vector_st vector
[]=
518 { .length
= send_length
, .buffer
= request
.bytes
},
519 { .length
= memcached_array_size(ptr
->prefix_key
), .buffer
= memcached_array_string(ptr
->prefix_key
) },
520 { .length
= key_length
, .buffer
= key
},
521 { .length
= value_length
, .buffer
= value
}
524 /* write the header */
525 memcached_return_t rc
;
526 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
528 memcached_io_reset(server
);
529 return (rc
== MEMCACHED_SUCCESS
) ? MEMCACHED_WRITE_FAILURE
: rc
;
532 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
534 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
535 WATCHPOINT_STRING("replicating");
537 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
539 memcached_server_write_instance_st instance
;
542 if (server_key
== memcached_server_count(ptr
))
545 instance
= memcached_server_instance_fetch(ptr
, server_key
);
547 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
549 memcached_io_reset(instance
);
553 memcached_server_response_decrement(instance
);
560 return MEMCACHED_BUFFERED
;
565 return MEMCACHED_SUCCESS
;
568 return memcached_response(server
, NULL
, 0, NULL
);