X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=693ce95c54ddc052642acc7fd98137c54fb8ebbc;hb=a7a011c2ea4a63368b3a96a332da00820ed402cb;hp=411040f68e6adccaa7bf602c021b84523fcb25c9;hpb=acce29c292b435fef4fd486495dca7d653be2bf2;p=m6w6%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 411040f6..693ce95c 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -9,7 +9,7 @@ typedef enum { MEM_READ, - MEM_WRITE, + MEM_WRITE } memc_read_or_write; static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); @@ -18,18 +18,14 @@ static void increment_udp_message_id(memcached_server_st *ptr); static memcached_return io_wait(memcached_server_st *ptr, memc_read_or_write read_or_write) { - struct pollfd fds[1]; - short flags= 0; + struct pollfd fds= { + .fd= ptr->fd, + .events = POLLIN + }; int error; - if (read_or_write == MEM_WRITE) /* write */ - flags= POLLOUT; - else - flags= POLLIN; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->fd; - fds[0].events= flags; + unlikely (read_or_write == MEM_WRITE) /* write */ + fds.events= POLLOUT; /* ** We are going to block on write, but at least on Solaris we might block @@ -41,26 +37,109 @@ static memcached_return io_wait(memcached_server_st *ptr, */ if (read_or_write == MEM_WRITE) { - memcached_return rc=memcached_purge(ptr); + memcached_return rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - return MEMCACHED_FAILURE; + return MEMCACHED_FAILURE; } - error= poll(fds, 1, ptr->root->poll_timeout); + int timeout= ptr->root->poll_timeout; + if ((ptr->root->flags & MEM_NO_BLOCK) == 0) + timeout= -1; + + error= poll(&fds, 1, timeout); if (error == 1) return MEMCACHED_SUCCESS; else if (error == 0) - { return MEMCACHED_TIMEOUT; - } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); memcached_quit_server(ptr, 1); return MEMCACHED_FAILURE; +} + +/** + * Try to fill the input buffer for a server with as much + * data as possible. + * + * @param ptr the server to pack + */ +static bool repack_input_buffer(memcached_server_st *ptr) +{ + if (ptr->read_ptr != ptr->read_buffer) + { + /* Move all of the data to the beginning of the buffer so + ** that we can fit more data into the buffer... + */ + memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); + ptr->read_ptr= ptr->read_buffer; + ptr->read_data_length= ptr->read_buffer_length; + } + + /* There is room in the buffer, try to fill it! */ + if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) + { + /* Just try a single read to grab what's available */ + ssize_t nr= read(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length); + if (nr > 0) + { + ptr->read_data_length+= (size_t)nr; + ptr->read_buffer_length+= (size_t)nr; + return true; + } + } + return false; +} + +/** + * If the we have callbacks connected to this server structure + * we may start process the input queue and fire the callbacks + * for the incomming messages. This function is _only_ called + * when the input buffer is full, so that we _know_ that we have + * at least _one_ message to process. + * + * @param ptr the server to star processing iput messages for + * @return true if we processed anything, false otherwise + */ +static bool process_input_buffer(memcached_server_st *ptr) +{ + /* + ** We might be able to process some of the response messages if we + ** have a callback set up + */ + if (ptr->root->callbacks != NULL && (ptr->root->flags & MEM_USE_UDP) == 0) + { + /* + * We might have responses... try to read them out and fire + * callbacks + */ + memcached_callback_st cb= *ptr->root->callbacks; + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return error; + error= memcached_response(ptr, buffer, sizeof(buffer), + &ptr->root->result); + if (error == MEMCACHED_SUCCESS) + { + for (unsigned int x= 0; x < cb.number_of_callback; x++) + { + error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + if (error != MEMCACHED_SUCCESS) + break; + } + + /* @todo what should I do with the error message??? */ + } + /* @todo what should I do with other error messages?? */ + return true; + } + + return false; } #ifdef UNUSED @@ -385,6 +464,16 @@ static ssize_t io_flush(memcached_server_st *ptr, continue; case EAGAIN: { + /* + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) || + process_input_buffer(ptr)) + continue; + memcached_return rc; rc= io_wait(ptr, MEM_WRITE); @@ -429,7 +518,7 @@ static ssize_t io_flush(memcached_server_st *ptr, return (ssize_t) return_length; } -/* +/* Eventually we will just kill off the server with the problem. */ void memcached_io_reset(memcached_server_st *ptr) @@ -439,7 +528,7 @@ void memcached_io_reset(memcached_server_st *ptr) /** * Read a given number of bytes from the server and place it into a specific - * buffer. Reset the IO channel on this server if an error occurs. + * buffer. Reset the IO channel on this server if an error occurs. */ memcached_return memcached_safe_read(memcached_server_st *ptr, void *dta, @@ -526,7 +615,7 @@ static void increment_udp_message_id(memcached_server_st *ptr) uint16_t cur_req= get_udp_datagram_request_id(header); int msg_num= get_msg_num_from_request_id(cur_req); int thread_id= get_thread_id_from_request_id(cur_req); - + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) msg_num= 0;