From: Brian Aker Date: Fri, 2 Oct 2009 00:01:34 +0000 (-0700) Subject: Merge Trond's protocol work. X-Git-Tag: 0.34~17 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=771170f522f2186b7fb0cde6052dbe37c1a9f1b7;p=m6w6%2Flibmemcached Merge Trond's protocol work. --- 771170f522f2186b7fb0cde6052dbe37c1a9f1b7 diff --cc example/memcached_light.c index 00000000,c1bdea04..4343b835 mode 000000,100644..100644 --- a/example/memcached_light.c +++ b/example/memcached_light.c @@@ -1,0 -1,381 +1,384 @@@ + /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ + /** + * What is a library without an example to show you how to use the library? + * This example uses both interfaces to implement a small memcached server. + * Please note that this is an example of how to use the library, not + * an implementation of a scalable memcached server. If you look closely + * at the example it isn't even multithreaded ;-) + * + * With that in mind, let me give you some pointers into the source: + * storage.c/h - Implements the item store for this server and not really + * interesting for this example. + * interface_v0.c - Shows an implementation of the memcached server by using + * the "raw" access to the packets as they arrive + * interface_v1.c - Shows an implementation of the memcached server by using + * the more "logical" interface. + * memcached_light.c - This file sets up all of the sockets and runs the main + * message loop. 
/**
 * Create listening sockets for a port and register them in server_sockets.
 *
 * Every address getaddrinfo() returns for 127.0.0.1:port is tried in turn.
 * An address that cannot be turned into a listening socket is reported and
 * skipped; the remaining addresses are still tried.
 *
 * @param port the port number to bind to
 * @return 0 if at least one server socket is registered (including sockets
 *         registered by earlier calls), 1 otherwise
 */
