X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=69771d71f172c7f421c2aea792b3af1e9d2e7cdf;hb=bbd526419742d4a9bb99fcb1a56a80969ebe90fb;hp=dc12ebe7d405cc1630444f19d730abc86a5ea45e;hpb=70d4781acf107377fa194a04d05e8cf31fd211c8;p=awesomized%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index dc12ebe7..69771d71 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -23,9 +23,9 @@ static memcached_return io_wait(memcached_server_st *ptr, int error; if (read_or_write == MEM_WRITE) /* write */ - flags= POLLOUT | POLLERR; + flags= POLLOUT; else - flags= POLLIN | POLLERR; + flags= POLLIN; memset(&fds, 0, sizeof(struct pollfd)); fds[0].fd= ptr->fd; @@ -90,8 +90,8 @@ void memcached_io_preread(memcached_st *ptr) } #endif -ssize_t memcached_io_read(memcached_server_st *ptr, - void *buffer, size_t length) +memcached_return memcached_io_read(memcached_server_st *ptr, + void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -105,31 +105,26 @@ ssize_t memcached_io_read(memcached_server_st *ptr, while (1) { - data_read= read(ptr->fd, - ptr->read_buffer, - MEMCACHED_MAX_BUFFER); + data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); if (data_read > 0) break; else if (data_read == -1) { ptr->cached_errno= errno; + memcached_return rc= MEMCACHED_UNKNOWN_READ_FAILURE; switch (errno) { case EAGAIN: - case EINTR: - { - memcached_return rc; - - rc= io_wait(ptr, MEM_READ); + case EINTR: + if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) + continue; + /* fall through */ - if (rc == MEMCACHED_SUCCESS) - continue; - } - /* fall trough */ default: { memcached_quit_server(ptr, 1); - return -1; + *nread= -1; + return rc; } } } @@ -145,7 +140,8 @@ ssize_t memcached_io_read(memcached_server_st *ptr, it will return EGAIN if data is not immediatly available. */ memcached_quit_server(ptr, 1); - return -1; + *nread= -1; + return MEMCACHED_UNKNOWN_READ_FAILURE; } } @@ -177,7 +173,9 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } } - return (size_t)(buffer_ptr - (char*)buffer); + ptr->server_failure_counter= 0; + *nread = (size_t)(buffer_ptr - (char*)buffer); + return MEMCACHED_SUCCESS; } ssize_t memcached_io_write(memcached_server_st *ptr, @@ -229,8 +227,10 @@ ssize_t memcached_io_write(memcached_server_st *ptr, return -1; /* If io_flush calls memcached_purge, sent_length may be 0 */ - if (sent_length != 0) - WATCHPOINT_ASSERT(sent_length == buffer_end); + unlikely (sent_length != 0) + { + WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); + } } } @@ -257,7 +257,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr) { r= shutdown(ptr->fd, SHUT_RDWR); -#ifdef HAVE_DEBUG +#ifdef DEBUG if (r && errno != ENOTCONN) { WATCHPOINT_NUMBER(ptr->fd); @@ -268,7 +268,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr) } r= close(ptr->fd); -#ifdef HAVE_DEBUG +#ifdef DEBUG if (r != 0) WATCHPOINT_ERRNO(errno); #endif @@ -276,6 +276,56 @@ memcached_return memcached_io_close(memcached_server_st *ptr) return MEMCACHED_SUCCESS; } +memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) +{ +#define MAX_SERVERS_TO_POLL 100 + struct pollfd fds[MAX_SERVERS_TO_POLL]; + unsigned int host_index= 0; + + for (unsigned int x= 0; + x< memc->number_of_hosts && host_index < MAX_SERVERS_TO_POLL; + ++x) + { + if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */ + return &memc->hosts[x]; + + if (memcached_server_response_count(&memc->hosts[x]) > 0) + { + fds[host_index].events = POLLIN; + fds[host_index].revents = 0; + fds[host_index].fd = memc->hosts[x].fd; + ++host_index; + } + } + + if (host_index < 2) + { + /* We have 0 or 1 server with pending events.. */ + for (unsigned int x= 0; x< memc->number_of_hosts; ++x) + if (memcached_server_response_count(&memc->hosts[x]) > 0) + return &memc->hosts[x]; + + return NULL; + } + + int err= poll(fds, host_index, memc->poll_timeout); + switch (err) { + case -1: + memc->cached_errno = errno; + /* FALLTHROUGH */ + case 0: + break; + default: + for (unsigned int x= 0; x < host_index; ++x) + if (fds[x].revents & POLLIN) + for (unsigned int y= 0; y < memc->number_of_hosts; ++y) + if (memc->hosts[y].fd == fds[x].fd) + return &memc->hosts[y]; + } + + return NULL; +} + static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error) { @@ -310,7 +360,7 @@ static ssize_t io_flush(memcached_server_st *ptr, return 0; /* Looking for memory overflows */ -#if defined(HAVE_DEBUG) +#if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); @@ -351,7 +401,8 @@ static ssize_t io_flush(memcached_server_st *ptr, } } - if (ptr->type == MEMCACHED_CONNECTION_UDP && sent_length != write_length) + if (ptr->type == MEMCACHED_CONNECTION_UDP && + (size_t)sent_length != write_length) { memcached_quit_server(ptr, 1); return -1; @@ -399,12 +450,12 @@ memcached_return memcached_safe_read(memcached_server_st *ptr, while (offset < size) { - ssize_t nread= memcached_io_read(ptr, data + offset, size - offset); - if (nread <= 0) - { - memcached_io_reset(ptr); - return MEMCACHED_UNKNOWN_READ_FAILURE; - } + ssize_t nread; + memcached_return rc= memcached_io_read(ptr, data + offset, size - offset, + &nread); + if (rc != MEMCACHED_SUCCESS) + return rc; + offset+= nread; } @@ -416,7 +467,7 @@ memcached_return memcached_io_readline(memcached_server_st *ptr, size_t size) { bool line_complete= false; - int total_nr= 0; + size_t total_nr= 0; while (!line_complete) { @@ -427,8 +478,10 @@ memcached_return memcached_io_readline(memcached_server_st *ptr, * buffer. Call the standard read function to avoid duplicating * the logic. */ - if (memcached_io_read(ptr, buffer_ptr, 1) != 1) - return MEMCACHED_UNKNOWN_READ_FAILURE; + ssize_t nread; + memcached_return rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); + if (rc != MEMCACHED_SUCCESS) + return rc; if (*buffer_ptr == '\n') line_complete= true;