*/
-#include "libmemcached/common.h"
+#include <libmemcached/common.h>
-typedef enum {
+enum memc_read_or_write {
MEM_READ,
MEM_WRITE
-} memc_read_or_write;
-
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
- const bool with_flush,
- memcached_return_t *error);
-static void increment_udp_message_id(memcached_server_write_instance_st ptr);
-
-static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
- memc_read_or_write read_or_write)
-{
- struct pollfd fds;
- fds.fd= ptr->fd;
- fds.events= POLLIN;
-
- int error;
-
- if (read_or_write == MEM_WRITE) /* write */
- {
- fds.events= POLLOUT;
- WATCHPOINT_SET(ptr->io_wait_count.write++);
- }
- else
- {
- WATCHPOINT_SET(ptr->io_wait_count.read++);
- }
-
- /*
- ** We are going to block on write, but at least on Solaris we might block
- ** on write if we haven't read anything from our input buffer..
- ** Try to purge the input buffer if we don't do any flow control in the
- ** application layer (just sending a lot of data etc)
- ** The test is moved down in the purge function to avoid duplication of
- ** the test.
- */
- if (read_or_write == MEM_WRITE)
- {
- memcached_return_t rc= memcached_purge(ptr);
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
- return MEMCACHED_FAILURE;
- }
-
- size_t loop_max= 5;
- while (--loop_max) // While loop is for ERESTART or EINTR
- {
- error= poll(&fds, 1, ptr->root->poll_timeout);
-
- switch (error)
- {
- case 1: // Success!
- WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
- WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
-
- return MEMCACHED_SUCCESS;
- case 0: // Timeout occured, we let the while() loop do its thing.
- return MEMCACHED_TIMEOUT;
- default:
- WATCHPOINT_ERRNO(get_socket_errno());
- switch (get_socket_errno())
- {
-#ifdef TARGET_OS_LINUX
- case ERESTART:
-#endif
- case EINTR:
- break;
- default:
- if (fds.revents & POLLERR)
- {
- int err;
- socklen_t len= sizeof (err);
- (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
- ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
- }
- else
- {
- ptr->cached_errno= get_socket_errno();
- }
- memcached_quit_server(ptr, true);
-
- return MEMCACHED_FAILURE;
- }
- }
- }
-
- /* Imposssible for anything other then -1 */
- WATCHPOINT_ASSERT(error == -1);
- ptr->cached_errno= get_socket_errno();
- memcached_quit_server(ptr, true);
-
- return MEMCACHED_FAILURE;
-}
-
-memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
-{
- return io_wait(ptr, MEM_WRITE);
-}
+};
/**
* Try to fill the input buffer for a server with as much
/* There is room in the buffer, try to fill it! */
if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
{
- /* Just try a single read to grab what's available */
- ssize_t nr= recv(ptr->fd,
- ptr->read_ptr + ptr->read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->read_data_length,
- 0);
+ do {
+ /* Just try a single read to grab what's available */
+ ssize_t nr;
+ if ((nr= recv(ptr->fd,
+ ptr->read_ptr + ptr->read_data_length,
+ MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+ MSG_DONTWAIT)) <= 0)
+ {
+ if (nr == 0)
+ {
+ memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+ }
+ else
+ {
+ switch (get_socket_errno())
+ {
+ case EINTR:
+ continue;
- if (nr > 0)
- {
- ptr->read_data_length+= (size_t)nr;
- ptr->read_buffer_length+= (size_t)nr;
- return true;
- }
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ case EAGAIN:
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ break; // No IO is fine, we can just move on
+
+ default:
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ }
+ }
+
+ break;
+ }
+ else // We read data, append to our read buffer
+ {
+ ptr->read_data_length+= size_t(nr);
+ ptr->read_buffer_length+= size_t(nr);
+
+ return true;
+ }
+ } while (false);
}
+
return false;
}
** We might be able to process some of the response messages if we
** have a callback set up
*/
- if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+ if (ptr->root->callbacks != NULL)
{
/*
* We might have responses... try to read them out and fire
memcached_set_processing_input((memcached_st *)ptr->root, true);
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- memcached_return_t error;
memcached_st *root= (memcached_st *)ptr->root;
- error= memcached_response(ptr, buffer, sizeof(buffer),
- &root->result);
+ memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result);
memcached_set_processing_input(root, false);
{
error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
if (error != MEMCACHED_SUCCESS)
+ {
break;
+ }
}
/* @todo what should I do with the error message??? */
return false;
}
-#if 0 // Dead code, this should be removed.
-void memcached_io_preread(memcached_st *ptr)
+static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
+ const memc_read_or_write read_or_write)
{
- unsigned int x;
+ /*
+ ** We are going to block on write, but at least on Solaris we might block
+ ** on write if we haven't read anything from our input buffer..
+ ** Try to purge the input buffer if we don't do any flow control in the
+ ** application layer (just sending a lot of data etc)
+ ** The test is moved down in the purge function to avoid duplication of
+ ** the test.
+ */
+ if (read_or_write == MEM_WRITE)
+ {
+ if (memcached_purge(ptr) == false)
+ {
+ return MEMCACHED_FAILURE;
+ }
+ }
+
+ struct pollfd fds;
+ fds.fd= ptr->fd;
+ fds.events= POLLIN;
+ fds.revents= 0;
- return;
+ if (read_or_write == MEM_WRITE) /* write */
+ {
+ fds.events= POLLOUT;
+ ptr->io_wait_count.write++;
+ }
+ else
+ {
+ ptr->io_wait_count.read++;
+ }
- for (x= 0; x < memcached_server_count(ptr); x++)
+ if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
{
- if (memcached_server_response_count(ptr, x) &&
- ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
+ ptr->io_wait_count.timeouts++;
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ }
+
+ int local_errno;
+ size_t loop_max= 5;
+ while (--loop_max) // While loop is for ERESTART or EINTR
+ {
+ int active_fd= poll(&fds, 1, ptr->root->poll_timeout);
+
+ if (active_fd >= 1)
{
- size_t data_read;
+ assert_msg(active_fd == 1 , "poll() returned an unexpected value");
+ return MEMCACHED_SUCCESS;
+ }
+ else if (active_fd == 0)
+ {
+ ptr->io_wait_count.timeouts++;
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ }
- data_read= recv(ptr->hosts[x].fd,
- ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0);
- if (data_read == SOCKET_ERROR)
- continue;
+ // Only an error should result in this code being called.
+ local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+ assert_msg(active_fd == -1 , "poll() returned an unexpected value");
+ switch (local_errno)
+ {
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ case EINTR:
+ continue;
+
+ case EFAULT:
+ case ENOMEM:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
- ptr->hosts[x].read_buffer_length+= data_read;
- ptr->hosts[x].read_data_length+= data_read;
+ case EINVAL:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+
+ default:
+ if (fds.revents & POLLERR)
+ {
+ int err;
+ socklen_t len= sizeof (err);
+ if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ {
+ if (err == 0) // treat this as EINTR
+ {
+ continue;
+ }
+ local_errno= err;
+ }
+ }
+ break;
}
+
+ break; // should only occur from poll error
}
+
+ memcached_quit_server(ptr, true);
+
+ return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
}
+
+static bool io_flush(memcached_server_write_instance_st ptr,
+ const bool with_flush,
+ memcached_return_t& error)
+{
+ /*
+ ** We might want to purge the input buffer if we haven't consumed
+ ** any output yet... The test for the limits is the purge is inline
+ ** in the purge function to avoid duplicating the logic..
+ */
+ {
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+ if (memcached_purge(ptr) == false)
+ {
+ return false;
+ }
+ }
+ char *local_write_ptr= ptr->write_buffer;
+ size_t write_length= ptr->write_buffer_offset;
+
+ error= MEMCACHED_SUCCESS;
+
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+ /* Looking for memory overflows */
+#if defined(DEBUG)
+ if (write_length == MEMCACHED_MAX_BUFFER)
+ WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
+ WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
+
+ while (write_length)
+ {
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ WATCHPOINT_ASSERT(write_length > 0);
+
+ int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+ ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags);
+
+ if (sent_length == SOCKET_ERROR)
+ {
+#if 0 // @todo I should look at why we hit this bit of code hard frequently
+ WATCHPOINT_ERRNO(get_socket_errno());
+ WATCHPOINT_NUMBER(get_socket_errno());
#endif
+ switch (get_socket_errno())
+ {
+ case ENOBUFS:
+ continue;
+
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ case EAGAIN:
+ {
+ /*
+ * We may be blocked on write because the input buffer
+ * is full. Let's check if we have room in our input
+ * buffer for more data and retry the write before
+ * waiting..
+ */
+ if (repack_input_buffer(ptr) or process_input_buffer(ptr))
+ {
+ continue;
+ }
+
+ memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+ if (memcached_success(rc))
+ {
+ continue;
+ }
+ else if (rc == MEMCACHED_TIMEOUT)
+ {
+ return false;
+ }
+
+ memcached_quit_server(ptr, true);
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ return false;
+ }
+ case ENOTCONN:
+ case EPIPE:
+ default:
+ memcached_quit_server(ptr, true);
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
+ return false;
+ }
+ }
+
+ ptr->io_bytes_sent+= uint32_t(sent_length);
+
+ local_write_ptr+= sent_length;
+ write_length-= uint32_t(sent_length);
+ }
+
+ WATCHPOINT_ASSERT(write_length == 0);
+ ptr->write_buffer_offset= 0;
+
+ return true;
+}
+
+memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+{
+ return io_wait(ptr, MEM_WRITE);
+}
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
- void *buffer, size_t length, ssize_t *nread)
+ void *buffer, size_t length, ssize_t& nread)
{
- char *buffer_ptr;
+ assert(memcached_is_udp(ptr->root) == false);
+ assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
+ char *buffer_ptr= static_cast<char *>(buffer);
- buffer_ptr= static_cast<char *>(buffer);
+ if (ptr->fd == INVALID_SOCKET)
+ {
+#if 0
+ assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
+#endif
+ return MEMCACHED_CONNECTION_FAILURE;
+ }
while (length)
{
- if (not ptr->read_buffer_length)
+ if (ptr->read_buffer_length == 0)
{
ssize_t data_read;
-
- while (1)
+ do
{
- data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0);
- if (data_read > 0)
- {
- break;
- }
- else if (data_read == SOCKET_ERROR)
+ data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+ if (data_read == SOCKET_ERROR)
{
- ptr->cached_errno= get_socket_errno();
- memcached_return_t rc= MEMCACHED_ERRNO;
switch (get_socket_errno())
{
+ case EINTR: // We just retry
+ continue;
+
+ case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
#endif
- case EINTR:
+ case EAGAIN:
#ifdef TARGET_OS_LINUX
case ERESTART:
#endif
- if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
- continue;
+ {
+ memcached_return_t io_wait_ret;
+ if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ)))
+ {
+ continue;
+ }
+
+ return io_wait_ret;
+ }
+
/* fall through */
+ case ENOTCONN: // Programmer Error
+ WATCHPOINT_ASSERT(0);
+ case ENOTSOCK:
+ WATCHPOINT_ASSERT(0);
+ case EBADF:
+ assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
+ case EINVAL:
+ case EFAULT:
+ case ECONNREFUSED:
default:
{
memcached_quit_server(ptr, true);
- *nread= -1;
- return memcached_set_error(*ptr, rc, MEMCACHED_AT);
+ nread= -1;
+ return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
}
}
- else
+ else if (data_read == 0)
{
/*
EOF. Any data received so far is incomplete
for blocking I/O we do not return 0 and for non-blocking case
it will return EGAIN if data is not immediatly available.
*/
- WATCHPOINT_STRING("We had a zero length recv()");
memcached_quit_server(ptr, true);
- *nread= -1;
- return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
+ nread= -1;
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("::rec() returned zero, server has disconnected"));
}
- }
+ ptr->io_wait_count._bytes_read+= data_read;
+ } while (data_read <= 0);
- ptr->io_bytes_sent = 0;
+ ptr->io_bytes_sent= 0;
ptr->read_data_length= (size_t) data_read;
ptr->read_buffer_length= (size_t) data_read;
ptr->read_ptr= ptr->read_buffer;
}
}
- ptr->server_failure_counter= 0;
- *nread = (ssize_t)(buffer_ptr - (char*)buffer);
+ nread= ssize_t(buffer_ptr - (char*)buffer);
+
return MEMCACHED_SUCCESS;
}
-static ssize_t _io_write(memcached_server_write_instance_st ptr,
- const void *buffer, size_t length, bool with_flush)
+memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
{
- size_t original_length;
- const char* buffer_ptr;
+ assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
+ assert(memcached_is_udp(ptr->root) == false);
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-
- original_length= length;
- buffer_ptr= static_cast<const char *>(buffer);
-
- while (length)
+ if (ptr->fd == INVALID_SOCKET)
{
- char *write_ptr;
- size_t should_write;
- size_t buffer_end;
+ assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state");
+ return MEMCACHED_CONNECTION_FAILURE;
+ }
- if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ ssize_t data_read;
+ char buffer[MEMCACHED_MAX_BUFFER];
+ do
+ {
+ data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+ if (data_read == SOCKET_ERROR)
{
- //UDP does not support partial writes
- buffer_end= MAX_UDP_DATAGRAM_LENGTH;
- should_write= length;
- if (ptr->write_buffer_offset + should_write > buffer_end)
+ switch (get_socket_errno())
{
- return -1;
+ case EINTR: // We just retry
+ continue;
+
+ case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ case EAGAIN:
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ if (memcached_success(io_wait(ptr, MEM_READ)))
+ {
+ continue;
+ }
+ return MEMCACHED_IN_PROGRESS;
+
+ /* fall through */
+
+ case ENOTCONN: // Programmer Error
+ assert(0);
+ case ENOTSOCK:
+ assert(0);
+ case EBADF:
+ assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state");
+ case EINVAL:
+ case EFAULT:
+ case ECONNREFUSED:
+ default:
+ return MEMCACHED_CONNECTION_FAILURE; // We want this!
}
}
- else
- {
- buffer_end= MEMCACHED_MAX_BUFFER;
- should_write= buffer_end - ptr->write_buffer_offset;
- should_write= (should_write < length) ? should_write : length;
- }
+ } while (data_read > 0);
+
+ return MEMCACHED_CONNECTION_FAILURE;
+}
+
+static bool _io_write(memcached_server_write_instance_st ptr,
+ const void *buffer, size_t length, bool with_flush,
+ size_t& written)
+{
+ assert(ptr->fd != INVALID_SOCKET);
+ assert(memcached_is_udp(ptr->root) == false);
+
+ const char *buffer_ptr= static_cast<const char *>(buffer);
+
+ const size_t original_length= length;
+
+ while (length)
+ {
+ char *write_ptr;
+ size_t buffer_end= MEMCACHED_MAX_BUFFER;
+ size_t should_write= buffer_end -ptr->write_buffer_offset;
+ should_write= (should_write < length) ? should_write : length;
write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
memcpy(write_ptr, buffer_ptr, should_write);
buffer_ptr+= should_write;
length-= should_write;
- if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
+ if (ptr->write_buffer_offset == buffer_end)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
memcached_return_t rc;
- ssize_t sent_length= io_flush(ptr, with_flush, &rc);
- if (sent_length == -1)
+ if (io_flush(ptr, with_flush, rc) == false)
{
- return -1;
- }
-
- /* If io_flush calls memcached_purge, sent_length may be 0 */
- unlikely (sent_length != 0)
- {
- WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
+ written= original_length -length;
+ return false;
}
}
}
{
memcached_return_t rc;
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- if (io_flush(ptr, with_flush, &rc) == -1)
+ if (io_flush(ptr, with_flush, rc) == false)
{
- return -1;
+ written= original_length -length;
+ return false;
}
}
- return (ssize_t) original_length;
+ written= original_length -length;
+
+ return true;
+}
+
+bool memcached_io_write(memcached_server_write_instance_st ptr)
+{
+ size_t written;
+ return _io_write(ptr, NULL, 0, true, written);
}
ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
- const void *buffer, size_t length, bool with_flush)
+ const void *buffer, const size_t length, const bool with_flush)
{
- return _io_write(ptr, buffer, length, with_flush);
+ size_t written;
+
+ if (_io_write(ptr, buffer, length, with_flush, written) == false)
+ {
+ return -1;
+ }
+
+ return ssize_t(written);
}
-ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
- const struct libmemcached_io_vector_st *vector,
- size_t number_of, bool with_flush)
+bool memcached_io_writev(memcached_server_write_instance_st ptr,
+ libmemcached_io_vector_st vector[],
+ const size_t number_of, const bool with_flush)
{
+ ssize_t complete_total= 0;
ssize_t total= 0;
for (size_t x= 0; x < number_of; x++, vector++)
{
- ssize_t returnable;
-
- if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
+ complete_total+= vector->length;
+ if (vector->length)
{
- return -1;
+ size_t written;
+ if ((_io_write(ptr, vector->buffer, vector->length, false, written)) == false)
+ {
+ return false;
+ }
+ total+= written;
}
- total+= returnable;
}
if (with_flush)
{
- if (memcached_io_write(ptr, NULL, 0, true) == -1)
+ if (memcached_io_write(ptr) == false)
{
- return -1;
+ return false;
}
}
- return total;
+ return (complete_total == total);
}
-memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr)
+void memcached_io_close(memcached_server_write_instance_st ptr)
{
if (ptr->fd == INVALID_SOCKET)
{
- return MEMCACHED_SUCCESS;
+ return;
}
/* in case of death shutdown to avoid blocking at close() */
{
WATCHPOINT_ERRNO(get_socket_errno());
}
-
- return MEMCACHED_SUCCESS;
+ ptr->state= MEMCACHED_SERVER_STATE_NEW;
+ ptr->fd= INVALID_SOCKET;
}
memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
{
#define MAX_SERVERS_TO_POLL 100
struct pollfd fds[MAX_SERVERS_TO_POLL];
- unsigned int host_index= 0;
+ nfds_t host_index= 0;
- for (uint32_t x= 0;
- x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
- ++x)
+ for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, x);
+ memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
if (instance->read_buffer_length > 0) /* I have data in the buffer */
+ {
return instance;
+ }
if (memcached_server_response_count(instance) > 0)
{
return NULL;
}
- switch (poll(fds, host_index, memc->poll_timeout))
+ int error= poll(fds, host_index, memc->poll_timeout);
+ switch (error)
{
case -1:
memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
break;
default:
- for (size_t x= 0; x < host_index; ++x)
+ for (nfds_t x= 0; x < host_index; ++x)
{
if (fds[x].revents & POLLIN)
{
for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, y);
+ memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
if (instance->fd == fds[x].fd)
- return instance;
- }
- }
- }
- }
-
- return NULL;
-}
-
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
- const bool with_flush,
- memcached_return_t *error)
-{
- /*
- ** We might want to purge the input buffer if we haven't consumed
- ** any output yet... The test for the limits is the purge is inline
- ** in the purge function to avoid duplicating the logic..
- */
- {
- memcached_return_t rc;
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- rc= memcached_purge(ptr);
-
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
- {
- return -1;
- }
- }
- ssize_t sent_length;
- size_t return_length;
- char *local_write_ptr= ptr->write_buffer;
- size_t write_length= ptr->write_buffer_offset;
-
- *error= MEMCACHED_SUCCESS;
-
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-
- // UDP Sanity check, make sure that we are not sending somthing too big
- if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
- {
- return -1;
- }
-
- if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
- && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
- return 0;
-
- /* Looking for memory overflows */
-#if defined(DEBUG)
- if (write_length == MEMCACHED_MAX_BUFFER)
- WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
- WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
-#endif
-
- return_length= 0;
- while (write_length)
- {
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- WATCHPOINT_ASSERT(write_length > 0);
- sent_length= 0;
- if (ptr->type == MEMCACHED_CONNECTION_UDP)
- increment_udp_message_id(ptr);
-
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- if (with_flush)
- {
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
- }
- else
- {
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
- }
-
- if (sent_length == SOCKET_ERROR)
- {
- ptr->cached_errno= get_socket_errno();
-#if 0 // @todo I should look at why we hit this bit of code hard frequently
- WATCHPOINT_ERRNO(get_socket_errno());
- WATCHPOINT_NUMBER(get_socket_errno());
-#endif
- switch (get_socket_errno())
- {
- case ENOBUFS:
- continue;
- case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
-#endif
- {
- /*
- * We may be blocked on write because the input buffer
- * is full. Let's check if we have room in our input
- * buffer for more data and retry the write before
- * waiting..
- */
- if (repack_input_buffer(ptr) ||
- process_input_buffer(ptr))
- continue;
-
- memcached_return_t rc;
- rc= io_wait(ptr, MEM_WRITE);
-
- if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
{
- continue;
+ return instance;
}
-
- memcached_quit_server(ptr, true);
- return -1;
}
- case ENOTCONN:
- case EPIPE:
- default:
- memcached_quit_server(ptr, true);
- *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- WATCHPOINT_ASSERT(ptr->fd == -1);
- return -1;
}
}
-
- if (ptr->type == MEMCACHED_CONNECTION_UDP &&
- (size_t)sent_length != write_length)
- {
- memcached_quit_server(ptr, true);
- return -1;
- }
-
- ptr->io_bytes_sent += (uint32_t) sent_length;
-
- local_write_ptr+= sent_length;
- write_length-= (uint32_t) sent_length;
- return_length+= (uint32_t) sent_length;
}
- WATCHPOINT_ASSERT(write_length == 0);
- // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
- // ptr->write_buffer_offset);
-
- // if we are a udp server, the begining of the buffer is reserverd for
- // the upd frame header
- if (ptr->type == MEMCACHED_CONNECTION_UDP)
- ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
- else
- ptr->write_buffer_offset= 0;
-
- return (ssize_t) return_length;
+ return NULL;
}
/*
*/
memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
void *dta,
- size_t size)
+ const size_t size)
{
size_t offset= 0;
char *data= static_cast<char *>(dta);
while (offset < size)
{
ssize_t nread;
- memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
- &nread);
- if (rc != MEMCACHED_SUCCESS)
+ memcached_return_t rc;
+
+ while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { };
+
+ if (memcached_failed(rc))
{
return rc;
}
- offset+= (size_t) nread;
+ offset+= size_t(nread);
}
return MEMCACHED_SUCCESS;
memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
char *buffer_ptr,
- size_t size)
+ size_t size,
+ size_t& total_nr)
{
+ total_nr= 0;
bool line_complete= false;
- size_t total_nr= 0;
- while (!line_complete)
+ while (line_complete == false)
{
if (ptr->read_buffer_length == 0)
{
* the logic.
*/
ssize_t nread;
- memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
- if (rc != MEMCACHED_SUCCESS)
+ memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread);
+ if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
+ {
+ memcached_quit_server(ptr, true);
+ return memcached_set_error(*ptr, rc, MEMCACHED_AT);
+ }
+ else if (memcached_failed(rc))
{
return rc;
}
if (*buffer_ptr == '\n')
+ {
line_complete= true;
+ }
++buffer_ptr;
++total_nr;
}
/* Now let's look in the buffer and copy as we go! */
- while (ptr->read_buffer_length && total_nr < size && !line_complete)
+ while (ptr->read_buffer_length and total_nr < size and line_complete == false)
{
*buffer_ptr = *ptr->read_ptr;
if (*buffer_ptr == '\n')
+ {
line_complete = true;
+ }
--ptr->read_buffer_length;
++ptr->read_ptr;
++total_nr;
}
if (total_nr == size)
+ {
return MEMCACHED_PROTOCOL_ERROR;
+ }
}
return MEMCACHED_SUCCESS;
}
-
-/*
- * The udp request id consists of two seperate sections
- * 1) The thread id
- * 2) The message number
- * The thread id should only be set when the memcached_st struct is created
- * and should not be changed.
- *
- * The message num is incremented for each new message we send, this function
- * extracts the message number from message_id, increments it and then
- * writes the new value back into the header
- */
-static void increment_udp_message_id(memcached_server_write_instance_st ptr)
-{
- struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
- uint16_t cur_req= get_udp_datagram_request_id(header);
- int msg_num= get_msg_num_from_request_id(cur_req);
- int thread_id= get_thread_id_from_request_id(cur_req);
-
- if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
- msg_num= 0;
-
- header->request_id= htons((uint16_t) (thread_id | msg_num));
-}
-
-memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
-{
- if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
- return MEMCACHED_FAILURE;
-
- struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
- header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
- header->num_datagrams= htons(1);
- header->sequence_number= htons(0);
-
- return MEMCACHED_SUCCESS;
-}