X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=ba4b347494a6e5261cafb68f7d9825a3e5ec0343;hb=8c96d045f23d3f787ea6fc84237eb9e30c1c4fce;hp=60614f5cfdf08b21b7994671cf41bdacc727c6fa;hpb=39575a40cf067da39e3f578bb3439d17ebd27638;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 60614f5c..ba4b3474 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -40,10 +40,10 @@ #include #ifdef HAVE_SYS_SOCKET_H -# include +# include #endif -void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +void initialize_binary_request(memcached_instance_st* server, protocol_binary_request_header& header) { server->request_id++; header.request.magic= PROTOCOL_BINARY_REQ; @@ -61,7 +61,7 @@ enum memc_read_or_write { * * @param instance the server to pack */ -static bool repack_input_buffer(org::libmemcached::Instance* instance) +static bool repack_input_buffer(memcached_instance_st* instance) { if (instance->read_ptr != instance->read_buffer) { @@ -99,7 +99,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif break; // No IO is fine, we can just move on @@ -134,7 +134,7 @@ static bool repack_input_buffer(org::libmemcached::Instance* instance) * @param instance the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(org::libmemcached::Instance* instance) +static bool process_input_buffer(memcached_instance_st* instance) { /* ** We might be able to process some of the response messages if we @@ -148,10 +148,10 @@ static bool process_input_buffer(org::libmemcached::Instance* instance) */ memcached_callback_st cb= *instance->root->callbacks; - memcached_set_processing_input((memcached_st *)instance->root, true); + memcached_set_processing_input((Memcached *)instance->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_st *root= (memcached_st *)instance->root; + Memcached *root= (Memcached *)instance->root; memcached_return_t error= memcached_response(instance, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -176,7 +176,7 @@ static bool process_input_buffer(org::libmemcached::Instance* instance) return false; } -static memcached_return_t io_wait(org::libmemcached::Instance* instance, +static memcached_return_t io_wait(memcached_instance_st* instance, const memc_read_or_write read_or_write) { /* @@ -212,8 +212,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (instance->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("poll_timeout() was set to zero")); } size_t loop_max= 5; @@ -258,8 +257,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, if (active_fd == 0) { - instance->io_wait_count.timeouts++; - return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("No active_fd were found")); } // Only an error should result in this code being called. @@ -267,7 +265,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif case EINTR: @@ -298,7 +296,7 @@ static memcached_return_t io_wait(org::libmemcached::Instance* instance, memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(org::libmemcached::Instance* instance, +static bool io_flush(memcached_instance_st* instance, const bool with_flush, memcached_return_t& error) { @@ -409,12 +407,17 @@ static bool io_flush(org::libmemcached::Instance* instance, return true; } -memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_wait_for_write(memcached_instance_st* instance) { return io_wait(instance, MEM_WRITE); } -static memcached_return_t _io_fill(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_wait_for_read(memcached_instance_st* instance) +{ + return io_wait(instance, MEM_READ); +} + +static memcached_return_t _io_fill(memcached_instance_st* instance) { ssize_t data_read; do @@ -432,7 +435,7 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif { @@ -490,7 +493,7 @@ static memcached_return_t _io_fill(org::libmemcached::Instance* instance) return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_read(memcached_instance_st* instance, void *buffer, size_t length, ssize_t& nread) { assert(memcached_is_udp(instance->root) == false); @@ -542,7 +545,7 @@ memcached_return_t memcached_io_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) +memcached_return_t memcached_io_slurp(memcached_instance_st* instance) { assert_msg(instance, "Programmer error, invalid Instance"); assert(memcached_is_udp(instance->root) == false); @@ -570,7 +573,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) case EWOULDBLOCK: #endif case EAGAIN: -#ifdef TARGET_OS_LINUX +#ifdef __linux case ERESTART: #endif if (memcached_success(io_wait(instance, MEM_READ))) @@ -599,7 +602,7 @@ memcached_return_t memcached_io_slurp(org::libmemcached::Instance* instance) return MEMCACHED_CONNECTION_FAILURE; } -static bool _io_write(org::libmemcached::Instance* instance, +static bool _io_write(memcached_instance_st* instance, const void *buffer, size_t length, bool with_flush, size_t& written) { @@ -652,13 +655,13 @@ static bool _io_write(org::libmemcached::Instance* instance, return true; } -bool memcached_io_write(org::libmemcached::Instance* instance) +bool memcached_io_write(memcached_instance_st* instance) { size_t written; return _io_write(instance, NULL, 0, true, written); } -ssize_t memcached_io_write(org::libmemcached::Instance* instance, +ssize_t memcached_io_write(memcached_instance_st* instance, const void *buffer, const size_t length, const bool with_flush) { size_t written; @@ -671,7 +674,7 @@ ssize_t memcached_io_write(org::libmemcached::Instance* instance, return ssize_t(written); } -bool memcached_io_writev(org::libmemcached::Instance* instance, +bool memcached_io_writev(memcached_instance_st* instance, libmemcached_io_vector_st vector[], const size_t number_of, const bool with_flush) { @@ -703,7 +706,7 @@ bool memcached_io_writev(org::libmemcached::Instance* instance, return (complete_total == total); } -void org::libmemcached::Instance::start_close_socket() +void memcached_instance_st::start_close_socket() { if (fd != INVALID_SOCKET) { @@ -712,24 +715,35 @@ void org::libmemcached::Instance::start_close_socket() } } -void org::libmemcached::Instance::close_socket() +void memcached_instance_st::reset_socket() { if (fd != INVALID_SOCKET) { + (void)closesocket(fd); + fd= INVALID_SOCKET; + } +} + +void memcached_instance_st::close_socket() +{ + if (fd != INVALID_SOCKET) + { + int shutdown_options= SHUT_RD; + if (options.is_shutting_down == false) + { + shutdown_options= SHUT_RDWR; + } + /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(fd, SHUT_RD) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) + if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(fd); WATCHPOINT_ERRNO(get_socket_errno()); WATCHPOINT_ASSERT(get_socket_errno()); } - if (closesocket(fd) == SOCKET_ERROR) - { - WATCHPOINT_ERRNO(get_socket_errno()); - } + reset_socket(); state= MEMCACHED_SERVER_STATE_NEW; - fd= INVALID_SOCKET; } state= MEMCACHED_SERVER_STATE_NEW; @@ -746,7 +760,7 @@ void org::libmemcached::Instance::close_socket() major_version= minor_version= micro_version= UINT8_MAX; } -org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&) +memcached_instance_st* memcached_io_get_readable_server(Memcached *memc, memcached_return_t&) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -754,7 +768,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ { @@ -775,7 +789,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); + memcached_instance_st* instance= memcached_instance_fetch(memc, x); if (instance->response_count() > 0) { @@ -802,7 +816,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); + memcached_instance_st* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) { @@ -819,7 +833,7 @@ org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(org::libmemcached::Instance* instance) +void memcached_io_reset(memcached_instance_st* instance) { memcached_quit_server(instance, true); } @@ -828,7 +842,7 @@ void memcached_io_reset(org::libmemcached::Instance* instance) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, +memcached_return_t memcached_safe_read(memcached_instance_st* instance, void *dta, const size_t size) { @@ -853,7 +867,7 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* instance, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(org::libmemcached::Instance* instance, +memcached_return_t memcached_io_readline(memcached_instance_st* instance, char *buffer_ptr, size_t size, size_t& total_nr)