Set this value to specify that you really don't care about the result
from your storage commands (set, add, replace, append, prepend).
+=item MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
+
+If you just want "a poor mans HA", you may specify the numbers of
+replicas libmemcached should store of each item (on different servers).
+This replication does not dedicate certain memcached servers to store the
+replicas in, but instead it will store the replicas together with all of the
+other objects (on the 'n' next servers specified in your server list).
+
=back
=head1 RETURN
new_clone->io_msg_watermark= source->io_msg_watermark;
new_clone->io_bytes_watermark= source->io_bytes_watermark;
new_clone->io_key_prefetch= source->io_key_prefetch;
+ new_clone->number_of_replicas= source->number_of_replicas;
if (source->hosts)
rc= memcached_server_push(new_clone, source->hosts);
memcached_trigger_key get_key_failure;
memcached_trigger_delete_key delete_trigger;
char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
+ uint32_t number_of_replicas;
};
{
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+ ptr->number_of_replicas= (uint32_t)data;
+ break;
case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
ptr->io_msg_watermark= (int32_t)data;
break;
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
+ return ptr->number_of_replicas;
case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
return ptr->io_msg_watermark;
case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
MEMCACHED_BEHAVIOR_NOREPLY,
MEMCACHED_BEHAVIOR_USE_UDP,
- MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS
+ MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS,
+ MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS
} memcached_behavior;
typedef enum {
#include "common.h"
+#include "memcached/protocol_binary.h"
memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key_length,
time_t expiration)
memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
}
+ memcached_return rc= MEMCACHED_SUCCESS;
+
if ((memcached_do(&ptr->hosts[server_key], request.bytes,
sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
(memcached_io_write(&ptr->hosts[server_key], key,
key_length, flush) == -1))
{
memcached_io_reset(&ptr->hosts[server_key]);
- return MEMCACHED_WRITE_FAILURE;
+ rc= MEMCACHED_WRITE_FAILURE;
+ }
+
+ if (ptr->number_of_replicas > 0)
+ {
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
+
+ for (int x= 0; x < ptr->number_of_replicas; ++x)
+ {
+ ++server_key;
+ if (server_key == ptr->number_of_hosts)
+ server_key= 0;
+
+ memcached_server_st* server= &ptr->hosts[server_key];
+ if ((memcached_do(server, (const char*)request.bytes,
+ sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+ (memcached_io_write(server, key, key_length, flush) == -1))
+ memcached_io_reset(server);
+ }
}
- return MEMCACHED_SUCCESS;
+ return rc;
}
if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
return MEMCACHED_BAD_KEY_PROVIDED;
- if (ptr->flags & MEM_SUPPORT_CAS)
- {
- get_command= "gets ";
- get_command_length= 5;
- }
-
if (master_key && master_key_length)
{
if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys,
key_length, number_of_keys);
+ if (ptr->flags & MEM_SUPPORT_CAS)
+ {
+ get_command= "gets ";
+ get_command_length= 5;
+ }
+
/*
If a server fails we warn about errors and start all over with sending keys
to the server.
return rc;
}
-static memcached_return binary_mget_by_key(memcached_st *ptr,
+static memcached_return simple_binary_mget(memcached_st *ptr,
unsigned int master_server_key,
bool is_master_key_set,
char **keys, size_t *key_length,
return rc;
}
+
+static memcached_return replication_binary_mget(memcached_st *ptr,
+ uint32_t* hash, bool* dead_servers,
+ char **keys, size_t *key_length,
+ unsigned int number_of_keys)
+{
+ memcached_return rc= MEMCACHED_NOTFOUND;
+ uint32_t x;
+
+ int flush= number_of_keys == 1;
+
+ for (int replica = 0; replica <= ptr->number_of_replicas; ++replica)
+ {
+ bool success= true;
+
+ for (uint32_t x= 0; x < number_of_keys; ++x)
+ {
+ if (hash[x] == ptr->number_of_hosts)
+ continue; /* Already successfully sent */
+
+ uint32_t server= hash[x] + replica;
+ while (server >= ptr->number_of_hosts)
+ server -= ptr->number_of_hosts;
+
+ if (dead_servers[server])
+ continue;
+
+ if (memcached_server_response_count(&ptr->hosts[server]) == 0)
+ {
+ rc= memcached_connect(&ptr->hosts[server]);
+ if (rc != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[server]);
+ dead_servers[server]= true;
+ success= false;
+ continue;
+ }
+ }
+
+ protocol_binary_request_getk request= {.bytes= {0}};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ if (number_of_keys == 1)
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+ else
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+
+ request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ request.message.header.request.bodylen= htonl(key_length[x]);
+
+ if ((memcached_io_write(&ptr->hosts[server], request.bytes,
+ sizeof(request.bytes), 0) == -1) ||
+ (memcached_io_write(&ptr->hosts[server], keys[x],
+ key_length[x], flush) == -1))
+ {
+ memcached_io_reset(&ptr->hosts[server]);
+ dead_servers[server]= true;
+ success= false;
+ continue;
+ }
+ memcached_server_response_increment(&ptr->hosts[server]);
+ }
+
+ if (number_of_keys > 1)
+ {
+ /*
+ * Send a noop command to flush the buffers
+ */
+ protocol_binary_request_noop request= {.bytes= {0}};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+ for (x= 0; x < ptr->number_of_hosts; x++)
+ if (memcached_server_response_count(&ptr->hosts[x]))
+ {
+ if (memcached_io_write(&ptr->hosts[x], request.bytes,
+ sizeof(request.bytes), 1) == -1)
+ {
+ memcached_io_reset(&ptr->hosts[x]);
+ dead_servers[x]= true;
+ success= false;
+ }
+ memcached_server_response_increment(&ptr->hosts[x]);
+
+ /* mark all of the messages bound for this server as sent! */
+ for (uint32_t x= 0; x < number_of_keys; ++x)
+ if (hash[x] == x)
+ hash[x]= ptr->number_of_hosts;
+ }
+ }
+
+ if (success) {
+ break;
+ }
+ }
+
+ return rc;
+}
+
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+ unsigned int master_server_key,
+ bool is_master_key_set,
+ char **keys, size_t *key_length,
+ unsigned int number_of_keys)
+{
+ memcached_return rc;
+
+ if (ptr->number_of_replicas == 0) {
+ rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
+ keys, key_length, number_of_keys);
+ } else {
+ uint32_t* hash;
+ bool* dead_servers;
+
+ if (ptr->call_malloc)
+ {
+ hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys);
+ dead_servers= ptr->call_malloc(ptr, sizeof(bool) * ptr->number_of_hosts);
+ }
+ else
+ {
+ hash = malloc(sizeof(uint32_t) * number_of_keys);
+ dead_servers= malloc(sizeof(bool) * ptr->number_of_hosts);
+ }
+
+ if (hash == NULL || dead_servers == NULL)
+ {
+ if (ptr->call_free)
+ {
+ if (hash != NULL) ptr->call_free(ptr, hash);
+ if (dead_servers != NULL) ptr->call_free(ptr, dead_servers);
+ }
+ else
+ {
+ free(hash); /* No need to check for NULL (just look in the C spec) */
+ free(dead_servers);
+ }
+ return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+ }
+
+ memset(dead_servers, 0, sizeof(bool) * ptr->number_of_hosts);
+
+ if (is_master_key_set)
+ for (unsigned int x= 0; x < number_of_keys; ++x)
+ hash[x]= master_server_key;
+ else
+ for (unsigned int x= 0; x < number_of_keys; ++x)
+ hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+ rc= replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys);
+
+ if (ptr->call_free)
+ {
+ ptr->call_free(ptr, hash);
+ ptr->call_free(ptr, dead_servers);
+ }
+ else
+ {
+ free(hash);
+ free(dead_servers);
+ }
+
+ return MEMCACHED_SUCCESS;
+ }
+
+ return rc;
+}
/* NOTREACHED */
}
-static memcached_return memcached_send_binary(memcached_server_st* server,
+static memcached_return memcached_send_binary(memcached_st *ptr,
+ const char *master_key,
+ size_t master_key_length,
const char *key,
size_t key_length,
const char *value,
if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
return MEMCACHED_BAD_KEY_PROVIDED;
- server_key= memcached_generate_hash(ptr, master_key, master_key_length);
-
if (ptr->flags & MEM_BINARY_PROTOCOL)
- return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
- value, value_length, expiration,
+ return memcached_send_binary(ptr, master_key, master_key_length,
+ key, key_length,
+ value, value_length, expiration,
flags, cas, verb);
+ server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+
if (cas)
write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
"%s %s%.*s %u %llu %zu %llu%s\r\n",
-static memcached_return memcached_send_binary(memcached_server_st* server,
- const char *key,
+static memcached_return memcached_send_binary(memcached_st *ptr,
+ const char *master_key,
+ size_t master_key_length,
+ const char *key,
size_t key_length,
const char *value,
size_t value_length,
char flush;
protocol_binary_request_set request= {.bytes= {0}};
size_t send_length= sizeof(request.bytes);
+ uint32_t server_key= memcached_generate_hash(ptr, master_key,
+ master_key_length);
+ memcached_server_st *server= &ptr->hosts[server_key];
bool noreply= server->root->flags & MEM_NOREPLY;
request.message.header.request.magic= PROTOCOL_BINARY_REQ;
memcached_io_reset(server);
return MEMCACHED_WRITE_FAILURE;
}
-
+
+ if (verb == SET_OP && ptr->number_of_replicas > 0)
+ {
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
+
+ for (int x= 0; x < ptr->number_of_replicas; ++x)
+ {
+ ++server_key;
+ if (server_key == ptr->number_of_hosts)
+ server_key= 0;
+
+ memcached_server_st *srv= &ptr->hosts[server_key];
+ if ((memcached_do(srv, (const char*)request.bytes,
+ send_length, 0) != MEMCACHED_SUCCESS) ||
+ (memcached_io_write(srv, key, key_length, 0) == -1) ||
+ (memcached_io_write(srv, value, value_length, flush) == -1))
+ memcached_io_reset(server);
+ }
+ }
+
if (flush == 0)
return MEMCACHED_BUFFERED;
assert(clone->server_failure_limit == memc->server_failure_limit);
assert(clone->snd_timeout == memc->snd_timeout);
assert(clone->user_data == memc->user_data);
+ assert(clone->number_of_replicas == memc->number_of_replicas);
memcached_free(clone);
}
value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE);
assert(value > 0);
+ value= memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, value + 1);
+ assert((value + 1) == memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS));
return 0;
}
return rc;
}
+static memcached_return pre_replication(memcached_st *memc)
+{
+ memcached_return rc= MEMCACHED_FAILURE;
+ if (pre_binary(memc) == MEMCACHED_SUCCESS)
+ {
+ /*
+ * Make sure that we store the item on all servers
+ * (master + replicas == number of servers)
+ */
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS,
+ memc->number_of_hosts - 1);
+ assert(rc == MEMCACHED_SUCCESS);
+ assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS) == memc->number_of_hosts - 1);
+ }
+
+ return rc;
+}
+
static void my_free(memcached_st *ptr __attribute__((unused)), void *mem)
{
free(mem);
}
#endif
+static test_return replication_set_test(memcached_st *memc)
+{
+ memcached_return rc;
+ memcached_st *clone= memcached_clone(NULL, memc);
+ memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+ rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ /*
+ ** "bubba" should now be stored on all of our servers. We don't have an
+ ** easy to use API to address each individual server, so I'll just iterate
+ ** through a bunch of "master keys" and I should most likely hit all of the
+ ** servers...
+ */
+ for (int x= 'a'; x <= 'z'; ++x)
+ {
+ char key[2]= { [0]= (char)x };
+ size_t len;
+ uint32_t flags;
+ char *val= memcached_get_by_key(clone, key, 1, "bubba", 5,
+ &len, &flags, &rc);
+ assert(rc == MEMCACHED_SUCCESS);
+ assert(val != NULL);
+ free(val);
+ }
+
+ memcached_free(clone);
+
+ return TEST_SUCCESS;
+}
+
+static test_return replication_get_test(memcached_st *memc)
+{
+ memcached_return rc;
+
+ /*
+ * Don't do the following in your code. I am abusing the internal details
+ * within the library, and this is not a supported interface.
+ * This is to verify correct behavior in the library
+ */
+ for (int host= 0; host < memc->number_of_hosts; ++host) {
+ memcached_st *clone= memcached_clone(NULL, memc);
+ clone->hosts[host].port= 0;
+
+ for (int x= 'a'; x <= 'z'; ++x)
+ {
+ char key[2]= { [0]= (char)x };
+ size_t len;
+ uint32_t flags;
+ char *val= memcached_get_by_key(clone, key, 1, "bubba", 5,
+ &len, &flags, &rc);
+ assert(rc == MEMCACHED_SUCCESS);
+ assert(val != NULL);
+ free(val);
+ }
+
+ memcached_free(clone);
+ }
+
+ return TEST_SUCCESS;
+}
+
+static test_return replication_mget_test(memcached_st *memc)
+{
+ memcached_return rc;
+ memcached_st *clone= memcached_clone(NULL, memc);
+ memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+
+ char *keys[]= { "bubba", "key1", "key2", "key3" };
+ size_t len[]= { 5, 4, 4, 4 };
+
+ for (int x=0; x< 4; ++x)
+ {
+ rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+
+ /*
+ * Don't do the following in your code. I am abusing the internal details
+ * within the library, and this is not a supported interface.
+ * This is to verify correct behavior in the library
+ */
+ memcached_result_st result_obj;
+ for (int host= 0; host < clone->number_of_hosts; ++host) {
+ memcached_st *clone= memcached_clone(NULL, memc);
+ clone->hosts[host].port= 0;
+
+ for (int x= 'a'; x <= 'z'; ++x)
+ {
+ char key[2]= { [0]= (char)x };
+
+ rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ memcached_result_st *results= memcached_result_create(clone, &result_obj);
+ assert(results);
+
+ int hits= 0;
+ while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+ {
+ ++hits;
+ }
+ assert(hits == 4);
+ memcached_result_free(&result_obj);
+ }
+
+ memcached_free(clone);
+ }
+
+ return TEST_SUCCESS;
+}
+
+static test_return replication_delete_test(memcached_st *memc)
+{
+ memcached_return rc;
+ memcached_st *clone= memcached_clone(NULL, memc);
+ /* Delete the items from all of the servers except 1 */
+ uint64_t repl= memcached_behavior_get(memc,
+ MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl);
+
+ char *keys[]= { "bubba", "key1", "key2", "key3" };
+ size_t len[]= { 5, 4, 4, 4 };
+
+ for (int x=0; x< 4; ++x)
+ {
+ rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+
+ /*
+ * Don't do the following in your code. I am abusing the internal details
+ * within the library, and this is not a supported interface.
+ * This is to verify correct behavior in the library
+ */
+ int hash= memcached_generate_hash(memc, keys[0], len[0]);
+ for (int x= 0; x < (repl + 1); ++x) {
+ clone->hosts[hash].port= 0;
+ if (++hash == clone->number_of_hosts)
+ hash= 0;
+ }
+
+ memcached_result_st result_obj;
+ for (int host= 0; host < clone->number_of_hosts; ++host) {
+ for (int x= 'a'; x <= 'z'; ++x)
+ {
+ char key[2]= { [0]= (char)x };
+
+ rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ memcached_result_st *results= memcached_result_create(clone, &result_obj);
+ assert(results);
+
+ int hits= 0;
+ while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+ {
+ ++hits;
+ }
+ assert(hits == 4);
+ memcached_result_free(&result_obj);
+ }
+ }
+ memcached_free(clone);
+
+ return TEST_SUCCESS;
+}
+
static void increment_request_id(uint16_t *id)
{
(*id)++;
{0, 0, 0}
};
+test_st replication_tests[]= {
+ {"set", 1, replication_set_test },
+ {"get", 0, replication_get_test },
+ {"mget", 0, replication_mget_test },
+ {"delete", 0, replication_delete_test },
+ {0, 0, 0}
+};
+
test_st generate_tests[] ={
{"generate_pairs", 1, generate_pairs },
{"generate_data", 1, generate_data },
{"consistent_ketama", pre_behavior_ketama, 0, consistent_tests},
{"consistent_ketama_weighted", pre_behavior_ketama_weighted, 0, consistent_weighted_tests},
{"test_hashes", 0, 0, hash_tests},
+ {"replication", pre_replication, 0, replication_tests},
{0, 0, 0, 0}
};