Initial support for the binary protocol
[awesomized/libmemcached] / libmemcached / memcached_response.c
1 /*
2 Memcached library
3
4 memcached_response() is used to determine the return result
5 from an issued command.
6 */
7
8 #include "common.h"
9 #include "memcached_io.h"
10
11 static memcached_return binary_response(memcached_server_st *ptr,
12 char *buffer, size_t buffer_length,
13 memcached_result_st *result);
14
15 memcached_return memcached_response(memcached_server_st *ptr,
16 char *buffer, size_t buffer_length,
17 memcached_result_st *result)
18 {
19 unsigned int x;
20 size_t send_length;
21 char *buffer_ptr;
22 unsigned int max_messages;
23
24
25 send_length= 0;
26 /* UDP at the moment is odd...*/
27 if (ptr->type == MEMCACHED_CONNECTION_UDP)
28 {
29 char buffer[8];
30 ssize_t read_length;
31
32 return MEMCACHED_SUCCESS;
33
34 read_length= memcached_io_read(ptr, buffer, 8);
35 }
36
37 /* We may have old commands in the buffer not set, first purge */
38 if (ptr->root->flags & MEM_NO_BLOCK)
39 (void)memcached_io_write(ptr, NULL, 0, 1);
40
41 if (ptr->root->flags & MEM_BINARY_PROTOCOL)
42 return binary_response(ptr, buffer, buffer_length, result);
43
44 max_messages= memcached_server_response_count(ptr);
45 for (x= 0; x < max_messages; x++)
46 {
47 size_t total_length= 0;
48 buffer_ptr= buffer;
49
50
51 while (1)
52 {
53 ssize_t read_length;
54
55 read_length= memcached_io_read(ptr, buffer_ptr, 1);
56 WATCHPOINT_ASSERT(isgraph(*buffer_ptr) || isspace(*buffer_ptr));
57
58 if (read_length != 1)
59 {
60 memcached_io_reset(ptr);
61 return MEMCACHED_UNKNOWN_READ_FAILURE;
62 }
63
64 if (*buffer_ptr == '\n')
65 break;
66 else
67 buffer_ptr++;
68
69 total_length++;
70 WATCHPOINT_ASSERT(total_length <= buffer_length);
71
72 if (total_length >= buffer_length)
73 {
74 memcached_io_reset(ptr);
75 return MEMCACHED_PROTOCOL_ERROR;
76 }
77 }
78 buffer_ptr++;
79 *buffer_ptr= 0;
80
81 memcached_server_response_decrement(ptr);
82 }
83
84 switch(buffer[0])
85 {
86 case 'V': /* VALUE || VERSION */
87 if (buffer[1] == 'A') /* VALUE */
88 {
89 memcached_return rc;
90
91 /* We add back in one because we will need to search for END */
92 memcached_server_response_increment(ptr);
93 if (result)
94 rc= value_fetch(ptr, buffer, result);
95 else
96 rc= value_fetch(ptr, buffer, &ptr->root->result);
97
98 return rc;
99 }
100 else if (buffer[1] == 'E') /* VERSION */
101 {
102 return MEMCACHED_SUCCESS;
103 }
104 else
105 {
106 WATCHPOINT_STRING(buffer);
107 WATCHPOINT_ASSERT(0);
108 memcached_io_reset(ptr);
109 return MEMCACHED_UNKNOWN_READ_FAILURE;
110 }
111 case 'O': /* OK */
112 return MEMCACHED_SUCCESS;
113 case 'S': /* STORED STATS SERVER_ERROR */
114 {
115 if (buffer[2] == 'A') /* STORED STATS */
116 {
117 memcached_server_response_increment(ptr);
118 return MEMCACHED_STAT;
119 }
120 else if (buffer[1] == 'E')
121 return MEMCACHED_SERVER_ERROR;
122 else if (buffer[1] == 'T')
123 return MEMCACHED_STORED;
124 else
125 {
126 WATCHPOINT_STRING(buffer);
127 WATCHPOINT_ASSERT(0);
128 memcached_io_reset(ptr);
129 return MEMCACHED_UNKNOWN_READ_FAILURE;
130 }
131 }
132 case 'D': /* DELETED */
133 return MEMCACHED_DELETED;
134 case 'N': /* NOT_FOUND */
135 {
136 if (buffer[4] == 'F')
137 return MEMCACHED_NOTFOUND;
138 else if (buffer[4] == 'S')
139 return MEMCACHED_NOTSTORED;
140 else
141 {
142 memcached_io_reset(ptr);
143 return MEMCACHED_UNKNOWN_READ_FAILURE;
144 }
145 }
146 case 'E': /* PROTOCOL ERROR or END */
147 {
148 if (buffer[1] == 'N')
149 return MEMCACHED_END;
150 else if (buffer[1] == 'R')
151 {
152 memcached_io_reset(ptr);
153 return MEMCACHED_PROTOCOL_ERROR;
154 }
155 else if (buffer[1] == 'X')
156 {
157 memcached_io_reset(ptr);
158 return MEMCACHED_DATA_EXISTS;
159 }
160 else
161 {
162 memcached_io_reset(ptr);
163 return MEMCACHED_UNKNOWN_READ_FAILURE;
164 }
165 }
166 case 'C': /* CLIENT ERROR */
167 memcached_io_reset(ptr);
168 return MEMCACHED_CLIENT_ERROR;
169 default:
170 memcached_io_reset(ptr);
171 return MEMCACHED_UNKNOWN_READ_FAILURE;
172
173 }
174
175 return MEMCACHED_SUCCESS;
176 }
177
178 char *memcached_result_value(memcached_result_st *ptr)
179 {
180 memcached_string_st *sptr= &ptr->value;
181 return memcached_string_value(sptr);
182 }
183
184 size_t memcached_result_length(memcached_result_st *ptr)
185 {
186 memcached_string_st *sptr= &ptr->value;
187 return memcached_string_length(sptr);
188 }
189
190 /**
191 * Read a given number of bytes from the server and place it into a specific
192 * buffer. Reset the IO channel or this server if an error occurs.
193 */
194 static memcached_return safe_read(memcached_server_st *ptr, void *dta,
195 size_t size)
196 {
197 int offset= 0;
198 char *data= dta;
199 while (offset < size)
200 {
201 ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
202 if (nread <= 0)
203 {
204 memcached_io_reset(ptr);
205 return MEMCACHED_UNKNOWN_READ_FAILURE;
206 }
207 offset += nread;
208 }
209
210 return MEMCACHED_SUCCESS;
211 }
212
213 static memcached_return binary_response(memcached_server_st *ptr,
214 char *buffer,
215 size_t buffer_length,
216 memcached_result_st *result)
217 {
218 protocol_binary_response_header header;
219 memcached_server_response_decrement(ptr);
220
221 unlikely (safe_read(ptr, &header.bytes,
222 sizeof(header.bytes)) != MEMCACHED_SUCCESS)
223 return MEMCACHED_UNKNOWN_READ_FAILURE;
224
225 unlikely (header.response.magic != PROTOCOL_BINARY_RES)
226 {
227 memcached_io_reset(ptr);
228 return MEMCACHED_PROTOCOL_ERROR;
229 }
230
231 /*
232 ** Convert the header to host local endian!
233 */
234 header.response.keylen= ntohs(header.response.keylen);
235 header.response.status= ntohs(header.response.status);
236 header.response.bodylen= ntohl(header.response.bodylen);
237 header.response.cas= ntohll(header.response.cas);
238 uint32_t bodylen= header.response.bodylen;
239
240 if (header.response.status == 0)
241 {
242 if ((header.response.opcode == PROTOCOL_BINARY_CMD_GETK) ||
243 (header.response.opcode == PROTOCOL_BINARY_CMD_GETKQ))
244 {
245 uint16_t keylen= header.response.keylen;
246 memcached_result_reset(result);
247 result->cas= header.response.cas;
248
249 if (safe_read(ptr, &result->flags,
250 sizeof(result->flags)) != MEMCACHED_SUCCESS)
251 {
252 return MEMCACHED_UNKNOWN_READ_FAILURE;
253 }
254 result->flags= ntohl(result->flags);
255 bodylen -= header.response.extlen;
256
257 result->key_length= keylen;
258 if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
259 {
260 return MEMCACHED_UNKNOWN_READ_FAILURE;
261 }
262
263 bodylen -= keylen;
264 if (memcached_string_check(&result->value,
265 bodylen) != MEMCACHED_SUCCESS)
266 {
267 memcached_io_reset(ptr);
268 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
269 }
270
271 char *vptr= memcached_string_value(&result->value);
272 if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
273 {
274 return MEMCACHED_UNKNOWN_READ_FAILURE;
275 }
276
277 memcached_string_set_length(&result->value, bodylen);
278 }
279 else if ((header.response.opcode == PROTOCOL_BINARY_CMD_INCREMENT) ||
280 (header.response.opcode == PROTOCOL_BINARY_CMD_DECREMENT))
281 {
282 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
283 {
284 return MEMCACHED_PROTOCOL_ERROR;
285 }
286
287 WATCHPOINT_ASSERT(bodylen == buffer_length);
288 uint64_t val;
289 if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
290 {
291 return MEMCACHED_UNKNOWN_READ_FAILURE;
292 }
293
294 val= ntohll(val);
295 memcpy(buffer, &val, sizeof(val));
296 }
297 else if (header.response.opcode == PROTOCOL_BINARY_CMD_VERSION)
298 {
299 memset(buffer, 0, buffer_length);
300 if (bodylen >= buffer_length)
301 /* not enough space in buffer.. should not happen... */
302 return MEMCACHED_UNKNOWN_READ_FAILURE;
303 else
304 safe_read(ptr, buffer, bodylen);
305 }
306 else if ((header.response.opcode == PROTOCOL_BINARY_CMD_FLUSH) ||
307 (header.response.opcode == PROTOCOL_BINARY_CMD_QUIT) ||
308 (header.response.opcode == PROTOCOL_BINARY_CMD_SET) ||
309 (header.response.opcode == PROTOCOL_BINARY_CMD_ADD) ||
310 (header.response.opcode == PROTOCOL_BINARY_CMD_REPLACE) ||
311 (header.response.opcode == PROTOCOL_BINARY_CMD_APPEND) ||
312 (header.response.opcode == PROTOCOL_BINARY_CMD_PREPEND) ||
313 (header.response.opcode == PROTOCOL_BINARY_CMD_DELETE))
314 {
315 WATCHPOINT_ASSERT(bodylen == 0);
316 return MEMCACHED_SUCCESS;
317 }
318 else if (header.response.opcode == PROTOCOL_BINARY_CMD_NOOP)
319 {
320 WATCHPOINT_ASSERT(bodylen == 0);
321 return MEMCACHED_END;
322 }
323 else if (header.response.opcode == PROTOCOL_BINARY_CMD_STAT)
324 {
325 if (bodylen == 0)
326 return MEMCACHED_END;
327 else if (bodylen + 1 > buffer_length)
328 /* not enough space in buffer.. should not happen... */
329 return MEMCACHED_UNKNOWN_READ_FAILURE;
330 else
331 {
332 size_t keylen= header.response.keylen;
333 memset(buffer, 0, buffer_length);
334 safe_read(ptr, buffer, keylen);
335 safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
336 }
337 }
338 else
339 {
340 /* Command not implemented yet! */
341 WATCHPOINT_ASSERT(0);
342 memcached_io_reset(ptr);
343 return MEMCACHED_PROTOCOL_ERROR;
344 }
345 }
346 else if (header.response.bodylen)
347 {
348 /* What should I do with the error message??? just discard it for now */
349 char hole[1024];
350 while (bodylen > 0)
351 {
352 size_t nr= (bodylen > sizeof(hole)) ? sizeof(hole) : bodylen;
353 safe_read(ptr, hole, nr);
354 bodylen -= nr;
355 }
356 }
357
358 memcached_return rc= MEMCACHED_SUCCESS;
359 unlikely(header.response.status != 0)
360 switch (header.response.status)
361 {
362 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
363 rc= MEMCACHED_NOTFOUND;
364 break;
365 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
366 rc= MEMCACHED_DATA_EXISTS;
367 break;
368 case PROTOCOL_BINARY_RESPONSE_E2BIG:
369 case PROTOCOL_BINARY_RESPONSE_EINVAL:
370 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
371 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
372 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
373 default:
374 /* @todo fix the error mappings */
375 rc= MEMCACHED_PROTOCOL_ERROR;
376 break;
377 }
378
379 return rc;
380 }