X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=49f6bc686362b70788f3611a091ffe4a52d55d42;hb=1bd637f6eccc1064a4e19e40b14a340818d1ccf3;hp=5e05d2c5ac8a84206541754533f1ed325d4a3d3f;hpb=7ef83f5c1f71a8527a5f3001d72772ac692bcf14;p=m6w6%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 5e05d2c5..49f6bc68 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -5,6 +5,27 @@ #include "common.h" #include "memcached_io.h" #include +#include + +int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +{ + struct pollfd fds[1]; + short flags= 0; + + if (read_or_write) + flags= POLLOUT | POLLERR; + else + flags= POLLIN | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; + + if (poll(fds, 1, -1) < 0) + return MEMCACHED_FAILURE; + + return MEMCACHED_SUCCESS; +} ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, char *buffer, size_t length) @@ -23,35 +44,16 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, { if (ptr->flags & MEM_NO_BLOCK) { - while (1) - { - int select_return; - struct timeval local_tv; - fd_set set; + memcached_return rc; - memset(&local_tv, 0, sizeof(struct timeval)); - - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; - - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); - - select_return= select(1, &set, NULL, NULL, &local_tv); - - if (select_return == -1) - { - ptr->my_errno= errno; - return -1; - } - else if (!select_return) - break; - } + rc= io_wait(ptr, server_key, 0); + if (rc != MEMCACHED_SUCCESS) + return -1; } - data_read= recv(ptr->hosts[server_key].fd, + data_read= read(ptr->hosts[server_key].fd, ptr->read_buffer, - MEMCACHED_MAX_BUFFER, 0); + MEMCACHED_MAX_BUFFER); if (data_read == -1) { switch (errno) @@ -100,7 +102,7 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, sent_length= memcached_io_flush(ptr, server_key); - assert(sent_length == MEMCACHED_MAX_BUFFER); + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); ptr->write_buffer_offset= 0; } } @@ -117,6 +119,7 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; + size_t return_length; char *write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; unsigned int loop= 1; @@ -124,44 +127,29 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) if (ptr->write_buffer_offset == 0) return 0; + return_length= 0; while (write_length) { if (ptr->flags & MEM_NO_BLOCK) { + memcached_return rc; - while (1) - { - struct timeval local_tv; - fd_set set; - int select_return; - - local_tv.tv_sec= 0; - local_tv.tv_usec= 300 * loop; - - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); - - select_return= select(1, NULL, &set, NULL, &local_tv); - - if (select_return == -1) - { - ptr->my_errno= errno; - return -1; - } - else if (!select_return) - break; - } + rc= io_wait(ptr, server_key, 1); + if (rc != MEMCACHED_SUCCESS) + return -1; } sent_length= 0; - if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr, - write_length, 0)) == -1) + if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) { switch (errno) { case ENOBUFS: case EAGAIN: - if (loop < 10) + WATCHPOINT; + continue; + if (loop < 100) { loop++; break; @@ -172,16 +160,17 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) return -1; } } - else - { - write_ptr+= sent_length; - write_length-= sent_length; - } + + write_ptr+= sent_length; + write_length-= sent_length; + return_length+= sent_length; } + WATCHPOINT_ASSERT(write_length == 0); + WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); ptr->write_buffer_offset= 0; - return sent_length; + return return_length; } /*