Added randomized read behavior
[m6w6/libmemcached] / libmemcached / memcached_get.c
index 176be6ad5af2d388b10dc3978c673a409750d7bb..6c730093c9cd14d8407b36f7f6a2b6d45c15f551 100644 (file)
@@ -15,12 +15,12 @@ char *memcached_get(memcached_st *ptr, const char *key,
 }
 
 static memcached_return memcached_mget_by_key_real(memcached_st *ptr, 
-                                       const char *master_key, 
-                                       size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
-                                       size_t number_of_keys,
-                                      bool mget_mode);
+                                                   const char *master_key, 
+                                                   size_t master_key_length,
+                                                   const char * const *keys, 
+                                                   const size_t *key_length, 
+                                                   size_t number_of_keys,
+                                                   bool mget_mode);
 
 char *memcached_get_by_key(memcached_st *ptr, 
                            const char *master_key, 
@@ -42,10 +42,9 @@ char *memcached_get_by_key(memcached_st *ptr,
   }
 
   /* Request the key */
-  *error= memcached_mget_by_key_real(ptr, 
-                                master_key, 
-                                master_key_length, 
-                                (const char **)&key, &key_length, 1, false);
+  *error= memcached_mget_by_key_real(ptr, master_key, master_key_length, 
+                                     (const char * const *)&key, 
+                                     &key_length, 1, false);
 
   value= memcached_fetch(ptr, NULL, NULL, 
                          value_length, flags, error);
@@ -110,7 +109,8 @@ char *memcached_get_by_key(memcached_st *ptr,
 }
 
 memcached_return memcached_mget(memcached_st *ptr, 
-                                const char **keys, size_t *key_length, 
+                                const char * const *keys, 
+                                const size_t *key_length, 
                                 size_t number_of_keys)
 {
   return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
@@ -119,17 +119,18 @@ memcached_return memcached_mget(memcached_st *ptr,
 static memcached_return binary_mget_by_key(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length,
+                                           const char * const *keys, 
+                                           const size_t *key_length,
                                            size_t number_of_keys,
                                           bool mget_mode);
 
 static memcached_return memcached_mget_by_key_real(memcached_st *ptr, 
-                                       const char *master_key, 
-                                       size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
-                                       size_t number_of_keys,
-                                      bool mget_mode)
+                                                   const char *master_key, 
+                                                   size_t master_key_length,
+                                                   const char * const *keys, 
+                                                   const size_t *key_length, 
+                                                   size_t number_of_keys,
+                                                   bool mget_mode)
 {
   unsigned int x;
   memcached_return rc= MEMCACHED_NOTFOUND;
@@ -155,7 +156,7 @@ static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
 
   if (master_key && master_key_length)
   {
-    if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+    if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
       return MEMCACHED_BAD_KEY_PROVIDED;
     master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
     is_master_key_set= true;
@@ -269,18 +270,60 @@ static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
 memcached_return memcached_mget_by_key(memcached_st *ptr, 
                                        const char *master_key, 
                                        size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
+                                       const char * const *keys, 
+                                       const size_t *key_length, 
                                        size_t number_of_keys)
 {
   return memcached_mget_by_key_real(ptr, master_key, master_key_length, keys, 
                                     key_length, number_of_keys, true);
 }
 
+memcached_return memcached_mget_execute(memcached_st *ptr,
+                                        const char * const *keys,
+                                        const size_t *key_length,
+                                        size_t number_of_keys,
+                                        memcached_execute_function *callback,
+                                        void *context,
+                                        unsigned int number_of_callbacks)
+{
+  return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
+                                       number_of_keys, callback,
+                                       context, number_of_callbacks);
+}
+
+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
+                                               const char *master_key,
+                                               size_t master_key_length,
+                                               const char * const *keys,
+                                               const size_t *key_length,
+                                               size_t number_of_keys,
+                                               memcached_execute_function *callback,
+                                               void *context,
+                                               unsigned int number_of_callbacks)
+{
+  if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0)
+    return MEMCACHED_NOT_SUPPORTED;
+
+  memcached_return rc;
+  memcached_callback_st *original_callbacks= ptr->callbacks;
+  memcached_callback_st cb= {
+    .callback= callback,
+    .context= context,
+    .number_of_callback= number_of_callbacks
+  };
+
+  ptr->callbacks= &cb;
+  rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
+                            key_length, number_of_keys);
+  ptr->callbacks= original_callbacks;
+  return rc;
+}
+
 static memcached_return simple_binary_mget(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length, 
+                                           const char * const *keys, 
+                                           const size_t *key_length, 
                                            size_t number_of_keys, bool mget_mode)
 {
   memcached_return rc= MEMCACHED_NOTFOUND;
@@ -338,6 +381,9 @@ static memcached_return simple_binary_mget(memcached_st *ptr,
       rc= MEMCACHED_SOME_ERRORS;
       continue;
     }
+    
+    /* We just want one pending response per server */
+    memcached_server_response_reset(&ptr->hosts[server_key]); 
     memcached_server_response_increment(&ptr->hosts[server_key]);    
     if ((x > 0 && x == ptr->io_key_prefetch) &&
         memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
@@ -371,7 +417,6 @@ static memcached_return simple_binary_mget(memcached_st *ptr,
           memcached_io_reset(&ptr->hosts[x]);
           rc= MEMCACHED_SOME_ERRORS;
         }
-        memcached_server_response_increment(&ptr->hosts[x]);    
       }
     }
 
