Merging replication
authorBrian Aker <brian@gir-3.local>
Sat, 20 Jun 2009 19:18:01 +0000 (12:18 -0700)
committerBrian Aker <brian@gir-3.local>
Sat, 20 Jun 2009 19:18:01 +0000 (12:18 -0700)
1  2 
libmemcached/memcached.h
libmemcached/memcached_get.c
tests/function.c

Simple merge
index 05b317a6268b79d19b5a711d49dfb05cf4dc3cb1,cdae9922bcb3acda7316edf9b4325516f1800a05..0209b283ab1129ec5b53347e91d1420ca2366e36
@@@ -357,3 -357,147 +357,148 @@@ static memcached_return simple_binary_m
  
    return rc;
  }
 -    if (success) {
+ static memcached_return replication_binary_mget(memcached_st *ptr,
+                                              uint32_t* hash, bool* dead_servers,
+                                              char **keys, size_t *key_length,
+                                              unsigned int number_of_keys)
+ {
+   memcached_return rc= MEMCACHED_NOTFOUND;
+   uint32_t x;
+   int flush= number_of_keys == 1;
+   for (int replica = 0; replica <= ptr->number_of_replicas; ++replica)
+   {
+     bool success= true;    
+     
+     for (uint32_t x= 0; x < number_of_keys; ++x)
+     {
+       if (hash[x] == ptr->number_of_hosts)
+         continue; /* Already successfully sent */
+       uint32_t server= hash[x] + replica;
+       while (server >= ptr->number_of_hosts)
+         server -= ptr->number_of_hosts;
+       if (dead_servers[server])
+         continue;
+       if (memcached_server_response_count(&ptr->hosts[server]) == 0)
+       {
+         rc= memcached_connect(&ptr->hosts[server]);
+         if (rc != MEMCACHED_SUCCESS)
+         {
+           memcached_io_reset(&ptr->hosts[server]);
+           dead_servers[server]= true;
+           success= false;
+           continue;
+         }
+       }
+       protocol_binary_request_getk request= {.bytes= {0}};
+       request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+       if (number_of_keys == 1)
+         request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+       else
+         request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+       request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+       request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+       request.message.header.request.bodylen= htonl(key_length[x]);
+       if ((memcached_io_write(&ptr->hosts[server], request.bytes,
+                               sizeof(request.bytes), 0) == -1) ||
+           (memcached_io_write(&ptr->hosts[server], keys[x],
+                               key_length[x], flush) == -1))
+       {
+         memcached_io_reset(&ptr->hosts[server]);
+         dead_servers[server]= true;
+         success= false;
+         continue;
+       }
+       memcached_server_response_increment(&ptr->hosts[server]);
+     }
+     if (number_of_keys > 1)
+     {
+       /*
+        * Send a noop command to flush the buffers
+        */
+       protocol_binary_request_noop request= {.bytes= {0}};
+       request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+       request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+       for (x= 0; x < ptr->number_of_hosts; x++)
+         if (memcached_server_response_count(&ptr->hosts[x]))
+         {
+           if (memcached_io_write(&ptr->hosts[x], request.bytes,
+                                  sizeof(request.bytes), 1) == -1)
+           {
+             memcached_io_reset(&ptr->hosts[x]);
+             dead_servers[x]= true;
+             success= false;
+           }
+           memcached_server_response_increment(&ptr->hosts[x]);
+           /* mark all of the messages bound for this server as sent! */
+           for (uint32_t x= 0; x < number_of_keys; ++x)
+             if (hash[x] == x)
+               hash[x]= ptr->number_of_hosts;
+         }
+     }
 -    }
++    if (success)
+       break;
 -  } else {
+   }
+   return rc;
+ }
+ static memcached_return binary_mget_by_key(memcached_st *ptr,
+                                            unsigned int master_server_key,
+                                            bool is_master_key_set,
+                                            char **keys, size_t *key_length,
+                                            unsigned int number_of_keys)
+ {
+   memcached_return rc;
+   if (ptr->number_of_replicas == 0) 
+   {
+     rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
+                            keys, key_length, number_of_keys);
 -      for (unsigned int x= 0; x < number_of_keys; ++x)
++  } 
++  else 
++  {
+     uint32_t* hash;
+     bool* dead_servers;
+     hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys);
+     dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool));
+     if (hash == NULL || dead_servers == NULL)
+     {
+       ptr->call_free(ptr, hash);
+       ptr->call_free(ptr, dead_servers);
+       return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+     }
+     if (is_master_key_set)
 -      for (unsigned int x= 0; x < number_of_keys; ++x)
