Add an example using innodb as a storage (NOTE: this version currently leaks memorybz...
[awesomized/libmemcached] / libmemcached / protocol / protocol_handler.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "common.h"
3
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <errno.h>
8 #include <stdbool.h>
9 #include <string.h>
10 #include <stdio.h>
11
12 /*
13 ** **********************************************************************
14 ** INTERNAL INTERFACE
15 ** **********************************************************************
16 */
17
18 /**
19 * The default function to receive data from the client. This function
20 * just wraps the recv function to receive from a socket.
21 * See man -s3socket recv for more information.
22 *
23 * @param cookie cookie indentifying a client, not used
24 * @param sock socket to read from
25 * @param buf the destination buffer
26 * @param nbytes the number of bytes to read
27 * @return the number of bytes transferred of -1 upon error
28 */
29 static ssize_t default_recv(const void *cookie,
30 int sock,
31 void *buf,
32 size_t nbytes)
33 {
34 (void)cookie;
35 return recv(sock, buf, nbytes, 0);
36 }
37
38 /**
39 * The default function to send data to the server. This function
40 * just wraps the send function to send through a socket.
41 * See man -s3socket send for more information.
42 *
43 * @param cookie cookie indentifying a client, not used
44 * @param sock socket to send to
45 * @param buf the source buffer
46 * @param nbytes the number of bytes to send
47 * @return the number of bytes transferred of -1 upon error
48 */
49 static ssize_t default_send(const void *cookie,
50 int fd,
51 const void *buf,
52 size_t nbytes)
53 {
54 (void)cookie;
55 return send(fd, buf, nbytes, 0);
56 }
57
58 /**
59 * Try to drain the output buffers without blocking
60 *
61 * @param client the client to drain
62 * @return false if an error occured (connection should be shut down)
63 * true otherwise (please note that there may be more data to
64 * left in the buffer to send)
65 */
66 static bool drain_output(struct memcached_binary_protocol_client_st *client)
67 {
68 ssize_t len;
69
70 // Do we have pending data to send?
71 while (client->output != NULL)
72 {
73 len= client->root->send(client,
74 client->sock,
75 client->output->data + client->output->offset,
76 client->output->nbytes - client->output->offset);
77
78 if (len == -1)
79 {
80 if (errno == EWOULDBLOCK)
81 {
82 return true;
83 }
84 else if (errno != EINTR)
85 {
86 client->error= errno;
87 return false;
88 }
89 }
90 else
91 {
92 client->output->offset += (size_t)len;
93 if (client->output->offset == client->output->nbytes)
94 {
95 /* This was the complete buffer */
96 struct chunk_st *old= client->output;
97 client->output= client->output->next;
98 if (client->output == NULL)
99 {
100 client->output_tail= NULL;
101 }
102 cache_free(client->root->buffer_cache, old);
103 }
104 }
105 }
106
107 return true;
108 }
109
110 /**
111 * Allocate an output buffer and chain it into the output list
112 *
113 * @param client the client that needs the buffer
114 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
115 */
116 static struct chunk_st*
117 allocate_output_chunk(struct memcached_binary_protocol_client_st *client)
118 {
119 struct chunk_st*ret= cache_alloc(client->root->buffer_cache);
120 if (ret == NULL) {
121 return NULL;
122 }
123
124 ret->offset = ret->nbytes = 0;
125 ret->next = NULL;
126 ret->size = CHUNK_BUFFERSIZE;
127 ret->data= (void*)(ret + 1);
128 if (client->output == NULL)
129 {
130 client->output = client->output_tail = ret;
131 }
132 else
133 {
134 client->output_tail->next= ret;
135 client->output_tail= ret;
136 }
137
138 return ret;
139 }
140
141 /**
142 * Spool data into the send-buffer for a client.
143 *
144 * @param client the client to spool the data for
145 * @param data the data to spool
146 * @param length the number of bytes of data to spool
147 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
148 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
149 */
150 static protocol_binary_response_status
151 spool_output(struct memcached_binary_protocol_client_st *client,
152 const void *data,
153 size_t length)
154 {
155 size_t offset = 0;
156
157 struct chunk_st *chunk= client->output;
158 while (offset < length)
159 {
160 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
161 {
162 if ((chunk= allocate_output_chunk(client)) == NULL)
163 {
164 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
165 }
166 }
167
168 size_t bulk = length - offset;
169 if (bulk > chunk->size - chunk->nbytes)
170 {
171 bulk = chunk->size - chunk->nbytes;
172 }
173
174 memcpy(chunk->data + chunk->nbytes, data, bulk);
175 chunk->nbytes += bulk;
176 offset += bulk;
177 }
178
179 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
180 }
181
182 /**
183 * Send a preformatted packet back to the client. If the connection is in
184 * pedantic mode, it will validate the packet and refuse to send it if it
185 * breaks the specification.
186 *
187 * @param cookie client identification
188 * @param request the original request packet
189 * @param response the packet to send
190 * @return The status of the operation
191 */
192 static protocol_binary_response_status
193 raw_response_handler(const void *cookie,
194 protocol_binary_request_header *request,
195 protocol_binary_response_header *response)
196 {
197 struct memcached_binary_protocol_client_st *client= (void*)cookie;
198
199 if (client->root->pedantic &&
200 !memcached_binary_protocol_pedantic_check_response(request, response))
201 {
202 return PROTOCOL_BINARY_RESPONSE_EINVAL;
203 }
204
205 if (!drain_output(client))
206 {
207 return PROTOCOL_BINARY_RESPONSE_EIO;
208 }
209
210 size_t len= sizeof(*response) + htonl(response->response.bodylen);
211 size_t offset= 0;
212 char *ptr= (void*)response;
213
214 if (client->output == NULL)
215 {
216 /* I can write directly to the socket.... */
217 do
218 {
219 size_t num_bytes= len - offset;
220 ssize_t nw= client->root->send(client,
221 client->sock,
222 ptr + offset,
223 num_bytes);
224 if (nw == -1)
225 {
226 if (errno == EWOULDBLOCK)
227 {
228 break;
229 }
230 else if (errno != EINTR)
231 {
232 client->error= errno;
233 return PROTOCOL_BINARY_RESPONSE_EIO;
234 }
235 }
236 else
237 {
238 offset += (size_t)nw;
239 }
240 } while (offset < len);
241 }
242
243 return spool_output(client, ptr, len - offset);
244 }
245
246 /*
247 * Version 0 of the interface is really low level and protocol specific,
248 * while the version 1 of the interface is more API focused. We need a
249 * way to translate between the command codes on the wire and the
250 * application level interface in V1, so let's just use the V0 of the
251 * interface as a map instead of creating a huuuge switch :-)
252 */
253
254 /**
255 * Callback for the GET/GETQ/GETK and GETKQ responses
256 * @param cookie client identifier
257 * @param key the key for the item
258 * @param keylen the length of the key
259 * @param body the length of the body
260 * @param bodylen the length of the body
261 * @param flags the flags for the item
262 * @param cas the CAS id for the item
263 */
264 static protocol_binary_response_status
265 get_response_handler(const void *cookie,
266 const void *key,
267 uint16_t keylen,
268 const void *body,
269 uint32_t bodylen,
270 uint32_t flags,
271 uint64_t cas) {
272
273 struct memcached_binary_protocol_client_st *client= (void*)cookie;
274 uint8_t opcode= client->current_command->request.opcode;
275
276 if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ)
277 {
278 keylen= 0;
279 }
280
281 protocol_binary_response_get response= {
282 .message.header.response= {
283 .magic= PROTOCOL_BINARY_RES,
284 .opcode= opcode,
285 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
286 .opaque= client->current_command->request.opaque,
287 .cas= htonll(cas),
288 .keylen= htons(keylen),
289 .extlen= 4,
290 .bodylen= htonl(bodylen + keylen + 4),
291 },
292 .message.body.flags= htonl(flags),
293 };
294
295 protocol_binary_response_status rval;
296 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
297 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
298 (rval= spool_output(client, key, keylen)) != success ||
299 (rval= spool_output(client, body, bodylen)) != success)
300 {
301 return rval;
302 }
303
304 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
305 }
306
307 /**
308 * Callback for the STAT responses
309 * @param cookie client identifier
310 * @param key the key for the item
311 * @param keylen the length of the key
312 * @param body the length of the body
313 * @param bodylen the length of the body
314 */
315 static protocol_binary_response_status
316 stat_response_handler(const void *cookie,
317 const void *key,
318 uint16_t keylen,
319 const void *body,
320 uint32_t bodylen) {
321
322 struct memcached_binary_protocol_client_st *client= (void*)cookie;
323
324 protocol_binary_response_no_extras response= {
325 .message.header.response= {
326 .magic= PROTOCOL_BINARY_RES,
327 .opcode= client->current_command->request.opcode,
328 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
329 .opaque= client->current_command->request.opaque,
330 .keylen= htons(keylen),
331 .bodylen= htonl(bodylen + keylen),
332 },
333 };
334
335 protocol_binary_response_status rval;
336 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
337 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
338 (rval= spool_output(client, key, keylen)) != success ||
339 (rval= spool_output(client, body, bodylen)) != success)
340 {
341 return rval;
342 }
343
344 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
345 }
346
347 /**
348 * Callback for the VERSION responses
349 * @param cookie client identifier
350 * @param text the length of the body
351 * @param textlen the length of the body
352 */
353 static protocol_binary_response_status
354 version_response_handler(const void *cookie,
355 const void *text,
356 uint32_t textlen) {
357
358 struct memcached_binary_protocol_client_st *client= (void*)cookie;
359
360 protocol_binary_response_no_extras response= {
361 .message.header.response= {
362 .magic= PROTOCOL_BINARY_RES,
363 .opcode= client->current_command->request.opcode,
364 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
365 .opaque= client->current_command->request.opaque,
366 .bodylen= htonl(textlen),
367 },
368 };
369
370 protocol_binary_response_status rval;
371 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
372 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
373 (rval= spool_output(client, text, textlen)) != success)
374 {
375 return rval;
376 }
377
378 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
379 }
380
381 /**
382 * Callback for ADD and ADDQ
383 * @param cookie the calling client
384 * @param header the add/addq command
385 * @param response_handler not used
386 * @return the result of the operation
387 */
388 static protocol_binary_response_status
389 add_command_handler(const void *cookie,
390 protocol_binary_request_header *header,
391 memcached_binary_protocol_raw_response_handler response_handler)
392 {
393 protocol_binary_response_status rval;
394
395 struct memcached_binary_protocol_client_st *client= (void*)cookie;
396 if (client->root->callback->interface.v1.add != NULL)
397 {
398 uint16_t keylen= ntohs(header->request.keylen);
399 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
400 protocol_binary_request_add *request= (void*)header;
401 uint32_t flags= ntohl(request->message.body.flags);
402 uint32_t timeout= ntohl(request->message.body.expiration);
403 char *key= ((char*)header) + sizeof(*header) + 8;
404 char *data= key + keylen;
405 uint64_t cas;
406
407 rval= client->root->callback->interface.v1.add(cookie, key, keylen,
408 data, datalen, flags,
409 timeout, &cas);
410
411 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
412 header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
413 {
414 /* Send a positive request */
415 protocol_binary_response_no_extras response= {
416 .message= {
417 .header.response= {
418 .magic= PROTOCOL_BINARY_RES,
419 .opcode= PROTOCOL_BINARY_CMD_ADD,
420 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
421 .opaque= header->request.opaque,
422 .cas= ntohll(cas)
423 }
424 }
425 };
426 rval= response_handler(cookie, header, (void*)&response);
427 }
428 }
429 else
430 {
431 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
432 }
433
434 return rval;
435 }
436
437 /**
438 * Callback for DECREMENT and DECREMENTQ
439 * @param cookie the calling client
440 * @param header the command
441 * @param response_handler not used
442 * @return the result of the operation
443 */
444 static protocol_binary_response_status
445 decrement_command_handler(const void *cookie,
446 protocol_binary_request_header *header,
447 memcached_binary_protocol_raw_response_handler response_handler)
448 {
449 (void)response_handler;
450 protocol_binary_response_status rval;
451
452 struct memcached_binary_protocol_client_st *client= (void*)cookie;
453 if (client->root->callback->interface.v1.decrement != NULL)
454 {
455 uint16_t keylen= ntohs(header->request.keylen);
456 protocol_binary_request_decr *request= (void*)header;
457 uint64_t init= ntohll(request->message.body.initial);
458 uint64_t delta= ntohll(request->message.body.delta);
459 uint32_t timeout= ntohl(request->message.body.expiration);
460 void *key= request->bytes + sizeof(request->bytes);
461 uint64_t result;
462 uint64_t cas;
463
464 rval= client->root->callback->interface.v1.decrement(cookie, key, keylen,
465 delta, init, timeout,
466 &result, &cas);
467 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
468 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT)
469 {
470 /* Send a positive request */
471 protocol_binary_response_decr response= {
472 .message= {
473 .header.response= {
474 .magic= PROTOCOL_BINARY_RES,
475 .opcode= PROTOCOL_BINARY_CMD_DECREMENT,
476 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
477 .opaque= header->request.opaque,
478 .cas= ntohll(cas),
479 .bodylen= htonl(8)
480 },
481 .body.value = htonll(result)
482 }
483 };
484 rval= response_handler(cookie, header, (void*)&response);
485 }
486 }
487 else
488 {
489 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
490 }
491
492 return rval;
493 }
494
495 /**
496 * Callback for DELETE and DELETEQ
497 * @param cookie the calling client
498 * @param header the command
499 * @param response_handler not used
500 * @return the result of the operation
501 */
502 static protocol_binary_response_status
503 delete_command_handler(const void *cookie,
504 protocol_binary_request_header *header,
505 memcached_binary_protocol_raw_response_handler response_handler)
506 {
507 (void)response_handler;
508 protocol_binary_response_status rval;
509
510 struct memcached_binary_protocol_client_st *client= (void*)cookie;
511 if (client->root->callback->interface.v1.delete != NULL)
512 {
513 uint16_t keylen= ntohs(header->request.keylen);
514 void *key= (header + 1);
515 uint64_t cas= ntohll(header->request.cas);
516 rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas);
517 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
518 header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
519 {
520 /* Send a positive request */
521 protocol_binary_response_no_extras response= {
522 .message= {
523 .header.response= {
524 .magic= PROTOCOL_BINARY_RES,
525 .opcode= PROTOCOL_BINARY_CMD_DELETE,
526 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
527 .opaque= header->request.opaque,
528 }
529 }
530 };
531 rval= response_handler(cookie, header, (void*)&response);
532 }
533 }
534 else
535 {
536 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
537 }
538
539 return rval;
540 }
541
542 /**
543 * Callback for FLUSH and FLUSHQ
544 * @param cookie the calling client
545 * @param header the command
546 * @param response_handler not used
547 * @return the result of the operation
548 */
549 static protocol_binary_response_status
550 flush_command_handler(const void *cookie,
551 protocol_binary_request_header *header,
552 memcached_binary_protocol_raw_response_handler response_handler)
553 {
554 (void)response_handler;
555 protocol_binary_response_status rval;
556
557 struct memcached_binary_protocol_client_st *client= (void*)cookie;
558 if (client->root->callback->interface.v1.flush != NULL)
559 {
560 protocol_binary_request_flush *flush= (void*)header;
561 uint32_t timeout= 0;
562 if (htonl(header->request.bodylen) == 4)
563 {
564 timeout= ntohl(flush->message.body.expiration);
565 }
566
567 rval= client->root->callback->interface.v1.flush(cookie, timeout);
568 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
569 header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH)
570 {
571 /* Send a positive request */
572 protocol_binary_response_no_extras response= {
573 .message= {
574 .header.response= {
575 .magic= PROTOCOL_BINARY_RES,
576 .opcode= PROTOCOL_BINARY_CMD_FLUSH,
577 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
578 .opaque= header->request.opaque,
579 }
580 }
581 };
582 rval= response_handler(cookie, header, (void*)&response);
583 }
584 }
585 else
586 {
587 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
588 }
589
590 return rval;
591 }
592
593 /**
594 * Callback for GET, GETK, GETQ, GETKQ
595 * @param cookie the calling client
596 * @param header the command
597 * @param response_handler not used
598 * @return the result of the operation
599 */
600 static protocol_binary_response_status
601 get_command_handler(const void *cookie,
602 protocol_binary_request_header *header,
603 memcached_binary_protocol_raw_response_handler response_handler)
604 {
605 (void)response_handler;
606 protocol_binary_response_status rval;
607
608 struct memcached_binary_protocol_client_st *client= (void*)cookie;
609 if (client->root->callback->interface.v1.get != NULL)
610 {
611 uint16_t keylen= ntohs(header->request.keylen);
612 void *key= (header + 1);
613 rval= client->root->callback->interface.v1.get(cookie, key, keylen,
614 get_response_handler);
615
616 if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT &&
617 (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
618 header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ))
619 {
620 /* Quiet commands shouldn't respond on cache misses */
621 rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
622 }
623 }
624 else
625 {
626 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
627 }
628
629 return rval;
630 }
631
632 /**
633 * Callback for INCREMENT and INCREMENTQ
634 * @param cookie the calling client
635 * @param header the command
636 * @param response_handler not used
637 * @return the result of the operation
638 */
639 static protocol_binary_response_status
640 increment_command_handler(const void *cookie,
641 protocol_binary_request_header *header,
642 memcached_binary_protocol_raw_response_handler response_handler)
643 {
644 (void)response_handler;
645 protocol_binary_response_status rval;
646
647 struct memcached_binary_protocol_client_st *client= (void*)cookie;
648 if (client->root->callback->interface.v1.increment != NULL)
649 {
650 uint16_t keylen= ntohs(header->request.keylen);
651 protocol_binary_request_incr *request= (void*)header;
652 uint64_t init= ntohll(request->message.body.initial);
653 uint64_t delta= ntohll(request->message.body.delta);
654 uint32_t timeout= ntohl(request->message.body.expiration);
655 void *key= request->bytes + sizeof(request->bytes);
656 uint64_t cas;
657 uint64_t result;
658
659 rval= client->root->callback->interface.v1.increment(cookie, key, keylen,
660 delta, init, timeout,
661 &result, &cas);
662 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
663 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT)
664 {
665 /* Send a positive request */
666 protocol_binary_response_incr response= {
667 .message= {
668 .header.response= {
669 .magic= PROTOCOL_BINARY_RES,
670 .opcode= PROTOCOL_BINARY_CMD_INCREMENT,
671 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
672 .opaque= header->request.opaque,
673 .cas= ntohll(cas),
674 .bodylen= htonl(8)
675 },
676 .body.value = htonll(result)
677 }
678 };
679
680 rval= response_handler(cookie, header, (void*)&response);
681 }
682 }
683 else
684 {
685 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
686 }
687
688 return rval;
689 }
690
691 /**
692 * Callback for noop. Inform the v1 interface about the noop packet, and
693 * create and send a packet back to the client
694 *
695 * @param cookie the calling client
696 * @param header the command
697 * @param response_handler the response handler
698 * @return the result of the operation
699 */
700 static protocol_binary_response_status
701 noop_command_handler(const void *cookie,
702 protocol_binary_request_header *header,
703 memcached_binary_protocol_raw_response_handler response_handler)
704 {
705 struct memcached_binary_protocol_client_st *client= (void*)cookie;
706 if (client->root->callback->interface.v1.noop != NULL)
707 {
708 client->root->callback->interface.v1.noop(cookie);
709 }
710
711 protocol_binary_response_no_extras response= {
712 .message = {
713 .header.response = {
714 .magic = PROTOCOL_BINARY_RES,
715 .opcode= PROTOCOL_BINARY_CMD_NOOP,
716 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
717 .opaque= header->request.opaque,
718 }
719 }
720 };
721
722 return response_handler(cookie, header, (void*)&response);
723 }
724
725 /**
726 * Callback for APPEND and APPENDQ
727 * @param cookie the calling client
728 * @param header the command
729 * @param response_handler not used
730 * @return the result of the operation
731 */
732 static protocol_binary_response_status
733 append_command_handler(const void *cookie,
734 protocol_binary_request_header *header,
735 memcached_binary_protocol_raw_response_handler response_handler)
736 {
737 (void)response_handler;
738 protocol_binary_response_status rval;
739
740 struct memcached_binary_protocol_client_st *client= (void*)cookie;
741 if (client->root->callback->interface.v1.append != NULL)
742 {
743 uint16_t keylen= ntohs(header->request.keylen);
744 uint32_t datalen= ntohl(header->request.bodylen) - keylen;
745 char *key= (void*)(header + 1);
746 char *data= key + keylen;
747 uint64_t cas= ntohll(header->request.cas);
748 uint64_t result_cas;
749
750 rval= client->root->callback->interface.v1.append(cookie, key, keylen,
751 data, datalen, cas,
752 &result_cas);
753 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
754 header->request.opcode == PROTOCOL_BINARY_CMD_APPEND)
755 {
756 /* Send a positive request */
757 protocol_binary_response_no_extras response= {
758 .message= {
759 .header.response= {
760 .magic= PROTOCOL_BINARY_RES,
761 .opcode= PROTOCOL_BINARY_CMD_APPEND,
762 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
763 .opaque= header->request.opaque,
764 .cas= ntohll(result_cas),
765 },
766 }
767 };
768 rval= response_handler(cookie, header, (void*)&response);
769 }
770 }
771 else
772 {
773 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
774 }
775
776 return rval;
777 }
778
779 /**
780 * Callback for PREPEND and PREPENDQ
781 * @param cookie the calling client
782 * @param header the command
783 * @param response_handler not used
784 * @return the result of the operation
785 */
786 static protocol_binary_response_status
787 prepend_command_handler(const void *cookie,
788 protocol_binary_request_header *header,
789 memcached_binary_protocol_raw_response_handler response_handler)
790 {
791 (void)response_handler;
792 protocol_binary_response_status rval;
793
794 struct memcached_binary_protocol_client_st *client= (void*)cookie;
795 if (client->root->callback->interface.v1.prepend != NULL)
796 {
797 uint16_t keylen= ntohs(header->request.keylen);
798 uint32_t datalen= ntohl(header->request.bodylen) - keylen;
799 char *key= (char*)(header + 1);
800 char *data= key + keylen;
801 uint64_t cas= ntohll(header->request.cas);
802 uint64_t result_cas;
803 rval= client->root->callback->interface.v1.prepend(cookie, key, keylen,
804 data, datalen, cas,
805 &result_cas);
806 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
807 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
808 {
809 /* Send a positive request */
810 protocol_binary_response_no_extras response= {
811 .message= {
812 .header.response= {
813 .magic= PROTOCOL_BINARY_RES,
814 .opcode= PROTOCOL_BINARY_CMD_PREPEND,
815 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
816 .opaque= header->request.opaque,
817 .cas= ntohll(result_cas),
818 },
819 }
820 };
821 rval= response_handler(cookie, header, (void*)&response);
822 }
823 }
824 else
825 {
826 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
827 }
828
829 return rval;
830 }
831
832 /**
833 * Callback for QUIT and QUITQ. Notify the client and shut down the connection
834 * @param cookie the calling client
835 * @param header the command
836 * @param response_handler not used
837 * @return the result of the operation
838 */
839 static protocol_binary_response_status
840 quit_command_handler(const void *cookie,
841 protocol_binary_request_header *header,
842 memcached_binary_protocol_raw_response_handler response_handler)
843 {
844 struct memcached_binary_protocol_client_st *client= (void*)cookie;
845 if (client->root->callback->interface.v1.quit != NULL)
846 {
847 client->root->callback->interface.v1.quit(cookie);
848 }
849
850 protocol_binary_response_no_extras response= {
851 .message = {
852 .header.response = {
853 .magic= PROTOCOL_BINARY_RES,
854 .opcode= PROTOCOL_BINARY_CMD_QUIT,
855 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
856 .opaque= header->request.opaque
857 }
858 }
859 };
860
861 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
862 {
863 response_handler(cookie, header, (void*)&response);
864 }
865
866 /* I need a better way to signal to close the connection */
867 return PROTOCOL_BINARY_RESPONSE_EIO;
868 }
869
870 /**
871 * Callback for REPLACE and REPLACEQ
872 * @param cookie the calling client
873 * @param header the command
874 * @param response_handler not used
875 * @return the result of the operation
876 */
877 static protocol_binary_response_status
878 replace_command_handler(const void *cookie,
879 protocol_binary_request_header *header,
880 memcached_binary_protocol_raw_response_handler response_handler)
881 {
882 (void)response_handler;
883 protocol_binary_response_status rval;
884
885 struct memcached_binary_protocol_client_st *client= (void*)cookie;
886 if (client->root->callback->interface.v1.replace != NULL)
887 {
888 uint16_t keylen= ntohs(header->request.keylen);
889 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
890 protocol_binary_request_replace *request= (void*)header;
891 uint32_t flags= ntohl(request->message.body.flags);
892 uint32_t timeout= ntohl(request->message.body.expiration);
893 char *key= ((char*)header) + sizeof(*header) + 8;
894 char *data= key + keylen;
895 uint64_t cas= ntohll(header->request.cas);
896 uint64_t result_cas;
897
898 rval= client->root->callback->interface.v1.replace(cookie, key, keylen,
899 data, datalen, flags,
900 timeout, cas,
901 &result_cas);
902 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
903 header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
904 {
905 /* Send a positive request */
906 protocol_binary_response_no_extras response= {
907 .message= {
908 .header.response= {
909 .magic= PROTOCOL_BINARY_RES,
910 .opcode= PROTOCOL_BINARY_CMD_REPLACE,
911 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
912 .opaque= header->request.opaque,
913 .cas= ntohll(result_cas),
914 },
915 }
916 };
917 rval= response_handler(cookie, header, (void*)&response);
918 }
919 }
920 else
921 {
922 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
923 }
924
925 return rval;
926 }
927
928 /**
929 * Callback for SET and SETQ
930 * @param cookie the calling client
931 * @param header the command
932 * @param response_handler not used
933 * @return the result of the operation
934 */
935 static protocol_binary_response_status
936 set_command_handler(const void *cookie,
937 protocol_binary_request_header *header,
938 memcached_binary_protocol_raw_response_handler response_handler)
939 {
940 (void)response_handler;
941 protocol_binary_response_status rval;
942
943 struct memcached_binary_protocol_client_st *client= (void*)cookie;
944 if (client->root->callback->interface.v1.set != NULL)
945 {
946 uint16_t keylen= ntohs(header->request.keylen);
947 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
948 protocol_binary_request_replace *request= (void*)header;
949 uint32_t flags= ntohl(request->message.body.flags);
950 uint32_t timeout= ntohl(request->message.body.expiration);
951 char *key= ((char*)header) + sizeof(*header) + 8;
952 char *data= key + keylen;
953 uint64_t cas= ntohll(header->request.cas);
954 uint64_t result_cas;
955
956
957 rval= client->root->callback->interface.v1.set(cookie, key, keylen,
958 data, datalen, flags,
959 timeout, cas, &result_cas);
960 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
961 header->request.opcode == PROTOCOL_BINARY_CMD_SET)
962 {
963 /* Send a positive request */
964 protocol_binary_response_no_extras response= {
965 .message= {
966 .header.response= {
967 .magic= PROTOCOL_BINARY_RES,
968 .opcode= PROTOCOL_BINARY_CMD_SET,
969 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
970 .opaque= header->request.opaque,
971 .cas= ntohll(result_cas),
972 },
973 }
974 };
975 rval= response_handler(cookie, header, (void*)&response);
976 }
977 }
978 else
979 {
980 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
981 }
982
983 return rval;
984 }
985
986 /**
987 * Callback for STAT
988 * @param cookie the calling client
989 * @param header the command
990 * @param response_handler not used
991 * @return the result of the operation
992 */
993 static protocol_binary_response_status
994 stat_command_handler(const void *cookie,
995 protocol_binary_request_header *header,
996 memcached_binary_protocol_raw_response_handler response_handler)
997 {
998 (void)response_handler;
999 protocol_binary_response_status rval;
1000
1001 struct memcached_binary_protocol_client_st *client= (void*)cookie;
1002 if (client->root->callback->interface.v1.stat != NULL)
1003 {
1004 uint16_t keylen= ntohs(header->request.keylen);
1005
1006 rval= client->root->callback->interface.v1.stat(cookie,
1007 (void*)(header + 1),
1008 keylen,
1009 stat_response_handler);
1010 }
1011 else
1012 {
1013 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1014 }
1015
1016 return rval;
1017 }
1018
1019 /**
1020 * Callback for VERSION
1021 * @param cookie the calling client
1022 * @param header the command
1023 * @param response_handler not used
1024 * @return the result of the operation
1025 */
1026 static protocol_binary_response_status
1027 version_command_handler(const void *cookie,
1028 protocol_binary_request_header *header,
1029 memcached_binary_protocol_raw_response_handler response_handler)
1030 {
1031 (void)response_handler;
1032 (void)header;
1033 protocol_binary_response_status rval;
1034
1035 struct memcached_binary_protocol_client_st *client= (void*)cookie;
1036 if (client->root->callback->interface.v1.version != NULL)
1037 {
1038 rval= client->root->callback->interface.v1.version(cookie,
1039 version_response_handler);
1040 }
1041 else
1042 {
1043 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1044 }
1045
1046 return rval;
1047 }
1048
1049 /**
1050 * The map to remap between the com codes and the v1 logical setting
1051 */
1052 static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= {
1053 [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
1054 [PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
1055 [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler,
1056 [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler,
1057 [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler,
1058 [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler,
1059 [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
1060 [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
1061 [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
1062 [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
1063 [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
1064 [PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
1065 [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
1066 [PROTOCOL_BINARY_CMD_GET]= get_command_handler,
1067 [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler,
1068 [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler,
1069 [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
1070 [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler,
1071 [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler,
1072 [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
1073 [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
1074 [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
1075 [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
1076 [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
1077 [PROTOCOL_BINARY_CMD_SET]= set_command_handler,
1078 [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
1079 [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
1080 };
1081
1082 /**
1083 * Try to execute a command. Fire the pre/post functions and the specialized
1084 * handler function if it's set. If not, the unknown probe should be fired
1085 * if it's present.
1086 * @param client the client connection to operate on
1087 * @param header the command to execute
1088 * @return true if success or false if a fatal error occured so that the
1089 * connection should be shut down.
1090 */
1091 static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header)
1092 {
1093 if (client->root->pedantic &&
1094 memcached_binary_protocol_pedantic_check_request(header))
1095 {
1096 /* @todo return invalid command packet */
1097 }
1098
1099 /* we got all data available, execute the callback! */
1100 if (client->root->callback->pre_execute != NULL)
1101 {
1102 client->root->callback->pre_execute(client, header);
1103 }
1104
1105 protocol_binary_response_status rval;
1106 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1107 uint8_t cc= header->request.opcode;
1108
1109 switch (client->root->callback->interface_version)
1110 {
1111 case 0:
1112 if (client->root->callback->interface.v0.comcode[cc] != NULL) {
1113 rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler);
1114 }
1115 break;
1116 case 1:
1117 if (comcode_v0_v1_remap[cc] != NULL) {
1118 rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler);
1119 }
1120 break;
1121 default:
1122 /* Unknown interface.
1123 * It should be impossible to get here so I'll just call abort
1124 * to avoid getting a compiler warning :-)
1125 */
1126 abort();
1127 }
1128
1129
1130 if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND &&
1131 client->root->callback->unknown != NULL)
1132 {
1133 rval= client->root->callback->unknown(client, header, raw_response_handler);
1134 }
1135
1136 if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS &&
1137 rval != PROTOCOL_BINARY_RESPONSE_EIO)
1138 {
1139 protocol_binary_response_no_extras response= {
1140 .message= {
1141 .header.response= {
1142 .magic= PROTOCOL_BINARY_RES,
1143 .opcode= cc,
1144 .status= htons(rval),
1145 .opaque= header->request.opaque,
1146 },
1147 }
1148 };
1149 rval= raw_response_handler(client, header, (void*)&response);
1150 }
1151
1152 if (client->root->callback->post_execute != NULL)
1153 {
1154 client->root->callback->post_execute(client, header);
1155 }
1156
1157 return rval != PROTOCOL_BINARY_RESPONSE_EIO;
1158 }
1159
1160 /*
1161 ** **********************************************************************
1162 ** * PUBLIC INTERFACE
1163 ** * See protocol_handler.h for function description
1164 ** **********************************************************************
1165 */
1166 struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void)
1167 {
1168 struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret));
1169 if (ret != NULL)
1170 {
1171 ret->recv= default_recv;
1172 ret->send= default_send;
1173 ret->input_buffer_size= 1 * 1024 * 1024;
1174 ret->input_buffer= malloc(ret->input_buffer_size);
1175 if (ret->input_buffer == NULL)
1176 {
1177 free(ret);
1178 ret= NULL;
1179 return NULL;
1180 }
1181
1182 ret->buffer_cache = cache_create("protocol_handler",
1183 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
1184 0, NULL, NULL);
1185 if (ret->buffer_cache == NULL) {
1186 free(ret->input_buffer);
1187 free(ret);
1188 }
1189 }
1190
1191 return ret;
1192 }
1193
1194 void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance)
1195 {
1196 cache_destroy(instance->buffer_cache);
1197 free(instance->input_buffer);
1198 free(instance);
1199 }
1200
1201 struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance)
1202 {
1203 return instance->callback;
1204 }
1205
1206 void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback)
1207 {
1208 instance->callback= callback;
1209 }
1210
1211 memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie)
1212 {
1213 (void)cookie;
1214 return raw_response_handler;
1215 }
1216
1217 void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable)
1218 {
1219 instance->pedantic= enable;
1220 }
1221
1222 bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance)
1223 {
1224 return instance->pedantic;
1225 }
1226
1227 struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock)
1228 {
1229 struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret));
1230 if (ret != NULL)
1231 {
1232 ret->root= instance;
1233 ret->sock= sock;
1234 }
1235
1236 return ret;
1237 }
1238
1239 void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client)
1240 {
1241 free(client);
1242 }
1243
1244 enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client)
1245 {
1246 /* Try to send data and read from the socket */
1247 bool more_data= true;
1248 do
1249 {
1250 ssize_t len= client->root->recv(client,
1251 client->sock,
1252 client->root->input_buffer + client->input_buffer_offset,
1253 client->root->input_buffer_size - client->input_buffer_offset);
1254
1255 if (len > 0)
1256 {
1257 /* Do we have the complete packet? */
1258 if (client->input_buffer_offset > 0)
1259 {
1260 memcpy(client->root->input_buffer, client->input_buffer,
1261 client->input_buffer_offset);
1262 len += (ssize_t)client->input_buffer_offset;
1263
1264 /* @todo use buffer-cache! */
1265 free(client->input_buffer);
1266 client->input_buffer_offset= 0;
1267 }
1268
1269 /* try to parse all of the received packets */
1270 protocol_binary_request_header *header;
1271 header= (void*)client->root->input_buffer;
1272
1273 if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ)
1274 {
1275 client->error= EINVAL;
1276 return ERROR_EVENT;
1277 }
1278
1279 while (len >= (ssize_t)sizeof(*header) &&
1280 (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen))))
1281 {
1282
1283 /* I have the complete package */
1284 client->current_command = header;
1285 if (!execute_command(client, header))
1286 {
1287 return ERROR_EVENT;
1288 }
1289
1290 ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));
1291 len -= total;
1292 if (len > 0)
1293 {
1294 intptr_t ptr= (intptr_t)header;
1295 ptr += total;
1296 if ((ptr % 8) == 0)
1297 {
1298 header= (void*)ptr;
1299 }
1300 else
1301 {
1302 memmove(client->root->input_buffer, (void*)ptr, (size_t)len);
1303 header= (void*)client->root->input_buffer;
1304 }
1305 }
1306 }
1307
1308 if (len > 0)
1309 {
1310 /* save the data for later on */
1311 /* @todo use buffer-cache */
1312 client->input_buffer= malloc((size_t)len);
1313 if (client->input_buffer == NULL)
1314 {
1315 client->error= ENOMEM;
1316 return ERROR_EVENT;
1317 }
1318 memcpy(client->input_buffer, header, (size_t)len);
1319 client->input_buffer_offset= (size_t)len;
1320 more_data= false;
1321 }
1322 }
1323 else if (len == 0)
1324 {
1325 /* Connection closed */
1326 drain_output(client);
1327 return ERROR_EVENT;
1328 }
1329 else
1330 {
1331 if (errno != EWOULDBLOCK)
1332 {
1333 client->error= errno;
1334 /* mark this client as terminated! */
1335 return ERROR_EVENT;
1336 }
1337 more_data = false;
1338 }
1339 } while (more_data);
1340
1341 if (!drain_output(client))
1342 {
1343 return ERROR_EVENT;
1344 }
1345
1346 return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
1347 }