libmemcached: add MEMCACHED_BEHAVIOR_META_PROTOCOL
[awesomized/libmemcached] / src / libmemcached / get.cc
index 8801d31d7079b31e6fde72aa72b036a4e711609a..32da9b9aba3cb4217f9e70da53e325160e66a1bf 100644 (file)
@@ -137,7 +137,7 @@ static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_
                                            size_t group_key_length, const char *const *keys,
                                              const size_t *key_length, size_t number_of_keys,
                                              const bool mget_mode) {
-  bool failures_occured_in_sending = false;
+  bool failures_occurred_in_sending = false, success_happened = false;
   const char *get_command = "get";
   uint8_t get_command_length = 3;
   unsigned int master_server_key = (unsigned int) -1; /* 0 is a valid server id! */
@@ -198,91 +198,143 @@ static memcached_return_t mget_by_key_real(memcached_st *ptr, const char *group_
                               number_of_keys, mget_mode);
   }
 
-  if (ptr->flags.support_cas) {
-    get_command = "gets";
-    get_command_length = 4;
-  }
+  if (memcached_is_meta(ptr)) {
+    size_t hosts_connected = 0;
 
-  /*
-    If a server fails we warn about errors and start all over with sending keys
-    to the server.
-  */
-  WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
-  size_t hosts_connected = 0;
-  for (uint32_t x = 0; x < number_of_keys; x++) {
-    uint32_t server_key;
-
-    if (is_group_key_set) {
-      server_key = master_server_key;
-    } else {
-      server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
-    }
-
-    memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
+    for (uint32_t x = 0; x < number_of_keys; ++x) {
+      auto io_num = 0;
+      libmemcached_io_vector_st io_vec[8] = {};
+
+      io_vec[io_num++] = {memcached_literal_param("mg ")};
+      io_vec[io_num++] = {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)};
+      io_vec[io_num++] = {keys[x], key_length[x]};
+      if (memcached_is_cas(ptr)) {
+        io_vec[io_num++] = { memcached_literal_param(" c")};
+      }
+      io_vec[io_num++] = {memcached_literal_param(" k t f v\r\n")};
 
-    libmemcached_io_vector_st vector[] = {
-        {get_command, get_command_length},
-        {memcached_literal_param(" ")},
-        {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
-        {keys[x], key_length[x]}};
+      uint32_t server_key;
+      if (is_group_key_set) {
+        server_key = master_server_key;
+      } else {
+        server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
+      }
 
-    if (instance->response_count() == 0) {
-      rc = memcached_connect(instance);
+      auto *instance = memcached_instance_fetch(ptr, server_key);
+      if (!instance->response_count()) {
+        rc = memcached_connect(instance);
 
-      if (memcached_failed(rc)) {
-        memcached_set_error(*instance, rc, MEMCACHED_AT);
-        continue;
+        if (memcached_failed(rc)) {
+          memcached_set_error(*instance, rc, MEMCACHED_AT);
+          continue;
+        }
+        hosts_connected++;
       }
-      hosts_connected++;
-
-      if ((memcached_io_writev(instance, vector, 1, false)) == false) {
-        failures_occured_in_sending = true;
+      if (!memcached_io_writev(instance, io_vec, io_num, false)) {
+        failures_occurred_in_sending = true;
         continue;
       }
-      WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
       memcached_instance_response_increment(instance);
-      WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
     }
 
-    {
-      if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
-        memcached_instance_response_reset(instance);
-        failures_occured_in_sending = true;
-        continue;
+    for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
+      memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
+
+      if (instance->response_count()) {
+        /* We need to do something about non-connected hosts in the future */
+        if (!memcached_io_write(instance)) {
+          failures_occurred_in_sending = true;
+        } else {
+          success_happened = true;
+        }
       }
     }
-  }
+  } else {
+    if (ptr->flags.support_cas) {
+      get_command = "gets";
+      get_command_length = 4;
+    }
+
+    /*
+      If a server fails we warn about errors and start all over with sending keys
+      to the server.
+    */
+    WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
+    size_t hosts_connected = 0;
+    for (uint32_t x = 0; x < number_of_keys; x++) {
+      uint32_t server_key;
+
+      if (is_group_key_set) {
+        server_key = master_server_key;
+      } else {
+        server_key = memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
+      }
+
+      memcached_instance_st *instance = memcached_instance_fetch(ptr, server_key);
 
-  if (hosts_connected == 0) {
-    LIBMEMCACHED_MEMCACHED_MGET_END();
+      libmemcached_io_vector_st vector[] = {
+          {get_command, get_command_length},
+          {memcached_literal_param(" ")},
+          {memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace)},
+          {keys[x], key_length[x]}};
+
+      if (instance->response_count() == 0) {
+        rc = memcached_connect(instance);
 
-    if (memcached_failed(rc)) {
-      return rc;
+        if (memcached_failed(rc)) {
+          memcached_set_error(*instance, rc, MEMCACHED_AT);
+          continue;
+        }
+        hosts_connected++;
+
+        if ((memcached_io_writev(instance, vector, 1, false)) == false) {
+          failures_occurred_in_sending = true;
+          continue;
+        }
+        WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
+        memcached_instance_response_increment(instance);
+        WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
+      }
+
+      {
+        if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false) {
+          memcached_instance_response_reset(instance);
+          failures_occurred_in_sending = true;
+          continue;
+        }
+      }
     }
 
-    return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
-  }
+    if (hosts_connected == 0) {
+      LIBMEMCACHED_MEMCACHED_MGET_END();
 
-  /*
-    Should we muddle on if some servers are dead?
-  */
-  bool success_happened = false;
-  for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
-    memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
+      if (memcached_failed(rc)) {
+        return rc;
+      }
 
-    if (instance->response_count()) {
-      /* We need to do something about non-connnected hosts in the future */
-      if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
-        failures_occured_in_sending = true;
-      } else {
-        success_happened = true;
+      return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
+    }
+
+    /*
+      Should we muddle on if some servers are dead?
+    */
+    for (uint32_t x = 0; x < memcached_server_count(ptr); x++) {
+      memcached_instance_st *instance = memcached_instance_fetch(ptr, x);
+
+      if (instance->response_count()) {
+        /* We need to do something about non-connnected hosts in the future */
+        if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) {
+          failures_occurred_in_sending = true;
+        } else {
+          success_happened = true;
+        }
       }
     }
   }
 
   LIBMEMCACHED_MEMCACHED_MGET_END();
 
-  if (failures_occured_in_sending and success_happened) {
+  if (failures_occurred_in_sending and success_happened) {
     return MEMCACHED_SOME_ERRORS;
   }