X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=3b5b25afa026bfb5abdce9e047746aca509382cb;hb=8a86b578acc594d37a8638e3e0afba1286c4b6ca;hp=45a12199e5241d40e749ec0a8f0b6b2fe4139fed;hpb=686192d8ff24bcc24d3ea9d1aa49373fb2bb198c;p=m6w6%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index 45a12199..3b5b25af 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -7,58 +7,74 @@ #include #include -int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write) +typedef enum { + MEM_READ, + MEM_WRITE, +} memc_read_or_write; + +static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); + +static memcached_return io_wait(memcached_server_st *ptr, + memc_read_or_write read_or_write) { struct pollfd fds[1]; short flags= 0; + int error; - if (read_or_write) + if (read_or_write == MEM_WRITE) /* write */ flags= POLLOUT | POLLERR; else flags= POLLIN | POLLERR; memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->hosts[server_key].fd; + fds[0].fd= ptr->fd; fds[0].events= flags; - if (poll(fds, 1, -1) < 0) - return MEMCACHED_FAILURE; + error= poll(fds, 1, ptr->root->poll_timeout); - return MEMCACHED_SUCCESS; -#ifdef OLD - while (1) + if (error == 1) + return MEMCACHED_SUCCESS; + else if (error == 0) { - int select_return; - struct timeval local_tv; - fd_set set; + return MEMCACHED_TIMEOUT; + } + + /* Imposssible for anything other then -1 */ + WATCHPOINT_ASSERT(error == -1); + memcached_quit_server(ptr, 1); - memset(&local_tv, 0, sizeof(struct timeval)); + return MEMCACHED_FAILURE; - local_tv.tv_sec= 0; - local_tv.tv_usec= 300; +} - FD_ZERO(&set); - FD_SET(ptr->hosts[server_key].fd, &set); +#ifdef UNUSED +void memcached_io_preread(memcached_st *ptr) +{ + unsigned int x; - if (read_or_write) - select_return= select(1, &set, NULL, NULL, &local_tv); - else - select_return= select(1, NULL, &set, NULL, &local_tv); + return; - if (select_return == -1) + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(ptr, x) && + ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) { - ptr->my_errno= errno; - return MEMCACHED_FAILURE; + size_t data_read; + + data_read= read(ptr->hosts[x].fd, + ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); + if (data_read == -1) + continue; + + ptr->hosts[x].read_buffer_length+= data_read; + ptr->hosts[x].read_data_length+= data_read; } - else if (!select_return) - break; } - - return MEMCACHED_SUCCESS; -#endif } +#endif -ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, +ssize_t memcached_io_read(memcached_server_st *ptr, char *buffer, size_t length) { char *buffer_ptr; @@ -67,147 +83,231 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (length) { + uint8_t found_eof= 0; if (!ptr->read_buffer_length) { - size_t data_read; + ssize_t data_read; while (1) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 0); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - - data_read= recv(ptr->hosts[server_key].fd, + data_read= read(ptr->fd, ptr->read_buffer, - MEMCACHED_MAX_BUFFER, 0); - if (data_read == -1) + MEMCACHED_MAX_BUFFER); + if (data_read > 0) + break; + else if (data_read == -1) { + ptr->cached_errno= errno; switch (errno) { case EAGAIN: - break; + { + memcached_return rc; + + rc= io_wait(ptr, MEM_READ); + + if (rc == MEMCACHED_SUCCESS) + continue; + } + /* fall trough */ default: { - ptr->my_errno= errno; + memcached_quit_server(ptr, 1); return -1; } } } - else if (data_read) + else + { + WATCHPOINT_ASSERT(0); + found_eof= 1; break; - /* If zero, just keep looping */ + } } + ptr->read_data_length= data_read; ptr->read_buffer_length= data_read; ptr->read_ptr= ptr->read_buffer; } - *buffer_ptr= *ptr->read_ptr; - length--; - ptr->read_ptr++; - ptr->read_buffer_length--; - buffer_ptr++; + if (length > 1) + { + size_t difference; + + difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + + memcpy(buffer_ptr, ptr->read_ptr, difference); + length -= difference; + ptr->read_ptr+= difference; + ptr->read_buffer_length-= difference; + buffer_ptr+= difference; + } + else + { + *buffer_ptr= *ptr->read_ptr; + ptr->read_ptr++; + ptr->read_buffer_length--; + buffer_ptr++; + break; + } + + if (found_eof) + break; } return (size_t)(buffer_ptr - buffer); } -ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, - char *buffer, size_t length, char with_flush) +ssize_t memcached_io_write(memcached_server_st *ptr, + char *buffer, size_t length, char with_flush) { - unsigned long long x; + size_t original_length; + char* buffer_ptr; + + original_length= length; + buffer_ptr= buffer; - for (x= 0; x < length; x++) + while (length) { - ptr->write_buffer[ptr->write_buffer_offset]= buffer[x]; - ptr->write_buffer_offset++; + char *write_ptr; + size_t should_write; + + should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset; + write_ptr= ptr->write_buffer + ptr->write_buffer_offset; + + should_write= (should_write < length) ? should_write : length; + + memcpy(write_ptr, buffer_ptr, should_write); + ptr->write_buffer_offset+= should_write; + buffer_ptr+= should_write; + length-= should_write; if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) { - size_t sent_length; + memcached_return rc; + ssize_t sent_length; - sent_length= memcached_io_flush(ptr, server_key); + sent_length= io_flush(ptr, &rc); + if (sent_length == -1) + return -1; - assert(sent_length == MEMCACHED_MAX_BUFFER); - ptr->write_buffer_offset= 0; + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); } } if (with_flush) { - if (memcached_io_flush(ptr, server_key) == -1) + memcached_return rc; + if (io_flush(ptr, &rc) == -1) return -1; } - return length; + return original_length; } -ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key) +memcached_return memcached_io_close(memcached_server_st *ptr) +{ + close(ptr->fd); + + return MEMCACHED_SUCCESS; +} + +static ssize_t io_flush(memcached_server_st *ptr, + memcached_return *error) { size_t sent_length; - char *write_ptr= ptr->write_buffer; + size_t return_length; + char *local_write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; - unsigned int loop= 1; + + *error= MEMCACHED_SUCCESS; if (ptr->write_buffer_offset == 0) return 0; + /* Looking for memory overflows */ + if (write_length == MEMCACHED_MAX_BUFFER) + WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); + + return_length= 0; while (write_length) { - if (ptr->flags & MEM_NO_BLOCK) - { - memcached_return rc; - - rc= io_wait(ptr, server_key, 1); - if (rc != MEMCACHED_SUCCESS) - return -1; - } - + WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; -#ifdef orig - if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr, - write_length, 0)) == -1) -#endif - if ((sent_length= write(ptr->hosts[server_key].fd, write_ptr, - write_length)) == -1) + if (ptr->type == MEMCACHED_CONNECTION_UDP) { - switch (errno) + struct addrinfo *ai; + + ai= ptr->address_info; + + /* Crappy test code */ + char buffer[HUGE_STRING_LEN + 8]; + memset(buffer, 0, HUGE_STRING_LEN + 8); + memcpy (buffer+8, local_write_ptr, write_length); + buffer[0]= 0; + buffer[1]= 0; + buffer[2]= 0; + buffer[3]= 0; + buffer[4]= 0; + buffer[5]= 1; + buffer[6]= 0; + buffer[7]= 0; + sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, + (struct sockaddr *)ai->ai_addr, + ai->ai_addrlen); + if (sent_length == -1) { - case ENOBUFS: - case EAGAIN: - if (loop < 10) - { - loop++; - break; - } - /* Yes, we want to fall through */ - default: - ptr->my_errno= errno; - return -1; + WATCHPOINT_ERRNO(errno); + WATCHPOINT_ASSERT(0); } + sent_length-= 8; /* We remove the header */ } else { - write_ptr+= sent_length; - write_length-= sent_length; + if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr, + write_length)) == -1) + { + switch (errno) + { + case ENOBUFS: + continue; + case EAGAIN: + { + memcached_return rc; + rc= io_wait(ptr, MEM_WRITE); + + if (rc == MEMCACHED_SUCCESS) + continue; + + memcached_quit_server(ptr, 1); + return -1; + } + default: + memcached_quit_server(ptr, 1); + ptr->cached_errno= errno; + *error= MEMCACHED_ERRNO; + return -1; + } + } } + + local_write_ptr+= sent_length; + write_length-= sent_length; + return_length+= sent_length; } + WATCHPOINT_ASSERT(write_length == 0); + WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); ptr->write_buffer_offset= 0; - return sent_length; + return return_length; } /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_st *ptr, unsigned int server_key) +void memcached_io_reset(memcached_server_st *ptr) { - ptr->write_buffer_offset= 0; - memcached_quit(ptr); + memcached_quit_server(ptr, 0); }