Merge in touch.
[awesomized/libmemcached] / libmemcached / response.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39 #include <libmemcached/string.hpp>
40
41 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
42 char *buffer, size_t buffer_length,
43 memcached_result_st *result);
44 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
45 char *buffer, size_t buffer_length,
46 memcached_result_st *result);
47
48 memcached_return_t memcached_read_one_response(memcached_server_write_instance_st ptr,
49 char *buffer, size_t buffer_length,
50 memcached_result_st *result)
51 {
52 memcached_server_response_decrement(ptr);
53
54 if (result == NULL)
55 {
56 memcached_st *root= (memcached_st *)ptr->root;
57 result = &root->result;
58 }
59
60 memcached_return_t rc;
61 if (ptr->root->flags.binary_protocol)
62 {
63 rc= binary_read_one_response(ptr, buffer, buffer_length, result);
64 }
65 else
66 {
67 rc= textual_read_one_response(ptr, buffer, buffer_length, result);
68 }
69
70 unlikely(rc == MEMCACHED_UNKNOWN_READ_FAILURE or
71 rc == MEMCACHED_PROTOCOL_ERROR or
72 rc == MEMCACHED_CLIENT_ERROR or
73 rc == MEMCACHED_MEMORY_ALLOCATION_FAILURE)
74 {
75 memcached_io_reset(ptr);
76 }
77
78 return rc;
79 }
80
81 memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
82 char *buffer, size_t buffer_length,
83 memcached_result_st *result)
84 {
85 /* We may have old commands in the buffer not set, first purge */
86 if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false))
87 {
88 (void)memcached_io_write(ptr, NULL, 0, true);
89 }
90
91 /*
92 * The previous implementation purged all pending requests and just
93 * returned the last one. Purge all pending messages to ensure backwards
94 * compatibility.
95 */
96 if (ptr->root->flags.binary_protocol == false)
97 {
98 while (memcached_server_response_count(ptr) > 1)
99 {
100 memcached_return_t rc= memcached_read_one_response(ptr, buffer, buffer_length, result);
101
102 unlikely (rc != MEMCACHED_END &&
103 rc != MEMCACHED_STORED &&
104 rc != MEMCACHED_SUCCESS &&
105 rc != MEMCACHED_STAT &&
106 rc != MEMCACHED_DELETED &&
107 rc != MEMCACHED_NOTFOUND &&
108 rc != MEMCACHED_NOTSTORED &&
109 rc != MEMCACHED_DATA_EXISTS)
110 return rc;
111 }
112 }
113
114 return memcached_read_one_response(ptr, buffer, buffer_length, result);
115 }
116
117 static memcached_return_t textual_value_fetch(memcached_server_write_instance_st ptr,
118 char *buffer,
119 memcached_result_st *result)
120 {
121 char *string_ptr;
122 char *end_ptr;
123 char *next_ptr;
124 size_t value_length;
125 size_t to_read;
126 ssize_t read_length= 0;
127
128 if (ptr->root->flags.use_udp)
129 {
130 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
131 }
132
133 WATCHPOINT_ASSERT(ptr->root);
134 end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
135
136 memcached_result_reset(result);
137
138 string_ptr= buffer;
139 string_ptr+= 6; /* "VALUE " */
140
141
142 /* We load the key */
143 {
144 char *key;
145 size_t prefix_length;
146
147 key= result->item_key;
148 result->key_length= 0;
149
150 for (prefix_length= memcached_array_size(ptr->root->_namespace); !(iscntrl(*string_ptr) || isspace(*string_ptr)) ; string_ptr++)
151 {
152 if (prefix_length == 0)
153 {
154 *key= *string_ptr;
155 key++;
156 result->key_length++;
157 }
158 else
159 prefix_length--;
160 }
161 result->item_key[result->key_length]= 0;
162 }
163
164 if (end_ptr == string_ptr)
165 goto read_error;
166
167 /* Flags fetch move past space */
168 string_ptr++;
169 if (end_ptr == string_ptr)
170 goto read_error;
171
172 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
173 result->item_flags= (uint32_t) strtoul(next_ptr, &string_ptr, 10);
174
175 if (end_ptr == string_ptr)
176 goto read_error;
177
178 /* Length fetch move past space*/
179 string_ptr++;
180 if (end_ptr == string_ptr)
181 goto read_error;
182
183 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
184 value_length= (size_t)strtoull(next_ptr, &string_ptr, 10);
185
186 if (end_ptr == string_ptr)
187 goto read_error;
188
189 /* Skip spaces */
190 if (*string_ptr == '\r')
191 {
192 /* Skip past the \r\n */
193 string_ptr+= 2;
194 }
195 else
196 {
197 string_ptr++;
198 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
199 result->item_cas= strtoull(next_ptr, &string_ptr, 10);
200 }
201
202 if (end_ptr < string_ptr)
203 goto read_error;
204
205 /* We add two bytes so that we can walk the \r\n */
206 if (memcached_failed(memcached_string_check(&result->value, value_length +2)))
207 {
208 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
209 }
210
211 {
212 char *value_ptr= memcached_string_value_mutable(&result->value);
213 /*
214 We read the \r\n into the string since not doing so is more
215 cycles then the waster of memory to do so.
216
217 We are null terminating through, which will most likely make
218 some people lazy about using the return length.
219 */
220 to_read= (value_length) + 2;
221 memcached_return_t rrc= memcached_io_read(ptr, value_ptr, to_read, &read_length);
222 if (memcached_failed(rrc) and rrc == MEMCACHED_IN_PROGRESS)
223 {
224 memcached_quit_server(ptr, true);
225 return memcached_set_error(*ptr, MEMCACHED_IN_PROGRESS, MEMCACHED_AT);
226 }
227 else if (memcached_failed(rrc))
228 {
229 return rrc;
230 }
231 }
232
233 if (read_length != (ssize_t)(value_length + 2))
234 {
235 goto read_error;
236 }
237
238 /* This next bit blows the API, but this is internal....*/
239 {
240 char *char_ptr;
241 char_ptr= memcached_string_value_mutable(&result->value);;
242 char_ptr[value_length]= 0;
243 char_ptr[value_length +1]= 0;
244 memcached_string_set_length(&result->value, value_length);
245 }
246
247 return MEMCACHED_SUCCESS;
248
249 read_error:
250 memcached_io_reset(ptr);
251
252 return MEMCACHED_PARTIAL_READ;
253 }
254
255 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
256 char *buffer, size_t buffer_length,
257 memcached_result_st *result)
258 {
259 size_t total_read;
260 memcached_return_t rc= memcached_io_readline(ptr, buffer, buffer_length, total_read);
261
262 if (memcached_failed(rc))
263 {
264 return rc;
265 }
266
267 switch(buffer[0])
268 {
269 case 'V': /* VALUE || VERSION */
270 if (buffer[1] == 'A') /* VALUE */
271 {
272 /* We add back in one because we will need to search for END */
273 memcached_server_response_increment(ptr);
274 return textual_value_fetch(ptr, buffer, result);
275 }
276 else if (buffer[1] == 'E') /* VERSION */
277 {
278 return MEMCACHED_SUCCESS;
279 }
280 else
281 {
282 WATCHPOINT_STRING(buffer);
283 return MEMCACHED_UNKNOWN_READ_FAILURE;
284 }
285 case 'O': /* OK */
286 return MEMCACHED_SUCCESS;
287
288 case 'S': /* STORED STATS SERVER_ERROR */
289 {
290 if (buffer[2] == 'A') /* STORED STATS */
291 {
292 memcached_server_response_increment(ptr);
293 return MEMCACHED_STAT;
294 }
295 else if (buffer[1] == 'E') /* SERVER_ERROR */
296 {
297 if (total_read == memcached_literal_param_size("SERVER_ERROR"))
298 {
299 return MEMCACHED_SERVER_ERROR;
300 }
301
302 if (total_read > memcached_literal_param_size("SERVER_ERROR object too large for cache") and
303 (memcmp(buffer, memcached_literal_param("SERVER_ERROR object too large for cache")) == 0))
304 {
305 return MEMCACHED_E2BIG;
306 }
307
308 // Move past the basic error message and whitespace
309 char *startptr= buffer + memcached_literal_param_size("SERVER_ERROR");
310 if (startptr[0] == ' ')
311 {
312 startptr++;
313 }
314
315 char *endptr= startptr;
316 while (*endptr != '\r' && *endptr != '\n') endptr++;
317
318 return memcached_set_error(*ptr, MEMCACHED_SERVER_ERROR, MEMCACHED_AT, startptr, size_t(endptr - startptr));
319 }
320 else if (buffer[1] == 'T')
321 {
322 return MEMCACHED_STORED;
323 }
324 else
325 {
326 WATCHPOINT_STRING(buffer);
327 return MEMCACHED_UNKNOWN_READ_FAILURE;
328 }
329 }
330 case 'D': /* DELETED */
331 return MEMCACHED_DELETED;
332
333 case 'N': /* NOT_FOUND */
334 {
335 if (buffer[4] == 'F')
336 {
337 return MEMCACHED_NOTFOUND;
338 }
339 else if (buffer[4] == 'S')
340 {
341 return MEMCACHED_NOTSTORED;
342 }
343 else
344 {
345 WATCHPOINT_STRING(buffer);
346 return MEMCACHED_UNKNOWN_READ_FAILURE;
347 }
348 }
349 case 'E': /* PROTOCOL ERROR or END */
350 {
351 if (buffer[1] == 'N')
352 {
353 return MEMCACHED_END;
354 }
355 else if (buffer[1] == 'R')
356 {
357 return MEMCACHED_PROTOCOL_ERROR;
358 }
359 else if (buffer[1] == 'X')
360 {
361 return MEMCACHED_DATA_EXISTS;
362 }
363 else
364 {
365 WATCHPOINT_STRING(buffer);
366 return MEMCACHED_UNKNOWN_READ_FAILURE;
367 }
368
369 }
370 case 'T': /* TOUCHED */
371 {
372 if (buffer[1] == 'O' and buffer[2] == 'U'
373 and buffer[3] == 'C' and buffer[4] == 'H'
374 and buffer[5] == 'E' and buffer[6] == 'D')
375 {
376 return MEMCACHED_SUCCESS;
377 }
378 }
379 return MEMCACHED_UNKNOWN_READ_FAILURE;
380
381 case 'I': /* CLIENT ERROR */
382 /* We add back in one because we will need to search for END */
383 memcached_server_response_increment(ptr);
384 return MEMCACHED_ITEM;
385
386 case 'C': /* CLIENT ERROR */
387 return MEMCACHED_CLIENT_ERROR;
388
389 default:
390 {
391 unsigned long long auto_return_value;
392
393 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
394 return MEMCACHED_SUCCESS;
395
396 WATCHPOINT_STRING(buffer);
397 return MEMCACHED_UNKNOWN_READ_FAILURE;
398 }
399 }
400
401 /* NOTREACHED */
402 }
403
404 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
405 char *buffer, size_t buffer_length,
406 memcached_result_st *result)
407 {
408 memcached_return_t rc;
409 protocol_binary_response_header header;
410
411 if ((rc= memcached_safe_read(ptr, &header.bytes, sizeof(header.bytes))) != MEMCACHED_SUCCESS)
412 {
413 WATCHPOINT_ERROR(rc);
414 return rc;
415 }
416
417 if (header.response.magic != PROTOCOL_BINARY_RES)
418 {
419 return MEMCACHED_PROTOCOL_ERROR;
420 }
421
422 /*
423 ** Convert the header to host local endian!
424 */
425 header.response.keylen= ntohs(header.response.keylen);
426 header.response.status= ntohs(header.response.status);
427 header.response.bodylen= ntohl(header.response.bodylen);
428 header.response.cas= memcached_ntohll(header.response.cas);
429 uint32_t bodylen= header.response.bodylen;
430
431 if (header.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS or
432 header.response.status == PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE)
433 {
434 switch (header.response.opcode)
435 {
436 case PROTOCOL_BINARY_CMD_GETKQ:
437 /*
438 * We didn't increment the response counter for the GETKQ packet
439 * (only the final NOOP), so we need to increment the counter again.
440 */
441 memcached_server_response_increment(ptr);
442 /* FALLTHROUGH */
443 case PROTOCOL_BINARY_CMD_GETK:
444 {
445 uint16_t keylen= header.response.keylen;
446 memcached_result_reset(result);
447 result->item_cas= header.response.cas;
448
449 if ((rc= memcached_safe_read(ptr, &result->item_flags, sizeof (result->item_flags))) != MEMCACHED_SUCCESS)
450 {
451 WATCHPOINT_ERROR(rc);
452 return MEMCACHED_UNKNOWN_READ_FAILURE;
453 }
454
455 result->item_flags= ntohl(result->item_flags);
456 bodylen -= header.response.extlen;
457
458 result->key_length= keylen;
459 if (memcached_failed(rc= memcached_safe_read(ptr, result->item_key, keylen)))
460 {
461 WATCHPOINT_ERROR(rc);
462 return MEMCACHED_UNKNOWN_READ_FAILURE;
463 }
464
465 // Only bother with doing this if key_length > 0
466 if (result->key_length)
467 {
468 if (memcached_array_size(ptr->root->_namespace) and memcached_array_size(ptr->root->_namespace) >= result->key_length)
469 {
470 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
471 }
472
473 if (memcached_array_size(ptr->root->_namespace))
474 {
475 result->key_length-= memcached_array_size(ptr->root->_namespace);
476 memmove(result->item_key, result->item_key +memcached_array_size(ptr->root->_namespace), result->key_length);
477 }
478 }
479
480 bodylen -= keylen;
481 if (memcached_failed(memcached_string_check(&result->value, bodylen)))
482 {
483 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
484 }
485
486 char *vptr= memcached_string_value_mutable(&result->value);
487 if (memcached_failed(rc= memcached_safe_read(ptr, vptr, bodylen)))
488 {
489 WATCHPOINT_ERROR(rc);
490 return MEMCACHED_UNKNOWN_READ_FAILURE;
491 }
492
493 memcached_string_set_length(&result->value, bodylen);
494 }
495 break;
496
497 case PROTOCOL_BINARY_CMD_INCREMENT:
498 case PROTOCOL_BINARY_CMD_DECREMENT:
499 {
500 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
501 {
502 return MEMCACHED_PROTOCOL_ERROR;
503 }
504
505 WATCHPOINT_ASSERT(bodylen == buffer_length);
506 uint64_t val;
507 if ((rc= memcached_safe_read(ptr, &val, sizeof(val))) != MEMCACHED_SUCCESS)
508 {
509 WATCHPOINT_ERROR(rc);
510 return MEMCACHED_UNKNOWN_READ_FAILURE;
511 }
512
513 val= memcached_ntohll(val);
514 memcpy(buffer, &val, sizeof(val));
515 }
516 break;
517
518 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
519 case PROTOCOL_BINARY_CMD_VERSION:
520 {
521 memset(buffer, 0, buffer_length);
522 if (bodylen >= buffer_length)
523 {
524 /* not enough space in buffer.. should not happen... */
525 return MEMCACHED_UNKNOWN_READ_FAILURE;
526 }
527 else if ((rc= memcached_safe_read(ptr, buffer, bodylen)) != MEMCACHED_SUCCESS)
528 {
529 WATCHPOINT_ERROR(rc);
530 return MEMCACHED_UNKNOWN_READ_FAILURE;
531 }
532 }
533 break;
534 case PROTOCOL_BINARY_CMD_FLUSH:
535 case PROTOCOL_BINARY_CMD_QUIT:
536 case PROTOCOL_BINARY_CMD_SET:
537 case PROTOCOL_BINARY_CMD_ADD:
538 case PROTOCOL_BINARY_CMD_REPLACE:
539 case PROTOCOL_BINARY_CMD_APPEND:
540 case PROTOCOL_BINARY_CMD_PREPEND:
541 case PROTOCOL_BINARY_CMD_DELETE:
542 case PROTOCOL_BINARY_CMD_TOUCH:
543 {
544 WATCHPOINT_ASSERT(bodylen == 0);
545 return MEMCACHED_SUCCESS;
546 }
547
548 case PROTOCOL_BINARY_CMD_NOOP:
549 {
550 WATCHPOINT_ASSERT(bodylen == 0);
551 return MEMCACHED_END;
552 }
553
554 case PROTOCOL_BINARY_CMD_STAT:
555 {
556 if (bodylen == 0)
557 {
558 return MEMCACHED_END;
559 }
560 else if (bodylen + 1 > buffer_length)
561 {
562 /* not enough space in buffer.. should not happen... */
563 return MEMCACHED_UNKNOWN_READ_FAILURE;
564 }
565 else
566 {
567 size_t keylen= header.response.keylen;
568 memset(buffer, 0, buffer_length);
569 if ((rc= memcached_safe_read(ptr, buffer, keylen)) != MEMCACHED_SUCCESS ||
570 (rc= memcached_safe_read(ptr, buffer + keylen + 1, bodylen - keylen)) != MEMCACHED_SUCCESS)
571 {
572 WATCHPOINT_ERROR(rc);
573 return MEMCACHED_UNKNOWN_READ_FAILURE;
574 }
575 }
576 }
577 break;
578
579 case PROTOCOL_BINARY_CMD_SASL_AUTH:
580 case PROTOCOL_BINARY_CMD_SASL_STEP:
581 {
582 memcached_result_reset(result);
583 result->item_cas= header.response.cas;
584
585 if (memcached_string_check(&result->value,
586 bodylen) != MEMCACHED_SUCCESS)
587 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
588
589 char *vptr= memcached_string_value_mutable(&result->value);
590 if ((rc= memcached_safe_read(ptr, vptr, bodylen)) != MEMCACHED_SUCCESS)
591 {
592 WATCHPOINT_ERROR(rc);
593 return MEMCACHED_UNKNOWN_READ_FAILURE;
594 }
595
596 memcached_string_set_length(&result->value, bodylen);
597 }
598 break;
599 default:
600 {
601 /* Command not implemented yet! */
602 WATCHPOINT_ASSERT(0);
603 return MEMCACHED_PROTOCOL_ERROR;
604 }
605 }
606 }
607 else if (header.response.bodylen)
608 {
609 /* What should I do with the error message??? just discard it for now */
610 char hole[SMALL_STRING_LEN];
611 while (bodylen > 0)
612 {
613 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
614 if ((rc= memcached_safe_read(ptr, hole, nr)) != MEMCACHED_SUCCESS)
615 {
616 WATCHPOINT_ERROR(rc);
617 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
618 }
619 bodylen-= (uint32_t) nr;
620 }
621
622 /* This might be an error from one of the quiet commands.. if
623 * so, just throw it away and get the next one. What about creating
624 * a callback to the user with the error information?
625 */
626 switch (header.response.opcode)
627 {
628 case PROTOCOL_BINARY_CMD_SETQ:
629 case PROTOCOL_BINARY_CMD_ADDQ:
630 case PROTOCOL_BINARY_CMD_REPLACEQ:
631 case PROTOCOL_BINARY_CMD_APPENDQ:
632 case PROTOCOL_BINARY_CMD_PREPENDQ:
633 return binary_read_one_response(ptr, buffer, buffer_length, result);
634
635 default:
636 break;
637 }
638 }
639
640 rc= MEMCACHED_SUCCESS;
641 if (header.response.status != 0)
642 {
643 switch (header.response.status)
644 {
645 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
646 rc= MEMCACHED_NOTFOUND;
647 break;
648
649 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
650 rc= MEMCACHED_DATA_EXISTS;
651 break;
652
653 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
654 rc= MEMCACHED_NOTSTORED;
655 break;
656
657 case PROTOCOL_BINARY_RESPONSE_E2BIG:
658 rc= MEMCACHED_E2BIG;
659 break;
660
661 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
662 rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
663 break;
664
665 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
666 rc= MEMCACHED_AUTH_CONTINUE;
667 break;
668
669 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
670 rc= MEMCACHED_AUTH_FAILURE;
671 break;
672
673 case PROTOCOL_BINARY_RESPONSE_EINVAL:
674 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
675 default:
676 /* @todo fix the error mappings */
677 rc= MEMCACHED_PROTOCOL_ERROR;
678 break;
679 }
680 }
681
682 return rc;
683 }