X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=2c1bb14c3860df761f6011fe799bb9ebdf0d6793;hb=80f385f689fd04194182c1b202e576ad065626a9;hp=54844be445956547705d779bc1eb30934a4b0f1d;hpb=c9f472b091b351b206d34b9fa37e28b6d1e4202c;p=awesomized%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 54844be4..2c1bb14c 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -7,12 +7,22 @@ #include #include -static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error); + +static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, + memc_read_or_write read_or_write) { struct pollfd fds[1]; short flags= 0; + int error; - if (read_or_write) + if (read_or_write == MEM_WRITE) /* write */ flags= POLLOUT | POLLERR; else flags= POLLIN | POLLERR; @@ -21,10 +31,21 @@ static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_ fds[0].fd= ptr->hosts[server_key].fd; fds[0].events= flags; - if (poll(fds, 1, -1) < 0) - return MEMCACHED_FAILURE; + error= poll(fds, 1, ptr->poll_timeout); + + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) + { + return MEMCACHED_TIMEOUT; + } + + WATCHPOINT; + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); + memcached_quit_server(ptr, server_key, 1); + return MEMCACHED_FAILURE; - return MEMCACHED_SUCCESS; } ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, @@ -36,32 +57,33 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (length) { - if (!ptr->read_buffer_length) + if (!ptr->hosts[server_key].read_buffer_length) { size_t data_read; while (1) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 0); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - data_read= read(ptr->hosts[server_key].fd, - ptr->read_buffer, + ptr->hosts[server_key].read_buffer, MEMCACHED_MAX_BUFFER); if (data_read == -1) { switch (errno) { case EAGAIN: - break; + { + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } default: { + memcached_quit_server(ptr, server_key, 1); ptr->cached_errno= errno; return -1; } @@ -72,14 +94,14 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, /* If zero, just keep looping */ } - ptr->read_buffer_length= data_read; - ptr->read_ptr= ptr->read_buffer; + ptr->hosts[server_key].read_buffer_length= data_read; + ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer; } - *buffer_ptr= *ptr->read_ptr; + *buffer_ptr= *ptr->hosts[server_key].read_ptr; length--; - ptr->read_ptr++; - ptr->read_buffer_length--; + ptr->hosts[server_key].read_ptr++; + ptr->hosts[server_key].read_buffer_length--; buffer_ptr++; } @@ -93,71 +115,95 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, for (x= 0; x < length; x++) { - ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; - ptr->write_buffer_offset++; - - if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) + if (ptr->hosts[server_key].write_ptr == 0) + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr); + *ptr->hosts[server_key].write_ptr= buffer[x]; + ptr->hosts[server_key].write_ptr++; + ptr->hosts[server_key].write_buffer_offset++; + + if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) { + memcached_return rc; size_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, server_key, &rc); + if (sent_length == -1) + return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); - ptr->write_buffer_offset= 0; + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; } } if (with_flush) { - if (memcached_io_flush(ptr, server_key) == -1) + memcached_return rc; + if (io_flush(ptr, server_key, &rc) == -1) return -1; } return length; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) +{ + close(ptr->hosts[server_key].fd); + + return MEMCACHED_SUCCESS; +} + +static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, + memcached_return *error) { size_t sent_length; size_t return_length; - char *write_ptr= ptr->write_buffer; - size_t write_length= ptr->write_buffer_offset; - unsigned int loop= 1; + char *write_ptr= ptr->hosts[server_key].write_buffer; + size_t write_length= ptr->hosts[server_key].write_buffer_offset; - if (ptr->write_buffer_offset == 0) + *error= MEMCACHED_SUCCESS; + + if (ptr->hosts[server_key].write_buffer_offset == 0) return 0; return_length= 0; while (write_length) { - if (ptr->flags & MEM_NO_BLOCK) + sent_length= 0; + if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) { - memcached_return rc; - - rc= io_wait(ptr, server_key, 1); - if (rc != MEMCACHED_SUCCESS) - return -1; + sent_length= sendto(ptr->hosts[server_key].fd, + write_ptr, write_length, 0, + (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr, + sizeof(struct sockaddr)); } - - sent_length= 0; - if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, - write_length)) == -1) + else { - switch (errno) + if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + write_length)) == -1) { - case ENOBUFS: - case EAGAIN: - WATCHPOINT; - continue; - if (loop < 100) + switch (errno) { - loop++; - break; + case ENOBUFS: + continue; + case EAGAIN: + { + memcached_return rc; + rc= io_wait(ptr, server_key, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, server_key, 1); + return -1; + } + default: + memcached_quit_server(ptr, server_key, 1); + ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; + return -1; } - /* Yes, we want to fall through */ - default: - ptr->cached_errno= errno; - return -1; } } @@ -167,8 +213,9 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) } WATCHPOINT_ASSERT(write_length == 0); - WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); - ptr->write_buffer_offset= 0; + WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset); + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; return return_length; } @@ -178,6 +225,7 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) */ void memcached_io_reset(memcached_st *ptr, unsigned int server_key) { - ptr->write_buffer_offset= 0; + ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; + ptr->hosts[server_key].write_buffer_offset= 0; memcached_quit(ptr); }