From: Trond Norbye Date: Tue, 13 Oct 2009 10:05:51 +0000 (+0200) Subject: Initial implementation of memcached_mget_execute X-Git-Tag: 0.35~11^2~6 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=35a37b8f10e31646efb89325331c171ecae87d78;p=m6w6%2Flibmemcached Initial implementation of memcached_mget_execute --- diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 08c66a2c..5d58acbf 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -113,6 +113,7 @@ struct memcached_st { memcached_trigger_delete_key delete_trigger; char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE]; uint32_t number_of_replicas; + memcached_callback_st *callbacks; }; LIBMEMCACHED_API diff --git a/libmemcached/memcached_connect.c b/libmemcached/memcached_connect.c index cee75e4c..f3ee1481 100644 --- a/libmemcached/memcached_connect.c +++ b/libmemcached/memcached_connect.c @@ -121,16 +121,25 @@ static memcached_return set_socket_options(memcached_server_st *ptr) WATCHPOINT_ASSERT(error == 0); } - /* For the moment, not getting a nonblocking mode will not be fatal */ - if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout) - { - int flags; + /* libmemcached will always use nonblocking IO to avoid write deadlocks */ + int flags; + do flags= fcntl(ptr->fd, F_GETFL, 0); - unlikely (flags != -1) - { - (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK); - } + while (flags == -1 && (errno == EINTR || errno == EAGAIN)); + + unlikely (flags == -1) + return MEMCACHED_CONNECTION_FAILURE; + else if ((flags & O_NONBLOCK) == 0) + { + int rval; + + do + rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK); + while (rval == -1 && (errno == EINTR || errno == EAGAIN)); + + unlikely (rval == -1) + return MEMCACHED_CONNECTION_FAILURE; } return MEMCACHED_SUCCESS; @@ -219,14 +228,6 @@ static memcached_return network_connect(memcached_server_st *ptr) (void)set_socket_options(ptr); - int flags= 0; - if (ptr->root->connect_timeout) - { - flags= fcntl(ptr->fd, F_GETFL, 0); - if (flags != -1 && !(flags & O_NONBLOCK)) - (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK); - } - /* connect to server */ while (ptr->fd != -1 && connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0) @@ -268,10 +269,6 @@ static memcached_return network_connect(memcached_server_st *ptr) if (ptr->fd != -1) { - /* restore flags */ - if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0) - (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK); - WATCHPOINT_ASSERT(ptr->cursor_active == 0); ptr->server_failure_counter= 0; return MEMCACHED_SUCCESS; @@ -290,9 +287,10 @@ static memcached_return network_connect(memcached_server_st *ptr) if (gettimeofday(&next_time, NULL) == 0) ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; } - ptr->server_failure_counter+= 1; + ptr->server_failure_counter++; if (ptr->cached_errno == 0) return MEMCACHED_TIMEOUT; + return MEMCACHED_ERRNO; /* The last error should be from connect() */ } diff --git a/libmemcached/memcached_get.c b/libmemcached/memcached_get.c index 7ee714fe..53c76f4b 100644 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@ -277,6 +277,34 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, key_length, number_of_keys, true); } +memcached_return memcached_mget_execute(memcached_st *ptr, + const char *master_key, + size_t master_key_length, + const char **keys, + size_t *key_length, + size_t number_of_keys, + memcached_execute_function *callback, + void *context, + unsigned int number_of_callbacks) +{ + if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0) + return MEMCACHED_NOT_SUPPORTED; + + memcached_return rc; + memcached_callback_st *original_callbacks= ptr->callbacks; + memcached_callback_st cb= { + .callback= callback, + .context= context, + .number_of_callback= 1 + }; + + ptr->callbacks= &cb; + rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys, + key_length, number_of_keys); + ptr->callbacks= original_callbacks; + return rc; +} + static memcached_return simple_binary_mget(memcached_st *ptr, unsigned int master_server_key, bool is_master_key_set, diff --git a/libmemcached/memcached_get.h b/libmemcached/memcached_get.h index 4ea04a94..6e10ad18 100644 --- a/libmemcached/memcached_get.h +++ b/libmemcached/memcached_get.h @@ -6,8 +6,8 @@ * Author: Brian Aker */ -#ifndef __MEMCACHED_GET_H__ -#define __MEMCACHED_GET_H__ +#ifndef LIBMEMCACHED_MEMCACHED_GET_H +#define LIBMEMCACHED_MEMCACHED_GET_H #ifdef __cplusplus extern "C" { @@ -53,10 +53,19 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr, memcached_result_st *result, memcached_return *error); - +LIBMEMCACHED_API +memcached_return memcached_mget_execute(memcached_st *ptr, + const char *master_key, + size_t master_key_length, + const char **keys, + size_t *key_length, + size_t number_of_keys, + memcached_execute_function *callback, + void *context, + unsigned int number_of_callbacks); #ifdef __cplusplus } #endif -#endif /* __MEMCACHED_GET_H__ */ +#endif /* LIBMEMCACHED_MEMCACHED_GET_H */ diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 411040f6..6e517144 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -9,7 +9,7 @@ typedef enum { MEM_READ, - MEM_WRITE, + MEM_WRITE } memc_read_or_write; static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); @@ -18,18 +18,15 @@ static void increment_udp_message_id(memcached_server_st *ptr); static memcached_return io_wait(memcached_server_st *ptr, memc_read_or_write read_or_write) { - struct pollfd fds[1]; + struct pollfd fds[1]= { + [0].fd= ptr->fd, + [0].events = POLLIN + }; short flags= 0; int error; - if (read_or_write == MEM_WRITE) /* write */ - flags= POLLOUT; - else - flags= POLLIN; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->fd; - fds[0].events= flags; + unlikely (read_or_write == MEM_WRITE) /* write */ + fds[0].events= POLLOUT; /* ** We are going to block on write, but at least on Solaris we might block @@ -41,26 +38,109 @@ static memcached_return io_wait(memcached_server_st *ptr, */ if (read_or_write == MEM_WRITE) { - memcached_return rc=memcached_purge(ptr); + memcached_return rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - return MEMCACHED_FAILURE; + return MEMCACHED_FAILURE; } - error= poll(fds, 1, ptr->root->poll_timeout); + int timeout= ptr->root->poll_timeout; + if ((ptr->root->flags & MEM_NO_BLOCK) == 0) + timeout= -1; + + error= poll(fds, 1, timeout); if (error == 1) return MEMCACHED_SUCCESS; else if (error == 0) - { return MEMCACHED_TIMEOUT; - } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); memcached_quit_server(ptr, 1); return MEMCACHED_FAILURE; +} + +/** + * Try to fill the input buffer for a server with as much + * data as possible. + * + * @param ptr the server to pack + */ +static bool repack_input_buffer(memcached_server_st *ptr) +{ + if (ptr->read_ptr != ptr->read_buffer) + { + /* Move all of the data to the beginning of the buffer so + ** that we can fit more data into the buffer... + */ + memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); + ptr->read_ptr= ptr->read_buffer; + ptr->read_data_length= ptr->read_buffer_length; + } + + /* There is room in the buffer, try to fill it! */ + if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) + { + /* Just try a single read to grab what's available */ + ssize_t nr= read(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length); + if (nr > 0) + { + ptr->read_data_length+= (size_t)nr; + ptr->read_buffer_length+= (size_t)nr; + return true; + } + } + return false; +} + +/** + * If the we have callbacks connected to this server structure + * we may start process the input queue and fire the callbacks + * for the incomming messages. This function is _only_ called + * when the input buffer is full, so that we _know_ that we have + * at least _one_ message to process. + * + * @param ptr the server to star processing iput messages for + * @return true if we processed anything, false otherwise + */ +static bool process_input_buffer(memcached_server_st *ptr) +{ + /* + ** We might be able to process some of the response messages if we + ** have a callback set up + */ + if (ptr->root->callbacks != NULL && (ptr->root->flags & MEM_USE_UDP) == 0) + { + /* + * We might have responses... try to read them out and fire + * callbacks + */ + memcached_callback_st cb= *ptr->root->callbacks; + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return error; + error= memcached_response(ptr, buffer, sizeof(buffer), + &ptr->root->result); + if (error == MEMCACHED_SUCCESS) + { + for (int x= 0; x < cb.number_of_callback; x++) + { + error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + if (error != MEMCACHED_SUCCESS) + break; + } + + /* @todo what should I do with the error message??? */ + } + /* @todo what should I do with other error messages?? */ + return true; + } + + return false; } #ifdef UNUSED @@ -385,6 +465,16 @@ static ssize_t io_flush(memcached_server_st *ptr, continue; case EAGAIN: { + /* + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) || + process_input_buffer(ptr)) + continue; + memcached_return rc; rc= io_wait(ptr, MEM_WRITE); @@ -429,7 +519,7 @@ static ssize_t io_flush(memcached_server_st *ptr, return (ssize_t) return_length; } -/* +/* Eventually we will just kill off the server with the problem. */ void memcached_io_reset(memcached_server_st *ptr) @@ -439,7 +529,7 @@ void memcached_io_reset(memcached_server_st *ptr) /** * Read a given number of bytes from the server and place it into a specific - * buffer. Reset the IO channel on this server if an error occurs. + * buffer. Reset the IO channel on this server if an error occurs. */ memcached_return memcached_safe_read(memcached_server_st *ptr, void *dta, @@ -526,7 +616,7 @@ static void increment_udp_message_id(memcached_server_st *ptr) uint16_t cur_req= get_udp_datagram_request_id(header); int msg_num= get_msg_num_from_request_id(cur_req); int thread_id= get_thread_id_from_request_id(cur_req); - + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) msg_num= 0; diff --git a/libmemcached/memcached_types.h b/libmemcached/memcached_types.h index d7bb470a..9d2537d4 100644 --- a/libmemcached/memcached_types.h +++ b/libmemcached/memcached_types.h @@ -6,8 +6,8 @@ * Author: Brian Aker */ -#ifndef __MEMCACHED_TYPES_H__ -#define __MEMCACHED_TYPES_H__ +#ifndef LIBMEMCACHED_MEMCACHED_TYPES_H +#define LIBMEMCACHED_MEMCACHED_TYPES_H #ifdef __cplusplus extern "C" { @@ -37,8 +37,14 @@ typedef memcached_return (*memcached_trigger_delete_key)(memcached_st *ptr, typedef memcached_return (*memcached_dump_func)(memcached_st *ptr, const char *key, size_t key_length, void *context); +typedef struct { + memcached_execute_function *callback; + void *context; + unsigned int number_of_callback; +} memcached_callback_st; + #ifdef __cplusplus } #endif -#endif /* __MEMCACHED_TYPES_H__ */ +#endif /* LIBMEMCACHED_MEMCACHED_TYPES_H */ diff --git a/tests/function.c b/tests/function.c index 8e33968e..13daaa29 100644 --- a/tests/function.c +++ b/tests/function.c @@ -1449,6 +1449,71 @@ static test_return_t mget_test(memcached_st *memc) return TEST_SUCCESS; } +static test_return_t mget_execute(memcached_st *memc) +{ + bool binary= false; + if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) != 0) + binary= true; + + /* + * I only want to hit _one_ server so I know the number of requests I'm + * sending in the pipeline. + */ + uint32_t number_of_hosts= memc->number_of_hosts; + memc->number_of_hosts= 1; + + int max_keys= binary ? 20480 : 1; + + + char **keys= calloc(max_keys, sizeof(char*)); + size_t *key_length=calloc(max_keys, sizeof(size_t)); + + /* First add all of the items.. */ + char blob[1024] = {0}; + memcached_return rc; + for (int x= 0; x < max_keys; ++x) + { + char k[251]; + key_length[x]= snprintf(k, sizeof(k), "0200%u", x); + keys[x]= strdup(k); + assert(keys[x] != NULL); + rc= memcached_add(memc, keys[x], key_length[x], blob, sizeof(blob), 0, 0); + assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED); + } + + /* Try to get all of them with a large multiget */ + unsigned int counter= 0; + memcached_execute_function callbacks[1]= { [0]= &callback_counter }; + rc= memcached_mget_execute(memc, NULL, 0, + (const char**)keys, key_length, + max_keys, callbacks, &counter, 1); + + if (binary) + { + assert(rc == MEMCACHED_SUCCESS); + + rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1); + assert(rc == MEMCACHED_END); + + /* Verify that we got all of the items */ + assert(counter == max_keys); + } + else + { + assert(rc == MEMCACHED_NOT_SUPPORTED); + assert(counter == 0); + } + + /* Release all allocated resources */ + for (int x= 0; x < max_keys; ++x) + free(keys[x]); + free(keys); + free(key_length); + + memc->number_of_hosts= number_of_hosts; + return TEST_SUCCESS; +} + static test_return_t get_stats_keys(memcached_st *memc) { char **list; @@ -4892,6 +4957,7 @@ test_st tests[] ={ {"mget_result", 1, mget_result_test }, {"mget_result_alloc", 1, mget_result_alloc_test }, {"mget_result_function", 1, mget_result_function }, + {"mget_execute", 1, mget_execute }, {"mget_end", 0, mget_end }, {"get_stats", 0, get_stats }, {"add_host_test", 0, add_host_test },