Merge in all scanner tree + virtual buckets.
[awesomized/libmemcached] / libmemcached / response.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: memcached_response() is used to determine the return result from an issued command.
9 *
10 */
11
12 #include "common.h"
13
14 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
15 char *buffer, size_t buffer_length,
16 memcached_result_st *result);
17 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
18 char *buffer, size_t buffer_length,
19 memcached_result_st *result);
20
21 memcached_return_t memcached_read_one_response(memcached_server_write_instance_st ptr,
22 char *buffer, size_t buffer_length,
23 memcached_result_st *result)
24 {
25 memcached_server_response_decrement(ptr);
26
27 if (result == NULL)
28 {
29 memcached_st *root= (memcached_st *)ptr->root;
30 result = &root->result;
31 }
32
33 memcached_return_t rc;
34 if (ptr->root->flags.binary_protocol)
35 rc= binary_read_one_response(ptr, buffer, buffer_length, result);
36 else
37 rc= textual_read_one_response(ptr, buffer, buffer_length, result);
38
39 unlikely(rc == MEMCACHED_UNKNOWN_READ_FAILURE ||
40 rc == MEMCACHED_PROTOCOL_ERROR ||
41 rc == MEMCACHED_CLIENT_ERROR ||
42 rc == MEMCACHED_MEMORY_ALLOCATION_FAILURE)
43 memcached_io_reset(ptr);
44
45 return rc;
46 }
47
48 memcached_return_t memcached_response(memcached_server_write_instance_st ptr,
49 char *buffer, size_t buffer_length,
50 memcached_result_st *result)
51 {
52 /* We may have old commands in the buffer not set, first purge */
53 if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false))
54 {
55 (void)memcached_io_write(ptr, NULL, 0, true);
56 }
57
58 /*
59 * The previous implementation purged all pending requests and just
60 * returned the last one. Purge all pending messages to ensure backwards
61 * compatibility.
62 */
63 if (ptr->root->flags.binary_protocol == false)
64 {
65 while (memcached_server_response_count(ptr) > 1)
66 {
67 memcached_return_t rc= memcached_read_one_response(ptr, buffer, buffer_length, result);
68
69 unlikely (rc != MEMCACHED_END &&
70 rc != MEMCACHED_STORED &&
71 rc != MEMCACHED_SUCCESS &&
72 rc != MEMCACHED_STAT &&
73 rc != MEMCACHED_DELETED &&
74 rc != MEMCACHED_NOTFOUND &&
75 rc != MEMCACHED_NOTSTORED &&
76 rc != MEMCACHED_DATA_EXISTS)
77 return rc;
78 }
79 }
80
81 return memcached_read_one_response(ptr, buffer, buffer_length, result);
82 }
83
84 static memcached_return_t textual_value_fetch(memcached_server_write_instance_st ptr,
85 char *buffer,
86 memcached_result_st *result)
87 {
88 memcached_return_t rc= MEMCACHED_SUCCESS;
89 char *string_ptr;
90 char *end_ptr;
91 char *next_ptr;
92 size_t value_length;
93 size_t to_read;
94 char *value_ptr;
95 ssize_t read_length= 0;
96 memcached_return_t rrc;
97
98 if (ptr->root->flags.use_udp)
99 return MEMCACHED_NOT_SUPPORTED;
100
101 WATCHPOINT_ASSERT(ptr->root);
102 end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
103
104 memcached_result_reset(result);
105
106 string_ptr= buffer;
107 string_ptr+= 6; /* "VALUE " */
108
109
110 /* We load the key */
111 {
112 char *key;
113 size_t prefix_length;
114
115 key= result->item_key;
116 result->key_length= 0;
117
118 for (prefix_length= memcached_array_size(ptr->root->prefix_key); !(iscntrl(*string_ptr) || isspace(*string_ptr)) ; string_ptr++)
119 {
120 if (prefix_length == 0)
121 {
122 *key= *string_ptr;
123 key++;
124 result->key_length++;
125 }
126 else
127 prefix_length--;
128 }
129 result->item_key[result->key_length]= 0;
130 }
131
132 if (end_ptr == string_ptr)
133 goto read_error;
134
135 /* Flags fetch move past space */
136 string_ptr++;
137 if (end_ptr == string_ptr)
138 goto read_error;
139 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++);
140 result->item_flags= (uint32_t) strtoul(next_ptr, &string_ptr, 10);
141
142 if (end_ptr == string_ptr)
143 goto read_error;
144
145 /* Length fetch move past space*/
146 string_ptr++;
147 if (end_ptr == string_ptr)
148 goto read_error;
149
150 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++);
151 value_length= (size_t)strtoull(next_ptr, &string_ptr, 10);
152
153 if (end_ptr == string_ptr)
154 goto read_error;
155
156 /* Skip spaces */
157 if (*string_ptr == '\r')
158 {
159 /* Skip past the \r\n */
160 string_ptr+= 2;
161 }
162 else
163 {
164 string_ptr++;
165 for (next_ptr= string_ptr; isdigit(*string_ptr); string_ptr++);
166 result->item_cas= strtoull(next_ptr, &string_ptr, 10);
167 }
168
169 if (end_ptr < string_ptr)
170 goto read_error;
171
172 /* We add two bytes so that we can walk the \r\n */
173 rc= memcached_string_check(&result->value, value_length+2);
174 if (rc != MEMCACHED_SUCCESS)
175 {
176 value_length= 0;
177 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
178 }
179
180 value_ptr= memcached_string_value_mutable(&result->value);
181 /*
182 We read the \r\n into the string since not doing so is more
183 cycles then the waster of memory to do so.
184
185 We are null terminating through, which will most likely make
186 some people lazy about using the return length.
187 */
188 to_read= (value_length) + 2;
189 rrc= memcached_io_read(ptr, value_ptr, to_read, &read_length);
190 if (rrc != MEMCACHED_SUCCESS)
191 return rrc;
192
193 if (read_length != (ssize_t)(value_length + 2))
194 {
195 goto read_error;
196 }
197
198 /* This next bit blows the API, but this is internal....*/
199 {
200 char *char_ptr;
201 char_ptr= memcached_string_value_mutable(&result->value);;
202 char_ptr[value_length]= 0;
203 char_ptr[value_length + 1]= 0;
204 memcached_string_set_length(&result->value, value_length);
205 }
206
207 return MEMCACHED_SUCCESS;
208
209 read_error:
210 memcached_io_reset(ptr);
211
212 return MEMCACHED_PARTIAL_READ;
213 }
214
215 static memcached_return_t textual_read_one_response(memcached_server_write_instance_st ptr,
216 char *buffer, size_t buffer_length,
217 memcached_result_st *result)
218 {
219 memcached_return_t rc= memcached_io_readline(ptr, buffer, buffer_length);
220 if (rc != MEMCACHED_SUCCESS)
221 return rc;
222
223 switch(buffer[0])
224 {
225 case 'V': /* VALUE || VERSION */
226 if (buffer[1] == 'A') /* VALUE */
227 {
228 /* We add back in one because we will need to search for END */
229 memcached_server_response_increment(ptr);
230 return textual_value_fetch(ptr, buffer, result);
231 }
232 else if (buffer[1] == 'E') /* VERSION */
233 {
234 return MEMCACHED_SUCCESS;
235 }
236 else
237 {
238 WATCHPOINT_STRING(buffer);
239 return MEMCACHED_UNKNOWN_READ_FAILURE;
240 }
241 case 'O': /* OK */
242 return MEMCACHED_SUCCESS;
243 case 'S': /* STORED STATS SERVER_ERROR */
244 {
245 if (buffer[2] == 'A') /* STORED STATS */
246 {
247 memcached_server_response_increment(ptr);
248 return MEMCACHED_STAT;
249 }
250 else if (buffer[1] == 'E') /* SERVER_ERROR */
251 {
252 char *rel_ptr;
253 char *startptr= buffer + 13, *endptr= startptr;
254
255 while (*endptr != '\r' && *endptr != '\n') endptr++;
256
257 /*
258 Yes, we could make this "efficent" but to do that we would need
259 to maintain more state for the size of the buffer. Why waste
260 memory in the struct, which is important, for something that
261 rarely should happen?
262 */
263 rel_ptr= (char *)libmemcached_realloc(ptr->root,
264 ptr->cached_server_error,
265 (size_t) (endptr - startptr + 1));
266
267 if (rel_ptr == NULL)
268 {
269 /* If we happened to have some memory, we just null it since we don't know the size */
270 if (ptr->cached_server_error)
271 ptr->cached_server_error[0]= 0;
272 return MEMCACHED_SERVER_ERROR;
273 }
274 ptr->cached_server_error= rel_ptr;
275
276 memcpy(ptr->cached_server_error, startptr, (size_t) (endptr - startptr));
277 ptr->cached_server_error[endptr - startptr]= 0;
278 return MEMCACHED_SERVER_ERROR;
279 }
280 else if (buffer[1] == 'T')
281 return MEMCACHED_STORED;
282 else
283 {
284 WATCHPOINT_STRING(buffer);
285 return MEMCACHED_UNKNOWN_READ_FAILURE;
286 }
287 }
288 case 'D': /* DELETED */
289 return MEMCACHED_DELETED;
290 case 'N': /* NOT_FOUND */
291 {
292 if (buffer[4] == 'F')
293 return MEMCACHED_NOTFOUND;
294 else if (buffer[4] == 'S')
295 return MEMCACHED_NOTSTORED;
296 else
297 {
298 WATCHPOINT_STRING(buffer);
299 return MEMCACHED_UNKNOWN_READ_FAILURE;
300 }
301 }
302 case 'E': /* PROTOCOL ERROR or END */
303 {
304 if (buffer[1] == 'N')
305 return MEMCACHED_END;
306 else if (buffer[1] == 'R')
307 return MEMCACHED_PROTOCOL_ERROR;
308 else if (buffer[1] == 'X')
309 return MEMCACHED_DATA_EXISTS;
310 else
311 {
312 WATCHPOINT_STRING(buffer);
313 return MEMCACHED_UNKNOWN_READ_FAILURE;
314 }
315
316 }
317 case 'I': /* CLIENT ERROR */
318 /* We add back in one because we will need to search for END */
319 memcached_server_response_increment(ptr);
320 return MEMCACHED_ITEM;
321 case 'C': /* CLIENT ERROR */
322 return MEMCACHED_CLIENT_ERROR;
323 default:
324 {
325 unsigned long long auto_return_value;
326
327 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
328 return MEMCACHED_SUCCESS;
329
330 WATCHPOINT_STRING(buffer);
331 return MEMCACHED_UNKNOWN_READ_FAILURE;
332 }
333 }
334
335 /* NOTREACHED */
336 }
337
338 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
339 char *buffer, size_t buffer_length,
340 memcached_result_st *result)
341 {
342 memcached_return_t rc;
343 protocol_binary_response_header header;
344
345 if ((rc= memcached_safe_read(ptr, &header.bytes, sizeof(header.bytes))) != MEMCACHED_SUCCESS)
346 {
347 WATCHPOINT_ERROR(rc);
348 return rc;
349 }
350
351 if (header.response.magic != PROTOCOL_BINARY_RES)
352 {
353 return MEMCACHED_PROTOCOL_ERROR;
354 }
355
356 /*
357 ** Convert the header to host local endian!
358 */
359 header.response.keylen= ntohs(header.response.keylen);
360 header.response.status= ntohs(header.response.status);
361 header.response.bodylen= ntohl(header.response.bodylen);
362 header.response.cas= ntohll(header.response.cas);
363 uint32_t bodylen= header.response.bodylen;
364
365 if (header.response.status == PROTOCOL_BINARY_RESPONSE_SUCCESS ||
366 header.response.status == PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE)
367 {
368 switch (header.response.opcode)
369 {
370 case PROTOCOL_BINARY_CMD_GETKQ:
371 /*
372 * We didn't increment the response counter for the GETKQ packet
373 * (only the final NOOP), so we need to increment the counter again.
374 */
375 memcached_server_response_increment(ptr);
376 /* FALLTHROUGH */
377 case PROTOCOL_BINARY_CMD_GETK:
378 {
379 uint16_t keylen= header.response.keylen;
380 memcached_result_reset(result);
381 result->item_cas= header.response.cas;
382
383 if ((rc= memcached_safe_read(ptr, &result->item_flags, sizeof (result->item_flags))) != MEMCACHED_SUCCESS)
384 {
385 WATCHPOINT_ERROR(rc);
386 return MEMCACHED_UNKNOWN_READ_FAILURE;
387 }
388
389 result->item_flags= ntohl(result->item_flags);
390 bodylen -= header.response.extlen;
391
392 result->key_length= keylen;
393 if ((rc= memcached_safe_read(ptr, result->item_key, keylen)) != MEMCACHED_SUCCESS)
394 {
395 WATCHPOINT_ERROR(rc);
396 return MEMCACHED_UNKNOWN_READ_FAILURE;
397 }
398
399 bodylen -= keylen;
400 if (memcached_string_check(&result->value,
401 bodylen) != MEMCACHED_SUCCESS)
402 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
403
404 char *vptr= memcached_string_value_mutable(&result->value);
405 if ((rc= memcached_safe_read(ptr, vptr, bodylen)) != MEMCACHED_SUCCESS)
406 {
407 WATCHPOINT_ERROR(rc);
408 return MEMCACHED_UNKNOWN_READ_FAILURE;
409 }
410
411 memcached_string_set_length(&result->value, bodylen);
412 }
413 break;
414 case PROTOCOL_BINARY_CMD_INCREMENT:
415 case PROTOCOL_BINARY_CMD_DECREMENT:
416 {
417 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
418 return MEMCACHED_PROTOCOL_ERROR;
419
420 WATCHPOINT_ASSERT(bodylen == buffer_length);
421 uint64_t val;
422 if ((rc= memcached_safe_read(ptr, &val, sizeof(val))) != MEMCACHED_SUCCESS)
423 {
424 WATCHPOINT_ERROR(rc);
425 return MEMCACHED_UNKNOWN_READ_FAILURE;
426 }
427
428 val= ntohll(val);
429 memcpy(buffer, &val, sizeof(val));
430 }
431 break;
432 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
433 case PROTOCOL_BINARY_CMD_VERSION:
434 {
435 memset(buffer, 0, buffer_length);
436 if (bodylen >= buffer_length)
437 {
438 /* not enough space in buffer.. should not happen... */
439 return MEMCACHED_UNKNOWN_READ_FAILURE;
440 }
441 else if ((rc= memcached_safe_read(ptr, buffer, bodylen)) != MEMCACHED_SUCCESS)
442 {
443 WATCHPOINT_ERROR(rc);
444 return MEMCACHED_UNKNOWN_READ_FAILURE;
445 }
446 }
447 break;
448 case PROTOCOL_BINARY_CMD_FLUSH:
449 case PROTOCOL_BINARY_CMD_QUIT:
450 case PROTOCOL_BINARY_CMD_SET:
451 case PROTOCOL_BINARY_CMD_ADD:
452 case PROTOCOL_BINARY_CMD_REPLACE:
453 case PROTOCOL_BINARY_CMD_APPEND:
454 case PROTOCOL_BINARY_CMD_PREPEND:
455 case PROTOCOL_BINARY_CMD_DELETE:
456 {
457 WATCHPOINT_ASSERT(bodylen == 0);
458 return MEMCACHED_SUCCESS;
459 }
460 case PROTOCOL_BINARY_CMD_NOOP:
461 {
462 WATCHPOINT_ASSERT(bodylen == 0);
463 return MEMCACHED_END;
464 }
465 case PROTOCOL_BINARY_CMD_STAT:
466 {
467 if (bodylen == 0)
468 {
469 return MEMCACHED_END;
470 }
471 else if (bodylen + 1 > buffer_length)
472 {
473 /* not enough space in buffer.. should not happen... */
474 return MEMCACHED_UNKNOWN_READ_FAILURE;
475 }
476 else
477 {
478 size_t keylen= header.response.keylen;
479 memset(buffer, 0, buffer_length);
480 if ((rc= memcached_safe_read(ptr, buffer, keylen)) != MEMCACHED_SUCCESS ||
481 (rc= memcached_safe_read(ptr, buffer + keylen + 1, bodylen - keylen)) != MEMCACHED_SUCCESS)
482 {
483 WATCHPOINT_ERROR(rc);
484 return MEMCACHED_UNKNOWN_READ_FAILURE;
485 }
486 }
487 }
488 break;
489
490 case PROTOCOL_BINARY_CMD_SASL_AUTH:
491 case PROTOCOL_BINARY_CMD_SASL_STEP:
492 {
493 memcached_result_reset(result);
494 result->item_cas= header.response.cas;
495
496 if (memcached_string_check(&result->value,
497 bodylen) != MEMCACHED_SUCCESS)
498 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
499
500 char *vptr= memcached_string_value_mutable(&result->value);
501 if ((rc= memcached_safe_read(ptr, vptr, bodylen)) != MEMCACHED_SUCCESS)
502 {
503 WATCHPOINT_ERROR(rc);
504 return MEMCACHED_UNKNOWN_READ_FAILURE;
505 }
506
507 memcached_string_set_length(&result->value, bodylen);
508 }
509 break;
510 default:
511 {
512 /* Command not implemented yet! */
513 WATCHPOINT_ASSERT(0);
514 return MEMCACHED_PROTOCOL_ERROR;
515 }
516 }
517 }
518 else if (header.response.bodylen)
519 {
520 /* What should I do with the error message??? just discard it for now */
521 char hole[SMALL_STRING_LEN];
522 while (bodylen > 0)
523 {
524 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
525 if ((rc= memcached_safe_read(ptr, hole, nr)) != MEMCACHED_SUCCESS)
526 {
527 WATCHPOINT_ERROR(rc);
528 return MEMCACHED_UNKNOWN_READ_FAILURE;
529 }
530 bodylen-= (uint32_t) nr;
531 }
532
533 /* This might be an error from one of the quiet commands.. if
534 * so, just throw it away and get the next one. What about creating
535 * a callback to the user with the error information?
536 */
537 switch (header.response.opcode)
538 {
539 case PROTOCOL_BINARY_CMD_SETQ:
540 case PROTOCOL_BINARY_CMD_ADDQ:
541 case PROTOCOL_BINARY_CMD_REPLACEQ:
542 case PROTOCOL_BINARY_CMD_APPENDQ:
543 case PROTOCOL_BINARY_CMD_PREPENDQ:
544 return binary_read_one_response(ptr, buffer, buffer_length, result);
545 default:
546 break;
547 }
548 }
549
550 rc= MEMCACHED_SUCCESS;
551 unlikely(header.response.status != 0)
552 switch (header.response.status)
553 {
554 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
555 rc= MEMCACHED_NOTFOUND;
556 break;
557 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
558 rc= MEMCACHED_DATA_EXISTS;
559 break;
560 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
561 rc= MEMCACHED_NOTSTORED;
562 break;
563 case PROTOCOL_BINARY_RESPONSE_E2BIG:
564 rc= MEMCACHED_E2BIG;
565 break;
566 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
567 rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
568 break;
569 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
570 rc= MEMCACHED_AUTH_CONTINUE;
571 break;
572 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
573 rc= MEMCACHED_AUTH_FAILURE;
574 break;
575 case PROTOCOL_BINARY_RESPONSE_EINVAL:
576 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
577 default:
578 /* @todo fix the error mappings */
579 rc= MEMCACHED_PROTOCOL_ERROR;
580 break;
581 }
582
583 return rc;
584 }