Merge in namespace fixes for binary protocol.
[m6w6/libmemcached] / libmemcached / response.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38 #include <libmemcached/common.h>
39
40 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
41 char *buffer, size_t buffer_length,
42 memcached_result_st *result);
43 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
44 char *buffer, size_t buffer_length,
45 memcached_result_st *result);
46
47 memcached_return_t memcached_read_one_response(memcached_server_write_instance_st ptr,
48 char *buffer, size_t buffer_length,
49 memcached_result_st *result)
50 {
51 memcached_server_response_decrement(ptr);
52
53 if (result == NULL)
54 {
55 memcached_st *root= (memcached_st *)ptr->root;
56 result = &root->result;
57 }
58
59 memcached_return_t rc;
60 if (ptr->root->flags.binary_protocol)
61 {
62 rc= binary_read_one_response(ptr, buffer, buffer_length, result);
63 }
64 else
65 {
66 rc= textual_read_one_response(ptr, buffer, buffer_length, result);
67 }
68
69 unlikely(rc == MEMCACHED_UNKNOWN_READ_FAILURE or
70 rc == MEMCACHED_PROTOCOL_ERROR or
71 rc == MEMCACHED_CLIENT_ERROR or
72 rc == MEMCACHED_MEMORY_ALLOCATION_FAILURE)
73 memcached_io_reset(ptr);
74
75 return rc;
76 }
77
78 memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
79 char *buffer, size_t buffer_length,
80 memcached_result_st *result)
81 {
82 /* We may have old commands in the buffer not set, first purge */
83 if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false))
84 {
85 (void)memcached_io_write(ptr, NULL, 0, true);
86 }
87
88 /*
89 * The previous implementation purged all pending requests and just
90 * returned the last one. Purge all pending messages to ensure backwards
91 * compatibility.
92 */
93 if (ptr->root->flags.binary_protocol == false)
94 {
95 while (memcached_server_response_count(ptr) > 1)
96 {
97 memcached_return_t rc= memcached_read_one_response(ptr, buffer, buffer_length, result);
98
99 unlikely (rc != MEMCACHED_END &&
100 rc != MEMCACHED_STORED &&
101 rc != MEMCACHED_SUCCESS &&
102 rc != MEMCACHED_STAT &&
103 rc != MEMCACHED_DELETED &&
104 rc != MEMCACHED_NOTFOUND &&
105 rc != MEMCACHED_NOTSTORED &&
106 rc != MEMCACHED_DATA_EXISTS)
107 return rc;
108 }
109 }
110
111 return memcached_read_one_response(ptr, buffer, buffer_length, result);
112 }
113
114 static memcached_return_t textual_value_fetch(memcached_server_write_instance_st ptr,
115 char *buffer,
116 memcached_result_st *result)
117 {
118 char *string_ptr;
119 char *end_ptr;
120 char *next_ptr;
121 size_t value_length;
122 size_t to_read;
123 ssize_t read_length= 0;
124
125 if (ptr->root->flags.use_udp)
126 return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
127
128 WATCHPOINT_ASSERT(ptr->root);
129 end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
130
131 memcached_result_reset(result);
132
133 string_ptr= buffer;
134 string_ptr+= 6; /* "VALUE " */
135
136
137 /* We load the key */
138 {
139 char *key;
140 size_t prefix_length;
141
142 key= result->item_key;
143 result->key_length= 0;
144
145 for (prefix_length= memcached_array_size(ptr->root->prefix_key); !(iscntrl(*string_ptr) || isspace(*string_ptr)) ; string_ptr++)
146 {
147 if (prefix_length == 0)
148 {
149 *key= *string_ptr;
150 key++;
151 result->key_length++;
152 }
153 else
154 prefix_length--;
155 }
156 result->item_key[result->key_length]= 0;
157 }
158
159 if (end_ptr == string_ptr)
160 goto read_error;
161
162 /* Flags fetch move past space */
163 string_ptr++;
164 if (end_ptr == string_ptr)
165 goto read_error;
166
167 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
168 result->item_flags= (uint32_t) strtoul(next_ptr, &string_ptr, 10);
169
170 if (end_ptr == string_ptr)
171 goto read_error;
172
173 /* Length fetch move past space*/
174 string_ptr++;
175 if (end_ptr == string_ptr)
176 goto read_error;
177
178 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
179 value_length= (size_t)strtoull(next_ptr, &string_ptr, 10);
180
181 if (end_ptr == string_ptr)
182 goto read_error;
183
184 /* Skip spaces */
185 if (*string_ptr == '\r')
186 {
187 /* Skip past the \r\n */
188 string_ptr+= 2;
189 }
190 else
191 {
192 string_ptr++;
193 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++) {};
194 result->item_cas= strtoull(next_ptr, &string_ptr, 10);
195 }
196
197 if (end_ptr < string_ptr)
198 goto read_error;
199
200 /* We add two bytes so that we can walk the \r\n */
201 if (memcached_failed(memcached_string_check(&result->value, value_length +2)))
202 {
203 value_length= 0;
204 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
205 }
206
207 {
208 char *value_ptr= memcached_string_value_mutable(&result->value);
209 /*
210 We read the \r\n into the string since not doing so is more
211 cycles then the waster of memory to do so.
212
213 We are null terminating through, which will most likely make
214 some people lazy about using the return length.
215 */
216 to_read= (value_length) + 2;
217 memcached_return_t rrc= memcached_io_read(ptr, value_ptr, to_read, &read_length);
218 if (memcached_failed(rrc) and rrc == MEMCACHED_IN_PROGRESS)
219 {
220 memcached_quit_server(ptr, true);
221 return memcached_set_error(*ptr, rrc, MEMCACHED_AT);
222 }
223 else if (memcached_failed(rrc))
224 {
225 return rrc;
226 }
227 }
228
229 if (read_length != (ssize_t)(value_length + 2))
230 {
231 goto read_error;
232 }
233
234 /* This next bit blows the API, but this is internal....*/
235 {
236 char *char_ptr;
237 char_ptr= memcached_string_value_mutable(&result->value);;
238 char_ptr[value_length]= 0;
239 char_ptr[value_length +1]= 0;
240 memcached_string_set_length(&result->value, value_length);
241 }
242
243 return MEMCACHED_SUCCESS;
244
245 read_error:
246 memcached_io_reset(ptr);
247
248 return MEMCACHED_PARTIAL_READ;
249 }
250
251 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
252 char *buffer, size_t buffer_length,
253 memcached_result_st *result)
254 {
255 memcached_return_t rc= memcached_io_readline(ptr, buffer, buffer_length);
256 if (memcached_failed(rc))
257 {
258 return rc;
259 }
260
261 switch(buffer[0])
262 {
263 case 'V': /* VALUE || VERSION */
264 if (buffer[1] == 'A') /* VALUE */
265 {
266 /* We add back in one because we will need to search for END */
267 memcached_server_response_increment(ptr);
268 return textual_value_fetch(ptr, buffer, result);
269 }
270 else if (buffer[1] == 'E') /* VERSION */
271 {
272 return MEMCACHED_SUCCESS;
273 }
274 else
275 {
276 WATCHPOINT_STRING(buffer);
277 return MEMCACHED_UNKNOWN_READ_FAILURE;
278 }
279 case 'O': /* OK */
280 return MEMCACHED_SUCCESS;
281 case 'S': /* STORED STATS SERVER_ERROR */
282 {
283 if (buffer[2] == 'A') /* STORED STATS */
284 {
285 memcached_server_response_increment(ptr);
286 return MEMCACHED_STAT;
287 }
288 else if (buffer[1] == 'E') /* SERVER_ERROR */
289 {
290 char *startptr= buffer + 13, *endptr= startptr;
291
292 while (*endptr != '\r' && *endptr != '\n') endptr++;
293
294 /*
295 Yes, we could make this "efficent" but to do that we would need
296 to maintain more state for the size of the buffer. Why waste
297 memory in the struct, which is important, for something that
298 rarely should happen?
299 */
300 char *rel_ptr= (char *)libmemcached_realloc(ptr->root,
301 ptr->cached_server_error,
302 (size_t) (endptr - startptr + 1));
303
304 if (rel_ptr == NULL)
305 {
306 /* If we happened to have some memory, we just null it since we don't know the size */
307 if (ptr->cached_server_error)
308 ptr->cached_server_error[0]= 0;
309 return MEMCACHED_SERVER_ERROR;
310 }
311 ptr->cached_server_error= rel_ptr;
312
313 memcpy(ptr->cached_server_error, startptr, (size_t) (endptr - startptr));
314 ptr->cached_server_error[endptr - startptr]= 0;
315 return MEMCACHED_SERVER_ERROR;
316 }
317 else if (buffer[1] == 'T')
318 {
319 return MEMCACHED_STORED;
320 }
321 else
322 {
323 WATCHPOINT_STRING(buffer);
324 return MEMCACHED_UNKNOWN_READ_FAILURE;
325 }
326 }
327 case 'D': /* DELETED */
328 return MEMCACHED_DELETED;
329
330 case 'N': /* NOT_FOUND */
331 {
332 if (buffer[4] == 'F')
333 return MEMCACHED_NOTFOUND;
334 else if (buffer[4] == 'S')
335 return MEMCACHED_NOTSTORED;
336 else
337 {
338 WATCHPOINT_STRING(buffer);
339 return MEMCACHED_UNKNOWN_READ_FAILURE;
340 }
341 }
342 case 'E': /* PROTOCOL ERROR or END */
343 {
344 if (buffer[1] == 'N')
345 return MEMCACHED_END;
346 else if (buffer[1] == 'R')
347 return MEMCACHED_PROTOCOL_ERROR;
348 else if (buffer[1] == 'X')
349 return MEMCACHED_DATA_EXISTS;
350 else
351 {
352 WATCHPOINT_STRING(buffer);
353 return MEMCACHED_UNKNOWN_READ_FAILURE;
354 }
355
356 }
357 case 'I': /* CLIENT ERROR */
358 /* We add back in one because we will need to search for END */
359 memcached_server_response_increment(ptr);
360 return MEMCACHED_ITEM;
361 case 'C': /* CLIENT ERROR */
362 return MEMCACHED_CLIENT_ERROR;
363 default:
364 {
365 unsigned long long auto_return_value;
366
367 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
368 return MEMCACHED_SUCCESS;
369
370 WATCHPOINT_STRING(buffer);
371 return MEMCACHED_UNKNOWN_READ_FAILURE;
372 }
373 }
374
375 /* NOTREACHED */
376 }
377
378 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
379 char *buffer, size_t buffer_length,
380 memcached_result_st *result)
381 {
382 memcached_return_t rc;
383 protocol_binary_response_header header;
384
385 if ((rc= memcached_safe_read(ptr, &header.bytes, sizeof(header.bytes))) != MEMCACHED_SUCCESS)
386 {
387 WATCHPOINT_ERROR(rc);
388 return rc;
389 }
390
391 if (header.response.magic != PROTOCOL_BINARY_RES)
392 {
393 return MEMCACHED_PROTOCOL_ERROR;
394 }
395
396 /*
397 ** Convert the header to host local endian!
398 */
399 header.response.keylen= ntohs(header.response.keylen);
400 header.response.status= ntohs(header.response.status);
401 header.response.bodylen= ntohl(header.response.bodylen);
402 header.response.cas= memcached_ntohll(header.response.cas);
403 uint32_t bodylen= header.response.bodylen;
404
405 if (header.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS ||
406 header.response.status == PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE)
407 {
408 switch (header.response.opcode)
409 {
410 case PROTOCOL_BINARY_CMD_GETKQ:
411 /*
412 * We didn't increment the response counter for the GETKQ packet
413 * (only the final NOOP), so we need to increment the counter again.
414 */
415 memcached_server_response_increment(ptr);
416 /* FALLTHROUGH */
417 case PROTOCOL_BINARY_CMD_GETK:
418 {
419 uint16_t keylen= header.response.keylen;
420 memcached_result_reset(result);
421 result->item_cas= header.response.cas;
422
423 if ((rc= memcached_safe_read(ptr, &result->item_flags, sizeof (result->item_flags))) != MEMCACHED_SUCCESS)
424 {
425 WATCHPOINT_ERROR(rc);
426 return MEMCACHED_UNKNOWN_READ_FAILURE;
427 }
428
429 result->item_flags= ntohl(result->item_flags);
430 bodylen -= header.response.extlen;
431
432 result->key_length= keylen;
433 if (memcached_failed(rc= memcached_safe_read(ptr, result->item_key, keylen)))
434 {
435 WATCHPOINT_ERROR(rc);
436 return MEMCACHED_UNKNOWN_READ_FAILURE;
437 }
438
439 // Only bother with doing this if key_length > 0
440 if (result->key_length)
441 {
442 if (memcached_array_size(ptr->root->prefix_key) and memcached_array_size(ptr->root->prefix_key) >= result->key_length)
443 {
444 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
445 }
446
447 if (memcached_array_size(ptr->root->prefix_key))
448 {
449 result->key_length-= memcached_array_size(ptr->root->prefix_key);
450 memmove(result->item_key, result->item_key +memcached_array_size(ptr->root->prefix_key), result->key_length);
451 }
452 }
453
454 bodylen -= keylen;
455 if (memcached_failed(memcached_string_check(&result->value, bodylen)))
456 {
457 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
458 }
459
460 char *vptr= memcached_string_value_mutable(&result->value);
461 if (memcached_failed(rc= memcached_safe_read(ptr, vptr, bodylen)))
462 {
463 WATCHPOINT_ERROR(rc);
464 return MEMCACHED_UNKNOWN_READ_FAILURE;
465 }
466
467 memcached_string_set_length(&result->value, bodylen);
468 }
469 break;
470
471 case PROTOCOL_BINARY_CMD_INCREMENT:
472 case PROTOCOL_BINARY_CMD_DECREMENT:
473 {
474 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
475 {
476 return MEMCACHED_PROTOCOL_ERROR;
477 }
478
479 WATCHPOINT_ASSERT(bodylen == buffer_length);
480 uint64_t val;
481 if ((rc= memcached_safe_read(ptr, &val, sizeof(val))) != MEMCACHED_SUCCESS)
482 {
483 WATCHPOINT_ERROR(rc);
484 return MEMCACHED_UNKNOWN_READ_FAILURE;
485 }
486
487 val= memcached_ntohll(val);
488 memcpy(buffer, &val, sizeof(val));
489 }
490 break;
491
492 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
493 case PROTOCOL_BINARY_CMD_VERSION:
494 {
495 memset(buffer, 0, buffer_length);
496 if (bodylen >= buffer_length)
497 {
498 /* not enough space in buffer.. should not happen... */
499 return MEMCACHED_UNKNOWN_READ_FAILURE;
500 }
501 else if ((rc= memcached_safe_read(ptr, buffer, bodylen)) != MEMCACHED_SUCCESS)
502 {
503 WATCHPOINT_ERROR(rc);
504 return MEMCACHED_UNKNOWN_READ_FAILURE;
505 }
506 }
507 break;
508 case PROTOCOL_BINARY_CMD_FLUSH:
509 case PROTOCOL_BINARY_CMD_QUIT:
510 case PROTOCOL_BINARY_CMD_SET:
511 case PROTOCOL_BINARY_CMD_ADD:
512 case PROTOCOL_BINARY_CMD_REPLACE:
513 case PROTOCOL_BINARY_CMD_APPEND:
514 case PROTOCOL_BINARY_CMD_PREPEND:
515 case PROTOCOL_BINARY_CMD_DELETE:
516 {
517 WATCHPOINT_ASSERT(bodylen == 0);
518 return MEMCACHED_SUCCESS;
519 }
520 case PROTOCOL_BINARY_CMD_NOOP:
521 {
522 WATCHPOINT_ASSERT(bodylen == 0);
523 return MEMCACHED_END;
524 }
525 case PROTOCOL_BINARY_CMD_STAT:
526 {
527 if (bodylen == 0)
528 {
529 return MEMCACHED_END;
530 }
531 else if (bodylen + 1 > buffer_length)
532 {
533 /* not enough space in buffer.. should not happen... */
534 return MEMCACHED_UNKNOWN_READ_FAILURE;
535 }
536 else
537 {
538 size_t keylen= header.response.keylen;
539 memset(buffer, 0, buffer_length);
540 if ((rc= memcached_safe_read(ptr, buffer, keylen)) != MEMCACHED_SUCCESS ||
541 (rc= memcached_safe_read(ptr, buffer + keylen + 1, bodylen - keylen)) != MEMCACHED_SUCCESS)
542 {
543 WATCHPOINT_ERROR(rc);
544 return MEMCACHED_UNKNOWN_READ_FAILURE;
545 }
546 }
547 }
548 break;
549
550 case PROTOCOL_BINARY_CMD_SASL_AUTH:
551 case PROTOCOL_BINARY_CMD_SASL_STEP:
552 {
553 memcached_result_reset(result);
554 result->item_cas= header.response.cas;
555
556 if (memcached_string_check(&result->value,
557 bodylen) != MEMCACHED_SUCCESS)
558 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
559
560 char *vptr= memcached_string_value_mutable(&result->value);
561 if ((rc= memcached_safe_read(ptr, vptr, bodylen)) != MEMCACHED_SUCCESS)
562 {
563 WATCHPOINT_ERROR(rc);
564 return MEMCACHED_UNKNOWN_READ_FAILURE;
565 }
566
567 memcached_string_set_length(&result->value, bodylen);
568 }
569 break;
570 default:
571 {
572 /* Command not implemented yet! */
573 WATCHPOINT_ASSERT(0);
574 return MEMCACHED_PROTOCOL_ERROR;
575 }
576 }
577 }
578 else if (header.response.bodylen)
579 {
580 /* What should I do with the error message??? just discard it for now */
581 char hole[SMALL_STRING_LEN];
582 while (bodylen > 0)
583 {
584 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
585 if ((rc= memcached_safe_read(ptr, hole, nr)) != MEMCACHED_SUCCESS)
586 {
587 WATCHPOINT_ERROR(rc);
588 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
589 }
590 bodylen-= (uint32_t) nr;
591 }
592
593 /* This might be an error from one of the quiet commands.. if
594 * so, just throw it away and get the next one. What about creating
595 * a callback to the user with the error information?
596 */
597 switch (header.response.opcode)
598 {
599 case PROTOCOL_BINARY_CMD_SETQ:
600 case PROTOCOL_BINARY_CMD_ADDQ:
601 case PROTOCOL_BINARY_CMD_REPLACEQ:
602 case PROTOCOL_BINARY_CMD_APPENDQ:
603 case PROTOCOL_BINARY_CMD_PREPENDQ:
604 return binary_read_one_response(ptr, buffer, buffer_length, result);
605 default:
606 break;
607 }
608 }
609
610 rc= MEMCACHED_SUCCESS;
611 unlikely(header.response.status != 0)
612 switch (header.response.status)
613 {
614 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
615 rc= MEMCACHED_NOTFOUND;
616 break;
617 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
618 rc= MEMCACHED_DATA_EXISTS;
619 break;
620 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
621 rc= MEMCACHED_NOTSTORED;
622 break;
623 case PROTOCOL_BINARY_RESPONSE_E2BIG:
624 rc= MEMCACHED_E2BIG;
625 break;
626 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
627 rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
628 break;
629 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
630 rc= MEMCACHED_AUTH_CONTINUE;
631 break;
632 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
633 rc= MEMCACHED_AUTH_FAILURE;
634 break;
635 case PROTOCOL_BINARY_RESPONSE_EINVAL:
636 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
637 default:
638 /* @todo fix the error mappings */
639 rc= MEMCACHED_PROTOCOL_ERROR;
640 break;
641 }
642
643 return rc;
644 }