Fix for bad disconnect during test run.
[awesomized/libmemcached] / libmemcached / memcached_response.c
1 /*
2 Memcached library
3
4 memcached_response() is used to determine the return result
5 from an issued command.
6 */
7
8 #include "common.h"
9 #include "memcached_io.h"
10
11 static memcached_return binary_response(memcached_server_st *ptr,
12 char *buffer, size_t buffer_length,
13 memcached_result_st *result);
14
15 memcached_return memcached_response(memcached_server_st *ptr,
16 char *buffer, size_t buffer_length,
17 memcached_result_st *result)
18 {
19 unsigned int x;
20 size_t send_length;
21 char *buffer_ptr;
22 unsigned int max_messages;
23
24
25 send_length= 0;
26 /* UDP at the moment is odd...*/
27 if (ptr->type == MEMCACHED_CONNECTION_UDP)
28 {
29 char buffer[8];
30 ssize_t read_length;
31
32 return MEMCACHED_SUCCESS;
33
34 read_length= memcached_io_read(ptr, buffer, 8);
35 }
36
37 /* We may have old commands in the buffer not set, first purge */
38 if (ptr->root->flags & MEM_NO_BLOCK)
39 (void)memcached_io_write(ptr, NULL, 0, 1);
40
41 if (ptr->root->flags & MEM_BINARY_PROTOCOL)
42 return binary_response(ptr, buffer, buffer_length, result);
43
44 max_messages= memcached_server_response_count(ptr);
45 for (x= 0; x < max_messages; x++)
46 {
47 size_t total_length= 0;
48 buffer_ptr= buffer;
49
50
51 while (1)
52 {
53 ssize_t read_length;
54
55 read_length= memcached_io_read(ptr, buffer_ptr, 1);
56 WATCHPOINT_ASSERT(*buffer_ptr != '\0');
57
58 if (read_length != 1)
59 {
60 memcached_io_reset(ptr);
61 return MEMCACHED_UNKNOWN_READ_FAILURE;
62 }
63
64 if (*buffer_ptr == '\n')
65 break;
66 else
67 buffer_ptr++;
68
69 total_length++;
70 WATCHPOINT_ASSERT(total_length <= buffer_length);
71
72 if (total_length >= buffer_length)
73 {
74 memcached_io_reset(ptr);
75 return MEMCACHED_PROTOCOL_ERROR;
76 }
77 }
78 buffer_ptr++;
79 *buffer_ptr= 0;
80
81 memcached_server_response_decrement(ptr);
82 }
83
84 switch(buffer[0])
85 {
86 case 'V': /* VALUE || VERSION */
87 if (buffer[1] == 'A') /* VALUE */
88 {
89 memcached_return rc;
90
91 /* We add back in one because we will need to search for END */
92 memcached_server_response_increment(ptr);
93 if (result)
94 rc= value_fetch(ptr, buffer, result);
95 else
96 rc= value_fetch(ptr, buffer, &ptr->root->result);
97
98 return rc;
99 }
100 else if (buffer[1] == 'E') /* VERSION */
101 {
102 return MEMCACHED_SUCCESS;
103 }
104 else
105 {
106 WATCHPOINT_STRING(buffer);
107 WATCHPOINT_ASSERT(0);
108 memcached_io_reset(ptr);
109 return MEMCACHED_UNKNOWN_READ_FAILURE;
110 }
111 case 'O': /* OK */
112 return MEMCACHED_SUCCESS;
113 case 'S': /* STORED STATS SERVER_ERROR */
114 {
115 if (buffer[2] == 'A') /* STORED STATS */
116 {
117 memcached_server_response_increment(ptr);
118 return MEMCACHED_STAT;
119 }
120 else if (buffer[1] == 'E')
121 return MEMCACHED_SERVER_ERROR;
122 else if (buffer[1] == 'T')
123 return MEMCACHED_STORED;
124 else
125 {
126 WATCHPOINT_STRING(buffer);
127 WATCHPOINT_ASSERT(0);
128 memcached_io_reset(ptr);
129 return MEMCACHED_UNKNOWN_READ_FAILURE;
130 }
131 }
132 case 'D': /* DELETED */
133 return MEMCACHED_DELETED;
134 case 'N': /* NOT_FOUND */
135 {
136 if (buffer[4] == 'F')
137 return MEMCACHED_NOTFOUND;
138 else if (buffer[4] == 'S')
139 return MEMCACHED_NOTSTORED;
140 else
141 {
142 memcached_io_reset(ptr);
143 return MEMCACHED_UNKNOWN_READ_FAILURE;
144 }
145 }
146 case 'E': /* PROTOCOL ERROR or END */
147 {
148 if (buffer[1] == 'N')
149 return MEMCACHED_END;
150 else if (buffer[1] == 'R')
151 {
152 memcached_io_reset(ptr);
153 return MEMCACHED_PROTOCOL_ERROR;
154 }
155 else if (buffer[1] == 'X')
156 {
157 memcached_io_reset(ptr);
158 return MEMCACHED_DATA_EXISTS;
159 }
160 else
161 {
162 memcached_io_reset(ptr);
163 return MEMCACHED_UNKNOWN_READ_FAILURE;
164 }
165 }
166 case 'C': /* CLIENT ERROR */
167 memcached_io_reset(ptr);
168 return MEMCACHED_CLIENT_ERROR;
169 default:
170 {
171 unsigned long long auto_return_value;
172
173 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
174 return MEMCACHED_SUCCESS;
175
176 memcached_io_reset(ptr);
177
178 return MEMCACHED_UNKNOWN_READ_FAILURE;
179 }
180
181 }
182
183 return MEMCACHED_SUCCESS;
184 }
185
186 char *memcached_result_value(memcached_result_st *ptr)
187 {
188 memcached_string_st *sptr= &ptr->value;
189 return memcached_string_value(sptr);
190 }
191
192 size_t memcached_result_length(memcached_result_st *ptr)
193 {
194 memcached_string_st *sptr= &ptr->value;
195 return memcached_string_length(sptr);
196 }
197
198 /**
199 * Read a given number of bytes from the server and place it into a specific
200 * buffer. Reset the IO channel or this server if an error occurs.
201 */
202 static memcached_return safe_read(memcached_server_st *ptr, void *dta,
203 size_t size)
204 {
205 size_t offset= 0;
206 char *data= dta;
207
208 while (offset < size)
209 {
210 ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
211 if (nread <= 0)
212 {
213 memcached_io_reset(ptr);
214 return MEMCACHED_UNKNOWN_READ_FAILURE;
215 }
216 offset += nread;
217 }
218
219 return MEMCACHED_SUCCESS;
220 }
221
222 static memcached_return binary_response(memcached_server_st *ptr,
223 char *buffer,
224 size_t buffer_length,
225 memcached_result_st *result)
226 {
227 protocol_binary_response_header header;
228 memcached_server_response_decrement(ptr);
229
230 unlikely (safe_read(ptr, &header.bytes,
231 sizeof(header.bytes)) != MEMCACHED_SUCCESS)
232 return MEMCACHED_UNKNOWN_READ_FAILURE;
233
234 unlikely (header.response.magic != PROTOCOL_BINARY_RES)
235 {
236 memcached_io_reset(ptr);
237 return MEMCACHED_PROTOCOL_ERROR;
238 }
239
240 /*
241 ** Convert the header to host local endian!
242 */
243 header.response.keylen= ntohs(header.response.keylen);
244 header.response.status= ntohs(header.response.status);
245 header.response.bodylen= ntohl(header.response.bodylen);
246 header.response.cas= ntohll(header.response.cas);
247 uint32_t bodylen= header.response.bodylen;
248
249 if (header.response.status == 0)
250 {
251 switch (header.response.opcode)
252 {
253 case PROTOCOL_BINARY_CMD_GETK:
254 case PROTOCOL_BINARY_CMD_GETKQ:
255 {
256 uint16_t keylen= header.response.keylen;
257 memcached_result_reset(result);
258 result->cas= header.response.cas;
259
260 if (safe_read(ptr, &result->flags,
261 sizeof(result->flags)) != MEMCACHED_SUCCESS)
262 {
263 return MEMCACHED_UNKNOWN_READ_FAILURE;
264 }
265 result->flags= ntohl(result->flags);
266 bodylen -= header.response.extlen;
267
268 result->key_length= keylen;
269 if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
270 {
271 return MEMCACHED_UNKNOWN_READ_FAILURE;
272 }
273
274 bodylen -= keylen;
275 if (memcached_string_check(&result->value,
276 bodylen) != MEMCACHED_SUCCESS)
277 {
278 memcached_io_reset(ptr);
279 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
280 }
281
282 char *vptr= memcached_string_value(&result->value);
283 if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
284 {
285 return MEMCACHED_UNKNOWN_READ_FAILURE;
286 }
287
288 memcached_string_set_length(&result->value, bodylen);
289 }
290 break;
291 case PROTOCOL_BINARY_CMD_INCREMENT:
292 case PROTOCOL_BINARY_CMD_DECREMENT:
293 {
294 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
295 {
296 return MEMCACHED_PROTOCOL_ERROR;
297 }
298
299 WATCHPOINT_ASSERT(bodylen == buffer_length);
300 uint64_t val;
301 if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
302 {
303 return MEMCACHED_UNKNOWN_READ_FAILURE;
304 }
305
306 val= ntohll(val);
307 memcpy(buffer, &val, sizeof(val));
308 }
309 break;
310 case PROTOCOL_BINARY_CMD_VERSION:
311 {
312 memset(buffer, 0, buffer_length);
313 if (bodylen >= buffer_length)
314 /* not enough space in buffer.. should not happen... */
315 return MEMCACHED_UNKNOWN_READ_FAILURE;
316 else
317 safe_read(ptr, buffer, bodylen);
318 }
319 break;
320 case PROTOCOL_BINARY_CMD_FLUSH:
321 case PROTOCOL_BINARY_CMD_QUIT:
322 case PROTOCOL_BINARY_CMD_SET:
323 case PROTOCOL_BINARY_CMD_ADD:
324 case PROTOCOL_BINARY_CMD_REPLACE:
325 case PROTOCOL_BINARY_CMD_APPEND:
326 case PROTOCOL_BINARY_CMD_PREPEND:
327 case PROTOCOL_BINARY_CMD_DELETE:
328 {
329 WATCHPOINT_ASSERT(bodylen == 0);
330 return MEMCACHED_SUCCESS;
331 }
332 break;
333 case PROTOCOL_BINARY_CMD_NOOP:
334 {
335 WATCHPOINT_ASSERT(bodylen == 0);
336 return MEMCACHED_END;
337 }
338 break;
339 case PROTOCOL_BINARY_CMD_STAT:
340 {
341 if (bodylen == 0)
342 return MEMCACHED_END;
343 else if (bodylen + 1 > buffer_length)
344 /* not enough space in buffer.. should not happen... */
345 return MEMCACHED_UNKNOWN_READ_FAILURE;
346 else
347 {
348 size_t keylen= header.response.keylen;
349 memset(buffer, 0, buffer_length);
350 safe_read(ptr, buffer, keylen);
351 safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
352 }
353 }
354 break;
355 default:
356 {
357 /* Command not implemented yet! */
358 WATCHPOINT_ASSERT(0);
359 memcached_io_reset(ptr);
360 return MEMCACHED_PROTOCOL_ERROR;
361 }
362 }
363 }
364 else if (header.response.bodylen)
365 {
366 /* What should I do with the error message??? just discard it for now */
367 char buffer[SMALL_STRING_LEN];
368 while (bodylen > 0)
369 {
370 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
371 safe_read(ptr, buffer, nr);
372 bodylen -= nr;
373 }
374 }
375
376 memcached_return rc= MEMCACHED_SUCCESS;
377 unlikely(header.response.status != 0)
378 switch (header.response.status)
379 {
380 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
381 rc= MEMCACHED_NOTFOUND;
382 break;
383 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
384 rc= MEMCACHED_DATA_EXISTS;
385 break;
386 case PROTOCOL_BINARY_RESPONSE_E2BIG:
387 case PROTOCOL_BINARY_RESPONSE_EINVAL:
388 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
389 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
390 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
391 default:
392 /* @todo fix the error mappings */
393 rc= MEMCACHED_PROTOCOL_ERROR;
394 break;
395 }
396
397 return rc;
398 }