Fixes UDP protocol to just use sendmsg().
authorBrian Aker <brian@tangent.org>
Mon, 2 Jan 2012 02:32:34 +0000 (18:32 -0800)
committerBrian Aker <brian@tangent.org>
Mon, 2 Jan 2012 02:32:34 +0000 (18:32 -0800)
libmemcached/do.cc
libmemcached/error.cc
libmemcached/flush.cc
libmemcached/response.cc
libmemcached/verbosity.cc
tests/libmemcached-1.0/mem_functions.cc
tests/mem_udp.cc

index d289c479fa71b41cb7129d920c7fd77081575cac..3c3519c27b9cccb29df440a9e4719735bb3be424 100644 (file)
@@ -40,7 +40,22 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st instance,
                                  memcached_literal_param("UDP messages was attempted, but vector was not setup for it"));
     }
 
-    return MEMCACHED_NOT_SUPPORTED;
+    struct msghdr msg;
+    memset(&msg, 0, sizeof(msg));
+
+    increment_udp_message_id(instance);
+    vector[0].buffer= instance->write_buffer;
+    vector[0].length= UDP_DATAGRAM_HEADER_LENGTH;
+
+    msg.msg_iov= (struct iovec*)vector;
+    msg.msg_iovlen= count;
+
+    if (::sendmsg(instance->fd, &msg, 0) < 1)
+    {
+      return memcached_set_error(*instance, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+    }
+
+    return MEMCACHED_SUCCESS;
   }
 
   ssize_t sent_length= memcached_io_writev(instance, vector, count, with_flush);
index f7764d0f6470a636a3ce4eab4015a3640c88f06b..becffd23014b3a10256fa61672f905def9587ab3 100644 (file)
@@ -395,11 +395,11 @@ static void _error_print(const memcached_error_t *error)
 
   if (error->size == 0)
   {
-    fprintf(stderr, "%s\n", memcached_strerror(NULL, error->rc) );
+    fprintf(stderr, "\t%s\n", memcached_strerror(NULL, error->rc) );
   }
   else
   {
-    fprintf(stderr, "%s %s\n", memcached_strerror(NULL, error->rc), error->message);
+    fprintf(stderr, "\t%s %s\n", memcached_strerror(NULL, error->rc), error->message);
   }
 
   _error_print(error->next);
@@ -407,10 +407,19 @@ static void _error_print(const memcached_error_t *error)
 
 void memcached_error_print(const memcached_st *self)
 {
-  if (not self)
+  if (self == NULL)
+  {
     return;
+  }
 
   _error_print(self->error_messages);
+
+  for (uint32_t x= 0; x < memcached_server_count(self); x++)
+  {
+    memcached_server_instance_st instance= memcached_server_instance_by_position(self, x);
+
+    _error_print(instance->error_messages);
+  }
 }
 
 static void _error_free(memcached_error_t *error)
index 44a1429afe8c0cbd7ace71f2cccea2f8e8863c01..6b8ea4b0815a7b6ae4daf7bfd4832a1aff934f94 100644 (file)
 #include <libmemcached/common.h>
 
 static memcached_return_t memcached_flush_binary(memcached_st *ptr, 
-                                                 time_t expiration);
-static memcached_return_t memcached_flush_textual(memcached_st *ptr, 
-                                                  time_t expiration);
-
-memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration)
+                                                 time_t expiration,
+                                                 const bool reply)
 {
-  memcached_return_t rc;
-  if (memcached_failed(rc= initialize_query(ptr, true)))
-  {
-    return rc;
-  }
+  protocol_binary_request_flush request= {};
 
-  LIBMEMCACHED_MEMCACHED_FLUSH_START();
-  if (ptr->flags.binary_protocol)
+  request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+  request.message.header.request.extlen= 4;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
+  request.message.body.expiration= htonl((uint32_t) expiration);
+
+  memcached_return_t rc= MEMCACHED_SUCCESS;
+
+  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
-    rc= memcached_flush_binary(ptr, expiration);
+    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
+
+    if (reply)
+    {
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+    }
+    else
+    {
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
+    }
+
+    libmemcached_io_vector_st vector[]=
+    {
+      { NULL, 0 },
+      { request.bytes, sizeof(request.bytes) }
+    };
+
+    memcached_return_t rrc;
+    if (memcached_failed(rrc= memcached_vdo(instance, vector, 2, true)))
+    {
+      if (instance->error_messages == NULL or instance->root->error_messages == NULL)
+      {
+        memcached_set_error(*instance, rrc, MEMCACHED_AT);
+      }
+      memcached_io_reset(instance);
+      rc= MEMCACHED_SOME_ERRORS;
+    } 
   }
