c8b2896593a207f3b1eb35fde5adba6900412c2d
[awesomized/libmemcached] / libmemcached / memcached_response.c
1 /*
2 Memcached library
3
4 memcached_response() is used to determine the return result
5 from an issued command.
6 */
7
8 #include "common.h"
9 #include "memcached_io.h"
10
11 static memcached_return binary_response(memcached_server_st *ptr,
12 char *buffer, size_t buffer_length,
13 memcached_result_st *result);
14
15 memcached_return memcached_response(memcached_server_st *ptr,
16 char *buffer, size_t buffer_length,
17 memcached_result_st *result)
18 {
19 unsigned int x;
20 size_t send_length;
21 char *buffer_ptr;
22 unsigned int max_messages;
23
24
25 send_length= 0;
26 /* UDP at the moment is odd...*/
27 if (ptr->type == MEMCACHED_CONNECTION_UDP)
28 {
29 char buffer[8];
30 ssize_t read_length;
31
32 return MEMCACHED_SUCCESS;
33
34 read_length= memcached_io_read(ptr, buffer, 8);
35 }
36
37 /* We may have old commands in the buffer not set, first purge */
38 if (ptr->root->flags & MEM_NO_BLOCK)
39 (void)memcached_io_write(ptr, NULL, 0, 1);
40
41 if (ptr->root->flags & MEM_BINARY_PROTOCOL)
42 return binary_response(ptr, buffer, buffer_length, result);
43
44 max_messages= memcached_server_response_count(ptr);
45 for (x= 0; x < max_messages; x++)
46 {
47 size_t total_length= 0;
48 buffer_ptr= buffer;
49
50
51 while (1)
52 {
53 ssize_t read_length;
54
55 read_length= memcached_io_read(ptr, buffer_ptr, 1);
56 WATCHPOINT_ASSERT(isgraph(*buffer_ptr) || isspace(*buffer_ptr));
57
58 if (read_length != 1)
59 {
60 memcached_io_reset(ptr);
61 return MEMCACHED_UNKNOWN_READ_FAILURE;
62 }
63
64 if (*buffer_ptr == '\n')
65 break;
66 else
67 buffer_ptr++;
68
69 total_length++;
70 WATCHPOINT_ASSERT(total_length <= buffer_length);
71
72 if (total_length >= buffer_length)
73 {
74 memcached_io_reset(ptr);
75 return MEMCACHED_PROTOCOL_ERROR;
76 }
77 }
78 buffer_ptr++;
79 *buffer_ptr= 0;
80
81 memcached_server_response_decrement(ptr);
82 }
83
84 uint64_t auto_return_value= 0;
85 switch(buffer[0])
86 {
87 case 'V': /* VALUE || VERSION */
88 if (buffer[1] == 'A') /* VALUE */
89 {
90 memcached_return rc;
91
92 /* We add back in one because we will need to search for END */
93 memcached_server_response_increment(ptr);
94 if (result)
95 rc= value_fetch(ptr, buffer, result);
96 else
97 rc= value_fetch(ptr, buffer, &ptr->root->result);
98
99 return rc;
100 }
101 else if (buffer[1] == 'E') /* VERSION */
102 {
103 return MEMCACHED_SUCCESS;
104 }
105 else
106 {
107 WATCHPOINT_STRING(buffer);
108 WATCHPOINT_ASSERT(0);
109 memcached_io_reset(ptr);
110 return MEMCACHED_UNKNOWN_READ_FAILURE;
111 }
112 case 'O': /* OK */
113 return MEMCACHED_SUCCESS;
114 case 'S': /* STORED STATS SERVER_ERROR */
115 {
116 if (buffer[2] == 'A') /* STORED STATS */
117 {
118 memcached_server_response_increment(ptr);
119 return MEMCACHED_STAT;
120 }
121 else if (buffer[1] == 'E')
122 return MEMCACHED_SERVER_ERROR;
123 else if (buffer[1] == 'T')
124 return MEMCACHED_STORED;
125 else
126 {
127 WATCHPOINT_STRING(buffer);
128 WATCHPOINT_ASSERT(0);
129 memcached_io_reset(ptr);
130 return MEMCACHED_UNKNOWN_READ_FAILURE;
131 }
132 }
133 case 'D': /* DELETED */
134 return MEMCACHED_DELETED;
135 case 'N': /* NOT_FOUND */
136 {
137 if (buffer[4] == 'F')
138 return MEMCACHED_NOTFOUND;
139 else if (buffer[4] == 'S')
140 return MEMCACHED_NOTSTORED;
141 else
142 {
143 memcached_io_reset(ptr);
144 return MEMCACHED_UNKNOWN_READ_FAILURE;
145 }
146 }
147 case 'E': /* PROTOCOL ERROR or END */
148 {
149 if (buffer[1] == 'N')
150 return MEMCACHED_END;
151 else if (buffer[1] == 'R')
152 {
153 memcached_io_reset(ptr);
154 return MEMCACHED_PROTOCOL_ERROR;
155 }
156 else if (buffer[1] == 'X')
157 {
158 memcached_io_reset(ptr);
159 return MEMCACHED_DATA_EXISTS;
160 }
161 else
162 {
163 memcached_io_reset(ptr);
164 return MEMCACHED_UNKNOWN_READ_FAILURE;
165 }
166 }
167 case 'C': /* CLIENT ERROR */
168 memcached_io_reset(ptr);
169 return MEMCACHED_CLIENT_ERROR;
170 default:
171 if (sscanf(buffer, "%lld", &auto_return_value) == 1)
172 return MEMCACHED_SUCCESS;
173 memcached_io_reset(ptr);
174 return MEMCACHED_UNKNOWN_READ_FAILURE;
175
176 }
177
178 return MEMCACHED_SUCCESS;
179 }
180
181 char *memcached_result_value(memcached_result_st *ptr)
182 {
183 memcached_string_st *sptr= &ptr->value;
184 return memcached_string_value(sptr);
185 }
186
187 size_t memcached_result_length(memcached_result_st *ptr)
188 {
189 memcached_string_st *sptr= &ptr->value;
190 return memcached_string_length(sptr);
191 }
192
193 /**
194 * Read a given number of bytes from the server and place it into a specific
195 * buffer. Reset the IO channel or this server if an error occurs.
196 */
197 static memcached_return safe_read(memcached_server_st *ptr, void *dta,
198 size_t size)
199 {
200 int offset= 0;
201 char *data= dta;
202 while (offset < size)
203 {
204 ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
205 if (nread <= 0)
206 {
207 memcached_io_reset(ptr);
208 return MEMCACHED_UNKNOWN_READ_FAILURE;
209 }
210 offset += nread;
211 }
212
213 return MEMCACHED_SUCCESS;
214 }
215
216 static memcached_return binary_response(memcached_server_st *ptr,
217 char *buffer,
218 size_t buffer_length,
219 memcached_result_st *result)
220 {
221 protocol_binary_response_header header;
222 memcached_server_response_decrement(ptr);
223
224 unlikely (safe_read(ptr, &header.bytes,
225 sizeof(header.bytes)) != MEMCACHED_SUCCESS)
226 return MEMCACHED_UNKNOWN_READ_FAILURE;
227
228 unlikely (header.response.magic != PROTOCOL_BINARY_RES)
229 {
230 memcached_io_reset(ptr);
231 return MEMCACHED_PROTOCOL_ERROR;
232 }
233
234 /*
235 ** Convert the header to host local endian!
236 */
237 header.response.keylen= ntohs(header.response.keylen);
238 header.response.status= ntohs(header.response.status);
239 header.response.bodylen= ntohl(header.response.bodylen);
240 header.response.cas= ntohll(header.response.cas);
241 uint32_t bodylen= header.response.bodylen;
242
243 if (header.response.status == 0)
244 {
245 if ((header.response.opcode == PROTOCOL_BINARY_CMD_GETK) ||
246 (header.response.opcode == PROTOCOL_BINARY_CMD_GETKQ))
247 {
248 uint16_t keylen= header.response.keylen;
249 memcached_result_reset(result);
250 result->cas= header.response.cas;
251
252 if (safe_read(ptr, &result->flags,
253 sizeof(result->flags)) != MEMCACHED_SUCCESS)
254 {
255 return MEMCACHED_UNKNOWN_READ_FAILURE;
256 }
257 result->flags= ntohl(result->flags);
258 bodylen -= header.response.extlen;
259
260 result->key_length= keylen;
261 if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
262 {
263 return MEMCACHED_UNKNOWN_READ_FAILURE;
264 }
265
266 bodylen -= keylen;
267 if (memcached_string_check(&result->value,
268 bodylen) != MEMCACHED_SUCCESS)
269 {
270 memcached_io_reset(ptr);
271 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
272 }
273
274 char *vptr= memcached_string_value(&result->value);
275 if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
276 {
277 return MEMCACHED_UNKNOWN_READ_FAILURE;
278 }
279
280 memcached_string_set_length(&result->value, bodylen);
281 }
282 else if ((header.response.opcode == PROTOCOL_BINARY_CMD_INCREMENT) ||
283 (header.response.opcode == PROTOCOL_BINARY_CMD_DECREMENT))
284 {
285 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
286 {
287 return MEMCACHED_PROTOCOL_ERROR;
288 }
289
290 WATCHPOINT_ASSERT(bodylen == buffer_length);
291 uint64_t val;
292 if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
293 {
294 return MEMCACHED_UNKNOWN_READ_FAILURE;
295 }
296
297 val= ntohll(val);
298 memcpy(buffer, &val, sizeof(val));
299 }
300 else if (header.response.opcode == PROTOCOL_BINARY_CMD_VERSION)
301 {
302 memset(buffer, 0, buffer_length);
303 if (bodylen >= buffer_length)
304 /* not enough space in buffer.. should not happen... */
305 return MEMCACHED_UNKNOWN_READ_FAILURE;
306 else
307 safe_read(ptr, buffer, bodylen);
308 }
309 else if ((header.response.opcode == PROTOCOL_BINARY_CMD_FLUSH) ||
310 (header.response.opcode == PROTOCOL_BINARY_CMD_QUIT) ||
311 (header.response.opcode == PROTOCOL_BINARY_CMD_SET) ||
312 (header.response.opcode == PROTOCOL_BINARY_CMD_ADD) ||
313 (header.response.opcode == PROTOCOL_BINARY_CMD_REPLACE) ||
314 (header.response.opcode == PROTOCOL_BINARY_CMD_APPEND) ||
315 (header.response.opcode == PROTOCOL_BINARY_CMD_PREPEND) ||
316 (header.response.opcode == PROTOCOL_BINARY_CMD_DELETE))
317 {
318 WATCHPOINT_ASSERT(bodylen == 0);
319 return MEMCACHED_SUCCESS;
320 }
321 else if (header.response.opcode == PROTOCOL_BINARY_CMD_NOOP)
322 {
323 WATCHPOINT_ASSERT(bodylen == 0);
324 return MEMCACHED_END;
325 }
326 else if (header.response.opcode == PROTOCOL_BINARY_CMD_STAT)
327 {
328 if (bodylen == 0)
329 return MEMCACHED_END;
330 else if (bodylen + 1 > buffer_length)
331 /* not enough space in buffer.. should not happen... */
332 return MEMCACHED_UNKNOWN_READ_FAILURE;
333 else
334 {
335 size_t keylen= header.response.keylen;
336 memset(buffer, 0, buffer_length);
337 safe_read(ptr, buffer, keylen);
338 safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
339 }
340 }
341 else
342 {
343 /* Command not implemented yet! */
344 WATCHPOINT_ASSERT(0);
345 memcached_io_reset(ptr);
346 return MEMCACHED_PROTOCOL_ERROR;
347 }
348 }
349 else if (header.response.bodylen)
350 {
351 /* What should I do with the error message??? just discard it for now */
352 char hole[1024];
353 while (bodylen > 0)
354 {
355 size_t nr= (bodylen > sizeof(hole)) ? sizeof(hole) : bodylen;
356 safe_read(ptr, hole, nr);
357 bodylen -= nr;
358 }
359 }
360
361 memcached_return rc= MEMCACHED_SUCCESS;
362 unlikely(header.response.status != 0)
363 switch (header.response.status)
364 {
365 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
366 rc= MEMCACHED_NOTFOUND;
367 break;
368 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
369 rc= MEMCACHED_DATA_EXISTS;
370 break;
371 case PROTOCOL_BINARY_RESPONSE_E2BIG:
372 case PROTOCOL_BINARY_RESPONSE_EINVAL:
373 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
374 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
375 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
376 default:
377 /* @todo fix the error mappings */
378 rc= MEMCACHED_PROTOCOL_ERROR;
379 break;
380 }
381
382 return rc;
383 }