Prototype of a protocol parsing library for the binary protocol
authorTrond Norbye <trond.norbye@sun.com>
Thu, 17 Sep 2009 13:29:16 +0000 (15:29 +0200)
committerTrond Norbye <trond.norbye@sun.com>
Thu, 17 Sep 2009 13:29:16 +0000 (15:29 +0200)
21 files changed:
.bzrignore
Makefile.am
configure.ac
example/Makefile.am [new file with mode: 0644]
example/common.h [new file with mode: 0644]
example/interface_v0.c [new file with mode: 0644]
example/interface_v1.c [new file with mode: 0644]
example/memcached_light.c [new file with mode: 0644]
example/storage.c [new file with mode: 0644]
example/storage.h [new file with mode: 0644]
libmemcached/Makefile.am
libmemcached/memcached/protocol_binary.h
libmemcached/protocol/Makefile.am [new file with mode: 0644]
libmemcached/protocol/cache.c [new file with mode: 0644]
libmemcached/protocol/cache.h [new file with mode: 0644]
libmemcached/protocol/callback.h [new file with mode: 0644]
libmemcached/protocol/common.h [new file with mode: 0644]
libmemcached/protocol/libmemcachedprotocol.ver [new file with mode: 0644]
libmemcached/protocol/pedantic.c [new file with mode: 0644]
libmemcached/protocol/protocol_handler.c [new file with mode: 0644]
libmemcached/protocol_handler.h [new file with mode: 0644]

index dc1898a6b765711c72159b16ac1655d604029731..f1beb3e190f7cd975be5edfb40243d4661340e05 100644 (file)
@@ -1,9 +1,12 @@
 *.lo
 */.deps
 */.libs
