*/
-#include "libmemcached/common.h"
+#include <libmemcached/common.h>
+#include <cassert>
-typedef enum {
+enum memc_read_or_write {
MEM_READ,
MEM_WRITE
-} memc_read_or_write;
+};
static ssize_t io_flush(memcached_server_write_instance_st ptr,
const bool with_flush,
fds.fd= ptr->fd;
fds.events= POLLIN;
- int error;
-
if (read_or_write == MEM_WRITE) /* write */
{
fds.events= POLLOUT;
{
memcached_return_t rc= memcached_purge(ptr);
if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+ {
return MEMCACHED_FAILURE;
+ }
+ }
+
+ if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
+ {
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
size_t loop_max= 5;
while (--loop_max) // While loop is for ERESTART or EINTR
{
- error= poll(&fds, 1, ptr->root->poll_timeout);
+ int error= poll(&fds, 1, ptr->root->poll_timeout);
switch (error)
{
case 1: // Success!
WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
return MEMCACHED_SUCCESS;
+
case 0: // Timeout occured, we let the while() loop do its thing.
- return MEMCACHED_TIMEOUT;
+ return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+
default:
WATCHPOINT_ERRNO(get_socket_errno());
switch (get_socket_errno())
#endif
case EINTR:
break;
+
+ case EFAULT:
+ case ENOMEM:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+
+ case EINVAL:
+ return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+
default:
if (fds.revents & POLLERR)
{
int err;
socklen_t len= sizeof (err);
(void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
- ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
+ memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
}
else
{
- ptr->cached_errno= get_socket_errno();
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
memcached_quit_server(ptr, true);
- return MEMCACHED_FAILURE;
+ return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
}
}
- /* Imposssible for anything other then -1 */
- WATCHPOINT_ASSERT(error == -1);
- ptr->cached_errno= get_socket_errno();
memcached_quit_server(ptr, true);
- return MEMCACHED_FAILURE;
+ return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
/* There is room in the buffer, try to fill it! */
if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
{
- /* Just try a single read to grab what's available */
- ssize_t nr= recv(ptr->fd,
- ptr->read_ptr + ptr->read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->read_data_length,
- 0);
+ do {
+ /* Just try a single read to grab what's available */
+ ssize_t nr= recv(ptr->fd,
+ ptr->read_ptr + ptr->read_data_length,
+ MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+ MSG_DONTWAIT);
+
+ switch (nr)
+ {
+ case SOCKET_ERROR:
+ {
+ switch (get_socket_errno())
+ {
+ case EINTR:
+ continue;
- if (nr > 0)
- {
- ptr->read_data_length+= (size_t)nr;
- ptr->read_buffer_length+= (size_t)nr;
- return true;
- }
+ case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+ case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ break; // No IO is fine, we can just move on
+
+ default:
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ }
+ }
+ break;
+
+ case 0: // Shutdown on the socket has occurred
+ {
+ memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+ }
+ break;
+
+ default:
+ {
+ ptr->read_data_length+= size_t(nr);
+ ptr->read_buffer_length+= size_t(nr);
+ return true;
+ }
+ break;
+ }
+ } while (0);
}
return false;
}
return false;
}
-#if 0 // Dead code, this should be removed.
-void memcached_io_preread(memcached_st *ptr)
-{
- unsigned int x;
-
- return;
-
- for (x= 0; x < memcached_server_count(ptr); x++)
- {
- if (memcached_server_response_count(ptr, x) &&
- ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
- {
- size_t data_read;
-
- data_read= recv(ptr->hosts[x].fd,
- ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0);
- if (data_read == SOCKET_ERROR)
- continue;
-
- ptr->hosts[x].read_buffer_length+= data_read;
- ptr->hosts[x].read_data_length+= data_read;
- }
- }
-}
-#endif
-
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
void *buffer, size_t length, ssize_t *nread)
{
- char *buffer_ptr;
+ assert(ptr); // Programmer error
+ char *buffer_ptr= static_cast<char *>(buffer);
- buffer_ptr= static_cast<char *>(buffer);
+ if (ptr->fd == INVALID_SOCKET)
+ {
+ assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+ return MEMCACHED_CONNECTION_FAILURE;
+ }
while (length)
{
if (not ptr->read_buffer_length)
{
ssize_t data_read;
-
- while (1)
+ do
{
- data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0);
- if (data_read > 0)
+ data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+ if (data_read == SOCKET_ERROR)
{
- break;
- }
- else if (data_read == SOCKET_ERROR)
- {
- ptr->cached_errno= get_socket_errno();
- memcached_return_t rc= MEMCACHED_ERRNO;
switch (get_socket_errno())
{
+ case EINTR: // We just retry
+ continue;
+
+ case ETIMEDOUT: // OSX
case EWOULDBLOCK:
#ifdef USE_EAGAIN
case EAGAIN:
#endif
- case EINTR:
#ifdef TARGET_OS_LINUX
case ERESTART:
#endif
- if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
+ if (memcached_success(io_wait(ptr, MEM_READ)))
+ {
continue;
+ }
+ return MEMCACHED_IN_PROGRESS;
+
/* fall through */
+ case ENOTCONN: // Programmer Error
+ WATCHPOINT_ASSERT(0);
+ case ENOTSOCK:
+ WATCHPOINT_ASSERT(0);
+ case EBADF:
+ assert(ptr->fd != INVALID_SOCKET);
+ case EINVAL:
+ case EFAULT:
+ case ECONNREFUSED:
default:
{
memcached_quit_server(ptr, true);
*nread= -1;
- return rc;
+ return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
}
}
- else
+ else if (data_read == 0)
{
/*
EOF. Any data received so far is incomplete
it will return EGAIN if data is not immediatly available.
*/
WATCHPOINT_STRING("We had a zero length recv()");
+ assert(0);
memcached_quit_server(ptr, true);
*nread= -1;
- return MEMCACHED_UNKNOWN_READ_FAILURE;
+ return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
}
- }
+ } while (data_read <= 0);
ptr->io_bytes_sent = 0;
ptr->read_data_length= (size_t) data_read;
return MEMCACHED_SUCCESS;
}
+memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
+{
+ assert(ptr); // Programmer error
+
+ if (ptr->fd == INVALID_SOCKET)
+ {
+ assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
+ return MEMCACHED_CONNECTION_FAILURE;
+ }
+
+ ssize_t data_read;
+ char buffer[MEMCACHED_MAX_BUFFER];
+ do
+ {
+ data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
+ if (data_read == SOCKET_ERROR)
+ {
+ switch (get_socket_errno())
+ {
+ case EINTR: // We just retry
+ continue;
+
+ case ETIMEDOUT: // OSX
+ case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+ case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ if (memcached_success(io_wait(ptr, MEM_READ)))
+ {
+ continue;
+ }
+ return MEMCACHED_IN_PROGRESS;
+
+ /* fall through */
+
+ case ENOTCONN: // Programmer Error
+ WATCHPOINT_ASSERT(0);
+ case ENOTSOCK:
+ WATCHPOINT_ASSERT(0);
+ case EBADF:
+ assert(ptr->fd != INVALID_SOCKET);
+ case EINVAL:
+ case EFAULT:
+ case ECONNREFUSED:
+ default:
+ return MEMCACHED_CONNECTION_FAILURE; // We want this!
+ }
+ }
+ } while (data_read > 0);
+
+ return MEMCACHED_CONNECTION_FAILURE;
+}
+
static ssize_t _io_write(memcached_server_write_instance_st ptr,
const void *buffer, size_t length, bool with_flush)
{
if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
{
- memcached_return_t rc;
- ssize_t sent_length;
-
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- sent_length= io_flush(ptr, with_flush, &rc);
+
+ memcached_return_t rc;
+ ssize_t sent_length= io_flush(ptr, with_flush, &rc);
if (sent_length == -1)
{
return -1;
}
-memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr)
+void memcached_io_close(memcached_server_write_instance_st ptr)
{
if (ptr->fd == INVALID_SOCKET)
{
- return MEMCACHED_SUCCESS;
+ return;
}
/* in case of death shutdown to avoid blocking at close() */
{
WATCHPOINT_ERRNO(get_socket_errno());
}
-
- return MEMCACHED_SUCCESS;
+ ptr->state= MEMCACHED_SERVER_STATE_NEW;
+ ptr->fd= INVALID_SOCKET;
}
memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
struct pollfd fds[MAX_SERVERS_TO_POLL];
unsigned int host_index= 0;
- for (uint32_t x= 0;
- x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
- ++x)
+ for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
{
memcached_server_write_instance_st instance=
memcached_server_instance_fetch(memc, x);
return NULL;
}
- int err= poll(fds, host_index, memc->poll_timeout);
- switch (err) {
+ int error= poll(fds, host_index, memc->poll_timeout);
+ switch (error)
+ {
case -1:
- memcached_set_errno(memc, get_socket_errno(), NULL);
+ memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
/* FALLTHROUGH */
case 0:
break;
+
default:
for (size_t x= 0; x < host_index; ++x)
{
// UDP Sanity check, make sure that we are not sending somthing too big
if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
{
+ *error= MEMCACHED_WRITE_FAILURE;
return -1;
}
if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
&& ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
+ {
return 0;
+ }
/* Looking for memory overflows */
#if defined(DEBUG)
if (sent_length == SOCKET_ERROR)
{
- ptr->cached_errno= get_socket_errno();
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
#if 0 // @todo I should look at why we hit this bit of code hard frequently
WATCHPOINT_ERRNO(get_socket_errno());
WATCHPOINT_NUMBER(get_socket_errno());
* buffer for more data and retry the write before
* waiting..
*/
- if (repack_input_buffer(ptr) ||
+ if (repack_input_buffer(ptr) or
process_input_buffer(ptr))
+ {
continue;
+ }
- memcached_return_t rc;
- rc= io_wait(ptr, MEM_WRITE);
-
- if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
+ memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+ if (memcached_success(rc))
+ {
continue;
+ }
+ else if (rc == MEMCACHED_TIMEOUT)
+ {
+ *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ return -1;
+ }
memcached_quit_server(ptr, true);
+ *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
return -1;
}
case ENOTCONN:
case EPIPE:
default:
memcached_quit_server(ptr, true);
- *error= MEMCACHED_ERRNO;
+ *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
WATCHPOINT_ASSERT(ptr->fd == -1);
return -1;
}
}
- if (ptr->type == MEMCACHED_CONNECTION_UDP &&
+ if (ptr->type == MEMCACHED_CONNECTION_UDP and
(size_t)sent_length != write_length)
{
memcached_quit_server(ptr, true);
+ *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
return -1;
}
while (offset < size)
{
ssize_t nread;
- memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
- &nread);
- if (rc != MEMCACHED_SUCCESS)
+ memcached_return_t rc;
+
+ while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
+
+ if (memcached_failed(rc))
+ {
return rc;
+ }
offset+= (size_t) nread;
}
bool line_complete= false;
size_t total_nr= 0;
- while (!line_complete)
+ while (not line_complete)
{
if (ptr->read_buffer_length == 0)
{
*/
ssize_t nread;
memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
- if (rc != MEMCACHED_SUCCESS)
+ if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
+ {
+ memcached_quit_server(ptr, true);
+ return memcached_set_error(*ptr, rc, MEMCACHED_AT);
+ }
+ else if (memcached_failed(rc))
+ {
return rc;
+ }
if (*buffer_ptr == '\n')
line_complete= true;