X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=lib%2Fmemcached_io.c;h=22c54e420da76f51f888a3c4eef6b0480e3c892c;hb=69b905e54a2f5ad5038a3f75c269bbbb4c3e610f;hp=de3eda32875460018fa4aa320237197be0d40d53;hpb=f45cff13d17432059886e6f426d3c8c4d1f23732;p=m6w6%2Flibmemcached diff --git a/lib/memcached_io.c b/lib/memcached_io.c index de3eda32..22c54e42 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -12,10 +12,9 @@ typedef enum { MEM_WRITE, } memc_read_or_write; -static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, - memcached_return *error); +static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); -static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, +static memcached_return io_wait(memcached_server_st *ptr, memc_read_or_write read_or_write) { struct pollfd fds[1]; @@ -28,28 +27,54 @@ static memcached_return io_wait(memcached_st *ptr, unsigned int server_key, flags= POLLIN | POLLERR; memset(&fds, 0, sizeof(struct pollfd)); - fds[0].fd= ptr->hosts[server_key].fd; + fds[0].fd= ptr->fd; fds[0].events= flags; - error= poll(fds, 1, ptr->poll_timeout); + error= poll(fds, 1, ptr->root->poll_timeout); if (error == 1) return MEMCACHED_SUCCESS; else if (error == 0) { - WATCHPOINT_NUMBER(read_or_write); return MEMCACHED_TIMEOUT; } - WATCHPOINT; /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); - memcached_quit_server(ptr, server_key, 1); + memcached_quit_server(ptr, 1); + return MEMCACHED_FAILURE; } -ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, +#ifdef UNUSED +void memcached_io_preread(memcached_st *ptr) +{ + unsigned int x; + + return; + + for (x= 0; x < ptr->number_of_hosts; x++) + { + if (memcached_server_response_count(ptr, x) && + ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) + { + size_t data_read; + + data_read= read(ptr->hosts[x].fd, + ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); + if (data_read == -1) + continue; + + ptr->hosts[x].read_buffer_length+= data_read; + ptr->hosts[x].read_data_length+= data_read; + } + } +} +#endif + +ssize_t memcached_io_read(memcached_server_st *ptr, char *buffer, size_t length) { char *buffer_ptr; @@ -58,143 +83,165 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key, while (length) { - if (!ptr->hosts[server_key].read_buffer_length) + uint8_t found_eof= 0; + if (!ptr->read_buffer_length) { size_t data_read; while (1) { - data_read= read(ptr->hosts[server_key].fd, - ptr->hosts[server_key].read_buffer, + data_read= read(ptr->fd, + ptr->read_buffer, MEMCACHED_MAX_BUFFER); - if (data_read == -1) + if (data_read) + break; + else if (data_read == -1) { + ptr->cached_errno= errno; switch (errno) { case EAGAIN: { memcached_return rc; - rc= io_wait(ptr, server_key, MEM_READ); + + rc= io_wait(ptr, MEM_READ); if (rc == MEMCACHED_SUCCESS) continue; - - memcached_quit_server(ptr, server_key, 1); - return -1; } + /* fall trough */ default: { - memcached_quit_server(ptr, server_key, 1); - ptr->cached_errno= errno; + memcached_quit_server(ptr, 1); return -1; } } } - else if (data_read) + else + { + WATCHPOINT_ASSERT(0); + found_eof= 1; break; - /* If zero, just keep looping */ + } } - ptr->hosts[server_key].read_buffer_length= data_read; - ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer; + ptr->read_data_length= data_read; + ptr->read_buffer_length= data_read; + ptr->read_ptr= ptr->read_buffer; } - *buffer_ptr= *ptr->hosts[server_key].read_ptr; - length--; - ptr->hosts[server_key].read_ptr++; - ptr->hosts[server_key].read_buffer_length--; - buffer_ptr++; + if (length > 1) + { + size_t difference; + + difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + + memcpy(buffer_ptr, ptr->read_ptr, difference); + length -= difference; + ptr->read_ptr+= difference; + ptr->read_buffer_length-= difference; + buffer_ptr+= difference; + } + else + { + *buffer_ptr= *ptr->read_ptr; + ptr->read_ptr++; + ptr->read_buffer_length--; + buffer_ptr++; + break; + } + + if (found_eof) + break; } return (size_t)(buffer_ptr - buffer); } -ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key, - char *buffer, size_t length, char with_flush) +ssize_t memcached_io_write(memcached_server_st *ptr, + char *buffer, size_t length, char with_flush) { - unsigned long long x; + size_t original_length; + char* buffer_ptr; + + original_length= length; + buffer_ptr= buffer; - for (x= 0; x < length; x++) + while (length) { - if (ptr->hosts[server_key].write_ptr == 0) - ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; - WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr); - *ptr->hosts[server_key].write_ptr= buffer[x]; - ptr->hosts[server_key].write_ptr++; - ptr->hosts[server_key].write_buffer_offset++; - - if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER) + char *write_ptr; + size_t should_write; + + should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset; + write_ptr= ptr->write_buffer + ptr->write_buffer_offset; + + should_write= (should_write < length) ? should_write : length; + + memcpy(write_ptr, buffer_ptr, should_write); + ptr->write_buffer_offset+= should_write; + buffer_ptr+= should_write; + length-= should_write; + + if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) { memcached_return rc; - size_t sent_length; + ssize_t sent_length; - sent_length= io_flush(ptr, server_key, &rc); + sent_length= io_flush(ptr, &rc); if (sent_length == -1) return -1; WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); - ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; - ptr->hosts[server_key].write_buffer_offset= 0; } } if (with_flush) { memcached_return rc; - if (io_flush(ptr, server_key, &rc) == -1) + if (io_flush(ptr, &rc) == -1) return -1; } - return length; + return original_length; } -memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key) +memcached_return memcached_io_close(memcached_server_st *ptr) { - if (ptr->flags & MEM_NO_BLOCK && 0) - { - int sock_size; - int error; - socklen_t sock_length; - - error= getsockopt(ptr->hosts[server_key].fd, IPPROTO_TCP, SO_LINGER, - &sock_size, &sock_length); - - WATCHPOINT_NUMBER(error); - WATCHPOINT_NUMBER(sock_size); - } - - close(ptr->hosts[server_key].fd); + close(ptr->fd); return MEMCACHED_SUCCESS; } -static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, +static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error) { size_t sent_length; size_t return_length; - char *write_ptr= ptr->hosts[server_key].write_buffer; - size_t write_length= ptr->hosts[server_key].write_buffer_offset; + char *local_write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; *error= MEMCACHED_SUCCESS; - if (ptr->hosts[server_key].write_buffer_offset == 0) + if (ptr->write_buffer_offset == 0) return 0; + /* Looking for memory overflows */ + if (write_length == MEMCACHED_MAX_BUFFER) + WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); return_length= 0; while (write_length) { sent_length= 0; - if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP) + if (ptr->type == MEMCACHED_CONNECTION_UDP) { - sent_length= sendto(ptr->hosts[server_key].fd, - write_ptr, write_length, 0, - (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr, + sent_length= sendto(ptr->fd, local_write_ptr, write_length, 0, + (struct sockaddr *)&ptr->address_info->ai_addr, sizeof(struct sockaddr)); } else { - if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr, + if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr, write_length)) == -1) { switch (errno) @@ -204,16 +251,16 @@ static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, case EAGAIN: { memcached_return rc; - rc= io_wait(ptr, server_key, MEM_WRITE); + rc= io_wait(ptr, MEM_WRITE); if (rc == MEMCACHED_SUCCESS) continue; - memcached_quit_server(ptr, server_key, 1); + memcached_quit_server(ptr, 1); return -1; } default: - memcached_quit_server(ptr, server_key, 1); + memcached_quit_server(ptr, 1); ptr->cached_errno= errno; *error= MEMCACHED_ERRNO; return -1; @@ -221,15 +268,14 @@ static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, } } - write_ptr+= sent_length; + local_write_ptr+= sent_length; write_length-= sent_length; return_length+= sent_length; } WATCHPOINT_ASSERT(write_length == 0); - WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset); - ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; - ptr->hosts[server_key].write_buffer_offset= 0; + WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); + ptr->write_buffer_offset= 0; return return_length; } @@ -237,9 +283,8 @@ static ssize_t io_flush(memcached_st *ptr, unsigned int server_key, /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_st *ptr, unsigned int server_key) +void memcached_io_reset(memcached_server_st *ptr) { - ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer; - ptr->hosts[server_key].write_buffer_offset= 0; - memcached_quit(ptr); + ptr->write_buffer_offset= 0; + memcached_quit_server(ptr, 0); }