Merge Monty.
[awesomized/libmemcached] / example / interface_v0.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * This file contains an implementation of the callback interface for level 0
4 * in the protocol library. You might want to have your copy of the protocol
5 * specification next to your coffee ;-)
6 */
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <netdb.h>
11 #include <netinet/tcp.h>
12 #include <stdio.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <stdlib.h>
17 #include <string.h>
18
19 #include <libmemcached/protocol_handler.h>
20 #include <libmemcached/byteorder.h>
21 #include "storage.h"
22
23 static protocol_binary_response_status noop_command_handler(const void *cookie,
24 protocol_binary_request_header *header,
25 memcached_binary_protocol_raw_response_handler response_handler)
26 {
27 protocol_binary_response_no_extras response= {
28 .message.header.response= {
29 .magic= PROTOCOL_BINARY_RES,
30 .opcode= PROTOCOL_BINARY_CMD_NOOP,
31 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
32 .opaque= header->request.opaque
33 }
34 };
35
36 return response_handler(cookie, header, (void*)&response);
37 }
38
39 static protocol_binary_response_status quit_command_handler(const void *cookie,
40 protocol_binary_request_header *header,
41 memcached_binary_protocol_raw_response_handler response_handler)
42 {
43 protocol_binary_response_no_extras response= {
44 .message.header.response= {
45 .magic= PROTOCOL_BINARY_RES,
46 .opcode= PROTOCOL_BINARY_CMD_QUIT,
47 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
48 .opaque= header->request.opaque
49 }
50 };
51
52 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
53 response_handler(cookie, header, (void*)&response);
54
55 /* I need a better way to signal to close the connection */
56 return PROTOCOL_BINARY_RESPONSE_EIO;
57 }
58
59 static protocol_binary_response_status get_command_handler(const void *cookie,
60 protocol_binary_request_header *header,
61 memcached_binary_protocol_raw_response_handler response_handler)
62 {
63 uint8_t opcode= header->request.opcode;
64 union {
65 protocol_binary_response_get response;
66 char buffer[4096];
67 } msg= {
68 .response.message.header.response= {
69 .magic= PROTOCOL_BINARY_RES,
70 .opcode= opcode,
71 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
72 .opaque= header->request.opaque
73 }
74 };
75
76 struct item *item= get_item(header + 1, ntohs(header->request.keylen));
77 if (item)
78 {
79 msg.response.message.body.flags= htonl(item->flags);
80 char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
81 uint32_t bodysize= 4;
82 msg.response.message.header.response.cas= htonll(item->cas);
83 if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
84 {
85 memcpy(ptr, item->key, item->nkey);
86 msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
87 ptr += item->nkey;
88 bodysize += (uint32_t)item->nkey;
89 }
90 memcpy(ptr, item->data, item->size);
91 bodysize += (uint32_t)item->size;
92 msg.response.message.header.response.bodylen= htonl(bodysize);
93 msg.response.message.header.response.extlen= 4;
94
95 release_item(item);
96 return response_handler(cookie, header, (void*)&msg);
97 }
98 else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
99 {
100 msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
101 return response_handler(cookie, header, (void*)&msg);
102 }
103
104 /* Q shouldn't report a miss ;-) */
105 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
106 }
107
108 static protocol_binary_response_status delete_command_handler(const void *cookie,
109 protocol_binary_request_header *header,
110 memcached_binary_protocol_raw_response_handler response_handler)
111 {
112 size_t keylen= ntohs(header->request.keylen);
113 char *key= ((char*)header) + sizeof(*header);
114 protocol_binary_response_no_extras response= {
115 .message.header.response= {
116 .magic= PROTOCOL_BINARY_RES,
117 .opcode= header->request.opcode,
118 .opaque= header->request.opaque
119 }
120 };
121
122 if (!delete_item(key, keylen))
123 {
124 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
125 return response_handler(cookie, header, (void*)&response);
126 }
127 else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
128 {
129 /* DELETEQ doesn't want success response */
130 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
131 return response_handler(cookie, header, (void*)&response);
132 }
133
134 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
135 }
136
137 static protocol_binary_response_status flush_command_handler(const void *cookie,
138 protocol_binary_request_header *header,
139 memcached_binary_protocol_raw_response_handler response_handler)
140 {
141 uint8_t opcode= header->request.opcode;
142
143 /* @fixme sett inn when! */
144 flush(0);
145
146 if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
147 {
148 protocol_binary_response_no_extras response= {
149 .message.header.response= {
150 .magic= PROTOCOL_BINARY_RES,
151 .opcode= opcode,
152 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
153 .opaque= header->request.opaque
154 }
155 };
156 return response_handler(cookie, header, (void*)&response);
157 }
158
159 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
160 }
161
162 static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
163 protocol_binary_request_header *header,
164 memcached_binary_protocol_raw_response_handler response_handler)
165 {
166 protocol_binary_request_incr *req= (void*)header;
167 protocol_binary_response_incr response= {
168 .message.header.response= {
169 .magic= PROTOCOL_BINARY_RES,
170 .opcode= header->request.opcode,
171 .opaque= header->request.opaque,
172 },
173 };
174
175 uint16_t keylen= ntohs(header->request.keylen);
176 uint64_t initial= ntohll(req->message.body.initial);
177 uint64_t delta= ntohll(req->message.body.delta);
178 uint32_t expiration= ntohl(req->message.body.expiration);
179 uint32_t flags= 0;
180 void *key= req->bytes + sizeof(req->bytes);
181 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
182
183 uint64_t value= initial;
184
185 struct item *item= get_item(key, keylen);
186 if (item != NULL)
187 {
188 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
189 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
190 {
191 value= (*(uint64_t*)item->data) + delta;
192 }
193 else
194 {
195 if (delta > *(uint64_t*)item->data)
196 {
197 value= 0;
198 }
199 else
200 {
201 value= *(uint64_t*)item->data - delta;
202 }
203 }
204 expiration= (uint32_t)item->exp;
205 flags= item->flags;
206
207 release_item(item);
208 delete_item(key, keylen);
209 }
210
211 item= create_item(key, keylen, NULL, sizeof(value), flags, (time_t)expiration);
212 if (item == NULL)
213 {
214 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
215 }
216 else
217 {
218 memcpy(item->data, &value, sizeof(value));
219 put_item(item);
220 }
221
222 response.message.header.response.status= htons(rval);
223 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
224 {
225 response.message.header.response.bodylen= ntohl(8);
226 response.message.body.value= ntohll((*(uint64_t*)item->data));
227 response.message.header.response.cas= ntohll(item->cas);
228
229 release_item(item);
230 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
231 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
232 {
233 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
234 }
235 }
236
237 return response_handler(cookie, header, (void*)&response);
238 }
239
240 static protocol_binary_response_status version_command_handler(const void *cookie,
241 protocol_binary_request_header *header,
242 memcached_binary_protocol_raw_response_handler response_handler)
243 {
244 const char *versionstring= "1.0.0";
245 union {
246 protocol_binary_response_header packet;
247 char buffer[256];
248 } response= {
249 .packet.response= {
250 .magic= PROTOCOL_BINARY_RES,
251 .opcode= PROTOCOL_BINARY_CMD_VERSION,
252 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
253 .opaque= header->request.opaque,
254 .bodylen= htonl((uint32_t)strlen(versionstring))
255 }
256 };
257
258 memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
259
260 return response_handler(cookie, header, (void*)&response);
261 }
262
263 static protocol_binary_response_status concat_command_handler(const void *cookie,
264 protocol_binary_request_header *header,
265 memcached_binary_protocol_raw_response_handler response_handler)
266 {
267 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
268 uint16_t keylen= ntohs(header->request.keylen);
269 uint64_t cas= ntohll(header->request.cas);
270 void *key= header + 1;
271 uint32_t vallen= ntohl(header->request.bodylen) - keylen;
272 void *val= (char*)key + keylen;
273
274 struct item *item= get_item(key, keylen);
275 struct item *nitem= NULL;
276
277 if (item == NULL)
278 {
279 rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
280 }
281 else if (cas != 0 && cas != item->cas)
282 {
283 rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
284 }
285 else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
286 item->flags, item->exp)) == NULL)
287 {
288 release_item(item);
289 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
290 }
291 else
292 {
293 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
294 header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
295 {
296 memcpy(nitem->data, item->data, item->size);
297 memcpy(((char*)(nitem->data)) + item->size, val, vallen);
298 }
299 else
300 {
301 memcpy(nitem->data, val, vallen);
302 memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
303 }
304 release_item(item);
305 delete_item(key, keylen);
306 put_item(nitem);
307 cas= nitem->cas;
308 release_item(nitem);
309
310 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
311 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
312 {
313 protocol_binary_response_no_extras response= {
314 .message= {
315 .header.response= {
316 .magic= PROTOCOL_BINARY_RES,
317 .opcode= header->request.opcode,
318 .status= htons(rval),
319 .opaque= header->request.opaque,
320 .cas= htonll(cas),
321 }
322 }
323 };
324 return response_handler(cookie, header, (void*)&response);
325 }
326 }
327
328 return rval;
329 }
330
331 static protocol_binary_response_status set_command_handler(const void *cookie,
332 protocol_binary_request_header *header,
333 memcached_binary_protocol_raw_response_handler response_handler)
334 {
335 size_t keylen= ntohs(header->request.keylen);
336 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
337 protocol_binary_request_replace *request= (void*)header;
338 uint32_t flags= ntohl(request->message.body.flags);
339 time_t timeout= (time_t)ntohl(request->message.body.expiration);
340 char *key= ((char*)header) + sizeof(*header) + 8;
341 char *data= key + keylen;
342
343 protocol_binary_response_no_extras response= {
344 .message= {
345 .header.response= {
346 .magic= PROTOCOL_BINARY_RES,
347 .opcode= header->request.opcode,
348 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
349 .opaque= header->request.opaque
350 }
351 }
352 };
353
354 if (header->request.cas != 0)
355 {
356 /* validate cas */
357 struct item* item= get_item(key, keylen);
358 if (item != NULL)
359 {
360 if (item->cas != ntohll(header->request.cas))
361 {
362 release_item(item);
363 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
364 return response_handler(cookie, header, (void*)&response);
365 }
366 release_item(item);
367 }
368 }
369
370 delete_item(key, keylen);
371 struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
372 if (item == NULL)
373 {
374 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
375 }
376 else
377 {
378 put_item(item);
379 /* SETQ shouldn't return a message */
380 if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
381 {
382 response.message.header.response.cas= htonll(item->cas);
383 release_item(item);
384 return response_handler(cookie, header, (void*)&response);
385 }
386 release_item(item);
387
388 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
389 }
390
391 return response_handler(cookie, header, (void*)&response);
392 }
393
394 static protocol_binary_response_status add_command_handler(const void *cookie,
395 protocol_binary_request_header *header,
396 memcached_binary_protocol_raw_response_handler response_handler)
397 {
398 size_t keylen= ntohs(header->request.keylen);
399 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
400 protocol_binary_request_add *request= (void*)header;
401 uint32_t flags= ntohl(request->message.body.flags);
402 time_t timeout= (time_t)ntohl(request->message.body.expiration);
403 char *key= ((char*)header) + sizeof(*header) + 8;
404 char *data= key + keylen;
405
406 protocol_binary_response_no_extras response= {
407 .message= {
408 .header.response= {
409 .magic= PROTOCOL_BINARY_RES,
410 .opcode= header->request.opcode,
411 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
412 .opaque= header->request.opaque
413 }
414 }
415 };
416
417 struct item* item= get_item(key, keylen);
418 if (item == NULL)
419 {
420 item= create_item(key, keylen, data, datalen, flags, timeout);
421 if (item == NULL)
422 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
423 else
424 {
425 put_item(item);
426 /* ADDQ shouldn't return a message */
427 if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
428 {
429 response.message.header.response.cas= htonll(item->cas);
430 release_item(item);
431 return response_handler(cookie, header, (void*)&response);
432 }
433 release_item(item);
434 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
435 }
436 }
437 else
438 {
439 release_item(item);
440 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
441 }
442
443 return response_handler(cookie, header, (void*)&response);
444 }
445
446 static protocol_binary_response_status replace_command_handler(const void *cookie,
447 protocol_binary_request_header *header,
448 memcached_binary_protocol_raw_response_handler response_handler)
449 {
450 size_t keylen= ntohs(header->request.keylen);
451 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
452 protocol_binary_request_replace *request= (void*)header;
453 uint32_t flags= ntohl(request->message.body.flags);
454 time_t timeout= (time_t)ntohl(request->message.body.expiration);
455 char *key= ((char*)header) + sizeof(*header) + 8;
456 char *data= key + keylen;
457
458 protocol_binary_response_no_extras response= {
459 .message= {
460 .header.response= {
461 .magic= PROTOCOL_BINARY_RES,
462 .opcode= header->request.opcode,
463 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
464 .opaque= header->request.opaque
465 }
466 }
467 };
468
469 struct item* item= get_item(key, keylen);
470 if (item == NULL)
471 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
472 else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
473 {
474 release_item(item);
475 delete_item(key, keylen);
476 item= create_item(key, keylen, data, datalen, flags, timeout);
477 if (item == NULL)
478 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
479 else
480 {
481 put_item(item);
482 /* REPLACEQ shouldn't return a message */
483 if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
484 {
485 response.message.header.response.cas= htonll(item->cas);
486 release_item(item);
487 return response_handler(cookie, header, (void*)&response);
488 }
489 release_item(item);
490 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
491 }
492 }
493 else
494 {
495 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
496 release_item(item);
497 }
498
499 return response_handler(cookie, header, (void*)&response);
500 }
501
502 static protocol_binary_response_status stat_command_handler(const void *cookie,
503 protocol_binary_request_header *header,
504 memcached_binary_protocol_raw_response_handler response_handler)
505 {
506 /* Just send the terminating packet*/
507 protocol_binary_response_no_extras response= {
508 .message= {
509 .header.response= {
510 .magic= PROTOCOL_BINARY_RES,
511 .opcode= PROTOCOL_BINARY_CMD_STAT,
512 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
513 .opaque= header->request.opaque
514 }
515 }
516 };
517
518 return response_handler(cookie, header, (void*)&response);
519 }
520
521 struct memcached_binary_protocol_callback_st interface_v0_impl= {
522 .interface_version= 0,
523 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
524 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
525 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
526 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
527 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
528 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
529 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
530 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
531 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
532 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
533 .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
534 .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
535 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
536 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
537 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
538 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
539 .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
540 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
541 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
542 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
543 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
544 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
545 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
546 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
547 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
548 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
549 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
550 };