First pass through turning instance into ++
[m6w6/libmemcached] / libmemcached / storage.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
/*
  Storage verbs handled by this file. Each one is translated into an ASCII
  command word by storage_op_string() and into a binary opcode by
  get_com_code(). The numeric values are the ones the compiler would assign
  implicitly; they are only spelled out for readability.
*/
enum memcached_storage_action_t {
  SET_OP= 0,
  REPLACE_OP= 1,
  ADD_OP= 2,
  PREPEND_OP= 3,
  APPEND_OP= 4,
  CAS_OP= 5
};
49
50 /* Inline this */
51 static inline const char *storage_op_string(memcached_storage_action_t verb)
52 {
53 switch (verb)
54 {
55 case REPLACE_OP:
56 return "replace ";
57
58 case ADD_OP:
59 return "add ";
60
61 case PREPEND_OP:
62 return "prepend ";
63
64 case APPEND_OP:
65 return "append ";
66
67 case CAS_OP:
68 return "cas ";
69
70 case SET_OP:
71 break;
72 }
73
74 return "set ";
75 }
76
77 static inline uint8_t can_by_encrypted(const memcached_storage_action_t verb)
78 {
79 switch (verb)
80 {
81 case SET_OP:
82 case ADD_OP:
83 case CAS_OP:
84 case REPLACE_OP:
85 return true;
86
87 case APPEND_OP:
88 case PREPEND_OP:
89 break;
90 }
91
92 return false;
93 }
94
95 static inline uint8_t get_com_code(const memcached_storage_action_t verb, const bool reply)
96 {
97 if (reply == false)
98 {
99 switch (verb)
100 {
101 case SET_OP:
102 return PROTOCOL_BINARY_CMD_SETQ;
103
104 case ADD_OP:
105 return PROTOCOL_BINARY_CMD_ADDQ;
106
107 case CAS_OP: /* FALLTHROUGH */
108 case REPLACE_OP:
109 return PROTOCOL_BINARY_CMD_REPLACEQ;
110
111 case APPEND_OP:
112 return PROTOCOL_BINARY_CMD_APPENDQ;
113
114 case PREPEND_OP:
115 return PROTOCOL_BINARY_CMD_PREPENDQ;
116 }
117 }
118
119 switch (verb)
120 {
121 case SET_OP:
122 break;
123
124 case ADD_OP:
125 return PROTOCOL_BINARY_CMD_ADD;
126
127 case CAS_OP: /* FALLTHROUGH */
128 case REPLACE_OP:
129 return PROTOCOL_BINARY_CMD_REPLACE;
130
131 case APPEND_OP:
132 return PROTOCOL_BINARY_CMD_APPEND;
133
134 case PREPEND_OP:
135 return PROTOCOL_BINARY_CMD_PREPEND;
136 }
137
138 return PROTOCOL_BINARY_CMD_SET;
139 }
140
141 static memcached_return_t memcached_send_binary(memcached_st *ptr,
142 org::libmemcached::Instance* server,
143 uint32_t server_key,
144 const char *key,
145 const size_t key_length,
146 const char *value,
147 const size_t value_length,
148 const time_t expiration,
149 const uint32_t flags,
150 const uint64_t cas,
151 const bool flush,
152 const bool reply,
153 memcached_storage_action_t verb)
154 {
155 protocol_binary_request_set request= {};
156 size_t send_length= sizeof(request.bytes);
157
158 initialize_binary_request(server, request.message.header);
159
160 request.message.header.request.opcode= get_com_code(verb, reply);
161 request.message.header.request.keylen= htons((uint16_t)(key_length + memcached_array_size(ptr->_namespace)));
162 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
163 if (verb == APPEND_OP or verb == PREPEND_OP)
164 {
165 send_length -= 8; /* append & prepend does not contain extras! */
166 }
167 else
168 {
169 request.message.header.request.extlen= 8;
170 request.message.body.flags= htonl(flags);
171 request.message.body.expiration= htonl((uint32_t)expiration);
172 }
173
174 request.message.header.request.bodylen= htonl((uint32_t) (key_length + memcached_array_size(ptr->_namespace) + value_length +
175 request.message.header.request.extlen));
176
177 if (cas)
178 {
179 request.message.header.request.cas= memcached_htonll(cas);
180 }
181
182 libmemcached_io_vector_st vector[]=
183 {
184 { NULL, 0 },
185 { request.bytes, send_length },
186 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
187 { key, key_length },
188 { value, value_length }
189 };
190
191 /* write the header */
192 memcached_return_t rc;
193 if ((rc= memcached_vdo(server, vector, 5, flush)) != MEMCACHED_SUCCESS)
194 {
195 memcached_io_reset(server);
196
197 #if 0
198 if (memcached_has_error(ptr))
199 {
200 memcached_set_error(*server, rc, MEMCACHED_AT);
201 }
202 #endif
203
204 return MEMCACHED_WRITE_FAILURE;
205 }
206
207 if (verb == SET_OP and ptr->number_of_replicas > 0)
208 {
209 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
210 WATCHPOINT_STRING("replicating");
211
212 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
213 {
214 ++server_key;
215 if (server_key == memcached_server_count(ptr))
216 {
217 server_key= 0;
218 }
219
220 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
221
222 if (memcached_vdo(instance, vector, 5, false) != MEMCACHED_SUCCESS)
223 {
224 memcached_io_reset(instance);
225 }
226 else
227 {
228 memcached_server_response_decrement(instance);
229 }
230 }
231 }
232
233 if (flush == false)
234 {
235 return MEMCACHED_BUFFERED;
236 }
237
238 // No reply always assumes success
239 if (reply == false)
240 {
241 return MEMCACHED_SUCCESS;
242 }
243
244 return memcached_response(server, NULL, 0, NULL);
245 }
246
247 static memcached_return_t memcached_send_ascii(memcached_st *ptr,
248 org::libmemcached::Instance* instance,
249 const char *key,
250 const size_t key_length,
251 const char *value,
252 const size_t value_length,
253 const time_t expiration,
254 const uint32_t flags,
255 const uint64_t cas,
256 const bool flush,
257 const bool reply,
258 const memcached_storage_action_t verb)
259 {
260 char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
261 int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
262 if (size_t(flags_buffer_length) >= sizeof(flags_buffer) or flags_buffer_length < 0)
263 {
264 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
265 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
266 }
267
268 char expiration_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
269 int expiration_buffer_length= snprintf(expiration_buffer, sizeof(expiration_buffer), " %llu", (unsigned long long)expiration);
270 if (size_t(expiration_buffer_length) >= sizeof(expiration_buffer) or expiration_buffer_length < 0)
271 {
272 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
273 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
274 }
275
276 char value_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
277 int value_buffer_length= snprintf(value_buffer, sizeof(value_buffer), " %llu", (unsigned long long)value_length);
278 if (size_t(value_buffer_length) >= sizeof(value_buffer) or value_buffer_length < 0)
279 {
280 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
281 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
282 }
283
284 char cas_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
285 int cas_buffer_length= 0;
286 if (cas)
287 {
288 cas_buffer_length= snprintf(cas_buffer, sizeof(cas_buffer), " %llu", (unsigned long long)cas);
289 if (size_t(cas_buffer_length) >= sizeof(cas_buffer) or cas_buffer_length < 0)
290 {
291 return memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
292 memcached_literal_param("snprintf(MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH)"));
293 }
294 }
295
296 libmemcached_io_vector_st vector[]=
297 {
298 { NULL, 0 },
299 { storage_op_string(verb), strlen(storage_op_string(verb))},
300 { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
301 { key, key_length },
302 { flags_buffer, size_t(flags_buffer_length) },
303 { expiration_buffer, size_t(expiration_buffer_length) },
304 { value_buffer, size_t(value_buffer_length) },
305 { cas_buffer, size_t(cas_buffer_length) },
306 { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
307 { memcached_literal_param("\r\n") },
308 { value, value_length },
309 { memcached_literal_param("\r\n") }
310 };
311
312 /* Send command header */
313 memcached_return_t rc= memcached_vdo(instance, vector, 12, flush);
314
315 // If we should not reply, return with MEMCACHED_SUCCESS, unless error
316 if (reply == false)
317 {
318 return memcached_success(rc) ? MEMCACHED_SUCCESS : rc;
319 }
320
321 if (flush == false)
322 {
323 return memcached_success(rc) ? MEMCACHED_BUFFERED : rc;
324 }
325
326 if (rc == MEMCACHED_SUCCESS)
327 {
328 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
329 rc= memcached_response(instance, buffer, sizeof(buffer), NULL);
330
331 if (rc == MEMCACHED_STORED)
332 {
333 return MEMCACHED_SUCCESS;
334 }
335 }
336
337 if (rc == MEMCACHED_WRITE_FAILURE)
338 {
339 memcached_io_reset(instance);
340 }
341
342 assert(memcached_failed(rc));
343 #if 0
344 if (memcached_has_error(ptr) == false)
345 {
346 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
347 }
348 #endif
349
350 return rc;
351 }
352
353 static inline memcached_return_t memcached_send(memcached_st *ptr,
354 const char *group_key, size_t group_key_length,
355 const char *key, size_t key_length,
356 const char *value, size_t value_length,
357 const time_t expiration,
358 const uint32_t flags,
359 const uint64_t cas,
360 memcached_storage_action_t verb)
361 {
362 memcached_return_t rc;
363 if (memcached_failed(rc= initialize_query(ptr, true)))
364 {
365 return rc;
366 }
367
368 if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
369 {
370 return memcached_last_error(ptr);
371 }
372
373 uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
374 org::libmemcached::Instance* instance= memcached_instance_fetch(ptr, server_key);
375
376 WATCHPOINT_SET(instance->io_wait_count.read= 0);
377 WATCHPOINT_SET(instance->io_wait_count.write= 0);
378
379
380 bool flush= true;
381 if (memcached_is_buffering(instance->root) and verb == SET_OP)
382 {
383 flush= false;
384 }
385
386 bool reply= memcached_is_replying(ptr);
387
388 hashkit_string_st* destination= NULL;
389
390 if (memcached_is_encrypted(ptr))
391 {
392 if (can_by_encrypted(verb) == false)
393 {
394 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
395 memcached_literal_param("Operation not allowed while encyrption is enabled"));
396 }
397
398 if ((destination= hashkit_encrypt(&ptr->hashkit, value, value_length)) == NULL)
399 {
400 return rc;
401 }
402 value= hashkit_string_c_str(destination);
403 value_length= hashkit_string_length(destination);
404 }
405
406 if (memcached_is_binary(ptr))
407 {
408 rc= memcached_send_binary(ptr, instance, server_key,
409 key, key_length,
410 value, value_length, expiration,
411 flags, cas, flush, reply, verb);
412 }
413 else
414 {
415 rc= memcached_send_ascii(ptr, instance,
416 key, key_length,
417 value, value_length, expiration,
418 flags, cas, flush, reply, verb);
419 }
420
421 hashkit_string_free(destination);
422
423 return rc;
424 }
425
426
427 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
428 const char *value, size_t value_length,
429 time_t expiration,
430 uint32_t flags)
431 {
432 memcached_return_t rc;
433 LIBMEMCACHED_MEMCACHED_SET_START();
434 rc= memcached_send(ptr, key, key_length,
435 key, key_length, value, value_length,
436 expiration, flags, 0, SET_OP);
437 LIBMEMCACHED_MEMCACHED_SET_END();
438 return rc;
439 }
440
441 memcached_return_t memcached_add(memcached_st *ptr,
442 const char *key, size_t key_length,
443 const char *value, size_t value_length,
444 time_t expiration,
445 uint32_t flags)
446 {
447 memcached_return_t rc;
448 LIBMEMCACHED_MEMCACHED_ADD_START();
449 rc= memcached_send(ptr, key, key_length,
450 key, key_length, value, value_length,
451 expiration, flags, 0, ADD_OP);
452
453 LIBMEMCACHED_MEMCACHED_ADD_END();
454 return rc;
455 }
456
457 memcached_return_t memcached_replace(memcached_st *ptr,
458 const char *key, size_t key_length,
459 const char *value, size_t value_length,
460 time_t expiration,
461 uint32_t flags)
462 {
463 memcached_return_t rc;
464 LIBMEMCACHED_MEMCACHED_REPLACE_START();
465 rc= memcached_send(ptr, key, key_length,
466 key, key_length, value, value_length,
467 expiration, flags, 0, REPLACE_OP);
468 LIBMEMCACHED_MEMCACHED_REPLACE_END();
469 return rc;
470 }
471
472 memcached_return_t memcached_prepend(memcached_st *ptr,
473 const char *key, size_t key_length,
474 const char *value, size_t value_length,
475 time_t expiration,
476 uint32_t flags)
477 {
478 memcached_return_t rc;
479 rc= memcached_send(ptr, key, key_length,
480 key, key_length, value, value_length,
481 expiration, flags, 0, PREPEND_OP);
482 return rc;
483 }
484
485 memcached_return_t memcached_append(memcached_st *ptr,
486 const char *key, size_t key_length,
487 const char *value, size_t value_length,
488 time_t expiration,
489 uint32_t flags)
490 {
491 memcached_return_t rc;
492 rc= memcached_send(ptr, key, key_length,
493 key, key_length, value, value_length,
494 expiration, flags, 0, APPEND_OP);
495 return rc;
496 }
497
498 memcached_return_t memcached_cas(memcached_st *ptr,
499 const char *key, size_t key_length,
500 const char *value, size_t value_length,
501 time_t expiration,
502 uint32_t flags,
503 uint64_t cas)
504 {
505 memcached_return_t rc;
506 rc= memcached_send(ptr, key, key_length,
507 key, key_length, value, value_length,
508 expiration, flags, cas, CAS_OP);
509 return rc;
510 }
511
512 memcached_return_t memcached_set_by_key(memcached_st *ptr,
513 const char *group_key,
514 size_t group_key_length,
515 const char *key, size_t key_length,
516 const char *value, size_t value_length,
517 time_t expiration,
518 uint32_t flags)
519 {
520 memcached_return_t rc;
521 LIBMEMCACHED_MEMCACHED_SET_START();
522 rc= memcached_send(ptr, group_key, group_key_length,
523 key, key_length, value, value_length,
524 expiration, flags, 0, SET_OP);
525 LIBMEMCACHED_MEMCACHED_SET_END();
526 return rc;
527 }
528
529 memcached_return_t memcached_add_by_key(memcached_st *ptr,
530 const char *group_key, size_t group_key_length,
531 const char *key, size_t key_length,
532 const char *value, size_t value_length,
533 time_t expiration,
534 uint32_t flags)
535 {
536 memcached_return_t rc;
537 LIBMEMCACHED_MEMCACHED_ADD_START();
538 rc= memcached_send(ptr, group_key, group_key_length,
539 key, key_length, value, value_length,
540 expiration, flags, 0, ADD_OP);
541 LIBMEMCACHED_MEMCACHED_ADD_END();
542 return rc;
543 }
544
545 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
546 const char *group_key, size_t group_key_length,
547 const char *key, size_t key_length,
548 const char *value, size_t value_length,
549 time_t expiration,
550 uint32_t flags)
551 {
552 memcached_return_t rc;
553 LIBMEMCACHED_MEMCACHED_REPLACE_START();
554 rc= memcached_send(ptr, group_key, group_key_length,
555 key, key_length, value, value_length,
556 expiration, flags, 0, REPLACE_OP);
557 LIBMEMCACHED_MEMCACHED_REPLACE_END();
558 return rc;
559 }
560
561 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
562 const char *group_key, size_t group_key_length,
563 const char *key, size_t key_length,
564 const char *value, size_t value_length,
565 time_t expiration,
566 uint32_t flags)
567 {
568 return memcached_send(ptr, group_key, group_key_length,
569 key, key_length, value, value_length,
570 expiration, flags, 0, PREPEND_OP);
571 }
572
573 memcached_return_t memcached_append_by_key(memcached_st *ptr,
574 const char *group_key, size_t group_key_length,
575 const char *key, size_t key_length,
576 const char *value, size_t value_length,
577 time_t expiration,
578 uint32_t flags)
579 {
580 return memcached_send(ptr, group_key, group_key_length,
581 key, key_length, value, value_length,
582 expiration, flags, 0, APPEND_OP);
583 }
584
585 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
586 const char *group_key, size_t group_key_length,
587 const char *key, size_t key_length,
588 const char *value, size_t value_length,
589 time_t expiration,
590 uint32_t flags,
591 uint64_t cas)
592 {
593 return memcached_send(ptr, group_key, group_key_length,
594 key, key_length, value, value_length,
595 expiration, flags, cas, CAS_OP);
596 }
597