Merge Trond's protocol work.
authorBrian Aker <brian@gaz>
Fri, 2 Oct 2009 00:01:34 +0000 (17:01 -0700)
committerBrian Aker <brian@gaz>
Fri, 2 Oct 2009 00:01:34 +0000 (17:01 -0700)
1  2 
configure.ac
example/memcached_light.c
example/storage.c
example/storage_innodb.c
libmemcached/protocol/protocol_handler.c

diff --cc configure.ac
Simple merge
index 0000000000000000000000000000000000000000,c1bdea04fdbcdc5df3d56f57a9572b48f71dccec..4343b835e265b9e5ca4a85bc5fff86414990dd1a
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,381 +1,384 @@@
 -static void work(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ /**
+  * What is a library without an example to show you how to use the library?
+  * This example use both interfaces to implement a small memcached server.
+  * Please note that this is an exemple on how to use the library, not
+  * an implementation of a scalable memcached server. If you look closely
+  * at the example it isn't even multithreaded ;-)
+  *
+  * With that in mind, let me give you some pointers into the source:
+  *   storage.c/h       - Implements the item store for this server and not really
+  *                       interesting for this example.
+  *   interface_v0.c    - Shows an implementation of the memcached server by using
+  *                       the "raw" access to the packets as they arrive
+  *   interface_v1.c    - Shows an implementation of the memcached server by using
+  *                       the more "logical" interface.
+  *   memcached_light.c - This file sets up all of the sockets and run the main
+  *                       message loop.
+  */
+ #include <assert.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <netdb.h>
+ #include <netinet/tcp.h>
+ #include <stdio.h>
+ #include <unistd.h>
+ #include <fcntl.h>
+ #include <errno.h>
+ #include <stdlib.h>
+ #include <string.h>
+ #include <poll.h>
+ #include <libmemcached/protocol_handler.h>
+ #include <libmemcached/byteorder.h>
+ #include "storage.h"
+ extern struct memcached_binary_protocol_callback_st interface_v0_impl;
+ extern struct memcached_binary_protocol_callback_st interface_v1_impl;
+ static int server_sockets[1024];
+ static int num_server_sockets= 0;
+ static void* socket_userdata_map[1024];
+ static bool verbose= false;
+ /**
+  * Create a socket and bind it to a specific port number
+  * @param port the port number to bind to
+  */
+ static int server_socket(const char *port) {
+   struct addrinfo *ai;
+   struct addrinfo hints= { .ai_flags= AI_PASSIVE,
+                            .ai_family= AF_UNSPEC,
+                            .ai_socktype= SOCK_STREAM };
+   int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
+   if (error != 0)
+   {
+     if (error != EAI_SYSTEM)
+       fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
+     else
+       perror("getaddrinfo()");
+     return 1;
+   }
+   struct linger ling= {0, 0};
+   for (struct addrinfo *next= ai; next; next= next->ai_next)
+   {
+     int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+     if (sock == -1)
+     {
+       perror("Failed to create socket");
+       continue;
+     }
+     int flags= fcntl(sock, F_GETFL, 0);
+     if (flags == -1)
+     {
+       perror("Failed to get socket flags");
+       close(sock);
+       continue;
+     }
+     if ((flags & O_NONBLOCK) != O_NONBLOCK)
++    {
+       if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
+       {
+         perror("Failed to set socket to nonblocking mode");
+         close(sock);
+         continue;
+       }
++    }
+     flags= 1;
+     if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
+       perror("Failed to set SO_REUSEADDR");
+     if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
+       perror("Failed to set SO_KEEPALIVE");
+     if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
+       perror("Failed to set SO_LINGER");
+     if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
+       perror("Failed to set TCP_NODELAY");
+     if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+     {
+       if (errno != EADDRINUSE)
+       {
+         perror("bind()");
+         freeaddrinfo(ai);
+       }
+       close(sock);
+       continue;
+     }
+     if (listen(sock, 1024) == -1)
+     {
+       perror("listen()");
+       close(sock);
+       continue;
+     }
+     server_sockets[num_server_sockets++]= sock;
+   }
+   freeaddrinfo(ai);
+   return (num_server_sockets > 0) ? 0 : 1;
+ }
+ /**
+  * Convert a command code to a textual string
+  * @param cmd the comcode to convert
+  * @return a textual string with the command or NULL for unknown commands
+  */
+ static const char* comcode2str(uint8_t cmd)
+ {
+   static const char * const text[] = {
+     "GET", "SET", "ADD", "REPLACE", "DELETE",
+     "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
+     "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
+     "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
+     "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
+     "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
+   };
+   if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
+     return text[cmd];
+   return NULL;
+ }
+ /**
+  * Print out the command we are about to execute
+  */
+ static void pre_execute(const void *cookie __attribute__((unused)),
+                         protocol_binary_request_header *header __attribute__((unused)))
+ {
+   if (verbose)
+   {
+     const char *cmd= comcode2str(header->request.opcode);
+     if (cmd != NULL)
+       fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
+     else
+       fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+   }
+ }
+ /**
+  * Print out the command we just executed
+  */
+ static void post_execute(const void *cookie __attribute__((unused)),
+                          protocol_binary_request_header *header __attribute__((unused)))
+ {
+   if (verbose)
+   {
+     const char *cmd= comcode2str(header->request.opcode);
+     if (cmd != NULL)
+       fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
+     else
+       fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+   }
+ }
+ /**
+  * Callback handler for all unknown commands.
+  * Send an unknown command back to the client
+  */
+ static protocol_binary_response_status unknown(const void *cookie,
+                                                protocol_binary_request_header *header,
+                                                memcached_binary_protocol_raw_response_handler response_handler)
+ {
+   protocol_binary_response_no_extras response= {
+     .message= {
+       .header.response= {
+         .magic= PROTOCOL_BINARY_RES,
+         .opcode= header->request.opcode,
+         .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
+         .opaque= header->request.opaque
+       }
+     }
+   };
+   return response_handler(cookie, header, (void*)&response);
+ }
+ static void work(void);
+ /**
+  * Program entry point. Bind to the specified port(s) and serve clients
+  *
+  * @param argc number of items in the argument vector
+  * @param argv argument vector
+  * @return 0 on success, 1 otherwise
+  */
+ int main(int argc, char **argv)
+ {
+   bool port_specified= false;
+   int cmd;
+   struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+   while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
+   {
+     switch (cmd) {
+     case '1':
+       interface= &interface_v1_impl;
+       break;
+     case 'p':
+       port_specified= true;
+       (void)server_socket(optarg);
+       break;
+     case 'v':
+       verbose= true;
+       break;
+     case '?':  /* FALLTHROUGH */
+     default:
+       (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
+       return 1;
+     }
+   }
+   if (!initialize_storage())
+   {
+     /* Error message already printed */
+     return 1;
+   }
+   if (!port_specified)
+     (void)server_socket("9999");
+   if (num_server_sockets == 0)
+   {
+     fprintf(stderr, "I don't have any server sockets\n");
+     return 1;
+   }
+   /*
+    * Create and initialize the handles to the protocol handlers. I want
+    * to be able to trace the traffic throught the pre/post handlers, and
+    * set up a common handler for unknown messages
+    */
+   interface->pre_execute= pre_execute;
+   interface->post_execute= post_execute;
+   interface->unknown= unknown;
+   struct memcached_protocol_st *protocol_handle;
+   if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
+   {
+     fprintf(stderr, "Failed to allocate protocol handle\n");
+     return 1;
+   }
+   memcached_binary_protocol_set_callbacks(protocol_handle, interface);
+   memcached_binary_protocol_set_pedantic(protocol_handle, true);
+   for (int xx= 0; xx < num_server_sockets; ++xx)
+     socket_userdata_map[server_sockets[xx]]= protocol_handle;
+   /* Serve all of the clients */
+   work();
+   /* NOTREACHED */
+   return 0;
+ }
++static void work(void) 
++{
+ #define MAX_SERVERS_TO_POLL 100
+   struct pollfd fds[MAX_SERVERS_TO_POLL];
+   int max_poll;
+   for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
+   {
+     fds[max_poll].events= POLLIN;
+     fds[max_poll].revents= 0;
+     fds[max_poll].fd= server_sockets[max_poll];
+     ++max_poll;
+   }
+   while (true)
+   {
+     int err= poll(fds, (nfds_t)max_poll, -1);
+     if (err == 0 || (err == -1 && errno != EINTR))
+     {
+       perror("poll() failed");
+       abort();
+     }
+     /* find the available filedescriptors */
+     for (int x= max_poll - 1; x > -1 && err > 0; --x)
+     {
+       if (fds[x].revents != 0)
+       {
+         --err;
+         if (x < num_server_sockets)
+         {
+           /* accept new client */
+           struct sockaddr_storage addr;
+           socklen_t addrlen= sizeof(addr);
+           int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
+                            &addrlen);
+           if (sock == -1)
+           {
+             perror("Failed to accept client");
+             continue;
+           }
+           struct memcached_protocol_st *protocol;
+           protocol= socket_userdata_map[fds[x].fd];
+           struct memcached_protocol_client_st* c;
+           c= memcached_protocol_create_client(protocol, sock);
+           if (c == NULL)
+           {
+             fprintf(stderr, "Failed to create client\n");
+             close(sock);
+           }
+           else
+           {
+             socket_userdata_map[sock]= c;
+             fds[max_poll].events= POLLIN;
+             fds[max_poll].revents= 0;
+             fds[max_poll].fd= sock;
+             ++max_poll;
+           }
+         }
+         else
+         {
+           /* drive the client */
+           struct memcached_protocol_client_st* c;
+           c= socket_userdata_map[fds[x].fd];
+           assert(c != NULL);
+           fds[max_poll].events= 0;
+           switch (memcached_protocol_client_work(c))
+           {
+           case WRITE_EVENT:
+           case READ_WRITE_EVENT:
+             fds[max_poll].events= POLLOUT;
+             /* FALLTHROUGH */
+           case READ_EVENT:
+             fds[max_poll].events |= POLLIN;
+             break;
+           case ERROR_EVENT:
+           default: /* ERROR or unknown state.. close */
+             memcached_protocol_client_destroy(c);
+             close(fds[x].fd);
+             fds[x].events= 0;
+             if (x != max_poll - 1)
+               memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
+             --max_poll;
+           }
+         }
+       }
+     }
+   }
+ }
index 0000000000000000000000000000000000000000,72439361bde166d990819f130016382e254f3d2c..39b22cafd40740252c8b346ff7b8c70cd37ad40e
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,159 +1,170 @@@
 -bool initialize_storage(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include <stdlib.h>
+ #include <inttypes.h>
+ #include <time.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include "storage.h"
+ struct list_entry {
+   struct item item;
+   struct list_entry *next;
+   struct list_entry *prev;
+ };
+ static struct list_entry *root;
+ static uint64_t cas;
 -void shutdown_storage(void) {
++bool initialize_storage(void) 
++{
+   return true;
+ }
 -void put_item(struct item* item) {
++void shutdown_storage(void) 
++{
+   /* Do nothing */
+ }
 -struct item* get_item(const void* key, size_t nkey) {
++void put_item(struct item* item) 
++{
+   struct list_entry* entry= (void*)item;
++
+   update_cas(item);
+   if (root == NULL)
+   {
+     entry->next= entry->prev= entry;
+   }
+   else
+   {
+     entry->prev= root->prev;
+     entry->next= root;
+     entry->prev->next= entry;
+     entry->next->prev= entry;
+   }
+   root= entry;
+ }
 -bool delete_item(const void* key, size_t nkey) {
++struct item* get_item(const void* key, size_t nkey) 
++{
+   struct list_entry *walker= root;
++
+   if (root == NULL)
+   {
+     return NULL;
+   }
+   do
+   {
+     if (((struct item*)walker)->nkey == nkey &&
+         memcmp(((struct item*)walker)->key, key, nkey) == 0)
+     {
+       return (struct item*)walker;
+     }
+     walker= walker->next;
+   } while (walker != root);
+   return NULL;
+ }
+ struct item* create_item(const void* key, size_t nkey, const void* data,
+                          size_t size, uint32_t flags, time_t exp)
+ {
+   struct item* ret= calloc(1, sizeof(struct list_entry));
++
+   if (ret != NULL)
+   {
+     ret->key= malloc(nkey);
+     if (size > 0)
+     {
+       ret->data= malloc(size);
+     }
+     if (ret->key == NULL || (size > 0 && ret->data == NULL))
+     {
+       free(ret->key);
+       free(ret->data);
+       free(ret);
+       return NULL;
+     }
+     memcpy(ret->key, key, nkey);
+     if (data != NULL)
+     {
+       memcpy(ret->data, data, size);
+     }
+     ret->nkey= nkey;
+     ret->size= size;
+     ret->flags= flags;
+     ret->exp= exp;
+   }
+   return ret;
+ }
 -void flush(uint32_t when) {
++bool delete_item(const void* key, size_t nkey) 
++{
+   struct item* item= get_item(key, nkey);
+   bool ret= false;
+   if (item)
+   {
+     /* remove from linked list */
+     struct list_entry *entry= (void*)item;
+     if (entry->next == entry)
+     {
+       /* Only one object in the list */
+       root= NULL;
+     }
+     else
+     {
+       /* ensure that we don't loose track of the root, and this will
+        * change the start position for the next search ;-) */
+       root= entry->next;
+       entry->prev->next= entry->next;
+       entry->next->prev= entry->prev;
+     }
+     free(item->key);
+     free(item->data);
+     free(item);
+     ret= true;
+   }
+   return ret;
+ }
 -void update_cas(struct item* item) {
++void flush(uint32_t when) 
++{
+   /* FIXME */
+   (void)when;
+   /* remove the complete linked list */
+   if (root == NULL)
+   {
+     return;
+   }
+   root->prev->next= NULL;
+   while (root != NULL)
+   {
+     struct item* tmp= (void*)root;
+     root= root->next;
+     free(tmp->key);
+     free(tmp->data);
+     free(tmp);
+   }
+ }
 -void release_item(struct item* item __attribute__((unused))) {
++void update_cas(struct item* item) 
++{
+   item->cas= ++cas;
+ }
++void release_item(struct item* item __attribute__((unused))) 
++{
+   /* EMPTY */
+ }
index 0000000000000000000000000000000000000000,f11d1267ec3feef4653ad926ba435b452df66c21..92a7a4bf75378331858c4a59c8e240e463a449c7
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,498 +1,523 @@@
 -static bool create_schema(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include <stdlib.h>
+ #include <inttypes.h>
+ #include <time.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include <unistd.h>
+ #include <assert.h>
+ #include <embedded_innodb-1.0/innodb.h>
+ #include "storage.h"
+ const char *tablename= "memcached/items";
+ #define key_col_idx 0
+ #define data_col_idx 1
+ #define flags_col_idx 2
+ #define cas_col_idx 3
+ #define exp_col_idx 4
+ static uint64_t cas;
+ /**
+  * To avoid cluttering down all the code with error checking I use the
+  * following macro. It will execute the statement and verify that the
+  * result of the operation is DB_SUCCESS. If any other error code is
+  * returned it will print an "assert-like" output and jump to the
+  * label error_exit. There I release resources before returning out of
+  * the function.
+  *
+  * @param a the expression to execute
+  *
+  */
+ #define checked(expression)                                    \
+ do {                                                           \
+   ib_err_t checked_err= expression;                            \
+   if (checked_err != DB_SUCCESS)                               \
+   {                                                            \
+     fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
+             __FILE__, __LINE__, #expression,                   \
+             ib_strerror(checked_err));                         \
+     goto error_exit;                                           \
+   }                                                            \
+ } while (0);
+ /**
+  * Create the database schema.
+  * @return true if the database schema was created without any problems
+  *         false otherwise.
+  */
 -static bool do_put_item(ib_trx_t trx, struct item* item) {
++static bool create_schema(void) 
++{
+   ib_tbl_sch_t schema= NULL;
+   ib_idx_sch_t dbindex= NULL;
+   if (ib_database_create("memcached") != IB_TRUE)
+   {
+     fprintf(stderr, "Failed to create database\n");
+     return false;
+   }
+   ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+   ib_id_t table_id;
+   checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
+   checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
+                                   IB_COL_NOT_NULL, 0, 32767));
+   checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
+                                   IB_COL_NONE, 0, 1024*1024));
+   checked(ib_table_schema_add_col(schema, "flags", IB_INT,
+                                   IB_COL_UNSIGNED, 0, 4));
+   checked(ib_table_schema_add_col(schema, "cas", IB_INT,
+                                   IB_COL_UNSIGNED, 0, 8));
+   checked(ib_table_schema_add_col(schema, "exp", IB_INT,
+                                   IB_COL_UNSIGNED, 0, 4));
+   checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
+   checked(ib_index_schema_add_col(dbindex, "key", 0));
+   checked(ib_index_schema_set_clustered(dbindex));
+   checked(ib_schema_lock_exclusive(transaction));
+   checked(ib_table_create(transaction, schema, &table_id));
+   checked(ib_trx_commit(transaction));
+   ib_table_schema_delete(schema);
+   return true;
+  error_exit:
+   /* @todo release resources! */
+   {
+     ib_err_t error= ib_trx_rollback(transaction);
+     if (error != DB_SUCCESS)
+       fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+               ib_strerror(error));
+   }
+   return false;
+ }
+ /**
+  * Store an item into the database. Update the CAS id on the item before
+  * storing it in the database.
+  *
+  * @param trx the transaction to use
+  * @param item the item to store
+  * @return true if we can go ahead and commit the transaction, false otherwise
+  */
 -static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) {
++static bool do_put_item(ib_trx_t trx, struct item* item) 
++{
+   update_cas(item);
+   ib_crsr_t cursor= NULL;
+   ib_tpl_t tuple= NULL;
+   bool retval= false;
+   checked(ib_cursor_open_table(tablename, trx, &cursor));
+   checked(ib_cursor_lock(cursor, IB_LOCK_X));
+   tuple= ib_clust_read_tuple_create(cursor);
+   checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
+   checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
+   checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
+   checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
+   checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
+   checked(ib_cursor_insert_row(cursor, tuple));
+   retval= true;
+   /* Release resources: */
+   /* FALLTHROUGH */
+  error_exit:
+   if (tuple != NULL)
+     ib_tuple_delete(tuple);
+   if (cursor != NULL)
+     ib_cursor_close(cursor);
+   return retval;
+ }
+ /**
+  * Try to locate an item in the database. Return a cursor and the tuple to
+  * the item if I found it in the database.
+  *
+  * @param trx the transaction to use
+  * @param key the key of the item to look up
+  * @param nkey the size of the key
+  * @param cursor where to store the cursor (OUT)
+  * @param tuple where to store the tuple (OUT)
+  * @return true if I found the object, false otherwise
+  */
+ static bool do_locate_item(ib_trx_t trx,
+                            const void* key,
+                            size_t nkey,
+                            ib_crsr_t *cursor)
+ {
+   int res;
+   ib_tpl_t tuple= NULL;
+   *cursor= NULL;
+   checked(ib_cursor_open_table(tablename, trx, cursor));
+   tuple= ib_clust_search_tuple_create(*cursor);
+   if (tuple == NULL)
+   {
+     fprintf(stderr, "Failed to allocate tuple object\n");
+     goto error_exit;
+   }
+   checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
+   ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
+   if (err == DB_SUCCESS && res == 0)
+   {
+     ib_tuple_delete(tuple);
+     return true;
+   }
+   else if (err != DB_SUCCESS &&
+            err != DB_RECORD_NOT_FOUND &&
+            err != DB_END_OF_INDEX)
+   {
+     fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
+   }
+   /* FALLTHROUGH */
+  error_exit:
+   if (tuple != NULL)
+     ib_tuple_delete(tuple);
+   if (*cursor != NULL)
+     ib_cursor_close(*cursor);
+   *cursor= NULL;
+   return false;
+ }
+ /**
+  * Try to get an item from the database
+  *
+  * @param trx the transaction to use
+  * @param key the key to get
+  * @param nkey the lenght of the key
+  * @return a pointer to the item if I found it in the database
+  */
 -  if (do_locate_item(trx, key, nkey, &cursor)) {
++static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) 
++{
+   ib_crsr_t cursor= NULL;
+   ib_tpl_t tuple= NULL;
+   struct item* retval= NULL;
 -    if (retval == NULL) {
++  if (do_locate_item(trx, key, nkey, &cursor)) 
++  {
+     tuple= ib_clust_read_tuple_create(cursor);
+     if (tuple == NULL)
+     {
+       fprintf(stderr, "Failed to create read tuple\n");
+       goto error_exit;
+     }
+     checked(ib_cursor_read_row(cursor, tuple));
+     ib_col_meta_t meta;
+     ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
+     ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
+     ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
+     ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
+     const void *dataptr= ib_col_get_value(tuple, data_col_idx);
+     retval= create_item(key, nkey, dataptr, datalen, 0, 0);
 -    if (flaglen != 0) {
++    if (retval == NULL) 
++    {
+       fprintf(stderr, "Failed to allocate memory\n");
+       goto error_exit;
+     }
 -    if (caslen != 0) {
++    if (flaglen != 0) 
++    {
+       ib_u32_t val;
+       checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
+       retval->flags= (uint32_t)val;
+     }
 -    if (explen != 0) {
++    if (caslen != 0) 
++    {
+       ib_u64_t val;
+       checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
+       retval->cas= (uint64_t)val;
+     }
 -bool initialize_storage(void) {
++    if (explen != 0) 
++    {
+       ib_u32_t val;
+       checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
+       retval->exp= (time_t)val;
+     }
+   }
+   /* Release resources */
+   /* FALLTHROUGH */
+  error_exit:
+   if (tuple != NULL)
+     ib_tuple_delete(tuple);
+   if (cursor != NULL)
+     ib_cursor_close(cursor);
+   return retval;
+ }
+ /**
+  * Delete an item from the cache
+  * @param trx the transaction to use
+  * @param key the key of the item to delete
+  * @param nkey the length of the key
+  * @return true if we should go ahead and commit the transaction
+  *         or false if we should roll back (if the key didn't exists)
+  */
+ static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
+   ib_crsr_t cursor= NULL;
+   bool retval= false;
+   if (do_locate_item(trx, key, nkey, &cursor))
+   {
+     checked(ib_cursor_lock(cursor, IB_LOCK_X));
+     checked(ib_cursor_delete_row(cursor));
+     retval= true;
+   }
+   /* Release resources */
+   /* FALLTHROUGH */
+  error_exit:
+   if (cursor != NULL)
+     ib_cursor_close(cursor);
+   return retval;
+ }
+ /****************************************************************************
+  * External interface
+  ***************************************************************************/
+ /**
+  * Initialize the database storage
+  * @return true if the database was initialized successfully, false otherwise
+  */
 -  if (error == DB_TABLE_NOT_FOUND) {
 -    if (!create_schema()) {
++bool initialize_storage(void) 
++{
+   ib_err_t error;
+   ib_id_t tid;
+   checked(ib_init());
+   checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
+   checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
+   checked(ib_cfg_set_bool_on("file_per_table"));
+   checked(ib_startup("barracuda"));
+   /* check to see if the table exists or if we should create the schema */
+   error= ib_table_get_id(tablename, &tid);
 -  } else if (error != DB_SUCCESS) {
++  if (error == DB_TABLE_NOT_FOUND) 
++  {
++    if (!create_schema()) 
++    {
+       return false;
+     }
 -void shutdown_storage(void) {
++  } 
++  else if (error != DB_SUCCESS) 
++  {
+     fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
+     return false;
+   }
+   return true;
+  error_exit:
+   return false;
+ }
+ /**
+  * Shut down this storage engine
+  */
 -void put_item(struct item* item) {
++void shutdown_storage(void) 
++{
+   checked(ib_shutdown());
+  error_exit:
+   ;
+ }
+ /**
+  * Store an item in the databse
+  *
+  * @param item the item to store
+  */
 -  if (do_put_item(transaction, item)) {
++void put_item(struct item* item) 
++{
+   ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
 -    if (error != DB_SUCCESS) {
++  if (do_put_item(transaction, item)) 
++  {
+     ib_err_t error= ib_trx_commit(transaction);
 -  } else {
++    if (error != DB_SUCCESS) 
++    {
+       fprintf(stderr, "Failed to store key:\n\t%s\n",
+               ib_strerror(error));
+     }
 -struct item* get_item(const void* key, size_t nkey) {
++  } 
++  else 
++  {
+     ib_err_t error= ib_trx_rollback(transaction);
+     if (error != DB_SUCCESS)
+       fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+               ib_strerror(error));
+   }
+ }
+ /**
+  * Get an item from the engine
+  * @param key the key to grab
+  * @param nkey number of bytes in the key
+  * @return pointer to the item if found
+  */
 -void flush(uint32_t when __attribute__((unused))) {
++struct item* get_item(const void* key, size_t nkey) 
++{
+   ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+   struct item* ret= do_get_item(transaction, key, nkey);
+   ib_err_t error= ib_trx_rollback(transaction);
++
+   if (error != DB_SUCCESS)
+     fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+             ib_strerror(error));
+   return ret;
+ }
+ /**
+  * Create an item structure and initialize it with the content
+  *
+  * @param key the key for the item
+  * @param nkey the number of bytes in the key
+  * @param data pointer to the value for the item (may be NULL)
+  * @param size the size of the data
+  * @param flags the flags to store with the data
+  * @param exp the expiry time for the item
+  * @return pointer to an initialized item object or NULL if allocation failed
+  */
+ struct item* create_item(const void* key, size_t nkey, const void* data,
+                          size_t size, uint32_t flags, time_t exp)
+ {
+   struct item* ret= calloc(1, sizeof(*ret));
+   if (ret != NULL)
+   {
+     ret->key= malloc(nkey);
+     if (size > 0)
+     {
+       ret->data= malloc(size);
+     }
+     if (ret->key == NULL || (size > 0 && ret->data == NULL))
+     {
+       free(ret->key);
+       free(ret->data);
+       free(ret);
+       return NULL;
+     }
+     memcpy(ret->key, key, nkey);
+     if (data != NULL)
+     {
+       memcpy(ret->data, data, size);
+     }
+     ret->nkey= nkey;
+     ret->size= size;
+     ret->flags= flags;
+     ret->exp= exp;
+   }
+   return ret;
+ }
+ /**
+  * Delete an item from the cache
+  * @param key the key of the item to delete
+  * @param nkey the length of the key
+  * @return true if the item was deleted from the cache
+  */
+ bool delete_item(const void* key, size_t nkey) {
+   ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+   bool ret= do_delete_item(transaction, key, nkey);
+   if (ret)
+   {
+     /* object found. commit transaction */
+     ib_err_t error= ib_trx_commit(transaction);
+     if (error != DB_SUCCESS)
+     {
+       fprintf(stderr, "Failed to delete key:\n\t%s\n",
+               ib_strerror(error));
+       ret= false;
+     }
+   }
+   else
+   {
+     ib_err_t error= ib_trx_rollback(transaction);
+     if (error != DB_SUCCESS)
+       fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+               ib_strerror(error));
+   }
+   return ret;
+ }
+ /**
+  * Flush the entire cache
+  * @param when when the cache should be flushed (0 == immediately)
+  */
 -      do {
++void flush(uint32_t when __attribute__((unused))) 
++{
+   /* @TODO implement support for when != 0 */
+   ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+   ib_crsr_t cursor= NULL;
+       ib_err_t err= DB_SUCCESS;
+   checked(ib_cursor_open_table(tablename, transaction, &cursor));
+   checked(ib_cursor_first(cursor));
+   checked(ib_cursor_lock(cursor, IB_LOCK_X));
 -void update_cas(struct item* item) {
++  do 
++  {
+     checked(ib_cursor_delete_row(cursor));
+   } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
+   if (err != DB_END_OF_INDEX)
+   {
+     fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
+     goto error_exit;
+   }
+   ib_cursor_close(cursor);
+   cursor= NULL;
+   checked(ib_trx_commit(transaction));
+   return;
+  error_exit:
+   if (cursor != NULL)
+     ib_cursor_close(cursor);
+   ib_err_t error= ib_trx_rollback(transaction);
+   if (error != DB_SUCCESS)
+     fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+             ib_strerror(error));
+ }
+ /**
+  * Update the cas ID in the item structure
+  * @param item the item to update
+  */
 -void release_item(struct item* item) {
++void update_cas(struct item* item) 
++{
+   item->cas= ++cas;
+ }
+ /**
+  * Release all the resources allocated by the item
+  * @param item the item to release
+  */
++void release_item(struct item* item) 
++{
+   free(item->key);
+   free(item->data);
+   free(item);
+ }
index 0000000000000000000000000000000000000000,12cf248c18566db06e83250efd964d06425e2165..adba9ad7f73bfa748d7299d6adfbbc98079fc592
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,359 +1,361 @@@
 -    if (ret->buffer_cache == NULL) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include "libmemcached/protocol/common.h"
+ #include <stdlib.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <errno.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include <strings.h>
+ #include <ctype.h>
+ #include <stdio.h>
+ /*
+ ** **********************************************************************
+ ** INTERNAL INTERFACE
+ ** **********************************************************************
+ */
+ /**
+  * The default function to receive data from the client. This function
+  * just wraps the recv function to receive from a socket.
+  * See man -s3socket recv for more information.
+  *
+  * @param cookie cookie indentifying a client, not used
+  * @param sock socket to read from
+  * @param buf the destination buffer
+  * @param nbytes the number of bytes to read
+  * @return the number of bytes transferred of -1 upon error
+  */
+ static ssize_t default_recv(const void *cookie,
+                             int sock,
+                             void *buf,
+                             size_t nbytes)
+ {
+   (void)cookie;
+   return recv(sock, buf, nbytes, 0);
+ }
+ /**
+  * The default function to send data to the server. This function
+  * just wraps the send function to send through a socket.
+  * See man -s3socket send for more information.
+  *
+  * @param cookie cookie indentifying a client, not used
+  * @param sock socket to send to
+  * @param buf the source buffer
+  * @param nbytes the number of bytes to send
+  * @return the number of bytes transferred of -1 upon error
+  */
+ static ssize_t default_send(const void *cookie,
+                             int fd,
+                             const void *buf,
+                             size_t nbytes)
+ {
+   (void)cookie;
+   return send(fd, buf, nbytes, 0);
+ }
+ /**
+  * Try to drain the output buffers without blocking
+  *
+  * @param client the client to drain
+  * @return false if an error occured (connection should be shut down)
+  *         true otherwise (please note that there may be more data to
+  *              left in the buffer to send)
+  */
+ static bool drain_output(struct memcached_protocol_client_st *client)
+ {
+   ssize_t len;
+   /* Do we have pending data to send? */
+   while (client->output != NULL)
+   {
+     len= client->root->send(client,
+                             client->sock,
+                             client->output->data + client->output->offset,
+                             client->output->nbytes - client->output->offset);
+     if (len == -1)
+     {
+       if (errno == EWOULDBLOCK)
+       {
+         return true;
+       }
+       else if (errno != EINTR)
+       {
+         client->error= errno;
+         return false;
+       }
+     }
+     else
+     {
+       client->output->offset += (size_t)len;
+       if (client->output->offset == client->output->nbytes)
+       {
+         /* This was the complete buffer */
+         struct chunk_st *old= client->output;
+         client->output= client->output->next;
+         if (client->output == NULL)
+         {
+           client->output_tail= NULL;
+         }
+         cache_free(client->root->buffer_cache, old);
+       }
+     }
+   }
+   return true;
+ }
+ /**
+  * Allocate an output buffer and chain it into the output list
+  *
+  * @param client the client that needs the buffer
+  * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
+  */
+ static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
+ {
+   struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
++
+   if (ret == NULL)
+   {
+     return NULL;
+   }
+   ret->offset = ret->nbytes = 0;
+   ret->next = NULL;
+   ret->size = CHUNK_BUFFERSIZE;
+   ret->data= (void*)(ret + 1);
+   if (client->output == NULL)
+   {
+     client->output = client->output_tail = ret;
+   }
+   else
+   {
+     client->output_tail->next= ret;
+     client->output_tail= ret;
+   }
+   return ret;
+ }
+ /**
+  * Spool data into the send-buffer for a client.
+  *
+  * @param client the client to spool the data for
+  * @param data the data to spool
+  * @param length the number of bytes of data to spool
+  * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
+  *         PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
+  */
+ static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
+                                                     const void *data,
+                                                     size_t length)
+ {
+   if (client->mute)
+   {
+     return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+   }
+   size_t offset = 0;
+   struct chunk_st *chunk= client->output;
+   while (offset < length)
+   {
+     if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
+     {
+       if ((chunk= allocate_output_chunk(client)) == NULL)
+       {
+         return PROTOCOL_BINARY_RESPONSE_ENOMEM;
+       }
+     }
+     size_t bulk = length - offset;
+     if (bulk > chunk->size - chunk->nbytes)
+     {
+       bulk = chunk->size - chunk->nbytes;
+     }
+     memcpy(chunk->data + chunk->nbytes, data, bulk);
+     chunk->nbytes += bulk;
+     offset += bulk;
+   }
+   return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+ /**
+  * Try to determine the protocol used on this connection.
+  * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
+  * be using the binary protocol on the connection. I implemented the support
+  * for the ASCII protocol by wrapping into the simple interface (aka v1),
+  * so the implementors needs to provide an implementation of that interface
+  *
+  */
+ static enum MEMCACHED_PROTOCOL_EVENT determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
+ {
+   if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
+   {
+     client->work= memcached_binary_protocol_process_data;
+   }
+   else if (client->root->callback->interface_version == 1)
+   {
+     /*
+      * The ASCII protocol can only be used if the implementors provide
+      * an implementation for the version 1 of the interface..
+      *
+      * @todo I should allow the implementors to provide an implementation
+      *       for version 0 and 1 at the same time and set the preferred
+      *       interface to use...
+      */
+     client->work= memcached_ascii_protocol_process_data;
+   }
+   else
+   {
+     /* Let's just output a warning the way it is supposed to look like
+      * in the ASCII protocol...
+      */
+     const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
+     client->root->spool(client, err, strlen(err));
+     client->root->drain(client);
+     return ERROR_EVENT; /* Unsupported protocol */
+   }
+   return client->work(client, length, endptr);
+ }
+ /*
+ ** **********************************************************************
+ ** * PUBLIC INTERFACE
+ ** * See protocol_handler.h for function description
+ ** **********************************************************************
+ */
+ struct memcached_protocol_st *memcached_protocol_create_instance(void)
+ {
+   struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
+   if (ret != NULL)
+   {
+     ret->recv= default_recv;
+     ret->send= default_send;
+     ret->drain= drain_output;
+     ret->spool= spool_output;
+     ret->input_buffer_size= 1 * 1024 * 1024;
+     ret->input_buffer= malloc(ret->input_buffer_size);
+     if (ret->input_buffer == NULL)
+     {
+       free(ret);
+       ret= NULL;
+       return NULL;
+     }
+     ret->buffer_cache = cache_create("protocol_handler",
+                                      CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
+                                      0, NULL, NULL);
++    if (ret->buffer_cache == NULL) 
++    {
+       free(ret->input_buffer);
+       free(ret);
+     }
+   }
+   return ret;
+ }
+ void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
+ {
+   cache_destroy(instance->buffer_cache);
+   free(instance->input_buffer);
+   free(instance);
+ }
+ struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock)
+ {
+   struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret));
+   if (ret != NULL)
+   {
+     ret->root= instance;
+     ret->sock= sock;
+     ret->work= determine_protocol;
+   }
+   return ret;
+ }
+ void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
+ {
+   free(client);
+ }
+ enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client)
+ {
+   /* Try to send data and read from the socket */
+   bool more_data= true;
+   do
+   {
+     ssize_t len= client->root->recv(client,
+                                     client->sock,
+                                     client->root->input_buffer + client->input_buffer_offset,
+                                     client->root->input_buffer_size - client->input_buffer_offset);
+     if (len > 0)
+     {
+       /* Do we have the complete packet? */
+       if (client->input_buffer_offset > 0)
+       {
+         memcpy(client->root->input_buffer, client->input_buffer,
+                client->input_buffer_offset);
+         len += (ssize_t)client->input_buffer_offset;
+         /* @todo use buffer-cache! */
+         free(client->input_buffer);
+         client->input_buffer_offset= 0;
+       }
+       void *endptr;
+       if (client->work(client, &len, &endptr) == ERROR_EVENT)
+       {
+         return ERROR_EVENT;
+       }
+       if (len > 0)
+       {
+         /* save the data for later on */
+         /* @todo use buffer-cache */
+         client->input_buffer= malloc((size_t)len);
+         if (client->input_buffer == NULL)
+         {
+           client->error= ENOMEM;
+           return ERROR_EVENT;
+         }
+         memcpy(client->input_buffer, endptr, (size_t)len);
+         client->input_buffer_offset= (size_t)len;
+         more_data= false;
+       }
+     }
+     else if (len == 0)
+     {
+       /* Connection closed */
+       drain_output(client);
+       return ERROR_EVENT;
+     }
+     else
+     {
+       if (errno != EWOULDBLOCK)
+       {
+         client->error= errno;
+         /* mark this client as terminated! */
+         return ERROR_EVENT;
+       }
+       more_data = false;
+     }
+   } while (more_data);
+   if (!drain_output(client))
+   {
+     return ERROR_EVENT;
+   }
+   return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
+ }