Fix for decrement
[awesomized/libmemcached] / libmemcached / memcached_response.c
1 /*
2 Memcached library
3
4 memcached_response() is used to determine the return result
5 from an issued command.
6 */
7
8 #include "common.h"
9 #include "memcached_io.h"
10
11 static memcached_return binary_response(memcached_server_st *ptr,
12 char *buffer, size_t buffer_length,
13 memcached_result_st *result);
14
15 memcached_return memcached_response(memcached_server_st *ptr,
16 char *buffer, size_t buffer_length,
17 memcached_result_st *result)
18 {
19 unsigned int x;
20 size_t send_length;
21 char *buffer_ptr;
22 unsigned int max_messages;
23
24
25 send_length= 0;
26 /* UDP at the moment is odd...*/
27 if (ptr->type == MEMCACHED_CONNECTION_UDP)
28 {
29 char buffer[8];
30 ssize_t read_length;
31
32 return MEMCACHED_SUCCESS;
33
34 read_length= memcached_io_read(ptr, buffer, 8);
35 }
36
37 /* We may have old commands in the buffer not set, first purge */
38 if (ptr->root->flags & MEM_NO_BLOCK)
39 (void)memcached_io_write(ptr, NULL, 0, 1);
40
41 if (ptr->root->flags & MEM_BINARY_PROTOCOL)
42 return binary_response(ptr, buffer, buffer_length, result);
43
44 max_messages= memcached_server_response_count(ptr);
45 for (x= 0; x < max_messages; x++)
46 {
47 size_t total_length= 0;
48 buffer_ptr= buffer;
49
50
51 while (1)
52 {
53 ssize_t read_length;
54
55 read_length= memcached_io_read(ptr, buffer_ptr, 1);
56 WATCHPOINT_ASSERT(*buffer_ptr != '\0');
57
58 if (read_length != 1)
59 {
60 memcached_io_reset(ptr);
61 return MEMCACHED_UNKNOWN_READ_FAILURE;
62 }
63
64 if (*buffer_ptr == '\n')
65 break;
66 else
67 buffer_ptr++;
68
69 total_length++;
70 WATCHPOINT_ASSERT(total_length <= buffer_length);
71
72 if (total_length >= buffer_length)
73 {
74 memcached_io_reset(ptr);
75 return MEMCACHED_UNKNOWN_READ_FAILURE;
76 }
77 }
78 buffer_ptr++;
79 *buffer_ptr= 0;
80
81 memcached_server_response_decrement(ptr);
82 }
83
84 switch(buffer[0])
85 {
86 case 'V': /* VALUE || VERSION */
87 if (buffer[1] == 'A') /* VALUE */
88 {
89 memcached_return rc;
90
91 /* We add back in one because we will need to search for END */
92 memcached_server_response_increment(ptr);
93 if (result)
94 rc= value_fetch(ptr, buffer, result);
95 else
96 rc= value_fetch(ptr, buffer, &ptr->root->result);
97
98 return rc;
99 }
100 else if (buffer[1] == 'E') /* VERSION */
101 {
102 return MEMCACHED_SUCCESS;
103 }
104 else
105 {
106 WATCHPOINT_STRING(buffer);
107 WATCHPOINT_ASSERT(0);
108 memcached_io_reset(ptr);
109 return MEMCACHED_UNKNOWN_READ_FAILURE;
110 }
111 case 'O': /* OK */
112 return MEMCACHED_SUCCESS;
113 case 'S': /* STORED STATS SERVER_ERROR */
114 {
115 if (buffer[2] == 'A') /* STORED STATS */
116 {
117 memcached_server_response_increment(ptr);
118 return MEMCACHED_STAT;
119 }
120 else if (buffer[1] == 'E')
121 return MEMCACHED_SERVER_ERROR;
122 else if (buffer[1] == 'T')
123 return MEMCACHED_STORED;
124 else
125 {
126 WATCHPOINT_STRING(buffer);
127 WATCHPOINT_ASSERT(0);
128 memcached_io_reset(ptr);
129 return MEMCACHED_UNKNOWN_READ_FAILURE;
130 }
131 }
132 case 'D': /* DELETED */
133 return MEMCACHED_DELETED;
134 case 'N': /* NOT_FOUND */
135 {
136 if (buffer[4] == 'F')
137 return MEMCACHED_NOTFOUND;
138 else if (buffer[4] == 'S')
139 return MEMCACHED_NOTSTORED;
140 else
141 {
142 memcached_io_reset(ptr);
143 return MEMCACHED_UNKNOWN_READ_FAILURE;
144 }
145 }
146 case 'E': /* PROTOCOL ERROR or END */
147 {
148 if (buffer[1] == 'N')
149 return MEMCACHED_END;
150 else if (buffer[1] == 'R')
151 {
152 memcached_io_reset(ptr);
153 return MEMCACHED_PROTOCOL_ERROR;
154 }
155 else if (buffer[1] == 'X')
156 {
157 return MEMCACHED_DATA_EXISTS;
158 }
159 else
160 {
161 memcached_io_reset(ptr);
162 return MEMCACHED_UNKNOWN_READ_FAILURE;
163 }
164 }
165 case 'C': /* CLIENT ERROR */
166 memcached_io_reset(ptr);
167 return MEMCACHED_CLIENT_ERROR;
168 default:
169 {
170 unsigned long long auto_return_value;
171
172 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
173 return MEMCACHED_SUCCESS;
174
175 memcached_io_reset(ptr);
176
177 return MEMCACHED_UNKNOWN_READ_FAILURE;
178 }
179
180 }
181
182 return MEMCACHED_SUCCESS;
183 }
184
185 char *memcached_result_value(memcached_result_st *ptr)
186 {
187 memcached_string_st *sptr= &ptr->value;
188 return memcached_string_value(sptr);
189 }
190
191 size_t memcached_result_length(memcached_result_st *ptr)
192 {
193 memcached_string_st *sptr= &ptr->value;
194 return memcached_string_length(sptr);
195 }
196
197 /**
198 * Read a given number of bytes from the server and place it into a specific
199 * buffer. Reset the IO channel or this server if an error occurs.
200 */
201 static memcached_return safe_read(memcached_server_st *ptr, void *dta,
202 size_t size)
203 {
204 size_t offset= 0;
205 char *data= dta;
206
207 while (offset < size)
208 {
209 ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
210 if (nread <= 0)
211 {
212 memcached_io_reset(ptr);
213 return MEMCACHED_UNKNOWN_READ_FAILURE;
214 }
215 offset += nread;
216 }
217
218 return MEMCACHED_SUCCESS;
219 }
220
221 static memcached_return binary_response(memcached_server_st *ptr,
222 char *buffer,
223 size_t buffer_length,
224 memcached_result_st *result)
225 {
226 protocol_binary_response_header header;
227 memcached_server_response_decrement(ptr);
228
229 unlikely (safe_read(ptr, &header.bytes,
230 sizeof(header.bytes)) != MEMCACHED_SUCCESS)
231 return MEMCACHED_UNKNOWN_READ_FAILURE;
232
233 unlikely (header.response.magic != PROTOCOL_BINARY_RES)
234 {
235 memcached_io_reset(ptr);
236 return MEMCACHED_PROTOCOL_ERROR;
237 }
238
239 /*
240 ** Convert the header to host local endian!
241 */
242 header.response.keylen= ntohs(header.response.keylen);
243 header.response.status= ntohs(header.response.status);
244 header.response.bodylen= ntohl(header.response.bodylen);
245 header.response.cas= ntohll(header.response.cas);
246 uint32_t bodylen= header.response.bodylen;
247
248 if (header.response.status == 0)
249 {
250 switch (header.response.opcode)
251 {
252 case PROTOCOL_BINARY_CMD_GETK:
253 case PROTOCOL_BINARY_CMD_GETKQ:
254 {
255 uint16_t keylen= header.response.keylen;
256 memcached_result_reset(result);
257 result->cas= header.response.cas;
258
259 if (safe_read(ptr, &result->flags,
260 sizeof(result->flags)) != MEMCACHED_SUCCESS)
261 {
262 return MEMCACHED_UNKNOWN_READ_FAILURE;
263 }
264 result->flags= ntohl(result->flags);
265 bodylen -= header.response.extlen;
266
267 result->key_length= keylen;
268 if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
269 {
270 return MEMCACHED_UNKNOWN_READ_FAILURE;
271 }
272
273 bodylen -= keylen;
274 if (memcached_string_check(&result->value,
275 bodylen) != MEMCACHED_SUCCESS)
276 {
277 memcached_io_reset(ptr);
278 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
279 }
280
281 char *vptr= memcached_string_value(&result->value);
282 if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
283 {
284 return MEMCACHED_UNKNOWN_READ_FAILURE;
285 }
286
287 memcached_string_set_length(&result->value, bodylen);
288 }
289 break;
290 case PROTOCOL_BINARY_CMD_INCREMENT:
291 case PROTOCOL_BINARY_CMD_DECREMENT:
292 {
293 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
294 {
295 return MEMCACHED_PROTOCOL_ERROR;
296 }
297
298 WATCHPOINT_ASSERT(bodylen == buffer_length);
299 uint64_t val;
300 if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
301 {
302 return MEMCACHED_UNKNOWN_READ_FAILURE;
303 }
304
305 val= ntohll(val);
306 memcpy(buffer, &val, sizeof(val));
307 }
308 break;
309 case PROTOCOL_BINARY_CMD_VERSION:
310 {
311 memset(buffer, 0, buffer_length);
312 if (bodylen >= buffer_length)
313 /* not enough space in buffer.. should not happen... */
314 return MEMCACHED_UNKNOWN_READ_FAILURE;
315 else
316 safe_read(ptr, buffer, bodylen);
317 }
318 break;
319 case PROTOCOL_BINARY_CMD_FLUSH:
320 case PROTOCOL_BINARY_CMD_QUIT:
321 case PROTOCOL_BINARY_CMD_SET:
322 case PROTOCOL_BINARY_CMD_ADD:
323 case PROTOCOL_BINARY_CMD_REPLACE:
324 case PROTOCOL_BINARY_CMD_APPEND:
325 case PROTOCOL_BINARY_CMD_PREPEND:
326 case PROTOCOL_BINARY_CMD_DELETE:
327 {
328 WATCHPOINT_ASSERT(bodylen == 0);
329 return MEMCACHED_SUCCESS;
330 }
331 break;
332 case PROTOCOL_BINARY_CMD_NOOP:
333 {
334 WATCHPOINT_ASSERT(bodylen == 0);
335 return MEMCACHED_END;
336 }
337 break;
338 case PROTOCOL_BINARY_CMD_STAT:
339 {
340 if (bodylen == 0)
341 return MEMCACHED_END;
342 else if (bodylen + 1 > buffer_length)
343 /* not enough space in buffer.. should not happen... */
344 return MEMCACHED_UNKNOWN_READ_FAILURE;
345 else
346 {
347 size_t keylen= header.response.keylen;
348 memset(buffer, 0, buffer_length);
349 safe_read(ptr, buffer, keylen);
350 safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
351 }
352 }
353 break;
354 default:
355 {
356 /* Command not implemented yet! */
357 WATCHPOINT_ASSERT(0);
358 memcached_io_reset(ptr);
359 return MEMCACHED_PROTOCOL_ERROR;
360 }
361 }
362 }
363 else if (header.response.bodylen)
364 {
365 /* What should I do with the error message??? just discard it for now */
366 char buffer[SMALL_STRING_LEN];
367 while (bodylen > 0)
368 {
369 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
370 safe_read(ptr, buffer, nr);
371 bodylen -= nr;
372 }
373 }
374
375 memcached_return rc= MEMCACHED_SUCCESS;
376 unlikely(header.response.status != 0)
377 switch (header.response.status)
378 {
379 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
380 rc= MEMCACHED_NOTFOUND;
381 break;
382 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
383 rc= MEMCACHED_DATA_EXISTS;
384 break;
385 case PROTOCOL_BINARY_RESPONSE_E2BIG:
386 case PROTOCOL_BINARY_RESPONSE_EINVAL:
387 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
388 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
389 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
390 default:
391 /* @todo fix the error mappings */
392 rc= MEMCACHED_PROTOCOL_ERROR;
393 break;
394 }
395
396 return rc;
397 }