X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=d2c803f3dd690393431a3b3af3a358a45324edf4;hb=77d70ea2571e43495bb66efb9072f6a8b5009b64;hp=401039b4efbd831ee2c0eddc1ab0e3f8692aac23;hpb=85a66fec1e07e874e5410ce56a2976d68fbe14fd;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 401039b4..d2c803f3 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -2,87 +2,88 @@ Basic socket buffered IO */ -#include +#include "common.h" #include "memcached_io.h" #include +#include + +static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +{ + struct pollfd fds[1]; + short flags= 0; + + if (read_or_write) + flags= POLLOUT | POLLERR; + else + flags= POLLIN | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; + + if (poll(fds, 1, -1) < 0) + return MEMCACHED_FAILURE; + + return MEMCACHED_SUCCESS; +} ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, char *buffer, size_t length) { - size_t x; char *buffer_ptr; buffer_ptr= buffer; - for (x= 0, buffer_ptr= buffer; - x < length; x++) + while (length) { if (!ptr->read_buffer_length) { - if (length > 1) - { + size_t data_read; - size_t data_read; - data_read= recv(ptr->hosts[server_key].fd, - buffer_ptr, - length - x, 0); - if (data_read == -1) - { - return -1; - } - if (data_read == 0) - return x; - - data_read+= x; - - return data_read; - } - else + while (1) { - size_t data_read; -try_again: - if (ptr->flags & MEM_NO_BLOCK) { - struct timeval local_tv; - fd_set set; + memcached_return rc; - memset(&local_tv, 0, sizeof(struct timeval)); - - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; - - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); - - select(1, &set, NULL, NULL, &local_tv); + rc= io_wait(ptr, server_key, 0); + if (rc != MEMCACHED_SUCCESS) + return -1; } - data_read= recv(ptr->hosts[server_key].fd, + data_read= read(ptr->hosts[server_key].fd, ptr->read_buffer, - MEMCACHED_MAX_BUFFER, 0); + MEMCACHED_MAX_BUFFER); if (data_read == -1) { - if (errno == EAGAIN) - goto try_again; - return -1; + switch (errno) + { + case EAGAIN: + break; + default: + { + ptr->my_errno= errno; + return -1; + } + } } - ptr->read_buffer_length= data_read; - ptr->read_ptr= ptr->read_buffer; + else if (data_read) + break; + /* If zero, just keep looping */ } - if (ptr->read_buffer_length == -1) - return -1; - if (ptr->read_buffer_length == 0) - return x; + ptr->read_buffer_length= data_read; + ptr->read_ptr= ptr->read_buffer; } + *buffer_ptr= *ptr->read_ptr; - buffer_ptr++; + length--; ptr->read_ptr++; ptr->read_buffer_length--; + buffer_ptr++; } - return length; + return (size_t)(buffer_ptr - buffer); } ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, @@ -101,13 +102,16 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, sent_length= memcached_io_flush(ptr, server_key); - assert(sent_length == MEMCACHED_MAX_BUFFER); + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); ptr->write_buffer_offset= 0; } } if (with_flush) - memcached_io_flush(ptr, server_key); + { + if (memcached_io_flush(ptr, server_key) == -1) + return -1; + } return length; } @@ -115,34 +119,58 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; + size_t return_length; + char *write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; + unsigned int loop= 1; if (ptr->write_buffer_offset == 0) return 0; - if (ptr->flags & MEM_NO_BLOCK) + return_length= 0; + while (write_length) { - struct timeval local_tv; - fd_set set; + if (ptr->flags & MEM_NO_BLOCK) + { + memcached_return rc; - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; + rc= io_wait(ptr, server_key, 1); + if (rc != MEMCACHED_SUCCESS) + return -1; + } - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); + sent_length= 0; + if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) + { + switch (errno) + { + case ENOBUFS: + case EAGAIN: + WATCHPOINT; + continue; + if (loop < 100) + { + loop++; + break; + } + /* Yes, we want to fall through */ + default: + ptr->my_errno= errno; + return -1; + } + } - select(1, NULL, &set, NULL, &local_tv); - } - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - ptr->write_buffer_offset, 0)) == -1) - { - return -1; + write_ptr+= sent_length; + write_length-= sent_length; + return_length+= sent_length; } - assert(sent_length == ptr->write_buffer_offset); - + WATCHPOINT_ASSERT(write_length == 0); + WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); ptr->write_buffer_offset= 0; - return sent_length; + return return_length; } /*