Add memcached_exist()
[awesomized/libmemcached] / libmemcached / response.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39 #include <libmemcached/string.hpp>
40
41 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
42 char *buffer, size_t buffer_length,
43 memcached_result_st *result);
44 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
45 char *buffer, size_t buffer_length,
46 memcached_result_st *result);
47
48 memcached_return_t memcached_read_one_response(memcached_server_write_instance_st ptr,
49 char *buffer, size_t buffer_length,
50 memcached_result_st *result)
51 {
52 memcached_server_response_decrement(ptr);
53
54 if (result == NULL)
55 {
56 memcached_st *root= (memcached_st *)ptr->root;
57 result = &root->result;
58 }
59
60 memcached_return_t rc;
61 if (ptr->root->flags.binary_protocol)
62 {
63 rc= binary_read_one_response(ptr, buffer, buffer_length, result);
64 }
65 else
66 {
67 rc= textual_read_one_response(ptr, buffer, buffer_length, result);
68 }
69
70 unlikely(rc == MEMCACHED_UNKNOWN_READ_FAILURE or
71 rc == MEMCACHED_PROTOCOL_ERROR or
72 rc == MEMCACHED_CLIENT_ERROR or
73 rc == MEMCACHED_MEMORY_ALLOCATION_FAILURE)
74 memcached_io_reset(ptr);
75
76 return rc;
77 }
78
79 memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
80 char *buffer, size_t buffer_length,
81 memcached_result_st *result)
82 {
83 /* We may have old commands in the buffer not set, first purge */
84 if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false))
85 {
86 (void)memcached_io_write(ptr, NULL, 0, true);
87 }
88
89 /*
90 * The previous implementation purged all pending requests and just
91 * returned the last one. Purge all pending messages to ensure backwards
92 * compatibility.
93 */
94 if (ptr->root->flags.binary_protocol == false)
95 {
96 while (memcached_server_response_count(ptr) > 1)
97 {
98 memcached_return_t rc= memcached_read_one_response(ptr, buffer, buffer_length, result);
99
100 unlikely (rc != MEMCACHED_END &&
101 rc != MEMCACHED_STORED &&
102 rc != MEMCACHED_SUCCESS &&
103 rc != MEMCACHED_STAT &&
104 rc != MEMCACHED_DELETED &&
105 rc != MEMCACHED_NOTFOUND &&
106 rc != MEMCACHED_NOTSTORED &&
107 rc != MEMCACHED_DATA_EXISTS)
108 return rc;
109 }
110 }
111
112 return memcached_read_one_response(ptr, buffer, buffer_length, result);
113 }
114
115 static memcached_return_t textual_value_fetch(memcached_server_write_instance_st ptr,
116 char *buffer,
117 memcached_result_st *result)
118 {
119 char *string_ptr;
120 char *end_ptr;
121 char *next_ptr;
122 size_t value_length;
123 size_t to_read;
124 ssize_t read_length= 0;
125
126 if (ptr->root->flags.use_udp)
127 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
128
129 WATCHPOINT_ASSERT(ptr->root);
130 end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
131
132 memcached_result_reset(result);
133
134 string_ptr= buffer;
135 string_ptr+= 6; /* "VALUE " */
136
137
138 /* We load the key */
139 {
140 char *key;
141 size_t prefix_length;
142
143 key= result->item_key;
144 result->key_length= 0;
145
146 for (prefix_length= memcached_array_size(ptr->root->_namespace); !(iscntrl(*string_ptr) || isspace(*string_ptr)) ; string_ptr++)
147 {
148 if (prefix_length == 0)
149 {
150 *key= *string_ptr;
151 key++;
152 result->key_length++;
153 }
154 else
155 prefix_length--;
156 }
157 result->item_key[result->key_length]= 0;
158 }
159
160 if (end_ptr == string_ptr)
161 goto read_error;
162
163 /* Flags fetch move past space */
164 string_ptr++;
165 if (end_ptr == string_ptr)
166 goto read_error;
167
168 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
169 result->item_flags= (uint32_t) strtoul(next_ptr, &string_ptr, 10);
170
171 if (end_ptr == string_ptr)
172 goto read_error;
173
174 /* Length fetch move past space*/
175 string_ptr++;
176 if (end_ptr == string_ptr)
177 goto read_error;
178
179 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
180 value_length= (size_t)strtoull(next_ptr, &string_ptr, 10);
181
182 if (end_ptr == string_ptr)
183 goto read_error;
184
185 /* Skip spaces */
186 if (*string_ptr == '\r')
187 {
188 /* Skip past the \r\n */
189 string_ptr+= 2;
190 }
191 else
192 {
193 string_ptr++;
194 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
195 result->item_cas= strtoull(next_ptr, &string_ptr, 10);
196 }
197
198 if (end_ptr < string_ptr)
199 goto read_error;
200
201 /* We add two bytes so that we can walk the \r\n */
202 if (memcached_failed(memcached_string_check(&result->value, value_length +2)))
203 {
204 value_length= 0;
205 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
206 }
207
208 {
209 char *value_ptr= memcached_string_value_mutable(&result->value);
210 /*
211 We read the \r\n into the string since not doing so is more
212 cycles then the waster of memory to do so.
213
214 We are null terminating through, which will most likely make
215 some people lazy about using the return length.
216 */
217 to_read= (value_length) + 2;
218 memcached_return_t rrc= memcached_io_read(ptr, value_ptr, to_read, &read_length);
219 if (memcached_failed(rrc) and rrc == MEMCACHED_IN_PROGRESS)
220 {
221 memcached_quit_server(ptr, true);
222 return memcached_set_error(*ptr, MEMCACHED_IN_PROGRESS, MEMCACHED_AT);
223 }
224 else if (memcached_failed(rrc))
225 {
226 return rrc;
227 }
228 }
229
230 if (read_length != (ssize_t)(value_length + 2))
231 {
232 goto read_error;
233 }
234
235 /* This next bit blows the API, but this is internal....*/
236 {
237 char *char_ptr;
238 char_ptr= memcached_string_value_mutable(&result->value);;
239 char_ptr[value_length]= 0;
240 char_ptr[value_length +1]= 0;
241 memcached_string_set_length(&result->value, value_length);
242 }
243
244 return MEMCACHED_SUCCESS;
245
246 read_error:
247 memcached_io_reset(ptr);
248
249 return MEMCACHED_PARTIAL_READ;
250 }
251
252 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
253 char *buffer, size_t buffer_length,
254 memcached_result_st *result)
255 {
256 size_t total_read;
257 memcached_return_t rc= memcached_io_readline(ptr, buffer, buffer_length, total_read);
258
259 if (memcached_failed(rc))
260 {
261 return rc;
262 }
263
264 switch(buffer[0])
265 {
266 case 'V': /* VALUE || VERSION */
267 if (buffer[1] == 'A') /* VALUE */
268 {
269 /* We add back in one because we will need to search for END */
270 memcached_server_response_increment(ptr);
271 return textual_value_fetch(ptr, buffer, result);
272 }
273 else if (buffer[1] == 'E') /* VERSION */
274 {
275 return MEMCACHED_SUCCESS;
276 }
277 else
278 {
279 WATCHPOINT_STRING(buffer);
280 return MEMCACHED_UNKNOWN_READ_FAILURE;
281 }
282 case 'O': /* OK */
283 return MEMCACHED_SUCCESS;
284 case 'S': /* STORED STATS SERVER_ERROR */
285 {
286 if (buffer[2] == 'A') /* STORED STATS */
287 {
288 memcached_server_response_increment(ptr);
289 return MEMCACHED_STAT;
290 }
291 else if (buffer[1] == 'E') /* SERVER_ERROR */
292 {
293 if (total_read == memcached_literal_param_size("SERVER_ERROR"))
294 {
295 return MEMCACHED_SERVER_ERROR;
296 }
297
298 if (total_read > memcached_literal_param_size("SERVER_ERROR object too large for cache") and
299 (memcmp(buffer, memcached_literal_param("SERVER_ERROR object too large for cache")) == 0))
300 {
301 return MEMCACHED_E2BIG;
302 }
303
304 // Move past the basic error message and whitespace
305 char *startptr= buffer + memcached_literal_param_size("SERVER_ERROR");
306 if (startptr[0] == ' ')
307 {
308 startptr++;
309 }
310
311 char *endptr= startptr;
312 while (*endptr != '\r' && *endptr != '\n') endptr++;
313
314 return memcached_set_error(*ptr, MEMCACHED_SERVER_ERROR, MEMCACHED_AT, startptr, size_t(endptr - startptr));
315 }
316 else if (buffer[1] == 'T')
317 {
318 return MEMCACHED_STORED;
319 }
320 else
321 {
322 WATCHPOINT_STRING(buffer);
323 return MEMCACHED_UNKNOWN_READ_FAILURE;
324 }
325 }
326 case 'D': /* DELETED */
327 return MEMCACHED_DELETED;
328
329 case 'N': /* NOT_FOUND */
330 {
331 if (buffer[4] == 'F')
332 return MEMCACHED_NOTFOUND;
333 else if (buffer[4] == 'S')
334 return MEMCACHED_NOTSTORED;
335 else
336 {
337 WATCHPOINT_STRING(buffer);
338 return MEMCACHED_UNKNOWN_READ_FAILURE;
339 }
340 }
341 case 'E': /* PROTOCOL ERROR or END */
342 {
343 if (buffer[1] == 'N')
344 return MEMCACHED_END;
345 else if (buffer[1] == 'R')
346 return MEMCACHED_PROTOCOL_ERROR;
347 else if (buffer[1] == 'X')
348 return MEMCACHED_DATA_EXISTS;
349 else
350 {
351 WATCHPOINT_STRING(buffer);
352 return MEMCACHED_UNKNOWN_READ_FAILURE;
353 }
354
355 }
356 case 'I': /* CLIENT ERROR */
357 /* We add back in one because we will need to search for END */
358 memcached_server_response_increment(ptr);
359 return MEMCACHED_ITEM;
360 case 'C': /* CLIENT ERROR */
361 return MEMCACHED_CLIENT_ERROR;
362 default:
363 {
364 unsigned long long auto_return_value;
365
366 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
367 return MEMCACHED_SUCCESS;
368
369 WATCHPOINT_STRING(buffer);
370 return MEMCACHED_UNKNOWN_READ_FAILURE;
371 }
372 }
373
374 /* NOTREACHED */
375 }
376
377 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
378 char *buffer, size_t buffer_length,
379 memcached_result_st *result)
380 {
381 memcached_return_t rc;
382 protocol_binary_response_header header;
383
384 if ((rc= memcached_safe_read(ptr, &header.bytes, sizeof(header.bytes))) != MEMCACHED_SUCCESS)
385 {
386 WATCHPOINT_ERROR(rc);
387 return rc;
388 }
389
390 if (header.response.magic != PROTOCOL_BINARY_RES)
391 {
392 return MEMCACHED_PROTOCOL_ERROR;
393 }
394
395 /*
396 ** Convert the header to host local endian!
397 */
398 header.response.keylen= ntohs(header.response.keylen);
399 header.response.status= ntohs(header.response.status);
400 header.response.bodylen= ntohl(header.response.bodylen);
401 header.response.cas= memcached_ntohll(header.response.cas);
402 uint32_t bodylen= header.response.bodylen;
403
404 if (header.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS or
405 header.response.status == PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE)
406 {
407 switch (header.response.opcode)
408 {
409 case PROTOCOL_BINARY_CMD_GETKQ:
410 /*
411 * We didn't increment the response counter for the GETKQ packet
412 * (only the final NOOP), so we need to increment the counter again.
413 */
414 memcached_server_response_increment(ptr);
415 /* FALLTHROUGH */
416 case PROTOCOL_BINARY_CMD_GETK:
417 {
418 uint16_t keylen= header.response.keylen;
419 memcached_result_reset(result);
420 result->item_cas= header.response.cas;
421
422 if ((rc= memcached_safe_read(ptr, &result->item_flags, sizeof (result->item_flags))) != MEMCACHED_SUCCESS)
423 {
424 WATCHPOINT_ERROR(rc);
425 return MEMCACHED_UNKNOWN_READ_FAILURE;
426 }
427
428 result->item_flags= ntohl(result->item_flags);
429 bodylen -= header.response.extlen;
430
431 result->key_length= keylen;
432 if (memcached_failed(rc= memcached_safe_read(ptr, result->item_key, keylen)))
433 {
434 WATCHPOINT_ERROR(rc);
435 return MEMCACHED_UNKNOWN_READ_FAILURE;
436 }
437
438 // Only bother with doing this if key_length > 0
439 if (result->key_length)
440 {
441 if (memcached_array_size(ptr->root->_namespace) and memcached_array_size(ptr->root->_namespace) >= result->key_length)
442 {
443 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
444 }
445
446 if (memcached_array_size(ptr->root->_namespace))
447 {
448 result->key_length-= memcached_array_size(ptr->root->_namespace);
449 memmove(result->item_key, result->item_key +memcached_array_size(ptr->root->_namespace), result->key_length);
450 }
451 }
452
453 bodylen -= keylen;
454 if (memcached_failed(memcached_string_check(&result->value, bodylen)))
455 {
456 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
457 }
458
459 char *vptr= memcached_string_value_mutable(&result->value);
460 if (memcached_failed(rc= memcached_safe_read(ptr, vptr, bodylen)))
461 {
462 WATCHPOINT_ERROR(rc);
463 return MEMCACHED_UNKNOWN_READ_FAILURE;
464 }
465
466 memcached_string_set_length(&result->value, bodylen);
467 }
468 break;
469
470 case PROTOCOL_BINARY_CMD_INCREMENT:
471 case PROTOCOL_BINARY_CMD_DECREMENT:
472 {
473 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
474 {
475 return MEMCACHED_PROTOCOL_ERROR;
476 }
477
478 WATCHPOINT_ASSERT(bodylen == buffer_length);
479 uint64_t val;
480 if ((rc= memcached_safe_read(ptr, &val, sizeof(val))) != MEMCACHED_SUCCESS)
481 {
482 WATCHPOINT_ERROR(rc);
483 return MEMCACHED_UNKNOWN_READ_FAILURE;
484 }
485
486 val= memcached_ntohll(val);
487 memcpy(buffer, &val, sizeof(val));
488 }
489 break;
490
491 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
492 case PROTOCOL_BINARY_CMD_VERSION:
493 {
494 memset(buffer, 0, buffer_length);
495 if (bodylen >= buffer_length)
496 {
497 /* not enough space in buffer.. should not happen... */
498 return MEMCACHED_UNKNOWN_READ_FAILURE;
499 }
500 else if ((rc= memcached_safe_read(ptr, buffer, bodylen)) != MEMCACHED_SUCCESS)
501 {
502 WATCHPOINT_ERROR(rc);
503 return MEMCACHED_UNKNOWN_READ_FAILURE;
504 }
505 }
506 break;
507 case PROTOCOL_BINARY_CMD_FLUSH:
508 case PROTOCOL_BINARY_CMD_QUIT:
509 case PROTOCOL_BINARY_CMD_SET:
510 case PROTOCOL_BINARY_CMD_ADD:
511 case PROTOCOL_BINARY_CMD_REPLACE:
512 case PROTOCOL_BINARY_CMD_APPEND:
513 case PROTOCOL_BINARY_CMD_PREPEND:
514 case PROTOCOL_BINARY_CMD_DELETE:
515 {
516 WATCHPOINT_ASSERT(bodylen == 0);
517 return MEMCACHED_SUCCESS;
518 }
519 case PROTOCOL_BINARY_CMD_NOOP:
520 {
521 WATCHPOINT_ASSERT(bodylen == 0);
522 return MEMCACHED_END;
523 }
524 case PROTOCOL_BINARY_CMD_STAT:
525 {
526 if (bodylen == 0)
527 {
528 return MEMCACHED_END;
529 }
530 else if (bodylen + 1 > buffer_length)
531 {
532 /* not enough space in buffer.. should not happen... */
533 return MEMCACHED_UNKNOWN_READ_FAILURE;
534 }
535 else
536 {
537 size_t keylen= header.response.keylen;
538 memset(buffer, 0, buffer_length);
539 if ((rc= memcached_safe_read(ptr, buffer, keylen)) != MEMCACHED_SUCCESS ||
540 (rc= memcached_safe_read(ptr, buffer + keylen + 1, bodylen - keylen)) != MEMCACHED_SUCCESS)
541 {
542 WATCHPOINT_ERROR(rc);
543 return MEMCACHED_UNKNOWN_READ_FAILURE;
544 }
545 }
546 }
547 break;
548
549 case PROTOCOL_BINARY_CMD_SASL_AUTH:
550 case PROTOCOL_BINARY_CMD_SASL_STEP:
551 {
552 memcached_result_reset(result);
553 result->item_cas= header.response.cas;
554
555 if (memcached_string_check(&result->value,
556 bodylen) != MEMCACHED_SUCCESS)
557 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
558
559 char *vptr= memcached_string_value_mutable(&result->value);
560 if ((rc= memcached_safe_read(ptr, vptr, bodylen)) != MEMCACHED_SUCCESS)
561 {
562 WATCHPOINT_ERROR(rc);
563 return MEMCACHED_UNKNOWN_READ_FAILURE;
564 }
565
566 memcached_string_set_length(&result->value, bodylen);
567 }
568 break;
569 default:
570 {
571 /* Command not implemented yet! */
572 WATCHPOINT_ASSERT(0);
573 return MEMCACHED_PROTOCOL_ERROR;
574 }
575 }
576 }
577 else if (header.response.bodylen)
578 {
579 /* What should I do with the error message??? just discard it for now */
580 char hole[SMALL_STRING_LEN];
581 while (bodylen > 0)
582 {
583 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
584 if ((rc= memcached_safe_read(ptr, hole, nr)) != MEMCACHED_SUCCESS)
585 {
586 WATCHPOINT_ERROR(rc);
587 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
588 }
589 bodylen-= (uint32_t) nr;
590 }
591
592 /* This might be an error from one of the quiet commands.. if
593 * so, just throw it away and get the next one. What about creating
594 * a callback to the user with the error information?
595 */
596 switch (header.response.opcode)
597 {
598 case PROTOCOL_BINARY_CMD_SETQ:
599 case PROTOCOL_BINARY_CMD_ADDQ:
600 case PROTOCOL_BINARY_CMD_REPLACEQ:
601 case PROTOCOL_BINARY_CMD_APPENDQ:
602 case PROTOCOL_BINARY_CMD_PREPENDQ:
603 return binary_read_one_response(ptr, buffer, buffer_length, result);
604
605 default:
606 break;
607 }
608 }
609
610 rc= MEMCACHED_SUCCESS;
611 if (header.response.status != 0)
612 {
613 switch (header.response.status)
614 {
615 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
616 rc= MEMCACHED_NOTFOUND;
617 break;
618
619 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
620 rc= MEMCACHED_DATA_EXISTS;
621 break;
622
623 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
624 rc= MEMCACHED_NOTSTORED;
625 break;
626
627 case PROTOCOL_BINARY_RESPONSE_E2BIG:
628 rc= MEMCACHED_E2BIG;
629 break;
630
631 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
632 rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
633 break;
634
635 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
636 rc= MEMCACHED_AUTH_CONTINUE;
637 break;
638
639 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
640 rc= MEMCACHED_AUTH_FAILURE;
641 break;
642
643 case PROTOCOL_BINARY_RESPONSE_EINVAL:
644 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
645 default:
646 /* @todo fix the error mappings */
647 rc= MEMCACHED_PROTOCOL_ERROR;
648 break;
649 }
650 }
651
652 return rc;
653 }