X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=299d4390659616b21ebd8a4b4b050dc6df7603ca;hb=0656f7d7ed52b56cd62af7086352a71ca0877cf7;hp=f9eae4db61556646cd906ad12cf2f049ee071cdc;hpb=311bd0fafe1da7b5cc770a2ff4fac5472a801a40;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index f9eae4db..299d4390 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -7,12 +7,22 @@ #include #include -static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error); + +static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, + memc_read_or_write read_or_write) { struct pollfd fds[1]; short flags= 0; + int error; - if (read_or_write) + if (read_or_write == MEM_WRITE) /* write */ flags= POLLOUT | POLLERR; else flags= POLLIN | POLLERR; @@ -21,10 +31,47 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - if (poll(fds, 1, ptr->poll_timeout) < 0) - return MEMCACHED_FAILURE; + error= poll(fds, 1, ptr->poll_timeout); - return MEMCACHED_SUCCESS; + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) + { + return MEMCACHED_TIMEOUT; + } + + WATCHPOINT; + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); + memcached_quit_server(ptr, server_key, 1); + + return MEMCACHED_FAILURE; + +} + +void memcached_io_preread(memcached_st *ptr) +{ + unsigned int x; + + return; + + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(ptr, x) && + ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) + { + size_t data_read; + + data_read= read(ptr->hosts[x].fd, + ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); + if (data_read == -1) + continue; + + ptr->hosts[x].read_buffer_length+= data_read; + ptr->hosts[x].read_data_length+= data_read; + } + } } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -36,21 +83,13 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (length) { + uint8_t found_eof= 0; if (!ptr->hosts[server_key].read_buffer_length) { size_t data_read; while (1) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 0); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - data_read= read(ptr->hosts[server_key].fd, ptr->hosts[server_key].read_buffer, MEMCACHED_MAX_BUFFER); @@ -59,9 +98,20 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, switch (errno) { case EAGAIN: - break; + { + memcached_return rc; + + rc= io_wait(ptr, server_key, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } default: { + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } @@ -69,18 +119,43 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, } else if (data_read) break; - /* If zero, just keep looping */ + /* If zero, just keep looping unless testing, then assert() */ + else + { + WATCHPOINT_ASSERT(0); + found_eof= 1; + break; + } } + ptr->hosts[server_key].read_data_length= data_read; ptr->hosts[server_key].read_buffer_length= data_read; ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer; } - *buffer_ptr= *ptr->hosts[server_key].read_ptr; - length--; - ptr->hosts[server_key].read_ptr++; - ptr->hosts[server_key].read_buffer_length--; - buffer_ptr++; + if (length > 1) + { + size_t difference; + + difference= (length > ptr->hosts[server_key].read_buffer_length) ? ptr->hosts[server_key].read_buffer_length : length; + + memcpy(buffer_ptr, ptr->hosts[server_key].read_ptr, difference); + length -= difference; + ptr->hosts[server_key].read_ptr+= difference; + ptr->hosts[server_key].read_buffer_length-= difference; + buffer_ptr+= difference; + } + else + { + *buffer_ptr= *ptr->hosts[server_key].read_ptr; + length--; + ptr->hosts[server_key].read_ptr++; + ptr->hosts[server_key].read_buffer_length--; + buffer_ptr++; + } + + if (found_eof) + break; } return (size_t)(buffer_ptr - buffer); @@ -102,9 +177,12 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) { + memcached_return rc; size_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, server_key, &rc); + if (sent_length == -1) + return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; @@ -114,7 +192,8 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (with_flush) { - if (memcached_io_flush(ptr, server_key) == -1) + memcached_return rc; + if (io_flush(ptr, server_key, &rc) == -1) return -1; } @@ -123,37 +202,20 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) { - memcached_return rc; - - rc= MEMCACHED_SUCCESS; - if (ptr->flags & MEM_NO_BLOCK) - { - struct pollfd fds[1]; - short flags= 0; - - flags= POLLHUP | POLLERR; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->hosts[server_key].fd; - fds[0].events= flags; - fds[0].revents= 0; - - if (poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout) < 0) - rc= MEMCACHED_FAILURE; - } - close(ptr->hosts[server_key].fd); - return rc; + return MEMCACHED_SUCCESS; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error) { size_t sent_length; size_t return_length; char *write_ptr= ptr->hosts[server_key].write_buffer; size_t write_length= ptr->hosts[server_key].write_buffer_offset; - unsigned int loop= 1; + + *error= MEMCACHED_SUCCESS; if (ptr->hosts[server_key].write_buffer_offset == 0) return 0; @@ -161,15 +223,6 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) return_length= 0; while (write_length) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 1); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - sent_length= 0; if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) { @@ -186,17 +239,22 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) switch (errno) { case ENOBUFS: - case EAGAIN: - WATCHPOINT; continue; - if (loop < 100) + case EAGAIN: { - loop++; - break; + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; } - /* Yes, we want to fall through */ default: + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; return -1; } }