#include <libmemcached/common.h>
+#ifdef HAVE_SYS_SOCKET_H
+# include <sys/socket.h>
+#endif
+
+void initialize_binary_request(org::libmemcached::Instance* server, protocol_binary_request_header& header)
+{
+ server->request_id++;
+ header.request.magic= PROTOCOL_BINARY_REQ;
+ header.request.opaque= htons(server->request_id);
+}
+
enum memc_read_or_write {
MEM_READ,
MEM_WRITE
*
* @param ptr the server to pack
*/
-static bool repack_input_buffer(memcached_server_write_instance_st ptr)
+static bool repack_input_buffer(org::libmemcached::Instance* ptr)
{
if (ptr->read_ptr != ptr->read_buffer)
{
* @param ptr the server to star processing iput messages for
* @return true if we processed anything, false otherwise
*/
-static bool process_input_buffer(memcached_server_write_instance_st ptr)
+static bool process_input_buffer(org::libmemcached::Instance* ptr)
{
/*
** We might be able to process some of the response messages if we
return false;
}
-static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
+static memcached_return_t io_wait(org::libmemcached::Instance* ptr,
const memc_read_or_write read_or_write)
{
/*
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
- int local_errno;
size_t loop_max= 5;
while (--loop_max) // While loop is for ERESTART or EINTR
{
if (active_fd >= 1)
{
- assert_msg(active_fd == 1 , "poll() returned an unexpected value");
- return MEMCACHED_SUCCESS;
+ assert_msg(active_fd == 1 , "poll() returned an unexpected number of active file descriptors");
+ if (fds.revents & POLLIN or fds.revents & POLLOUT)
+ {
+ return MEMCACHED_SUCCESS;
+ }
+
+ if (fds.revents & POLLHUP)
+ {
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("poll() detected hang up"));
+ }
+
+ if (fds.revents & POLLERR)
+ {
+ int local_errno= EINVAL;
+ int err;
+ socklen_t len= sizeof (err);
+ if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ {
+ if (err == 0) // treat this as EINTR
+ {
+ continue;
+ }
+ local_errno= err;
+ }
+ memcached_quit_server(ptr, true);
+ return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT,
+ memcached_literal_param("poll() returned POLLHUP"));
+ }
+
+ return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT, memcached_literal_param("poll() returned a value that was not dealt with"));
}
- else if (active_fd == 0)
+
+ if (active_fd == 0)
{
ptr->io_wait_count.timeouts++;
return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
// Only an error should result in this code being called.
- local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
+ int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
assert_msg(active_fd == -1 , "poll() returned an unexpected value");
switch (local_errno)
{
case EFAULT:
case ENOMEM:
- return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+ memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
case EINVAL:
- return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+ memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
default:
- if (fds.revents & POLLERR)
- {
- int err;
- socklen_t len= sizeof (err);
- if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
- {
- if (err == 0) // treat this as EINTR
- {
- continue;
- }
- local_errno= err;
- }
- }
- break;
+ memcached_set_errno(*ptr, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
}
- break; // should only occur from poll error
+ break;
}
memcached_quit_server(ptr, true);
- return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
+ if (memcached_has_error(ptr))
+ {
+ return memcached_instance_error_return(ptr);
+ }
+
+ return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("number of attempts to call io_wait() failed"));
}
-static bool io_flush(memcached_server_write_instance_st ptr,
+static bool io_flush(org::libmemcached::Instance* ptr,
const bool with_flush,
memcached_return_t& error)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
WATCHPOINT_ASSERT(write_length > 0);
- int flags= with_flush ? MSG_NOSIGNAL|MSG_DONTWAIT : MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+ int flags;
+ if (with_flush)
+ {
+ flags= MSG_NOSIGNAL|MSG_DONTWAIT;
+ }
+ else
+ {
+ flags= MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE;
+ }
+
ssize_t sent_length= ::send(ptr->fd, local_write_ptr, write_length, flags);
if (sent_length == SOCKET_ERROR)
return true;
}
-memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+memcached_return_t memcached_io_wait_for_write(org::libmemcached::Instance* ptr)
{
return io_wait(ptr, MEM_WRITE);
}
-static memcached_return_t _io_fill(memcached_server_write_instance_st ptr, ssize_t& nread)
+static memcached_return_t _io_fill(org::libmemcached::Instance* ptr)
{
ssize_t data_read;
do
case EFAULT:
case ECONNREFUSED:
default:
- {
- memcached_quit_server(ptr, true);
- nread= -1;
- return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- }
+ memcached_quit_server(ptr, true);
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ break;
}
+
+ return memcached_instance_error_return(ptr);
}
else if (data_read == 0)
{
it will return EGAIN if data is not immediatly available.
*/
memcached_quit_server(ptr, true);
- nread= -1;
return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
memcached_literal_param("::rec() returned zero, server has disconnected"));
}
return MEMCACHED_SUCCESS;
}
-memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_io_read(org::libmemcached::Instance* ptr,
void *buffer, size_t length, ssize_t& nread)
{
assert(memcached_is_udp(ptr->root) == false);
- assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
+ assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error
char *buffer_ptr= static_cast<char *>(buffer);
if (ptr->fd == INVALID_SOCKET)
if (ptr->read_buffer_length == 0)
{
memcached_return_t io_fill_ret;
- if (memcached_fatal(io_fill_ret= _io_fill(ptr, nread)))
+ if (memcached_fatal(io_fill_ret= _io_fill(ptr)))
{
+ nread= -1;
return io_fill_ret;
}
}
return MEMCACHED_SUCCESS;
}
-memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
+memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr)
{
- assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
+ assert_msg(ptr, "Programmer error, invalid Instance");
assert(memcached_is_udp(ptr->root) == false);
if (ptr->fd == INVALID_SOCKET)
return MEMCACHED_CONNECTION_FAILURE;
}
-static bool _io_write(memcached_server_write_instance_st ptr,
+static bool _io_write(org::libmemcached::Instance* ptr,
const void *buffer, size_t length, bool with_flush,
size_t& written)
{
return true;
}
-bool memcached_io_write(memcached_server_write_instance_st ptr)
+bool memcached_io_write(org::libmemcached::Instance* ptr)
{
size_t written;
return _io_write(ptr, NULL, 0, true, written);
}
-ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
+ssize_t memcached_io_write(org::libmemcached::Instance* ptr,
const void *buffer, const size_t length, const bool with_flush)
{
size_t written;
return ssize_t(written);
}
-bool memcached_io_writev(memcached_server_write_instance_st ptr,
- libmemcached_io_vector_st vector[],
- const size_t number_of, const bool with_flush)
+bool memcached_io_writev(org::libmemcached::Instance* ptr,
+ libmemcached_io_vector_st vector[],
+ const size_t number_of, const bool with_flush)
{
ssize_t complete_total= 0;
ssize_t total= 0;
}
-void memcached_io_close(memcached_server_write_instance_st ptr)
+void memcached_io_close(org::libmemcached::Instance* ptr)
{
if (ptr->fd == INVALID_SOCKET)
{
}
/* in case of death shutdown to avoid blocking at close() */
- if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
+ if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR and get_socket_errno() != ENOTCONN)
{
WATCHPOINT_NUMBER(ptr->fd);
WATCHPOINT_ERRNO(get_socket_errno());
ptr->fd= INVALID_SOCKET;
}
-memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc)
{
#define MAX_SERVERS_TO_POLL 100
struct pollfd fds[MAX_SERVERS_TO_POLL];
for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
{
- memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
+ org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
if (instance->read_buffer_length > 0) /* I have data in the buffer */
{
return instance;
}
- if (memcached_server_response_count(instance) > 0)
+ if (memcached_instance_response_count(instance) > 0)
{
- fds[host_index].events = POLLIN;
- fds[host_index].revents = 0;
- fds[host_index].fd = instance->fd;
+ fds[host_index].events= POLLIN;
+ fds[host_index].revents= 0;
+ fds[host_index].fd= instance->fd;
++host_index;
}
}
/* We have 0 or 1 server with pending events.. */
for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, x);
+ org::libmemcached::Instance* instance= memcached_instance_fetch(memc, x);
- if (memcached_server_response_count(instance) > 0)
+ if (memcached_instance_response_count(instance) > 0)
{
return instance;
}
{
for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
{
- memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
+ org::libmemcached::Instance* instance= memcached_instance_fetch(memc, y);
if (instance->fd == fds[x].fd)
{
/*
Eventually we will just kill off the server with the problem.
*/
-void memcached_io_reset(memcached_server_write_instance_st ptr)
+void memcached_io_reset(org::libmemcached::Instance* ptr)
{
memcached_quit_server(ptr, true);
}
* Read a given number of bytes from the server and place it into a specific
* buffer. Reset the IO channel on this server if an error occurs.
*/
-memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr,
void *dta,
const size_t size)
{
return MEMCACHED_SUCCESS;
}
-memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
+memcached_return_t memcached_io_readline(org::libmemcached::Instance* ptr,
char *buffer_ptr,
size_t size,
size_t& total_nr)