X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=54a4b8dcd16d6c154604349533c03c9a067942e0;hb=7add2cb42be7d072888ab916eb35cdef8a2774a1;hp=77df4319076b8286203cc7d6adfdbb4ea3660c00;hpb=852e3f2d609c4aac766405c27cd8c6774fb20a23;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 77df4319..54a4b8dc 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -2,42 +2,90 @@ Basic socket buffered IO */ -#include +#include "common.h" +#include "memcached_io.h" +#include ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, char *buffer, size_t length) { - size_t x; char *buffer_ptr; buffer_ptr= buffer; - for (x= 0, buffer_ptr= buffer; - x < length; x++) + while (length) { if (!ptr->read_buffer_length) { - ptr->read_buffer_length= recv(ptr->hosts[server_key].fd, - ptr->read_buffer, - MEMCACHED_MAX_BUFFER, 0); + size_t data_read; + + while (1) + { + if (ptr->flags & MEM_NO_BLOCK) + { + while (1) + { + int select_return; + struct timeval local_tv; + fd_set set; + + memset(&local_tv, 0, sizeof(struct timeval)); + + local_tv.tv_sec= 0; + local_tv.tv_usec= 300; + + FD_ZERO(&set); + FD_SET(ptr->hosts[server_key].fd, &set); + + select_return= select(1, &set, NULL, NULL, &local_tv); + + if (select_return == -1) + { + ptr->my_errno= errno; + return -1; + } + else if (!select_return) + break; + } + } + + data_read= recv(ptr->hosts[server_key].fd, + ptr->read_buffer, + MEMCACHED_MAX_BUFFER, 0); + if (data_read == -1) + { + switch (errno) + { + case EAGAIN: + break; + default: + { + ptr->my_errno= errno; + return -1; + } + } + } + else if (data_read) + break; + /* If zero, just keep looping */ + } + + ptr->read_buffer_length= data_read; ptr->read_ptr= ptr->read_buffer; - - if (ptr->read_buffer_length == -1) - return -1; - if (ptr->read_buffer_length == 0) - return x; } + *buffer_ptr= *ptr->read_ptr; - buffer_ptr++; + length--; ptr->read_ptr++; ptr->read_buffer_length--; + buffer_ptr++; } - return length; + return (size_t)(buffer_ptr - buffer); } ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, - char *buffer, size_t length) + char *buffer, size_t length, char with_flush) { unsigned long long x; @@ -45,41 +93,97 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, { ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; ptr->write_buffer_offset++; + if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) { size_t sent_length; - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - MEMCACHED_MAX_BUFFER, 0)) == -1) - return -1; + sent_length= memcached_io_flush(ptr, server_key); assert(sent_length == MEMCACHED_MAX_BUFFER); - ptr->write_between_flush+= MEMCACHED_MAX_BUFFER; - ptr->write_buffer_offset= 0; } } + if (with_flush) + { + if (memcached_io_flush(ptr, server_key) == -1) + return -1; + } + return length; } ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; + char *write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; + unsigned int loop= 1; if (ptr->write_buffer_offset == 0) return 0; - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - ptr->write_buffer_offset, 0)) == -1) - return -1; + while (write_length) + { + if (ptr->flags & MEM_NO_BLOCK) + { + + while (1) + { + struct timeval local_tv; + fd_set set; + int select_return; + + local_tv.tv_sec= 0; + local_tv.tv_usec= 300 * loop; - assert(sent_length == ptr->write_buffer_offset); + FD_ZERO(&set); + FD_SET(ptr->hosts[server_key].fd, &set); + + select_return= select(1, NULL, &set, NULL, &local_tv); + + if (select_return == -1) + { + ptr->my_errno= errno; + return -1; + } + else if (!select_return) + break; + } + } - sent_length+= ptr->write_between_flush; + sent_length= 0; +#ifdef orig + if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr, + write_length, 0)) == -1) +#endif + if ((sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) + { + switch (errno) + { + case ENOBUFS: + case EAGAIN: + if (loop < 10) + { + loop++; + break; + } + /* Yes, we want to fall through */ + default: + ptr->my_errno= errno; + return -1; + } + } + else + { + write_ptr+= sent_length; + write_length-= sent_length; + } + } ptr->write_buffer_offset= 0; - ptr->write_between_flush= 0; return sent_length; }