X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=d2e40043dfaa1c958ae1e346ed8dc1dbacb6bd3a;hb=ca663a567bc8d3facb22b035bcad19349e42a9b1;hp=85474d3eae215b8b7e33564fc605cf2b6cb7c2b2;hpb=53b79318cea42a1cd7bf19a29f4308091961021b;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 85474d3e..d2e40043 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -39,6 +39,13 @@ #include +void initialize_binary_request(memcached_server_write_instance_st server, protocol_binary_request_header& header) +{ + server->request_id++; + header.request.magic= PROTOCOL_BINARY_REQ; + header.request.opaque= htons(server->request_id); +} + enum memc_read_or_write { MEM_READ, MEM_WRITE @@ -205,7 +212,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } - int local_errno; size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { @@ -213,17 +219,47 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, if (active_fd >= 1) { - assert_msg(active_fd == 1 , "poll() returned an unexpected value"); - return MEMCACHED_SUCCESS; + assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors"); + if (fds.revents & POLLIN or fds.revents & POLLOUT) + { + return MEMCACHED_SUCCESS; + } + + if (fds.revents & POLLHUP) + { + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("poll() detected hang up")); + } + + if (fds.revents & POLLERR) + { + int local_errno= EINVAL; + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) + { + if (err == 0) // treat this as EINTR + { + continue; + } + local_errno= err; + } + memcached_quit_server(ptr, true); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, + memcached_literal_param("poll() returned POLLHUP")); + } + + return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with")); } - else if (active_fd == 0) + + if (active_fd == 0) { ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } // Only an error should result in this code being called. - local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno assert_msg(active_fd == -1 , "poll() returned an unexpected value"); switch (local_errno) { @@ -235,34 +271,27 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, case EFAULT: case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); default: - if (fds.revents & POLLERR) - { - int err; - socklen_t len= sizeof (err); - if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) - { - if (err == 0) // treat this as EINTR - { - continue; - } - local_errno= err; - } - } - break; + memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll")); } - break; // should only occur from poll error + break; } memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); + if (memcached_has_error(ptr)) + { + return memcached_instance_error_return(ptr); + } + + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, + memcached_literal_param("number of attempts to call io_wait() failed")); } static bool io_flush(memcached_server_write_instance_st ptr, @@ -372,7 +401,7 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s return io_wait(ptr, MEM_WRITE); } -static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize_t& nread) +static memcached_return_t _io_fill(memcached_server_write_instance_st ptr) { ssize_t data_read; do @@ -415,12 +444,12 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize case EFAULT: case ECONNREFUSED: default: - { - memcached_quit_server(ptr, true); - nread= -1; - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } + memcached_quit_server(ptr, true); + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + break; } + + return memcached_instance_error_return(ptr); } else if (data_read == 0) { @@ -434,7 +463,6 @@ static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize it will return EGAIN if data is not immediatly available. */ memcached_quit_server(ptr, true); - nread= -1; return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("::rec() returned zero, server has disconnected")); } @@ -469,17 +497,16 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, if (ptr->read_buffer_length == 0) { memcached_return_t io_fill_ret; - if (memcached_fatal(io_fill_ret= _io_fill(ptr, nread))) + if (memcached_fatal(io_fill_ret= _io_fill(ptr))) { + nread= -1; return io_fill_ret; } } if (length > 1) { - size_t difference; - - difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; + size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length; memcpy(buffer_ptr, ptr->read_ptr, difference); length -= difference; @@ -702,11 +729,11 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st return instance; } - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { - fds[host_index].events = POLLIN; - fds[host_index].revents = 0; - fds[host_index].fd = instance->fd; + fds[host_index].events= POLLIN; + fds[host_index].revents= 0; + fds[host_index].fd= instance->fd; ++host_index; } } @@ -716,10 +743,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); - if (memcached_server_response_count(instance) > 0) + if (memcached_instance_response_count(instance) > 0) { return instance; }