X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=7bdb7238fa7737d6cbd8619bc33f65dce2ad98e3;hb=542ea353c726f6d552b51128932a9b18ed3ed8f2;hp=6f8528933f27258179c96f7e8a2e47ee976ee035;hpb=295e847e38375ba6b8669c33264fd46dc3e59feb;p=awesomized%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 6f852893..7bdb7238 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -13,6 +13,7 @@ typedef enum { } memc_read_or_write; static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error); +static void increment_udp_message_id(memcached_server_st *ptr); static memcached_return io_wait(memcached_server_st *ptr, memc_read_or_write read_or_write) @@ -22,15 +23,14 @@ static memcached_return io_wait(memcached_server_st *ptr, int error; if (read_or_write == MEM_WRITE) /* write */ - flags= POLLOUT | POLLERR; + flags= POLLOUT; else - flags= POLLIN | POLLERR; + flags= POLLIN; memset(&fds, 0, sizeof(struct pollfd)); fds[0].fd= ptr->fd; fds[0].events= flags; -#ifdef NOT_DONE /* ** We are going to block on write, but at least on Solaris we might block ** on write if we haven't read anything from our input buffer.. @@ -41,10 +41,10 @@ static memcached_return io_wait(memcached_server_st *ptr, */ if (read_or_write == MEM_WRITE) { - if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED) - return MEMCACHED_FAILURE; + memcached_return rc=memcached_purge(ptr); + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return MEMCACHED_FAILURE; } -#endif error= poll(fds, 1, ptr->root->poll_timeout); @@ -90,8 +90,8 @@ void memcached_io_preread(memcached_st *ptr) } #endif -ssize_t memcached_io_read(memcached_server_st *ptr, - void *buffer, size_t length) +memcached_return memcached_io_read(memcached_server_st *ptr, + void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -105,31 +105,26 @@ ssize_t memcached_io_read(memcached_server_st *ptr, while (1) { - data_read= read(ptr->fd, - ptr->read_buffer, - MEMCACHED_MAX_BUFFER); + data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); if (data_read > 0) break; else if (data_read == -1) { ptr->cached_errno= errno; + memcached_return rc= MEMCACHED_UNKNOWN_READ_FAILURE; switch (errno) { case EAGAIN: - case EINTR: - { - memcached_return rc; - - rc= io_wait(ptr, MEM_READ); + case EINTR: + if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) + continue; + /* fall through */ - if (rc == MEMCACHED_SUCCESS) - continue; - } - /* fall trough */ default: { memcached_quit_server(ptr, 1); - return -1; + *nread= -1; + return rc; } } } @@ -145,7 +140,8 @@ ssize_t memcached_io_read(memcached_server_st *ptr, it will return EGAIN if data is not immediatly available. */ memcached_quit_server(ptr, 1); - return -1; + *nread= -1; + return MEMCACHED_UNKNOWN_READ_FAILURE; } } @@ -177,7 +173,9 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } } - return (size_t)(buffer_ptr - (char*)buffer); + ptr->server_failure_counter= 0; + *nread = (size_t)(buffer_ptr - (char*)buffer); + return MEMCACHED_SUCCESS; } ssize_t memcached_io_write(memcached_server_st *ptr, @@ -195,18 +193,30 @@ ssize_t memcached_io_write(memcached_server_st *ptr, { char *write_ptr; size_t should_write; + size_t buffer_end; - should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset; - write_ptr= ptr->write_buffer + ptr->write_buffer_offset; - - should_write= (should_write < length) ? should_write : length; + if (ptr->type == MEMCACHED_CONNECTION_UDP) + { + //UDP does not support partial writes + buffer_end= MAX_UDP_DATAGRAM_LENGTH; + should_write= length; + if (ptr->write_buffer_offset + should_write > buffer_end) + return -1; + } + else + { + buffer_end= MEMCACHED_MAX_BUFFER; + should_write= buffer_end - ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; + } + write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); ptr->write_buffer_offset+= should_write; buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER) + if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) { memcached_return rc; ssize_t sent_length; @@ -216,7 +226,11 @@ ssize_t memcached_io_write(memcached_server_st *ptr, if (sent_length == -1) return -1; - WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + /* If io_flush calls memcached_purge, sent_length may be 0 */ + unlikely (sent_length != 0) + { + WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); + } } } @@ -262,9 +276,72 @@ memcached_return memcached_io_close(memcached_server_st *ptr) return MEMCACHED_SUCCESS; } +memcached_server_st *memcached_io_get_readable_server(memcached_st *memc) +{ +#define MAX_SERVERS_TO_POLL 100 + struct pollfd fds[MAX_SERVERS_TO_POLL]; + unsigned int index= 0; + + for (unsigned int x= 0; + x< memc->number_of_hosts && index < MAX_SERVERS_TO_POLL; + ++x) + { + if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */ + return &memc->hosts[x]; + + if (memcached_server_response_count(&memc->hosts[x]) > 0) + { + fds[index].events = POLLIN; + fds[index].revents = 0; + fds[index].fd = memc->hosts[x].fd; + ++index; + } + } + + if (index < 2) + { + /* We have 0 or 1 server with pending events.. */ + for (unsigned int x= 0; x< memc->number_of_hosts; ++x) + if (memcached_server_response_count(&memc->hosts[x]) > 0) + return &memc->hosts[x]; + + return NULL; + } + + int err= poll(fds, index, memc->poll_timeout); + switch (err) { + case -1: + memc->cached_errno = errno; + /* FALLTHROUGH */ + case 0: + break; + default: + for (unsigned int x= 0; x < index; ++x) + if (fds[x].revents & POLLIN) + for (unsigned int y= 0; y < memc->number_of_hosts; ++y) + if (memc->hosts[y].fd == fds[x].fd) + return &memc->hosts[y]; + } + + return NULL; +} + static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error) { + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + { + memcached_return rc; + WATCHPOINT_ASSERT(ptr->fd != -1); + rc= memcached_purge(ptr); + + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return -1; + } ssize_t sent_length; size_t return_length; char *local_write_ptr= ptr->write_buffer; @@ -274,7 +351,12 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(ptr->fd != -1); - if (ptr->write_buffer_offset == 0) + // UDP Sanity check, make sure that we are not sending somthing too big + if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + return -1; + + if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP + && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) return 0; /* Looking for memory overflows */ @@ -291,77 +373,39 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) + increment_udp_message_id(ptr); + sent_length= write(ptr->fd, local_write_ptr, write_length); + + if (sent_length == -1) { - struct addrinfo *ai; - - ai= ptr->address_info; - - /* Crappy test code */ - char buffer[HUGE_STRING_LEN + 8]; - memset(buffer, 0, HUGE_STRING_LEN + 8); - memcpy (buffer+8, local_write_ptr, write_length); - buffer[0]= 0; - buffer[1]= 0; - buffer[2]= 0; - buffer[3]= 0; - buffer[4]= 0; - buffer[5]= 1; - buffer[6]= 0; - buffer[7]= 0; - sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, - (struct sockaddr *)ai->ai_addr, - ai->ai_addrlen); - if (sent_length == -1) + ptr->cached_errno= errno; + switch (errno) { - WATCHPOINT_ERRNO(errno); - WATCHPOINT_ASSERT(0); - } - sent_length-= 8; /* We remove the header */ - } - else - { -#ifdef NOT_DONE - /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ + case ENOBUFS: + continue; + case EAGAIN: { memcached_return rc; - WATCHPOINT_ASSERT(ptr->fd != -1); - rc= memcached_purge(ptr); - - if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED) - return -1; - } -#endif + rc= io_wait(ptr, MEM_WRITE); - WATCHPOINT_ASSERT(ptr->fd != -1); - if ((sent_length= write(ptr->fd, local_write_ptr, - write_length)) == -1) - { - switch (errno) - { - case ENOBUFS: + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) continue; - case EAGAIN: - { - memcached_return rc; - rc= io_wait(ptr, MEM_WRITE); - if (rc == MEMCACHED_SUCCESS) - continue; - - memcached_quit_server(ptr, 1); - return -1; - } - default: - memcached_quit_server(ptr, 1); - ptr->cached_errno= errno; - *error= MEMCACHED_ERRNO; - return -1; - } + memcached_quit_server(ptr, 1); + return -1; } + default: + memcached_quit_server(ptr, 1); + *error= MEMCACHED_ERRNO; + return -1; + } + } + + if (ptr->type == MEMCACHED_CONNECTION_UDP && + (size_t)sent_length != write_length) + { + memcached_quit_server(ptr, 1); + return -1; } ptr->io_bytes_sent += sent_length; @@ -374,7 +418,13 @@ static ssize_t io_flush(memcached_server_st *ptr, WATCHPOINT_ASSERT(write_length == 0); // Need to study this assert() WATCHPOINT_ASSERT(return_length == // ptr->write_buffer_offset); - ptr->write_buffer_offset= 0; + + // if we are a udp server, the begining of the buffer is reserverd for + // the upd frame header + if (ptr->type == MEMCACHED_CONNECTION_UDP) + ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + else + ptr->write_buffer_offset= 0; return return_length; } @@ -386,3 +436,112 @@ void memcached_io_reset(memcached_server_st *ptr) { memcached_quit_server(ptr, 1); } + +/** + * Read a given number of bytes from the server and place it into a specific + * buffer. Reset the IO channel on this server if an error occurs. + */ +memcached_return memcached_safe_read(memcached_server_st *ptr, + void *dta, + size_t size) +{ + size_t offset= 0; + char *data= dta; + + while (offset < size) + { + ssize_t nread; + memcached_return rc= memcached_io_read(ptr, data + offset, size - offset, + &nread); + if (rc != MEMCACHED_SUCCESS) + return rc; + + offset+= nread; + } + + return MEMCACHED_SUCCESS; +} + +memcached_return memcached_io_readline(memcached_server_st *ptr, + char *buffer_ptr, + size_t size) +{ + bool line_complete= false; + size_t total_nr= 0; + + while (!line_complete) + { + if (ptr->read_buffer_length == 0) + { + /* + * We don't have any data in the buffer, so let's fill the read + * buffer. Call the standard read function to avoid duplicating + * the logic. + */ + ssize_t nread; + memcached_return rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); + if (rc != MEMCACHED_SUCCESS) + return rc; + + if (*buffer_ptr == '\n') + line_complete= true; + + ++buffer_ptr; + ++total_nr; + } + + /* Now let's look in the buffer and copy as we go! */ + while (ptr->read_buffer_length && total_nr < size && !line_complete) + { + *buffer_ptr = *ptr->read_ptr; + if (*buffer_ptr == '\n') + line_complete = true; + --ptr->read_buffer_length; + ++ptr->read_ptr; + ++total_nr; + ++buffer_ptr; + } + + if (total_nr == size) + return MEMCACHED_PROTOCOL_ERROR; + } + + return MEMCACHED_SUCCESS; +} + +/* + * The udp request id consists of two seperate sections + * 1) The thread id + * 2) The message number + * The thread id should only be set when the memcached_st struct is created + * and should not be changed. + * + * The message num is incremented for each new message we send, this function + * extracts the message number from message_id, increments it and then + * writes the new value back into the header + */ +static void increment_udp_message_id(memcached_server_st *ptr) +{ + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + uint16_t cur_req= get_udp_datagram_request_id(header); + uint16_t msg_num= get_msg_num_from_request_id(cur_req); + uint16_t thread_id= get_thread_id_from_request_id(cur_req); + + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) + msg_num= 0; + + header->request_id= htons(thread_id | msg_num); +} + +memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id) +{ + if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) + return MEMCACHED_FAILURE; + + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + header->request_id= htons(generate_udp_request_thread_id(thread_id)); + header->num_datagrams= htons(1); + header->sequence_number= htons(0); + + return MEMCACHED_SUCCESS; +}