+*/*/.deps
+*/*/.libs
 */Makefile
 */Makefile.in
 */*.l[oa]
+*/*/*.l[oa]
 *TAGS
 INSTALL
 Makefile
@@ -30,8 +33,10 @@ config/depcomp
 config/install-sh
 config/ltmain.sh
 config/missing
+config/plugin.ac
 configure
 docs/*.[13]
+example/memcached_light
 libmemcached-*.tar.gz
 libmemcached/memcached_configure.h
 libtool
@@ -56,4 +61,4 @@ libmemcached-0.30-1.src.rpm
 libmemcached-0.30-1.x86_64.rpm
 libmemcached-0.31-1.src.rpm
 libmemcached-0.31-1.x86_64.rpm
-config/plugin.ac
+libmemcached-?.??/
index 672ac763adbafe7dd8c0fb7f87a4f7c98326c1bc..affdae1977275ea49d04614a80b5be5cf1d10e8f 100644 (file)
@@ -1,6 +1,6 @@
 ACLOCAL_AMFLAGS = -I m4
 
-SUBDIRS = docs libmemcached libmemcachedutil support clients tests
+SUBDIRS = docs libmemcached libmemcachedutil support clients tests example
 EXTRA_dist = README.FIRST
 
 check-local: test-no-outputdiff
index 6ce3f3c9d07c81540c9f6ff2e634aba39f04c8e8..253978b2daa453b2527c9a89076fa23294e1ccda 100644 (file)
@@ -29,6 +29,8 @@ MEMCACHED_LIBRARY_VERSION=3:0:0
 AC_SUBST(MEMCACHED_LIBRARY_VERSION)
 MEMCACHEDUTIL_LIBRARY_VERSION=0:0:0
 AC_SUBST(MEMCACHEDUTIL_LIBRARY_VERSION)
+MEMCACHEDPROTOCOL_LIBRARY_VERSION=0:0:0
+AC_SUBST(MEMCACHEDPROTOCOL_LIBRARY_VERSION)
 
 
 # libmemcached versioning when linked with GNU ld.
@@ -36,9 +38,11 @@ if test "$lt_cv_prog_gnu_ld" = "yes"
 then
     LD_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/libmemcached.ver"
     LD_UTIL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcachedutil/libmemcachedutil.ver"
+    LD_PROTOCOL_VERSION_SCRIPT="-Wl,--version-script=\$(top_srcdir)/libmemcached/protocol/libmemcachedprotocol.ver"
 fi
 AC_SUBST(LD_VERSION_SCRIPT)
 AC_SUBST(LD_UTIL_VERSION_SCRIPT)
+AC_SUBST(LD_PROTOCOL_VERSION_SCRIPT)
 
 
 #--------------------------------------------------------------------
@@ -87,9 +91,11 @@ AC_CONFIG_FILES([
   clients/Makefile
   tests/Makefile
   docs/Makefile
+  example/Makefile
   libmemcached/Makefile
   libmemcached/memcached_configure.h
   libmemcachedutil/Makefile
+  libmemcached/protocol/Makefile
   support/Makefile
   support/libmemcached.pc
   support/libmemcached.spec
diff --git a/example/Makefile.am b/example/Makefile.am
new file mode 100644 (file)
index 0000000..7c93764
--- /dev/null
@@ -0,0 +1,8 @@
+noinst_PROGRAMS = memcached_light
+
+memcached_light_SOURCES= memcached_light.c \
+                         storage.c storage.h \
+                         interface_v0.c \
+                         interface_v1.c
+memcached_light_LDADD= $(top_builddir)/libmemcached/protocol/libmemcachedprotocol.la
+
diff --git a/example/common.h b/example/common.h
new file mode 100644 (file)
index 0000000..6f87a02
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef EXAMPLE_COMMON_H
+#define EXAMPLE_COMMON_H
+
+#include <netinet/in.h>
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+#endif
+
+#endif
diff --git a/example/interface_v0.c b/example/interface_v0.c
new file mode 100644 (file)
index 0000000..5971366
--- /dev/null
@@ -0,0 +1,520 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * This file contains an implementation of the callback interface for level 0
+ * in the protocol library. You might want to have your copy of the protocol
+ * specification next to your coffee ;-)
+ */
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+static protocol_binary_response_status noop_command_handler(const void *cookie,
+                                                            protocol_binary_request_header *header,
+                                                            memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_response_no_extras response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= PROTOCOL_BINARY_CMD_NOOP,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= header->request.opaque
+    }
+  };
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status quit_command_handler(const void *cookie,
+                                                            protocol_binary_request_header *header,
+                                                            memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_response_no_extras response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= PROTOCOL_BINARY_CMD_QUIT,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= header->request.opaque
+    }
+  };
+
+  if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
+    response_handler(cookie, header, (void*)&response);
+
+  /* I need a better way to signal to close the connection */
+  return PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+static protocol_binary_response_status get_command_handler(const void *cookie,
+                                                           protocol_binary_request_header *header,
+                                                           memcached_binary_protocol_raw_response_handler response_handler)
+{
+  uint8_t opcode= header->request.opcode;
+  union {
+    protocol_binary_response_get response;
+    char buffer[4096];
+  } msg= {
+    .response.message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= opcode,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= header->request.opaque
+    }
+  };
+
+  struct item *item= get_item(header + 1, ntohs(header->request.keylen));
+  if (item)
+  {
+    msg.response.message.body.flags= htonl(item->flags);
+    char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
+    uint32_t bodysize= 4;
+    msg.response.message.header.response.cas= htonll(item->cas);
+    if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
+    {
+      memcpy(ptr, item->key, item->nkey);
+      msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
+      ptr += item->nkey;
+      bodysize += (uint32_t)item->nkey;
+    }
+    memcpy(ptr, item->data, item->size);
+    bodysize += (uint32_t)item->size;
+    msg.response.message.header.response.bodylen= htonl(bodysize);
+    msg.response.message.header.response.extlen= 4;
+
+    return response_handler(cookie, header, (void*)&msg);
+  }
+  else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
+  {
+    msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+    return response_handler(cookie, header, (void*)&msg);
+  }
+
+  /* Q shouldn't report a miss ;-) */
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status delete_command_handler(const void *cookie,
+                                                              protocol_binary_request_header *header,
+                                                              memcached_binary_protocol_raw_response_handler response_handler)
+{
+  size_t keylen= ntohs(header->request.keylen);
+  char *key= ((char*)header) + sizeof(*header);
+  protocol_binary_response_no_extras response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= header->request.opcode,
+      .opaque= header->request.opaque
+    }
+  };
+
+  if (!delete_item(key, keylen))
+  {
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+    return response_handler(cookie, header, (void*)&response);
+  }
+  else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
+  {
+    /* DELETEQ doesn't want success response */
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
+    return response_handler(cookie, header, (void*)&response);
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status flush_command_handler(const void *cookie,
+                                                             protocol_binary_request_header *header,
+                                                             memcached_binary_protocol_raw_response_handler response_handler)
+{
+  uint8_t opcode= header->request.opcode;
+
+  /* @fixme sett inn when! */
+  flush(0);
+
+  if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
+  {
+    protocol_binary_response_no_extras response= {
+      .message.header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= opcode,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    };
+    return response_handler(cookie, header, (void*)&response);
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
+                                                                  protocol_binary_request_header *header,
+                                                                  memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_request_incr *req= (void*)header;
+  protocol_binary_response_incr response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= header->request.opcode,
+      .opaque= header->request.opaque,
+    },
+  };
+
+  uint16_t keylen= ntohs(header->request.keylen);
+  uint64_t initial= ntohll(req->message.body.initial);
+  uint64_t delta= ntohll(req->message.body.delta);
+  uint32_t expiration= ntohl(req->message.body.expiration);
+  void *key= req->bytes + sizeof(req->bytes);
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+  struct item *item= get_item(key, keylen);
+  if (item == NULL)
+  {
+    item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+    if (item == 0)
+    {
+      rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+    }
+    else
+    {
+      memcpy(item->data, &initial, sizeof(initial));
+      put_item(item);
+    }
+  }
+  else
+  {
+    if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
+        header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
+    {
+      (*(uint64_t*)item->data) += delta;
+    }
+    else
+    {
+      if (delta > *(uint64_t*)item->data)
+      {
+        *(uint64_t*)item->data= 0;
+      }
+      else
+      {
+        *(uint64_t*)item->data -= delta;
+      }
+    }
+    update_cas(item);
+  }
+
+  response.message.header.response.status= htons(rval);
+  if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
+  {
+    response.message.header.response.bodylen= ntohl(8);
+    response.message.body.value= ntohll((*(uint64_t*)item->data));
+    response.message.header.response.cas= ntohll(item->cas);
+
+    if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
+        header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
+    {
+      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+    }
+  }
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status version_command_handler(const void *cookie,
+                                                               protocol_binary_request_header *header,
+                                                               memcached_binary_protocol_raw_response_handler response_handler)
+{
+  const char *versionstring= "1.0.0";
+  union {
+    protocol_binary_response_header packet;
+    char buffer[256];
+  } response= {
+    .packet.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= PROTOCOL_BINARY_CMD_VERSION,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= header->request.opaque,
+      .bodylen= htonl((uint32_t)strlen(versionstring))
+    }
+  };
+
+  memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring));
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status concat_command_handler(const void *cookie,
+                                                              protocol_binary_request_header *header,
+                                                              memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  uint16_t keylen= ntohs(header->request.keylen);
+  uint64_t cas= ntohll(header->request.cas);
+  void *key= header + 1;
+  uint32_t vallen= ntohl(header->request.bodylen) - keylen;
+  void *val= (char*)key + keylen;
+
+  struct item *item= get_item(key, keylen);
+  struct item *nitem;
+  if (item == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+  else if (cas != 0 && cas != item->cas)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+  }
+  else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+                               item->flags, item->exp)) == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+  }
+  else
+  {
+    if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
+        header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
+    {
+      memcpy(nitem->data, item->data, item->size);
+      memcpy(((char*)(nitem->data)) + item->size, val, vallen);
+    }
+    else
+    {
+      memcpy(nitem->data, val, vallen);
+      memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
+    }
+    delete_item(key, keylen);
+    put_item(nitem);
+    if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
+        header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
+    {
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= header->request.opcode,
+            .status= htons(rval),
+            .opaque= header->request.opaque,
+            .cas= htonll(nitem->cas),
+          }
+        }
+      };
+      return response_handler(cookie, header, (void*)&response);
+    }
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status set_command_handler(const void *cookie,
+                                                           protocol_binary_request_header *header,
+                                                           memcached_binary_protocol_raw_response_handler response_handler)
+{
+  size_t keylen= ntohs(header->request.keylen);
+  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+  protocol_binary_request_replace *request= (void*)header;
+  uint32_t flags= ntohl(request->message.body.flags);
+  time_t timeout= ntohl(request->message.body.expiration);
+  char *key= ((char*)header) + sizeof(*header) + 8;
+  char *data= key + keylen;
+
+  protocol_binary_response_no_extras response= {
+    .message= {
+      .header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= header->request.opcode,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  if (header->request.cas != 0)
+  {
+    /* validate cas */
+    struct item* item= get_item(key, keylen);
+    if (item != NULL && item->cas != ntohll(header->request.cas))
+    {
+      response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+      return response_handler(cookie, header, (void*)&response);
+    }
+  }
+
+  delete_item(key, keylen);
+  struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
+  if (item == 0)
+  {
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+  }
+  else
+  {
+    put_item(item);
+    /* SETQ shouldn't return a message */
+    if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
+    {
+      response.message.header.response.cas= htonll(item->cas);
+      return response_handler(cookie, header, (void*)&response);
+    }
+    return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  }
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status add_command_handler(const void *cookie,
+                                                           protocol_binary_request_header *header,
+                                                           memcached_binary_protocol_raw_response_handler response_handler)
+{
+  size_t keylen= ntohs(header->request.keylen);
+  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+  protocol_binary_request_add *request= (void*)header;
+  uint32_t flags= ntohl(request->message.body.flags);
+  time_t timeout= ntohl(request->message.body.expiration);
+  char *key= ((char*)header) + sizeof(*header) + 8;
+  char *data= key + keylen;
+
+  protocol_binary_response_no_extras response= {
+    .message= {
+      .header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= header->request.opcode,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  struct item* item= get_item(key, keylen);
+  if (item == NULL)
+  {
+    item= create_item(key, keylen, data, datalen, flags, timeout);
+    if (item == 0)
+      response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+    else
+    {
+      put_item(item);
+      /* ADDQ shouldn't return a message */
+      if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
+      {
+        response.message.header.response.cas= htonll(item->cas);
+        return response_handler(cookie, header, (void*)&response);
+      }
+      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+    }
+  }
+  else
+  {
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+  }
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status replace_command_handler(const void *cookie,
+                                                               protocol_binary_request_header *header,
+                                                               memcached_binary_protocol_raw_response_handler response_handler)
+{
+  size_t keylen= ntohs(header->request.keylen);
+  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+  protocol_binary_request_replace *request= (void*)header;
+  uint32_t flags= ntohl(request->message.body.flags);
+  time_t timeout= ntohl(request->message.body.expiration);
+  char *key= ((char*)header) + sizeof(*header) + 8;
+  char *data= key + keylen;
+
+  protocol_binary_response_no_extras response= {
+    .message= {
+      .header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= header->request.opcode,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  struct item* item= get_item(key, keylen);
+  if (item == NULL)
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+  else if (header->request.cas == 0 || ntohll(header->request.cas) == item->cas)
+  {
+    delete_item(key, keylen);
+    item= create_item(key, keylen, data, datalen, flags, timeout);
+    if (item == 0)
+      response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
+    else
+    {
+      put_item(item);
+      /* REPLACEQ shouldn't return a message */
+      if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
+      {
+        response.message.header.response.cas= htonll(item->cas);
+        return response_handler(cookie, header, (void*)&response);
+      }
+      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+    }
+  }
+  else
+    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static protocol_binary_response_status stat_command_handler(const void *cookie,
+                                                            protocol_binary_request_header *header,
+                                                            memcached_binary_protocol_raw_response_handler response_handler)
+{
+  /* Just send the terminating packet*/
+  protocol_binary_response_no_extras response= {
+    .message= {
+      .header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= PROTOCOL_BINARY_CMD_STAT,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+struct memcached_binary_protocol_callback_st interface_v0_impl= {
+  .interface_version= 0,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler,
+  .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler,
+};
diff --git a/example/interface_v1.c b/example/interface_v1.c
new file mode 100644 (file)
index 0000000..04e62aa
--- /dev/null
@@ -0,0 +1,386 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * This file contains an implementation of the callback interface for level 1
+ * in the protocol library. If you compare the implementation with the one
+ * in interface_v0.c you will see that this implementation is much easier and
+ * hides all of the protocol logic and let you focus on the application
+ * logic. One "problem" with this layer is that it is synchronous, so that
+ * you will not receive the next command before a answer to the previous
+ * command is being sent.
+ */
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+static protocol_binary_response_status add_handler(const void *cookie,
+                                                   const void *key,
+                                                   uint16_t keylen,
+                                                   const void *data,
+                                                   uint32_t datalen,
+                                                   uint32_t flags,
+                                                   uint32_t exptime,
+                                                   uint64_t *cas)
+{
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  struct item* item= get_item(key, keylen);
+  if (item == NULL)
+  {
+    item= create_item(key, keylen, data, datalen, flags, exptime);
+    if (item == 0)
+    {
+      rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+    }
+    else
+    {
+      put_item(item);
+      *cas= item->cas;
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status append_handler(const void *cookie,
+                                                      const void *key,
+                                                      uint16_t keylen,
+                                                      const void* val,
+                                                      uint32_t vallen,
+                                                      uint64_t cas,
+                                                      uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+  struct item *item= get_item(key, keylen);
+  struct item *nitem;
+
+  if (item == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+  else if (cas != 0 && cas != item->cas)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+  }
+  else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+                               item->flags, item->exp)) == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+  }
+  else
+  {
+    memcpy(nitem->data, item->data, item->size);
+    memcpy(((char*)(nitem->data)) + item->size, val, vallen);
+    delete_item(key, keylen);
+    put_item(nitem);
+    *result_cas= nitem->cas;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status decrement_handler(const void *cookie,
+                                                         const void *key,
+                                                         uint16_t keylen,
+                                                         uint64_t delta,
+                                                         uint64_t initial,
+                                                         uint32_t expiration,
+                                                         uint64_t *result,
+                                                         uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  struct item *item= get_item(key, keylen);
+
+  if (item == NULL)
+  {
+    item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+    if (item == 0)
+    {
+      rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+    }
+    else
+    {
+      memcpy(item->data, &initial, sizeof(initial));
+      put_item(item);
+      *result= initial;
+      *result_cas= item->cas;
+    }
+  }
+  else
+  {
+    if (delta > *(uint64_t*)item->data)
+    {
+      *(uint64_t*)item->data= 0;
+    }
+    else
+    {
+      *(uint64_t*)item->data -= delta;
+    }
+    *result= (*(uint64_t*)item->data);
+    /* @todo fix cas */
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status delete_handler(const void *cookie,
+                                                      const void *key,
+                                                      uint16_t keylen,
+                                                      uint64_t cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+  if (cas != 0)
+  {
+    struct item *item= get_item(key, keylen);
+    if (item != NULL && item->cas != cas)
+    {
+      return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+    }
+  }
+
+  if (!delete_item(key, keylen))
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+
+  return rval;
+}
+
+
+static protocol_binary_response_status flush_handler(const void *cookie,
+                                                     uint32_t when) {
+
+  (void)cookie;
+  flush(when);
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status get_handler(const void *cookie,
+                                                   const void *key,
+                                                   uint16_t keylen,
+                                                   memcached_binary_protocol_get_response_handler response_handler) {
+  struct item *item= get_item(key, keylen);
+
+  if (item == NULL)
+  {
+    return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+
+  return response_handler(cookie, key, (uint16_t)keylen,
+                          item->data, (uint32_t)item->size, item->flags,
+                          item->cas);
+}
+
+static protocol_binary_response_status increment_handler(const void *cookie,
+                                                         const void *key,
+                                                         uint16_t keylen,
+                                                         uint64_t delta,
+                                                         uint64_t initial,
+                                                         uint32_t expiration,
+                                                         uint64_t *result,
+                                                         uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  struct item *item= get_item(key, keylen);
+
+  if (item == NULL)
+  {
+    item= create_item(key, keylen, NULL, sizeof(initial), 0, expiration);
+    if (item == 0)
+    {
+      rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+    }
+    else
+    {
+      memcpy(item->data, &initial, sizeof(initial));
+      put_item(item);
+      *result= initial;
+      *result_cas= item->cas;
+    }
+  }
+  else
+  {
+    (*(uint64_t*)item->data) += delta;
+    *result= (*(uint64_t*)item->data);
+    update_cas(item);
+    *result_cas= item->cas;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status noop_handler(const void *cookie) {
+  (void)cookie;
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status prepend_handler(const void *cookie,
+                                                       const void *key,
+                                                       uint16_t keylen,
+                                                       const void* val,
+                                                       uint32_t vallen,
+                                                       uint64_t cas,
+                                                       uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+  struct item *item= get_item(key, keylen);
+  struct item *nitem;
+  if (item == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+  else if (cas != 0 && cas != item->cas)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+  }
+  else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
+                                 item->flags, item->exp)) == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+  }
+  else
+  {
+    memcpy(nitem->data, val, vallen);
+    memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
+    delete_item(key, keylen);
+    put_item(nitem);
+    *result_cas= nitem->cas;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status quit_handler(const void *cookie) {
+  (void)cookie;
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+static protocol_binary_response_status replace_handler(const void *cookie,
+                                                       const void *key,
+                                                       uint16_t keylen,
+                                                       const void* data,
+                                                       uint32_t datalen,
+                                                       uint32_t flags,
+                                                       uint32_t exptime,
+                                                       uint64_t cas,
+                                                       uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  struct item* item= get_item(key, keylen);
+
+  if (item == NULL)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+  }
+  else if (cas == 0 || cas == item->cas)
+  {
+    delete_item(key, keylen);
+    item= create_item(key, keylen, data, datalen, flags, exptime);
+    if (item == 0)
+    {
+      rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+    }
+    else
+    {
+      put_item(item);
+      *result_cas= item->cas;
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status set_handler(const void *cookie,
+                                                   const void *key,
+                                                   uint16_t keylen,
+                                                   const void* data,
+                                                   uint32_t datalen,
+                                                   uint32_t flags,
+                                                   uint32_t exptime,
+                                                   uint64_t cas,
+                                                   uint64_t *result_cas) {
+  (void)cookie;
+  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+
+  if (cas != 0)
+  {
+    struct item* item= get_item(key, keylen);
+    if (item != NULL && cas != item->cas)
+    {
+      /* Invalid CAS value */
+      return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
+    }
+  }
+
+  delete_item(key, keylen);
+  struct item* item= create_item(key, keylen, data, datalen, flags, exptime);
+  if (item == 0)
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
+  }
+  else
+  {
+    put_item(item);
+    *result_cas= item->cas;
+  }
+
+  return rval;
+}
+
+static protocol_binary_response_status stat_handler(const void *cookie,
+                                                    const void *key,
+                                                    uint16_t keylen,
+                                                    memcached_binary_protocol_stat_response_handler response_handler) {
+  (void)key;
+  (void)keylen;
+  /* Just return an empty packet */
+  return response_handler(cookie, NULL, 0, NULL, 0);
+}
+
+static protocol_binary_response_status version_handler(const void *cookie,
+                                                       memcached_binary_protocol_version_response_handler response_handler) {
+  const char *version= "0.1.1";
+  return response_handler(cookie, version, (uint32_t)strlen(version));
+}
+
+struct memcached_binary_protocol_callback_st interface_v1_impl= {
+  .interface_version= 1,
+  .interface.v1= {
+    .add= add_handler,
+    .append= append_handler,
+    .decrement= decrement_handler,
+    .delete= delete_handler,
+    .flush= flush_handler,
+    .get= get_handler,
+    .increment= increment_handler,
+    .noop= noop_handler,
+    .prepend= prepend_handler,
+    .quit= quit_handler,
+    .replace= replace_handler,
+    .set= set_handler,
+    .stat= stat_handler,
+    .version= version_handler
+  }
+};
diff --git a/example/memcached_light.c b/example/memcached_light.c
new file mode 100644 (file)
index 0000000..b373a12
--- /dev/null
@@ -0,0 +1,363 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+/**
+ * What is a library without an example to show you how to use the library?
+ * This example use both interfaces to implement a small memcached server.
+ * Please note that this is an exemple on how to use the library, not
+ * an implementation of a scalable memcached server. If you look closely
+ * at the example it isn't even multithreaded ;-)
+ *
+ * With that in mind, let me give you some pointers into the source:
+ *   storage.c/h       - Implements the item store for this server and not really
+ *                       interesting for this example.
+ *   interface_v0.c    - Shows an implementation of the memcached server by using
+ *                       the "raw" access to the packets as they arrive
+ *   interface_v1.c    - Shows an implementation of the memcached server by using
+ *                       the more "logical" interface.
+ *   memcached_light.c - This file sets up all of the sockets and run the main
+ *                       message loop.
+ */
+
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+
+#include <libmemcached/protocol_handler.h>
+#include "common.h"
+#include "storage.h"
+
+extern struct memcached_binary_protocol_callback_st interface_v0_impl;
+extern struct memcached_binary_protocol_callback_st interface_v1_impl;
+
+static int server_sockets[1024];
+static int num_server_sockets= 0;
+static void* socket_userdata_map[1024];
+
+/**
+ * Create a socket and bind it to a specific port number
+ * @param port the port number to bind to
+ */
+static int server_socket(const char *port) {
+  struct addrinfo *ai;
+  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
+                           .ai_family= AF_UNSPEC,
+                           .ai_socktype= SOCK_STREAM };
+
+  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
+  if (error != 0)
+  {
+    if (error != EAI_SYSTEM)
+      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
+    else
+      perror("getaddrinfo()");
+
+    return 1;
+  }
+
+  struct linger ling= {0, 0};
+
+  for (struct addrinfo *next= ai; next; next= next->ai_next)
+  {
+    int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+    if (sock == -1)
+    {
+      perror("Failed to create socket");
+      continue;
+    }
+
+    int flags= fcntl(sock, F_GETFL, 0);
+    if (flags == -1)
+    {
+      perror("Failed to get socket flags");
+      close(sock);
+      continue;
+    }
+
+    if ((flags & O_NONBLOCK) != O_NONBLOCK)
+      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
+      {
+        perror("Failed to set socket to nonblocking mode");
+        close(sock);
+        continue;
+      }
+
+    flags= 1;
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
+      perror("Failed to set SO_REUSEADDR");
+
+    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
+      perror("Failed to set SO_KEEPALIVE");
+
+    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
+      perror("Failed to set SO_LINGER");
+
+    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
+      perror("Failed to set TCP_NODELAY");
+
+    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+    {
+      if (errno != EADDRINUSE)
+      {
+        perror("bind()");
+        freeaddrinfo(ai);
+      }
+      close(sock);
+      continue;
+    }
+
+    if (listen(sock, 1024) == -1)
+    {
+      perror("listen()");
+      close(sock);
+      continue;
+    }
+
+    server_sockets[num_server_sockets++]= sock;
+  }
+
+  freeaddrinfo(ai);
+
+  return (num_server_sockets > 0) ? 0 : 1;
+}
+
+/**
+ * Convert a command code to a textual string
+ * @param cmd the comcode to convert
+ * @return a textual string with the command or NULL for unknown commands
+ */
+static const char* comcode2str(uint8_t cmd)
+{
+  static const char * const text[] = {
+    "GET", "SET", "ADD", "REPLACE", "DELETE",
+    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
+    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
+    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
+    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
+    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
+  };
+
+  if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
+    return text[cmd];
+
+  return NULL;
+}
+
+/**
+ * Print out the command we are about to execute
+ */
+static void pre_execute(const void *cookie, protocol_binary_request_header *header)
+{
+  const char *cmd= comcode2str(header->request.opcode);
+  if (cmd != NULL)
+    fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
+  else
+    fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+}
+
+/**
+ * Print out the command we just executed
+ */
+static void post_execute(const void *cookie, protocol_binary_request_header *header)
+{
+  const char *cmd= comcode2str(header->request.opcode);
+  if (cmd != NULL)
+    fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
+  else
+    fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+}
+
+/**
+ * Callback handler for all unknown commands.
+ * Send an unknown command back to the client
+ */
+static protocol_binary_response_status unknown(const void *cookie,
+                                               protocol_binary_request_header *header,
+                                               memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_response_no_extras response= {
+    .message= {
+      .header.response= {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= header->request.opcode,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+static void work(void);
+
+/**
+ * Program entry point. Bind to the specified port(s) and serve clients
+ *
+ * @param argc number of items in the argument vector
+ * @param argv argument vector
+ * @return 0 on success, 1 otherwise
+ */
+int main(int argc, char **argv)
+{
+  bool port_specified= false;
+  int cmd;
+  struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+
+  while ((cmd= getopt(argc, argv, "1p:?")) != EOF)
+  {
+    switch (cmd) {
+    case '1':
+      interface= &interface_v1_impl;
+      break;
+    case 'p':
+      port_specified= true;
+      (void)server_socket(optarg);
+      break;
+    case '?':  /* FALLTHROUGH */
+    default:
+      (void)fprintf(stderr, "Usage: %s [-p port]\n", argv[0]);
+      return 1;
+    }
+  }
+
+  if (!port_specified)
+    (void)server_socket("9999");
+
+  if (num_server_sockets == 0)
+  {
+    fprintf(stderr, "I don't have any server sockets\n");
+    return 1;
+  }
+
+  /*
+   * Create and initialize the handles to the protocol handlers. I want
+   * to be able to trace the traffic throught the pre/post handlers, and
+   * set up a common handler for unknown messages
+   */
+  interface->pre_execute= pre_execute;
+  interface->post_execute= post_execute;
+  interface->unknown= unknown;
+
+  struct memcached_binary_protocol_st *protocol_handle;
+  if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL)
+  {
+    fprintf(stderr, "Failed to allocate protocol handle\n");
+    return 1;
+  }
+
+  memcached_binary_protocol_set_callbacks(protocol_handle, interface);
+  memcached_binary_protocol_set_pedantic(protocol_handle, true);
+
+  for (int xx= 0; xx < num_server_sockets; ++xx)
+    socket_userdata_map[server_sockets[xx]]= protocol_handle;
+
+  /* Serve all of the clients */
+  work();
+
+  /* NOTREACHED */
+  return 0;
+}
+
+static void work(void) {
+#define MAX_SERVERS_TO_POLL 100
+  struct pollfd fds[MAX_SERVERS_TO_POLL];
+  int max_poll;
+
+  for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
+  {
+    fds[max_poll].events= POLLIN;
+    fds[max_poll].revents= 0;
+    fds[max_poll].fd= server_sockets[max_poll];
+    ++max_poll;
+  }
+
+  while (true)
+  {
+    int err= poll(fds, (nfds_t)max_poll, -1);
+
+    if (err == 0 || (err == -1 && errno != EINTR))
+    {
+      perror("poll() failed");
+      abort();
+    }
+
+    /* find the available filedescriptors */
+    for (int x= max_poll - 1; x > -1 && err > 0; --x)
+    {
+      if (fds[x].revents != 0)
+      {
+        --err;
+        if (x < num_server_sockets)
+        {
+          /* accept new client */
+          struct sockaddr_storage addr;
+          socklen_t addrlen= sizeof(addr);
+          int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
+                           &addrlen);
+
+          if (sock == -1)
+          {
+            perror("Failed to accept client");
+            continue;
+          }
+
+          struct memcached_binary_protocol_st *protocol;
+          protocol= socket_userdata_map[fds[x].fd];
+
+          struct memcached_binary_protocol_client_st* c;
+          c= memcached_binary_protocol_create_client(protocol, sock);
+          if (c == NULL)
+          {
+            fprintf(stderr, "Failed to create client\n");
+            close(sock);
+          }
+          else
+          {
+            socket_userdata_map[sock]= c;
+            fds[max_poll].events= POLLIN;
+            fds[max_poll].revents= 0;
+            fds[max_poll].fd= sock;
+            ++max_poll;
+          }
+        }
+        else
+        {
+          /* drive the client */
+          struct memcached_binary_protocol_client_st* c;
+          c= socket_userdata_map[fds[x].fd];
+          assert(c != NULL);
+          fds[max_poll].events= 0;
+
+          switch (memcached_binary_protocol_client_work(c))
+          {
+          case WRITE_EVENT:
+          case READ_WRITE_EVENT:
+            fds[max_poll].events= POLLOUT;
+            /* FALLTHROUGH */
+          case READ_EVENT:
+            fds[max_poll].events |= POLLIN;
+            break;
+          case ERROR_EVENT:
+          default: /* ERROR or unknown state.. close */
+            memcached_binary_protocol_client_destroy(c);
+            close(fds[x].fd);
+            fds[x].events= 0;
+
+            if (x != max_poll - 1)
+              memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
+
+            --max_poll;
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/example/storage.c b/example/storage.c
new file mode 100644 (file)
index 0000000..1830496
--- /dev/null
@@ -0,0 +1,147 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include <stdlib.h>
+#include <inttypes.h>
+#include <time.h>
+#include <stdbool.h>
+#include <string.h>
+#include "storage.h"
+
+struct list_entry {
+  struct item item;
+  struct list_entry *next;
+  struct list_entry *prev;
+};
+
+static struct list_entry *root;
+static uint64_t cas;
+
+void put_item(struct item* item) {
+  struct list_entry* entry= (void*)item;
+  update_cas(item);
+  
+  if (root == NULL) 
+  {
+    entry->next= entry->prev= entry;
+  } 
+  else 
+  {
+    entry->prev= root->prev;
+    entry->next= root;
+    entry->prev->next= entry;
+    entry->next->prev= entry;
+  }
+  
+  root= entry;
+}
+
+struct item* get_item(const void* key, size_t nkey) {
+  struct list_entry *walker= root;
+  if (root == NULL) 
+  {
+    return NULL;
+  }  
+  
+  do 
+  {
+    if (((struct item*)walker)->nkey == nkey && 
+        memcmp(((struct item*)walker)->key, key, nkey) == 0) 
+    {
+      return (struct item*)walker;
+    }
+    walker= walker->next;
+  } while (walker != root);
+
+  return NULL;
+}
+
+struct item* create_item(const void* key, size_t nkey, const void* data, 
+                         size_t size, uint32_t flags, time_t exp)
+{
+  struct item* ret= calloc(1, sizeof(struct list_entry));
+  if (ret != NULL) 
+  {
+    ret->key= malloc(nkey);
+    if (size > 0) 
+    {
+      ret->data= malloc(size);
+    }
+
+    if (ret->key == NULL || (size > 0 && ret->data == NULL)) 
+    {
+      free(ret->key);
+      free(ret->data);
+      free(ret);
+      return NULL;
+    }
+
+    memcpy(ret->key, key, nkey);
+    if (data != NULL) 
+    {
+      memcpy(ret->data, data, size);
+    }
+
+    ret->nkey= nkey;
+    ret->size= size;
+    ret->flags= flags;
+    ret->exp= exp;
+  }
+
+  return ret;
+}
+
+bool delete_item(const void* key, size_t nkey) {
+  struct item* item= get_item(key, nkey);
+  bool ret= false;
+
+  if (item) 
+  {
+    /* remove from linked list */
+    struct list_entry *entry= (void*)item;
+      
+    if (entry->next == entry) 
+    {
+      /* Only one object in the list */
+      root= NULL;
+    }
+    else
+    { 
+      /* ensure that we don't loose track of the root, and this will
+       * change the start position for the next search ;-) */
+      root= entry->next;
+      entry->prev->next= entry->next;
+      entry->next->prev= entry->prev;
+    }
+
+    free(item->key);
+    free(item->data);
+    free(item);
+    ret= true;
+  }
+
+  return ret;
+}
+
+void flush(uint32_t when) {
+  /* FIXME */
+  (void)when;
+  /* remove the complete linked list */
+  if (root == NULL) 
+  {
+    return;
+  }
+
+  root->prev->next= NULL;
+  while (root != NULL) 
+  {
+    struct item* tmp= (void*)root;
+    root= root->next;
+      
+    free(tmp->key);
+    free(tmp->data);
+    free(tmp);
+  }
+}
+
+void update_cas(struct item* item) {
+  item->cas= ++cas;
+}
diff --git a/example/storage.h b/example/storage.h
new file mode 100644 (file)
index 0000000..fbe7547
--- /dev/null
@@ -0,0 +1,23 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#ifndef STORAGE_H
+#define STORAGE_H
+
+struct item {
+  uint64_t cas;
+  void* key;
+  size_t nkey;
+  void* data;
+  size_t size;
+  uint32_t flags;
+  time_t exp;
+};
+
+void update_cas(struct item* item);
+void put_item(struct item* item);
+struct item* get_item(const void* key, size_t nkey);
+struct item* create_item(const void* key, size_t nkey, const void *data, 
+                         size_t size, uint32_t flags, time_t exp);
+bool delete_item(const void* key, size_t nkey);
+void flush(uint32_t when);
+
+#endif
index 087290e986d33dfae258a71fd01e98e98012c1b5..165122d0cd9b941f57eec548f2ca9c1d6cde308d 100644 (file)
@@ -1,6 +1,8 @@
 EXTRA_DIST = libmemcached_probes.d memcached/README.txt libmemcached.ver \
              memcached_configure.h.in
 
+SUBDIRS = protocol
+
 EXTRA_HEADERS = 
 BUILT_SOURCES= 
 
@@ -21,8 +23,12 @@ pkginclude_HEADERS= memcached.h \
                    memcached_string.h \
                    memcached_types.h \
                    memcached_watchpoint.h \
+                    protocol_handler.h \
                    visibility.h
 
+nobase_pkginclude_HEADERS=protocol/cache.h \
+                    protocol/callback.h
+
 
 if BUILD_LIBMEMCACHEDUTIL
 pkginclude_HEADERS+= memcached_util.h memcached_pool.h
index 08df72e8b10fe0c505086443c8409f49ecd6a118..74cdca52d61d3a284536fa744bb6169d13a61e5c 100644 (file)
@@ -69,7 +69,10 @@ extern "C"
     PROTOCOL_BINARY_RESPONSE_EINVAL = 0x04,
     PROTOCOL_BINARY_RESPONSE_NOT_STORED = 0x05,
     PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND = 0x81,
-    PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82
+    PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82,
+
+
+    PROTOCOL_BINARY_RESPONSE_EIO = 0xff
   } protocol_binary_response_status;
 
   /**
diff --git a/libmemcached/protocol/Makefile.am b/libmemcached/protocol/Makefile.am
new file mode 100644 (file)
index 0000000..fb4b0a7
--- /dev/null
@@ -0,0 +1,8 @@
+EXTRA_DIST= libmemcachedprotocol.ver
+
+lib_LTLIBRARIES=libmemcachedprotocol.la
+
+noinst_HEADERS = common.h 
+
+libmemcachedprotocol_la_SOURCES= protocol_handler.c cache.c pedantic.c
+libmemcachedprotocol_la_LDFLAGS= -version-info $(MEMCACHEDPROTOCOL_LIBRARY_VERSION) $(LD_PROTOCOL_VERSION_SCRIPT)
diff --git a/libmemcached/protocol/cache.c b/libmemcached/protocol/cache.c
new file mode 100644 (file)
index 0000000..c7c8a61
--- /dev/null
@@ -0,0 +1,149 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include <stdlib.h>
+#include <string.h>
+#include <inttypes.h>
+
+#ifndef NDEBUG
+#include <signal.h>
+#endif
+
+#include "cache.h"
+
+#ifndef NDEBUG
+const uint64_t redzone_pattern = 0xdeadbeefcafebabe;
+int cache_error = 0;
+#endif
+
+const size_t initial_pool_size = 64;
+
+cache_t* cache_create(const char *name, size_t bufsize, size_t align,
+                      cache_constructor_t* constructor,
+                      cache_destructor_t* destructor) {
+    cache_t* ret = calloc(1, sizeof(cache_t));
+    char* nm = strdup(name);
+    void** ptr = calloc(initial_pool_size, bufsize);
+    if (ret == NULL || nm == NULL || ptr == NULL ||
+        pthread_mutex_init(&ret->mutex, NULL) == -1) {
+        free(ret);
+        free(nm);
+        free(ptr);
+        return NULL;
+    }
+
+    ret->name = nm;
+    ret->ptr = ptr;
+    ret->freetotal = initial_pool_size;
+    ret->constructor = constructor;
+    ret->destructor = destructor;
+
+#ifndef NDEBUG
+    ret->bufsize = bufsize + 2 * sizeof(redzone_pattern);
+#else
+    ret->bufsize = bufsize;
+#endif
+
+    (void)align;
+
+    return ret;
+}
+
+static inline void* get_object(void *ptr) {
+#ifndef NDEBUG
+    uint64_t *pre = ptr;
+    return pre + 1;
+#else
+    return ptr;
+#endif
+}
+
+void cache_destroy(cache_t *cache) {
+    while (cache->freecurr > 0) {
+        void *ptr = cache->ptr[--cache->freecurr];
+        if (cache->destructor) {
+            cache->destructor(get_object(ptr), NULL);
+        }
+        free(ptr);
+    }
+    free(cache->name);
+    free(cache->ptr);
+    pthread_mutex_destroy(&cache->mutex);
+}
+
+void* cache_alloc(cache_t *cache) {
+    void *ret;
+    void *object;
+    pthread_mutex_lock(&cache->mutex);
+    if (cache->freecurr > 0) {
+        ret = cache->ptr[--cache->freecurr];
+        object = get_object(ret);
+    } else {
+        object = ret = malloc(cache->bufsize);
+        if (ret != NULL) {
+            object = get_object(ret);
+
+            if (cache->constructor != NULL &&
+                cache->constructor(object, NULL, 0) != 0) {
+                free(ret);
+                object = NULL;
+            }
+        }
+    }
+    pthread_mutex_unlock(&cache->mutex);
+
+#ifndef NDEBUG
+    if (object != NULL) {
+        /* add a simple form of buffer-check */
+        uint64_t *pre = ret;
+        *pre = redzone_pattern;
+        ret = pre+1;
+        memcpy(((char*)ret) + cache->bufsize - (2 * sizeof(redzone_pattern)),
+               &redzone_pattern, sizeof(redzone_pattern));
+    }
+#endif
+
+    return object;
+}
+
+void cache_free(cache_t *cache, void *ptr) {
+    pthread_mutex_lock(&cache->mutex);
+
+#ifndef NDEBUG
+    /* validate redzone... */
+    if (memcmp(((char*)ptr) + cache->bufsize - (2 * sizeof(redzone_pattern)),
+               &redzone_pattern, sizeof(redzone_pattern)) != 0) {
+        raise(SIGABRT);
+        cache_error = 1;
+        pthread_mutex_unlock(&cache->mutex);
+        return;
+    }
+    uint64_t *pre = ptr;
+    --pre;
+    if (*pre != redzone_pattern) {
+        raise(SIGABRT);
+        cache_error = -1;
+        pthread_mutex_unlock(&cache->mutex);
+        return;
+    }
+    ptr = pre;
+#endif
+    if (cache->freecurr < cache->freetotal) {
+        cache->ptr[cache->freecurr++] = ptr;
+    } else {
+        /* try to enlarge free connections array */
+        size_t newtotal = cache->freetotal * 2;
+        void **new_free = realloc(cache->ptr, sizeof(char *) * newtotal);
+        if (new_free) {
+            cache->freetotal = newtotal;
+            cache->ptr = new_free;
+            cache->ptr[cache->freecurr++] = ptr;
+        } else {
+            if (cache->destructor) {
+                cache->destructor(ptr, NULL);
+            }
+            free(ptr);
+
+        }
+    }
+    pthread_mutex_unlock(&cache->mutex);
+}
+
diff --git a/libmemcached/protocol/cache.h b/libmemcached/protocol/cache.h
new file mode 100644 (file)
index 0000000..a96fba9
--- /dev/null
@@ -0,0 +1,116 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#ifndef CACHE_H
+#define CACHE_H
+#include <pthread.h>
+
+#ifdef HAVE_UMEM_H
+#include <umem.h>
+#define cache_t umem_cache_t
+#define cache_alloc(a) umem_cache_alloc(a, UMEM_DEFAULT)
+#define cache_free(a, b) umem_cache_free(a, b)
+#define cache_create(a,b,c,d,e) umem_cache_create((char*)a, b, c, d, e, NULL, NULL, NULL, 0)
+#define cache_destroy(a) umem_cache_destroy(a);
+
+#else
+
+#ifndef NDEBUG
+/* may be used for debug purposes */
+extern int cache_error;
+#endif
+
+/**
+ * Constructor used to initialize allocated objects
+ *
+ * @param obj pointer to the object to initialized.
+ * @param notused1 This parameter is currently not used.
+ * @param notused2 This parameter is currently not used.
+ * @return you should return 0, but currently this is not checked
+ */
+typedef int cache_constructor_t(void* obj, void* notused1, int notused2);
+/**
+ * Destructor used to clean up allocated objects before they are
+ * returned to the operating system.
+ *
+ * @param obj pointer to the object to initialized.
+ * @param notused1 This parameter is currently not used.
+ * @param notused2 This parameter is currently not used.
+ * @return you should return 0, but currently this is not checked
+ */
+typedef void cache_destructor_t(void* obj, void* notused);
+
+/**
+ * Definition of the structure to keep track of the internal details of
+ * the cache allocator. Touching any of these variables results in
+ * undefined behavior.
+ */
+typedef struct {
+    /** Mutex to protect access to the structure */
+    pthread_mutex_t mutex;
+    /** Name of the cache objects in this cache (provided by the caller) */
+    char *name;
+    /** List of pointers to available buffers in this cache */
+    void **ptr;
+    /** The size of each element in this cache */
+    size_t bufsize;
+    /** The capacity of the list of elements */
+    size_t freetotal;
+    /** The current number of free elements */
+    size_t freecurr;
+    /** The constructor to be called each time we allocate more memory */
+    cache_constructor_t* constructor;
+    /** The destructor to be called each time before we release memory */
+    cache_destructor_t* destructor;
+} cache_t;
+
+/**
+ * Create an object cache.
+ *
+ * The object cache will let you allocate objects of the same size. It is fully
+ * MT safe, so you may allocate objects from multiple threads without having to
+ * do any syncrhonization in the application code.
+ *
+ * @param name the name of the object cache. This name may be used for debug purposes
+ *             and may help you track down what kind of object you have problems with
+ *             (buffer overruns, leakage etc)
+ * @param bufsize the size of each object in the cache
+ * @param align the alignment requirements of the objects in the cache.
+ * @param constructor the function to be called to initialize memory when we need
+ *                    to allocate more memory from the os.
+ * @param destructor the function to be called before we release the memory back
+ *                   to the os.
+ * @return a handle to an object cache if successful, NULL otherwise.
+ */
+cache_t* cache_create(const char* name, size_t bufsize, size_t align,
+                      cache_constructor_t* constructor,
+                      cache_destructor_t* destructor);
+/**
+ * Destroy an object cache.
+ *
+ * Destroy and invalidate an object cache. You should return all buffers allocated
+ * with cache_alloc by using cache_free before calling this function. Not doing
+ * so results in undefined behavior (the buffers may or may not be invalidated)
+ *
+ * @param handle the handle to the object cache to destroy.
+ */
+void cache_destroy(cache_t* handle);
+/**
+ * Allocate an object from the cache.
+ *
+ * @param handle the handle to the object cache to allocate from
+ * @return a pointer to an initialized object from the cache, or NULL if
+ *         the allocation cannot be satisfied.
+ */
+void* cache_alloc(cache_t* handle);
+/**
+ * Return an object back to the cache.
+ *
+ * The caller should return the object in an initialized state so that
+ * the object may be returned in an expected state from cache_alloc.
+ *
+ * @param handle handle to the object cache to return the object to
+ * @param ptr pointer to the object to return.
+ */
+void cache_free(cache_t* handle, void* ptr);
+#endif
+
+#endif
diff --git a/libmemcached/protocol/callback.h b/libmemcached/protocol/callback.h
new file mode 100644 (file)
index 0000000..c1f8c63
--- /dev/null
@@ -0,0 +1,404 @@
+/*
+ * Summary: Definition of the callback interface
+ *
+ * Copy: See Copyright for the status of this software.
+ *
+ * Author: Trond Norbye
+ */
+#ifndef LIBMEMCACHEDPROTOCOL_CALLBACK_H
+#define LIBMEMCACHEDPROTOCOL_CALLBACK_H
+
+/**
+ * Callback to send data back from a successful GET/GETQ/GETK/GETKQ command
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param key What to insert as key in the reply
+ * @param keylen The length of the key
+ * @param body What to store in the body of the package
+ * @param bodylen The number of bytes of the body
+ * @param flags The flags stored with the item
+ * @param cas The CAS value to insert into the response (should be 0
+ *            if you don't care)
+ */
+typedef protocol_binary_response_status 
+(*memcached_binary_protocol_get_response_handler)(const void *cookie,
+                                                  const void *key,
+                                                  uint16_t keylen,
+                                                  const void *body,
+                                                  uint32_t bodylen,
+                                                  uint32_t flags,
+                                                  uint64_t cas);
+/**
+ * Callback to send data back from a STAT command
+ * 
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param key What to insert as key in the reply
+ * @param keylen The length of the key
+ * @param body What to store in the body of the package
+ * @param bodylen The number of bytes of the body
+ */
+typedef protocol_binary_response_status 
+(*memcached_binary_protocol_stat_response_handler)(const void *cookie,
+                                                   const void *key,
+                                                   uint16_t keylen,
+                                                   const void *body,
+                                                   uint32_t bodylen);
+/**
+ * Callback to send data back from a VERSION command
+ * 
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param text The version string
+ * @param length The number of bytes in the version string
+ */
+typedef protocol_binary_response_status 
+(*memcached_binary_protocol_version_response_handler)(const void *cookie,
+                                                      const void *text,
+                                                      uint32_t length);
+
+
+/**
+ * In the low level interface you need to format the response
+ * packet yourself (giving you complete freedom :-) 
+ *
+ * @param cookie Just pass along the cookie supplied in the callback
+ * @param request Pointer to the request packet you are sending a reply to
+ * @param response Pointer to the response packet to send
+ *
+ */
+typedef protocol_binary_response_status (*memcached_binary_protocol_raw_response_handler)(const void *cookie,
+                                                               protocol_binary_request_header *request,
+                                                               protocol_binary_response_header *response);
+
+/**
+ * In the low lever interface you have to do most of the work by
+ * yourself, but it also gives you a lot of freedom :-)
+ * @param cookie identification for this connection, just pass it along to 
+ *               the response handler
+ * @param header the command received over the wire. Never try to access
+ *               <u>anything</u> outside the command.
+ * @param resonse_handler call this function to send data back to the client
+ */
+typedef protocol_binary_response_status (*memcached_binary_protocol_command_handler)(const void *cookie,
+                                                   protocol_binary_request_header *header,
+                                                   memcached_binary_protocol_raw_response_handler response_handler);
+
+/**
+ * The raw interface to the packets is implemented in version 0. It contains
+ * just an array with command handlers. The inxed in the array is the 
+ * com code.
+ */
+struct memcached_binary_protocol_callback_v0_st {
+   memcached_binary_protocol_command_handler comcode[256];
+};
+
+/**
+ * The first version of the callback struct containing all of the
+ * documented commands in the initial release of the binary protocol
+ * (aka. memcached 1.4.0).
+ * 
+ * You might miss the Q commands (addq etc) but the response function
+ * knows how to deal with them so you don't need to worry about that :-)
+ */
+struct memcached_binary_protocol_callback_v1_st {
+   /**
+    * Add an item to the cache
+    * @param cookie id of the client receiving the command
+    * @param key the key to add
+    * @param len the length of the key
+    * @param val the value to store for the key (may be NIL)
+    * @param vallen the length of the data
+    * @param flags the flags to store with the key
+    * @param exptime the expiry time for the key-value pair
+    * @param cas the resulting cas for the add operation (if success)
+    */
+   protocol_binary_response_status (*add)(const void *cookie, 
+                                          const void *key, 
+                                          uint16_t keylen,
+                                          const void* val, 
+                                          uint32_t vallen, 
+                                          uint32_t flags, 
+                                          uint32_t exptime, 
+                                          uint64_t *cas);
+
+   /**
+    * Append data to an <b>existing</b> key-value pair.
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to add data to
+    * @param len the length of the key
+    * @param val the value to append to the value
+    * @param vallen the length of the data
+    * @param cas the CAS in the request
+    * @param result_cas the resulting cas for the append operation
+    * 
+    */
+   protocol_binary_response_status (*append)(const void *cookie, 
+                                             const void *key, 
+                                             uint16_t keylen, 
+                                             const void* val, 
+                                             uint32_t vallen, 
+                                             uint64_t cas,
+                                             uint64_t *result_cas);
+
+   /**
+    * Decrement the value for a key
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to decrement the value for
+    * @param len the length of the key
+    * @param delta the amount to decrement
+    * @param initial initial value to store (if the key doesn't exist)
+    * @param expiration expiration time for the object (if the key doesn't exist)
+    * @param cas the CAS in the request
+    * @param result the result from the decrement
+    * @param result_cas the cas of the item
+    * 
+    */
+   protocol_binary_response_status (*decrement)(const void *cookie, 
+                                                const void *key, 
+                                                uint16_t keylen, 
+                                                uint64_t delta, 
+                                                uint64_t initial, 
+                                                uint32_t expiration,
+                                                uint64_t *result,
+                                                uint64_t *result_cas);
+
+   /**
+    * Delete an existing key
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to delete
+    * @param len the length of the key
+    * @param cas the CAS in the request
+    */
+   protocol_binary_response_status (*delete)(const void *cookie, 
+                                             const void *key, 
+                                             uint16_t keylen, 
+                                             uint64_t cas);
+
+
+   /**
+    * Flush the cache
+    *
+    * @param cookie id of the client receiving the command
+    * @param when when the cache should be flushed (0 == immediately)
+    */
+   protocol_binary_response_status (*flush)(const void *cookie, 
+                                            uint32_t when);
+
+
+
+   /**
+    * Get a key-value pair
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to get
+    * @param len the length of the key
+    * @param response_handler to send the result back to the client
+    */
+   protocol_binary_response_status (*get)(const void *cookie, 
+                                          const void *key, 
+                                          uint16_t keylen, 
+                                          memcached_binary_protocol_get_response_handler response_handler);
+
+   /**
+    * Increment the value for a key
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to increment the value on
+    * @param len the length of the key
+    * @param delta the amount to increment
+    * @param initial initial value to store (if the key doesn't exist)
+    * @param expiration expiration time for the object (if the key doesn't exist)
+    * @param cas the CAS in the request
+    * @param result the result from the decrement
+    * @param result_cas the cas of the item
+    * 
+    */
+   protocol_binary_response_status (*increment)(const void *cookie, 
+                                                const void *key, 
+                                                uint16_t keylen, 
+                                                uint64_t delta, 
+                                                uint64_t initial, 
+                                                uint32_t expiration, 
+                                                uint64_t *result,
+                                                uint64_t *result_cas);
+
+   /**
+    * The noop command was received. This is just a notification callback (the
+    * response is automatically created).
+    * 
+    * @param cookie id of the client receiving the command
+    */
+   protocol_binary_response_status (*noop)(const void *cookie);
+
+   /**
+    * Prepend data to an <b>existing</b> key-value pair.
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to prepend data to
+    * @param len the length of the key
+    * @param val the value to prepend to the value
+    * @param vallen the length of the data
+    * @param cas the CAS in the request
+    * @param result-cas the cas id of the item
+    * 
+    */
+   protocol_binary_response_status (*prepend)(const void *cookie, 
+                                              const void *key, 
+                                              uint16_t keylen, 
+                                              const void* val, 
+                                              uint32_t vallen, 
+                                              uint64_t cas,
+                                              uint64_t *result_cas);
+
+   /**
+    * The quit command was received. This is just a notification callback (the
+    * response is automatically created).
+    * 
+    * @param cookie id of the client receiving the command
+    */
+   protocol_binary_response_status (*quit)(const void *cookie);
+
+
+   /**
+    * Replace an <b>existing</b> item to the cache
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to replace the content for
+    * @param len the length of the key
+    * @param val the value to store for the key (may be NIL)
+    * @param vallen the length of the data
+    * @param flags the flags to store with the key
+    * @param exptime the expiry time for the key-value pair
+    * @param cas the cas id in the request
+    * @param result_cas the cas id of the item
+    */
+   protocol_binary_response_status (*replace)(const void *cookie, 
+                                              const void *key, 
+                                              uint16_t keylen,
+                                              const void* val, 
+                                              uint32_t vallen, 
+                                              uint32_t flags, 
+                                              uint32_t exptime, 
+                                              uint64_t cas,
+                                              uint64_t *result_cas);
+
+
+   /**
+    * Set a key-value pair in the cache
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to insert
+    * @param len the length of the key
+    * @param val the value to store for the key (may be NIL)
+    * @param vallen the length of the data
+    * @param flags the flags to store with the key
+    * @param exptime the expiry time for the key-value pair
+    * @param cas the cas id in the request
+    * @param result_cas the cas id of the new item
+    */
+   protocol_binary_response_status (*set)(const void *cookie, 
+                                          const void *key, 
+                                          uint16_t keylen,
+                                          const void* val, 
+                                          uint32_t vallen, 
+                                          uint32_t flags, 
+                                          uint32_t exptime, 
+                                          uint64_t cas,
+                                          uint64_t *result_cas);
+
+   /**
+    * Get status information
+    *
+    * @param cookie id of the client receiving the command
+    * @param key the key to get status for (or NIL to request all status).
+    *            Remember to insert the terminating packet if multiple
+    *            packets should be returned.
+    * @param keylen the length of the key
+    * @param response_handler to send the result back to the client, but
+    *                         don't send reply on success!
+    * 
+    */
+   protocol_binary_response_status (*stat)(const void *cookie, 
+                                           const void *key, 
+                                           uint16_t keylen, 
+                                           memcached_binary_protocol_stat_response_handler response_handler);
+
+   /**
+    * Get the version information
+    *
+    * @param cookie id of the client receiving the command
+    * @param response_handler to send the result back to the client, but
+    *                         don't send reply on success!
+    * 
+    */
+   protocol_binary_response_status (*version)(const void *cookie, 
+                                              memcached_binary_protocol_version_response_handler response_handler);
+};
+
+/**
+ * 
+ */
+struct memcached_binary_protocol_callback_st {
+   /**
+    * The interface version used (set to 0 if you don't have any specialized
+    * command handlers).
+    */
+   uint64_t interface_version;
+
+   /**
+    * Callback fired just before the command will be executed.
+    *
+    * @param cookie id of the client receiving the command
+    * @param header the command header as received on the wire. If you look
+    *               at the content you <b>must</b> ensure that you don't
+    *               try to access beyond the end of the message.
+    */
+   void (*pre_execute)(const void *cookie, 
+                       protocol_binary_request_header *header);
+   /**
+    * Callback fired just after the command was exected (please note
+    * that the data transfer back to the client is not finished at this
+    * time).
+    *
+    * @param cookie id of the client receiving the command
+    * @param header the command header as received on the wire. If you look
+    *               at the content you <b>must</b> ensure that you don't
+    *               try to access beyond the end of the message.
+    */
+   void (*post_execute)(const void *cookie,
+                        protocol_binary_request_header *header);
+
+   /**
+    * Callback fired if no specialized callback is registered for this
+    * specific command code.
+    *
+    * @param cookie id of the client receiving the command
+    * @param header the command header as received on the wire. You <b>must</b>
+    *               ensure that you don't try to access beyond the end of the
+    *               message.
+    * @param response_handler The response handler to send data back.
+    */
+   protocol_binary_response_status (*unknown)(const void *cookie,
+                                              protocol_binary_request_header *header,
+                                              memcached_binary_protocol_raw_response_handler response_handler);
+
+   /**
+    * The different interface levels we support. A pointer is used so the
+    * size of the structure is fixed. You must ensure that the memory area
+    * passed as the pointer is valid as long as you use the protocol handler.
+    */
+   union {
+      struct memcached_binary_protocol_callback_v0_st v0;
+
+      /**
+       * The first version of the callback struct containing all of the
+       * documented commands in the initial release of the binary protocol
+       * (aka. memcached 1.4.0).
+       */
+      struct memcached_binary_protocol_callback_v1_st v1;
+   } interface;
+};
+
+#endif
diff --git a/libmemcached/protocol/common.h b/libmemcached/protocol/common.h
new file mode 100644 (file)
index 0000000..036a13f
--- /dev/null
@@ -0,0 +1,93 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#ifndef MEMCACHED_PROTOCOL_INTERNAL_H
+#define MEMCACHED_PROTOCOL_INTERNAL_H
+
+#include "config.h"
+#include <stdbool.h>
+#include <assert.h>
+#include <netinet/in.h>
+
+#ifdef linux
+/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
+ * optimize the conversion functions, but the prototypes generate warnings
+ * from gcc. The conversion methods isn't the bottleneck for my app, so
+ * just remove the warnings by undef'ing the optimization ..
+ */
+#undef ntohs
+#undef ntohl
+#undef htons
+#undef htonl
+
+#endif
+
+
+/* Define this here, which will turn on the visibilty controls while we're
+ * building libmemcached.
+ */
+#define BUILDING_LIBMEMCACHED 1
+
+#include <libmemcached/protocol_handler.h>
+#include <libmemcached/protocol/cache.h>
+
+struct memcached_binary_protocol_st {
+  struct memcached_binary_protocol_callback_st *callback;
+  memcached_binary_protocol_recv_func recv;
+  memcached_binary_protocol_send_func send;
+  char *input_buffer;
+  size_t input_buffer_size;
+  bool pedantic;
+  /* @todo use multiple sized buffers */
+  cache_t *buffer_cache;
+};
+
+
+struct chunk_st {
+  /* Pointer to the data */
+  char *data;
+  /* The offset to the first byte into the buffer that is used */
+  size_t offset;
+  /* The offset into the buffer for the first free byte */
+  size_t nbytes;
+  /* The number of bytes in the buffer */
+  size_t size;
+  /* Pointer to the next buffer in the chain */
+  struct chunk_st *next;
+};
+
+#define CHUNK_BUFFERSIZE 2048
+
+struct memcached_binary_protocol_client_st {
+  struct memcached_binary_protocol_st *root;
+  int sock;
+  int error;
+
+  /* Linked list of data to send */
+  struct chunk_st *output;
+  struct chunk_st *output_tail;
+
+
+
+  char *input_buffer;
+  size_t input_buffer_size;
+  size_t input_buffer_offset;
+  char *curr_input;
+
+
+  struct chunk_st *input;
+  /* Pointer to the last chunk to avoid the need to traverse the complete list */
+  struct chunk_st *input_tail;
+  size_t bytes_available;
+
+  protocol_binary_request_header *current_command;
+  bool quiet;
+};
+
+LIBMEMCACHED_LOCAL
+bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request);
+
+LIBMEMCACHED_LOCAL
+bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request,
+                                                       protocol_binary_response_header *response);
+
+
+#endif
diff --git a/libmemcached/protocol/libmemcachedprotocol.ver b/libmemcached/protocol/libmemcachedprotocol.ver
new file mode 100644 (file)
index 0000000..0909018
--- /dev/null
@@ -0,0 +1 @@
+libmemcachedprotocol_0 { global: *; };
diff --git a/libmemcached/protocol/pedantic.c b/libmemcached/protocol/pedantic.c
new file mode 100644 (file)
index 0000000..884f672
--- /dev/null
@@ -0,0 +1,203 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include "common.h"
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
+
+#define ensure(a) if (!(a)) { return false; }
+
+bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request)
+{
+  ensure(request->request.magic == PROTOCOL_BINARY_REQ);
+  ensure(request->request.datatype == PROTOCOL_BINARY_RAW_BYTES);
+
+  ensure(request->bytes[6] == 0);
+  ensure(request->bytes[7] == 0);
+
+  uint8_t opcode= request->request.opcode;
+  uint16_t keylen= ntohs(request->request.keylen);
+  uint8_t extlen= request->request.extlen;
+  uint32_t bodylen= ntohl(request->request.bodylen);
+  
+  ensure(bodylen >= (keylen + extlen));
+
+  switch (opcode) {
+  case PROTOCOL_BINARY_CMD_GET:
+  case PROTOCOL_BINARY_CMD_GETK:
+  case PROTOCOL_BINARY_CMD_GETKQ:
+  case PROTOCOL_BINARY_CMD_GETQ:
+    ensure(extlen == 0);
+    ensure(keylen > 0);
+    ensure(keylen == bodylen);
+    ensure(request->request.cas == 0);
+    break;
+
+  case PROTOCOL_BINARY_CMD_ADD:
+  case PROTOCOL_BINARY_CMD_ADDQ:
+    /* it makes no sense to run add with a cas value */
+    ensure(request->request.cas == 0);
+    /* FALLTHROUGH */
+  case PROTOCOL_BINARY_CMD_SET:
+  case PROTOCOL_BINARY_CMD_SETQ:
+  case PROTOCOL_BINARY_CMD_REPLACE:
+  case PROTOCOL_BINARY_CMD_REPLACEQ:
+    ensure(keylen > 0);
+    ensure(extlen == 8);
+    break;
+
+  case PROTOCOL_BINARY_CMD_DELETE:
+  case PROTOCOL_BINARY_CMD_DELETEQ:
+    ensure(extlen == 0);
+    ensure(keylen > 0);
+    ensure(keylen == bodylen);
+    break;
+
+  case PROTOCOL_BINARY_CMD_INCREMENT:
+  case PROTOCOL_BINARY_CMD_INCREMENTQ:
+  case PROTOCOL_BINARY_CMD_DECREMENT:
+  case PROTOCOL_BINARY_CMD_DECREMENTQ:
+    ensure(extlen == 20);
+    ensure(keylen > 0);
+    ensure(keylen + extlen == bodylen);
+    break;
+
+  case PROTOCOL_BINARY_CMD_QUIT:
+  case PROTOCOL_BINARY_CMD_QUITQ:
+  case PROTOCOL_BINARY_CMD_NOOP:
+  case PROTOCOL_BINARY_CMD_VERSION:
+    ensure(extlen == 0);
+    ensure(keylen == 0);
+    ensure(bodylen == 0);
+    break;
+
+  case PROTOCOL_BINARY_CMD_FLUSH:
+  case PROTOCOL_BINARY_CMD_FLUSHQ:
+    ensure(extlen == 0 || extlen == 4);
+    ensure(keylen == 0);
+    ensure(bodylen == extlen);
+    break;
+
+  case PROTOCOL_BINARY_CMD_STAT:
+    ensure(extlen == 0);
+    /* May have key, but not value */
+    ensure(keylen == bodylen);
+    break;
+      
+  case PROTOCOL_BINARY_CMD_APPEND:
+  case PROTOCOL_BINARY_CMD_APPENDQ:
+  case PROTOCOL_BINARY_CMD_PREPEND:
+  case PROTOCOL_BINARY_CMD_PREPENDQ:
+    ensure(extlen == 0);
+    ensure(keylen > 0);
+    break;
+  default:
+    /* Unknown command */
+    ;
+  }
+
+  return true;
+}
+
+bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request,
+                                                       protocol_binary_response_header *response)
+{
+  ensure(response->response.magic == PROTOCOL_BINARY_RES);
+  ensure(response->response.datatype == PROTOCOL_BINARY_RAW_BYTES);
+  ensure(response->response.opaque == request->request.opaque);
+
+  uint16_t status= ntohs(response->response.status);
+  uint8_t opcode= response->response.opcode;
+
+  if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS) 
+  {
+    switch (opcode) {
+    case PROTOCOL_BINARY_CMD_ADDQ:
+    case PROTOCOL_BINARY_CMD_APPENDQ:
+    case PROTOCOL_BINARY_CMD_DECREMENTQ:
+    case PROTOCOL_BINARY_CMD_DELETEQ:
+    case PROTOCOL_BINARY_CMD_FLUSHQ:
+    case PROTOCOL_BINARY_CMD_INCREMENTQ:
+    case PROTOCOL_BINARY_CMD_PREPENDQ:
+    case PROTOCOL_BINARY_CMD_QUITQ:
+    case PROTOCOL_BINARY_CMD_REPLACEQ:
+    case PROTOCOL_BINARY_CMD_SETQ:
+      /* Quiet command shouldn't return on success */
+      return false;
+    default:
+      break;
+    }
+
+    switch (opcode) {
+    case PROTOCOL_BINARY_CMD_ADD:
+    case PROTOCOL_BINARY_CMD_REPLACE:
+    case PROTOCOL_BINARY_CMD_SET:
+    case PROTOCOL_BINARY_CMD_APPEND:
+    case PROTOCOL_BINARY_CMD_PREPEND:
+      ensure(response->response.keylen == 0);
+      ensure(response->response.extlen == 0);
+      ensure(response->response.bodylen == 0);
+      ensure(response->response.cas != 0);
+      break;
+    case PROTOCOL_BINARY_CMD_FLUSH:
+    case PROTOCOL_BINARY_CMD_NOOP:
+    case PROTOCOL_BINARY_CMD_QUIT:
+    case PROTOCOL_BINARY_CMD_DELETE:
+      ensure(response->response.keylen == 0);
+      ensure(response->response.extlen == 0);
+      ensure(response->response.bodylen == 0);
+      ensure(response->response.cas == 0);
+      break;
+
+    case PROTOCOL_BINARY_CMD_DECREMENT:
+    case PROTOCOL_BINARY_CMD_INCREMENT:
+      ensure(response->response.keylen == 0);
+      ensure(response->response.extlen == 0);
+      ensure(ntohl(response->response.bodylen) == 8);
+      ensure(response->response.cas != 0);
+      break;
+
+    case PROTOCOL_BINARY_CMD_STAT:
+      ensure(response->response.extlen == 0);
+      /* key and value exists in all packets except in the terminating */
+      ensure(response->response.cas == 0);
+      break;
+
+    case PROTOCOL_BINARY_CMD_VERSION:
+      ensure(response->response.keylen == 0);
+      ensure(response->response.extlen == 0);
+      ensure(response->response.bodylen != 0);
+      ensure(response->response.cas == 0);
+      break;
+
+    case PROTOCOL_BINARY_CMD_GET:
+    case PROTOCOL_BINARY_CMD_GETQ:
+      ensure(response->response.keylen == 0);
+      ensure(response->response.extlen == 4);
+      ensure(response->response.cas != 0);
+      break;
+
+    case PROTOCOL_BINARY_CMD_GETK:
+    case PROTOCOL_BINARY_CMD_GETKQ:
+      ensure(response->response.keylen != 0);
+      ensure(response->response.extlen == 4);
+      ensure(response->response.cas != 0);
+      break;
+
+    default:
+      /* Undefined command code */
+      break;
+    }
+  }
+  else 
+  {
+    ensure(response->response.cas == 0);
+    ensure(response->response.extlen == 0);
+    if (opcode != PROTOCOL_BINARY_CMD_GETK) 
+    {
+      ensure(response->response.keylen == 0);
+    }
+  }
+
+  return true;
+}
diff --git a/libmemcached/protocol/protocol_handler.c b/libmemcached/protocol/protocol_handler.c
new file mode 100644 (file)
index 0000000..67f2b63
--- /dev/null
@@ -0,0 +1,1351 @@
+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include "common.h"
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <string.h>
+#include <stdio.h>
+
+/*
+** **********************************************************************
+** INTERNAL INTERFACE
+** **********************************************************************
+*/
+
+/**
+ * The default function to receive data from the client. This function
+ * just wraps the recv function to receive from a socket.
+ * See man -s3socket recv for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to read from
+ * @param buf the destination buffer
+ * @param nbytes the number of bytes to read
+ * @return the number of bytes transferred of -1 upon error
+ */
+static ssize_t default_recv(const void *cookie,
+                            int sock,
+                            void *buf,
+                            size_t nbytes)
+{
+  (void)cookie;
+  return recv(sock, buf, nbytes, 0);
+}
+
+/**
+ * The default function to send data to the server. This function
+ * just wraps the send function to send through a socket.
+ * See man -s3socket send for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to send to
+ * @param buf the source buffer
+ * @param nbytes the number of bytes to send
+ * @return the number of bytes transferred of -1 upon error
+ */
+static ssize_t default_send(const void *cookie,
+                            int fd,
+                            const void *buf,
+                            size_t nbytes)
+{
+  (void)cookie;
+  return send(fd, buf, nbytes, 0);
+}
+
+/**
+ * Try to drain the output buffers without blocking
+ *
+ * @param client the client to drain
+ * @return false if an error occured (connection should be shut down)
+ *         true otherwise (please note that there may be more data to
+ *              left in the buffer to send)
+ */
+static bool drain_output(struct memcached_binary_protocol_client_st *client)
+{
+  ssize_t len;
+
+  // Do we have pending data to send?
+  while (client->output != NULL)
+  {
+    len= client->root->send(client,
+                            client->sock,
+                            client->output->data + client->output->offset,
+                            client->output->nbytes - client->output->offset);
+
+    if (len == -1)
+    {
+      if (errno == EWOULDBLOCK)
+      {
+        return true;
+      }
+      else if (errno != EINTR)
+      {
+        client->error= errno;
+        return false;
+      }
+    }
+    else
+    {
+      client->output->offset += (size_t)len;
+      if (client->output->offset == client->output->nbytes)
+      {
+        /* This was the complete buffer */
+        struct chunk_st *old= client->output;
+        client->output= client->output->next;
+        if (client->output == NULL)
+        {
+          client->output_tail= NULL;
+        }
+        cache_free(client->root->buffer_cache, old);
+      }
+    }
+  }
+
+  return true;
+}
+
+/**
+ * Allocate an output buffer and chain it into the output list
+ *
+ * @param client the client that needs the buffer
+ * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
+ */
+static struct chunk_st*
+allocate_output_chunk(struct memcached_binary_protocol_client_st *client)
+{
+  struct chunk_st*ret= cache_alloc(client->root->buffer_cache);
+  if (ret == NULL) {
+    return NULL;
+  }
+
+  ret->offset = ret->nbytes = 0;
+  ret->next = NULL;
+  ret->size = CHUNK_BUFFERSIZE;
+  ret->data= (void*)(ret + 1);
+  if (client->output == NULL)
+  {
+    client->output = client->output_tail = ret;
+  }
+  else
+  {
+    client->output_tail->next= ret;
+    client->output_tail= ret;
+  }
+
+  return ret;
+}
+
+/**
+ * Spool data into the send-buffer for a client.
+ *
+ * @param client the client to spool the data for
+ * @param data the data to spool
+ * @param length the number of bytes of data to spool
+ * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
+ *         PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
+ */
+static protocol_binary_response_status
+spool_output(struct memcached_binary_protocol_client_st *client,
+             const void *data,
+             size_t length)
+{
+  size_t offset = 0;
+
+  struct chunk_st *chunk= client->output;
+  while (offset < length)
+  {
+    if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
+    {
+      if ((chunk= allocate_output_chunk(client)) == NULL)
+      {
+        return PROTOCOL_BINARY_RESPONSE_ENOMEM;
+      }
+    }
+
+    size_t bulk = length - offset;
+    if (bulk > chunk->size - chunk->nbytes)
+    {
+      bulk = chunk->size - chunk->nbytes;
+    }
+
+    memcpy(chunk->data + chunk->nbytes, data, bulk);
+    chunk->nbytes += bulk;
+    offset += bulk;
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Send a preformatted packet back to the client. If the connection is in
+ * pedantic mode, it will validate the packet and refuse to send it if it
+ * breaks the specification.
+ *
+ * @param cookie client identification
+ * @param request the original request packet
+ * @param response the packet to send
+ * @return The status of the operation
+ */
+static protocol_binary_response_status
+raw_response_handler(const void *cookie,
+                     protocol_binary_request_header *request,
+                     protocol_binary_response_header *response)
+{
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+  if (client->root->pedantic &&
+      !memcached_binary_protocol_pedantic_check_response(request, response))
+  {
+    return PROTOCOL_BINARY_RESPONSE_EINVAL;
+  }
+
+  if (!drain_output(client))
+  {
+    return PROTOCOL_BINARY_RESPONSE_EIO;
+  }
+
+  size_t len= sizeof(*response) + htonl(response->response.bodylen);
+  size_t offset= 0;
+  char *ptr= (void*)response;
+
+  if (client->output == NULL)
+  {
+    /* I can write directly to the socket.... */
+    do
+    {
+      size_t num_bytes= len - offset;
+      ssize_t nw= client->root->send(client,
+                                     client->sock,
+                                     ptr + offset,
+                                     num_bytes);
+      if (nw == -1)
+      {
+        if (errno == EWOULDBLOCK)
+        {
+          break;
+        }
+        else if (errno != EINTR)
+        {
+          client->error= errno;
+          return PROTOCOL_BINARY_RESPONSE_EIO;
+        }
+      }
+      else
+      {
+        offset += (size_t)nw;
+      }
+    } while (offset < len);
+  }
+
+  return spool_output(client, ptr, len - offset);
+}
+
+/*
+ * Version 0 of the interface is really low level and protocol specific,
+ * while the version 1 of the interface is more API focused. We need a
+ * way to translate between the command codes on the wire and the
+ * application level interface in V1, so let's just use the V0 of the
+ * interface as a map instead of creating a huuuge switch :-)
+ */
+
+/**
+ * Callback for the GET/GETQ/GETK and GETKQ responses
+ * @param cookie client identifier
+ * @param key the key for the item
+ * @param keylen the length of the key
+ * @param body the length of the body
+ * @param bodylen the length of the body
+ * @param flags the flags for the item
+ * @param cas the CAS id for the item
+ */
+static protocol_binary_response_status
+get_response_handler(const void *cookie,
+                     const void *key,
+                     uint16_t keylen,
+                     const void *body,
+                     uint32_t bodylen,
+                     uint32_t flags,
+                     uint64_t cas) {
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  uint8_t opcode= client->current_command->request.opcode;
+
+  if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ)
+  {
+    keylen= 0;
+  }
+
+  protocol_binary_response_get response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= opcode,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= client->current_command->request.opaque,
+      .cas= htonll(cas),
+      .keylen= htons(keylen),
+      .extlen= 4,
+      .bodylen= htonl(bodylen + keylen + 4),
+    },
+    .message.body.flags= htonl(flags),
+  };
+
+  protocol_binary_response_status rval;
+  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+      (rval= spool_output(client, key, keylen)) != success ||
+      (rval= spool_output(client, body, bodylen)) != success)
+  {
+    return rval;
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for the STAT responses
+ * @param cookie client identifier
+ * @param key the key for the item
+ * @param keylen the length of the key
+ * @param body the length of the body
+ * @param bodylen the length of the body
+ */
+static protocol_binary_response_status
+stat_response_handler(const void *cookie,
+                     const void *key,
+                     uint16_t keylen,
+                     const void *body,
+                     uint32_t bodylen) {
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+  protocol_binary_response_no_extras response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= client->current_command->request.opcode,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= client->current_command->request.opaque,
+      .keylen= htons(keylen),
+      .bodylen= htonl(bodylen + keylen),
+    },
+  };
+
+  protocol_binary_response_status rval;
+  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+      (rval= spool_output(client, key, keylen)) != success ||
+      (rval= spool_output(client, body, bodylen)) != success)
+  {
+    return rval;
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for the VERSION responses
+ * @param cookie client identifier
+ * @param text the length of the body
+ * @param textlen the length of the body
+ */
+static protocol_binary_response_status
+version_response_handler(const void *cookie,
+                         const void *text,
+                         uint32_t textlen) {
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+
+  protocol_binary_response_no_extras response= {
+    .message.header.response= {
+      .magic= PROTOCOL_BINARY_RES,
+      .opcode= client->current_command->request.opcode,
+      .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+      .opaque= client->current_command->request.opaque,
+      .bodylen= htonl(textlen),
+    },
+  };
+
+  protocol_binary_response_status rval;
+  const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+  if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success ||
+      (rval= spool_output(client, text, textlen)) != success)
+  {
+    return rval;
+  }
+
+  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+/**
+ * Callback for ADD and ADDQ
+ * @param cookie the calling client
+ * @param header the add/addq command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+add_command_handler(const void *cookie,
+                    protocol_binary_request_header *header,
+                    memcached_binary_protocol_raw_response_handler response_handler)
+{
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.add != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+    protocol_binary_request_add *request= (void*)header;
+    uint32_t flags= ntohl(request->message.body.flags);
+    uint32_t timeout= ntohl(request->message.body.expiration);
+    char *key= ((char*)header) + sizeof(*header) + 8;
+    char *data= key + keylen;
+    uint64_t cas;
+
+    rval= client->root->callback->interface.v1.add(cookie, key, keylen,
+                                                   data, datalen, flags,
+                                                   timeout, &cas);
+
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_ADD,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(cas)
+          }
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for DECREMENT and DECREMENTQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+decrement_command_handler(const void *cookie,
+                          protocol_binary_request_header *header,
+                          memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.decrement != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    protocol_binary_request_decr *request= (void*)header;
+    uint64_t init= ntohll(request->message.body.initial);
+    uint64_t delta= ntohll(request->message.body.delta);
+    uint32_t timeout= ntohl(request->message.body.expiration);
+    void *key= request->bytes + sizeof(request->bytes);
+    uint64_t result;
+    uint64_t cas;
+
+    char buffer[1024] = {0};
+    memcpy(buffer, key, keylen);
+    fprintf(stderr, "%s\n", buffer);
+
+
+    rval= client->root->callback->interface.v1.decrement(cookie, key, keylen,
+                                                         delta, init, timeout,
+                                                         &result, &cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT)
+    {
+      /* Send a positive request */
+      protocol_binary_response_decr response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_DECREMENT,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(cas),
+            .bodylen= htonl(8)
+          },
+          .body.value = htonll(result)
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for DELETE and DELETEQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+delete_command_handler(const void *cookie,
+                       protocol_binary_request_header *header,
+                       memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.delete != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    void *key= (header + 1);
+    uint64_t cas= ntohll(header->request.cas);
+    rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_DELETE,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+          }
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for FLUSH and FLUSHQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+flush_command_handler(const void *cookie,
+                      protocol_binary_request_header *header,
+                      memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.flush != NULL)
+  {
+    protocol_binary_request_flush *flush= (void*)header;
+    uint32_t timeout= 0;
+    if (htonl(header->request.bodylen) == 4)
+    {
+      timeout= ntohl(flush->message.body.expiration);
+    }
+
+    rval= client->root->callback->interface.v1.flush(cookie, timeout);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_FLUSH,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+          }
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for GET, GETK, GETQ, GETKQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+get_command_handler(const void *cookie,
+                    protocol_binary_request_header *header,
+                    memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.get != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    void *key= (header + 1);
+    rval= client->root->callback->interface.v1.get(cookie, key, keylen,
+                                                   get_response_handler);
+
+    if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT &&
+        (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
+         header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ))
+    {
+      /* Quiet commands shouldn't respond on cache misses */
+      rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for INCREMENT and INCREMENTQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+increment_command_handler(const void *cookie,
+                          protocol_binary_request_header *header,
+                          memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.increment != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    protocol_binary_request_incr *request= (void*)header;
+    uint64_t init= ntohll(request->message.body.initial);
+    uint64_t delta= ntohll(request->message.body.delta);
+    uint32_t timeout= ntohl(request->message.body.expiration);
+    void *key= request->bytes + sizeof(request->bytes);
+    uint64_t cas;
+    uint64_t result;
+
+    rval= client->root->callback->interface.v1.increment(cookie, key, keylen,
+                                                         delta, init, timeout,
+                                                         &result, &cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT)
+    {
+      /* Send a positive request */
+      protocol_binary_response_incr response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_INCREMENT,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(cas),
+            .bodylen= htonl(8)
+          },
+          .body.value = htonll(result)
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for noop. Inform the v1 interface about the noop packet, and
+ * create and send a packet back to the client
+ *
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler the response handler
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+noop_command_handler(const void *cookie,
+                     protocol_binary_request_header *header,
+                     memcached_binary_protocol_raw_response_handler response_handler)
+{
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.noop != NULL)
+  {
+    client->root->callback->interface.v1.noop(cookie);
+  }
+
+  protocol_binary_response_no_extras response= {
+    .message = {
+      .header.response = {
+        .magic = PROTOCOL_BINARY_RES,
+        .opcode= PROTOCOL_BINARY_CMD_NOOP,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque,
+      }
+    }
+  };
+
+  return response_handler(cookie, header, (void*)&response);
+}
+
+/**
+ * Callback for APPEND and APPENDQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+append_command_handler(const void *cookie,
+                       protocol_binary_request_header *header,
+                       memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.append != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    uint32_t datalen= ntohl(header->request.bodylen) - keylen;
+    char *key= (void*)(header + 1);
+    char *data= key + keylen;
+    uint64_t cas= ntohll(header->request.cas);
+    uint64_t result_cas;
+
+    rval= client->root->callback->interface.v1.append(cookie, key, keylen,
+                                                      data, datalen, cas,
+                                                      &result_cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_APPEND)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_APPEND,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(result_cas),
+          },
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for PREPEND and PREPENDQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+prepend_command_handler(const void *cookie,
+                        protocol_binary_request_header *header,
+                        memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.prepend != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    uint32_t datalen= ntohl(header->request.bodylen) - keylen;
+    char *key= (char*)(header + 1);
+    char *data= key + keylen;
+    uint64_t cas= ntohll(header->request.cas);
+    uint64_t result_cas;
+    rval= client->root->callback->interface.v1.prepend(cookie, key, keylen,
+                                                       data, datalen, cas,
+                                                       &result_cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_PREPEND,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(result_cas),
+          },
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for QUIT and QUITQ. Notify the client and shut down the connection
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+quit_command_handler(const void *cookie,
+                     protocol_binary_request_header *header,
+                     memcached_binary_protocol_raw_response_handler response_handler)
+{
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.quit != NULL)
+  {
+    client->root->callback->interface.v1.quit(cookie);
+  }
+
+  protocol_binary_response_no_extras response= {
+    .message = {
+      .header.response = {
+        .magic= PROTOCOL_BINARY_RES,
+        .opcode= PROTOCOL_BINARY_CMD_QUIT,
+        .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+        .opaque= header->request.opaque
+      }
+    }
+  };
+
+  if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
+  {
+    response_handler(cookie, header, (void*)&response);
+  }
+
+  /* I need a better way to signal to close the connection */
+  return PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+/**
+ * Callback for REPLACE and REPLACEQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+replace_command_handler(const void *cookie,
+                        protocol_binary_request_header *header,
+                        memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.replace != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+    protocol_binary_request_replace *request= (void*)header;
+    uint32_t flags= ntohl(request->message.body.flags);
+    uint32_t timeout= ntohl(request->message.body.expiration);
+    char *key= ((char*)header) + sizeof(*header) + 8;
+    char *data= key + keylen;
+    uint64_t cas= ntohll(header->request.cas);
+    uint64_t result_cas;
+
+    rval= client->root->callback->interface.v1.replace(cookie, key, keylen,
+                                                       data, datalen, flags,
+                                                       timeout, cas,
+                                                       &result_cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_REPLACE,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(result_cas),
+          },
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for SET and SETQ
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+set_command_handler(const void *cookie,
+                    protocol_binary_request_header *header,
+                    memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.set != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+    uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8;
+    protocol_binary_request_replace *request= (void*)header;
+    uint32_t flags= ntohl(request->message.body.flags);
+    uint32_t timeout= ntohl(request->message.body.expiration);
+    char *key= ((char*)header) + sizeof(*header) + 8;
+    char *data= key + keylen;
+    uint64_t cas= ntohll(header->request.cas);
+    uint64_t result_cas;
+
+
+    rval= client->root->callback->interface.v1.set(cookie, key, keylen,
+                                                   data, datalen, flags,
+                                                   timeout, cas, &result_cas);
+    if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+        header->request.opcode == PROTOCOL_BINARY_CMD_SET)
+    {
+      /* Send a positive request */
+      protocol_binary_response_no_extras response= {
+        .message= {
+          .header.response= {
+            .magic= PROTOCOL_BINARY_RES,
+            .opcode= PROTOCOL_BINARY_CMD_SET,
+            .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS),
+            .opaque= header->request.opaque,
+            .cas= ntohll(result_cas),
+          },
+        }
+      };
+      rval= response_handler(cookie, header, (void*)&response);
+    }
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for STAT
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+stat_command_handler(const void *cookie,
+                     protocol_binary_request_header *header,
+                     memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.stat != NULL)
+  {
+    uint16_t keylen= ntohs(header->request.keylen);
+
+    rval= client->root->callback->interface.v1.stat(cookie,
+                                                    (void*)(header + 1),
+                                                    keylen,
+                                                    stat_response_handler);
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * Callback for VERSION
+ * @param cookie the calling client
+ * @param header the command
+ * @param response_handler not used
+ * @return the result of the operation
+ */
+static protocol_binary_response_status
+version_command_handler(const void *cookie,
+                        protocol_binary_request_header *header,
+                        memcached_binary_protocol_raw_response_handler response_handler)
+{
+  (void)response_handler;
+  (void)header;
+  protocol_binary_response_status rval;
+
+  struct memcached_binary_protocol_client_st *client= (void*)cookie;
+  if (client->root->callback->interface.v1.version != NULL)
+  {
+    rval= client->root->callback->interface.v1.version(cookie,
+                                                       version_response_handler);
+  }
+  else
+  {
+    rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  }
+
+  return rval;
+}
+
+/**
+ * The map to remap between the com codes and the v1 logical setting
+ */
+static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= {
+  [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler,
+  [PROTOCOL_BINARY_CMD_ADD]= add_command_handler,
+  [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler,
+  [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler,
+  [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler,
+  [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler,
+  [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler,
+  [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler,
+  [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler,
+  [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler,
+  [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler,
+  [PROTOCOL_BINARY_CMD_GETK]= get_command_handler,
+  [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler,
+  [PROTOCOL_BINARY_CMD_GET]= get_command_handler,
+  [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler,
+  [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler,
+  [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler,
+  [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler,
+  [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler,
+  [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler,
+  [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler,
+  [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler,
+  [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler,
+  [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler,
+  [PROTOCOL_BINARY_CMD_SET]= set_command_handler,
+  [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler,
+  [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler,
+};
+
+/**
+ * Try to execute a command. Fire the pre/post functions and the specialized
+ * handler function if it's set. If not, the unknown probe should be fired
+ * if it's present.
+ * @param client the client connection to operate on
+ * @param header the command to execute
+ * @return true if success or false if a fatal error occured so that the
+ *         connection should be shut down.
+ */
+static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header)
+{
+  if (client->root->pedantic &&
+      memcached_binary_protocol_pedantic_check_request(header))
+  {
+      /* @todo return invalid command packet */
+  }
+
+  /* we got all data available, execute the callback! */
+  if (client->root->callback->pre_execute != NULL)
+  {
+    client->root->callback->pre_execute(client, header);
+  }
+
+  protocol_binary_response_status rval;
+  rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+  uint8_t cc= header->request.opcode;
+
+  switch (client->root->callback->interface_version)
+  {
+  case 0:
+    if (client->root->callback->interface.v0.comcode[cc] != NULL) {
+      rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler);
+    }
+    break;
+  case 1:
+    if (comcode_v0_v1_remap[cc] != NULL) {
+      rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler);
+    }
+    break;
+  default:
+    /* Unknown interface.
+     * It should be impossible to get here so I'll just call abort
+     * to avoid getting a compiler warning :-)
+     */
+    abort();
+  }
+
+
+  if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND &&
+      client->root->callback->unknown != NULL)
+  {
+    rval= client->root->callback->unknown(client, header, raw_response_handler);
+  }
+
+  if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS &&
+      rval != PROTOCOL_BINARY_RESPONSE_EIO)
+  {
+    protocol_binary_response_no_extras response= {
+      .message= {
+        .header.response= {
+          .magic= PROTOCOL_BINARY_RES,
+          .opcode= cc,
+          .status= htons(rval),
+          .opaque= header->request.opaque,
+        },
+      }
+    };
+    rval= raw_response_handler(client, header, (void*)&response);
+  }
+
+  if (client->root->callback->post_execute != NULL)
+  {
+    client->root->callback->post_execute(client, header);
+  }
+
+  return rval != PROTOCOL_BINARY_RESPONSE_EIO;
+}
+
+/*
+** **********************************************************************
+** * PUBLIC INTERFACE
+** * See protocol_handler.h for function description
+** **********************************************************************
+*/
+struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void)
+{
+  struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret));
+  if (ret != NULL)
+  {
+    ret->recv= default_recv;
+    ret->send= default_send;
+    ret->input_buffer_size= 1 * 1024 * 1024;
+    ret->input_buffer= malloc(ret->input_buffer_size);
+    if (ret->input_buffer == NULL)
+    {
+      free(ret);
+      ret= NULL;
+      return NULL;
+    }
+
+    ret->buffer_cache = cache_create("protocol_handler",
+                                     CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
+                                     0, NULL, NULL);
+    if (ret->buffer_cache == NULL) {
+      free(ret->input_buffer);
+      free(ret);
+    }
+  }
+
+  return ret;
+}
+
+void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance)
+{
+  cache_destroy(instance->buffer_cache);
+  free(instance->input_buffer);
+  free(instance);
+}
+
+struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance)
+{
+  return instance->callback;
+}
+
+void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback)
+{
+  instance->callback= callback;
+}
+
+memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie)
+{
+  (void)cookie;
+  return raw_response_handler;
+}
+
+void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable)
+{
+  instance->pedantic= enable;
+}
+
+bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance)
+{
+  return instance->pedantic;
+}
+
+struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock)
+{
+  struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret));
+  if (ret != NULL)
+  {
+    ret->root= instance;
+    ret->sock= sock;
+  }
+
+  return ret;
+}
+
+void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client)
+{
+  free(client);
+}
+
+enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client)
+{
+  /* Try to send data and read from the socket */
+  bool more_data= true;
+  do
+  {
+    ssize_t len= client->root->recv(client,
+                                    client->sock,
+                                    client->root->input_buffer + client->input_buffer_offset,
+                                    client->root->input_buffer_size - client->input_buffer_offset);
+
+    if (len > 0)
+    {
+      /* Do we have the complete packet? */
+      if (client->input_buffer_offset > 0)
+      {
+        memcpy(client->root->input_buffer, client->input_buffer,
+               client->input_buffer_offset);
+        len += (ssize_t)client->input_buffer_offset;
+
+        /* @todo use buffer-cache! */
+        free(client->input_buffer);
+        client->input_buffer_offset= 0;
+      }
+
+      /* try to parse all of the received packets */
+      protocol_binary_request_header *header;
+      header= (void*)client->root->input_buffer;
+
+      if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ)
+      {
+        client->error= EINVAL;
+        return ERROR_EVENT;
+      }
+
+      while (len >= (ssize_t)sizeof(*header) &&
+             (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen))))
+      {
+
+        /* I have the complete package */
+        client->current_command = header;
+        if (!execute_command(client, header))
+        {
+          return ERROR_EVENT;
+        }
+
+        ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));
+        len -= total;
+        if (len > 0)
+        {
+          intptr_t ptr= (intptr_t)header;
+          ptr += total;
+          if ((ptr % 8) == 0)
+          {
+            header= (void*)ptr;
+          }
+          else
+          {
+            memmove(client->root->input_buffer, (void*)ptr, (size_t)len);
+            header= (void*)client->root->input_buffer;
+          }
+        }
+      }
+
+      if (len > 0)
+      {
+        /* save the data for later on */
+        /* @todo use buffer-cache */
+        client->input_buffer= malloc((size_t)len);
+        if (client->input_buffer == NULL)
+        {
+          client->error= ENOMEM;
+          return ERROR_EVENT;
+        }
+        memcpy(client->input_buffer, header, (size_t)len);
+        client->input_buffer_offset= (size_t)len;
+        more_data= false;
+      }
+    }
+    else if (len == 0)
+    {
+      /* Connection closed */
+      drain_output(client);
+      return ERROR_EVENT;
+    }
+    else
+    {
+      if (errno != EWOULDBLOCK)
+      {
+        client->error= errno;
+        /* mark this client as terminated! */
+        return ERROR_EVENT;
+      }
+      more_data = false;
+    }
+  } while (more_data);
+
+  if (!drain_output(client))
+  {
+    return ERROR_EVENT;
+  }
+
+  return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
+}
diff --git a/libmemcached/protocol_handler.h b/libmemcached/protocol_handler.h
new file mode 100644 (file)
index 0000000..e1528d5
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Summary: Definition of the callback interface to the protocol handler
+ *
+ * Copy: See Copyright for the status of this software.
+ *
+ * Author: Trond Norbye
+ */
+#ifndef MEMCACHED_PROTOCOL_H
+#define MEMCACHED_PROTOCOL_H
+
+#include <sys/types.h>
+#include <stdbool.h>
+
+#include <libmemcached/memcached/protocol_binary.h>
+#include <libmemcached/visibility.h>
+#include <libmemcached/protocol/callback.h>
+
+/* Forward declarations */
+/*
+ * You should only access memcached_binary_protocol_st from one thread!,
+ * and never assume anything about the internal layout / sizes of the
+ * structures.
+ */
+struct memcached_binary_protocol_st;
+struct memcached_binary_protocol_client_st;
+
+/**
+ * Function the protocol handler should call to receive data.
+ * This function should behave exactly like read(2)
+ *
+ * @param cookie a cookie used to represent a given client
+ * @param fd the filedescriptor associated with the client
+ * @param buf destination buffer
+ * @param nbuf number of bytes to receive
+ * @return the number of bytes copied into buf
+ *         or -1 upon error (errno should contain more information)
+ */
+typedef ssize_t (*memcached_binary_protocol_recv_func)(const void *cookie,
+                                                       int fd,
+                                                       void *buf, 
+                                                       size_t nbuf);
+
+/**
+ * Function the protocol handler should call to send data.
+ * This function should behave exactly like write(2)
+ *
+ * @param cookie a cookie used to represent a given client
+ * @param fd the filedescriptor associated with the client
+ * @param buf the source buffer
+ * @param nbuf number of bytes to send
+ * @return the number of bytes sent
+ *         or -1 upon error (errno should contain more information)
+ */
+typedef ssize_t (*memcached_binary_protocol_send_func)(const void *cookie,
+                                                       int fd,
+                                                       const void *buf, 
+                                                       size_t nbuf);
+
+/**
+ * Create an instance of the protocol handler
+ *
+ * @return NULL if allocation of an instance fails
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void);
+
+/**
+ * Get the callbacks associated with a protocol handler instance
+ * @return the callbacks currently used
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Set the callbacks to be used by the given protocol handler instance
+ * @param instance the instance to update
+ * @param callback the callbacks to use
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback);
+
+/**
+ * Should the library inspect the packages being sent and received and verify
+ * that they are according to the specification? If it encounters an invalid
+ * packet, it will return an EINVAL packet.
+ *
+ * @param instance the instance to update
+ * @param enable true if you want the library to check packages, false otherwise
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable);
+
+/**
+ * Is the library inpecting each package?
+ * @param instance the instance to check
+ * @return true it the library is inspecting each package, false otherwise
+ */
+LIBMEMCACHED_API
+bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Destroy an instance of the protocol handler
+ *
+ * @param instance The instance to destroy
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance);
+
+/**
+ * Set the IO functions used by the instance to send and receive data. The
+ * functions should behave like recv(3socket) and send(3socket). 
+ * 
+ * @param instance the instance to specify the IO functions for
+ * @param recv the function to call for reciving data
+ * @param send the function to call for sending data
+ */
+LIBMEMCACHED_API
+void memached_binary_protocol_set_io_functions(struct memcached_binary_protocol_st *instance, 
+                                               memcached_binary_protocol_recv_func recv, 
+                                               memcached_binary_protocol_send_func send);
+
+
+/**
+ * Create a new client instance and associate it with a socket
+ * @param instance the protocol instance to bind the client to
+ * @param sock the client socket
+ * @return NULL if allocation fails, otherwise an instance 
+ */
+LIBMEMCACHED_API
+struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock);
+
+/**
+ * Destroy a client handle.
+ * The caller needs to close the socket accociated with the client 
+ * <b>before</b> calling this function. This function invalidates the
+ * client memory area.
+ *
+ * @param client the client to destroy
+ */
+LIBMEMCACHED_API
+void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * The different events the client is interested in
+ */
+enum MEMCACHED_BINARY_PROTOCOL_EVENT { 
+   /* Error event means that the client encountered an error with the
+    * connection so you should shut it down */
+   ERROR_EVENT, 
+   /* Please notify when there is more data available to read */
+   READ_EVENT, 
+   /* Please notify when it is possible to send more data */ 
+   WRITE_EVENT, 
+   /* Please notify when it is possible to send or receive data */
+   READ_WRITE_EVENT 
+};
+
+/**
+ * Let the client do some work. This might involve reading / sending data 
+ * to/from the client, or perform callbacks to execute a command.
+ * @param client the client structure to work on
+ * @return The next event the protocol handler will be notified for
+ */
+LIBMEMCACHED_API
+enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get the socket attached to a client handle
+ * @param client the client to query
+ * @return the socket handle
+ */
+LIBMEMCACHED_API
+int memcached_binary_protocol_client_get_socket(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get the error id socket attached to a client handle
+ * @param client the client to query for an error code
+ * @return the OS error code from the client
+ */
+LIBMEMCACHED_API
+int memcached_binary_protocol_client_get_errno(struct memcached_binary_protocol_client_st *client);
+
+/**
+ * Get a raw response handler for the given cookie
+ * @param cookie the cookie passed along into the callback
+ * @return the raw reponse handler you may use if you find
+ *         the generic callback too limiting
+ */
+LIBMEMCACHED_API
+memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie);
+#endif