X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=fe534c908cf04bed04051208ff11d221e05d4f38;hb=b4de8d3fd063b9017797dd9809ab3acb8a537606;hp=3566dd2cc9dd5b8666b6a95e3c9cf874343a12b0;hpb=a91a68a7c685f82bc6f46c09fa2036e44a82e7f7;p=m6w6%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 3566dd2c..fe534c90 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -9,58 +9,137 @@ typedef enum { MEM_READ, - MEM_WRITE, + MEM_WRITE } memc_read_or_write; -static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); +static ssize_t io_flush(memcached_server_st *ptr, memcached_return_t *error); static void increment_udp_message_id(memcached_server_st *ptr); -static memcached_return io_wait(memcached_server_st *ptr, - memc_read_or_write read_or_write) +static memcached_return_t io_wait(memcached_server_st *ptr, + memc_read_or_write read_or_write) { - struct pollfd fds[1]; - short flags= 0; + struct pollfd fds= { + .fd= ptr->fd, + .events = POLLIN + }; int error; - if (read_or_write == MEM_WRITE) /* write */ - flags= POLLOUT; - else - flags= POLLIN; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->fd; - fds[0].events= flags; + unlikely (read_or_write == MEM_WRITE) /* write */ + fds.events= POLLOUT; /* - ** We are going to block on write, but at least on Solaris we might block - ** on write if we haven't read anything from our input buffer.. - ** Try to purge the input buffer if we don't do any flow control in the - ** application layer (just sending a lot of data etc) - ** The test is moved down in the purge function to avoid duplication of - ** the test. - */ + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ if (read_or_write == MEM_WRITE) { - memcached_return rc=memcached_purge(ptr); + memcached_return_t rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - return MEMCACHED_FAILURE; + return MEMCACHED_FAILURE; } - error= poll(fds, 1, ptr->root->poll_timeout); + int timeout= ptr->root->poll_timeout; + if (ptr->root->flags.no_block == false) + timeout= -1; + + error= poll(&fds, 1, timeout); if (error == 1) return MEMCACHED_SUCCESS; else if (error == 0) - { return MEMCACHED_TIMEOUT; - } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); memcached_quit_server(ptr, 1); return MEMCACHED_FAILURE; +} + +/** + * Try to fill the input buffer for a server with as much + * data as possible. + * + * @param ptr the server to pack + */ +static bool repack_input_buffer(memcached_server_st *ptr) +{ + if (ptr->read_ptr != ptr->read_buffer) + { + /* Move all of the data to the beginning of the buffer so + ** that we can fit more data into the buffer... + */ + memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); + ptr->read_ptr= ptr->read_buffer; + ptr->read_data_length= ptr->read_buffer_length; + } + /* There is room in the buffer, try to fill it! */ + if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) + { + /* Just try a single read to grab what's available */ + ssize_t nr= read(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length); + + if (nr > 0) + { + ptr->read_data_length+= (size_t)nr; + ptr->read_buffer_length+= (size_t)nr; + return true; + } + } + return false; +} + +/** + * If the we have callbacks connected to this server structure + * we may start process the input queue and fire the callbacks + * for the incomming messages. This function is _only_ called + * when the input buffer is full, so that we _know_ that we have + * at least _one_ message to process. + * + * @param ptr the server to star processing iput messages for + * @return true if we processed anything, false otherwise + */ +static bool process_input_buffer(memcached_server_st *ptr) +{ + /* + ** We might be able to process some of the response messages if we + ** have a callback set up + */ + if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + { + /* + * We might have responses... try to read them out and fire + * callbacks + */ + memcached_callback_st cb= *ptr->root->callbacks; + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return_t error; + error= memcached_response(ptr, buffer, sizeof(buffer), + &ptr->root->result); + if (error == MEMCACHED_SUCCESS) + { + for (unsigned int x= 0; x < cb.number_of_callback; x++) + { + error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + if (error != MEMCACHED_SUCCESS) + break; + } + + /* @todo what should I do with the error message??? */ + } + /* @todo what should I do with other error messages?? */ + return true; + } + + return false; } #ifdef UNUSED @@ -90,8 +169,8 @@ void memcached_io_preread(memcached_st *ptr) } #endif -memcached_return memcached_io_read(memcached_server_st *ptr, - void *buffer, size_t length, ssize_t *nread) +memcached_return_t memcached_io_read(memcached_server_st *ptr, + void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -111,14 +190,14 @@ memcached_return memcached_io_read(memcached_server_st *ptr, else if (data_read == -1) { ptr->cached_errno= errno; - memcached_return rc= MEMCACHED_UNKNOWN_READ_FAILURE; + memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE; switch (errno) { case EAGAIN: case EINTR: if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) continue; - /* fall through */ + /* fall through */ default: { @@ -146,8 +225,8 @@ memcached_return memcached_io_read(memcached_server_st *ptr, } ptr->io_bytes_sent = 0; - ptr->read_data_length= data_read; - ptr->read_buffer_length= data_read; + ptr->read_data_length= (size_t) data_read; + ptr->read_buffer_length= (size_t) data_read; ptr->read_ptr= ptr->read_buffer; } @@ -174,7 +253,7 @@ memcached_return memcached_io_read(memcached_server_st *ptr, } ptr->server_failure_counter= 0; - *nread = (size_t)(buffer_ptr - (char*)buffer); + *nread = (ssize_t)(buffer_ptr - (char*)buffer); return MEMCACHED_SUCCESS; } @@ -218,7 +297,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr, if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) { - memcached_return rc; + memcached_return_t rc; ssize_t sent_length; WATCHPOINT_ASSERT(ptr->fd != -1); @@ -227,25 +306,25 @@ ssize_t memcached_io_write(memcached_server_st *ptr, return -1; /* If io_flush calls memcached_purge, sent_length may be 0 */ - if (sent_length != 0) + unlikely (sent_length != 0) { - WATCHPOINT_ASSERT(sent_length == buffer_end); + WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); } } } if (with_flush) { - memcached_return rc; + memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != -1); if (io_flush(ptr, &rc) == -1) return -1; } - return original_length; + return (ssize_t) original_length; } -memcached_return memcached_io_close(memcached_server_st *ptr) +memcached_return_t memcached_io_close(memcached_server_st *ptr) { int r; @@ -257,7 +336,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr) { r= shutdown(ptr->fd, SHUT_RDWR); -#ifdef HAVE_DEBUG +#ifdef DEBUG if (r && errno != ENOTCONN) { WATCHPOINT_NUMBER(ptr->fd); @@ -268,7 +347,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr) } r= close(ptr->fd); -#ifdef HAVE_DEBUG +#ifdef DEBUG if (r != 0) WATCHPOINT_ERRNO(errno); #endif @@ -280,10 +359,10 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; - unsigned int index= 0; + unsigned int host_index= 0; for (unsigned int x= 0; - x< memc->number_of_hosts && index < MAX_SERVERS_TO_POLL; + x< memc->number_of_hosts && host_index < MAX_SERVERS_TO_POLL; ++x) { if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */ @@ -291,14 +370,14 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) if (memcached_server_response_count(&memc->hosts[x]) > 0) { - fds[index].events = POLLIN; - fds[index].revents = 0; - fds[index].fd = memc->hosts[x].fd; - ++index; + fds[host_index].events = POLLIN; + fds[host_index].revents = 0; + fds[host_index].fd = memc->hosts[x].fd; + ++host_index; } } - if (index < 2) + if (host_index < 2) { /* We have 0 or 1 server with pending events.. */ for (unsigned int x= 0; x< memc->number_of_hosts; ++x) @@ -308,7 +387,7 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) return NULL; } - int err= poll(fds, index, memc->poll_timeout); + int err= poll(fds, host_index, memc->poll_timeout); switch (err) { case -1: memc->cached_errno = errno; @@ -316,7 +395,7 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) case 0: break; default: - for (unsigned int x= 0; x < index; ++x) + for (unsigned int x= 0; x < host_index; ++x) if (fds[x].revents & POLLIN) for (unsigned int y= 0; y < memc->number_of_hosts; ++y) if (memc->hosts[y].fd == fds[x].fd) @@ -327,20 +406,20 @@ memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) } static ssize_t io_flush(memcached_server_st *ptr, - memcached_return *error) + memcached_return_t *error) { /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ { - memcached_return rc; - WATCHPOINT_ASSERT(ptr->fd != -1); - rc= memcached_purge(ptr); + memcached_return_t rc; + WATCHPOINT_ASSERT(ptr->fd != -1); + rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - return -1; + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return -1; } ssize_t sent_length; size_t return_length; @@ -356,11 +435,11 @@ static ssize_t io_flush(memcached_server_st *ptr, return -1; if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) return 0; /* Looking for memory overflows */ -#if defined(HAVE_DEBUG) +#if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); @@ -384,16 +463,26 @@ static ssize_t io_flush(memcached_server_st *ptr, case ENOBUFS: continue; case EAGAIN: - { - memcached_return rc; - rc= io_wait(ptr, MEM_WRITE); + { + /* + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) || + process_input_buffer(ptr)) + continue; - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) - continue; + memcached_return_t rc; + rc= io_wait(ptr, MEM_WRITE); - memcached_quit_server(ptr, 1); - return -1; - } + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) + continue; + + memcached_quit_server(ptr, 1); + return -1; + } default: memcached_quit_server(ptr, 1); *error= MEMCACHED_ERRNO; @@ -408,11 +497,11 @@ static ssize_t io_flush(memcached_server_st *ptr, return -1; } - ptr->io_bytes_sent += sent_length; + ptr->io_bytes_sent += (uint32_t) sent_length; local_write_ptr+= sent_length; - write_length-= sent_length; - return_length+= sent_length; + write_length-= (uint32_t) sent_length; + return_length+= (uint32_t) sent_length; } WATCHPOINT_ASSERT(write_length == 0); @@ -426,10 +515,10 @@ static ssize_t io_flush(memcached_server_st *ptr, else ptr->write_buffer_offset= 0; - return return_length; + return (ssize_t) return_length; } -/* +/* Eventually we will just kill off the server with the problem. */ void memcached_io_reset(memcached_server_st *ptr) @@ -439,11 +528,11 @@ void memcached_io_reset(memcached_server_st *ptr) /** * Read a given number of bytes from the server and place it into a specific - * buffer. Reset the IO channel on this server if an error occurs. + * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return memcached_safe_read(memcached_server_st *ptr, - void *dta, - size_t size) +memcached_return_t memcached_safe_read(memcached_server_st *ptr, + void *dta, + size_t size) { size_t offset= 0; char *data= dta; @@ -451,20 +540,20 @@ memcached_return memcached_safe_read(memcached_server_st *ptr, while (offset < size) { ssize_t nread; - memcached_return rc= memcached_io_read(ptr, data + offset, size - offset, - &nread); + memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset, + &nread); if (rc != MEMCACHED_SUCCESS) return rc; - offset+= nread; + offset+= (size_t) nread; } return MEMCACHED_SUCCESS; } -memcached_return memcached_io_readline(memcached_server_st *ptr, - char *buffer_ptr, - size_t size) +memcached_return_t memcached_io_readline(memcached_server_st *ptr, + char *buffer_ptr, + size_t size) { bool line_complete= false; size_t total_nr= 0; @@ -477,9 +566,9 @@ memcached_return memcached_io_readline(memcached_server_st *ptr, * We don't have any data in the buffer, so let's fill the read * buffer. Call the standard read function to avoid duplicating * the logic. - */ + */ ssize_t nread; - memcached_return rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); + memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); if (rc != MEMCACHED_SUCCESS) return rc; @@ -524,22 +613,22 @@ static void increment_udp_message_id(memcached_server_st *ptr) { struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; uint16_t cur_req= get_udp_datagram_request_id(header); - uint16_t msg_num= get_msg_num_from_request_id(cur_req); - uint16_t thread_id= get_thread_id_from_request_id(cur_req); - + int msg_num= get_msg_num_from_request_id(cur_req); + int thread_id= get_thread_id_from_request_id(cur_req); + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) msg_num= 0; - header->request_id= htons(thread_id | msg_num); + header->request_id= htons((uint16_t) (thread_id | msg_num)); } -memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id) +memcached_return_t memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) return MEMCACHED_FAILURE; struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - header->request_id= htons(generate_udp_request_thread_id(thread_id)); + header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id))); header->num_datagrams= htons(1); header->sequence_number= htons(0);