Add an example using innodb as a storage (NOTE: this version currently leaks memorybz...
[awesomized/libmemcached] / example / interface_v0.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * This file contains an implementation of the callback interface for level 0
4 * in the protocol library. You might want to have your copy of the protocol
5 * specification next to your coffee ;-)
6 */
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <netdb.h>
11 #include <netinet/tcp.h>
12 #include <stdio.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <stdlib.h>
17 #include <string.h>
18
19 #include <libmemcached/protocol_handler.h>
20 #include <libmemcached/byteorder.h>
21 #include "storage.h"
22
23 static protocol_binary_response_status noop_command_handler(const void *cookie,
24 protocol_binary_request_header *header,
25 memcached_binary_protocol_raw_response_handler response_handler)
26 {
27 protocol_binary_response_no_extras response= {
28 .message.header.response= {
29 .magic= PROTOCOL_BINARY_RES,
30 .opcode= PROTOCOL_BINARY_CMD_NOOP,
31 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
32 .opaque= header->request.opaque
33 }
34 };
35
36 return response_handler(cookie, header, (void*)&response);
37 }
38
39 static protocol_binary_response_status quit_command_handler(const void *cookie,
40 protocol_binary_request_header *header,
41 memcached_binary_protocol_raw_response_handler response_handler)
42 {
43 protocol_binary_response_no_extras response= {
44 .message.header.response= {
45 .magic= PROTOCOL_BINARY_RES,
46 .opcode= PROTOCOL_BINARY_CMD_QUIT,
47 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
48 .opaque= header->request.opaque
49 }
50 };
51
52 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
53 response_handler(cookie, header, (void*)&response);
54
55 /* I need a better way to signal to close the connection */
56 return PROTOCOL_BINARY_RESPONSE_EIO;
57 }
58
59 static protocol_binary_response_status get_command_handler(const void *cookie,
60 protocol_binary_request_header *header,
61 memcached_binary_protocol_raw_response_handler response_handler)
62 {
63 uint8_t opcode= header->request.opcode;
64 union {
65 protocol_binary_response_get response;
66 char buffer[4096];
67 } msg= {
68 .response.message.header.response= {
69 .magic= PROTOCOL_BINARY_RES,
70 .opcode= opcode,
71 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
72 .opaque= header->request.opaque
73 }
74 };
75
76 struct item *item= get_item(header + 1, ntohs(header->request.keylen));
77 if (item)
78 {
79 msg.response.message.body.flags= htonl(item->flags);
80 char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
81 uint32_t bodysize= 4;
82 msg.response.message.header.response.cas= htonll(item->cas);
83 if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
84 {
85 memcpy(ptr, item->key, item->nkey);
86 msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
87 ptr += item->nkey;
88 bodysize += (uint32_t)item->nkey;
89 }
90 memcpy(ptr, item->data, item->size);
91 bodysize += (uint32_t)item->size;
92 msg.response.message.header.response.bodylen= htonl(bodysize);
93 msg.response.message.header.response.extlen= 4;
94
95 return response_handler(cookie, header, (void*)&msg);
96 }
97 else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
98 {
99 msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
100 return response_handler(cookie, header, (void*)&msg);
101 }
102
103 /* Q shouldn't report a miss ;-) */
104 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
105 }
106
107 static protocol_binary_response_status delete_command_handler(const void *cookie,
108 protocol_binary_request_header *header,
109 memcached_binary_protocol_raw_response_handler response_handler)
110 {
111 size_t keylen= ntohs(header->request.keylen);
112 char *key= ((char*)header) + sizeof(*header);
113 protocol_binary_response_no_extras response= {
114 .message.header.response= {
115 .magic= PROTOCOL_BINARY_RES,
116 .opcode= header->request.opcode,
117 .opaque= header->request.opaque
118 }
119 };
120
121 if (!delete_item(key, keylen))
122 {
123 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
124 return response_handler(cookie, header, (void*)&response);
125 }
126 else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
127 {
128 /* DELETEQ doesn't want success response */
129 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
130 return response_handler(cookie, header, (void*)&response);
131 }
132
133 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
134 }
135
136 static protocol_binary_response_status flush_command_handler(const void *cookie,
137 protocol_binary_request_header *header,
138 memcached_binary_protocol_raw_response_handler response_handler)
139 {
140 uint8_t opcode= header->request.opcode;
141
142 /* @fixme sett inn when! */
143 flush(0);
144
145 if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
146 {
147 protocol_binary_response_no_extras response= {
148 .message.header.response= {
149 .magic= PROTOCOL_BINARY_RES,
150 .opcode= opcode,
151 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
152 .opaque= header->request.opaque
153 }
154 };
155 return response_handler(cookie, header, (void*)&response);
156 }
157
158 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
159 }
160
161 static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
162 protocol_binary_request_header *header,
163 memcached_binary_protocol_raw_response_handler response_handler)
164 {
165 protocol_binary_request_incr *req= (void*)header;
166 protocol_binary_response_incr response= {
167 .message.header.response= {
168 .magic= PROTOCOL_BINARY_RES,
169 .opcode= header->request.opcode,
170 .opaque= header->request.opaque,
171 },
172 };
173
174 uint16_t keylen= ntohs(header->request.keylen);
175 uint64_t initial= ntohll(req->message.body.initial);
176 uint64_t delta= ntohll(req->message.body.delta);
177 uint32_t expiration= ntohl(req->message.body.expiration);
178 void *key= req->bytes + sizeof(req->bytes);
179 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
180
181 struct item *item= get_item(key, keylen);
182 if (item == NULL)
183 {
184 item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
185 if (item == NULL)
186 {
187 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
188 }
189 else
190 {
191 memcpy(item->data, &initial, sizeof(initial));
192 put_item(item);
193 }
194 }
195 else
196 {
197 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
198 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
199 {
200 (*(uint64_t*)item->data) += delta;
201 }
202 else
203 {
204 if (delta > *(uint64_t*)item->data)
205 {
206 *(uint64_t*)item->data= 0;
207 }
208 else
209 {
210 *(uint64_t*)item->data -= delta;
211 }
212 }
213
214 struct item *nitem= create_item(key, keylen, NULL, sizeof(initial), 0, item->exp);
215 if (item == NULL)
216 {
217 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
218 delete_item(key, keylen);
219 }
220 else
221 {
222 memcpy(nitem->data, item->data, item->size);
223 delete_item(key, keylen);
224 put_item(nitem);
225 item = nitem;
226 }
227 }
228
229 response.message.header.response.status= htons(rval);
230 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
231 {
232 response.message.header.response.bodylen= ntohl(8);
233 response.message.body.value= ntohll((*(uint64_t*)item->data));
234 response.message.header.response.cas= ntohll(item->cas);
235
236 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
237 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
238 {
239 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
240 }
241 }
242
243 return response_handler(cookie, header, (void*)&response);
244 }
245
246 static protocol_binary_response_status version_command_handler(const void *cookie,
247 protocol_binary_request_header *header,
248 memcached_binary_protocol_raw_response_handler response_handler)
249 {
250 const char *versionstring= "1.0.0";
251 union {
252 protocol_binary_response_header packet;
253 char buffer[256];
254 } response= {
255 .packet.response= {
256 .magic= PROTOCOL_BINARY_RES,
257 .opcode= PROTOCOL_BINARY_CMD_VERSION,
258 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
259 .opaque= header->request.opaque,
260 .bodylen= htonl((uint32_t)strlen(versionstring))
261 }
262 };
263
264 memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
265
266 return response_handler(cookie, header, (void*)&response);
267 }
268
269 static protocol_binary_response_status concat_command_handler(const void *cookie,
270 protocol_binary_request_header *header,
271 memcached_binary_protocol_raw_response_handler response_handler)
272 {
273 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
274 uint16_t keylen= ntohs(header->request.keylen);
275 uint64_t cas= ntohll(header->request.cas);
276 void *key= header + 1;
277 uint32_t vallen= ntohl(header->request.bodylen) - keylen;
278 void *val= (char*)key + keylen;
279
280 struct item *item= get_item(key, keylen);
281 struct item *nitem;
282 if (item == NULL)
283 {
284 rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
285 }
286 else if (cas != 0 && cas != item->cas)
287 {
288 rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
289 }
290 else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
291 item->flags, item->exp)) == NULL)
292 {
293 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
294 }
295 else
296 {
297 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
298 header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
299 {
300 memcpy(nitem->data, item->data, item->size);
301 memcpy(((char*)(nitem->data)) + item->size, val, vallen);
302 }
303 else
304 {
305 memcpy(nitem->data, val, vallen);
306 memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
307 }
308 delete_item(key, keylen);
309 put_item(nitem);
310 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
311 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
312 {
313 protocol_binary_response_no_extras response= {
314 .message= {
315 .header.response= {
316 .magic= PROTOCOL_BINARY_RES,
317 .opcode= header->request.opcode,
318 .status= htons(rval),
319 .opaque= header->request.opaque,
320 .cas= htonll(nitem->cas),
321 }
322 }
323 };
324 return response_handler(cookie, header, (void*)&response);
325 }
326 }
327
328 return rval;
329 }
330
331 static protocol_binary_response_status set_command_handler(const void *cookie,
332 protocol_binary_request_header *header,
333 memcached_binary_protocol_raw_response_handler response_handler)
334 {
335 size_t keylen= ntohs(header->request.keylen);
336 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
337 protocol_binary_request_replace *request= (void*)header;
338 uint32_t flags= ntohl(request->message.body.flags);
339 time_t timeout= ntohl(request->message.body.expiration);
340 char *key= ((char*)header) + sizeof(*header) + 8;
341 char *data= key + keylen;
342
343 protocol_binary_response_no_extras response= {
344 .message= {
345 .header.response= {
346 .magic= PROTOCOL_BINARY_RES,
347 .opcode= header->request.opcode,
348 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
349 .opaque= header->request.opaque
350 }
351 }
352 };
353
354 if (header->request.cas != 0)
355 {
356 /* validate cas */
357 struct item* item= get_item(key, keylen);
358 if (item != NULL && item->cas != ntohll(header->request.cas))
359 {
360 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
361 return response_handler(cookie, header, (void*)&response);
362 }
363 }
364
365 delete_item(key, keylen);
366 struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
367 if (item == 0)
368 {
369 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
370 }
371 else
372 {
373 put_item(item);
374 /* SETQ shouldn't return a message */
375 if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
376 {
377 response.message.header.response.cas= htonll(item->cas);
378 return response_handler(cookie, header, (void*)&response);
379 }
380 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
381 }
382
383 return response_handler(cookie, header, (void*)&response);
384 }
385
386 static protocol_binary_response_status add_command_handler(const void *cookie,
387 protocol_binary_request_header *header,
388 memcached_binary_protocol_raw_response_handler response_handler)
389 {
390 size_t keylen= ntohs(header->request.keylen);
391 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
392 protocol_binary_request_add *request= (void*)header;
393 uint32_t flags= ntohl(request->message.body.flags);
394 time_t timeout= ntohl(request->message.body.expiration);
395 char *key= ((char*)header) + sizeof(*header) + 8;
396 char *data= key + keylen;
397
398 protocol_binary_response_no_extras response= {
399 .message= {
400 .header.response= {
401 .magic= PROTOCOL_BINARY_RES,
402 .opcode= header->request.opcode,
403 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
404 .opaque= header->request.opaque
405 }
406 }
407 };
408
409 struct item* item= get_item(key, keylen);
410 if (item == NULL)
411 {
412 item= create_item(key, keylen, data, datalen, flags, timeout);
413 if (item == 0)
414 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
415 else
416 {
417 put_item(item);
418 /* ADDQ shouldn't return a message */
419 if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
420 {
421 response.message.header.response.cas= htonll(item->cas);
422 return response_handler(cookie, header, (void*)&response);
423 }
424 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
425 }
426 }
427 else
428 {
429 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
430 }
431
432 return response_handler(cookie, header, (void*)&response);
433 }
434
435 static protocol_binary_response_status replace_command_handler(const void *cookie,
436 protocol_binary_request_header *header,
437 memcached_binary_protocol_raw_response_handler response_handler)
438 {
439 size_t keylen= ntohs(header->request.keylen);
440 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
441 protocol_binary_request_replace *request= (void*)header;
442 uint32_t flags= ntohl(request->message.body.flags);
443 time_t timeout= ntohl(request->message.body.expiration);
444 char *key= ((char*)header) + sizeof(*header) + 8;
445 char *data= key + keylen;
446
447 protocol_binary_response_no_extras response= {
448 .message= {
449 .header.response= {
450 .magic= PROTOCOL_BINARY_RES,
451 .opcode= header->request.opcode,
452 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
453 .opaque= header->request.opaque
454 }
455 }
456 };
457
458 struct item* item= get_item(key, keylen);
459 if (item == NULL)
460 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
461 else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
462 {
463 delete_item(key, keylen);
464 item= create_item(key, keylen, data, datalen, flags, timeout);
465 if (item == 0)
466 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
467 else
468 {
469 put_item(item);
470 /* REPLACEQ shouldn't return a message */
471 if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
472 {
473 response.message.header.response.cas= htonll(item->cas);
474 return response_handler(cookie, header, (void*)&response);
475 }
476 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
477 }
478 }
479 else
480 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
481
482 return response_handler(cookie, header, (void*)&response);
483 }
484
485 static protocol_binary_response_status stat_command_handler(const void *cookie,
486 protocol_binary_request_header *header,
487 memcached_binary_protocol_raw_response_handler response_handler)
488 {
489 /* Just send the terminating packet*/
490 protocol_binary_response_no_extras response= {
491 .message= {
492 .header.response= {
493 .magic= PROTOCOL_BINARY_RES,
494 .opcode= PROTOCOL_BINARY_CMD_STAT,
495 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
496 .opaque= header->request.opaque
497 }
498 }
499 };
500
501 return response_handler(cookie, header, (void*)&response);
502 }
503
504 struct memcached_binary_protocol_callback_st interface_v0_impl= {
505 .interface_version= 0,
506 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
507 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
508 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
509 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
510 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
511 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
512 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
513 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
514 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
515 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
516 .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
517 .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
518 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
519 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
520 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
521 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
522 .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
523 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
524 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
525 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
526 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
527 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
528 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
529 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
530 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
531 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
532 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
533 };