X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=cb8ed5c23199eac47b5665ca6733384216b4ffbb;hb=2973993864cf1347163ddfacbbab4f334ff33d35;hp=20233c7dc665cc668f524dfe56d03e25acac25a9;hpb=c5bd49aee7b7bcb434cc526ff67d4bccddd4ba90;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 20233c7d..cb8ed5c2 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -40,10 +40,10 @@ #include #ifdef HAVE_SYS_SOCKET_H -# include +# include #endif -void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +void initialize_binary_request(memcached_instance_st* server, protocol_binary_request_header& header) { server->request_id++; header.request.magic= PROTOCOL_BINARY_REQ; @@ -61,7 +61,7 @@ enum memc_read_or_write { * * @param instance the server to pack */ -static bool repack_input_buffer(org::libmemcached::Instance* instance) +static bool repack_input_buffer(memcached_instance_st* instance) { if (instance->read_ptr != instance->read_buffer) { @@ -134,7 +134,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) * @param instance the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(org::libmemcached::Instance* instance) +static bool process_input_buffer(memcached_instance_st* instance) { /* ** We might be able to process some of the response messages if we @@ -176,7 +176,7 @@ static bool process_input_buffer(org::libmemcached::Instance* instance) return false; } -static memcached_return_t io_wait(org::libmemcached::Instance* instance, +static memcached_return_t io_wait(memcached_instance_st* instance, const memc_read_or_write read_or_write) { /* @@ -212,8 +212,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (instance->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("poll_timeout() was set to zero")); } size_t loop_max= 5; @@ -258,8 +257,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (active_fd == 0) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("No active_fd were found")); } // Only an error should result in this code being called. @@ -298,7 +296,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(org::libmemcached::Instance* instance, +static bool io_flush(memcached_instance_st* instance, const bool with_flush, memcached_return_t& error) { @@ -409,12 +407,12 @@ static bool io_flush(org::libmemcached::Instance* instance, return true; } -memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance) { return io_wait(instance, MEM_WRITE); } -static memcached_return_t _io_fill(org::libmemcached::Instance* instance) +static memcached_return_t _io_fill(memcached_instance_st* instance) { ssize_t data_read; do @@ -490,7 +488,7 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_read(memcached_instance_st* instance, void *buffer, size_t length, ssize_t& nread) { assert(memcached_is_udp(instance->root) == false); @@ -542,7 +540,7 @@ memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_slurp(memcached_instance_st* instance) { assert_msg(instance, "Programmer error, invalid Instance"); assert(memcached_is_udp(instance->root) == false); @@ -599,7 +597,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) return MEMCACHED_CONNECTION_FAILURE; } -static bool _io_write(org::libmemcached::Instance* instance, +static bool _io_write(memcached_instance_st* instance, const void *buffer, size_t length, bool with_flush, size_t& written) { @@ -652,13 +650,13 @@ static bool _io_write(org::libmemcached::Instance* instance, return true; } -bool memcached_io_write(org::libmemcached::Instance* instance) +bool memcached_io_write(memcached_instance_st* instance) { size_t written; return _io_write(instance, NULL, 0, true, written); } -ssize_t memcached_io_write(org::libmemcached::Instance* instance, +ssize_t memcached_io_write(memcached_instance_st* instance, const void *buffer, const size_t length, const bool with_flush) { size_t written; @@ -671,7 +669,7 @@ ssize_t memcached_io_write(org::libmemcached::Instance* instance, return ssize_t(written); } -bool memcached_io_writev(org::libmemcached::Instance* instance, +bool memcached_io_writev(memcached_instance_st* instance, libmemcached_io_vector_st vector[], const size_t number_of, const bool with_flush) { @@ -703,7 +701,7 @@ bool memcached_io_writev(org::libmemcached::Instance* instance, return (complete_total == total); } -void org::libmemcached::Instance::start_close_socket() +void memcached_instance_st::start_close_socket() { if (fd != INVALID_SOCKET) { @@ -712,24 +710,35 @@ void org::libmemcached::Instance::start_close_socket() } } -void org::libmemcached::Instance::close_socket() +void memcached_instance_st::reset_socket() { if (fd != INVALID_SOCKET) { + (void)closesocket(fd); + fd= INVALID_SOCKET; + } +} + +void memcached_instance_st::close_socket() +{ + if (fd != INVALID_SOCKET) + { + int shutdown_options= SHUT_RD; + if (options.is_shutting_down == false) + { + shutdown_options= SHUT_RDWR; + } + /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(fd, SHUT_RD) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) + if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(fd); WATCHPOINT_ERRNO(get_socket_errno()); WATCHPOINT_ASSERT(get_socket_errno()); } - if (closesocket(fd) == SOCKET_ERROR) - { - WATCHPOINT_ERRNO(get_socket_errno()); - } + reset_socket(); state= MEMCACHED_SERVER_STATE_NEW; - fd= INVALID_SOCKET; } state= MEMCACHED_SERVER_STATE_NEW; @@ -746,7 +755,7 @@ void org::libmemcached::Instance::close_socket() major_version= minor_version= micro_version= UINT8_MAX; } -org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, memcached_return_t&) +memcached_instance_st* memcached_io_get_readable_server(Memcached *memc, memcached_return_t&) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -754,7 +763,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ { @@ -775,7 +784,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->response_count() > 0) { @@ -802,7 +811,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); + memcached_instance_st* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) { @@ -819,7 +828,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(Memcached *memc, m /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(org::libmemcached::Instance* instance) +void memcached_io_reset(memcached_instance_st* instance) { memcached_quit_server(instance, true); } @@ -828,7 +837,7 @@ void memcached_io_reset(org::libmemcached::Instance* instance) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_safe_read(memcached_instance_st* instance, void *dta, const size_t size) { @@ -853,7 +862,7 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_readline(memcached_instance_st* instance, char *buffer_ptr, size_t size, size_t& total_nr)