Fix problems with multigets and replication
authorTrond Norbye <trond.norbye@sun.com>
Wed, 14 Oct 2009 13:18:07 +0000 (15:18 +0200)
committerTrond Norbye <trond.norbye@sun.com>
Wed, 14 Oct 2009 13:18:07 +0000 (15:18 +0200)
The current code could go out of sync and send multiple "NOOP" to the same
server depending on the replica number to get, and the fetch-code isn't
capable of handling that. The memcached_mget_execute code also complcates this
because it could process some of the messages while receiving them so we
cannot reset the get sequence if we are able to send out the commands but
not the NOOP... To create a sane patch for this I disabled the buffered mode.
It will increase the number of packets sent to the server, but at least we
have a well defined behavior.

libmemcached/memcached_get.c
tests/function.c

index 0a47f63d74b413b6c25e2f1c9a9eac19e30f1ee7..cf0d210d27c2444842aac08f237bc81780d6a268 100644 (file)
@@ -416,18 +416,15 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
                                                 bool* dead_servers,
                                                 const char *const *keys, 
                                                 const size_t *key_length,
-                                                size_t number_of_keys, 
-                                                bool mget_mode)
+                                                size_t number_of_keys)
 {
   memcached_return rc= MEMCACHED_NOTFOUND;
   uint32_t x;
 
-  int flush= number_of_keys == 1;
-
   for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
   {
     bool success= true;    
-    
+
     for (x= 0; x < number_of_keys; ++x)
     {
       if (hash[x] == ptr->number_of_hosts)
@@ -452,58 +449,39 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
         }
       }
 
-      protocol_binary_request_getk request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      if (mget_mode)
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
-      else
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
-
-      request.message.header.request.keylen= htons((uint16_t)key_length[x]);
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-      request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
+      protocol_binary_request_getk request= {
+        .message.header.request= {
+          .magic= PROTOCOL_BINARY_REQ,
+          .opcode= PROTOCOL_BINARY_CMD_GETK,
+          .keylen= htons((uint16_t)key_length[x]),
+          .datatype= PROTOCOL_BINARY_RAW_BYTES,
+          .bodylen= htonl((uint32_t)key_length[x])
+        }
+      };
 
+      /*
+       * We need to disable buffering to actually know that the request was
+       * successfully sent to the server (so that we should expect a result
+       * back). It would be nice to do this in buffered mode, but then it
+       * would be complex to handle all error situations if we got to send
+       * some of the messages, and then we failed on writing out some others
+       * and we used the callback interface from memcached_mget_execute so
+       * that we might have processed some of the responses etc. For now,
+       * just make sure we work _correctly_
+       */
       if ((memcached_io_write(&ptr->hosts[server], request.bytes,
                               sizeof(request.bytes), 0) == -1) ||
           (memcached_io_write(&ptr->hosts[server], keys[x],
-                              key_length[x], (char) flush) == -1))
+                              key_length[x], 1) == -1))
       {
         memcached_io_reset(&ptr->hosts[server]);
         dead_servers[server]= true;
         success= false;
         continue;
       }
-      /* we just want one pending response per server */
-      memcached_server_response_reset(&ptr->hosts[server]); 
-      memcached_server_response_increment(&ptr->hosts[server]);
-    }
 
-    if (mget_mode)
-    {
-      /*
-       * Send a noop command to flush the buffers
-       */
-      protocol_binary_request_noop request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-
-      for (x= 0; x < ptr->number_of_hosts; x++)
-        if (memcached_server_response_count(&ptr->hosts[x]))
-        {
-          if (memcached_io_write(&ptr->hosts[x], request.bytes,
-                                 sizeof(request.bytes), 1) == -1)
-          {
-            memcached_io_reset(&ptr->hosts[x]);
-            dead_servers[x]= true;
-            success= false;
-          }
-
-          /* mark all of the messages bound for this server as sent! */
-          for (x= 0; x < number_of_keys; ++x)
-            if (hash[x] == x)
-              hash[x]= ptr->number_of_hosts;
-        }
+      memcached_server_response_increment(&ptr->hosts[server]);
+      hash[x]= ptr->number_of_hosts;
     }
 
     if (success)
