+static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
+ const memc_read_or_write read_or_write)
+{
+ /*
+ ** We are going to block on write, but at least on Solaris we might block
+ ** on write if we haven't read anything from our input buffer..
+ ** Try to purge the input buffer if we don't do any flow control in the
+ ** application layer (just sending a lot of data etc)
+ ** The test is moved down in the purge function to avoid duplication of
+ ** the test.
+ */
+ if (read_or_write == MEM_WRITE)
+ {
+ if (memcached_purge(ptr) == false)
+ {
+ return MEMCACHED_FAILURE;
+ }
+ }
+
+ struct pollfd fds;
+ fds.fd= ptr->fd;
+ fds.events= POLLIN;
+ fds.revents= 0;
+
+ if (read_or_write == MEM_WRITE) /* write */
+ {
+ fds.events= POLLOUT;
+ ptr->io_wait_count.write++;
+ }
+ else
+ {
+ ptr->io_wait_count.read++;
+ }
+
+ if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
+ {
+ ptr->io_wait_count.timeouts++;
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ }
+
+ int local_errno;
+ size_t loop_max= 5;
+ while (--loop_max) // While loop is for ERESTART or EINTR
+ {
+ int active_fd= poll(&fds, 1, ptr->root->poll_timeout);
+
+ if (active_fd >= 1)
+ {
+ assert_msg(active_fd == 1 , "poll() returned an unexpected value");
+ return MEMCACHED_SUCCESS;
+ }
+ else if (active_fd == 0)
+ {
+ ptr->io_wait_count.timeouts++;
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ }
+
+ // Only an error should result in this code being called.
+ local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+ assert_msg(active_fd == -1 , "poll() returned an unexpected value");
+ switch (local_errno)
+ {
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ case EINTR:
+ continue;
+
+ case EFAULT:
+ case ENOMEM:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+
+ case EINVAL:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+
+ default:
+ if (fds.revents & POLLERR)
+ {
+ int err;
+ socklen_t len= sizeof (err);
+ if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ {
+ if (err == 0) // treat this as EINTR
+ {
+ continue;
+ }
+ local_errno= err;
+ }
+ }
+ break;
+ }
+
+ break; // should only occur from poll error
+ }
+
+ memcached_quit_server(ptr, true);
+
+ return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
+}
+
+static bool io_flush(memcached_server_write_instance_st ptr,
+ const bool with_flush,
+ memcached_return_t& error)
+{
+ /*
+ ** We might want to purge the input buffer if we haven't consumed
+ ** any output yet... The test for the limits is the purge is inline
+ ** in the purge function to avoid duplicating the logic..
+ */
+ {
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+ if (memcached_purge(ptr) == false)
+ {
+ return false;
+ }
+ }
+ char *local_write_ptr= ptr->write_buffer;
+ size_t write_length= ptr->write_buffer_offset;
+
+ error= MEMCACHED_SUCCESS;
+
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+ /* Looking for memory overflows */
+#if defined(DEBUG)
+ if (write_length == MEMCACHED_MAX_BUFFER)
+ WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
+ WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
+
+ while (write_length)
+ {
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ WATCHPOINT_ASSERT(write_length > 0);
+
+ int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+ ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags);
+
+ if (sent_length == SOCKET_ERROR)
+ {
+#if 0 // @todo I should look at why we hit this bit of code hard frequently
+ WATCHPOINT_ERRNO(get_socket_errno());
+ WATCHPOINT_NUMBER(get_socket_errno());
+#endif
+ switch (get_socket_errno())
+ {
+ case ENOBUFS:
+ continue;
+
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ case EAGAIN:
+ {
+ /*
+ * We may be blocked on write because the input buffer
+ * is full. Let's check if we have room in our input
+ * buffer for more data and retry the write before
+ * waiting..
+ */
+ if (repack_input_buffer(ptr) or process_input_buffer(ptr))
+ {
+ continue;
+ }
+
+ memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+ if (memcached_success(rc))
+ {
+ continue;
+ }
+ else if (rc == MEMCACHED_TIMEOUT)
+ {
+ return false;
+ }
+
+ memcached_quit_server(ptr, true);
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ return false;
+ }
+ case ENOTCONN:
+ case EPIPE:
+ default:
+ memcached_quit_server(ptr, true);
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
+ return false;
+ }
+ }
+
+ ptr->io_bytes_sent+= uint32_t(sent_length);
+
+ local_write_ptr+= sent_length;
+ write_length-= uint32_t(sent_length);
+ }
+
+ WATCHPOINT_ASSERT(write_length == 0);
+ ptr->write_buffer_offset= 0;
+
+ return true;
+}
+
+memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+{
+ return io_wait(ptr, MEM_WRITE);
+}
+