X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=837959da5db6696765036ffe5787741c8235658c;hb=674c7578fa870c3b57e81e765c355ce98434b310;hp=f77a097325c84d0c166c5f4149c8d793c54427d5;hpb=7abcaebdc4c3dd11b779eaef58a7371fb82ae888;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index f77a0973..837959da 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -38,126 +38,26 @@ #include -#include + +void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +{ + server->request_id++; + header.request.magic= PROTOCOL_BINARY_REQ; + header.request.opaque= htons(server->request_id); +} enum memc_read_or_write { MEM_READ, MEM_WRITE }; -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error); -static void increment_udp_message_id(memcached_server_write_instance_st ptr); - -static memcached_return_t io_wait(memcached_server_write_instance_st ptr, - memc_read_or_write read_or_write) -{ - struct pollfd fds; - fds.fd= ptr->fd; - fds.events= POLLIN; - - if (read_or_write == MEM_WRITE) /* write */ - { - fds.events= POLLOUT; - WATCHPOINT_SET(ptr->io_wait_count.write++); - } - else - { - WATCHPOINT_SET(ptr->io_wait_count.read++); - } - - /* - ** We are going to block on write, but at least on Solaris we might block - ** on write if we haven't read anything from our input buffer.. - ** Try to purge the input buffer if we don't do any flow control in the - ** application layer (just sending a lot of data etc) - ** The test is moved down in the purge function to avoid duplication of - ** the test. - */ - if (read_or_write == MEM_WRITE) - { - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - { - return MEMCACHED_FAILURE; - } - } - - if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) - { - return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - } - - size_t loop_max= 5; - while (--loop_max) // While loop is for ERESTART or EINTR - { - - int error= poll(&fds, 1, ptr->root->poll_timeout); - switch (error) - { - case 1: // Success! - WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); - WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); - - return MEMCACHED_SUCCESS; - - case 0: // Timeout occured, we let the while() loop do its thing. - return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - - default: - WATCHPOINT_ERRNO(get_socket_errno()); - switch (get_socket_errno()) - { -#ifdef TARGET_OS_LINUX - case ERESTART: -#endif - case EINTR: - break; - - case EFAULT: - case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - - case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - - default: - if (fds.revents & POLLERR) - { - int err; - socklen_t len= sizeof (err); - (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); - memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT); - } - else - { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - memcached_quit_server(ptr, true); - - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - } - } - - memcached_quit_server(ptr, true); - - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); -} - -memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) -{ - return io_wait(ptr, MEM_WRITE); -} - /** * Try to fill the input buffer for a server with as much * data as possible. * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_write_instance_st ptr) +static bool repack_input_buffer(org::libmemcached::Instance* ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -174,24 +74,27 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) { do { /* Just try a single read to grab what's available */ - ssize_t nr= recv(ptr->fd, - ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length, - MSG_DONTWAIT); - - switch (nr) + ssize_t nr; + if ((nr= recv(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + MSG_DONTWAIT)) <= 0) { - case SOCKET_ERROR: + if (nr == 0) + { + memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); + } + else { switch (get_socket_errno()) { case EINTR: continue; +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -201,24 +104,19 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } - break; - case 0: // Shutdown on the socket has occurred - { - memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); - } break; + } + else // We read data, append to our read buffer + { + ptr->read_data_length+= size_t(nr); + ptr->read_buffer_length+= size_t(nr); - default: - { - ptr->read_data_length+= size_t(nr); - ptr->read_buffer_length+= size_t(nr); - return true; - } - break; + return true; } - } while (0); + } while (false); } + return false; } @@ -232,13 +130,13 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_write_instance_st ptr) +static bool process_input_buffer(org::libmemcached::Instance* ptr) { /* ** We might be able to process some of the response messages if we ** have a callback set up */ - if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + if (ptr->root->callbacks != NULL) { /* * We might have responses... try to read them out and fire @@ -249,10 +147,8 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t error; memcached_st *root= (memcached_st *)ptr->root; - error= memcached_response(ptr, buffer, sizeof(buffer), - &root->result); + memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -262,7 +158,9 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) { error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) + { break; + } } /* @todo what should I do with the error message??? */ @@ -274,96 +172,341 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, - void *buffer, size_t length, ssize_t *nread) +static memcached_return_t io_wait(org::libmemcached::Instance* ptr, + const memc_read_or_write read_or_write) { - assert(ptr); // Programmer error - char *buffer_ptr= static_cast(buffer); + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (read_or_write == MEM_WRITE) + { + if (memcached_purge(ptr) == false) + { + return MEMCACHED_FAILURE; + } + } - if (ptr->fd == INVALID_SOCKET) + struct pollfd fds; + fds.fd= ptr->fd; + fds.events= POLLIN; + fds.revents= 0; + + if (read_or_write == MEM_WRITE) /* write */ { - assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); - return MEMCACHED_CONNECTION_FAILURE; + fds.events= POLLOUT; + ptr->io_wait_count.write++; + } + else + { + ptr->io_wait_count.read++; } - while (length) + if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { - if (not ptr->read_buffer_length) + ptr->io_wait_count.timeouts++; + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + } + + size_t loop_max= 5; + while (--loop_max) // While loop is for ERESTART or EINTR + { + int active_fd= poll(&fds, 1, ptr->root->poll_timeout); + + if (active_fd >= 1) { - ssize_t data_read; - do + assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors"); + if (fds.revents & POLLIN or fds.revents & POLLOUT) { - data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); - if (data_read == SOCKET_ERROR) + return MEMCACHED_SUCCESS; + } + + if (fds.revents & POLLHUP) + { + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("poll() detected hang up")); + } + + if (fds.revents & POLLERR) + { + int local_errno= EINVAL; + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) { - switch (get_socket_errno()) + if (err == 0) // treat this as EINTR { - case EINTR: // We just retry continue; + } + local_errno= err; + } + memcached_quit_server(ptr, true); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, + memcached_literal_param("poll() returned POLLHUP")); + } + + return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); + } - case ETIMEDOUT: // OSX - case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: -#endif + if (active_fd == 0) + { + ptr->io_wait_count.timeouts++; + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + } + + // Only an error should result in this code being called. + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + assert_msg(active_fd == -1 , "poll() returned an unexpected value"); + switch (local_errno) + { #ifdef TARGET_OS_LINUX - case ERESTART: + case ERESTART: #endif - if (memcached_success(io_wait(ptr, MEM_READ))) - { - continue; - } - return MEMCACHED_IN_PROGRESS; - - /* fall through */ - - case ENOTCONN: // Programmer Error - WATCHPOINT_ASSERT(0); - case ENOTSOCK: - WATCHPOINT_ASSERT(0); - case EBADF: - assert(ptr->fd != INVALID_SOCKET); - case EINVAL: - case EFAULT: - case ECONNREFUSED: - default: - { - memcached_quit_server(ptr, true); - *nread= -1; - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - } - } - else if (data_read == 0) + case EINTR: + continue; + + case EFAULT: + case ENOMEM: + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + + case EINVAL: + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + + default: + memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); + } + + break; + } + + memcached_quit_server(ptr, true); + + if (memcached_has_error(ptr)) + { + return memcached_instance_error_return(ptr); + } + + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("number of attempts to call io_wait() failed")); +} + +static bool io_flush(org::libmemcached::Instance* ptr, + const bool with_flush, + memcached_return_t& error) +{ + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + { + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + + if (memcached_purge(ptr) == false) + { + return false; + } + } + char *local_write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; + + error= MEMCACHED_SUCCESS; + + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + + /* Looking for memory overflows */ +#if defined(DEBUG) + if (write_length == MEMCACHED_MAX_BUFFER) + WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); +#endif + + while (write_length) + { + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(write_length > 0); + + int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE; + ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags); + + if (sent_length == SOCKET_ERROR) + { +#if 0 // @todo I should look at why we hit this bit of code hard frequently + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_NUMBER(get_socket_errno()); +#endif + switch (get_socket_errno()) + { + case ENOBUFS: + continue; + +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + case EAGAIN: { /* - EOF. Any data received so far is incomplete - so discard it. This always reads by byte in case of TCP - and protocol enforcement happens at memcached_response() - looking for '\n'. We do not care for UDB which requests 8 bytes - at once. Generally, this means that connection went away. Since - for blocking I/O we do not return 0 and for non-blocking case - it will return EGAIN if data is not immediatly available. - */ - WATCHPOINT_STRING("We had a zero length recv()"); - assert(0); + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) or process_input_buffer(ptr)) + { + continue; + } + + memcached_return_t rc= io_wait(ptr, MEM_WRITE); + if (memcached_success(rc)) + { + continue; + } + else if (rc == MEMCACHED_TIMEOUT) + { + return false; + } + memcached_quit_server(ptr, true); - *nread= -1; - return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT); + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return false; } - } while (data_read <= 0); + case ENOTCONN: + case EPIPE: + default: + memcached_quit_server(ptr, true); + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET); + return false; + } + } + + ptr->io_bytes_sent+= uint32_t(sent_length); + + local_write_ptr+= sent_length; + write_length-= uint32_t(sent_length); + } + + WATCHPOINT_ASSERT(write_length == 0); + ptr->write_buffer_offset= 0; + + return true; +} + +memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr) +{ + return io_wait(ptr, MEM_WRITE); +} + +static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) +{ + ssize_t data_read; + do + { + data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) + { + switch (get_socket_errno()) + { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + case EAGAIN: +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + { + memcached_return_t io_wait_ret; + if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) + { + continue; + } + + return io_wait_ret; + } + + /* fall through */ + + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + case EINVAL: + case EFAULT: + case ECONNREFUSED: + default: + memcached_quit_server(ptr, true); + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + break; + } - ptr->io_bytes_sent = 0; - ptr->read_data_length= (size_t) data_read; - ptr->read_buffer_length= (size_t) data_read; - ptr->read_ptr= ptr->read_buffer; + return memcached_instance_error_return(ptr); } + else if (data_read == 0) + { + /* + EOF. Any data received so far is incomplete + so discard it. This always reads by byte in case of TCP + and protocol enforcement happens at memcached_response() + looking for '\n'. We do not care for UDB which requests 8 bytes + at once. Generally, this means that connection went away. Since + for blocking I/O we do not return 0 and for non-blocking case + it will return EGAIN if data is not immediatly available. + */ + memcached_quit_server(ptr, true); + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("::rec() returned zero, server has disconnected")); + } + ptr->io_wait_count._bytes_read+= data_read; + } while (data_read <= 0); - if (length > 1) + ptr->io_bytes_sent= 0; + ptr->read_data_length= (size_t) data_read; + ptr->read_buffer_length= (size_t) data_read; + ptr->read_ptr= ptr->read_buffer; + + return MEMCACHED_SUCCESS; +} + +memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, + void *buffer, size_t length, ssize_t& nread) +{ + assert(memcached_is_udp(ptr->root) == false); + assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error + char *buffer_ptr= static_cast(buffer); + + if (ptr->fd == INVALID_SOCKET) + { +#if 0 + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); +#endif + return MEMCACHED_CONNECTION_FAILURE; + } + + while (length) + { + if (ptr->read_buffer_length == 0) { - size_t difference; + memcached_return_t io_fill_ret; + if (memcached_fatal(io_fill_ret= _io_fill(ptr))) + { + nread= -1; + return io_fill_ret; + } + } - difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + if (length > 1) + { + size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; memcpy(buffer_ptr, ptr->read_ptr, difference); length -= difference; @@ -381,18 +524,19 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, } } - ptr->server_failure_counter= 0; - *nread = (ssize_t)(buffer_ptr - (char*)buffer); + nread= ssize_t(buffer_ptr - (char*)buffer); + return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) +memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) { - assert(ptr); // Programmer error + assert_msg(ptr, "Programmer error, invalid Instance"); + assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) { - assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state"); return MEMCACHED_CONNECTION_FAILURE; } @@ -409,10 +553,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -425,11 +569,11 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) /* fall through */ case ENOTCONN: // Programmer Error - WATCHPOINT_ASSERT(0); + assert(0); case ENOTSOCK: - WATCHPOINT_ASSERT(0); + assert(0); case EBADF: - assert(ptr->fd != INVALID_SOCKET); + assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state"); case EINVAL: case EFAULT: case ECONNREFUSED: @@ -442,39 +586,23 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) return MEMCACHED_CONNECTION_FAILURE; } -static ssize_t _io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) +static bool _io_write(org::libmemcached::Instance* ptr, + const void *buffer, size_t length, bool with_flush, + size_t& written) { - size_t original_length; - const char* buffer_ptr; + assert(ptr->fd != INVALID_SOCKET); + assert(memcached_is_udp(ptr->root) == false); - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + const char *buffer_ptr= static_cast(buffer); - original_length= length; - buffer_ptr= static_cast(buffer); + const size_t original_length= length; while (length) { char *write_ptr; - size_t should_write; - size_t buffer_end; - - if (ptr->type == MEMCACHED_CONNECTION_UDP) - { - //UDP does not support partial writes - buffer_end= MAX_UDP_DATAGRAM_LENGTH; - should_write= length; - if (ptr->write_buffer_offset + should_write > buffer_end) - { - return -1; - } - } - else - { - buffer_end= MEMCACHED_MAX_BUFFER; - should_write= buffer_end - ptr->write_buffer_offset; - should_write= (should_write < length) ? should_write : length; - } + size_t buffer_end= MEMCACHED_MAX_BUFFER; + size_t should_write= buffer_end -ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); @@ -482,21 +610,15 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) + if (ptr->write_buffer_offset == buffer_end) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); memcached_return_t rc; - ssize_t sent_length= io_flush(ptr, with_flush, &rc); - if (sent_length == -1) + if (io_flush(ptr, with_flush, rc) == false) { - return -1; - } - - /* If io_flush calls memcached_purge, sent_length may be 0 */ - unlikely (sent_length != 0) - { - WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); + written= original_length -length; + return false; } } } @@ -505,51 +627,71 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, { memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, with_flush, &rc) == -1) + if (io_flush(ptr, with_flush, rc) == false) { - return -1; + written= original_length -length; + return false; } } - return (ssize_t) original_length; + written= original_length -length; + + return true; } -ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) +bool memcached_io_write(org::libmemcached::Instance* ptr) { - return _io_write(ptr, buffer, length, with_flush); + size_t written; + return _io_write(ptr, NULL, 0, true, written); } -ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, - size_t number_of, bool with_flush) +ssize_t memcached_io_write(org::libmemcached::Instance* ptr, + const void *buffer, const size_t length, const bool with_flush) { + size_t written; + + if (_io_write(ptr, buffer, length, with_flush, written) == false) + { + return -1; + } + + return ssize_t(written); +} + +bool memcached_io_writev(org::libmemcached::Instance* ptr, + libmemcached_io_vector_st vector[], + const size_t number_of, const bool with_flush) +{ + ssize_t complete_total= 0; ssize_t total= 0; for (size_t x= 0; x < number_of; x++, vector++) { - ssize_t returnable; - - if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + complete_total+= vector->length; + if (vector->length) { - return -1; + size_t written; + if ((_io_write(ptr, vector->buffer, vector->length, false, written)) == false) + { + return false; + } + total+= written; } - total+= returnable; } if (with_flush) { - if (memcached_io_write(ptr, NULL, 0, true) == -1) + if (memcached_io_write(ptr) == false) { - return -1; + return false; } } - return total; + return (complete_total == total); } -void memcached_io_close(memcached_server_write_instance_st ptr) +void memcached_io_close(org::libmemcached::Instance* ptr) { if (ptr->fd == INVALID_SOCKET) { @@ -572,25 +714,26 @@ void memcached_io_close(memcached_server_write_instance_st ptr) ptr->fd= INVALID_SOCKET; } -memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) +org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; - unsigned int host_index= 0; + nfds_t host_index= 0; - for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ + { return instance; + } - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { - fds[host_index].events = POLLIN; - fds[host_index].revents = 0; - fds[host_index].fd = instance->fd; + fds[host_index].events= POLLIN; + fds[host_index].revents= 0; + fds[host_index].fd= instance->fd; ++host_index; } } @@ -600,10 +743,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { return instance; } @@ -622,178 +764,30 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st break; default: - for (size_t x= 0; x < host_index; ++x) + for (nfds_t x= 0; x < host_index; ++x) { if (fds[x].revents & POLLIN) { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, y); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) - return instance; - } - } - } - } - - return NULL; -} - -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error) -{ - /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ - { - memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); - - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - { - return -1; - } - } - ssize_t sent_length; - size_t return_length; - char *local_write_ptr= ptr->write_buffer; - size_t write_length= ptr->write_buffer_offset; - - *error= MEMCACHED_SUCCESS; - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - - // UDP Sanity check, make sure that we are not sending somthing too big - if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) - { - *error= MEMCACHED_WRITE_FAILURE; - return -1; - } - - if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - { - return 0; - } - - /* Looking for memory overflows */ -#if defined(DEBUG) - if (write_length == MEMCACHED_MAX_BUFFER) - WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); - WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); -#endif - - return_length= 0; - while (write_length) - { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - WATCHPOINT_ASSERT(write_length > 0); - sent_length= 0; - if (ptr->type == MEMCACHED_CONNECTION_UDP) - increment_udp_message_id(ptr); - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (with_flush) - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); - } - else - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); - } - - if (sent_length == SOCKET_ERROR) - { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); -#if 0 // @todo I should look at why we hit this bit of code hard frequently - WATCHPOINT_ERRNO(get_socket_errno()); - WATCHPOINT_NUMBER(get_socket_errno()); -#endif - switch (get_socket_errno()) - { - case ENOBUFS: - continue; - case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: -#endif - { - /* - * We may be blocked on write because the input buffer - * is full. Let's check if we have room in our input - * buffer for more data and retry the write before - * waiting.. - */ - if (repack_input_buffer(ptr) or - process_input_buffer(ptr)) - { - continue; - } - - memcached_return_t rc= io_wait(ptr, MEM_WRITE); - if (memcached_success(rc)) - { - continue; - } - else if (rc == MEMCACHED_TIMEOUT) { - *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - return -1; + return instance; } - - memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - return -1; } - case ENOTCONN: - case EPIPE: - default: - memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - WATCHPOINT_ASSERT(ptr->fd == -1); - return -1; } } - - if (ptr->type == MEMCACHED_CONNECTION_UDP and - (size_t)sent_length != write_length) - { - memcached_quit_server(ptr, true); - *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return -1; - } - - ptr->io_bytes_sent += (uint32_t) sent_length; - - local_write_ptr+= sent_length; - write_length-= (uint32_t) sent_length; - return_length+= (uint32_t) sent_length; } - WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header - if (ptr->type == MEMCACHED_CONNECTION_UDP) - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - else - ptr->write_buffer_offset= 0; - - return (ssize_t) return_length; + return NULL; } /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_write_instance_st ptr) +void memcached_io_reset(org::libmemcached::Instance* ptr) { memcached_quit_server(ptr, true); } @@ -802,9 +796,9 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, +memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, void *dta, - size_t size) + const size_t size) { size_t offset= 0; char *data= static_cast(dta); @@ -814,27 +808,28 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, ssize_t nread; memcached_return_t rc; - while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { }; + while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { }; if (memcached_failed(rc)) { return rc; } - offset+= (size_t) nread; + offset+= size_t(nread); } return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, +memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, char *buffer_ptr, - size_t size) + size_t size, + size_t& total_nr) { + total_nr= 0; bool line_complete= false; - size_t total_nr= 0; - while (not line_complete) + while (line_complete == false) { if (ptr->read_buffer_length == 0) { @@ -844,7 +839,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, * the logic. */ ssize_t nread; - memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); + memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread); if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) { memcached_quit_server(ptr, true); @@ -856,18 +851,22 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (*buffer_ptr == '\n') + { line_complete= true; + } ++buffer_ptr; ++total_nr; } /* Now let's look in the buffer and copy as we go! */ - while (ptr->read_buffer_length && total_nr < size && !line_complete) + while (ptr->read_buffer_length and total_nr < size and line_complete == false) { *buffer_ptr = *ptr->read_ptr; if (*buffer_ptr == '\n') + { line_complete = true; + } --ptr->read_buffer_length; ++ptr->read_ptr; ++total_nr; @@ -875,45 +874,10 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (total_nr == size) + { return MEMCACHED_PROTOCOL_ERROR; + } } return MEMCACHED_SUCCESS; } - -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - -memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) -{ - if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) - return MEMCACHED_FAILURE; - - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id))); - header->num_datagrams= htons(1); - header->sequence_number= htons(0); - - return MEMCACHED_SUCCESS; -}