case EINTR:
continue;
+#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
#endif
+ case EAGAIN:
#ifdef TARGET_OS_LINUX
case ERESTART:
#endif
** We might be able to process some of the response messages if we
** have a callback set up
*/
- if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+ if (ptr->root->callbacks != NULL)
{
/*
* We might have responses... try to read them out and fire
memcached_set_processing_input((memcached_st *)ptr->root, true);
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- memcached_return_t error;
memcached_st *root= (memcached_st *)ptr->root;
- error= memcached_response(ptr, buffer, sizeof(buffer),
- &root->result);
+ memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result);
memcached_set_processing_input(root, false);
{
error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
if (error != MEMCACHED_SUCCESS)
+ {
break;
+ }
}
/* @todo what should I do with the error message??? */
static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
const memc_read_or_write read_or_write)
{
- struct pollfd fds;
- fds.fd= ptr->fd;
- fds.events= POLLIN;
-
- if (read_or_write == MEM_WRITE) /* write */
- {
- fds.events= POLLOUT;
- WATCHPOINT_SET(ptr->io_wait_count.write++);
- }
- else
- {
- WATCHPOINT_SET(ptr->io_wait_count.read++);
- }
-
/*
** We are going to block on write, but at least on Solaris we might block
** on write if we haven't read anything from our input buffer..
*/
if (read_or_write == MEM_WRITE)
{
- memcached_return_t rc= memcached_purge(ptr);
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+ if (memcached_fatal(memcached_purge(ptr)))
{
return MEMCACHED_FAILURE;
}
}
+ struct pollfd fds;
+ memset(&fds, 0, sizeof(pollfd));
+ fds.fd= ptr->fd;
+ fds.events= POLLIN;
+
+ if (read_or_write == MEM_WRITE) /* write */
+ {
+ fds.events= POLLOUT;
+ ptr->io_wait_count.write++;
+ }
+ else
+ {
+ ptr->io_wait_count.read++;
+ }
+
if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
{
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
size_t loop_max= 5;
while (--loop_max) // While loop is for ERESTART or EINTR
{
+ int active_fd= poll(&fds, 1, ptr->root->poll_timeout);
+ assert_msg(active_fd <= 1 , "poll() returned an unexpected value");
- int error= poll(&fds, 1, ptr->root->poll_timeout);
- switch (error)
+ if (active_fd == 1)
{
- case 1: // Success!
- WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
- WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
-
return MEMCACHED_SUCCESS;
-
- case 0: // Timeout occured, we let the while() loop do its thing.
+ }
+ else if (active_fd == 0)
+ {
+ ptr->io_wait_count.timeouts++;
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
-
- default:
- WATCHPOINT_ERRNO(get_socket_errno());
+ }
+ else // -1
+ {
switch (get_socket_errno())
{
#ifdef TARGET_OS_LINUX
return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
default:
+ int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
if (fds.revents & POLLERR)
{
int err;
socklen_t len= sizeof (err);
if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
{
- if (err == 0)
+ if (err == 0) // treat this as EINTR
{
continue;
}
- errno= err;
+ local_errno= err;
}
}
- else
- {
- memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- }
- int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
memcached_quit_server(ptr, true);
return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
** in the purge function to avoid duplicating the logic..
*/
{
- memcached_return_t rc;
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- rc= memcached_purge(ptr);
+ memcached_return_t rc= memcached_purge(ptr);
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+ if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED)
{
return false;
}
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- // UDP Sanity check, make sure that we are not sending somthing too big
- if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH)
- {
- error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
- return false;
- }
-
- if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
- {
- return true;
- }
-
/* Looking for memory overflows */
#if defined(DEBUG)
if (write_length == MEMCACHED_MAX_BUFFER)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
WATCHPOINT_ASSERT(write_length > 0);
- if (memcached_is_udp(ptr->root))
- {
- increment_udp_message_id(ptr);
- }
ssize_t sent_length= 0;
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
if (sent_length == SOCKET_ERROR)
{
- memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
#if 0 // @todo I should look at why we hit this bit of code hard frequently
WATCHPOINT_ERRNO(get_socket_errno());
WATCHPOINT_NUMBER(get_socket_errno());
{
case ENOBUFS:
continue;
+
+#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
#endif
+ case EAGAIN:
{
/*
* We may be blocked on write because the input buffer
}
else if (rc == MEMCACHED_TIMEOUT)
{
+ ptr->io_wait_count.timeouts++;
error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
return false;
}
}
}
- if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length)
- {
- memcached_quit_server(ptr, true);
- error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
- return false;
- }
-
ptr->io_bytes_sent+= uint32_t(sent_length);
local_write_ptr+= sent_length;
}
WATCHPOINT_ASSERT(write_length == 0);
- if (memcached_is_udp(ptr->root))
- {
- ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
- }
- else
- {
- ptr->write_buffer_offset= 0;
- }
+ ptr->write_buffer_offset= 0;
return true;
}
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
void *buffer, size_t length, ssize_t& nread)
{
+ assert(memcached_is_udp(ptr->root) == false);
assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
char *buffer_ptr= static_cast<char *>(buffer);
while (length)
{
- if (not ptr->read_buffer_length)
+ if (ptr->read_buffer_length == 0)
{
ssize_t data_read;
do
continue;
case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
#endif
+ case EAGAIN:
#ifdef TARGET_OS_LINUX
case ERESTART:
#endif
- if (memcached_success(io_wait(ptr, MEM_READ)))
{
- continue;
+ memcached_return_t io_wait_ret;
+ if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ)))
+ {
+ continue;
+ }
+
+ return io_wait_ret;
}
- return MEMCACHED_IN_PROGRESS;
/* fall through */
memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
{
assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
+ assert(memcached_is_udp(ptr->root) == false);
if (ptr->fd == INVALID_SOCKET)
{
continue;
case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
#endif
+ case EAGAIN:
#ifdef TARGET_OS_LINUX
case ERESTART:
#endif
const void *buffer, size_t length, bool with_flush)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ assert(memcached_is_udp(ptr->root) == false);
size_t original_length= length;
const char *buffer_ptr= static_cast<const char *>(buffer);
while (length)
{
char *write_ptr;
- size_t should_write;
- size_t buffer_end;
-
- if (memcached_is_udp(ptr->root))
- {
- //UDP does not support partial writes
- buffer_end= MAX_UDP_DATAGRAM_LENGTH;
- should_write= length;
- if (ptr->write_buffer_offset + should_write > buffer_end)
- {
- return -1;
- }
- }
- else
- {
- buffer_end= MEMCACHED_MAX_BUFFER;
- should_write= buffer_end - ptr->write_buffer_offset;
- should_write= (should_write < length) ? should_write : length;
- }
+ size_t buffer_end= MEMCACHED_MAX_BUFFER;
+ size_t should_write= buffer_end -ptr->write_buffer_offset;
+ should_write= (should_write < length) ? should_write : length;
write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
memcpy(write_ptr, buffer_ptr, should_write);
buffer_ptr+= should_write;
length-= should_write;
- if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false)
+ if (ptr->write_buffer_offset == buffer_end)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
}
}
- return (ssize_t) original_length;
+ return ssize_t(original_length);
}
bool memcached_io_write(memcached_server_write_instance_st ptr)
{
#define MAX_SERVERS_TO_POLL 100
struct pollfd fds[MAX_SERVERS_TO_POLL];
- unsigned int host_index= 0;
+ nfds_t host_index= 0;
- for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
+ for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
{
memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
break;
default:
- for (size_t x= 0; x < host_index; ++x)
+ for (nfds_t x= 0; x < host_index; ++x)
{
if (fds[x].revents & POLLIN)
{
for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, y);
+ memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
if (instance->fd == fds[x].fd)
+ {
return instance;
+ }
}
}
}