From: Trond Norbye Date: Thu, 17 Sep 2009 13:29:16 +0000 (+0200) Subject: Prototype of a protocol parsing library for the binary protocol X-Git-Tag: 0.34~17^2~7 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=9131482f7923cf9e90b5a715b38e70e3a229b052;p=m6w6%2Flibmemcached Prototype of a protocol parsing library for the binary protocol --- diff --git a/.bzrignore b/.bzrignore index dc1898a6..f1beb3e1 100644 --- a/.bzrignore +++ b/.bzrignore @@ -1,9 +1,12 @@ *.lo */.deps */.libs +*/*/.deps +*/*/.libs */Makefile */Makefile.in */*.l[oa] +*/*/*.l[oa] *TAGS INSTALL Makefile @@ -30,8 +33,10 @@ config/depcomp config/install-sh config/ltmain.sh config/missing +config/plugin.ac configure docs/*.[13] +example/memcached_light libmemcached-*.tar.gz libmemcached/memcached_configure.h libtool @@ -56,4 +61,4 @@ libmemcached-0.30-1.src.rpm libmemcached-0.30-1.x86_64.rpm libmemcached-0.31-1.src.rpm libmemcached-0.31-1.x86_64.rpm -config/plugin.ac +libmemcached-?.??/ diff --git a/Makefile.am b/Makefile.am index 672ac763..affdae19 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,6 +1,6 @@ ACLOCAL_AMFLAGS = -I m4 -SUBDIRS = docs libmemcached libmemcachedutil support clients tests +SUBDIRS = docs libmemcached libmemcachedutil support clients tests example EXTRA_dist = README.FIRST check-local: test-no-outputdiff diff --git a/configure.ac b/configure.ac index 6ce3f3c9..253978b2 100644 --- a/configure.ac +++ b/configure.ac @@ -29,6 +29,8 @@ MEMCACHED_LIBRARY_VERSION=3:0:0 AC_SUBST(MEMCACHED_LIBRARY_VERSION) MEMCACHEDUTIL_LIBRARY_VERSION=0:0:0 AC_SUBST(MEMCACHEDUTIL_LIBRARY_VERSION) +MEMCACHEDPROTOCOL_LIBRARY_VERSION=0:0:0 +AC_SUBST(MEMCACHEDPROTOCOL_LIBRARY_VERSION) # libmemcached versioning when linked with GNU ld. @@ -36,9 +38,11 @@ if test "$lt_cv_prog_gnu_ld" = "yes" then LD_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/libmemcached.ver" LD_UTIL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcachedutil/libmemcachedutil.ver" + LD_PROTOCOL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/protocol/libmemcachedprotocol.ver" fi AC_SUBST(LD_VERSION_SCRIPT) AC_SUBST(LD_UTIL_VERSION_SCRIPT) +AC_SUBST(LD_PROTOCOL_VERSION_SCRIPT) #-------------------------------------------------------------------- @@ -87,9 +91,11 @@ AC_CONFIG_FILES([ clients/Makefile tests/Makefile docs/Makefile + example/Makefile libmemcached/Makefile libmemcached/memcached_configure.h libmemcachedutil/Makefile + libmemcached/protocol/Makefile support/Makefile support/libmemcached.pc support/libmemcached.spec diff --git a/example/Makefile.am b/example/Makefile.am new file mode 100644 index 00000000..7c93764e --- /dev/null +++ b/example/Makefile.am @@ -0,0 +1,8 @@ +noinst_PROGRAMS = memcached_light + +memcached_light_SOURCES= memcached_light.c \ + storage.c storage.h \ + interface_v0.c \ + interface_v1.c +memcached_light_LDADD= $(top_builddir)/libmemcached/protocol/libmemcachedprotocol.la + diff --git a/example/common.h b/example/common.h new file mode 100644 index 00000000..6f87a025 --- /dev/null +++ b/example/common.h @@ -0,0 +1,18 @@ +#ifndef EXAMPLE_COMMON_H +#define EXAMPLE_COMMON_H + +#include + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl +#endif + +#endif diff --git a/example/interface_v0.c b/example/interface_v0.c new file mode 100644 index 00000000..5971366d --- /dev/null +++ b/example/interface_v0.c @@ -0,0 +1,520 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * This file contains an implementation of the callback interface for level 0 + * in the protocol library. You might want to have your copy of the protocol + * specification next to your coffee ;-) + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "common.h" +#include "storage.h" + +static protocol_binary_response_status noop_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_NOOP, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + }; + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status quit_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_QUIT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + }; + + if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) + response_handler(cookie, header, (void*)&response); + + /* I need a better way to signal to close the connection */ + return PROTOCOL_BINARY_RESPONSE_EIO; +} + +static protocol_binary_response_status get_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + uint8_t opcode= header->request.opcode; + union { + protocol_binary_response_get response; + char buffer[4096]; + } msg= { + .response.message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + }; + + struct item *item= get_item(header + 1, ntohs(header->request.keylen)); + if (item) + { + msg.response.message.body.flags= htonl(item->flags); + char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4); + uint32_t bodysize= 4; + msg.response.message.header.response.cas= htonll(item->cas); + if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ) + { + memcpy(ptr, item->key, item->nkey); + msg.response.message.header.response.keylen= htons((uint16_t)item->nkey); + ptr += item->nkey; + bodysize += (uint32_t)item->nkey; + } + memcpy(ptr, item->data, item->size); + bodysize += (uint32_t)item->size; + msg.response.message.header.response.bodylen= htonl(bodysize); + msg.response.message.header.response.extlen= 4; + + return response_handler(cookie, header, (void*)&msg); + } + else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK) + { + msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + return response_handler(cookie, header, (void*)&msg); + } + + /* Q shouldn't report a miss ;-) */ + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status delete_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + char *key= ((char*)header) + sizeof(*header); + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .opaque= header->request.opaque + } + }; + + if (!delete_item(key, keylen)) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + return response_handler(cookie, header, (void*)&response); + } + else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) + { + /* DELETEQ doesn't want success response */ + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + return response_handler(cookie, header, (void*)&response); + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status flush_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + uint8_t opcode= header->request.opcode; + + /* @fixme sett inn when! */ + flush(0); + + if (opcode == PROTOCOL_BINARY_CMD_FLUSH) + { + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + }; + return response_handler(cookie, header, (void*)&response); + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status arithmetic_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_request_incr *req= (void*)header; + protocol_binary_response_incr response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .opaque= header->request.opaque, + }, + }; + + uint16_t keylen= ntohs(header->request.keylen); + uint64_t initial= ntohll(req->message.body.initial); + uint64_t delta= ntohll(req->message.body.delta); + uint32_t expiration= ntohl(req->message.body.expiration); + void *key= req->bytes + sizeof(req->bytes); + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + struct item *item= get_item(key, keylen); + if (item == NULL) + { + item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(item->data, &initial, sizeof(initial)); + put_item(item); + } + } + else + { + if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT || + header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ) + { + (*(uint64_t*)item->data) += delta; + } + else + { + if (delta > *(uint64_t*)item->data) + { + *(uint64_t*)item->data= 0; + } + else + { + *(uint64_t*)item->data -= delta; + } + } + update_cas(item); + } + + response.message.header.response.status= htons(rval); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + response.message.header.response.bodylen= ntohl(8); + response.message.body.value= ntohll((*(uint64_t*)item->data)); + response.message.header.response.cas= ntohll(item->cas); + + if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ || + header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ) + { + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status version_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + const char *versionstring= "1.0.0"; + union { + protocol_binary_response_header packet; + char buffer[256]; + } response= { + .packet.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_VERSION, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .bodylen= htonl((uint32_t)strlen(versionstring)) + } + }; + + memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring)); + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status concat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + uint16_t keylen= ntohs(header->request.keylen); + uint64_t cas= ntohll(header->request.cas); + void *key= header + 1; + uint32_t vallen= ntohl(header->request.bodylen) - keylen; + void *val= (char*)key + keylen; + + struct item *item= get_item(key, keylen); + struct item *nitem; + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || + header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ) + { + memcpy(nitem->data, item->data, item->size); + memcpy(((char*)(nitem->data)) + item->size, val, vallen); + } + else + { + memcpy(nitem->data, val, vallen); + memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); + } + delete_item(key, keylen); + put_item(nitem); + if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || + header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) + { + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(rval), + .opaque= header->request.opaque, + .cas= htonll(nitem->cas), + } + } + }; + return response_handler(cookie, header, (void*)&response); + } + } + + return rval; +} + +static protocol_binary_response_status set_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + if (header->request.cas != 0) + { + /* validate cas */ + struct item* item= get_item(key, keylen); + if (item != NULL && item->cas != ntohll(header->request.cas)) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + return response_handler(cookie, header, (void*)&response); + } + } + + delete_item(key, keylen); + struct item* item= create_item(key, keylen, data, datalen, flags, timeout); + if (item == 0) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + } + else + { + put_item(item); + /* SETQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_SET) + { + response.message.header.response.cas= htonll(item->cas); + return response_handler(cookie, header, (void*)&response); + } + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status add_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_add *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + struct item* item= get_item(key, keylen); + if (item == NULL) + { + item= create_item(key, keylen, data, datalen, flags, timeout); + if (item == 0) + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + else + { + put_item(item); + /* ADDQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD) + { + response.message.header.response.cas= htonll(item->cas); + return response_handler(cookie, header, (void*)&response); + } + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + } + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status replace_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + struct item* item= get_item(key, keylen); + if (item == NULL) + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas) + { + delete_item(key, keylen); + item= create_item(key, keylen, data, datalen, flags, timeout); + if (item == 0) + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + else + { + put_item(item); + /* REPLACEQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) + { + response.message.header.response.cas= htonll(item->cas); + return response_handler(cookie, header, (void*)&response); + } + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + + return response_handler(cookie, header, (void*)&response); +} + +static protocol_binary_response_status stat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + /* Just send the terminating packet*/ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_STAT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + return response_handler(cookie, header, (void*)&response); +} + +struct memcached_binary_protocol_callback_st interface_v0_impl= { + .interface_version= 0, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler, + .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler, +}; diff --git a/example/interface_v1.c b/example/interface_v1.c new file mode 100644 index 00000000..04e62aa3 --- /dev/null +++ b/example/interface_v1.c @@ -0,0 +1,386 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * This file contains an implementation of the callback interface for level 1 + * in the protocol library. If you compare the implementation with the one + * in interface_v0.c you will see that this implementation is much easier and + * hides all of the protocol logic and let you focus on the application + * logic. One "problem" with this layer is that it is synchronous, so that + * you will not receive the next command before a answer to the previous + * command is being sent. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "common.h" +#include "storage.h" + +static protocol_binary_response_status add_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t *cas) +{ + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item* item= get_item(key, keylen); + if (item == NULL) + { + item= create_item(key, keylen, data, datalen, flags, exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *cas= item->cas; + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + + return rval; +} + +static protocol_binary_response_status append_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + struct item *item= get_item(key, keylen); + struct item *nitem; + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(nitem->data, item->data, item->size); + memcpy(((char*)(nitem->data)) + item->size, val, vallen); + delete_item(key, keylen); + put_item(nitem); + *result_cas= nitem->cas; + } + + return rval; +} + +static protocol_binary_response_status decrement_handler(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item *item= get_item(key, keylen); + + if (item == NULL) + { + item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(item->data, &initial, sizeof(initial)); + put_item(item); + *result= initial; + *result_cas= item->cas; + } + } + else + { + if (delta > *(uint64_t*)item->data) + { + *(uint64_t*)item->data= 0; + } + else + { + *(uint64_t*)item->data -= delta; + } + *result= (*(uint64_t*)item->data); + /* @todo fix cas */ + } + + return rval; +} + +static protocol_binary_response_status delete_handler(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + if (cas != 0) + { + struct item *item= get_item(key, keylen); + if (item != NULL && item->cas != cas) + { + return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + } + + if (!delete_item(key, keylen)) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + + return rval; +} + + +static protocol_binary_response_status flush_handler(const void *cookie, + uint32_t when) { + + (void)cookie; + flush(when); + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status get_handler(const void *cookie, + const void *key, + uint16_t keylen, + memcached_binary_protocol_get_response_handler response_handler) { + struct item *item= get_item(key, keylen); + + if (item == NULL) + { + return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + + return response_handler(cookie, key, (uint16_t)keylen, + item->data, (uint32_t)item->size, item->flags, + item->cas); +} + +static protocol_binary_response_status increment_handler(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item *item= get_item(key, keylen); + + if (item == NULL) + { + item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(item->data, &initial, sizeof(initial)); + put_item(item); + *result= initial; + *result_cas= item->cas; + } + } + else + { + (*(uint64_t*)item->data) += delta; + *result= (*(uint64_t*)item->data); + update_cas(item); + *result_cas= item->cas; + } + + return rval; +} + +static protocol_binary_response_status noop_handler(const void *cookie) { + (void)cookie; + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status prepend_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + struct item *item= get_item(key, keylen); + struct item *nitem; + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(nitem->data, val, vallen); + memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); + delete_item(key, keylen); + put_item(nitem); + *result_cas= nitem->cas; + } + + return rval; +} + +static protocol_binary_response_status quit_handler(const void *cookie) { + (void)cookie; + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status replace_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item* item= get_item(key, keylen); + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas == 0 || cas == item->cas) + { + delete_item(key, keylen); + item= create_item(key, keylen, data, datalen, flags, exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *result_cas= item->cas; + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + + return rval; +} + +static protocol_binary_response_status set_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + if (cas != 0) + { + struct item* item= get_item(key, keylen); + if (item != NULL && cas != item->cas) + { + /* Invalid CAS value */ + return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + } + + delete_item(key, keylen); + struct item* item= create_item(key, keylen, data, datalen, flags, exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *result_cas= item->cas; + } + + return rval; +} + +static protocol_binary_response_status stat_handler(const void *cookie, + const void *key, + uint16_t keylen, + memcached_binary_protocol_stat_response_handler response_handler) { + (void)key; + (void)keylen; + /* Just return an empty packet */ + return response_handler(cookie, NULL, 0, NULL, 0); +} + +static protocol_binary_response_status version_handler(const void *cookie, + memcached_binary_protocol_version_response_handler response_handler) { + const char *version= "0.1.1"; + return response_handler(cookie, version, (uint32_t)strlen(version)); +} + +struct memcached_binary_protocol_callback_st interface_v1_impl= { + .interface_version= 1, + .interface.v1= { + .add= add_handler, + .append= append_handler, + .decrement= decrement_handler, + .delete= delete_handler, + .flush= flush_handler, + .get= get_handler, + .increment= increment_handler, + .noop= noop_handler, + .prepend= prepend_handler, + .quit= quit_handler, + .replace= replace_handler, + .set= set_handler, + .stat= stat_handler, + .version= version_handler + } +}; diff --git a/example/memcached_light.c b/example/memcached_light.c new file mode 100644 index 00000000..b373a12f --- /dev/null +++ b/example/memcached_light.c @@ -0,0 +1,363 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * What is a library without an example to show you how to use the library? + * This example use both interfaces to implement a small memcached server. + * Please note that this is an exemple on how to use the library, not + * an implementation of a scalable memcached server. If you look closely + * at the example it isn't even multithreaded ;-) + * + * With that in mind, let me give you some pointers into the source: + * storage.c/h - Implements the item store for this server and not really + * interesting for this example. + * interface_v0.c - Shows an implementation of the memcached server by using + * the "raw" access to the packets as they arrive + * interface_v1.c - Shows an implementation of the memcached server by using + * the more "logical" interface. + * memcached_light.c - This file sets up all of the sockets and run the main + * message loop. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "common.h" +#include "storage.h" + +extern struct memcached_binary_protocol_callback_st interface_v0_impl; +extern struct memcached_binary_protocol_callback_st interface_v1_impl; + +static int server_sockets[1024]; +static int num_server_sockets= 0; +static void* socket_userdata_map[1024]; + +/** + * Create a socket and bind it to a specific port number + * @param port the port number to bind to + */ +static int server_socket(const char *port) { + struct addrinfo *ai; + struct addrinfo hints= { .ai_flags= AI_PASSIVE, + .ai_family= AF_UNSPEC, + .ai_socktype= SOCK_STREAM }; + + int error= getaddrinfo("127.0.0.1", port, &hints, &ai); + if (error != 0) + { + if (error != EAI_SYSTEM) + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); + else + perror("getaddrinfo()"); + + return 1; + } + + struct linger ling= {0, 0}; + + for (struct addrinfo *next= ai; next; next= next->ai_next) + { + int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock == -1) + { + perror("Failed to create socket"); + continue; + } + + int flags= fcntl(sock, F_GETFL, 0); + if (flags == -1) + { + perror("Failed to get socket flags"); + close(sock); + continue; + } + + if ((flags & O_NONBLOCK) != O_NONBLOCK) + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) + { + perror("Failed to set socket to nonblocking mode"); + close(sock); + continue; + } + + flags= 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0) + perror("Failed to set SO_REUSEADDR"); + + if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0) + perror("Failed to set SO_KEEPALIVE"); + + if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0) + perror("Failed to set SO_LINGER"); + + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0) + perror("Failed to set TCP_NODELAY"); + + if (bind(sock, next->ai_addr, next->ai_addrlen) == -1) + { + if (errno != EADDRINUSE) + { + perror("bind()"); + freeaddrinfo(ai); + } + close(sock); + continue; + } + + if (listen(sock, 1024) == -1) + { + perror("listen()"); + close(sock); + continue; + } + + server_sockets[num_server_sockets++]= sock; + } + + freeaddrinfo(ai); + + return (num_server_sockets > 0) ? 0 : 1; +} + +/** + * Convert a command code to a textual string + * @param cmd the comcode to convert + * @return a textual string with the command or NULL for unknown commands + */ +static const char* comcode2str(uint8_t cmd) +{ + static const char * const text[] = { + "GET", "SET", "ADD", "REPLACE", "DELETE", + "INCREMENT", "DECREMENT", "QUIT", "FLUSH", + "GETQ", "NOOP", "VERSION", "GETK", "GETKQ", + "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ", + "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ", + "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ" + }; + + if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ) + return text[cmd]; + + return NULL; +} + +/** + * Print out the command we are about to execute + */ +static void pre_execute(const void *cookie, protocol_binary_request_header *header) +{ + const char *cmd= comcode2str(header->request.opcode); + if (cmd != NULL) + fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd); + else + fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode); +} + +/** + * Print out the command we just executed + */ +static void post_execute(const void *cookie, protocol_binary_request_header *header) +{ + const char *cmd= comcode2str(header->request.opcode); + if (cmd != NULL) + fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd); + else + fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode); +} + +/** + * Callback handler for all unknown commands. + * Send an unknown command back to the client + */ +static protocol_binary_response_status unknown(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND), + .opaque= header->request.opaque + } + } + }; + + return response_handler(cookie, header, (void*)&response); +} + +static void work(void); + +/** + * Program entry point. Bind to the specified port(s) and serve clients + * + * @param argc number of items in the argument vector + * @param argv argument vector + * @return 0 on success, 1 otherwise + */ +int main(int argc, char **argv) +{ + bool port_specified= false; + int cmd; + struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + + while ((cmd= getopt(argc, argv, "1p:?")) != EOF) + { + switch (cmd) { + case '1': + interface= &interface_v1_impl; + break; + case 'p': + port_specified= true; + (void)server_socket(optarg); + break; + case '?': /* FALLTHROUGH */ + default: + (void)fprintf(stderr, "Usage: %s [-p port]\n", argv[0]); + return 1; + } + } + + if (!port_specified) + (void)server_socket("9999"); + + if (num_server_sockets == 0) + { + fprintf(stderr, "I don't have any server sockets\n"); + return 1; + } + + /* + * Create and initialize the handles to the protocol handlers. I want + * to be able to trace the traffic throught the pre/post handlers, and + * set up a common handler for unknown messages + */ + interface->pre_execute= pre_execute; + interface->post_execute= post_execute; + interface->unknown= unknown; + + struct memcached_binary_protocol_st *protocol_handle; + if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL) + { + fprintf(stderr, "Failed to allocate protocol handle\n"); + return 1; + } + + memcached_binary_protocol_set_callbacks(protocol_handle, interface); + memcached_binary_protocol_set_pedantic(protocol_handle, true); + + for (int xx= 0; xx < num_server_sockets; ++xx) + socket_userdata_map[server_sockets[xx]]= protocol_handle; + + /* Serve all of the clients */ + work(); + + /* NOTREACHED */ + return 0; +} + +static void work(void) { +#define MAX_SERVERS_TO_POLL 100 + struct pollfd fds[MAX_SERVERS_TO_POLL]; + int max_poll; + + for (max_poll= 0; max_poll < num_server_sockets; ++max_poll) + { + fds[max_poll].events= POLLIN; + fds[max_poll].revents= 0; + fds[max_poll].fd= server_sockets[max_poll]; + ++max_poll; + } + + while (true) + { + int err= poll(fds, (nfds_t)max_poll, -1); + + if (err == 0 || (err == -1 && errno != EINTR)) + { + perror("poll() failed"); + abort(); + } + + /* find the available filedescriptors */ + for (int x= max_poll - 1; x > -1 && err > 0; --x) + { + if (fds[x].revents != 0) + { + --err; + if (x < num_server_sockets) + { + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + int sock= accept(fds[x].fd, (struct sockaddr *)&addr, + &addrlen); + + if (sock == -1) + { + perror("Failed to accept client"); + continue; + } + + struct memcached_binary_protocol_st *protocol; + protocol= socket_userdata_map[fds[x].fd]; + + struct memcached_binary_protocol_client_st* c; + c= memcached_binary_protocol_create_client(protocol, sock); + if (c == NULL) + { + fprintf(stderr, "Failed to create client\n"); + close(sock); + } + else + { + socket_userdata_map[sock]= c; + fds[max_poll].events= POLLIN; + fds[max_poll].revents= 0; + fds[max_poll].fd= sock; + ++max_poll; + } + } + else + { + /* drive the client */ + struct memcached_binary_protocol_client_st* c; + c= socket_userdata_map[fds[x].fd]; + assert(c != NULL); + fds[max_poll].events= 0; + + switch (memcached_binary_protocol_client_work(c)) + { + case WRITE_EVENT: + case READ_WRITE_EVENT: + fds[max_poll].events= POLLOUT; + /* FALLTHROUGH */ + case READ_EVENT: + fds[max_poll].events |= POLLIN; + break; + case ERROR_EVENT: + default: /* ERROR or unknown state.. close */ + memcached_binary_protocol_client_destroy(c); + close(fds[x].fd); + fds[x].events= 0; + + if (x != max_poll - 1) + memmove(fds + x, fds + x + 1, (size_t)(max_poll - x)); + + --max_poll; + } + } + } + } + } +} diff --git a/example/storage.c b/example/storage.c new file mode 100644 index 00000000..18304961 --- /dev/null +++ b/example/storage.c @@ -0,0 +1,147 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include +#include +#include +#include +#include +#include "storage.h" + +struct list_entry { + struct item item; + struct list_entry *next; + struct list_entry *prev; +}; + +static struct list_entry *root; +static uint64_t cas; + +void put_item(struct item* item) { + struct list_entry* entry= (void*)item; + update_cas(item); + + if (root == NULL) + { + entry->next= entry->prev= entry; + } + else + { + entry->prev= root->prev; + entry->next= root; + entry->prev->next= entry; + entry->next->prev= entry; + } + + root= entry; +} + +struct item* get_item(const void* key, size_t nkey) { + struct list_entry *walker= root; + if (root == NULL) + { + return NULL; + } + + do + { + if (((struct item*)walker)->nkey == nkey && + memcmp(((struct item*)walker)->key, key, nkey) == 0) + { + return (struct item*)walker; + } + walker= walker->next; + } while (walker != root); + + return NULL; +} + +struct item* create_item(const void* key, size_t nkey, const void* data, + size_t size, uint32_t flags, time_t exp) +{ + struct item* ret= calloc(1, sizeof(struct list_entry)); + if (ret != NULL) + { + ret->key= malloc(nkey); + if (size > 0) + { + ret->data= malloc(size); + } + + if (ret->key == NULL || (size > 0 && ret->data == NULL)) + { + free(ret->key); + free(ret->data); + free(ret); + return NULL; + } + + memcpy(ret->key, key, nkey); + if (data != NULL) + { + memcpy(ret->data, data, size); + } + + ret->nkey= nkey; + ret->size= size; + ret->flags= flags; + ret->exp= exp; + } + + return ret; +} + +bool delete_item(const void* key, size_t nkey) { + struct item* item= get_item(key, nkey); + bool ret= false; + + if (item) + { + /* remove from linked list */ + struct list_entry *entry= (void*)item; + + if (entry->next == entry) + { + /* Only one object in the list */ + root= NULL; + } + else + { + /* ensure that we don't loose track of the root, and this will + * change the start position for the next search ;-) */ + root= entry->next; + entry->prev->next= entry->next; + entry->next->prev= entry->prev; + } + + free(item->key); + free(item->data); + free(item); + ret= true; + } + + return ret; +} + +void flush(uint32_t when) { + /* FIXME */ + (void)when; + /* remove the complete linked list */ + if (root == NULL) + { + return; + } + + root->prev->next= NULL; + while (root != NULL) + { + struct item* tmp= (void*)root; + root= root->next; + + free(tmp->key); + free(tmp->data); + free(tmp); + } +} + +void update_cas(struct item* item) { + item->cas= ++cas; +} diff --git a/example/storage.h b/example/storage.h new file mode 100644 index 00000000..fbe75474 --- /dev/null +++ b/example/storage.h @@ -0,0 +1,23 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#ifndef STORAGE_H +#define STORAGE_H + +struct item { + uint64_t cas; + void* key; + size_t nkey; + void* data; + size_t size; + uint32_t flags; + time_t exp; +}; + +void update_cas(struct item* item); +void put_item(struct item* item); +struct item* get_item(const void* key, size_t nkey); +struct item* create_item(const void* key, size_t nkey, const void *data, + size_t size, uint32_t flags, time_t exp); +bool delete_item(const void* key, size_t nkey); +void flush(uint32_t when); + +#endif diff --git a/libmemcached/Makefile.am b/libmemcached/Makefile.am index 087290e9..165122d0 100644 --- a/libmemcached/Makefile.am +++ b/libmemcached/Makefile.am @@ -1,6 +1,8 @@ EXTRA_DIST = libmemcached_probes.d memcached/README.txt libmemcached.ver \ memcached_configure.h.in +SUBDIRS = protocol + EXTRA_HEADERS = BUILT_SOURCES= @@ -21,8 +23,12 @@ pkginclude_HEADERS= memcached.h \ memcached_string.h \ memcached_types.h \ memcached_watchpoint.h \ + protocol_handler.h \ visibility.h +nobase_pkginclude_HEADERS=protocol/cache.h \ + protocol/callback.h + if BUILD_LIBMEMCACHEDUTIL pkginclude_HEADERS+= memcached_util.h memcached_pool.h diff --git a/libmemcached/memcached/protocol_binary.h b/libmemcached/memcached/protocol_binary.h index 08df72e8..74cdca52 100644 --- a/libmemcached/memcached/protocol_binary.h +++ b/libmemcached/memcached/protocol_binary.h @@ -69,7 +69,10 @@ extern "C" PROTOCOL_BINARY_RESPONSE_EINVAL = 0x04, PROTOCOL_BINARY_RESPONSE_NOT_STORED = 0x05, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND = 0x81, - PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82 + PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82, + + + PROTOCOL_BINARY_RESPONSE_EIO = 0xff } protocol_binary_response_status; /** diff --git a/libmemcached/protocol/Makefile.am b/libmemcached/protocol/Makefile.am new file mode 100644 index 00000000..fb4b0a76 --- /dev/null +++ b/libmemcached/protocol/Makefile.am @@ -0,0 +1,8 @@ +EXTRA_DIST= libmemcachedprotocol.ver + +lib_LTLIBRARIES=libmemcachedprotocol.la + +noinst_HEADERS = common.h + +libmemcachedprotocol_la_SOURCES= protocol_handler.c cache.c pedantic.c +libmemcachedprotocol_la_LDFLAGS= -version-info $(MEMCACHEDPROTOCOL_LIBRARY_VERSION) $(LD_PROTOCOL_VERSION_SCRIPT) diff --git a/libmemcached/protocol/cache.c b/libmemcached/protocol/cache.c new file mode 100644 index 00000000..c7c8a613 --- /dev/null +++ b/libmemcached/protocol/cache.c @@ -0,0 +1,149 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#include +#include +#include + +#ifndef NDEBUG +#include +#endif + +#include "cache.h" + +#ifndef NDEBUG +const uint64_t redzone_pattern = 0xdeadbeefcafebabe; +int cache_error = 0; +#endif + +const size_t initial_pool_size = 64; + +cache_t* cache_create(const char *name, size_t bufsize, size_t align, + cache_constructor_t* constructor, + cache_destructor_t* destructor) { + cache_t* ret = calloc(1, sizeof(cache_t)); + char* nm = strdup(name); + void** ptr = calloc(initial_pool_size, bufsize); + if (ret == NULL || nm == NULL || ptr == NULL || + pthread_mutex_init(&ret->mutex, NULL) == -1) { + free(ret); + free(nm); + free(ptr); + return NULL; + } + + ret->name = nm; + ret->ptr = ptr; + ret->freetotal = initial_pool_size; + ret->constructor = constructor; + ret->destructor = destructor; + +#ifndef NDEBUG + ret->bufsize = bufsize + 2 * sizeof(redzone_pattern); +#else + ret->bufsize = bufsize; +#endif + + (void)align; + + return ret; +} + +static inline void* get_object(void *ptr) { +#ifndef NDEBUG + uint64_t *pre = ptr; + return pre + 1; +#else + return ptr; +#endif +} + +void cache_destroy(cache_t *cache) { + while (cache->freecurr > 0) { + void *ptr = cache->ptr[--cache->freecurr]; + if (cache->destructor) { + cache->destructor(get_object(ptr), NULL); + } + free(ptr); + } + free(cache->name); + free(cache->ptr); + pthread_mutex_destroy(&cache->mutex); +} + +void* cache_alloc(cache_t *cache) { + void *ret; + void *object; + pthread_mutex_lock(&cache->mutex); + if (cache->freecurr > 0) { + ret = cache->ptr[--cache->freecurr]; + object = get_object(ret); + } else { + object = ret = malloc(cache->bufsize); + if (ret != NULL) { + object = get_object(ret); + + if (cache->constructor != NULL && + cache->constructor(object, NULL, 0) != 0) { + free(ret); + object = NULL; + } + } + } + pthread_mutex_unlock(&cache->mutex); + +#ifndef NDEBUG + if (object != NULL) { + /* add a simple form of buffer-check */ + uint64_t *pre = ret; + *pre = redzone_pattern; + ret = pre+1; + memcpy(((char*)ret) + cache->bufsize - (2 * sizeof(redzone_pattern)), + &redzone_pattern, sizeof(redzone_pattern)); + } +#endif + + return object; +} + +void cache_free(cache_t *cache, void *ptr) { + pthread_mutex_lock(&cache->mutex); + +#ifndef NDEBUG + /* validate redzone... */ + if (memcmp(((char*)ptr) + cache->bufsize - (2 * sizeof(redzone_pattern)), + &redzone_pattern, sizeof(redzone_pattern)) != 0) { + raise(SIGABRT); + cache_error = 1; + pthread_mutex_unlock(&cache->mutex); + return; + } + uint64_t *pre = ptr; + --pre; + if (*pre != redzone_pattern) { + raise(SIGABRT); + cache_error = -1; + pthread_mutex_unlock(&cache->mutex); + return; + } + ptr = pre; +#endif + if (cache->freecurr < cache->freetotal) { + cache->ptr[cache->freecurr++] = ptr; + } else { + /* try to enlarge free connections array */ + size_t newtotal = cache->freetotal * 2; + void **new_free = realloc(cache->ptr, sizeof(char *) * newtotal); + if (new_free) { + cache->freetotal = newtotal; + cache->ptr = new_free; + cache->ptr[cache->freecurr++] = ptr; + } else { + if (cache->destructor) { + cache->destructor(ptr, NULL); + } + free(ptr); + + } + } + pthread_mutex_unlock(&cache->mutex); +} + diff --git a/libmemcached/protocol/cache.h b/libmemcached/protocol/cache.h new file mode 100644 index 00000000..a96fba99 --- /dev/null +++ b/libmemcached/protocol/cache.h @@ -0,0 +1,116 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#ifndef CACHE_H +#define CACHE_H +#include + +#ifdef HAVE_UMEM_H +#include +#define cache_t umem_cache_t +#define cache_alloc(a) umem_cache_alloc(a, UMEM_DEFAULT) +#define cache_free(a, b) umem_cache_free(a, b) +#define cache_create(a,b,c,d,e) umem_cache_create((char*)a, b, c, d, e, NULL, NULL, NULL, 0) +#define cache_destroy(a) umem_cache_destroy(a); + +#else + +#ifndef NDEBUG +/* may be used for debug purposes */ +extern int cache_error; +#endif + +/** + * Constructor used to initialize allocated objects + * + * @param obj pointer to the object to initialized. + * @param notused1 This parameter is currently not used. + * @param notused2 This parameter is currently not used. + * @return you should return 0, but currently this is not checked + */ +typedef int cache_constructor_t(void* obj, void* notused1, int notused2); +/** + * Destructor used to clean up allocated objects before they are + * returned to the operating system. + * + * @param obj pointer to the object to initialized. + * @param notused1 This parameter is currently not used. + * @param notused2 This parameter is currently not used. + * @return you should return 0, but currently this is not checked + */ +typedef void cache_destructor_t(void* obj, void* notused); + +/** + * Definition of the structure to keep track of the internal details of + * the cache allocator. Touching any of these variables results in + * undefined behavior. + */ +typedef struct { + /** Mutex to protect access to the structure */ + pthread_mutex_t mutex; + /** Name of the cache objects in this cache (provided by the caller) */ + char *name; + /** List of pointers to available buffers in this cache */ + void **ptr; + /** The size of each element in this cache */ + size_t bufsize; + /** The capacity of the list of elements */ + size_t freetotal; + /** The current number of free elements */ + size_t freecurr; + /** The constructor to be called each time we allocate more memory */ + cache_constructor_t* constructor; + /** The destructor to be called each time before we release memory */ + cache_destructor_t* destructor; +} cache_t; + +/** + * Create an object cache. + * + * The object cache will let you allocate objects of the same size. It is fully + * MT safe, so you may allocate objects from multiple threads without having to + * do any syncrhonization in the application code. + * + * @param name the name of the object cache. This name may be used for debug purposes + * and may help you track down what kind of object you have problems with + * (buffer overruns, leakage etc) + * @param bufsize the size of each object in the cache + * @param align the alignment requirements of the objects in the cache. + * @param constructor the function to be called to initialize memory when we need + * to allocate more memory from the os. + * @param destructor the function to be called before we release the memory back + * to the os. + * @return a handle to an object cache if successful, NULL otherwise. + */ +cache_t* cache_create(const char* name, size_t bufsize, size_t align, + cache_constructor_t* constructor, + cache_destructor_t* destructor); +/** + * Destroy an object cache. + * + * Destroy and invalidate an object cache. You should return all buffers allocated + * with cache_alloc by using cache_free before calling this function. Not doing + * so results in undefined behavior (the buffers may or may not be invalidated) + * + * @param handle the handle to the object cache to destroy. + */ +void cache_destroy(cache_t* handle); +/** + * Allocate an object from the cache. + * + * @param handle the handle to the object cache to allocate from + * @return a pointer to an initialized object from the cache, or NULL if + * the allocation cannot be satisfied. + */ +void* cache_alloc(cache_t* handle); +/** + * Return an object back to the cache. + * + * The caller should return the object in an initialized state so that + * the object may be returned in an expected state from cache_alloc. + * + * @param handle handle to the object cache to return the object to + * @param ptr pointer to the object to return. + */ +void cache_free(cache_t* handle, void* ptr); +#endif + +#endif diff --git a/libmemcached/protocol/callback.h b/libmemcached/protocol/callback.h new file mode 100644 index 00000000..c1f8c632 --- /dev/null +++ b/libmemcached/protocol/callback.h @@ -0,0 +1,404 @@ +/* + * Summary: Definition of the callback interface + * + * Copy: See Copyright for the status of this software. + * + * Author: Trond Norbye + */ +#ifndef LIBMEMCACHEDPROTOCOL_CALLBACK_H +#define LIBMEMCACHEDPROTOCOL_CALLBACK_H + +/** + * Callback to send data back from a successful GET/GETQ/GETK/GETKQ command + * + * @param cookie Just pass along the cookie supplied in the callback + * @param key What to insert as key in the reply + * @param keylen The length of the key + * @param body What to store in the body of the package + * @param bodylen The number of bytes of the body + * @param flags The flags stored with the item + * @param cas The CAS value to insert into the response (should be 0 + * if you don't care) + */ +typedef protocol_binary_response_status +(*memcached_binary_protocol_get_response_handler)(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen, + uint32_t flags, + uint64_t cas); +/** + * Callback to send data back from a STAT command + * + * @param cookie Just pass along the cookie supplied in the callback + * @param key What to insert as key in the reply + * @param keylen The length of the key + * @param body What to store in the body of the package + * @param bodylen The number of bytes of the body + */ +typedef protocol_binary_response_status +(*memcached_binary_protocol_stat_response_handler)(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen); +/** + * Callback to send data back from a VERSION command + * + * @param cookie Just pass along the cookie supplied in the callback + * @param text The version string + * @param length The number of bytes in the version string + */ +typedef protocol_binary_response_status +(*memcached_binary_protocol_version_response_handler)(const void *cookie, + const void *text, + uint32_t length); + + +/** + * In the low level interface you need to format the response + * packet yourself (giving you complete freedom :-) + * + * @param cookie Just pass along the cookie supplied in the callback + * @param request Pointer to the request packet you are sending a reply to + * @param response Pointer to the response packet to send + * + */ +typedef protocol_binary_response_status (*memcached_binary_protocol_raw_response_handler)(const void *cookie, + protocol_binary_request_header *request, + protocol_binary_response_header *response); + +/** + * In the low lever interface you have to do most of the work by + * yourself, but it also gives you a lot of freedom :-) + * @param cookie identification for this connection, just pass it along to + * the response handler + * @param header the command received over the wire. Never try to access + * anything outside the command. + * @param resonse_handler call this function to send data back to the client + */ +typedef protocol_binary_response_status (*memcached_binary_protocol_command_handler)(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler); + +/** + * The raw interface to the packets is implemented in version 0. It contains + * just an array with command handlers. The inxed in the array is the + * com code. + */ +struct memcached_binary_protocol_callback_v0_st { + memcached_binary_protocol_command_handler comcode[256]; +}; + +/** + * The first version of the callback struct containing all of the + * documented commands in the initial release of the binary protocol + * (aka. memcached 1.4.0). + * + * You might miss the Q commands (addq etc) but the response function + * knows how to deal with them so you don't need to worry about that :-) + */ +struct memcached_binary_protocol_callback_v1_st { + /** + * Add an item to the cache + * @param cookie id of the client receiving the command + * @param key the key to add + * @param len the length of the key + * @param val the value to store for the key (may be NIL) + * @param vallen the length of the data + * @param flags the flags to store with the key + * @param exptime the expiry time for the key-value pair + * @param cas the resulting cas for the add operation (if success) + */ + protocol_binary_response_status (*add)(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint32_t flags, + uint32_t exptime, + uint64_t *cas); + + /** + * Append data to an existing key-value pair. + * + * @param cookie id of the client receiving the command + * @param key the key to add data to + * @param len the length of the key + * @param val the value to append to the value + * @param vallen the length of the data + * @param cas the CAS in the request + * @param result_cas the resulting cas for the append operation + * + */ + protocol_binary_response_status (*append)(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas); + + /** + * Decrement the value for a key + * + * @param cookie id of the client receiving the command + * @param key the key to decrement the value for + * @param len the length of the key + * @param delta the amount to decrement + * @param initial initial value to store (if the key doesn't exist) + * @param expiration expiration time for the object (if the key doesn't exist) + * @param cas the CAS in the request + * @param result the result from the decrement + * @param result_cas the cas of the item + * + */ + protocol_binary_response_status (*decrement)(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas); + + /** + * Delete an existing key + * + * @param cookie id of the client receiving the command + * @param key the key to delete + * @param len the length of the key + * @param cas the CAS in the request + */ + protocol_binary_response_status (*delete)(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t cas); + + + /** + * Flush the cache + * + * @param cookie id of the client receiving the command + * @param when when the cache should be flushed (0 == immediately) + */ + protocol_binary_response_status (*flush)(const void *cookie, + uint32_t when); + + + + /** + * Get a key-value pair + * + * @param cookie id of the client receiving the command + * @param key the key to get + * @param len the length of the key + * @param response_handler to send the result back to the client + */ + protocol_binary_response_status (*get)(const void *cookie, + const void *key, + uint16_t keylen, + memcached_binary_protocol_get_response_handler response_handler); + + /** + * Increment the value for a key + * + * @param cookie id of the client receiving the command + * @param key the key to increment the value on + * @param len the length of the key + * @param delta the amount to increment + * @param initial initial value to store (if the key doesn't exist) + * @param expiration expiration time for the object (if the key doesn't exist) + * @param cas the CAS in the request + * @param result the result from the decrement + * @param result_cas the cas of the item + * + */ + protocol_binary_response_status (*increment)(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas); + + /** + * The noop command was received. This is just a notification callback (the + * response is automatically created). + * + * @param cookie id of the client receiving the command + */ + protocol_binary_response_status (*noop)(const void *cookie); + + /** + * Prepend data to an existing key-value pair. + * + * @param cookie id of the client receiving the command + * @param key the key to prepend data to + * @param len the length of the key + * @param val the value to prepend to the value + * @param vallen the length of the data + * @param cas the CAS in the request + * @param result-cas the cas id of the item + * + */ + protocol_binary_response_status (*prepend)(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas); + + /** + * The quit command was received. This is just a notification callback (the + * response is automatically created). + * + * @param cookie id of the client receiving the command + */ + protocol_binary_response_status (*quit)(const void *cookie); + + + /** + * Replace an existing item to the cache + * + * @param cookie id of the client receiving the command + * @param key the key to replace the content for + * @param len the length of the key + * @param val the value to store for the key (may be NIL) + * @param vallen the length of the data + * @param flags the flags to store with the key + * @param exptime the expiry time for the key-value pair + * @param cas the cas id in the request + * @param result_cas the cas id of the item + */ + protocol_binary_response_status (*replace)(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas); + + + /** + * Set a key-value pair in the cache + * + * @param cookie id of the client receiving the command + * @param key the key to insert + * @param len the length of the key + * @param val the value to store for the key (may be NIL) + * @param vallen the length of the data + * @param flags the flags to store with the key + * @param exptime the expiry time for the key-value pair + * @param cas the cas id in the request + * @param result_cas the cas id of the new item + */ + protocol_binary_response_status (*set)(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas); + + /** + * Get status information + * + * @param cookie id of the client receiving the command + * @param key the key to get status for (or NIL to request all status). + * Remember to insert the terminating packet if multiple + * packets should be returned. + * @param keylen the length of the key + * @param response_handler to send the result back to the client, but + * don't send reply on success! + * + */ + protocol_binary_response_status (*stat)(const void *cookie, + const void *key, + uint16_t keylen, + memcached_binary_protocol_stat_response_handler response_handler); + + /** + * Get the version information + * + * @param cookie id of the client receiving the command + * @param response_handler to send the result back to the client, but + * don't send reply on success! + * + */ + protocol_binary_response_status (*version)(const void *cookie, + memcached_binary_protocol_version_response_handler response_handler); +}; + +/** + * + */ +struct memcached_binary_protocol_callback_st { + /** + * The interface version used (set to 0 if you don't have any specialized + * command handlers). + */ + uint64_t interface_version; + + /** + * Callback fired just before the command will be executed. + * + * @param cookie id of the client receiving the command + * @param header the command header as received on the wire. If you look + * at the content you must ensure that you don't + * try to access beyond the end of the message. + */ + void (*pre_execute)(const void *cookie, + protocol_binary_request_header *header); + /** + * Callback fired just after the command was exected (please note + * that the data transfer back to the client is not finished at this + * time). + * + * @param cookie id of the client receiving the command + * @param header the command header as received on the wire. If you look + * at the content you must ensure that you don't + * try to access beyond the end of the message. + */ + void (*post_execute)(const void *cookie, + protocol_binary_request_header *header); + + /** + * Callback fired if no specialized callback is registered for this + * specific command code. + * + * @param cookie id of the client receiving the command + * @param header the command header as received on the wire. You must + * ensure that you don't try to access beyond the end of the + * message. + * @param response_handler The response handler to send data back. + */ + protocol_binary_response_status (*unknown)(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler); + + /** + * The different interface levels we support. A pointer is used so the + * size of the structure is fixed. You must ensure that the memory area + * passed as the pointer is valid as long as you use the protocol handler. + */ + union { + struct memcached_binary_protocol_callback_v0_st v0; + + /** + * The first version of the callback struct containing all of the + * documented commands in the initial release of the binary protocol + * (aka. memcached 1.4.0). + */ + struct memcached_binary_protocol_callback_v1_st v1; + } interface; +}; + +#endif diff --git a/libmemcached/protocol/common.h b/libmemcached/protocol/common.h new file mode 100644 index 00000000..036a13f6 --- /dev/null +++ b/libmemcached/protocol/common.h @@ -0,0 +1,93 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#ifndef MEMCACHED_PROTOCOL_INTERNAL_H +#define MEMCACHED_PROTOCOL_INTERNAL_H + +#include "config.h" +#include +#include +#include + +#ifdef linux +/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to + * optimize the conversion functions, but the prototypes generate warnings + * from gcc. The conversion methods isn't the bottleneck for my app, so + * just remove the warnings by undef'ing the optimization .. + */ +#undef ntohs +#undef ntohl +#undef htons +#undef htonl + +#endif + + +/* Define this here, which will turn on the visibilty controls while we're + * building libmemcached. + */ +#define BUILDING_LIBMEMCACHED 1 + +#include +#include + +struct memcached_binary_protocol_st { + struct memcached_binary_protocol_callback_st *callback; + memcached_binary_protocol_recv_func recv; + memcached_binary_protocol_send_func send; + char *input_buffer; + size_t input_buffer_size; + bool pedantic; + /* @todo use multiple sized buffers */ + cache_t *buffer_cache; +}; + + +struct chunk_st { + /* Pointer to the data */ + char *data; + /* The offset to the first byte into the buffer that is used */ + size_t offset; + /* The offset into the buffer for the first free byte */ + size_t nbytes; + /* The number of bytes in the buffer */ + size_t size; + /* Pointer to the next buffer in the chain */ + struct chunk_st *next; +}; + +#define CHUNK_BUFFERSIZE 2048 + +struct memcached_binary_protocol_client_st { + struct memcached_binary_protocol_st *root; + int sock; + int error; + + /* Linked list of data to send */ + struct chunk_st *output; + struct chunk_st *output_tail; + + + + char *input_buffer; + size_t input_buffer_size; + size_t input_buffer_offset; + char *curr_input; + + + struct chunk_st *input; + /* Pointer to the last chunk to avoid the need to traverse the complete list */ + struct chunk_st *input_tail; + size_t bytes_available; + + protocol_binary_request_header *current_command; + bool quiet; +}; + +LIBMEMCACHED_LOCAL +bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request); + +LIBMEMCACHED_LOCAL +bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request, + protocol_binary_response_header *response); + + +#endif diff --git a/libmemcached/protocol/libmemcachedprotocol.ver b/libmemcached/protocol/libmemcachedprotocol.ver new file mode 100644 index 00000000..09090189 --- /dev/null +++ b/libmemcached/protocol/libmemcachedprotocol.ver @@ -0,0 +1 @@ +libmemcachedprotocol_0 { global: *; }; diff --git a/libmemcached/protocol/pedantic.c b/libmemcached/protocol/pedantic.c new file mode 100644 index 00000000..884f6721 --- /dev/null +++ b/libmemcached/protocol/pedantic.c @@ -0,0 +1,203 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include "common.h" + +#include +#include +#include + +#define ensure(a) if (!(a)) { return false; } + +bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request) +{ + ensure(request->request.magic == PROTOCOL_BINARY_REQ); + ensure(request->request.datatype == PROTOCOL_BINARY_RAW_BYTES); + + ensure(request->bytes[6] == 0); + ensure(request->bytes[7] == 0); + + uint8_t opcode= request->request.opcode; + uint16_t keylen= ntohs(request->request.keylen); + uint8_t extlen= request->request.extlen; + uint32_t bodylen= ntohl(request->request.bodylen); + + ensure(bodylen >= (keylen + extlen)); + + switch (opcode) { + case PROTOCOL_BINARY_CMD_GET: + case PROTOCOL_BINARY_CMD_GETK: + case PROTOCOL_BINARY_CMD_GETKQ: + case PROTOCOL_BINARY_CMD_GETQ: + ensure(extlen == 0); + ensure(keylen > 0); + ensure(keylen == bodylen); + ensure(request->request.cas == 0); + break; + + case PROTOCOL_BINARY_CMD_ADD: + case PROTOCOL_BINARY_CMD_ADDQ: + /* it makes no sense to run add with a cas value */ + ensure(request->request.cas == 0); + /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_SET: + case PROTOCOL_BINARY_CMD_SETQ: + case PROTOCOL_BINARY_CMD_REPLACE: + case PROTOCOL_BINARY_CMD_REPLACEQ: + ensure(keylen > 0); + ensure(extlen == 8); + break; + + case PROTOCOL_BINARY_CMD_DELETE: + case PROTOCOL_BINARY_CMD_DELETEQ: + ensure(extlen == 0); + ensure(keylen > 0); + ensure(keylen == bodylen); + break; + + case PROTOCOL_BINARY_CMD_INCREMENT: + case PROTOCOL_BINARY_CMD_INCREMENTQ: + case PROTOCOL_BINARY_CMD_DECREMENT: + case PROTOCOL_BINARY_CMD_DECREMENTQ: + ensure(extlen == 20); + ensure(keylen > 0); + ensure(keylen + extlen == bodylen); + break; + + case PROTOCOL_BINARY_CMD_QUIT: + case PROTOCOL_BINARY_CMD_QUITQ: + case PROTOCOL_BINARY_CMD_NOOP: + case PROTOCOL_BINARY_CMD_VERSION: + ensure(extlen == 0); + ensure(keylen == 0); + ensure(bodylen == 0); + break; + + case PROTOCOL_BINARY_CMD_FLUSH: + case PROTOCOL_BINARY_CMD_FLUSHQ: + ensure(extlen == 0 || extlen == 4); + ensure(keylen == 0); + ensure(bodylen == extlen); + break; + + case PROTOCOL_BINARY_CMD_STAT: + ensure(extlen == 0); + /* May have key, but not value */ + ensure(keylen == bodylen); + break; + + case PROTOCOL_BINARY_CMD_APPEND: + case PROTOCOL_BINARY_CMD_APPENDQ: + case PROTOCOL_BINARY_CMD_PREPEND: + case PROTOCOL_BINARY_CMD_PREPENDQ: + ensure(extlen == 0); + ensure(keylen > 0); + break; + default: + /* Unknown command */ + ; + } + + return true; +} + +bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request, + protocol_binary_response_header *response) +{ + ensure(response->response.magic == PROTOCOL_BINARY_RES); + ensure(response->response.datatype == PROTOCOL_BINARY_RAW_BYTES); + ensure(response->response.opaque == request->request.opaque); + + uint16_t status= ntohs(response->response.status); + uint8_t opcode= response->response.opcode; + + if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + switch (opcode) { + case PROTOCOL_BINARY_CMD_ADDQ: + case PROTOCOL_BINARY_CMD_APPENDQ: + case PROTOCOL_BINARY_CMD_DECREMENTQ: + case PROTOCOL_BINARY_CMD_DELETEQ: + case PROTOCOL_BINARY_CMD_FLUSHQ: + case PROTOCOL_BINARY_CMD_INCREMENTQ: + case PROTOCOL_BINARY_CMD_PREPENDQ: + case PROTOCOL_BINARY_CMD_QUITQ: + case PROTOCOL_BINARY_CMD_REPLACEQ: + case PROTOCOL_BINARY_CMD_SETQ: + /* Quiet command shouldn't return on success */ + return false; + default: + break; + } + + switch (opcode) { + case PROTOCOL_BINARY_CMD_ADD: + case PROTOCOL_BINARY_CMD_REPLACE: + case PROTOCOL_BINARY_CMD_SET: + case PROTOCOL_BINARY_CMD_APPEND: + case PROTOCOL_BINARY_CMD_PREPEND: + ensure(response->response.keylen == 0); + ensure(response->response.extlen == 0); + ensure(response->response.bodylen == 0); + ensure(response->response.cas != 0); + break; + case PROTOCOL_BINARY_CMD_FLUSH: + case PROTOCOL_BINARY_CMD_NOOP: + case PROTOCOL_BINARY_CMD_QUIT: + case PROTOCOL_BINARY_CMD_DELETE: + ensure(response->response.keylen == 0); + ensure(response->response.extlen == 0); + ensure(response->response.bodylen == 0); + ensure(response->response.cas == 0); + break; + + case PROTOCOL_BINARY_CMD_DECREMENT: + case PROTOCOL_BINARY_CMD_INCREMENT: + ensure(response->response.keylen == 0); + ensure(response->response.extlen == 0); + ensure(ntohl(response->response.bodylen) == 8); + ensure(response->response.cas != 0); + break; + + case PROTOCOL_BINARY_CMD_STAT: + ensure(response->response.extlen == 0); + /* key and value exists in all packets except in the terminating */ + ensure(response->response.cas == 0); + break; + + case PROTOCOL_BINARY_CMD_VERSION: + ensure(response->response.keylen == 0); + ensure(response->response.extlen == 0); + ensure(response->response.bodylen != 0); + ensure(response->response.cas == 0); + break; + + case PROTOCOL_BINARY_CMD_GET: + case PROTOCOL_BINARY_CMD_GETQ: + ensure(response->response.keylen == 0); + ensure(response->response.extlen == 4); + ensure(response->response.cas != 0); + break; + + case PROTOCOL_BINARY_CMD_GETK: + case PROTOCOL_BINARY_CMD_GETKQ: + ensure(response->response.keylen != 0); + ensure(response->response.extlen == 4); + ensure(response->response.cas != 0); + break; + + default: + /* Undefined command code */ + break; + } + } + else + { + ensure(response->response.cas == 0); + ensure(response->response.extlen == 0); + if (opcode != PROTOCOL_BINARY_CMD_GETK) + { + ensure(response->response.keylen == 0); + } + } + + return true; +} diff --git a/libmemcached/protocol/protocol_handler.c b/libmemcached/protocol/protocol_handler.c new file mode 100644 index 00000000..67f2b63c --- /dev/null +++ b/libmemcached/protocol/protocol_handler.c @@ -0,0 +1,1351 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include "common.h" + +#include +#include +#include +#include +#include +#include +#include + +/* +** ********************************************************************** +** INTERNAL INTERFACE +** ********************************************************************** +*/ + +/** + * The default function to receive data from the client. This function + * just wraps the recv function to receive from a socket. + * See man -s3socket recv for more information. + * + * @param cookie cookie indentifying a client, not used + * @param sock socket to read from + * @param buf the destination buffer + * @param nbytes the number of bytes to read + * @return the number of bytes transferred of -1 upon error + */ +static ssize_t default_recv(const void *cookie, + int sock, + void *buf, + size_t nbytes) +{ + (void)cookie; + return recv(sock, buf, nbytes, 0); +} + +/** + * The default function to send data to the server. This function + * just wraps the send function to send through a socket. + * See man -s3socket send for more information. + * + * @param cookie cookie indentifying a client, not used + * @param sock socket to send to + * @param buf the source buffer + * @param nbytes the number of bytes to send + * @return the number of bytes transferred of -1 upon error + */ +static ssize_t default_send(const void *cookie, + int fd, + const void *buf, + size_t nbytes) +{ + (void)cookie; + return send(fd, buf, nbytes, 0); +} + +/** + * Try to drain the output buffers without blocking + * + * @param client the client to drain + * @return false if an error occured (connection should be shut down) + * true otherwise (please note that there may be more data to + * left in the buffer to send) + */ +static bool drain_output(struct memcached_binary_protocol_client_st *client) +{ + ssize_t len; + + // Do we have pending data to send? + while (client->output != NULL) + { + len= client->root->send(client, + client->sock, + client->output->data + client->output->offset, + client->output->nbytes - client->output->offset); + + if (len == -1) + { + if (errno == EWOULDBLOCK) + { + return true; + } + else if (errno != EINTR) + { + client->error= errno; + return false; + } + } + else + { + client->output->offset += (size_t)len; + if (client->output->offset == client->output->nbytes) + { + /* This was the complete buffer */ + struct chunk_st *old= client->output; + client->output= client->output->next; + if (client->output == NULL) + { + client->output_tail= NULL; + } + cache_free(client->root->buffer_cache, old); + } + } + } + + return true; +} + +/** + * Allocate an output buffer and chain it into the output list + * + * @param client the client that needs the buffer + * @return pointer to the new chunk if the allocation succeeds, NULL otherwise + */ +static struct chunk_st* +allocate_output_chunk(struct memcached_binary_protocol_client_st *client) +{ + struct chunk_st*ret= cache_alloc(client->root->buffer_cache); + if (ret == NULL) { + return NULL; + } + + ret->offset = ret->nbytes = 0; + ret->next = NULL; + ret->size = CHUNK_BUFFERSIZE; + ret->data= (void*)(ret + 1); + if (client->output == NULL) + { + client->output = client->output_tail = ret; + } + else + { + client->output_tail->next= ret; + client->output_tail= ret; + } + + return ret; +} + +/** + * Spool data into the send-buffer for a client. + * + * @param client the client to spool the data for + * @param data the data to spool + * @param length the number of bytes of data to spool + * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success, + * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory + */ +static protocol_binary_response_status +spool_output(struct memcached_binary_protocol_client_st *client, + const void *data, + size_t length) +{ + size_t offset = 0; + + struct chunk_st *chunk= client->output; + while (offset < length) + { + if (chunk == NULL || (chunk->size - chunk->nbytes) == 0) + { + if ((chunk= allocate_output_chunk(client)) == NULL) + { + return PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + } + + size_t bulk = length - offset; + if (bulk > chunk->size - chunk->nbytes) + { + bulk = chunk->size - chunk->nbytes; + } + + memcpy(chunk->data + chunk->nbytes, data, bulk); + chunk->nbytes += bulk; + offset += bulk; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Send a preformatted packet back to the client. If the connection is in + * pedantic mode, it will validate the packet and refuse to send it if it + * breaks the specification. + * + * @param cookie client identification + * @param request the original request packet + * @param response the packet to send + * @return The status of the operation + */ +static protocol_binary_response_status +raw_response_handler(const void *cookie, + protocol_binary_request_header *request, + protocol_binary_response_header *response) +{ + struct memcached_binary_protocol_client_st *client= (void*)cookie; + + if (client->root->pedantic && + !memcached_binary_protocol_pedantic_check_response(request, response)) + { + return PROTOCOL_BINARY_RESPONSE_EINVAL; + } + + if (!drain_output(client)) + { + return PROTOCOL_BINARY_RESPONSE_EIO; + } + + size_t len= sizeof(*response) + htonl(response->response.bodylen); + size_t offset= 0; + char *ptr= (void*)response; + + if (client->output == NULL) + { + /* I can write directly to the socket.... */ + do + { + size_t num_bytes= len - offset; + ssize_t nw= client->root->send(client, + client->sock, + ptr + offset, + num_bytes); + if (nw == -1) + { + if (errno == EWOULDBLOCK) + { + break; + } + else if (errno != EINTR) + { + client->error= errno; + return PROTOCOL_BINARY_RESPONSE_EIO; + } + } + else + { + offset += (size_t)nw; + } + } while (offset < len); + } + + return spool_output(client, ptr, len - offset); +} + +/* + * Version 0 of the interface is really low level and protocol specific, + * while the version 1 of the interface is more API focused. We need a + * way to translate between the command codes on the wire and the + * application level interface in V1, so let's just use the V0 of the + * interface as a map instead of creating a huuuge switch :-) + */ + +/** + * Callback for the GET/GETQ/GETK and GETKQ responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + * @param flags the flags for the item + * @param cas the CAS id for the item + */ +static protocol_binary_response_status +get_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen, + uint32_t flags, + uint64_t cas) { + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + uint8_t opcode= client->current_command->request.opcode; + + if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ) + { + keylen= 0; + } + + protocol_binary_response_get response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .cas= htonll(cas), + .keylen= htons(keylen), + .extlen= 4, + .bodylen= htonl(bodylen + keylen + 4), + }, + .message.body.flags= htonl(flags), + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || + (rval= spool_output(client, key, keylen)) != success || + (rval= spool_output(client, body, bodylen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the STAT responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + */ +static protocol_binary_response_status +stat_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen) { + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= client->current_command->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .keylen= htons(keylen), + .bodylen= htonl(bodylen + keylen), + }, + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || + (rval= spool_output(client, key, keylen)) != success || + (rval= spool_output(client, body, bodylen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the VERSION responses + * @param cookie client identifier + * @param text the length of the body + * @param textlen the length of the body + */ +static protocol_binary_response_status +version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) { + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= client->current_command->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .bodylen= htonl(textlen), + }, + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || + (rval= spool_output(client, text, textlen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for ADD and ADDQ + * @param cookie the calling client + * @param header the add/addq command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +add_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.add != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_add *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas; + + rval= client->root->callback->interface.v1.add(cookie, key, keylen, + data, datalen, flags, + timeout, &cas); + + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_ADD) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_ADD, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas) + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for DECREMENT and DECREMENTQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +decrement_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.decrement != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + protocol_binary_request_decr *request= (void*)header; + uint64_t init= ntohll(request->message.body.initial); + uint64_t delta= ntohll(request->message.body.delta); + uint32_t timeout= ntohl(request->message.body.expiration); + void *key= request->bytes + sizeof(request->bytes); + uint64_t result; + uint64_t cas; + + char buffer[1024] = {0}; + memcpy(buffer, key, keylen); + fprintf(stderr, "%s\n", buffer); + + + rval= client->root->callback->interface.v1.decrement(cookie, key, keylen, + delta, init, timeout, + &result, &cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT) + { + /* Send a positive request */ + protocol_binary_response_decr response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_DECREMENT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas), + .bodylen= htonl(8) + }, + .body.value = htonll(result) + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for DELETE and DELETEQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +delete_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.delete != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + void *key= (header + 1); + uint64_t cas= ntohll(header->request.cas); + rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_DELETE, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for FLUSH and FLUSHQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +flush_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.flush != NULL) + { + protocol_binary_request_flush *flush= (void*)header; + uint32_t timeout= 0; + if (htonl(header->request.bodylen) == 4) + { + timeout= ntohl(flush->message.body.expiration); + } + + rval= client->root->callback->interface.v1.flush(cookie, timeout); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_FLUSH, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for GET, GETK, GETQ, GETKQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +get_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.get != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + void *key= (header + 1); + rval= client->root->callback->interface.v1.get(cookie, key, keylen, + get_response_handler); + + if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT && + (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ || + header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ)) + { + /* Quiet commands shouldn't respond on cache misses */ + rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for INCREMENT and INCREMENTQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +increment_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.increment != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + protocol_binary_request_incr *request= (void*)header; + uint64_t init= ntohll(request->message.body.initial); + uint64_t delta= ntohll(request->message.body.delta); + uint32_t timeout= ntohl(request->message.body.expiration); + void *key= request->bytes + sizeof(request->bytes); + uint64_t cas; + uint64_t result; + + rval= client->root->callback->interface.v1.increment(cookie, key, keylen, + delta, init, timeout, + &result, &cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT) + { + /* Send a positive request */ + protocol_binary_response_incr response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_INCREMENT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas), + .bodylen= htonl(8) + }, + .body.value = htonll(result) + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for noop. Inform the v1 interface about the noop packet, and + * create and send a packet back to the client + * + * @param cookie the calling client + * @param header the command + * @param response_handler the response handler + * @return the result of the operation + */ +static protocol_binary_response_status +noop_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.noop != NULL) + { + client->root->callback->interface.v1.noop(cookie); + } + + protocol_binary_response_no_extras response= { + .message = { + .header.response = { + .magic = PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_NOOP, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + + return response_handler(cookie, header, (void*)&response); +} + +/** + * Callback for APPEND and APPENDQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +append_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.append != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen; + char *key= (void*)(header + 1); + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + rval= client->root->callback->interface.v1.append(cookie, key, keylen, + data, datalen, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_APPEND) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_APPEND, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for PREPEND and PREPENDQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +prepend_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.prepend != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen; + char *key= (char*)(header + 1); + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + rval= client->root->callback->interface.v1.prepend(cookie, key, keylen, + data, datalen, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_PREPEND, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for QUIT and QUITQ. Notify the client and shut down the connection + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +quit_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.quit != NULL) + { + client->root->callback->interface.v1.quit(cookie); + } + + protocol_binary_response_no_extras response= { + .message = { + .header.response = { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_QUIT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) + { + response_handler(cookie, header, (void*)&response); + } + + /* I need a better way to signal to close the connection */ + return PROTOCOL_BINARY_RESPONSE_EIO; +} + +/** + * Callback for REPLACE and REPLACEQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +replace_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.replace != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + rval= client->root->callback->interface.v1.replace(cookie, key, keylen, + data, datalen, flags, + timeout, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_REPLACE, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for SET and SETQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +set_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.set != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + + rval= client->root->callback->interface.v1.set(cookie, key, keylen, + data, datalen, flags, + timeout, cas, &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_SET) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_SET, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for STAT + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +stat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.stat != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + + rval= client->root->callback->interface.v1.stat(cookie, + (void*)(header + 1), + keylen, + stat_response_handler); + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for VERSION + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +version_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + (void)header; + protocol_binary_response_status rval; + + struct memcached_binary_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.version != NULL) + { + rval= client->root->callback->interface.v1.version(cookie, + version_response_handler); + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * The map to remap between the com codes and the v1 logical setting + */ +static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= { + [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, + [PROTOCOL_BINARY_CMD_ADD]= add_command_handler, + [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler, + [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler, + [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler, + [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler, + [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, + [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, + [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, + [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, + [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, + [PROTOCOL_BINARY_CMD_GETK]= get_command_handler, + [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, + [PROTOCOL_BINARY_CMD_GET]= get_command_handler, + [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler, + [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler, + [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, + [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler, + [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler, + [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, + [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, + [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, + [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, + [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, + [PROTOCOL_BINARY_CMD_SET]= set_command_handler, + [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, + [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, +}; + +/** + * Try to execute a command. Fire the pre/post functions and the specialized + * handler function if it's set. If not, the unknown probe should be fired + * if it's present. + * @param client the client connection to operate on + * @param header the command to execute + * @return true if success or false if a fatal error occured so that the + * connection should be shut down. + */ +static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header) +{ + if (client->root->pedantic && + memcached_binary_protocol_pedantic_check_request(header)) + { + /* @todo return invalid command packet */ + } + + /* we got all data available, execute the callback! */ + if (client->root->callback->pre_execute != NULL) + { + client->root->callback->pre_execute(client, header); + } + + protocol_binary_response_status rval; + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + uint8_t cc= header->request.opcode; + + switch (client->root->callback->interface_version) + { + case 0: + if (client->root->callback->interface.v0.comcode[cc] != NULL) { + rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler); + } + break; + case 1: + if (comcode_v0_v1_remap[cc] != NULL) { + rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler); + } + break; + default: + /* Unknown interface. + * It should be impossible to get here so I'll just call abort + * to avoid getting a compiler warning :-) + */ + abort(); + } + + + if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND && + client->root->callback->unknown != NULL) + { + rval= client->root->callback->unknown(client, header, raw_response_handler); + } + + if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && + rval != PROTOCOL_BINARY_RESPONSE_EIO) + { + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= cc, + .status= htons(rval), + .opaque= header->request.opaque, + }, + } + }; + rval= raw_response_handler(client, header, (void*)&response); + } + + if (client->root->callback->post_execute != NULL) + { + client->root->callback->post_execute(client, header); + } + + return rval != PROTOCOL_BINARY_RESPONSE_EIO; +} + +/* +** ********************************************************************** +** * PUBLIC INTERFACE +** * See protocol_handler.h for function description +** ********************************************************************** +*/ +struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void) +{ + struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret)); + if (ret != NULL) + { + ret->recv= default_recv; + ret->send= default_send; + ret->input_buffer_size= 1 * 1024 * 1024; + ret->input_buffer= malloc(ret->input_buffer_size); + if (ret->input_buffer == NULL) + { + free(ret); + ret= NULL; + return NULL; + } + + ret->buffer_cache = cache_create("protocol_handler", + CHUNK_BUFFERSIZE + sizeof(struct chunk_st), + 0, NULL, NULL); + if (ret->buffer_cache == NULL) { + free(ret->input_buffer); + free(ret); + } + } + + return ret; +} + +void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance) +{ + cache_destroy(instance->buffer_cache); + free(instance->input_buffer); + free(instance); +} + +struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance) +{ + return instance->callback; +} + +void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback) +{ + instance->callback= callback; +} + +memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie) +{ + (void)cookie; + return raw_response_handler; +} + +void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable) +{ + instance->pedantic= enable; +} + +bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance) +{ + return instance->pedantic; +} + +struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock) +{ + struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret)); + if (ret != NULL) + { + ret->root= instance; + ret->sock= sock; + } + + return ret; +} + +void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client) +{ + free(client); +} + +enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client) +{ + /* Try to send data and read from the socket */ + bool more_data= true; + do + { + ssize_t len= client->root->recv(client, + client->sock, + client->root->input_buffer + client->input_buffer_offset, + client->root->input_buffer_size - client->input_buffer_offset); + + if (len > 0) + { + /* Do we have the complete packet? */ + if (client->input_buffer_offset > 0) + { + memcpy(client->root->input_buffer, client->input_buffer, + client->input_buffer_offset); + len += (ssize_t)client->input_buffer_offset; + + /* @todo use buffer-cache! */ + free(client->input_buffer); + client->input_buffer_offset= 0; + } + + /* try to parse all of the received packets */ + protocol_binary_request_header *header; + header= (void*)client->root->input_buffer; + + if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ) + { + client->error= EINVAL; + return ERROR_EVENT; + } + + while (len >= (ssize_t)sizeof(*header) && + (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)))) + { + + /* I have the complete package */ + client->current_command = header; + if (!execute_command(client, header)) + { + return ERROR_EVENT; + } + + ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)); + len -= total; + if (len > 0) + { + intptr_t ptr= (intptr_t)header; + ptr += total; + if ((ptr % 8) == 0) + { + header= (void*)ptr; + } + else + { + memmove(client->root->input_buffer, (void*)ptr, (size_t)len); + header= (void*)client->root->input_buffer; + } + } + } + + if (len > 0) + { + /* save the data for later on */ + /* @todo use buffer-cache */ + client->input_buffer= malloc((size_t)len); + if (client->input_buffer == NULL) + { + client->error= ENOMEM; + return ERROR_EVENT; + } + memcpy(client->input_buffer, header, (size_t)len); + client->input_buffer_offset= (size_t)len; + more_data= false; + } + } + else if (len == 0) + { + /* Connection closed */ + drain_output(client); + return ERROR_EVENT; + } + else + { + if (errno != EWOULDBLOCK) + { + client->error= errno; + /* mark this client as terminated! */ + return ERROR_EVENT; + } + more_data = false; + } + } while (more_data); + + if (!drain_output(client)) + { + return ERROR_EVENT; + } + + return (client->output) ? READ_WRITE_EVENT : READ_EVENT; +} diff --git a/libmemcached/protocol_handler.h b/libmemcached/protocol_handler.h new file mode 100644 index 00000000..e1528d50 --- /dev/null +++ b/libmemcached/protocol_handler.h @@ -0,0 +1,191 @@ +/* + * Summary: Definition of the callback interface to the protocol handler + * + * Copy: See Copyright for the status of this software. + * + * Author: Trond Norbye + */ +#ifndef MEMCACHED_PROTOCOL_H +#define MEMCACHED_PROTOCOL_H + +#include +#include + +#include +#include +#include + +/* Forward declarations */ +/* + * You should only access memcached_binary_protocol_st from one thread!, + * and never assume anything about the internal layout / sizes of the + * structures. + */ +struct memcached_binary_protocol_st; +struct memcached_binary_protocol_client_st; + +/** + * Function the protocol handler should call to receive data. + * This function should behave exactly like read(2) + * + * @param cookie a cookie used to represent a given client + * @param fd the filedescriptor associated with the client + * @param buf destination buffer + * @param nbuf number of bytes to receive + * @return the number of bytes copied into buf + * or -1 upon error (errno should contain more information) + */ +typedef ssize_t (*memcached_binary_protocol_recv_func)(const void *cookie, + int fd, + void *buf, + size_t nbuf); + +/** + * Function the protocol handler should call to send data. + * This function should behave exactly like write(2) + * + * @param cookie a cookie used to represent a given client + * @param fd the filedescriptor associated with the client + * @param buf the source buffer + * @param nbuf number of bytes to send + * @return the number of bytes sent + * or -1 upon error (errno should contain more information) + */ +typedef ssize_t (*memcached_binary_protocol_send_func)(const void *cookie, + int fd, + const void *buf, + size_t nbuf); + +/** + * Create an instance of the protocol handler + * + * @return NULL if allocation of an instance fails + */ +LIBMEMCACHED_API +struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void); + +/** + * Get the callbacks associated with a protocol handler instance + * @return the callbacks currently used + */ +LIBMEMCACHED_API +struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance); + +/** + * Set the callbacks to be used by the given protocol handler instance + * @param instance the instance to update + * @param callback the callbacks to use + */ +LIBMEMCACHED_API +void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback); + +/** + * Should the library inspect the packages being sent and received and verify + * that they are according to the specification? If it encounters an invalid + * packet, it will return an EINVAL packet. + * + * @param instance the instance to update + * @param enable true if you want the library to check packages, false otherwise + */ +LIBMEMCACHED_API +void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable); + +/** + * Is the library inpecting each package? + * @param instance the instance to check + * @return true it the library is inspecting each package, false otherwise + */ +LIBMEMCACHED_API +bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance); + +/** + * Destroy an instance of the protocol handler + * + * @param instance The instance to destroy + */ +LIBMEMCACHED_API +void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance); + +/** + * Set the IO functions used by the instance to send and receive data. The + * functions should behave like recv(3socket) and send(3socket). + * + * @param instance the instance to specify the IO functions for + * @param recv the function to call for reciving data + * @param send the function to call for sending data + */ +LIBMEMCACHED_API +void memached_binary_protocol_set_io_functions(struct memcached_binary_protocol_st *instance, + memcached_binary_protocol_recv_func recv, + memcached_binary_protocol_send_func send); + + +/** + * Create a new client instance and associate it with a socket + * @param instance the protocol instance to bind the client to + * @param sock the client socket + * @return NULL if allocation fails, otherwise an instance + */ +LIBMEMCACHED_API +struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock); + +/** + * Destroy a client handle. + * The caller needs to close the socket accociated with the client + * before calling this function. This function invalidates the + * client memory area. + * + * @param client the client to destroy + */ +LIBMEMCACHED_API +void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client); + +/** + * The different events the client is interested in + */ +enum MEMCACHED_BINARY_PROTOCOL_EVENT { + /* Error event means that the client encountered an error with the + * connection so you should shut it down */ + ERROR_EVENT, + /* Please notify when there is more data available to read */ + READ_EVENT, + /* Please notify when it is possible to send more data */ + WRITE_EVENT, + /* Please notify when it is possible to send or receive data */ + READ_WRITE_EVENT +}; + +/** + * Let the client do some work. This might involve reading / sending data + * to/from the client, or perform callbacks to execute a command. + * @param client the client structure to work on + * @return The next event the protocol handler will be notified for + */ +LIBMEMCACHED_API +enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client); + +/** + * Get the socket attached to a client handle + * @param client the client to query + * @return the socket handle + */ +LIBMEMCACHED_API +int memcached_binary_protocol_client_get_socket(struct memcached_binary_protocol_client_st *client); + +/** + * Get the error id socket attached to a client handle + * @param client the client to query for an error code + * @return the OS error code from the client + */ +LIBMEMCACHED_API +int memcached_binary_protocol_client_get_errno(struct memcached_binary_protocol_client_st *client); + +/** + * Get a raw response handler for the given cookie + * @param cookie the cookie passed along into the callback + * @return the raw reponse handler you may use if you find + * the generic callback too limiting + */ +LIBMEMCACHED_API +memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie); +#endif