X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=8cee0178711eeff6587cc8ded3ae226bfa79c2fa;hb=76bf27c007d5015d3dcf2981c942d8afb2e97302;hp=bafa28c818a45c16f7b8da226c638e08da793c07;hpb=6ae03b4b29148dd9295e0eca7e84004b073a9ec0;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index bafa28c8..8cee0178 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -39,6 +39,17 @@ #include +#ifdef HAVE_SYS_SOCKET_H +# include +#endif + +void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +{ + server->request_id++; + header.request.magic= PROTOCOL_BINARY_REQ; + header.request.opaque= htons(server->request_id); +} + enum memc_read_or_write { MEM_READ, MEM_WRITE @@ -50,7 +61,7 @@ enum memc_read_or_write { * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_write_instance_st ptr) +static bool repack_input_buffer(org::libmemcached::Instance* ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -68,10 +79,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) do { /* Just try a single read to grab what's available */ ssize_t nr; - if ((nr= recv(ptr->fd, + if ((nr= ::recv(ptr->fd, ptr->read_ptr + ptr->read_data_length, MEMCACHED_MAX_BUFFER - ptr->read_data_length, - MSG_DONTWAIT)) <= 0) + MSG_DONTWAIT|MSG_NOSIGNAL)) <= 0) { if (nr == 0) { @@ -123,7 +134,7 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_write_instance_st ptr) +static bool process_input_buffer(org::libmemcached::Instance* ptr) { /* ** We might be able to process some of the response messages if we @@ -165,7 +176,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -static memcached_return_t io_wait(memcached_server_write_instance_st ptr, +static memcached_return_t io_wait(org::libmemcached::Instance* ptr, const memc_read_or_write read_or_write) { /* @@ -205,7 +216,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } - int local_errno; size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { @@ -213,17 +223,47 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, if (active_fd >= 1) { - assert_msg(active_fd == 1 , "poll() returned an unexpected value"); - return MEMCACHED_SUCCESS; + assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors"); + if (fds.revents & POLLIN or fds.revents & POLLOUT) + { + return MEMCACHED_SUCCESS; + } + + if (fds.revents & POLLHUP) + { + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("poll() detected hang up")); + } + + if (fds.revents & POLLERR) + { + int local_errno= EINVAL; + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0) + { + if (err == 0) // treat this as EINTR + { + continue; + } + local_errno= err; + } + memcached_quit_server(ptr, true); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, + memcached_literal_param("poll() returned POLLHUP")); + } + + return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); } - else if (active_fd == 0) + + if (active_fd == 0) { ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } // Only an error should result in this code being called. - local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { @@ -235,37 +275,30 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, case EFAULT: case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); default: - if (fds.revents & POLLERR) - { - int err; - socklen_t len= sizeof (err); - if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) - { - if (err == 0) // treat this as EINTR - { - continue; - } - local_errno= err; - } - } - break; + memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); } - break; // should only occur from poll error + break; } memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); + if (memcached_has_error(ptr)) + { + return memcached_instance_error_return(ptr); + } + + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(memcached_server_write_instance_st ptr, +static bool io_flush(org::libmemcached::Instance* ptr, const bool with_flush, memcached_return_t& error) { @@ -301,7 +334,16 @@ static bool io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE; + int flags; + if (with_flush) + { + flags= MSG_NOSIGNAL|MSG_DONTWAIT; + } + else + { + flags= MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE; + } + ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags); if (sent_length == SOCKET_ERROR) @@ -367,17 +409,17 @@ static bool io_flush(memcached_server_write_instance_st ptr, return true; } -memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr) { return io_wait(ptr, MEM_WRITE); } -static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize_t& nread) +static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) { ssize_t data_read; do { - data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT|MSG_NOSIGNAL); if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -415,12 +457,12 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize case EFAULT: case ECONNREFUSED: default: - { - memcached_quit_server(ptr, true); - nread= -1; - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } + memcached_quit_server(ptr, true); + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + break; } + + return memcached_instance_error_return(ptr); } else if (data_read == 0) { @@ -434,7 +476,6 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize it will return EGAIN if data is not immediatly available. */ memcached_quit_server(ptr, true); - nread= -1; return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("::rec() returned zero, server has disconnected")); } @@ -449,11 +490,11 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, +memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, void *buffer, size_t length, ssize_t& nread) { assert(memcached_is_udp(ptr->root) == false); - assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error + assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error char *buffer_ptr= static_cast(buffer); if (ptr->fd == INVALID_SOCKET) @@ -469,8 +510,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, if (ptr->read_buffer_length == 0) { memcached_return_t io_fill_ret; - if (memcached_fatal(io_fill_ret= _io_fill(ptr, nread))) + if (memcached_fatal(io_fill_ret= _io_fill(ptr))) { + nread= -1; return io_fill_ret; } } @@ -500,9 +542,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) +memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) { - assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + assert_msg(ptr, "Programmer error, invalid Instance"); assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) @@ -515,7 +557,7 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) char buffer[MEMCACHED_MAX_BUFFER]; do { - data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT); + data_read= ::recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT|MSG_NOSIGNAL); if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -557,7 +599,7 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) return MEMCACHED_CONNECTION_FAILURE; } -static bool _io_write(memcached_server_write_instance_st ptr, +static bool _io_write(org::libmemcached::Instance* ptr, const void *buffer, size_t length, bool with_flush, size_t& written) { @@ -610,13 +652,13 @@ static bool _io_write(memcached_server_write_instance_st ptr, return true; } -bool memcached_io_write(memcached_server_write_instance_st ptr) +bool memcached_io_write(org::libmemcached::Instance* ptr) { size_t written; return _io_write(ptr, NULL, 0, true, written); } -ssize_t memcached_io_write(memcached_server_write_instance_st ptr, +ssize_t memcached_io_write(org::libmemcached::Instance* ptr, const void *buffer, const size_t length, const bool with_flush) { size_t written; @@ -629,9 +671,9 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, return ssize_t(written); } -bool memcached_io_writev(memcached_server_write_instance_st ptr, - libmemcached_io_vector_st vector[], - const size_t number_of, const bool with_flush) +bool memcached_io_writev(org::libmemcached::Instance* ptr, + libmemcached_io_vector_st vector[], + const size_t number_of, const bool with_flush) { ssize_t complete_total= 0; ssize_t total= 0; @@ -662,7 +704,7 @@ bool memcached_io_writev(memcached_server_write_instance_st ptr, } -void memcached_io_close(memcached_server_write_instance_st ptr) +void memcached_io_close(org::libmemcached::Instance* ptr) { if (ptr->fd == INVALID_SOCKET) { @@ -670,7 +712,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr) } /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN) + if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(ptr->fd); WATCHPOINT_ERRNO(get_socket_errno()); @@ -685,7 +727,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr) ptr->fd= INVALID_SOCKET; } -memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) +org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -693,18 +735,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ { return instance; } - if (memcached_server_response_count(instance) > 0) + if (instance->response_count() > 0) { - fds[host_index].events = POLLIN; - fds[host_index].revents = 0; - fds[host_index].fd = instance->fd; + fds[host_index].events= POLLIN; + fds[host_index].revents= 0; + fds[host_index].fd= instance->fd; ++host_index; } } @@ -714,10 +756,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); - if (memcached_server_response_count(instance) > 0) + if (instance->response_count() > 0) { return instance; } @@ -742,7 +783,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) { @@ -759,7 +800,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_write_instance_st ptr) +void memcached_io_reset(org::libmemcached::Instance* ptr) { memcached_quit_server(ptr, true); } @@ -768,7 +809,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, +memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, void *dta, const size_t size) { @@ -793,7 +834,7 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, +memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, char *buffer_ptr, size_t size, size_t& total_nr)