2 * Copyright (C) 2006-2009 Brian Aker
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
8 * Summary: Storage related functions, aka set, replace,..
21 } memcached_storage_action_t
;
24 static inline const char *storage_op_string(memcached_storage_action_t verb
)
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
47 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
48 memcached_server_write_instance_st server
,
57 memcached_storage_action_t verb
);
59 static inline memcached_return_t
memcached_send(memcached_st
*ptr
,
60 const char *master_key
, size_t master_key_length
,
61 const char *key
, size_t key_length
,
62 const char *value
, size_t value_length
,
66 memcached_storage_action_t verb
)
70 memcached_return_t rc
;
71 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
73 memcached_server_write_instance_st instance
;
75 WATCHPOINT_ASSERT(!(value
== NULL
&& value_length
> 0));
77 rc
= memcached_validate_key_length(key_length
, ptr
->flags
.binary_protocol
);
78 unlikely (rc
!= MEMCACHED_SUCCESS
)
81 unlikely (memcached_server_count(ptr
) == 0)
82 return MEMCACHED_NO_SERVERS
;
84 if (ptr
->flags
.verify_key
&& (memcached_key_test((const char **)&key
, &key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
85 return MEMCACHED_BAD_KEY_PROVIDED
;
87 server_key
= memcached_generate_hash_with_redistribution(ptr
, master_key
, master_key_length
);
88 instance
= memcached_server_instance_fetch(ptr
, server_key
);
90 WATCHPOINT_SET(instance
->io_wait_count
.read
= 0);
91 WATCHPOINT_SET(instance
->io_wait_count
.write
= 0);
93 if (ptr
->flags
.binary_protocol
)
95 rc
= memcached_send_binary(ptr
, instance
, server_key
,
97 value
, value_length
, expiration
,
99 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
100 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
107 write_length
= (size_t) snprintf(buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
108 "%s %.*s%.*s %u %llu %zu %llu%s\r\n",
109 storage_op_string(verb
),
110 (int)ptr
->prefix_key_length
,
112 (int)key_length
, key
, flags
,
113 (unsigned long long)expiration
, value_length
,
114 (unsigned long long)cas
,
115 (ptr
->flags
.no_reply
) ? " noreply" : "");
119 char *buffer_ptr
= buffer
;
120 const char *command
= storage_op_string(verb
);
122 /* Copy in the command, no space needed, we handle that in the command function*/
123 memcpy(buffer_ptr
, command
, strlen(command
));
125 /* Copy in the key prefix, switch to the buffer_ptr */
126 buffer_ptr
= memcpy((buffer_ptr
+ strlen(command
)), ptr
->prefix_key
, ptr
->prefix_key_length
);
128 /* Copy in the key, adjust point if a key prefix was used. */
129 buffer_ptr
= memcpy(buffer_ptr
+ (ptr
->prefix_key_length
? ptr
->prefix_key_length
: 0),
131 buffer_ptr
+= key_length
;
135 write_length
= (size_t)(buffer_ptr
- buffer
);
136 write_length
+= (size_t) snprintf(buffer_ptr
, MEMCACHED_DEFAULT_COMMAND_SIZE
,
139 (unsigned long long)expiration
, value_length
,
140 ptr
->flags
.no_reply
? " noreply" : "");
143 if (ptr
->flags
.use_udp
&& ptr
->flags
.buffer_requests
)
145 size_t cmd_size
= write_length
+ value_length
+ 2;
146 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
147 return MEMCACHED_WRITE_FAILURE
;
148 if (cmd_size
+ instance
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
149 memcached_io_write(instance
, NULL
, 0, true);
152 if (write_length
>= MEMCACHED_DEFAULT_COMMAND_SIZE
)
154 rc
= MEMCACHED_WRITE_FAILURE
;
158 struct __write_vector_st vector
[]=
160 { .length
= write_length
, .buffer
= buffer
},
161 { .length
= value_length
, .buffer
= value
},
162 { .length
= 2, .buffer
= "\r\n" }
165 if (ptr
->flags
.buffer_requests
&& verb
== SET_OP
)
174 /* Send command header */
175 rc
= memcached_vdo(instance
, vector
, 3, to_write
);
176 if (rc
== MEMCACHED_SUCCESS
)
179 if (ptr
->flags
.no_reply
)
181 rc
= (to_write
== false) ? MEMCACHED_BUFFERED
: MEMCACHED_SUCCESS
;
183 else if (to_write
== false)
185 rc
= MEMCACHED_BUFFERED
;
189 rc
= memcached_response(instance
, buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, NULL
);
191 if (rc
== MEMCACHED_STORED
)
192 rc
= MEMCACHED_SUCCESS
;
197 if (rc
== MEMCACHED_WRITE_FAILURE
)
198 memcached_io_reset(instance
);
201 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.read
> 2, "read IO_WAIT", instance
->io_wait_count
.read
);
202 WATCHPOINT_IF_LABELED_NUMBER(instance
->io_wait_count
.write
> 2, "write_IO_WAIT", instance
->io_wait_count
.write
);
208 memcached_return_t
memcached_set(memcached_st
*ptr
, const char *key
, size_t key_length
,
209 const char *value
, size_t value_length
,
213 memcached_return_t rc
;
214 LIBMEMCACHED_MEMCACHED_SET_START();
215 rc
= memcached_send(ptr
, key
, key_length
,
216 key
, key_length
, value
, value_length
,
217 expiration
, flags
, 0, SET_OP
);
218 LIBMEMCACHED_MEMCACHED_SET_END();
222 memcached_return_t
memcached_add(memcached_st
*ptr
,
223 const char *key
, size_t key_length
,
224 const char *value
, size_t value_length
,
228 memcached_return_t rc
;
229 LIBMEMCACHED_MEMCACHED_ADD_START();
230 rc
= memcached_send(ptr
, key
, key_length
,
231 key
, key_length
, value
, value_length
,
232 expiration
, flags
, 0, ADD_OP
);
233 LIBMEMCACHED_MEMCACHED_ADD_END();
237 memcached_return_t
memcached_replace(memcached_st
*ptr
,
238 const char *key
, size_t key_length
,
239 const char *value
, size_t value_length
,
243 memcached_return_t rc
;
244 LIBMEMCACHED_MEMCACHED_REPLACE_START();
245 rc
= memcached_send(ptr
, key
, key_length
,
246 key
, key_length
, value
, value_length
,
247 expiration
, flags
, 0, REPLACE_OP
);
248 LIBMEMCACHED_MEMCACHED_REPLACE_END();
252 memcached_return_t
memcached_prepend(memcached_st
*ptr
,
253 const char *key
, size_t key_length
,
254 const char *value
, size_t value_length
,
258 memcached_return_t rc
;
259 rc
= memcached_send(ptr
, key
, key_length
,
260 key
, key_length
, value
, value_length
,
261 expiration
, flags
, 0, PREPEND_OP
);
265 memcached_return_t
memcached_append(memcached_st
*ptr
,
266 const char *key
, size_t key_length
,
267 const char *value
, size_t value_length
,
271 memcached_return_t rc
;
272 rc
= memcached_send(ptr
, key
, key_length
,
273 key
, key_length
, value
, value_length
,
274 expiration
, flags
, 0, APPEND_OP
);
278 memcached_return_t
memcached_cas(memcached_st
*ptr
,
279 const char *key
, size_t key_length
,
280 const char *value
, size_t value_length
,
285 memcached_return_t rc
;
286 rc
= memcached_send(ptr
, key
, key_length
,
287 key
, key_length
, value
, value_length
,
288 expiration
, flags
, cas
, CAS_OP
);
292 memcached_return_t
memcached_set_by_key(memcached_st
*ptr
,
293 const char *master_key
__attribute__((unused
)),
294 size_t master_key_length
__attribute__((unused
)),
295 const char *key
, size_t key_length
,
296 const char *value
, size_t value_length
,
300 memcached_return_t rc
;
301 LIBMEMCACHED_MEMCACHED_SET_START();
302 rc
= memcached_send(ptr
, master_key
, master_key_length
,
303 key
, key_length
, value
, value_length
,
304 expiration
, flags
, 0, SET_OP
);
305 LIBMEMCACHED_MEMCACHED_SET_END();
309 memcached_return_t
memcached_add_by_key(memcached_st
*ptr
,
310 const char *master_key
, size_t master_key_length
,
311 const char *key
, size_t key_length
,
312 const char *value
, size_t value_length
,
316 memcached_return_t rc
;
317 LIBMEMCACHED_MEMCACHED_ADD_START();
318 rc
= memcached_send(ptr
, master_key
, master_key_length
,
319 key
, key_length
, value
, value_length
,
320 expiration
, flags
, 0, ADD_OP
);
321 LIBMEMCACHED_MEMCACHED_ADD_END();
325 memcached_return_t
memcached_replace_by_key(memcached_st
*ptr
,
326 const char *master_key
, size_t master_key_length
,
327 const char *key
, size_t key_length
,
328 const char *value
, size_t value_length
,
332 memcached_return_t rc
;
333 LIBMEMCACHED_MEMCACHED_REPLACE_START();
334 rc
= memcached_send(ptr
, master_key
, master_key_length
,
335 key
, key_length
, value
, value_length
,
336 expiration
, flags
, 0, REPLACE_OP
);
337 LIBMEMCACHED_MEMCACHED_REPLACE_END();
341 memcached_return_t
memcached_prepend_by_key(memcached_st
*ptr
,
342 const char *master_key
, size_t master_key_length
,
343 const char *key
, size_t key_length
,
344 const char *value
, size_t value_length
,
348 memcached_return_t rc
;
349 rc
= memcached_send(ptr
, master_key
, master_key_length
,
350 key
, key_length
, value
, value_length
,
351 expiration
, flags
, 0, PREPEND_OP
);
355 memcached_return_t
memcached_append_by_key(memcached_st
*ptr
,
356 const char *master_key
, size_t master_key_length
,
357 const char *key
, size_t key_length
,
358 const char *value
, size_t value_length
,
362 memcached_return_t rc
;
363 rc
= memcached_send(ptr
, master_key
, master_key_length
,
364 key
, key_length
, value
, value_length
,
365 expiration
, flags
, 0, APPEND_OP
);
369 memcached_return_t
memcached_cas_by_key(memcached_st
*ptr
,
370 const char *master_key
, size_t master_key_length
,
371 const char *key
, size_t key_length
,
372 const char *value
, size_t value_length
,
377 memcached_return_t rc
;
378 rc
= memcached_send(ptr
, master_key
, master_key_length
,
379 key
, key_length
, value
, value_length
,
380 expiration
, flags
, cas
, CAS_OP
);
384 static inline uint8_t get_com_code(memcached_storage_action_t verb
, bool noreply
)
386 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
387 * be used uninitialized in this function. FAIL */
394 ret
=PROTOCOL_BINARY_CMD_SETQ
;
397 ret
=PROTOCOL_BINARY_CMD_ADDQ
;
399 case CAS_OP
: /* FALLTHROUGH */
401 ret
=PROTOCOL_BINARY_CMD_REPLACEQ
;
404 ret
=PROTOCOL_BINARY_CMD_APPENDQ
;
407 ret
=PROTOCOL_BINARY_CMD_PREPENDQ
;
410 WATCHPOINT_ASSERT(verb
);
417 ret
=PROTOCOL_BINARY_CMD_SET
;
420 ret
=PROTOCOL_BINARY_CMD_ADD
;
422 case CAS_OP
: /* FALLTHROUGH */
424 ret
=PROTOCOL_BINARY_CMD_REPLACE
;
427 ret
=PROTOCOL_BINARY_CMD_APPEND
;
430 ret
=PROTOCOL_BINARY_CMD_PREPEND
;
433 WATCHPOINT_ASSERT(verb
);
442 static memcached_return_t
memcached_send_binary(memcached_st
*ptr
,
443 memcached_server_write_instance_st server
,
452 memcached_storage_action_t verb
)
455 protocol_binary_request_set request
= {.bytes
= {0}};
456 size_t send_length
= sizeof(request
.bytes
);
458 bool noreply
= server
->root
->flags
.no_reply
;
460 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
461 request
.message
.header
.request
.opcode
= get_com_code(verb
, noreply
);
462 request
.message
.header
.request
.keylen
= htons((uint16_t)(key_length
+ ptr
->prefix_key_length
));
463 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
464 if (verb
== APPEND_OP
|| verb
== PREPEND_OP
)
465 send_length
-= 8; /* append & prepend does not contain extras! */
468 request
.message
.header
.request
.extlen
= 8;
469 request
.message
.body
.flags
= htonl(flags
);
470 request
.message
.body
.expiration
= htonl((uint32_t)expiration
);
473 request
.message
.header
.request
.bodylen
= htonl((uint32_t) (key_length
+ ptr
->prefix_key_length
+ value_length
+
474 request
.message
.header
.request
.extlen
));
477 request
.message
.header
.request
.cas
= htonll(cas
);
479 flush
= (bool) ((server
->root
->flags
.buffer_requests
&& verb
== SET_OP
) ? 0 : 1);
481 if (server
->root
->flags
.use_udp
&& ! flush
)
483 size_t cmd_size
= send_length
+ key_length
+ value_length
;
485 if (cmd_size
> MAX_UDP_DATAGRAM_LENGTH
- UDP_DATAGRAM_HEADER_LENGTH
)
487 return MEMCACHED_WRITE_FAILURE
;
489 if (cmd_size
+ server
->write_buffer_offset
> MAX_UDP_DATAGRAM_LENGTH
)
491 memcached_io_write(server
, NULL
, 0, true);
495 struct __write_vector_st vector
[]=
497 { .length
= send_length
, .buffer
= request
.bytes
},
498 { .length
= ptr
->prefix_key_length
, .buffer
= ptr
->prefix_key
},
499 { .length
= key_length
, .buffer
= key
},
500 { .length
= value_length
, .buffer
= value
}
503 /* write the header */
504 memcached_return_t rc
;
505 if ((rc
= memcached_vdo(server
, vector
, 4, flush
)) != MEMCACHED_SUCCESS
)
507 memcached_io_reset(server
);
508 return (rc
== MEMCACHED_SUCCESS
) ? MEMCACHED_WRITE_FAILURE
: rc
;
511 if (verb
== SET_OP
&& ptr
->number_of_replicas
> 0)
513 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_SETQ
;
514 WATCHPOINT_STRING("replicating");
516 for (uint32_t x
= 0; x
< ptr
->number_of_replicas
; x
++)
518 memcached_server_write_instance_st instance
;
521 if (server_key
== memcached_server_count(ptr
))
524 instance
= memcached_server_instance_fetch(ptr
, server_key
);
526 if (memcached_vdo(instance
, vector
, 4, false) != MEMCACHED_SUCCESS
)
528 memcached_io_reset(instance
);
532 memcached_server_response_decrement(instance
);
539 return MEMCACHED_BUFFERED
;
544 return MEMCACHED_SUCCESS
;
547 return memcached_response(server
, NULL
, 0, NULL
);