+/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#include <stdlib.h>
+#include <inttypes.h>
+#include <time.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <assert.h>
+#include <embedded_innodb-1.0/innodb.h>
+
+#include "storage.h"
+
+const char *tablename = "memcached/items";
+
+#define key_col_idx 0
+#define data_col_idx 1
+#define flags_col_idx 2
+#define cas_col_idx 3
+#define exp_col_idx 4
+
+static uint64_t cas;
+
+/**
+ * To avoid cluttering down all the code with error checking I use the
+ * following macro. It will execute the statement and verify that the
+ * result of the operation is DB_SUCCESS. If any other error code is
+ * returned it will print an "assert-like" output and jump to the
+ * label error_exit. There I release resources before returning out of
+ * the function.
+ *
+ * @param a the expression to execute
+ *
+ */
+#define checked(expression) \
+do { \
+ ib_err_t checked_err= expression; \
+ if (checked_err != DB_SUCCESS) \
+ { \
+ fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
+ __FILE__, __LINE__, #expression, \
+ ib_strerror(checked_err)); \
+ goto error_exit; \
+ } \
+} while (0);
+
+/**
+ * Create the database schema.
+ * @return true if the database schema was created without any problems
+ * false otherwise.
+ */
+static bool create_schema(void) {
+ ib_tbl_sch_t schema= NULL;
+ ib_idx_sch_t index= NULL;
+
+ if (ib_database_create("memcached") != IB_TRUE)
+ {
+ fprintf(stderr, "Failed to create database\n");
+ return false;
+ }
+
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+ ib_id_t table_id;
+
+ checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
+ checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
+ IB_COL_NOT_NULL, 0, 32767));
+ checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
+ IB_COL_NONE, 0, 1024*1024));
+ checked(ib_table_schema_add_col(schema, "flags", IB_INT,
+ IB_COL_UNSIGNED, 0, 4));
+ checked(ib_table_schema_add_col(schema, "cas", IB_INT,
+ IB_COL_UNSIGNED, 0, 8));
+ checked(ib_table_schema_add_col(schema, "exp", IB_INT,
+ IB_COL_UNSIGNED, 0, 4));
+ checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &index));
+ checked(ib_index_schema_add_col(index, "key", 0));
+ checked(ib_index_schema_set_clustered(index));
+ checked(ib_schema_lock_exclusive(transaction));
+ checked(ib_table_create(transaction, schema, &table_id));
+ checked(ib_trx_commit(transaction));
+ ib_table_schema_delete(schema);
+
+ return true;
+
+ error_exit:
+ /* @todo release resources! */
+ {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+ return false;
+}
+
+/**
+ * Store an item into the database. Update the CAS id on the item before
+ * storing it in the database.
+ *
+ * @param trx the transaction to use
+ * @param item the item to store
+ * @return true if we can go ahead and commit the transaction, false otherwise
+ */
+bool do_put_item(ib_trx_t trx, struct item* item) {
+ update_cas(item);
+
+ ib_crsr_t cursor = NULL;
+ ib_tpl_t tuple= NULL;
+ bool retval = false;
+
+ checked(ib_cursor_open_table(tablename, trx, &cursor));
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+ tuple= ib_clust_read_tuple_create(cursor);
+
+ checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
+ checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
+ checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
+ checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
+ checked(ib_tuple_write_u32(tuple, exp_col_idx, item->exp));
+ checked(ib_cursor_insert_row(cursor, tuple));
+
+ retval= true;
+ /* Release resources: */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+}
+
+/**
+ * Try to locate an item in the database. Return a cursor and the tuple to
+ * the item if I found it in the database.
+ *
+ * @param trx the transaction to use
+ * @param key the key of the item to look up
+ * @param nkey the size of the key
+ * @param cursor where to store the cursor (OUT)
+ * @param tuple where to store the tuple (OUT)
+ * @return true if I found the object, false otherwise
+ */
+static bool do_locate_item(ib_trx_t trx,
+ const void* key,
+ size_t nkey,
+ ib_crsr_t *cursor)
+{
+ int res;
+ ib_tpl_t tuple;
+
+ *cursor = NULL;
+
+ checked(ib_cursor_open_table(tablename, trx, cursor));
+ tuple= ib_clust_search_tuple_create(*cursor);
+ if (tuple == NULL)
+ {
+ fprintf(stderr, "Failed to allocate tuple object\n");
+ goto error_exit;
+ }
+
+ checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
+ ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
+
+ if (err == DB_SUCCESS && res == 0)
+ {
+ ib_tuple_delete(tuple);
+ return true;
+ }
+ else if (err != DB_SUCCESS &&
+ err != DB_RECORD_NOT_FOUND &&
+ err != DB_END_OF_INDEX)
+ {
+ fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
+ }
+ /* FALLTHROUGH */
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+ if (*cursor != NULL)
+ ib_cursor_close(*cursor);
+ *cursor= NULL;
+
+ return false;
+}
+
+/**
+ * Try to get an item from the database
+ *
+ * @param trx the transaction to use
+ * @param key the key to get
+ * @param nkey the lenght of the key
+ * @return a pointer to the item if I found it in the database
+ */
+static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) {
+ ib_crsr_t cursor= NULL;
+ ib_tpl_t tuple= NULL;
+ struct item* retval= NULL;
+
+ if (do_locate_item(trx, key, nkey, &cursor)) {
+ tuple= ib_clust_read_tuple_create(cursor);
+ if (tuple == NULL)
+ {
+ fprintf(stderr, "Failed to create read tuple\n");
+ goto error_exit;
+ }
+ checked(ib_cursor_read_row(cursor, tuple));
+ ib_col_meta_t meta;
+ ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
+ ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
+ ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
+ ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
+ const void *dataptr= ib_col_get_value(tuple, data_col_idx);
+
+ retval= create_item(key, nkey, dataptr, datalen, 0, 0);
+ if (retval == NULL) {
+ fprintf(stderr, "Failed to allocate memory\n");
+ goto error_exit;
+ }
+
+ if (flaglen != 0) {
+ ib_u32_t val;
+ checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
+ retval->flags= (uint32_t)val;
+ }
+ if (caslen != 0) {
+ ib_u64_t val;
+ checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
+ retval->cas= (uint64_t)val;
+ }
+ if (explen != 0) {
+ ib_u32_t val;
+ checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
+ retval->exp= (uint32_t)val;
+ }
+ }
+
+ /* Release resources */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+}
+
+/**
+ * Delete an item from the cache
+ * @param trx the transaction to use
+ * @param key the key of the item to delete
+ * @param nkey the length of the key
+ * @return true if we should go ahead and commit the transaction
+ * or false if we should roll back (if the key didn't exists)
+ */
+static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
+ ib_crsr_t cursor= NULL;
+ bool retval= false;
+
+ if (do_locate_item(trx, key, nkey, &cursor))
+ {
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+ checked(ib_cursor_delete_row(cursor));
+ retval = true;
+ }
+ /* Release resources */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+}
+
+
+/****************************************************************************
+ * External interface
+ ***************************************************************************/
+
+/**
+ * Initialize the database storage
+ * @return true if the database was initialized successfully, false otherwise
+ */
+bool initialize_storage(void) {
+ ib_err_t error;
+ ib_id_t tid;
+
+ checked(ib_init());
+ checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
+ checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
+ checked(ib_cfg_set_bool_on("file_per_table"));
+ checked(ib_startup("barracuda"));
+
+ /* check to see if the table exists or if we should create the schema */
+ error= ib_table_get_id(tablename, &tid);
+ if (error == DB_TABLE_NOT_FOUND) {
+ if (!create_schema()) {
+ return false;
+ }
+ } else if (error != DB_SUCCESS) {
+ fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
+ return false;
+ }
+
+ return true;
+
+ error_exit:
+ return false;
+}
+
+/**
+ * Shut down this storage engine
+ */
+void shutdown_storage(void) {
+ checked(ib_shutdown());
+ error_exit:
+ ;
+}
+
+/**
+ * Store an item in the databse
+ *
+ * @param item the item to store
+ */
+void put_item(struct item* item) {
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+ if (do_put_item(transaction, item)) {
+ ib_err_t error= ib_trx_commit(transaction);
+ if (error != DB_SUCCESS) {
+ fprintf(stderr, "Failed to store key:\n\t%s\n",
+ ib_strerror(error));
+ }
+ } else {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+}
+
+/**
+ * Get an item from the engine
+ * @param key the key to grab
+ * @param nkey number of bytes in the key
+ * @return pointer to the item if found
+ */
+struct item* get_item(const void* key, size_t nkey) {
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+ struct item* ret= do_get_item(transaction, key, nkey);
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+
+ return ret;
+}
+
+/**
+ * Create an item structure and initialize it with the content
+ *
+ * @param key the key for the item
+ * @param nkey the number of bytes in the key
+ * @param data pointer to the value for the item (may be NULL)
+ * @param size the size of the data
+ * @param flags the flags to store with the data
+ * @param exp the expiry time for the item
+ * @return pointer to an initialized item object or NULL if allocation failed
+ */
+struct item* create_item(const void* key, size_t nkey, const void* data,
+ size_t size, uint32_t flags, time_t exp)
+{
+ struct item* ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->key= malloc(nkey);
+ if (size > 0)
+ {
+ ret->data= malloc(size);
+ }
+
+ if (ret->key == NULL || (size > 0 && ret->data == NULL))
+ {
+ free(ret->key);
+ free(ret->data);
+ free(ret);
+ return NULL;
+ }
+
+ memcpy(ret->key, key, nkey);
+ if (data != NULL)
+ {
+ memcpy(ret->data, data, size);
+ }
+
+ ret->nkey= nkey;
+ ret->size= size;
+ ret->flags= flags;
+ ret->exp= exp;
+ }
+
+ return ret;
+}
+
+/**
+ * Delete an item from the cache
+ * @param key the key of the item to delete
+ * @param nkey the length of the key
+ * @return true if the item was deleted from the cache
+ */
+bool delete_item(const void* key, size_t nkey) {
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+
+ bool ret= do_delete_item(transaction, key, nkey);
+
+ if (ret)
+ {
+ /* object found. commit transaction */
+ ib_err_t error= ib_trx_commit(transaction);
+ if (error != DB_SUCCESS)
+ {
+ fprintf(stderr, "Failed to delete key:\n\t%s\n",
+ ib_strerror(error));
+ ret= false;
+ }
+ }
+ else
+ {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+
+ return ret;
+}
+
+/**
+ * Flush the entire cache
+ * @param when when the cache should be flushed (0 == immediately)
+ */
+void flush(uint32_t when) {
+ /* @TODO implement support for when != 0 */
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+ ib_crsr_t cursor= NULL;
+ ib_err_t err= DB_SUCCESS;
+
+ checked(ib_cursor_open_table(tablename, transaction, &cursor));
+ checked(ib_cursor_first(cursor));
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+
+ do {
+ checked(ib_cursor_delete_row(cursor));
+ } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
+
+ if (err != DB_END_OF_INDEX)
+ {
+ fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
+ goto error_exit;
+ }
+ ib_cursor_close(cursor);
+ cursor= NULL;
+ checked(ib_trx_commit(transaction));
+ return;
+
+ error_exit:
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+}
+
+/**
+ * Update the cas ID in the item structure
+ * @param item the item to update
+ */
+void update_cas(struct item* item) {
+ item->cas= ++cas;
+}