X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=30a2a1a4921396a967b71bd92bda79ec57ce3bb6;hb=09ef521d2c88955d04d6c91f7b5a1671a1955130;hp=ef46a0d9e460b93f819e8c70d368ce3c2c107b72;hpb=70c3fdd7d6d441a8be3e831f6a0aa3ae88166bc6;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index ef46a0d9..30a2a1a4 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -19,10 +19,11 @@ typedef enum { MEM_WRITE } memc_read_or_write; -static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error); -static void increment_udp_message_id(memcached_server_instance_st *ptr); +static ssize_t io_flush(memcached_server_write_instance_st ptr, + memcached_return_t *error); +static void increment_udp_message_id(memcached_server_write_instance_st ptr); -static memcached_return_t io_wait(memcached_server_instance_st *ptr, +static memcached_return_t io_wait(memcached_server_write_instance_st ptr, memc_read_or_write read_or_write) { struct pollfd fds= { @@ -31,8 +32,15 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, }; int error; - unlikely (read_or_write == MEM_WRITE) /* write */ + if (read_or_write == MEM_WRITE) /* write */ + { fds.events= POLLOUT; + WATCHPOINT_SET(ptr->io_wait_count.write++); + } + else + { + WATCHPOINT_SET(ptr->io_wait_count.read++); + } /* ** We are going to block on write, but at least on Solaris we might block @@ -54,32 +62,50 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, timeout= -1; size_t loop_max= 5; - while (--loop_max) + while (--loop_max) // While loop is for ERESTART or EINTR { error= poll(&fds, 1, timeout); switch (error) { - case 1: + case 1: // Success! + WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); + WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); + return MEMCACHED_SUCCESS; - case 0: + case 0: // Timeout occured, we let the while() loop do its thing. return MEMCACHED_TIMEOUT; + default: + WATCHPOINT_ERRNO(errno); + switch (errno) + { #ifdef TARGET_OS_LINUX - case ERESTART: + case ERESTART: #endif - case EINTR: - continue; - default: - ptr->cached_errno= error; - memcached_quit_server(ptr, true); + case EINTR: + break; + default: + if (fds.revents & POLLERR) + { + int err; + socklen_t len= sizeof (err); + (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); + ptr->cached_errno= (err == 0) ? errno : err; + } + else + { + ptr->cached_errno= errno; + } + memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return MEMCACHED_FAILURE; + } } } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); - ptr->cached_errno= error; + ptr->cached_errno= errno; memcached_quit_server(ptr, true); return MEMCACHED_FAILURE; @@ -91,7 +117,7 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_instance_st *ptr) +static bool repack_input_buffer(memcached_server_write_instance_st ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -131,7 +157,7 @@ static bool repack_input_buffer(memcached_server_instance_st *ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_instance_st *ptr) +static bool process_input_buffer(memcached_server_write_instance_st ptr) { /* ** We might be able to process some of the response messages if we @@ -234,7 +260,7 @@ void memcached_io_preread(memcached_st *ptr) } #endif -memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, +memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -251,11 +277,13 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, { data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); if (data_read > 0) + { break; + } else if (data_read == -1) { ptr->cached_errno= errno; - memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE; + memcached_return_t rc= MEMCACHED_ERRNO; switch (errno) { case EAGAIN: @@ -286,6 +314,7 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, for blocking I/O we do not return 0 and for non-blocking case it will return EGAIN if data is not immediatly available. */ + WATCHPOINT_STRING("We had a zero length read()"); memcached_quit_server(ptr, true); *nread= -1; return MEMCACHED_UNKNOWN_READ_FAILURE; @@ -325,8 +354,8 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, return MEMCACHED_SUCCESS; } -ssize_t memcached_io_write(memcached_server_instance_st *ptr, - const void *buffer, size_t length, bool with_flush) +static ssize_t _io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) { size_t original_length; const char* buffer_ptr; @@ -402,7 +431,42 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, return (ssize_t) original_length; } -memcached_return_t memcached_io_close(memcached_server_instance_st *ptr) +ssize_t memcached_io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) +{ + return _io_write(ptr, buffer, length, with_flush); +} + +ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, + const struct __write_vector_st *vector, + size_t number_of, bool with_flush) +{ + ssize_t total= 0; + + for (size_t x= 0; x < number_of; x++, vector++) + { + ssize_t returnable; + + if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + { + return -1; + } + total+= returnable; + } + + if (with_flush) + { + if (memcached_io_write(ptr, NULL, 0, true) == -1) + { + return -1; + } + } + + return total; +} + + +memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) { if (ptr->fd == -1) { @@ -425,7 +489,7 @@ memcached_return_t memcached_io_close(memcached_server_instance_st *ptr) return MEMCACHED_SUCCESS; } -memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *memc) +memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -435,7 +499,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ @@ -455,7 +519,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (memcached_server_response_count(instance) > 0) @@ -481,7 +545,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) @@ -494,7 +558,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem return NULL; } -static ssize_t io_flush(memcached_server_instance_st *ptr, +static ssize_t io_flush(memcached_server_write_instance_st ptr, memcached_return_t *error) { /* @@ -547,6 +611,8 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, if (sent_length == -1) { ptr->cached_errno= errno; + WATCHPOINT_ERRNO(errno); + WATCHPOINT_NUMBER(errno); switch (errno) { case ENOBUFS: @@ -610,7 +676,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_instance_st *ptr) +void memcached_io_reset(memcached_server_write_instance_st ptr) { memcached_quit_server(ptr, true); } @@ -619,7 +685,7 @@ void memcached_io_reset(memcached_server_instance_st *ptr) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr, +memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, size_t size) { @@ -640,7 +706,7 @@ memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr, +memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, char *buffer_ptr, size_t size) { @@ -698,7 +764,7 @@ memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr, * extracts the message number from message_id, increments it and then * writes the new value back into the header */ -static void increment_udp_message_id(memcached_server_instance_st *ptr) +static void increment_udp_message_id(memcached_server_write_instance_st ptr) { struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; uint16_t cur_req= get_udp_datagram_request_id(header); @@ -711,7 +777,7 @@ static void increment_udp_message_id(memcached_server_instance_st *ptr) header->request_id= htons((uint16_t) (thread_id | msg_num)); } -memcached_return_t memcached_io_init_udp_header(memcached_server_instance_st *ptr, uint16_t thread_id) +memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) return MEMCACHED_FAILURE;