X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=309df4c1dd3c3595121261b21250f5155ce28a06;hb=0302d893ad986b16a186b8b145c08e8599e3804a;hp=7a321adeed1f4ee7ca6a38b57b43f84b7595105e;hpb=2959e927ae7c1da490db7a88f0cd589e8bc6b45f;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 7a321ade..309df4c1 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -81,10 +81,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) case EINTR: continue; +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -131,7 +131,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) ** We might be able to process some of the response messages if we ** have a callback set up */ - if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + if (ptr->root->callbacks != NULL) { /* * We might have responses... try to read them out and fire @@ -142,10 +142,8 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t error; memcached_st *root= (memcached_st *)ptr->root; - error= memcached_response(ptr, buffer, sizeof(buffer), - &root->result); + memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -155,7 +153,9 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) { error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) + { break; + } } /* @todo what should I do with the error message??? */ @@ -170,20 +170,6 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) static memcached_return_t io_wait(memcached_server_write_instance_st ptr, const memc_read_or_write read_or_write) { - struct pollfd fds; - fds.fd= ptr->fd; - fds.events= POLLIN; - - if (read_or_write == MEM_WRITE) /* write */ - { - fds.events= POLLOUT; - WATCHPOINT_SET(ptr->io_wait_count.write++); - } - else - { - WATCHPOINT_SET(ptr->io_wait_count.read++); - } - /* ** We are going to block on write, but at least on Solaris we might block ** on write if we haven't read anything from our input buffer.. @@ -194,80 +180,90 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, */ if (read_or_write == MEM_WRITE) { - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (memcached_fatal(memcached_purge(ptr))) { return MEMCACHED_FAILURE; } } + struct pollfd fds; + memset(&fds, 0, sizeof(pollfd)); + fds.fd= ptr->fd; + fds.events= POLLIN; + + if (read_or_write == MEM_WRITE) /* write */ + { + fds.events= POLLOUT; + ptr->io_wait_count.write++; + } + else + { + ptr->io_wait_count.read++; + } + if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) { return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } + int local_errno; size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { + int active_fd= poll(&fds, 1, ptr->root->poll_timeout); - int error= poll(&fds, 1, ptr->root->poll_timeout); - switch (error) + if (active_fd >= 1) { - case 1: // Success! - WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); - WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); - + assert_msg(active_fd == 1 , "poll() returned an unexpected value"); return MEMCACHED_SUCCESS; - - case 0: // Timeout occured, we let the while() loop do its thing. + } + else if (active_fd == 0) + { + ptr->io_wait_count.timeouts++; return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + } - default: - WATCHPOINT_ERRNO(get_socket_errno()); - switch (get_socket_errno()) - { + // Only an error should result in this code being called. + local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno + assert_msg(active_fd == -1 , "poll() returned an unexpected value"); + switch (local_errno) + { #ifdef TARGET_OS_LINUX - case ERESTART: + case ERESTART: #endif - case EINTR: - break; + case EINTR: + continue; - case EFAULT: - case ENOMEM: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + case EFAULT: + case ENOMEM: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); - case EINVAL: - return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + case EINVAL: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); - default: - if (fds.revents & POLLERR) + default: + if (fds.revents & POLLERR) + { + int err; + socklen_t len= sizeof (err); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) { - int err; - socklen_t len= sizeof (err); - if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) + if (err == 0) // treat this as EINTR { - if (err == 0) - { - continue; - } - errno= err; + continue; } + local_errno= err; } - else - { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - } - int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno - memcached_quit_server(ptr, true); - - return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); } + break; } + + break; // should only occur from poll error } memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); } static bool io_flush(memcached_server_write_instance_st ptr, @@ -280,11 +276,10 @@ static bool io_flush(memcached_server_write_instance_st ptr, ** in the purge function to avoid duplicating the logic.. */ { - memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); + memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED) { return false; } @@ -296,18 +291,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - // UDP Sanity check, make sure that we are not sending somthing too big - if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) - { - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - - if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - { - return true; - } - /* Looking for memory overflows */ #if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) @@ -319,25 +302,12 @@ static bool io_flush(memcached_server_write_instance_st ptr, { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - if (memcached_is_udp(ptr->root)) - { - increment_udp_message_id(ptr); - } - ssize_t sent_length= 0; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (with_flush) - { - sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); - } - else - { - sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); - } + int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE; + ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags); if (sent_length == SOCKET_ERROR) { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); #if 0 // @todo I should look at why we hit this bit of code hard frequently WATCHPOINT_ERRNO(get_socket_errno()); WATCHPOINT_NUMBER(get_socket_errno()); @@ -346,10 +316,11 @@ static bool io_flush(memcached_server_write_instance_st ptr, { case ENOBUFS: continue; + +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: { /* * We may be blocked on write because the input buffer @@ -369,6 +340,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { + ptr->io_wait_count.timeouts++; error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); return false; } @@ -387,13 +359,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, } } - if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) - { - memcached_quit_server(ptr, true); - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; @@ -401,14 +366,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, } WATCHPOINT_ASSERT(write_length == 0); - if (memcached_is_udp(ptr->root)) - { - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - } - else - { - ptr->write_buffer_offset= 0; - } + ptr->write_buffer_offset= 0; return true; } @@ -421,6 +379,7 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t& nread) { + assert(memcached_is_udp(ptr->root) == false); assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error char *buffer_ptr= static_cast(buffer); @@ -434,7 +393,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, while (length) { - if (not ptr->read_buffer_length) + if (ptr->read_buffer_length == 0) { ssize_t data_read; do @@ -448,18 +407,22 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif - if (memcached_success(io_wait(ptr, MEM_READ))) { - continue; + memcached_return_t io_wait_ret; + if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ))) + { + continue; + } + + return io_wait_ret; } - return MEMCACHED_IN_PROGRESS; /* fall through */ @@ -535,6 +498,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) { assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) { @@ -555,10 +519,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -592,6 +556,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + assert(memcached_is_udp(ptr->root) == false); size_t original_length= length; const char *buffer_ptr= static_cast(buffer); @@ -599,25 +564,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, while (length) { char *write_ptr; - size_t should_write; - size_t buffer_end; - - if (memcached_is_udp(ptr->root)) - { - //UDP does not support partial writes - buffer_end= MAX_UDP_DATAGRAM_LENGTH; - should_write= length; - if (ptr->write_buffer_offset + should_write > buffer_end) - { - return -1; - } - } - else - { - buffer_end= MEMCACHED_MAX_BUFFER; - should_write= buffer_end - ptr->write_buffer_offset; - should_write= (should_write < length) ? should_write : length; - } + size_t buffer_end= MEMCACHED_MAX_BUFFER; + size_t should_write= buffer_end -ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); @@ -625,7 +574,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) + if (ptr->write_buffer_offset == buffer_end) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -647,7 +596,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, } } - return (ssize_t) original_length; + return ssize_t(original_length); } bool memcached_io_write(memcached_server_write_instance_st ptr) @@ -720,9 +669,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; - unsigned int host_index= 0; + nfds_t host_index= 0; - for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -767,17 +716,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st break; default: - for (size_t x= 0; x < host_index; ++x) + for (nfds_t x= 0; x < host_index; ++x) { if (fds[x].revents & POLLIN) { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, y); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) + { return instance; + } } } }