X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=9277b6ba3a883a387829acc9130046843f3756f4;hb=97ff13b28143ff1346f7600d3a7d253347496463;hp=7067237ffd6f72ecc47317f7fdb74d90a056c585;hpb=cf34e0d38a58e81ec3993af0db6c221585d712e5;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 7067237f..9277b6ba 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -2,15 +2,91 @@ Basic socket buffered IO */ -#include +#include "common.h" +#include "memcached_io.h" +#include -ssize_t memcached_io_read(memcached_st *ptr, char *buf, size_t length) +ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, + char *buffer, size_t length) { - return -1; + size_t x; + char *buffer_ptr; + + buffer_ptr= buffer; + + for (x= 0, buffer_ptr= buffer; + x < length; x++) + { + if (!ptr->read_buffer_length) + { + if (length > 1) + { + + size_t data_read; + data_read= recv(ptr->hosts[server_key].fd, + buffer_ptr, + length - x, 0); + if (data_read == -1) + { + return -1; + } + if (data_read == 0) + return x; + + data_read+= x; + + return data_read; + } + else + { + size_t data_read; +try_again: + + if (ptr->flags & MEM_NO_BLOCK) + { + struct timeval local_tv; + fd_set set; + + memset(&local_tv, 0, sizeof(struct timeval)); + + local_tv.tv_sec= 0; + local_tv.tv_usec= 300; + + FD_ZERO(&set); + FD_SET(ptr->hosts[server_key].fd, &set); + + select(1, &set, NULL, NULL, &local_tv); + } + + data_read= recv(ptr->hosts[server_key].fd, + ptr->read_buffer, + MEMCACHED_MAX_BUFFER, 0); + if (data_read == -1) + { + if (errno == EAGAIN) + goto try_again; + return -1; + } + ptr->read_buffer_length= data_read; + ptr->read_ptr= ptr->read_buffer; + } + + if (ptr->read_buffer_length == -1) + return -1; + if (ptr->read_buffer_length == 0) + return x; + } + *buffer_ptr= *ptr->read_ptr; + buffer_ptr++; + ptr->read_ptr++; + ptr->read_buffer_length--; + } + + return length; } ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, - char *buffer, size_t length) + char *buffer, size_t length, char with_flush) { unsigned long long x; @@ -18,21 +94,21 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, { ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; ptr->write_buffer_offset++; + if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) { size_t sent_length; - if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, - MEMCACHED_MAX_BUFFER, 0)) == -1) - return -1; + sent_length= memcached_io_flush(ptr, server_key); assert(sent_length == MEMCACHED_MAX_BUFFER); - ptr->write_between_flush+= MEMCACHED_MAX_BUFFER; - ptr->write_buffer_offset= 0; } } + if (with_flush) + memcached_io_flush(ptr, server_key); + return length; } @@ -43,16 +119,28 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) if (ptr->write_buffer_offset == 0) return 0; + if (ptr->flags & MEM_NO_BLOCK) + { + struct timeval local_tv; + fd_set set; + + local_tv.tv_sec= 0; + local_tv.tv_usec= 300; + + FD_ZERO(&set); + FD_SET(ptr->hosts[server_key].fd, &set); + + select(1, NULL, &set, NULL, &local_tv); + } if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, ptr->write_buffer_offset, 0)) == -1) + { return -1; + } assert(sent_length == ptr->write_buffer_offset); - sent_length+= ptr->write_between_flush; - ptr->write_buffer_offset= 0; - ptr->write_between_flush= 0; return sent_length; }