Add a request id for each request that hits the wire.
[m6w6/libmemcached] / libmemcached / flush.cc
index 9eb17c655a5a4b462943e8da920c7bd6c8647640..0c6141d8c121920a60ed4f32ca5a5880ab64ba81 100644 (file)
 #include <libmemcached/common.h>
 
 static memcached_return_t memcached_flush_binary(memcached_st *ptr, 
-                                                 time_t expiration);
-static memcached_return_t memcached_flush_textual(memcached_st *ptr, 
-                                                  time_t expiration);
-
-memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration)
+                                                 time_t expiration,
+                                                 const bool reply)
 {
-  memcached_return_t rc;
-  if (memcached_failed(rc= initialize_query(ptr)))
-  {
-    return rc;
-  }
+  protocol_binary_request_flush request= {};
 
-  LIBMEMCACHED_MEMCACHED_FLUSH_START();
-  if (ptr->flags.binary_protocol)
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+  request.message.header.request.extlen= 4;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
+  request.message.body.expiration= htonl((uint32_t) expiration);
+
+  memcached_return_t rc= MEMCACHED_SUCCESS;
+
+  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
-    rc= memcached_flush_binary(ptr, expiration);
+    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
+    initialize_binary_request(instance, request.message.header);
+
+    if (reply)
+    {
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+    }
+    else
+    {
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
+    }
+
+    libmemcached_io_vector_st vector[]=
+    {
+      { NULL, 0 },
+      { request.bytes, sizeof(request.bytes) }
+    };
+
+    memcached_return_t rrc;
+    if (memcached_failed(rrc= memcached_vdo(instance, vector, 2, true)))
+    {
+      if (instance->error_messages == NULL or instance->root->error_messages == NULL)
+      {
+        memcached_set_error(*instance, rrc, MEMCACHED_AT);
+      }
+      memcached_io_reset(instance);
+      rc= MEMCACHED_SOME_ERRORS;
+    } 
   }
-  else
+
+  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
-    rc= memcached_flush_textual(ptr, expiration);
+    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
+
+    if (memcached_server_response_count(instance) > 0)
+    {
+      (void)memcached_response(instance, NULL, 0, NULL);
+    }
   }
-  LIBMEMCACHED_MEMCACHED_FLUSH_END();
 
   return rc;
 }
 
 static memcached_return_t memcached_flush_textual(memcached_st *ptr, 
-                                                  time_t expiration)
+                                                  time_t expiration,
+                                                  const bool reply)
 {
-  // Invert the logic to make it simpler to read the code
-  bool reply= (ptr->flags.no_reply) ? false : true;
-
-  char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+  char buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
   int send_length= 0;
   if (expiration)
   {
     send_length= snprintf(buffer, sizeof(buffer), "%llu", (unsigned long long)expiration);
   }
 
-  if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or send_length < 0)
+  if (size_t(send_length) >= sizeof(buffer) or send_length < 0)
   {
     return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
   }
 
-  struct libmemcached_io_vector_st vector[]=
-  {
-    { memcached_literal_param("flush_all ") },
-    { buffer, send_length },
-    { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
-    { memcached_literal_param("\r\n") }
-  };
-
   memcached_return_t rc= MEMCACHED_SUCCESS;
   for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
 
-    memcached_return_t rrc= memcached_vdo(instance, vector, 4, true);
-    if (rrc == MEMCACHED_SUCCESS and reply == true)
+    libmemcached_io_vector_st vector[]=
+    {
+      { NULL, 0 },
+      { memcached_literal_param("flush_all ") },
+      { buffer, send_length },
+      { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
+      { memcached_literal_param("\r\n") }
+    };
+
+    memcached_return_t rrc= memcached_vdo(instance, vector, 5, true);
+    if (memcached_success(rrc) and reply == true)
     {
       char response_buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
       rrc= memcached_response(instance, response_buffer, sizeof(response_buffer), NULL);
@@ -105,7 +136,7 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr,
     if (memcached_failed(rrc))
     {
       // If an error has already been reported, then don't add to it
-      if (instance->error_messages == NULL)
+      if (instance->error_messages == NULL or instance->root->error_messages == NULL)
       {
         memcached_set_error(*instance, rrc, MEMCACHED_AT);
       }
@@ -116,51 +147,26 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr,
   return rc;
 }
 
-static memcached_return_t memcached_flush_binary(memcached_st *ptr, 
-                                                 time_t expiration)
+memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration)
 {
-  protocol_binary_request_flush request= {};
-
-  request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
-  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
-  request.message.header.request.extlen= 4;
-  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-  request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
-  request.message.body.expiration= htonl((uint32_t) expiration);
-
-  memcached_return_t rc= MEMCACHED_SUCCESS;
-
-  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
+  memcached_return_t rc;
+  if (memcached_failed(rc= initialize_query(ptr, true)))
   {
-    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
+    return rc;
+  }
 
-    if (ptr->flags.no_reply)
-    {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
-    }
-    else
-    {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
-    }
+  bool reply= memcached_is_replying(ptr);
 
-    memcached_return_t rrc;
-    if ((rrc= memcached_do(instance, request.bytes, sizeof(request.bytes), true)))
-    {
-      memcached_set_error(*instance, rrc, MEMCACHED_AT);
-      memcached_io_reset(instance);
-      rc= MEMCACHED_SOME_ERRORS;
-    } 
+  LIBMEMCACHED_MEMCACHED_FLUSH_START();
+  if (memcached_is_binary(ptr))
+  {
+    rc= memcached_flush_binary(ptr, expiration, reply);
   }
-
-  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
+  else
   {
-    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
-
-    if (memcached_server_response_count(instance) > 0)
-    {
-      (void)memcached_response(instance, NULL, 0, NULL);
-    }
+    rc= memcached_flush_textual(ptr, expiration, reply);
   }
+  LIBMEMCACHED_MEMCACHED_FLUSH_END();
 
   return rc;
 }