From: Brian Aker Date: Sun, 21 Jun 2009 15:51:17 +0000 (-0700) Subject: Merge Monty X-Git-Tag: 0.31~14 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=f63d634e21459280599f663154089fa724903bd6;p=m6w6%2Flibmemcached Merge Monty --- f63d634e21459280599f663154089fa724903bd6 diff --cc config/bootstrap index d62ca6ba,00000000..43095ae4 mode 100755,000000..120000 --- a/config/bootstrap +++ b/config/bootstrap @@@ -1,106 -1,0 +1,1 @@@ - #!/usr/bin/env bash - # Taken from lighthttpd server (BSD). Thanks Jan! - # Run this to generate all the initial makefiles, etc. - - die() { echo "$@"; exit 1; } - - # LIBTOOLIZE=${LIBTOOLIZE:-libtoolize} - LIBTOOLIZE_FLAGS=" --automake --copy --force" - # ACLOCAL=${ACLOCAL:-aclocal} - ACLOCAL_FLAGS="-I m4" - # AUTOHEADER=${AUTOHEADER:-autoheader} - # AUTOMAKE=${AUTOMAKE:-automake} - AUTOMAKE_FLAGS="--add-missing --copy --force" - # AUTOCONF=${AUTOCONF:-autoconf} - - ARGV0=$0 - ARGS="$@" - - - run() { - echo "$ARGV0: running \`$@' $ARGS" - $@ $ARGS - } - - ## jump out if one of the programs returns 'false' - set -e - - ## We do not currently support glibtoolize - if test x$LIBTOOLIZE = x; then - if test \! "x`which glibtoolize 2> /dev/null | grep -v '^no'`" = x; then - LIBTOOLIZE=glibtoolize - elif test \! "x`which libtoolize-1.5 2> /dev/null | grep -v '^no'`" = x; then - LIBTOOLIZE=libtoolize-1.5 - elif test \! "x`which libtoolize 2> /dev/null | grep -v '^no'`" = x; then - LIBTOOLIZE=libtoolize - else - echo "libtoolize 1.5.x wasn't found, exiting"; exit 0 - fi - fi - - ## suse has aclocal and aclocal-1.9 - if test x$ACLOCAL = x; then - if test \! "x`which aclocal-1.10 2> /dev/null | grep -v '^no'`" = x; then - ACLOCAL=aclocal-1.10 - elif test \! "x`which aclocal-1.9 2> /dev/null | grep -v '^no'`" = x; then - ACLOCAL=aclocal-1.9 - elif test \! "x`which aclocal19 2> /dev/null | grep -v '^no'`" = x; then - ACLOCAL=aclocal19 - elif test \! "x`which aclocal 2> /dev/null | grep -v '^no'`" = x; then - ACLOCAL=aclocal - else - echo "automake 1.9.x (aclocal) wasn't found, exiting"; exit 0 - fi - fi - - if test x$AUTOMAKE = x; then - if test \! "x`which automake-1.10 2> /dev/null | grep -v '^no'`" = x; then - AUTOMAKE=automake-1.10 - elif test \! "x`which automake-1.9 2> /dev/null | grep -v '^no'`" = x; then - AUTOMAKE=automake-1.9 - elif test \! "x`which automake19 2> /dev/null | grep -v '^no'`" = x; then - AUTOMAKE=automake19 - elif test \! "x`which automake 2> /dev/null | grep -v '^no'`" = x; then - AUTOMAKE=automake - else - echo "automake 1.9.x wasn't found, exiting"; exit 0 - fi - fi - - - ## macosx has autoconf-2.59 and autoconf-2.60 - if test x$AUTOCONF = x; then - if test \! "x`which autoconf-2.59 2> /dev/null | grep -v '^no'`" = x; then - AUTOCONF=autoconf-2.59 - elif test \! "x`which autoconf259 2> /dev/null | grep -v '^no'`" = x; then - AUTOCONF=autoconf259 - elif test \! "x`which autoconf 2> /dev/null | grep -v '^no'`" = x; then - AUTOCONF=autoconf - else - echo "autoconf 2.59+ wasn't found, exiting"; exit 0 - fi - fi - - if test x$AUTOHEADER = x; then - if test \! "x`which autoheader-2.59 2> /dev/null | grep -v '^no'`" = x; then - AUTOHEADER=autoheader-2.59 - elif test \! "x`which autoheader259 2> /dev/null | grep -v '^no'`" = x; then - AUTOHEADER=autoheader259 - elif test \! "x`which autoheader 2> /dev/null | grep -v '^no'`" = x; then - AUTOHEADER=autoheader - else - echo "autoconf 2.59+ (autoheader) wasn't found, exiting"; exit 0 - fi - fi - - - # --force means overwrite ltmain.sh script if it already exists - run $LIBTOOLIZE $LIBTOOLIZE_FLAGS || die "Can't execute libtoolize" - - run $ACLOCAL $ACLOCAL_FLAGS || die "Can't execute aclocal" - run $AUTOHEADER || die "Can't execute autoheader" - - # --add-missing instructs automake to install missing auxiliary files - # and --force to overwrite them if they already exist - run $AUTOMAKE $AUTOMAKE_FLAGS || die "Can't execute automake" - run $AUTOCONF || die "Can't execute autoconf" ++autorun.sh diff --cc libmemcached/memcached_delete.c index 93b1eb7a,f2fab322..5e2a0132 --- a/libmemcached/memcached_delete.c +++ b/libmemcached/memcached_delete.c @@@ -126,26 -123,8 +126,26 @@@ static inline memcached_return binary_d key_length, flush) == -1)) { memcached_io_reset(&ptr->hosts[server_key]); - return MEMCACHED_WRITE_FAILURE; + rc= MEMCACHED_WRITE_FAILURE; + } + + unlikely (ptr->number_of_replicas > 0) + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ; + - for (int x= 0; x < ptr->number_of_replicas; ++x) ++ for (uint32_t x= 0; x < ptr->number_of_replicas; ++x) + { + ++server_key; + if (server_key == ptr->number_of_hosts) + server_key= 0; + + memcached_server_st* server= &ptr->hosts[server_key]; + if ((memcached_do(server, (const char*)request.bytes, + sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(server, key, key_length, flush) == -1)) + memcached_io_reset(server); + } } - return MEMCACHED_SUCCESS; + return rc; } diff --cc libmemcached/memcached_get.c index 0209b283,05b317a6..e91db4cf --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@@ -357,148 -357,3 +357,148 @@@ static memcached_return simple_binary_m return rc; } + +static memcached_return replication_binary_mget(memcached_st *ptr, + uint32_t* hash, bool* dead_servers, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + memcached_return rc= MEMCACHED_NOTFOUND; + uint32_t x; + + int flush= number_of_keys == 1; + - for (int replica = 0; replica <= ptr->number_of_replicas; ++replica) ++ for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica) + { + bool success= true; + - for (uint32_t x= 0; x < number_of_keys; ++x) ++ for (x= 0; x < number_of_keys; ++x) + { + if (hash[x] == ptr->number_of_hosts) + continue; /* Already successfully sent */ + + uint32_t server= hash[x] + replica; + while (server >= ptr->number_of_hosts) + server -= ptr->number_of_hosts; + + if (dead_servers[server]) + continue; + + if (memcached_server_response_count(&ptr->hosts[server]) == 0) + { + rc= memcached_connect(&ptr->hosts[server]); + if (rc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + } + + protocol_binary_request_getk request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + if (number_of_keys == 1) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + + request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl(key_length[x]); + + if ((memcached_io_write(&ptr->hosts[server], request.bytes, + sizeof(request.bytes), 0) == -1) || + (memcached_io_write(&ptr->hosts[server], keys[x], + key_length[x], flush) == -1)) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + memcached_server_response_increment(&ptr->hosts[server]); + } + + if (number_of_keys > 1) + { + /* + * Send a noop command to flush the buffers + */ + protocol_binary_request_noop request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + for (x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x])) + { + if (memcached_io_write(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) == -1) + { + memcached_io_reset(&ptr->hosts[x]); + dead_servers[x]= true; + success= false; + } + memcached_server_response_increment(&ptr->hosts[x]); + + /* mark all of the messages bound for this server as sent! */ - for (uint32_t x= 0; x < number_of_keys; ++x) ++ for (x= 0; x < number_of_keys; ++x) + if (hash[x] == x) + hash[x]= ptr->number_of_hosts; + } + } + + if (success) + break; + } + + return rc; +} + +static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + bool is_master_key_set, + char **keys, size_t *key_length, + unsigned int number_of_keys) +{ + memcached_return rc; + + if (ptr->number_of_replicas == 0) + { + rc= simple_binary_mget(ptr, master_server_key, is_master_key_set, + keys, key_length, number_of_keys); + } + else + { + uint32_t* hash; + bool* dead_servers; + + hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys); + dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool)); + + if (hash == NULL || dead_servers == NULL) + { + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + + if (is_master_key_set) + for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= master_server_key; + else + for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]); + + rc= replication_binary_mget(ptr, hash, dead_servers, keys, + key_length, number_of_keys); + + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + + return MEMCACHED_SUCCESS; + } + + return rc; +} diff --cc libmemcached/memcached_storage.c index 713f831d,dd7e23b0..b950dc92 --- a/libmemcached/memcached_storage.c +++ b/libmemcached/memcached_storage.c @@@ -469,26 -467,7 +475,26 @@@ static memcached_return memcached_send_ memcached_io_reset(server); return MEMCACHED_WRITE_FAILURE; } - + + unlikely (verb == SET_OP && ptr->number_of_replicas > 0) + { + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ; + - for (int x= 0; x < ptr->number_of_replicas; ++x) ++ for (uint32_t x= 0; x < ptr->number_of_replicas; x++) + { + ++server_key; + if (server_key == ptr->number_of_hosts) + server_key= 0; + + memcached_server_st *srv= &ptr->hosts[server_key]; + if ((memcached_do(srv, (const char*)request.bytes, + send_length, 0) != MEMCACHED_SUCCESS) || + (memcached_io_write(srv, key, key_length, 0) == -1) || + (memcached_io_write(srv, value, value_length, flush) == -1)) + memcached_io_reset(server); + } + } + if (flush == 0) return MEMCACHED_BUFFERED; diff --cc tests/function.c index 5f83e1ea,fcf5b511..c505b111 --- a/tests/function.c +++ b/tests/function.c @@@ -3564,177 -3550,6 +3571,179 @@@ static test_return connection_pool_test } #endif +static test_return replication_set_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + + /* + ** "bubba" should now be stored on all of our servers. We don't have an + ** easy to use API to address each individual server, so I'll just iterate + ** through a bunch of "master keys" and I should most likely hit all of the + ** servers... + */ + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + + return TEST_SUCCESS; +} + +static test_return replication_get_test(memcached_st *memc) +{ + memcached_return rc; + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ - for (int host= 0; host < memc->number_of_hosts; ++host) { ++ for (uint32_t host= 0; host < memc->number_of_hosts; ++host) ++ { + memcached_st *clone= memcached_clone(NULL, memc); + clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + size_t len; + uint32_t flags; + char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, + &len, &flags, &rc); + assert(rc == MEMCACHED_SUCCESS); + assert(val != NULL); + free(val); + } + + memcached_free(clone); + } + + return TEST_SUCCESS; +} + +static test_return replication_mget_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ + memcached_result_st result_obj; - for (int host= 0; host < clone->number_of_hosts; ++host) ++ for (uint32_t host= 0; host < clone->number_of_hosts; host++) + { - memcached_st *clone= memcached_clone(NULL, memc); - clone->hosts[host].port= 0; ++ memcached_st *new_clone= memcached_clone(NULL, memc); ++ new_clone->hosts[host].port= 0; + + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + - rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); ++ rc= memcached_mget_by_key(new_clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + - memcached_result_st *results= memcached_result_create(clone, &result_obj); ++ memcached_result_st *results= memcached_result_create(new_clone, &result_obj); + assert(results); + + int hits= 0; - while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) ++ while ((results= memcached_fetch_result(new_clone, &result_obj, &rc)) != NULL) + { + hits++; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + - memcached_free(clone); ++ memcached_free(new_clone); + } + + return TEST_SUCCESS; +} + +static test_return replication_delete_test(memcached_st *memc) +{ + memcached_return rc; + memcached_st *clone= memcached_clone(NULL, memc); + /* Delete the items from all of the servers except 1 */ + uint64_t repl= memcached_behavior_get(memc, + MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl); + + char *keys[]= { "bubba", "key1", "key2", "key3" }; + size_t len[]= { 5, 4, 4, 4 }; + + for (int x=0; x< 4; ++x) + { + rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0); + assert(rc == MEMCACHED_SUCCESS); + } + + /* + * Don't do the following in your code. I am abusing the internal details + * within the library, and this is not a supported interface. + * This is to verify correct behavior in the library + */ - int hash= memcached_generate_hash(memc, keys[0], len[0]); - for (int x= 0; x < (repl + 1); ++x) { ++ uint32_t hash= memcached_generate_hash(memc, keys[0], len[0]); ++ for (uint32_t x= 0; x < (repl + 1); ++x) ++ { + clone->hosts[hash].port= 0; + if (++hash == clone->number_of_hosts) + hash= 0; + } + + memcached_result_st result_obj; - for (int host= 0; host < clone->number_of_hosts; ++host) ++ for (uint32_t host= 0; host < clone->number_of_hosts; ++host) + { + for (int x= 'a'; x <= 'z'; ++x) + { + char key[2]= { [0]= (char)x }; + + rc= memcached_mget_by_key(clone, key, 1, keys, len, 4); + assert(rc == MEMCACHED_SUCCESS); + + memcached_result_st *results= memcached_result_create(clone, &result_obj); + assert(results); + + int hits= 0; + while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL) + { + ++hits; + } + assert(hits == 4); + memcached_result_free(&result_obj); + } + } + memcached_free(clone); + + return TEST_SUCCESS; +} + static void increment_request_id(uint16_t *id) { (*id)++;