X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=837959da5db6696765036ffe5787741c8235658c;hb=674c7578fa870c3b57e81e765c355ce98434b310;hp=309df4c1dd3c3595121261b21250f5155ce28a06;hpb=2e41f2aa7540640d02892f46bd16871e4581f3cf;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 309df4c1..837959da 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -39,6 +39,13 @@ #include +void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header) +{ + server->request_id++; + header.request.magic= PROTOCOL_BINARY_REQ; + header.request.opaque= htons(server->request_id); +} + enum memc_read_or_write { MEM_READ, MEM_WRITE @@ -50,7 +57,7 @@ enum memc_read_or_write { * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_write_instance_st ptr) +static bool repack_input_buffer(org::libmemcached::Instance* ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -67,14 +74,17 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) { do { /* Just try a single read to grab what's available */ - ssize_t nr= recv(ptr->fd, - ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length, - MSG_DONTWAIT); - - switch (nr) + ssize_t nr; + if ((nr= recv(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + MSG_DONTWAIT)) <= 0) { - case SOCKET_ERROR: + if (nr == 0) + { + memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); + } + else { switch (get_socket_errno()) { @@ -94,24 +104,19 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } - break; - case 0: // Shutdown on the socket has occurred - { - memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); - } break; + } + else // We read data, append to our read buffer + { + ptr->read_data_length+= size_t(nr); + ptr->read_buffer_length+= size_t(nr); - default: - { - ptr->read_data_length+= size_t(nr); - ptr->read_buffer_length+= size_t(nr); - return true; - } - break; + return true; } - } while (0); + } while (false); } + return false; } @@ -125,7 +130,7 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_write_instance_st ptr) +static bool process_input_buffer(org::libmemcached::Instance* ptr) { /* ** We might be able to process some of the response messages if we @@ -167,7 +172,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -static memcached_return_t io_wait(memcached_server_write_instance_st ptr, +static memcached_return_t io_wait(org::libmemcached::Instance* ptr, const memc_read_or_write read_or_write) { /* @@ -180,16 +185,16 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, */ if (read_or_write == MEM_WRITE) { - if (memcached_fatal(memcached_purge(ptr))) + if (memcached_purge(ptr) == false) { return MEMCACHED_FAILURE; } } struct pollfd fds; - memset(&fds, 0, sizeof(pollfd)); fds.fd= ptr->fd; fds.events= POLLIN; + fds.revents= 0; if (read_or_write == MEM_WRITE) /* write */ { @@ -203,10 +208,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { + ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } - int local_errno; size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { @@ -214,17 +219,47 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, if (active_fd >= 1) { - assert_msg(active_fd == 1 , "poll() returned an unexpected value"); - return MEMCACHED_SUCCESS; + assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors"); + if (fds.revents & POLLIN or fds.revents & POLLOUT) + { + return MEMCACHED_SUCCESS; + } + + if (fds.revents & POLLHUP) + { + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("poll() detected hang up")); + } + + if (fds.revents & POLLERR) + { + int local_errno= EINVAL; + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) + { + if (err == 0) // treat this as EINTR + { + continue; + } + local_errno= err; + } + memcached_quit_server(ptr, true); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, + memcached_literal_param("poll() returned POLLHUP")); + } + + return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); } - else if (active_fd == 0) + + if (active_fd == 0) { ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } // Only an error should result in this code being called. - local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { @@ -236,37 +271,30 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, case EFAULT: case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); default: - if (fds.revents & POLLERR) - { - int err; - socklen_t len= sizeof (err); - if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) - { - if (err == 0) // treat this as EINTR - { - continue; - } - local_errno= err; - } - } - break; + memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); } - break; // should only occur from poll error + break; } memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); + if (memcached_has_error(ptr)) + { + return memcached_instance_error_return(ptr); + } + + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("number of attempts to call io_wait() failed")); } -static bool io_flush(memcached_server_write_instance_st ptr, +static bool io_flush(org::libmemcached::Instance* ptr, const bool with_flush, memcached_return_t& error) { @@ -277,9 +305,8 @@ static bool io_flush(memcached_server_write_instance_st ptr, */ { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED) + if (memcached_purge(ptr) == false) { return false; } @@ -340,8 +367,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { - ptr->io_wait_count.timeouts++; - error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); return false; } @@ -371,16 +396,92 @@ static bool io_flush(memcached_server_write_instance_st ptr, return true; } -memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr) { return io_wait(ptr, MEM_WRITE); } -memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, +static memcached_return_t _io_fill(org::libmemcached::Instance* ptr) +{ + ssize_t data_read; + do + { + data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) + { + switch (get_socket_errno()) + { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + case EAGAIN: +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + { + memcached_return_t io_wait_ret; + if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) + { + continue; + } + + return io_wait_ret; + } + + /* fall through */ + + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + case EINVAL: + case EFAULT: + case ECONNREFUSED: + default: + memcached_quit_server(ptr, true); + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + break; + } + + return memcached_instance_error_return(ptr); + } + else if (data_read == 0) + { + /* + EOF. Any data received so far is incomplete + so discard it. This always reads by byte in case of TCP + and protocol enforcement happens at memcached_response() + looking for '\n'. We do not care for UDB which requests 8 bytes + at once. Generally, this means that connection went away. Since + for blocking I/O we do not return 0 and for non-blocking case + it will return EGAIN if data is not immediatly available. + */ + memcached_quit_server(ptr, true); + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("::rec() returned zero, server has disconnected")); + } + ptr->io_wait_count._bytes_read+= data_read; + } while (data_read <= 0); + + ptr->io_bytes_sent= 0; + ptr->read_data_length= (size_t) data_read; + ptr->read_buffer_length= (size_t) data_read; + ptr->read_ptr= ptr->read_buffer; + + return MEMCACHED_SUCCESS; +} + +memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr, void *buffer, size_t length, ssize_t& nread) { assert(memcached_is_udp(ptr->root) == false); - assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error + assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error char *buffer_ptr= static_cast(buffer); if (ptr->fd == INVALID_SOCKET) @@ -395,84 +496,17 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, { if (ptr->read_buffer_length == 0) { - ssize_t data_read; - do + memcached_return_t io_fill_ret; + if (memcached_fatal(io_fill_ret= _io_fill(ptr))) { - data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); - if (data_read == SOCKET_ERROR) - { - switch (get_socket_errno()) - { - case EINTR: // We just retry - continue; - - case ETIMEDOUT: // OSX -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - case EAGAIN: -#ifdef TARGET_OS_LINUX - case ERESTART: -#endif - { - memcached_return_t io_wait_ret; - if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) - { - continue; - } - - return io_wait_ret; - } - - /* fall through */ - - case ENOTCONN: // Programmer Error - WATCHPOINT_ASSERT(0); - case ENOTSOCK: - WATCHPOINT_ASSERT(0); - case EBADF: - assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); - case EINVAL: - case EFAULT: - case ECONNREFUSED: - default: - { - memcached_quit_server(ptr, true); - nread= -1; - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - } - } - else if (data_read == 0) - { - /* - EOF. Any data received so far is incomplete - so discard it. This always reads by byte in case of TCP - and protocol enforcement happens at memcached_response() - looking for '\n'. We do not care for UDB which requests 8 bytes - at once. Generally, this means that connection went away. Since - for blocking I/O we do not return 0 and for non-blocking case - it will return EGAIN if data is not immediatly available. - */ - WATCHPOINT_STRING("We had a zero length recv()"); - memcached_quit_server(ptr, true); - nread= -1; - return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, - memcached_literal_param("::rec() returned zero, server has disconnected")); - } - } while (data_read <= 0); - - ptr->io_bytes_sent = 0; - ptr->read_data_length= (size_t) data_read; - ptr->read_buffer_length= (size_t) data_read; - ptr->read_ptr= ptr->read_buffer; + nread= -1; + return io_fill_ret; + } } if (length > 1) { - size_t difference; - - difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; memcpy(buffer_ptr, ptr->read_ptr, difference); length -= difference; @@ -495,9 +529,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) +memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr) { - assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + assert_msg(ptr, "Programmer error, invalid Instance"); assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) @@ -535,9 +569,9 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) /* fall through */ case ENOTCONN: // Programmer Error - WATCHPOINT_ASSERT(0); + assert(0); case ENOTSOCK: - WATCHPOINT_ASSERT(0); + assert(0); case EBADF: assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state"); case EINVAL: @@ -552,15 +586,17 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) return MEMCACHED_CONNECTION_FAILURE; } -static ssize_t _io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) +static bool _io_write(org::libmemcached::Instance* ptr, + const void *buffer, size_t length, bool with_flush, + size_t& written) { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + assert(ptr->fd != INVALID_SOCKET); assert(memcached_is_udp(ptr->root) == false); - size_t original_length= length; const char *buffer_ptr= static_cast(buffer); + const size_t original_length= length; + while (length) { char *write_ptr; @@ -581,7 +617,8 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, memcached_return_t rc; if (io_flush(ptr, with_flush, rc) == false) { - return -1; + written= original_length -length; + return false; } } } @@ -592,41 +629,53 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); if (io_flush(ptr, with_flush, rc) == false) { - return -1; + written= original_length -length; + return false; } } - return ssize_t(original_length); + written= original_length -length; + + return true; } -bool memcached_io_write(memcached_server_write_instance_st ptr) +bool memcached_io_write(org::libmemcached::Instance* ptr) { - return (_io_write(ptr, NULL, 0, true) >= 0); + size_t written; + return _io_write(ptr, NULL, 0, true, written); } -ssize_t memcached_io_write(memcached_server_write_instance_st ptr, +ssize_t memcached_io_write(org::libmemcached::Instance* ptr, const void *buffer, const size_t length, const bool with_flush) { - return _io_write(ptr, buffer, length, with_flush); + size_t written; + + if (_io_write(ptr, buffer, length, with_flush, written) == false) + { + return -1; + } + + return ssize_t(written); } -ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - libmemcached_io_vector_st vector[], - const size_t number_of, const bool with_flush) +bool memcached_io_writev(org::libmemcached::Instance* ptr, + libmemcached_io_vector_st vector[], + const size_t number_of, const bool with_flush) { + ssize_t complete_total= 0; ssize_t total= 0; for (size_t x= 0; x < number_of; x++, vector++) { - ssize_t returnable; - + complete_total+= vector->length; if (vector->length) { - if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + size_t written; + if ((_io_write(ptr, vector->buffer, vector->length, false, written)) == false) { - return -1; + return false; } - total+= returnable; + total+= written; } } @@ -634,15 +683,15 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, { if (memcached_io_write(ptr) == false) { - return -1; + return false; } } - return total; + return (complete_total == total); } -void memcached_io_close(memcached_server_write_instance_st ptr) +void memcached_io_close(org::libmemcached::Instance* ptr) { if (ptr->fd == INVALID_SOCKET) { @@ -665,7 +714,7 @@ void memcached_io_close(memcached_server_write_instance_st ptr) ptr->fd= INVALID_SOCKET; } -memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) +org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -673,18 +722,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ { return instance; } - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { - fds[host_index].events = POLLIN; - fds[host_index].revents = 0; - fds[host_index].fd = instance->fd; + fds[host_index].events= POLLIN; + fds[host_index].revents= 0; + fds[host_index].fd= instance->fd; ++host_index; } } @@ -694,10 +743,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x); - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { return instance; } @@ -722,7 +770,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); + org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y); if (instance->fd == fds[x].fd) { @@ -739,7 +787,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_write_instance_st ptr) +void memcached_io_reset(org::libmemcached::Instance* ptr) { memcached_quit_server(ptr, true); } @@ -748,7 +796,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, +memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, void *dta, const size_t size) { @@ -773,7 +821,7 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, +memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr, char *buffer_ptr, size_t size, size_t& total_nr) @@ -812,7 +860,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } /* Now let's look in the buffer and copy as we go! */ - while (ptr->read_buffer_length && total_nr < size && !line_complete) + while (ptr->read_buffer_length and total_nr < size and line_complete == false) { *buffer_ptr = *ptr->read_ptr; if (*buffer_ptr == '\n')