From c4b0fc53b348067c6a6e5eaf335869393cf347bf Mon Sep 17 00:00:00 2001 From: Trond Norbye Date: Wed, 22 Oct 2008 22:08:15 +0200 Subject: [PATCH] Purge the input buffer if I discover a lot of commands being sent and none read (to avoid deadlock) --- docs/memcached_behavior.pod | 15 +++++++++++ libmemcached/Makefile.am | 1 + libmemcached/memcached.c | 3 +++ libmemcached/memcached.h | 3 +++ libmemcached/memcached_behavior.c | 12 +++++++++ libmemcached/memcached_constants.h | 4 ++- libmemcached/memcached_io.c | 21 +++++++++++++++ libmemcached/memcached_io.h | 2 +- libmemcached/memcached_purge.c | 42 ++++++++++++++++++++++++++++++ libmemcached/memcached_server.h | 1 + 10 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 libmemcached/memcached_purge.c diff --git a/docs/memcached_behavior.pod b/docs/memcached_behavior.pod index 51e09e71..2dc214ae 100755 --- a/docs/memcached_behavior.pod +++ b/docs/memcached_behavior.pod @@ -144,6 +144,21 @@ this flag on an open connection. Set this value to enable the server be removed after continuous MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT times connection failure. +=item MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK + +Set this value to tune the number of messages that may be sent before +libmemcached should start to automatically drain the input queue. Setting +this value to high, may cause libmemcached to deadlock (trying to send data, +but the send will block because the input buffer in the kernel is full). + +=item MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK + +Set this value to tune the number of bytes that may be sent before +libmemcached should start to automatically drain the input queue (need +at least 10 IO requests sent without reading the input buffer). Setting +this value to high, may cause libmemcached to deadlock (trying to send +data, but the send will block because the input buffer in the kernel is full). + =back =head1 RETURN diff --git a/libmemcached/Makefile.am b/libmemcached/Makefile.am index fc272120..bb6cfa0b 100644 --- a/libmemcached/Makefile.am +++ b/libmemcached/Makefile.am @@ -54,6 +54,7 @@ libmemcached_la_SOURCES = crc.c \ memcached_hash.c \ memcached_hosts.c \ memcached_io.c \ + memcached_purge.c \ md5.c \ memcached_key.c \ memcached_quit.c \ diff --git a/libmemcached/memcached.c b/libmemcached/memcached.c index d5bfd190..accf24b7 100644 --- a/libmemcached/memcached.c +++ b/libmemcached/memcached.c @@ -28,6 +28,9 @@ memcached_st *memcached_create(memcached_st *ptr) ptr->retry_timeout= 0; ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA; + ptr->io_msg_watermark = 500; + ptr->io_bytes_watermark = 65 * 1024; + return ptr; } diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 3b549ebc..f12975f6 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -100,6 +100,9 @@ struct memcached_st { int32_t snd_timeout; int32_t rcv_timeout; uint32_t server_failure_limit; + uint32_t io_msg_watermark; + uint32_t io_bytes_watermark; + char purging; }; diff --git a/libmemcached/memcached_behavior.c b/libmemcached/memcached_behavior.c index ef4907dd..0cd42efc 100644 --- a/libmemcached/memcached_behavior.c +++ b/libmemcached/memcached_behavior.c @@ -23,6 +23,12 @@ memcached_return memcached_behavior_set(memcached_st *ptr, { switch (flag) { + case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK: + ptr->io_msg_watermark= (int32_t)data; + break; + case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK: + ptr->io_bytes_watermark= (int32_t)data; + break; case MEMCACHED_BEHAVIOR_SND_TIMEOUT: ptr->snd_timeout= (int32_t)data; break; @@ -130,6 +136,12 @@ uint64_t memcached_behavior_get(memcached_st *ptr, switch (flag) { + case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK: + temp_flag = ptr->io_msg_watermark; + break; + case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK: + temp_flag = ptr->io_bytes_watermark; + break; case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL: temp_flag= MEM_BINARY_PROTOCOL; break; diff --git a/libmemcached/memcached_constants.h b/libmemcached/memcached_constants.h index 47774d37..fb8305f4 100644 --- a/libmemcached/memcached_constants.h +++ b/libmemcached/memcached_constants.h @@ -91,7 +91,9 @@ typedef enum { MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, MEMCACHED_BEHAVIOR_SND_TIMEOUT, MEMCACHED_BEHAVIOR_RCV_TIMEOUT, - MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT + MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT, + MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK, + MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK } memcached_behavior; typedef enum { diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index 47486d30..ddca1972 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -30,6 +30,17 @@ static memcached_return io_wait(memcached_server_st *ptr, fds[0].fd= ptr->fd; fds[0].events= flags; + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (read_or_write == MEM_WRITE) + memcached_purge(ptr); + error= poll(fds, 1, ptr->root->poll_timeout); if (error == 1) @@ -124,6 +135,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } } + ptr->io_bytes_sent = 0; ptr->read_data_length= data_read; ptr->read_buffer_length= data_read; ptr->read_ptr= ptr->read_buffer; @@ -280,6 +292,13 @@ static ssize_t io_flush(memcached_server_st *ptr, } else { + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + memcached_purge(ptr); + if ((sent_length= write(ptr->fd, local_write_ptr, write_length)) == -1) { @@ -307,6 +326,8 @@ static ssize_t io_flush(memcached_server_st *ptr, } } + ptr->io_bytes_sent += sent_length; + local_write_ptr+= sent_length; write_length-= sent_length; return_length+= sent_length; diff --git a/libmemcached/memcached_io.h b/libmemcached/memcached_io.h index ac0ffdf8..1da530ee 100644 --- a/libmemcached/memcached_io.h +++ b/libmemcached/memcached_io.h @@ -10,5 +10,5 @@ void memcached_io_reset(memcached_server_st *ptr); ssize_t memcached_io_read(memcached_server_st *ptr, void *buffer, size_t length); memcached_return memcached_io_close(memcached_server_st *ptr); - +void memcached_purge(memcached_server_st *ptr); #endif /* __MEMCACHED_IO_H__ */ diff --git a/libmemcached/memcached_purge.c b/libmemcached/memcached_purge.c new file mode 100644 index 00000000..d9d43057 --- /dev/null +++ b/libmemcached/memcached_purge.c @@ -0,0 +1,42 @@ +#include + +#include "common.h" +#include "memcached_io.h" + +void memcached_purge(memcached_server_st *ptr) +{ + char buffer[2048]; + size_t buffer_length = sizeof(buffer); + memcached_result_st result; + + if (ptr->root->purging || /* already purging */ + (memcached_server_response_count(ptr) < ptr->root->io_msg_watermark && + ptr->io_bytes_sent < ptr->root->io_bytes_watermark) || + (ptr->io_bytes_sent > ptr->root->io_bytes_watermark && + memcached_server_response_count(ptr) < 10)) { + return; + } + + /* memcached_io_write and memcached_response may call memcached_purge + so we need to be able stop any recursion.. */ + ptr->root->purging = 1; + + /* Force a flush of the buffer to ensure that we don't have the n-1 pending + requests buffered up.. */ + memcached_io_write(ptr, NULL, 0, 1); + + /* we have already incremented the response counter, and memcached_response + will read out all messages.. To avoid memcached_response to wait forever + for a response to a command I have in my buffer, let's decrement the + response counter :) */ + memcached_server_response_decrement(ptr); + + /* memcached_response may call memcached_io_read, but let's use a short + timeout if there is no data yet */ + int32_t timeout = ptr->root->poll_timeout; + ptr->root->poll_timeout = 1; + memcached_response(ptr, buffer, sizeof(buffer), &result); + ptr->root->poll_timeout = timeout; + memcached_server_response_increment(ptr); + ptr->root->purging = 0; +} diff --git a/libmemcached/memcached_server.h b/libmemcached/memcached_server.h index 891c9c73..d5ed2692 100644 --- a/libmemcached/memcached_server.h +++ b/libmemcached/memcached_server.h @@ -37,6 +37,7 @@ struct memcached_server_st { memcached_st *root; uint64_t limit_maxbytes; uint32_t server_failure_counter; + uint32_t io_bytes_sent; /* # bytes sent since last read */ }; #define memcached_server_count(A) (A)->number_of_hosts -- 2.30.2