Merge trunk
[awesomized/libmemcached] / example / interface_v0.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * This file contains an implementation of the callback interface for level 0
4 * in the protocol library. You might want to have your copy of the protocol
5 * specification next to your coffee ;-)
6 */
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <netdb.h>
11 #include <netinet/tcp.h>
12 #include <stdio.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <stdlib.h>
17 #include <string.h>
18
19 #include <libmemcached/protocol_handler.h>
20 #include <libmemcached/byteorder.h>
21 #include "storage.h"
22
23 static protocol_binary_response_status noop_command_handler(const void *cookie,
24 protocol_binary_request_header *header,
25 memcached_binary_protocol_raw_response_handler response_handler)
26 {
27 protocol_binary_response_no_extras response= {
28 .message.header.response= {
29 .magic= PROTOCOL_BINARY_RES,
30 .opcode= PROTOCOL_BINARY_CMD_NOOP,
31 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
32 .opaque= header->request.opaque
33 }
34 };
35
36 return response_handler(cookie, header, (void*)&response);
37 }
38
39 static protocol_binary_response_status quit_command_handler(const void *cookie,
40 protocol_binary_request_header *header,
41 memcached_binary_protocol_raw_response_handler response_handler)
42 {
43 protocol_binary_response_no_extras response= {
44 .message.header.response= {
45 .magic= PROTOCOL_BINARY_RES,
46 .opcode= PROTOCOL_BINARY_CMD_QUIT,
47 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
48 .opaque= header->request.opaque
49 }
50 };
51
52 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
53 response_handler(cookie, header, (void*)&response);
54
55 /* I need a better way to signal to close the connection */
56 return PROTOCOL_BINARY_RESPONSE_EIO;
57 }
58
59 static protocol_binary_response_status get_command_handler(const void *cookie,
60 protocol_binary_request_header *header,
61 memcached_binary_protocol_raw_response_handler response_handler)
62 {
63 uint8_t opcode= header->request.opcode;
64 union {
65 protocol_binary_response_get response;
66 char buffer[4096];
67 } msg= {
68 .response.message.header.response= {
69 .magic= PROTOCOL_BINARY_RES,
70 .opcode= opcode,
71 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
72 .opaque= header->request.opaque
73 }
74 };
75
76 struct item *item= get_item(header + 1, ntohs(header->request.keylen));
77 if (item)
78 {
79 msg.response.message.body.flags= htonl(item->flags);
80 char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
81 uint32_t bodysize= 4;
82 msg.response.message.header.response.cas= htonll(item->cas);
83 if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
84 {
85 memcpy(ptr, item->key, item->nkey);
86 msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
87 ptr += item->nkey;
88 bodysize += (uint32_t)item->nkey;
89 }
90 memcpy(ptr, item->data, item->size);
91 bodysize += (uint32_t)item->size;
92 msg.response.message.header.response.bodylen= htonl(bodysize);
93 msg.response.message.header.response.extlen= 4;
94
95 return response_handler(cookie, header, (void*)&msg);
96 }
97 else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
98 {
99 msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
100 return response_handler(cookie, header, (void*)&msg);
101 }
102
103 /* Q shouldn't report a miss ;-) */
104 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
105 }
106
107 static protocol_binary_response_status delete_command_handler(const void *cookie,
108 protocol_binary_request_header *header,
109 memcached_binary_protocol_raw_response_handler response_handler)
110 {
111 size_t keylen= ntohs(header->request.keylen);
112 char *key= ((char*)header) + sizeof(*header);
113 protocol_binary_response_no_extras response= {
114 .message.header.response= {
115 .magic= PROTOCOL_BINARY_RES,
116 .opcode= header->request.opcode,
117 .opaque= header->request.opaque
118 }
119 };
120
121 if (!delete_item(key, keylen))
122 {
123 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
124 return response_handler(cookie, header, (void*)&response);
125 }
126 else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
127 {
128 /* DELETEQ doesn't want success response */
129 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
130 return response_handler(cookie, header, (void*)&response);
131 }
132
133 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
134 }
135
136 static protocol_binary_response_status flush_command_handler(const void *cookie,
137 protocol_binary_request_header *header,
138 memcached_binary_protocol_raw_response_handler response_handler)
139 {
140 uint8_t opcode= header->request.opcode;
141
142 /* @fixme sett inn when! */
143 flush(0);
144
145 if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
146 {
147 protocol_binary_response_no_extras response= {
148 .message.header.response= {
149 .magic= PROTOCOL_BINARY_RES,
150 .opcode= opcode,
151 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
152 .opaque= header->request.opaque
153 }
154 };
155 return response_handler(cookie, header, (void*)&response);
156 }
157
158 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
159 }
160
161 static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
162 protocol_binary_request_header *header,
163 memcached_binary_protocol_raw_response_handler response_handler)
164 {
165 protocol_binary_request_incr *req= (void*)header;
166 protocol_binary_response_incr response= {
167 .message.header.response= {
168 .magic= PROTOCOL_BINARY_RES,
169 .opcode= header->request.opcode,
170 .opaque= header->request.opaque,
171 },
172 };
173
174 uint16_t keylen= ntohs(header->request.keylen);
175 uint64_t initial= ntohll(req->message.body.initial);
176 uint64_t delta= ntohll(req->message.body.delta);
177 uint32_t expiration= ntohl(req->message.body.expiration);
178 void *key= req->bytes + sizeof(req->bytes);
179 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
180
181 struct item *item= get_item(key, keylen);
182 if (item == NULL)
183 {
184 item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
185 if (item == 0)
186 {
187 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
188 }
189 else
190 {
191 memcpy(item->data, &initial, sizeof(initial));
192 put_item(item);
193 }
194 }
195 else
196 {
197 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
198 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
199 {
200 (*(uint64_t*)item->data) += delta;
201 }
202 else
203 {
204 if (delta > *(uint64_t*)item->data)
205 {
206 *(uint64_t*)item->data= 0;
207 }
208 else
209 {
210 *(uint64_t*)item->data -= delta;
211 }
212 }
213 update_cas(item);
214 }
215
216 response.message.header.response.status= htons(rval);
217 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
218 {
219 response.message.header.response.bodylen= ntohl(8);
220 response.message.body.value= ntohll((*(uint64_t*)item->data));
221 response.message.header.response.cas= ntohll(item->cas);
222
223 if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
224 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
225 {
226 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
227 }
228 }
229
230 return response_handler(cookie, header, (void*)&response);
231 }
232
233 static protocol_binary_response_status version_command_handler(const void *cookie,
234 protocol_binary_request_header *header,
235 memcached_binary_protocol_raw_response_handler response_handler)
236 {
237 const char *versionstring= "1.0.0";
238 union {
239 protocol_binary_response_header packet;
240 char buffer[256];
241 } response= {
242 .packet.response= {
243 .magic= PROTOCOL_BINARY_RES,
244 .opcode= PROTOCOL_BINARY_CMD_VERSION,
245 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
246 .opaque= header->request.opaque,
247 .bodylen= htonl((uint32_t)strlen(versionstring))
248 }
249 };
250
251 memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
252
253 return response_handler(cookie, header, (void*)&response);
254 }
255
256 static protocol_binary_response_status concat_command_handler(const void *cookie,
257 protocol_binary_request_header *header,
258 memcached_binary_protocol_raw_response_handler response_handler)
259 {
260 protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
261 uint16_t keylen= ntohs(header->request.keylen);
262 uint64_t cas= ntohll(header->request.cas);
263 void *key= header + 1;
264 uint32_t vallen= ntohl(header->request.bodylen) - keylen;
265 void *val= (char*)key + keylen;
266
267 struct item *item= get_item(key, keylen);
268 struct item *nitem;
269 if (item == NULL)
270 {
271 rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
272 }
273 else if (cas != 0 && cas != item->cas)
274 {
275 rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
276 }
277 else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
278 item->flags, item->exp)) == NULL)
279 {
280 rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
281 }
282 else
283 {
284 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
285 header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
286 {
287 memcpy(nitem->data, item->data, item->size);
288 memcpy(((char*)(nitem->data)) + item->size, val, vallen);
289 }
290 else
291 {
292 memcpy(nitem->data, val, vallen);
293 memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
294 }
295 delete_item(key, keylen);
296 put_item(nitem);
297 if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
298 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
299 {
300 protocol_binary_response_no_extras response= {
301 .message= {
302 .header.response= {
303 .magic= PROTOCOL_BINARY_RES,
304 .opcode= header->request.opcode,
305 .status= htons(rval),
306 .opaque= header->request.opaque,
307 .cas= htonll(nitem->cas),
308 }
309 }
310 };
311 return response_handler(cookie, header, (void*)&response);
312 }
313 }
314
315 return rval;
316 }
317
318 static protocol_binary_response_status set_command_handler(const void *cookie,
319 protocol_binary_request_header *header,
320 memcached_binary_protocol_raw_response_handler response_handler)
321 {
322 size_t keylen= ntohs(header->request.keylen);
323 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
324 protocol_binary_request_replace *request= (void*)header;
325 uint32_t flags= ntohl(request->message.body.flags);
326 time_t timeout= ntohl(request->message.body.expiration);
327 char *key= ((char*)header) + sizeof(*header) + 8;
328 char *data= key + keylen;
329
330 protocol_binary_response_no_extras response= {
331 .message= {
332 .header.response= {
333 .magic= PROTOCOL_BINARY_RES,
334 .opcode= header->request.opcode,
335 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
336 .opaque= header->request.opaque
337 }
338 }
339 };
340
341 if (header->request.cas != 0)
342 {
343 /* validate cas */
344 struct item* item= get_item(key, keylen);
345 if (item != NULL && item->cas != ntohll(header->request.cas))
346 {
347 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
348 return response_handler(cookie, header, (void*)&response);
349 }
350 }
351
352 delete_item(key, keylen);
353 struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
354 if (item == 0)
355 {
356 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
357 }
358 else
359 {
360 put_item(item);
361 /* SETQ shouldn't return a message */
362 if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
363 {
364 response.message.header.response.cas= htonll(item->cas);
365 return response_handler(cookie, header, (void*)&response);
366 }
367 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
368 }
369
370 return response_handler(cookie, header, (void*)&response);
371 }
372
373 static protocol_binary_response_status add_command_handler(const void *cookie,
374 protocol_binary_request_header *header,
375 memcached_binary_protocol_raw_response_handler response_handler)
376 {
377 size_t keylen= ntohs(header->request.keylen);
378 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
379 protocol_binary_request_add *request= (void*)header;
380 uint32_t flags= ntohl(request->message.body.flags);
381 time_t timeout= ntohl(request->message.body.expiration);
382 char *key= ((char*)header) + sizeof(*header) + 8;
383 char *data= key + keylen;
384
385 protocol_binary_response_no_extras response= {
386 .message= {
387 .header.response= {
388 .magic= PROTOCOL_BINARY_RES,
389 .opcode= header->request.opcode,
390 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
391 .opaque= header->request.opaque
392 }
393 }
394 };
395
396 struct item* item= get_item(key, keylen);
397 if (item == NULL)
398 {
399 item= create_item(key, keylen, data, datalen, flags, timeout);
400 if (item == 0)
401 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
402 else
403 {
404 put_item(item);
405 /* ADDQ shouldn't return a message */
406 if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
407 {
408 response.message.header.response.cas= htonll(item->cas);
409 return response_handler(cookie, header, (void*)&response);
410 }
411 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
412 }
413 }
414 else
415 {
416 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
417 }
418
419 return response_handler(cookie, header, (void*)&response);
420 }
421
422 static protocol_binary_response_status replace_command_handler(const void *cookie,
423 protocol_binary_request_header *header,
424 memcached_binary_protocol_raw_response_handler response_handler)
425 {
426 size_t keylen= ntohs(header->request.keylen);
427 size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
428 protocol_binary_request_replace *request= (void*)header;
429 uint32_t flags= ntohl(request->message.body.flags);
430 time_t timeout= ntohl(request->message.body.expiration);
431 char *key= ((char*)header) + sizeof(*header) + 8;
432 char *data= key + keylen;
433
434 protocol_binary_response_no_extras response= {
435 .message= {
436 .header.response= {
437 .magic= PROTOCOL_BINARY_RES,
438 .opcode= header->request.opcode,
439 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
440 .opaque= header->request.opaque
441 }
442 }
443 };
444
445 struct item* item= get_item(key, keylen);
446 if (item == NULL)
447 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
448 else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
449 {
450 delete_item(key, keylen);
451 item= create_item(key, keylen, data, datalen, flags, timeout);
452 if (item == 0)
453 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
454 else
455 {
456 put_item(item);
457 /* REPLACEQ shouldn't return a message */
458 if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
459 {
460 response.message.header.response.cas= htonll(item->cas);
461 return response_handler(cookie, header, (void*)&response);
462 }
463 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
464 }
465 }
466 else
467 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
468
469 return response_handler(cookie, header, (void*)&response);
470 }
471
472 static protocol_binary_response_status stat_command_handler(const void *cookie,
473 protocol_binary_request_header *header,
474 memcached_binary_protocol_raw_response_handler response_handler)
475 {
476 /* Just send the terminating packet*/
477 protocol_binary_response_no_extras response= {
478 .message= {
479 .header.response= {
480 .magic= PROTOCOL_BINARY_RES,
481 .opcode= PROTOCOL_BINARY_CMD_STAT,
482 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
483 .opaque= header->request.opaque
484 }
485 }
486 };
487
488 return response_handler(cookie, header, (void*)&response);
489 }
490
491 struct memcached_binary_protocol_callback_st interface_v0_impl= {
492 .interface_version= 0,
493 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
494 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
495 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
496 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
497 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
498 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
499 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
500 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
501 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
502 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
503 .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
504 .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
505 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
506 .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
507 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
508 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
509 .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
510 .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
511 .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
512 .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
513 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
514 .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
515 .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
516 .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
517 .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
518 .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
519 .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
520 };