From: Trond Norbye Date: Thu, 1 Oct 2009 13:45:12 +0000 (+0200) Subject: Refactor and add support for the ASCII protocol by wrapping the binary protocol X-Git-Tag: 0.34~17^2 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=a148d541d261c9c867c412e773920f5a1973bbae;p=m6w6%2Flibmemcached Refactor and add support for the ASCII protocol by wrapping the binary protocol --- diff --git a/.bzrignore b/.bzrignore index 73d93db3..507ac4a1 100644 --- a/.bzrignore +++ b/.bzrignore @@ -1,8 +1,10 @@ *.lo */.deps */.libs +*/.dirstamp */*/.deps */*/.libs +*/*/.dirstamp */Makefile */Makefile.in */*.l[oa] diff --git a/configure.ac b/configure.ac index aff404e6..6753a967 100644 --- a/configure.ac +++ b/configure.ac @@ -95,7 +95,6 @@ AC_CONFIG_FILES([ libmemcached/Makefile libmemcached/memcached_configure.h libmemcachedutil/Makefile - libmemcached/protocol/Makefile support/Makefile support/libmemcached.pc support/libmemcached.spec diff --git a/example/Makefile.am b/example/Makefile.am index b333b408..62bd5911 100644 --- a/example/Makefile.am +++ b/example/Makefile.am @@ -4,8 +4,8 @@ memcached_light_SOURCES= memcached_light.c \ storage.h \ interface_v0.c \ interface_v1.c -memcached_light_LDADD= $(top_builddir)/libmemcached/protocol/libmemcachedprotocol.la $(LIBINNODB) -memcached_light_DEPENDENCIES= $(top_builddir)/libmemcached/protocol/libmemcachedprotocol.la +memcached_light_LDADD= $(top_builddir)/libmemcached/libmemcachedprotocol.la $(LIBINNODB) +memcached_light_DEPENDENCIES= $(top_builddir)/libmemcached/libmemcachedprotocol.la if BUILD_BYTEORDER memcached_light_LDADD+= $(top_builddir)/libmemcached/libbyteorder.la diff --git a/example/memcached_light.c b/example/memcached_light.c index 743f2d1c..c1bdea04 100644 --- a/example/memcached_light.c +++ b/example/memcached_light.c @@ -264,8 +264,8 @@ int main(int argc, char **argv) interface->post_execute= post_execute; interface->unknown= unknown; - struct memcached_binary_protocol_st *protocol_handle; - if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL) + struct memcached_protocol_st *protocol_handle; + if ((protocol_handle= memcached_protocol_create_instance()) == NULL) { fprintf(stderr, "Failed to allocate protocol handle\n"); return 1; @@ -327,11 +327,11 @@ static void work(void) { continue; } - struct memcached_binary_protocol_st *protocol; + struct memcached_protocol_st *protocol; protocol= socket_userdata_map[fds[x].fd]; - struct memcached_binary_protocol_client_st* c; - c= memcached_binary_protocol_create_client(protocol, sock); + struct memcached_protocol_client_st* c; + c= memcached_protocol_create_client(protocol, sock); if (c == NULL) { fprintf(stderr, "Failed to create client\n"); @@ -349,12 +349,12 @@ static void work(void) { else { /* drive the client */ - struct memcached_binary_protocol_client_st* c; + struct memcached_protocol_client_st* c; c= socket_userdata_map[fds[x].fd]; assert(c != NULL); fds[max_poll].events= 0; - switch (memcached_binary_protocol_client_work(c)) + switch (memcached_protocol_client_work(c)) { case WRITE_EVENT: case READ_WRITE_EVENT: @@ -365,7 +365,7 @@ static void work(void) { break; case ERROR_EVENT: default: /* ERROR or unknown state.. close */ - memcached_binary_protocol_client_destroy(c); + memcached_protocol_client_destroy(c); close(fds[x].fd); fds[x].events= 0; diff --git a/libmemcached/Makefile.am b/libmemcached/Makefile.am index 14491446..ef9d0cd8 100644 --- a/libmemcached/Makefile.am +++ b/libmemcached/Makefile.am @@ -1,7 +1,6 @@ EXTRA_DIST = libmemcached_probes.d memcached/README.txt libmemcached.ver \ - memcached_configure.h.in + memcached_configure.h.in protocol/libmemcachedprotocol.ver -SUBDIRS = . protocol EXTRA_HEADERS = BUILT_SOURCES= @@ -10,7 +9,10 @@ noinst_HEADERS = libmemcached_probes.h \ memcached_io.h \ memcached_internal.h \ common.h \ - memcached/protocol_binary.h + memcached/protocol_binary.h \ + protocol/common.h \ + protocol/ascii_handler.h \ + protocol/binary_handler.h pkginclude_HEADERS= memcached.h \ memcached.hpp \ @@ -28,14 +30,22 @@ pkginclude_HEADERS= memcached.h \ visibility.h nobase_pkginclude_HEADERS=protocol/cache.h \ - protocol/callback.h + protocol/callback.h if BUILD_LIBMEMCACHEDUTIL pkginclude_HEADERS+= memcached_util.h memcached_pool.h endif -lib_LTLIBRARIES = libmemcached.la +libmemcachedprotocol_la_SOURCES= protocol/ascii_handler.c \ + protocol/binary_handler.c \ + protocol/cache.c \ + protocol/pedantic.c \ + protocol/protocol_handler.c + +libmemcachedprotocol_la_LDFLAGS= -version-info $(MEMCACHEDPROTOCOL_LIBRARY_VERSION) $(LD_PROTOCOL_VERSION_SCRIPT) + +lib_LTLIBRARIES = libmemcached.la libmemcachedprotocol.la noinst_LTLIBRARIES = libmemcachedcallbacks.la libmemcachedcallbacks_la_CFLAGS = ${AM_CFLAGS} ${NO_STRICT_ALIASING} libmemcachedcallbacks_la_SOURCES = memcached_callback.c @@ -89,6 +99,8 @@ noinst_LTLIBRARIES += libbyteorder.la libbyteorder_la_SOURCES= byteorder.c byteorder.h libmemcached_la_LIBADD += libbyteorder.la libmemcached_la_DEPENDENCIES+= libbyteorder.la +libmemcachedprotocol_la_LIBADD=libbyteorder.la +libmemcachedprotocol_la_DEPENDENCIES=libbyteorder.la endif if HAVE_DTRACE diff --git a/libmemcached/protocol/Makefile.am b/libmemcached/protocol/Makefile.am deleted file mode 100644 index 8847016e..00000000 --- a/libmemcached/protocol/Makefile.am +++ /dev/null @@ -1,13 +0,0 @@ -EXTRA_DIST= libmemcachedprotocol.ver - -lib_LTLIBRARIES=libmemcachedprotocol.la - -noinst_HEADERS = common.h - -libmemcachedprotocol_la_SOURCES= protocol_handler.c cache.c pedantic.c -libmemcachedprotocol_la_LDFLAGS= -version-info $(MEMCACHEDPROTOCOL_LIBRARY_VERSION) $(LD_PROTOCOL_VERSION_SCRIPT) - -if BUILD_BYTEORDER -libmemcachedprotocol_la_LIBADD=$(top_builddir)/libmemcached/libbyteorder.la -libmemcachedprotocol_la_DEPENDENCIES= $(top_builddir)/libmemcached/libbyteorder.la -endif diff --git a/libmemcached/protocol/ascii_handler.c b/libmemcached/protocol/ascii_handler.c new file mode 100644 index 00000000..8238a36d --- /dev/null +++ b/libmemcached/protocol/ascii_handler.c @@ -0,0 +1,908 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include "libmemcached/protocol/common.h" + +#include +#include +#include +#include + +/** + * Try to parse a key from the string. + * @pointer start pointer to a pointer to the string (IN and OUT) + * @return length of the string of -1 if this was an illegal key (invalid + * characters or invalid length) + * @todo add length! + */ +static uint16_t parse_ascii_key(char **start) +{ + uint16_t len= 0; + char *c = *start; + /* Strip leading whitespaces */ + while (isspace(*c)) + { + ++c; + } + + *start= c; + + while (*c != '\0' && !isspace(*c) && !iscntrl(*c)) + { + ++c; + ++len; + } + + + if (len == 0 || len > 240 || (*c != '\0' && *c != '\r' && iscntrl(*c))) + { + return 0; + } + + return len; +} + +/** + * Spool a zero-terminated string + * @param client destination + * @param text the text to spool + * @return status of the spool operation + */ +static protocol_binary_response_status +spool_string(struct memcached_protocol_client_st *client, const char *text) +{ + return client->root->spool(client, text, strlen(text)); +} + +/** + * Send a "CLIENT_ERROR" message back to the client with the correct + * format of the command being sent + * @param client the client to send the message to + */ +static void send_command_usage(struct memcached_protocol_client_st *client) +{ + const char *errmsg[]= { + [GET_CMD]= "CLIENT_ERROR: Syntax error: get *\r\n", + [GETS_CMD]= "CLIENT_ERROR: Syntax error: gets *\r\n", + [SET_CMD]= "CLIENT_ERROR: Syntax error: set [noreply]\r\n", + [ADD_CMD]= "CLIENT_ERROR: Syntax error: add [noreply]\r\n", + [REPLACE_CMD]= "CLIENT_ERROR: Syntax error: replace [noreply]\r\n", + [CAS_CMD]= "CLIENT_ERROR: Syntax error: cas [noreply]\r\n", + [APPEND_CMD]= "CLIENT_ERROR: Syntax error: append [noreply]\r\n", + [PREPEND_CMD]= "CLIENT_ERROR: Syntax error: prepend [noreply]\r\n", + [DELETE_CMD]= "CLIENT_ERROR: Syntax error: delete [noreply]\r\n", + [INCR_CMD]= "CLIENT_ERROR: Syntax error: incr [noreply]\r\n", + [DECR_CMD]= "CLIENT_ERROR: Syntax error: decr [noreply]\r\n", + [STATS_CMD]= "CLIENT_ERROR: Syntax error: stats [key]\r\n", + [FLUSH_ALL_CMD]= "CLIENT_ERROR: Syntax error: flush_all [timeout] [noreply]\r\n", + [VERSION_CMD]= "CLIENT_ERROR: Syntax error: version\r\n", + [QUIT_CMD]="CLIENT_ERROR: Syntax error: quit\r\n", + + [VERBOSITY_CMD]= "CLIENT_ERROR: Syntax error: verbosity \r\n", + [UNKNOWN_CMD]= "CLIENT_ERROR: Unknown command\r\n", + }; + + spool_string(client, errmsg[client->ascii_command]); +} + +/** + * Callback for the VERSION responses + * @param cookie client identifier + * @param text the length of the body + * @param textlen the length of the body + */ +static protocol_binary_response_status +ascii_version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) +{ + struct memcached_protocol_client_st *client= (void*)cookie; + spool_string(client, "VERSION "); + client->root->spool(client, text, textlen); + spool_string(client, "\r\n"); + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the GET/GETQ/GETK and GETKQ responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + * @param flags the flags for the item + * @param cas the CAS id for the item + */ +static protocol_binary_response_status +ascii_get_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen, + uint32_t flags, + uint64_t cas) +{ + struct memcached_protocol_client_st *client= (void*)cookie; + char buffer[300]; + strcpy(buffer, "VALUE "); + const char *source = key; + char *dest = buffer + 6; + + for (int x= 0; x < keylen; ++x) + { + if (*source != '\0' && !isspace(*source) && !iscntrl(*source)) + { + *dest = *source; + } + else + { + return PROTOCOL_BINARY_RESPONSE_EINVAL; /* key constraints in ascii */ + } + + ++dest; + ++source; + } + + size_t used= (size_t)(dest - buffer); + + if (client->ascii_command == GETS_CMD) + { + snprintf(dest, sizeof(buffer) - used, " %u %u %llu\r\n", flags, + flags, (unsigned long long)cas); + } + else + { + snprintf(dest, sizeof(buffer) - used, " %u %u\r\n", flags, flags); + } + + client->root->spool(client, buffer, strlen(buffer)); + client->root->spool(client, body, bodylen); + client->root->spool(client, "\r\n", 2); + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the STAT responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + */ +static protocol_binary_response_status +ascii_stat_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen) +{ + + struct memcached_protocol_client_st *client= (void*)cookie; + + if (key != NULL) + { + spool_string(client, "STAT "); + client->root->spool(client, key, keylen); + spool_string(client, " "); + client->root->spool(client, body, bodylen); + spool_string(client, "\r\n"); + } + else + { + spool_string(client, "END\r\n"); + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Process a get or a gets request. + * @param client the client handle + * @param buffer the complete get(s) command + * @param end the last character in the command + */ +static void ascii_process_gets(struct memcached_protocol_client_st *client, + char *buffer, char *end) +{ + char *key= buffer; + + /* Skip command */ + key += (client->ascii_command == GETS_CMD) ? 5 : 4; + + int num_keys= 0; + while (key < end) + { + uint16_t nkey= parse_ascii_key(&key); + if (nkey == 0) /* Invalid key... stop processing this line */ + { + break; + } + + (void)client->root->callback->interface.v1.get(client, key, nkey, + ascii_get_response_handler); + key += nkey; + ++num_keys; + } + + if (num_keys == 0) + { + send_command_usage(client); + } + else + client->root->spool(client, "END\r\n", 5); +} + +/** + * Try to split up the command line "asdf asdf asdf asdf\n" into an + * argument vector for easier parsing. + * @param start the first character in the command line + * @param end the last character in the command line ("\n") + * @param vec the vector to insert the pointers into + * @size the number of elements in the vector + * @return the number of tokens in the vector + */ +static int ascii_tokenize_command(char *str, char *end, char **vec, int size) +{ + int elem= 0; + + while (str < end) + { + /* Skip leading blanks */ + while (str < end && isspace(*str)) + { + ++str; + } + + if (str == end) + { + return elem; + } + + vec[elem++]= str; + /* find the next non-blank field */ + while (str < end && !isspace(*str)) + { + ++str; + } + + /* zero-terminate it for easier parsing later on */ + *str = '\0'; + ++str; + + /* Is the vector full? */ + if (elem == size) + { + break; + } + } + + return elem; +} + +/** + * If we for some reasons needs to push the line back to read more + * data we have to reverse the tokenization. Just do the brain-dead replace + * of all '\0' to ' ' and set the last character to '\n'. We could have used + * the vector we created, but then we would have to search for all of the + * spaces we ignored... + * @param start pointer to the first character in the buffer to recover + * @param end pointer to the last character in the buffer to recover + */ +static void recover_tokenize_command(char *start, char *end) +{ + while (start < end) + { + if (*start == '\0') + *start = ' '; + ++start; + } + + *end= '\n'; +} + +/** + * Convert the textual command into a comcode + */ +static enum ascii_cmd ascii_to_cmd(char *start, size_t length) +{ + struct { + const char *cmd; + size_t len; + enum ascii_cmd cc; + } commands[] = { + { .cmd= "get", .len= 3, .cc= GET_CMD }, + { .cmd= "gets", .len= 4, .cc= GETS_CMD }, + { .cmd= "set", .len= 3, .cc= SET_CMD }, + { .cmd= "add", .len= 3, .cc= ADD_CMD }, + { .cmd= "replace", .len= 7, .cc= REPLACE_CMD }, + { .cmd= "cas", .len= 3, .cc= CAS_CMD }, + { .cmd= "append", .len= 6, .cc= APPEND_CMD }, + { .cmd= "prepend", .len= 7, .cc= PREPEND_CMD }, + { .cmd= "delete", .len= 6, .cc= DELETE_CMD }, + { .cmd= "incr", .len= 4, .cc= INCR_CMD }, + { .cmd= "decr", .len= 4, .cc= DECR_CMD }, + { .cmd= "stats", .len= 5, .cc= STATS_CMD }, + { .cmd= "flush_all", .len= 9, .cc= FLUSH_ALL_CMD }, + { .cmd= "version", .len= 7, .cc= VERSION_CMD }, + { .cmd= "quit", .len= 4, .cc= QUIT_CMD }, + { .cmd= "verbosity", .len= 9, .cc= VERBOSITY_CMD }, + { .cmd= NULL, .len= 0, .cc= UNKNOWN_CMD }}; + + int x= 0; + while (commands[x].len > 0) { + if (length >= commands[x].len) + { + if (strncmp(start, commands[x].cmd, commands[x].len) == 0) + { + /* Potential hit */ + if (length == commands[x].len || isspace(*(start + commands[x].len))) + { + return commands[x].cc; + } + } + } + ++x; + } + + return UNKNOWN_CMD; +} + +/** + * Perform a delete operation. + * + * @param client client requesting the deletion + * @param tokens the command as a vector + * @param ntokens the number of items in the vector + */ +static void process_delete(struct memcached_protocol_client_st *client, + char **tokens, int ntokens) +{ + char *key= tokens[1]; + uint16_t nkey; + + if (ntokens != 2 || (nkey= parse_ascii_key(&key)) == 0) + { + send_command_usage(client); + return; + } + + if (client->root->callback->interface.v1.delete == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + + protocol_binary_response_status rval; + rval= client->root->callback->interface.v1.delete(client, key, nkey, 0); + + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + spool_string(client, "DELETED\r\n"); + } + else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) + { + spool_string(client, "NOT_FOUND\r\n"); + } + else + { + char msg[80]; + snprintf(msg, sizeof(msg), "SERVER_ERROR: delete failed %u\r\n",(int)rval); + spool_string(client, msg); + } +} + +static void process_arithmetic(struct memcached_protocol_client_st *client, + char **tokens, int ntokens) +{ + char *key= tokens[1]; + uint16_t nkey; + + if (ntokens != 3 || (nkey= parse_ascii_key(&key)) == 0) + { + send_command_usage(client); + return; + } + + uint64_t cas; + uint64_t result; + uint64_t delta = strtoull(tokens[2], NULL, 10); + + protocol_binary_response_status rval; + if (client->ascii_command == INCR_CMD) + { + if (client->root->callback->interface.v1.increment == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + rval= client->root->callback->interface.v1.increment(client, + key, nkey, + delta, 0, + 0, + &result, + &cas); + } + else + { + if (client->root->callback->interface.v1.decrement == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + rval= client->root->callback->interface.v1.decrement(client, + key, nkey, + delta, 0, + 0, + &result, + &cas); + } + + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + char buffer[80]; + snprintf(buffer, sizeof(buffer), "%llu\r\n", + (unsigned long long)result); + spool_string(client, buffer); + } + else + { + spool_string(client, "NOT_FOUND\r\n"); + } +} + +/** + * Process the stats command (with or without a key specified) + * @param key pointer to the first character after "stats" + * @param end pointer to the "\n" + */ +static void process_stats(struct memcached_protocol_client_st *client, + char *key, char *end) +{ + if (client->root->callback->interface.v1.stat == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + + while (isspace(*key)) + key++; + + uint16_t nkey= (uint16_t)(end - key); + (void)client->root->callback->interface.v1.stat(client, key, nkey, + ascii_stat_response_handler); +} + +static void process_version(struct memcached_protocol_client_st *client, + char **tokens, int ntokens) +{ + (void)tokens; + if (ntokens != 1) + { + send_command_usage(client); + return; + } + + if (client->root->callback->interface.v1.version == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + + client->root->callback->interface.v1.version(client, + ascii_version_response_handler); +} + +static void process_flush(struct memcached_protocol_client_st *client, + char **tokens, int ntokens) +{ + if (ntokens > 2) + { + send_command_usage(client); + return; + } + + if (client->root->callback->interface.v1.flush == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return; + } + + uint32_t timeout= 0; + if (ntokens == 2) + { + timeout = (uint32_t)strtoul(tokens[1], NULL, 10); + } + + protocol_binary_response_status rval; + rval= client->root->callback->interface.v1.flush(client, timeout); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + spool_string(client, "OK\r\n"); + else + spool_string(client, "SERVER_ERROR: internal error\r\n"); +} + +/** + * Process one of the storage commands + * @param client the client performing the operation + * @param tokens the command tokens + * @param ntokens the number of tokens + * @param start pointer to the first character in the line + * @param end pointer to the pointer where the last character of this + * command is (IN and OUT) + * @param length the number of bytes available + * @return -1 if an error occurs (and we should just terminate the connection + * because we are out of sync) + * 0 storage command completed, continue processing + * 1 We need more data, so just go ahead and wait for more! + */ +static inline int process_storage_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + (void)ntokens; /* already checked */ + char *key= tokens[1]; + uint16_t nkey= parse_ascii_key(&key); + if (nkey == 0) + { + /* return error */ + spool_string(client, "CLIENT_ERROR: bad key\r\n"); + return -1; + } + + uint32_t flags = (uint32_t)strtoul(tokens[2], NULL, 10); + uint32_t timeout = (uint32_t)strtoul(tokens[3], NULL, 10); + unsigned long nbytes = strtoul(tokens[4], NULL, 10); + + /* Do we have all data? */ + unsigned long need = nbytes + (unsigned long)((*end - start) + 1) + 2; /* \n\r\n */ + if ((ssize_t)need > length) + { + /* Keep on reading */ + recover_tokenize_command(start, *end); + return 1; + } + + void *data = (*end) + 1; + uint64_t cas= 0; + uint64_t result_cas; + protocol_binary_response_status rval; + switch (client->ascii_command) + { + case SET_CMD: + rval= client->root->callback->interface.v1.set(client, key, + (uint16_t)nkey, + data, + (uint32_t)nbytes, + flags, + timeout, cas, + &result_cas); + break; + case ADD_CMD: + rval= client->root->callback->interface.v1.add(client, key, + (uint16_t)nkey, + data, + (uint32_t)nbytes, + flags, + timeout, &result_cas); + break; + case CAS_CMD: + cas = strtoull(tokens[5], NULL, 10); + /* FALLTHROUGH */ + case REPLACE_CMD: + rval= client->root->callback->interface.v1.replace(client, key, + (uint16_t)nkey, + data, + (uint32_t)nbytes, + flags, + timeout, cas, + &result_cas); + break; + case APPEND_CMD: + rval= client->root->callback->interface.v1.append(client, key, + (uint16_t)nkey, + data, + (uint32_t)nbytes, + cas, + &result_cas); + break; + case PREPEND_CMD: + rval= client->root->callback->interface.v1.prepend(client, key, + (uint16_t)nkey, + data, + (uint32_t)nbytes, + cas, + &result_cas); + break; + + /* gcc complains if I don't put all of the enums in here.. */ + case GET_CMD: + case GETS_CMD: + case DELETE_CMD: + case DECR_CMD: + case INCR_CMD: + case STATS_CMD: + case FLUSH_ALL_CMD: + case VERSION_CMD: + case QUIT_CMD: + case VERBOSITY_CMD: + case UNKNOWN_CMD: + default: + abort(); /* impossible */ + } + + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) + { + spool_string(client, "STORED\r\n"); + } + else + { + if (client->ascii_command == CAS_CMD) + { + if (rval == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS) + { + spool_string(client, "EXISTS\r\n"); + } + else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) + { + spool_string(client, "NOT_FOUND\r\n"); + } + else + { + spool_string(client, "NOT_STORED\r\n"); + } + } + else + { + spool_string(client, "NOT_STORED\r\n"); + } + } + + *end += nbytes + 2; + + return 0; +} + +static int process_cas_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 6) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.replace == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +static int process_set_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 5) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.set == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +static int process_add_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 5) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.add == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +static int process_replace_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 5) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.replace == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +static int process_append_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 5) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.append == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +static int process_prepend_command(struct memcached_protocol_client_st *client, + char **tokens, int ntokens, char *start, + char **end, ssize_t length) +{ + if (ntokens != 5) + { + send_command_usage(client); + return false; + } + + if (client->root->callback->interface.v1.prepend == NULL) + { + spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + return false; + } + + return process_storage_command(client, tokens, ntokens, start, end, length); +} + +/** + * The ASCII protocol support is just one giant big hack. Instead of adding + * a optimal ascii support, I just convert the ASCII commands to the binary + * protocol and calls back into the command handlers for the binary protocol ;) + */ +enum MEMCACHED_PROTOCOL_EVENT memcached_ascii_protocol_process_data(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr) +{ + char *ptr= (char*)client->root->input_buffer; + *endptr= ptr; + + do { + /* Do we have \n (indicating the command preamble)*/ + char *end= memchr(ptr, '\n', (size_t)*length); + if (end == NULL) + { + *endptr= ptr; + return READ_EVENT; + } + + client->ascii_command= ascii_to_cmd(ptr, (size_t)(*length)); + + /* A multiget lists all of the keys, and I don't want to have an + * avector of let's say 512 pointers to tokenize all of them, so let's + * just handle them immediately + */ + if (client->ascii_command == GET_CMD || + client->ascii_command == GETS_CMD) { + if (client->root->callback->interface.v1.get != NULL) + ascii_process_gets(client, ptr, end); + else + spool_string(client, "SERVER_ERROR: Command not implemented\n"); + } else { + /* None of the defined commands takes 10 parameters, so lets just use + * that as a maximum limit. + */ + char *tokens[10]; + int ntokens= ascii_tokenize_command(ptr, end, tokens, 10); + + if (ntokens < 10) + { + client->mute= strcmp(tokens[ntokens - 1], "noreply") == 0; + if (client->mute) + --ntokens; /* processed noreply token*/ + } + + int error= 0; + + switch (client->ascii_command) { + case SET_CMD: + error= process_set_command(client, tokens, ntokens, ptr, &end, *length); + break; + case ADD_CMD: + error= process_add_command(client, tokens, ntokens, ptr, &end, *length); + break; + case REPLACE_CMD: + error= process_replace_command(client, tokens, ntokens, + ptr, &end, *length); + case CAS_CMD: + error= process_cas_command(client, tokens, ntokens, ptr, &end, *length); + break; + case APPEND_CMD: + error= process_append_command(client, tokens, ntokens, + ptr, &end, *length); + break; + case PREPEND_CMD: + error= process_prepend_command(client, tokens, ntokens, + ptr, &end, *length); + break; + case DELETE_CMD: + process_delete(client, tokens, ntokens); + break; + + case INCR_CMD: /* FALLTHROUGH */ + case DECR_CMD: + process_arithmetic(client, tokens, ntokens); + break; + case STATS_CMD: + recover_tokenize_command(ptr, end); + process_stats(client, ptr + 6, end); + break; + case FLUSH_ALL_CMD: + process_flush(client, tokens, ntokens); + break; + case VERSION_CMD: + process_version(client, tokens, ntokens); + break; + case QUIT_CMD: + if (ntokens != 1) + send_command_usage(client); + else + { + if (client->root->callback->interface.v1.quit != NULL) + client->root->callback->interface.v1.quit(client); + + return ERROR_EVENT; + } + break; + + case VERBOSITY_CMD: + if (ntokens != 2) + send_command_usage(client); + else + spool_string(client, "OK\r\n"); + break; + + case UNKNOWN_CMD: + send_command_usage(client); + break; + + case GET_CMD: + case GETS_CMD: + default: + /* Should already be handled */ + abort(); + } + + if (error == -1) + return ERROR_EVENT; + else if (error == 1) + return READ_EVENT; + } + + /* Move past \n */ + ++end; + *length -= end - ptr; + ptr= end; + } while (*length > 0); + + *endptr = ptr; + return READ_EVENT; +} diff --git a/libmemcached/protocol/ascii_handler.h b/libmemcached/protocol/ascii_handler.h new file mode 100644 index 00000000..e8737921 --- /dev/null +++ b/libmemcached/protocol/ascii_handler.h @@ -0,0 +1,8 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#ifndef LIBMEMCACHED_PROTOCOL_ASCII_HANDLER_H +#define LIBMEMCACHED_PROTOCOL_ASCII_HANDLER_H + +LIBMEMCACHED_LOCAL +enum MEMCACHED_PROTOCOL_EVENT memcached_ascii_protocol_process_data(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr); + +#endif diff --git a/libmemcached/protocol/binary_handler.c b/libmemcached/protocol/binary_handler.c new file mode 100644 index 00000000..23aeb5f9 --- /dev/null +++ b/libmemcached/protocol/binary_handler.c @@ -0,0 +1,1080 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#include "libmemcached/protocol/common.h" + +#include +#include +#include +#include +#include +#include +#include + +/* +** ********************************************************************** +** INTERNAL INTERFACE +** ********************************************************************** +*/ + +/** + * Send a preformatted packet back to the client. If the connection is in + * pedantic mode, it will validate the packet and refuse to send it if it + * breaks the specification. + * + * @param cookie client identification + * @param request the original request packet + * @param response the packet to send + * @return The status of the operation + */ +static protocol_binary_response_status +raw_response_handler(const void *cookie, + protocol_binary_request_header *request, + protocol_binary_response_header *response) +{ + struct memcached_protocol_client_st *client= (void*)cookie; + + if (client->root->pedantic && + !memcached_binary_protocol_pedantic_check_response(request, response)) + { + return PROTOCOL_BINARY_RESPONSE_EINVAL; + } + + if (!client->root->drain(client)) + { + return PROTOCOL_BINARY_RESPONSE_EIO; + } + + size_t len= sizeof(*response) + htonl(response->response.bodylen); + size_t offset= 0; + char *ptr= (void*)response; + + if (client->output == NULL) + { + /* I can write directly to the socket.... */ + do + { + size_t num_bytes= len - offset; + ssize_t nw= client->root->send(client, + client->sock, + ptr + offset, + num_bytes); + if (nw == -1) + { + if (errno == EWOULDBLOCK) + { + break; + } + else if (errno != EINTR) + { + client->error= errno; + return PROTOCOL_BINARY_RESPONSE_EIO; + } + } + else + { + offset += (size_t)nw; + } + } while (offset < len); + } + + return client->root->spool(client, ptr, len - offset); +} + +/* + * Version 0 of the interface is really low level and protocol specific, + * while the version 1 of the interface is more API focused. We need a + * way to translate between the command codes on the wire and the + * application level interface in V1, so let's just use the V0 of the + * interface as a map instead of creating a huuuge switch :-) + */ + +/** + * Callback for the GET/GETQ/GETK and GETKQ responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + * @param flags the flags for the item + * @param cas the CAS id for the item + */ +static protocol_binary_response_status +get_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen, + uint32_t flags, + uint64_t cas) { + + struct memcached_protocol_client_st *client= (void*)cookie; + uint8_t opcode= client->current_command->request.opcode; + + if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ) + { + keylen= 0; + } + + protocol_binary_response_get response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .cas= htonll(cas), + .keylen= htons(keylen), + .extlen= 4, + .bodylen= htonl(bodylen + keylen + 4), + }, + .message.body.flags= htonl(flags), + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || + (rval= client->root->spool(client, key, keylen)) != success || + (rval= client->root->spool(client, body, bodylen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the STAT responses + * @param cookie client identifier + * @param key the key for the item + * @param keylen the length of the key + * @param body the length of the body + * @param bodylen the length of the body + */ +static protocol_binary_response_status +stat_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen) { + + struct memcached_protocol_client_st *client= (void*)cookie; + + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= client->current_command->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .keylen= htons(keylen), + .bodylen= htonl(bodylen + keylen), + }, + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || + (rval= client->root->spool(client, key, keylen)) != success || + (rval= client->root->spool(client, body, bodylen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for the VERSION responses + * @param cookie client identifier + * @param text the length of the body + * @param textlen the length of the body + */ +static protocol_binary_response_status +version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) { + + struct memcached_protocol_client_st *client= (void*)cookie; + + protocol_binary_response_no_extras response= { + .message.header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= client->current_command->request.opcode, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= client->current_command->request.opaque, + .bodylen= htonl(textlen), + }, + }; + + protocol_binary_response_status rval; + const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; + if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || + (rval= client->root->spool(client, text, textlen)) != success) + { + return rval; + } + + return PROTOCOL_BINARY_RESPONSE_SUCCESS; +} + +/** + * Callback for ADD and ADDQ + * @param cookie the calling client + * @param header the add/addq command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +add_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.add != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_add *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas; + + rval= client->root->callback->interface.v1.add(cookie, key, keylen, + data, datalen, flags, + timeout, &cas); + + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_ADD) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_ADD, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas) + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for DECREMENT and DECREMENTQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +decrement_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.decrement != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + protocol_binary_request_decr *request= (void*)header; + uint64_t init= ntohll(request->message.body.initial); + uint64_t delta= ntohll(request->message.body.delta); + uint32_t timeout= ntohl(request->message.body.expiration); + void *key= request->bytes + sizeof(request->bytes); + uint64_t result; + uint64_t cas; + + rval= client->root->callback->interface.v1.decrement(cookie, key, keylen, + delta, init, timeout, + &result, &cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT) + { + /* Send a positive request */ + protocol_binary_response_decr response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_DECREMENT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas), + .bodylen= htonl(8) + }, + .body.value = htonll(result) + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for DELETE and DELETEQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +delete_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.delete != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + void *key= (header + 1); + uint64_t cas= ntohll(header->request.cas); + rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_DELETE, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for FLUSH and FLUSHQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +flush_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.flush != NULL) + { + protocol_binary_request_flush *flush= (void*)header; + uint32_t timeout= 0; + if (htonl(header->request.bodylen) == 4) + { + timeout= ntohl(flush->message.body.expiration); + } + + rval= client->root->callback->interface.v1.flush(cookie, timeout); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_FLUSH, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for GET, GETK, GETQ, GETKQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +get_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.get != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + void *key= (header + 1); + rval= client->root->callback->interface.v1.get(cookie, key, keylen, + get_response_handler); + + if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT && + (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ || + header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ)) + { + /* Quiet commands shouldn't respond on cache misses */ + rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for INCREMENT and INCREMENTQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +increment_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.increment != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + protocol_binary_request_incr *request= (void*)header; + uint64_t init= ntohll(request->message.body.initial); + uint64_t delta= ntohll(request->message.body.delta); + uint32_t timeout= ntohl(request->message.body.expiration); + void *key= request->bytes + sizeof(request->bytes); + uint64_t cas; + uint64_t result; + + rval= client->root->callback->interface.v1.increment(cookie, key, keylen, + delta, init, timeout, + &result, &cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT) + { + /* Send a positive request */ + protocol_binary_response_incr response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_INCREMENT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(cas), + .bodylen= htonl(8) + }, + .body.value = htonll(result) + } + }; + + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for noop. Inform the v1 interface about the noop packet, and + * create and send a packet back to the client + * + * @param cookie the calling client + * @param header the command + * @param response_handler the response handler + * @return the result of the operation + */ +static protocol_binary_response_status +noop_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.noop != NULL) + { + client->root->callback->interface.v1.noop(cookie); + } + + protocol_binary_response_no_extras response= { + .message = { + .header.response = { + .magic = PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_NOOP, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + } + } + }; + + return response_handler(cookie, header, (void*)&response); +} + +/** + * Callback for APPEND and APPENDQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +append_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.append != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen; + char *key= (void*)(header + 1); + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + rval= client->root->callback->interface.v1.append(cookie, key, keylen, + data, datalen, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_APPEND) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_APPEND, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for PREPEND and PREPENDQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +prepend_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.prepend != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen; + char *key= (char*)(header + 1); + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + rval= client->root->callback->interface.v1.prepend(cookie, key, keylen, + data, datalen, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_PREPEND, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for QUIT and QUITQ. Notify the client and shut down the connection + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +quit_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.quit != NULL) + { + client->root->callback->interface.v1.quit(cookie); + } + + protocol_binary_response_no_extras response= { + .message = { + .header.response = { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_QUIT, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque + } + } + }; + + if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) + { + response_handler(cookie, header, (void*)&response); + } + + /* I need a better way to signal to close the connection */ + return PROTOCOL_BINARY_RESPONSE_EIO; +} + +/** + * Callback for REPLACE and REPLACEQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +replace_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.replace != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + rval= client->root->callback->interface.v1.replace(cookie, key, keylen, + data, datalen, flags, + timeout, cas, + &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_REPLACE, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for SET and SETQ + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +set_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.set != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; + protocol_binary_request_replace *request= (void*)header; + uint32_t flags= ntohl(request->message.body.flags); + uint32_t timeout= ntohl(request->message.body.expiration); + char *key= ((char*)header) + sizeof(*header) + 8; + char *data= key + keylen; + uint64_t cas= ntohll(header->request.cas); + uint64_t result_cas; + + + rval= client->root->callback->interface.v1.set(cookie, key, keylen, + data, datalen, flags, + timeout, cas, &result_cas); + if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && + header->request.opcode == PROTOCOL_BINARY_CMD_SET) + { + /* Send a positive request */ + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= PROTOCOL_BINARY_CMD_SET, + .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), + .opaque= header->request.opaque, + .cas= ntohll(result_cas), + }, + } + }; + rval= response_handler(cookie, header, (void*)&response); + } + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for STAT + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +stat_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.stat != NULL) + { + uint16_t keylen= ntohs(header->request.keylen); + + rval= client->root->callback->interface.v1.stat(cookie, + (void*)(header + 1), + keylen, + stat_response_handler); + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * Callback for VERSION + * @param cookie the calling client + * @param header the command + * @param response_handler not used + * @return the result of the operation + */ +static protocol_binary_response_status +version_command_handler(const void *cookie, + protocol_binary_request_header *header, + memcached_binary_protocol_raw_response_handler response_handler) +{ + (void)response_handler; + (void)header; + protocol_binary_response_status rval; + + struct memcached_protocol_client_st *client= (void*)cookie; + if (client->root->callback->interface.v1.version != NULL) + { + rval= client->root->callback->interface.v1.version(cookie, + version_response_handler); + } + else + { + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + } + + return rval; +} + +/** + * The map to remap between the com codes and the v1 logical setting + */ +static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= { + [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, + [PROTOCOL_BINARY_CMD_ADD]= add_command_handler, + [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler, + [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler, + [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler, + [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler, + [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, + [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, + [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, + [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, + [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, + [PROTOCOL_BINARY_CMD_GETK]= get_command_handler, + [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, + [PROTOCOL_BINARY_CMD_GET]= get_command_handler, + [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler, + [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler, + [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, + [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler, + [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler, + [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, + [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, + [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, + [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, + [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, + [PROTOCOL_BINARY_CMD_SET]= set_command_handler, + [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, + [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, +}; + +/** + * Try to execute a command. Fire the pre/post functions and the specialized + * handler function if it's set. If not, the unknown probe should be fired + * if it's present. + * @param client the client connection to operate on + * @param header the command to execute + * @return true if success or false if a fatal error occured so that the + * connection should be shut down. + */ +static bool execute_command(struct memcached_protocol_client_st *client, protocol_binary_request_header *header) +{ + if (client->root->pedantic && + memcached_binary_protocol_pedantic_check_request(header)) + { + /* @todo return invalid command packet */ + } + + /* we got all data available, execute the callback! */ + if (client->root->callback->pre_execute != NULL) + { + client->root->callback->pre_execute(client, header); + } + + protocol_binary_response_status rval; + rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; + uint8_t cc= header->request.opcode; + + switch (client->root->callback->interface_version) + { + case 0: + if (client->root->callback->interface.v0.comcode[cc] != NULL) { + rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler); + } + break; + case 1: + if (comcode_v0_v1_remap[cc] != NULL) { + rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler); + } + break; + default: + /* Unknown interface. + * It should be impossible to get here so I'll just call abort + * to avoid getting a compiler warning :-) + */ + abort(); + } + + + if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND && + client->root->callback->unknown != NULL) + { + rval= client->root->callback->unknown(client, header, raw_response_handler); + } + + if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && + rval != PROTOCOL_BINARY_RESPONSE_EIO) + { + protocol_binary_response_no_extras response= { + .message= { + .header.response= { + .magic= PROTOCOL_BINARY_RES, + .opcode= cc, + .status= htons(rval), + .opaque= header->request.opaque, + }, + } + }; + rval= raw_response_handler(client, header, (void*)&response); + } + + if (client->root->callback->post_execute != NULL) + { + client->root->callback->post_execute(client, header); + } + + return rval != PROTOCOL_BINARY_RESPONSE_EIO; +} + +/* +** ********************************************************************** +** "PROTOECTED" INTERFACE +** ********************************************************************** +*/ +enum MEMCACHED_PROTOCOL_EVENT memcached_binary_protocol_process_data(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr) +{ + /* try to parse all of the received packets */ + protocol_binary_request_header *header; + header= (void*)client->root->input_buffer; + if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ) + { + client->error= EINVAL; + return ERROR_EVENT; + } + ssize_t len= *length; + + while (len >= (ssize_t)sizeof(*header) && + (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)))) + { + /* I have the complete package */ + client->current_command = header; + if (!execute_command(client, header)) + { + *length= len; + *endptr= (void*)header; + return ERROR_EVENT; + } + + ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)); + len -= total; + if (len > 0) + { + intptr_t ptr= (intptr_t)header; + ptr += total; + if ((ptr % 8) == 0) + { + header= (void*)ptr; + } + else + { + /* Fix alignment */ + memmove(client->root->input_buffer, (void*)ptr, (size_t)len); + header= (void*)client->root->input_buffer; + } + } + } + + *length= len; + *endptr= (void*)header; + + return READ_EVENT; +} + +/* +** ********************************************************************** +** PUBLIC INTERFACE +** ********************************************************************** +*/ +struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_protocol_st *instance) +{ + return instance->callback; +} + +void memcached_binary_protocol_set_callbacks(struct memcached_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback) +{ + instance->callback= callback; +} + +memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie) +{ + (void)cookie; + return raw_response_handler; +} + +void memcached_binary_protocol_set_pedantic(struct memcached_protocol_st *instance, bool enable) +{ + instance->pedantic= enable; +} + +bool memcached_binary_protocol_get_pedantic(struct memcached_protocol_st *instance) +{ + return instance->pedantic; +} + diff --git a/libmemcached/protocol/binary_handler.h b/libmemcached/protocol/binary_handler.h new file mode 100644 index 00000000..7b1d0aca --- /dev/null +++ b/libmemcached/protocol/binary_handler.h @@ -0,0 +1,15 @@ +/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ +#ifndef LIBMEMCACHED_PROTOCOL_BINARY_HANDLER_H +#define LIBMEMCACHED_PROTOCOL_BINARY_HANDLER_H + +LIBMEMCACHED_LOCAL +bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request); + +LIBMEMCACHED_LOCAL +bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request, + protocol_binary_response_header *response); + +LIBMEMCACHED_LOCAL +enum MEMCACHED_PROTOCOL_EVENT memcached_binary_protocol_process_data(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr); + +#endif diff --git a/libmemcached/protocol/common.h b/libmemcached/protocol/common.h index f6062422..20fa16e5 100644 --- a/libmemcached/protocol/common.h +++ b/libmemcached/protocol/common.h @@ -1,6 +1,6 @@ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -#ifndef MEMCACHED_PROTOCOL_INTERNAL_H -#define MEMCACHED_PROTOCOL_INTERNAL_H +#ifndef LIBMEMCACHED_PROTOCOL_COMMON_H +#define LIBMEMCACHED_PROTOCOL_COMMON_H #include "config.h" #include @@ -16,18 +16,48 @@ #include #include -struct memcached_binary_protocol_st { +/* + * I don't really need the following two functions as function pointers + * in the instance handle, but I don't want to put them in the global + * namespace for those linking statically (personally I don't like that, + * but some people still do). If it ever shows up as a performance thing + * I'll look into optimizing this ;-) + */ +typedef bool (*drain_func)(struct memcached_protocol_client_st *client); +typedef protocol_binary_response_status (*spool_func)(struct memcached_protocol_client_st *client, + const void *data, + size_t length); + +/** + * Definition of the per instance structure. + */ +struct memcached_protocol_st { struct memcached_binary_protocol_callback_st *callback; - memcached_binary_protocol_recv_func recv; - memcached_binary_protocol_send_func send; - char *input_buffer; + memcached_protocol_recv_func recv; + memcached_protocol_send_func send; + + /* + * I really don't need these as funciton pointers, but I don't want + * to clutter the namespace if someone links statically. + */ + drain_func drain; + spool_func spool; + + /* + * To avoid keeping a buffer in each client all the time I have a + * bigger buffer in the instance that I read to initially, and then + * I try to parse and execute as much from the buffer. If I wasn't able + * to process all data I'll keep that in a per-connection buffer until + * the next time I can read from the socket. + */ + uint8_t *input_buffer; size_t input_buffer_size; + bool pedantic; /* @todo use multiple sized buffers */ cache_t *buffer_cache; }; - struct chunk_st { /* Pointer to the data */ char *data; @@ -43,8 +73,30 @@ struct chunk_st { #define CHUNK_BUFFERSIZE 2048 -struct memcached_binary_protocol_client_st { - struct memcached_binary_protocol_st *root; +typedef enum MEMCACHED_PROTOCOL_EVENT (*process_data)(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr); + +enum ascii_cmd { + GET_CMD, + GETS_CMD, + SET_CMD, + ADD_CMD, + REPLACE_CMD, + CAS_CMD, + APPEND_CMD, + PREPEND_CMD, + DELETE_CMD, + INCR_CMD, + DECR_CMD, + STATS_CMD, + FLUSH_ALL_CMD, + VERSION_CMD, + QUIT_CMD, + VERBOSITY_CMD, + UNKNOWN_CMD, +}; + +struct memcached_protocol_client_st { + struct memcached_protocol_st *root; int sock; int error; @@ -52,29 +104,32 @@ struct memcached_binary_protocol_client_st { struct chunk_st *output; struct chunk_st *output_tail; - - - char *input_buffer; + /* + * While we process input data, this is where we spool incomplete commands + * if we need to receive more data.... + * @todo use the buffercace + */ + uint8_t *input_buffer; size_t input_buffer_size; size_t input_buffer_offset; - char *curr_input; + /* The callback to the protocol handler to use (ascii or binary) */ + process_data work; - struct chunk_st *input; - /* Pointer to the last chunk to avoid the need to traverse the complete list */ - struct chunk_st *input_tail; - size_t bytes_available; + /* + * Should the spool data discard the data to send or not? (aka noreply in + * the ascii protocol.. + */ + bool mute; + /* Members used by the binary protocol */ protocol_binary_request_header *current_command; - bool quiet; -}; - -LIBMEMCACHED_LOCAL -bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_header *request); -LIBMEMCACHED_LOCAL -bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_header *request, - protocol_binary_response_header *response); + /* Members used by the ascii protocol */ + enum ascii_cmd ascii_command; +}; +#include "ascii_handler.h" +#include "binary_handler.h" #endif diff --git a/libmemcached/protocol/pedantic.c b/libmemcached/protocol/pedantic.c index 884f6721..d6304396 100644 --- a/libmemcached/protocol/pedantic.c +++ b/libmemcached/protocol/pedantic.c @@ -1,5 +1,5 @@ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -#include "common.h" +#include "libmemcached/protocol/common.h" #include #include @@ -19,7 +19,7 @@ bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_he uint16_t keylen= ntohs(request->request.keylen); uint8_t extlen= request->request.extlen; uint32_t bodylen= ntohl(request->request.bodylen); - + ensure(bodylen >= (keylen + extlen)); switch (opcode) { @@ -83,7 +83,7 @@ bool memcached_binary_protocol_pedantic_check_request(protocol_binary_request_he /* May have key, but not value */ ensure(keylen == bodylen); break; - + case PROTOCOL_BINARY_CMD_APPEND: case PROTOCOL_BINARY_CMD_APPENDQ: case PROTOCOL_BINARY_CMD_PREPEND: @@ -109,7 +109,7 @@ bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_h uint16_t status= ntohs(response->response.status); uint8_t opcode= response->response.opcode; - if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS) + if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS) { switch (opcode) { case PROTOCOL_BINARY_CMD_ADDQ: @@ -189,11 +189,11 @@ bool memcached_binary_protocol_pedantic_check_response(protocol_binary_request_h break; } } - else + else { ensure(response->response.cas == 0); ensure(response->response.extlen == 0); - if (opcode != PROTOCOL_BINARY_CMD_GETK) + if (opcode != PROTOCOL_BINARY_CMD_GETK) { ensure(response->response.keylen == 0); } diff --git a/libmemcached/protocol/protocol_handler.c b/libmemcached/protocol/protocol_handler.c index f43788d4..12cf248c 100644 --- a/libmemcached/protocol/protocol_handler.c +++ b/libmemcached/protocol/protocol_handler.c @@ -1,5 +1,5 @@ /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ -#include "common.h" +#include "libmemcached/protocol/common.h" #include #include @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include /* @@ -63,11 +65,11 @@ static ssize_t default_send(const void *cookie, * true otherwise (please note that there may be more data to * left in the buffer to send) */ -static bool drain_output(struct memcached_binary_protocol_client_st *client) +static bool drain_output(struct memcached_protocol_client_st *client) { ssize_t len; - // Do we have pending data to send? + /* Do we have pending data to send? */ while (client->output != NULL) { len= client->root->send(client, @@ -113,11 +115,11 @@ static bool drain_output(struct memcached_binary_protocol_client_st *client) * @param client the client that needs the buffer * @return pointer to the new chunk if the allocation succeeds, NULL otherwise */ -static struct chunk_st* -allocate_output_chunk(struct memcached_binary_protocol_client_st *client) +static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client) { - struct chunk_st*ret= cache_alloc(client->root->buffer_cache); - if (ret == NULL) { + struct chunk_st *ret= cache_alloc(client->root->buffer_cache); + if (ret == NULL) + { return NULL; } @@ -147,11 +149,15 @@ allocate_output_chunk(struct memcached_binary_protocol_client_st *client) * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success, * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory */ -static protocol_binary_response_status -spool_output(struct memcached_binary_protocol_client_st *client, - const void *data, - size_t length) +static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client, + const void *data, + size_t length) { + if (client->mute) + { + return PROTOCOL_BINARY_RESPONSE_SUCCESS; + } + size_t offset = 0; struct chunk_st *chunk= client->output; @@ -180,981 +186,43 @@ spool_output(struct memcached_binary_protocol_client_st *client, } /** - * Send a preformatted packet back to the client. If the connection is in - * pedantic mode, it will validate the packet and refuse to send it if it - * breaks the specification. + * Try to determine the protocol used on this connection. + * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should + * be using the binary protocol on the connection. I implemented the support + * for the ASCII protocol by wrapping into the simple interface (aka v1), + * so the implementors needs to provide an implementation of that interface * - * @param cookie client identification - * @param request the original request packet - * @param response the packet to send - * @return The status of the operation */ -static protocol_binary_response_status -raw_response_handler(const void *cookie, - protocol_binary_request_header *request, - protocol_binary_response_header *response) +static enum MEMCACHED_PROTOCOL_EVENT determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr) { - struct memcached_binary_protocol_client_st *client= (void*)cookie; - - if (client->root->pedantic && - !memcached_binary_protocol_pedantic_check_response(request, response)) - { - return PROTOCOL_BINARY_RESPONSE_EINVAL; - } - - if (!drain_output(client)) - { - return PROTOCOL_BINARY_RESPONSE_EIO; - } - - size_t len= sizeof(*response) + htonl(response->response.bodylen); - size_t offset= 0; - char *ptr= (void*)response; - - if (client->output == NULL) - { - /* I can write directly to the socket.... */ - do - { - size_t num_bytes= len - offset; - ssize_t nw= client->root->send(client, - client->sock, - ptr + offset, - num_bytes); - if (nw == -1) - { - if (errno == EWOULDBLOCK) - { - break; - } - else if (errno != EINTR) - { - client->error= errno; - return PROTOCOL_BINARY_RESPONSE_EIO; - } - } - else - { - offset += (size_t)nw; - } - } while (offset < len); - } - - return spool_output(client, ptr, len - offset); -} - -/* - * Version 0 of the interface is really low level and protocol specific, - * while the version 1 of the interface is more API focused. We need a - * way to translate between the command codes on the wire and the - * application level interface in V1, so let's just use the V0 of the - * interface as a map instead of creating a huuuge switch :-) - */ - -/** - * Callback for the GET/GETQ/GETK and GETKQ responses - * @param cookie client identifier - * @param key the key for the item - * @param keylen the length of the key - * @param body the length of the body - * @param bodylen the length of the body - * @param flags the flags for the item - * @param cas the CAS id for the item - */ -static protocol_binary_response_status -get_response_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *body, - uint32_t bodylen, - uint32_t flags, - uint64_t cas) { - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - uint8_t opcode= client->current_command->request.opcode; - - if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ) + if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ) { - keylen= 0; + client->work= memcached_binary_protocol_process_data; } - - protocol_binary_response_get response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= client->current_command->request.opaque, - .cas= htonll(cas), - .keylen= htons(keylen), - .extlen= 4, - .bodylen= htonl(bodylen + keylen + 4), - }, - .message.body.flags= htonl(flags), - }; - - protocol_binary_response_status rval; - const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; - if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || - (rval= spool_output(client, key, keylen)) != success || - (rval= spool_output(client, body, bodylen)) != success) + else if (client->root->callback->interface_version == 1) { - return rval; - } - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -/** - * Callback for the STAT responses - * @param cookie client identifier - * @param key the key for the item - * @param keylen the length of the key - * @param body the length of the body - * @param bodylen the length of the body - */ -static protocol_binary_response_status -stat_response_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *body, - uint32_t bodylen) { - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= client->current_command->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= client->current_command->request.opaque, - .keylen= htons(keylen), - .bodylen= htonl(bodylen + keylen), - }, - }; - - protocol_binary_response_status rval; - const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; - if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || - (rval= spool_output(client, key, keylen)) != success || - (rval= spool_output(client, body, bodylen)) != success) - { - return rval; - } - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -/** - * Callback for the VERSION responses - * @param cookie client identifier - * @param text the length of the body - * @param textlen the length of the body - */ -static protocol_binary_response_status -version_response_handler(const void *cookie, - const void *text, - uint32_t textlen) { - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - - protocol_binary_response_no_extras response= { - .message.header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= client->current_command->request.opcode, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= client->current_command->request.opaque, - .bodylen= htonl(textlen), - }, - }; - - protocol_binary_response_status rval; - const protocol_binary_response_status success = PROTOCOL_BINARY_RESPONSE_SUCCESS; - if ((rval= spool_output(client, response.bytes, sizeof(response.bytes))) != success || - (rval= spool_output(client, text, textlen)) != success) - { - return rval; - } - - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -/** - * Callback for ADD and ADDQ - * @param cookie the calling client - * @param header the add/addq command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -add_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.add != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_add *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - uint32_t timeout= ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - uint64_t cas; - - rval= client->root->callback->interface.v1.add(cookie, key, keylen, - data, datalen, flags, - timeout, &cas); - - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_ADD) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_ADD, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(cas) - } - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for DECREMENT and DECREMENTQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -decrement_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.decrement != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - protocol_binary_request_decr *request= (void*)header; - uint64_t init= ntohll(request->message.body.initial); - uint64_t delta= ntohll(request->message.body.delta); - uint32_t timeout= ntohl(request->message.body.expiration); - void *key= request->bytes + sizeof(request->bytes); - uint64_t result; - uint64_t cas; - - rval= client->root->callback->interface.v1.decrement(cookie, key, keylen, - delta, init, timeout, - &result, &cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT) - { - /* Send a positive request */ - protocol_binary_response_decr response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_DECREMENT, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(cas), - .bodylen= htonl(8) - }, - .body.value = htonll(result) - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for DELETE and DELETEQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -delete_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.delete != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - void *key= (header + 1); - uint64_t cas= ntohll(header->request.cas); - rval= client->root->callback->interface.v1.delete(cookie, key, keylen, cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_DELETE, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - } - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for FLUSH and FLUSHQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -flush_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.flush != NULL) - { - protocol_binary_request_flush *flush= (void*)header; - uint32_t timeout= 0; - if (htonl(header->request.bodylen) == 4) - { - timeout= ntohl(flush->message.body.expiration); - } - - rval= client->root->callback->interface.v1.flush(cookie, timeout); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_FLUSH, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - } - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for GET, GETK, GETQ, GETKQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -get_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.get != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - void *key= (header + 1); - rval= client->root->callback->interface.v1.get(cookie, key, keylen, - get_response_handler); - - if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT && - (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ || - header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ)) - { - /* Quiet commands shouldn't respond on cache misses */ - rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for INCREMENT and INCREMENTQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -increment_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.increment != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - protocol_binary_request_incr *request= (void*)header; - uint64_t init= ntohll(request->message.body.initial); - uint64_t delta= ntohll(request->message.body.delta); - uint32_t timeout= ntohl(request->message.body.expiration); - void *key= request->bytes + sizeof(request->bytes); - uint64_t cas; - uint64_t result; - - rval= client->root->callback->interface.v1.increment(cookie, key, keylen, - delta, init, timeout, - &result, &cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT) - { - /* Send a positive request */ - protocol_binary_response_incr response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_INCREMENT, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(cas), - .bodylen= htonl(8) - }, - .body.value = htonll(result) - } - }; - - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for noop. Inform the v1 interface about the noop packet, and - * create and send a packet back to the client - * - * @param cookie the calling client - * @param header the command - * @param response_handler the response handler - * @return the result of the operation - */ -static protocol_binary_response_status -noop_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.noop != NULL) - { - client->root->callback->interface.v1.noop(cookie); - } - - protocol_binary_response_no_extras response= { - .message = { - .header.response = { - .magic = PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_NOOP, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - } - } - }; - - return response_handler(cookie, header, (void*)&response); -} - -/** - * Callback for APPEND and APPENDQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -append_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.append != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - uint32_t datalen= ntohl(header->request.bodylen) - keylen; - char *key= (void*)(header + 1); - char *data= key + keylen; - uint64_t cas= ntohll(header->request.cas); - uint64_t result_cas; - - rval= client->root->callback->interface.v1.append(cookie, key, keylen, - data, datalen, cas, - &result_cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_APPEND) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_APPEND, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(result_cas), - }, - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for PREPEND and PREPENDQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -prepend_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.prepend != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - uint32_t datalen= ntohl(header->request.bodylen) - keylen; - char *key= (char*)(header + 1); - char *data= key + keylen; - uint64_t cas= ntohll(header->request.cas); - uint64_t result_cas; - rval= client->root->callback->interface.v1.prepend(cookie, key, keylen, - data, datalen, cas, - &result_cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_PREPEND, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(result_cas), - }, - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for QUIT and QUITQ. Notify the client and shut down the connection - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -quit_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.quit != NULL) - { - client->root->callback->interface.v1.quit(cookie); - } - - protocol_binary_response_no_extras response= { - .message = { - .header.response = { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_QUIT, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque - } - } - }; - - if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) - { - response_handler(cookie, header, (void*)&response); - } - - /* I need a better way to signal to close the connection */ - return PROTOCOL_BINARY_RESPONSE_EIO; -} - -/** - * Callback for REPLACE and REPLACEQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -replace_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.replace != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_replace *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - uint32_t timeout= ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - uint64_t cas= ntohll(header->request.cas); - uint64_t result_cas; - - rval= client->root->callback->interface.v1.replace(cookie, key, keylen, - data, datalen, flags, - timeout, cas, - &result_cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_REPLACE, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(result_cas), - }, - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for SET and SETQ - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -set_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.set != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; - protocol_binary_request_replace *request= (void*)header; - uint32_t flags= ntohl(request->message.body.flags); - uint32_t timeout= ntohl(request->message.body.expiration); - char *key= ((char*)header) + sizeof(*header) + 8; - char *data= key + keylen; - uint64_t cas= ntohll(header->request.cas); - uint64_t result_cas; - - - rval= client->root->callback->interface.v1.set(cookie, key, keylen, - data, datalen, flags, - timeout, cas, &result_cas); - if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && - header->request.opcode == PROTOCOL_BINARY_CMD_SET) - { - /* Send a positive request */ - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= PROTOCOL_BINARY_CMD_SET, - .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), - .opaque= header->request.opaque, - .cas= ntohll(result_cas), - }, - } - }; - rval= response_handler(cookie, header, (void*)&response); - } - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for STAT - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -stat_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.stat != NULL) - { - uint16_t keylen= ntohs(header->request.keylen); - - rval= client->root->callback->interface.v1.stat(cookie, - (void*)(header + 1), - keylen, - stat_response_handler); - } - else - { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * Callback for VERSION - * @param cookie the calling client - * @param header the command - * @param response_handler not used - * @return the result of the operation - */ -static protocol_binary_response_status -version_command_handler(const void *cookie, - protocol_binary_request_header *header, - memcached_binary_protocol_raw_response_handler response_handler) -{ - (void)response_handler; - (void)header; - protocol_binary_response_status rval; - - struct memcached_binary_protocol_client_st *client= (void*)cookie; - if (client->root->callback->interface.v1.version != NULL) - { - rval= client->root->callback->interface.v1.version(cookie, - version_response_handler); + /* + * The ASCII protocol can only be used if the implementors provide + * an implementation for the version 1 of the interface.. + * + * @todo I should allow the implementors to provide an implementation + * for version 0 and 1 at the same time and set the preferred + * interface to use... + */ + client->work= memcached_ascii_protocol_process_data; } else { - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - } - - return rval; -} - -/** - * The map to remap between the com codes and the v1 logical setting - */ -static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= { - [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, - [PROTOCOL_BINARY_CMD_ADD]= add_command_handler, - [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler, - [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler, - [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler, - [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler, - [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, - [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, - [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, - [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, - [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, - [PROTOCOL_BINARY_CMD_GETK]= get_command_handler, - [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, - [PROTOCOL_BINARY_CMD_GET]= get_command_handler, - [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler, - [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler, - [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, - [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler, - [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler, - [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, - [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, - [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, - [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, - [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, - [PROTOCOL_BINARY_CMD_SET]= set_command_handler, - [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, - [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, -}; - -/** - * Try to execute a command. Fire the pre/post functions and the specialized - * handler function if it's set. If not, the unknown probe should be fired - * if it's present. - * @param client the client connection to operate on - * @param header the command to execute - * @return true if success or false if a fatal error occured so that the - * connection should be shut down. - */ -static bool execute_command(struct memcached_binary_protocol_client_st *client, protocol_binary_request_header *header) -{ - if (client->root->pedantic && - memcached_binary_protocol_pedantic_check_request(header)) - { - /* @todo return invalid command packet */ - } - - /* we got all data available, execute the callback! */ - if (client->root->callback->pre_execute != NULL) - { - client->root->callback->pre_execute(client, header); - } - - protocol_binary_response_status rval; - rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; - uint8_t cc= header->request.opcode; - - switch (client->root->callback->interface_version) - { - case 0: - if (client->root->callback->interface.v0.comcode[cc] != NULL) { - rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler); - } - break; - case 1: - if (comcode_v0_v1_remap[cc] != NULL) { - rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler); - } - break; - default: - /* Unknown interface. - * It should be impossible to get here so I'll just call abort - * to avoid getting a compiler warning :-) + /* Let's just output a warning the way it is supposed to look like + * in the ASCII protocol... */ - abort(); - } - - - if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND && - client->root->callback->unknown != NULL) - { - rval= client->root->callback->unknown(client, header, raw_response_handler); - } - - if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && - rval != PROTOCOL_BINARY_RESPONSE_EIO) - { - protocol_binary_response_no_extras response= { - .message= { - .header.response= { - .magic= PROTOCOL_BINARY_RES, - .opcode= cc, - .status= htons(rval), - .opaque= header->request.opaque, - }, - } - }; - rval= raw_response_handler(client, header, (void*)&response); + const char *err= "CLIENT_ERROR: Unsupported protocol\r\n"; + client->root->spool(client, err, strlen(err)); + client->root->drain(client); + return ERROR_EVENT; /* Unsupported protocol */ } - if (client->root->callback->post_execute != NULL) - { - client->root->callback->post_execute(client, header); - } - - return rval != PROTOCOL_BINARY_RESPONSE_EIO; + return client->work(client, length, endptr); } /* @@ -1163,13 +231,15 @@ static bool execute_command(struct memcached_binary_protocol_client_st *client, ** * See protocol_handler.h for function description ** ********************************************************************** */ -struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void) +struct memcached_protocol_st *memcached_protocol_create_instance(void) { - struct memcached_binary_protocol_st *ret= calloc(1, sizeof(*ret)); + struct memcached_protocol_st *ret= calloc(1, sizeof(*ret)); if (ret != NULL) { ret->recv= default_recv; ret->send= default_send; + ret->drain= drain_output; + ret->spool= spool_output; ret->input_buffer_size= 1 * 1024 * 1024; ret->input_buffer= malloc(ret->input_buffer_size); if (ret->input_buffer == NULL) @@ -1191,57 +261,32 @@ struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(v return ret; } -void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance) +void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance) { cache_destroy(instance->buffer_cache); free(instance->input_buffer); free(instance); } -struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance) -{ - return instance->callback; -} - -void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback) +struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock) { - instance->callback= callback; -} - -memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie) -{ - (void)cookie; - return raw_response_handler; -} - -void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable) -{ - instance->pedantic= enable; -} - -bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance) -{ - return instance->pedantic; -} - -struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock) -{ - struct memcached_binary_protocol_client_st *ret= calloc(1, sizeof(*ret)); + struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret)); if (ret != NULL) { ret->root= instance; ret->sock= sock; + ret->work= determine_protocol; } return ret; } -void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client) +void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client) { free(client); } -enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client) +enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client) { /* Try to send data and read from the socket */ bool more_data= true; @@ -1266,45 +311,12 @@ enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struc client->input_buffer_offset= 0; } - /* try to parse all of the received packets */ - protocol_binary_request_header *header; - header= (void*)client->root->input_buffer; - - if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ) + void *endptr; + if (client->work(client, &len, &endptr) == ERROR_EVENT) { - client->error= EINVAL; return ERROR_EVENT; } - while (len >= (ssize_t)sizeof(*header) && - (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)))) - { - - /* I have the complete package */ - client->current_command = header; - if (!execute_command(client, header)) - { - return ERROR_EVENT; - } - - ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)); - len -= total; - if (len > 0) - { - intptr_t ptr= (intptr_t)header; - ptr += total; - if ((ptr % 8) == 0) - { - header= (void*)ptr; - } - else - { - memmove(client->root->input_buffer, (void*)ptr, (size_t)len); - header= (void*)client->root->input_buffer; - } - } - } - if (len > 0) { /* save the data for later on */ @@ -1315,7 +327,7 @@ enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struc client->error= ENOMEM; return ERROR_EVENT; } - memcpy(client->input_buffer, header, (size_t)len); + memcpy(client->input_buffer, endptr, (size_t)len); client->input_buffer_offset= (size_t)len; more_data= false; } diff --git a/libmemcached/protocol_handler.h b/libmemcached/protocol_handler.h index e1528d50..ef816a0d 100644 --- a/libmemcached/protocol_handler.h +++ b/libmemcached/protocol_handler.h @@ -17,12 +17,12 @@ /* Forward declarations */ /* - * You should only access memcached_binary_protocol_st from one thread!, + * You should only access memcached_protocol_st from one thread!, * and never assume anything about the internal layout / sizes of the * structures. */ -struct memcached_binary_protocol_st; -struct memcached_binary_protocol_client_st; +struct memcached_protocol_st; +struct memcached_protocol_client_st; /** * Function the protocol handler should call to receive data. @@ -35,10 +35,10 @@ struct memcached_binary_protocol_client_st; * @return the number of bytes copied into buf * or -1 upon error (errno should contain more information) */ -typedef ssize_t (*memcached_binary_protocol_recv_func)(const void *cookie, - int fd, - void *buf, - size_t nbuf); +typedef ssize_t (*memcached_protocol_recv_func)(const void *cookie, + int fd, + void *buf, + size_t nbuf); /** * Function the protocol handler should call to send data. @@ -51,10 +51,10 @@ typedef ssize_t (*memcached_binary_protocol_recv_func)(const void *cookie, * @return the number of bytes sent * or -1 upon error (errno should contain more information) */ -typedef ssize_t (*memcached_binary_protocol_send_func)(const void *cookie, - int fd, - const void *buf, - size_t nbuf); +typedef ssize_t (*memcached_protocol_send_func)(const void *cookie, + int fd, + const void *buf, + size_t nbuf); /** * Create an instance of the protocol handler @@ -62,14 +62,14 @@ typedef ssize_t (*memcached_binary_protocol_send_func)(const void *cookie, * @return NULL if allocation of an instance fails */ LIBMEMCACHED_API -struct memcached_binary_protocol_st *memcached_binary_protocol_create_instance(void); +struct memcached_protocol_st *memcached_protocol_create_instance(void); /** * Get the callbacks associated with a protocol handler instance * @return the callbacks currently used */ LIBMEMCACHED_API -struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_binary_protocol_st *instance); +struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(struct memcached_protocol_st *instance); /** * Set the callbacks to be used by the given protocol handler instance @@ -77,7 +77,7 @@ struct memcached_binary_protocol_callback_st *memcached_binary_protocol_get_call * @param callback the callbacks to use */ LIBMEMCACHED_API -void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback); +void memcached_binary_protocol_set_callbacks(struct memcached_protocol_st *instance, struct memcached_binary_protocol_callback_st *callback); /** * Should the library inspect the packages being sent and received and verify @@ -88,7 +88,7 @@ void memcached_binary_protocol_set_callbacks(struct memcached_binary_protocol_st * @param enable true if you want the library to check packages, false otherwise */ LIBMEMCACHED_API -void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st *instance, bool enable); +void memcached_binary_protocol_set_pedantic(struct memcached_protocol_st *instance, bool enable); /** * Is the library inpecting each package? @@ -96,7 +96,7 @@ void memcached_binary_protocol_set_pedantic(struct memcached_binary_protocol_st * @return true it the library is inspecting each package, false otherwise */ LIBMEMCACHED_API -bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st *instance); +bool memcached_binary_protocol_get_pedantic(struct memcached_protocol_st *instance); /** * Destroy an instance of the protocol handler @@ -104,7 +104,7 @@ bool memcached_binary_protocol_get_pedantic(struct memcached_binary_protocol_st * @param instance The instance to destroy */ LIBMEMCACHED_API -void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol_st *instance); +void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance); /** * Set the IO functions used by the instance to send and receive data. The @@ -115,9 +115,9 @@ void memcached_binary_protocol_destroy_instance(struct memcached_binary_protocol * @param send the function to call for sending data */ LIBMEMCACHED_API -void memached_binary_protocol_set_io_functions(struct memcached_binary_protocol_st *instance, - memcached_binary_protocol_recv_func recv, - memcached_binary_protocol_send_func send); +void memached_protocol_set_io_functions(struct memcached_protocol_st *instance, + memcached_protocol_recv_func recv, + memcached_protocol_send_func send); /** @@ -127,7 +127,7 @@ void memached_binary_protocol_set_io_functions(struct memcached_binary_protocol_ * @return NULL if allocation fails, otherwise an instance */ LIBMEMCACHED_API -struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_client(struct memcached_binary_protocol_st *instance, int sock); +struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock); /** * Destroy a client handle. @@ -138,12 +138,12 @@ struct memcached_binary_protocol_client_st *memcached_binary_protocol_create_cli * @param client the client to destroy */ LIBMEMCACHED_API -void memcached_binary_protocol_client_destroy(struct memcached_binary_protocol_client_st *client); +void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client); /** * The different events the client is interested in */ -enum MEMCACHED_BINARY_PROTOCOL_EVENT { +enum MEMCACHED_PROTOCOL_EVENT { /* Error event means that the client encountered an error with the * connection so you should shut it down */ ERROR_EVENT, @@ -162,7 +162,7 @@ enum MEMCACHED_BINARY_PROTOCOL_EVENT { * @return The next event the protocol handler will be notified for */ LIBMEMCACHED_API -enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struct memcached_binary_protocol_client_st *client); +enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client); /** * Get the socket attached to a client handle @@ -170,7 +170,7 @@ enum MEMCACHED_BINARY_PROTOCOL_EVENT memcached_binary_protocol_client_work(struc * @return the socket handle */ LIBMEMCACHED_API -int memcached_binary_protocol_client_get_socket(struct memcached_binary_protocol_client_st *client); +int memcached_protocol_client_get_socket(struct memcached_protocol_client_st *client); /** * Get the error id socket attached to a client handle @@ -178,7 +178,7 @@ int memcached_binary_protocol_client_get_socket(struct memcached_binary_protocol * @return the OS error code from the client */ LIBMEMCACHED_API -int memcached_binary_protocol_client_get_errno(struct memcached_binary_protocol_client_st *client); +int memcached_protocol_client_get_errno(struct memcached_protocol_client_st *client); /** * Get a raw response handler for the given cookie