*.lo
*/.deps
*/.libs
+*/*/.deps
+*/*/.libs
*/Makefile
*/Makefile.in
*/*.l[oa]
+*/*/*.l[oa]
*TAGS
INSTALL
Makefile
config/install-sh
config/ltmain.sh
config/missing
+config/plugin.ac
configure
docs/*.[13]
+example/memcached_light
libmemcached-*.tar.gz
libmemcached/memcached_configure.h
libtool
libmemcached-0.30-1.x86_64.rpm
libmemcached-0.31-1.src.rpm
libmemcached-0.31-1.x86_64.rpm
-config/plugin.ac
+libmemcached-?.??/
ACLOCAL_AMFLAGS = -I m4
-SUBDIRS = docs libmemcached libmemcachedutil support clients tests
+SUBDIRS = docs libmemcached libmemcachedutil support clients tests example
EXTRA_dist = README.FIRST
check-local: test-no-outputdiff
AC_SUBST(MEMCACHED_LIBRARY_VERSION)
MEMCACHEDUTIL_LIBRARY_VERSION=0:0:0
AC_SUBST(MEMCACHEDUTIL_LIBRARY_VERSION)
+MEMCACHEDPROTOCOL_LIBRARY_VERSION=0:0:0
+AC_SUBST(MEMCACHEDPROTOCOL_LIBRARY_VERSION)
# libmemcached versioning when linked with GNU ld.
then
LD_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/libmemcached.ver"
LD_UTIL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcachedutil/libmemcachedutil.ver"
+ LD_PROTOCOL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/protocol/libmemcachedprotocol.ver"
fi
AC_SUBST(LD_VERSION_SCRIPT)
AC_SUBST(LD_UTIL_VERSION_SCRIPT)
+AC_SUBST(LD_PROTOCOL_VERSION_SCRIPT)
#--------------------------------------------------------------------
clients/Makefile
tests/Makefile
docs/Makefile
+ example/Makefile
libmemcached/Makefile
libmemcached/memcached_configure.h
libmemcachedutil/Makefile
+ libmemcached/protocol/Makefile
support/Makefile
support/libmemcached.pc
support/libmemcached.spec
--- /dev/null
+noinst_PROGRAMS = memcached_light
+
+memcached_light_SOURCES= memcached_light.c \
+ storage.c storage.h \
+ interface_v0.c \
+ interface_v1.c
+memcached_light_LDADD= $(top_builddir)/libmemcached/protocol/libmemcachedprotocol.la
+
--- /dev/null
+#ifndef EXAMPLE_COMMON_H
+#define EXAMPLE_COMMON_H
+
+#include <netinet/in.h>
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
+
+#endif
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * This file contains an implementation of the callback interface for level 0
+ * in the protocol library. You might want to have your copy of the protocol
+ * specification next to your coffee ;-)
+ */
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+static protocol_binary_response_status noop_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_NOOP,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ };
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status quit_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_QUIT,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ };
+
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
+ response_handler(cookie, header, (void*)&response);
+
+ /* I need a better way to signal to close the connection */
+ return PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+static protocol_binary_response_status get_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ uint8_t opcode= header->request.opcode;
+ union {
+ protocol_binary_response_get response;
+ char buffer[4096];
+ } msg= {
+ .response.message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ };
+
+ struct item *item= get_item(header + 1, ntohs(header->request.keylen));
+ if (item)
+ {
+ msg.response.message.body.flags= htonl(item->flags);
+ char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
+ uint32_t bodysize= 4;
+ msg.response.message.header.response.cas= htonll(item->cas);
+ if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
+ {
+ memcpy(ptr, item->key, item->nkey);
+ msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
+ ptr += item->nkey;
+ bodysize += (uint32_t)item->nkey;
+ }
+ memcpy(ptr, item->data, item->size);
+ bodysize += (uint32_t)item->size;
+ msg.response.message.header.response.bodylen= htonl(bodysize);
+ msg.response.message.header.response.extlen= 4;
+
+ return response_handler(cookie, header, (void*)&msg);
+ }
+ else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
+ {
+ msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+ return response_handler(cookie, header, (void*)&msg);
+ }
+
+ /* Q shouldn't report a miss ;-) */
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status delete_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ size_t keylen= ntohs(header->request.keylen);
+ char *key= ((char*)header) + sizeof(*header);
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .opaque= header->request.opaque
+ }
+ };
+
+ if (!delete_item(key, keylen))
+ {
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+ return response_handler(cookie, header, (void*)&response);
+ }
+ else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
+ {
+ /* DELETEQ doesn't want success response */
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
+ return response_handler(cookie, header, (void*)&response);
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status flush_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ uint8_t opcode= header->request.opcode;
+
+ /* @fixme sett inn when! */
+ flush(0);
+
+ if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
+ {
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ };
+ return response_handler(cookie, header, (void*)&response);
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_request_incr *req= (void*)header;
+ protocol_binary_response_incr response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .opaque= header->request.opaque,
+ },
+ };
+
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint64_t initial= ntohll(req->message.body.initial);
+ uint64_t delta= ntohll(req->message.body.delta);
+ uint32_t expiration= ntohl(req->message.body.expiration);
+ void *key= req->bytes + sizeof(req->bytes);
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+ struct item *item= get_item(key, keylen);
+ if (item == NULL)
+ {
+ item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ memcpy(item->data, &initial, sizeof(initial));
+ put_item(item);
+ }
+ }
+ else
+ {
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
+ header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
+ {
+ (*(uint64_t*)item->data) += delta;
+ }
+ else
+ {
+ if (delta > *(uint64_t*)item->data)
+ {
+ *(uint64_t*)item->data= 0;
+ }
+ else
+ {
+ *(uint64_t*)item->data -= delta;
+ }
+ }
+ update_cas(item);
+ }
+
+ response.message.header.response.status= htons(rval);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
+ {
+ response.message.header.response.bodylen= ntohl(8);
+ response.message.body.value= ntohll((*(uint64_t*)item->data));
+ response.message.header.response.cas= ntohll(item->cas);
+
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
+ header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
+ {
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+ }
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status version_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ const char *versionstring= "1.0.0";
+ union {
+ protocol_binary_response_header packet;
+ char buffer[256];
+ } response= {
+ .packet.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_VERSION,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .bodylen= htonl((uint32_t)strlen(versionstring))
+ }
+ };
+
+ memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status concat_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint64_t cas= ntohll(header->request.cas);
+ void *key= header + 1;
+ uint32_t vallen= ntohl(header->request.bodylen) - keylen;
+ void *val= (char*)key + keylen;
+
+ struct item *item= get_item(key, keylen);
+ struct item *nitem;
+ if (item == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+ else if (cas != 0 && cas != item->cas)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+ else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+ item->flags, item->exp)) == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
+ header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
+ {
+ memcpy(nitem->data, item->data, item->size);
+ memcpy(((char*)(nitem->data)) + item->size, val, vallen);
+ }
+ else
+ {
+ memcpy(nitem->data, val, vallen);
+ memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
+ }
+ delete_item(key, keylen);
+ put_item(nitem);
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
+ header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
+ {
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(rval),
+ .opaque= header->request.opaque,
+ .cas= htonll(nitem->cas),
+ }
+ }
+ };
+ return response_handler(cookie, header, (void*)&response);
+ }
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status set_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ size_t keylen= ntohs(header->request.keylen);
+ size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_replace *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ time_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ if (header->request.cas != 0)
+ {
+ /* validate cas */
+ struct item* item= get_item(key, keylen);
+ if (item != NULL && item->cas != ntohll(header->request.cas))
+ {
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+ return response_handler(cookie, header, (void*)&response);
+ }
+ }
+
+ delete_item(key, keylen);
+ struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
+ if (item == 0)
+ {
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+ }
+ else
+ {
+ put_item(item);
+ /* SETQ shouldn't return a message */
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
+ {
+ response.message.header.response.cas= htonll(item->cas);
+ return response_handler(cookie, header, (void*)&response);
+ }
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status add_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ size_t keylen= ntohs(header->request.keylen);
+ size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_add *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ time_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ struct item* item= get_item(key, keylen);
+ if (item == NULL)
+ {
+ item= create_item(key, keylen, data, datalen, flags, timeout);
+ if (item == 0)
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+ else
+ {
+ put_item(item);
+ /* ADDQ shouldn't return a message */
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
+ {
+ response.message.header.response.cas= htonll(item->cas);
+ return response_handler(cookie, header, (void*)&response);
+ }
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+ }
+ else
+ {
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+ }
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status replace_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ size_t keylen= ntohs(header->request.keylen);
+ size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_replace *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ time_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ struct item* item= get_item(key, keylen);
+ if (item == NULL)
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+ else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
+ {
+ delete_item(key, keylen);
+ item= create_item(key, keylen, data, datalen, flags, timeout);
+ if (item == 0)
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+ else
+ {
+ put_item(item);
+ /* REPLACEQ shouldn't return a message */
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
+ {
+ response.message.header.response.cas= htonll(item->cas);
+ return response_handler(cookie, header, (void*)&response);
+ }
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+ }
+ else
+ response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status stat_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ /* Just send the terminating packet*/
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_STAT,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+struct memcached_binary_protocol_callback_st interface_v0_impl= {
+ .interface_version= 0,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
+ .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
+};
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * This file contains an implementation of the callback interface for level 1
+ * in the protocol library. If you compare the implementation with the one
+ * in interface_v0.c you will see that this implementation is much easier and
+ * hides all of the protocol logic and let you focus on the application
+ * logic. One "problem" with this layer is that it is synchronous, so that
+ * you will not receive the next command before a answer to the previous
+ * command is being sent.
+ */
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+static protocol_binary_response_status add_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void *data,
+ uint32_t datalen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t *cas)
+{
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ struct item* item= get_item(key, keylen);
+ if (item == NULL)
+ {
+ item= create_item(key, keylen, data, datalen, flags, exptime);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ put_item(item);
+ *cas= item->cas;
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status append_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint64_t cas,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+ struct item *item= get_item(key, keylen);
+ struct item *nitem;
+
+ if (item == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+ else if (cas != 0 && cas != item->cas)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+ else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+ item->flags, item->exp)) == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ memcpy(nitem->data, item->data, item->size);
+ memcpy(((char*)(nitem->data)) + item->size, val, vallen);
+ delete_item(key, keylen);
+ put_item(nitem);
+ *result_cas= nitem->cas;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status decrement_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t delta,
+ uint64_t initial,
+ uint32_t expiration,
+ uint64_t *result,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ struct item *item= get_item(key, keylen);
+
+ if (item == NULL)
+ {
+ item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ memcpy(item->data, &initial, sizeof(initial));
+ put_item(item);
+ *result= initial;
+ *result_cas= item->cas;
+ }
+ }
+ else
+ {
+ if (delta > *(uint64_t*)item->data)
+ {
+ *(uint64_t*)item->data= 0;
+ }
+ else
+ {
+ *(uint64_t*)item->data -= delta;
+ }
+ *result= (*(uint64_t*)item->data);
+ /* @todo fix cas */
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status delete_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+ if (cas != 0)
+ {
+ struct item *item= get_item(key, keylen);
+ if (item != NULL && item->cas != cas)
+ {
+ return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+ }
+
+ if (!delete_item(key, keylen))
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+
+ return rval;
+}
+
+
+static protocol_binary_response_status flush_handler(const void *cookie,
+ uint32_t when) {
+
+ (void)cookie;
+ flush(when);
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status get_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ memcached_binary_protocol_get_response_handler response_handler) {
+ struct item *item= get_item(key, keylen);
+
+ if (item == NULL)
+ {
+ return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+
+ return response_handler(cookie, key, (uint16_t)keylen,
+ item->data, (uint32_t)item->size, item->flags,
+ item->cas);
+}
+
+static protocol_binary_response_status increment_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t delta,
+ uint64_t initial,
+ uint32_t expiration,
+ uint64_t *result,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ struct item *item= get_item(key, keylen);
+
+ if (item == NULL)
+ {
+ item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ memcpy(item->data, &initial, sizeof(initial));
+ put_item(item);
+ *result= initial;
+ *result_cas= item->cas;
+ }
+ }
+ else
+ {
+ (*(uint64_t*)item->data) += delta;
+ *result= (*(uint64_t*)item->data);
+ update_cas(item);
+ *result_cas= item->cas;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status noop_handler(const void *cookie) {
+ (void)cookie;
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status prepend_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint64_t cas,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+ struct item *item= get_item(key, keylen);
+ struct item *nitem;
+ if (item == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+ else if (cas != 0 && cas != item->cas)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+ else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+ item->flags, item->exp)) == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ memcpy(nitem->data, val, vallen);
+ memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
+ delete_item(key, keylen);
+ put_item(nitem);
+ *result_cas= nitem->cas;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status quit_handler(const void *cookie) {
+ (void)cookie;
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status replace_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* data,
+ uint32_t datalen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t cas,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ struct item* item= get_item(key, keylen);
+
+ if (item == NULL)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ }
+ else if (cas == 0 || cas == item->cas)
+ {
+ delete_item(key, keylen);
+ item= create_item(key, keylen, data, datalen, flags, exptime);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ put_item(item);
+ *result_cas= item->cas;
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status set_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* data,
+ uint32_t datalen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t cas,
+ uint64_t *result_cas) {
+ (void)cookie;
+ protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+ if (cas != 0)
+ {
+ struct item* item= get_item(key, keylen);
+ if (item != NULL && cas != item->cas)
+ {
+ /* Invalid CAS value */
+ return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+ }
+ }
+
+ delete_item(key, keylen);
+ struct item* item= create_item(key, keylen, data, datalen, flags, exptime);
+ if (item == 0)
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ else
+ {
+ put_item(item);
+ *result_cas= item->cas;
+ }
+
+ return rval;
+}
+
+static protocol_binary_response_status stat_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ memcached_binary_protocol_stat_response_handler response_handler) {
+ (void)key;
+ (void)keylen;
+ /* Just return an empty packet */
+ return response_handler(cookie, NULL, 0, NULL, 0);
+}
+
+static protocol_binary_response_status version_handler(const void *cookie,
+ memcached_binary_protocol_version_response_handler response_handler) {
+ const char *version= "0.1.1";
+ return response_handler(cookie, version, (uint32_t)strlen(version));
+}
+
+struct memcached_binary_protocol_callback_st interface_v1_impl= {
+ .interface_version= 1,
+ .interface.v1= {
+ .add= add_handler,
+ .append= append_handler,
+ .decrement= decrement_handler,
+ .delete= delete_handler,
+ .flush= flush_handler,
+ .get= get_handler,
+ .increment= increment_handler,
+ .noop= noop_handler,
+ .prepend= prepend_handler,
+ .quit= quit_handler,
+ .replace= replace_handler,
+ .set= set_handler,
+ .stat= stat_handler,
+ .version= version_handler
+ }
+};
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * What is a library without an example to show you how to use the library?
+ * This example use both interfaces to implement a small memcached server.
+ * Please note that this is an exemple on how to use the library, not
+ * an implementation of a scalable memcached server. If you look closely
+ * at the example it isn't even multithreaded ;-)
+ *
+ * With that in mind, let me give you some pointers into the source:
+ * storage.c/h - Implements the item store for this server and not really
+ * interesting for this example.
+ * interface_v0.c - Shows an implementation of the memcached server by using
+ * the "raw" access to the packets as they arrive
+ * interface_v1.c - Shows an implementation of the memcached server by using
+ * the more "logical" interface.
+ * memcached_light.c - This file sets up all of the sockets and run the main
+ * message loop.
+ */
+
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+extern struct memcached_binary_protocol_callback_st interface_v0_impl;
+extern struct memcached_binary_protocol_callback_st interface_v1_impl;
+
+static int server_sockets[1024];
+static int num_server_sockets= 0;
+static void* socket_userdata_map[1024];
+
+/**
+ * Create a socket and bind it to a specific port number
+ * @param port the port number to bind to
+ */
+static int server_socket(const char *port) {
+ struct addrinfo *ai;
+ struct addrinfo hints= { .ai_flags= AI_PASSIVE,
+ .ai_family= AF_UNSPEC,
+ .ai_socktype= SOCK_STREAM };
+
+ int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
+ if (error != 0)
+ {
+ if (error != EAI_SYSTEM)
+ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
+ else
+ perror("getaddrinfo()");
+
+ return 1;
+ }
+
+ struct linger ling= {0, 0};
+
+ for (struct addrinfo *next= ai; next; next= next->ai_next)
+ {
+ int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+ if (sock == -1)
+ {
+ perror("Failed to create socket");
+ continue;
+ }
+
+ int flags= fcntl(sock, F_GETFL, 0);
+ if (flags == -1)
+ {
+ perror("Failed to get socket flags");
+ close(sock);
+ continue;
+ }
+
+ if ((flags & O_NONBLOCK) != O_NONBLOCK)
+ if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
+ {
+ perror("Failed to set socket to nonblocking mode");
+ close(sock);
+ continue;
+ }
+
+ flags= 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set SO_REUSEADDR");
+
+ if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set SO_KEEPALIVE");
+
+ if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
+ perror("Failed to set SO_LINGER");
+
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set TCP_NODELAY");
+
+ if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+ {
+ if (errno != EADDRINUSE)
+ {
+ perror("bind()");
+ freeaddrinfo(ai);
+ }
+ close(sock);
+ continue;
+ }
+
+ if (listen(sock, 1024) == -1)
+ {
+ perror("listen()");
+ close(sock);
+ continue;
+ }
+
+ server_sockets[num_server_sockets++]= sock;
+ }
+
+ freeaddrinfo(ai);
+
+ return (num_server_sockets > 0) ? 0 : 1;
+}
+
+/**
+ * Convert a command code to a textual string
+ * @param cmd the comcode to convert
+ * @return a textual string with the command or NULL for unknown commands
+ */
+static const char* comcode2str(uint8_t cmd)
+{
+ static const char * const text[] = {
+ "GET", "SET", "ADD", "REPLACE", "DELETE",
+ "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
+ "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
+ "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
+ "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
+ "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
+ };
+
+ if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
+ return text[cmd];
+
+ return NULL;
+}
+
+/**
+ * Print out the command we are about to execute
+ */
+static void pre_execute(const void *cookie, protocol_binary_request_header *header)
+{
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+}
+
+/**
+ * Print out the command we just executed
+ */
+static void post_execute(const void *cookie, protocol_binary_request_header *header)
+{
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+}
+
+/**
+ * Callback handler for all unknown commands.
+ * Send an unknown command back to the client
+ */
+static protocol_binary_response_status unknown(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+static void work(void);
+
+/**
+ * Program entry point. Bind to the specified port(s) and serve clients
+ *
+ * @param argc number of items in the argument vector
+ * @param argv argument vector
+ * @return 0 on success, 1 otherwise
+ */
+int main(int argc, char **argv)
+{
+ bool port_specified= false;
+ int cmd;
+ struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+
+ while ((cmd= getopt(argc, argv, "1p:?")) != EOF)
+ {
+ switch (cmd) {
+ case '1':
+ interface= &interface_v1_impl;
+ break;
+ case 'p':
+ port_specified= true;
+ (void)server_socket(optarg);
+ break;
+ case '?': /* FALLTHROUGH */
+ default:
+ (void)fprintf(stderr, "Usage: %s [-p port]\n", argv[0]);
+ return 1;
+ }
+ }
+
+ if (!port_specified)
+ (void)server_socket("9999");
+
+ if (num_server_sockets == 0)
+ {
+ fprintf(stderr, "I don't have any server sockets\n");
+ return 1;
+ }
+
+ /*
+ * Create and initialize the handles to the protocol handlers. I want
+ * to be able to trace the traffic throught the pre/post handlers, and
+ * set up a common handler for unknown messages
+ */
+ interface->pre_execute= pre_execute;
+ interface->post_execute= post_execute;
+ interface->unknown= unknown;
+
+ struct memcached_binary_protocol_st *protocol_handle;
+ if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL)
+ {
+ fprintf(stderr, "Failed to allocate protocol handle\n");
+ return 1;
+ }
+
+ memcached_binary_protocol_set_callbacks(protocol_handle, interface);
+ memcached_binary_protocol_set_pedantic(protocol_handle, true);
+
+ for (int xx= 0; xx < num_server_sockets; ++xx)
+ socket_userdata_map[server_sockets[xx]]= protocol_handle;
+
+ /* Serve all of the clients */
+ work();
+
+ /* NOTREACHED */
+ return 0;
+}
+
+static void work(void) {
+#define MAX_SERVERS_TO_POLL 100
+ struct pollfd fds[MAX_SERVERS_TO_POLL];
+ int max_poll;
+
+ for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
+ {
+ fds[max_poll].events= POLLIN;
+ fds[max_poll].revents= 0;
+ fds[max_poll].fd= server_sockets[max_poll];
+ ++max_poll;
+ }
+
+ while (true)
+ {
+ int err= poll(fds, (nfds_t)max_poll, -1);
+
+ if (err == 0 || (err == -1 && errno != EINTR))
+ {
+ perror("poll() failed");
+ abort();
+ }
+
+ /* find the available filedescriptors */
+ for (int x= max_poll - 1; x > -1 && err > 0; --x)
+ {
+ if (fds[x].revents != 0)
+ {
+ --err;
+ if (x < num_server_sockets)
+ {
+ /* accept new client */
+ struct sockaddr_storage addr;
+ socklen_t addrlen= sizeof(addr);
+ int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
+ &addrlen);
+
+ if (sock == -1)
+ {
+ perror("Failed to accept client");
+ continue;
+ }
+
+ struct memcached_binary_protocol_st *protocol;
+ protocol= socket_userdata_map[fds[x].fd];
+
+ struct memcached_binary_protocol_client_st* c;
+ c= memcached_binary_protocol_create_client(protocol, sock);
+ if (c == NULL)
+ {
+ fprintf(stderr, "Failed to create client\n");
+ close(sock);
+ }
+ else
+ {
+ socket_userdata_map[sock]= c;
+ fds[max_poll].events= POLLIN;
+ fds[max_poll].revents= 0;
+ fds[max_poll].fd= sock;
+ ++max_poll;
+ }
+ }
+ else
+ {
+ /* drive the client */
+ struct memcached_binary_protocol_client_st* c;
+ c= socket_userdata_map[fds[x].fd];
+ assert(c != NULL);
+ fds[max_poll].events= 0;
+
+ switch (memcached_binary_protocol_client_work(c))
+ {
+ case WRITE_EVENT:
+ case READ_WRITE_EVENT:
+ fds[max_poll].events= POLLOUT;
+ /* FALLTHROUGH */
+ case READ_EVENT:
+ fds[max_poll].events |= POLLIN;
+ break;
+ case ERROR_EVENT:
+ default: /* ERROR or unknown state.. close */
+ memcached_binary_protocol_client_destroy(c);
+ close(fds[x].fd);
+ fds[x].events= 0;
+
+ if (x != max_poll - 1)
+ memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
+
+ --max_poll;
+ }
+ }
+ }
+ }
+ }
+}
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include <stdlib.h>
+#include <inttypes.h>
+#include <time.h>
+#include <stdbool.h>
+#include <string.h>
+#include "storage.h"
+
+struct list_entry {
+ struct item item;
+ struct list_entry *next;
+ struct list_entry *prev;
+};
+
+static struct list_entry *root;
+static uint64_t cas;
+
+void put_item(struct item* item) {
+ struct list_entry* entry= (void*)item;
+ update_cas(item);
+
+ if (root == NULL)
+ {
+ entry->next= entry->prev= entry;
+ }
+ else
+ {
+ entry->prev= root->prev;
+ entry->next= root;
+ entry->prev->next= entry;
+ entry->next->prev= entry;
+ }
+
+ root= entry;
+}
+
+struct item* get_item(const void* key, size_t nkey) {
+ struct list_entry *walker= root;
+ if (root == NULL)
+ {
+ return NULL;
+ }
+
+ do
+ {
+ if (((struct item*)walker)->nkey == nkey &&
+ memcmp(((struct item*)walker)->key, key, nkey) == 0)
+ {
+ return (struct item*)walker;
+ }
+ walker= walker->next;
+ } while (walker != root);
+
+ return NULL;
+}
+
+struct item* create_item(const void* key, size_t nkey, const void* data,
+ size_t size, uint32_t flags, time_t exp)
+{
+ struct item* ret= calloc(1, sizeof(struct list_entry));
+ if (ret != NULL)
+ {
+ ret->key= malloc(nkey);
+ if (size > 0)
+ {
+ ret->data= malloc(size);
+ }
+
+ if (ret->key == NULL || (size > 0 && ret->data == NULL))
+ {
+ free(ret->key);
+ free(ret->data);
+ free(ret);
+ return NULL;
+ }
+
+ memcpy(ret->key, key, nkey);
+ if (data != NULL)
+ {
+ memcpy(ret->data, data, size);
+ }
+
+ ret->nkey= nkey;
+ ret->size= size;
+ ret->flags= flags;
+ ret->exp= exp;
+ }
+
+ return ret;
+}
+
+bool delete_item(const void* key, size_t nkey) {
+ struct item* item= get_item(key, nkey);
+ bool ret= false;
+
+ if (item)
+ {
+ /* remove from linked list */
+ struct list_entry *entry= (void*)item;
+
+ if (entry->next == entry)
+ {
+ /* Only one object in the list */
+ root= NULL;
+ }
+ else
+ {
+ /* ensure that we don't loose track of the root, and this will
+ * change the start position for the next search ;-) */
+ root= entry->next;
+ entry->prev->next= entry->next;
+ entry->next->prev= entry->prev;
+ }
+
+ free(item->key);
+ free(item->data);
+ free(item);
+ ret= true;
+ }
+
+ return ret;
+}
+
+void flush(uint32_t when) {
+ /* FIXME */
+ (void)when;
+ /* remove the complete linked list */
+ if (root == NULL)
+ {
+ return;
+ }
+
+ root->prev->next= NULL;
+ while (root != NULL)
+ {
+ struct item* tmp= (void*)root;
+ root= root->next;
+
+ free(tmp->key);
+ free(tmp->data);
+ free(tmp);
+ }
+}
+
+void update_cas(struct item* item) {
+ item->cas= ++cas;
+}
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#ifndef STORAGE_H
+#define STORAGE_H
+
+struct item {
+ uint64_t cas;
+ void* key;
+ size_t nkey;
+ void* data;
+ size_t size;
+ uint32_t flags;
+ time_t exp;
+};
+
+void update_cas(struct item* item);
+void put_item(struct item* item);
+struct item* get_item(const void* key, size_t nkey);
+struct item* create_item(const void* key, size_t nkey, const void *data,
+ size_t size, uint32_t flags, time_t exp);
+bool delete_item(const void* key, size_t nkey);
+void flush(uint32_t when);
+
+#endif
EXTRA_DIST = libmemcached_probes.d memcached/README.txt libmemcached.ver \
memcached_configure.h.in
+SUBDIRS = protocol
+
EXTRA_HEADERS =
BUILT_SOURCES=
memcached_string.h \
memcached_types.h \
memcached_watchpoint.h \
+ protocol_handler.h \
visibility.h
+nobase_pkginclude_HEADERS=protocol/cache.h \
+ protocol/callback.h
+
if BUILD_LIBMEMCACHEDUTIL
pkginclude_HEADERS+= memcached_util.h memcached_pool.h
PROTOCOL_BINARY_RESPONSE_EINVAL = 0x04,
PROTOCOL_BINARY_RESPONSE_NOT_STORED = 0x05,
PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND = 0x81,
- PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82
+ PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82,
+
+
+ PROTOCOL_BINARY_RESPONSE_EIO = 0xff
} protocol_binary_response_status;
/**
--- /dev/null
+EXTRA_DIST= libmemcachedprotocol.ver
+
+lib_LTLIBRARIES=libmemcachedprotocol.la
+
+noinst_HEADERS = common.h
+
+libmemcachedprotocol_la_SOURCES= protocol_handler.c cache.c pedantic.c
+libmemcachedprotocol_la_LDFLAGS= -version-info $(MEMCACHEDPROTOCOL_LIBRARY_VERSION) $(LD_PROTOCOL_VERSION_SCRIPT)
--- /dev/null
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include <stdlib.h>
+#include <string.h>
+#include <inttypes.h>
+
+#ifndef NDEBUG
+#include <signal.h>
+#endif
+
+#include "cache.h"
+
+#ifndef NDEBUG
+const uint64_t redzone_pattern = 0xdeadbeefcafebabe;
+int cache_error = 0;
+#endif
+
+const size_t initial_pool_size = 64;
+
+cache_t* cache_create(const char *name, size_t bufsize, size_t align,
+ cache_constructor_t* constructor,
+ cache_destructor_t* destructor) {
+ cache_t* ret = calloc(1, sizeof(cache_t));
+ char* nm = strdup(name);
+ void** ptr = calloc(initial_pool_size, bufsize);
+ if (ret == NULL || nm == NULL || ptr == NULL ||
+ pthread_mutex_init(&ret->mutex, NULL) == -1) {
+ free(ret);
+ free(nm);
+ free(ptr);
+ return NULL;
+ }
+
+ ret->name = nm;
+ ret->ptr = ptr;
+ ret->freetotal = initial_pool_size;
+ ret->constructor = constructor;
+ ret->destructor = destructor;
+
+#ifndef NDEBUG
+ ret->bufsize = bufsize + 2 * sizeof(redzone_pattern);
+#else
+ ret->bufsize = bufsize;
+#endif
+
+ (void)align;
+
+ return ret;
+}
+
+static inline void* get_object(void *ptr) {
+#ifndef NDEBUG
+ uint64_t *pre = ptr;
+ return pre + 1;
+#else
+ return ptr;
+#endif
+}
+
+void cache_destroy(cache_t *cache) {
+ while (cache->freecurr > 0) {
+ void *ptr = cache->ptr[--cache->freecurr];
+ if (cache->destructor) {
+ cache->destructor(get_object(ptr), NULL);
+ }
+ free(ptr);
+ }
+ free(cache->name);
+ free(cache->ptr);
+ pthread_mutex_destroy(&cache->mutex);
+}
+
+void* cache_alloc(cache_t *cache) {
+ void *ret;
+ void *object;
+ pthread_mutex_lock(&cache->mutex);
+ if (cache->freecurr > 0) {
+ ret = cache->ptr[--cache->freecurr];
+ object = get_object(ret);
+ } else {
+ object = ret = malloc(cache->bufsize);
+ if (ret != NULL) {
+ object = get_object(ret);
+
+ if (cache->constructor != NULL &&
+ cache->constructor(object, NULL, 0) != 0) {
+ free(ret);
+ object = NULL;
+ }
+ }
+ }
+ pthread_mutex_unlock(&cache->mutex);
+
+#ifndef NDEBUG
+ if (object != NULL) {
+ /* add a simple form of buffer-check */
+ uint64_t *pre = ret;
+ *pre = redzone_pattern;
+ ret = pre+1;
+ memcpy(((char*)ret) + cache->bufsize - (2 * sizeof(redzone_pattern)),
+ &redzone_pattern, sizeof(redzone_pattern));
+ }
+#endif
+
+ return object;
+}
+
+void cache_free(cache_t *cache, void *ptr) {
+ pthread_mutex_lock(&cache->mutex);
+
+#ifndef NDEBUG
+ /* validate redzone... */
+ if (memcmp(((char*)ptr) + cache->bufsize - (2 * sizeof(redzone_pattern)),
+ &redzone_pattern, sizeof(redzone_pattern)) != 0) {
+ raise(SIGABRT);
+ cache_error = 1;
+ pthread_mutex_unlock(&cache->mutex);
+ return;
+ }
+ uint64_t *pre = ptr;
+ --pre;
+ if (*pre != redzone_pattern) {
+ raise(SIGABRT);
+ cache_error = -1;
+ pthread_mutex_unlock(&cache->mutex);
+ return;
+ }
+ ptr = pre;
+#endif
+ if (cache->freecurr < cache->freetotal) {
+ cache->ptr[cache->freecurr++] = ptr;
+ } else {
+ /* try to enlarge free connections array */
+ size_t newtotal = cache->freetotal * 2;
+ void **new_free = realloc(cache->ptr, sizeof(char *) * newtotal);
+ if (new_free) {
+ cache->freetotal = newtotal;
+ cache->ptr = new_free;
+ cache->ptr[cache->freecurr++] = ptr;
+ } else {
+ if (cache->destructor) {
+ cache->destructor(ptr, NULL);
+ }
+ free(ptr);
+
+ }
+ }
+ pthread_mutex_unlock(&cache->mutex);
+}
+
--- /dev/null
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#ifndef CACHE_H
+#define CACHE_H
+#include <pthread.h>
+
+#ifdef HAVE_UMEM_H
+#include <umem.h>
+#define cache_t umem_cache_t
+#define cache_alloc(a) umem_cache_alloc(a, UMEM_DEFAULT)
+#define cache_free(a, b) umem_cache_free(a, b)
+#define cache_create(a,b,c,d,e) umem_cache_create((char*)a, b, c, d, e, NULL, NULL, NULL, 0)
+#define cache_destroy(a) umem_cache_destroy(a);
+
+#else
+
+#ifndef NDEBUG
+/* may be used for debug purposes */
+extern int cache_error;
+#endif
+
+/**
+ * Constructor used to initialize allocated objects
+ *
+ * @param obj pointer to the object to initialized.
+ * @param notused1 This parameter is currently not used.
+ * @param notused2 This parameter is currently not used.
+ * @return you should return 0, but currently this is not checked
+ */
+typedef int cache_constructor_t(void* obj, void* notused1, int notused2);
+/**
+ * Destructor used to clean up allocated objects before they are
+ * returned to the operating system.
+ *
+ * @param obj pointer to the object to initialized.
+ * @param notused1 This parameter is currently not used.
+ * @param notused2 This parameter is currently not used.
+ * @return you should return 0, but currently this is not checked
+ */
+typedef void cache_destructor_t(void* obj, void* notused);
+
+/**
+ * Definition of the structure to keep track of the internal details of
+ * the cache allocator. Touching any of these variables results in
+ * undefined behavior.
+ */
+typedef struct {
+ /** Mutex to protect access to the structure */
+ pthread_mutex_t mutex;
+ /** Name of the cache objects in this cache (provided by the caller) */
+ char *name;
+ /** List of pointers to available buffers in this cache */
+ void **ptr;
+ /** The size of each element in this cache */
+ size_t bufsize;
+ /** The capacity of the list of elements */
+ size_t freetotal;
+ /** The current number of free elements */
+ size_t freecurr;
+ /** The constructor to be called each time we allocate more memory */
+ cache_constructor_t* constructor;
+ /** The destructor to be called each time before we release memory */
+ cache_destructor_t* destructor;
+} cache_t;
+
+/**
+ * Create an object cache.
+ *
+ * The object cache will let you allocate objects of the same size. It is fully
+ * MT safe, so you may allocate objects from multiple threads without having to
+ * do any syncrhonization in the application code.
+ *
+ * @param name the name of the object cache. This name may be used for debug purposes
+ * and may help you track down what kind of object you have problems with
+ * (buffer overruns, leakage etc)
+ * @param bufsize the size of each object in the cache
+ * @param align the alignment requirements of the objects in the cache.
+ * @param constructor the function to be called to initialize memory when we need
+ * to allocate more memory from the os.
+ * @param destructor the function to be called before we release the memory back
+ * to the os.
+ * @return a handle to an object cache if successful, NULL otherwise.
+ */
+cache_t* cache_create(const char* name, size_t bufsize, size_t align,
+ cache_constructor_t* constructor,
+ cache_destructor_t* destructor);
+/**
+ * Destroy an object cache.
+ *
+ * Destroy and invalidate an object cache. You should return all buffers allocated
+ * with cache_alloc by using cache_free before calling this function. Not doing
+ * so results in undefined behavior (the buffers may or may not be invalidated)
+ *
+ * @param handle the handle to the object cache to destroy.
+ */
+void cache_destroy(cache_t* handle);
+/**
+ * Allocate an object from the cache.
+ *
+ * @param handle the handle to the object cache to allocate from
+ * @return a pointer to an initialized object from the cache, or NULL if
+ * the allocation cannot be satisfied.
+ */
+void* cache_alloc(cache_t* handle);
+/**
+ * Return an object back to the cache.
+ *
+ * The caller should return the object in an initialized state so that
+ * the object may be returned in an expected state from cache_alloc.
+ *
+ * @param handle handle to the object cache to return the object to
+ * @param ptr pointer to the object to return.
+ */
+void cache_free(cache_t* handle, void* ptr);
+#endif
+
+#endif
--- /dev/null
+/*
+ * Summary: Definition of the callback interface
+ *
+ * Copy: See Copyright for the status of this software.
+ *
+ * Author: Trond Norbye
+ */
+#ifndef LIBMEMCACHEDPROTOCOL_CALLBACK_H
+#define LIBMEMCACHEDPROTOCOL_CALLBACK_H
+
+/**
+ * Callback to send data back from a successful GET/GETQ/GETK/GETKQ command
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param key What to insert as key in the reply
+ * @param keylen The length of the key
+ * @param body What to store in the body of the package
+ * @param bodylen The number of bytes of the body
+ * @param flags The flags stored with the item
+ * @param cas The CAS value to insert into the response (should be 0
+ * if you don't care)
+ */
+typedef protocol_binary_response_status
+(*memcached_binary_protocol_get_response_handler)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void *body,
+ uint32_t bodylen,
+ uint32_t flags,
+ uint64_t cas);
+/**
+ * Callback to send data back from a STAT command
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param key What to insert as key in the reply
+ * @param keylen The length of the key
+ * @param body What to store in the body of the package
+ * @param bodylen The number of bytes of the body
+ */
+typedef protocol_binary_response_status
+(*memcached_binary_protocol_stat_response_handler)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void *body,
+ uint32_t bodylen);
+/**
+ * Callback to send data back from a VERSION command
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param text The version string
+ * @param length The number of bytes in the version string
+ */
+typedef protocol_binary_response_status
+(*memcached_binary_protocol_version_response_handler)(const void *cookie,
+ const void *text,
+ uint32_t length);
+
+
+/**
+ * In the low level interface you need to format the response
+ * packet yourself (giving you complete freedom :-)
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param request Pointer to the request packet you are sending a reply to
+ * @param response Pointer to the response packet to send
+ *
+ */
+typedef protocol_binary_response_status (*memcached_binary_protocol_raw_response_handler)(const void *cookie,
+ protocol_binary_request_header *request,
+ protocol_binary_response_header *response);
+
+/**
+ * In the low lever interface you have to do most of the work by
+ * yourself, but it also gives you a lot of freedom :-)
+ * @param cookie identification for this connection, just pass it along to
+ * the response handler
+ * @param header the command received over the wire. Never try to access
+ * <u>anything</u> outside the command.
+ * @param resonse_handler call this function to send data back to the client
+ */
+typedef protocol_binary_response_status (*memcached_binary_protocol_command_handler)(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler);
+
+/**
+ * The raw interface to the packets is implemented in version 0. It contains
+ * just an array with command handlers. The inxed in the array is the
+ * com code.
+ */
+struct memcached_binary_protocol_callback_v0_st {
+ memcached_binary_protocol_command_handler comcode[256];
+};
+
+/**
+ * The first version of the callback struct containing all of the
+ * documented commands in the initial release of the binary protocol
+ * (aka. memcached 1.4.0).
+ *
+ * You might miss the Q commands (addq etc) but the response function
+ * knows how to deal with them so you don't need to worry about that :-)
+ */
+struct memcached_binary_protocol_callback_v1_st {
+ /**
+ * Add an item to the cache
+ * @param cookie id of the client receiving the command
+ * @param key the key to add
+ * @param len the length of the key
+ * @param val the value to store for the key (may be NIL)
+ * @param vallen the length of the data
+ * @param flags the flags to store with the key
+ * @param exptime the expiry time for the key-value pair
+ * @param cas the resulting cas for the add operation (if success)
+ */
+ protocol_binary_response_status (*add)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t *cas);
+
+ /**
+ * Append data to an <b>existing</b> key-value pair.
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to add data to
+ * @param len the length of the key
+ * @param val the value to append to the value
+ * @param vallen the length of the data
+ * @param cas the CAS in the request
+ * @param result_cas the resulting cas for the append operation
+ *
+ */
+ protocol_binary_response_status (*append)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint64_t cas,
+ uint64_t *result_cas);
+
+ /**
+ * Decrement the value for a key
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to decrement the value for
+ * @param len the length of the key
+ * @param delta the amount to decrement
+ * @param initial initial value to store (if the key doesn't exist)
+ * @param expiration expiration time for the object (if the key doesn't exist)
+ * @param cas the CAS in the request
+ * @param result the result from the decrement
+ * @param result_cas the cas of the item
+ *
+ */
+ protocol_binary_response_status (*decrement)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t delta,
+ uint64_t initial,
+ uint32_t expiration,
+ uint64_t *result,
+ uint64_t *result_cas);
+
+ /**
+ * Delete an existing key
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to delete
+ * @param len the length of the key
+ * @param cas the CAS in the request
+ */
+ protocol_binary_response_status (*delete)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t cas);
+
+
+ /**
+ * Flush the cache
+ *
+ * @param cookie id of the client receiving the command
+ * @param when when the cache should be flushed (0 == immediately)
+ */
+ protocol_binary_response_status (*flush)(const void *cookie,
+ uint32_t when);
+
+
+
+ /**
+ * Get a key-value pair
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to get
+ * @param len the length of the key
+ * @param response_handler to send the result back to the client
+ */
+ protocol_binary_response_status (*get)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ memcached_binary_protocol_get_response_handler response_handler);
+
+ /**
+ * Increment the value for a key
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to increment the value on
+ * @param len the length of the key
+ * @param delta the amount to increment
+ * @param initial initial value to store (if the key doesn't exist)
+ * @param expiration expiration time for the object (if the key doesn't exist)
+ * @param cas the CAS in the request
+ * @param result the result from the decrement
+ * @param result_cas the cas of the item
+ *
+ */
+ protocol_binary_response_status (*increment)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ uint64_t delta,
+ uint64_t initial,
+ uint32_t expiration,
+ uint64_t *result,
+ uint64_t *result_cas);
+
+ /**
+ * The noop command was received. This is just a notification callback (the
+ * response is automatically created).
+ *
+ * @param cookie id of the client receiving the command
+ */
+ protocol_binary_response_status (*noop)(const void *cookie);
+
+ /**
+ * Prepend data to an <b>existing</b> key-value pair.
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to prepend data to
+ * @param len the length of the key
+ * @param val the value to prepend to the value
+ * @param vallen the length of the data
+ * @param cas the CAS in the request
+ * @param result-cas the cas id of the item
+ *
+ */
+ protocol_binary_response_status (*prepend)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint64_t cas,
+ uint64_t *result_cas);
+
+ /**
+ * The quit command was received. This is just a notification callback (the
+ * response is automatically created).
+ *
+ * @param cookie id of the client receiving the command
+ */
+ protocol_binary_response_status (*quit)(const void *cookie);
+
+
+ /**
+ * Replace an <b>existing</b> item to the cache
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to replace the content for
+ * @param len the length of the key
+ * @param val the value to store for the key (may be NIL)
+ * @param vallen the length of the data
+ * @param flags the flags to store with the key
+ * @param exptime the expiry time for the key-value pair
+ * @param cas the cas id in the request
+ * @param result_cas the cas id of the item
+ */
+ protocol_binary_response_status (*replace)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t cas,
+ uint64_t *result_cas);
+
+
+ /**
+ * Set a key-value pair in the cache
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to insert
+ * @param len the length of the key
+ * @param val the value to store for the key (may be NIL)
+ * @param vallen the length of the data
+ * @param flags the flags to store with the key
+ * @param exptime the expiry time for the key-value pair
+ * @param cas the cas id in the request
+ * @param result_cas the cas id of the new item
+ */
+ protocol_binary_response_status (*set)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void* val,
+ uint32_t vallen,
+ uint32_t flags,
+ uint32_t exptime,
+ uint64_t cas,
+ uint64_t *result_cas);
+
+ /**
+ * Get status information
+ *
+ * @param cookie id of the client receiving the command
+ * @param key the key to get status for (or NIL to request all status).
+ * Remember to insert the terminating packet if multiple
+ * packets should be returned.
+ * @param keylen the length of the key
+ * @param response_handler to send the result back to the client, but
+ * don't send reply on success!
+ *
+ */
+ protocol_binary_response_status (*stat)(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ memcached_binary_protocol_stat_response_handler response_handler);
+
+ /**
+ * Get the version information
+ *
+ * @param cookie id of the client receiving the command
+ * @param response_handler to send the result back to the client, but
+ * don't send reply on success!
+ *
+ */
+ protocol_binary_response_status (*version)(const void *cookie,
+ memcached_binary_protocol_version_response_handler response_handler);
+};
+
+/**
+ *
+ */
+struct memcached_binary_protocol_callback_st {
+ /**
+ * The interface version used (set to 0 if you don't have any specialized
+ * command handlers).
+ */
+ uint64_t interface_version;
+
+ /**
+ * Callback fired just before the command will be executed.
+ *
+ * @param cookie id of the client receiving the command
+ * @param header the command header as received on the wire. If you look
+ * at the content you <b>must</b> ensure that you don't
+ * try to access beyond the end of the message.
+ */
+ void (*pre_execute)(const void *cookie,
+ protocol_binary_request_header *header);
+ /**
+ * Callback fired just after the command was exected (please note
+ * that the data transfer back to the client is not finished at this
+ * time).
+ *
+ * @param cookie id of the client receiving the command
+ * @param header the command header as received on the wire. If you look
+ * at the content you <b>must</b> ensure that you don't
+ * try to access beyond the end of the message.
+ */
+ void (*post_execute)(const void *cookie,
+ protocol_binary_request_header *header);
+
+ /**
+ * Callback fired if no specialized callback is registered for this
+ * specific command code.
+ *
+ * @param cookie id of the client receiving the command
+ * @param header the command header as received on the wire. You <b>must</b>
+ * ensure that you don't try to access beyond the end of the
+ * message.
+ * @param response_handler The response handler to send data back.
+ */
+ protocol_binary_response_status (*unknown)(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler);
+
+ /**
+ * The different interface levels we support. A pointer is used so the
+ * size of the structure is fixed. You must ensure that the memory area
+ * passed as the pointer is valid as long as you use the protocol handler.
+ */
+ union {
+ struct memcached_binary_protocol_callback_v0_st v0;
+
+ /**
+ * The first version of the callback struct containing all of the
+ * documented commands in the initial release of the binary protocol
+ * (aka. memcached 1.4.0).
+ */
+ struct memcached_binary_protocol_callback_v1_st v1;
+ } interface;
+};
+
+#endif
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#ifndef MEMCACHED_PROTOCOL_INTERNAL_H
+#define MEMCACHED_PROTOCOL_INTERNAL_H
+
+#include "config.h"
+#include <stdbool.h>
+#include <assert.h>
+#include <netinet/in.h>
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+
+#endif
+
+
+/* Define this here, which will turn on the visibilty controls while we're
+ * building libmemcached.
+ */
+#define BUILDING_LIBMEMCACHED 1
+
+#include <libmemcached/protocol_handler.h>
+#include <libmemcached/protocol/cache.h>
+
+struct memcached_binary_protocol_st {
+ struct memcached_binary_protocol_callback_st *callback;
+ memcached_binary_protocol_recv_func recv;
+ memcached_binary_protocol_send_func send;
+ char *input_buffer;
+ size_t input_buffer_size;
+ bool pedantic;
+ /* @todo use multiple sized buffers */
+ cache_t *buffer_cache;
+};
+
+
+struct chunk_st {
+ /* Pointer to the data */
+ char *data;
+ /* The offset to the first byte into the buffer that is used */
+ size_t offset;
+ /* The offset into the buffer for the first free byte */
+ size_t nbytes;
+ /* The number of bytes in the buffer */
+ size_t size;
+ /* Pointer to the next buffer in the chain */
+ struct chunk_st *next;
+};
+
+#define CHUNK_BUFFERSIZE 2048
+
+struct memcached_binary_protocol_client_st {
+ struct memcached_binary_protocol_st *root;
+ int sock;
+ int error;
+
+ /* Linked list of data to send */
+ struct chunk_st *output;
+ struct chunk_st *output_tail;
+
+
+
+ char *input_buffer;
+ size_t input_buffer_size;
+ size_t input_buffer_offset;
+ char *curr_input;
+
+
+ struct chunk_st *input;
+ /* Pointer to the last chunk to avoid the need to traverse the complete list */
+ struct chunk_st *input_tail;
+ size_t bytes_available;
+
+ protocol_binary_request_header *current_command;
+ bool quiet;
+};
+
+LIBMEMCACHED_LOCAL
+bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request);
+
+LIBMEMCACHED_LOCAL
+bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request,
+ protocol_binary_response_header *response);
+
+
+#endif
--- /dev/null
+libmemcachedprotocol_0 { global: *; };
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include "common.h"
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
+
+#define ensure(a) if (!(a)) { return false; }
+
+bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request)
+{
+ ensure(request->request.magic == PROTOCOL_BINARY_REQ);
+ ensure(request->request.datatype == PROTOCOL_BINARY_RAW_BYTES);
+
+ ensure(request->bytes[6] == 0);
+ ensure(request->bytes[7] == 0);
+
+ uint8_t opcode= request->request.opcode;
+ uint16_t keylen= ntohs(request->request.keylen);
+ uint8_t extlen= request->request.extlen;
+ uint32_t bodylen= ntohl(request->request.bodylen);
+
+ ensure(bodylen >= (keylen + extlen));
+
+ switch (opcode) {
+ case PROTOCOL_BINARY_CMD_GET:
+ case PROTOCOL_BINARY_CMD_GETK:
+ case PROTOCOL_BINARY_CMD_GETKQ:
+ case PROTOCOL_BINARY_CMD_GETQ:
+ ensure(extlen == 0);
+ ensure(keylen > 0);
+ ensure(keylen == bodylen);
+ ensure(request->request.cas == 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_ADD:
+ case PROTOCOL_BINARY_CMD_ADDQ:
+ /* it makes no sense to run add with a cas value */
+ ensure(request->request.cas == 0);
+ /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_SET:
+ case PROTOCOL_BINARY_CMD_SETQ:
+ case PROTOCOL_BINARY_CMD_REPLACE:
+ case PROTOCOL_BINARY_CMD_REPLACEQ:
+ ensure(keylen > 0);
+ ensure(extlen == 8);
+ break;
+
+ case PROTOCOL_BINARY_CMD_DELETE:
+ case PROTOCOL_BINARY_CMD_DELETEQ:
+ ensure(extlen == 0);
+ ensure(keylen > 0);
+ ensure(keylen == bodylen);
+ break;
+
+ case PROTOCOL_BINARY_CMD_INCREMENT:
+ case PROTOCOL_BINARY_CMD_INCREMENTQ:
+ case PROTOCOL_BINARY_CMD_DECREMENT:
+ case PROTOCOL_BINARY_CMD_DECREMENTQ:
+ ensure(extlen == 20);
+ ensure(keylen > 0);
+ ensure(keylen + extlen == bodylen);
+ break;
+
+ case PROTOCOL_BINARY_CMD_QUIT:
+ case PROTOCOL_BINARY_CMD_QUITQ:
+ case PROTOCOL_BINARY_CMD_NOOP:
+ case PROTOCOL_BINARY_CMD_VERSION:
+ ensure(extlen == 0);
+ ensure(keylen == 0);
+ ensure(bodylen == 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_FLUSH:
+ case PROTOCOL_BINARY_CMD_FLUSHQ:
+ ensure(extlen == 0 || extlen == 4);
+ ensure(keylen == 0);
+ ensure(bodylen == extlen);
+ break;
+
+ case PROTOCOL_BINARY_CMD_STAT:
+ ensure(extlen == 0);
+ /* May have key, but not value */
+ ensure(keylen == bodylen);
+ break;
+
+ case PROTOCOL_BINARY_CMD_APPEND:
+ case PROTOCOL_BINARY_CMD_APPENDQ:
+ case PROTOCOL_BINARY_CMD_PREPEND:
+ case PROTOCOL_BINARY_CMD_PREPENDQ:
+ ensure(extlen == 0);
+ ensure(keylen > 0);
+ break;
+ default:
+ /* Unknown command */
+ ;
+ }
+
+ return true;
+}
+
+bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request,
+ protocol_binary_response_header *response)
+{
+ ensure(response->response.magic == PROTOCOL_BINARY_RES);
+ ensure(response->response.datatype == PROTOCOL_BINARY_RAW_BYTES);
+ ensure(response->response.opaque == request->request.opaque);
+
+ uint16_t status= ntohs(response->response.status);
+ uint8_t opcode= response->response.opcode;
+
+ if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS)
+ {
+ switch (opcode) {
+ case PROTOCOL_BINARY_CMD_ADDQ:
+ case PROTOCOL_BINARY_CMD_APPENDQ:
+ case PROTOCOL_BINARY_CMD_DECREMENTQ:
+ case PROTOCOL_BINARY_CMD_DELETEQ:
+ case PROTOCOL_BINARY_CMD_FLUSHQ:
+ case PROTOCOL_BINARY_CMD_INCREMENTQ:
+ case PROTOCOL_BINARY_CMD_PREPENDQ:
+ case PROTOCOL_BINARY_CMD_QUITQ:
+ case PROTOCOL_BINARY_CMD_REPLACEQ:
+ case PROTOCOL_BINARY_CMD_SETQ:
+ /* Quiet command shouldn't return on success */
+ return false;
+ default:
+ break;
+ }
+
+ switch (opcode) {
+ case PROTOCOL_BINARY_CMD_ADD:
+ case PROTOCOL_BINARY_CMD_REPLACE:
+ case PROTOCOL_BINARY_CMD_SET:
+ case PROTOCOL_BINARY_CMD_APPEND:
+ case PROTOCOL_BINARY_CMD_PREPEND:
+ ensure(response->response.keylen == 0);
+ ensure(response->response.extlen == 0);
+ ensure(response->response.bodylen == 0);
+ ensure(response->response.cas != 0);
+ break;
+ case PROTOCOL_BINARY_CMD_FLUSH:
+ case PROTOCOL_BINARY_CMD_NOOP:
+ case PROTOCOL_BINARY_CMD_QUIT:
+ case PROTOCOL_BINARY_CMD_DELETE:
+ ensure(response->response.keylen == 0);
+ ensure(response->response.extlen == 0);
+ ensure(response->response.bodylen == 0);
+ ensure(response->response.cas == 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_DECREMENT:
+ case PROTOCOL_BINARY_CMD_INCREMENT:
+ ensure(response->response.keylen == 0);
+ ensure(response->response.extlen == 0);
+ ensure(ntohl(response->response.bodylen) == 8);
+ ensure(response->response.cas != 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_STAT:
+ ensure(response->response.extlen == 0);
+ /* key and value exists in all packets except in the terminating */
+ ensure(response->response.cas == 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_VERSION:
+ ensure(response->response.keylen == 0);
+ ensure(response->response.extlen == 0);
+ ensure(response->response.bodylen != 0);
+ ensure(response->response.cas == 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_GET:
+ case PROTOCOL_BINARY_CMD_GETQ:
+ ensure(response->response.keylen == 0);
+ ensure(response->response.extlen == 4);
+ ensure(response->response.cas != 0);
+ break;
+
+ case PROTOCOL_BINARY_CMD_GETK:
+ case PROTOCOL_BINARY_CMD_GETKQ:
+ ensure(response->response.keylen != 0);
+ ensure(response->response.extlen == 4);
+ ensure(response->response.cas != 0);
+ break;
+
+ default:
+ /* Undefined command code */
+ break;
+ }
+ }
+ else
+ {
+ ensure(response->response.cas == 0);
+ ensure(response->response.extlen == 0);
+ if (opcode != PROTOCOL_BINARY_CMD_GETK)
+ {
+ ensure(response->response.keylen == 0);
+ }
+ }
+
+ return true;
+}
--- /dev/null
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include "common.h"
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <string.h>
+#include <stdio.h>
+
+/*
+** **********************************************************************
+** INTERNAL INTERFACE
+** **********************************************************************
+*/
+
+/**
+ * The default function to receive data from the client. This function
+ * just wraps the recv function to receive from a socket.
+ * See man -s3socket recv for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to read from
+ * @param buf the destination buffer
+ * @param nbytes the number of bytes to read
+ * @return the number of bytes transferred of -1 upon error
+ */
+static ssize_t default_recv(const void *cookie,
+ int sock,
+ void *buf,
+ size_t nbytes)
+{
+ (void)cookie;
+ return recv(sock, buf, nbytes, 0);
+}
+
+/**
+ * The default function to send data to the server. This function
+ * just wraps the send function to send through a socket.
+ * See man -s3socket send for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to send to
+ * @param buf the source buffer
+ * @param nbytes the number of bytes to send
+ * @return the number of bytes transferred of -1 upon error
+ */
+static ssize_t default_send(const void *cookie,
+ int fd,
+ const void *buf,
+ size_t nbytes)
+{
+ (void)cookie;
+ return send(fd, buf, nbytes, 0);
+}
+
+/**
+ * Try to drain the output buffers without blocking
+ *
+ * @param client the client to drain
+ * @return false if an error occured (connection should be shut down)
+ * true otherwise (please note that there may be more data to
+ * left in the buffer to send)
+ */
+static bool drain_output(struct memcached_binary_protocol_client_st *client)
+{
+ ssize_t len;
+
+ // Do we have pending data to send?
+ while (client->output != NULL)
+ {
+ len= client->root->send(client,
+ client->sock,
+ client->output->data + client->output->offset,
+ client->output->nbytes - client->output->offset);
+
+ if (len == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ {
+ return true;
+ }
+ else if (errno != EINTR)
+ {
+ client->error= errno;
+ return false;
+ }
+ }
+ else
+ {
+ client->output->offset += (size_t)len;
+ if (client->output->offset == client->output->nbytes)
+ {
+ /* This was the complete buffer */
+ struct chunk_st *old= client->output;
+ client->output= client->output->next;
+ if (client->output == NULL)
+ {
+ client->output_tail= NULL;
+ }
+ cache_free(client->root->buffer_cache, old);
+ }
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Allocate an output buffer and chain it into the output list
+ *
+ * @param client the client that needs the buffer
+ * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
+ */
+static struct chunk_st*
+allocate_output_chunk(struct memcached_binary_protocol_client_st *client)
+{
+ struct chunk_st*ret= cache_alloc(client->root->buffer_cache);
+ if (ret == NULL) {
+ return NULL;
+ }
+
+ ret->offset = ret->nbytes = 0;
+ ret->next = NULL;
+ ret->size = CHUNK_BUFFERSIZE;
+ ret->data= (void*)(ret + 1);
+ if (client->output == NULL)
+ {
+ client->output = client->output_tail = ret;
+ }
+ else
+ {
+ client->output_tail->next= ret;
+ client->output_tail= ret;
+ }
+
+ return ret;
+}
+
+/**
+ * Spool data into the send-buffer for a client.
+ *
+ * @param client the client to spool the data for
+ * @param data the data to spool
+ * @param length the number of bytes of data to spool
+ * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
+ * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
+ */
+static protocol_binary_response_status
+spool_output(struct memcached_binary_protocol_client_st *client,
+ const void *data,
+ size_t length)
+{
+ size_t offset = 0;
+
+ struct chunk_st *chunk= client->output;
+ while (offset < length)
+ {
+ if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
+ {
+ if ((chunk= allocate_output_chunk(client)) == NULL)
+ {
+ return PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ }
+
+ size_t bulk = length - offset;
+ if (bulk > chunk->size - chunk->nbytes)
+ {
+ bulk = chunk->size - chunk->nbytes;
+ }
+
+ memcpy(chunk->data + chunk->nbytes, data, bulk);
+ chunk->nbytes += bulk;
+ offset += bulk;
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Send a preformatted packet back to the client. If the connection is in
+ * pedantic mode, it will validate the packet and refuse to send it if it
+ * breaks the specification.
+ *
+ * @param cookie client identification
+ * @param request the original request packet
+ * @param response the packet to send
+ * @return The status of the operation
+ */
+static protocol_binary_response_status
+raw_response_handler(const void *cookie,
+ protocol_binary_request_header *request,
+ protocol_binary_response_header *response)
+{
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+ if (client->root->pedantic &&
+ !memcached_binary_protocol_pedantic_check_response(request, response))
+ {
+ return PROTOCOL_BINARY_RESPONSE_EINVAL;
+ }
+
+ if (!drain_output(client))
+ {
+ return PROTOCOL_BINARY_RESPONSE_EIO;
+ }
+
+ size_t len= sizeof(*response) + htonl(response->response.bodylen);
+ size_t offset= 0;
+ char *ptr= (void*)response;
+
+ if (client->output == NULL)
+ {
+ /* I can write directly to the socket.... */
+ do
+ {
+ size_t num_bytes= len - offset;
+ ssize_t nw= client->root->send(client,
+ client->sock,
+ ptr + offset,
+ num_bytes);
+ if (nw == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ {
+ break;
+ }
+ else if (errno != EINTR)
+ {
+ client->error= errno;
+ return PROTOCOL_BINARY_RESPONSE_EIO;
+ }
+ }
+ else
+ {
+ offset += (size_t)nw;
+ }
+ } while (offset < len);
+ }
+
+ return spool_output(client, ptr, len - offset);
+}
+
+/*
+ * Version 0 of the interface is really low level and protocol specific,
+ * while the version 1 of the interface is more API focused. We need a
+ * way to translate between the command codes on the wire and the
+ * application level interface in V1, so let's just use the V0 of the
+ * interface as a map instead of creating a huuuge switch :-)
+ */
+
+/**
+ * Callback for the GET/GETQ/GETK and GETKQ responses
+ * @param cookie client identifier
+ * @param key the key for the item
+ * @param keylen the length of the key
+ * @param body the length of the body
+ * @param bodylen the length of the body
+ * @param flags the flags for the item
+ * @param cas the CAS id for the item
+ */
+static protocol_binary_response_status
+get_response_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void *body,
+ uint32_t bodylen,
+ uint32_t flags,
+ uint64_t cas) {
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ uint8_t opcode= client->current_command->request.opcode;
+
+ if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ)
+ {
+ keylen= 0;
+ }
+
+ protocol_binary_response_get response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= client->current_command->request.opaque,
+ .cas= htonll(cas),
+ .keylen= htons(keylen),
+ .extlen= 4,
+ .bodylen= htonl(bodylen + keylen + 4),
+ },
+ .message.body.flags= htonl(flags),
+ };
+
+ protocol_binary_response_status rval;
+ const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+ (rval= spool_output(client, key, keylen)) != success ||
+ (rval= spool_output(client, body, bodylen)) != success)
+ {
+ return rval;
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for the STAT responses
+ * @param cookie client identifier
+ * @param key the key for the item
+ * @param keylen the length of the key
+ * @param body the length of the body
+ * @param bodylen the length of the body
+ */
+static protocol_binary_response_status
+stat_response_handler(const void *cookie,
+ const void *key,
+ uint16_t keylen,
+ const void *body,
+ uint32_t bodylen) {
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= client->current_command->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= client->current_command->request.opaque,
+ .keylen= htons(keylen),
+ .bodylen= htonl(bodylen + keylen),
+ },
+ };
+
+ protocol_binary_response_status rval;
+ const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+ (rval= spool_output(client, key, keylen)) != success ||
+ (rval= spool_output(client, body, bodylen)) != success)
+ {
+ return rval;
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for the VERSION responses
+ * @param cookie client identifier
+ * @param text the length of the body
+ * @param textlen the length of the body
+ */
+static protocol_binary_response_status
+version_response_handler(const void *cookie,
+ const void *text,
+ uint32_t textlen) {
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+ protocol_binary_response_no_extras response= {
+ .message.header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= client->current_command->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= client->current_command->request.opaque,
+ .bodylen= htonl(textlen),
+ },
+ };
+
+ protocol_binary_response_status rval;
+ const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+ (rval= spool_output(client, text, textlen)) != success)
+ {
+ return rval;
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for ADD and ADDQ
+ * @param cookie the calling client
+ * @param header the add/addq command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+add_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.add != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_add *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ uint32_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+ uint64_t cas;
+
+ rval= client->root->callback->interface.v1.add(cookie, key, keylen,
+ data, datalen, flags,
+ timeout, &cas);
+
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_ADD,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(cas)
+ }
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for DECREMENT and DECREMENTQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+decrement_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.decrement != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ protocol_binary_request_decr *request= (void*)header;
+ uint64_t init= ntohll(request->message.body.initial);
+ uint64_t delta= ntohll(request->message.body.delta);
+ uint32_t timeout= ntohl(request->message.body.expiration);
+ void *key= request->bytes + sizeof(request->bytes);
+ uint64_t result;
+ uint64_t cas;
+
+ char buffer[1024] = {0};
+ memcpy(buffer, key, keylen);
+ fprintf(stderr, "%s\n", buffer);
+
+
+ rval= client->root->callback->interface.v1.decrement(cookie, key, keylen,
+ delta, init, timeout,
+ &result, &cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT)
+ {
+ /* Send a positive request */
+ protocol_binary_response_decr response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_DECREMENT,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(cas),
+ .bodylen= htonl(8)
+ },
+ .body.value = htonll(result)
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for DELETE and DELETEQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+delete_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.delete != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ void *key= (header + 1);
+ uint64_t cas= ntohll(header->request.cas);
+ rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_DELETE,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ }
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for FLUSH and FLUSHQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+flush_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.flush != NULL)
+ {
+ protocol_binary_request_flush *flush= (void*)header;
+ uint32_t timeout= 0;
+ if (htonl(header->request.bodylen) == 4)
+ {
+ timeout= ntohl(flush->message.body.expiration);
+ }
+
+ rval= client->root->callback->interface.v1.flush(cookie, timeout);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_FLUSH,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ }
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for GET, GETK, GETQ, GETKQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+get_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.get != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ void *key= (header + 1);
+ rval= client->root->callback->interface.v1.get(cookie, key, keylen,
+ get_response_handler);
+
+ if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT &&
+ (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
+ header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ))
+ {
+ /* Quiet commands shouldn't respond on cache misses */
+ rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for INCREMENT and INCREMENTQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+increment_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.increment != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ protocol_binary_request_incr *request= (void*)header;
+ uint64_t init= ntohll(request->message.body.initial);
+ uint64_t delta= ntohll(request->message.body.delta);
+ uint32_t timeout= ntohl(request->message.body.expiration);
+ void *key= request->bytes + sizeof(request->bytes);
+ uint64_t cas;
+ uint64_t result;
+
+ rval= client->root->callback->interface.v1.increment(cookie, key, keylen,
+ delta, init, timeout,
+ &result, &cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT)
+ {
+ /* Send a positive request */
+ protocol_binary_response_incr response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_INCREMENT,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(cas),
+ .bodylen= htonl(8)
+ },
+ .body.value = htonll(result)
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for noop. Inform the v1 interface about the noop packet, and
+ * create and send a packet back to the client
+ *
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler the response handler
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+noop_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.noop != NULL)
+ {
+ client->root->callback->interface.v1.noop(cookie);
+ }
+
+ protocol_binary_response_no_extras response= {
+ .message = {
+ .header.response = {
+ .magic = PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_NOOP,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ }
+ }
+ };
+
+ return response_handler(cookie, header, (void*)&response);
+}
+
+/**
+ * Callback for APPEND and APPENDQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+append_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.append != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint32_t datalen= ntohl(header->request.bodylen) - keylen;
+ char *key= (void*)(header + 1);
+ char *data= key + keylen;
+ uint64_t cas= ntohll(header->request.cas);
+ uint64_t result_cas;
+
+ rval= client->root->callback->interface.v1.append(cookie, key, keylen,
+ data, datalen, cas,
+ &result_cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_APPEND)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_APPEND,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(result_cas),
+ },
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for PREPEND and PREPENDQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+prepend_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.prepend != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint32_t datalen= ntohl(header->request.bodylen) - keylen;
+ char *key= (char*)(header + 1);
+ char *data= key + keylen;
+ uint64_t cas= ntohll(header->request.cas);
+ uint64_t result_cas;
+ rval= client->root->callback->interface.v1.prepend(cookie, key, keylen,
+ data, datalen, cas,
+ &result_cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_PREPEND,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(result_cas),
+ },
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for QUIT and QUITQ. Notify the client and shut down the connection
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+quit_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.quit != NULL)
+ {
+ client->root->callback->interface.v1.quit(cookie);
+ }
+
+ protocol_binary_response_no_extras response= {
+ .message = {
+ .header.response = {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_QUIT,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
+ {
+ response_handler(cookie, header, (void*)&response);
+ }
+
+ /* I need a better way to signal to close the connection */
+ return PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+/**
+ * Callback for REPLACE and REPLACEQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+replace_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.replace != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_replace *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ uint32_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+ uint64_t cas= ntohll(header->request.cas);
+ uint64_t result_cas;
+
+ rval= client->root->callback->interface.v1.replace(cookie, key, keylen,
+ data, datalen, flags,
+ timeout, cas,
+ &result_cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_REPLACE,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(result_cas),
+ },
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for SET and SETQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+set_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.set != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+ uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+ protocol_binary_request_replace *request= (void*)header;
+ uint32_t flags= ntohl(request->message.body.flags);
+ uint32_t timeout= ntohl(request->message.body.expiration);
+ char *key= ((char*)header) + sizeof(*header) + 8;
+ char *data= key + keylen;
+ uint64_t cas= ntohll(header->request.cas);
+ uint64_t result_cas;
+
+
+ rval= client->root->callback->interface.v1.set(cookie, key, keylen,
+ data, datalen, flags,
+ timeout, cas, &result_cas);
+ if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ header->request.opcode == PROTOCOL_BINARY_CMD_SET)
+ {
+ /* Send a positive request */
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= PROTOCOL_BINARY_CMD_SET,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+ .opaque= header->request.opaque,
+ .cas= ntohll(result_cas),
+ },
+ }
+ };
+ rval= response_handler(cookie, header, (void*)&response);
+ }
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for STAT
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+stat_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.stat != NULL)
+ {
+ uint16_t keylen= ntohs(header->request.keylen);
+
+ rval= client->root->callback->interface.v1.stat(cookie,
+ (void*)(header + 1),
+ keylen,
+ stat_response_handler);
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * Callback for VERSION
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+version_command_handler(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+{
+ (void)response_handler;
+ (void)header;
+ protocol_binary_response_status rval;
+
+ struct memcached_binary_protocol_client_st *client= (void*)cookie;
+ if (client->root->callback->interface.v1.version != NULL)
+ {
+ rval= client->root->callback->interface.v1.version(cookie,
+ version_response_handler);
+ }
+ else
+ {
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ }
+
+ return rval;
+}
+
+/**
+ * The map to remap between the com codes and the v1 logical setting
+ */
+static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= {
+ [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
+ [PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
+ [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler,
+ [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler,
+ [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler,
+ [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler,
+ [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
+ [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
+ [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
+ [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
+ [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
+ [PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
+ [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
+ [PROTOCOL_BINARY_CMD_GET]= get_command_handler,
+ [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler,
+ [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler,
+ [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
+ [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler,
+ [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler,
+ [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
+ [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
+ [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
+ [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
+ [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
+ [PROTOCOL_BINARY_CMD_SET]= set_command_handler,
+ [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
+ [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
+};
+
+/**
+ * Try to execute a command. Fire the pre/post functions and the specialized
+ * handler function if it's set. If not, the unknown probe should be fired
+ * if it's present.
+ * @param client the client connection to operate on
+ * @param header the command to execute
+ * @return true if success or false if a fatal error occured so that the
+ * connection should be shut down.
+ */
+static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header)
+{
+ if (client->root->pedantic &&
+ memcached_binary_protocol_pedantic_check_request(header))
+ {
+ /* @todo return invalid command packet */
+ }
+
+ /* we got all data available, execute the callback! */
+ if (client->root->callback->pre_execute != NULL)
+ {
+ client->root->callback->pre_execute(client, header);
+ }
+
+ protocol_binary_response_status rval;
+ rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ uint8_t cc= header->request.opcode;
+
+ switch (client->root->callback->interface_version)
+ {
+ case 0:
+ if (client->root->callback->interface.v0.comcode[cc] != NULL) {
+ rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler);
+ }
+ break;
+ case 1:
+ if (comcode_v0_v1_remap[cc] != NULL) {
+ rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler);
+ }
+ break;
+ default:
+ /* Unknown interface.
+ * It should be impossible to get here so I'll just call abort
+ * to avoid getting a compiler warning :-)
+ */
+ abort();
+ }
+
+
+ if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND &&
+ client->root->callback->unknown != NULL)
+ {
+ rval= client->root->callback->unknown(client, header, raw_response_handler);
+ }
+
+ if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+ rval != PROTOCOL_BINARY_RESPONSE_EIO)
+ {
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= cc,
+ .status= htons(rval),
+ .opaque= header->request.opaque,
+ },
+ }
+ };
+ rval= raw_response_handler(client, header, (void*)&response);
+ }
+
+ if (client->root->callback->post_execute != NULL)
+ {
+ client->root->callback->post_execute(client, header);
+ }
+
+ return rval != PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+/*
+** **********************************************************************
+** * PUBLIC INTERFACE
+** * See protocol_handler.h for function description
+** **********************************************************************
+*/
+struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void)
+{
+ struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->recv= default_recv;
+ ret->send= default_send;
+ ret->input_buffer_size= 1 * 1024 * 1024;
+ ret->input_buffer= malloc(ret->input_buffer_size);
+ if (ret->input_buffer == NULL)
+ {
+ free(ret);
+ ret= NULL;
+ return NULL;
+ }
+
+ ret->buffer_cache = cache_create("protocol_handler",
+ CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
+ 0, NULL, NULL);
+ if (ret->buffer_cache == NULL) {
+ free(ret->input_buffer);
+ free(ret);
+ }
+ }
+
+ return ret;
+}
+
+void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance)
+{
+ cache_destroy(instance->buffer_cache);
+ free(instance->input_buffer);
+ free(instance);
+}
+
+struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance)
+{
+ return instance->callback;
+}
+
+void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback)
+{
+ instance->callback= callback;
+}
+
+memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie)
+{
+ (void)cookie;
+ return raw_response_handler;
+}
+
+void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable)
+{
+ instance->pedantic= enable;
+}
+
+bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance)
+{
+ return instance->pedantic;
+}
+
+struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock)
+{
+ struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->root= instance;
+ ret->sock= sock;
+ }
+
+ return ret;
+}
+
+void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client)
+{
+ free(client);
+}
+
+enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client)
+{
+ /* Try to send data and read from the socket */
+ bool more_data= true;
+ do
+ {
+ ssize_t len= client->root->recv(client,
+ client->sock,
+ client->root->input_buffer + client->input_buffer_offset,
+ client->root->input_buffer_size - client->input_buffer_offset);
+
+ if (len > 0)
+ {
+ /* Do we have the complete packet? */
+ if (client->input_buffer_offset > 0)
+ {
+ memcpy(client->root->input_buffer, client->input_buffer,
+ client->input_buffer_offset);
+ len += (ssize_t)client->input_buffer_offset;
+
+ /* @todo use buffer-cache! */
+ free(client->input_buffer);
+ client->input_buffer_offset= 0;
+ }
+
+ /* try to parse all of the received packets */
+ protocol_binary_request_header *header;
+ header= (void*)client->root->input_buffer;
+
+ if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ)
+ {
+ client->error= EINVAL;
+ return ERROR_EVENT;
+ }
+
+ while (len >= (ssize_t)sizeof(*header) &&
+ (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen))))
+ {
+
+ /* I have the complete package */
+ client->current_command = header;
+ if (!execute_command(client, header))
+ {
+ return ERROR_EVENT;
+ }
+
+ ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));
+ len -= total;
+ if (len > 0)
+ {
+ intptr_t ptr= (intptr_t)header;
+ ptr += total;
+ if ((ptr % 8) == 0)
+ {
+ header= (void*)ptr;
+ }
+ else
+ {
+ memmove(client->root->input_buffer, (void*)ptr, (size_t)len);
+ header= (void*)client->root->input_buffer;
+ }
+ }
+ }
+
+ if (len > 0)
+ {
+ /* save the data for later on */
+ /* @todo use buffer-cache */
+ client->input_buffer= malloc((size_t)len);
+ if (client->input_buffer == NULL)
+ {
+ client->error= ENOMEM;
+ return ERROR_EVENT;
+ }
+ memcpy(client->input_buffer, header, (size_t)len);
+ client->input_buffer_offset= (size_t)len;
+ more_data= false;
+ }
+ }
+ else if (len == 0)
+ {
+ /* Connection closed */
+ drain_output(client);
+ return ERROR_EVENT;
+ }
+ else
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ client->error= errno;
+ /* mark this client as terminated! */
+ return ERROR_EVENT;
+ }
+ more_data = false;
+ }
+ } while (more_data);
+
+ if (!drain_output(client))
+ {
+ return ERROR_EVENT;
+ }
+
+ return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
+}
--- /dev/null
+/*
+ * Summary: Definition of the callback interface to the protocol handler
+ *
+ * Copy: See Copyright for the status of this software.
+ *
+ * Author: Trond Norbye
+ */
+#ifndef MEMCACHED_PROTOCOL_H
+#define MEMCACHED_PROTOCOL_H
+
+#include <sys/types.h>
+#include <stdbool.h>
+
+#include <libmemcached/memcached/protocol_binary.h>
+#include <libmemcached/visibility.h>
+#include <libmemcached/protocol/callback.h>
+
+/* Forward declarations */
+/*
+ * You should only access memcached_binary_protocol_st from one thread!,
+ * and never assume anything about the internal layout / sizes of the
+ * structures.
+ */
+struct memcached_binary_protocol_st;
+struct memcached_binary_protocol_client_st;
+
+/**
+ * Function the protocol handler should call to receive data.
+ * This function should behave exactly like read(2)
+ *
+ * @param cookie a cookie used to represent a given client
+ * @param fd the filedescriptor associated with the client
+ * @param buf destination buffer
+ * @param nbuf number of bytes to receive
+ * @return the number of bytes copied into buf
+ * or -1 upon error (errno should contain more information)
+ */
+typedef ssize_t (*memcached_binary_protocol_recv_func)(const void *cookie,
+ int fd,
+ void *buf,
+ size_t nbuf);
+
+/**
+ * Function the protocol handler should call to send data.
+ * This function should behave exactly like write(2)
+ *
+ * @param cookie a cookie used to represent a given client
+ * @param fd the filedescriptor associated with the client
+ * @param buf the source buffer
+ * @param nbuf number of bytes to send
+ * @return the number of bytes sent
+ * or -1 upon error (errno should contain more information)
+ */
+typedef ssize_t (*memcached_binary_protocol_send_func)(const void *cookie,
+ int fd,
+ const void *buf,
+ size_t nbuf);
+
+/**
+ * Create an instance of the protocol handler
+ *
+ * @return NULL if allocation of an instance fails
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void);
+
+/**
+ * Get the callbacks associated with a protocol handler instance
+ * @return the callbacks currently used
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Set the callbacks to be used by the given protocol handler instance
+ * @param instance the instance to update
+ * @param callback the callbacks to use
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback);
+
+/**
+ * Should the library inspect the packages being sent and received and verify
+ * that they are according to the specification? If it encounters an invalid
+ * packet, it will return an EINVAL packet.
+ *
+ * @param instance the instance to update
+ * @param enable true if you want the library to check packages, false otherwise
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable);
+
+/**
+ * Is the library inpecting each package?
+ * @param instance the instance to check
+ * @return true it the library is inspecting each package, false otherwise
+ */
+LIBMEMCACHED_API
+bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Destroy an instance of the protocol handler
+ *
+ * @param instance The instance to destroy
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Set the IO functions used by the instance to send and receive data. The
+ * functions should behave like recv(3socket) and send(3socket).
+ *
+ * @param instance the instance to specify the IO functions for
+ * @param recv the function to call for reciving data
+ * @param send the function to call for sending data
+ */
+LIBMEMCACHED_API
+void memached_binary_protocol_set_io_functions(struct memcached_binary_protocol_st *instance,
+ memcached_binary_protocol_recv_func recv,
+ memcached_binary_protocol_send_func send);
+
+
+/**
+ * Create a new client instance and associate it with a socket
+ * @param instance the protocol instance to bind the client to
+ * @param sock the client socket
+ * @return NULL if allocation fails, otherwise an instance
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock);
+
+/**
+ * Destroy a client handle.
+ * The caller needs to close the socket accociated with the client
+ * <b>before</b> calling this function. This function invalidates the
+ * client memory area.
+ *
+ * @param client the client to destroy
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * The different events the client is interested in
+ */
+enum MEMCACHED_BINARY_PROTOCOL_EVENT {
+ /* Error event means that the client encountered an error with the
+ * connection so you should shut it down */
+ ERROR_EVENT,
+ /* Please notify when there is more data available to read */
+ READ_EVENT,
+ /* Please notify when it is possible to send more data */
+ WRITE_EVENT,
+ /* Please notify when it is possible to send or receive data */
+ READ_WRITE_EVENT
+};
+
+/**
+ * Let the client do some work. This might involve reading / sending data
+ * to/from the client, or perform callbacks to execute a command.
+ * @param client the client structure to work on
+ * @return The next event the protocol handler will be notified for
+ */
+LIBMEMCACHED_API
+enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get the socket attached to a client handle
+ * @param client the client to query
+ * @return the socket handle
+ */
+LIBMEMCACHED_API
+int memcached_binary_protocol_client_get_socket(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get the error id socket attached to a client handle
+ * @param client the client to query for an error code
+ * @return the OS error code from the client
+ */
+LIBMEMCACHED_API
+int memcached_binary_protocol_client_get_errno(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get a raw response handler for the given cookie
+ * @param cookie the cookie passed along into the callback
+ * @return the raw reponse handler you may use if you find
+ * the generic callback too limiting
+ */
+LIBMEMCACHED_API
+memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie);
+#endif