Merge in namespace fixes for binary protocol.
[m6w6/libmemcached] / libmemcached / storage.cc
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Storage related functions, aka set, replace,..
9 *
10 */
11
12 #include <libmemcached/common.h>
13
/*
  Storage verbs understood by memcached_send(). The numeric values are the
  ones the compiler would assign implicitly; they are spelled out only to
  make the ordering explicit.
*/
enum memcached_storage_action_t {
  SET_OP= 0,     /* "set "     */
  REPLACE_OP= 1, /* "replace " */
  ADD_OP= 2,     /* "add "     */
  PREPEND_OP= 3, /* "prepend " */
  APPEND_OP= 4,  /* "append "  */
  CAS_OP= 5      /* "cas "     */
};
22
23 /* Inline this */
24 static inline const char *storage_op_string(memcached_storage_action_t verb)
25 {
26 switch (verb)
27 {
28 case SET_OP:
29 return "set ";
30 case REPLACE_OP:
31 return "replace ";
32 case ADD_OP:
33 return "add ";
34 case PREPEND_OP:
35 return "prepend ";
36 case APPEND_OP:
37 return "append ";
38 case CAS_OP:
39 return "cas ";
40 default:
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
42 }
43
44 /* NOTREACHED */
45 }
46
/*
  Forward declaration: binary-protocol counterpart of the ASCII path in
  memcached_send(). It is defined further down in this file and is called
  by memcached_send() when ptr->flags.binary_protocol is set.

  server      instance the request is written to
  server_key  index of that instance; used as the starting point when the
              request is replicated to following servers
*/
static memcached_return_t memcached_send_binary(memcached_st *ptr,
                                                memcached_server_write_instance_st server,
                                                uint32_t server_key,
                                                const char *key,
                                                size_t key_length,
                                                const char *value,
                                                size_t value_length,
                                                time_t expiration,
                                                uint32_t flags,
                                                uint64_t cas,
                                                memcached_storage_action_t verb);
