Set this value to enable the server be removed after continuous MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT
times connection failure.
+=item MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK
+
+Set this value to tune the number of messages that may be sent before
+libmemcached should start to automatically drain the input queue. Setting
+this value to high, may cause libmemcached to deadlock (trying to send data,
+but the send will block because the input buffer in the kernel is full).
+
+=item MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK
+
+Set this value to tune the number of bytes that may be sent before
+libmemcached should start to automatically drain the input queue (need
+at least 10 IO requests sent without reading the input buffer). Setting
+this value to high, may cause libmemcached to deadlock (trying to send
+data, but the send will block because the input buffer in the kernel is full).
+
=back
=head1 RETURN
memcached_hash.c \
memcached_hosts.c \
memcached_io.c \
+ memcached_purge.c \
md5.c \
memcached_key.c \
memcached_quit.c \
ptr->retry_timeout= 0;
ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA;
+ ptr->io_msg_watermark = 500;
+ ptr->io_bytes_watermark = 65 * 1024;
+
return ptr;
}
int32_t snd_timeout;
int32_t rcv_timeout;
uint32_t server_failure_limit;
+ uint32_t io_msg_watermark;
+ uint32_t io_bytes_watermark;
+ char purging;
};
{
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
+ ptr->io_msg_watermark= (int32_t)data;
+ break;
+ case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
+ ptr->io_bytes_watermark= (int32_t)data;
+ break;
case MEMCACHED_BEHAVIOR_SND_TIMEOUT:
ptr->snd_timeout= (int32_t)data;
break;
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
+ temp_flag = ptr->io_msg_watermark;
+ break;
+ case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
+ temp_flag = ptr->io_bytes_watermark;
+ break;
case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
temp_flag= MEM_BINARY_PROTOCOL;
break;
MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
MEMCACHED_BEHAVIOR_SND_TIMEOUT,
MEMCACHED_BEHAVIOR_RCV_TIMEOUT,
- MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT
+ MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT,
+ MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
+ MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK
} memcached_behavior;
typedef enum {
fds[0].fd= ptr->fd;
fds[0].events= flags;
+ /*
+ ** We are going to block on write, but at least on Solaris we might block
+ ** on write if we haven't read anything from our input buffer..
+ ** Try to purge the input buffer if we don't do any flow control in the
+ ** application layer (just sending a lot of data etc)
+ ** The test is moved down in the purge function to avoid duplication of
+ ** the test.
+ */
+ if (read_or_write == MEM_WRITE)
+ memcached_purge(ptr);
+
error= poll(fds, 1, ptr->root->poll_timeout);
if (error == 1)
}
}
+ ptr->io_bytes_sent = 0;
ptr->read_data_length= data_read;
ptr->read_buffer_length= data_read;
ptr->read_ptr= ptr->read_buffer;
}
else
{
+ /*
+ ** We might want to purge the input buffer if we haven't consumed
+ ** any output yet... The test for the limits is the purge is inline
+ ** in the purge function to avoid duplicating the logic..
+ */
+ memcached_purge(ptr);
+
if ((sent_length= write(ptr->fd, local_write_ptr,
write_length)) == -1)
{
}
}
+ ptr->io_bytes_sent += sent_length;
+
local_write_ptr+= sent_length;
write_length-= sent_length;
return_length+= sent_length;
ssize_t memcached_io_read(memcached_server_st *ptr,
void *buffer, size_t length);
memcached_return memcached_io_close(memcached_server_st *ptr);
-
+void memcached_purge(memcached_server_st *ptr);
#endif /* __MEMCACHED_IO_H__ */
--- /dev/null
+#include <assert.h>
+
+#include "common.h"
+#include "memcached_io.h"
+
+void memcached_purge(memcached_server_st *ptr)
+{
+ char buffer[2048];
+ size_t buffer_length = sizeof(buffer);
+ memcached_result_st result;
+
+ if (ptr->root->purging || /* already purging */
+ (memcached_server_response_count(ptr) < ptr->root->io_msg_watermark &&
+ ptr->io_bytes_sent < ptr->root->io_bytes_watermark) ||
+ (ptr->io_bytes_sent > ptr->root->io_bytes_watermark &&
+ memcached_server_response_count(ptr) < 10)) {
+ return;
+ }
+
+ /* memcached_io_write and memcached_response may call memcached_purge
+ so we need to be able stop any recursion.. */
+ ptr->root->purging = 1;
+
+ /* Force a flush of the buffer to ensure that we don't have the n-1 pending
+ requests buffered up.. */
+ memcached_io_write(ptr, NULL, 0, 1);
+
+ /* we have already incremented the response counter, and memcached_response
+ will read out all messages.. To avoid memcached_response to wait forever
+ for a response to a command I have in my buffer, let's decrement the
+ response counter :) */
+ memcached_server_response_decrement(ptr);
+
+ /* memcached_response may call memcached_io_read, but let's use a short
+ timeout if there is no data yet */
+ int32_t timeout = ptr->root->poll_timeout;
+ ptr->root->poll_timeout = 1;
+ memcached_response(ptr, buffer, sizeof(buffer), &result);
+ ptr->root->poll_timeout = timeout;
+ memcached_server_response_increment(ptr);
+ ptr->root->purging = 0;
+}
memcached_st *root;
uint64_t limit_maxbytes;
uint32_t server_failure_counter;
+ uint32_t io_bytes_sent; /* # bytes sent since last read */
};
#define memcached_server_count(A) (A)->number_of_hosts