From 39575a40cf067da39e3f578bb3439d17ebd27638 Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Thu, 10 Jan 2013 06:35:33 -0500 Subject: [PATCH] Split up how we call quit. --- libmemcached/io.cc | 283 ++++++++++++++++++++++--------------------- libmemcached/quit.cc | 132 ++++++++++---------- 2 files changed, 209 insertions(+), 206 deletions(-) diff --git a/libmemcached/io.cc b/libmemcached/io.cc index c686397f..60614f5c 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -59,34 +59,34 @@ enum memc_read_or_write { * Try to fill the input buffer for a server with as much * data as possible. * - * @param ptr the server to pack + * @param instance the server to pack */ -static bool repack_input_buffer(org::libmemcached::Instance* ptr) +static bool repack_input_buffer(org::libmemcached::Instance* instance) { - if (ptr->read_ptr != ptr->read_buffer) + if (instance->read_ptr != instance->read_buffer) { /* Move all of the data to the beginning of the buffer so ** that we can fit more data into the buffer... */ - memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); - ptr->read_ptr= ptr->read_buffer; - ptr->read_data_length= ptr->read_buffer_length; + memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length); + instance->read_ptr= instance->read_buffer; + instance->read_data_length= instance->read_buffer_length; } /* There is room in the buffer, try to fill it! */ - if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) + if (instance->read_buffer_length != MEMCACHED_MAX_BUFFER) { do { /* Just try a single read to grab what's available */ ssize_t nr; - if ((nr= ::recv(ptr->fd, - ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length, - MSG_NOSIGNAL)) <= 0) + if ((nr= ::recv(instance->fd, + instance->read_ptr + instance->read_data_length, + MEMCACHED_MAX_BUFFER - instance->read_data_length, + MSG_NOSIGNAL)) <= 0) { if (nr == 0) { - memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); + memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); } else { @@ -105,7 +105,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* ptr) break; // No IO is fine, we can just move on default: - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); } } @@ -113,8 +113,8 @@ static bool repack_input_buffer(org::libmemcached::Instance* ptr) } else // We read data, append to our read buffer { - ptr->read_data_length+= size_t(nr); - ptr->read_buffer_length+= size_t(nr); + instance->read_data_length+= size_t(nr); + instance->read_buffer_length+= size_t(nr); return true; } @@ -131,28 +131,28 @@ static bool repack_input_buffer(org::libmemcached::Instance* ptr) * when the input buffer is full, so that we _know_ that we have * at least _one_ message to process. * - * @param ptr the server to star processing iput messages for + * @param instance the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(org::libmemcached::Instance* ptr) +static bool process_input_buffer(org::libmemcached::Instance* instance) { /* ** We might be able to process some of the response messages if we ** have a callback set up */ - if (ptr->root->callbacks != NULL) + if (instance->root->callbacks != NULL) { /* * We might have responses... try to read them out and fire * callbacks */ - memcached_callback_st cb= *ptr->root->callbacks; + memcached_callback_st cb= *instance->root->callbacks; - memcached_set_processing_input((memcached_st *)ptr->root, true); + memcached_set_processing_input((memcached_st *)instance->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_st *root= (memcached_st *)ptr->root; - memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result); + memcached_st *root= (memcached_st *)instance->root; + memcached_return_t error= memcached_response(instance, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -160,7 +160,7 @@ static bool process_input_buffer(org::libmemcached::Instance* ptr) { for (unsigned int x= 0; x < cb.number_of_callback; x++) { - error= (*cb.callback[x])(ptr->root, &root->result, cb.context); + error= (*cb.callback[x])(instance->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) { break; @@ -176,7 +176,7 @@ static bool process_input_buffer(org::libmemcached::Instance* ptr) return false; } -static memcached_return_t io_wait(org::libmemcached::Instance* ptr, +static memcached_return_t io_wait(org::libmemcached::Instance* instance, const memc_read_or_write read_or_write) { /* @@ -189,37 +189,37 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr, */ if (read_or_write == MEM_WRITE) { - if (memcached_purge(ptr) == false) + if (memcached_purge(instance) == false) { return MEMCACHED_FAILURE; } } struct pollfd fds; - fds.fd= ptr->fd; + fds.fd= instance->fd; fds.events= POLLIN; fds.revents= 0; if (read_or_write == MEM_WRITE) /* write */ { fds.events= POLLOUT; - ptr->io_wait_count.write++; + instance->io_wait_count.write++; } else { - ptr->io_wait_count.read++; + instance->io_wait_count.read++; } - if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) + if (instance->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { - ptr->io_wait_count.timeouts++; - return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + instance->io_wait_count.timeouts++; + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); } size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { - int active_fd= poll(&fds, 1, ptr->root->poll_timeout); + int active_fd= poll(&fds, 1, instance->root->poll_timeout); if (active_fd >= 1) { @@ -231,7 +231,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr, if (fds.revents & POLLHUP) { - return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() detected hang up")); } @@ -240,7 +240,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr, int local_errno= EINVAL; int err; socklen_t len= sizeof (err); - if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0) + if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0) { if (err == 0) // treat this as EINTR { @@ -248,18 +248,18 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr, } local_errno= err; } - memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, + memcached_quit_server(instance, true); + return memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll() returned POLLHUP")); } - return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); + return memcached_set_error(*instance, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); } if (active_fd == 0) { - ptr->io_wait_count.timeouts++; - return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + instance->io_wait_count.timeouts++; + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); } // Only an error should result in this code being called. @@ -275,30 +275,30 @@ static memcached_return_t io_wait(org::libmemcached::Instance* ptr, case EFAULT: case ENOMEM: - memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); case EINVAL: - memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); default: - memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); + memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); } break; } - memcached_quit_server(ptr, true); + memcached_quit_server(instance, true); - if (memcached_has_error(ptr)) + if (memcached_has_error(instance)) { - return memcached_instance_error_return(ptr); + return memcached_instance_error_return(instance); } - return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(org::libmemcached::Instance* ptr, +static bool io_flush(org::libmemcached::Instance* instance, const bool with_flush, memcached_return_t& error) { @@ -308,30 +308,30 @@ static bool io_flush(org::libmemcached::Instance* ptr, ** in the purge function to avoid duplicating the logic.. */ { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET); - if (memcached_purge(ptr) == false) + if (memcached_purge(instance) == false) { return false; } } - char *local_write_ptr= ptr->write_buffer; - size_t write_length= ptr->write_buffer_offset; + char *local_write_ptr= instance->write_buffer; + size_t write_length= instance->write_buffer_offset; error= MEMCACHED_SUCCESS; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET); /* Looking for memory overflows */ #if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) - WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); - WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); + WATCHPOINT_ASSERT(instance->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((instance->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); #endif while (write_length) { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); int flags; @@ -344,7 +344,7 @@ static bool io_flush(org::libmemcached::Instance* ptr, flags= MSG_NOSIGNAL|MSG_MORE; } - ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags); + ssize_t sent_length= ::send(instance->fd, local_write_ptr, write_length, flags); if (sent_length == SOCKET_ERROR) { @@ -368,12 +368,12 @@ static bool io_flush(org::libmemcached::Instance* ptr, * buffer for more data and retry the write before * waiting.. */ - if (repack_input_buffer(ptr) or process_input_buffer(ptr)) + if (repack_input_buffer(instance) or process_input_buffer(instance)) { continue; } - memcached_return_t rc= io_wait(ptr, MEM_WRITE); + memcached_return_t rc= io_wait(instance, MEM_WRITE); if (memcached_success(rc)) { continue; @@ -383,43 +383,43 @@ static bool io_flush(org::libmemcached::Instance* ptr, return false; } - memcached_quit_server(ptr, true); - error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + memcached_quit_server(instance, true); + error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); return false; } case ENOTCONN: case EPIPE: default: - memcached_quit_server(ptr, true); - error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET); + memcached_quit_server(instance, true); + error= memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET); return false; } } - ptr->io_bytes_sent+= uint32_t(sent_length); + instance->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; write_length-= uint32_t(sent_length); } WATCHPOINT_ASSERT(write_length == 0); - ptr->write_buffer_offset= 0; + instance->write_buffer_offset= 0; return true; } -memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr) +memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* instance) { - return io_wait(ptr, MEM_WRITE); + return io_wait(instance, MEM_WRITE); } -static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) +static memcached_return_t _io_fill(org::libmemcached::Instance* instance) { ssize_t data_read; do { - data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL); + data_read= ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL); if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -437,7 +437,7 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) #endif { memcached_return_t io_wait_ret; - if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) + if (memcached_success(io_wait_ret= io_wait(instance, MEM_READ))) { continue; } @@ -452,17 +452,17 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) case ENOTSOCK: WATCHPOINT_ASSERT(0); case EBADF: - assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket"); case EINVAL: case EFAULT: case ECONNREFUSED: default: - memcached_quit_server(ptr, true); - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + memcached_quit_server(instance, true); + memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT); break; } - return memcached_instance_error_return(ptr); + return memcached_instance_error_return(instance); } else if (data_read == 0) { @@ -475,42 +475,42 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) for blocking I/O we do not return 0 and for non-blocking case it will return EGAIN if data is not immediatly available. */ - memcached_quit_server(ptr, true); - return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_quit_server(instance, true); + return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("::rec() returned zero, server has disconnected")); } - ptr->io_wait_count._bytes_read+= data_read; + instance->io_wait_count._bytes_read+= data_read; } while (data_read <= 0); - ptr->io_bytes_sent= 0; - ptr->read_data_length= (size_t) data_read; - ptr->read_buffer_length= (size_t) data_read; - ptr->read_ptr= ptr->read_buffer; + instance->io_bytes_sent= 0; + instance->read_data_length= (size_t) data_read; + instance->read_buffer_length= (size_t) data_read; + instance->read_ptr= instance->read_buffer; return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, +memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, void *buffer, size_t length, ssize_t& nread) { - assert(memcached_is_udp(ptr->root) == false); - assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error + assert(memcached_is_udp(instance->root) == false); + assert_msg(instance, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error char *buffer_ptr= static_cast(buffer); - if (ptr->fd == INVALID_SOCKET) + if (instance->fd == INVALID_SOCKET) { #if 0 - assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); + assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); #endif return MEMCACHED_CONNECTION_FAILURE; } while (length) { - if (ptr->read_buffer_length == 0) + if (instance->read_buffer_length == 0) { memcached_return_t io_fill_ret; - if (memcached_fatal(io_fill_ret= _io_fill(ptr))) + if (memcached_fatal(io_fill_ret= _io_fill(instance))) { nread= -1; return io_fill_ret; @@ -519,19 +519,19 @@ memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, if (length > 1) { - size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + size_t difference= (length > instance->read_buffer_length) ? instance->read_buffer_length : length; - memcpy(buffer_ptr, ptr->read_ptr, difference); + memcpy(buffer_ptr, instance->read_ptr, difference); length -= difference; - ptr->read_ptr+= difference; - ptr->read_buffer_length-= difference; + instance->read_ptr+= difference; + instance->read_buffer_length-= difference; buffer_ptr+= difference; } else { - *buffer_ptr= *ptr->read_ptr; - ptr->read_ptr++; - ptr->read_buffer_length--; + *buffer_ptr= *instance->read_ptr; + instance->read_ptr++; + instance->read_buffer_length--; buffer_ptr++; break; } @@ -542,14 +542,14 @@ memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) +memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) { - assert_msg(ptr, "Programmer error, invalid Instance"); - assert(memcached_is_udp(ptr->root) == false); + assert_msg(instance, "Programmer error, invalid Instance"); + assert(memcached_is_udp(instance->root) == false); - if (ptr->fd == INVALID_SOCKET) + if (instance->fd == INVALID_SOCKET) { - assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state"); + assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state"); return MEMCACHED_CONNECTION_FAILURE; } @@ -557,7 +557,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) char buffer[MEMCACHED_MAX_BUFFER]; do { - data_read= ::recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_NOSIGNAL); + data_read= ::recv(instance->fd, instance->read_buffer, sizeof(buffer), MSG_NOSIGNAL); if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -573,7 +573,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) #ifdef TARGET_OS_LINUX case ERESTART: #endif - if (memcached_success(io_wait(ptr, MEM_READ))) + if (memcached_success(io_wait(instance, MEM_READ))) { continue; } @@ -586,7 +586,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) case ENOTSOCK: assert(0); case EBADF: - assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state"); + assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state"); case EINVAL: case EFAULT: case ECONNREFUSED: @@ -599,12 +599,12 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) return MEMCACHED_CONNECTION_FAILURE; } -static bool _io_write(org::libmemcached::Instance* ptr, +static bool _io_write(org::libmemcached::Instance* instance, const void *buffer, size_t length, bool with_flush, size_t& written) { - assert(ptr->fd != INVALID_SOCKET); - assert(memcached_is_udp(ptr->root) == false); + assert(instance->fd != INVALID_SOCKET); + assert(memcached_is_udp(instance->root) == false); const char *buffer_ptr= static_cast(buffer); @@ -614,21 +614,21 @@ static bool _io_write(org::libmemcached::Instance* ptr, { char *write_ptr; size_t buffer_end= MEMCACHED_MAX_BUFFER; - size_t should_write= buffer_end -ptr->write_buffer_offset; + size_t should_write= buffer_end -instance->write_buffer_offset; should_write= (should_write < length) ? should_write : length; - write_ptr= ptr->write_buffer + ptr->write_buffer_offset; + write_ptr= instance->write_buffer + instance->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); - ptr->write_buffer_offset+= should_write; + instance->write_buffer_offset+= should_write; buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end) + if (instance->write_buffer_offset == buffer_end) { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET); memcached_return_t rc; - if (io_flush(ptr, with_flush, rc) == false) + if (io_flush(instance, with_flush, rc) == false) { written= original_length -length; return false; @@ -639,8 +639,8 @@ static bool _io_write(org::libmemcached::Instance* ptr, if (with_flush) { memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, with_flush, rc) == false) + WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET); + if (io_flush(instance, with_flush, rc) == false) { written= original_length -length; return false; @@ -652,18 +652,18 @@ static bool _io_write(org::libmemcached::Instance* ptr, return true; } -bool memcached_io_write(org::libmemcached::Instance* ptr) +bool memcached_io_write(org::libmemcached::Instance* instance) { size_t written; - return _io_write(ptr, NULL, 0, true, written); + return _io_write(instance, NULL, 0, true, written); } -ssize_t memcached_io_write(org::libmemcached::Instance* ptr, +ssize_t memcached_io_write(org::libmemcached::Instance* instance, const void *buffer, const size_t length, const bool with_flush) { size_t written; - if (_io_write(ptr, buffer, length, with_flush, written) == false) + if (_io_write(instance, buffer, length, with_flush, written) == false) { return -1; } @@ -671,7 +671,7 @@ ssize_t memcached_io_write(org::libmemcached::Instance* ptr, return ssize_t(written); } -bool memcached_io_writev(org::libmemcached::Instance* ptr, +bool memcached_io_writev(org::libmemcached::Instance* instance, libmemcached_io_vector_st vector[], const size_t number_of, const bool with_flush) { @@ -684,7 +684,7 @@ bool memcached_io_writev(org::libmemcached::Instance* ptr, if (vector->length) { size_t written; - if ((_io_write(ptr, vector->buffer, vector->length, false, written)) == false) + if ((_io_write(instance, vector->buffer, vector->length, false, written)) == false) { return false; } @@ -694,7 +694,7 @@ bool memcached_io_writev(org::libmemcached::Instance* ptr, if (with_flush) { - if (memcached_io_write(ptr) == false) + if (memcached_io_write(instance) == false) { return false; } @@ -717,7 +717,7 @@ void org::libmemcached::Instance::close_socket() if (fd != INVALID_SOCKET) { /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) + if (shutdown(fd, SHUT_RD) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(fd); WATCHPOINT_ERRNO(get_socket_errno()); @@ -731,6 +731,19 @@ void org::libmemcached::Instance::close_socket() state= MEMCACHED_SERVER_STATE_NEW; fd= INVALID_SOCKET; } + + state= MEMCACHED_SERVER_STATE_NEW; + cursor_active_= 0; + io_bytes_sent= 0; + write_buffer_offset= size_t(root and memcached_is_udp(root) ? UDP_DATAGRAM_HEADER_LENGTH : 0); + read_buffer_length= 0; + read_ptr= read_buffer; + options.is_shutting_down= false; + memcached_server_response_reset(this); + + // We reset the version so that if we end up talking to a different server + // we don't have stale server version information. + major_version= minor_version= micro_version= UINT8_MAX; } org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&) @@ -806,16 +819,16 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(org::libmemcached::Instance* ptr) +void memcached_io_reset(org::libmemcached::Instance* instance) { - memcached_quit_server(ptr, true); + memcached_quit_server(instance, true); } /** * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, +memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, void *dta, const size_t size) { @@ -827,7 +840,7 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, ssize_t nread; memcached_return_t rc; - while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { }; + while (memcached_continue(rc= memcached_io_read(instance, data + offset, size - offset, nread))) { }; if (memcached_failed(rc)) { @@ -840,7 +853,7 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, +memcached_return_t memcached_io_readline(org::libmemcached::Instance* instance, char *buffer_ptr, size_t size, size_t& total_nr) @@ -850,7 +863,7 @@ memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, while (line_complete == false) { - if (ptr->read_buffer_length == 0) + if (instance->read_buffer_length == 0) { /* * We don't have any data in the buffer, so let's fill the read @@ -858,11 +871,11 @@ memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, * the logic. */ ssize_t nread; - memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread); + memcached_return_t rc= memcached_io_read(instance, buffer_ptr, 1, nread); if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) { - memcached_quit_server(ptr, true); - return memcached_set_error(*ptr, rc, MEMCACHED_AT); + memcached_quit_server(instance, true); + return memcached_set_error(*instance, rc, MEMCACHED_AT); } else if (memcached_failed(rc)) { @@ -879,15 +892,15 @@ memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, } /* Now let's look in the buffer and copy as we go! */ - while (ptr->read_buffer_length and total_nr < size and line_complete == false) + while (instance->read_buffer_length and total_nr < size and line_complete == false) { - *buffer_ptr = *ptr->read_ptr; + *buffer_ptr = *instance->read_ptr; if (*buffer_ptr == '\n') { line_complete = true; } - --ptr->read_buffer_length; - ++ptr->read_ptr; + --instance->read_buffer_length; + ++instance->read_ptr; ++total_nr; ++buffer_ptr; } diff --git a/libmemcached/quit.cc b/libmemcached/quit.cc index c12a1d82..8b33255b 100644 --- a/libmemcached/quit.cc +++ b/libmemcached/quit.cc @@ -37,6 +37,65 @@ #include +namespace { + memcached_return_t send_quit_message(org::libmemcached::Instance* instance) + { + memcached_return_t rc; + if (instance->root->flags.binary_protocol) + { + protocol_binary_request_quit request= {}; // = {.bytes= {0}}; + + initialize_binary_request(instance, request.message.header); + + request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT; + request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES; + + libmemcached_io_vector_st vector[]= + { + { request.bytes, sizeof(request.bytes) } + }; + + rc= memcached_vdo(instance, vector, 1, true); + } + else + { + libmemcached_io_vector_st vector[]= + { + { memcached_literal_param("quit\r\n") } + }; + + rc= memcached_vdo(instance, vector, 1, true); + } + + return rc; + } + + void drain_instance(org::libmemcached::Instance* instance) + { + /* read until socket is closed, or there is an error + * closing the socket before all data is read + * results in server throwing away all data which is + * not read + * + * In .40 we began to only do this if we had been doing buffered + * requests of had replication enabled. + */ + if (instance->root->flags.buffer_requests or instance->root->number_of_replicas) + { + memcached_io_slurp(instance); + } + + /* + * memcached_io_read may call memcached_quit_server with io_death if + * it encounters problems, but we don't care about those occurences. + * The intention of that loop is to drain the data sent from the + * server to ensure that the server processed all of the data we + * sent to the server. + */ + instance->server_failure_counter= 0; + } +} + /* This closes all connections (forces flush of input as well). @@ -52,84 +111,15 @@ void memcached_quit_server(org::libmemcached::Instance* instance, bool io_death) { if (io_death == false and memcached_is_udp(instance->root) == false and instance->is_shutting_down() == false) { - memcached_return_t rc; - if (instance->root->flags.binary_protocol) - { - protocol_binary_request_quit request= {}; // = {.bytes= {0}}; - - initialize_binary_request(instance, request.message.header); - - request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT; - request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES; - - libmemcached_io_vector_st vector[]= - { - { request.bytes, sizeof(request.bytes) } - }; - - rc= memcached_vdo(instance, vector, 1, true); - } - else - { - libmemcached_io_vector_st vector[]= - { - { memcached_literal_param("quit\r\n") } - }; - - rc= memcached_vdo(instance, vector, 1, true); - } + send_quit_message(instance); instance->start_close_socket(); - - /* read until socket is closed, or there is an error - * closing the socket before all data is read - * results in server throwing away all data which is - * not read - * - * In .40 we began to only do this if we had been doing buffered - * requests of had replication enabled. - */ - if (memcached_success(rc) and (instance->root->flags.buffer_requests or instance->root->number_of_replicas)) - { - if (0) - { - memcached_return_t rc_slurp; - while (memcached_continue(rc_slurp= memcached_io_slurp(instance))) {} ; - WATCHPOINT_ASSERT(rc_slurp == MEMCACHED_CONNECTION_FAILURE); - } - else - { - memcached_io_slurp(instance); - } - } - - /* - * memcached_io_read may call memcached_quit_server with io_death if - * it encounters problems, but we don't care about those occurences. - * The intention of that loop is to drain the data sent from the - * server to ensure that the server processed all of the data we - * sent to the server. - */ - instance->server_failure_counter= 0; + drain_instance(instance); } - } instance->close_socket(); - instance->state= MEMCACHED_SERVER_STATE_NEW; - instance->cursor_active_= 0; - instance->io_bytes_sent= 0; - instance->write_buffer_offset= size_t(instance->root and memcached_is_udp(instance->root) ? UDP_DATAGRAM_HEADER_LENGTH : 0); - instance->read_buffer_length= 0; - instance->read_ptr= instance->read_buffer; - instance->options.is_shutting_down= false; - memcached_server_response_reset(instance); - - // We reset the version so that if we end up talking to a different server - // we don't have stale server version information. - instance->major_version= instance->minor_version= instance->micro_version= UINT8_MAX; - if (io_death) { memcached_mark_server_for_timeout(instance); -- 2.30.2