MEM_WRITE
};
-/*
- * The udp request id consists of two seperate sections
- * 1) The thread id
- * 2) The message number
- * The thread id should only be set when the memcached_st struct is created
- * and should not be changed.
- *
- * The message num is incremented for each new message we send, this function
- * extracts the message number from message_id, increments it and then
- * writes the new value back into the header
- */
-static void increment_udp_message_id(memcached_server_write_instance_st ptr)
-{
- struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
- uint16_t cur_req= get_udp_datagram_request_id(header);
- int msg_num= get_msg_num_from_request_id(cur_req);
- int thread_id= get_thread_id_from_request_id(cur_req);
-
- if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
- msg_num= 0;
-
- header->request_id= htons((uint16_t) (thread_id | msg_num));
-}
-
/**
* Try to fill the input buffer for a server with as much
* data as possible.
}
static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
- memc_read_or_write read_or_write)
+ const memc_read_or_write read_or_write)
{
struct pollfd fds;
fds.fd= ptr->fd;
return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
- const bool with_flush,
- memcached_return_t *error)
+static bool io_flush(memcached_server_write_instance_st ptr,
+ const bool with_flush,
+ memcached_return_t& error)
{
/*
** We might want to purge the input buffer if we haven't consumed
if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
{
- return -1;
+ return false;
}
}
- size_t return_length;
char *local_write_ptr= ptr->write_buffer;
size_t write_length= ptr->write_buffer_offset;
- *error= MEMCACHED_SUCCESS;
+ error= MEMCACHED_SUCCESS;
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
// UDP Sanity check, make sure that we are not sending somthing too big
if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH)
{
- *error= MEMCACHED_WRITE_FAILURE;
- return -1;
+ error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+ return false;
}
if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
{
- return 0;
+ return true;
}
/* Looking for memory overflows */
WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
#endif
- return_length= 0;
while (write_length)
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
if (with_flush)
{
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
+ sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
}
else
{
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
+ sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
}
if (sent_length == SOCKET_ERROR)
}
else if (rc == MEMCACHED_TIMEOUT)
{
- *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
- return -1;
+ error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ return false;
}
memcached_quit_server(ptr, true);
- *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- return -1;
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ return false;
}
case ENOTCONN:
case EPIPE:
default:
memcached_quit_server(ptr, true);
- *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- WATCHPOINT_ASSERT(ptr->fd == -1);
- return -1;
+ error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
+ return false;
}
}
if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length)
{
memcached_quit_server(ptr, true);
- *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
- return -1;
+ error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+ return false;
}
ptr->io_bytes_sent+= uint32_t(sent_length);
local_write_ptr+= sent_length;
write_length-= uint32_t(sent_length);
- return_length+= uint32_t(sent_length);
}
WATCHPOINT_ASSERT(write_length == 0);
- // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
- // ptr->write_buffer_offset);
-
- // if we are a udp server, the begining of the buffer is reserverd for
- // the upd frame header
if (memcached_is_udp(ptr->root))
{
ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
ptr->write_buffer_offset= 0;
}
- return (ssize_t) return_length;
+ return true;
}
memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
if (ptr->fd == INVALID_SOCKET)
{
+#if 0
assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
+#endif
return MEMCACHED_CONNECTION_FAILURE;
}
ssize_t data_read;
do
{
- data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
+ data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
if (data_read == SOCKET_ERROR)
{
switch (get_socket_errno())
WATCHPOINT_STRING("We had a zero length recv()");
memcached_quit_server(ptr, true);
*nread= -1;
- return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
+ return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT,
+ memcached_literal_param("::rec() returned zero, server has disconnected"));
}
} while (data_read <= 0);
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
memcached_return_t rc;
- ssize_t sent_length= io_flush(ptr, with_flush, &rc);
- if (sent_length == -1)
+ if (io_flush(ptr, with_flush, rc) == false)
{
return -1;
}
-
- /* If io_flush calls memcached_purge, sent_length may be 0 */
- unlikely (sent_length != 0)
- {
- WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
- }
}
}
{
memcached_return_t rc;
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- if (io_flush(ptr, with_flush, &rc) == -1)
+ if (io_flush(ptr, with_flush, rc) == false)
{
return -1;
}
return (ssize_t) original_length;
}
+bool memcached_io_write(memcached_server_write_instance_st ptr)
+{
+ return (_io_write(ptr, NULL, 0, true) >= 0);
+}
+
ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
- const void *buffer, size_t length, bool with_flush)
+ const void *buffer, const size_t length, const bool with_flush)
{
return _io_write(ptr, buffer, length, with_flush);
}
ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
- const struct libmemcached_io_vector_st *vector,
- size_t number_of, bool with_flush)
+ libmemcached_io_vector_st vector[],
+ const size_t number_of, const bool with_flush)
{
ssize_t total= 0;
{
ssize_t returnable;
- if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
+ if (vector->length)
{
- return -1;
+ if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
+ {
+ return -1;
+ }
+ total+= returnable;
}
- total+= returnable;
}
if (with_flush)
{
- if (memcached_io_write(ptr, NULL, 0, true) == -1)
+ if (memcached_io_write(ptr) == false)
{
return -1;
}
for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
{
- memcached_server_write_instance_st instance=
- memcached_server_instance_fetch(memc, x);
+ memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
if (instance->read_buffer_length > 0) /* I have data in the buffer */
+ {
return instance;
+ }
if (memcached_server_response_count(instance) > 0)
{
*/
memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
void *dta,
- size_t size)
+ const size_t size)
{
size_t offset= 0;
char *data= static_cast<char *>(dta);
total_nr= 0;
bool line_complete= false;
- while (not line_complete)
+ while (line_complete == false)
{
if (ptr->read_buffer_length == 0)
{
}
if (*buffer_ptr == '\n')
+ {
line_complete= true;
+ }
++buffer_ptr;
++total_nr;
{
*buffer_ptr = *ptr->read_ptr;
if (*buffer_ptr == '\n')
+ {
line_complete = true;
+ }
--ptr->read_buffer_length;
++ptr->read_ptr;
++total_nr;
}
if (total_nr == size)
+ {
return MEMCACHED_PROTOCOL_ERROR;
+ }
}
return MEMCACHED_SUCCESS;
}
-
-memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
-{
- if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
- return MEMCACHED_FAILURE;
-
- struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
- header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
- header->num_datagrams= htons(1);
- header->sequence_number= htons(0);
-
- return MEMCACHED_SUCCESS;
-}