From: Brian Aker Date: Tue, 27 Dec 2011 18:31:38 +0000 (-0800) Subject: Updating for simplified IO flush. X-Git-Tag: 1.0.3~3^2~1^2~10 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=603d93c1346162f4f9d2e3065e5120e948139dda;p=m6w6%2Flibmemcached Updating for simplified IO flush. --- diff --git a/libmemcached/delete.cc b/libmemcached/delete.cc index 9846bb51..544ef49e 100644 --- a/libmemcached/delete.cc +++ b/libmemcached/delete.cc @@ -72,7 +72,7 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, if (send_length +instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) { - memcached_io_write(instance, NULL, 0, true); + memcached_io_write(instance); } } @@ -113,7 +113,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, if (cmd_size +instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) { - memcached_io_write(instance, NULL, 0, true); + memcached_io_write(instance); } } diff --git a/libmemcached/do.cc b/libmemcached/do.cc index ee777d3e..928e3272 100644 --- a/libmemcached/do.cc +++ b/libmemcached/do.cc @@ -35,7 +35,7 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, **/ if (memcached_is_udp(ptr->root) and with_flush and ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH) { - if (memcached_io_write(ptr, NULL, 0, true) == -1) + if (memcached_io_write(ptr) == false) { memcached_io_reset(ptr); return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); diff --git a/libmemcached/flush_buffers.cc b/libmemcached/flush_buffers.cc index 91d90349..58260fc1 100644 --- a/libmemcached/flush_buffers.cc +++ b/libmemcached/flush_buffers.cc @@ -55,7 +55,7 @@ memcached_return_t memcached_flush_buffers(memcached_st *memc) return ret; } - if (memcached_io_write(instance, NULL, 0, true) == -1) + if (memcached_io_write(instance) == false) { ret= MEMCACHED_SOME_ERRORS; } diff --git a/libmemcached/get.cc b/libmemcached/get.cc index 25d6419d..e478f0ba 100644 --- a/libmemcached/get.cc +++ b/libmemcached/get.cc @@ -260,11 +260,13 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, if (ptr->flags.no_block) { - (void)memcached_io_write(instance, NULL, 0, true); + memcached_io_write(instance); } while(memcached_server_response_count(instance)) + { (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); + } } } @@ -541,12 +543,11 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, for (uint32_t x= 0; x < memcached_server_count(ptr); ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(ptr, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x); if (memcached_server_response_count(instance)) { - if (memcached_io_write(instance, NULL, 0, true) == -1) + if (memcached_io_write(instance) == false) { memcached_server_response_reset(instance); memcached_io_reset(instance); diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 766a2557..5e626b6a 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -296,7 +296,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, static bool io_flush(memcached_server_write_instance_st ptr, const bool with_flush, - memcached_return_t *error) + memcached_return_t& error) { /* ** We might want to purge the input buffer if we haven't consumed @@ -313,18 +313,17 @@ static bool io_flush(memcached_server_write_instance_st ptr, return false; } } - size_t return_length; char *local_write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; - *error= MEMCACHED_SUCCESS; + error= MEMCACHED_SUCCESS; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH) { - *error= MEMCACHED_WRITE_FAILURE; + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); return false; } @@ -340,7 +339,6 @@ static bool io_flush(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); #endif - return_length= 0; while (write_length) { WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); @@ -395,19 +393,19 @@ static bool io_flush(memcached_server_write_instance_st ptr, } else if (rc == MEMCACHED_TIMEOUT) { - *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); return false; } memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); return false; } case ENOTCONN: case EPIPE: default: memcached_quit_server(ptr, true); - *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET); return false; } @@ -416,7 +414,7 @@ static bool io_flush(memcached_server_write_instance_st ptr, if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length) { memcached_quit_server(ptr, true); - *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); return false; } @@ -424,15 +422,9 @@ static bool io_flush(memcached_server_write_instance_st ptr, local_write_ptr+= sent_length; write_length-= uint32_t(sent_length); - return_length+= uint32_t(sent_length); } WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header if (memcached_is_udp(ptr->root)) { ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; @@ -661,7 +653,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); memcached_return_t rc; - if (io_flush(ptr, with_flush, &rc) == false) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } @@ -672,7 +664,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, { memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, with_flush, &rc) == false) + if (io_flush(ptr, with_flush, rc) == false) { return -1; } @@ -681,8 +673,13 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, return (ssize_t) original_length; } +bool memcached_io_write(memcached_server_write_instance_st ptr) +{ + return (_io_write(ptr, NULL, 0, true) >= 0); +} + ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) + const void *buffer, const size_t length, const bool with_flush) { return _io_write(ptr, buffer, length, with_flush); } @@ -721,7 +718,7 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, if (with_flush) { - if (memcached_io_write(ptr, NULL, 0, true) == -1) + if (memcached_io_write(ptr) == false) { return -1; } @@ -762,11 +759,12 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_write_instance_st instance= - memcached_server_instance_fetch(memc, x); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ + { return instance; + } if (memcached_server_response_count(instance) > 0) { diff --git a/libmemcached/io.h b/libmemcached/io.h index f1d2a450..00bd0c11 100644 --- a/libmemcached/io.h +++ b/libmemcached/io.h @@ -63,26 +63,3 @@ struct libmemcached_io_vector_st const void *buffer; size_t length; }; - -#ifdef __cplusplus -extern "C" { -#endif - -LIBMEMCACHED_LOCAL -ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush); - -LIBMEMCACHED_LOCAL -ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - const struct libmemcached_io_vector_st *vector, - size_t number_of, bool with_flush); - -#ifdef __cplusplus -} -#endif - -#ifdef __cplusplus - -size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of); - -#endif diff --git a/libmemcached/io.hpp b/libmemcached/io.hpp index 23a4a6b4..af18fe3a 100644 --- a/libmemcached/io.hpp +++ b/libmemcached/io.hpp @@ -38,38 +38,40 @@ #pragma once -LIBMEMCACHED_LOCAL +size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of); + +bool memcached_io_write(memcached_server_write_instance_st ptr); + +ssize_t memcached_io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush); + +ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, + const struct libmemcached_io_vector_st *vector, + size_t number_of, bool with_flush); + memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr); -LIBMEMCACHED_LOCAL void memcached_io_reset(memcached_server_write_instance_st ptr); -LIBMEMCACHED_LOCAL memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread); /* Read a line (terminated by '\n') into the buffer */ -LIBMEMCACHED_LOCAL memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, char *buffer_ptr, size_t size, size_t& total); -LIBMEMCACHED_LOCAL void memcached_io_close(memcached_server_write_instance_st ptr); /* Read n bytes of data from the server and store them in dta */ -LIBMEMCACHED_LOCAL memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, size_t size); -LIBMEMCACHED_LOCAL memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id); -LIBMEMCACHED_LOCAL memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc); -LIBMEMCACHED_LOCAL memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr); diff --git a/libmemcached/purge.cc b/libmemcached/purge.cc index b0f864a8..86e379e4 100644 --- a/libmemcached/purge.cc +++ b/libmemcached/purge.cc @@ -54,14 +54,18 @@ memcached_return_t memcached_purge(memcached_server_write_instance_st ptr) return MEMCACHED_SUCCESS; } - /* memcached_io_write and memcached_response may call memcached_purge - so we need to be able stop any recursion.. */ + /* + memcached_io_write and memcached_response may call memcached_purge + so we need to be able stop any recursion.. + */ memcached_set_purging(root, true); WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - /* Force a flush of the buffer to ensure that we don't have the n-1 pending - requests buffered up.. */ - if (memcached_io_write(ptr, NULL, 0, true) == -1) + /* + Force a flush of the buffer to ensure that we don't have the n-1 pending + requests buffered up.. + */ + if (memcached_io_write(ptr) == false) { memcached_set_purging(root, true); diff --git a/libmemcached/response.cc b/libmemcached/response.cc index 35a9f74d..b996ee10 100644 --- a/libmemcached/response.cc +++ b/libmemcached/response.cc @@ -707,7 +707,7 @@ memcached_return_t memcached_response(memcached_server_write_instance_st ptr, /* We may have old commands in the buffer not set, first purge */ if ((ptr->root->flags.no_block) && (memcached_is_processing_input(ptr->root) == false)) { - (void)memcached_io_write(ptr, NULL, 0, true); + (void)memcached_io_write(ptr); } /* diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index a7f68f0f..041575b6 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -171,7 +171,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, } if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) { - memcached_io_write(server, NULL, 0, true); + memcached_io_write(server); } } @@ -314,7 +314,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH) { - memcached_io_write(instance, NULL, 0, true); + memcached_io_write(instance); } }