From 933e5a561b10c1450563f1421b83d72396b805d2 Mon Sep 17 00:00:00 2001 From: Date: Sat, 26 Jul 2008 15:46:42 +0200 Subject: [PATCH] Initial support for the binary protocol --- clients/client_options.h | 1 + clients/memcat.c | 6 + clients/memcp.c | 6 + clients/memflush.c | 6 + clients/memrm.c | 6 + clients/memslap.c | 7 + config/byteorder.m4 | 11 ++ config/memcached.m4 | 38 ++++++ configure.ac | 2 + docs/memcached_behavior.pod | 5 + libmemcached/Makefile.am | 3 +- libmemcached/byteorder.c | 28 ++++ libmemcached/common.h | 8 +- libmemcached/memcached_auto.c | 53 +++++++- libmemcached/memcached_behavior.c | 6 + libmemcached/memcached_constants.h | 1 + libmemcached/memcached_delete.c | 75 ++++++++--- libmemcached/memcached_do.c | 2 +- libmemcached/memcached_flush.c | 56 +++++++- libmemcached/memcached_get.c | 96 ++++++++++++++ libmemcached/memcached_io.c | 6 +- libmemcached/memcached_io.h | 4 +- libmemcached/memcached_quit.c | 13 +- libmemcached/memcached_response.c | 199 +++++++++++++++++++++++++++++ libmemcached/memcached_stats.c | 77 ++++++++++- libmemcached/memcached_storage.c | 88 ++++++++++++- libmemcached/memcached_version.c | 56 ++++++++ tests/function.c | 24 +++- 28 files changed, 840 insertions(+), 43 deletions(-) create mode 100644 config/byteorder.m4 create mode 100644 config/memcached.m4 create mode 100644 libmemcached/byteorder.c diff --git a/clients/client_options.h b/clients/client_options.h index b7ba5a96..33a0f81b 100644 --- a/clients/client_options.h +++ b/clients/client_options.h @@ -22,6 +22,7 @@ typedef enum { OPT_SLAP_TCP_NODELAY, OPT_FLUSH, OPT_HASH, + OPT_BINARY, } memcached_options; #endif /* CLIENT_OPTIONS */ diff --git a/clients/memcat.c b/clients/memcat.c index b9db6497..36e63622 100644 --- a/clients/memcat.c +++ b/clients/memcat.c @@ -14,6 +14,7 @@ /* Prototypes */ void options_parse(int argc, char *argv[]); +static int opt_binary= 0; static int opt_verbose= 0; static int opt_displayflag= 0; static char *opt_servers= NULL; @@ -50,6 +51,7 @@ int main(int argc, char *argv[]) memcached_server_push(memc, servers); memcached_server_list_free(servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); while (optind < argc) { @@ -113,6 +115,7 @@ void options_parse(int argc, char *argv[]) {"servers", required_argument, NULL, OPT_SERVERS}, {"flag", no_argument, &opt_displayflag, OPT_FLAG}, {"hash", required_argument, NULL, OPT_HASH}, + {"binary", no_argument, NULL, OPT_BINARY}, {0, 0, 0, 0}, }; @@ -124,6 +127,9 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; diff --git a/clients/memcp.c b/clients/memcp.c index cf06a573..110a8559 100644 --- a/clients/memcp.c +++ b/clients/memcp.c @@ -22,6 +22,7 @@ /* Prototypes */ void options_parse(int argc, char *argv[]); +static int opt_binary=0; static int opt_verbose= 0; static char *opt_servers= NULL; static char *opt_hash= NULL; @@ -60,6 +61,7 @@ int main(int argc, char *argv[]) memcached_server_push(memc, servers); memcached_server_list_free(servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); while (optind < argc) { @@ -172,6 +174,7 @@ void options_parse(int argc, char *argv[]) {"add", no_argument, NULL, OPT_ADD}, {"replace", no_argument, NULL, OPT_REPLACE}, {"hash", required_argument, NULL, OPT_HASH}, + {"binary", no_argument, NULL, OPT_BINARY}, {0, 0, 0, 0}, }; @@ -185,6 +188,9 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; diff --git a/clients/memflush.c b/clients/memflush.c index 3bf25c34..5208e5cd 100644 --- a/clients/memflush.c +++ b/clients/memflush.c @@ -6,6 +6,7 @@ #include "client_options.h" #include "utilities.h" +static int opt_binary= 0; static int opt_verbose= 0; static time_t opt_expire= 0; static char *opt_servers= NULL; @@ -42,6 +43,7 @@ int main(int argc, char *argv[]) servers= memcached_servers_parse(opt_servers); memcached_server_push(memc, servers); memcached_server_list_free(servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); rc = memcached_flush(memc, opt_expire); if (rc != MEMCACHED_SUCCESS) @@ -76,6 +78,7 @@ void options_parse(int argc, char *argv[]) {"debug", no_argument, &opt_verbose, OPT_DEBUG}, {"servers", required_argument, NULL, OPT_SERVERS}, {"expire", required_argument, NULL, OPT_EXPIRE}, + {"binary", no_argument, NULL, OPT_BINARY}, {0, 0, 0, 0}, }; int option_index= 0; @@ -89,6 +92,9 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; diff --git a/clients/memrm.c b/clients/memrm.c index 05fe404b..1570945e 100644 --- a/clients/memrm.c +++ b/clients/memrm.c @@ -6,6 +6,7 @@ #include "client_options.h" #include "utilities.h" +static int opt_binary= 0; static int opt_verbose= 0; static time_t opt_expire= 0; static char *opt_servers= NULL; @@ -44,6 +45,7 @@ int main(int argc, char *argv[]) servers= memcached_servers_parse(opt_servers); memcached_server_push(memc, servers); memcached_server_list_free(servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); while (optind < argc) { @@ -90,6 +92,7 @@ void options_parse(int argc, char *argv[]) {"servers", required_argument, NULL, OPT_SERVERS}, {"expire", required_argument, NULL, OPT_EXPIRE}, {"hash", required_argument, NULL, OPT_HASH}, + {"binary", no_argument, NULL, OPT_BINARY}, {0, 0, 0, 0}, }; int option_index= 0; @@ -103,6 +106,9 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; diff --git a/clients/memslap.c b/clients/memslap.c index 6fc8176f..ddb18f3e 100644 --- a/clients/memslap.c +++ b/clients/memslap.c @@ -67,6 +67,7 @@ pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, unsigned int *actual_loaded); void flush_all(memcached_st *memc); +static int opt_binary= 0; static int opt_verbose= 0; static int opt_flush= 0; static int opt_non_blocking_io= 0; @@ -140,6 +141,8 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion) memc= memcached_create(NULL); memcached_server_push(memc, servers); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary); + if (opt_flush) flush_all(memc); if (opt_createial_load) @@ -243,6 +246,7 @@ void options_parse(int argc, char *argv[]) {"test", required_argument, NULL, OPT_SLAP_TEST}, {"verbose", no_argument, &opt_verbose, OPT_VERBOSE}, {"version", no_argument, NULL, OPT_VERSION}, + {"binary", no_argument, NULL, OPT_BINARY}, {0, 0, 0, 0}, }; @@ -257,6 +261,9 @@ void options_parse(int argc, char *argv[]) { case 0: break; + case OPT_BINARY: + opt_binary = 1; + break; case OPT_VERBOSE: /* --verbose or -v */ opt_verbose = OPT_VERBOSE; break; diff --git a/config/byteorder.m4 b/config/byteorder.m4 new file mode 100644 index 00000000..fe88605a --- /dev/null +++ b/config/byteorder.m4 @@ -0,0 +1,11 @@ +AC_RUN_IFELSE([ + AC_LANG_PROGRAM([ +#include +#include +#include + ], [ +if (htonl(5) != 5) { + return 1; +} + ]) + ], AC_DEFINE([BYTEORDER_BIG_ENDIAN], [1], [Enable big endian byteorder])) diff --git a/config/memcached.m4 b/config/memcached.m4 new file mode 100644 index 00000000..140e48a2 --- /dev/null +++ b/config/memcached.m4 @@ -0,0 +1,38 @@ +dnl --------------------------------------------------------------------------- +dnl Macro: MEMCACHED_TEST +dnl --------------------------------------------------------------------------- +saved_CPPFLAGS="$CPPFLAGS" +CPPFLAGS="" +AC_ARG_WITH(memcached, + [ --with-memcached=PATH Specify path to memcached installation ], + [ + if test "x$withval" != "xno" ; then + CPPFLAGS="-I${withval}/include" + fi + ] +) + +AC_CHECK_HEADER(memcached/protocol_binary.h, [ + AC_RUN_IFELSE([ + AC_LANG_PROGRAM([ + #include "memcached/protocol_binary.h" + ], [ + protocol_binary_request_set request; + if (sizeof(request) != sizeof(request.bytes)) { + return 1; + } + ]) + ],[ + if test "x$CPPFLAGS" != "x" ; then + CPPFLAGS="$saved_CPPFLAGS $CPPFLAGS" + else + CPPFLAGS="$saved_CPPFLAGS" + fi + ], AC_MSG_ERROR([Unsupported struct padding done by compiler.])) + ], [ + AC_MSG_ERROR([Cannot locate memcached/protocol_binary.]) + ]) + +dnl --------------------------------------------------------------------------- +dnl End Macro: MEMCACHED_TEST +dnl --------------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index 5c9ed3df..83463ac6 100644 --- a/configure.ac +++ b/configure.ac @@ -51,6 +51,8 @@ AC_LANG_CPLUSPLUS sinclude(config/debug.m4) sinclude(config/dtrace.m4) +sinclude(config/memcached.m4) +sinclude(config/byteorder.m4) sinclude(config/64bit.m4) # We only support GCC and Sun's forte at the moment diff --git a/docs/memcached_behavior.pod b/docs/memcached_behavior.pod index f717984d..8821dcdf 100755 --- a/docs/memcached_behavior.pod +++ b/docs/memcached_behavior.pod @@ -121,6 +121,11 @@ sorted order. This will defeat consisten hashing. In non-blocking mode this changes the value of the timeout during socket connection. +=item MEMCACHED_BEHAVIOR_BINARY_PROTOCOL + +Enable the use of the binary protocol. Please note that you cannot toggle +this flag on an open connection. + =back =head1 RETURN diff --git a/libmemcached/Makefile.am b/libmemcached/Makefile.am index 550b4890..7cb12a01 100644 --- a/libmemcached/Makefile.am +++ b/libmemcached/Makefile.am @@ -67,7 +67,8 @@ libmemcached_la_SOURCES = crc.c \ memcached_strerror.c \ memcached_verbosity.c \ memcached_version.c \ - murmur_hash.c + murmur_hash.c \ + byteorder.c libmemcached_la_LIBADD = libmemcached_la_LDFLAGS = -version-info $(MEMCACHED_LIBRARY_VERSION) diff --git a/libmemcached/byteorder.c b/libmemcached/byteorder.c new file mode 100644 index 00000000..f407fa03 --- /dev/null +++ b/libmemcached/byteorder.c @@ -0,0 +1,28 @@ +#include "common.h" + +/* Byte swap a 64-bit number. */ +static inline uint64_t swap64(uint64_t in) { +#ifndef BYTEORDER_BIG_ENDIAN + /* Little endian, flip the bytes around until someone makes a faster/better + * way to do this. */ + uint64_t rv= 0; + int i= 0; + for(i= 0; i < 8; i++) + { + rv= (rv << 8) | (in & 0xff); + in >>= 8; + } + return rv; +#else + /* big-endian machines don't need byte swapping */ + return in; +#endif +} + +uint64_t ntohll(uint64_t value) { + return swap64(value); +} + +uint64_t htonll(uint64_t value) { + return swap64(value); +} diff --git a/libmemcached/common.h b/libmemcached/common.h index 0c9b87ce..90c8a140 100644 --- a/libmemcached/common.h +++ b/libmemcached/common.h @@ -38,6 +38,7 @@ #include #include "memcached_io.h" +#include "memcached/protocol_binary.h" #include #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96) @@ -80,6 +81,7 @@ typedef enum { MEM_VERIFY_KEY= (1 << 10), /* 11 used for weighted ketama */ MEM_KETAMA_WEIGHTED= (1 << 11), + MEM_BINARY_PROTOCOL= (1 << 12), } memcached_flags; /* Hashing algo */ @@ -100,7 +102,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death); #define memcached_server_response_decrement(A) (A)->cursor_active-- #define memcached_server_response_reset(A) (A)->cursor_active=0 -memcached_return memcached_do(memcached_server_st *ptr, const char *commmand, +memcached_return memcached_do(memcached_server_st *ptr, const void *commmand, size_t command_length, uint8_t with_flush); memcached_return memcached_version(memcached_st *ptr); memcached_return value_fetch(memcached_server_st *ptr, @@ -114,4 +116,8 @@ memcached_return memcachd_key_test(char **keys, size_t *key_length, memcached_return run_distribution(memcached_st *ptr); uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length); + +extern uint64_t ntohll(uint64_t); +extern uint64_t htonll(uint64_t); + #endif /* __COMMON_H__ */ diff --git a/libmemcached/memcached_auto.c b/libmemcached/memcached_auto.c index c847f936..449d4c4e 100644 --- a/libmemcached/memcached_auto.c +++ b/libmemcached/memcached_auto.c @@ -62,6 +62,45 @@ static memcached_return memcached_auto(memcached_st *ptr, return rc; } +static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd, + const char *key, size_t key_length, + uint32_t offset, uint64_t *value) +{ + unsigned int server_key; + + unlikely (key_length == 0) + return MEMCACHED_NO_KEY_PROVIDED; + + unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0) + return MEMCACHED_NO_SERVERS; + + if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) + return MEMCACHED_BAD_KEY_PROVIDED; + + server_key= memcached_generate_hash(ptr, key, key_length); + + protocol_binary_request_incr request= {0}; + + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= cmd; + request.message.header.request.keylen= htons((uint16_t)key_length); + request.message.header.request.extlen= 20; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length + request.message.header.request.extlen); + request.message.body.delta= htonll(offset); + + /* TODO: The binary protocol allows you to specify initial and expiry time */ + if ((memcached_do(&ptr->hosts[server_key], request.bytes, + sizeof(request.bytes), 0)!=MEMCACHED_SUCCESS) || + (memcached_io_write(&ptr->hosts[server_key], key, key_length, 1) == -1)) + { + memcached_io_reset(&ptr->hosts[server_key]); + return MEMCACHED_WRITE_FAILURE; + } + + return memcached_response(&ptr->hosts[server_key], value, sizeof(*value), NULL); +} + memcached_return memcached_increment(memcached_st *ptr, const char *key, size_t key_length, uint32_t offset, @@ -70,7 +109,12 @@ memcached_return memcached_increment(memcached_st *ptr, memcached_return rc; LIBMEMCACHED_MEMCACHED_INCREMENT_START(); - rc= memcached_auto(ptr, "incr", key, key_length, offset, value); + if (ptr->flags & MEM_BINARY_PROTOCOL) + rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT, key, + key_length, offset, value); + else + rc= memcached_auto(ptr, "incr", key, key_length, offset, value); + LIBMEMCACHED_MEMCACHED_INCREMENT_END(); return rc; @@ -84,7 +128,12 @@ memcached_return memcached_decrement(memcached_st *ptr, memcached_return rc; LIBMEMCACHED_MEMCACHED_DECREMENT_START(); - rc= memcached_auto(ptr, "decr", key, key_length, offset, value); + if (ptr->flags & MEM_BINARY_PROTOCOL) + rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT, key, + key_length, offset, value); + else + rc= memcached_auto(ptr, "decr", key, key_length, offset, value); + LIBMEMCACHED_MEMCACHED_DECREMENT_END(); return rc; diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index c001c461..6b36626e 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -23,6 +23,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr, { switch (flag) { + case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL: + set_behavior_flag(ptr, MEM_BINARY_PROTOCOL, data); + break; case MEMCACHED_BEHAVIOR_SUPPORT_CAS: set_behavior_flag(ptr, MEM_SUPPORT_CAS, data); break; @@ -118,6 +121,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr, switch (flag) { + case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL: + temp_flag= MEM_BINARY_PROTOCOL; + break; case MEMCACHED_BEHAVIOR_SUPPORT_CAS: temp_flag= MEM_SUPPORT_CAS; break; diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index 69c7666b..0ee2568f 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -88,6 +88,7 @@ typedef enum { MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, MEMCACHED_BEHAVIOR_KETAMA_HASH, + MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_delete.c b/libmemcached/memcached_delete.c index a105f552..5ec2fca3 100644 --- a/libmemcached/memcached_delete.c +++ b/libmemcached/memcached_delete.c @@ -7,6 +7,12 @@ memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key key, key_length, expiration); } +static inline memcached_return binary_delete(memcached_st *ptr, + unsigned int server_key, + const char *key, + size_t key_length, + int flush); + memcached_return memcached_delete_by_key(memcached_st *ptr, const char *master_key, size_t master_key_length, const char *key, size_t key_length, @@ -27,28 +33,33 @@ memcached_return memcached_delete_by_key(memcached_st *ptr, return MEMCACHED_NO_SERVERS; server_key= memcached_generate_hash(ptr, master_key, master_key_length); - - if (expiration) - send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "delete %s%.*s %llu\r\n", - ptr->prefix_key, - (int)key_length, key, - (unsigned long long)expiration); - else - send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "delete %s%.*s\r\n", - ptr->prefix_key, - (int)key_length, key); - - if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) + to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1; + + if (ptr->flags & MEM_BINARY_PROTOCOL) + rc= binary_delete(ptr, server_key, key, key_length, to_write); + else { - rc= MEMCACHED_WRITE_FAILURE; - goto error; + if (expiration) + send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "delete %s%.*s %llu\r\n", + ptr->prefix_key, + (int)key_length, key, + (unsigned long long)expiration); + else + send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "delete %s%.*s\r\n", + ptr->prefix_key, + (int)key_length, key); + + if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) + { + rc= MEMCACHED_WRITE_FAILURE; + goto error; + } + + rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write); } - to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1; - - rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write); if (rc != MEMCACHED_SUCCESS) goto error; @@ -70,3 +81,29 @@ error: LIBMEMCACHED_MEMCACHED_DELETE_END(); return rc; } + +static inline memcached_return binary_delete(memcached_st *ptr, + unsigned int server_key, + const char *key, + size_t key_length, + int flush) +{ + protocol_binary_request_delete request= {0}; + + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE; + request.message.header.request.keylen= htons((uint16_t)key_length); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length); + + if ((memcached_do(&ptr->hosts[server_key], request.bytes, + sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(&ptr->hosts[server_key], key, + key_length, flush) == -1)) + { + memcached_io_reset(&ptr->hosts[server_key]); + return MEMCACHED_WRITE_FAILURE; + } + + return MEMCACHED_SUCCESS; +} diff --git a/libmemcached/memcached_do.c b/libmemcached/memcached_do.c index 671b9fa3..7b9da5ed 100644 --- a/libmemcached/memcached_do.c +++ b/libmemcached/memcached_do.c @@ -1,6 +1,6 @@ #include "common.h" -memcached_return memcached_do(memcached_server_st *ptr, const char *command, +memcached_return memcached_do(memcached_server_st *ptr, const void *command, size_t command_length, uint8_t with_flush) { memcached_return rc; diff --git a/libmemcached/memcached_flush.c b/libmemcached/memcached_flush.c index 1511adbe..f722cc90 100644 --- a/libmemcached/memcached_flush.c +++ b/libmemcached/memcached_flush.c @@ -1,12 +1,30 @@ #include "common.h" +static memcached_return memcached_flush_binary(memcached_st *ptr, + time_t expiration); +static memcached_return memcached_flush_textual(memcached_st *ptr, + time_t expiration); + memcached_return memcached_flush(memcached_st *ptr, time_t expiration) +{ + memcached_return rc; + + LIBMEMCACHED_MEMCACHED_FLUSH_START(); + if (ptr->flags & MEM_BINARY_PROTOCOL) + rc= memcached_flush_binary(ptr, expiration); + else + rc= memcached_flush_textual(ptr, expiration); + LIBMEMCACHED_MEMCACHED_FLUSH_END(); + return rc; +} + +static memcached_return memcached_flush_textual(memcached_st *ptr, + time_t expiration) { unsigned int x; size_t send_length; memcached_return rc; char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - LIBMEMCACHED_MEMCACHED_FLUSH_START(); unlikely (ptr->number_of_hosts == 0) return MEMCACHED_NO_SERVERS; @@ -26,6 +44,40 @@ memcached_return memcached_flush(memcached_st *ptr, time_t expiration) (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); } - LIBMEMCACHED_MEMCACHED_FLUSH_END(); + return MEMCACHED_SUCCESS; +} + +static memcached_return memcached_flush_binary(memcached_st *ptr, + time_t expiration) +{ + unsigned int x; + protocol_binary_request_flush request= {0}; + + unlikely (ptr->number_of_hosts == 0) + return MEMCACHED_NO_SERVERS; + + request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; + request.message.header.request.extlen= 4; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(request.message.header.request.extlen); + request.message.body.expiration= htonl(expiration); + + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_do(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[x]); + return MEMCACHED_WRITE_FAILURE; + } + } + + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(&ptr->hosts[x]) > 0) + (void)memcached_response(&ptr->hosts[x], NULL, 0, NULL); + } + return MEMCACHED_SUCCESS; } diff --git a/libmemcached/memcached_get.c b/libmemcached/memcached_get.c index 670b2550..dcaa8988 100644 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@ -102,6 +102,11 @@ memcached_return memcached_mget(memcached_st *ptr, return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys); } +static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + char **keys, size_t *key_length, + unsigned int number_of_keys); + memcached_return memcached_mget_by_key(memcached_st *ptr, const char *master_key, size_t master_key_length, @@ -159,6 +164,10 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); } } + + if (ptr->flags & MEM_BINARY_PROTOCOL) + return binary_mget_by_key(ptr, master_server_key, keys, + key_length, number_of_keys); /* If a server fails we warn about errors and start all over with sending keys @@ -234,3 +243,90 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, LIBMEMCACHED_MEMCACHED_MGET_END(); return rc; } + +static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + memcached_return rc= MEMCACHED_NOTFOUND; + + int flush= number_of_keys == 1; + + /* + If a server fails we warn about errors and start all over with sending keys + to the server. + */ + for (int x= 0; x < number_of_keys; x++) + { + unsigned int server_key; + + if (master_server_key) + server_key= master_server_key; + else + server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); + + if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) + { + rc= memcached_connect(&ptr->hosts[server_key]); + if (rc != MEMCACHED_SUCCESS) + continue; + } + + protocol_binary_request_getk request= {0}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + if (number_of_keys == 1) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + + request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length[x]); + + if ((memcached_io_write(&ptr->hosts[server_key], request.bytes, + sizeof(request.bytes), 0) == -1) || + (memcached_io_write(&ptr->hosts[server_key], keys[x], + key_length[x], flush) == -1)) + { + memcached_server_response_reset(&ptr->hosts[server_key]); + rc= MEMCACHED_SOME_ERRORS; + continue; + } + memcached_server_response_increment(&ptr->hosts[server_key]); + } + + if (number_of_keys > 1) + { + /* + * Send a noop command to flush the buffers + */ + protocol_binary_request_noop request= {0}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + for (int x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x])) + { + if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1) + { + memcached_server_response_reset(&ptr->hosts[x]); + memcached_io_reset(&ptr->hosts[x]); + rc= MEMCACHED_SOME_ERRORS; + } + + if (memcached_io_write(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) == -1) + { + memcached_server_response_reset(&ptr->hosts[x]); + memcached_io_reset(&ptr->hosts[x]); + rc= MEMCACHED_SOME_ERRORS; + } + memcached_server_response_increment(&ptr->hosts[x]); + } + } + + + return rc; +} diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 93ca4173..ea8b2581 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -75,7 +75,7 @@ void memcached_io_preread(memcached_st *ptr) #endif ssize_t memcached_io_read(memcached_server_st *ptr, - char *buffer, size_t length) + void *buffer, size_t length) { char *buffer_ptr; @@ -154,11 +154,11 @@ ssize_t memcached_io_read(memcached_server_st *ptr, break; } - return (size_t)(buffer_ptr - buffer); + return (size_t)(buffer_ptr - (char*)buffer); } ssize_t memcached_io_write(memcached_server_st *ptr, - const char *buffer, size_t length, char with_flush) + const void *buffer, size_t length, char with_flush) { size_t original_length; const char* buffer_ptr; diff --git a/libmemcached/memcached_io.h b/libmemcached/memcached_io.h index f7a72574..091405b1 100644 --- a/libmemcached/memcached_io.h +++ b/libmemcached/memcached_io.h @@ -2,8 +2,8 @@ #include ssize_t memcached_io_write(memcached_server_st *ptr, - const char *buffer, size_t length, char with_flush); + const void *buffer, size_t length, char with_flush); void memcached_io_reset(memcached_server_st *ptr); ssize_t memcached_io_read(memcached_server_st *ptr, - char *buffer, size_t length); + void *buffer, size_t length); memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death); diff --git a/libmemcached/memcached_quit.c b/libmemcached/memcached_quit.c index 9749b848..a3d114b3 100644 --- a/libmemcached/memcached_quit.c +++ b/libmemcached/memcached_quit.c @@ -19,9 +19,18 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death) ssize_t read_length; char buffer[MEMCACHED_MAX_BUFFER]; - rc= memcached_do(ptr, "quit\r\n", 6, 1); - WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED); + if (ptr->root->flags & MEM_BINARY_PROTOCOL) + { + protocol_binary_request_quit request = {0}; + request.message.header.request.magic = PROTOCOL_BINARY_REQ; + request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT; + request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES; + rc= memcached_do(ptr, request.bytes, sizeof(request.bytes), 1); + } else + rc= memcached_do(ptr, "quit\r\n", 6, 1); + WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED); + /* read until socket is closed, or there is an error * closing the socket before all data is read * results in server throwing away all data which is diff --git a/libmemcached/memcached_response.c b/libmemcached/memcached_response.c index 6f9e7c33..2c178e49 100644 --- a/libmemcached/memcached_response.c +++ b/libmemcached/memcached_response.c @@ -8,6 +8,10 @@ #include "common.h" #include "memcached_io.h" +static memcached_return binary_response(memcached_server_st *ptr, + char *buffer, size_t buffer_length, + memcached_result_st *result); + memcached_return memcached_response(memcached_server_st *ptr, char *buffer, size_t buffer_length, memcached_result_st *result) @@ -34,6 +38,9 @@ memcached_return memcached_response(memcached_server_st *ptr, if (ptr->root->flags & MEM_NO_BLOCK) (void)memcached_io_write(ptr, NULL, 0, 1); + if (ptr->root->flags & MEM_BINARY_PROTOCOL) + return binary_response(ptr, buffer, buffer_length, result); + max_messages= memcached_server_response_count(ptr); for (x= 0; x < max_messages; x++) { @@ -179,3 +186,195 @@ size_t memcached_result_length(memcached_result_st *ptr) memcached_string_st *sptr= &ptr->value; return memcached_string_length(sptr); } + +/** + * Read a given number of bytes from the server and place it into a specific + * buffer. Reset the IO channel or this server if an error occurs. + */ +static memcached_return safe_read(memcached_server_st *ptr, void *dta, + size_t size) +{ + int offset= 0; + char *data= dta; + while (offset < size) + { + ssize_t nread= memcached_io_read(ptr, data + offset, size - offset); + if (nread <= 0) + { + memcached_io_reset(ptr); + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + offset += nread; + } + + return MEMCACHED_SUCCESS; +} + +static memcached_return binary_response(memcached_server_st *ptr, + char *buffer, + size_t buffer_length, + memcached_result_st *result) +{ + protocol_binary_response_header header; + memcached_server_response_decrement(ptr); + + unlikely (safe_read(ptr, &header.bytes, + sizeof(header.bytes)) != MEMCACHED_SUCCESS) + return MEMCACHED_UNKNOWN_READ_FAILURE; + + unlikely (header.response.magic != PROTOCOL_BINARY_RES) + { + memcached_io_reset(ptr); + return MEMCACHED_PROTOCOL_ERROR; + } + + /* + ** Convert the header to host local endian! + */ + header.response.keylen= ntohs(header.response.keylen); + header.response.status= ntohs(header.response.status); + header.response.bodylen= ntohl(header.response.bodylen); + header.response.cas= ntohll(header.response.cas); + uint32_t bodylen= header.response.bodylen; + + if (header.response.status == 0) + { + if ((header.response.opcode == PROTOCOL_BINARY_CMD_GETK) || + (header.response.opcode == PROTOCOL_BINARY_CMD_GETKQ)) + { + uint16_t keylen= header.response.keylen; + memcached_result_reset(result); + result->cas= header.response.cas; + + if (safe_read(ptr, &result->flags, + sizeof(result->flags)) != MEMCACHED_SUCCESS) + { + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + result->flags= ntohl(result->flags); + bodylen -= header.response.extlen; + + result->key_length= keylen; + if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS) + { + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + + bodylen -= keylen; + if (memcached_string_check(&result->value, + bodylen) != MEMCACHED_SUCCESS) + { + memcached_io_reset(ptr); + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + + char *vptr= memcached_string_value(&result->value); + if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS) + { + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + + memcached_string_set_length(&result->value, bodylen); + } + else if ((header.response.opcode == PROTOCOL_BINARY_CMD_INCREMENT) || + (header.response.opcode == PROTOCOL_BINARY_CMD_DECREMENT)) + { + if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t)) + { + return MEMCACHED_PROTOCOL_ERROR; + } + + WATCHPOINT_ASSERT(bodylen == buffer_length); + uint64_t val; + if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS) + { + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + + val= ntohll(val); + memcpy(buffer, &val, sizeof(val)); + } + else if (header.response.opcode == PROTOCOL_BINARY_CMD_VERSION) + { + memset(buffer, 0, buffer_length); + if (bodylen >= buffer_length) + /* not enough space in buffer.. should not happen... */ + return MEMCACHED_UNKNOWN_READ_FAILURE; + else + safe_read(ptr, buffer, bodylen); + } + else if ((header.response.opcode == PROTOCOL_BINARY_CMD_FLUSH) || + (header.response.opcode == PROTOCOL_BINARY_CMD_QUIT) || + (header.response.opcode == PROTOCOL_BINARY_CMD_SET) || + (header.response.opcode == PROTOCOL_BINARY_CMD_ADD) || + (header.response.opcode == PROTOCOL_BINARY_CMD_REPLACE) || + (header.response.opcode == PROTOCOL_BINARY_CMD_APPEND) || + (header.response.opcode == PROTOCOL_BINARY_CMD_PREPEND) || + (header.response.opcode == PROTOCOL_BINARY_CMD_DELETE)) + { + WATCHPOINT_ASSERT(bodylen == 0); + return MEMCACHED_SUCCESS; + } + else if (header.response.opcode == PROTOCOL_BINARY_CMD_NOOP) + { + WATCHPOINT_ASSERT(bodylen == 0); + return MEMCACHED_END; + } + else if (header.response.opcode == PROTOCOL_BINARY_CMD_STAT) + { + if (bodylen == 0) + return MEMCACHED_END; + else if (bodylen + 1 > buffer_length) + /* not enough space in buffer.. should not happen... */ + return MEMCACHED_UNKNOWN_READ_FAILURE; + else + { + size_t keylen= header.response.keylen; + memset(buffer, 0, buffer_length); + safe_read(ptr, buffer, keylen); + safe_read(ptr, buffer + keylen + 1, bodylen - keylen); + } + } + else + { + /* Command not implemented yet! */ + WATCHPOINT_ASSERT(0); + memcached_io_reset(ptr); + return MEMCACHED_PROTOCOL_ERROR; + } + } + else if (header.response.bodylen) + { + /* What should I do with the error message??? just discard it for now */ + char hole[1024]; + while (bodylen > 0) + { + size_t nr= (bodylen > sizeof(hole)) ? sizeof(hole) : bodylen; + safe_read(ptr, hole, nr); + bodylen -= nr; + } + } + + memcached_return rc= MEMCACHED_SUCCESS; + unlikely(header.response.status != 0) + switch (header.response.status) + { + case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: + rc= MEMCACHED_NOTFOUND; + break; + case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: + rc= MEMCACHED_DATA_EXISTS; + break; + case PROTOCOL_BINARY_RESPONSE_E2BIG: + case PROTOCOL_BINARY_RESPONSE_EINVAL: + case PROTOCOL_BINARY_RESPONSE_NOT_STORED: + case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: + case PROTOCOL_BINARY_RESPONSE_ENOMEM: + default: + /* @todo fix the error mappings */ + rc= MEMCACHED_PROTOCOL_ERROR; + break; + } + + return rc; +} diff --git a/libmemcached/memcached_stats.c b/libmemcached/memcached_stats.c index bbc82b6e..f8cfdbda 100644 --- a/libmemcached/memcached_stats.c +++ b/libmemcached/memcached_stats.c @@ -211,7 +211,69 @@ char *memcached_stat_get_value(memcached_st *ptr, memcached_stat_st *stat, return ret; } -static memcached_return memcached_stats_fetch(memcached_st *ptr, +static memcached_return binary_stats_fetch(memcached_st *ptr, + memcached_stat_st *stat, + char *args, + unsigned int server_key) +{ + memcached_return rc; + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + protocol_binary_request_stats request= {0}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_STAT; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + if (args != NULL) + { + int len= strlen(args); + request.message.header.request.keylen= htons((uint16_t)len); + request.message.header.request.bodylen= htonl(len); + + if ((memcached_do(&ptr->hosts[server_key], request.bytes, + sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(&ptr->hosts[server_key], args, len, 1) == -1)) + { + memcached_io_reset(&ptr->hosts[server_key]); + return MEMCACHED_WRITE_FAILURE; + } + } + else + { + if (memcached_do(&ptr->hosts[server_key], request.bytes, + sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server_key]); + return MEMCACHED_WRITE_FAILURE; + } + } + + memcached_server_response_decrement(&ptr->hosts[server_key]); + do + { + rc= memcached_response(&ptr->hosts[server_key], buffer, + sizeof(buffer), NULL); + if (rc == MEMCACHED_END) + break; + + unlikely (rc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server_key]); + return rc; + } + + set_data(stat, buffer, buffer + strlen(buffer) + 1); + } while (1); + + /* shit... memcached_response will decrement the counter, so I need to + ** reset it.. todo: look at this and try to find a better solution. + */ + ptr->hosts[server_key].cursor_active= 0; + + return MEMCACHED_SUCCESS; +} + +static memcached_return ascii_stats_fetch(memcached_st *ptr, memcached_stat_st *stat, char *args, unsigned int server_key) @@ -294,8 +356,12 @@ memcached_stat_st *memcached_stat(memcached_st *ptr, char *args, memcached_retur for (x= 0; x < ptr->number_of_hosts; x++) { memcached_return temp_return; - - temp_return= memcached_stats_fetch(ptr, stats + x, args, x); + + if (ptr->flags & MEM_BINARY_PROTOCOL) + temp_return= binary_stats_fetch(ptr, stats + x, args, x); + else + temp_return= ascii_stats_fetch(ptr, stats + x, args, x); + if (temp_return != MEMCACHED_SUCCESS) rc= MEMCACHED_SOME_ERRORS; } @@ -314,7 +380,10 @@ memcached_return memcached_stat_servername(memcached_stat_st *stat, char *args, memcached_server_add(&memc, hostname, port); - rc= memcached_stats_fetch(&memc, stat, args, 0); + if (memc.flags & MEM_BINARY_PROTOCOL) + rc= binary_stats_fetch(&memc, stat, args, 0); + else + rc= ascii_stats_fetch(&memc, stat, args, 0); memcached_free(&memc); diff --git a/libmemcached/memcached_storage.c b/libmemcached/memcached_storage.c index 04f281cc..326f6fe6 100644 --- a/libmemcached/memcached_storage.c +++ b/libmemcached/memcached_storage.c @@ -6,7 +6,6 @@ memcached_add() */ - #include "common.h" #include "memcached_io.h" @@ -43,6 +42,16 @@ static char *storage_op_string(memcached_storage_action verb) return SET_OP; } +static memcached_return memcached_send_binary(memcached_server_st* server, + const char *key, + size_t key_length, + const char *value, + size_t value_length, + time_t expiration, + uint32_t flags, + uint64_t cas, + memcached_storage_action verb); + static inline memcached_return memcached_send(memcached_st *ptr, const char *master_key, size_t master_key_length, const char *key, size_t key_length, @@ -72,6 +81,11 @@ static inline memcached_return memcached_send(memcached_st *ptr, server_key= memcached_generate_hash(ptr, master_key, master_key_length); + if (ptr->flags & MEM_BINARY_PROTOCOL) + return memcached_send_binary(&ptr->hosts[server_key], key, key_length, + value, value_length, expiration, + flags, cas, verb); + if (cas) write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "%s %s%.*s %u %llu %zu %llu\r\n", storage_op_string(verb), @@ -129,6 +143,7 @@ error: return rc; } + memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, time_t expiration, @@ -303,3 +318,74 @@ memcached_return memcached_cas_by_key(memcached_st *ptr, expiration, flags, cas, CAS_OP); return rc; } + +static memcached_return memcached_send_binary(memcached_server_st* server, + const char *key, + size_t key_length, + const char *value, + size_t value_length, + time_t expiration, + uint32_t flags, + uint64_t cas, + memcached_storage_action verb) +{ + protocol_binary_request_set request= {0}; + size_t send_length= sizeof(request.bytes); + + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + switch (verb) + { + case SET_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET; + break; + case ADD_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD; + break; + case REPLACE_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE; + break; + case APPEND_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND; + break; + case PREPEND_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND; + break; + case CAS_OP: + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE; + break; + default: + abort(); + } + + request.message.header.request.keylen= htons((uint16_t)key_length); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + if (verb == APPEND_OP || verb == PREPEND_OP) + send_length -= 8; /* append & prepend does not contain extras! */ + else { + request.message.header.request.extlen= 8; + request.message.body.flags= htonl(flags); + request.message.body.expiration= htonl((uint32_t)expiration); + } + + request.message.header.request.bodylen= htonl(key_length + value_length + + request.message.header.request.extlen); + + if (cas) + request.message.header.request.cas= htonll(cas); + + char flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1; + /* write the header */ + if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(server, key, key_length, 0) == -1) || + (memcached_io_write(server, value, value_length, flush) == -1)) + { + memcached_io_reset(server); + return MEMCACHED_WRITE_FAILURE; + } + + if (flush == 0) + return MEMCACHED_BUFFERED; + + return memcached_response(server, NULL, 0, NULL); +} + diff --git a/libmemcached/memcached_version.c b/libmemcached/memcached_version.c index e49d76f0..182ff14e 100644 --- a/libmemcached/memcached_version.c +++ b/libmemcached/memcached_version.c @@ -5,7 +5,18 @@ const char * memcached_lib_version(void) return LIBMEMCACHED_VERSION_STRING; } +static inline memcached_return memcached_version_binary(memcached_st *ptr); +static inline memcached_return memcached_version_textual(memcached_st *ptr); + memcached_return memcached_version(memcached_st *ptr) +{ + if (ptr->flags & MEM_BINARY_PROTOCOL) + return memcached_version_binary(ptr); + else + return memcached_version_textual(ptr); +} + +static inline memcached_return memcached_version_textual(memcached_st *ptr) { unsigned int x; size_t send_length; @@ -47,3 +58,48 @@ memcached_return memcached_version(memcached_st *ptr) return rc; } + +static inline memcached_return memcached_version_binary(memcached_st *ptr) +{ + memcached_return rc; + unsigned int x; + protocol_binary_request_version request= {0}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_VERSION; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + rc= MEMCACHED_SUCCESS; + for (x= 0; x < ptr->number_of_hosts; x++) + { + memcached_return rrc; + + rrc= memcached_do(&ptr->hosts[x], request.bytes, sizeof(request.bytes), 1); + if (rrc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[x]); + rc= MEMCACHED_SOME_ERRORS; + continue; + } + } + + for (x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x]) > 0) + { + memcached_return rrc; + char buffer[32]; + char *p; + + rrc= memcached_response(&ptr->hosts[x], buffer, sizeof(buffer), NULL); + if (rrc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[x]); + rc= MEMCACHED_SOME_ERRORS; + } + + ptr->hosts[x].major_version= (uint8_t)strtol(buffer, &p, 10); + ptr->hosts[x].minor_version= (uint8_t)strtol(p + 1, &p, 10); + ptr->hosts[x].micro_version= (uint8_t)strtol(p + 1, NULL, 10); + } + + return rc; +} diff --git a/tests/function.c b/tests/function.c index 0b2c15d4..e37aabef 100644 --- a/tests/function.c +++ b/tests/function.c @@ -504,7 +504,7 @@ test_return add_test(memcached_st *memc) if (setting_value) assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_STORED); else - assert(rc == MEMCACHED_NOTSTORED); + assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_DATA_EXISTS); return 0; } @@ -2594,6 +2594,19 @@ memcached_return pre_behavior_ketama_weighted(memcached_st *memc) assert(value == MEMCACHED_HASH_MD5); return MEMCACHED_SUCCESS; } + +memcached_return pre_binary(memcached_st *memc) +{ + memcached_return rc; + + rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1); + assert(rc == MEMCACHED_SUCCESS); + + assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1); + + return MEMCACHED_SUCCESS; + +} void my_free(memcached_st *ptr, void *mem) { free(mem); @@ -2736,8 +2749,8 @@ memcached_return enable_cas(memcached_st *memc) memcached_version(memc); if (memc->hosts[0].major_version >= 1 && - memc->hosts[0].minor_version >= 2 && - memc->hosts[0].micro_version >= 4) + (memc->hosts[0].minor_version == 2 && + memc->hosts[0].micro_version >= 4) || memc->hosts[0].minor_version > 2) { memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set); @@ -2752,8 +2765,8 @@ memcached_return check_for_1_2_3(memcached_st *memc) memcached_version(memc); if (memc->hosts[0].major_version >= 1 && - memc->hosts[0].minor_version >= 2 && - memc->hosts[0].micro_version >= 4) + (memc->hosts[0].minor_version == 2 && memc->hosts[0].micro_version >= 4) + || memc->hosts[0].minor_version > 2) return MEMCACHED_SUCCESS; return MEMCACHED_FAILURE; @@ -2946,6 +2959,7 @@ test_st consistent_weighted_tests[] ={ collection_st collection[] ={ {"block", 0, 0, tests}, + {"binary", pre_binary, 0, tests}, {"nonblock", pre_nonblock, 0, tests}, {"nodelay", pre_nodelay, 0, tests}, {"md5", pre_md5, 0, tests}, -- 2.30.2