++      for (unsigned int x= 0; x < number_of_keys; x++)
+         hash[x]= master_server_key;
+     else
++      for (unsigned int x= 0; x < number_of_keys; x++)
+         hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
+     rc= replication_binary_mget(ptr, hash, dead_servers, keys, 
+                                 key_length, number_of_keys);
+     ptr->call_free(ptr, hash);
+     ptr->call_free(ptr, dead_servers);
+     return MEMCACHED_SUCCESS;
+   }
+   return rc;
+ }
index 99ad776aecc1bc8154c4dc58cca229b885fd925c,7b13b1b6d9b462dd1ff856b0355eabb9567fd13c..5f83e1ea2a16a05d8f5133c189aff8a212fbe172
@@@ -3542,6 -3564,175 +3564,177 @@@ static test_return connection_pool_test
  }
  #endif
  
 -  for (int host= 0; host < clone->number_of_hosts; ++host) {
+ static test_return replication_set_test(memcached_st *memc)
+ {
+   memcached_return rc;
+   memcached_st *clone= memcached_clone(NULL, memc);
+   memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+   rc= memcached_set(memc, "bubba", 5, "0", 1, 0, 0);
+   assert(rc == MEMCACHED_SUCCESS);
+   /*
+   ** "bubba" should now be stored on all of our servers. We don't have an
+   ** easy to use API to address each individual server, so I'll just iterate
+   ** through a bunch of "master keys" and I should most likely hit all of the
+   ** servers...
+   */
+   for (int x= 'a'; x <= 'z'; ++x)
+   {
+     char key[2]= { [0]= (char)x };
+     size_t len;
+     uint32_t flags;
+     char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                     &len, &flags, &rc);
+     assert(rc == MEMCACHED_SUCCESS);
+     assert(val != NULL);
+     free(val);
+   }
+   memcached_free(clone);
+   return TEST_SUCCESS;
+ }
+ static test_return replication_get_test(memcached_st *memc)
+ {
+   memcached_return rc;
+   /*
+    * Don't do the following in your code. I am abusing the internal details
+    * within the library, and this is not a supported interface.
+    * This is to verify correct behavior in the library
+    */
+   for (int host= 0; host < memc->number_of_hosts; ++host) {
+     memcached_st *clone= memcached_clone(NULL, memc);
+     clone->hosts[host].port= 0;
+     for (int x= 'a'; x <= 'z'; ++x)
+     {
+       char key[2]= { [0]= (char)x };
+       size_t len;
+       uint32_t flags;
+       char *val= memcached_get_by_key(clone, key, 1, "bubba", 5, 
+                                       &len, &flags, &rc);
+       assert(rc == MEMCACHED_SUCCESS);
+       assert(val != NULL);
+       free(val);
+     }
+     memcached_free(clone);
+   }
+   return TEST_SUCCESS;
+ }
+ static test_return replication_mget_test(memcached_st *memc)
+ {
+   memcached_return rc;
+   memcached_st *clone= memcached_clone(NULL, memc);
+   memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 0);
+   char *keys[]= { "bubba", "key1", "key2", "key3" };
+   size_t len[]= { 5, 4, 4, 4 };
+   for (int x=0; x< 4; ++x)
+   {
+     rc= memcached_set(memc, keys[x], len[x], "0", 1, 0, 0);
+     assert(rc == MEMCACHED_SUCCESS);
+   }
+   /*
+    * Don't do the following in your code. I am abusing the internal details
+    * within the library, and this is not a supported interface.
+    * This is to verify correct behavior in the library
+    */
+   memcached_result_st result_obj;
 -        ++hits;
