X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=94686d0590851a26c16b9cce32bc3858080b0661;hb=20671681ff4e13ed84be72b699f71a90337245cd;hp=fbfb8555e975cb6d164b3b6ba1f5ac4cc7ca6a79;hpb=f19d17ca4cd851c81c005b68c82b17c3c21b6846;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index fbfb8555..94686d05 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -153,7 +153,9 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) { error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) + { break; + } } /* @todo what should I do with the error message??? */ @@ -168,20 +170,6 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) static memcached_return_t io_wait(memcached_server_write_instance_st ptr, const memc_read_or_write read_or_write) { - struct pollfd fds; - fds.fd= ptr->fd; - fds.events= POLLIN; - - if (read_or_write == MEM_WRITE) /* write */ - { - fds.events= POLLOUT; - WATCHPOINT_SET(ptr->io_wait_count.write++); - } - else - { - WATCHPOINT_SET(ptr->io_wait_count.read++); - } - /* ** We are going to block on write, but at least on Solaris we might block ** on write if we haven't read anything from our input buffer.. @@ -192,13 +180,27 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, */ if (read_or_write == MEM_WRITE) { - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (memcached_fatal(memcached_purge(ptr))) { return MEMCACHED_FAILURE; } } + struct pollfd fds; + memset(&fds, 0, sizeof(pollfd)); + fds.fd= ptr->fd; + fds.events= POLLIN; + + if (read_or_write == MEM_WRITE) /* write */ + { + fds.events= POLLOUT; + ptr->io_wait_count.write++; + } + else + { + ptr->io_wait_count.read++; + } + if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); @@ -207,21 +209,20 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { + int active_fd= poll(&fds, 1, ptr->root->poll_timeout); + assert_msg(active_fd <= 1 , "poll() returned an unexpected value"); - int error= poll(&fds, 1, ptr->root->poll_timeout); - switch (error) + if (active_fd == 1) { - case 1: // Success! - WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); - WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); - return MEMCACHED_SUCCESS; - - case 0: // Timeout occured, we let the while() loop do its thing. + } + else if (active_fd == 0) + { + ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - - default: - WATCHPOINT_ERRNO(get_socket_errno()); + } + else // -1 + { switch (get_socket_errno()) { #ifdef TARGET_OS_LINUX @@ -238,24 +239,20 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); default: + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno if (fds.revents & POLLERR) { int err; socklen_t len= sizeof (err); if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) { - if (err == 0) + if (err == 0) // treat this as EINTR { continue; } - errno= err; + local_errno= err; } } - else - { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno memcached_quit_server(ptr, true); return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); @@ -278,11 +275,10 @@ static bool io_flush(memcached_server_write_instance_st ptr, ** in the purge function to avoid duplicating the logic.. */ { - memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); + memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED) { return false; } @@ -351,6 +347,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { + ptr->io_wait_count.timeouts++; error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); return false; } @@ -403,7 +400,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, while (length) { - if (not ptr->read_buffer_length) + if (ptr->read_buffer_length == 0) { ssize_t data_read; do @@ -424,11 +421,15 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, #ifdef TARGET_OS_LINUX case ERESTART: #endif - if (memcached_success(io_wait(ptr, MEM_READ))) { - continue; + memcached_return_t io_wait_ret; + if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) + { + continue; + } + + return io_wait_ret; } - return MEMCACHED_IN_PROGRESS; /* fall through */ @@ -602,7 +603,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, } } - return (ssize_t) original_length; + return ssize_t(original_length); } bool memcached_io_write(memcached_server_write_instance_st ptr) @@ -675,9 +676,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; - unsigned int host_index= 0; + nfds_t host_index= 0; - for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -722,17 +723,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st break; default: - for (size_t x= 0; x < host_index; ++x) + for (nfds_t x= 0; x < host_index; ++x) { if (fds[x].revents & POLLIN) { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, y); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) + { return instance; + } } } }