331c7fae9ce7a985d2b906187657cfb9322d2c1b
[awesomized/libmemcached] / libmemcached / storage.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Storage related functions, aka set, replace,..
9 *
10 */
11
12 #include "common.h"
13
/* Storage verbs understood by the send routines in this file. */
typedef enum {
  SET_OP= 0,     /* "set"     */
  REPLACE_OP= 1, /* "replace" */
  ADD_OP= 2,     /* "add"     */
  PREPEND_OP= 3, /* "prepend" */
  APPEND_OP= 4,  /* "append"  */
  CAS_OP= 5      /* "cas"     */
} memcached_storage_action_t;
22
23 /* Inline this */
24 static inline const char *storage_op_string(memcached_storage_action_t verb)
25 {
26 switch (verb)
27 {
28 case SET_OP:
29 return "set ";
30 case REPLACE_OP:
31 return "replace ";
32 case ADD_OP:
33 return "add ";
34 case PREPEND_OP:
35 return "prepend ";
36 case APPEND_OP:
37 return "append ";
38 case CAS_OP:
39 return "cas ";
40 default:
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
42 }
43
44 /* NOTREACHED */
45 }
46
47 static memcached_return_t memcached_send_binary(memcached_st *ptr,
48 memcached_server_write_instance_st server,
49 uint32_t server_key,
50 const char *key,
51 size_t key_length,
52 const char *value,
53 size_t value_length,
54 time_t expiration,
55 uint32_t flags,
56 uint64_t cas,
57 memcached_storage_action_t verb);
58
59 static inline memcached_return_t memcached_send(memcached_st *ptr,
60 const char *master_key, size_t master_key_length,
61 const char *key, size_t key_length,
62 const char *value, size_t value_length,
63 time_t expiration,
64 uint32_t flags,
65 uint64_t cas,
66 memcached_storage_action_t verb)
67 {
68 bool to_write;
69 size_t write_length;
70 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
71 uint32_t server_key;
72 memcached_server_write_instance_st instance;
73
74 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
75
76 memcached_return_t rc;
77 if ((rc= initialize_query(ptr)) != MEMCACHED_SUCCESS)
78 {
79 return rc;
80 }
81
82 rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol);
83 unlikely (rc != MEMCACHED_SUCCESS)
84 return rc;
85
86 if (ptr->flags.verify_key && (memcached_key_test((const char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
87 return MEMCACHED_BAD_KEY_PROVIDED;
88
89 server_key= memcached_generate_hash_with_redistribution(ptr, master_key, master_key_length);
90 instance= memcached_server_instance_fetch(ptr, server_key);
91
92 WATCHPOINT_SET(instance->io_wait_count.read= 0);
93 WATCHPOINT_SET(instance->io_wait_count.write= 0);
94
95 if (ptr->flags.binary_protocol)
96 {
97 rc= memcached_send_binary(ptr, instance, server_key,
98 key, key_length,
99 value, value_length, expiration,
100 flags, cas, verb);
101 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
102 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
103 }
104 else
105 {
106
107 if (cas)
108 {
109 int check_length;
110 check_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
111 "%s %.*s%.*s %u %llu %lu %llu%s\r\n",
112 storage_op_string(verb),
113 memcached_print_array(ptr->prefix_key),
114 (int)key_length, key, flags,
115 (unsigned long long)expiration, (unsigned long)value_length,
116 (unsigned long long)cas,
117 (ptr->flags.no_reply) ? " noreply" : "");
118 if (check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE || check_length < 0)
119 {
120 rc= MEMCACHED_WRITE_FAILURE;
121 memcached_io_reset(instance);
122
123 return rc;
124 }
125 write_length= check_length;
126 }
127 else
128 {
129 char *buffer_ptr= buffer;
130 const char *command= storage_op_string(verb);
131
132 /* Copy in the command, no space needed, we handle that in the command function*/
133 memcpy(buffer_ptr, command, strlen(command));
134
135 /* Copy in the key prefix, switch to the buffer_ptr */
136 buffer_ptr= memcpy((buffer_ptr + strlen(command)), memcached_array_string(ptr->prefix_key), memcached_array_size(ptr->prefix_key));
137
138 /* Copy in the key, adjust point if a key prefix was used. */
139 buffer_ptr= memcpy(buffer_ptr + memcached_array_size(ptr->prefix_key),
140 key, key_length);
141 buffer_ptr+= key_length;
142 buffer_ptr[0]= ' ';
143 buffer_ptr++;
144
145 write_length= (size_t)(buffer_ptr - buffer);
146 int check_length;
147 check_length= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE -(size_t)(buffer_ptr - buffer),
148 "%u %llu %lu%s\r\n",
149 flags,
150 (unsigned long long)expiration, (unsigned long)value_length,
151 ptr->flags.no_reply ? " noreply" : "");
152 if ((size_t)check_length >= MEMCACHED_DEFAULT_COMMAND_SIZE -(size_t)(buffer_ptr - buffer) || check_length < 0)
153 {
154 rc= MEMCACHED_WRITE_FAILURE;
155 memcached_io_reset(instance);
156
157 return rc;
158 }
159
160 write_length+= (size_t)check_length;
161 WATCHPOINT_ASSERT(write_length < MEMCACHED_DEFAULT_COMMAND_SIZE);
162 }
163
164 if (ptr->flags.use_udp && ptr->flags.buffer_requests)
165 {
166 size_t cmd_size= write_length + value_length + 2;
167 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
168 return MEMCACHED_WRITE_FAILURE;
169 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
170 memcached_io_write(instance, NULL, 0, true);
171 }
172
173 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
174 {
175 rc= MEMCACHED_WRITE_FAILURE;
176 }
177 else
178 {
179 struct libmemcached_io_vector_st vector[]=
180 {
181 { .length= write_length, .buffer= buffer },
182 { .length= value_length, .buffer= value },
183 { .length= 2, .buffer= "\r\n" }
184 };
185
186 if (ptr->flags.buffer_requests && verb == SET_OP)
187 {
188 to_write= false;
189 }
190 else
191 {
192 to_write= true;
193 }
194
195 /* Send command header */
196 rc= memcached_vdo(instance, vector, 3, to_write);
197 if (rc == MEMCACHED_SUCCESS)
198 {
199
200 if (ptr->flags.no_reply)
201 {
202 rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
203 }
204 else if (to_write == false)
205 {
206 rc= MEMCACHED_BUFFERED;
207 }
208 else
209 {
210 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
211
212 if (rc == MEMCACHED_STORED)
213 rc= MEMCACHED_SUCCESS;
214 }
215 }
216 }
217
218 if (rc == MEMCACHED_WRITE_FAILURE)
219 memcached_io_reset(instance);
220 }
221
222 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
223 WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
224
225 return rc;
226 }
227
228
229 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
230 const char *value, size_t value_length,
231 time_t expiration,
232 uint32_t flags)
233 {
234 memcached_return_t rc;
235 LIBMEMCACHED_MEMCACHED_SET_START();
236 rc= memcached_send(ptr, key, key_length,
237 key, key_length, value, value_length,
238 expiration, flags, 0, SET_OP);
239 LIBMEMCACHED_MEMCACHED_SET_END();
240 return rc;
241 }
242
243 memcached_return_t memcached_add(memcached_st *ptr,
244 const char *key, size_t key_length,
245 const char *value, size_t value_length,
246 time_t expiration,
247 uint32_t flags)
248 {
249 memcached_return_t rc;
250 LIBMEMCACHED_MEMCACHED_ADD_START();
251 rc= memcached_send(ptr, key, key_length,
252 key, key_length, value, value_length,
253 expiration, flags, 0, ADD_OP);
254 LIBMEMCACHED_MEMCACHED_ADD_END();
255 return rc;
256 }
257
258 memcached_return_t memcached_replace(memcached_st *ptr,
259 const char *key, size_t key_length,
260 const char *value, size_t value_length,
261 time_t expiration,
262 uint32_t flags)
263 {
264 memcached_return_t rc;
265 LIBMEMCACHED_MEMCACHED_REPLACE_START();
266 rc= memcached_send(ptr, key, key_length,
267 key, key_length, value, value_length,
268 expiration, flags, 0, REPLACE_OP);
269 LIBMEMCACHED_MEMCACHED_REPLACE_END();
270 return rc;
271 }
272
273 memcached_return_t memcached_prepend(memcached_st *ptr,
274 const char *key, size_t key_length,
275 const char *value, size_t value_length,
276 time_t expiration,
277 uint32_t flags)
278 {
279 memcached_return_t rc;
280 rc= memcached_send(ptr, key, key_length,
281 key, key_length, value, value_length,
282 expiration, flags, 0, PREPEND_OP);
283 return rc;
284 }
285
286 memcached_return_t memcached_append(memcached_st *ptr,
287 const char *key, size_t key_length,
288 const char *value, size_t value_length,
289 time_t expiration,
290 uint32_t flags)
291 {
292 memcached_return_t rc;
293 rc= memcached_send(ptr, key, key_length,
294 key, key_length, value, value_length,
295 expiration, flags, 0, APPEND_OP);
296 return rc;
297 }
298
299 memcached_return_t memcached_cas(memcached_st *ptr,
300 const char *key, size_t key_length,
301 const char *value, size_t value_length,
302 time_t expiration,
303 uint32_t flags,
304 uint64_t cas)
305 {
306 memcached_return_t rc;
307 rc= memcached_send(ptr, key, key_length,
308 key, key_length, value, value_length,
309 expiration, flags, cas, CAS_OP);
310 return rc;
311 }
312
313 memcached_return_t memcached_set_by_key(memcached_st *ptr,
314 const char *master_key,
315 size_t master_key_length,
316 const char *key, size_t key_length,
317 const char *value, size_t value_length,
318 time_t expiration,
319 uint32_t flags)
320 {
321 memcached_return_t rc;
322 LIBMEMCACHED_MEMCACHED_SET_START();
323 rc= memcached_send(ptr, master_key, master_key_length,
324 key, key_length, value, value_length,
325 expiration, flags, 0, SET_OP);
326 LIBMEMCACHED_MEMCACHED_SET_END();
327 return rc;
328 }
329
330 memcached_return_t memcached_add_by_key(memcached_st *ptr,
331 const char *master_key, size_t master_key_length,
332 const char *key, size_t key_length,
333 const char *value, size_t value_length,
334 time_t expiration,
335 uint32_t flags)
336 {
337 memcached_return_t rc;
338 LIBMEMCACHED_MEMCACHED_ADD_START();
339 rc= memcached_send(ptr, master_key, master_key_length,
340 key, key_length, value, value_length,
341 expiration, flags, 0, ADD_OP);
342 LIBMEMCACHED_MEMCACHED_ADD_END();
343 return rc;
344 }
345
346 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
347 const char *master_key, size_t master_key_length,
348 const char *key, size_t key_length,
349 const char *value, size_t value_length,
350 time_t expiration,
351 uint32_t flags)
352 {
353 memcached_return_t rc;
354 LIBMEMCACHED_MEMCACHED_REPLACE_START();
355 rc= memcached_send(ptr, master_key, master_key_length,
356 key, key_length, value, value_length,
357 expiration, flags, 0, REPLACE_OP);
358 LIBMEMCACHED_MEMCACHED_REPLACE_END();
359 return rc;
360 }
361
362 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
363 const char *master_key, size_t master_key_length,
364 const char *key, size_t key_length,
365 const char *value, size_t value_length,
366 time_t expiration,
367 uint32_t flags)
368 {
369 memcached_return_t rc;
370 rc= memcached_send(ptr, master_key, master_key_length,
371 key, key_length, value, value_length,
372 expiration, flags, 0, PREPEND_OP);
373 return rc;
374 }
375
376 memcached_return_t memcached_append_by_key(memcached_st *ptr,
377 const char *master_key, size_t master_key_length,
378 const char *key, size_t key_length,
379 const char *value, size_t value_length,
380 time_t expiration,
381 uint32_t flags)
382 {
383 memcached_return_t rc;
384 rc= memcached_send(ptr, master_key, master_key_length,
385 key, key_length, value, value_length,
386 expiration, flags, 0, APPEND_OP);
387 return rc;
388 }
389
390 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
391 const char *master_key, size_t master_key_length,
392 const char *key, size_t key_length,
393 const char *value, size_t value_length,
394 time_t expiration,
395 uint32_t flags,
396 uint64_t cas)
397 {
398 memcached_return_t rc;
399 rc= memcached_send(ptr, master_key, master_key_length,
400 key, key_length, value, value_length,
401 expiration, flags, cas, CAS_OP);
402 return rc;
403 }
404
405 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
406 {
407 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
408 * be used uninitialized in this function. FAIL */
409 uint8_t ret= 0;
410
411 if (noreply)
412 switch (verb)
413 {
414 case SET_OP:
415 ret=PROTOCOL_BINARY_CMD_SETQ;
416 break;
417 case ADD_OP:
418 ret=PROTOCOL_BINARY_CMD_ADDQ;
419 break;
420 case CAS_OP: /* FALLTHROUGH */
421 case REPLACE_OP:
422 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
423 break;
424 case APPEND_OP:
425 ret=PROTOCOL_BINARY_CMD_APPENDQ;
426 break;
427 case PREPEND_OP:
428 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
429 break;
430 default:
431 WATCHPOINT_ASSERT(verb);
432 break;
433 }
434 else
435 switch (verb)
436 {
437 case SET_OP:
438 ret=PROTOCOL_BINARY_CMD_SET;
439 break;
440 case ADD_OP:
441 ret=PROTOCOL_BINARY_CMD_ADD;
442 break;
443 case CAS_OP: /* FALLTHROUGH */
444 case REPLACE_OP:
445 ret=PROTOCOL_BINARY_CMD_REPLACE;
446 break;
447 case APPEND_OP:
448 ret=PROTOCOL_BINARY_CMD_APPEND;
449 break;
450 case PREPEND_OP:
451 ret=PROTOCOL_BINARY_CMD_PREPEND;
452 break;
453 default:
454 WATCHPOINT_ASSERT(verb);
455 break;
456 }
457
458 return ret;
459 }
460
461
462
463 static memcached_return_t memcached_send_binary(memcached_st *ptr,
464 memcached_server_write_instance_st server,
465 uint32_t server_key,
466 const char *key,
467 size_t key_length,
468 const char *value,
469 size_t value_length,
470 time_t expiration,
471 uint32_t flags,
472 uint64_t cas,
473 memcached_storage_action_t verb)
474 {
475 bool flush;
476 protocol_binary_request_set request= {.bytes= {0}};
477 size_t send_length= sizeof(request.bytes);
478
479 bool noreply= server->root->flags.no_reply;
480
481 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
482 request.message.header.request.opcode= get_com_code(verb, noreply);
483 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->prefix_key)));
484 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
485 if (verb == APPEND_OP || verb == PREPEND_OP)
486 send_length -= 8; /* append & prepend does not contain extras! */
487 else
488 {
489 request.message.header.request.extlen= 8;
490 request.message.body.flags= htonl(flags);
491 request.message.body.expiration= htonl((uint32_t)expiration);
492 }
493
494 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->prefix_key) + value_length +
495 request.message.header.request.extlen));
496
497 if (cas)
498 request.message.header.request.cas= htonll(cas);
499
500 flush= (bool) ((server->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
501
502 if (server->root->flags.use_udp && ! flush)
503 {
504 size_t cmd_size= send_length + key_length + value_length;
505
506 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
507 {
508 return MEMCACHED_WRITE_FAILURE;
509 }
510 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
511 {
512 memcached_io_write(server, NULL, 0, true);
513 }
514 }
515
516 struct libmemcached_io_vector_st vector[]=
517 {
518 { .length= send_length, .buffer= request.bytes },
519 { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) },
520 { .length= key_length, .buffer= key },
521 { .length= value_length, .buffer= value }
522 };
523
524 /* write the header */
525 memcached_return_t rc;
526 if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS)
527 {
528 memcached_io_reset(server);
529 return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
530 }
531
532 if (verb == SET_OP && ptr->number_of_replicas > 0)
533 {
534 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
535 WATCHPOINT_STRING("replicating");
536
537 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
538 {
539 memcached_server_write_instance_st instance;
540
541 ++server_key;
542 if (server_key == memcached_server_count(ptr))
543 server_key= 0;
544
545 instance= memcached_server_instance_fetch(ptr, server_key);
546
547 if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS)
548 {
549 memcached_io_reset(instance);
550 }
551 else
552 {
553 memcached_server_response_decrement(instance);
554 }
555 }
556 }
557
558 if (flush == false)
559 {
560 return MEMCACHED_BUFFERED;
561 }
562
563 if (noreply)
564 {
565 return MEMCACHED_SUCCESS;
566 }
567
568 return memcached_response(server, NULL, 0, NULL);
569 }
570