From: Brian Aker Date: Thu, 22 Apr 2010 00:49:09 +0000 (-0700) Subject: Small updates/wins around non-block IO X-Git-Tag: 0.40~3 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=ea260e7bce23c9a41c3c60fd68f55b33608714a9;p=awesomized%2Flibmemcached Small updates/wins around non-block IO --- diff --git a/clients/generator.c b/clients/generator.c index 2af06010..6e2f2795 100644 --- a/clients/generator.c +++ b/clients/generator.c @@ -42,14 +42,15 @@ void pairs_free(pairs_st *pairs) { uint32_t x; - if (!pairs) + if (! pairs) return; /* We free until we hit the null pair we stores during creation */ for (x= 0; pairs[x].key; x++) { free(pairs[x].key); - free(pairs[x].value); + if (pairs[x].value) + free(pairs[x].value); } free(pairs); @@ -73,11 +74,19 @@ pairs_st *pairs_generate(uint64_t number_of, size_t value_length) get_random_string(pairs[x].key, 100); pairs[x].key_length= 100; - pairs[x].value= (char *)calloc(value_length, sizeof(char)); - if (!pairs[x].value) - goto error; - get_random_string(pairs[x].value, value_length); - pairs[x].value_length= value_length; + if (value_length) + { + pairs[x].value= (char *)calloc(value_length, sizeof(char)); + if (!pairs[x].value) + goto error; + get_random_string(pairs[x].value, value_length); + pairs[x].value_length= value_length; + } + else + { + pairs[x].value= NULL; + pairs[x].value_length= 0; + } } return pairs; diff --git a/libmemcached/constants.h b/libmemcached/constants.h index d6798d0d..b12a3890 100644 --- a/libmemcached/constants.h +++ b/libmemcached/constants.h @@ -21,7 +21,7 @@ #define MEMCACHED_POINTS_PER_SERVER_KETAMA 160 #define MEMCACHED_CONTINUUM_SIZE MEMCACHED_POINTS_PER_SERVER*100 /* This would then set max hosts to 100 */ #define MEMCACHED_STRIDE 4 -#define MEMCACHED_DEFAULT_TIMEOUT 1000 +#define MEMCACHED_DEFAULT_TIMEOUT 5000 #define MEMCACHED_DEFAULT_CONNECT_TIMEOUT 4000 #define MEMCACHED_CONTINUUM_ADDITION 10 /* How many extra slots we should build for in the continuum */ #define MEMCACHED_PREFIX_KEY_MAX_SIZE 128 diff --git a/libmemcached/io.c b/libmemcached/io.c index 070af6b1..1c8783b2 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -32,8 +32,15 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, }; int error; - unlikely (read_or_write == MEM_WRITE) /* write */ + if (read_or_write == MEM_WRITE) /* write */ + { fds.events= POLLOUT; + WATCHPOINT_SET(ptr->io_wait_count.write++); + } + else + { + WATCHPOINT_SET(ptr->io_wait_count.read++); + } /* ** We are going to block on write, but at least on Solaris we might block @@ -62,14 +69,22 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, switch (error) { case 1: + WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); + WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); + return MEMCACHED_SUCCESS; case 0: - continue; + return MEMCACHED_TIMEOUT; default: WATCHPOINT_ERRNO(errno); { switch (errno) { +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + case EINTR: + continue; default: ptr->cached_errno= error; memcached_quit_server(ptr, true); @@ -257,7 +272,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, { data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); if (data_read > 0) + { break; + } else if (data_read == -1) { ptr->cached_errno= errno; @@ -589,6 +606,7 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, { ptr->cached_errno= errno; WATCHPOINT_ERRNO(errno); + WATCHPOINT_NUMBER(errno); switch (errno) { case ENOBUFS: diff --git a/libmemcached/io.h b/libmemcached/io.h index b614d818..30145823 100644 --- a/libmemcached/io.h +++ b/libmemcached/io.h @@ -46,7 +46,6 @@ struct __write_vector_st const void *buffer; }; - LIBMEMCACHED_LOCAL ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, const struct __write_vector_st *vector, diff --git a/libmemcached/quit.c b/libmemcached/quit.c index 49af9963..18a9a316 100644 --- a/libmemcached/quit.c +++ b/libmemcached/quit.c @@ -40,10 +40,17 @@ void memcached_quit_server(memcached_server_st *ptr, bool io_death) * closing the socket before all data is read * results in server throwing away all data which is * not read + * + * In .40 we began to only do this if we had been doing buffered + * requests of had replication enabled. */ - ssize_t nread; - while (memcached_io_read(ptr, buffer, sizeof(buffer)/sizeof(*buffer), - &nread) == MEMCACHED_SUCCESS); + if (ptr->root->flags.buffer_requests || ptr->root->number_of_replicas) + { + ssize_t nread; + while (memcached_io_read(ptr, buffer, sizeof(buffer)/sizeof(*buffer), + &nread) == MEMCACHED_SUCCESS); + } + /* * memcached_io_read may call memcached_quit_server with io_death if diff --git a/libmemcached/server.c b/libmemcached/server.c index aac5a50e..0461c3c7 100644 --- a/libmemcached/server.c +++ b/libmemcached/server.c @@ -30,6 +30,8 @@ static inline void _server_init(memcached_server_st *self, const memcached_st *r self->weight= weight; self->state.is_corked= false; self->state.is_dead= false; + WATCHPOINT_SET(self->io_wait_count.read= 0); + WATCHPOINT_SET(self->io_wait_count.write= 0); self->major_version= 0; self->micro_version= 0; self->minor_version= 0; diff --git a/libmemcached/server.h b/libmemcached/server.h index 6a2ae5ee..eabf7f0f 100644 --- a/libmemcached/server.h +++ b/libmemcached/server.h @@ -32,6 +32,10 @@ struct memcached_server_st { bool is_corked:1; bool is_dead:1; } state; + struct { + uint32_t read; + uint32_t write; + } io_wait_count; uint8_t major_version; uint8_t micro_version; uint8_t minor_version; diff --git a/libmemcached/storage.c b/libmemcached/storage.c index be0040a4..dce5fec3 100644 --- a/libmemcached/storage.c +++ b/libmemcached/storage.c @@ -45,8 +45,8 @@ static inline const char *storage_op_string(memcached_storage_action_t verb) } static memcached_return_t memcached_send_binary(memcached_st *ptr, - const char *master_key, - size_t master_key_length, + memcached_server_write_instance_st server, + uint32_t server_key, const char *key, size_t key_length, const char *value, @@ -84,108 +84,122 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, if (ptr->flags.verify_key && (memcached_key_test((const char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - if (ptr->flags.binary_protocol) - { - return memcached_send_binary(ptr, master_key, master_key_length, - key, key_length, - value, value_length, expiration, - flags, cas, verb); - } - server_key= memcached_generate_hash_with_redistribution(ptr, master_key, master_key_length); instance= memcached_server_instance_fetch(ptr, server_key); - if (cas) - { - write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, - "%s %.*s%.*s %u %llu %zu %llu%s\r\n", - storage_op_string(verb), - (int)ptr->prefix_key_length, - ptr->prefix_key, - (int)key_length, key, flags, - (unsigned long long)expiration, value_length, - (unsigned long long)cas, - (ptr->flags.no_reply) ? " noreply" : ""); - } - else - { - char *buffer_ptr= buffer; - const char *command= storage_op_string(verb); - - /* Copy in the command, no space needed, we handle that in the command function*/ - memcpy(buffer_ptr, command, strlen(command)); - - /* Copy in the key prefix, switch to the buffer_ptr */ - buffer_ptr= memcpy((buffer_ptr + strlen(command)), ptr->prefix_key, ptr->prefix_key_length); - - /* Copy in the key, adjust point if a key prefix was used. */ - buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key_length ? ptr->prefix_key_length : 0), - key, key_length); - buffer_ptr+= key_length; - buffer_ptr[0]= ' '; - buffer_ptr++; - - write_length= (size_t)(buffer_ptr - buffer); - write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE, - "%u %llu %zu%s\r\n", - flags, - (unsigned long long)expiration, value_length, - ptr->flags.no_reply ? " noreply" : ""); - } - - if (ptr->flags.use_udp && ptr->flags.buffer_requests) - { - size_t cmd_size= write_length + value_length + 2; - if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) - return MEMCACHED_WRITE_FAILURE; - if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) - memcached_io_write(instance, NULL, 0, true); - } + WATCHPOINT_SET(instance->io_wait_count.read= 0); + WATCHPOINT_SET(instance->io_wait_count.write= 0); - if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) + if (ptr->flags.binary_protocol) { - rc= MEMCACHED_WRITE_FAILURE; + rc= memcached_send_binary(ptr, instance, server_key, + key, key_length, + value, value_length, expiration, + flags, cas, verb); + WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read); + WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write); } else { - struct __write_vector_st vector[]= - { - { .length= write_length, .buffer= buffer }, - { .length= value_length, .buffer= value }, - { .length= 2, .buffer= "\r\n" } - }; - if (ptr->flags.buffer_requests && verb == SET_OP) + if (cas) { - to_write= false; + write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, + "%s %.*s%.*s %u %llu %zu %llu%s\r\n", + storage_op_string(verb), + (int)ptr->prefix_key_length, + ptr->prefix_key, + (int)key_length, key, flags, + (unsigned long long)expiration, value_length, + (unsigned long long)cas, + (ptr->flags.no_reply) ? " noreply" : ""); } else { - to_write= true; + char *buffer_ptr= buffer; + const char *command= storage_op_string(verb); + + /* Copy in the command, no space needed, we handle that in the command function*/ + memcpy(buffer_ptr, command, strlen(command)); + + /* Copy in the key prefix, switch to the buffer_ptr */ + buffer_ptr= memcpy((buffer_ptr + strlen(command)), ptr->prefix_key, ptr->prefix_key_length); + + /* Copy in the key, adjust point if a key prefix was used. */ + buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key_length ? ptr->prefix_key_length : 0), + key, key_length); + buffer_ptr+= key_length; + buffer_ptr[0]= ' '; + buffer_ptr++; + + write_length= (size_t)(buffer_ptr - buffer); + write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE, + "%u %llu %zu%s\r\n", + flags, + (unsigned long long)expiration, value_length, + ptr->flags.no_reply ? " noreply" : ""); } - /* Send command header */ - rc= memcached_vdo(instance, vector, 3, to_write); - if (rc == MEMCACHED_SUCCESS) + if (ptr->flags.use_udp && ptr->flags.buffer_requests) { + size_t cmd_size= write_length + value_length + 2; + if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH) + return MEMCACHED_WRITE_FAILURE; + if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) + memcached_io_write(instance, NULL, 0, true); + } - if (ptr->flags.no_reply) - return (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; + if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) + { + rc= MEMCACHED_WRITE_FAILURE; + } + else + { + struct __write_vector_st vector[]= + { + { .length= write_length, .buffer= buffer }, + { .length= value_length, .buffer= value }, + { .length= 2, .buffer= "\r\n" } + }; - if (to_write == false) - return MEMCACHED_BUFFERED; + if (ptr->flags.buffer_requests && verb == SET_OP) + { + to_write= false; + } + else + { + to_write= true; + } - rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + /* Send command header */ + rc= memcached_vdo(instance, vector, 3, to_write); + if (rc == MEMCACHED_SUCCESS) + { - if (rc == MEMCACHED_STORED) - return MEMCACHED_SUCCESS; - else - return rc; + if (ptr->flags.no_reply) + { + rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; + } + else if (to_write == false) + { + rc= MEMCACHED_BUFFERED; + } + else + { + rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_STORED) + rc= MEMCACHED_SUCCESS; + } + } } + + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); } - if (rc == MEMCACHED_WRITE_FAILURE) - memcached_io_reset(instance); + WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read); + WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write); return rc; } @@ -426,8 +440,8 @@ static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply static memcached_return_t memcached_send_binary(memcached_st *ptr, - const char *master_key, - size_t master_key_length, + memcached_server_write_instance_st server, + uint32_t server_key, const char *key, size_t key_length, const char *value, @@ -440,11 +454,6 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, bool flush; protocol_binary_request_set request= {.bytes= {0}}; size_t send_length= sizeof(request.bytes); - uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, master_key, - master_key_length); - - memcached_server_write_instance_st server= - memcached_server_instance_fetch(ptr, server_key); bool noreply= server->root->flags.no_reply; @@ -499,9 +508,10 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; } - unlikely (verb == SET_OP && ptr->number_of_replicas > 0) + if (verb == SET_OP && ptr->number_of_replicas > 0) { request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ; + WATCHPOINT_STRING("replicating"); for (uint32_t x= 0; x < ptr->number_of_replicas; x++) { diff --git a/libmemcached/watchpoint.h b/libmemcached/watchpoint.h index 488238cb..31d0a12b 100644 --- a/libmemcached/watchpoint.h +++ b/libmemcached/watchpoint.h @@ -50,6 +50,8 @@ static inline void __stack_dump(void) #define WATCHPOINT_STRING(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout); } while (0) #define WATCHPOINT_STRING_LENGTH(A,B) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout); } while (0) #define WATCHPOINT_NUMBER(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %zu\n", __FILE__, __LINE__,__func__,(size_t)(A));fflush(stdout); } while (0) +#define WATCHPOINT_LABELED_NUMBER(A,B) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s:%zu\n", __FILE__, __LINE__,__func__,(A),(size_t)(B));fflush(stdout); } while (0) +#define WATCHPOINT_IF_LABELED_NUMBER(A,B,C) do { if(A) {fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s:%zu\n", __FILE__, __LINE__,__func__,(B),(size_t)(C));fflush(stdout);} } while (0) #define WATCHPOINT_ERRNO(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));fflush(stdout); } while (0) #define WATCHPOINT_ASSERT_PRINT(A,B,C) do { if(!(A)){fprintf(stderr, "\nWATCHPOINT ASSERT %s:%d (%s) ", __FILE__, __LINE__,__func__);fprintf(stderr, (B),(C));fprintf(stderr,"\n");fflush(stdout); __stack_dump(); } assert((A)); } while (0) #define WATCHPOINT_ASSERT(A) do { if (! (A)) {__stack_dump();} assert((A)); } while (0) @@ -63,6 +65,8 @@ static inline void __stack_dump(void) #define WATCHPOINT_IFERROR(A) #define WATCHPOINT_STRING(A) #define WATCHPOINT_NUMBER(A) +#define WATCHPOINT_LABELED_NUMBER(A,B) +#define WATCHPOINT_IF_LABELED_NUMBER(A,B,C) #define WATCHPOINT_ERRNO(A) #define WATCHPOINT_ASSERT_PRINT(A,B,C) #define WATCHPOINT_ASSERT(A) diff --git a/tests/mem_functions.c b/tests/mem_functions.c index 701de442..01f769a4 100644 --- a/tests/mem_functions.c +++ b/tests/mem_functions.c @@ -52,6 +52,10 @@ static pairs_st *global_pairs; static const char *global_keys[GLOBAL_COUNT]; static size_t global_keys_length[GLOBAL_COUNT]; +// Prototype +static test_return_t pre_binary(memcached_st *memc); + + static test_return_t init_test(memcached_st *not_used __attribute__((unused))) { memcached_st memc; @@ -1667,7 +1671,7 @@ static test_return_t mget_execute(memcached_st *memc) uint32_t number_of_hosts= memc->number_of_hosts; memc->number_of_hosts= 1; - size_t max_keys= binary ? 20480 : 1; + size_t max_keys= 20480; char **keys= calloc(max_keys, sizeof(char*)); @@ -1676,6 +1680,7 @@ static test_return_t mget_execute(memcached_st *memc) /* First add all of the items.. */ char blob[1024] = {0}; memcached_return_t rc; + for (size_t x= 0; x < max_keys; ++x) { char k[251]; @@ -1693,21 +1698,23 @@ static test_return_t mget_execute(memcached_st *memc) rc= memcached_mget_execute(memc, (const char**)keys, key_length, max_keys, callbacks, &counter, 1); - if (binary) + if (rc == MEMCACHED_SUCCESS) { - test_true(rc == MEMCACHED_SUCCESS); - + test_true(binary); rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1); test_true(rc == MEMCACHED_END); /* Verify that we got all of the items */ test_true(counter == max_keys); } - else + else if (rc == MEMCACHED_NOT_SUPPORTED) { - test_true(rc == MEMCACHED_NOT_SUPPORTED); test_true(counter == 0); } + else + { + test_fail("note: this test functions differently when in binary mode"); + } /* Release all allocated resources */ for (size_t x= 0; x < max_keys; ++x) @@ -1721,6 +1728,51 @@ static test_return_t mget_execute(memcached_st *memc) return TEST_SUCCESS; } +#define REGRESSION_BINARY_VS_BLOCK_COUNT 20480 + +static test_return_t key_setup(memcached_st *memc) +{ + (void)memc; + + if (pre_binary(memc) != TEST_SUCCESS) + return TEST_SKIPPED; + + global_pairs= pairs_generate(REGRESSION_BINARY_VS_BLOCK_COUNT, 0); + + return TEST_SUCCESS; +} + +static test_return_t key_teardown(memcached_st *memc) +{ + (void)memc; + pairs_free(global_pairs); + + return TEST_SUCCESS; +} + +static test_return_t block_add_regression(memcached_st *memc) +{ + /* First add all of the items.. */ + for (size_t x= 0; x < REGRESSION_BINARY_VS_BLOCK_COUNT; ++x) + { + memcached_return_t rc; + char blob[1024] = {0}; + + rc= memcached_add_by_key(memc, "bob", 3, global_pairs[x].key, global_pairs[x].key_length, blob, sizeof(blob), 0, 0); + test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + } + + return TEST_SUCCESS; +} + +static test_return_t binary_add_regression(memcached_st *memc) +{ + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1); + test_return_t rc= block_add_regression(memc); + memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 0); + return rc; +} + static test_return_t get_stats_keys(memcached_st *memc) { char **stat_list; @@ -2945,8 +2997,6 @@ static test_return_t _user_supplied_bug21(memcached_st* memc, size_t key_count) return TEST_SUCCESS; } -static test_return_t pre_binary(memcached_st *memc); - static test_return_t user_supplied_bug21(memcached_st *memc) { test_return_t test_rc; @@ -5869,6 +5919,12 @@ test_st behavior_tests[] ={ {0, 0, 0} }; +test_st regression_binary_vs_block[] ={ + {"block add", 1, (test_callback_fn)block_add_regression}, + {"binary add", 1, (test_callback_fn)binary_add_regression}, + {0, 0, 0} +}; + test_st async_tests[] ={ {"add", 1, (test_callback_fn)add_wrapper }, {0, 0, 0} @@ -6100,6 +6156,7 @@ collection_st collection[] ={ {"replication_noblock", (test_callback_fn)pre_replication_noblock, 0, replication_tests}, {"regression", 0, 0, regression_tests}, {"behaviors", 0, 0, behavior_tests}, + {"regression_binary_vs_block", (test_callback_fn)key_setup, (test_callback_fn)key_teardown, regression_binary_vs_block}, {0, 0, 0, 0} }; diff --git a/tests/test.c b/tests/test.c index dd64be69..45ee3ab4 100644 --- a/tests/test.c +++ b/tests/test.c @@ -38,7 +38,7 @@ static void world_stats_print(world_stats_st *stats) fprintf(stderr, "\tSucceeded\t\t%u\n", stats->success); } -static long int timedif(struct timeval a, struct timeval b) +long int timedif(struct timeval a, struct timeval b) { long us, s; diff --git a/tests/test.h b/tests/test.h index 28388bbd..cd58a80d 100644 --- a/tests/test.h +++ b/tests/test.h @@ -39,6 +39,8 @@ typedef test_return_t (*test_callback_fn)(void *); typedef test_return_t (*test_callback_runner_fn)(test_callback_fn, void *); typedef test_return_t (*test_callback_error_fn)(test_return_t, void *); +/* Help function for use with gettimeofday() */ +long int timedif(struct timeval a, struct timeval b); /** A structure describing the test case.