Fix compilation on Linux (lacks ntohll etc)
[m6w6/libmemcached] / libmemcached / protocol / protocol_handler.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "common.h"
3
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <errno.h>
8 #include <stdbool.h>
9 #include <string.h>
10 #include <stdio.h>
11
12 /*
13 ** **********************************************************************
14 ** INTERNAL INTERFACE
15 ** **********************************************************************
16 */
17
18 /**
19 * The default function to receive data from the client. This function
20 * just wraps the recv function to receive from a socket.
21 * See man -s3socket recv for more information.
22 *
23 * @param cookie cookie indentifying a client, not used
24 * @param sock socket to read from
25 * @param buf the destination buffer
26 * @param nbytes the number of bytes to read
27 * @return the number of bytes transferred of -1 upon error
28 */
29 static ssize_t default_recv(const void *cookie,
30 int sock,
31 void *buf,
32 size_t nbytes)
33 {
34 (void)cookie;
35 return recv(sock, buf, nbytes, 0);
36 }
37
38 /**
39 * The default function to send data to the server. This function
40 * just wraps the send function to send through a socket.
41 * See man -s3socket send for more information.
42 *
43 * @param cookie cookie indentifying a client, not used
44 * @param sock socket to send to
45 * @param buf the source buffer
46 * @param nbytes the number of bytes to send
47 * @return the number of bytes transferred of -1 upon error
48 */
49 static ssize_t default_send(const void *cookie,
50 int fd,
51 const void *buf,
52 size_t nbytes)
53 {
54 (void)cookie;
55 return send(fd, buf, nbytes, 0);
56 }
57
58 /**
59 * Try to drain the output buffers without blocking
60 *
61 * @param client the client to drain
62 * @return false if an error occured (connection should be shut down)
63 * true otherwise (please note that there may be more data to
64 * left in the buffer to send)
65 */
66 static bool drain_output(struct memcached_binary_protocol_client_st *client)
67 {
68 ssize_t len;
69
70 // Do we have pending data to send?
71 while (client->output != NULL)
72 {
73 len= client->root->send(client,
74 client->sock,
75 client->output->data + client->output->offset,
76 client->output->nbytes - client->output->offset);
77
78 if (len == -1)
79 {
80 if (errno == EWOULDBLOCK)
81 {
82 return true;
83 }
84 else if (errno != EINTR)
85 {
86 client->error= errno;
87 return false;
88 }
89 }
90 else
91 {
92 client->output->offset += (size_t)len;
93 if (client->output->offset == client->output->nbytes)
94 {
95 /* This was the complete buffer */
96 struct chunk_st *old= client->output;
97 client->output= client->output->next;
98 if (client->output == NULL)
99 {
100 client->output_tail= NULL;
101 }
102 cache_free(client->root->buffer_cache, old);
103 }
104 }
105 }
106
107 return true;
108 }
109
110 /**
111 * Allocate an output buffer and chain it into the output list
112 *
113 * @param client the client that needs the buffer
114 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
115 */
116 static struct chunk_st*
117 allocate_output_chunk(struct memcached_binary_protocol_client_st *client)
118 {
119 struct chunk_st*ret= cache_alloc(client->root->buffer_cache);
120 if (ret == NULL) {
121 return NULL;
122 }
123
124 ret->offset = ret->nbytes = 0;
125 ret->next = NULL;
126 ret->size = CHUNK_BUFFERSIZE;
127 ret->data= (void*)(ret + 1);
128 if (client->output == NULL)
129 {
130 client->output = client->output_tail = ret;
131 }
132 else
133 {
134 client->output_tail->next= ret;
135 client->output_tail= ret;
136 }
137
138 return ret;
139 }
140
141 /**
142 * Spool data into the send-buffer for a client.
143 *
144 * @param client the client to spool the data for
145 * @param data the data to spool
146 * @param length the number of bytes of data to spool
147 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
148 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
149 */
150 static protocol_binary_response_status
151 spool_output(struct memcached_binary_protocol_client_st *client,
152 const void *data,
153 size_t length)
154 {
155 size_t offset = 0;
156
157 struct chunk_st *chunk= client->output;
158 while (offset < length)
159 {
160 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
161 {
162 if ((chunk= allocate_output_chunk(client)) == NULL)
163 {
164 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
165 }
166 }
167
168 size_t bulk = length - offset;
169 if (bulk > chunk->size - chunk->nbytes)
170 {
171 bulk = chunk->size - chunk->nbytes;
172 }
173
174 memcpy(chunk->data + chunk->nbytes, data, bulk);
175 chunk->nbytes += bulk;
176 offset += bulk;
177 }
178
179 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
180 }
181
182 /**
183 * Send a preformatted packet back to the client. If the connection is in
184 * pedantic mode, it will validate the packet and refuse to send it if it
185 * breaks the specification.
186 *
187 * @param cookie client identification
188 * @param request the original request packet
189 * @param response the packet to send
190 * @return The status of the operation
191 */
192 static protocol_binary_response_status
193 raw_response_handler(const void *cookie,
194 protocol_binary_request_header *request,
195 protocol_binary_response_header *response)
196 {
197 struct memcached_binary_protocol_client_st *client= (void*)cookie;
198
199 if (client->root->pedantic &&
200 !memcached_binary_protocol_pedantic_check_response(request, response))
201 {
202 return PROTOCOL_BINARY_RESPONSE_EINVAL;
203 }
204
205 if (!drain_output(client))
206 {
207 return PROTOCOL_BINARY_RESPONSE_EIO;
208 }
209
210 size_t len= sizeof(*response) + htonl(response->response.bodylen);
211 size_t offset= 0;
212 char *ptr= (void*)response;
213
214 if (client->output == NULL)
215 {
216 /* I can write directly to the socket.... */
217 do
218 {
219 size_t num_bytes= len - offset;
220 ssize_t nw= client->root->send(client,
221 client->sock,
222 ptr + offset,
223 num_bytes);
224 if (nw == -1)
225 {
226 if (errno == EWOULDBLOCK)
227 {
228 break;
229 }
230 else if (errno != EINTR)
231 {
232 client->error= errno;
233 return PROTOCOL_BINARY_RESPONSE_EIO;
234 }
235 }
236 else
237 {
238 offset += (size_t)nw;
239 }
240 } while (offset < len);
241 }
242
243 return spool_output(client, ptr, len - offset);
244 }
245
246 /*
247 * Version 0 of the interface is really low level and protocol specific,
248 * while the version 1 of the interface is more API focused. We need a
249 * way to translate between the command codes on the wire and the
250 * application level interface in V1, so let's just use the V0 of the
251 * interface as a map instead of creating a huuuge switch :-)
252 */
253
254 /**
255 * Callback for the GET/GETQ/GETK and GETKQ responses
256 * @param cookie client identifier
257 * @param key the key for the item
258 * @param keylen the length of the key
259 * @param body the length of the body
260 * @param bodylen the length of the body
261 * @param flags the flags for the item
262 * @param cas the CAS id for the item
263 */
264 static protocol_binary_response_status
265 get_response_handler(const void *cookie,
266 const void *key,
267 uint16_t keylen,
268 const void *body,
269 uint32_t bodylen,
270 uint32_t flags,
271 uint64_t cas) {
272
273 struct memcached_binary_protocol_client_st *client= (void*)cookie;
274 uint8_t opcode= client->current_command->request.opcode;
275
276 if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ)
277 {
278 keylen= 0;
279 }
280
281 protocol_binary_response_get response= {
282 .message.header.response= {
283 .magic= PROTOCOL_BINARY_RES,
284 .opcode= opcode,
285 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
286 .opaque= client->current_command->request.opaque,
287 .cas= htonll(cas),
288 .keylen= htons(keylen),
289 .extlen= 4,
290 .bodylen= htonl(bodylen + keylen + 4),
291 },
292 .message.body.flags= htonl(flags),
293 };
294
295 protocol_binary_response_status rval;
296 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
297 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
298 (rval= spool_output(client, key, keylen)) != success ||
299 (rval= spool_output(client, body, bodylen)) != success)
300 {
301 return rval;
302 }
303
304 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
305 }
306
307 /**
308 * Callback for the STAT responses
309 * @param cookie client identifier
310 * @param key the key for the item
311 * @param keylen the length of the key
312 * @param body the length of the body
313 * @param bodylen the length of the body
314 */
315 static protocol_binary_response_status
316 stat_response_handler(const void *cookie,
317 const void *key,
318 uint16_t keylen,
319 const void *body,
320 uint32_t bodylen) {
321
322 struct memcached_binary_protocol_client_st *client= (void*)cookie;
323
324 protocol_binary_response_no_extras response= {
325 .message.header.response= {
326 .magic= PROTOCOL_BINARY_RES,
327 .opcode= client->current_command->request.opcode,
328 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
329 .opaque= client->current_command->request.opaque,
330 .keylen= htons(keylen),
331 .bodylen= htonl(bodylen + keylen),
332 },
333 };
334
335 protocol_binary_response_status rval;
336 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
337 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
338 (rval= spool_output(client, key, keylen)) != success ||
339 (rval= spool_output(client, body, bodylen)) != success)
340 {
341 return rval;
342 }
343
344 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
345 }
346
347 /**
348 * Callback for the VERSION responses
349 * @param cookie client identifier
350 * @param text the length of the body
351 * @param textlen the length of the body
352 */
353 static protocol_binary_response_status
354 version_response_handler(const void *cookie,
355 const void *text,
356 uint32_t textlen) {
357
358 struct memcached_binary_protocol_client_st *client= (void*)cookie;
359
360 protocol_binary_response_no_extras response= {
361 .message.header.response= {
362 .magic= PROTOCOL_BINARY_RES,
363 .opcode= client->current_command->request.opcode,
364 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
365 .opaque= client->current_command->request.opaque,
366 .bodylen= htonl(textlen),
367 },
368 };
369
370 protocol_binary_response_status rval;
371 const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
372 if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
373 (rval= spool_output(client, text, textlen)) != success)
374 {
375 return rval;
376 }
377
378 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
379 }
380
381 /**
382 * Callback for ADD and ADDQ
383 * @param cookie the calling client
384 * @param header the add/addq command
385 * @param response_handler not used
386 * @return the result of the operation
387 */
388 static protocol_binary_response_status
389 add_command_handler(const void *cookie,
390 protocol_binary_request_header *header,
391 memcached_binary_protocol_raw_response_handler response_handler)
392 {
393 protocol_binary_response_status rval;
394
395 struct memcached_binary_protocol_client_st *client= (void*)cookie;
396 if (client->root->callback->interface.v1.add != NULL)
397 {
398 uint16_t keylen= ntohs(header->request.keylen);
399 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
400 protocol_binary_request_add *request= (void*)header;
401 uint32_t flags= ntohl(request->message.body.flags);
402 uint32_t timeout= ntohl(request->message.body.expiration);
403 char *key= ((char*)header) + sizeof(*header) + 8;
404 char *data= key + keylen;
405 uint64_t cas;
406
407 rval= client->root->callback->interface.v1.add(cookie, key, keylen,
408 data, datalen, flags,
409 timeout, &cas);
410
411 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
412 header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
413 {
414 /* Send a positive request */
415 protocol_binary_response_no_extras response= {
416 .message= {
417 .header.response= {
418 .magic= PROTOCOL_BINARY_RES,
419 .opcode= PROTOCOL_BINARY_CMD_ADD,
420 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
421 .opaque= header->request.opaque,
422 .cas= ntohll(cas)
423 }
424 }
425 };
426 rval= response_handler(cookie, header, (void*)&response);
427 }
428 }
429 else
430 {
431 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
432 }
433
434 return rval;
435 }
436
437 /**
438 * Callback for DECREMENT and DECREMENTQ
439 * @param cookie the calling client
440 * @param header the command
441 * @param response_handler not used
442 * @return the result of the operation
443 */
444 static protocol_binary_response_status
445 decrement_command_handler(const void *cookie,
446 protocol_binary_request_header *header,
447 memcached_binary_protocol_raw_response_handler response_handler)
448 {
449 (void)response_handler;
450 protocol_binary_response_status rval;
451
452 struct memcached_binary_protocol_client_st *client= (void*)cookie;
453 if (client->root->callback->interface.v1.decrement != NULL)
454 {
455 uint16_t keylen= ntohs(header->request.keylen);
456 protocol_binary_request_decr *request= (void*)header;
457 uint64_t init= ntohll(request->message.body.initial);
458 uint64_t delta= ntohll(request->message.body.delta);
459 uint32_t timeout= ntohl(request->message.body.expiration);
460 void *key= request->bytes + sizeof(request->bytes);
461 uint64_t result;
462 uint64_t cas;
463
464 char buffer[1024] = {0};
465 memcpy(buffer, key, keylen);
466 fprintf(stderr, "%s\n", buffer);
467
468
469 rval= client->root->callback->interface.v1.decrement(cookie, key, keylen,
470 delta, init, timeout,
471 &result, &cas);
472 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
473 header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT)
474 {
475 /* Send a positive request */
476 protocol_binary_response_decr response= {
477 .message= {
478 .header.response= {
479 .magic= PROTOCOL_BINARY_RES,
480 .opcode= PROTOCOL_BINARY_CMD_DECREMENT,
481 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
482 .opaque= header->request.opaque,
483 .cas= ntohll(cas),
484 .bodylen= htonl(8)
485 },
486 .body.value = htonll(result)
487 }
488 };
489 rval= response_handler(cookie, header, (void*)&response);
490 }
491 }
492 else
493 {
494 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
495 }
496
497 return rval;
498 }
499
500 /**
501 * Callback for DELETE and DELETEQ
502 * @param cookie the calling client
503 * @param header the command
504 * @param response_handler not used
505 * @return the result of the operation
506 */
507 static protocol_binary_response_status
508 delete_command_handler(const void *cookie,
509 protocol_binary_request_header *header,
510 memcached_binary_protocol_raw_response_handler response_handler)
511 {
512 (void)response_handler;
513 protocol_binary_response_status rval;
514
515 struct memcached_binary_protocol_client_st *client= (void*)cookie;
516 if (client->root->callback->interface.v1.delete != NULL)
517 {
518 uint16_t keylen= ntohs(header->request.keylen);
519 void *key= (header + 1);
520 uint64_t cas= ntohll(header->request.cas);
521 rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas);
522 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
523 header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
524 {
525 /* Send a positive request */
526 protocol_binary_response_no_extras response= {
527 .message= {
528 .header.response= {
529 .magic= PROTOCOL_BINARY_RES,
530 .opcode= PROTOCOL_BINARY_CMD_DELETE,
531 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
532 .opaque= header->request.opaque,
533 }
534 }
535 };
536 rval= response_handler(cookie, header, (void*)&response);
537 }
538 }
539 else
540 {
541 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
542 }
543
544 return rval;
545 }
546
547 /**
548 * Callback for FLUSH and FLUSHQ
549 * @param cookie the calling client
550 * @param header the command
551 * @param response_handler not used
552 * @return the result of the operation
553 */
554 static protocol_binary_response_status
555 flush_command_handler(const void *cookie,
556 protocol_binary_request_header *header,
557 memcached_binary_protocol_raw_response_handler response_handler)
558 {
559 (void)response_handler;
560 protocol_binary_response_status rval;
561
562 struct memcached_binary_protocol_client_st *client= (void*)cookie;
563 if (client->root->callback->interface.v1.flush != NULL)
564 {
565 protocol_binary_request_flush *flush= (void*)header;
566 uint32_t timeout= 0;
567 if (htonl(header->request.bodylen) == 4)
568 {
569 timeout= ntohl(flush->message.body.expiration);
570 }
571
572 rval= client->root->callback->interface.v1.flush(cookie, timeout);
573 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
574 header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH)
575 {
576 /* Send a positive request */
577 protocol_binary_response_no_extras response= {
578 .message= {
579 .header.response= {
580 .magic= PROTOCOL_BINARY_RES,
581 .opcode= PROTOCOL_BINARY_CMD_FLUSH,
582 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
583 .opaque= header->request.opaque,
584 }
585 }
586 };
587 rval= response_handler(cookie, header, (void*)&response);
588 }
589 }
590 else
591 {
592 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
593 }
594
595 return rval;
596 }
597
598 /**
599 * Callback for GET, GETK, GETQ, GETKQ
600 * @param cookie the calling client
601 * @param header the command
602 * @param response_handler not used
603 * @return the result of the operation
604 */
605 static protocol_binary_response_status
606 get_command_handler(const void *cookie,
607 protocol_binary_request_header *header,
608 memcached_binary_protocol_raw_response_handler response_handler)
609 {
610 (void)response_handler;
611 protocol_binary_response_status rval;
612
613 struct memcached_binary_protocol_client_st *client= (void*)cookie;
614 if (client->root->callback->interface.v1.get != NULL)
615 {
616 uint16_t keylen= ntohs(header->request.keylen);
617 void *key= (header + 1);
618 rval= client->root->callback->interface.v1.get(cookie, key, keylen,
619 get_response_handler);
620
621 if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT &&
622 (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
623 header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ))
624 {
625 /* Quiet commands shouldn't respond on cache misses */
626 rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
627 }
628 }
629 else
630 {
631 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
632 }
633
634 return rval;
635 }
636
637 /**
638 * Callback for INCREMENT and INCREMENTQ
639 * @param cookie the calling client
640 * @param header the command
641 * @param response_handler not used
642 * @return the result of the operation
643 */
644 static protocol_binary_response_status
645 increment_command_handler(const void *cookie,
646 protocol_binary_request_header *header,
647 memcached_binary_protocol_raw_response_handler response_handler)
648 {
649 (void)response_handler;
650 protocol_binary_response_status rval;
651
652 struct memcached_binary_protocol_client_st *client= (void*)cookie;
653 if (client->root->callback->interface.v1.increment != NULL)
654 {
655 uint16_t keylen= ntohs(header->request.keylen);
656 protocol_binary_request_incr *request= (void*)header;
657 uint64_t init= ntohll(request->message.body.initial);
658 uint64_t delta= ntohll(request->message.body.delta);
659 uint32_t timeout= ntohl(request->message.body.expiration);
660 void *key= request->bytes + sizeof(request->bytes);
661 uint64_t cas;
662 uint64_t result;
663
664 rval= client->root->callback->interface.v1.increment(cookie, key, keylen,
665 delta, init, timeout,
666 &result, &cas);
667 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
668 header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT)
669 {
670 /* Send a positive request */
671 protocol_binary_response_incr response= {
672 .message= {
673 .header.response= {
674 .magic= PROTOCOL_BINARY_RES,
675 .opcode= PROTOCOL_BINARY_CMD_INCREMENT,
676 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
677 .opaque= header->request.opaque,
678 .cas= ntohll(cas),
679 .bodylen= htonl(8)
680 },
681 .body.value = htonll(result)
682 }
683 };
684 rval= response_handler(cookie, header, (void*)&response);
685 }
686 }
687 else
688 {
689 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
690 }
691
692 return rval;
693 }
694
695 /**
696 * Callback for noop. Inform the v1 interface about the noop packet, and
697 * create and send a packet back to the client
698 *
699 * @param cookie the calling client
700 * @param header the command
701 * @param response_handler the response handler
702 * @return the result of the operation
703 */
704 static protocol_binary_response_status
705 noop_command_handler(const void *cookie,
706 protocol_binary_request_header *header,
707 memcached_binary_protocol_raw_response_handler response_handler)
708 {
709 struct memcached_binary_protocol_client_st *client= (void*)cookie;
710 if (client->root->callback->interface.v1.noop != NULL)
711 {
712 client->root->callback->interface.v1.noop(cookie);
713 }
714
715 protocol_binary_response_no_extras response= {
716 .message = {
717 .header.response = {
718 .magic = PROTOCOL_BINARY_RES,
719 .opcode= PROTOCOL_BINARY_CMD_NOOP,
720 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
721 .opaque= header->request.opaque,
722 }
723 }
724 };
725
726 return response_handler(cookie, header, (void*)&response);
727 }
728
729 /**
730 * Callback for APPEND and APPENDQ
731 * @param cookie the calling client
732 * @param header the command
733 * @param response_handler not used
734 * @return the result of the operation
735 */
736 static protocol_binary_response_status
737 append_command_handler(const void *cookie,
738 protocol_binary_request_header *header,
739 memcached_binary_protocol_raw_response_handler response_handler)
740 {
741 (void)response_handler;
742 protocol_binary_response_status rval;
743
744 struct memcached_binary_protocol_client_st *client= (void*)cookie;
745 if (client->root->callback->interface.v1.append != NULL)
746 {
747 uint16_t keylen= ntohs(header->request.keylen);
748 uint32_t datalen= ntohl(header->request.bodylen) - keylen;
749 char *key= (void*)(header + 1);
750 char *data= key + keylen;
751 uint64_t cas= ntohll(header->request.cas);
752 uint64_t result_cas;
753
754 rval= client->root->callback->interface.v1.append(cookie, key, keylen,
755 data, datalen, cas,
756 &result_cas);
757 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
758 header->request.opcode == PROTOCOL_BINARY_CMD_APPEND)
759 {
760 /* Send a positive request */
761 protocol_binary_response_no_extras response= {
762 .message= {
763 .header.response= {
764 .magic= PROTOCOL_BINARY_RES,
765 .opcode= PROTOCOL_BINARY_CMD_APPEND,
766 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
767 .opaque= header->request.opaque,
768 .cas= ntohll(result_cas),
769 },
770 }
771 };
772 rval= response_handler(cookie, header, (void*)&response);
773 }
774 }
775 else
776 {
777 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
778 }
779
780 return rval;
781 }
782
783 /**
784 * Callback for PREPEND and PREPENDQ
785 * @param cookie the calling client
786 * @param header the command
787 * @param response_handler not used
788 * @return the result of the operation
789 */
790 static protocol_binary_response_status
791 prepend_command_handler(const void *cookie,
792 protocol_binary_request_header *header,
793 memcached_binary_protocol_raw_response_handler response_handler)
794 {
795 (void)response_handler;
796 protocol_binary_response_status rval;
797
798 struct memcached_binary_protocol_client_st *client= (void*)cookie;
799 if (client->root->callback->interface.v1.prepend != NULL)
800 {
801 uint16_t keylen= ntohs(header->request.keylen);
802 uint32_t datalen= ntohl(header->request.bodylen) - keylen;
803 char *key= (char*)(header + 1);
804 char *data= key + keylen;
805 uint64_t cas= ntohll(header->request.cas);
806 uint64_t result_cas;
807 rval= client->root->callback->interface.v1.prepend(cookie, key, keylen,
808 data, datalen, cas,
809 &result_cas);
810 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
811 header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
812 {
813 /* Send a positive request */
814 protocol_binary_response_no_extras response= {
815 .message= {
816 .header.response= {
817 .magic= PROTOCOL_BINARY_RES,
818 .opcode= PROTOCOL_BINARY_CMD_PREPEND,
819 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
820 .opaque= header->request.opaque,
821 .cas= ntohll(result_cas),
822 },
823 }
824 };
825 rval= response_handler(cookie, header, (void*)&response);
826 }
827 }
828 else
829 {
830 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
831 }
832
833 return rval;
834 }
835
836 /**
837 * Callback for QUIT and QUITQ. Notify the client and shut down the connection
838 * @param cookie the calling client
839 * @param header the command
840 * @param response_handler not used
841 * @return the result of the operation
842 */
843 static protocol_binary_response_status
844 quit_command_handler(const void *cookie,
845 protocol_binary_request_header *header,
846 memcached_binary_protocol_raw_response_handler response_handler)
847 {
848 struct memcached_binary_protocol_client_st *client= (void*)cookie;
849 if (client->root->callback->interface.v1.quit != NULL)
850 {
851 client->root->callback->interface.v1.quit(cookie);
852 }
853
854 protocol_binary_response_no_extras response= {
855 .message = {
856 .header.response = {
857 .magic= PROTOCOL_BINARY_RES,
858 .opcode= PROTOCOL_BINARY_CMD_QUIT,
859 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
860 .opaque= header->request.opaque
861 }
862 }
863 };
864
865 if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
866 {
867 response_handler(cookie, header, (void*)&response);
868 }
869
870 /* I need a better way to signal to close the connection */
871 return PROTOCOL_BINARY_RESPONSE_EIO;
872 }
873
874 /**
875 * Callback for REPLACE and REPLACEQ
876 * @param cookie the calling client
877 * @param header the command
878 * @param response_handler not used
879 * @return the result of the operation
880 */
881 static protocol_binary_response_status
882 replace_command_handler(const void *cookie,
883 protocol_binary_request_header *header,
884 memcached_binary_protocol_raw_response_handler response_handler)
885 {
886 (void)response_handler;
887 protocol_binary_response_status rval;
888
889 struct memcached_binary_protocol_client_st *client= (void*)cookie;
890 if (client->root->callback->interface.v1.replace != NULL)
891 {
892 uint16_t keylen= ntohs(header->request.keylen);
893 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
894 protocol_binary_request_replace *request= (void*)header;
895 uint32_t flags= ntohl(request->message.body.flags);
896 uint32_t timeout= ntohl(request->message.body.expiration);
897 char *key= ((char*)header) + sizeof(*header) + 8;
898 char *data= key + keylen;
899 uint64_t cas= ntohll(header->request.cas);
900 uint64_t result_cas;
901
902 rval= client->root->callback->interface.v1.replace(cookie, key, keylen,
903 data, datalen, flags,
904 timeout, cas,
905 &result_cas);
906 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
907 header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
908 {
909 /* Send a positive request */
910 protocol_binary_response_no_extras response= {
911 .message= {
912 .header.response= {
913 .magic= PROTOCOL_BINARY_RES,
914 .opcode= PROTOCOL_BINARY_CMD_REPLACE,
915 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
916 .opaque= header->request.opaque,
917 .cas= ntohll(result_cas),
918 },
919 }
920 };
921 rval= response_handler(cookie, header, (void*)&response);
922 }
923 }
924 else
925 {
926 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
927 }
928
929 return rval;
930 }
931
932 /**
933 * Callback for SET and SETQ
934 * @param cookie the calling client
935 * @param header the command
936 * @param response_handler not used
937 * @return the result of the operation
938 */
939 static protocol_binary_response_status
940 set_command_handler(const void *cookie,
941 protocol_binary_request_header *header,
942 memcached_binary_protocol_raw_response_handler response_handler)
943 {
944 (void)response_handler;
945 protocol_binary_response_status rval;
946
947 struct memcached_binary_protocol_client_st *client= (void*)cookie;
948 if (client->root->callback->interface.v1.set != NULL)
949 {
950 uint16_t keylen= ntohs(header->request.keylen);
951 uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
952 protocol_binary_request_replace *request= (void*)header;
953 uint32_t flags= ntohl(request->message.body.flags);
954 uint32_t timeout= ntohl(request->message.body.expiration);
955 char *key= ((char*)header) + sizeof(*header) + 8;
956 char *data= key + keylen;
957 uint64_t cas= ntohll(header->request.cas);
958 uint64_t result_cas;
959
960
961 rval= client->root->callback->interface.v1.set(cookie, key, keylen,
962 data, datalen, flags,
963 timeout, cas, &result_cas);
964 if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
965 header->request.opcode == PROTOCOL_BINARY_CMD_SET)
966 {
967 /* Send a positive request */
968 protocol_binary_response_no_extras response= {
969 .message= {
970 .header.response= {
971 .magic= PROTOCOL_BINARY_RES,
972 .opcode= PROTOCOL_BINARY_CMD_SET,
973 .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
974 .opaque= header->request.opaque,
975 .cas= ntohll(result_cas),
976 },
977 }
978 };
979 rval= response_handler(cookie, header, (void*)&response);
980 }
981 }
982 else
983 {
984 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
985 }
986
987 return rval;
988 }
989
990 /**
991 * Callback for STAT
992 * @param cookie the calling client
993 * @param header the command
994 * @param response_handler not used
995 * @return the result of the operation
996 */
997 static protocol_binary_response_status
998 stat_command_handler(const void *cookie,
999 protocol_binary_request_header *header,
1000 memcached_binary_protocol_raw_response_handler response_handler)
1001 {
1002 (void)response_handler;
1003 protocol_binary_response_status rval;
1004
1005 struct memcached_binary_protocol_client_st *client= (void*)cookie;
1006 if (client->root->callback->interface.v1.stat != NULL)
1007 {
1008 uint16_t keylen= ntohs(header->request.keylen);
1009
1010 rval= client->root->callback->interface.v1.stat(cookie,
1011 (void*)(header + 1),
1012 keylen,
1013 stat_response_handler);
1014 }
1015 else
1016 {
1017 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1018 }
1019
1020 return rval;
1021 }
1022
1023 /**
1024 * Callback for VERSION
1025 * @param cookie the calling client
1026 * @param header the command
1027 * @param response_handler not used
1028 * @return the result of the operation
1029 */
1030 static protocol_binary_response_status
1031 version_command_handler(const void *cookie,
1032 protocol_binary_request_header *header,
1033 memcached_binary_protocol_raw_response_handler response_handler)
1034 {
1035 (void)response_handler;
1036 (void)header;
1037 protocol_binary_response_status rval;
1038
1039 struct memcached_binary_protocol_client_st *client= (void*)cookie;
1040 if (client->root->callback->interface.v1.version != NULL)
1041 {
1042 rval= client->root->callback->interface.v1.version(cookie,
1043 version_response_handler);
1044 }
1045 else
1046 {
1047 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1048 }
1049
1050 return rval;
1051 }
1052
1053 /**
1054 * The map to remap between the com codes and the v1 logical setting
1055 */
1056 static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= {
1057 [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
1058 [PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
1059 [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler,
1060 [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler,
1061 [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler,
1062 [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler,
1063 [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
1064 [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
1065 [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
1066 [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
1067 [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
1068 [PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
1069 [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
1070 [PROTOCOL_BINARY_CMD_GET]= get_command_handler,
1071 [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler,
1072 [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler,
1073 [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
1074 [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler,
1075 [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler,
1076 [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
1077 [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
1078 [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
1079 [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
1080 [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
1081 [PROTOCOL_BINARY_CMD_SET]= set_command_handler,
1082 [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
1083 [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
1084 };
1085
1086 /**
1087 * Try to execute a command. Fire the pre/post functions and the specialized
1088 * handler function if it's set. If not, the unknown probe should be fired
1089 * if it's present.
1090 * @param client the client connection to operate on
1091 * @param header the command to execute
1092 * @return true if success or false if a fatal error occured so that the
1093 * connection should be shut down.
1094 */
1095 static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header)
1096 {
1097 if (client->root->pedantic &&
1098 memcached_binary_protocol_pedantic_check_request(header))
1099 {
1100 /* @todo return invalid command packet */
1101 }
1102
1103 /* we got all data available, execute the callback! */
1104 if (client->root->callback->pre_execute != NULL)
1105 {
1106 client->root->callback->pre_execute(client, header);
1107 }
1108
1109 protocol_binary_response_status rval;
1110 rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1111 uint8_t cc= header->request.opcode;
1112
1113 switch (client->root->callback->interface_version)
1114 {
1115 case 0:
1116 if (client->root->callback->interface.v0.comcode[cc] != NULL) {
1117 rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler);
1118 }
1119 break;
1120 case 1:
1121 if (comcode_v0_v1_remap[cc] != NULL) {
1122 rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler);
1123 }
1124 break;
1125 default:
1126 /* Unknown interface.
1127 * It should be impossible to get here so I'll just call abort
1128 * to avoid getting a compiler warning :-)
1129 */
1130 abort();
1131 }
1132
1133
1134 if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND &&
1135 client->root->callback->unknown != NULL)
1136 {
1137 rval= client->root->callback->unknown(client, header, raw_response_handler);
1138 }
1139
1140 if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS &&
1141 rval != PROTOCOL_BINARY_RESPONSE_EIO)
1142 {
1143 protocol_binary_response_no_extras response= {
1144 .message= {
1145 .header.response= {
1146 .magic= PROTOCOL_BINARY_RES,
1147 .opcode= cc,
1148 .status= htons(rval),
1149 .opaque= header->request.opaque,
1150 },
1151 }
1152 };
1153 rval= raw_response_handler(client, header, (void*)&response);
1154 }
1155
1156 if (client->root->callback->post_execute != NULL)
1157 {
1158 client->root->callback->post_execute(client, header);
1159 }
1160
1161 return rval != PROTOCOL_BINARY_RESPONSE_EIO;
1162 }
1163
1164 /*
1165 ** **********************************************************************
1166 ** * PUBLIC INTERFACE
1167 ** * See protocol_handler.h for function description
1168 ** **********************************************************************
1169 */
1170 struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void)
1171 {
1172 struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret));
1173 if (ret != NULL)
1174 {
1175 ret->recv= default_recv;
1176 ret->send= default_send;
1177 ret->input_buffer_size= 1 * 1024 * 1024;
1178 ret->input_buffer= malloc(ret->input_buffer_size);
1179 if (ret->input_buffer == NULL)
1180 {
1181 free(ret);
1182 ret= NULL;
1183 return NULL;
1184 }
1185
1186 ret->buffer_cache = cache_create("protocol_handler",
1187 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
1188 0, NULL, NULL);
1189 if (ret->buffer_cache == NULL) {
1190 free(ret->input_buffer);
1191 free(ret);
1192 }
1193 }
1194
1195 return ret;
1196 }
1197
1198 void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance)
1199 {
1200 cache_destroy(instance->buffer_cache);
1201 free(instance->input_buffer);
1202 free(instance);
1203 }
1204
1205 struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance)
1206 {
1207 return instance->callback;
1208 }
1209
1210 void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback)
1211 {
1212 instance->callback= callback;
1213 }
1214
1215 memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie)
1216 {
1217 (void)cookie;
1218 return raw_response_handler;
1219 }
1220
1221 void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable)
1222 {
1223 instance->pedantic= enable;
1224 }
1225
1226 bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance)
1227 {
1228 return instance->pedantic;
1229 }
1230
1231 struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock)
1232 {
1233 struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret));
1234 if (ret != NULL)
1235 {
1236 ret->root= instance;
1237 ret->sock= sock;
1238 }
1239
1240 return ret;
1241 }
1242
1243 void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client)
1244 {
1245 free(client);
1246 }
1247
1248 enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client)
1249 {
1250 /* Try to send data and read from the socket */
1251 bool more_data= true;
1252 do
1253 {
1254 ssize_t len= client->root->recv(client,
1255 client->sock,
1256 client->root->input_buffer + client->input_buffer_offset,
1257 client->root->input_buffer_size - client->input_buffer_offset);
1258
1259 if (len > 0)
1260 {
1261 /* Do we have the complete packet? */
1262 if (client->input_buffer_offset > 0)
1263 {
1264 memcpy(client->root->input_buffer, client->input_buffer,
1265 client->input_buffer_offset);
1266 len += (ssize_t)client->input_buffer_offset;
1267
1268 /* @todo use buffer-cache! */
1269 free(client->input_buffer);
1270 client->input_buffer_offset= 0;
1271 }
1272
1273 /* try to parse all of the received packets */
1274 protocol_binary_request_header *header;
1275 header= (void*)client->root->input_buffer;
1276
1277 if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ)
1278 {
1279 client->error= EINVAL;
1280 return ERROR_EVENT;
1281 }
1282
1283 while (len >= (ssize_t)sizeof(*header) &&
1284 (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen))))
1285 {
1286
1287 /* I have the complete package */
1288 client->current_command = header;
1289 if (!execute_command(client, header))
1290 {
1291 return ERROR_EVENT;
1292 }
1293
1294 ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));
1295 len -= total;
1296 if (len > 0)
1297 {
1298 intptr_t ptr= (intptr_t)header;
1299 ptr += total;
1300 if ((ptr % 8) == 0)
1301 {
1302 header= (void*)ptr;
1303 }
1304 else
1305 {
1306 memmove(client->root->input_buffer, (void*)ptr, (size_t)len);
1307 header= (void*)client->root->input_buffer;
1308 }
1309 }
1310 }
1311
1312 if (len > 0)
1313 {
1314 /* save the data for later on */
1315 /* @todo use buffer-cache */
1316 client->input_buffer= malloc((size_t)len);
1317 if (client->input_buffer == NULL)
1318 {
1319 client->error= ENOMEM;
1320 return ERROR_EVENT;
1321 }
1322 memcpy(client->input_buffer, header, (size_t)len);
1323 client->input_buffer_offset= (size_t)len;
1324 more_data= false;
1325 }
1326 }
1327 else if (len == 0)
1328 {
1329 /* Connection closed */
1330 drain_output(client);
1331 return ERROR_EVENT;
1332 }
1333 else
1334 {
1335 if (errno != EWOULDBLOCK)
1336 {
1337 client->error= errno;
1338 /* mark this client as terminated! */
1339 return ERROR_EVENT;
1340 }
1341 more_data = false;
1342 }
1343 } while (more_data);
1344
1345 if (!drain_output(client))
1346 {
1347 return ERROR_EVENT;
1348 }
1349
1350 return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
1351 }