X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=8f72f54d0104fe255c1503a068713e9b01d17ff4;hb=e2a313f77fd825bdaec9db1649b0549b2d5962c2;hp=973ccf95f676ccc428657b200ba30b287d3c68c5;hpb=52d5f8e32ae6e26932a07749ef5d5640e815a3f7;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 973ccf95..8f72f54d 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -11,10 +11,9 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ { struct pollfd fds[1]; short flags= 0; - struct timespec timer; + int error; + int latch= 0; - timer.tv_sec= 1; - timer.tv_nsec= 0; if (read_or_write) flags= POLLOUT | POLLERR; else @@ -24,10 +23,30 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - if (poll(fds, 1, ptr->poll_timeout) < 0) - return MEMCACHED_FAILURE; + while (latch == 0) + { + error= poll(fds, 1, ptr->poll_timeout); + + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error) + { + /* This is impossible */ + WATCHPOINT_ASSERT(0); + return MEMCACHED_FAILURE; + } + else + latch++; + } + + memcached_quit_server(ptr, server_key, 1); - return MEMCACHED_SUCCESS; + return MEMCACHED_FAILURE; /* Timeout occurred */ } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -65,6 +84,7 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, break; default: { + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } @@ -108,6 +128,8 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, size_t sent_length; sent_length= memcached_io_flush(ptr, server_key); + if (sent_length == -1) + return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; @@ -124,6 +146,40 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, return length; } +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) +{ + memcached_return rc; + + rc= MEMCACHED_SUCCESS; + if (ptr->flags & MEM_NO_BLOCK) + { + int error; + struct pollfd fds[1]; + short flags= 0; + + flags= POLLHUP | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; + fds[0].revents= 0; + + error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout); + + if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error == 0) + return MEMCACHED_FAILURE; /* Timeout occurred */ + } + + close(ptr->hosts[server_key].fd); + + return rc; +} + ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; @@ -152,7 +208,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { sent_length= sendto(ptr->hosts[server_key].fd, write_ptr, write_length, 0, - (struct sockaddr *)&ptr->hosts[server_key].servAddr, + (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr, sizeof(struct sockaddr)); } else @@ -173,6 +229,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) } /* Yes, we want to fall through */ default: + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; }