Update protocol due to review comments:
[awesomized/libmemcached] / example / interface_v0.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * This file contains an implementation of the callback interface for level 0
4 * in the protocol library. You might want to have your copy of the protocol
5 * specification next to your coffee ;-)
6 */
7 #include "config.h"
8 #include <assert.h>
9 #include <sys/types.h>
10 #include <sys/socket.h>
11 #include <netdb.h>
12 #include <netinet/tcp.h>
13 #include <stdio.h>
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <errno.h>
17 #include <stdlib.h>
18 #include <string.h>
19
20 #include <libmemcached/protocol_handler.h>
21 #include <libmemcached/byteorder.h>
22 #include "storage.h"
23
24 static protocol_binary_response_status noop_command_handler(const void *cookie,
25 protocol_binary_request_header *header,
26 memcached_binary_protocol_raw_response_handler response_handler)
27 {
28 protocol_binary_response_no_extras response= {
29 .message.header.response= {
30 .magic= PROTOCOL_BINARY_RES,
31 .opcode= PROTOCOL_BINARY_CMD_NOOP,
32 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
33 .opaque= header->request.opaque
34 }
35 };
36
37 return response_handler(cookie, header, (void*)&response);
38 }
39
40 static protocol_binary_response_status quit_command_handler(const void *cookie,
41 protocol_binary_request_header *header,
42 memcached_binary_protocol_raw_response_handler response_handler)
43 {
44 protocol_binary_response_no_extras response= {
45 .message.header.response= {
46 .magic= PROTOCOL_BINARY_RES,
47 .opcode= PROTOCOL_BINARY_CMD_QUIT,
48 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
49 .opaque= header->request.opaque
50 }
51 };
52
53 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
54 response_handler(cookie, header, (void*)&response);
55
56 /* I need a better way to signal to close the connection */
57 return PROTOCOL_BINARY_RESPONSE_EIO;
58 }
59
60 static protocol_binary_response_status get_command_handler(const void *cookie,
61 protocol_binary_request_header *header,
62 memcached_binary_protocol_raw_response_handler response_handler)
63 {
64 uint8_t opcode= header->request.opcode;
65 union {
66 protocol_binary_response_get response;
67 char buffer[4096];
68 } msg= {
69 .response.message.header.response= {
70 .magic= PROTOCOL_BINARY_RES,
71 .opcode= opcode,
72 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
73 .opaque= header->request.opaque
74 }
75 };
76
77 struct item *item= get_item(header + 1, ntohs(header->request.keylen));
78 if (item)
79 {
80 msg.response.message.body.flags= htonl(item->flags);
81 char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
82 uint32_t bodysize= 4;
83 msg.response.message.header.response.cas= htonll(item->cas);
84 if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
85 {
86 memcpy(ptr, item->key, item->nkey);
87 msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
88 ptr += item->nkey;
89 bodysize += (uint32_t)item->nkey;
90 }
91 memcpy(ptr, item->data, item->size);
92 bodysize += (uint32_t)item->size;
93 msg.response.message.header.response.bodylen= htonl(bodysize);
94 msg.response.message.header.response.extlen= 4;
95
96 release_item(item);
97 return response_handler(cookie, header, (void*)&msg);
98 }
99 else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
100 {
101 msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
102 return response_handler(cookie, header, (void*)&msg);
103 }
104
105 /* Q shouldn't report a miss ;-) */
106 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
107 }
108
109 static protocol_binary_response_status delete_command_handler(const void *cookie,
110 protocol_binary_request_header *header,
111 memcached_binary_protocol_raw_response_handler response_handler)
112 {
113 size_t keylen= ntohs(header->request.keylen);
114 char *key= ((char*)header) + sizeof(*header);
115 protocol_binary_response_no_extras response= {
116 .message.header.response= {
117 .magic= PROTOCOL_BINARY_RES,
118 .opcode= header->request.opcode,
119 .opaque= header->request.opaque
120 }
121 };
122
123 if (!delete_item(key, keylen))
124 {
125 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
126 return response_handler(cookie, header, (void*)&response);
127 }
128 else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
129 {
130 /* DELETEQ doesn't want success response */
131 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
132 return response_handler(cookie, header, (void*)&response);
133 }
134
135 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
136 }
137
138 static protocol_binary_response_status flush_command_handler(const void *cookie,
139 protocol_binary_request_header *header,
140 memcached_binary_protocol_raw_response_handler response_handler)
141 {
142 uint8_t opcode= header->request.opcode;
143
144 /* @fixme sett inn when! */
145 flush(0);
146
147 if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
148 {
149 protocol_binary_response_no_extras response= {
150 .message.header.response= {
151 .magic= PROTOCOL_BINARY_RES,
152 .opcode= opcode,
153 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
154 .opaque= header->request.opaque
155 }
156 };
157 return response_handler(cookie, header, (void*)&response);
158 }
159
160 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
161 }
162
163 static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
164 protocol_binary_request_header *header,
165 memcached_binary_protocol_raw_response_handler response_handler)
166 {
167 protocol_binary_request_incr *req= (void*)header;
168 protocol_binary_response_incr response= {
169 .message.header.response= {
170 .magic= PROTOCOL_BINARY_RES,
171 .opcode= header->request.opcode,
172 .opaque= header->request.opaque,
173 },
174 };
175
176 uint16_t keylen= ntohs(header->request.keylen);
177 uint64_t initial= ntohll(req->message.body.initial);
178 uint64_t delta= ntohll(req->message.body.delta);
179 uint32_t expiration= ntohl(req->message.body.expiration);
180 uint32_t flags= 0;
181 void *key= req->bytes + sizeof(req->bytes);
182 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
183
184 uint64_t value= initial;
185
186 struct item *item= get_item(key, keylen);
187 if (item != NULL)
188 {
189 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
190 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
191 {
192 value= (*(uint64_t*)item->data) + delta;
193 }
194 else
195 {
196 if (delta > *(uint64_t*)item->data)
197 {
198 value= 0;
199 }
200 else
201 {
202 value= *(uint64_t*)item->data - delta;
203 }
204 }
205 expiration= (uint32_t)item->exp;
206 flags= item->flags;
207
208 release_item(item);
209 delete_item(key, keylen);
210 }
211
212 item= create_item(key, keylen, NULL, sizeof(value), flags, (time_t)expiration);
213 if (item == NULL)
214 {
215 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
216 }
217 else
218 {
219 memcpy(item->data, &value, sizeof(value));
220 put_item(item);
221 }
222
223 response.message.header.response.status= htons(rval);
224 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
225 {
226 response.message.header.response.bodylen= ntohl(8);
227 response.message.body.value= ntohll((*(uint64_t*)item->data));
228 response.message.header.response.cas= ntohll(item->cas);
229
230 release_item(item);
231 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
232 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
233 {
234 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
235 }
236 }
237
238 return response_handler(cookie, header, (void*)&response);
239 }
240
241 static protocol_binary_response_status version_command_handler(const void *cookie,
242 protocol_binary_request_header *header,
243 memcached_binary_protocol_raw_response_handler response_handler)
244 {
245 const char *versionstring= "1.0.0";
246 union {
247 protocol_binary_response_header packet;
248 char buffer[256];
249 } response= {
250 .packet.response= {
251 .magic= PROTOCOL_BINARY_RES,
252 .opcode= PROTOCOL_BINARY_CMD_VERSION,
253 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
254 .opaque= header->request.opaque,
255 .bodylen= htonl((uint32_t)strlen(versionstring))
256 }
257 };
258
259 memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
260
261 return response_handler(cookie, header, (void*)&response);
262 }
263
264 static protocol_binary_response_status concat_command_handler(const void *cookie,
265 protocol_binary_request_header *header,
266 memcached_binary_protocol_raw_response_handler response_handler)
267 {
268 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
269 uint16_t keylen= ntohs(header->request.keylen);
270 uint64_t cas= ntohll(header->request.cas);
271 void *key= header + 1;
272 uint32_t vallen= ntohl(header->request.bodylen) - keylen;
273 void *val= (char*)key + keylen;
274
275 struct item *item= get_item(key, keylen);
276 struct item *nitem= NULL;
277
278 if (item == NULL)
279 {
280 rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
281 }
282 else if (cas != 0 && cas != item->cas)
283 {
284 rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
285 }
286 else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
287 item->flags, item->exp)) == NULL)
288 {
289 release_item(item);
290 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
291 }
292 else
293 {
294 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
295 header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
296 {
297 memcpy(nitem->data, item->data, item->size);
298 memcpy(((char*)(nitem->data)) + item->size, val, vallen);
299 }
300 else
301 {
302 memcpy(nitem->data, val, vallen);
303 memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
304 }
305 release_item(item);
306 delete_item(key, keylen);
307 put_item(nitem);
308 cas= nitem->cas;
309 release_item(nitem);
310
311 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
312 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
313 {
314 protocol_binary_response_no_extras response= {
315 .message= {
316 .header.response= {
317 .magic= PROTOCOL_BINARY_RES,
318 .opcode= header->request.opcode,
319 .status= htons(rval),
320 .opaque= header->request.opaque,
321 .cas= htonll(cas),
322 }
323 }
324 };
325 return response_handler(cookie, header, (void*)&response);
326 }
327 }
328
329 return rval;
330 }
331
332 static protocol_binary_response_status set_command_handler(const void *cookie,
333 protocol_binary_request_header *header,
334 memcached_binary_protocol_raw_response_handler response_handler)
335 {
336 size_t keylen= ntohs(header->request.keylen);
337 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
338 protocol_binary_request_replace *request= (void*)header;
339 uint32_t flags= ntohl(request->message.body.flags);
340 time_t timeout= (time_t)ntohl(request->message.body.expiration);
341 char *key= ((char*)header) + sizeof(*header) + 8;
342 char *data= key + keylen;
343
344 protocol_binary_response_no_extras response= {
345 .message= {
346 .header.response= {
347 .magic= PROTOCOL_BINARY_RES,
348 .opcode= header->request.opcode,
349 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
350 .opaque= header->request.opaque
351 }
352 }
353 };
354
355 if (header->request.cas != 0)
356 {
357 /* validate cas */
358 struct item* item= get_item(key, keylen);
359 if (item != NULL)
360 {
361 if (item->cas != ntohll(header->request.cas))
362 {
363 release_item(item);
364 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
365 return response_handler(cookie, header, (void*)&response);
366 }
367 release_item(item);
368 }
369 }
370
371 delete_item(key, keylen);
372 struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
373 if (item == NULL)
374 {
375 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
376 }
377 else
378 {
379 put_item(item);
380 /* SETQ shouldn't return a message */
381 if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
382 {
383 response.message.header.response.cas= htonll(item->cas);
384 release_item(item);
385 return response_handler(cookie, header, (void*)&response);
386 }
387 release_item(item);
388
389 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
390 }
391
392 return response_handler(cookie, header, (void*)&response);
393 }
394
395 static protocol_binary_response_status add_command_handler(const void *cookie,
396 protocol_binary_request_header *header,
397 memcached_binary_protocol_raw_response_handler response_handler)
398 {
399 size_t keylen= ntohs(header->request.keylen);
400 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
401 protocol_binary_request_add *request= (void*)header;
402 uint32_t flags= ntohl(request->message.body.flags);
403 time_t timeout= (time_t)ntohl(request->message.body.expiration);
404 char *key= ((char*)header) + sizeof(*header) + 8;
405 char *data= key + keylen;
406
407 protocol_binary_response_no_extras response= {
408 .message= {
409 .header.response= {
410 .magic= PROTOCOL_BINARY_RES,
411 .opcode= header->request.opcode,
412 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
413 .opaque= header->request.opaque
414 }
415 }
416 };
417
418 struct item* item= get_item(key, keylen);
419 if (item == NULL)
420 {
421 item= create_item(key, keylen, data, datalen, flags, timeout);
422 if (item == NULL)
423 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
424 else
425 {
426 put_item(item);
427 /* ADDQ shouldn't return a message */
428 if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
429 {
430 response.message.header.response.cas= htonll(item->cas);
431 release_item(item);
432 return response_handler(cookie, header, (void*)&response);
433 }
434 release_item(item);
435 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
436 }
437 }
438 else
439 {
440 release_item(item);
441 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
442 }
443
444 return response_handler(cookie, header, (void*)&response);
445 }
446
447 static protocol_binary_response_status replace_command_handler(const void *cookie,
448 protocol_binary_request_header *header,
449 memcached_binary_protocol_raw_response_handler response_handler)
450 {
451 size_t keylen= ntohs(header->request.keylen);
452 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
453 protocol_binary_request_replace *request= (void*)header;
454 uint32_t flags= ntohl(request->message.body.flags);
455 time_t timeout= (time_t)ntohl(request->message.body.expiration);
456 char *key= ((char*)header) + sizeof(*header) + 8;
457 char *data= key + keylen;
458
459 protocol_binary_response_no_extras response= {
460 .message= {
461 .header.response= {
462 .magic= PROTOCOL_BINARY_RES,
463 .opcode= header->request.opcode,
464 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
465 .opaque= header->request.opaque
466 }
467 }
468 };
469
470 struct item* item= get_item(key, keylen);
471 if (item == NULL)
472 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
473 else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
474 {
475 release_item(item);
476 delete_item(key, keylen);
477 item= create_item(key, keylen, data, datalen, flags, timeout);
478 if (item == NULL)
479 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
480 else
481 {
482 put_item(item);
483 /* REPLACEQ shouldn't return a message */
484 if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
485 {
486 response.message.header.response.cas= htonll(item->cas);
487 release_item(item);
488 return response_handler(cookie, header, (void*)&response);
489 }
490 release_item(item);
491 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
492 }
493 }
494 else
495 {
496 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
497 release_item(item);
498 }
499
500 return response_handler(cookie, header, (void*)&response);
501 }
502
503 static protocol_binary_response_status stat_command_handler(const void *cookie,
504 protocol_binary_request_header *header,
505 memcached_binary_protocol_raw_response_handler response_handler)
506 {
507 /* Just send the terminating packet*/
508 protocol_binary_response_no_extras response= {
509 .message= {
510 .header.response= {
511 .magic= PROTOCOL_BINARY_RES,
512 .opcode= PROTOCOL_BINARY_CMD_STAT,
513 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
514 .opaque= header->request.opaque
515 }
516 }
517 };
518
519 return response_handler(cookie, header, (void*)&response);
520 }
521
522 memcached_binary_protocol_callback_st interface_v0_impl= {
523 .interface_version= 0,
524 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
525 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
526 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
527 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
528 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
529 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
530 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
531 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
532 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
533 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
534 .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
535 .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
536 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
537 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
538 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
539 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
540 .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
541 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
542 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
543 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
544 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
545 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
546 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
547 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
548 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
549 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
550 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
551 };