X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;ds=sidebyside;f=libmemcached%2Fio.cc;h=049c436dbcb7f24e12a2511c32b00e88b192bade;hb=b43d7591dbe31dedd9fed41f01d07e8a3412582b;hp=7a321adeed1f4ee7ca6a38b57b43f84b7595105e;hpb=2959e927ae7c1da490db7a88f0cd589e8bc6b45f;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 7a321ade..049c436d 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -81,10 +81,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) case EINTR: continue; +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -131,7 +131,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) ** We might be able to process some of the response messages if we ** have a callback set up */ - if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + if (ptr->root->callbacks != NULL) { /* * We might have responses... try to read them out and fire @@ -142,10 +142,8 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t error; memcached_st *root= (memcached_st *)ptr->root; - error= memcached_response(ptr, buffer, sizeof(buffer), - &root->result); + memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result); memcached_set_processing_input(root, false); @@ -194,8 +192,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, */ if (read_or_write == MEM_WRITE) { - memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (memcached_fatal(memcached_purge(ptr))) { return MEMCACHED_FAILURE; } @@ -280,11 +277,10 @@ static bool io_flush(memcached_server_write_instance_st ptr, ** in the purge function to avoid duplicating the logic.. */ { - memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); + memcached_return_t rc= memcached_purge(ptr); - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED) { return false; } @@ -296,18 +292,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - // UDP Sanity check, make sure that we are not sending somthing too big - if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) - { - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - - if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - { - return true; - } - /* Looking for memory overflows */ #if defined(DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) @@ -319,10 +303,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); - if (memcached_is_udp(ptr->root)) - { - increment_udp_message_id(ptr); - } ssize_t sent_length= 0; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -337,7 +317,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, if (sent_length == SOCKET_ERROR) { - memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); #if 0 // @todo I should look at why we hit this bit of code hard frequently WATCHPOINT_ERRNO(get_socket_errno()); WATCHPOINT_NUMBER(get_socket_errno()); @@ -346,10 +325,11 @@ static bool io_flush(memcached_server_write_instance_st ptr, { case ENOBUFS: continue; + +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: { /* * We may be blocked on write because the input buffer @@ -387,13 +367,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, } } - if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) - { - memcached_quit_server(ptr, true); - error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); - return false; - } - ptr->io_bytes_sent+= uint32_t(sent_length); local_write_ptr+= sent_length; @@ -401,14 +374,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, } WATCHPOINT_ASSERT(write_length == 0); - if (memcached_is_udp(ptr->root)) - { - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - } - else - { - ptr->write_buffer_offset= 0; - } + ptr->write_buffer_offset= 0; return true; } @@ -421,6 +387,7 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t& nread) { + assert(memcached_is_udp(ptr->root) == false); assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error char *buffer_ptr= static_cast(buffer); @@ -448,10 +415,10 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -535,6 +502,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) { assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + assert(memcached_is_udp(ptr->root) == false); if (ptr->fd == INVALID_SOCKET) { @@ -555,10 +523,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) continue; case ETIMEDOUT: // OSX +#if EWOULDBLOCK != EAGAIN case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: #endif + case EAGAIN: #ifdef TARGET_OS_LINUX case ERESTART: #endif @@ -592,6 +560,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + assert(memcached_is_udp(ptr->root) == false); size_t original_length= length; const char *buffer_ptr= static_cast(buffer); @@ -599,25 +568,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, while (length) { char *write_ptr; - size_t should_write; - size_t buffer_end; - - if (memcached_is_udp(ptr->root)) - { - //UDP does not support partial writes - buffer_end= MAX_UDP_DATAGRAM_LENGTH; - should_write= length; - if (ptr->write_buffer_offset + should_write > buffer_end) - { - return -1; - } - } - else - { - buffer_end= MEMCACHED_MAX_BUFFER; - should_write= buffer_end - ptr->write_buffer_offset; - should_write= (should_write < length) ? should_write : length; - } + size_t buffer_end= MEMCACHED_MAX_BUFFER; + size_t should_write= buffer_end -ptr->write_buffer_offset; + should_write= (should_write < length) ? should_write : length; write_ptr= ptr->write_buffer + ptr->write_buffer_offset; memcpy(write_ptr, buffer_ptr, should_write); @@ -625,7 +578,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_ptr+= should_write; length-= should_write; - if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false) + if (ptr->write_buffer_offset == buffer_end) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -720,9 +673,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; - unsigned int host_index= 0; + nfds_t host_index= 0; - for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -767,17 +720,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st break; default: - for (size_t x= 0; x < host_index; ++x) + for (nfds_t x= 0; x < host_index; ++x) { if (fds[x].revents & POLLIN) { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, y); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) + { return instance; + } } } }