X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=85e5ed42b82460a69d6c6b364b0af682ed189919;hb=f5838944c3424afa14765d709a127c943f1cf87b;hp=6594aa75d13972e2a84d920c921ce4d0b225e326;hpb=28adf7b936c6f5c25b7526ff56ec1256da1246d4;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 6594aa75..85e5ed42 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -38,124 +38,34 @@ #include -#include enum memc_read_or_write { MEM_READ, MEM_WRITE }; -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error); -static void increment_udp_message_id(memcached_server_write_instance_st ptr); - -static memcached_return_t io_wait(memcached_server_write_instance_st ptr, - memc_read_or_write read_or_write) -{ - struct pollfd fds; - fds.fd= ptr->fd; - fds.events= POLLIN; - - int error; - - if (read_or_write == MEM_WRITE) /* write */ - { - fds.events= POLLOUT; - WATCHPOINT_SET(ptr->io_wait_count.write++); - } - else - { - WATCHPOINT_SET(ptr->io_wait_count.read++); - } - - /* - ** We are going to block on write, but at least on Solaris we might block - ** on write if we haven't read anything from our input buffer.. - ** Try to purge the input buffer if we don't do any flow control in the - ** application layer (just sending a lot of data etc) - ** The test is moved down in the purge function to avoid duplication of - ** the test. +/* + * The udp request id consists of two seperate sections + * 1) The thread id + * 2) The message number + * The thread id should only be set when the memcached_st struct is created + * and should not be changed. + * + * The message num is incremented for each new message we send, this function + * extracts the message number from message_id, increments it and then + * writes the new value back into the header */ - if (read_or_write == MEM_WRITE) - { - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - { - return MEMCACHED_FAILURE; - } - } - - size_t loop_max= 5; - while (--loop_max) // While loop is for ERESTART or EINTR - { - if (ptr->root->poll_timeout) // Mimic 0 causes timeout behavior (not all platforms do this) - { - error= poll(&fds, 1, ptr->root->poll_timeout); - } - else - { - error= 0; - } - - switch (error) - { - case 1: // Success! - WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); - WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); - - return MEMCACHED_SUCCESS; - - case 0: // Timeout occured, we let the while() loop do its thing. - return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - - default: - WATCHPOINT_ERRNO(get_socket_errno()); - switch (get_socket_errno()) - { -#ifdef TARGET_OS_LINUX - case ERESTART: -#endif - case EINTR: - break; - - case EFAULT: - case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - - case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - - default: - if (fds.revents & POLLERR) - { - int err; - socklen_t len= sizeof (err); - (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); - ptr->cached_errno= (err == 0) ? get_socket_errno() : err; - } - else - { - ptr->cached_errno= get_socket_errno(); - } - memcached_quit_server(ptr, true); - - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - } - } - - /* Imposssible for anything other then -1 */ - WATCHPOINT_ASSERT(error == -1); - ptr->cached_errno= get_socket_errno(); - memcached_quit_server(ptr, true); +static void increment_udp_message_id(memcached_server_write_instance_st ptr) +{ + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + uint16_t cur_req= get_udp_datagram_request_id(header); + int msg_num= get_msg_num_from_request_id(cur_req); + int thread_id= get_thread_id_from_request_id(cur_req); - return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT); -} + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) + msg_num= 0; -memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) -{ - return io_wait(ptr, MEM_WRITE); + header->request_id= htons((uint16_t) (thread_id | msg_num)); } /** @@ -281,15 +191,274 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } +static memcached_return_t io_wait(memcached_server_write_instance_st ptr, + memc_read_or_write read_or_write) +{ + struct pollfd fds; + fds.fd= ptr->fd; + fds.events= POLLIN; + + if (read_or_write == MEM_WRITE) /* write */ + { + fds.events= POLLOUT; + WATCHPOINT_SET(ptr->io_wait_count.write++); + } + else + { + WATCHPOINT_SET(ptr->io_wait_count.read++); + } + + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (read_or_write == MEM_WRITE) + { + memcached_return_t rc= memcached_purge(ptr); + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { + return MEMCACHED_FAILURE; + } + } + + if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) + { + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + } + + size_t loop_max= 5; + while (--loop_max) // While loop is for ERESTART or EINTR + { + + int error= poll(&fds, 1, ptr->root->poll_timeout); + switch (error) + { + case 1: // Success! + WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); + WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); + + return MEMCACHED_SUCCESS; + + case 0: // Timeout occured, we let the while() loop do its thing. + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + + default: + WATCHPOINT_ERRNO(get_socket_errno()); + switch (get_socket_errno()) + { +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + case EINTR: + break; + + case EFAULT: + case ENOMEM: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + + case EINVAL: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + + default: + if (fds.revents & POLLERR) + { + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) + { + if (err == 0) + { + continue; + } + errno= err; + } + } + else + { + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + } + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + memcached_quit_server(ptr, true); + + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); + } + } + } + + memcached_quit_server(ptr, true); + + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); +} + +static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, + memcached_return_t *error) +{ + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + { + memcached_return_t rc; + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + rc= memcached_purge(ptr); + + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { + return -1; + } + } + size_t return_length; + char *local_write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; + + *error= MEMCACHED_SUCCESS; + + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + + // UDP Sanity check, make sure that we are not sending somthing too big + if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) + { + *error= MEMCACHED_WRITE_FAILURE; + return -1; + } + + if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + { + return 0; + } + + /* Looking for memory overflows */ +#if defined(DEBUG) + if (write_length == MEMCACHED_MAX_BUFFER) + WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); +#endif + + return_length= 0; + while (write_length) + { + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(write_length > 0); + if (memcached_is_udp(ptr->root)) + { + increment_udp_message_id(ptr); + } + + ssize_t sent_length= 0; + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + if (with_flush) + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + } + else + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + } + + if (sent_length == SOCKET_ERROR) + { + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); +#if 0 // @todo I should look at why we hit this bit of code hard frequently + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_NUMBER(get_socket_errno()); +#endif + switch (get_socket_errno()) + { + case ENOBUFS: + continue; + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif + { + /* + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) or process_input_buffer(ptr)) + { + continue; + } + + memcached_return_t rc= io_wait(ptr, MEM_WRITE); + if (memcached_success(rc)) + { + continue; + } + else if (rc == MEMCACHED_TIMEOUT) + { + *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return -1; + } + + memcached_quit_server(ptr, true); + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return -1; + } + case ENOTCONN: + case EPIPE: + default: + memcached_quit_server(ptr, true); + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(ptr->fd == -1); + return -1; + } + } + + if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) + { + memcached_quit_server(ptr, true); + *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return -1; + } + + ptr->io_bytes_sent+= uint32_t(sent_length); + + local_write_ptr+= sent_length; + write_length-= uint32_t(sent_length); + return_length+= uint32_t(sent_length); + } + + WATCHPOINT_ASSERT(write_length == 0); + // Need to study this assert() WATCHPOINT_ASSERT(return_length == + // ptr->write_buffer_offset); + + // if we are a udp server, the begining of the buffer is reserverd for + // the upd frame header + if (memcached_is_udp(ptr->root)) + { + ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + } + else + { + ptr->write_buffer_offset= 0; + } + + return (ssize_t) return_length; +} + +memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +{ + return io_wait(ptr, MEM_WRITE); +} + memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread) { - assert(ptr); // Programmer error + assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error char *buffer_ptr= static_cast(buffer); if (ptr->fd == INVALID_SOCKET) { - assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); return MEMCACHED_CONNECTION_FAILURE; } @@ -329,7 +498,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, case ENOTSOCK: WATCHPOINT_ASSERT(0); case EBADF: - assert(ptr->fd != INVALID_SOCKET); + assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); case EINVAL: case EFAULT: case ECONNREFUSED: @@ -353,7 +522,6 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, it will return EGAIN if data is not immediatly available. */ WATCHPOINT_STRING("We had a zero length recv()"); - assert(0); memcached_quit_server(ptr, true); *nread= -1; return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT); @@ -388,18 +556,18 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, } } - ptr->server_failure_counter= 0; *nread = (ssize_t)(buffer_ptr - (char*)buffer); + return MEMCACHED_SUCCESS; } memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) { - assert(ptr); // Programmer error + assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); if (ptr->fd == INVALID_SOCKET) { - assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state"); return MEMCACHED_CONNECTION_FAILURE; } @@ -436,7 +604,7 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) case ENOTSOCK: WATCHPOINT_ASSERT(0); case EBADF: - assert(ptr->fd != INVALID_SOCKET); + assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state"); case EINVAL: case EFAULT: case ECONNREFUSED: @@ -452,13 +620,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { - size_t original_length; - const char* buffer_ptr; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - original_length= length; - buffer_ptr= static_cast(buffer); + size_t original_length= length; + const char *buffer_ptr= static_cast(buffer); while (length) { @@ -466,7 +631,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, size_t should_write; size_t buffer_end; - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) { //UDP does not support partial writes buffer_end= MAX_UDP_DATAGRAM_LENGTH; @@ -489,7 +654,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) + if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -619,7 +784,8 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st return NULL; } - switch (poll(fds, host_index, memc->poll_timeout)) + int error= poll(fds, host_index, memc->poll_timeout); + switch (error) { case -1: memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT); @@ -647,155 +813,6 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st return NULL; } -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error) -{ - /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ - { - memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); - - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - { - return -1; - } - } - ssize_t sent_length; - size_t return_length; - char *local_write_ptr= ptr->write_buffer; - size_t write_length= ptr->write_buffer_offset; - - *error= MEMCACHED_SUCCESS; - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - - // UDP Sanity check, make sure that we are not sending somthing too big - if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) - { - *error= MEMCACHED_WRITE_FAILURE; - return -1; - } - - if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - { - return 0; - } - - /* Looking for memory overflows */ -#if defined(DEBUG) - if (write_length == MEMCACHED_MAX_BUFFER) - WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); - WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); -#endif - - return_length= 0; - while (write_length) - { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - WATCHPOINT_ASSERT(write_length > 0); - sent_length= 0; - if (ptr->type == MEMCACHED_CONNECTION_UDP) - increment_udp_message_id(ptr); - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (with_flush) - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); - } - else - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); - } - - if (sent_length == SOCKET_ERROR) - { - ptr->cached_errno= get_socket_errno(); -#if 0 // @todo I should look at why we hit this bit of code hard frequently - WATCHPOINT_ERRNO(get_socket_errno()); - WATCHPOINT_NUMBER(get_socket_errno()); -#endif - switch (get_socket_errno()) - { - case ENOBUFS: - continue; - case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: -#endif - { - /* - * We may be blocked on write because the input buffer - * is full. Let's check if we have room in our input - * buffer for more data and retry the write before - * waiting.. - */ - if (repack_input_buffer(ptr) or - process_input_buffer(ptr)) - { - continue; - } - - memcached_return_t rc= io_wait(ptr, MEM_WRITE); - if (memcached_success(rc)) - { - continue; - } - else if (rc == MEMCACHED_TIMEOUT) - { - *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - return -1; - } - - memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - return -1; - } - case ENOTCONN: - case EPIPE: - default: - memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - WATCHPOINT_ASSERT(ptr->fd == -1); - return -1; - } - } - - if (ptr->type == MEMCACHED_CONNECTION_UDP and - (size_t)sent_length != write_length) - { - memcached_quit_server(ptr, true); - *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return -1; - } - - ptr->io_bytes_sent += (uint32_t) sent_length; - - local_write_ptr+= sent_length; - write_length-= (uint32_t) sent_length; - return_length+= (uint32_t) sent_length; - } - - WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header - if (ptr->type == MEMCACHED_CONNECTION_UDP) - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - else - ptr->write_buffer_offset= 0; - - return (ssize_t) return_length; -} - /* Eventually we will just kill off the server with the problem. */ @@ -835,10 +852,11 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, char *buffer_ptr, - size_t size) + size_t size, + size_t& total_nr) { + total_nr= 0; bool line_complete= false; - size_t total_nr= 0; while (not line_complete) { @@ -887,30 +905,6 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)