0.38
+ * Added MEMCACHED_BEHAVIOR_CORK.
+
* memslap now creates a configuration file at ~/.memslap.cnf
* memcached_purge() now calls any callbacks registered during get
This allows distributing read load to multiple servers with the expense of
more write traffic.
+=item MEMCACHED_BEHAVIOR_CORK
+
+Enable TCP_CORK behavior. This is only available as an option Linux.
+MEMCACHED_NO_SERVERS is returned if no servers are available to test with.
+MEMCACHED_NOT_SUPPORTED is returned if we were not able to determine
+if support was available. All other responses then MEMCACHED_SUCCESS
+report an error of some sort. This behavior also enables
+MEMCACHED_BEHAVIOR_TCP_NODELAY when set.
+
+
+=item MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE
+
+Find the current size of SO_SNDBUF. A value of 0 means either an error
+occured or no hosts were available. It is safe to assume system default
+if this occurs.
+
+=item MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE
+
+Find the current size of SO_RCVBUF. A value of 0 means either an error
+occured or no hosts were available. It is safe to assume system default
+if this occurs.
+
=back
=head1 RETURN
srandom((uint32_t) time(NULL));
ptr->flags.randomize_replica_read= set_flag(data);
break;
+ case MEMCACHED_BEHAVIOR_CORK:
+ {
+ memcached_server_instance_st *instance;
+ bool action= set_flag(data);
+
+ if (action == false)
+ {
+ ptr->flags.cork= set_flag(false);
+ return MEMCACHED_SUCCESS;
+ }
+
+ instance= memcached_server_instance_fetch(ptr, 0);
+ if (! instance)
+ return MEMCACHED_NO_SERVERS;
+
+
+ /* We just try the first host, and if it is down we return zero */
+ memcached_return_t rc;
+ rc= memcached_connect(instance);
+ if (rc != MEMCACHED_SUCCESS)
+ {
+ return rc;
+ }
+
+ /* Now we test! */
+ memcached_ternary_t enabled;
+ enabled= cork_switch(instance, true);
+
+ switch (enabled)
+ {
+ case MEM_FALSE:
+ return ptr->cached_errno ? MEMCACHED_ERRNO : MEMCACHED_FAILURE ;
+ case MEM_TRUE:
+ {
+ enabled= cork_switch(instance, false);
+
+ if (enabled == false) // Possible bug in OS?
+ {
+ memcached_quit_server(instance, false); // We should reset everything on this error.
+ return MEMCACHED_ERRNO; // Errno will be true because we will have already set it.
+ }
+ ptr->flags.cork= true;
+ ptr->flags.tcp_nodelay= true;
+ }
+ break;
+ case MEM_NOT:
+ default:
+ return MEMCACHED_NOT_SUPPORTED;
+ }
+ }
+ break;
case MEMCACHED_BEHAVIOR_MAX:
default:
/* Shouldn't get here */
return (uint64_t)ptr->rcv_timeout;
case MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE:
{
- int sock_size;
+ int sock_size= 0;
socklen_t sock_length= sizeof(int);
memcached_server_instance_st *instance;
+ if (ptr->send_size != -1) // If value is -1 then we are using the default
+ return (uint64_t) ptr->send_size;
+
instance= memcached_server_instance_fetch(ptr, 0);
- /* REFACTOR */
- /* We just try the first host, and if it is down we return zero */
- if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
- return 0;
+ if (instance) // If we have an instance we test, otherwise we just set and pray
+ {
+ /* REFACTOR */
+ /* We just try the first host, and if it is down we return zero */
+ if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
+ return 0;
- if (getsockopt(instance->fd, SOL_SOCKET,
- SO_SNDBUF, &sock_size, &sock_length))
- return 0; /* Zero means error */
+ if (getsockopt(instance->fd, SOL_SOCKET,
+ SO_SNDBUF, &sock_size, &sock_length))
+ return 0; /* Zero means error */
+ }
return (uint64_t) sock_size;
}
case MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE:
{
- int sock_size;
+ int sock_size= 0;
socklen_t sock_length= sizeof(int);
memcached_server_instance_st *instance;
+ if (ptr->recv_size != -1) // If value is -1 then we are using the default
+ return (uint64_t) ptr->recv_size;
+
instance= memcached_server_instance_fetch(ptr, 0);
/**
@note REFACTOR
*/
- /* We just try the first host, and if it is down we return zero */
- if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
- return 0;
+ if (instance)
+ {
+ /* We just try the first host, and if it is down we return zero */
+ if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
+ return 0;
- if (getsockopt(instance->fd, SOL_SOCKET,
- SO_RCVBUF, &sock_size, &sock_length))
- return 0; /* Zero means error */
+ if (getsockopt(instance->fd, SOL_SOCKET,
+ SO_RCVBUF, &sock_size, &sock_length))
+ return 0; /* Zero means error */
+
+ }
return (uint64_t) sock_size;
}
return ptr->flags.auto_eject_hosts;
case MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ:
return ptr->flags.randomize_replica_read;
+ case MEMCACHED_BEHAVIOR_CORK:
+ return ptr->flags.cork;
case MEMCACHED_BEHAVIOR_MAX:
default:
WATCHPOINT_ASSERT(0); /* Programming mistake if it gets this far */
uint32_t value;
};
+/* Yum, Fortran.... can you make the reference? */
+typedef enum {
+ MEM_NOT= -1,
+ MEM_FALSE= false,
+ MEM_TRUE= true,
+} memcached_ternary_t;
+
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define unlikely(x) if(__builtin_expect((x) != 0, 0))
#endif
-
#define MEMCACHED_BLOCK_SIZE 1024
#define MEMCACHED_DEFAULT_COMMAND_SIZE 350
#define SMALL_STRING_LEN 1024
return MEMCACHED_SUCCESS;
}
+#ifdef TCP_CORK
+ #define CORK TCP_CORK
+#elif defined TCP_NOPUSH
+ #define CORK TCP_NOPUSH
+#endif
+
+/*
+ cork_switch() tries to enable TCP_CORK. IF TCP_CORK is not an option
+ on the system it returns false but sets errno to 0. Otherwise on
+ failure errno is set.
+*/
+static inline memcached_ternary_t cork_switch(memcached_server_st *ptr, bool enable)
+{
+#ifdef CORK
+ if (ptr->type != MEMCACHED_CONNECTION_TCP)
+ return MEM_FALSE;
+
+ int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
+ &enable, (socklen_t)sizeof(int));
+ if (! err)
+ {
+ return MEM_TRUE;
+ }
+ else
+ {
+ ptr->cached_errno= errno;
+ return MEM_FALSE;
+ }
+#else
+ (void)ptr;
+ (void)enable;
+
+ ptr->cached_errno= 0;
+
+ return MEM_NOT;
+#endif
+}
+
#endif /* LIBMEMCACHED_COMMON_H */
return MEMCACHED_FAILURE;
}
- if (ptr->root->send_size)
+ if (ptr->root->send_size > 0)
{
int error;
return MEMCACHED_FAILURE;
}
- if (ptr->root->recv_size)
+ if (ptr->root->recv_size > 0)
{
int error;
MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS,
MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS,
MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ,
+ MEMCACHED_BEHAVIOR_CORK,
MEMCACHED_BEHAVIOR_MAX
} memcached_behavior_t;
return false;
}
+static inline void memcached_io_cork_push(memcached_server_st *ptr)
+{
+#ifdef CORK
+ if (ptr->root->flags.cork == false || ptr->state.is_corked)
+ return;
+
+ ptr->state.is_corked=
+ cork_switch(ptr, true) == MEM_TRUE ? true : false;
+
+ WATCHPOINT_ASSERT(ptr->state.is_corked == true);
+#else
+ (void)ptr;
+#endif
+}
+
+static inline void memcached_io_cork_pop(memcached_server_st *ptr)
+{
+#ifdef CORK
+ if (ptr->root->flags.cork == false || ptr->state.is_corked == false)
+ return;
+
+ ptr->state.is_corked=
+ cork_switch(ptr, false) == MEM_FALSE ? false : true;
+
+ WATCHPOINT_ASSERT(ptr->state.is_corked == false);
+#else
+ (void)ptr;
+#endif
+}
+
#ifdef UNUSED
void memcached_io_preread(memcached_st *ptr)
{
original_length= length;
buffer_ptr= buffer;
+ /* more writable data is coming if a flush isn't required, so delay send */
+ if (! with_flush)
+ {
+ memcached_io_cork_push(ptr);
+ }
+
while (length)
{
char *write_ptr;
memcached_return_t rc;
WATCHPOINT_ASSERT(ptr->fd != -1);
if (io_flush(ptr, &rc) == -1)
+ {
return -1;
+ }
+
+ memcached_io_cork_pop(ptr);
}
return (ssize_t) original_length;
ptr->retry_timeout= 0;
ptr->distribution= MEMCACHED_DISTRIBUTION_MODULA;
+
+ ptr->send_size= -1;
+ ptr->recv_size= -1;
+
/* TODO, Document why we picked these defaults */
ptr->io_msg_watermark= 500;
ptr->io_bytes_watermark= 65 * 1024;
bool use_sort_hosts:1;
bool use_udp:1;
bool verify_key:1;
+ bool cork:1;
} flags;
int32_t poll_timeout;
int32_t connect_timeout;
host->fd= -1;
host->type= type;
host->read_ptr= host->read_buffer;
+ host->state.is_corked= 0;
if (memc)
host->next_retry= memc->retry_timeout;
if (type == MEMCACHED_CONNECTION_UDP)
uint32_t io_bytes_sent; /* # bytes sent since last read */
uint32_t server_failure_counter;
uint32_t weight;
+ struct { // Place any "state" sort variables in here.
+ bool is_corked;
+ } state;
uint8_t major_version;
uint8_t micro_version;
uint8_t minor_version;
return TEST_SUCCESS;
}
+static test_return_t MEMCACHED_BEHAVIOR_CORK_test(memcached_st *memc)
+{
+ memcached_return_t rc;
+ bool set= true;
+ bool value;
+
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_CORK, set);
+#ifdef TCP_CORK
+ test_truth(rc == MEMCACHED_SUCCESS);
+#else
+ test_truth(rc == MEMCACHED_NOT_SUPPORTED);
+#endif
+
+ value= (bool)memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_CORK);
+#ifdef TCP_CORK
+ test_truth((bool)value == set);
+#else
+ test_false((bool)value == set);
+#endif
+
+ return TEST_SUCCESS;
+}
+
static test_return_t fetch_all_results(memcached_st *memc)
{
memcached_return_t rc= MEMCACHED_SUCCESS;
return TEST_SUCCESS;
}
-static test_return_t pre_nonblock(memcached_st *memc)
+static test_return_t pre_nonblock(memcached_st *memc)
{
memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, 0);
return TEST_SUCCESS;
}
+static test_return_t pre_cork(memcached_st *memc)
+{
+ memcached_return_t rc;
+ bool set= true;
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_CORK, set);
+
+ if (rc == MEMCACHED_SUCCESS)
+ return TEST_SUCCESS;
+
+ return TEST_SKIPPED;
+}
+
+static test_return_t pre_cork_and_nonblock(memcached_st *memc)
+{
+ test_return_t rc;
+
+ rc= pre_cork(memc);
+
+ if (rc != TEST_SUCCESS)
+ return rc;
+
+ return pre_nonblock(memc);
+}
+
static test_return_t pre_nonblock_binary(memcached_st *memc)
{
memcached_return_t rc= MEMCACHED_FAILURE;
test_st behavior_tests[] ={
{"behavior_test", 0, (test_callback_fn)behavior_test},
+ {"MEMCACHED_BEHAVIOR_CORK", 0, (test_callback_fn)MEMCACHED_BEHAVIOR_CORK_test},
{0, 0, 0}
};
{"generate_murmur", (test_callback_fn)pre_murmur, 0, generate_tests},
{"generate_jenkins", (test_callback_fn)pre_jenkins, 0, generate_tests},
{"generate_nonblock", (test_callback_fn)pre_nonblock, 0, generate_tests},
+ {"generate_corked", (test_callback_fn)pre_cork, 0, generate_tests},
+ {"generate_corked_and_nonblock", (test_callback_fn)pre_cork_and_nonblock, 0, generate_tests},
{"consistent_not", 0, 0, consistent_tests},
{"consistent_ketama", (test_callback_fn)pre_behavior_ketama, 0, consistent_tests},
{"consistent_ketama_weighted", (test_callback_fn)pre_behavior_ketama_weighted, 0, consistent_weighted_tests},