@@ -380,25 +425,42 @@ static memcached_return simple_binary_mget(memcached_st *ptr,
 }
 
 static memcached_return replication_binary_mget(memcached_st *ptr,
-                                             uint32_t* hash, bool* dead_servers,
-                                             const char **keys, size_t *key_length,
-                                             size_t number_of_keys, bool mget_mode)
+                                                uint32_t* hash, 
+                                                bool* dead_servers,
+                                                const char *const *keys, 
+                                                const size_t *key_length,
+                                                size_t number_of_keys)
 {
   memcached_return rc= MEMCACHED_NOTFOUND;
-  uint32_t x;
+  uint32_t x, start = 0;
+  uint64_t randomize_read = memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
 
-  int flush= number_of_keys == 1;
+  if (randomize_read) {
+    srandom((uint32_t) time(NULL));
+    start = (uint32_t)(random() % (ptr->number_of_replicas + 1));
+  }
 
+  /* Loop for each replica */
   for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
   {
-    bool success= true;    
-    
+    bool success= true;
     for (x= 0; x < number_of_keys; ++x)
     {
+      uint32_t server;
+      
       if (hash[x] == ptr->number_of_hosts)
         continue; /* Already successfully sent */
 
-      uint32_t server= hash[x] + replica;
+      server= hash[x] + replica;
+
+      /* In case of randomized reads */
+      if (randomize_read) {
+        if ((server + start) <= (hash[x] + ptr->number_of_replicas)) {
+          server += start;
+        }
+      }
+      
       while (server >= ptr->number_of_hosts)
         server -= ptr->number_of_hosts;
 
@@ -417,57 +479,39 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
         }
       }
 
-      protocol_binary_request_getk request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      if (mget_mode)
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
-      else
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
-
-      request.message.header.request.keylen= htons((uint16_t)key_length[x]);
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-      request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
+      protocol_binary_request_getk request= {
+        .message.header.request= {
+          .magic= PROTOCOL_BINARY_REQ,
+          .opcode= PROTOCOL_BINARY_CMD_GETK,
+          .keylen= htons((uint16_t)key_length[x]),
+          .datatype= PROTOCOL_BINARY_RAW_BYTES,
+          .bodylen= htonl((uint32_t)key_length[x])
+        }
+      };
 
+      /*
+       * We need to disable buffering to actually know that the request was
+       * successfully sent to the server (so that we should expect a result
+       * back). It would be nice to do this in buffered mode, but then it
+       * would be complex to handle all error situations if we got to send
+       * some of the messages, and then we failed on writing out some others
+       * and we used the callback interface from memcached_mget_execute so
+       * that we might have processed some of the responses etc. For now,
+       * just make sure we work _correctly_
+       */
       if ((memcached_io_write(&ptr->hosts[server], request.bytes,
                               sizeof(request.bytes), 0) == -1) ||
           (memcached_io_write(&ptr->hosts[server], keys[x],
-                              key_length[x], (char) flush) == -1))
+                              key_length[x], 1) == -1))
       {
         memcached_io_reset(&ptr->hosts[server]);
         dead_servers[server]= true;
         success= false;
         continue;
       }
-      memcached_server_response_increment(&ptr->hosts[server]);
-    }
 
-    if (mget_mode)
-    {
-      /*
-       * Send a noop command to flush the buffers
-       */
-      protocol_binary_request_noop request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-
-      for (x= 0; x < ptr->number_of_hosts; x++)
-        if (memcached_server_response_count(&ptr->hosts[x]))
-        {
-          if (memcached_io_write(&ptr->hosts[x], request.bytes,
-                                 sizeof(request.bytes), 1) == -1)
-          {
-            memcached_io_reset(&ptr->hosts[x]);
-            dead_servers[x]= true;
-            success= false;
-          }
-          memcached_server_response_increment(&ptr->hosts[x]);
-
-          /* mark all of the messages bound for this server as sent! */
-          for (x= 0; x < number_of_keys; ++x)
-            if (hash[x] == x)
-              hash[x]= ptr->number_of_hosts;
-        }
+      memcached_server_response_increment(&ptr->hosts[server]);
+      hash[x]= ptr->number_of_hosts;
     }
 
     if (success)
@@ -480,8 +524,10 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
 static memcached_return binary_mget_by_key(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length,
-                                           size_t number_of_keys, bool mget_mode)
+                                           const char * const *keys, 
+                                           const size_t *key_length,
+                                           size_t number_of_keys, 
+                                           bool mget_mode)
 {
   memcached_return rc;
 
@@ -513,7 +559,7 @@ static memcached_return binary_mget_by_key(memcached_st *ptr,
         hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
 
     rc= replication_binary_mget(ptr, hash, dead_servers, keys, 
-                                key_length, number_of_keys, mget_mode);
+                                key_length, number_of_keys);
 
     ptr->call_free(ptr, hash);
     ptr->call_free(ptr, dead_servers);