static int server_socket(const char *port)
{
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
    {
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    }
    else
    {
      perror("getaddrinfo()");
    }

    return 1;
  }

  const int max_sockets= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));
  struct linger ling= {0, 0};

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    /* Never write past the end of the server_sockets table */
    if (num_server_sockets >= max_sockets)
    {
      fprintf(stderr, "Too many server sockets, ignoring remaining addresses\n");
      break;
    }

    /* Use the entry we are iterating over, not the head of the list */
    int sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == -1)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      close(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        close(sock);
        continue;
      }
    }

    /* The socket options are best effort: report failures but carry on */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set SO_REUSEADDR");
    }

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set SO_KEEPALIVE");
    }

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
    {
      perror("Failed to set SO_LINGER");
    }

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set TCP_NODELAY");
    }

    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
    {
      /*
       * The address list is released exactly once, after the loop.
       * Releasing it here and then continuing would walk freed memory
       * and free the list a second time.
       */
      if (errno != EADDRINUSE)
      {
        perror("bind()");
      }
      close(sock);
      continue;
    }

    if (listen(sock, 1024) == -1)
    {
      perror("listen()");
      close(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
header->request.opcode); + } + } + + /** + * Callback handler for all unknown commands. + * Send an unknown command back to the client + */ + static protocol_binary_response_status unknown(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) + { + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= header->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND), + .opaque= header->request.opaque + } + } + }; + + return response_handler(cookie, header, (void*)&response); + } + + static void work(void); + + /** + * Program entry point. Bind to the specified port(s) and serve clients + * + * @param argc number of items in the argument vector + * @param argv argument vector + * @return 0 on success, 1 otherwise + */ + int main(int argc, char **argv) + { + bool port_specified= false; + int cmd; + struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + + while ((cmd= getopt(argc, argv, "v1p:?")) != EOF) + { + switch (cmd) { + case '1': + interface= &interface_v1_impl; + break; + case 'p': + port_specified= true; + (void)server_socket(optarg); + break; + case 'v': + verbose= true; + break; + case '?': /* FALLTHROUGH */ + default: + (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]); + return 1; + } + } + + if (!initialize_storage()) + { + /* Error message already printed */ + return 1; + } + + if (!port_specified) + (void)server_socket("9999"); + + if (num_server_sockets == 0) + { + fprintf(stderr, "I don't have any server sockets\n"); + return 1; + } + + /* + * Create and initialize the handles to the protocol handlers. 
I want + * to be able to trace the traffic throught the pre/post handlers, and + * set up a common handler for unknown messages + */ + interface->pre_execute= pre_execute; + interface->post_execute= post_execute; + interface->unknown= unknown; + + struct memcached_protocol_st *protocol_handle; + if ((protocol_handle= memcached_protocol_create_instance()) == NULL) + { + fprintf(stderr, "Failed to allocate protocol handle\n"); + return 1; + } + + memcached_binary_protocol_set_callbacks(protocol_handle, interface); + memcached_binary_protocol_set_pedantic(protocol_handle, true); + + for (int xx= 0; xx < num_server_sockets; ++xx) + socket_userdata_map[server_sockets[xx]]= protocol_handle; + + /* Serve all of the clients */ + work(); + + /* NOTREACHED */ + return 0; + } + -static void work(void) { ++static void work(void) ++{ + #define MAX_SERVERS_TO_POLL 100 + struct pollfd fds[MAX_SERVERS_TO_POLL]; + int max_poll; + + for (max_poll= 0; max_poll < num_server_sockets; ++max_poll) + { + fds[max_poll].events= POLLIN; + fds[max_poll].revents= 0; + fds[max_poll].fd= server_sockets[max_poll]; + ++max_poll; + } + + while (true) + { + int err= poll(fds, (nfds_t)max_poll, -1); + + if (err == 0 || (err == -1 && errno != EINTR)) + { + perror("poll() failed"); + abort(); + } + + /* find the available filedescriptors */ + for (int x= max_poll - 1; x > -1 && err > 0; --x) + { + if (fds[x].revents != 0) + { + --err; + if (x < num_server_sockets) + { + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + int sock= accept(fds[x].fd, (struct sockaddr *)&addr, + &addrlen); + + if (sock == -1) + { + perror("Failed to accept client"); + continue; + } + + struct memcached_protocol_st *protocol; + protocol= socket_userdata_map[fds[x].fd]; + + struct memcached_protocol_client_st* c; + c= memcached_protocol_create_client(protocol, sock); + if (c == NULL) + { + fprintf(stderr, "Failed to create client\n"); + close(sock); + } + else + { + 
socket_userdata_map[sock]= c; + fds[max_poll].events= POLLIN; + fds[max_poll].revents= 0; + fds[max_poll].fd= sock; + ++max_poll; + } + } + else + { + /* drive the client */ + struct memcached_protocol_client_st* c; + c= socket_userdata_map[fds[x].fd]; + assert(c != NULL); + fds[max_poll].events= 0; + + switch (memcached_protocol_client_work(c)) + { + case WRITE_EVENT: + case READ_WRITE_EVENT: + fds[max_poll].events= POLLOUT; + /* FALLTHROUGH */ + case READ_EVENT: + fds[max_poll].events |= POLLIN; + break; + case ERROR_EVENT: + default: /* ERROR or unknown state.. close */ + memcached_protocol_client_destroy(c); + close(fds[x].fd); + fds[x].events= 0; + + if (x != max_poll - 1) + memmove(fds + x, fds + x + 1, (size_t)(max_poll - x)); + + --max_poll; + } + } + } + } + } + } diff --cc example/storage.c index 00000000,72439361..39b22caf mode 000000,100644..100644 --- a/example/storage.c +++ b/example/storage.c @@@ -1,0 -1,159 +1,170 @@@ + /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ + #include + #include + #include + #include + #include + #include "storage.h" + + struct list_entry { + struct item item; + struct list_entry *next; + struct list_entry *prev; + }; + + static struct list_entry *root; + static uint64_t cas; + -bool initialize_storage(void) { ++bool initialize_storage(void) ++{ + return true; + } + -void shutdown_storage(void) { ++void shutdown_storage(void) ++{ + /* Do nothing */ + } + -void put_item(struct item* item) { ++void put_item(struct item* item) ++{ + struct list_entry* entry= (void*)item; ++ + update_cas(item); + + if (root == NULL) + { + entry->next= entry->prev= entry; + } + else + { + entry->prev= root->prev; + entry->next= root; + entry->prev->next= entry; + entry->next->prev= entry; + } + + root= entry; + } + -struct item* get_item(const void* key, size_t nkey) { ++struct item* get_item(const void* key, size_t nkey) ++{ + struct list_entry *walker= root; ++ + if (root == NULL) + { + return NULL; + } + + do + 
{ + if (((struct item*)walker)->nkey == nkey && + memcmp(((struct item*)walker)->key, key, nkey) == 0) + { + return (struct item*)walker; + } + walker= walker->next; + } while (walker != root); + + return NULL; + } + + struct item* create_item(const void* key, size_t nkey, const void* data, + size_t size, uint32_t flags, time_t exp) + { + struct item* ret= calloc(1, sizeof(struct list_entry)); ++ + if (ret != NULL) + { + ret->key= malloc(nkey); + if (size > 0) + { + ret->data= malloc(size); + } + + if (ret->key == NULL || (size > 0 && ret->data == NULL)) + { + free(ret->key); + free(ret->data); + free(ret); + return NULL; + } + + memcpy(ret->key, key, nkey); + if (data != NULL) + { + memcpy(ret->data, data, size); + } + + ret->nkey= nkey; + ret->size= size; + ret->flags= flags; + ret->exp= exp; + } + + return ret; + } + -bool delete_item(const void* key, size_t nkey) { ++bool delete_item(const void* key, size_t nkey) ++{ + struct item* item= get_item(key, nkey); + bool ret= false; + + if (item) + { + /* remove from linked list */ + struct list_entry *entry= (void*)item; + + if (entry->next == entry) + { + /* Only one object in the list */ + root= NULL; + } + else + { + /* ensure that we don't loose track of the root, and this will + * change the start position for the next search ;-) */ + root= entry->next; + entry->prev->next= entry->next; + entry->next->prev= entry->prev; + } + + free(item->key); + free(item->data); + free(item); + ret= true; + } + + return ret; + } + -void flush(uint32_t when) { ++void flush(uint32_t when) ++{ + /* FIXME */ + (void)when; + /* remove the complete linked list */ + if (root == NULL) + { + return; + } + + root->prev->next= NULL; + while (root != NULL) + { + struct item* tmp= (void*)root; + root= root->next; + + free(tmp->key); + free(tmp->data); + free(tmp); + } + } + -void update_cas(struct item* item) { ++void update_cas(struct item* item) ++{ + item->cas= ++cas; + } + -void release_item(struct item* item __attribute__((unused))) 
/**
 * To avoid cluttering all the code with error checking I use the
 * following macro. It executes the statement and verifies that the
 * result of the operation is DB_SUCCESS. If any other error code is
 * returned it prints an "assert-like" message to stderr and jumps to the
 * label error_exit, which the calling function must define and where it
 * releases its resources before returning.
 *
 * The macro expands to a single statement without a trailing semicolon,
 * so it must be used as `checked(expr);`.
 *
 * @param expression the expression to execute; must evaluate to an ib_err_t
 */
#define checked(expression)                                    \
do {                                                           \
  ib_err_t checked_err= (expression);                          \
  if (checked_err != DB_SUCCESS)                               \
  {                                                            \
    fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
            __FILE__, (unsigned int)__LINE__, #expression,     \
            ib_strerror(checked_err));                         \
    goto error_exit;                                           \
  }                                                            \
} while (0)
+ */ -static bool create_schema(void) { ++static bool create_schema(void) ++{ + ib_tbl_sch_t schema= NULL; + ib_idx_sch_t dbindex= NULL; + + if (ib_database_create("memcached") != IB_TRUE) + { + fprintf(stderr, "Failed to create database\n"); + return false; + } + + ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); + ib_id_t table_id; + + checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0)); + checked(ib_table_schema_add_col(schema, "key", IB_BLOB, + IB_COL_NOT_NULL, 0, 32767)); + checked(ib_table_schema_add_col(schema, "data", IB_BLOB, + IB_COL_NONE, 0, 1024*1024)); + checked(ib_table_schema_add_col(schema, "flags", IB_INT, + IB_COL_UNSIGNED, 0, 4)); + checked(ib_table_schema_add_col(schema, "cas", IB_INT, + IB_COL_UNSIGNED, 0, 8)); + checked(ib_table_schema_add_col(schema, "exp", IB_INT, + IB_COL_UNSIGNED, 0, 4)); + checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex)); + checked(ib_index_schema_add_col(dbindex, "key", 0)); + checked(ib_index_schema_set_clustered(dbindex)); + checked(ib_schema_lock_exclusive(transaction)); + checked(ib_table_create(transaction, schema, &table_id)); + checked(ib_trx_commit(transaction)); + ib_table_schema_delete(schema); + + return true; + + error_exit: + /* @todo release resources! */ + { + ib_err_t error= ib_trx_rollback(transaction); + if (error != DB_SUCCESS) + fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", + ib_strerror(error)); + } + return false; + } + + /** + * Store an item into the database. Update the CAS id on the item before + * storing it in the database. 
+ * + * @param trx the transaction to use + * @param item the item to store + * @return true if we can go ahead and commit the transaction, false otherwise + */ -static bool do_put_item(ib_trx_t trx, struct item* item) { ++static bool do_put_item(ib_trx_t trx, struct item* item) ++{ + update_cas(item); + + ib_crsr_t cursor= NULL; + ib_tpl_t tuple= NULL; + bool retval= false; + + checked(ib_cursor_open_table(tablename, trx, &cursor)); + checked(ib_cursor_lock(cursor, IB_LOCK_X)); + tuple= ib_clust_read_tuple_create(cursor); + + checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey)); + checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size)); + checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags)); + checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas)); + checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp)); + checked(ib_cursor_insert_row(cursor, tuple)); + + retval= true; + /* Release resources: */ + /* FALLTHROUGH */ + + error_exit: + if (tuple != NULL) + ib_tuple_delete(tuple); + + if (cursor != NULL) + ib_cursor_close(cursor); + + return retval; + } + + /** + * Try to locate an item in the database. Return a cursor and the tuple to + * the item if I found it in the database. 
+ * + * @param trx the transaction to use + * @param key the key of the item to look up + * @param nkey the size of the key + * @param cursor where to store the cursor (OUT) + * @param tuple where to store the tuple (OUT) + * @return true if I found the object, false otherwise + */ + static bool do_locate_item(ib_trx_t trx, + const void* key, + size_t nkey, + ib_crsr_t *cursor) + { + int res; + ib_tpl_t tuple= NULL; + + *cursor= NULL; + + checked(ib_cursor_open_table(tablename, trx, cursor)); + tuple= ib_clust_search_tuple_create(*cursor); + if (tuple == NULL) + { + fprintf(stderr, "Failed to allocate tuple object\n"); + goto error_exit; + } + + checked(ib_col_set_value(tuple, key_col_idx, key, nkey)); + ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res); + + if (err == DB_SUCCESS && res == 0) + { + ib_tuple_delete(tuple); + return true; + } + else if (err != DB_SUCCESS && + err != DB_RECORD_NOT_FOUND && + err != DB_END_OF_INDEX) + { + fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err)); + } + /* FALLTHROUGH */ + error_exit: + if (tuple != NULL) + ib_tuple_delete(tuple); + if (*cursor != NULL) + ib_cursor_close(*cursor); + *cursor= NULL; + + return false; + } + + /** + * Try to get an item from the database + * + * @param trx the transaction to use + * @param key the key to get + * @param nkey the lenght of the key + * @return a pointer to the item if I found it in the database + */ -static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) { ++static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) ++{ + ib_crsr_t cursor= NULL; + ib_tpl_t tuple= NULL; + struct item* retval= NULL; + - if (do_locate_item(trx, key, nkey, &cursor)) { ++ if (do_locate_item(trx, key, nkey, &cursor)) ++ { + tuple= ib_clust_read_tuple_create(cursor); + if (tuple == NULL) + { + fprintf(stderr, "Failed to create read tuple\n"); + goto error_exit; + } + checked(ib_cursor_read_row(cursor, tuple)); + ib_col_meta_t meta; + 
ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta); + ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta); + ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta); + ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta); + const void *dataptr= ib_col_get_value(tuple, data_col_idx); + + retval= create_item(key, nkey, dataptr, datalen, 0, 0); - if (retval == NULL) { ++ if (retval == NULL) ++ { + fprintf(stderr, "Failed to allocate memory\n"); + goto error_exit; + } + - if (flaglen != 0) { ++ if (flaglen != 0) ++ { + ib_u32_t val; + checked(ib_tuple_read_u32(tuple, flags_col_idx, &val)); + retval->flags= (uint32_t)val; + } - if (caslen != 0) { ++ if (caslen != 0) ++ { + ib_u64_t val; + checked(ib_tuple_read_u64(tuple, cas_col_idx, &val)); + retval->cas= (uint64_t)val; + } - if (explen != 0) { ++ if (explen != 0) ++ { + ib_u32_t val; + checked(ib_tuple_read_u32(tuple, exp_col_idx, &val)); + retval->exp= (time_t)val; + } + } + + /* Release resources */ + /* FALLTHROUGH */ + + error_exit: + if (tuple != NULL) + ib_tuple_delete(tuple); + + if (cursor != NULL) + ib_cursor_close(cursor); + + return retval; + } + + /** + * Delete an item from the cache + * @param trx the transaction to use + * @param key the key of the item to delete + * @param nkey the length of the key + * @return true if we should go ahead and commit the transaction + * or false if we should roll back (if the key didn't exists) + */ + static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) { + ib_crsr_t cursor= NULL; + bool retval= false; + + if (do_locate_item(trx, key, nkey, &cursor)) + { + checked(ib_cursor_lock(cursor, IB_LOCK_X)); + checked(ib_cursor_delete_row(cursor)); + retval= true; + } + /* Release resources */ + /* FALLTHROUGH */ + + error_exit: + if (cursor != NULL) + ib_cursor_close(cursor); + + return retval; + } + + + /**************************************************************************** + * External interface + 
***************************************************************************/ + + /** + * Initialize the database storage + * @return true if the database was initialized successfully, false otherwise + */ -bool initialize_storage(void) { ++bool initialize_storage(void) ++{ + ib_err_t error; + ib_id_t tid; + + checked(ib_init()); + checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light")); + checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light")); + checked(ib_cfg_set_bool_on("file_per_table")); + checked(ib_startup("barracuda")); + + /* check to see if the table exists or if we should create the schema */ + error= ib_table_get_id(tablename, &tid); - if (error == DB_TABLE_NOT_FOUND) { - if (!create_schema()) { ++ if (error == DB_TABLE_NOT_FOUND) ++ { ++ if (!create_schema()) ++ { + return false; + } - } else if (error != DB_SUCCESS) { ++ } ++ else if (error != DB_SUCCESS) ++ { + fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error)); + return false; + } + + return true; + + error_exit: + return false; + } + + /** + * Shut down this storage engine + */ -void shutdown_storage(void) { ++void shutdown_storage(void) ++{ + checked(ib_shutdown()); + error_exit: + ; + } + + /** + * Store an item in the databse + * + * @param item the item to store + */ -void put_item(struct item* item) { ++void put_item(struct item* item) ++{ + ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); - if (do_put_item(transaction, item)) { ++ if (do_put_item(transaction, item)) ++ { + ib_err_t error= ib_trx_commit(transaction); - if (error != DB_SUCCESS) { ++ if (error != DB_SUCCESS) ++ { + fprintf(stderr, "Failed to store key:\n\t%s\n", + ib_strerror(error)); + } - } else { ++ } ++ else ++ { + ib_err_t error= ib_trx_rollback(transaction); + if (error != DB_SUCCESS) + fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", + ib_strerror(error)); + } + } + + /** + * Get an item from the engine + * @param key the key to grab + * @param nkey number 
of bytes in the key + * @return pointer to the item if found + */ -struct item* get_item(const void* key, size_t nkey) { ++struct item* get_item(const void* key, size_t nkey) ++{ + ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); + struct item* ret= do_get_item(transaction, key, nkey); + ib_err_t error= ib_trx_rollback(transaction); ++ + if (error != DB_SUCCESS) + fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", + ib_strerror(error)); + + return ret; + } + + /** + * Create an item structure and initialize it with the content + * + * @param key the key for the item + * @param nkey the number of bytes in the key + * @param data pointer to the value for the item (may be NULL) + * @param size the size of the data + * @param flags the flags to store with the data + * @param exp the expiry time for the item + * @return pointer to an initialized item object or NULL if allocation failed + */ + struct item* create_item(const void* key, size_t nkey, const void* data, + size_t size, uint32_t flags, time_t exp) + { + struct item* ret= calloc(1, sizeof(*ret)); + if (ret != NULL) + { + ret->key= malloc(nkey); + if (size > 0) + { + ret->data= malloc(size); + } + + if (ret->key == NULL || (size > 0 && ret->data == NULL)) + { + free(ret->key); + free(ret->data); + free(ret); + return NULL; + } + + memcpy(ret->key, key, nkey); + if (data != NULL) + { + memcpy(ret->data, data, size); + } + + ret->nkey= nkey; + ret->size= size; + ret->flags= flags; + ret->exp= exp; + } + + return ret; + } + + /** + * Delete an item from the cache + * @param key the key of the item to delete + * @param nkey the length of the key + * @return true if the item was deleted from the cache + */ + bool delete_item(const void* key, size_t nkey) { + ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ); + + bool ret= do_delete_item(transaction, key, nkey); + + if (ret) + { + /* object found. 
commit transaction */ + ib_err_t error= ib_trx_commit(transaction); + if (error != DB_SUCCESS) + { + fprintf(stderr, "Failed to delete key:\n\t%s\n", + ib_strerror(error)); + ret= false; + } + } + else + { + ib_err_t error= ib_trx_rollback(transaction); + if (error != DB_SUCCESS) + fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", + ib_strerror(error)); + } + + return ret; + } + + /** + * Flush the entire cache + * @param when when the cache should be flushed (0 == immediately) + */ -void flush(uint32_t when __attribute__((unused))) { ++void flush(uint32_t when __attribute__((unused))) ++{ + /* @TODO implement support for when != 0 */ + ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ); + ib_crsr_t cursor= NULL; + ib_err_t err= DB_SUCCESS; + + checked(ib_cursor_open_table(tablename, transaction, &cursor)); + checked(ib_cursor_first(cursor)); + checked(ib_cursor_lock(cursor, IB_LOCK_X)); + - do { ++ do ++ { + checked(ib_cursor_delete_row(cursor)); + } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS); + + if (err != DB_END_OF_INDEX) + { + fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err)); + goto error_exit; + } + ib_cursor_close(cursor); + cursor= NULL; + checked(ib_trx_commit(transaction)); + return; + + error_exit: + if (cursor != NULL) + ib_cursor_close(cursor); + + ib_err_t error= ib_trx_rollback(transaction); + if (error != DB_SUCCESS) + fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", + ib_strerror(error)); + } + + /** + * Update the cas ID in the item structure + * @param item the item to update + */ -void update_cas(struct item* item) { ++void update_cas(struct item* item) ++{ + item->cas= ++cas; + } + + /** + * Release all the resources allocated by the item + * @param item the item to release + */ -void release_item(struct item* item) { ++void release_item(struct item* item) ++{ + free(item->key); + free(item->data); + free(item); + } diff --cc libmemcached/protocol/protocol_handler.c index 
/**
 * The default function to receive data from the client. It is a thin
 * wrapper around recv() called without any flags.
 *
 * @param cookie cookie identifying a client, not used
 * @param sock socket to read from
 * @param buf the destination buffer
 * @param nbytes the number of bytes to read
 * @return the value returned by recv(): the number of bytes transferred,
 *         or -1 upon error
 */
static ssize_t default_recv(const void *cookie,
                            int sock,
                            void *buf,
                            size_t nbytes)
{
  const int no_flags= 0;
  ssize_t received= recv(sock, buf, nbytes, no_flags);

  (void)cookie;
  return received;
}
+ * + * @param cookie cookie indentifying a client, not used + * @param sock socket to send to + * @param buf the source buffer + * @param nbytes the number of bytes to send + * @return the number of bytes transferred of -1 upon error + */ + static ssize_t default_send(const void *cookie, + int fd, + const void *buf, + size_t nbytes) + { + (void)cookie; + return send(fd, buf, nbytes, 0); + } + + /** + * Try to drain the output buffers without blocking + * + * @param client the client to drain + * @return false if an error occured (connection should be shut down) + * true otherwise (please note that there may be more data to + * left in the buffer to send) + */ + static bool drain_output(struct memcached_protocol_client_st *client) + { + ssize_t len; + + /* Do we have pending data to send? */ + while (client->output != NULL) + { + len= client->root->send(client, + client->sock, + client->output->data + client->output->offset, + client->output->nbytes - client->output->offset); + + if (len == -1) + { + if (errno == EWOULDBLOCK) + { + return true; + } + else if (errno != EINTR) + { + client->error= errno; + return false; + } + } + else + { + client->output->offset += (size_t)len; + if (client->output->offset == client->output->nbytes) + { + /* This was the complete buffer */ + struct chunk_st *old= client->output; + client->output= client->output->next; + if (client->output == NULL) + { + client->output_tail= NULL; + } + cache_free(client->root->buffer_cache, old); + } + } + } + + return true; + } + + /** + * Allocate an output buffer and chain it into the output list + * + * @param client the client that needs the buffer + * @return pointer to the new chunk if the allocation succeeds, NULL otherwise + */ + static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client) + { + struct chunk_st *ret= cache_alloc(client->root->buffer_cache); ++ + if (ret == NULL) + { + return NULL; + } + + ret->offset = ret->nbytes = 0; + ret->next = NULL; + 
ret->size = CHUNK_BUFFERSIZE; + ret->data= (void*)(ret + 1); + if (client->output == NULL) + { + client->output = client->output_tail = ret; + } + else + { + client->output_tail->next= ret; + client->output_tail= ret; + } + + return ret; + } + + /** + * Spool data into the send-buffer for a client. + * + * @param client the client to spool the data for + * @param data the data to spool + * @param length the number of bytes of data to spool + * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success, + * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory + */ + static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client, + const void *data, + size_t length) + { + if (client->mute) + { + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + + size_t offset = 0; + + struct chunk_st *chunk= client->output; + while (offset < length) + { + if (chunk == NULL || (chunk->size - chunk->nbytes) == 0) + { + if ((chunk= allocate_output_chunk(client)) == NULL) + { + return PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + } + + size_t bulk = length - offset; + if (bulk > chunk->size - chunk->nbytes) + { + bulk = chunk->size - chunk->nbytes; + } + + memcpy(chunk->data + chunk->nbytes, data, bulk); + chunk->nbytes += bulk; + offset += bulk; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + + /** + * Try to determine the protocol used on this connection. + * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should + * be using the binary protocol on the connection. 
I implemented the support + * for the ASCII protocol by wrapping into the simple interface (aka v1), + * so the implementors needs to provide an implementation of that interface + * + */ + static enum MEMCACHED_PROTOCOL_EVENT determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr) + { + if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ) + { + client->work= memcached_binary_protocol_process_data; + } + else if (client->root->callback->interface_version == 1) + { + /* + * The ASCII protocol can only be used if the implementors provide + * an implementation for the version 1 of the interface.. + * + * @todo I should allow the implementors to provide an implementation + * for version 0 and 1 at the same time and set the preferred + * interface to use... + */ + client->work= memcached_ascii_protocol_process_data; + } + else + { + /* Let's just output a warning the way it is supposed to look like + * in the ASCII protocol... + */ + const char *err= "CLIENT_ERROR: Unsupported protocol\r\n"; + client->root->spool(client, err, strlen(err)); + client->root->drain(client); + return ERROR_EVENT; /* Unsupported protocol */ + } + + return client->work(client, length, endptr); + } + + /* + ** ********************************************************************** + ** * PUBLIC INTERFACE + ** * See protocol_handler.h for function description + ** ********************************************************************** + */ + struct memcached_protocol_st *memcached_protocol_create_instance(void) + { + struct memcached_protocol_st *ret= calloc(1, sizeof(*ret)); + if (ret != NULL) + { + ret->recv= default_recv; + ret->send= default_send; + ret->drain= drain_output; + ret->spool= spool_output; + ret->input_buffer_size= 1 * 1024 * 1024; + ret->input_buffer= malloc(ret->input_buffer_size); + if (ret->input_buffer == NULL) + { + free(ret); + ret= NULL; + return NULL; + } + + ret->buffer_cache = cache_create("protocol_handler", + 
CHUNK_BUFFERSIZE + sizeof(struct chunk_st), + 0, NULL, NULL); - if (ret->buffer_cache == NULL) { ++ if (ret->buffer_cache == NULL) ++ { + free(ret->input_buffer); + free(ret); + } + } + + return ret; + } + + void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance) + { + cache_destroy(instance->buffer_cache); + free(instance->input_buffer); + free(instance); + } + + struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock) + { + struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret)); + if (ret != NULL) + { + ret->root= instance; + ret->sock= sock; + ret->work= determine_protocol; + } + + return ret; + } + + void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client) + { + free(client); + } + + enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client) + { + /* Try to send data and read from the socket */ + bool more_data= true; + do + { + ssize_t len= client->root->recv(client, + client->sock, + client->root->input_buffer + client->input_buffer_offset, + client->root->input_buffer_size - client->input_buffer_offset); + + if (len > 0) + { + /* Do we have the complete packet? */ + if (client->input_buffer_offset > 0) + { + memcpy(client->root->input_buffer, client->input_buffer, + client->input_buffer_offset); + len += (ssize_t)client->input_buffer_offset; + + /* @todo use buffer-cache! 
*/ + free(client->input_buffer); + client->input_buffer_offset= 0; + } + + void *endptr; + if (client->work(client, &len, &endptr) == ERROR_EVENT) + { + return ERROR_EVENT; + } + + if (len > 0) + { + /* save the data for later on */ + /* @todo use buffer-cache */ + client->input_buffer= malloc((size_t)len); + if (client->input_buffer == NULL) + { + client->error= ENOMEM; + return ERROR_EVENT; + } + memcpy(client->input_buffer, endptr, (size_t)len); + client->input_buffer_offset= (size_t)len; + more_data= false; + } + } + else if (len == 0) + { + /* Connection closed */ + drain_output(client); + return ERROR_EVENT; + } + else + { + if (errno != EWOULDBLOCK) + { + client->error= errno; + /* mark this client as terminated! */ + return ERROR_EVENT; + } + more_data = false; + } + } while (more_data); + + if (!drain_output(client)) + { + return ERROR_EVENT; + } + + return (client->output) ? READ_WRITE_EVENT : READ_EVENT; + }