From: Trond Norbye Date: Tue, 27 Jan 2009 11:34:32 +0000 (+0100) Subject: Implemented support for noreply in the binary protocol X-Git-Tag: 0.26~1 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=a7cf02c028bde895f4e49b176443578956001060;p=awesomized%2Flibmemcached Implemented support for noreply in the binary protocol Please note that the quiet commands in the binary protocol was added recently, so you need a recent build of the memcached 1.3.x server. (at least http://github.com/dustin/memcached/commit/0e8a58a8afbb8f15e42b001f2442858cfa3dcbb6 ) --- diff --git a/libmemcached/memcached/protocol_binary.h b/libmemcached/memcached/protocol_binary.h index ee5dd0b9..08df72e8 100644 --- a/libmemcached/memcached/protocol_binary.h +++ b/libmemcached/memcached/protocol_binary.h @@ -93,7 +93,17 @@ extern "C" PROTOCOL_BINARY_CMD_GETKQ = 0x0d, PROTOCOL_BINARY_CMD_APPEND = 0x0e, PROTOCOL_BINARY_CMD_PREPEND = 0x0f, - PROTOCOL_BINARY_CMD_STAT = 0x10 + PROTOCOL_BINARY_CMD_STAT = 0x10, + PROTOCOL_BINARY_CMD_SETQ = 0x11, + PROTOCOL_BINARY_CMD_ADDQ = 0x12, + PROTOCOL_BINARY_CMD_REPLACEQ = 0x13, + PROTOCOL_BINARY_CMD_DELETEQ = 0x14, + PROTOCOL_BINARY_CMD_INCREMENTQ = 0x15, + PROTOCOL_BINARY_CMD_DECREMENTQ = 0x16, + PROTOCOL_BINARY_CMD_QUITQ = 0x17, + PROTOCOL_BINARY_CMD_FLUSHQ = 0x18, + PROTOCOL_BINARY_CMD_APPENDQ = 0x19, + PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a } protocol_binary_command; /** diff --git a/libmemcached/memcached_response.c b/libmemcached/memcached_response.c index e9ff94ac..940c9f88 100644 --- a/libmemcached/memcached_response.c +++ b/libmemcached/memcached_response.c @@ -300,14 +300,30 @@ static memcached_return binary_read_one_response(memcached_server_st *ptr, else if (header.response.bodylen) { /* What should I do with the error message??? just discard it for now */ - char buffer[SMALL_STRING_LEN]; + char hole[SMALL_STRING_LEN]; while (bodylen > 0) { size_t nr= (bodylen > SMALL_STRING_LEN) ? SMALL_STRING_LEN : bodylen; - if (memcached_safe_read(ptr, buffer, nr) != MEMCACHED_SUCCESS) + if (memcached_safe_read(ptr, hole, nr) != MEMCACHED_SUCCESS) return MEMCACHED_UNKNOWN_READ_FAILURE; bodylen -= nr; } + + /* This might be an error from one of the quiet commands.. if + * so, just throw it away and get the next one. What about creating + * a callback to the user with the error information? + */ + switch (header.response.opcode) + { + case PROTOCOL_BINARY_CMD_SETQ: + case PROTOCOL_BINARY_CMD_ADDQ: + case PROTOCOL_BINARY_CMD_REPLACEQ: + case PROTOCOL_BINARY_CMD_APPENDQ: + case PROTOCOL_BINARY_CMD_PREPENDQ: + return binary_read_one_response(ptr, buffer, buffer_length, result); + default: + break; + } } memcached_return rc= MEMCACHED_SUCCESS; diff --git a/libmemcached/memcached_storage.c b/libmemcached/memcached_storage.c index 111d0ea7..4b626068 100644 --- a/libmemcached/memcached_storage.c +++ b/libmemcached/memcached_storage.c @@ -333,6 +333,55 @@ memcached_return memcached_cas_by_key(memcached_st *ptr, return rc; } +static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply) { + uint8_t ret; + + if (noreply) + switch (verb) + { + case SET_OP: + ret=PROTOCOL_BINARY_CMD_SETQ; + break; + case ADD_OP: + ret=PROTOCOL_BINARY_CMD_ADDQ; + break; + case CAS_OP: /* FALLTHROUGH */ + case REPLACE_OP: + ret=PROTOCOL_BINARY_CMD_REPLACEQ; + break; + case APPEND_OP: + ret=PROTOCOL_BINARY_CMD_APPENDQ; + break; + case PREPEND_OP: + ret=PROTOCOL_BINARY_CMD_PREPENDQ; + break; + } + else + switch (verb) + { + case SET_OP: + ret=PROTOCOL_BINARY_CMD_SET; + break; + case ADD_OP: + ret=PROTOCOL_BINARY_CMD_ADD; + break; + case CAS_OP: /* FALLTHROUGH */ + case REPLACE_OP: + ret=PROTOCOL_BINARY_CMD_REPLACE; + break; + case APPEND_OP: + ret=PROTOCOL_BINARY_CMD_APPEND; + break; + case PREPEND_OP: + ret=PROTOCOL_BINARY_CMD_PREPEND; + break; + } + + return ret; +} + + + static memcached_return memcached_send_binary(memcached_server_st* server, const char *key, size_t key_length, @@ -346,30 +395,10 @@ static memcached_return memcached_send_binary(memcached_server_st* server, char flush; protocol_binary_request_set request= {.bytes= {0}}; size_t send_length= sizeof(request.bytes); + bool noreply = server->root->flags & MEM_NOREPLY; request.message.header.request.magic= PROTOCOL_BINARY_REQ; - switch (verb) - { - case SET_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET; - break; - case ADD_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD; - break; - case REPLACE_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE; - break; - case APPEND_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND; - break; - case PREPEND_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND; - break; - case CAS_OP: - request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE; - break; - } - + request.message.header.request.opcode= get_com_code(verb, noreply); request.message.header.request.keylen= htons((uint16_t)key_length); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; if (verb == APPEND_OP || verb == PREPEND_OP) @@ -397,10 +426,16 @@ static memcached_return memcached_send_binary(memcached_server_st* server, memcached_io_reset(server); return MEMCACHED_WRITE_FAILURE; } + + if (noreply) + memcached_server_response_decrement(server); if (flush == 0) return MEMCACHED_BUFFERED; + if (noreply) + return MEMCACHED_SUCCESS; + return memcached_response(server, NULL, 0, NULL); } diff --git a/tests/function.c b/tests/function.c index 35d1c5a6..bb2d3993 100644 --- a/tests/function.c +++ b/tests/function.c @@ -3040,51 +3040,115 @@ static test_return noreply_test(memcached_st *memc) assert(ret == MEMCACHED_SUCCESS); ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1); assert(ret == MEMCACHED_SUCCESS); + ret= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, 1); + assert(ret == MEMCACHED_SUCCESS); assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NOREPLY) == 1); assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS) == 1); - - for (int x= 0; x < 100; ++x) { - char key[10]; - size_t len= sprintf(key, "%d", x); - ret= memcached_set(memc, key, len, key, len, 0, 0); - assert(ret == MEMCACHED_SUCCESS || ret == MEMCACHED_BUFFERED); - } - - /* - ** NOTE: Don't ever do this in your code! this is not a supported use of the - ** API and is _ONLY_ done this way to verify that the library works the - ** way it is supposed to do!!!! - */ - int no_msg= 0; - for (int x= 0; x < memc->number_of_hosts; ++x) { - no_msg+= memc->hosts[x].cursor_active; + assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS) == 1); + + for (int count=0; count < 5; ++count) + { + for (int x=0; x < 100; ++x) + { + char key[10]; + size_t len=sprintf(key, "%d", x); + switch (count) + { + case 0: + ret=memcached_add(memc, key, len, key, len, 0, 0); + break; + case 1: + ret=memcached_replace(memc, key, len, key, len, 0, 0); + break; + case 2: + ret=memcached_set(memc, key, len, key, len, 0, 0); + break; + case 3: + ret=memcached_append(memc, key, len, key, len, 0, 0); + break; + case 4: + ret=memcached_prepend(memc, key, len, key, len, 0, 0); + break; + } + assert(ret == MEMCACHED_SUCCESS || ret == MEMCACHED_BUFFERED); + } + + /* + ** NOTE: Don't ever do this in your code! this is not a supported use of the + ** API and is _ONLY_ done this way to verify that the library works the + ** way it is supposed to do!!!! + */ + int no_msg=0; + for (int x=0; x < memc->number_of_hosts; ++x) + no_msg+=memc->hosts[x].cursor_active; + + assert(no_msg == 0); + assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS); + + /* + ** Now validate that all items was set properly! + */ + for (int x=0; x < 100; ++x) + { + char key[10]; + size_t len=sprintf(key, "%d", x); + size_t length; + uint32_t flags; + char* value=memcached_get(memc, key, strlen(key), + &length, &flags, &ret); + assert(ret == MEMCACHED_SUCCESS && value != NULL); + switch (count) + { + case 0: /* FALLTHROUGH */ + case 1: /* FALLTHROUGH */ + case 2: + assert(strncmp(value, key, len) == 0); + assert(len == length); + break; + case 3: + assert(length == len * 2); + break; + case 4: + assert(length == len * 3); + break; + } + free(value); + } } - - /* - ** The binary protocol does not implement quiet commands yet. Fix this - ** test they are implemented! - */ - if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1) - assert(no_msg == 100); - else - assert(no_msg == 0); - assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS); + /* Try setting an illegal cas value (should not return an error to + * the caller (because we don't expect a return message from the server) + */ + char* keys[]= {"0"}; + size_t lengths[]= {1}; + size_t length; + uint32_t flags; + memcached_result_st results_obj; + memcached_result_st *results; + ret=memcached_mget(memc, keys, lengths, 1); + assert(ret == MEMCACHED_SUCCESS); + + results=memcached_result_create(memc, &results_obj); + assert(results); + results=memcached_fetch_result(memc, &results_obj, &ret); + assert(results); + assert(ret == MEMCACHED_SUCCESS); + uint64_t cas= memcached_result_cas(results); + memcached_result_free(&results_obj); + + ret= memcached_cas(memc, keys[0], lengths[0], keys[0], lengths[0], 0, 0, cas); + assert(ret == MEMCACHED_SUCCESS); /* - ** Now validate that all items was set properly! - */ - for (int x= 0; x < 100; ++x) { - char key[10]; - size_t len= sprintf(key, "%d", x); - size_t length; - uint32_t flags; - char* value= memcached_get(memc, key, strlen(key), - &length, &flags, &ret); - assert(ret == MEMCACHED_SUCCESS && value != NULL); - assert(strncmp(value, key, len) == 0); - free(value); - } + * The item will have a new cas value, so try to set it again with the old + * value. This should fail! + */ + ret= memcached_cas(memc, keys[0], lengths[0], keys[0], lengths[0], 0, 0, cas); + assert(ret == MEMCACHED_SUCCESS); + assert(memcached_flush_buffers(memc) == MEMCACHED_SUCCESS); + char* value=memcached_get(memc, keys[0], lengths[0], &length, &flags, &ret); + assert(ret == MEMCACHED_SUCCESS && value != NULL); + free(value); return TEST_SUCCESS; }