}
static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
- const char *master_key,
- size_t master_key_length,
- const char **keys,
- size_t *key_length,
- size_t number_of_keys,
- bool mget_mode);
+ const char *master_key,
+ size_t master_key_length,
+ const char * const *keys,
+ const size_t *key_length,
+ size_t number_of_keys,
+ bool mget_mode);
char *memcached_get_by_key(memcached_st *ptr,
const char *master_key,
}
/* Request the key */
- *error= memcached_mget_by_key_real(ptr,
- master_key,
- master_key_length,
- (const char **)&key, &key_length, 1, false);
+ *error= memcached_mget_by_key_real(ptr, master_key, master_key_length,
+ (const char * const *)&key,
+ &key_length, 1, false);
value= memcached_fetch(ptr, NULL, NULL,
value_length, flags, error);
}
memcached_return memcached_mget(memcached_st *ptr,
- const char **keys, size_t *key_length,
+ const char * const *keys,
+ const size_t *key_length,
size_t number_of_keys)
{
return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
static memcached_return binary_mget_by_key(memcached_st *ptr,
unsigned int master_server_key,
bool is_master_key_set,
- const char **keys, size_t *key_length,
+ const char * const *keys,
+ const size_t *key_length,
size_t number_of_keys,
bool mget_mode);
static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
- const char *master_key,
- size_t master_key_length,
- const char **keys,
- size_t *key_length,
- size_t number_of_keys,
- bool mget_mode)
+ const char *master_key,
+ size_t master_key_length,
+ const char * const *keys,
+ const size_t *key_length,
+ size_t number_of_keys,
+ bool mget_mode)
{
unsigned int x;
memcached_return rc= MEMCACHED_NOTFOUND;
if (master_key && master_key_length)
{
- if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+ if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
return MEMCACHED_BAD_KEY_PROVIDED;
master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
is_master_key_set= true;
memcached_return memcached_mget_by_key(memcached_st *ptr,
const char *master_key,
size_t master_key_length,
- const char **keys,
- size_t *key_length,
+ const char * const *keys,
+ const size_t *key_length,
size_t number_of_keys)
{
return memcached_mget_by_key_real(ptr, master_key, master_key_length, keys,
key_length, number_of_keys, true);
}
+memcached_return memcached_mget_execute(memcached_st *ptr,
+ const char * const *keys,
+ const size_t *key_length,
+ size_t number_of_keys,
+ memcached_execute_function *callback,
+ void *context,
+ unsigned int number_of_callbacks)
+{
+ return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
+ number_of_keys, callback,
+ context, number_of_callbacks);
+}
+
+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
+ const char *master_key,
+ size_t master_key_length,
+ const char * const *keys,
+ const size_t *key_length,
+ size_t number_of_keys,
+ memcached_execute_function *callback,
+ void *context,
+ unsigned int number_of_callbacks)
+{
+ if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0)
+ return MEMCACHED_NOT_SUPPORTED;
+
+ memcached_return rc;
+ memcached_callback_st *original_callbacks= ptr->callbacks;
+ memcached_callback_st cb= {
+ .callback= callback,
+ .context= context,
+ .number_of_callback= number_of_callbacks
+ };
+
+ ptr->callbacks= &cb;
+ rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
+ key_length, number_of_keys);
+ ptr->callbacks= original_callbacks;
+ return rc;
+}
+
static memcached_return simple_binary_mget(memcached_st *ptr,
unsigned int master_server_key,
bool is_master_key_set,
- const char **keys, size_t *key_length,
+ const char * const *keys,
+ const size_t *key_length,
size_t number_of_keys, bool mget_mode)
{
memcached_return rc= MEMCACHED_NOTFOUND;
rc= MEMCACHED_SOME_ERRORS;
continue;
}
+
+ /* We just want one pending response per server */
+ memcached_server_response_reset(&ptr->hosts[server_key]);
memcached_server_response_increment(&ptr->hosts[server_key]);
if ((x > 0 && x == ptr->io_key_prefetch) &&
memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
memcached_io_reset(&ptr->hosts[x]);
rc= MEMCACHED_SOME_ERRORS;
}
- memcached_server_response_increment(&ptr->hosts[x]);
}
}
}
static memcached_return replication_binary_mget(memcached_st *ptr,
- uint32_t* hash, bool* dead_servers,
- const char **keys, size_t *key_length,
- size_t number_of_keys, bool mget_mode)
+ uint32_t* hash,
+ bool* dead_servers,
+ const char *const *keys,
+ const size_t *key_length,
+ size_t number_of_keys)
{
memcached_return rc= MEMCACHED_NOTFOUND;
- uint32_t x;
+ uint32_t x, start = 0;
+ uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
- int flush= number_of_keys == 1;
+ if (randomize_read) {
+ srandom((uint32_t) time(NULL));
+ start = (uint32_t)(random() % (ptr->number_of_replicas + 1));
+ }
+ /* Loop for each replica */
for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
{
- bool success= true;
-
+ bool success= true;
+
for (x= 0; x < number_of_keys; ++x)
{
+ uint32_t server;
+
if (hash[x] == ptr->number_of_hosts)
continue; /* Already successfully sent */
- uint32_t server= hash[x] + replica;
+ server= hash[x] + replica;
+
+ /* In case of randomized reads */
+ if (randomize_read) {
+ if ((server + start) <= (hash[x] + ptr->number_of_replicas)) {
+ server += start;
+ }
+ }
+
while (server >= ptr->number_of_hosts)
server -= ptr->number_of_hosts;
}
}
- protocol_binary_request_getk request= {.bytes= {0}};
- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
- if (mget_mode)
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
- else
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
-
- request.message.header.request.keylen= htons((uint16_t)key_length[x]);
- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
- request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
+ protocol_binary_request_getk request= {
+ .message.header.request= {
+ .magic= PROTOCOL_BINARY_REQ,
+ .opcode= PROTOCOL_BINARY_CMD_GETK,
+ .keylen= htons((uint16_t)key_length[x]),
+ .datatype= PROTOCOL_BINARY_RAW_BYTES,
+ .bodylen= htonl((uint32_t)key_length[x])
+ }
+ };
+ /*
+ * We need to disable buffering to actually know that the request was
+ * successfully sent to the server (so that we should expect a result
+ * back). It would be nice to do this in buffered mode, but then it
+ * would be complex to handle all error situations if we got to send
+ * some of the messages, and then we failed on writing out some others
+ * and we used the callback interface from memcached_mget_execute so
+ * that we might have processed some of the responses etc. For now,
+ * just make sure we work _correctly_
+ */
if ((memcached_io_write(&ptr->hosts[server], request.bytes,
sizeof(request.bytes), 0) == -1) ||
(memcached_io_write(&ptr->hosts[server], keys[x],
- key_length[x], (char) flush) == -1))
+ key_length[x], 1) == -1))
{
memcached_io_reset(&ptr->hosts[server]);
dead_servers[server]= true;
success= false;
continue;
}
- memcached_server_response_increment(&ptr->hosts[server]);
- }
- if (mget_mode)
- {
- /*
- * Send a noop command to flush the buffers
- */
- protocol_binary_request_noop request= {.bytes= {0}};
- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-
- for (x= 0; x < ptr->number_of_hosts; x++)
- if (memcached_server_response_count(&ptr->hosts[x]))
- {
- if (memcached_io_write(&ptr->hosts[x], request.bytes,
- sizeof(request.bytes), 1) == -1)
- {
- memcached_io_reset(&ptr->hosts[x]);
- dead_servers[x]= true;
- success= false;
- }
- memcached_server_response_increment(&ptr->hosts[x]);
-
- /* mark all of the messages bound for this server as sent! */
- for (x= 0; x < number_of_keys; ++x)
- if (hash[x] == x)
- hash[x]= ptr->number_of_hosts;
- }
+ memcached_server_response_increment(&ptr->hosts[server]);
+ hash[x]= ptr->number_of_hosts;
}
if (success)
static memcached_return binary_mget_by_key(memcached_st *ptr,
unsigned int master_server_key,
bool is_master_key_set,
- const char **keys, size_t *key_length,
- size_t number_of_keys, bool mget_mode)
+ const char * const *keys,
+ const size_t *key_length,
+ size_t number_of_keys,
+ bool mget_mode)
{
memcached_return rc;
hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
rc= replication_binary_mget(ptr, hash, dead_servers, keys,
- key_length, number_of_keys, mget_mode);
+ key_length, number_of_keys);
ptr->call_free(ptr, hash);
ptr->call_free(ptr, dead_servers);