X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=2b0866dc7ac9d86c6fd1626008272f3458fb5fe4;hb=51351e672a6a1626e09d5a9d41e3229df44fc3c8;hp=d7268940543cedeb0e7d8bc84145b442cfdcde74;hpb=969fea8e7bed82c109685ec3976cf7b0ec514ae9;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index d7268940..2b0866dc 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -44,30 +44,6 @@ enum memc_read_or_write { MEM_WRITE }; -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - /** * Try to fill the input buffer for a server with as much * data as possible. @@ -192,7 +168,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) } static memcached_return_t io_wait(memcached_server_write_instance_st ptr, - memc_read_or_write read_or_write) + const memc_read_or_write read_or_write) { struct pollfd fds; fds.fd= ptr->fd; @@ -268,16 +244,23 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, { int err; socklen_t len= sizeof (err); - (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); - memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT); + if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) + { + if (err == 0) + { + continue; + } + errno= err; + } } else { memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } + int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno memcached_quit_server(ptr, true); - return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT); } } } @@ -287,9 +270,9 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error) +static bool io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, + memcached_return_t& error) { /* ** We might want to purge the input buffer if we haven't consumed @@ -303,29 +286,26 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) { - return -1; + return false; } } - ssize_t sent_length; - size_t return_length; char *local_write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; - *error= MEMCACHED_SUCCESS; + error= MEMCACHED_SUCCESS; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big - if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) { - *error= MEMCACHED_WRITE_FAILURE; - return -1; + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return false; } - if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) { - return 0; + return true; } /* Looking for memory overflows */ @@ -335,23 +315,24 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); #endif - return_length= 0; while (write_length) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - sent_length= 0; - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) + { increment_udp_message_id(ptr); + } + ssize_t sent_length= 0; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); if (with_flush) { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); } else { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); } if (sent_length == SOCKET_ERROR) @@ -376,8 +357,7 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, * buffer for more data and retry the write before * waiting.. */ - if (repack_input_buffer(ptr) or - process_input_buffer(ptr)) + if (repack_input_buffer(ptr) or process_input_buffer(ptr)) { continue; } @@ -389,51 +369,48 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { - *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); - return -1; + error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return false; } memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - return -1; + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); - WATCHPOINT_ASSERT(ptr->fd == -1); - return -1; + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET); + return false; } } - if (ptr->type == MEMCACHED_CONNECTION_UDP and - (size_t)sent_length != write_length) + if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) { memcached_quit_server(ptr, true); - *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return -1; + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return false; } - ptr->io_bytes_sent += (uint32_t) sent_length; + ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; - write_length-= (uint32_t) sent_length; - return_length+= (uint32_t) sent_length; + write_length-= uint32_t(sent_length); } WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) + { ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + } else + { ptr->write_buffer_offset= 0; + } - return (ssize_t) return_length; + return true; } memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) @@ -449,7 +426,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, if (ptr->fd == INVALID_SOCKET) { +#if 0 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); +#endif return MEMCACHED_CONNECTION_FAILURE; } @@ -611,13 +590,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { - size_t original_length; - const char* buffer_ptr; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - original_length= length; - buffer_ptr= static_cast(buffer); + size_t original_length= length; + const char *buffer_ptr= static_cast(buffer); while (length) { @@ -625,7 +601,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, size_t should_write; size_t buffer_end; - if (ptr->type == MEMCACHED_CONNECTION_UDP) + if (memcached_is_udp(ptr->root)) { //UDP does not support partial writes buffer_end= MAX_UDP_DATAGRAM_LENGTH; @@ -648,22 +624,15 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) + if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); memcached_return_t rc; - ssize_t sent_length= io_flush(ptr, with_flush, &rc); - if (sent_length == -1) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } - - /* If io_flush calls memcached_purge, sent_length may be 0 */ - unlikely (sent_length != 0) - { - WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end); - } } } @@ -671,7 +640,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, { memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, with_flush, &rc) == -1) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } @@ -680,12 +649,29 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, return (ssize_t) original_length; } +bool memcached_io_write(memcached_server_write_instance_st ptr) +{ + return (_io_write(ptr, NULL, 0, true) >= 0); +} + ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) + const void *buffer, const size_t length, const bool with_flush) { return _io_write(ptr, buffer, length, with_flush); } +size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of) +{ + ssize_t total= 0; + + for (size_t x= 0; x < number_of; x++) + { + total+= vector->length; + } + + return total; +} + ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, const struct libmemcached_io_vector_st *vector, size_t number_of, bool with_flush) @@ -696,16 +682,19 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, { ssize_t returnable; - if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + if (vector->length) { - return -1; + if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + { + return -1; + } + total+= returnable; } - total+= returnable; } if (with_flush) { - if (memcached_io_write(ptr, NULL, 0, true) == -1) + if (memcached_io_write(ptr) == false) { return -1; } @@ -746,11 +735,12 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ + { return instance; + } if (memcached_server_response_count(instance) > 0) { @@ -821,7 +811,7 @@ void memcached_io_reset(memcached_server_write_instance_st ptr) */ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, - size_t size) + const size_t size) { size_t offset= 0; char *data= static_cast(dta); @@ -852,7 +842,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, total_nr= 0; bool line_complete= false; - while (not line_complete) + while (line_complete == false) { if (ptr->read_buffer_length == 0) { @@ -874,7 +864,9 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (*buffer_ptr == '\n') + { line_complete= true; + } ++buffer_ptr; ++total_nr; @@ -885,7 +877,9 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, { *buffer_ptr = *ptr->read_ptr; if (*buffer_ptr == '\n') + { line_complete = true; + } --ptr->read_buffer_length; ++ptr->read_ptr; ++total_nr; @@ -893,21 +887,10 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, } if (total_nr == size) + { return MEMCACHED_PROTOCOL_ERROR; + } } return MEMCACHED_SUCCESS; } - -memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) -{ - if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) - return MEMCACHED_FAILURE; - - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id))); - header->num_datagrams= htons(1); - header->sequence_number= htons(0); - - return MEMCACHED_SUCCESS; -}