X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=48c0de6c7c2d81620d8c3d5d1cbd2dae80b3e382;hb=0a1ee4b1b5307c14168780fde95431e92311c89a;hp=ae86fa217b9f2902f3b64db4939ce685b4f06853;hpb=70f02eac18862c95ebe45e1b410904910281be6c;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index ae86fa21..48c0de6c 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -81,10 +81,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) case EINTR: continue; +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -131,7 +131,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) ** We might be able to process some of the response messages if we ** have a callback set up */ - if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + if (ptr->root->callbacks != NULL) { /* * We might have responses... try to read them out and fire @@ -142,10 +142,8 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t error; memcached_st *root= (memcached_st *)ptr->root; - error= memcached_response(ptr, buffer, sizeof(buffer), - &root->result); + memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -296,18 +294,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - // UDP Sanity check, make sure that we are not sending somthing too big - if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) - { - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - - if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - { - return true; - } - /* Looking for memory overflows */ #if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) @@ -319,10 +305,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - if (memcached_is_udp(ptr->root)) - { - increment_udp_message_id(ptr); - } ssize_t sent_length= 0; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -337,7 +319,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, if (sent_length == SOCKET_ERROR) { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); #if 0 // @todo I should look at why we hit this bit of code hard frequently WATCHPOINT_ERRNO(get_socket_errno()); WATCHPOINT_NUMBER(get_socket_errno()); @@ -346,10 +327,11 @@ static bool io_flush(memcached_server_write_instance_st ptr, { case ENOBUFS: continue; + +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: { /* * We may be blocked on write because the input buffer @@ -387,13 +369,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, } } - if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) - { - memcached_quit_server(ptr, true); - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; @@ -401,14 +376,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, } WATCHPOINT_ASSERT(write_length == 0); - if (memcached_is_udp(ptr->root)) - { - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - } - else - { - ptr->write_buffer_offset= 0; - } + ptr->write_buffer_offset= 0; return true; } @@ -419,8 +387,9 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s } memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, - void *buffer, size_t length, ssize_t *nread) + void *buffer, size_t length, ssize_t& nread) { + assert(memcached_is_udp(ptr->root) == false); assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error char *buffer_ptr= static_cast(buffer); @@ -448,10 +417,10 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -475,7 +444,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, default: { memcached_quit_server(ptr, true); - *nread= -1; + nread= -1; return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } @@ -493,8 +462,8 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, */ WATCHPOINT_STRING("We had a zero length recv()"); memcached_quit_server(ptr, true); - *nread= -1; - return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, + nread= -1; + return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, memcached_literal_param("::rec() returned zero, server has disconnected")); } } while (data_read <= 0); @@ -527,7 +496,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, } } - *nread = (ssize_t)(buffer_ptr - (char*)buffer); + nread= ssize_t(buffer_ptr - (char*)buffer); return MEMCACHED_SUCCESS; } @@ -535,6 +504,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) { assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) { @@ -555,10 +525,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -592,6 +562,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + assert(memcached_is_udp(ptr->root) == false); size_t original_length= length; const char *buffer_ptr= static_cast(buffer); @@ -599,25 +570,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, while (length) { char *write_ptr; - size_t should_write; - size_t buffer_end; - - if (memcached_is_udp(ptr->root)) - { - //UDP does not support partial writes - buffer_end= MAX_UDP_DATAGRAM_LENGTH; - should_write= length; - if (ptr->write_buffer_offset + should_write > buffer_end) - { - return -1; - } - } - else - { - buffer_end= MEMCACHED_MAX_BUFFER; - should_write= buffer_end - ptr->write_buffer_offset; - should_write= (should_write < length) ? should_write : length; - } + size_t buffer_end= MEMCACHED_MAX_BUFFER; + size_t should_write= buffer_end -ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); @@ -625,7 +580,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) + if (ptr->write_buffer_offset == buffer_end) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -661,21 +616,9 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, return _io_write(ptr, buffer, length, with_flush); } -size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of) -{ - ssize_t total= 0; - - for (size_t x= 0; x < number_of; x++) - { - total+= vector->length; - } - - return total; -} - ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, - size_t number_of, bool with_flush) + libmemcached_io_vector_st vector[], + const size_t number_of, const bool with_flush) { ssize_t total= 0; @@ -734,7 +677,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st struct pollfd fds[MAX_SERVERS_TO_POLL]; unsigned int host_index= 0; - for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -779,17 +722,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st break; default: - for (size_t x= 0; x < host_index; ++x) + for (ptrdiff_t x= 0; x < host_index; ++x) { if (fds[x].revents & POLLIN) { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, y); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) + { return instance; + } } } } @@ -822,14 +766,14 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, ssize_t nread; memcached_return_t rc; - while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { }; + while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { }; if (memcached_failed(rc)) { return rc; } - offset+= (size_t) nread; + offset+= size_t(nread); } return MEMCACHED_SUCCESS; @@ -853,7 +797,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, * the logic. */ ssize_t nread; - memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); + memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread); if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) { memcached_quit_server(ptr, true);