#include <libmemcached/common.h>
+void initialize_binary_request(memcached_server_write_instance_st server, protocol_binary_request_header& header)
+{
+ server->request_id++;
+ header.request.magic= PROTOCOL_BINARY_REQ;
+ header.request.opaque= htons(server->request_id);
+}
+
enum memc_read_or_write {
MEM_READ,
MEM_WRITE
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
- int local_errno;
size_t loop_max= 5;
while (--loop_max) // While loop is for ERESTART or EINTR
{
if (active_fd >= 1)
{
- assert_msg(active_fd == 1 , "poll() returned an unexpected value");
- return MEMCACHED_SUCCESS;
+ assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors");
+ if (fds.revents & POLLIN or fds.revents & POLLOUT)
+ {
+ return MEMCACHED_SUCCESS;
+ }
+
+ if (fds.revents & POLLHUP)
+ {
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("poll() detected hang up"));
+ }
+
+ if (fds.revents & POLLERR)
+ {
+ int local_errno= EINVAL;
+ int err;
+ socklen_t len= sizeof (err);
+ if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ {
+ if (err == 0) // treat this as EINTR
+ {
+ continue;
+ }
+ local_errno= err;
+ }
+ memcached_quit_server(ptr, true);
+ return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT,
+ memcached_literal_param("poll() returned POLLHUP"));
+ }
+
+ return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with"));
}
- else if (active_fd == 0)
+
+ if (active_fd == 0)
{
ptr->io_wait_count.timeouts++;
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
// Only an error should result in this code being called.
- local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+ int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
assert_msg(active_fd == -1 , "poll() returned an unexpected value");
switch (local_errno)
{
case EFAULT:
case ENOMEM:
- return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+ memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
case EINVAL:
- return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+ memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
default:
- if (fds.revents & POLLERR)
- {
- int err;
- socklen_t len= sizeof (err);
- if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
- {
- if (err == 0) // treat this as EINTR
- {
- continue;
- }
- local_errno= err;
- }
- }
- break;
+ memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
}
- break; // should only occur from poll error
+ break;
}
memcached_quit_server(ptr, true);
- return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
+ if (memcached_has_error(ptr))
+ {
+ return memcached_instance_error_return(ptr);
+ }
+
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("number of attempts to call io_wait() failed"));
}
static bool io_flush(memcached_server_write_instance_st ptr,
return io_wait(ptr, MEM_WRITE);
}
+static memcached_return_t _io_fill(memcached_server_write_instance_st ptr)
+{
+ ssize_t data_read;
+ do
+ {
+ data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+ if (data_read == SOCKET_ERROR)
+ {
+ switch (get_socket_errno())
+ {
+ case EINTR: // We just retry
+ continue;
+
+ case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ case EAGAIN:
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ {
+ memcached_return_t io_wait_ret;
+ if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ)))
+ {
+ continue;
+ }
+
+ return io_wait_ret;
+ }
+
+ /* fall through */
+
+ case ENOTCONN: // Programmer Error
+ WATCHPOINT_ASSERT(0);
+ case ENOTSOCK:
+ WATCHPOINT_ASSERT(0);
+ case EBADF:
+ assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
+ case EINVAL:
+ case EFAULT:
+ case ECONNREFUSED:
+ default:
+ memcached_quit_server(ptr, true);
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ break;
+ }
+
+ return memcached_instance_error_return(ptr);
+ }
+ else if (data_read == 0)
+ {
+ /*
+ EOF. Any data received so far is incomplete
+ so discard it. This always reads by byte in case of TCP
+ and protocol enforcement happens at memcached_response()
+ looking for '\n'. We do not care for UDB which requests 8 bytes
+ at once. Generally, this means that connection went away. Since
+ for blocking I/O we do not return 0 and for non-blocking case
+ it will return EGAIN if data is not immediatly available.
+ */
+ memcached_quit_server(ptr, true);
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("::rec() returned zero, server has disconnected"));
+ }
+ ptr->io_wait_count._bytes_read+= data_read;
+ } while (data_read <= 0);
+
+ ptr->io_bytes_sent= 0;
+ ptr->read_data_length= (size_t) data_read;
+ ptr->read_buffer_length= (size_t) data_read;
+ ptr->read_ptr= ptr->read_buffer;
+
+ return MEMCACHED_SUCCESS;
+}
+
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
void *buffer, size_t length, ssize_t& nread)
{
{
if (ptr->read_buffer_length == 0)
{
- ssize_t data_read;
- do
+ memcached_return_t io_fill_ret;
+ if (memcached_fatal(io_fill_ret= _io_fill(ptr)))
{
- data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
- if (data_read == SOCKET_ERROR)
- {
- switch (get_socket_errno())
- {
- case EINTR: // We just retry
- continue;
-
- case ETIMEDOUT: // OSX
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
- case EAGAIN:
-#ifdef TARGET_OS_LINUX
- case ERESTART:
-#endif
- {
- memcached_return_t io_wait_ret;
- if (memcached_success(io_wait_ret= io_wait(ptr, MEM_READ)))
- {
- continue;
- }
-
- return io_wait_ret;
- }
-
- /* fall through */
-
- case ENOTCONN: // Programmer Error
- WATCHPOINT_ASSERT(0);
- case ENOTSOCK:
- WATCHPOINT_ASSERT(0);
- case EBADF:
- assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
- case EINVAL:
- case EFAULT:
- case ECONNREFUSED:
- default:
- {
- memcached_quit_server(ptr, true);
- nread= -1;
- return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- }
- }
- }
- else if (data_read == 0)
- {
- /*
- EOF. Any data received so far is incomplete
- so discard it. This always reads by byte in case of TCP
- and protocol enforcement happens at memcached_response()
- looking for '\n'. We do not care for UDB which requests 8 bytes
- at once. Generally, this means that connection went away. Since
- for blocking I/O we do not return 0 and for non-blocking case
- it will return EGAIN if data is not immediatly available.
- */
- WATCHPOINT_STRING("We had a zero length recv()");
- memcached_quit_server(ptr, true);
- nread= -1;
- return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
- memcached_literal_param("::rec() returned zero, server has disconnected"));
- }
- } while (data_read <= 0);
-
- ptr->io_bytes_sent = 0;
- ptr->read_data_length= (size_t) data_read;
- ptr->read_buffer_length= (size_t) data_read;
- ptr->read_ptr= ptr->read_buffer;
+ nread= -1;
+ return io_fill_ret;
+ }
}
if (length > 1)
{
- size_t difference;
-
- difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
+ size_t difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
memcpy(buffer_ptr, ptr->read_ptr, difference);
length -= difference;
return instance;
}
- if (memcached_server_response_count(instance) > 0)
+ if (memcached_instance_response_count(instance) > 0)
{
- fds[host_index].events = POLLIN;
- fds[host_index].revents = 0;
- fds[host_index].fd = instance->fd;
+ fds[host_index].events= POLLIN;
+ fds[host_index].revents= 0;
+ fds[host_index].fd= instance->fd;
++host_index;
}
}
/* We have 0 or 1 server with pending events.. */
for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, x);
+ memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
- if (memcached_server_response_count(instance) > 0)
+ if (memcached_instance_response_count(instance) > 0)
{
return instance;
}