bool* dead_servers,
const char *const *keys,
const size_t *key_length,
- size_t number_of_keys,
- bool mget_mode)
+ size_t number_of_keys)
{
memcached_return rc= MEMCACHED_NOTFOUND;
uint32_t x;
- int flush= number_of_keys == 1;
-
for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
{
bool success= true;
-
+
for (x= 0; x < number_of_keys; ++x)
{
if (hash[x] == ptr->number_of_hosts)
}
}
- protocol_binary_request_getk request= {.bytes= {0}};
- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
- if (mget_mode)
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
- else
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
-
- request.message.header.request.keylen= htons((uint16_t)key_length[x]);
- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
- request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
+ protocol_binary_request_getk request= {
+ .message.header.request= {
+ .magic= PROTOCOL_BINARY_REQ,
+ .opcode= PROTOCOL_BINARY_CMD_GETK,
+ .keylen= htons((uint16_t)key_length[x]),
+ .datatype= PROTOCOL_BINARY_RAW_BYTES,
+ .bodylen= htonl((uint32_t)key_length[x])
+ }
+ };
+ /*
+ * We need to disable buffering to actually know that the request was
+ * successfully sent to the server (so that we should expect a result
+ * back). It would be nice to do this in buffered mode, but then it
+ * would be complex to handle all error situations if we got to send
+ * some of the messages, and then we failed on writing out some others
+ * and we used the callback interface from memcached_mget_execute so
+ * that we might have processed some of the responses etc. For now,
+ * just make sure we work _correctly_
+ */
if ((memcached_io_write(&ptr->hosts[server], request.bytes,
sizeof(request.bytes), 0) == -1) ||
(memcached_io_write(&ptr->hosts[server], keys[x],
- key_length[x], (char) flush) == -1))
+ key_length[x], 1) == -1))
{
memcached_io_reset(&ptr->hosts[server]);
dead_servers[server]= true;
success= false;
continue;
}
- /* we just want one pending response per server */
- memcached_server_response_reset(&ptr->hosts[server]);
- memcached_server_response_increment(&ptr->hosts[server]);
- }
- if (mget_mode)
- {
- /*
- * Send a noop command to flush the buffers
- */
- protocol_binary_request_noop request= {.bytes= {0}};
- request.message.header.request.magic= PROTOCOL_BINARY_REQ;
- request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
- request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-
- for (x= 0; x < ptr->number_of_hosts; x++)
- if (memcached_server_response_count(&ptr->hosts[x]))
- {
- if (memcached_io_write(&ptr->hosts[x], request.bytes,
- sizeof(request.bytes), 1) == -1)
- {
- memcached_io_reset(&ptr->hosts[x]);
- dead_servers[x]= true;
- success= false;
- }
-
- /* mark all of the messages bound for this server as sent! */
- for (x= 0; x < number_of_keys; ++x)
- if (hash[x] == x)
- hash[x]= ptr->number_of_hosts;
- }
+ memcached_server_response_increment(&ptr->hosts[server]);
+ hash[x]= ptr->number_of_hosts;
}
if (success)
hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
rc= replication_binary_mget(ptr, hash, dead_servers, keys,
- key_length, number_of_keys, mget_mode);
+ key_length, number_of_keys);
ptr->call_free(ptr, hash);
ptr->call_free(ptr, dead_servers);
return TEST_SUCCESS;
}
+static test_return_t regression_bug_447342(memcached_st *memc)
+{
+ if (memc->number_of_hosts < 3 || pre_replication(memc) != MEMCACHED_SUCCESS)
+ return TEST_SKIPPED;
+
+ memcached_return rc;
+
+ rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 2);
+ assert(rc == MEMCACHED_SUCCESS);
+
+
+ const size_t max_keys= 100;
+ char **keys= calloc(max_keys, sizeof(char*));
+ size_t *key_length=calloc(max_keys, sizeof(size_t));
+
+ for (int x= 0; x < (int)max_keys; ++x)
+ {
+ char k[251];
+ key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
+ keys[x]= strdup(k);
+ assert(keys[x] != NULL);
+ rc= memcached_set(memc, k, key_length[x], k, key_length[x], 0, 0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+
+ /*
+ ** We are using the quiet commands to store the replicas, so we need
+ ** to ensure that all of them are processed before we can continue.
+ ** In the test we go directly from storing the object to trying to
+ ** receive the object from all of the different servers, so we
+ ** could end up in a race condition (the memcached server hasn't yet
+ ** processed the quiet command from the replication set when it process
+ ** the request from the other client (created by the clone)). As a
+ ** workaround for that we call memcached_quit to send the quit command
+ ** to the server and wait for the response ;-) If you use the test code
+ ** as an example for your own code, please note that you shouldn't need
+ ** to do this ;-)
+ */
+ memcached_quit(memc);
+
+ /* Verify that all messages are stored, and we didn't stuff too much
+ * into the servers
+ */
+ rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ unsigned int counter= 0;
+ memcached_execute_function callbacks[1]= { [0]= &callback_counter };
+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+ /* Verify that we received all of the key/value pairs */
+ assert(counter == (unsigned int)max_keys);
+
+ memcached_quit(memc);
+ /*
+ * Don't do the following in your code. I am abusing the internal details
+ * within the library, and this is not a supported interface.
+ * This is to verify correct behavior in the library. Fake that two servers
+ * are dead..
+ */
+ unsigned int port0= memc->hosts[0].port;
+ unsigned int port2= memc->hosts[2].port;
+ memc->hosts[0].port= 0;
+ memc->hosts[2].port= 0;
+
+ rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ counter= 0;
+ rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+ assert(counter == (unsigned int)max_keys);
+
+ /* Release allocated resources */
+ for (size_t x= 0; x < max_keys; ++x)
+ free(keys[x]);
+ free(keys);
+ free(key_length);
+
+ /* restore the memc handle */
+ memc->hosts[0].port= port0;
+ memc->hosts[2].port= port2;
+
+ return TEST_SUCCESS;
+}
+
+
/* Test memcached_server_get_last_disconnect
* For a working server set, shall be NULL
* For a set of non existing server, shall not be NULL
{"lp:434843 buffered", 1, regression_bug_434843_buffered },
{"lp:421108", 1, regression_bug_421108 },
{"lp:442914", 1, regression_bug_442914 },
+ {"lp:447342", 1, regression_bug_447342 },
{0, 0, 0}
};