X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcachedprotocol%2Fascii_handler.c;h=8cd29720bae5c7c681d95eb974793a12176ec6fc;hb=357b107e59d9918a0f3bdd7e4aad9493c70e03f1;hp=f6134b3c31af29851e219cc60f1dcd664beabbf1;hpb=9c5fa1db34c5fb1ffed88742caeffa5a9afd0a9e;p=awesomized%2Flibmemcached diff --git a/libmemcachedprotocol/ascii_handler.c b/libmemcachedprotocol/ascii_handler.c index f6134b3c..8cd29720 100644 --- a/libmemcachedprotocol/ascii_handler.c +++ b/libmemcachedprotocol/ascii_handler.c @@ -40,6 +40,87 @@ #include #include #include +#include + + +static void print_ascii_command(memcached_protocol_client_st *client) +{ + if (client->is_verbose) + { + switch (client->ascii_command) + { + case SET_CMD: + fprintf(stderr, "%s:%d SET_CMD\n", __FILE__, __LINE__); + break; + + case ADD_CMD: + fprintf(stderr, "%s:%d ADD_CMD\n", __FILE__, __LINE__); + break; + + case REPLACE_CMD: + fprintf(stderr, "%s:%d REPLACE_CMD\n", __FILE__, __LINE__); + break; + + case CAS_CMD: + fprintf(stderr, "%s:%d CAS_CMD\n", __FILE__, __LINE__); + break; + + case APPEND_CMD: + fprintf(stderr, "%s:%d APPEND_CMD\n", __FILE__, __LINE__); + break; + + case PREPEND_CMD: + fprintf(stderr, "%s:%d PREPEND_CMD\n", __FILE__, __LINE__); + break; + + case DELETE_CMD: + fprintf(stderr, "%s:%d DELETE_CMD\n", __FILE__, __LINE__); + break; + + case INCR_CMD: /* FALLTHROUGH */ + fprintf(stderr, "%s:%d INCR_CMD\n", __FILE__, __LINE__); + break; + + case DECR_CMD: + fprintf(stderr, "%s:%d DECR_CMD\n", __FILE__, __LINE__); + break; + + case STATS_CMD: + fprintf(stderr, "%s:%d STATS_CMD\n", __FILE__, __LINE__); + break; + + case FLUSH_ALL_CMD: + fprintf(stderr, "%s:%d FLUSH_ALL_CMD\n", __FILE__, __LINE__); + break; + + case VERSION_CMD: + fprintf(stderr, "%s:%d VERSION_CMD\n", __FILE__, __LINE__); + break; + + case QUIT_CMD: + fprintf(stderr, "%s:%d QUIT_CMD\n", __FILE__, __LINE__); + break; + + case VERBOSITY_CMD: + fprintf(stderr, "%s:%d VERBOSITY_CMD\n", __FILE__, __LINE__); + break; + + case GET_CMD: + fprintf(stderr, "%s:%d GET_CMD\n", __FILE__, __LINE__); + break; + + case GETS_CMD: + fprintf(stderr, "%s:%d GETS_CMD\n", __FILE__, __LINE__); + break; + + default: + case UNKNOWN_CMD: + fprintf(stderr, "%s:%d UNKNOWN_CMD\n", __FILE__, __LINE__); + break; + + } + } +} /** * Try to parse a key from the string. @@ -81,9 +162,50 @@ static uint16_t parse_ascii_key(char **start) * @param text the text to spool * @return status of the spool operation */ -static protocol_binary_response_status -spool_string(memcached_protocol_client_st *client, const char *text) +static protocol_binary_response_status raw_response_handler(memcached_protocol_client_st *client, const char *text) { + if (client->is_verbose) + { + fprintf(stderr, "%s:%d %s\n", __FILE__, __LINE__, text); + } + + if (client->root->drain(client) == false) + { + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; + } + + assert(client->output != NULL); +#if 0 + if (client->output == NULL) + { + /* I can write directly to the socket.... */ + do + { + size_t num_bytes= len -offset; + ssize_t nw= client->root->send(client, + client->sock, + ptr + offset, + num_bytes); + if (nw == -1) + { + if (get_socket_errno() == EWOULDBLOCK) + { + break; + } + else if (get_socket_errno() != EINTR) + { + client->error= errno; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; + } + } + else + { + offset += (size_t)nw; + } + } while (offset < len); + } +#endif + return client->root->spool(client, text, strlen(text)); } @@ -103,7 +225,7 @@ static void send_command_usage(memcached_protocol_client_st *client) [CAS_CMD]= "CLIENT_ERROR: Syntax error: cas [noreply]\r\n", [APPEND_CMD]= "CLIENT_ERROR: Syntax error: append [noreply]\r\n", [PREPEND_CMD]= "CLIENT_ERROR: Syntax error: prepend [noreply]\r\n", - [DELETE_CMD]= "CLIENT_ERROR: Syntax error: delete [noreply]\r\n", + [DELETE_CMD]= "CLIENT_ERROR: Syntax error: delete_object [noreply]\r\n", [INCR_CMD]= "CLIENT_ERROR: Syntax error: incr [noreply]\r\n", [DECR_CMD]= "CLIENT_ERROR: Syntax error: decr [noreply]\r\n", [STATS_CMD]= "CLIENT_ERROR: Syntax error: stats [key]\r\n", @@ -116,7 +238,7 @@ static void send_command_usage(memcached_protocol_client_st *client) }; client->mute = false; - spool_string(client, errmsg[client->ascii_command]); + raw_response_handler(client, errmsg[client->ascii_command]); } /** @@ -125,15 +247,14 @@ static void send_command_usage(memcached_protocol_client_st *client) * @param text the length of the body * @param textlen the length of the body */ -static protocol_binary_response_status -ascii_version_response_handler(const void *cookie, - const void *text, - uint32_t textlen) +static protocol_binary_response_status ascii_version_response_handler(const void *cookie, + const void *text, + uint32_t textlen) { memcached_protocol_client_st *client= (memcached_protocol_client_st*)cookie; - spool_string(client, "VERSION "); + raw_response_handler(client, "VERSION "); client->root->spool(client, text, textlen); - spool_string(client, "\r\n"); + raw_response_handler(client, "\r\n"); return PROTOCOL_BINARY_RESPONSE_SUCCESS; } @@ -204,27 +325,26 @@ ascii_get_response_handler(const void *cookie, * @param body the length of the body * @param bodylen the length of the body */ -static protocol_binary_response_status -ascii_stat_response_handler(const void *cookie, - const void *key, - uint16_t keylen, - const void *body, - uint32_t bodylen) +static protocol_binary_response_status ascii_stat_response_handler(const void *cookie, + const void *key, + uint16_t keylen, + const void *body, + uint32_t bodylen) { memcached_protocol_client_st *client= (void*)cookie; if (key != NULL) { - spool_string(client, "STAT "); + raw_response_handler(client, "STAT "); client->root->spool(client, key, keylen); - spool_string(client, " "); + raw_response_handler(client, " "); client->root->spool(client, body, bodylen); - spool_string(client, "\r\n"); + raw_response_handler(client, "\r\n"); } else { - spool_string(client, "END\r\n"); + raw_response_handler(client, "END\r\n"); } return PROTOCOL_BINARY_RESPONSE_SUCCESS; @@ -264,7 +384,9 @@ static void ascii_process_gets(memcached_protocol_client_st *client, send_command_usage(client); } else + { client->root->spool(client, "END\r\n", 5); + } } /** @@ -353,7 +475,7 @@ static enum ascii_cmd ascii_to_cmd(char *start, size_t length) { .cmd= "cas", .len= 3, .cc= CAS_CMD }, { .cmd= "append", .len= 6, .cc= APPEND_CMD }, { .cmd= "prepend", .len= 7, .cc= PREPEND_CMD }, - { .cmd= "delete", .len= 6, .cc= DELETE_CMD }, + { .cmd= "delete_object", .len= 6, .cc= DELETE_CMD }, { .cmd= "incr", .len= 4, .cc= INCR_CMD }, { .cmd= "decr", .len= 4, .cc= DECR_CMD }, { .cmd= "stats", .len= 5, .cc= STATS_CMD }, @@ -383,7 +505,7 @@ static enum ascii_cmd ascii_to_cmd(char *start, size_t length) } /** - * Perform a delete operation. + * Perform a delete_object operation. * * @param client client requesting the deletion * @param tokens the command as a vector @@ -401,28 +523,27 @@ static void process_delete(memcached_protocol_client_st *client, return; } - if (client->root->callback->interface.v1.delete == NULL) + if (client->root->callback->interface.v1.delete_object == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } - protocol_binary_response_status rval; - rval= client->root->callback->interface.v1.delete(client, key, nkey, 0); + protocol_binary_response_status rval= client->root->callback->interface.v1.delete_object(client, key, nkey, 0); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) { - spool_string(client, "DELETED\r\n"); + raw_response_handler(client, "DELETED\r\n"); } else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } else { char msg[80]; - snprintf(msg, sizeof(msg), "SERVER_ERROR: delete failed %u\r\n",(uint32_t)rval); - spool_string(client, msg); + snprintf(msg, sizeof(msg), "SERVER_ERROR: delete_object failed %u\r\n",(uint32_t)rval); + raw_response_handler(client, msg); } } @@ -447,7 +568,7 @@ static void process_arithmetic(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.increment == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } rval= client->root->callback->interface.v1.increment(client, @@ -461,7 +582,7 @@ static void process_arithmetic(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.decrement == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } rval= client->root->callback->interface.v1.decrement(client, @@ -476,11 +597,11 @@ static void process_arithmetic(memcached_protocol_client_st *client, { char buffer[80]; snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n", result); - spool_string(client, buffer); + raw_response_handler(client, buffer); } else { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } } @@ -494,12 +615,14 @@ static void process_stats(memcached_protocol_client_st *client, { if (client->root->callback->interface.v1.stat == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } while (isspace(*key)) + { key++; + } uint16_t nkey= (uint16_t)(end - key); (void)client->root->callback->interface.v1.stat(client, key, nkey, @@ -518,7 +641,7 @@ static void process_version(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.version == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } @@ -535,9 +658,9 @@ static void process_flush(memcached_protocol_client_st *client, return; } - if (client->root->callback->interface.v1.flush == NULL) + if (client->root->callback->interface.v1.flush_object == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return; } @@ -548,11 +671,11 @@ static void process_flush(memcached_protocol_client_st *client, } protocol_binary_response_status rval; - rval= client->root->callback->interface.v1.flush(client, timeout); + rval= client->root->callback->interface.v1.flush_object(client, timeout); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) - spool_string(client, "OK\r\n"); + raw_response_handler(client, "OK\r\n"); else - spool_string(client, "SERVER_ERROR: internal error\r\n"); + raw_response_handler(client, "SERVER_ERROR: internal error\r\n"); } /** @@ -570,8 +693,8 @@ static void process_flush(memcached_protocol_client_st *client, * 1 We need more data, so just go ahead and wait for more! */ static inline int process_storage_command(memcached_protocol_client_st *client, - char **tokens, int ntokens, char *start, - char **end, ssize_t length) + char **tokens, int ntokens, char *start, + char **end, ssize_t length) { (void)ntokens; /* already checked */ char *key= tokens[1]; @@ -579,7 +702,7 @@ static inline int process_storage_command(memcached_protocol_client_st *client, if (nkey == 0) { /* return error */ - spool_string(client, "CLIENT_ERROR: bad key\r\n"); + raw_response_handler(client, "CLIENT_ERROR: bad key\r\n"); return -1; } @@ -666,7 +789,7 @@ static inline int process_storage_command(memcached_protocol_client_st *client, if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS) { - spool_string(client, "STORED\r\n"); + raw_response_handler(client, "STORED\r\n"); } else { @@ -674,20 +797,20 @@ static inline int process_storage_command(memcached_protocol_client_st *client, { if (rval == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS) { - spool_string(client, "EXISTS\r\n"); + raw_response_handler(client, "EXISTS\r\n"); } else if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) { - spool_string(client, "NOT_FOUND\r\n"); + raw_response_handler(client, "NOT_FOUND\r\n"); } else { - spool_string(client, "NOT_STORED\r\n"); + raw_response_handler(client, "NOT_STORED\r\n"); } } else { - spool_string(client, "NOT_STORED\r\n"); + raw_response_handler(client, "NOT_STORED\r\n"); } } @@ -708,7 +831,7 @@ static int process_cas_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.replace == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -727,7 +850,7 @@ static int process_set_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.set == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -746,7 +869,7 @@ static int process_add_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.add == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -765,7 +888,7 @@ static int process_replace_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.replace == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -784,7 +907,7 @@ static int process_append_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.append == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -803,7 +926,7 @@ static int process_prepend_command(memcached_protocol_client_st *client, if (client->root->callback->interface.v1.prepend == NULL) { - spool_string(client, "SERVER_ERROR: callback not implemented\r\n"); + raw_response_handler(client, "SERVER_ERROR: callback not implemented\r\n"); return false; } @@ -831,20 +954,34 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto client->ascii_command= ascii_to_cmd(ptr, (size_t)(*length)); + /* we got all data available, execute the callback! */ + if (client->root->callback->pre_execute != NULL) + { + client->root->callback->pre_execute(client, NULL); + } + + /* A multiget lists all of the keys, and I don't want to have an * avector of let's say 512 pointers to tokenize all of them, so let's * just handle them immediately */ if (client->ascii_command == GET_CMD || - client->ascii_command == GETS_CMD) { + client->ascii_command == GETS_CMD) + { if (client->root->callback->interface.v1.get != NULL) + { ascii_process_gets(client, ptr, end); + } else - spool_string(client, "SERVER_ERROR: Command not implemented\n"); - } else { + { + raw_response_handler(client, "SERVER_ERROR: Command not implemented\n"); + } + } + else + { /* None of the defined commands takes 10 parameters, so lets just use * that as a maximum limit. - */ + */ char *tokens[10]; int ntokens= ascii_tokenize_command(ptr, end, tokens, 10); @@ -852,33 +989,40 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto { client->mute= strcmp(tokens[ntokens - 1], "noreply") == 0; if (client->mute) + { --ntokens; /* processed noreply token*/ + } } int error= 0; - switch (client->ascii_command) { + print_ascii_command(client); + switch (client->ascii_command) + { case SET_CMD: error= process_set_command(client, tokens, ntokens, ptr, &end, *length); break; + case ADD_CMD: error= process_add_command(client, tokens, ntokens, ptr, &end, *length); break; + case REPLACE_CMD: - error= process_replace_command(client, tokens, ntokens, - ptr, &end, *length); + error= process_replace_command(client, tokens, ntokens, ptr, &end, *length); break; + case CAS_CMD: error= process_cas_command(client, tokens, ntokens, ptr, &end, *length); break; + case APPEND_CMD: - error= process_append_command(client, tokens, ntokens, - ptr, &end, *length); + error= process_append_command(client, tokens, ntokens, ptr, &end, *length); break; + case PREPEND_CMD: - error= process_prepend_command(client, tokens, ntokens, - ptr, &end, *length); - break; + error= process_prepend_command(client, tokens, ntokens, ptr, &end, *length); + break; + case DELETE_CMD: process_delete(client, tokens, ntokens); break; @@ -887,6 +1031,7 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto case DECR_CMD: process_arithmetic(client, tokens, ntokens); break; + case STATS_CMD: if (client->mute) { @@ -898,9 +1043,11 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto process_stats(client, ptr + 6, end); } break; + case FLUSH_ALL_CMD: process_flush(client, tokens, ntokens); break; + case VERSION_CMD: if (client->mute) { @@ -911,6 +1058,7 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto process_version(client, tokens, ntokens); } break; + case QUIT_CMD: if (ntokens != 1 || client->mute) { @@ -919,7 +1067,9 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto else { if (client->root->callback->interface.v1.quit != NULL) + { client->root->callback->interface.v1.quit(client); + } return MEMCACHED_PROTOCOL_ERROR_EVENT; } @@ -927,9 +1077,13 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto case VERBOSITY_CMD: if (ntokens != 2) + { send_command_usage(client); + } else - spool_string(client, "OK\r\n"); + { + raw_response_handler(client, "OK\r\n"); + } break; case UNKNOWN_CMD: @@ -944,9 +1098,18 @@ memcached_protocol_event_t memcached_ascii_protocol_process_data(memcached_proto } if (error == -1) + { return MEMCACHED_PROTOCOL_ERROR_EVENT; + } else if (error == 1) + { return MEMCACHED_PROTOCOL_READ_EVENT; + } + } + + if (client->root->callback->post_execute != NULL) + { + client->root->callback->post_execute(client, NULL); } /* Move past \n */