-  else
+
+  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
-    rc= memcached_flush_textual(ptr, expiration);
+    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
+
+    if (memcached_server_response_count(instance) > 0)
+    {
+      (void)memcached_response(instance, NULL, 0, NULL);
+    }
   }
-  LIBMEMCACHED_MEMCACHED_FLUSH_END();
 
   return rc;
 }
 
 static memcached_return_t memcached_flush_textual(memcached_st *ptr, 
-                                                  time_t expiration)
+                                                  time_t expiration,
+                                                  const bool reply)
 {
-  // Invert the logic to make it simpler to read the code
-  bool reply= memcached_is_replying(ptr);
-
-  char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+  char buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
   int send_length= 0;
   if (expiration)
   {
     send_length= snprintf(buffer, sizeof(buffer), "%llu", (unsigned long long)expiration);
   }
 
-  if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE or send_length < 0)
+  if (size_t(send_length) >= sizeof(buffer) or send_length < 0)
   {
     return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, 
                                memcached_literal_param("snprintf(MEMCACHED_DEFAULT_COMMAND_SIZE)"));
   }
 
-  libmemcached_io_vector_st vector[]=
-  {
-    { memcached_literal_param("flush_all ") },
-    { buffer, send_length },
-    { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
-    { memcached_literal_param("\r\n") }
-  };
-
   memcached_return_t rc= MEMCACHED_SUCCESS;
   for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
   {
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
 
-    memcached_return_t rrc= memcached_vdo(instance, vector, 4, true);
-    if (rrc == MEMCACHED_SUCCESS and reply == true)
+    libmemcached_io_vector_st vector[]=
+    {
+      { NULL, 0 },
+      { memcached_literal_param("flush_all ") },
+      { buffer, send_length },
+      { " noreply", reply ? 0 : memcached_literal_param_size(" noreply") },
+      { memcached_literal_param("\r\n") }
+    };
+
+    memcached_return_t rrc= memcached_vdo(instance, vector, 5, true);
+    if (memcached_success(rrc) and reply == true)
     {
       char response_buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
       rrc= memcached_response(instance, response_buffer, sizeof(response_buffer), NULL);
@@ -105,7 +136,7 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr,
     if (memcached_failed(rrc))
     {
       // If an error has already been reported, then don't add to it
-      if (instance->error_messages == NULL)
+      if (instance->error_messages == NULL or instance->root->error_messages == NULL)
       {
         memcached_set_error(*instance, rrc, MEMCACHED_AT);
       }
@@ -116,56 +147,26 @@ static memcached_return_t memcached_flush_textual(memcached_st *ptr,
   return rc;
 }
 
-static memcached_return_t memcached_flush_binary(memcached_st *ptr, 
-                                                 time_t expiration)
+memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration)
 {
-  protocol_binary_request_flush request= {};
-
-  request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
-  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
-  request.message.header.request.extlen= 4;
-  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-  request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
-  request.message.body.expiration= htonl((uint32_t) expiration);
-
-  memcached_return_t rc= MEMCACHED_SUCCESS;
-
-  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
+  memcached_return_t rc;
+  if (memcached_failed(rc= initialize_query(ptr, true)))
   {
-    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
-
-    if (memcached_is_replying(ptr))
-    {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
-    }
-    else
-    {
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
-    }
+    return rc;
+  }
 
-    libmemcached_io_vector_st vector[]=
-    {
-      { request.bytes, sizeof(request.bytes) }
-    };
+  bool reply= memcached_is_replying(ptr);
 
-    memcached_return_t rrc;
-    if ((rrc= memcached_vdo(instance, vector, 1, true)))
-    {
-      memcached_set_error(*instance, rrc, MEMCACHED_AT);
-      memcached_io_reset(instance);
-      rc= MEMCACHED_SOME_ERRORS;
-    } 
+  LIBMEMCACHED_MEMCACHED_FLUSH_START();
+  if (memcached_is_binary(ptr))
+  {
+    rc= memcached_flush_binary(ptr, expiration, reply);
   }
-
-  for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
+  else
   {
-    memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, x);
-
-    if (memcached_server_response_count(instance) > 0)
-    {
-      (void)memcached_response(instance, NULL, 0, NULL);
-    }
+    rc= memcached_flush_textual(ptr, expiration, reply);
   }
+  LIBMEMCACHED_MEMCACHED_FLUSH_END();
 
   return rc;
 }
index 445b92338b29ca5d45d4c6988298dc1600860b99..435cafe7aa50a0c7951be1c80946ff413613e6cc 100644 (file)
@@ -359,8 +359,9 @@ static memcached_return_t textual_read_one_response(memcached_server_write_insta
     break;
   }
 
+  buffer[total_read]= 0;
   return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT,
-                             memcached_literal_param("Could not determine response"));
+                             buffer, total_read);
 }
 
 static memcached_return_t binary_read_one_response(memcached_server_write_instance_st ptr,
index 557fb00564fdbd56311fb819d63dd0238885be29..334a3316288afd971fc22bb75b2093e1347815d9 100644 (file)
@@ -59,7 +59,7 @@ static memcached_return_t _set_verbosity(const memcached_st *,
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc_ptr, 0);
 
 
-    rc= memcached_vdo(instance, vector, 3, true);
+    rc= memcached_vdo(instance, vector, 4, true);
 
     if (rc == MEMCACHED_SUCCESS)
     {
@@ -81,11 +81,6 @@ memcached_return_t memcached_verbosity(memcached_st *ptr, uint32_t verbosity)
     return rc;
   }
 
-  if (memcached_is_udp(ptr))
-  {
-    return MEMCACHED_NOT_SUPPORTED;
-  }
-
   memcached_server_fn callbacks[1];
 
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
@@ -99,6 +94,7 @@ memcached_return_t memcached_verbosity(memcached_st *ptr, uint32_t verbosity)
 
   libmemcached_io_vector_st vector[]=
   {
+    { NULL, 0 },
     { memcached_literal_param("verbosity ") },
     { buffer, send_length },
     { memcached_literal_param("\r\n") }
index b2b807b6b1ca0ad4e2975f4dfd07b6d58ef3efb6..dd2abef27d18de63078e232672f244707fe826d4 100644 (file)
@@ -2119,12 +2119,13 @@ static test_return_t fetch_all_results(memcached_st *memc, unsigned int &keys_re
 {
   keys_returned= 0;
 
-  memcached_result_st* result;
-  while ((result= memcached_fetch_result(memc, NULL, &rc)))
+  memcached_result_st* result= NULL;
+  while ((result= memcached_fetch_result(memc, result, &rc)))
   {
     test_compare(MEMCACHED_SUCCESS, rc);
     keys_returned+= 1;
   }
+  memcached_result_free(result);
 
   return TEST_SUCCESS;
 }
index 577dabe86f8dec7ec582c988afbd95e5d9ca9988..1591bdcf1c23f9263e76177bcb4f4d45f9312868 100644 (file)
@@ -208,7 +208,7 @@ static test_return_t version_TEST(memcached_st *memc)
 
 static test_return_t verbosity_TEST(memcached_st *memc)
 {
-  test_compare(MEMCACHED_NOT_SUPPORTED, memcached_verbosity(memc, 0));
+  test_compare(MEMCACHED_SUCCESS, memcached_verbosity(memc, 0));
   return TEST_SUCCESS;
 }
 
@@ -320,17 +320,19 @@ static test_return_t udp_buffered_set_test(memcached_st *memc)
 static test_return_t udp_set_too_big_test(memcached_st *memc)
 {
   test_true(memc);
-  char value[MAX_UDP_DATAGRAM_LENGTH];
   Expected expected_ids;
   get_udp_request_ids(memc, expected_ids);
 
-  memset(value, int('f'), sizeof(value));
+  std::vector<char> value;
+  value.resize(1024 * 1024 * 10);
 
   test_compare_hint(MEMCACHED_WRITE_FAILURE,
-                    memcached_set(memc, test_literal_param("bar"), 
-                                  test_literal_param(value),
+                    memcached_set(memc,
+                                  test_literal_param(__func__), 
+                                  &value[0], value.size(),
                                   time_t(0), uint32_t(0)),
                     memcached_last_error_message(memc));
+  memcached_quit(memc);
 
   return post_udp_op_check(memc, expected_ids);
 }
@@ -401,6 +403,7 @@ static test_return_t udp_flush_test(memcached_st *memc)
   {
     increment_request_id(&expected_ids[x]);
   }
+  memcached_error_print(memc);
   test_compare_hint(MEMCACHED_SUCCESS, memcached_flush(memc, 0), memcached_last_error_message(memc));
 
   return post_udp_op_check(memc, expected_ids);