X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=2b463ec77b2a5568b8501289d276ff01c3581231;hb=fd8a1d785722f36d111915b0c96331e907df2746;hp=85e5ed42b82460a69d6c6b364b0af682ed189919;hpb=694f0966e3980f448c0ca9f9504e6d065e35654f;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 85e5ed42..2b463ec7 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -44,30 +44,6 @@ enum memc_read_or_write { MEM_WRITE }; -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - /** * Try to fill the input buffer for a server with as much * data as possible. @@ -192,7 +168,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) } static memcached_return_t io_wait(memcached_server_write_instance_st ptr, - memc_read_or_write read_or_write) + const memc_read_or_write read_or_write) { struct pollfd fds; fds.fd= ptr->fd; @@ -294,9 +270,9 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error) +static bool io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, + memcached_return_t& error) { /* ** We might want to purge the input buffer if we haven't consumed @@ -310,27 +286,26 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) { - return -1; + return false; } } - size_t return_length; char *local_write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; - *error= MEMCACHED_SUCCESS; + error= MEMCACHED_SUCCESS; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) { - *error= MEMCACHED_WRITE_FAILURE; - return -1; + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return false; } if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) { - return 0; + return true; } /* Looking for memory overflows */ @@ -340,7 +315,6 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); #endif - return_length= 0; while (write_length) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -354,11 +328,11 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); if (with_flush) { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); } else { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); } if (sent_length == SOCKET_ERROR) @@ -395,44 +369,38 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { - *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - return -1; + error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return false; } memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - return -1; + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - WATCHPOINT_ASSERT(ptr->fd == -1); - return -1; + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET); + return false; } } if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) { memcached_quit_server(ptr, true); - *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return -1; + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return false; } ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; write_length-= uint32_t(sent_length); - return_length+= uint32_t(sent_length); } WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header if (memcached_is_udp(ptr->root)) { ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; @@ -442,7 +410,7 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, ptr->write_buffer_offset= 0; } - return (ssize_t) return_length; + return true; } memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) @@ -458,7 +426,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, if (ptr->fd == INVALID_SOCKET) { +#if 0 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); +#endif return MEMCACHED_CONNECTION_FAILURE; } @@ -469,7 +439,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, ssize_t data_read; do { - data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); if (data_read == SOCKET_ERROR) { switch (get_socket_errno()) @@ -524,7 +494,8 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, WATCHPOINT_STRING("We had a zero length recv()"); memcached_quit_server(ptr, true); *nread= -1; - return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT); + return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, + memcached_literal_param("::rec() returned zero, server has disconnected")); } } while (data_read <= 0); @@ -659,17 +630,10 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); memcached_return_t rc; - ssize_t sent_length= io_flush(ptr, with_flush, &rc); - if (sent_length == -1) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } - - /* If io_flush calls memcached_purge, sent_length may be 0 */ - unlikely (sent_length != 0) - { - WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); - } } } @@ -677,7 +641,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, { memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, with_flush, &rc) == -1) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } @@ -686,14 +650,31 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, return (ssize_t) original_length; } +bool memcached_io_write(memcached_server_write_instance_st ptr) +{ + return (_io_write(ptr, NULL, 0, true) >= 0); +} + ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) + const void *buffer, const size_t length, const bool with_flush) { return _io_write(ptr, buffer, length, with_flush); } +size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of) +{ + ssize_t total= 0; + + for (size_t x= 0; x < number_of; x++) + { + total+= vector->length; + } + + return total; +} + ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, + libmemcached_io_vector_st *vector, size_t number_of, bool with_flush) { ssize_t total= 0; @@ -702,16 +683,19 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, { ssize_t returnable; - if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + if (vector->length) { - return -1; + if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + { + return -1; + } + total+= returnable; } - total+= returnable; } if (with_flush) { - if (memcached_io_write(ptr, NULL, 0, true) == -1) + if (memcached_io_write(ptr) == false) { return -1; } @@ -752,11 +736,12 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ + { return instance; + } if (memcached_server_response_count(instance) > 0) { @@ -827,7 +812,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) */ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, - size_t size) + const size_t size) { size_t offset= 0; char *data= static_cast(dta); @@ -858,7 +843,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, total_nr= 0; bool line_complete= false; - while (not line_complete) + while (line_complete == false) { if (ptr->read_buffer_length == 0) { @@ -880,7 +865,9 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (*buffer_ptr == '\n') + { line_complete= true; + } ++buffer_ptr; ++total_nr; @@ -891,7 +878,9 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, { *buffer_ptr = *ptr->read_ptr; if (*buffer_ptr == '\n') + { line_complete = true; + } --ptr->read_buffer_length; ++ptr->read_ptr; ++total_nr; @@ -899,21 +888,10 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (total_nr == size) + { return MEMCACHED_PROTOCOL_ERROR; + } } return MEMCACHED_SUCCESS; } - -memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) -{ - if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) - return MEMCACHED_FAILURE; - - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id))); - header->num_datagrams= htons(1); - header->sequence_number= htons(0); - - return MEMCACHED_SUCCESS; -}