58
59 static inline memcached_return_t memcached_send(memcached_st *ptr,
60 const char *group_key, size_t group_key_length,
61 const char *key, size_t key_length,
62 const char *value, size_t value_length,
63 time_t expiration,
64 uint32_t flags,
65 uint64_t cas,
66 memcached_storage_action_t verb)
67 {
68 bool to_write;
69 size_t write_length;
70 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
71
72 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
73
74 memcached_return_t rc;
75 if (memcached_failed(rc= initialize_query(ptr)))
76 {
77 return rc;
78 }
79
80 if (memcached_failed(rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol)))
81 {
82 return rc;
83 }
84
85 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
86 {
87 return MEMCACHED_BAD_KEY_PROVIDED;
88 }
89
90 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
91 memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);
92
93 WATCHPOINT_SET(instance->io_wait_count.read= 0);
94 WATCHPOINT_SET(instance->io_wait_count.write= 0);
95
96 if (ptr->flags.binary_protocol)
97 {
98 rc= memcached_send_binary(ptr, instance, server_key,
99 key, key_length,
100 value, value_length, expiration,
101 flags, cas, verb);
102 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
103 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
104 }
105 else
106 {
107
108 if (cas)
109 {
110 int check_length;
111 check_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
112 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
113 storage_op_string(verb),
114 memcached_print_array(ptr->prefix_key),
115 (int)key_length, key, flags,
116 (unsigned long long)expiration, (unsigned long)value_length,
117 (unsigned long long)cas,
118 (ptr->flags.no_reply) ? " noreply" : "");
119 if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE || check_length < 0)
120 {
121 rc= MEMCACHED_WRITE_FAILURE;
122 memcached_io_reset(instance);
123
124 return rc;
125 }
126 write_length= check_length;
127 }
128 else
129 {
130 char *buffer_ptr= buffer;
131 const char *command= storage_op_string(verb);
132
133 /* Copy in the command, no space needed, we handle that in the command function*/
134 memcpy(buffer_ptr, command, strlen(command));
135
136 /* Copy in the key prefix, switch to the buffer_ptr */
137 buffer_ptr= (char *)memcpy((char *)(buffer_ptr + strlen(command)), (char *)memcached_array_string(ptr->prefix_key), memcached_array_size(ptr->prefix_key));
138
139 /* Copy in the key, adjust point if a key prefix was used. */
140 buffer_ptr= (char *)memcpy(buffer_ptr + memcached_array_size(ptr->prefix_key),
141 key, key_length);
142 buffer_ptr+= key_length;
143 buffer_ptr[0]= ' ';
144 buffer_ptr++;
145
146 write_length= (size_t)(buffer_ptr - buffer);
147 int check_length= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE -(size_t)(buffer_ptr - buffer),
148 "%u %llu %lu%s\r\n",
149 flags,
150 (unsigned long long)expiration, (unsigned long)value_length,
151 ptr->flags.no_reply ? " noreply" : "");
152 if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -size_t(buffer_ptr - buffer) || check_length < 0)
153 {
154 rc= MEMCACHED_WRITE_FAILURE;
155 memcached_io_reset(instance);
156
157 return rc;
158 }
159
160 write_length+= (size_t)check_length;
161 WATCHPOINT_ASSERT(write_length < MEMCACHED_DEFAULT_COMMAND_SIZE);
162 }
163
164 if (ptr->flags.use_udp && ptr->flags.buffer_requests)
165 {
166 size_t cmd_size= write_length + value_length +2;
167 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
168 return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
169
170 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
171 memcached_io_write(instance, NULL, 0, true);
172 }
173
174 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
175 {
176 rc= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
177 }
178 else
179 {
180 struct libmemcached_io_vector_st vector[]=
181 {
182 { write_length, buffer },
183 { value_length, value },
184 { 2, "\r\n" }
185 };
186
187 if (ptr->flags.buffer_requests && verb == SET_OP)
188 {
189 to_write= false;
190 }
191 else
192 {
193 to_write= true;
194 }
195
196 /* Send command header */
197 rc= memcached_vdo(instance, vector, 3, to_write);
198 if (rc == MEMCACHED_SUCCESS)
199 {
200
201 if (ptr->flags.no_reply)
202 {
203 rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
204 }
205 else if (to_write == false)
206 {
207 rc= MEMCACHED_BUFFERED;
208 }
209 else
210 {
211 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
212
213 if (rc == MEMCACHED_STORED)
214 rc= MEMCACHED_SUCCESS;
215 }
216 }
217 }
218
219 if (rc == MEMCACHED_WRITE_FAILURE)
220 memcached_io_reset(instance);
221 }
222
223 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
224 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
225
226 return rc;
227 }
228
229
230 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
231 const char *value, size_t value_length,
232 time_t expiration,
233 uint32_t flags)
234 {
235 memcached_return_t rc;
236 LIBMEMCACHED_MEMCACHED_SET_START();
237 rc= memcached_send(ptr, key, key_length,
238 key, key_length, value, value_length,
239 expiration, flags, 0, SET_OP);
240 LIBMEMCACHED_MEMCACHED_SET_END();
241 return rc;
242 }
243
244 memcached_return_t memcached_add(memcached_st *ptr,
245 const char *key, size_t key_length,
246 const char *value, size_t value_length,
247 time_t expiration,
248 uint32_t flags)
249 {
250 memcached_return_t rc;
251 LIBMEMCACHED_MEMCACHED_ADD_START();
252 rc= memcached_send(ptr, key, key_length,
253 key, key_length, value, value_length,
254 expiration, flags, 0, ADD_OP);
255 LIBMEMCACHED_MEMCACHED_ADD_END();
256 return rc;
257 }
258
259 memcached_return_t memcached_replace(memcached_st *ptr,
260 const char *key, size_t key_length,
261 const char *value, size_t value_length,
262 time_t expiration,
263 uint32_t flags)
264 {
265 memcached_return_t rc;
266 LIBMEMCACHED_MEMCACHED_REPLACE_START();
267 rc= memcached_send(ptr, key, key_length,
268 key, key_length, value, value_length,
269 expiration, flags, 0, REPLACE_OP);
270 LIBMEMCACHED_MEMCACHED_REPLACE_END();
271 return rc;
272 }
273
274 memcached_return_t memcached_prepend(memcached_st *ptr,
275 const char *key, size_t key_length,
276 const char *value, size_t value_length,
277 time_t expiration,
278 uint32_t flags)
279 {
280 memcached_return_t rc;
281 rc= memcached_send(ptr, key, key_length,
282 key, key_length, value, value_length,
283 expiration, flags, 0, PREPEND_OP);
284 return rc;
285 }
286
287 memcached_return_t memcached_append(memcached_st *ptr,
288 const char *key, size_t key_length,
289 const char *value, size_t value_length,
290 time_t expiration,
291 uint32_t flags)
292 {
293 memcached_return_t rc;
294 rc= memcached_send(ptr, key, key_length,
295 key, key_length, value, value_length,
296 expiration, flags, 0, APPEND_OP);
297 return rc;
298 }
299
300 memcached_return_t memcached_cas(memcached_st *ptr,
301 const char *key, size_t key_length,
302 const char *value, size_t value_length,
303 time_t expiration,
304 uint32_t flags,
305 uint64_t cas)
306 {
307 memcached_return_t rc;
308 rc= memcached_send(ptr, key, key_length,
309 key, key_length, value, value_length,
310 expiration, flags, cas, CAS_OP);
311 return rc;
312 }
313
314 memcached_return_t memcached_set_by_key(memcached_st *ptr,
315 const char *group_key,
316 size_t group_key_length,
317 const char *key, size_t key_length,
318 const char *value, size_t value_length,
319 time_t expiration,
320 uint32_t flags)
321 {
322 memcached_return_t rc;
323 LIBMEMCACHED_MEMCACHED_SET_START();
324 rc= memcached_send(ptr, group_key, group_key_length,
325 key, key_length, value, value_length,
326 expiration, flags, 0, SET_OP);
327 LIBMEMCACHED_MEMCACHED_SET_END();
328 return rc;
329 }
330
331 memcached_return_t memcached_add_by_key(memcached_st *ptr,
332 const char *group_key, size_t group_key_length,
333 const char *key, size_t key_length,
334 const char *value, size_t value_length,
335 time_t expiration,
336 uint32_t flags)
337 {
338 memcached_return_t rc;
339 LIBMEMCACHED_MEMCACHED_ADD_START();
340 rc= memcached_send(ptr, group_key, group_key_length,
341 key, key_length, value, value_length,
342 expiration, flags, 0, ADD_OP);
343 LIBMEMCACHED_MEMCACHED_ADD_END();
344 return rc;
345 }
346
347 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
348 const char *group_key, size_t group_key_length,
349 const char *key, size_t key_length,
350 const char *value, size_t value_length,
351 time_t expiration,
352 uint32_t flags)
353 {
354 memcached_return_t rc;
355 LIBMEMCACHED_MEMCACHED_REPLACE_START();
356 rc= memcached_send(ptr, group_key, group_key_length,
357 key, key_length, value, value_length,
358 expiration, flags, 0, REPLACE_OP);
359 LIBMEMCACHED_MEMCACHED_REPLACE_END();
360 return rc;
361 }
362
363 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
364 const char *group_key, size_t group_key_length,
365 const char *key, size_t key_length,
366 const char *value, size_t value_length,
367 time_t expiration,
368 uint32_t flags)
369 {
370 memcached_return_t rc;
371 rc= memcached_send(ptr, group_key, group_key_length,
372 key, key_length, value, value_length,
373 expiration, flags, 0, PREPEND_OP);
374 return rc;
375 }
376
377 memcached_return_t memcached_append_by_key(memcached_st *ptr,
378 const char *group_key, size_t group_key_length,
379 const char *key, size_t key_length,
380 const char *value, size_t value_length,
381 time_t expiration,
382 uint32_t flags)
383 {
384 memcached_return_t rc;
385 rc= memcached_send(ptr, group_key, group_key_length,
386 key, key_length, value, value_length,
387 expiration, flags, 0, APPEND_OP);
388 return rc;
389 }
390
391 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
392 const char *group_key, size_t group_key_length,
393 const char *key, size_t key_length,
394 const char *value, size_t value_length,
395 time_t expiration,
396 uint32_t flags,
397 uint64_t cas)
398 {
399 memcached_return_t rc;
400 rc= memcached_send(ptr, group_key, group_key_length,
401 key, key_length, value, value_length,
402 expiration, flags, cas, CAS_OP);
403 return rc;
404 }
405
406 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
407 {
408 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
409 * be used uninitialized in this function. FAIL */
410 uint8_t ret= 0;
411
412 if (noreply)
413 switch (verb)
414 {
415 case SET_OP:
416 ret=PROTOCOL_BINARY_CMD_SETQ;
417 break;
418 case ADD_OP:
419 ret=PROTOCOL_BINARY_CMD_ADDQ;
420 break;
421 case CAS_OP: /* FALLTHROUGH */
422 case REPLACE_OP:
423 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
424 break;
425 case APPEND_OP:
426 ret=PROTOCOL_BINARY_CMD_APPENDQ;
427 break;
428 case PREPEND_OP:
429 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
430 break;
431 default:
432 WATCHPOINT_ASSERT(verb);
433 break;
434 }
435 else
436 switch (verb)
437 {
438 case SET_OP:
439 ret=PROTOCOL_BINARY_CMD_SET;
440 break;
441 case ADD_OP:
442 ret=PROTOCOL_BINARY_CMD_ADD;
443 break;
444 case CAS_OP: /* FALLTHROUGH */
445 case REPLACE_OP:
446 ret=PROTOCOL_BINARY_CMD_REPLACE;
447 break;
448 case APPEND_OP:
449 ret=PROTOCOL_BINARY_CMD_APPEND;
450 break;
451 case PREPEND_OP:
452 ret=PROTOCOL_BINARY_CMD_PREPEND;
453 break;
454 default:
455 WATCHPOINT_ASSERT(verb);
456 break;
457 }
458
459 return ret;
460 }
461
462
463
464 static memcached_return_t memcached_send_binary(memcached_st *ptr,
465 memcached_server_write_instance_st server,
466 uint32_t server_key,
467 const char *key,
468 size_t key_length,
469 const char *value,
470 size_t value_length,
471 time_t expiration,
472 uint32_t flags,
473 uint64_t cas,
474 memcached_storage_action_t verb)
475 {
476 bool flush;
477 protocol_binary_request_set request= {};
478 size_t send_length= sizeof(request.bytes);
479
480 bool noreply= server->root->flags.no_reply;
481
482 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
483 request.message.header.request.opcode= get_com_code(verb, noreply);
484 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->prefix_key)));
485 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
486 if (verb == APPEND_OP || verb == PREPEND_OP)
487 send_length -= 8; /* append & prepend does not contain extras! */
488 else
489 {
490 request.message.header.request.extlen= 8;
491 request.message.body.flags= htonl(flags);
492 request.message.body.expiration= htonl((uint32_t)expiration);
493 }
494
495 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->prefix_key) + value_length +
496 request.message.header.request.extlen));
497
498 if (cas)
499 request.message.header.request.cas= memcached_htonll(cas);
500
501 flush= (bool) ((server->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
502
503 if (server->root->flags.use_udp && ! flush)
504 {
505 size_t cmd_size= send_length + key_length + value_length;
506
507 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
508 {
509 return MEMCACHED_WRITE_FAILURE;
510 }
511 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
512 {
513 memcached_io_write(server, NULL, 0, true);
514 }
515 }
516
517 struct libmemcached_io_vector_st vector[]=
518 {
519 { send_length, request.bytes },
520 { memcached_array_size(ptr->prefix_key), memcached_array_string(ptr->prefix_key) },
521 { key_length, key },
522 { value_length, value }
523 };
524
525 /* write the header */
526 memcached_return_t rc;
527 if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS)
528 {
529 memcached_io_reset(server);
530 return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
531 }
532
533 if (verb == SET_OP && ptr->number_of_replicas > 0)
534 {
535 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
536 WATCHPOINT_STRING("replicating");
537
538 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
539 {
540 memcached_server_write_instance_st instance;
541
542 ++server_key;
543 if (server_key == memcached_server_count(ptr))
544 server_key= 0;
545
546 instance= memcached_server_instance_fetch(ptr, server_key);
547
548 if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS)
549 {
550 memcached_io_reset(instance);
551 }
552 else
553 {
554 memcached_server_response_decrement(instance);
555 }
556 }
557 }
558
559 if (flush == false)
560 {
561 return MEMCACHED_BUFFERED;
562 }
563
564 if (noreply)
565 {
566 return MEMCACHED_SUCCESS;
567 }
568
569 return memcached_response(server, NULL, 0, NULL);
570 }
571