X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=8f72f54d0104fe255c1503a068713e9b01d17ff4;hb=82a0e8be2e1002c6f173baf7c2dfdbb281ee2136;hp=473b8881dc2bd61f1d6703d948ff35887da307e2;hpb=c1f907e3ade57bd12d08277f6b805c958f3cf581;p=m6w6%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 473b8881..8f72f54d 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -11,6 +11,8 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ { struct pollfd fds[1]; short flags= 0; + int error; + int latch= 0; if (read_or_write) flags= POLLOUT | POLLERR; @@ -21,10 +23,30 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - if (poll(fds, 1, -1) < 0) - return MEMCACHED_FAILURE; + while (latch == 0) + { + error= poll(fds, 1, ptr->poll_timeout); + + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error) + { + /* This is impossible */ + WATCHPOINT_ASSERT(0); + return MEMCACHED_FAILURE; + } + else + latch++; + } + + memcached_quit_server(ptr, server_key, 1); - return MEMCACHED_SUCCESS; + return MEMCACHED_FAILURE; /* Timeout occurred */ } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -62,6 +84,7 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, break; default: { + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } @@ -93,7 +116,11 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, for (x= 0; x < length; x++) { - ptr->hosts[server_key].write_buffer[ptr->hosts[server_key].write_buffer_offset]= buffer[x]; + if (ptr->hosts[server_key].write_ptr == 0) + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr); + *ptr->hosts[server_key].write_ptr= buffer[x]; + ptr->hosts[server_key].write_ptr++; ptr->hosts[server_key].write_buffer_offset++; if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) @@ -101,8 +128,11 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, size_t sent_length; sent_length= memcached_io_flush(ptr, server_key); + if (sent_length == -1) + return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; ptr->hosts[server_key].write_buffer_offset= 0; } } @@ -116,6 +146,40 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, return length; } +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) +{ + memcached_return rc; + + rc= MEMCACHED_SUCCESS; + if (ptr->flags & MEM_NO_BLOCK) + { + int error; + struct pollfd fds[1]; + short flags= 0; + + flags= POLLHUP | POLLERR; + + memset(&fds, 0, sizeof(struct pollfd)); + fds[0].fd= ptr->hosts[server_key].fd; + fds[0].events= flags; + fds[0].revents= 0; + + error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout); + + if (error == -1) + { + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; + } + else if (error == 0) + return MEMCACHED_FAILURE; /* Timeout occurred */ + } + + close(ptr->hosts[server_key].fd); + + return rc; +} + ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) { size_t sent_length; @@ -140,24 +204,35 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) } sent_length= 0; - if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, - write_length)) == -1) + if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) { - switch (errno) + sent_length= sendto(ptr->hosts[server_key].fd, + write_ptr, write_length, 0, + (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr, + sizeof(struct sockaddr)); + } + else + { + if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) { - case ENOBUFS: - case EAGAIN: - WATCHPOINT; - continue; - if (loop < 100) + switch (errno) { - loop++; - break; + case ENOBUFS: + case EAGAIN: + WATCHPOINT; + continue; + if (loop < 100) + { + loop++; + break; + } + /* Yes, we want to fall through */ + default: + memcached_quit_server(ptr, server_key, 1); + ptr->cached_errno= errno; + return -1; } - /* Yes, we want to fall through */ - default: - ptr->cached_errno= errno; - return -1; } } @@ -168,6 +243,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) WATCHPOINT_ASSERT(write_length == 0); WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset); + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; ptr->hosts[server_key].write_buffer_offset= 0; return return_length; @@ -178,6 +254,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) */ void memcached_io_reset(memcached_st *ptr, unsigned int server_key) { + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; ptr->hosts[server_key].write_buffer_offset= 0; memcached_quit(ptr); }