@@ -551,7 +529,7 @@ static memcached_return binary_mget_by_key(memcached_st *ptr,
         hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
 
     rc= replication_binary_mget(ptr, hash, dead_servers, keys, 
-                                key_length, number_of_keys, mget_mode);
+                                key_length, number_of_keys);
 
     ptr->call_free(ptr, hash);
     ptr->call_free(ptr, dead_servers);
index 3ff842496d03357cfe03904a49f197773acb67da..679dfce579861d971d6ecce4e0fac8c07a179e1e 100644 (file)
@@ -4797,6 +4797,91 @@ static test_return_t regression_bug_442914(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static test_return_t regression_bug_447342(memcached_st *memc)
+{
+  if (memc->number_of_hosts < 3 || pre_replication(memc) != MEMCACHED_SUCCESS)
+    return TEST_SKIPPED;
+
+  memcached_return rc;
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 2);
+  assert(rc == MEMCACHED_SUCCESS);
+
+
+  const size_t max_keys= 100;
+  char **keys= calloc(max_keys, sizeof(char*));
+  size_t *key_length=calloc(max_keys, sizeof(size_t));
+
+  for (int x= 0; x < (int)max_keys; ++x)
+  {
+     char k[251];
+     key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
+     keys[x]= strdup(k);
+     assert(keys[x] != NULL);
+     rc= memcached_set(memc, k, key_length[x], k, key_length[x], 0, 0);
+     assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+  ** We are using the quiet commands to store the replicas, so we need
+  ** to ensure that all of them are processed before we can continue.
+  ** In the test we go directly from storing the object to trying to
+  ** receive the object from all of the different servers, so we
+  ** could end up in a race condition (the memcached server hasn't yet
+  ** processed the quiet command from the replication set when it process
+  ** the request from the other client (created by the clone)). As a
+  ** workaround for that we call memcached_quit to send the quit command
+  ** to the server and wait for the response ;-) If you use the test code
+  ** as an example for your own code, please note that you shouldn't need
+  ** to do this ;-)
+  */
+  memcached_quit(memc);
+  
+  /* Verify that all messages are stored, and we didn't stuff too much 
+   * into the servers
+   */
+  rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  unsigned int counter= 0;
+  memcached_execute_function callbacks[1]= { [0]= &callback_counter };
+  rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+  /* Verify that we received all of the key/value pairs */
+  assert(counter == (unsigned int)max_keys);
+
+  memcached_quit(memc);
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library. Fake that two servers
+   * are dead..
+   */
+  unsigned int port0= memc->hosts[0].port;
+  unsigned int port2= memc->hosts[2].port;
+  memc->hosts[0].port= 0;
+  memc->hosts[2].port= 0;
+
+  rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  counter= 0;
+  rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+  assert(counter == (unsigned int)max_keys);
+
+  /* Release allocated resources */
+  for (size_t x= 0; x < max_keys; ++x)
+    free(keys[x]);
+  free(keys);
+  free(key_length);
+
+  /* restore the memc handle */
+  memc->hosts[0].port= port0;
+  memc->hosts[2].port= port2;
+
+  return TEST_SUCCESS;
+}
+
+
 /* Test memcached_server_get_last_disconnect
  * For a working server set, shall be NULL
  * For a set of non existing server, shall not be NULL
@@ -5064,6 +5149,7 @@ test_st regression_tests[]= {
   {"lp:434843 buffered", 1, regression_bug_434843_buffered },
   {"lp:421108", 1, regression_bug_421108 },
   {"lp:442914", 1, regression_bug_442914 },
+  {"lp:447342", 1, regression_bug_447342 },
   {0, 0, 0}
 };