From: Brian Aker Date: Thu, 23 Feb 2012 01:16:50 +0000 (-0800) Subject: Fix for the light server. X-Git-Tag: 1.0.5~27 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=357b107e59d9918a0f3bdd7e4aad9493c70e03f1;p=awesomized%2Flibmemcached Fix for the light server. --- diff --git a/.bzrignore b/.bzrignore index 6e39c494..d3ff77af 100644 --- a/.bzrignore +++ b/.bzrignore @@ -143,3 +143,4 @@ tests/var/ tmp_chroot unittests/unittests tests/libmemcached-1.0/testsocket +example/t/memcached_light diff --git a/clients/memcat.cc b/clients/memcat.cc index 86ae279c..c93f1a21 100644 --- a/clients/memcat.cc +++ b/clients/memcat.cc @@ -184,7 +184,6 @@ int main(int argc, char *argv[]) void options_parse(int argc, char *argv[]) { int option_index= 0; - int option_rv; memcached_programs_help_st help_options[]= { @@ -210,7 +209,7 @@ void options_parse(int argc, char *argv[]) while (1) { - option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); + int option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index); if (option_rv == -1) break; switch (option_rv) { diff --git a/clients/memcp.cc b/clients/memcp.cc index 5422aa73..59bd7478 100644 --- a/clients/memcp.cc +++ b/clients/memcp.cc @@ -245,7 +245,7 @@ int main(int argc, char *argv[]) if (memcached_failed(rc)) { - std::cerr << "Error occrrured during operation: " << memcached_last_error_message(memc) << std::endl; + std::cerr << "Error occrrured during memcached_set(): " << memcached_last_error_message(memc) << std::endl; exit_code= EXIT_FAILURE; } @@ -254,6 +254,11 @@ int main(int argc, char *argv[]) optind++; } + if (opt_verbose) + { + std::cout << "Calling memcached_free()" << std::endl; + } + memcached_free(memc); if (opt_servers) diff --git a/clients/memrm.cc b/clients/memrm.cc index cbfcd241..61697cd1 100644 --- a/clients/memrm.cc +++ b/clients/memrm.cc @@ -97,7 +97,7 @@ int main(int argc, char *argv[]) std::cerr << "Could not find key \"" << argv[optind] << "\"" << std::endl; } } - else if (memcached_failed(rc)) + else if (memcached_fatal(rc)) { if (opt_verbose) { diff --git a/example/include.am b/example/include.am index 625c4680..0abd2c3b 100644 --- a/example/include.am +++ b/example/include.am @@ -13,11 +13,13 @@ noinst_HEADERS+= \ example_memcached_light_SOURCES= \ example/byteorder.cc \ - example/interface_v0.c \ - example/interface_v1.c \ - example/memcached_light.c + example/interface_v0.cc \ + example/interface_v1.cc \ + example/memcached_light.cc example_memcached_light_LDADD= libmemcached/libmemcachedprotocol.la \ $(LIBEVENT_LDFLAGS) -example_memcached_light_SOURCES+= example/storage.c +example_memcached_light_SOURCES+= example/storage.cc + +include example/t/include.am diff --git a/example/interface_v0.c b/example/interface_v0.c deleted file mode 100644 index ef5ba47b..00000000 --- a/example/interface_v0.c +++ /dev/null @@ -1,596 +0,0 @@ -/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -/** - * This file contains an implementation of the callback interface for level 0 - * in the protocol library. You might want to have your copy of the protocol - * specification next to your coffee ;-) - */ - -#include "config.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include "example/storage.h" -#include "example/memcached_light.h" - -static protocol_binary_response_status noop_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_NOOP, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - }; - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status quit_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_QUIT, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - }; - - if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) - response_handler(cookie, header, (void*)&response); - - /* I need a better way to signal to close the connection */ - return PROTOCOL_BINARY_RESPONSE_EINTERNAL; -} - -static protocol_binary_response_status get_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - uint8_t opcode= header->request.opcode; - union { - protocol_binary_response_get response; - char buffer[4096]; - } msg= { - .response.message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - }; - - struct item *item= get_item(header + 1, ntohs(header->request.keylen)); - if (item) - { - msg.response.message.body.flags= htonl(item->flags); - char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4); - uint32_t bodysize= 4; - msg.response.message.header.response.cas= example_htonll(item->cas); - if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ) - { - memcpy(ptr, item->key, item->nkey); - msg.response.message.header.response.keylen= htons((uint16_t)item->nkey); - ptr += item->nkey; - bodysize += (uint32_t)item->nkey; - } - memcpy(ptr, item->data, item->size); - bodysize += (uint32_t)item->size; - msg.response.message.header.response.bodylen= htonl(bodysize); - msg.response.message.header.response.extlen= 4; - - release_item(item); - return response_handler(cookie, header, (void*)&msg); - } - else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK) - { - msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); - return response_handler(cookie, header, (void*)&msg); - } - - /* Q shouldn't report a miss ;-) */ - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status delete_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - size_t keylen= ntohs(header->request.keylen); - char *key= ((char*)header) + sizeof(*header); - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .opaque= header->request.opaque - } - }; - - if (!delete_item(key, keylen)) - { - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); - return response_handler(cookie, header, (void*)&response); - } - else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) - { - /* DELETEQ doesn't want success response */ - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); - return response_handler(cookie, header, (void*)&response); - } - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status flush_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - uint8_t opcode= header->request.opcode; - - /* @fixme sett inn when! */ - flush(0); - - if (opcode == PROTOCOL_BINARY_CMD_FLUSH) - { - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - }; - return response_handler(cookie, header, (void*)&response); - } - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status arithmetic_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_request_incr *req= (void*)header; - protocol_binary_response_incr response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .opaque= header->request.opaque, - }, - }; - - uint16_t keylen= ntohs(header->request.keylen); - uint64_t initial= example_ntohll(req->message.body.initial); - uint64_t delta= example_ntohll(req->message.body.delta); - uint32_t expiration= ntohl(req->message.body.expiration); - uint32_t flags= 0; - void *key= req->bytes + sizeof(req->bytes); - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - - uint64_t value= initial; - - struct item *item= get_item(key, keylen); - if (item != NULL) - { - if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT || - header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ) - { - value= (*(uint64_t*)item->data) + delta; - } - else - { - if (delta > *(uint64_t*)item->data) - { - value= 0; - } - else - { - value= *(uint64_t*)item->data - delta; - } - } - expiration= (uint32_t)item->exp; - flags= item->flags; - - release_item(item); - delete_item(key, keylen); - } - - item= create_item(key, keylen, NULL, sizeof(value), flags, (time_t)expiration); - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - memcpy(item->data, &value, sizeof(value)); - put_item(item); - } - - response.message.header.response.status= htons(rval); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) - { - response.message.header.response.bodylen= ntohl(8); - response.message.body.value= example_ntohll((*(uint64_t*)item->data)); - response.message.header.response.cas= example_ntohll(item->cas); - - release_item(item); - if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ || - header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ) - { - return PROTOCOL_BINARY_RESPONSE_SUCCESS; - } - } - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status version_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - const char *versionstring= "1.0.0"; - union { - protocol_binary_response_header packet; - char buffer[256]; - } response= { - .packet.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_VERSION, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= 0, - .bodylen= htonl((uint32_t)strlen(versionstring)) - } - }; - - memcpy(response.buffer + sizeof(response.packet), versionstring, strlen(versionstring)); - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status concat_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - uint16_t keylen= ntohs(header->request.keylen); - uint64_t cas= example_ntohll(header->request.cas); - void *key= header + 1; - uint32_t vallen= ntohl(header->request.bodylen) - keylen; - void *val= (char*)key + keylen; - - struct item *item= get_item(key, keylen); - struct item *nitem= NULL; - - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - else if (cas != 0 && cas != item->cas) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, - item->flags, item->exp)) == NULL) - { - release_item(item); - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || - header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ) - { - memcpy(nitem->data, item->data, item->size); - memcpy(((char*)(nitem->data)) + item->size, val, vallen); - } - else - { - memcpy(nitem->data, val, vallen); - memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); - } - release_item(item); - delete_item(key, keylen); - put_item(nitem); - cas= nitem->cas; - release_item(nitem); - - if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || - header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) - { - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .status= htons(rval), - .opaque= header->request.opaque, - .cas= example_htonll(cas), - } - } - }; - return response_handler(cookie, header, (void*)&response); - } - } - - return rval; -} - -static protocol_binary_response_status set_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - size_t keylen= ntohs(header->request.keylen); - size_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_replace *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - time_t timeout= (time_t)ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - } - }; - - if (header->request.cas != 0) - { - /* validate cas */ - struct item* item= get_item(key, keylen); - if (item != NULL) - { - if (item->cas != example_ntohll(header->request.cas)) - { - release_item(item); - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); - return response_handler(cookie, header, (void*)&response); - } - release_item(item); - } - } - - delete_item(key, keylen); - struct item* item= create_item(key, keylen, data, datalen, flags, timeout); - if (item == NULL) - { - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); - } - else - { - put_item(item); - /* SETQ shouldn't return a message */ - if (header->request.opcode == PROTOCOL_BINARY_CMD_SET) - { - response.message.header.response.cas= example_htonll(item->cas); - release_item(item); - return response_handler(cookie, header, (void*)&response); - } - release_item(item); - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; - } - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status add_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - size_t keylen= ntohs(header->request.keylen); - size_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_add *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - time_t timeout= (time_t)ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - } - }; - - struct item* item= get_item(key, keylen); - if (item == NULL) - { - item= create_item(key, keylen, data, datalen, flags, timeout); - if (item == NULL) - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); - else - { - put_item(item); - /* ADDQ shouldn't return a message */ - if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD) - { - response.message.header.response.cas= example_htonll(item->cas); - release_item(item); - return response_handler(cookie, header, (void*)&response); - } - release_item(item); - return PROTOCOL_BINARY_RESPONSE_SUCCESS; - } - } - else - { - release_item(item); - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); - } - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status replace_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - size_t keylen= ntohs(header->request.keylen); - size_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_replace *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - time_t timeout= (time_t)ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - } - }; - - struct item* item= get_item(key, keylen); - if (item == NULL) - { - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); - } - else if (header->request.cas == 0 || example_ntohll(header->request.cas) == item->cas) - { - release_item(item); - delete_item(key, keylen); - item= create_item(key, keylen, data, datalen, flags, timeout); - - if (item == NULL) - { - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); - } - else - { - put_item(item); - /* REPLACEQ shouldn't return a message */ - if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) - { - response.message.header.response.cas= example_htonll(item->cas); - release_item(item); - return response_handler(cookie, header, (void*)&response); - } - release_item(item); - return PROTOCOL_BINARY_RESPONSE_SUCCESS; - } - } - else - { - response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); - release_item(item); - } - - return response_handler(cookie, header, (void*)&response); -} - -static protocol_binary_response_status stat_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - /* Just send the terminating packet*/ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_STAT, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - } - }; - - return response_handler(cookie, header, (void*)&response); -} - -memcached_binary_protocol_callback_st interface_v0_impl= { - .interface_version= MEMCACHED_PROTOCOL_HANDLER_V0, -#ifdef FUTURE - /* - ** There is a number of bugs in the extra options for gcc causing - ** warning on these struct initializers. It hurts my heart to remove - ** it so I'll just leave it in here so that we can enable it when - ** we can drop support for the broken compilers - */ - .interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler, - .interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler, -#endif -}; - -void initialize_interface_v0_handler(void) -{ - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler; - interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler; -} diff --git a/example/interface_v0.cc b/example/interface_v0.cc new file mode 100644 index 00000000..cc2f17d2 --- /dev/null +++ b/example/interface_v0.cc @@ -0,0 +1,549 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * This file contains an implementation of the callback interface for level 0 + * in the protocol library. You might want to have your copy of the protocol + * specification next to your coffee ;-) + */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "example/memcached_light.h" +#include "example/storage.h" + +static protocol_binary_response_status noop_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= PROTOCOL_BINARY_CMD_NOOP; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status quit_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= PROTOCOL_BINARY_CMD_QUIT; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) + { + response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + + /* I need a better way to signal to close the connection */ + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; +} + +static protocol_binary_response_status get_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + uint8_t opcode= header->request.opcode; + union protocol_binary_response_get_un { + protocol_binary_response_get response; + char buffer[4096]; + }; + + protocol_binary_response_get_un msg; + memset(&msg, 0, sizeof(protocol_binary_response_get_un)); + + msg.response.message.header.response.magic= PROTOCOL_BINARY_RES; + msg.response.message.header.response.opcode= opcode; + msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + msg.response.message.header.response.opaque= header->request.opaque; + + struct item *item= get_item(header + 1, ntohs(header->request.keylen)); + if (item) + { + msg.response.message.body.flags= htonl(item->flags); + char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4); + uint32_t bodysize= 4; + msg.response.message.header.response.cas= example_htonll(item->cas); + if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ) + { + memcpy(ptr, item->key, item->nkey); + msg.response.message.header.response.keylen= htons((uint16_t)item->nkey); + ptr += item->nkey; + bodysize += (uint32_t)item->nkey; + } + memcpy(ptr, item->data, item->size); + bodysize += (uint32_t)item->size; + msg.response.message.header.response.bodylen= htonl(bodysize); + msg.response.message.header.response.extlen= 4; + + release_item(item); + return response_handler(cookie, header, (protocol_binary_response_header*)&msg); + } + else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK) + { + msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + return response_handler(cookie, header, (protocol_binary_response_header*)&msg); + } + + /* Q shouldn't report a miss ;-) */ + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status delete_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + + char *key= ((char*)header) + sizeof(*header); + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.opaque= header->request.opaque; + + if (!delete_item(key, keylen)) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) + { + /* DELETEQ doesn't want success response */ + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status flush_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + uint8_t opcode= header->request.opcode; + + /* @fixme sett inn when! */ + flush(0); + + if (opcode == PROTOCOL_BINARY_CMD_FLUSH) + { + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= opcode; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status arithmetic_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_request_incr *req= (protocol_binary_request_incr*)header; + protocol_binary_response_incr response; + memset(&response, 0, sizeof(protocol_binary_response_incr)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.opaque= header->request.opaque; + + uint16_t keylen= ntohs(header->request.keylen); + uint64_t initial= example_ntohll(req->message.body.initial); + uint64_t delta= example_ntohll(req->message.body.delta); + uint32_t expiration= ntohl(req->message.body.expiration); + uint32_t flags= 0; + void *key= req->bytes + sizeof(req->bytes); + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + uint64_t value= initial; + + struct item *item= get_item(key, keylen); + if (item != NULL) + { + if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT || + header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ) + { + value= (*(uint64_t*)item->data) + delta; + } + else + { + if (delta > *(uint64_t*)item->data) + { + value= 0; + } + else + { + value= *(uint64_t*)item->data - delta; + } + } + expiration= (uint32_t)item->exp; + flags= item->flags; + + release_item(item); + delete_item(key, keylen); + } + + item= create_item(key, keylen, NULL, sizeof(value), flags, (time_t)expiration); + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(item->data, &value, sizeof(value)); + put_item(item); + } + + response.message.header.response.status= htons(rval); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + response.message.header.response.bodylen= ntohl(8); + response.message.body.value= example_ntohll((*(uint64_t*)item->data)); + response.message.header.response.cas= example_ntohll(item->cas); + + release_item(item); + if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ || + header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ) + { + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status version_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + const char *versionstring= "1.0.0"; + union protocol_binary_response_header_un + { + protocol_binary_response_header packet; + char buffer[256]; + }; + + protocol_binary_response_header_un response; + memset(&response, 0, sizeof(protocol_binary_response_header_un)); + + response.packet.response.magic= PROTOCOL_BINARY_RES; + response.packet.response.opcode= PROTOCOL_BINARY_CMD_VERSION; + response.packet.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.packet.response.opaque= header->request.opaque; + response.packet.response.cas= 0; + response.packet.response.bodylen= htonl((uint32_t)strlen(versionstring)); + + assert(sizeof(protocol_binary_response_header) +strlen(versionstring) <= 256); + memcpy(response.buffer + sizeof(protocol_binary_response_header), versionstring, strlen(versionstring)); + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status concat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + uint16_t keylen= ntohs(header->request.keylen); + uint64_t cas= example_ntohll(header->request.cas); + void *key= header + 1; + uint32_t vallen= ntohl(header->request.bodylen) - keylen; + void *val= (char*)key + keylen; + + struct item *item= get_item(key, keylen); + struct item *nitem= NULL; + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + release_item(item); + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || + header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ) + { + memcpy(nitem->data, item->data, item->size); + memcpy(((char*)(nitem->data)) + item->size, val, vallen); + } + else + { + memcpy(nitem->data, val, vallen); + memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); + } + release_item(item); + delete_item(key, keylen); + put_item(nitem); + cas= nitem->cas; + release_item(nitem); + + if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND || + header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) + { + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.status= htons(rval); + response.message.header.response.opaque= header->request.opaque; + response.message.header.response.cas= example_htonll(cas); + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + } + + return rval; +} + +static protocol_binary_response_status set_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (protocol_binary_request_replace*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= (time_t)ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + if (header->request.cas != 0) + { + /* validate cas */ + struct item* item= get_item(key, keylen); + if (item != NULL) + { + if (item->cas != example_ntohll(header->request.cas)) + { + release_item(item); + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + release_item(item); + } + } + + delete_item(key, keylen); + struct item* item= create_item(key, keylen, data, datalen, flags, timeout); + if (item == NULL) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + } + else + { + put_item(item); + /* SETQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_SET) + { + response.message.header.response.cas= example_htonll(item->cas); + release_item(item); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + release_item(item); + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status add_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_add *request= (protocol_binary_request_add*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= (time_t)ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + struct item* item= get_item(key, keylen); + if (item == NULL) + { + item= create_item(key, keylen, data, datalen, flags, timeout); + if (item == NULL) + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + else + { + put_item(item); + /* ADDQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD) + { + response.message.header.response.cas= example_htonll(item->cas); + release_item(item); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + release_item(item); + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + { + release_item(item); + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + } + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status replace_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + size_t keylen= ntohs(header->request.keylen); + size_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (protocol_binary_request_replace*)header; + uint32_t flags= ntohl(request->message.body.flags); + time_t timeout= (time_t)ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + struct item* item= get_item(key, keylen); + if (item == NULL) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + } + else if (header->request.cas == 0 || example_ntohll(header->request.cas) == item->cas) + { + release_item(item); + delete_item(key, keylen); + item= create_item(key, keylen, data, datalen, flags, timeout); + + if (item == NULL) + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM); + } + else + { + put_item(item); + /* REPLACEQ shouldn't return a message */ + if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) + { + response.message.header.response.cas= example_htonll(item->cas); + release_item(item); + return response_handler(cookie, header, (protocol_binary_response_header*)&response); + } + release_item(item); + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + { + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS); + release_item(item); + } + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +static protocol_binary_response_status stat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + /* Just send the terminating packet*/ + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= PROTOCOL_BINARY_CMD_STAT; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS); + response.message.header.response.opaque= header->request.opaque; + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +memcached_binary_protocol_callback_st interface_v0_impl; + +void initialize_interface_v0_handler(void) +{ + interface_v0_impl.interface_version= MEMCACHED_PROTOCOL_HANDLER_V0; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler; + interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler; +} diff --git a/example/interface_v1.c b/example/interface_v1.c deleted file mode 100644 index d2b20712..00000000 --- a/example/interface_v1.c +++ /dev/null @@ -1,411 +0,0 @@ -/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -/** - * This file contains an implementation of the callback interface for level 1 - * in the protocol library. If you compare the implementation with the one - * in interface_v0.c you will see that this implementation is much easier and - * hides all of the protocol logic and let you focus on the application - * logic. One "problem" with this layer is that it is synchronous, so that - * you will not receive the next command before a answer to the previous - * command is being sent. - */ -#include "config.h" -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include "storage.h" - -static protocol_binary_response_status add_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *data, - uint32_t datalen, - uint32_t flags, - uint32_t exptime, - uint64_t *cas) -{ - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - struct item* item= get_item(key, keylen); - if (item == NULL) - { - item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); - if (item == 0) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - put_item(item); - *cas= item->cas; - release_item(item); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - - return rval; -} - -static protocol_binary_response_status append_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void* val, - uint32_t vallen, - uint64_t cas, - uint64_t *result_cas) -{ - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - - struct item *item= get_item(key, keylen); - struct item *nitem; - - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - else if (cas != 0 && cas != item->cas) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, - item->flags, item->exp)) == NULL) - { - release_item(item); - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - memcpy(nitem->data, item->data, item->size); - memcpy(((char*)(nitem->data)) + item->size, val, vallen); - release_item(item); - delete_item(key, keylen); - put_item(nitem); - *result_cas= nitem->cas; - release_item(nitem); - } - - return rval; -} - -static protocol_binary_response_status decrement_handler(const void *cookie, - const void *key, - uint16_t keylen, - uint64_t delta, - uint64_t initial, - uint32_t expiration, - uint64_t *result, - uint64_t *result_cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - uint64_t val= initial; - struct item *item= get_item(key, keylen); - - if (item != NULL) - { - if (delta > *(uint64_t*)item->data) - val= 0; - else - val= *(uint64_t*)item->data - delta; - - expiration= (uint32_t)item->exp; - release_item(item); - delete_item(key, keylen); - } - - item= create_item(key, keylen, NULL, sizeof(initial), 0, (time_t)expiration); - if (item == 0) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - memcpy(item->data, &val, sizeof(val)); - put_item(item); - *result= val; - *result_cas= item->cas; - release_item(item); - } - - return rval; -} - -static protocol_binary_response_status delete_handler(const void *cookie, - const void *key, - uint16_t keylen, - uint64_t cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - - if (cas != 0) - { - struct item *item= get_item(key, keylen); - if (item != NULL) - { - if (item->cas != cas) - { - release_item(item); - return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - release_item(item); - } - } - - if (!delete_item(key, keylen)) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - - return rval; -} - - -static protocol_binary_response_status flush_handler(const void *cookie, - uint32_t when) { - - (void)cookie; - flush(when); - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status get_handler(const void *cookie, - const void *key, - uint16_t keylen, - memcached_binary_protocol_get_response_handler response_handler) { - struct item *item= get_item(key, keylen); - - if (item == NULL) - { - return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - - protocol_binary_response_status rc; - rc= response_handler(cookie, key, (uint16_t)keylen, - item->data, (uint32_t)item->size, item->flags, - item->cas); - release_item(item); - return rc; -} - -static protocol_binary_response_status increment_handler(const void *cookie, - const void *key, - uint16_t keylen, - uint64_t delta, - uint64_t initial, - uint32_t expiration, - uint64_t *result, - uint64_t *result_cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - uint64_t val= initial; - struct item *item= get_item(key, keylen); - - if (item != NULL) - { - val= (*(uint64_t*)item->data) + delta; - expiration= (uint32_t)item->exp; - release_item(item); - delete_item(key, keylen); - } - - item= create_item(key, keylen, NULL, sizeof(initial), 0, (time_t)expiration); - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - char buffer[1024] = {0}; - memcpy(buffer, key, keylen); - memcpy(item->data, &val, sizeof(val)); - put_item(item); - *result= val; - *result_cas= item->cas; - release_item(item); - } - - return rval; -} - -static protocol_binary_response_status noop_handler(const void *cookie) { - (void)cookie; - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status prepend_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void* val, - uint32_t vallen, - uint64_t cas, - uint64_t *result_cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - - struct item *item= get_item(key, keylen); - struct item *nitem= NULL; - - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - else if (cas != 0 && cas != item->cas) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, - item->flags, item->exp)) == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - memcpy(nitem->data, val, vallen); - memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); - release_item(item); - item= NULL; - delete_item(key, keylen); - put_item(nitem); - *result_cas= nitem->cas; - } - - if (item) - release_item(item); - - if (nitem) - release_item(nitem); - - return rval; -} - -static protocol_binary_response_status quit_handler(const void *cookie) { - (void)cookie; - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -static protocol_binary_response_status replace_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void* data, - uint32_t datalen, - uint32_t flags, - uint32_t exptime, - uint64_t cas, - uint64_t *result_cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - struct item* item= get_item(key, keylen); - - if (item == NULL) - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } - else if (cas == 0 || cas == item->cas) - { - release_item(item); - delete_item(key, keylen); - item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); - if (item == 0) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - put_item(item); - *result_cas= item->cas; - release_item(item); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - release_item(item); - } - - return rval; -} - -static protocol_binary_response_status set_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void* data, - uint32_t datalen, - uint32_t flags, - uint32_t exptime, - uint64_t cas, - uint64_t *result_cas) { - (void)cookie; - protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - - if (cas != 0) - { - struct item* item= get_item(key, keylen); - if (item != NULL && cas != item->cas) - { - /* Invalid CAS value */ - release_item(item); - return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } - } - - delete_item(key, keylen); - struct item* item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); - if (item == 0) - { - rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; - } - else - { - put_item(item); - *result_cas= item->cas; - release_item(item); - } - - return rval; -} - -static protocol_binary_response_status stat_handler(const void *cookie, - const void *key, - uint16_t keylen, - memcached_binary_protocol_stat_response_handler response_handler) { - (void)key; - (void)keylen; - /* Just return an empty packet */ - return response_handler(cookie, NULL, 0, NULL, 0); -} - -static protocol_binary_response_status version_handler(const void *cookie, - memcached_binary_protocol_version_response_handler response_handler) { - const char *version= "0.1.1"; - return response_handler(cookie, version, (uint32_t)strlen(version)); -} - -memcached_binary_protocol_callback_st interface_v1_impl= { - .interface_version= MEMCACHED_PROTOCOL_HANDLER_V1, - .interface.v1= { - .add= add_handler, - .append= append_handler, - .decrement= decrement_handler, - .delete= delete_handler, - .flush= flush_handler, - .get= get_handler, - .increment= increment_handler, - .noop= noop_handler, - .prepend= prepend_handler, - .quit= quit_handler, - .replace= replace_handler, - .set= set_handler, - .stat= stat_handler, - .version= version_handler - } -}; diff --git a/example/interface_v1.cc b/example/interface_v1.cc new file mode 100644 index 00000000..a4364918 --- /dev/null +++ b/example/interface_v1.cc @@ -0,0 +1,412 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * This file contains an implementation of the callback interface for level 1 + * in the protocol library. If you compare the implementation with the one + * in interface_v0.c you will see that this implementation is much easier and + * hides all of the protocol logic and let you focus on the application + * logic. One "problem" with this layer is that it is synchronous, so that + * you will not receive the next command before a answer to the previous + * command is being sent. + */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "example/memcached_light.h" +#include "example/storage.h" + +static protocol_binary_response_status add_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t *cas) +{ + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item* item= get_item(key, keylen); + if (item == NULL) + { + item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *cas= item->cas; + release_item(item); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + + return rval; +} + +static protocol_binary_response_status append_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas) +{ + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + struct item *item= get_item(key, keylen); + struct item *nitem; + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + release_item(item); + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(nitem->data, item->data, item->size); + memcpy(((char*)(nitem->data)) + item->size, val, vallen); + release_item(item); + delete_item(key, keylen); + put_item(nitem); + *result_cas= nitem->cas; + release_item(nitem); + } + + return rval; +} + +static protocol_binary_response_status decrement_handler(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + uint64_t val= initial; + struct item *item= get_item(key, keylen); + + if (item != NULL) + { + if (delta > *(uint64_t*)item->data) + val= 0; + else + val= *(uint64_t*)item->data - delta; + + expiration= (uint32_t)item->exp; + release_item(item); + delete_item(key, keylen); + } + + item= create_item(key, keylen, NULL, sizeof(initial), 0, (time_t)expiration); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(item->data, &val, sizeof(val)); + put_item(item); + *result= val; + *result_cas= item->cas; + release_item(item); + } + + return rval; +} + +static protocol_binary_response_status delete_handler(const void *, // cookie + const void *key, + uint16_t keylen, + uint64_t cas) +{ + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + if (cas != 0) + { + struct item *item= get_item(key, keylen); + if (item != NULL) + { + if (item->cas != cas) + { + release_item(item); + return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + release_item(item); + } + } + + if (!delete_item(key, keylen)) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + + return rval; +} + + +static protocol_binary_response_status flush_handler(const void * /* cookie */, uint32_t /* when */) +{ + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status get_handler(const void *cookie, + const void *key, + uint16_t keylen, + memcached_binary_protocol_get_response_handler response_handler) { + struct item *item= get_item(key, keylen); + + if (item == NULL) + { + return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + + protocol_binary_response_status rc; + rc= response_handler(cookie, key, (uint16_t)keylen, + item->data, (uint32_t)item->size, item->flags, + item->cas); + release_item(item); + return rc; +} + +static protocol_binary_response_status increment_handler(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t delta, + uint64_t initial, + uint32_t expiration, + uint64_t *result, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + uint64_t val= initial; + struct item *item= get_item(key, keylen); + + if (item != NULL) + { + val= (*(uint64_t*)item->data) + delta; + expiration= (uint32_t)item->exp; + release_item(item); + delete_item(key, keylen); + } + + item= create_item(key, keylen, NULL, sizeof(initial), 0, (time_t)expiration); + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + char buffer[1024] = {0}; + memcpy(buffer, key, keylen); + memcpy(item->data, &val, sizeof(val)); + put_item(item); + *result= val; + *result_cas= item->cas; + release_item(item); + } + + return rval; +} + +static protocol_binary_response_status noop_handler(const void *cookie) { + (void)cookie; + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status prepend_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* val, + uint32_t vallen, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + struct item *item= get_item(key, keylen); + struct item *nitem= NULL; + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas != 0 && cas != item->cas) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + else if ((nitem= create_item(key, keylen, NULL, item->size + vallen, + item->flags, item->exp)) == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + memcpy(nitem->data, val, vallen); + memcpy(((char*)(nitem->data)) + vallen, item->data, item->size); + release_item(item); + item= NULL; + delete_item(key, keylen); + put_item(nitem); + *result_cas= nitem->cas; + } + + if (item) + release_item(item); + + if (nitem) + release_item(nitem); + + return rval; +} + +static protocol_binary_response_status quit_handler(const void *) //cookie +{ + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +static protocol_binary_response_status replace_handler(const void *, // cookie + const void *key, + uint16_t keylen, + const void* data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas) +{ + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + struct item* item= get_item(key, keylen); + + if (item == NULL) + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } + else if (cas == 0 || cas == item->cas) + { + release_item(item); + delete_item(key, keylen); + item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *result_cas= item->cas; + release_item(item); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + release_item(item); + } + + return rval; +} + +static protocol_binary_response_status set_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void* data, + uint32_t datalen, + uint32_t flags, + uint32_t exptime, + uint64_t cas, + uint64_t *result_cas) { + (void)cookie; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + + if (cas != 0) + { + struct item* item= get_item(key, keylen); + if (item != NULL && cas != item->cas) + { + /* Invalid CAS value */ + release_item(item); + return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } + } + + delete_item(key, keylen); + struct item* item= create_item(key, keylen, data, datalen, flags, (time_t)exptime); + if (item == 0) + { + rval= PROTOCOL_BINARY_RESPONSE_ENOMEM; + } + else + { + put_item(item); + *result_cas= item->cas; + release_item(item); + } + + return rval; +} + +static protocol_binary_response_status stat_handler(const void *cookie, + const void *, // key + uint16_t, // keylen, + memcached_binary_protocol_stat_response_handler response_handler) +{ + /* Just return an empty packet */ + return response_handler(cookie, NULL, 0, NULL, 0); +} + +static protocol_binary_response_status version_handler(const void *cookie, + memcached_binary_protocol_version_response_handler response_handler) +{ + const char *version= "0.1.1"; + return response_handler(cookie, version, (uint32_t)strlen(version)); +} + +memcached_binary_protocol_callback_st interface_v1_impl; + +void initialize_interface_v1_handler(void) +{ + memset(&interface_v1_impl, 0, sizeof(memcached_binary_protocol_callback_st)); + + interface_v1_impl.interface_version= MEMCACHED_PROTOCOL_HANDLER_V1; + interface_v1_impl.interface.v1.add= add_handler; + interface_v1_impl.interface.v1.append= append_handler; + interface_v1_impl.interface.v1.decrement= decrement_handler; + interface_v1_impl.interface.v1.delete_object= delete_handler; + interface_v1_impl.interface.v1.flush_object= flush_handler; + interface_v1_impl.interface.v1.get= get_handler; + interface_v1_impl.interface.v1.increment= increment_handler; + interface_v1_impl.interface.v1.noop= noop_handler; + interface_v1_impl.interface.v1.prepend= prepend_handler; + interface_v1_impl.interface.v1.quit= quit_handler; + interface_v1_impl.interface.v1.replace= replace_handler; + interface_v1_impl.interface.v1.set= set_handler; + interface_v1_impl.interface.v1.stat= stat_handler; + interface_v1_impl.interface.v1.version= version_handler; +} diff --git a/example/memcached_light.c b/example/memcached_light.c deleted file mode 100644 index c064e519..00000000 --- a/example/memcached_light.c +++ /dev/null @@ -1,475 +0,0 @@ -/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -/** - * What is a library without an example to show you how to use the library? - * This example use both interfaces to implement a small memcached server. - * Please note that this is an exemple on how to use the library, not - * an implementation of a scalable memcached server. If you look closely - * at the example it isn't even multithreaded ;-) - * - * With that in mind, let me give you some pointers into the source: - * storage.c/h - Implements the item store for this server and not really - * interesting for this example. - * interface_v0.c - Shows an implementation of the memcached server by using - * the "raw" access to the packets as they arrive - * interface_v1.c - Shows an implementation of the memcached server by using - * the more "logical" interface. - * memcached_light.c - This file sets up all of the sockets and run the main - * message loop. - * - * - * config.h is included so that I can use the ntohll/htonll on platforms that - * doesn't have that (this is a private function inside libmemcached, so you - * cannot use it directly from libmemcached without special modifications to - * the library) - */ - -#include "config.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include "example/storage.h" -#include "example/memcached_light.h" - -extern memcached_binary_protocol_callback_st interface_v0_impl; -extern memcached_binary_protocol_callback_st interface_v1_impl; - -static memcached_socket_t server_sockets[1024]; -static int num_server_sockets= 0; - -struct connection -{ - void *userdata; - struct event event; -}; - -/* The default maximum number of connections... (change with -c) */ -static int maxconns = 1024; - -static struct connection *socket_userdata_map; -static bool verbose= false; -static struct event_base *event_base; - -struct options_st { - char *pid_file; - bool has_port; - in_port_t port; -} global_options; - -typedef struct options_st options_st; - -/** - * Callback for driving a client connection - * @param fd the socket for the client socket - * @param which identifying the event that occurred (not used) - * @param arg the connection structure for the client - */ -static void drive_client(memcached_socket_t fd, short which, void *arg) -{ - (void)which; - struct connection *client= arg; - struct memcached_protocol_client_st* c= client->userdata; - assert(c != NULL); - - memcached_protocol_event_t events= memcached_protocol_client_work(c); - if (events & MEMCACHED_PROTOCOL_ERROR_EVENT) - { - memcached_protocol_client_destroy(c); - closesocket(fd); - } else { - short flags = 0; - if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) - { - flags= EV_WRITE; - } - - if (events & MEMCACHED_PROTOCOL_READ_EVENT) - { - flags|= EV_READ; - } - - event_set(&client->event, (intptr_t)fd, flags, drive_client, client); - event_base_set(event_base, &client->event); - - if (event_add(&client->event, 0) == -1) - { - (void)fprintf(stderr, "Failed to add event for %d\n", fd); - memcached_protocol_client_destroy(c); - closesocket(fd); - } - } -} - -/** - * Callback for accepting new connections - * @param fd the socket for the server socket - * @param which identifying the event that occurred (not used) - * @param arg the connection structure for the server - */ -static void accept_handler(memcached_socket_t fd, short which, void *arg) -{ - (void)which; - struct connection *server= arg; - /* accept new client */ - struct sockaddr_storage addr; - socklen_t addrlen= sizeof(addr); - memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen); - - if (sock == INVALID_SOCKET) - { - perror("Failed to accept client"); - return ; - } - -#ifndef WIN32 - if (sock >= maxconns) - { - (void)fprintf(stderr, "Client outside socket range (specified with -c)\n"); - closesocket(sock); - return ; - } -#endif - - struct memcached_protocol_client_st* c; - c= memcached_protocol_create_client(server->userdata, sock); - if (c == NULL) - { - (void)fprintf(stderr, "Failed to create client\n"); - closesocket(sock); - } - else - { - struct connection *client = &socket_userdata_map[sock]; - client->userdata= c; - - event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client); - event_base_set(event_base, &client->event); - if (event_add(&client->event, 0) == -1) - { - (void)fprintf(stderr, "Failed to add event for %d\n", sock); - memcached_protocol_client_destroy(c); - closesocket(sock); - } - } -} - -/** - * Create a socket and bind it to a specific port number - * @param port the port number to bind to - */ -static int server_socket(const char *port) -{ - struct addrinfo *ai; - struct addrinfo hints= { .ai_flags= AI_PASSIVE, - .ai_family= AF_UNSPEC, - .ai_socktype= SOCK_STREAM }; - - int error= getaddrinfo("127.0.0.1", port, &hints, &ai); - if (error != 0) - { - if (error != EAI_SYSTEM) - fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); - else - perror("getaddrinfo()"); - - return 0; - } - - struct linger ling= {0, 0}; - - for (struct addrinfo *next= ai; next; next= next->ai_next) - { - memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (sock == INVALID_SOCKET) - { - perror("Failed to create socket"); - continue; - } - - int flags; -#ifdef WIN32 - u_long arg = 1; - if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR) - { - perror("Failed to set nonblocking io"); - closesocket(sock); - continue; - } -#else - flags= fcntl(sock, F_GETFL, 0); - if (flags == -1) - { - perror("Failed to get socket flags"); - closesocket(sock); - continue; - } - - if ((flags & O_NONBLOCK) != O_NONBLOCK) - { - if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) - { - perror("Failed to set socket to nonblocking mode"); - closesocket(sock); - continue; - } - } -#endif - - flags= 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0) - perror("Failed to set SO_REUSEADDR"); - - if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0) - perror("Failed to set SO_KEEPALIVE"); - - if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0) - perror("Failed to set SO_LINGER"); - - if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0) - perror("Failed to set TCP_NODELAY"); - - if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) - { - if (get_socket_errno() != EADDRINUSE) - { - perror("bind()"); - freeaddrinfo(ai); - } - closesocket(sock); - continue; - } - - if (listen(sock, 1024) == SOCKET_ERROR) - { - perror("listen()"); - closesocket(sock); - continue; - } - - server_sockets[num_server_sockets++]= sock; - } - - freeaddrinfo(ai); - - return (num_server_sockets > 0) ? 0 : 1; -} - -/** - * Convert a command code to a textual string - * @param cmd the comcode to convert - * @return a textual string with the command or NULL for unknown commands - */ -static const char* comcode2str(uint8_t cmd) -{ - static const char * const text[] = { - "GET", "SET", "ADD", "REPLACE", "DELETE", - "INCREMENT", "DECREMENT", "QUIT", "FLUSH", - "GETQ", "NOOP", "VERSION", "GETK", "GETKQ", - "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ", - "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ", - "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ" - }; - - if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ) - return text[cmd]; - - return NULL; -} - -/** - * Print out the command we are about to execute - */ -static void pre_execute(const void *cookie, - protocol_binary_request_header *header) -{ - if (verbose) - { - const char *cmd= comcode2str(header->request.opcode); - if (cmd != NULL) - fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd); - else - fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode); - } -} - -/** - * Print out the command we just executed - */ -static void post_execute(const void *cookie, - protocol_binary_request_header *header) -{ - if (verbose) - { - const char *cmd= comcode2str(header->request.opcode); - if (cmd != NULL) - fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd); - else - fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode); - } -} - -/** - * Callback handler for all unknown commands. - * Send an unknown command back to the client - */ -static protocol_binary_response_status unknown(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= header->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND), - .opaque= header->request.opaque - } - } - }; - - return response_handler(cookie, header, (void*)&response); -} - -/** - * Program entry point. Bind to the specified port(s) and serve clients - * - * @param argc number of items in the argument vector - * @param argv argument vector - * @return EXIT_SUCCESS on success, 1 otherwise - */ -int main(int argc, char **argv) -{ - int cmd; - memcached_binary_protocol_callback_st *interface= &interface_v0_impl; - - memset(&global_options, 0, sizeof(global_options)); - - event_base= event_init(); - if (event_base == NULL) - { - fprintf(stderr, "Failed to create an instance of libevent\n"); - return EXIT_FAILURE; - } - - /* - * We need to initialize the handlers manually due to a bug in the - * warnings generated by struct initialization in gcc (all the way up to 4.4) - */ - initialize_interface_v0_handler(); - - while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF) - { - switch (cmd) { - case '1': - interface= &interface_v1_impl; - break; - case 'P': - global_options.pid_file= strdup(optarg); - break; - case 'p': - global_options.has_port= true; - (void)server_socket(optarg); - break; - case 'v': - verbose= true; - break; - case 'c': - maxconns= atoi(optarg); - break; - case 'h': /* FALLTHROUGH */ - case '?': /* FALLTHROUGH */ - default: - (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n", - argv[0]); - return EXIT_FAILURE; - } - } - - if (! initialize_storage()) - { - /* Error message already printed */ - return EXIT_FAILURE; - } - - if (! global_options.has_port) - (void)server_socket("9999"); - - if (global_options.pid_file) - { - FILE *pid_file; - uint32_t pid; - - pid_file= fopen(global_options.pid_file, "w+"); - - if (pid_file == NULL) - { - perror(strerror(get_socket_errno())); - abort(); - } - - pid= (uint32_t)getpid(); - fprintf(pid_file, "%u\n", pid); - fclose(pid_file); - } - - if (num_server_sockets == 0) - { - fprintf(stderr, "I don't have any server sockets\n"); - return EXIT_FAILURE; - } - - /* - * Create and initialize the handles to the protocol handlers. I want - * to be able to trace the traffic throught the pre/post handlers, and - * set up a common handler for unknown messages - */ - interface->pre_execute= pre_execute; - interface->post_execute= post_execute; - interface->unknown= unknown; - - struct memcached_protocol_st *protocol_handle; - if ((protocol_handle= memcached_protocol_create_instance()) == NULL) - { - fprintf(stderr, "Failed to allocate protocol handle\n"); - return EXIT_FAILURE; - } - - socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection)); - if (socket_userdata_map == NULL) - { - fprintf(stderr, "Failed to allocate room for connections\n"); - return EXIT_FAILURE; - } - - memcached_binary_protocol_set_callbacks(protocol_handle, interface); - memcached_binary_protocol_set_pedantic(protocol_handle, true); - - for (int xx= 0; xx < num_server_sockets; ++xx) - { - struct connection *conn= &socket_userdata_map[server_sockets[xx]]; - conn->userdata= protocol_handle; - event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, - accept_handler, conn); - event_base_set(event_base, &conn->event); - if (event_add(&conn->event, 0) == -1) - { - fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]); - closesocket(server_sockets[xx]); - } - } - - /* Serve all of the clients */ - event_base_loop(event_base, 0); - - /* NOTREACHED */ - return EXIT_SUCCESS; -} diff --git a/example/memcached_light.cc b/example/memcached_light.cc new file mode 100644 index 00000000..791d259b --- /dev/null +++ b/example/memcached_light.cc @@ -0,0 +1,587 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +/** + * What is a library without an example to show you how to use the library? + * This example use both interfaces to implement a small memcached server. + * Please note that this is an exemple on how to use the library, not + * an implementation of a scalable memcached server. If you look closely + * at the example it isn't even multithreaded ;-) + * + * With that in mind, let me give you some pointers into the source: + * storage.c/h - Implements the item store for this server and not really + * interesting for this example. + * interface_v0.c - Shows an implementation of the memcached server by using + * the "raw" access to the packets as they arrive + * interface_v1.c - Shows an implementation of the memcached server by using + * the more "logical" interface. + * memcached_light.c - This file sets up all of the sockets and run the main + * message loop. + * + * + * config.h is included so that I can use the ntohll/htonll on platforms that + * doesn't have that (this is a private function inside libmemcached, so you + * cannot use it directly from libmemcached without special modifications to + * the library) + */ + +#include "config.h" + +#include +#include +#include +#include "example/storage.h" +#include "example/memcached_light.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern memcached_binary_protocol_callback_st interface_v0_impl; +extern memcached_binary_protocol_callback_st interface_v1_impl; + +static memcached_socket_t server_sockets[1024]; +static int num_server_sockets= 0; + +struct connection +{ + void *userdata; + struct event event; +}; + +/* The default maximum number of connections... (change with -c) */ +static int maxconns= 1024; + +static struct connection *socket_userdata_map; +static struct event_base *event_base; + +struct options_st { + char *pid_file; + bool is_verbose; + + options_st() : + pid_file(NULL), + is_verbose(false) + { + } +}; + +static options_st global_options; + +/** + * Callback for driving a client connection + * @param fd the socket for the client socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the client + */ +static void drive_client(memcached_socket_t fd, short, void *arg) +{ + struct connection *client= (struct connection*)arg; + struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata; + assert(c != NULL); + + memcached_protocol_event_t events= memcached_protocol_client_work(c); + if (events & MEMCACHED_PROTOCOL_ERROR_EVENT) + { + if (global_options.is_verbose) + { + struct sockaddr_in sin; + socklen_t addrlen= sizeof(sin); + + if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1) + { + std::cout << __FILE__ << ":" << __LINE__ + << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)" + << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port + << " fd:" << fd + << std::endl; + } + else + { + std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl; + } + } + + memcached_protocol_client_destroy(c); + closesocket(fd); + } + else + { + short flags = 0; + if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) + { + flags= EV_WRITE; + } + + if (events & MEMCACHED_PROTOCOL_READ_EVENT) + { + flags|= EV_READ; + } + + event_set(&client->event, (intptr_t)fd, flags, drive_client, client); + event_base_set(event_base, &client->event); + + if (event_add(&client->event, 0) == -1) + { + memcached_protocol_client_destroy(c); + closesocket(fd); + } + } +} + +/** + * Callback for accepting new connections + * @param fd the socket for the server socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the server + */ +static void accept_handler(memcached_socket_t fd, short, void *arg) +{ + struct connection *server= (struct connection *)arg; + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen); + + if (sock == INVALID_SOCKET) + { + perror("Failed to accept client"); + } + +#ifndef WIN32 + if (sock >= maxconns) + { + closesocket(sock); + return ; + } +#endif + + struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock); + if (c == NULL) + { + closesocket(sock); + } + else + { + memcached_protocol_client_set_verbose(c, global_options.is_verbose); + struct connection *client = &socket_userdata_map[sock]; + client->userdata= c; + + event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client); + event_base_set(event_base, &client->event); + if (event_add(&client->event, 0) == -1) + { + std::cerr << "Failed to add event for " << sock << std::endl; + memcached_protocol_client_destroy(c); + closesocket(sock); + } + } +} + +/** + * Create a socket and bind it to a specific port number + * @param port the port number to bind to + */ +static int server_socket(const char *port) +{ + struct addrinfo *ai; + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + + hints.ai_flags= AI_PASSIVE; + hints.ai_family= AF_UNSPEC; + hints.ai_socktype= SOCK_STREAM; + + int error= getaddrinfo("127.0.0.1", port, &hints, &ai); + if (error != 0) + { + if (error != EAI_SYSTEM) + { + std::cerr << "getaddrinfo(): " << gai_strerror(error) << std::endl; + } + else + { + std::cerr << "getaddrinfo(): " << strerror(errno) << std::endl; + } + + return 0; + } + + struct linger ling= {0, 0}; + + for (struct addrinfo *next= ai; next; next= next->ai_next) + { + memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock == INVALID_SOCKET) + { + std::cerr << "Failed to create socket: " << strerror(errno) << std::endl; + continue; + } + + int flags; +#ifdef WIN32 + u_long arg = 1; + if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR) + { + std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl; + closesocket(sock); + continue; + } +#else + flags= fcntl(sock, F_GETFL, 0); + if (flags == -1) + { + std::cerr << "Failed to get socket flags: " << strerror(errno) << std::endl; + closesocket(sock); + continue; + } + + if ((flags & O_NONBLOCK) != O_NONBLOCK) + { + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) + { + std::cerr << "Failed to set socket to nonblocking mode: " << strerror(errno) << std::endl; + closesocket(sock); + continue; + } + } +#endif + + flags= 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0) + { + std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl; + } + + if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0) + { + std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl; + } + + if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0) + { + std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl; + } + + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0) + { + std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl; + } + + if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) + { + if (get_socket_errno() != EADDRINUSE) + { + std::cerr << "bind(): " << strerror(errno) << std::endl; + freeaddrinfo(ai); + } + closesocket(sock); + continue; + } + + if (listen(sock, 1024) == SOCKET_ERROR) + { + std::cerr << "listen(): " << strerror(errno) << std::endl; + closesocket(sock); + continue; + } + + if (global_options.is_verbose) + { + std::cout << "Listening to " << port << std::endl; + } + + server_sockets[num_server_sockets++]= sock; + } + + freeaddrinfo(ai); + + return (num_server_sockets > 0) ? 0 : 1; +} + +/** + * Convert a command code to a textual string + * @param cmd the comcode to convert + * @return a textual string with the command or NULL for unknown commands + */ +static const char* comcode2str(uint8_t cmd) +{ + static const char * const text[] = { + "GET", "SET", "ADD", "REPLACE", "DELETE", + "INCREMENT", "DECREMENT", "QUIT", "FLUSH", + "GETQ", "NOOP", "VERSION", "GETK", "GETKQ", + "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ", + "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ", + "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ" + }; + + if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ) + { + return text[cmd]; + } + + return NULL; +} + +/** + * Print out the command we are about to execute + */ +static void pre_execute(const void *cookie, + protocol_binary_request_header *header) +{ + if (global_options.is_verbose) + { + if (header) + { + const char *cmd= comcode2str(header->request.opcode); + if (cmd != NULL) + { + std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl; + } + else + { + std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl; + } + } + else + { + std::cout << "pre_execute from " << cookie << std::endl; + } + } +} + +/** + * Print out the command we just executed + */ +static void post_execute(const void *cookie, + protocol_binary_request_header *header) +{ + if (global_options.is_verbose) + { + if (header) + { + const char *cmd= comcode2str(header->request.opcode); + if (cmd != NULL) + { + std::cout << "post_execute from " << cookie << ": " << cmd << std::endl; + } + else + { + std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl; + } + } + else + { + std::cout << "post_execute from " << cookie << std::endl; + } + } +} + +/** + * Callback handler for all unknown commands. + * Send an unknown command back to the client + */ +static protocol_binary_response_status unknown(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_no_extras response; + memset(&response, 0, sizeof(protocol_binary_response_no_extras)); + + response.message.header.response.magic= PROTOCOL_BINARY_RES; + response.message.header.response.opcode= header->request.opcode; + response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND); + response.message.header.response.opaque= header->request.opaque; + + return response_handler(cookie, header, (protocol_binary_response_header*)&response); +} + +/** + * Program entry point. Bind to the specified port(s) and serve clients + * + * @param argc number of items in the argument vector + * @param argv argument vector + * @return EXIT_SUCCESS on success, 1 otherwise + */ +int main(int argc, char **argv) +{ + memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + + event_base= event_init(); + if (event_base == NULL) + { + std::cerr << "Failed to create an instance of libevent" << std::endl; + return EXIT_FAILURE; + } + + /* + * We need to initialize the handlers manually due to a bug in the + * warnings generated by struct initialization in gcc (all the way up to 4.4) + */ + initialize_interface_v0_handler(); + initialize_interface_v1_handler(); + + { + enum long_option_t { + OPT_HELP, + OPT_VERBOSE, + OPT_PROTOCOL_VERSION, + OPT_VERSION, + OPT_PORT, + OPT_MAX_CONNECTIONS, + OPT_PIDFILE + }; + + static struct option long_options[]= + { + {"help", no_argument, NULL, OPT_HELP}, + {"port", required_argument, NULL, OPT_PORT}, + {"verbose", no_argument, NULL, OPT_VERBOSE}, + {"protocol", required_argument, NULL, OPT_PROTOCOL_VERSION}, + {"version", no_argument, NULL, OPT_VERSION}, + {"max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS}, + {"pid-file", required_argument, NULL, OPT_PIDFILE}, + {0, 0, 0, 0} + }; + + int option_index; + bool has_port= false; + bool done= false; + while (done == false) + { + switch (getopt_long(argc, argv, "", long_options, &option_index)) + { + case -1: + done= true; + break; + + case OPT_PROTOCOL_VERSION: + interface= &interface_v1_impl; + break; + + case OPT_PIDFILE: + global_options.pid_file= strdup(optarg); + break; + + case OPT_VERBOSE: + global_options.is_verbose= true; + break; + + case OPT_PORT: + has_port= true; + (void)server_socket(optarg); + break; + + case OPT_MAX_CONNECTIONS: + maxconns= atoi(optarg); + break; + + case OPT_HELP: /* FALLTHROUGH */ + std::cout << "Usage: " << argv[0] << std::endl; + for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++) + { + std::cout << "\t" << ptr_option->name << std::endl; + } + return EXIT_SUCCESS; + + default: + { + std::cerr << "Unknown option: " << optarg << std::endl; + return EXIT_FAILURE; + } + } + } + + if (has_port == false) + { + (void)server_socket("9999"); + } + } + + if (! initialize_storage()) + { + /* Error message already printed */ + return EXIT_FAILURE; + } + + if (global_options.pid_file) + { + FILE *pid_file; + uint32_t pid; + + pid_file= fopen(global_options.pid_file, "w+"); + + if (pid_file == NULL) + { + perror(strerror(get_socket_errno())); + abort(); + } + + pid= (uint32_t)getpid(); + if (global_options.is_verbose) + { + std::cout << "pid:" << pid << std::endl; + } + fclose(pid_file); + } + + if (num_server_sockets == 0) + { + std::cerr << "No server sockets are available" << std::endl; + return EXIT_FAILURE; + } + + /* + * Create and initialize the handles to the protocol handlers. I want + * to be able to trace the traffic throught the pre/post handlers, and + * set up a common handler for unknown messages + */ + interface->pre_execute= pre_execute; + interface->post_execute= post_execute; + interface->unknown= unknown; + + struct memcached_protocol_st *protocol_handle; + if ((protocol_handle= memcached_protocol_create_instance()) == NULL) + { + std::cerr << "Failed to allocate protocol handle" << std::endl; + return EXIT_FAILURE; + } + + socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection)); + if (socket_userdata_map == NULL) + { + std::cerr << "Failed to allocate room for connections" << std::endl; + return EXIT_FAILURE; + } + + memcached_binary_protocol_set_callbacks(protocol_handle, interface); + memcached_binary_protocol_set_pedantic(protocol_handle, true); + + for (int xx= 0; xx < num_server_sockets; ++xx) + { + struct connection *conn= &socket_userdata_map[server_sockets[xx]]; + conn->userdata= protocol_handle; + + event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, accept_handler, conn); + + event_base_set(event_base, &conn->event); + if (event_add(&conn->event, 0) == -1) + { + std::cerr << "Failed to add event for " << server_sockets[xx] << std::endl; + closesocket(server_sockets[xx]); + } + } + + /* Serve all of the clients */ + event_base_loop(event_base, 0); + + /* NOTREACHED */ + return EXIT_SUCCESS; +} diff --git a/example/memcached_light.h b/example/memcached_light.h index 52916939..12c4ea79 100644 --- a/example/memcached_light.h +++ b/example/memcached_light.h @@ -1,7 +1,5 @@ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -#ifndef MEMCACHED_LIGHT_H -#define MEMCACHED_LIGHT_H +#pragma once extern void initialize_interface_v0_handler(void); - -#endif +extern void initialize_interface_v1_handler(void); diff --git a/example/storage.c b/example/storage.c deleted file mode 100644 index 1fb79730..00000000 --- a/example/storage.c +++ /dev/null @@ -1,172 +0,0 @@ -/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -#include "config.h" -#include -#include -#include -#include -#include -#include "storage.h" - -struct list_entry { - struct item item; - struct list_entry *next; - struct list_entry *prev; -}; - -static struct list_entry *root; -static uint64_t cas; - -bool initialize_storage(void) -{ - return true; -} - -void shutdown_storage(void) -{ - /* Do nothing */ -} - -void put_item(struct item* item) -{ - struct list_entry* entry= (void*)item; - - update_cas(item); - - if (root == NULL) - { - entry->next= entry->prev= entry; - } - else - { - entry->prev= root->prev; - entry->next= root; - entry->prev->next= entry; - entry->next->prev= entry; - } - - root= entry; -} - -struct item* get_item(const void* key, size_t nkey) -{ - struct list_entry *walker= root; - - if (root == NULL) - { - return NULL; - } - - do - { - if (((struct item*)walker)->nkey == nkey && - memcmp(((struct item*)walker)->key, key, nkey) == 0) - { - return (struct item*)walker; - } - walker= walker->next; - } while (walker != root); - - return NULL; -} - -struct item* create_item(const void* key, size_t nkey, const void* data, - size_t size, uint32_t flags, time_t exp) -{ - struct item* ret= calloc(1, sizeof(struct list_entry)); - - if (ret != NULL) - { - ret->key= malloc(nkey); - if (size > 0) - { - ret->data= malloc(size); - } - - if (ret->key == NULL || (size > 0 && ret->data == NULL)) - { - free(ret->key); - free(ret->data); - free(ret); - return NULL; - } - - memcpy(ret->key, key, nkey); - if (data != NULL) - { - memcpy(ret->data, data, size); - } - - ret->nkey= nkey; - ret->size= size; - ret->flags= flags; - ret->exp= exp; - } - - return ret; -} - -bool delete_item(const void* key, size_t nkey) -{ - struct item* item= get_item(key, nkey); - bool ret= false; - - if (item) - { - /* remove from linked list */ - struct list_entry *entry= (void*)item; - - if (entry->next == entry) - { - /* Only one object in the list */ - root= NULL; - } - else - { - /* ensure that we don't loose track of the root, and this will - * change the start position for the next search ;-) */ - root= entry->next; - entry->prev->next= entry->next; - entry->next->prev= entry->prev; - } - - free(item->key); - free(item->data); - free(item); - ret= true; - } - - return ret; -} - -void flush(uint32_t when) -{ - /* FIXME */ - (void)when; - /* remove the complete linked list */ - if (root == NULL) - { - return; - } - - root->prev->next= NULL; - while (root != NULL) - { - struct item* tmp= (void*)root; - root= root->next; - - free(tmp->key); - free(tmp->data); - free(tmp); - } -} - -void update_cas(struct item* item) -{ - item->cas= ++cas; -} - -void release_item(struct item* item) -{ - (void)item; - /* EMPTY */ -} diff --git a/example/storage.cc b/example/storage.cc new file mode 100644 index 00000000..2e63a3e3 --- /dev/null +++ b/example/storage.cc @@ -0,0 +1,168 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include "config.h" +#include +#include +#include +#include +#include +#include "storage.h" + +struct list_entry { + struct item item; + struct list_entry *next; + struct list_entry *prev; +}; + +static struct list_entry *root; +static uint64_t cas; + +bool initialize_storage(void) +{ + return true; +} + +void shutdown_storage(void) +{ + /* Do nothing */ +} + +void put_item(struct item* item) +{ + struct list_entry* entry= (struct list_entry*)item; + + update_cas(item); + + if (root == NULL) + { + entry->next= entry->prev= entry; + } + else + { + entry->prev= root->prev; + entry->next= root; + entry->prev->next= entry; + entry->next->prev= entry; + } + + root= entry; +} + +struct item* get_item(const void* key, size_t nkey) +{ + struct list_entry *walker= root; + + if (root == NULL) + { + return NULL; + } + + do + { + if (((struct item*)walker)->nkey == nkey && + memcmp(((struct item*)walker)->key, key, nkey) == 0) + { + return (struct item*)walker; + } + walker= walker->next; + } while (walker != root); + + return NULL; +} + +struct item* create_item(const void* key, size_t nkey, const void* data, + size_t size, uint32_t flags, time_t exp) +{ + struct item* ret= (struct item*)calloc(1, sizeof(struct list_entry)); + + if (ret != NULL) + { + ret->key= malloc(nkey); + if (size > 0) + { + ret->data= malloc(size); + } + + if (ret->key == NULL || (size > 0 && ret->data == NULL)) + { + free(ret->key); + free(ret->data); + free(ret); + return NULL; + } + + memcpy(ret->key, key, nkey); + if (data != NULL) + { + memcpy(ret->data, data, size); + } + + ret->nkey= nkey; + ret->size= size; + ret->flags= flags; + ret->exp= exp; + } + + return ret; +} + +bool delete_item(const void* key, size_t nkey) +{ + struct item* item= get_item(key, nkey); + bool ret= false; + + if (item) + { + /* remove from linked list */ + struct list_entry *entry= (struct list_entry*)item; + + if (entry->next == entry) + { + /* Only one object in the list */ + root= NULL; + } + else + { + /* ensure that we don't loose track of the root, and this will + * change the start position for the next search ;-) */ + root= entry->next; + entry->prev->next= entry->next; + entry->next->prev= entry->prev; + } + + free(item->key); + free(item->data); + free(item); + ret= true; + } + + return ret; +} + +void flush(uint32_t /* when */) +{ + /* remove the complete linked list */ + if (root == NULL) + { + return; + } + + root->prev->next= NULL; + while (root != NULL) + { + struct item* tmp= (struct item*)root; + root= root->next; + + free(tmp->key); + free(tmp->data); + free(tmp); + } +} + +void update_cas(struct item* item) +{ + item->cas= ++cas; +} + +void release_item(struct item* /* item */) +{ +} diff --git a/example/t/include.am b/example/t/include.am new file mode 100644 index 00000000..82ba9404 --- /dev/null +++ b/example/t/include.am @@ -0,0 +1,23 @@ +# vim:ft=automake +# Copyright (C) 2012 Data Differential +# All rights reserved. +# +# Use and distribution licensed under the BSD license. See +# the COPYING file in the parent directory for full text. +# +# included from Top Level Makefile.am +# All paths should be given relative to the root + +MEMCACHED_LIGHT_TESTS_LDADDS= \ + libmemcached/libmemcached.la \ + libmemcached/libmemcachedutil.la \ + libtest/libtest.la +example_t_memcached_light_SOURCES= example/t/memcached_light.cc +example_t_memcached_light_CXXFLAGS = $(AM_CXXFLAGS) +example_t_memcached_light_DEPENDENCIES= $(MEMCACHED_LIGHT_TESTS_LDADDS) +example_t_memcached_light_LDADD= $(example_t_memcached_light_DEPENDENCIES) +check_PROGRAMS+= example/t/memcached_light +noinst_PROGRAMS+= example/t/memcached_light + +test-memcached_light: example/t/memcached_light + @example/t/memcached_light diff --git a/example/t/memcached_light.cc b/example/t/memcached_light.cc new file mode 100644 index 00000000..032d6f90 --- /dev/null +++ b/example/t/memcached_light.cc @@ -0,0 +1,132 @@ +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * Test memslap + * + * Copyright (C) 2012 Data Differential, http://datadifferential.com/ + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +/* + Test that we are cycling the servers we are creating during testing. +*/ + +#include + +#include +#include + +using namespace libtest; + +#ifndef __INTEL_COMPILER +#pragma GCC diagnostic ignored "-Wstrict-aliasing" +#endif + +static std::string executable; + +static test_return_t help_TEST(void *) +{ + const char *memcached_light_args[]= { "--help", 0 }; + Application memcached_light_app(executable, true); + + test_compare(Application::SUCCESS, memcached_light_app.run(memcached_light_args)); + test_compare(Application::SUCCESS, memcached_light_app.wait()); + + return TEST_SUCCESS; +} + +static test_return_t basic_TEST(void *) +{ + char port_buffer[1024]; + snprintf(port_buffer, sizeof(port_buffer), "--port=%d", int(libtest::default_port())); + const char *memcached_light_args[]= { port_buffer, 0 }; + Application memcached_light_app(executable, true); + + test_compare(Application::SUCCESS, memcached_light_app.run(memcached_light_args)); + + char instance_buffer[1024]; + int instance_buffer_length= snprintf(instance_buffer, sizeof(instance_buffer), "--BINARY --server=localhost:%d", int(libtest::default_port())); + memcached_st *memc= memcached(instance_buffer, instance_buffer_length); + +#if 0 + for (size_t x= 0; x < 24; x++) + { + char id_buffer[1024]; + int length= snprintf(id_buffer, sizeof(id_buffer), "%u", uint32_t(x)); + test_compare_hint(MEMCACHED_NOTFOUND, memcached_delete(memc, id_buffer, length, 0), + id_buffer); + } +#endif + + // We fail since we are just outright killing it. + test_compare(Application::FAILURE, memcached_light_app.wait()); + + memcached_free(memc); + + return TEST_SUCCESS; +} + +test_st help_TESTS[] ={ + {"--help", true, help_TEST }, + {0, 0, 0} +}; + +test_st basic_TESTS[] ={ + {"--port", true, basic_TEST }, + {0, 0, 0} +}; + +collection_st collection[] ={ + {"--help", 0, 0, help_TESTS }, + {"basics", 0, 0, basic_TESTS }, + {0, 0, 0, 0} +}; + +static void *world_create(server_startup_st& servers, test_return_t& error) +{ + if (HAVE_MEMCACHED_BINARY == 0) + { + error= TEST_SKIPPED; + return NULL; + } + + return &servers; +} + + +void get_world(Framework *world) +{ + executable= "./example/memcached_light"; + world->collections= collection; + world->_create= world_create; +} + + diff --git a/libmemcached/socket.hpp b/libmemcached/socket.hpp index 0d18857a..fb589931 100644 --- a/libmemcached/socket.hpp +++ b/libmemcached/socket.hpp @@ -50,6 +50,7 @@ #include "win32/wrappers.h" #define get_socket_errno() WSAGetLastError() #else +#include #define INVALID_SOCKET -1 #define SOCKET_ERROR -1 #define closesocket(a) close(a) diff --git a/libmemcachedprotocol-0.0/callback.h b/libmemcachedprotocol-0.0/callback.h index 90204113..638536c8 100644 --- a/libmemcachedprotocol-0.0/callback.h +++ b/libmemcachedprotocol-0.0/callback.h @@ -168,14 +168,14 @@ typedef struct { * Delete an existing key * * @param cookie id of the client receiving the command - * @param key the key to delete + * @param key the key to delete_object * @param len the length of the key * @param cas the CAS in the request */ - protocol_binary_response_status (*delete)(const void *cookie, - const void *key, - uint16_t keylen, - uint64_t cas); + protocol_binary_response_status (*delete_object)(const void *cookie, + const void *key, + uint16_t keylen, + uint64_t cas); /** @@ -184,8 +184,8 @@ typedef struct { * @param cookie id of the client receiving the command * @param when when the cache should be flushed (0 == immediately) */ - protocol_binary_response_status (*flush)(const void *cookie, - uint32_t when); + protocol_binary_response_status (*flush_object)(const void *cookie, + uint32_t when); @@ -349,7 +349,7 @@ typedef enum { * Version 1 abstracts more of the protocol details, and let you work at * a logical level */ - MEMCACHED_PROTOCOL_HANDLER_V1= 1, + MEMCACHED_PROTOCOL_HANDLER_V1= 1 } memcached_protocol_interface_version_t; /** diff --git a/libmemcachedprotocol-0.0/handler.h b/libmemcachedprotocol-0.0/handler.h index f1cfdc03..113338df 100644 --- a/libmemcachedprotocol-0.0/handler.h +++ b/libmemcachedprotocol-0.0/handler.h @@ -152,6 +152,9 @@ memcached_protocol_client_st *memcached_protocol_create_client(memcached_protoco LIBMEMCACHED_API void memcached_protocol_client_destroy(memcached_protocol_client_st *client); +LIBMEMCACHED_API +void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg); + /** * Error event means that the client encountered an error with the * connection so you should shut it down diff --git a/libmemcachedprotocol/ascii_handler.c b/libmemcachedprotocol/ascii_handler.c index f6134b3c..8cd29720 100644 --- a/libmemcachedprotocol/ascii_handler.c +++ b/libmemcachedprotocol/ascii_handler.c @@ -40,6 +40,87 @@ #include #include #include +#include + + +static void print_ascii_command(memcached_protocol_client_st *client) +{ + if (client->is_verbose) + { + switch (client->ascii_command) + { + case SET_CMD: + fprintf(stderr, "%s:%d SET_CMD\n", __FILE__, __LINE__); + break; + + case ADD_CMD: + fprintf(stderr, "%s:%d ADD_CMD\n", __FILE__, __LINE__); + break; + + case REPLACE_CMD: + fprintf(stderr, "%s:%d REPLACE_CMD\n", __FILE__, __LINE__); + break; + + case CAS_CMD: + fprintf(stderr, "%s:%d CAS_CMD\n", __FILE__, __LINE__); + break; + + case APPEND_CMD: + fprintf(stderr, "%s:%d APPEND_CMD\n", __FILE__, __LINE__); + break; + + case PREPEND_CMD: + fprintf(stderr, "%s:%d PREPEND_CMD\n", __FILE__, __LINE__); + break; + + case DELETE_CMD: + fprintf(stderr, "%s:%d DELETE_CMD\n", __FILE__, __LINE__); + break; + + case INCR_CMD: /* FALLTHROUGH */ + fprintf(stderr, "%s:%d INCR_CMD\n", __FILE__, __LINE__); + break; + + case DECR_CMD: + fprintf(stderr, "%s:%d DECR_CMD\n", __FILE__, __LINE__); + break; + + case STATS_CMD: + fprintf(stderr, "%s:%d STATS_CMD\n", __FILE__, __LINE__); + break; + + case FLUSH_ALL_CMD: + fprintf(stderr, "%s:%d FLUSH_ALL_CMD\n", __FILE__, __LINE__); + break; + + case VERSION_CMD: + fprintf(stderr, "%s:%d VERSION_CMD\n", __FILE__, __LINE__); + break; + + case QUIT_CMD: + fprintf(stderr, "%s:%d QUIT_CMD\n", __FILE__, __LINE__); + break; + + case VERBOSITY_CMD: + fprintf(stderr, "%s:%d VERBOSITY_CMD\n", __FILE__, __LINE__); + break; + + case GET_CMD: + fprintf(stderr, "%s:%d GET_CMD\n", __FILE__, __LINE__); + break; + + case GETS_CMD: + fprintf(stderr, "%s:%d GETS_CMD\n", __FILE__, __LINE__); + break; + + default: + case UNKNOWN_CMD: + fprintf(stderr, "%s:%d UNKNOWN_CMD\n", __FILE__, __LINE__); + break; + + } + } +} /** * Try to parse a key from the string. @@ -81,9 +162,50 @@ static uint16_t parse_ascii_key(char **start) * @param text the text to spool * @return status of the spool operation */ -static protocol_binary_response_status -spool_string(memcached_protocol_client_st *client, const char *text) +static protocol_binary_response_status raw_response_handler(memcached_protocol_client_st *client, const char *text) { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d %s\n", __FILE__, __LINE__, text); + } + + if (client->root->drain(client) == false) + { + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; + } + + assert(client->output != NULL); +#if 0 + if (client->output == NULL) + { + /* I can write directly to the socket.... */ + do + { + size_t num_bytes= len -offset; + ssize_t nw= client->root->send(client, + client->sock, + ptr + offset, + num_bytes); + if (nw == -1) + { + if (get_socket_errno() == EWOULDBLOCK) + { + break; + } + else if (get_socket_errno() != EINTR) + { + client->error= errno; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; + } + } + else + { + offset += (size_t)nw; + } + } while (offset < len); + } +#endif + return client->root->spool(client, text, strlen(text)); } @@ -103,7 +225,7 @@ static void send_command_usage(memcached_protocol_client_st *client) [CAS_CMD]= "CLIENT_ERROR: Syntax error: cas [noreply]\r\n", [APPEND_CMD]= "CLIENT_ERROR: Syntax error: append [noreply]\r\n", [PREPEND_CMD]= "CLIENT_ERROR: Syntax error: prepend [noreply]\r\n", - [DELETE_CMD]= "CLIENT_ERROR: Syntax error: delete [noreply]\r\n", + [DELETE_CMD]= "CLIENT_ERROR: Syntax error: delete_object [noreply]\r\n", [INCR_CMD]= "CLIENT_ERROR: Syntax error: incr [noreply]\r\n", [DECR_CMD]= "CLIENT_ERROR: Syntax error: decr [noreply]\r\n", [STATS_CMD]= "CLIENT_ERROR: Syntax error: stats [key]\r\n", @@ -116,7 +238,7 @@ static void send_command_usage(memcached_protocol_client_st *client) }; client->mute = false; - spool_string(client, errmsg[client->ascii_command]); + raw_response_handler(client, errmsg[client->ascii_command]); } /** @@ -125,15 +247,14 @@ static void send_command_usage(memcached_protocol_client_st *client) * @param text the length of the body * @param textlen the length of the body */ -static protocol_binary_response_status -ascii_version_response_handler(const void *cookie, - const void *text, - uint32_t textlen) +static protocol_binary_response_status ascii_version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) { memcached_protocol_client_st *client= (memcached_protocol_client_st*)cookie; - spool_string(client, "VERSION "); + raw_response_handler(client, "VERSION "); client->root->spool(client, text, textlen); - spool_string(client, "\r\n"); + raw_response_handler(client, "\r\n"); return PROTOCOL_BINARY_RESPONSE_SUCCESS; } @@ -204,27 +325,26 @@ ascii_get_response_handler(const void *cookie, * @param body the length of the body * @param bodylen the length of the body */ -static protocol_binary_response_status -ascii_stat_response_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *body, - uint32_t bodylen) +static protocol_binary_response_status ascii_stat_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen) { memcached_protocol_client_st *client= (void*)cookie; if (key != NULL) { - spool_string(client, "STAT "); + raw_response_handler(client, "STAT "); client->root->spool(client, key, keylen); - spool_string(client, " "); + raw_response_handler(client, " "); client->root->spool(client, body, bodylen); - spool_string(client, "\r\n"); + raw_response_handler(client, "\r\n"); } else { - spool_string(client, "END\r\n"); + raw_response_handler(client, "END\r\n"); } return PROTOCOL_BINARY_RESPONSE_SUCCESS; @@ -264,7 +384,9 @@ static void ascii_process_gets(memcached_protocol_client_st *client, send_command_usage(client); } else + { client->root->spool(client, "END\r\n", 5); + } } /** @@ -353,7 +475,7 @@ static enum ascii_cmd ascii_to_cmd(char *start, size_t length) { .cmd= "cas", .len= 3, .cc= CAS_CMD }, { .cmd= "append", .len= 6, .cc= APPEND_CMD }, { .cmd= "prepend", .len= 7, .cc= PREPEND_CMD }, - { .cmd= "delete", .len= 6, .cc= DELETE_CMD }, + { .cmd= "delete_object", .len= 6, .cc= DELETE_CMD }, { .cmd= "incr", .len= 4, .cc= INCR_CMD }, { .cmd= "decr", .len= 4, .cc= DECR_CMD }, { .cmd= "stats", .len= 5, .cc= STATS_CMD }, @@ -383,7 +505,7 @@ static enum ascii_cmd ascii_to_cmd(char *start, size_t length) } /** - * Perform a delete operation. + * Perform a delete_object operation. * * @param client client requesting the deletion * @param tokens the command as a vector @@ -401,28 +523,27 @@ static void process_delete(memcached_protocol_client_st *client, return; } - if (client->root->callback->interface.v1.delete == NULL) + if (client->root->callback->interface.v1.delete_object == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } - protocol_binary_response_status rval; - rval= client->root->callback->interface.v1.delete(client, key, nkey, 0); + protocol_binary_response_status rval= client->root->callback->interface.v1.delete_object(client, key, nkey, 0); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) { - spool_string(client, "DELETED\r\n"); + raw_response_handler(client, "DELETED\r\n"); } else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } else { char msg[80]; - snprintf(msg, sizeof(msg), "SERVER_ERROR: delete failed %u\r\n",(uint32_t)rval); - spool_string(client, msg); + snprintf(msg, sizeof(msg), "SERVER_ERROR: delete_object failed %u\r\n",(uint32_t)rval); + raw_response_handler(client, msg); } } @@ -447,7 +568,7 @@ static void process_arithmetic(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.increment == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } rval= client->root->callback->interface.v1.increment(client, @@ -461,7 +582,7 @@ static void process_arithmetic(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.decrement == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } rval= client->root->callback->interface.v1.decrement(client, @@ -476,11 +597,11 @@ static void process_arithmetic(memcached_protocol_client_st *client, { char buffer[80]; snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n", result); - spool_string(client, buffer); + raw_response_handler(client, buffer); } else { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } } @@ -494,12 +615,14 @@ static void process_stats(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.stat == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } while (isspace(*key)) + { key++; + } uint16_t nkey= (uint16_t)(end - key); (void)client->root->callback->interface.v1.stat(client, key, nkey, @@ -518,7 +641,7 @@ static void process_version(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.version == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } @@ -535,9 +658,9 @@ static void process_flush(memcached_protocol_client_st *client, return; } - if (client->root->callback->interface.v1.flush == NULL) + if (client->root->callback->interface.v1.flush_object == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } @@ -548,11 +671,11 @@ static void process_flush(memcached_protocol_client_st *client, } protocol_binary_response_status rval; - rval= client->root->callback->interface.v1.flush(client, timeout); + rval= client->root->callback->interface.v1.flush_object(client, timeout); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) - spool_string(client, "OK\r\n"); + raw_response_handler(client, "OK\r\n"); else - spool_string(client, "SERVER_ERROR: internal error\r\n"); + raw_response_handler(client, "SERVER_ERROR: internal error\r\n"); } /** @@ -570,8 +693,8 @@ static void process_flush(memcached_protocol_client_st *client, * 1 We need more data, so just go ahead and wait for more! */ static inline int process_storage_command(memcached_protocol_client_st *client, - char **tokens, int ntokens, char *start, - char **end, ssize_t length) + char **tokens, int ntokens, char *start, + char **end, ssize_t length) { (void)ntokens; /* already checked */ char *key= tokens[1]; @@ -579,7 +702,7 @@ static inline int process_storage_command(memcached_protocol_client_st *client, if (nkey == 0) { /* return error */ - spool_string(client, "CLIENT_ERROR: bad key\r\n"); + raw_response_handler(client, "CLIENT_ERROR: bad key\r\n"); return -1; } @@ -666,7 +789,7 @@ static inline int process_storage_command(memcached_protocol_client_st *client, if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) { - spool_string(client, "STORED\r\n"); + raw_response_handler(client, "STORED\r\n"); } else { @@ -674,20 +797,20 @@ static inline int process_storage_command(memcached_protocol_client_st *client, { if (rval == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS) { - spool_string(client, "EXISTS\r\n"); + raw_response_handler(client, "EXISTS\r\n"); } else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } else { - spool_string(client, "NOT_STORED\r\n"); + raw_response_handler(client, "NOT_STORED\r\n"); } } else { - spool_string(client, "NOT_STORED\r\n"); + raw_response_handler(client, "NOT_STORED\r\n"); } } @@ -708,7 +831,7 @@ static int process_cas_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.replace == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -727,7 +850,7 @@ static int process_set_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.set == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -746,7 +869,7 @@ static int process_add_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.add == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -765,7 +888,7 @@ static int process_replace_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.replace == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -784,7 +907,7 @@ static int process_append_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.append == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -803,7 +926,7 @@ static int process_prepend_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.prepend == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -831,20 +954,34 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto client->ascii_command= ascii_to_cmd(ptr, (size_t)(*length)); + /* we got all data available, execute the callback! */ + if (client->root->callback->pre_execute != NULL) + { + client->root->callback->pre_execute(client, NULL); + } + + /* A multiget lists all of the keys, and I don't want to have an * avector of let's say 512 pointers to tokenize all of them, so let's * just handle them immediately */ if (client->ascii_command == GET_CMD || - client->ascii_command == GETS_CMD) { + client->ascii_command == GETS_CMD) + { if (client->root->callback->interface.v1.get != NULL) + { ascii_process_gets(client, ptr, end); + } else - spool_string(client, "SERVER_ERROR: Command not implemented\n"); - } else { + { + raw_response_handler(client, "SERVER_ERROR: Command not implemented\n"); + } + } + else + { /* None of the defined commands takes 10 parameters, so lets just use * that as a maximum limit. - */ + */ char *tokens[10]; int ntokens= ascii_tokenize_command(ptr, end, tokens, 10); @@ -852,33 +989,40 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto { client->mute= strcmp(tokens[ntokens - 1], "noreply") == 0; if (client->mute) + { --ntokens; /* processed noreply token*/ + } } int error= 0; - switch (client->ascii_command) { + print_ascii_command(client); + switch (client->ascii_command) + { case SET_CMD: error= process_set_command(client, tokens, ntokens, ptr, &end, *length); break; + case ADD_CMD: error= process_add_command(client, tokens, ntokens, ptr, &end, *length); break; + case REPLACE_CMD: - error= process_replace_command(client, tokens, ntokens, - ptr, &end, *length); + error= process_replace_command(client, tokens, ntokens, ptr, &end, *length); break; + case CAS_CMD: error= process_cas_command(client, tokens, ntokens, ptr, &end, *length); break; + case APPEND_CMD: - error= process_append_command(client, tokens, ntokens, - ptr, &end, *length); + error= process_append_command(client, tokens, ntokens, ptr, &end, *length); break; + case PREPEND_CMD: - error= process_prepend_command(client, tokens, ntokens, - ptr, &end, *length); - break; + error= process_prepend_command(client, tokens, ntokens, ptr, &end, *length); + break; + case DELETE_CMD: process_delete(client, tokens, ntokens); break; @@ -887,6 +1031,7 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto case DECR_CMD: process_arithmetic(client, tokens, ntokens); break; + case STATS_CMD: if (client->mute) { @@ -898,9 +1043,11 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto process_stats(client, ptr + 6, end); } break; + case FLUSH_ALL_CMD: process_flush(client, tokens, ntokens); break; + case VERSION_CMD: if (client->mute) { @@ -911,6 +1058,7 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto process_version(client, tokens, ntokens); } break; + case QUIT_CMD: if (ntokens != 1 || client->mute) { @@ -919,7 +1067,9 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto else { if (client->root->callback->interface.v1.quit != NULL) + { client->root->callback->interface.v1.quit(client); + } return MEMCACHED_PROTOCOL_ERROR_EVENT; } @@ -927,9 +1077,13 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto case VERBOSITY_CMD: if (ntokens != 2) + { send_command_usage(client); + } else - spool_string(client, "OK\r\n"); + { + raw_response_handler(client, "OK\r\n"); + } break; case UNKNOWN_CMD: @@ -944,9 +1098,18 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto } if (error == -1) + { return MEMCACHED_PROTOCOL_ERROR_EVENT; + } else if (error == 1) + { return MEMCACHED_PROTOCOL_READ_EVENT; + } + } + + if (client->root->callback->post_execute != NULL) + { + client->root->callback->post_execute(client, NULL); } /* Move past \n */ diff --git a/libmemcachedprotocol/binary_handler.c b/libmemcachedprotocol/binary_handler.c index c2fd4414..3778e344 100644 --- a/libmemcachedprotocol/binary_handler.c +++ b/libmemcachedprotocol/binary_handler.c @@ -59,10 +59,9 @@ * @param response the packet to send * @return The status of the operation */ -static protocol_binary_response_status -raw_response_handler(const void *cookie, - protocol_binary_request_header *request, - protocol_binary_response_header *response) +static protocol_binary_response_status raw_response_handler(const void *cookie, + protocol_binary_request_header *request, + protocol_binary_response_header *response) { memcached_protocol_client_st *client= (void*)cookie; @@ -72,12 +71,12 @@ raw_response_handler(const void *cookie, return PROTOCOL_BINARY_RESPONSE_EINVAL; } - if (!client->root->drain(client)) + if (client->root->drain(client) == false) { return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } - size_t len= sizeof(*response) + htonl(response->response.bodylen); + size_t len= sizeof(protocol_binary_response_header) + htonl(response->response.bodylen); size_t offset= 0; char *ptr= (void*)response; @@ -113,6 +112,75 @@ raw_response_handler(const void *cookie, return client->root->spool(client, ptr, len - offset); } +static void print_cmd(protocol_binary_command cmd) +{ + switch (cmd) + { + case PROTOCOL_BINARY_CMD_GET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_ADD: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADD\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_REPLACE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACE\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_DELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETE\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_INCREMENT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_DECREMENT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_QUIT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUIT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_FLUSH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSH\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_NOOP: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_NOOP\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_VERSION: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERSION\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GETK: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETK\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GETKQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETKQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_APPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPEND\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_PREPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPEND\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_STAT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_STAT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SETQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_ADDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADDQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_REPLACEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACEQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_DELETEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETEQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_INCREMENTQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENTQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_DECREMENTQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENTQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_QUITQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUITQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_FLUSHQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSHQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_APPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPENDQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_PREPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPENDQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_VERBOSITY: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERBOSITY\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TOUCH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TOUCH\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GAT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GAT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GATQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_LIST_MECHS\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SASL_AUTH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_AUTH\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SASL_STEP: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_STEP\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RGET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RGET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RSET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RSETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSETQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RAPPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPEND\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RAPPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPENDQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RPREPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPEND\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RPREPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPENDQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RDELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETE\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RDELETEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETEQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RINCR: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCR\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RINCRQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCRQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RDECR: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECR\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_RDECRQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECRQ\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SET_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET_VBUCKET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_GET_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET_VBUCKET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_DEL_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DEL_VBUCKET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_CONNECT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CONNECT\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_MUTATION: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_MUTATION\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_DELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_DELETE\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_FLUSH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_FLUSH\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_OPAQUE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_OPAQUE\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_LAST_RESERVED: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_LAST_RESERVED\n", __FILE__, __LINE__); return; + case PROTOCOL_BINARY_CMD_SCRUB: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SCRUB\n", __FILE__, __LINE__); return; + default: + abort(); + } +} + /* * Version 0 of the interface is really low level and protocol specific, * while the version 1 of the interface is more API focused. We need a @@ -131,15 +199,14 @@ raw_response_handler(const void *cookie, * @param flags the flags for the item * @param cas the CAS id for the item */ -static protocol_binary_response_status -get_response_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *body, - uint32_t bodylen, - uint32_t flags, - uint64_t cas) { - +static protocol_binary_response_status get_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen, + uint32_t flags, + uint64_t cas) +{ memcached_protocol_client_st *client= (void*)cookie; uint8_t opcode= client->current_command->request.opcode; @@ -222,10 +289,10 @@ static protocol_binary_response_status stat_response_handler(const void *cookie, * @param text the length of the body * @param textlen the length of the body */ -static protocol_binary_response_status -version_response_handler(const void *cookie, - const void *text, - uint32_t textlen) { +static protocol_binary_response_status version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) +{ memcached_protocol_client_st *client= (void*)cookie; @@ -372,21 +439,20 @@ decrement_command_handler(const void *cookie, * @param response_handler not used * @return the result of the operation */ -static protocol_binary_response_status -delete_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) +static protocol_binary_response_status delete_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.delete != NULL) + if (client->root->callback->interface.v1.delete_object != NULL) { uint16_t keylen= ntohs(header->request.keylen); void *key= (header +1); uint64_t cas= memcached_ntohll(header->request.cas); - rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas); + rval= client->root->callback->interface.v1.delete_object(cookie, key, keylen, cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) { @@ -428,16 +494,16 @@ flush_command_handler(const void *cookie, protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.flush != NULL) + if (client->root->callback->interface.v1.flush_object != NULL) { - protocol_binary_request_flush *flush= (void*)header; + protocol_binary_request_flush *flush_object= (void*)header; uint32_t timeout= 0; if (htonl(header->request.bodylen) == 4) { - timeout= ntohl(flush->message.body.expiration); + timeout= ntohl(flush_object->message.body.expiration); } - rval= client->root->callback->interface.v1.flush(cookie, timeout); + rval= client->root->callback->interface.v1.flush_object(cookie, timeout); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) { @@ -805,10 +871,9 @@ replace_command_handler(const void *cookie, * @param response_handler not used * @return the result of the operation */ -static protocol_binary_response_status -set_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) +static protocol_binary_response_status set_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; @@ -975,22 +1040,30 @@ static protocol_binary_response_status execute_command(memcached_protocol_client client->root->callback->pre_execute(client, header); } - protocol_binary_response_status rval; - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; uint8_t cc= header->request.opcode; + if (client->is_verbose) + { + print_cmd(cc); + } + switch (client->root->callback->interface_version) { case 0: - if (client->root->callback->interface.v0.comcode[cc] != NULL) { + if (client->root->callback->interface.v0.comcode[cc] != NULL) + { rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler); } break; + case 1: - if (comcode_v0_v1_remap[cc] != NULL) { + if (comcode_v0_v1_remap[cc] != NULL) + { rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler); } break; + default: /* Unknown interface. * It should be impossible to get here so I'll just call abort @@ -1060,8 +1133,11 @@ memcached_protocol_event_t memcached_binary_protocol_process_data(memcached_prot *length= len; *endptr= (void*)header; return MEMCACHED_PROTOCOL_ERROR_EVENT; - } else if (rv == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) + } + else if (rv == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) + { return MEMCACHED_PROTOCOL_PAUSE_EVENT; + } ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)); len -= total; diff --git a/libmemcachedprotocol/common.h b/libmemcachedprotocol/common.h index ea94a8d7..8453ccd8 100644 --- a/libmemcachedprotocol/common.h +++ b/libmemcachedprotocol/common.h @@ -124,6 +124,7 @@ enum ascii_cmd { }; struct memcached_protocol_client_st { + bool is_verbose; memcached_protocol_st *root; memcached_socket_t sock; int error; diff --git a/libmemcachedprotocol/handler.c b/libmemcachedprotocol/handler.c index 9ce927a6..59b461f6 100644 --- a/libmemcachedprotocol/handler.c +++ b/libmemcachedprotocol/handler.c @@ -89,7 +89,7 @@ static ssize_t default_send(const void *cookie, size_t nbytes) { (void)cookie; - return send(fd, buf, nbytes, 0); + return send(fd, buf, nbytes, MSG_NOSIGNAL); } /** @@ -102,12 +102,17 @@ static ssize_t default_send(const void *cookie, */ static bool drain_output(struct memcached_protocol_client_st *client) { - ssize_t len; + if (client->is_verbose) + { + fprintf(stderr, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, + client->output ? "yes" : "no", + client->output ? (int)(client->output->nbytes - client->output->offset) : 0); + } /* Do we have pending data to send? */ while (client->output != NULL) { - len= client->root->send(client, + ssize_t len= client->root->send(client, client->sock, client->output->data + client->output->offset, client->output->nbytes - client->output->offset); @@ -189,6 +194,11 @@ static protocol_binary_response_status spool_output(struct memcached_protocol_cl const void *data, size_t length) { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d %s mute:%d length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, (int)length); + } + if (client->mute) { return PROTOCOL_BINARY_RESPONSE_SUCCESS; @@ -233,10 +243,19 @@ static memcached_protocol_event_t determine_protocol(struct memcached_protocol_c { if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ) { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__, __LINE__); + } client->work= memcached_binary_protocol_process_data; } else if (client->root->callback->interface_version == 1) { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__, __LINE__); + } + /* * The ASCII protocol can only be used if the implementors provide * an implementation for the version 1 of the interface.. @@ -249,12 +268,18 @@ static memcached_protocol_event_t determine_protocol(struct memcached_protocol_c } else { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__, __LINE__); + } + /* Let's just output a warning the way it is supposed to look like * in the ASCII protocol... */ const char *err= "CLIENT_ERROR: Unsupported protocol\r\n"; client->root->spool(client, err, strlen(err)); client->root->drain(client); + return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */ } @@ -282,6 +307,7 @@ struct memcached_protocol_st *memcached_protocol_create_instance(void) { free(ret); ret= NULL; + return NULL; } @@ -307,7 +333,7 @@ void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance) struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock) { - struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret)); + struct memcached_protocol_client_st *ret= calloc(1, sizeof(memcached_protocol_client_st)); if (ret != NULL) { ret->root= instance; @@ -323,6 +349,14 @@ void memcached_protocol_client_destroy(struct memcached_protocol_client_st *clie free(client); } +void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg) +{ + if (client) + { + client->is_verbose= arg; + } +} + memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client) { /* Try to send data and read from the socket */ @@ -395,7 +429,9 @@ memcached_protocol_event_t memcached_protocol_client_work(struct memcached_proto memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT; if (client->output) + { ret|= MEMCACHED_PROTOCOL_READ_EVENT; + } return ret; } diff --git a/libtest/cmdline.cc b/libtest/cmdline.cc index 7c2f5f44..6ef40784 100644 --- a/libtest/cmdline.cc +++ b/libtest/cmdline.cc @@ -214,7 +214,10 @@ Application::error_t Application::wait() } else { - assert(waited_pid == _pid); + if (waited_pid != _pid) + { + throw libtest::fatal(LIBYATL_DEFAULT_PARAM, "Pid mismatch, %d != %d", int(waited_pid), int(_pid)); + } exit_code= error_t(exited_successfully(status)); } } diff --git a/libtest/test.hpp b/libtest/test.hpp index b77cd214..eae8ca34 100644 --- a/libtest/test.hpp +++ b/libtest/test.hpp @@ -26,7 +26,6 @@ #include #include -#include #include #include diff --git a/tests/memrm.cc b/tests/memrm.cc index 8ca4445e..e422a19d 100644 --- a/tests/memrm.cc +++ b/tests/memrm.cc @@ -95,7 +95,7 @@ static test_return_t rm_test(void *) return TEST_SUCCESS; } -static test_return_t NOT_FOUND_test(void *) +static test_return_t NOT_FOUND_TEST(void *) { char buffer[1024]; snprintf(buffer, sizeof(buffer), "--server=localhost:%d", int(default_port())); @@ -118,11 +118,24 @@ static test_return_t NOT_FOUND_test(void *) return TEST_SUCCESS; } +static test_return_t multiple_NOT_FOUND_TEST(void *) +{ + char buffer[1024]; + snprintf(buffer, sizeof(buffer), "--server=localhost:%d", int(default_port())); + const char *args[]= { buffer, "protocols", "foo", "mine", "bar", "dog", "cat", "foo", "mine", + "eye", "for", "the", "to", "not", "know", "what", "I", "should", "be", "doing", 0 }; + + test_compare(EXIT_SUCCESS, exec_cmdline(executable, args, true)); + + return TEST_SUCCESS; +} + test_st memrm_tests[] ={ {"--quiet", true, quiet_test }, {"--help", true, help_test }, {"rm(FOUND)", true, rm_test }, - {"rm(NOT_FOUND)", true, NOT_FOUND_test }, + {"rm(NOT_FOUND)", true, NOT_FOUND_TEST }, + {"multiple rm(NOT_FOUND)", true, multiple_NOT_FOUND_TEST }, {0, 0, 0} };