Import new purge work from trond
[awesomized/libmemcached] / libmemcached / memcached_response.c
1 /*
2 Memcached library
3
4 memcached_response() is used to determine the return result
5 from an issued command.
6 */
7
8 #include "common.h"
9 #include "memcached_io.h"
10
/*
  Forward declarations of the two protocol-specific readers that are
  defined further down in this file.  memcached_read_one_response() picks
  one of them based on the MEM_BINARY_PROTOCOL flag of the owning handle.
  Both take the server connection, a caller-supplied buffer with its size,
  and a result structure, and return a memcached_return code.
*/
11 static memcached_return textual_read_one_response(memcached_server_st *ptr,
12 char *buffer, size_t buffer_length,
13 memcached_result_st *result);
14 static memcached_return binary_read_one_response(memcached_server_st *ptr,
15 char *buffer, size_t buffer_length,
16 memcached_result_st *result);
17
18 memcached_return memcached_read_one_response(memcached_server_st *ptr,
19 char *buffer, size_t buffer_length,
20 memcached_result_st *result)
21 {
22 memcached_server_response_decrement(ptr);
23
24 memcached_return rc;
25 if (ptr->root->flags & MEM_BINARY_PROTOCOL)
26 rc= binary_read_one_response(ptr, buffer, buffer_length, result);
27 else
28 rc= textual_read_one_response(ptr, buffer, buffer_length, result);
29
30 unlikely(rc == MEMCACHED_UNKNOWN_READ_FAILURE ||
31 rc == MEMCACHED_PROTOCOL_ERROR ||
32 rc == MEMCACHED_CLIENT_ERROR ||
33 rc == MEMCACHED_MEMORY_ALLOCATION_FAILURE)
34 memcached_io_reset(ptr);
35
36 return rc;
37 }
38
39 memcached_return memcached_response(memcached_server_st *ptr,
40 char *buffer, size_t buffer_length,
41 memcached_result_st *result)
42 {
43 /* We may have old commands in the buffer not set, first purge */
44 if (ptr->root->flags & MEM_NO_BLOCK)
45 (void)memcached_io_write(ptr, NULL, 0, 1);
46
47 /*
48 * The previous implementation purged all pending requests and just
49 * returned the last one. Purge all pending messages to ensure backwards
50 * compatibility.
51 */
52 if ((ptr->root->flags & MEM_BINARY_PROTOCOL) == 0)
53 while (memcached_server_response_count(ptr) > 1) {
54 memcached_return rc= memcached_read_one_response(ptr, buffer, buffer_length, result);
55
56 unlikely (rc != MEMCACHED_END &&
57 rc != MEMCACHED_STORED &&
58 rc != MEMCACHED_SUCCESS &&
59 rc != MEMCACHED_STAT &&
60 rc != MEMCACHED_DELETED &&
61 rc != MEMCACHED_NOTFOUND &&
62 rc != MEMCACHED_NOTSTORED &&
63 rc != MEMCACHED_DATA_EXISTS)
64 return rc;
65 }
66
67 return memcached_read_one_response(ptr, buffer, buffer_length, result);
68 }
69
70 static memcached_return memcached_readline(memcached_server_st *ptr,
71 char *buffer,
72 size_t size)
73 {
74 bool line_complete= false;
75 char *buffer_ptr= buffer;
76 int total_nr=0;
77
78 while (!line_complete)
79 {
80 if (memcached_io_read(ptr, buffer_ptr, 1) != 1)
81 return MEMCACHED_UNKNOWN_READ_FAILURE;
82
83 WATCHPOINT_ASSERT(*buffer_ptr != '\0');
84 if (*buffer_ptr == '\n')
85 line_complete=true;
86 else
87 {
88 ++buffer_ptr;
89 ++total_nr;
90 }
91
92 if (total_nr == size)
93 return MEMCACHED_PROTOCOL_ERROR;
94 }
95
96 return MEMCACHED_SUCCESS;
97 }
98
99 static memcached_return textual_read_one_response(memcached_server_st *ptr,
100 char *buffer, size_t buffer_length,
101 memcached_result_st *result)
102 {
103 memcached_return rc=memcached_readline(ptr, buffer, buffer_length);
104 if (rc != MEMCACHED_SUCCESS)
105 return rc;
106
107 switch(buffer[0])
108 {
109 case 'V': /* VALUE || VERSION */
110 if (buffer[1] == 'A') /* VALUE */
111 {
112 memcached_return rc;
113
114 /* We add back in one because we will need to search for END */
115 memcached_server_response_increment(ptr);
116 if (result)
117 rc= value_fetch(ptr, buffer, result);
118 else
119 rc= value_fetch(ptr, buffer, &ptr->root->result);
120
121 return rc;
122 }
123 else if (buffer[1] == 'E') /* VERSION */
124 {
125 return MEMCACHED_SUCCESS;
126 }
127 else
128 {
129 WATCHPOINT_STRING(buffer);
130 WATCHPOINT_ASSERT(0);
131 return MEMCACHED_UNKNOWN_READ_FAILURE;
132 }
133 case 'O': /* OK */
134 return MEMCACHED_SUCCESS;
135 case 'S': /* STORED STATS SERVER_ERROR */
136 {
137 if (buffer[2] == 'A') /* STORED STATS */
138 {
139 memcached_server_response_increment(ptr);
140 return MEMCACHED_STAT;
141 }
142 else if (buffer[1] == 'E')
143 return MEMCACHED_SERVER_ERROR;
144 else if (buffer[1] == 'T')
145 return MEMCACHED_STORED;
146 else
147 {
148 WATCHPOINT_STRING(buffer);
149 WATCHPOINT_ASSERT(0);
150 return MEMCACHED_UNKNOWN_READ_FAILURE;
151 }
152 }
153 case 'D': /* DELETED */
154 return MEMCACHED_DELETED;
155 case 'N': /* NOT_FOUND */
156 {
157 if (buffer[4] == 'F')
158 return MEMCACHED_NOTFOUND;
159 else if (buffer[4] == 'S')
160 return MEMCACHED_NOTSTORED;
161 else
162 return MEMCACHED_UNKNOWN_READ_FAILURE;
163 }
164 case 'E': /* PROTOCOL ERROR or END */
165 {
166 if (buffer[1] == 'N')
167 return MEMCACHED_END;
168 else if (buffer[1] == 'R')
169 return MEMCACHED_PROTOCOL_ERROR;
170 else if (buffer[1] == 'X')
171 return MEMCACHED_DATA_EXISTS;
172 else
173 return MEMCACHED_UNKNOWN_READ_FAILURE;
174 }
175 case 'C': /* CLIENT ERROR */
176 return MEMCACHED_CLIENT_ERROR;
177 default:
178 {
179 unsigned long long auto_return_value;
180
181 if (sscanf(buffer, "%llu", &auto_return_value) == 1)
182 return MEMCACHED_SUCCESS;
183
184 return MEMCACHED_UNKNOWN_READ_FAILURE;
185 }
186 }
187
188 return MEMCACHED_SUCCESS;
189 }
190
191 char *memcached_result_value(memcached_result_st *ptr)
192 {
193 memcached_string_st *sptr= &ptr->value;
194 return memcached_string_value(sptr);
195 }
196
197 size_t memcached_result_length(memcached_result_st *ptr)
198 {
199 memcached_string_st *sptr= &ptr->value;
200 return memcached_string_length(sptr);
201 }
202
203 static memcached_return binary_read_one_response(memcached_server_st *ptr,
204 char *buffer, size_t buffer_length,
205 memcached_result_st *result)
206 {
207 protocol_binary_response_header header;
208
209 unlikely (memcached_safe_read(ptr, &header.bytes,
210 sizeof(header.bytes)) != MEMCACHED_SUCCESS)
211 return MEMCACHED_UNKNOWN_READ_FAILURE;
212
213 unlikely (header.response.magic != PROTOCOL_BINARY_RES)
214 return MEMCACHED_PROTOCOL_ERROR;
215
216 /*
217 ** Convert the header to host local endian!
218 */
219 header.response.keylen= ntohs(header.response.keylen);
220 header.response.status= ntohs(header.response.status);
221 header.response.bodylen= ntohl(header.response.bodylen);
222 header.response.cas= ntohll(header.response.cas);
223 uint32_t bodylen= header.response.bodylen;
224
225 if (header.response.status == 0)
226 {
227 switch (header.response.opcode)
228 {
229 case PROTOCOL_BINARY_CMD_GETK:
230 case PROTOCOL_BINARY_CMD_GETKQ:
231 {
232 uint16_t keylen= header.response.keylen;
233 memcached_result_reset(result);
234 result->cas= header.response.cas;
235
236 if (memcached_safe_read(ptr, &result->flags,
237 sizeof (result->flags)) != MEMCACHED_SUCCESS)
238 return MEMCACHED_UNKNOWN_READ_FAILURE;
239
240 result->flags= ntohl(result->flags);
241 bodylen -= header.response.extlen;
242
243 result->key_length= keylen;
244 if (memcached_safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
245 return MEMCACHED_UNKNOWN_READ_FAILURE;
246
247 bodylen -= keylen;
248 if (memcached_string_check(&result->value,
249 bodylen) != MEMCACHED_SUCCESS)
250 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
251
252 char *vptr= memcached_string_value(&result->value);
253 if (memcached_safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
254 return MEMCACHED_UNKNOWN_READ_FAILURE;
255
256 memcached_string_set_length(&result->value, bodylen);
257 }
258 break;
259 case PROTOCOL_BINARY_CMD_INCREMENT:
260 case PROTOCOL_BINARY_CMD_DECREMENT:
261 {
262 if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
263 return MEMCACHED_PROTOCOL_ERROR;
264
265 WATCHPOINT_ASSERT(bodylen == buffer_length);
266 uint64_t val;
267 if (memcached_safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
268 return MEMCACHED_UNKNOWN_READ_FAILURE;
269
270 val= ntohll(val);
271 memcpy(buffer, &val, sizeof(val));
272 }
273 break;
274 case PROTOCOL_BINARY_CMD_VERSION:
275 {
276 memset(buffer, 0, buffer_length);
277 if (bodylen >= buffer_length)
278 /* not enough space in buffer.. should not happen... */
279 return MEMCACHED_UNKNOWN_READ_FAILURE;
280 else if (memcached_safe_read(ptr, buffer, bodylen) != MEMCACHED_SUCCESS)
281 return MEMCACHED_UNKNOWN_READ_FAILURE;
282 }
283 break;
284 case PROTOCOL_BINARY_CMD_FLUSH:
285 case PROTOCOL_BINARY_CMD_QUIT:
286 case PROTOCOL_BINARY_CMD_SET:
287 case PROTOCOL_BINARY_CMD_ADD:
288 case PROTOCOL_BINARY_CMD_REPLACE:
289 case PROTOCOL_BINARY_CMD_APPEND:
290 case PROTOCOL_BINARY_CMD_PREPEND:
291 case PROTOCOL_BINARY_CMD_DELETE:
292 {
293 WATCHPOINT_ASSERT(bodylen == 0);
294 return MEMCACHED_SUCCESS;
295 }
296 break;
297 case PROTOCOL_BINARY_CMD_NOOP:
298 {
299 WATCHPOINT_ASSERT(bodylen == 0);
300 return MEMCACHED_END;
301 }
302 break;
303 case PROTOCOL_BINARY_CMD_STAT:
304 {
305 if (bodylen == 0)
306 return MEMCACHED_END;
307 else if (bodylen + 1 > buffer_length)
308 /* not enough space in buffer.. should not happen... */
309 return MEMCACHED_UNKNOWN_READ_FAILURE;
310 else
311 {
312 size_t keylen= header.response.keylen;
313 memset(buffer, 0, buffer_length);
314 if (memcached_safe_read(ptr, buffer, keylen) != MEMCACHED_SUCCESS ||
315 memcached_safe_read(ptr, buffer + keylen + 1,
316 bodylen - keylen) != MEMCACHED_SUCCESS)
317 return MEMCACHED_UNKNOWN_READ_FAILURE;
318 }
319 }
320 break;
321 default:
322 {
323 /* Command not implemented yet! */
324 WATCHPOINT_ASSERT(0);
325 return MEMCACHED_PROTOCOL_ERROR;
326 }
327 }
328 }
329 else if (header.response.bodylen)
330 {
331 /* What should I do with the error message??? just discard it for now */
332 char buffer[SMALL_STRING_LEN];
333 while (bodylen > 0)
334 {
335 size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen;
336 if (memcached_safe_read(ptr, buffer, nr) != MEMCACHED_SUCCESS)
337 return MEMCACHED_UNKNOWN_READ_FAILURE;
338 bodylen -= nr;
339 }
340 }
341
342 memcached_return rc= MEMCACHED_SUCCESS;
343 unlikely(header.response.status != 0)
344 switch (header.response.status)
345 {
346 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
347 rc= MEMCACHED_NOTFOUND;
348 break;
349 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
350 rc= MEMCACHED_DATA_EXISTS;
351 break;
352 case PROTOCOL_BINARY_RESPONSE_E2BIG:
353 case PROTOCOL_BINARY_RESPONSE_EINVAL:
354 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
355 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
356 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
357 default:
358 /* @todo fix the error mappings */
359 rc= MEMCACHED_PROTOCOL_ERROR;
360 break;
361 }
362
363 return rc;
364 }