++  for (int host= 0; host < clone->number_of_hosts; ++host) 
++  {
+     memcached_st *clone= memcached_clone(NULL, memc);
+     clone->hosts[host].port= 0;
+     for (int x= 'a'; x <= 'z'; ++x)
+     {
+       char key[2]= { [0]= (char)x };
+       rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+       assert(rc == MEMCACHED_SUCCESS);
+       memcached_result_st *results= memcached_result_create(clone, &result_obj);
+       assert(results);
+       int hits= 0;
+       while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+       {
 -  for (int host= 0; host < clone->number_of_hosts; ++host) {
++        hits++;
+       }
+       assert(hits == 4);
+       memcached_result_free(&result_obj);
+     }
+     memcached_free(clone);
+   }
+   return TEST_SUCCESS;
+ }
+ static test_return replication_delete_test(memcached_st *memc)
+ {
+   memcached_return rc;
+   memcached_st *clone= memcached_clone(NULL, memc);
+   /* Delete the items from all of the servers except 1 */
+   uint64_t repl= memcached_behavior_get(memc,
+                                         MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS);
+   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, --repl);
+   char *keys[]= { "bubba", "key1", "key2", "key3" };
+   size_t len[]= { 5, 4, 4, 4 };
+   for (int x=0; x< 4; ++x)
+   {
+     rc= memcached_delete_by_key(memc, keys[0], len[0], keys[x], len[x], 0);
+     assert(rc == MEMCACHED_SUCCESS);
+   }
+   /*
+    * Don't do the following in your code. I am abusing the internal details
+    * within the library, and this is not a supported interface.
+    * This is to verify correct behavior in the library
+    */
+   int hash= memcached_generate_hash(memc, keys[0], len[0]);
+   for (int x= 0; x < (repl + 1); ++x) {
+     clone->hosts[hash].port= 0;
+     if (++hash == clone->number_of_hosts)
+       hash= 0;
+   }
+   memcached_result_st result_obj;
++  for (int host= 0; host < clone->number_of_hosts; ++host) 
++  {
+     for (int x= 'a'; x <= 'z'; ++x)
+     {
+       char key[2]= { [0]= (char)x };
+       rc= memcached_mget_by_key(clone, key, 1, keys, len, 4);
+       assert(rc == MEMCACHED_SUCCESS);
+       memcached_result_st *results= memcached_result_create(clone, &result_obj);
+       assert(results);
+       int hits= 0;
+       while ((results= memcached_fetch_result(clone, &result_obj, &rc)) != NULL)
+       {
+         ++hits;
+       }
+       assert(hits == 4);
+       memcached_result_free(&result_obj);
+     }
+   }
+   memcached_free(clone);
+   return TEST_SUCCESS;
+ }
  static void increment_request_id(uint16_t *id)
  {
    (*id)++;
@@@ -3554,6 -3745,6 +3747,7 @@@ static uint16_t *get_udp_request_ids(me
    uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts);
    assert(ids != NULL);
    unsigned int x;
++
    for (x= 0; x < memc->number_of_hosts; x++)
      ids[x]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[x].write_buffer);
  
@@@ -3565,6 -3756,6 +3759,7 @@@ static test_return post_udp_op_check(me
    unsigned int x;
    memcached_server_st *cur_server = memc->hosts;
    uint16_t *cur_req_ids = get_udp_request_ids(memc);
++
    for (x= 0; x < memc->number_of_hosts; x++)
    {
      assert(cur_server[x].cursor_active == 0);
    }
    free(expected_req_ids);
    free(cur_req_ids);
++
    return TEST_SUCCESS;
  }
  
@@@ -3593,6 -3784,6 +3789,7 @@@ static memcached_return init_udp(memcac
    memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts);
    for (x= 0; x < num_hosts; x++)
      memcached_server_free(&memc->hosts[x]);
++
    memc->number_of_hosts= 0;
    memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1);
    for (x= 0; x < num_hosts; x++)
      assert(memcached_server_add_udp(memc, servers[x].hostname, servers[x].port) == MEMCACHED_SUCCESS);
      assert(memc->hosts[x].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
    }
++
    return MEMCACHED_SUCCESS;
  }