{
uint32_t x;
- if (!pairs)
+ if (! pairs)
return;
/* We free until we hit the null pair we stores during creation */
for (x= 0; pairs[x].key; x++)
{
free(pairs[x].key);
- free(pairs[x].value);
+ if (pairs[x].value)
+ free(pairs[x].value);
}
free(pairs);
get_random_string(pairs[x].key, 100);
pairs[x].key_length= 100;
- pairs[x].value= (char *)calloc(value_length, sizeof(char));
- if (!pairs[x].value)
- goto error;
- get_random_string(pairs[x].value, value_length);
- pairs[x].value_length= value_length;
+ if (value_length)
+ {
+ pairs[x].value= (char *)calloc(value_length, sizeof(char));
+ if (!pairs[x].value)
+ goto error;
+ get_random_string(pairs[x].value, value_length);
+ pairs[x].value_length= value_length;
+ }
+ else
+ {
+ pairs[x].value= NULL;
+ pairs[x].value_length= 0;
+ }
}
return pairs;
#define MEMCACHED_POINTS_PER_SERVER_KETAMA 160
#define MEMCACHED_CONTINUUM_SIZE MEMCACHED_POINTS_PER_SERVER*100 /* This would then set max hosts to 100 */
#define MEMCACHED_STRIDE 4
-#define MEMCACHED_DEFAULT_TIMEOUT 1000
+#define MEMCACHED_DEFAULT_TIMEOUT 5000
#define MEMCACHED_DEFAULT_CONNECT_TIMEOUT 4000
#define MEMCACHED_CONTINUUM_ADDITION 10 /* How many extra slots we should build for in the continuum */
#define MEMCACHED_PREFIX_KEY_MAX_SIZE 128
};
int error;
- unlikely (read_or_write == MEM_WRITE) /* write */
+ if (read_or_write == MEM_WRITE) /* write */
+ {
fds.events= POLLOUT;
+ WATCHPOINT_SET(ptr->io_wait_count.write++);
+ }
+ else
+ {
+ WATCHPOINT_SET(ptr->io_wait_count.read++);
+ }
/*
** We are going to block on write, but at least on Solaris we might block
switch (error)
{
case 1:
+ WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
+ WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
+
return MEMCACHED_SUCCESS;
case 0:
- continue;
+ return MEMCACHED_TIMEOUT;
default:
WATCHPOINT_ERRNO(errno);
{
switch (errno)
{
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ case EINTR:
+ continue;
default:
ptr->cached_errno= error;
memcached_quit_server(ptr, true);
{
data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER);
if (data_read > 0)
+ {
break;
+ }
else if (data_read == -1)
{
ptr->cached_errno= errno;
{
ptr->cached_errno= errno;
WATCHPOINT_ERRNO(errno);
+ WATCHPOINT_NUMBER(errno);
switch (errno)
{
case ENOBUFS:
const void *buffer;
};
-
LIBMEMCACHED_LOCAL
ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
const struct __write_vector_st *vector,
* closing the socket before all data is read
* results in server throwing away all data which is
* not read
+ *
+ * In .40 we began to only do this if we had been doing buffered
+ * requests of had replication enabled.
*/
- ssize_t nread;
- while (memcached_io_read(ptr, buffer, sizeof(buffer)/sizeof(*buffer),
- &nread) == MEMCACHED_SUCCESS);
+ if (ptr->root->flags.buffer_requests || ptr->root->number_of_replicas)
+ {
+ ssize_t nread;
+ while (memcached_io_read(ptr, buffer, sizeof(buffer)/sizeof(*buffer),
+ &nread) == MEMCACHED_SUCCESS);
+ }
+
/*
* memcached_io_read may call memcached_quit_server with io_death if
self->weight= weight;
self->state.is_corked= false;
self->state.is_dead= false;
+ WATCHPOINT_SET(self->io_wait_count.read= 0);
+ WATCHPOINT_SET(self->io_wait_count.write= 0);
self->major_version= 0;
self->micro_version= 0;
self->minor_version= 0;
bool is_corked:1;
bool is_dead:1;
} state;
+ struct {
+ uint32_t read;
+ uint32_t write;
+ } io_wait_count;
uint8_t major_version;
uint8_t micro_version;
uint8_t minor_version;
}
static memcached_return_t memcached_send_binary(memcached_st *ptr,
- const char *master_key,
- size_t master_key_length,
+ memcached_server_write_instance_st server,
+ uint32_t server_key,
const char *key,
size_t key_length,
const char *value,
if (ptr->flags.verify_key && (memcached_key_test((const char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
return MEMCACHED_BAD_KEY_PROVIDED;
- if (ptr->flags.binary_protocol)
- {
- return memcached_send_binary(ptr, master_key, master_key_length,
- key, key_length,
- value, value_length, expiration,
- flags, cas, verb);
- }
-
server_key= memcached_generate_hash_with_redistribution(ptr, master_key, master_key_length);
instance= memcached_server_instance_fetch(ptr, server_key);
- if (cas)
- {
- write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "%s %.*s%.*s %u %llu %zu %llu%s\r\n",
- storage_op_string(verb),
- (int)ptr->prefix_key_length,
- ptr->prefix_key,
- (int)key_length, key, flags,
- (unsigned long long)expiration, value_length,
- (unsigned long long)cas,
- (ptr->flags.no_reply) ? " noreply" : "");
- }
- else
- {
- char *buffer_ptr= buffer;
- const char *command= storage_op_string(verb);
-
- /* Copy in the command, no space needed, we handle that in the command function*/
- memcpy(buffer_ptr, command, strlen(command));
-
- /* Copy in the key prefix, switch to the buffer_ptr */
- buffer_ptr= memcpy((buffer_ptr + strlen(command)), ptr->prefix_key, ptr->prefix_key_length);
-
- /* Copy in the key, adjust point if a key prefix was used. */
- buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key_length ? ptr->prefix_key_length : 0),
- key, key_length);
- buffer_ptr+= key_length;
- buffer_ptr[0]= ' ';
- buffer_ptr++;
-
- write_length= (size_t)(buffer_ptr - buffer);
- write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "%u %llu %zu%s\r\n",
- flags,
- (unsigned long long)expiration, value_length,
- ptr->flags.no_reply ? " noreply" : "");
- }
-
- if (ptr->flags.use_udp && ptr->flags.buffer_requests)
- {
- size_t cmd_size= write_length + value_length + 2;
- if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
- return MEMCACHED_WRITE_FAILURE;
- if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
- memcached_io_write(instance, NULL, 0, true);
- }
+ WATCHPOINT_SET(instance->io_wait_count.read= 0);
+ WATCHPOINT_SET(instance->io_wait_count.write= 0);
- if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+ if (ptr->flags.binary_protocol)
{
- rc= MEMCACHED_WRITE_FAILURE;
+ rc= memcached_send_binary(ptr, instance, server_key,
+ key, key_length,
+ value, value_length, expiration,
+ flags, cas, verb);
+ WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
+ WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
}
else
{
- struct __write_vector_st vector[]=
- {
- { .length= write_length, .buffer= buffer },
- { .length= value_length, .buffer= value },
- { .length= 2, .buffer= "\r\n" }
- };
- if (ptr->flags.buffer_requests && verb == SET_OP)
+ if (cas)
{
- to_write= false;
+ write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
+ "%s %.*s%.*s %u %llu %zu %llu%s\r\n",
+ storage_op_string(verb),
+ (int)ptr->prefix_key_length,
+ ptr->prefix_key,
+ (int)key_length, key, flags,
+ (unsigned long long)expiration, value_length,
+ (unsigned long long)cas,
+ (ptr->flags.no_reply) ? " noreply" : "");
}
else
{
- to_write= true;
+ char *buffer_ptr= buffer;
+ const char *command= storage_op_string(verb);
+
+ /* Copy in the command, no space needed, we handle that in the command function*/
+ memcpy(buffer_ptr, command, strlen(command));
+
+ /* Copy in the key prefix, switch to the buffer_ptr */
+ buffer_ptr= memcpy((buffer_ptr + strlen(command)), ptr->prefix_key, ptr->prefix_key_length);
+
+ /* Copy in the key, adjust point if a key prefix was used. */
+ buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key_length ? ptr->prefix_key_length : 0),
+ key, key_length);
+ buffer_ptr+= key_length;
+ buffer_ptr[0]= ' ';
+ buffer_ptr++;
+
+ write_length= (size_t)(buffer_ptr - buffer);
+ write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
+ "%u %llu %zu%s\r\n",
+ flags,
+ (unsigned long long)expiration, value_length,
+ ptr->flags.no_reply ? " noreply" : "");
}
- /* Send command header */
- rc= memcached_vdo(instance, vector, 3, to_write);
- if (rc == MEMCACHED_SUCCESS)
+ if (ptr->flags.use_udp && ptr->flags.buffer_requests)
{
+ size_t cmd_size= write_length + value_length + 2;
+ if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+ return MEMCACHED_WRITE_FAILURE;
+ if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+ memcached_io_write(instance, NULL, 0, true);
+ }
- if (ptr->flags.no_reply)
- return (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
+ if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ }
+ else
+ {
+ struct __write_vector_st vector[]=
+ {
+ { .length= write_length, .buffer= buffer },
+ { .length= value_length, .buffer= value },
+ { .length= 2, .buffer= "\r\n" }
+ };
- if (to_write == false)
- return MEMCACHED_BUFFERED;
+ if (ptr->flags.buffer_requests && verb == SET_OP)
+ {
+ to_write= false;
+ }
+ else
+ {
+ to_write= true;
+ }
- rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+ /* Send command header */
+ rc= memcached_vdo(instance, vector, 3, to_write);
+ if (rc == MEMCACHED_SUCCESS)
+ {
- if (rc == MEMCACHED_STORED)
- return MEMCACHED_SUCCESS;
- else
- return rc;
+ if (ptr->flags.no_reply)
+ {
+ rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
+ }
+ else if (to_write == false)
+ {
+ rc= MEMCACHED_BUFFERED;
+ }
+ else
+ {
+ rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
+
+ if (rc == MEMCACHED_STORED)
+ rc= MEMCACHED_SUCCESS;
+ }
+ }
}
+
+ if (rc == MEMCACHED_WRITE_FAILURE)
+ memcached_io_reset(instance);
}
- if (rc == MEMCACHED_WRITE_FAILURE)
- memcached_io_reset(instance);
+ WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.read > 2, "read IO_WAIT", instance->io_wait_count.read);
+ WATCHPOINT_IF_LABELED_NUMBER(instance->io_wait_count.write > 2, "write_IO_WAIT", instance->io_wait_count.write);
return rc;
}
static memcached_return_t memcached_send_binary(memcached_st *ptr,
- const char *master_key,
- size_t master_key_length,
+ memcached_server_write_instance_st server,
+ uint32_t server_key,
const char *key,
size_t key_length,
const char *value,
bool flush;
protocol_binary_request_set request= {.bytes= {0}};
size_t send_length= sizeof(request.bytes);
- uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, master_key,
- master_key_length);
-
- memcached_server_write_instance_st server=
- memcached_server_instance_fetch(ptr, server_key);
bool noreply= server->root->flags.no_reply;
return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
}
- unlikely (verb == SET_OP && ptr->number_of_replicas > 0)
+ if (verb == SET_OP && ptr->number_of_replicas > 0)
{
request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
+ WATCHPOINT_STRING("replicating");
for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
{
#define WATCHPOINT_STRING(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout); } while (0)
#define WATCHPOINT_STRING_LENGTH(A,B) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout); } while (0)
#define WATCHPOINT_NUMBER(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %zu\n", __FILE__, __LINE__,__func__,(size_t)(A));fflush(stdout); } while (0)
+#define WATCHPOINT_LABELED_NUMBER(A,B) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s:%zu\n", __FILE__, __LINE__,__func__,(A),(size_t)(B));fflush(stdout); } while (0)
+#define WATCHPOINT_IF_LABELED_NUMBER(A,B,C) do { if(A) {fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s:%zu\n", __FILE__, __LINE__,__func__,(B),(size_t)(C));fflush(stdout);} } while (0)
#define WATCHPOINT_ERRNO(A) do { fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));fflush(stdout); } while (0)
#define WATCHPOINT_ASSERT_PRINT(A,B,C) do { if(!(A)){fprintf(stderr, "\nWATCHPOINT ASSERT %s:%d (%s) ", __FILE__, __LINE__,__func__);fprintf(stderr, (B),(C));fprintf(stderr,"\n");fflush(stdout); __stack_dump(); } assert((A)); } while (0)
#define WATCHPOINT_ASSERT(A) do { if (! (A)) {__stack_dump();} assert((A)); } while (0)
#define WATCHPOINT_IFERROR(A)
#define WATCHPOINT_STRING(A)
#define WATCHPOINT_NUMBER(A)
+#define WATCHPOINT_LABELED_NUMBER(A,B)
+#define WATCHPOINT_IF_LABELED_NUMBER(A,B,C)
#define WATCHPOINT_ERRNO(A)
#define WATCHPOINT_ASSERT_PRINT(A,B,C)
#define WATCHPOINT_ASSERT(A)
static const char *global_keys[GLOBAL_COUNT];
static size_t global_keys_length[GLOBAL_COUNT];
+// Prototype
+static test_return_t pre_binary(memcached_st *memc);
+
+
static test_return_t init_test(memcached_st *not_used __attribute__((unused)))
{
memcached_st memc;
uint32_t number_of_hosts= memc->number_of_hosts;
memc->number_of_hosts= 1;
- size_t max_keys= binary ? 20480 : 1;
+ size_t max_keys= 20480;
char **keys= calloc(max_keys, sizeof(char*));
/* First add all of the items.. */
char blob[1024] = {0};
memcached_return_t rc;
+
for (size_t x= 0; x < max_keys; ++x)
{
char k[251];
rc= memcached_mget_execute(memc, (const char**)keys, key_length,
max_keys, callbacks, &counter, 1);
- if (binary)
+ if (rc == MEMCACHED_SUCCESS)
{
- test_true(rc == MEMCACHED_SUCCESS);
-
+ test_true(binary);
rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
test_true(rc == MEMCACHED_END);
/* Verify that we got all of the items */
test_true(counter == max_keys);
}
- else
+ else if (rc == MEMCACHED_NOT_SUPPORTED)
{
- test_true(rc == MEMCACHED_NOT_SUPPORTED);
test_true(counter == 0);
}
+ else
+ {
+ test_fail("note: this test functions differently when in binary mode");
+ }
/* Release all allocated resources */
for (size_t x= 0; x < max_keys; ++x)
return TEST_SUCCESS;
}
+#define REGRESSION_BINARY_VS_BLOCK_COUNT 20480
+
+static test_return_t key_setup(memcached_st *memc)
+{
+ (void)memc;
+
+ if (pre_binary(memc) != TEST_SUCCESS)
+ return TEST_SKIPPED;
+
+ global_pairs= pairs_generate(REGRESSION_BINARY_VS_BLOCK_COUNT, 0);
+
+ return TEST_SUCCESS;
+}
+
+static test_return_t key_teardown(memcached_st *memc)
+{
+ (void)memc;
+ pairs_free(global_pairs);
+
+ return TEST_SUCCESS;
+}
+
+static test_return_t block_add_regression(memcached_st *memc)
+{
+ /* First add all of the items.. */
+ for (size_t x= 0; x < REGRESSION_BINARY_VS_BLOCK_COUNT; ++x)
+ {
+ memcached_return_t rc;
+ char blob[1024] = {0};
+
+ rc= memcached_add_by_key(memc, "bob", 3, global_pairs[x].key, global_pairs[x].key_length, blob, sizeof(blob), 0, 0);
+ test_true(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+ }
+
+ return TEST_SUCCESS;
+}
+
+static test_return_t binary_add_regression(memcached_st *memc)
+{
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
+ test_return_t rc= block_add_regression(memc);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 0);
+ return rc;
+}
+
static test_return_t get_stats_keys(memcached_st *memc)
{
char **stat_list;
return TEST_SUCCESS;
}
-static test_return_t pre_binary(memcached_st *memc);
-
static test_return_t user_supplied_bug21(memcached_st *memc)
{
test_return_t test_rc;
{0, 0, 0}
};
+test_st regression_binary_vs_block[] ={
+ {"block add", 1, (test_callback_fn)block_add_regression},
+ {"binary add", 1, (test_callback_fn)binary_add_regression},
+ {0, 0, 0}
+};
+
test_st async_tests[] ={
{"add", 1, (test_callback_fn)add_wrapper },
{0, 0, 0}
{"replication_noblock", (test_callback_fn)pre_replication_noblock, 0, replication_tests},
{"regression", 0, 0, regression_tests},
{"behaviors", 0, 0, behavior_tests},
+ {"regression_binary_vs_block", (test_callback_fn)key_setup, (test_callback_fn)key_teardown, regression_binary_vs_block},
{0, 0, 0, 0}
};
fprintf(stderr, "\tSucceeded\t\t%u\n", stats->success);
}
-static long int timedif(struct timeval a, struct timeval b)
+long int timedif(struct timeval a, struct timeval b)
{
long us, s;
typedef test_return_t (*test_callback_runner_fn)(test_callback_fn, void *);
typedef test_return_t (*test_callback_error_fn)(test_return_t, void *);
+/* Help function for use with gettimeofday() */
+long int timedif(struct timeval a, struct timeval b);
/**
A structure describing the test case.