--- /dev/null
-static void work(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ /**
+ * What is a library without an example to show you how to use the library?
+ * This example use both interfaces to implement a small memcached server.
+ * Please note that this is an exemple on how to use the library, not
+ * an implementation of a scalable memcached server. If you look closely
+ * at the example it isn't even multithreaded ;-)
+ *
+ * With that in mind, let me give you some pointers into the source:
+ * storage.c/h - Implements the item store for this server and not really
+ * interesting for this example.
+ * interface_v0.c - Shows an implementation of the memcached server by using
+ * the "raw" access to the packets as they arrive
+ * interface_v1.c - Shows an implementation of the memcached server by using
+ * the more "logical" interface.
+ * memcached_light.c - This file sets up all of the sockets and run the main
+ * message loop.
+ */
+
+ #include <assert.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <netdb.h>
+ #include <netinet/tcp.h>
+ #include <stdio.h>
+ #include <unistd.h>
+ #include <fcntl.h>
+ #include <errno.h>
+ #include <stdlib.h>
+ #include <string.h>
+ #include <poll.h>
+
+ #include <libmemcached/protocol_handler.h>
+ #include <libmemcached/byteorder.h>
+ #include "storage.h"
+
+ extern struct memcached_binary_protocol_callback_st interface_v0_impl;
+ extern struct memcached_binary_protocol_callback_st interface_v1_impl;
+
+ static int server_sockets[1024];
+ static int num_server_sockets= 0;
+ static void* socket_userdata_map[1024];
+ static bool verbose= false;
+
+ /**
+ * Create a socket and bind it to a specific port number
+ * @param port the port number to bind to
+ */
+ static int server_socket(const char *port) {
+ struct addrinfo *ai;
+ struct addrinfo hints= { .ai_flags= AI_PASSIVE,
+ .ai_family= AF_UNSPEC,
+ .ai_socktype= SOCK_STREAM };
+
+ int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
+ if (error != 0)
+ {
+ if (error != EAI_SYSTEM)
+ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
+ else
+ perror("getaddrinfo()");
+
+ return 1;
+ }
+
+ struct linger ling= {0, 0};
+
+ for (struct addrinfo *next= ai; next; next= next->ai_next)
+ {
+ int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+ if (sock == -1)
+ {
+ perror("Failed to create socket");
+ continue;
+ }
+
+ int flags= fcntl(sock, F_GETFL, 0);
+ if (flags == -1)
+ {
+ perror("Failed to get socket flags");
+ close(sock);
+ continue;
+ }
+
+ if ((flags & O_NONBLOCK) != O_NONBLOCK)
++ {
+ if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
+ {
+ perror("Failed to set socket to nonblocking mode");
+ close(sock);
+ continue;
+ }
++ }
+
+ flags= 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set SO_REUSEADDR");
+
+ if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set SO_KEEPALIVE");
+
+ if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
+ perror("Failed to set SO_LINGER");
+
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
+ perror("Failed to set TCP_NODELAY");
+
+ if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+ {
+ if (errno != EADDRINUSE)
+ {
+ perror("bind()");
+ freeaddrinfo(ai);
+ }
+ close(sock);
+ continue;
+ }
+
+ if (listen(sock, 1024) == -1)
+ {
+ perror("listen()");
+ close(sock);
+ continue;
+ }
+
+ server_sockets[num_server_sockets++]= sock;
+ }
+
+ freeaddrinfo(ai);
+
+ return (num_server_sockets > 0) ? 0 : 1;
+ }
+
+ /**
+ * Convert a command code to a textual string
+ * @param cmd the comcode to convert
+ * @return a textual string with the command or NULL for unknown commands
+ */
+ static const char* comcode2str(uint8_t cmd)
+ {
+ static const char * const text[] = {
+ "GET", "SET", "ADD", "REPLACE", "DELETE",
+ "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
+ "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
+ "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
+ "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
+ "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
+ };
+
+ if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
+ return text[cmd];
+
+ return NULL;
+ }
+
+ /**
+ * Print out the command we are about to execute
+ */
+ static void pre_execute(const void *cookie __attribute__((unused)),
+ protocol_binary_request_header *header __attribute__((unused)))
+ {
+ if (verbose)
+ {
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ }
+ }
+
+ /**
+ * Print out the command we just executed
+ */
+ static void post_execute(const void *cookie __attribute__((unused)),
+ protocol_binary_request_header *header __attribute__((unused)))
+ {
+ if (verbose)
+ {
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ }
+ }
+
+ /**
+ * Callback handler for all unknown commands.
+ * Send an unknown command back to the client
+ */
+ static protocol_binary_response_status unknown(const void *cookie,
+ protocol_binary_request_header *header,
+ memcached_binary_protocol_raw_response_handler response_handler)
+ {
+ protocol_binary_response_no_extras response= {
+ .message= {
+ .header.response= {
+ .magic= PROTOCOL_BINARY_RES,
+ .opcode= header->request.opcode,
+ .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
+ .opaque= header->request.opaque
+ }
+ }
+ };
+
+ return response_handler(cookie, header, (void*)&response);
+ }
+
+ static void work(void);
+
+ /**
+ * Program entry point. Bind to the specified port(s) and serve clients
+ *
+ * @param argc number of items in the argument vector
+ * @param argv argument vector
+ * @return 0 on success, 1 otherwise
+ */
+ int main(int argc, char **argv)
+ {
+ bool port_specified= false;
+ int cmd;
+ struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+
+ while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
+ {
+ switch (cmd) {
+ case '1':
+ interface= &interface_v1_impl;
+ break;
+ case 'p':
+ port_specified= true;
+ (void)server_socket(optarg);
+ break;
+ case 'v':
+ verbose= true;
+ break;
+ case '?': /* FALLTHROUGH */
+ default:
+ (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
+ return 1;
+ }
+ }
+
+ if (!initialize_storage())
+ {
+ /* Error message already printed */
+ return 1;
+ }
+
+ if (!port_specified)
+ (void)server_socket("9999");
+
+ if (num_server_sockets == 0)
+ {
+ fprintf(stderr, "I don't have any server sockets\n");
+ return 1;
+ }
+
+ /*
+ * Create and initialize the handles to the protocol handlers. I want
+ * to be able to trace the traffic throught the pre/post handlers, and
+ * set up a common handler for unknown messages
+ */
+ interface->pre_execute= pre_execute;
+ interface->post_execute= post_execute;
+ interface->unknown= unknown;
+
+ struct memcached_protocol_st *protocol_handle;
+ if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
+ {
+ fprintf(stderr, "Failed to allocate protocol handle\n");
+ return 1;
+ }
+
+ memcached_binary_protocol_set_callbacks(protocol_handle, interface);
+ memcached_binary_protocol_set_pedantic(protocol_handle, true);
+
+ for (int xx= 0; xx < num_server_sockets; ++xx)
+ socket_userdata_map[server_sockets[xx]]= protocol_handle;
+
+ /* Serve all of the clients */
+ work();
+
+ /* NOTREACHED */
+ return 0;
+ }
+
++static void work(void)
++{
+ #define MAX_SERVERS_TO_POLL 100
+ struct pollfd fds[MAX_SERVERS_TO_POLL];
+ int max_poll;
+
+ for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
+ {
+ fds[max_poll].events= POLLIN;
+ fds[max_poll].revents= 0;
+ fds[max_poll].fd= server_sockets[max_poll];
+ ++max_poll;
+ }
+
+ while (true)
+ {
+ int err= poll(fds, (nfds_t)max_poll, -1);
+
+ if (err == 0 || (err == -1 && errno != EINTR))
+ {
+ perror("poll() failed");
+ abort();
+ }
+
+ /* find the available filedescriptors */
+ for (int x= max_poll - 1; x > -1 && err > 0; --x)
+ {
+ if (fds[x].revents != 0)
+ {
+ --err;
+ if (x < num_server_sockets)
+ {
+ /* accept new client */
+ struct sockaddr_storage addr;
+ socklen_t addrlen= sizeof(addr);
+ int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
+ &addrlen);
+
+ if (sock == -1)
+ {
+ perror("Failed to accept client");
+ continue;
+ }
+
+ struct memcached_protocol_st *protocol;
+ protocol= socket_userdata_map[fds[x].fd];
+
+ struct memcached_protocol_client_st* c;
+ c= memcached_protocol_create_client(protocol, sock);
+ if (c == NULL)
+ {
+ fprintf(stderr, "Failed to create client\n");
+ close(sock);
+ }
+ else
+ {
+ socket_userdata_map[sock]= c;
+ fds[max_poll].events= POLLIN;
+ fds[max_poll].revents= 0;
+ fds[max_poll].fd= sock;
+ ++max_poll;
+ }
+ }
+ else
+ {
+ /* drive the client */
+ struct memcached_protocol_client_st* c;
+ c= socket_userdata_map[fds[x].fd];
+ assert(c != NULL);
+ fds[max_poll].events= 0;
+
+ switch (memcached_protocol_client_work(c))
+ {
+ case WRITE_EVENT:
+ case READ_WRITE_EVENT:
+ fds[max_poll].events= POLLOUT;
+ /* FALLTHROUGH */
+ case READ_EVENT:
+ fds[max_poll].events |= POLLIN;
+ break;
+ case ERROR_EVENT:
+ default: /* ERROR or unknown state.. close */
+ memcached_protocol_client_destroy(c);
+ close(fds[x].fd);
+ fds[x].events= 0;
+
+ if (x != max_poll - 1)
+ memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
+
+ --max_poll;
+ }
+ }
+ }
+ }
+ }
+ }
--- /dev/null
-bool initialize_storage(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include <stdlib.h>
+ #include <inttypes.h>
+ #include <time.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include "storage.h"
+
+ struct list_entry {
+ struct item item;
+ struct list_entry *next;
+ struct list_entry *prev;
+ };
+
+ static struct list_entry *root;
+ static uint64_t cas;
+
-void shutdown_storage(void) {
++bool initialize_storage(void)
++{
+ return true;
+ }
+
-void put_item(struct item* item) {
++void shutdown_storage(void)
++{
+ /* Do nothing */
+ }
+
-struct item* get_item(const void* key, size_t nkey) {
++void put_item(struct item* item)
++{
+ struct list_entry* entry= (void*)item;
++
+ update_cas(item);
+
+ if (root == NULL)
+ {
+ entry->next= entry->prev= entry;
+ }
+ else
+ {
+ entry->prev= root->prev;
+ entry->next= root;
+ entry->prev->next= entry;
+ entry->next->prev= entry;
+ }
+
+ root= entry;
+ }
+
-bool delete_item(const void* key, size_t nkey) {
++struct item* get_item(const void* key, size_t nkey)
++{
+ struct list_entry *walker= root;
++
+ if (root == NULL)
+ {
+ return NULL;
+ }
+
+ do
+ {
+ if (((struct item*)walker)->nkey == nkey &&
+ memcmp(((struct item*)walker)->key, key, nkey) == 0)
+ {
+ return (struct item*)walker;
+ }
+ walker= walker->next;
+ } while (walker != root);
+
+ return NULL;
+ }
+
+ struct item* create_item(const void* key, size_t nkey, const void* data,
+ size_t size, uint32_t flags, time_t exp)
+ {
+ struct item* ret= calloc(1, sizeof(struct list_entry));
++
+ if (ret != NULL)
+ {
+ ret->key= malloc(nkey);
+ if (size > 0)
+ {
+ ret->data= malloc(size);
+ }
+
+ if (ret->key == NULL || (size > 0 && ret->data == NULL))
+ {
+ free(ret->key);
+ free(ret->data);
+ free(ret);
+ return NULL;
+ }
+
+ memcpy(ret->key, key, nkey);
+ if (data != NULL)
+ {
+ memcpy(ret->data, data, size);
+ }
+
+ ret->nkey= nkey;
+ ret->size= size;
+ ret->flags= flags;
+ ret->exp= exp;
+ }
+
+ return ret;
+ }
+
-void flush(uint32_t when) {
++bool delete_item(const void* key, size_t nkey)
++{
+ struct item* item= get_item(key, nkey);
+ bool ret= false;
+
+ if (item)
+ {
+ /* remove from linked list */
+ struct list_entry *entry= (void*)item;
+
+ if (entry->next == entry)
+ {
+ /* Only one object in the list */
+ root= NULL;
+ }
+ else
+ {
+ /* ensure that we don't loose track of the root, and this will
+ * change the start position for the next search ;-) */
+ root= entry->next;
+ entry->prev->next= entry->next;
+ entry->next->prev= entry->prev;
+ }
+
+ free(item->key);
+ free(item->data);
+ free(item);
+ ret= true;
+ }
+
+ return ret;
+ }
+
-void update_cas(struct item* item) {
++void flush(uint32_t when)
++{
+ /* FIXME */
+ (void)when;
+ /* remove the complete linked list */
+ if (root == NULL)
+ {
+ return;
+ }
+
+ root->prev->next= NULL;
+ while (root != NULL)
+ {
+ struct item* tmp= (void*)root;
+ root= root->next;
+
+ free(tmp->key);
+ free(tmp->data);
+ free(tmp);
+ }
+ }
+
-void release_item(struct item* item __attribute__((unused))) {
++void update_cas(struct item* item)
++{
+ item->cas= ++cas;
+ }
+
++void release_item(struct item* item __attribute__((unused)))
++{
+ /* EMPTY */
+ }
--- /dev/null
-static bool create_schema(void) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include <stdlib.h>
+ #include <inttypes.h>
+ #include <time.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include <unistd.h>
+ #include <assert.h>
+ #include <embedded_innodb-1.0/innodb.h>
+
+ #include "storage.h"
+
+ const char *tablename= "memcached/items";
+
+ #define key_col_idx 0
+ #define data_col_idx 1
+ #define flags_col_idx 2
+ #define cas_col_idx 3
+ #define exp_col_idx 4
+
+ static uint64_t cas;
+
+ /**
+ * To avoid cluttering down all the code with error checking I use the
+ * following macro. It will execute the statement and verify that the
+ * result of the operation is DB_SUCCESS. If any other error code is
+ * returned it will print an "assert-like" output and jump to the
+ * label error_exit. There I release resources before returning out of
+ * the function.
+ *
+ * @param a the expression to execute
+ *
+ */
+ #define checked(expression) \
+ do { \
+ ib_err_t checked_err= expression; \
+ if (checked_err != DB_SUCCESS) \
+ { \
+ fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
+ __FILE__, __LINE__, #expression, \
+ ib_strerror(checked_err)); \
+ goto error_exit; \
+ } \
+ } while (0);
+
+ /**
+ * Create the database schema.
+ * @return true if the database schema was created without any problems
+ * false otherwise.
+ */
-static bool do_put_item(ib_trx_t trx, struct item* item) {
++static bool create_schema(void)
++{
+ ib_tbl_sch_t schema= NULL;
+ ib_idx_sch_t dbindex= NULL;
+
+ if (ib_database_create("memcached") != IB_TRUE)
+ {
+ fprintf(stderr, "Failed to create database\n");
+ return false;
+ }
+
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+ ib_id_t table_id;
+
+ checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
+ checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
+ IB_COL_NOT_NULL, 0, 32767));
+ checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
+ IB_COL_NONE, 0, 1024*1024));
+ checked(ib_table_schema_add_col(schema, "flags", IB_INT,
+ IB_COL_UNSIGNED, 0, 4));
+ checked(ib_table_schema_add_col(schema, "cas", IB_INT,
+ IB_COL_UNSIGNED, 0, 8));
+ checked(ib_table_schema_add_col(schema, "exp", IB_INT,
+ IB_COL_UNSIGNED, 0, 4));
+ checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
+ checked(ib_index_schema_add_col(dbindex, "key", 0));
+ checked(ib_index_schema_set_clustered(dbindex));
+ checked(ib_schema_lock_exclusive(transaction));
+ checked(ib_table_create(transaction, schema, &table_id));
+ checked(ib_trx_commit(transaction));
+ ib_table_schema_delete(schema);
+
+ return true;
+
+ error_exit:
+ /* @todo release resources! */
+ {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+ return false;
+ }
+
+ /**
+ * Store an item into the database. Update the CAS id on the item before
+ * storing it in the database.
+ *
+ * @param trx the transaction to use
+ * @param item the item to store
+ * @return true if we can go ahead and commit the transaction, false otherwise
+ */
-static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) {
++static bool do_put_item(ib_trx_t trx, struct item* item)
++{
+ update_cas(item);
+
+ ib_crsr_t cursor= NULL;
+ ib_tpl_t tuple= NULL;
+ bool retval= false;
+
+ checked(ib_cursor_open_table(tablename, trx, &cursor));
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+ tuple= ib_clust_read_tuple_create(cursor);
+
+ checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
+ checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
+ checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
+ checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
+ checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
+ checked(ib_cursor_insert_row(cursor, tuple));
+
+ retval= true;
+ /* Release resources: */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+ }
+
+ /**
+ * Try to locate an item in the database. Return a cursor and the tuple to
+ * the item if I found it in the database.
+ *
+ * @param trx the transaction to use
+ * @param key the key of the item to look up
+ * @param nkey the size of the key
+ * @param cursor where to store the cursor (OUT)
+ * @param tuple where to store the tuple (OUT)
+ * @return true if I found the object, false otherwise
+ */
+ static bool do_locate_item(ib_trx_t trx,
+ const void* key,
+ size_t nkey,
+ ib_crsr_t *cursor)
+ {
+ int res;
+ ib_tpl_t tuple= NULL;
+
+ *cursor= NULL;
+
+ checked(ib_cursor_open_table(tablename, trx, cursor));
+ tuple= ib_clust_search_tuple_create(*cursor);
+ if (tuple == NULL)
+ {
+ fprintf(stderr, "Failed to allocate tuple object\n");
+ goto error_exit;
+ }
+
+ checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
+ ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
+
+ if (err == DB_SUCCESS && res == 0)
+ {
+ ib_tuple_delete(tuple);
+ return true;
+ }
+ else if (err != DB_SUCCESS &&
+ err != DB_RECORD_NOT_FOUND &&
+ err != DB_END_OF_INDEX)
+ {
+ fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
+ }
+ /* FALLTHROUGH */
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+ if (*cursor != NULL)
+ ib_cursor_close(*cursor);
+ *cursor= NULL;
+
+ return false;
+ }
+
+ /**
+ * Try to get an item from the database
+ *
+ * @param trx the transaction to use
+ * @param key the key to get
+ * @param nkey the lenght of the key
+ * @return a pointer to the item if I found it in the database
+ */
- if (do_locate_item(trx, key, nkey, &cursor)) {
++static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey)
++{
+ ib_crsr_t cursor= NULL;
+ ib_tpl_t tuple= NULL;
+ struct item* retval= NULL;
+
- if (retval == NULL) {
++ if (do_locate_item(trx, key, nkey, &cursor))
++ {
+ tuple= ib_clust_read_tuple_create(cursor);
+ if (tuple == NULL)
+ {
+ fprintf(stderr, "Failed to create read tuple\n");
+ goto error_exit;
+ }
+ checked(ib_cursor_read_row(cursor, tuple));
+ ib_col_meta_t meta;
+ ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
+ ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
+ ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
+ ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
+ const void *dataptr= ib_col_get_value(tuple, data_col_idx);
+
+ retval= create_item(key, nkey, dataptr, datalen, 0, 0);
- if (flaglen != 0) {
++ if (retval == NULL)
++ {
+ fprintf(stderr, "Failed to allocate memory\n");
+ goto error_exit;
+ }
+
- if (caslen != 0) {
++ if (flaglen != 0)
++ {
+ ib_u32_t val;
+ checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
+ retval->flags= (uint32_t)val;
+ }
- if (explen != 0) {
++ if (caslen != 0)
++ {
+ ib_u64_t val;
+ checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
+ retval->cas= (uint64_t)val;
+ }
-bool initialize_storage(void) {
++ if (explen != 0)
++ {
+ ib_u32_t val;
+ checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
+ retval->exp= (time_t)val;
+ }
+ }
+
+ /* Release resources */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (tuple != NULL)
+ ib_tuple_delete(tuple);
+
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+ }
+
+ /**
+ * Delete an item from the cache
+ * @param trx the transaction to use
+ * @param key the key of the item to delete
+ * @param nkey the length of the key
+ * @return true if we should go ahead and commit the transaction
+ * or false if we should roll back (if the key didn't exists)
+ */
+ static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
+ ib_crsr_t cursor= NULL;
+ bool retval= false;
+
+ if (do_locate_item(trx, key, nkey, &cursor))
+ {
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+ checked(ib_cursor_delete_row(cursor));
+ retval= true;
+ }
+ /* Release resources */
+ /* FALLTHROUGH */
+
+ error_exit:
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ return retval;
+ }
+
+
+ /****************************************************************************
+ * External interface
+ ***************************************************************************/
+
+ /**
+ * Initialize the database storage
+ * @return true if the database was initialized successfully, false otherwise
+ */
- if (error == DB_TABLE_NOT_FOUND) {
- if (!create_schema()) {
++bool initialize_storage(void)
++{
+ ib_err_t error;
+ ib_id_t tid;
+
+ checked(ib_init());
+ checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
+ checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
+ checked(ib_cfg_set_bool_on("file_per_table"));
+ checked(ib_startup("barracuda"));
+
+ /* check to see if the table exists or if we should create the schema */
+ error= ib_table_get_id(tablename, &tid);
- } else if (error != DB_SUCCESS) {
++ if (error == DB_TABLE_NOT_FOUND)
++ {
++ if (!create_schema())
++ {
+ return false;
+ }
-void shutdown_storage(void) {
++ }
++ else if (error != DB_SUCCESS)
++ {
+ fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
+ return false;
+ }
+
+ return true;
+
+ error_exit:
+ return false;
+ }
+
+ /**
+ * Shut down this storage engine
+ */
-void put_item(struct item* item) {
++void shutdown_storage(void)
++{
+ checked(ib_shutdown());
+ error_exit:
+ ;
+ }
+
+ /**
+ * Store an item in the databse
+ *
+ * @param item the item to store
+ */
- if (do_put_item(transaction, item)) {
++void put_item(struct item* item)
++{
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
- if (error != DB_SUCCESS) {
++ if (do_put_item(transaction, item))
++ {
+ ib_err_t error= ib_trx_commit(transaction);
- } else {
++ if (error != DB_SUCCESS)
++ {
+ fprintf(stderr, "Failed to store key:\n\t%s\n",
+ ib_strerror(error));
+ }
-struct item* get_item(const void* key, size_t nkey) {
++ }
++ else
++ {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+ }
+
+ /**
+ * Get an item from the engine
+ * @param key the key to grab
+ * @param nkey number of bytes in the key
+ * @return pointer to the item if found
+ */
-void flush(uint32_t when __attribute__((unused))) {
++struct item* get_item(const void* key, size_t nkey)
++{
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
+ struct item* ret= do_get_item(transaction, key, nkey);
+ ib_err_t error= ib_trx_rollback(transaction);
++
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+
+ return ret;
+ }
+
+ /**
+ * Create an item structure and initialize it with the content
+ *
+ * @param key the key for the item
+ * @param nkey the number of bytes in the key
+ * @param data pointer to the value for the item (may be NULL)
+ * @param size the size of the data
+ * @param flags the flags to store with the data
+ * @param exp the expiry time for the item
+ * @return pointer to an initialized item object or NULL if allocation failed
+ */
+ struct item* create_item(const void* key, size_t nkey, const void* data,
+ size_t size, uint32_t flags, time_t exp)
+ {
+ struct item* ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->key= malloc(nkey);
+ if (size > 0)
+ {
+ ret->data= malloc(size);
+ }
+
+ if (ret->key == NULL || (size > 0 && ret->data == NULL))
+ {
+ free(ret->key);
+ free(ret->data);
+ free(ret);
+ return NULL;
+ }
+
+ memcpy(ret->key, key, nkey);
+ if (data != NULL)
+ {
+ memcpy(ret->data, data, size);
+ }
+
+ ret->nkey= nkey;
+ ret->size= size;
+ ret->flags= flags;
+ ret->exp= exp;
+ }
+
+ return ret;
+ }
+
+ /**
+ * Delete an item from the cache
+ * @param key the key of the item to delete
+ * @param nkey the length of the key
+ * @return true if the item was deleted from the cache
+ */
+ bool delete_item(const void* key, size_t nkey) {
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+
+ bool ret= do_delete_item(transaction, key, nkey);
+
+ if (ret)
+ {
+ /* object found. commit transaction */
+ ib_err_t error= ib_trx_commit(transaction);
+ if (error != DB_SUCCESS)
+ {
+ fprintf(stderr, "Failed to delete key:\n\t%s\n",
+ ib_strerror(error));
+ ret= false;
+ }
+ }
+ else
+ {
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+
+ return ret;
+ }
+
+ /**
+ * Flush the entire cache
+ * @param when when the cache should be flushed (0 == immediately)
+ */
- do {
++void flush(uint32_t when __attribute__((unused)))
++{
+ /* @TODO implement support for when != 0 */
+ ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
+ ib_crsr_t cursor= NULL;
+ ib_err_t err= DB_SUCCESS;
+
+ checked(ib_cursor_open_table(tablename, transaction, &cursor));
+ checked(ib_cursor_first(cursor));
+ checked(ib_cursor_lock(cursor, IB_LOCK_X));
+
-void update_cas(struct item* item) {
++ do
++ {
+ checked(ib_cursor_delete_row(cursor));
+ } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
+
+ if (err != DB_END_OF_INDEX)
+ {
+ fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
+ goto error_exit;
+ }
+ ib_cursor_close(cursor);
+ cursor= NULL;
+ checked(ib_trx_commit(transaction));
+ return;
+
+ error_exit:
+ if (cursor != NULL)
+ ib_cursor_close(cursor);
+
+ ib_err_t error= ib_trx_rollback(transaction);
+ if (error != DB_SUCCESS)
+ fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
+ ib_strerror(error));
+ }
+
+ /**
+ * Update the cas ID in the item structure
+ * @param item the item to update
+ */
-void release_item(struct item* item) {
++void update_cas(struct item* item)
++{
+ item->cas= ++cas;
+ }
+
+ /**
+ * Release all the resources allocated by the item
+ * @param item the item to release
+ */
++void release_item(struct item* item)
++{
+ free(item->key);
+ free(item->data);
+ free(item);
+ }
--- /dev/null
- if (ret->buffer_cache == NULL) {
+ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+ #include "libmemcached/protocol/common.h"
+
+ #include <stdlib.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <errno.h>
+ #include <stdbool.h>
+ #include <string.h>
+ #include <strings.h>
+ #include <ctype.h>
+ #include <stdio.h>
+
+ /*
+ ** **********************************************************************
+ ** INTERNAL INTERFACE
+ ** **********************************************************************
+ */
+
+ /**
+ * The default function to receive data from the client. This function
+ * just wraps the recv function to receive from a socket.
+ * See man -s3socket recv for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to read from
+ * @param buf the destination buffer
+ * @param nbytes the number of bytes to read
+ * @return the number of bytes transferred of -1 upon error
+ */
+ static ssize_t default_recv(const void *cookie,
+ int sock,
+ void *buf,
+ size_t nbytes)
+ {
+ (void)cookie;
+ return recv(sock, buf, nbytes, 0);
+ }
+
+ /**
+ * The default function to send data to the server. This function
+ * just wraps the send function to send through a socket.
+ * See man -s3socket send for more information.
+ *
+ * @param cookie cookie indentifying a client, not used
+ * @param sock socket to send to
+ * @param buf the source buffer
+ * @param nbytes the number of bytes to send
+ * @return the number of bytes transferred of -1 upon error
+ */
+ static ssize_t default_send(const void *cookie,
+ int fd,
+ const void *buf,
+ size_t nbytes)
+ {
+ (void)cookie;
+ return send(fd, buf, nbytes, 0);
+ }
+
+ /**
+ * Try to drain the output buffers without blocking
+ *
+ * @param client the client to drain
+ * @return false if an error occured (connection should be shut down)
+ * true otherwise (please note that there may be more data to
+ * left in the buffer to send)
+ */
+ static bool drain_output(struct memcached_protocol_client_st *client)
+ {
+ ssize_t len;
+
+ /* Do we have pending data to send? */
+ while (client->output != NULL)
+ {
+ len= client->root->send(client,
+ client->sock,
+ client->output->data + client->output->offset,
+ client->output->nbytes - client->output->offset);
+
+ if (len == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ {
+ return true;
+ }
+ else if (errno != EINTR)
+ {
+ client->error= errno;
+ return false;
+ }
+ }
+ else
+ {
+ client->output->offset += (size_t)len;
+ if (client->output->offset == client->output->nbytes)
+ {
+ /* This was the complete buffer */
+ struct chunk_st *old= client->output;
+ client->output= client->output->next;
+ if (client->output == NULL)
+ {
+ client->output_tail= NULL;
+ }
+ cache_free(client->root->buffer_cache, old);
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Allocate an output buffer and chain it into the output list
+ *
+ * @param client the client that needs the buffer
+ * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
+ */
+ static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
+ {
+ struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
++
+ if (ret == NULL)
+ {
+ return NULL;
+ }
+
+ ret->offset = ret->nbytes = 0;
+ ret->next = NULL;
+ ret->size = CHUNK_BUFFERSIZE;
+ ret->data= (void*)(ret + 1);
+ if (client->output == NULL)
+ {
+ client->output = client->output_tail = ret;
+ }
+ else
+ {
+ client->output_tail->next= ret;
+ client->output_tail= ret;
+ }
+
+ return ret;
+ }
+
+ /**
+ * Spool data into the send-buffer for a client.
+ *
+ * @param client the client to spool the data for
+ * @param data the data to spool
+ * @param length the number of bytes of data to spool
+ * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
+ * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
+ */
+ static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
+ const void *data,
+ size_t length)
+ {
+ if (client->mute)
+ {
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+
+ size_t offset = 0;
+
+ struct chunk_st *chunk= client->output;
+ while (offset < length)
+ {
+ if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
+ {
+ if ((chunk= allocate_output_chunk(client)) == NULL)
+ {
+ return PROTOCOL_BINARY_RESPONSE_ENOMEM;
+ }
+ }
+
+ size_t bulk = length - offset;
+ if (bulk > chunk->size - chunk->nbytes)
+ {
+ bulk = chunk->size - chunk->nbytes;
+ }
+
+ memcpy(chunk->data + chunk->nbytes, data, bulk);
+ chunk->nbytes += bulk;
+ offset += bulk;
+ }
+
+ return PROTOCOL_BINARY_RESPONSE_SUCCESS;
+ }
+
+ /**
+ * Try to determine the protocol used on this connection.
+ * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
+ * be using the binary protocol on the connection. I implemented the support
+ * for the ASCII protocol by wrapping into the simple interface (aka v1),
+ * so the implementors needs to provide an implementation of that interface
+ *
+ */
+ static enum MEMCACHED_PROTOCOL_EVENT determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
+ {
+ if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
+ {
+ client->work= memcached_binary_protocol_process_data;
+ }
+ else if (client->root->callback->interface_version == 1)
+ {
+ /*
+ * The ASCII protocol can only be used if the implementors provide
+ * an implementation for the version 1 of the interface..
+ *
+ * @todo I should allow the implementors to provide an implementation
+ * for version 0 and 1 at the same time and set the preferred
+ * interface to use...
+ */
+ client->work= memcached_ascii_protocol_process_data;
+ }
+ else
+ {
+ /* Let's just output a warning the way it is supposed to look like
+ * in the ASCII protocol...
+ */
+ const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
+ client->root->spool(client, err, strlen(err));
+ client->root->drain(client);
+ return ERROR_EVENT; /* Unsupported protocol */
+ }
+
+ return client->work(client, length, endptr);
+ }
+
+ /*
+ ** **********************************************************************
+ ** * PUBLIC INTERFACE
+ ** * See protocol_handler.h for function description
+ ** **********************************************************************
+ */
+ struct memcached_protocol_st *memcached_protocol_create_instance(void)
+ {
+ struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->recv= default_recv;
+ ret->send= default_send;
+ ret->drain= drain_output;
+ ret->spool= spool_output;
+ ret->input_buffer_size= 1 * 1024 * 1024;
+ ret->input_buffer= malloc(ret->input_buffer_size);
+ if (ret->input_buffer == NULL)
+ {
+ free(ret);
+ ret= NULL;
+ return NULL;
+ }
+
+ ret->buffer_cache = cache_create("protocol_handler",
+ CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
+ 0, NULL, NULL);
++ if (ret->buffer_cache == NULL)
++ {
+ free(ret->input_buffer);
+ free(ret);
+ }
+ }
+
+ return ret;
+ }
+
+ void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
+ {
+ cache_destroy(instance->buffer_cache);
+ free(instance->input_buffer);
+ free(instance);
+ }
+
+ struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock)
+ {
+ struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret));
+ if (ret != NULL)
+ {
+ ret->root= instance;
+ ret->sock= sock;
+ ret->work= determine_protocol;
+ }
+
+ return ret;
+ }
+
+ void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
+ {
+ free(client);
+ }
+
+ enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client)
+ {
+ /* Try to send data and read from the socket */
+ bool more_data= true;
+ do
+ {
+ ssize_t len= client->root->recv(client,
+ client->sock,
+ client->root->input_buffer + client->input_buffer_offset,
+ client->root->input_buffer_size - client->input_buffer_offset);
+
+ if (len > 0)
+ {
+ /* Do we have the complete packet? */
+ if (client->input_buffer_offset > 0)
+ {
+ memcpy(client->root->input_buffer, client->input_buffer,
+ client->input_buffer_offset);
+ len += (ssize_t)client->input_buffer_offset;
+
+ /* @todo use buffer-cache! */
+ free(client->input_buffer);
+ client->input_buffer_offset= 0;
+ }
+
+ void *endptr;
+ if (client->work(client, &len, &endptr) == ERROR_EVENT)
+ {
+ return ERROR_EVENT;
+ }
+
+ if (len > 0)
+ {
+ /* save the data for later on */
+ /* @todo use buffer-cache */
+ client->input_buffer= malloc((size_t)len);
+ if (client->input_buffer == NULL)
+ {
+ client->error= ENOMEM;
+ return ERROR_EVENT;
+ }
+ memcpy(client->input_buffer, endptr, (size_t)len);
+ client->input_buffer_offset= (size_t)len;
+ more_data= false;
+ }
+ }
+ else if (len == 0)
+ {
+ /* Connection closed */
+ drain_output(client);
+ return ERROR_EVENT;
+ }
+ else
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ client->error= errno;
+ /* mark this client as terminated! */
+ return ERROR_EVENT;
+ }
+ more_data = false;
+ }
+ } while (more_data);
+
+ if (!drain_output(client))
+ {
+ return ERROR_EVENT;
+ }
+
+ return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
+ }