X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=38580638d7f197c81d373373954cb299b8aabc98;hb=acca3c7ad60716bf073ae8de0652517542cdd224;hp=401039b4efbd831ee2c0eddc1ab0e3f8692aac23;hpb=85a66fec1e07e874e5410ce56a2976d68fbe14fd;p=m6w6%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 401039b4..38580638 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -2,87 +2,153 @@ Basic socket buffered IO */ -#include +#include "common.h" #include "memcached_io.h" #include +#include -ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, - char *buffer, size_t length) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error); + +static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, + memc_read_or_write read_or_write) { - size_t x; - char *buffer_ptr; + struct pollfd fds[1]; + short flags= 0; + int error; - buffer_ptr= buffer; + if (read_or_write == MEM_WRITE) /* write */ + flags= POLLOUT | POLLERR; + else + flags= POLLIN | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; - for (x= 0, buffer_ptr= buffer; - x < length; x++) + error= poll(fds, 1, ptr->poll_timeout); + + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) { - if (!ptr->read_buffer_length) - { - if (length > 1) - { + return MEMCACHED_TIMEOUT; + } - size_t data_read; - data_read= recv(ptr->hosts[server_key].fd, - buffer_ptr, - length - x, 0); - if (data_read == -1) - { - return -1; - } - if (data_read == 0) - return x; + WATCHPOINT; + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); + memcached_quit_server(ptr, server_key, 1); - data_read+= x; + return MEMCACHED_FAILURE; - return data_read; - } - else - { - size_t data_read; -try_again: +} - if (ptr->flags & MEM_NO_BLOCK) - { - struct timeval local_tv; - fd_set set; +void memcached_io_preread(memcached_st *ptr) +{ + unsigned int x; - memset(&local_tv, 0, sizeof(struct timeval)); + return; - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(ptr, x) && + ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) + { + size_t data_read; - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); + data_read= read(ptr->hosts[x].fd, + ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); + if (data_read == -1) + continue; - select(1, &set, NULL, NULL, &local_tv); - } + ptr->hosts[x].read_buffer_length+= data_read; + ptr->hosts[x].read_data_length+= data_read; + } + } +} + +ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, + char *buffer, size_t length) +{ + char *buffer_ptr; + + buffer_ptr= buffer; + + while (length) + { + if (!ptr->hosts[server_key].read_buffer_length) + { + size_t data_read; - data_read= recv(ptr->hosts[server_key].fd, - ptr->read_buffer, - MEMCACHED_MAX_BUFFER, 0); + while (1) + { + data_read= read(ptr->hosts[server_key].fd, + ptr->hosts[server_key].read_buffer, + MEMCACHED_MAX_BUFFER); if (data_read == -1) { - if (errno == EAGAIN) - goto try_again; - return -1; + switch (errno) + { + case EAGAIN: + { + memcached_return rc; + + rc= io_wait(ptr, server_key, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } + default: + { + memcached_quit_server(ptr, server_key, 1); + ptr->cached_errno= errno; + return -1; + } + } } - ptr->read_buffer_length= data_read; - ptr->read_ptr= ptr->read_buffer; + else if (data_read) + break; + /* If zero, just keep looping */ } - if (ptr->read_buffer_length == -1) - return -1; - if (ptr->read_buffer_length == 0) - return x; + ptr->hosts[server_key].read_data_length= data_read; + ptr->hosts[server_key].read_buffer_length= data_read; + ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer; + } + + if (length > 1) + { + size_t difference; + + difference= (length > ptr->hosts[server_key].read_buffer_length) ? ptr->hosts[server_key].read_buffer_length : length; + + memcpy(buffer_ptr, ptr->hosts[server_key].read_ptr, difference); + length -= difference; + ptr->hosts[server_key].read_ptr+= difference; + ptr->hosts[server_key].read_buffer_length-= difference; + buffer_ptr+= difference; + } + else + { + *buffer_ptr= *ptr->hosts[server_key].read_ptr; + length--; + ptr->hosts[server_key].read_ptr++; + ptr->hosts[server_key].read_buffer_length--; + buffer_ptr++; } - *buffer_ptr= *ptr->read_ptr; - buffer_ptr++; - ptr->read_ptr++; - ptr->read_buffer_length--; } - return length; + return (size_t)(buffer_ptr - buffer); } ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, @@ -92,57 +158,109 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, for (x= 0; x < length; x++) { - ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; - ptr->write_buffer_offset++; - - if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) + if (ptr->hosts[server_key].write_ptr == 0) + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr); + *ptr->hosts[server_key].write_ptr= buffer[x]; + ptr->hosts[server_key].write_ptr++; + ptr->hosts[server_key].write_buffer_offset++; + + if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) { + memcached_return rc; size_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, server_key, &rc); + if (sent_length == -1) + return -1; - assert(sent_length == MEMCACHED_MAX_BUFFER); - ptr->write_buffer_offset= 0; + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; } } if (with_flush) - memcached_io_flush(ptr, server_key); + { + memcached_return rc; + if (io_flush(ptr, server_key, &rc) == -1) + return -1; + } return length; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) { - size_t sent_length; + close(ptr->hosts[server_key].fd); - if (ptr->write_buffer_offset == 0) - return 0; + return MEMCACHED_SUCCESS; +} - if (ptr->flags & MEM_NO_BLOCK) - { - struct timeval local_tv; - fd_set set; +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error) +{ + size_t sent_length; + size_t return_length; + char *write_ptr= ptr->hosts[server_key].write_buffer; + size_t write_length= ptr->hosts[server_key].write_buffer_offset; - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; + *error= MEMCACHED_SUCCESS; - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); + if (ptr->hosts[server_key].write_buffer_offset == 0) + return 0; - select(1, NULL, &set, NULL, &local_tv); - } - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - ptr->write_buffer_offset, 0)) == -1) + return_length= 0; + while (write_length) { - return -1; - } + sent_length= 0; + if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) + { + sent_length= sendto(ptr->hosts[server_key].fd, + write_ptr, write_length, 0, + (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr, + sizeof(struct sockaddr)); + } + else + { + if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) + { + switch (errno) + { + case ENOBUFS: + continue; + case EAGAIN: + { + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } + default: + memcached_quit_server(ptr, server_key, 1); + ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; + return -1; + } + } + } - assert(sent_length == ptr->write_buffer_offset); + write_ptr+= sent_length; + write_length-= sent_length; + return_length+= sent_length; + } - ptr->write_buffer_offset= 0; + WATCHPOINT_ASSERT(write_length == 0); + WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset); + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; - return sent_length; + return return_length; } /* @@ -150,6 +268,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) */ void memcached_io_reset(memcached_st *ptr, unsigned int server_key) { - ptr->write_buffer_offset= 0; + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; memcached_quit(ptr); }