X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=bca5ea4bb2f0d1a86b900634182789bf5c092705;hb=a4456cc10079f2e6f648befc91657f2723c825e5;hp=6d19c672da79f5414562847111716b3bfb512b98;hpb=ac40b10adf3a182fe62d24899eb96c10364ba91b;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index 6d19c672..bca5ea4b 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -19,10 +19,10 @@ typedef enum { MEM_WRITE } memc_read_or_write; -static ssize_t io_flush(memcached_server_st *ptr, memcached_return_t *error); -static void increment_udp_message_id(memcached_server_st *ptr); +static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error); +static void increment_udp_message_id(memcached_server_instance_st *ptr); -static memcached_return_t io_wait(memcached_server_st *ptr, +static memcached_return_t io_wait(memcached_server_instance_st *ptr, memc_read_or_write read_or_write) { struct pollfd fds= { @@ -73,7 +73,7 @@ static memcached_return_t io_wait(memcached_server_st *ptr, * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_st *ptr) +static bool repack_input_buffer(memcached_server_instance_st *ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -113,7 +113,7 @@ static bool repack_input_buffer(memcached_server_st *ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_st *ptr) +static bool process_input_buffer(memcached_server_instance_st *ptr) { /* ** We might be able to process some of the response messages if we @@ -127,15 +127,21 @@ static bool process_input_buffer(memcached_server_st *ptr) */ memcached_callback_st cb= *ptr->root->callbacks; + memcached_set_processing_input((memcached_st *)ptr->root, true); + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; memcached_return_t error; + memcached_st *root= (memcached_st *)ptr->root; error= memcached_response(ptr, buffer, sizeof(buffer), - &ptr->root->result); + &root->result); + + memcached_set_processing_input(root, false); + if (error == MEMCACHED_SUCCESS) { for (unsigned int x= 0; x < cb.number_of_callback; x++) { - error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) break; } @@ -149,6 +155,42 @@ static bool process_input_buffer(memcached_server_st *ptr) return false; } +static inline void memcached_io_cork_push(memcached_server_st *ptr) +{ +#ifdef CORK + if (ptr->root->flags.cork == false || ptr->state.is_corked) + return; + + int enable= 1; + int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, + &enable, (socklen_t)sizeof(int)); + if (! err) + ptr->state.is_corked= true; + + WATCHPOINT_ASSERT(ptr->state.is_corked == true); +#else + (void)ptr; +#endif +} + +static inline void memcached_io_cork_pop(memcached_server_st *ptr) +{ +#ifdef CORK + if (ptr->root->flags.cork == false || ptr->state.is_corked == false) + return; + + int enable= 0; + int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, + &enable, (socklen_t)sizeof(int)); + if (! err) + ptr->state.is_corked= false; + + WATCHPOINT_ASSERT(ptr->state.is_corked == false); +#else + (void)ptr; +#endif +} + #ifdef UNUSED void memcached_io_preread(memcached_st *ptr) { @@ -176,7 +218,7 @@ void memcached_io_preread(memcached_st *ptr) } #endif -memcached_return_t memcached_io_read(memcached_server_st *ptr, +memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -264,7 +306,7 @@ memcached_return_t memcached_io_read(memcached_server_st *ptr, return MEMCACHED_SUCCESS; } -ssize_t memcached_io_write(memcached_server_st *ptr, +ssize_t memcached_io_write(memcached_server_instance_st *ptr, const void *buffer, size_t length, char with_flush) { size_t original_length; @@ -275,6 +317,12 @@ ssize_t memcached_io_write(memcached_server_st *ptr, original_length= length; buffer_ptr= buffer; + /* more writable data is coming if a flush isn't required, so delay send */ + if (! with_flush) + { + memcached_io_cork_push(ptr); + } + while (length) { char *write_ptr; @@ -325,13 +373,17 @@ ssize_t memcached_io_write(memcached_server_st *ptr, memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != -1); if (io_flush(ptr, &rc) == -1) + { return -1; + } + + memcached_io_cork_pop(ptr); } return (ssize_t) original_length; } -memcached_return_t memcached_io_close(memcached_server_st *ptr) +memcached_return_t memcached_io_close(memcached_server_instance_st *ptr) { if (ptr->fd == -1) { @@ -354,24 +406,27 @@ memcached_return_t memcached_io_close(memcached_server_st *ptr) return MEMCACHED_SUCCESS; } -memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) +memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; unsigned int host_index= 0; - for (unsigned int x= 0; + for (uint32_t x= 0; x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */ - return &memc->hosts[x]; + memcached_server_instance_st *instance= + memcached_server_instance_fetch(memc, x); + + if (instance->read_buffer_length > 0) /* I have data in the buffer */ + return instance; - if (memcached_server_response_count(&memc->hosts[x]) > 0) + if (memcached_server_response_count(instance) > 0) { fds[host_index].events = POLLIN; fds[host_index].revents = 0; - fds[host_index].fd = memc->hosts[x].fd; + fds[host_index].fd = instance->fd; ++host_index; } } @@ -379,9 +434,16 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) if (host_index < 2) { /* We have 0 or 1 server with pending events.. */ - for (unsigned int x= 0; x< memcached_server_count(memc); ++x) - if (memcached_server_response_count(&memc->hosts[x]) > 0) - return &memc->hosts[x]; + for (uint32_t x= 0; x< memcached_server_count(memc); ++x) + { + memcached_server_instance_st *instance= + memcached_server_instance_fetch(memc, x); + + if (memcached_server_response_count(instance) > 0) + { + return instance; + } + } return NULL; } @@ -398,10 +460,13 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) { if (fds[x].revents & POLLIN) { - for (unsigned int y= 0; y < memcached_server_count(memc); ++y) + for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - if (memc->hosts[y].fd == fds[x].fd) - return &memc->hosts[y]; + memcached_server_instance_st *instance= + memcached_server_instance_fetch(memc, y); + + if (instance->fd == fds[x].fd) + return instance; } } } @@ -410,7 +475,7 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) return NULL; } -static ssize_t io_flush(memcached_server_st *ptr, +static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error) { /* @@ -526,7 +591,7 @@ static ssize_t io_flush(memcached_server_st *ptr, /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_st *ptr) +void memcached_io_reset(memcached_server_instance_st *ptr) { memcached_quit_server(ptr, 1); } @@ -535,7 +600,7 @@ void memcached_io_reset(memcached_server_st *ptr) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_st *ptr, +memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr, void *dta, size_t size) { @@ -556,7 +621,7 @@ memcached_return_t memcached_safe_read(memcached_server_st *ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_st *ptr, +memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr, char *buffer_ptr, size_t size) { @@ -614,7 +679,7 @@ memcached_return_t memcached_io_readline(memcached_server_st *ptr, * extracts the message number from message_id, increments it and then * writes the new value back into the header */ -static void increment_udp_message_id(memcached_server_st *ptr) +static void increment_udp_message_id(memcached_server_instance_st *ptr) { struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; uint16_t cur_req= get_udp_datagram_request_id(header); @@ -627,7 +692,7 @@ static void increment_udp_message_id(memcached_server_st *ptr) header->request_id= htons((uint16_t) (thread_id | msg_num)); } -memcached_return_t memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id) +memcached_return_t memcached_io_init_udp_header(memcached_server_instance_st *ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) return MEMCACHED_FAILURE;