X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;ds=sidebyside;f=lib%2Fmemcached_io.c;h=2c1bb14c3860df761f6011fe799bb9ebdf0d6793;hb=80f385f689fd04194182c1b202e576ad065626a9;hp=8f72f54d0104fe255c1503a068713e9b01d17ff4;hpb=e1944ccd4d1e65f4192783fa9c564c9b747bb618;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 8f72f54d..2c1bb14c 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -7,14 +7,22 @@ #include #include -static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error); + +static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, + memc_read_or_write read_or_write) { struct pollfd fds[1]; short flags= 0; int error; - int latch= 0; - if (read_or_write) + if (read_or_write == MEM_WRITE) /* write */ flags= POLLOUT | POLLERR; else flags= POLLIN | POLLERR; @@ -23,30 +31,21 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - while (latch == 0) - { - error= poll(fds, 1, ptr->poll_timeout); + error= poll(fds, 1, ptr->poll_timeout); - if (error == 1) - return MEMCACHED_SUCCESS; - else if (error == -1) - { - memcached_quit_server(ptr, server_key, 1); - return MEMCACHED_FAILURE; - } - else if (error) - { - /* This is impossible */ - WATCHPOINT_ASSERT(0); - return MEMCACHED_FAILURE; - } - else - latch++; + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) + { + return MEMCACHED_TIMEOUT; } + WATCHPOINT; + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; - return MEMCACHED_FAILURE; /* Timeout occurred */ } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -64,15 +63,6 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (1) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 0); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - data_read= read(ptr->hosts[server_key].fd, ptr->hosts[server_key].read_buffer, MEMCACHED_MAX_BUFFER); @@ -81,7 +71,16 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, switch (errno) { case EAGAIN: - break; + { + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } default: { memcached_quit_server(ptr, server_key, 1); @@ -125,9 +124,10 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) { + memcached_return rc; size_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, server_key, &rc); if (sent_length == -1) return -1; @@ -139,7 +139,8 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, if (with_flush) { - if (memcached_io_flush(ptr, server_key) == -1) + memcached_return rc; + if (io_flush(ptr, server_key, &rc) == -1) return -1; } @@ -148,45 +149,20 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) { - memcached_return rc; - - rc= MEMCACHED_SUCCESS; - if (ptr->flags & MEM_NO_BLOCK) - { - int error; - struct pollfd fds[1]; - short flags= 0; - - flags= POLLHUP | POLLERR; - - memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->hosts[server_key].fd; - fds[0].events= flags; - fds[0].revents= 0; - - error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout); - - if (error == -1) - { - memcached_quit_server(ptr, server_key, 1); - return MEMCACHED_FAILURE; - } - else if (error == 0) - return MEMCACHED_FAILURE; /* Timeout occurred */ - } - close(ptr->hosts[server_key].fd); - return rc; + return MEMCACHED_SUCCESS; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error) { size_t sent_length; size_t return_length; char *write_ptr= ptr->hosts[server_key].write_buffer; size_t write_length= ptr->hosts[server_key].write_buffer_offset; - unsigned int loop= 1; + + *error= MEMCACHED_SUCCESS; if (ptr->hosts[server_key].write_buffer_offset == 0) return 0; @@ -194,15 +170,6 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) return_length= 0; while (write_length) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 1); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - sent_length= 0; if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) { @@ -219,18 +186,22 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) switch (errno) { case ENOBUFS: - case EAGAIN: - WATCHPOINT; continue; - if (loop < 100) + case EAGAIN: { - loop++; - break; + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; } - /* Yes, we want to fall through */ default: memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; return -1; } }