udp support in fire and forget mode for all ops but get/gets, stat and version
authorEric Lambert <eric.lambert@sun.com>
Mon, 9 Mar 2009 21:59:06 +0000 (22:59 +0100)
committerEric Lambert <eric.lambert@sun.com>
Mon, 9 Mar 2009 21:59:06 +0000 (22:59 +0100)
24 files changed:
clients/client_options.h
clients/memslap.c
clients/utilities.c
libmemcached/common.h
libmemcached/memcached.c
libmemcached/memcached_auto.c
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_delete.c
libmemcached/memcached_do.c
libmemcached/memcached_fetch.c
libmemcached/memcached_flush.c
libmemcached/memcached_get.c
libmemcached/memcached_hosts.c
libmemcached/memcached_io.c
libmemcached/memcached_io.h
libmemcached/memcached_quit.c
libmemcached/memcached_server.c
libmemcached/memcached_stats.c
libmemcached/memcached_storage.c
libmemcached/memcached_verbosity.c
libmemcached/memcached_version.c
tests/function.c
tests/server.c

index a343133666b9377d7c7868446839f4c0442c8640..a4f0f47fa29130cdd575f71c29d8b4da2be52e05 100644 (file)
@@ -24,6 +24,7 @@ typedef enum {
   OPT_FLUSH,
   OPT_HASH,
   OPT_BINARY,
+  OPT_UDP
 } memcached_options;
 
 #endif /* CLIENT_OPTIONS */
index 7aa69eadf543f7b039269e89c038857a2b2a578f..cd170eb172ddcb75519bbfdf786557a2942c82a6 100644 (file)
@@ -77,6 +77,7 @@ static unsigned int opt_createial_load= 0;
 static unsigned int opt_concurrency= 0;
 static int opt_displayflag= 0;
 static char *opt_servers= NULL;
+static int opt_udp_io= 0;
 test_type opt_test= SET_TEST;
 
 int main(int argc, char *argv[])
@@ -139,6 +140,15 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
                               PTHREAD_CREATE_DETACHED);
 
   memc= memcached_create(NULL);
+
+  /* We need to set udp behavior before adding servers to the client */
+  if (opt_udp_io)
+  {
+    memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, opt_udp_io);
+    unsigned int i= 0;
+    for(i= 0; i < servers[0].count; i++ )
+      servers[i].type= MEMCACHED_CONNECTION_UDP;
+  }
   memcached_server_push(memc, servers);
 
   memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
@@ -240,6 +250,7 @@ void options_parse(int argc, char *argv[])
       {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
       {"version", no_argument, NULL, OPT_VERSION},
       {"binary", no_argument, NULL, OPT_BINARY},
+      {"udp", no_argument, NULL, OPT_UDP},
       {0, 0, 0, 0},
     };
 
@@ -254,6 +265,15 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_UDP:
+      if (opt_test == GET_TEST)
+      {
+        fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
+                  "does not currently support get ops.\n");
+        exit(1);
+      }
+      opt_udp_io= 1;
+      break;
     case OPT_BINARY:
       opt_binary = 1;
       break;
@@ -274,7 +294,15 @@ void options_parse(int argc, char *argv[])
       break;
     case OPT_SLAP_TEST:
       if (!strcmp(optarg, "get"))
+      {
+        if (opt_udp_io == 1)
+        {
+          fprintf(stderr, "You can not run a get test in UDP mode. UDP mode "
+                  "does not currently support get ops.\n");
+          exit(1);
+        }
         opt_test= GET_TEST ;
+      }
       else if (!strcmp(optarg, "set"))
         opt_test= SET_TEST;
       else 
index adab23fa8e2fe92a4e5c385b663bb5f7f30f52d2..529955ce4662b812d55e8446b9f92e10719306c6 100644 (file)
@@ -45,6 +45,7 @@ static char *lookup_help(memcached_options option)
   case OPT_HASH: return("Select hash type.");
   case OPT_BINARY: return("Switch to binary protocol.");
   case OPT_ANALYZE: return("Analyze the provided servers.");
+  case OPT_UDP: return("Use UDP protocol when communicating with server.");
   };
 
   WATCHPOINT_ASSERT(0);
index 54b33a20fcee244de56ee3b60123c7ef4460a797..d91c6b302ec1bf7d4586832d56dd0be4b62144d5 100644 (file)
@@ -76,7 +76,8 @@ typedef enum {
   MEM_KETAMA_WEIGHTED= (1 << 11),
   MEM_BINARY_PROTOCOL= (1 << 12),
   MEM_HASH_WITH_PREFIX_KEY= (1 << 13),
-  MEM_NOREPLY= (1 << 14)
+  MEM_NOREPLY= (1 << 14),
+  MEM_USE_UDP= (1 << 15)
 } memcached_flags;
 
 /* Hashing algo */
index 1144115f6a6e3d9cde4525a7a1742bde33f27b1c..a15cd2f2824adf60bf00d091f8998e9973b849f9 100644 (file)
@@ -87,17 +87,6 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source)
   if (new_clone == NULL)
     return NULL;
 
-  if (source->hosts)
-    rc= memcached_server_push(new_clone, source->hosts);
-
-  if (rc != MEMCACHED_SUCCESS)
-  {
-    memcached_free(new_clone);
-
-    return NULL;
-  }
-
-
   new_clone->flags= source->flags;
   new_clone->send_size= source->send_size;
   new_clone->recv_size= source->recv_size;
@@ -120,6 +109,17 @@ memcached_st *memcached_clone(memcached_st *clone, memcached_st *source)
   new_clone->get_key_failure= source->get_key_failure;
   new_clone->delete_trigger= source->delete_trigger;
 
+  if (source->hosts)
+    rc= memcached_server_push(new_clone, source->hosts);
+
+  if (rc != MEMCACHED_SUCCESS)
+  {
+    memcached_free(new_clone);
+
+    return NULL;
+  }
+
+
   if (source->prefix_key[0] != 0)
   {
     strcpy(new_clone->prefix_key, source->prefix_key);
index 9808ab65e8e98f815f5f3218d4f2b91f76d1d652..787ef6875213b57147566ef60dae26308e8ea390 100644 (file)
@@ -10,6 +10,7 @@ static memcached_return memcached_auto(memcached_st *ptr,
   memcached_return rc;
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
   unsigned int server_key;
+  bool no_reply= (ptr->flags & MEM_NOREPLY);
 
   unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
     return MEMCACHED_NO_SERVERS;
@@ -20,15 +21,15 @@ static memcached_return memcached_auto(memcached_st *ptr,
   server_key= memcached_generate_hash(ptr, key, key_length);
 
   send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                        "%s %s%.*s %u\r\n", verb, 
+                        "%s %s%.*s %u%s\r\n", verb,
                         ptr->prefix_key,
                         (int)key_length, key,
-                        offset);
+                        offset, no_reply ? " noreply" : "");
   unlikely (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
     return MEMCACHED_WRITE_FAILURE;
 
   rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
-  if (rc != MEMCACHED_SUCCESS)
+  if (no_reply || rc != MEMCACHED_SUCCESS)
     return rc;
 
   rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
@@ -64,12 +65,20 @@ static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd,
                                          uint32_t offset, uint64_t *value) 
 {
   unsigned int server_key;
+  bool no_reply= (ptr->flags & MEM_NOREPLY);
 
   unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
     return MEMCACHED_NO_SERVERS;
 
   server_key= memcached_generate_hash(ptr, key, key_length);
 
+  if (no_reply)
+  {
+    if(cmd == PROTOCOL_BINARY_CMD_DECREMENT)
+      cmd= PROTOCOL_BINARY_CMD_DECREMENTQ;
+    if(cmd == PROTOCOL_BINARY_CMD_INCREMENT)
+      cmd= PROTOCOL_BINARY_CMD_INCREMENTQ;
+  }
   protocol_binary_request_incr request= {.bytes= {0}};
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
@@ -88,7 +97,9 @@ static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd,
     memcached_io_reset(&ptr->hosts[server_key]);
     return MEMCACHED_WRITE_FAILURE;
   }
+
+  if (no_reply)
+    return MEMCACHED_SUCCESS;
   return memcached_response(&ptr->hosts[server_key], (char*)value, sizeof(*value), NULL);
 }
 
index f23415507ced1525fa99f3bea346e88ae2db258e..e8f65ddf6eb3a23d45c20feb47045600dff1d5d4 100644 (file)
@@ -5,7 +5,7 @@
 #include <netinet/tcp.h>
 
 /* 
-  This function is used to modify the behabior of running client.
+  This function is used to modify the behavior of running client.
 
   We quit all connections so we can reset the sockets.
 */
@@ -54,6 +54,13 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
     set_behavior_flag(ptr, MEM_BUFFER_REQUESTS, data);
     memcached_quit(ptr);
     break;
+  case MEMCACHED_BEHAVIOR_USE_UDP:
+    if (ptr->number_of_hosts)
+      return MEMCACHED_FAILURE;
+    set_behavior_flag(ptr, MEM_USE_UDP, data);
+    if (data)
+      set_behavior_flag(ptr,MEM_NOREPLY,data);
+    break;
   case MEMCACHED_BEHAVIOR_TCP_NODELAY:
     set_behavior_flag(ptr, MEM_TCP_NODELAY, data);
     memcached_quit(ptr);
@@ -170,6 +177,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
   case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS:
     temp_flag= MEM_BUFFER_REQUESTS;
     break;
+  case MEMCACHED_BEHAVIOR_USE_UDP:
+    temp_flag= MEM_USE_UDP;
+    break;
   case MEMCACHED_BEHAVIOR_TCP_NODELAY:
     temp_flag= MEM_TCP_NODELAY;
     break;
index fc47a3b6e96835f2937c6d0f7f8be6f10962b10d..1af6aa4a9595e0a02d417a607b8ec9b14d90a963 100644 (file)
@@ -61,6 +61,7 @@ typedef enum {
   MEMCACHED_TIMEOUT,
   MEMCACHED_BUFFERED,
   MEMCACHED_BAD_KEY_PROVIDED,
+  MEMCACHED_INVALID_HOST_PROTOCOL,
   MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */
 } memcached_return;
 
@@ -97,7 +98,8 @@ typedef enum {
   MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK,
   MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK,
   MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY,
-  MEMCACHED_BEHAVIOR_NOREPLY
+  MEMCACHED_BEHAVIOR_NOREPLY,
+  MEMCACHED_BEHAVIOR_USE_UDP
 } memcached_behavior;
 
 typedef enum {
index 84b6160c2fe00a4058aed9013df56fc981d2f741..f2fab3221a53cca25203b936d100d86460c5a259 100644 (file)
@@ -36,6 +36,7 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
 
   server_key= memcached_generate_hash(ptr, master_key, master_key_length);
   to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
+  bool no_reply= (ptr->flags & MEM_NOREPLY);
      
   if (ptr->flags & MEM_BINARY_PROTOCOL) 
     rc= binary_delete(ptr, server_key, key, key_length, to_write);
@@ -43,22 +44,30 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
   {
     if (expiration)
       send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                            "delete %s%.*s %u\r\n", 
+                            "delete %s%.*s %u%s\r\n",
                             ptr->prefix_key,
                             (int)key_length, key, 
-                            (uint32_t)expiration);
+                            (uint32_t)expiration, no_reply ? " noreply" :"" );
     else
        send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                             "delete %s%.*s\r\n", 
+                             "delete %s%.*s%s\r\n",
                              ptr->prefix_key,
-                             (int)key_length, key);
+                             (int)key_length, key, no_reply ? " noreply" :"");
     
     if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) 
     {
       rc= MEMCACHED_WRITE_FAILURE;
       goto error;
     }
-     
+
+    if (ptr->flags & MEM_USE_UDP && !to_write)
+    {
+      if (send_length > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+        return MEMCACHED_WRITE_FAILURE;
+      if (send_length + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+        memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+    }
+    
     rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
   }
 
@@ -66,10 +75,8 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
     goto error;
 
   if ((ptr->flags & MEM_BUFFER_REQUESTS))
-  {
     rc= MEMCACHED_BUFFERED;
-  }
-  else
+  else if (!no_reply)
   {
     rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
     if (rc == MEMCACHED_DELETED)
@@ -93,10 +100,22 @@ static inline memcached_return binary_delete(memcached_st *ptr,
   protocol_binary_request_delete request= {.bytes= {0}};
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
+  if (ptr->flags & MEM_NOREPLY)
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETEQ;
+  else
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
   request.message.header.request.keylen= htons((uint16_t)key_length);
   request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
   request.message.header.request.bodylen= htonl(key_length);
+
+  if (ptr->flags & MEM_USE_UDP && !flush)
+  {
+    size_t cmd_size= sizeof(request.bytes) + key_length;
+    if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+      return MEMCACHED_WRITE_FAILURE;
+    if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+      memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+  }
   
   if ((memcached_do(&ptr->hosts[server_key], request.bytes, 
                     sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
index c364efe8c6a12bb5126e73296a47de7cb3e1a04e..b46805a02acc0c6219beb2620c7ef7bbf8b45929 100644 (file)
@@ -15,6 +15,14 @@ memcached_return memcached_do(memcached_server_st *ptr, const void *command,
     return rc;
   }
 
+  /*
+  ** Since non buffering ops in UDP mode dont check to make sure they will fit
+  ** before they start writing, if there is any data in buffer, clear it out,
+  ** otherwise we might get a partial write.
+  **/
+  if (ptr->type == MEMCACHED_CONNECTION_UDP && with_flush && ptr->write_buffer_offset > UDP_DATAGRAM_HEADER_LENGTH)
+    memcached_io_write(ptr, NULL, 0, 1);
+
   sent_length= memcached_io_write(ptr, command, command_length, with_flush);
 
   if (sent_length == -1 || (size_t)sent_length != command_length)
index f728bebc45fe690504aaa233ae064f51cd780e5c..e810b64ae326608c0d222699d9f4c72271f98be6 100644 (file)
@@ -14,6 +14,9 @@ memcached_return value_fetch(memcached_server_st *ptr,
   size_t to_read;
   char *value_ptr;
 
+  if (ptr->root->flags & MEM_USE_UDP)
+    return MEMCACHED_NOT_SUPPORTED;
+
   WATCHPOINT_ASSERT(ptr->root);
   end_ptr= buffer + MEMCACHED_DEFAULT_COMMAND_SIZE;
 
@@ -133,6 +136,12 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length,
 {
   memcached_result_st *result_buffer= &ptr->result;
 
+  if (ptr->flags & MEM_USE_UDP)
+  {
+    *error= MEMCACHED_NOT_SUPPORTED;
+    return NULL;
+  }
+
   while (ptr->cursor_server < ptr->number_of_hosts)
   {
     char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
@@ -184,6 +193,13 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
                                             memcached_result_st *result,
                                             memcached_return *error)
 {
+
+  if (ptr->flags & MEM_USE_UDP)
+  {
+    *error= MEMCACHED_NOT_SUPPORTED;
+    return NULL;
+  }
+  
   if (result == NULL)
     result= memcached_result_create(ptr, NULL);
 
index 4e048d00260e279858493167f98e05831797b967..55a6ca716de10df28486015751548e3716c1dc01 100644 (file)
@@ -31,16 +31,18 @@ static memcached_return memcached_flush_textual(memcached_st *ptr,
 
   for (x= 0; x < ptr->number_of_hosts; x++)
   {
+    bool no_reply= (ptr->flags & MEM_NOREPLY);
     if (expiration)
       send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                            "flush_all %llu\r\n", (unsigned long long)expiration);
+                            "flush_all %llu%s\r\n",
+                            (unsigned long long)expiration, no_reply ? " noreply" : "");
     else
       send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                            "flush_all\r\n");
+                            "flush_all%s\r\n", no_reply ? " noreply" : "");
 
     rc= memcached_do(&ptr->hosts[x], buffer, send_length, 1);
 
-    if (rc == MEMCACHED_SUCCESS)
+    if (rc == MEMCACHED_SUCCESS && !no_reply)
       (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
   }
 
@@ -65,6 +67,10 @@ static memcached_return memcached_flush_binary(memcached_st *ptr,
 
   for (x= 0; x < ptr->number_of_hosts; x++)
   {
+    if (ptr->flags & MEM_NOREPLY)
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSHQ;
+    else
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
     if (memcached_do(&ptr->hosts[x], request.bytes, 
                      sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) 
     {
index 2cc40151b4bcf4911549af13eb9e6b10e692c62a..3305a772a01a3ad7825299bb8723d4c2d8a57ffe 100644 (file)
@@ -27,6 +27,12 @@ char *memcached_get_by_key(memcached_st *ptr,
   uint32_t dummy_flags;
   memcached_return dummy_error;
 
+  if (ptr->flags & MEM_USE_UDP)
+  {
+    *error= MEMCACHED_NOT_SUPPORTED;
+    return NULL;
+  }
+
   /* Request the key */
   *error= memcached_mget_by_key(ptr, 
                                 master_key, 
@@ -120,6 +126,9 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   uint8_t get_command_length= 4;
   unsigned int master_server_key= 0;
 
+   if (ptr->flags & MEM_USE_UDP)
+    return MEMCACHED_NOT_SUPPORTED;
+
   LIBMEMCACHED_MEMCACHED_MGET_START();
   ptr->cursor_server= 0;
 
index c0d0f0367c99c84295585db213792bc3151aebad..2bcd189c15ab3fce95eba0970a039743b848af3b 100644 (file)
@@ -248,6 +248,11 @@ memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *l
 
   for (x= 0; x < count; x++)
   {
+    if ((ptr->flags & MEM_USE_UDP && list[x].type != MEMCACHED_CONNECTION_UDP)
+            || ((list[x].type == MEMCACHED_CONNECTION_UDP)
+            && ! (ptr->flags & MEM_USE_UDP)) )
+      return MEMCACHED_INVALID_HOST_PROTOCOL;
+
     WATCHPOINT_ASSERT(list[x].hostname[0] != 0);
     memcached_server_create(ptr, &ptr->hosts[ptr->number_of_hosts]);
     /* TODO check return type */
@@ -313,7 +318,7 @@ memcached_return memcached_server_add_with_weight(memcached_st *ptr,
     port= MEMCACHED_DEFAULT_PORT; 
 
   if (!hostname)
-    hostname= "localhost"; 
+    hostname= "localhost";
 
   return server_add(ptr, hostname, port, weight, MEMCACHED_CONNECTION_TCP);
 }
@@ -325,6 +330,10 @@ static memcached_return server_add(memcached_st *ptr, const char *hostname,
 {
   memcached_server_st *new_host_list;
 
+  if ( (ptr->flags & MEM_USE_UDP && type != MEMCACHED_CONNECTION_UDP)
+      || ( (type == MEMCACHED_CONNECTION_UDP) && !(ptr->flags & MEM_USE_UDP) ) )
+    return MEMCACHED_INVALID_HOST_PROTOCOL;
+  
   if (ptr->call_realloc)
     new_host_list= (memcached_server_st *)ptr->call_realloc(ptr, ptr->hosts, 
                                                             sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
index 72a02a0f97f52b030b3c1ab45cefa84b742c4bf4..dc12ebe7d405cc1630444f19d730abc86a5ea45e 100644 (file)
@@ -13,6 +13,7 @@ typedef enum {
 } memc_read_or_write;
 
 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
+static void increment_udp_message_id(memcached_server_st *ptr);
 
 static memcached_return io_wait(memcached_server_st *ptr,
                                 memc_read_or_write read_or_write)
@@ -194,18 +195,30 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
   {
     char *write_ptr;
     size_t should_write;
+    size_t buffer_end;
 
-    should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
-    write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
-
-    should_write= (should_write < length) ? should_write : length;
+    if (ptr->type == MEMCACHED_CONNECTION_UDP)
+    {
+      //UDP does not support partial writes
+      buffer_end= MAX_UDP_DATAGRAM_LENGTH;
+      should_write= length;
+      if (ptr->write_buffer_offset + should_write > buffer_end)
+        return -1;
+    }
+    else
+    {
+      buffer_end= MEMCACHED_MAX_BUFFER;
+      should_write= buffer_end - ptr->write_buffer_offset;
+      should_write= (should_write < length) ? should_write : length;
+    }
 
+    write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
     memcpy(write_ptr, buffer_ptr, should_write);
     ptr->write_buffer_offset+= should_write;
     buffer_ptr+= should_write;
     length-= should_write;
 
-    if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
+    if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
     {
       memcached_return rc;
       ssize_t sent_length;
@@ -217,7 +230,7 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
 
       /* If io_flush calls memcached_purge, sent_length may be 0 */
       if (sent_length != 0)
-        WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
+        WATCHPOINT_ASSERT(sent_length == buffer_end);
     }
   }
 
@@ -288,7 +301,12 @@ static ssize_t io_flush(memcached_server_st *ptr,
 
   WATCHPOINT_ASSERT(ptr->fd != -1);
 
-  if (ptr->write_buffer_offset == 0)
+  // UDP Sanity check, make sure that we are not sending somthing too big
+  if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
+    return -1;
+
+  if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
+          && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
     return 0;
 
   /* Looking for memory overflows */
@@ -305,63 +323,40 @@ static ssize_t io_flush(memcached_server_st *ptr,
     WATCHPOINT_ASSERT(write_length > 0);
     sent_length= 0;
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
+      increment_udp_message_id(ptr);
+    sent_length= write(ptr->fd, local_write_ptr, write_length);
+
+    if (sent_length == -1)
     {
-      struct addrinfo *ai;
-
-      ai= ptr->address_info;
-
-      /* Crappy test code */
-      char buffer[HUGE_STRING_LEN + 8];
-      memset(buffer, 0, HUGE_STRING_LEN + 8);
-      memcpy (buffer+8, local_write_ptr, write_length);
-      buffer[0]= 0;
-      buffer[1]= 0;
-      buffer[2]= 0;
-      buffer[3]= 0;
-      buffer[4]= 0;
-      buffer[5]= 1;
-      buffer[6]= 0;
-      buffer[7]= 0;
-      sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, 
-                          (struct sockaddr *)ai->ai_addr, 
-                          ai->ai_addrlen);
-      if (sent_length == -1)
+      ptr->cached_errno= errno;
+      switch (errno)
       {
-        WATCHPOINT_ERRNO(errno);
-        WATCHPOINT_ASSERT(0);
-      }
-      sent_length-= 8; /* We remove the header */
-    }
-    else
-    {
-      WATCHPOINT_ASSERT(ptr->fd != -1);
-      if ((sent_length= write(ptr->fd, local_write_ptr, 
-                              write_length)) == -1)
+      case ENOBUFS:
+        continue;
+      case EAGAIN:
       {
-        ptr->cached_errno= errno;
-        switch (errno)
-        {
-        case ENOBUFS:
-          continue;
-        case EAGAIN:
-          {
-            memcached_return rc;
-            rc= io_wait(ptr, MEM_WRITE);
+        memcached_return rc;
+        rc= io_wait(ptr, MEM_WRITE);
 
-            if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) 
-              continue;
+        if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
+          continue;
 
-            memcached_quit_server(ptr, 1);
-            return -1;
-          }
-        default:
-          memcached_quit_server(ptr, 1);
-          *error= MEMCACHED_ERRNO;
-          return -1;
-        }
+        memcached_quit_server(ptr, 1);
+        return -1;
+      }
+      default:
+        memcached_quit_server(ptr, 1);
+        *error= MEMCACHED_ERRNO;
+        return -1;
       }
     }
 
+    if (ptr->type == MEMCACHED_CONNECTION_UDP && sent_length != write_length)
+    {
+      memcached_quit_server(ptr, 1);
+      return -1;
+    }
+
     ptr->io_bytes_sent += sent_length;
 
     local_write_ptr+= sent_length;
@@ -372,7 +367,13 @@ static ssize_t io_flush(memcached_server_st *ptr,
   WATCHPOINT_ASSERT(write_length == 0);
   // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
   // ptr->write_buffer_offset);
-  ptr->write_buffer_offset= 0;
+
+  // if we are a udp server, the begining of the buffer is reserverd for
+  // the upd frame header
+  if (ptr->type == MEMCACHED_CONNECTION_UDP)
+    ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+  else
+    ptr->write_buffer_offset= 0;
 
   return return_length;
 }
@@ -454,3 +455,40 @@ memcached_return memcached_io_readline(memcached_server_st *ptr,
 
   return MEMCACHED_SUCCESS;
 }
+
+/*
+ * The udp request id consists of two seperate sections
+ *   1) The thread id
+ *   2) The message number
+ * The thread id should only be set when the memcached_st struct is created
+ * and should not be changed.
+ *
+ * The message num is incremented for each new message we send, this function
+ * extracts the message number from message_id, increments it and then
+ * writes the new value back into the header
+ */
+static void increment_udp_message_id(memcached_server_st *ptr)
+{
+  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+  uint16_t cur_req= get_udp_datagram_request_id(header);
+  uint16_t msg_num= get_msg_num_from_request_id(cur_req);
+  uint16_t thread_id= get_thread_id_from_request_id(cur_req);
+  
+  if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
+    msg_num= 0;
+
+  header->request_id= htons(thread_id | msg_num);
+}
+
+memcached_return memcached_io_init_udp_header(memcached_server_st *ptr, uint16_t thread_id)
+{
+  if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
+    return MEMCACHED_FAILURE;
+
+  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+  header->request_id= htons(generate_udp_request_thread_id(thread_id));
+  header->num_datagrams= htons(1);
+  header->sequence_number= htons(0);
+
+  return MEMCACHED_SUCCESS;
+}
index 1bed79599787930e2c2b31296e90e7cc0bc2b3fb..f1a622c0e5451a686c37330ee0d8a50c4bd0464c 100644 (file)
@@ -4,6 +4,25 @@
 #ifndef __MEMCACHED_IO_H__
 #define __MEMCACHED_IO_H__
 
+#define MAX_UDP_DATAGRAM_LENGTH 1400
+#define UDP_DATAGRAM_HEADER_LENGTH 8
+#define UDP_REQUEST_ID_MSG_SIG_DIGITS 10
+#define UDP_REQUEST_ID_THREAD_MASK 0xFFFF << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define get_udp_datagram_request_id(A) ntohs((A)->request_id)
+#define get_udp_datagram_seq_num(A) ntohs((A)->sequence_number)
+#define get_udp_datagram_num_datagrams(A) ntohs((A)->num_datagrams)
+#define get_msg_num_from_request_id(A) ( (A) & (~(UDP_REQUEST_ID_THREAD_MASK)) )
+#define get_thread_id_from_request_id(A) ( (A) & (UDP_REQUEST_ID_THREAD_MASK) ) >> UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define generate_udp_request_thread_id(A) (A) << UDP_REQUEST_ID_MSG_SIG_DIGITS
+#define UDP_REQUEST_ID_MAX_THREAD_ID get_thread_id_from_request_id(0xFFFF)
+
+struct udp_datagram_header_st {
+  uint16_t request_id;
+  uint16_t sequence_number;
+  uint16_t num_datagrams;
+  uint16_t reserved;
+};
+
 ssize_t memcached_io_write(memcached_server_st *ptr,
                            const void *buffer, size_t length, char with_flush);
 void memcached_io_reset(memcached_server_st *ptr);
@@ -22,4 +41,6 @@ memcached_return memcached_safe_read(memcached_server_st *ptr,
 memcached_return memcached_read_one_response(memcached_server_st *ptr,
                                              char *buffer, size_t buffer_length,
                                              memcached_result_st *result);
+memcached_return memcached_io_init_udp_header(memcached_server_st *ptr,
+                                              uint16_t thread_id);
 #endif /* __MEMCACHED_IO_H__ */
index 14eca73d2cfc154d3452fce11c3da2df42a4a76b..592bde9f90f04a363a79119ed613c90279587f69 100644 (file)
@@ -13,7 +13,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death)
 {
   if (ptr->fd != -1)
   {
-    if (io_death == 0)
+    if (io_death == 0 && ptr->type != MEMCACHED_CONNECTION_UDP)
     {
       memcached_return rc;
       ssize_t read_length;
@@ -43,7 +43,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death)
     memcached_io_close(ptr);
 
     ptr->fd= -1;
-    ptr->write_buffer_offset= 0;
+    ptr->write_buffer_offset= (ptr->type == MEMCACHED_CONNECTION_UDP) ? UDP_DATAGRAM_HEADER_LENGTH : 0 ;
     ptr->read_buffer_length= 0;
     ptr->read_ptr= ptr->read_buffer;
     memcached_server_response_reset(ptr);
index b7dd8a94700eb309b8d54dcbf385ea2b5ec8b722..38d810481175e11affc5974e71bdb90f5176323a 100644 (file)
@@ -41,6 +41,11 @@ memcached_server_st *memcached_server_create_with(memcached_st *memc, memcached_
   host->read_ptr= host->read_buffer;
   if (memc)
     host->next_retry= memc->retry_timeout;
+  if (type == MEMCACHED_CONNECTION_UDP)
+  {
+    host->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+    memcached_io_init_udp_header(host, 0);
+  }
 
   return host;
 }
index 9069bccbed6e0a74bb2cae1977b420b4e8fd10ab..3ad236d2108c347f8eb778c52e7211442b8cc222 100644 (file)
@@ -340,6 +340,12 @@ memcached_stat_st *memcached_stat(memcached_st *ptr, char *args, memcached_retur
   memcached_return rc;
   memcached_stat_st *stats;
 
+  if (ptr->flags & MEM_USE_UDP)
+  {
+    *error= MEMCACHED_NOT_SUPPORTED;
+    return NULL;
+  }
+
   if (ptr->call_malloc)
     stats= (memcached_stat_st *)ptr->call_malloc(ptr, sizeof(memcached_stat_st)*(ptr->number_of_hosts));
   else
index 1f551b87c078dc329387c29f7747ba7fe924ab0f..9acc7c3bcaef41f91acf357ced485de3b5218cac 100644 (file)
@@ -105,6 +105,14 @@ static inline memcached_return memcached_send(memcached_st *ptr,
                            (unsigned long long)expiration, value_length,
                            (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
 
+  if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS) {
+    size_t cmd_size= write_length + value_length + 2;
+    if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+      return MEMCACHED_WRITE_FAILURE;
+    if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+      memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
+  }
+
   if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
   {
     rc= MEMCACHED_WRITE_FAILURE;
@@ -135,9 +143,7 @@ static inline memcached_return memcached_send(memcached_st *ptr,
   }
 
   if (ptr->flags & MEM_NOREPLY)
-  {
     return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
-  }
 
   if (to_write == 0)
     return MEMCACHED_BUFFERED;
@@ -394,7 +400,7 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
   char flush;
   protocol_binary_request_set request= {.bytes= {0}};
   size_t send_length= sizeof(request.bytes);
-  bool noreply = server->root->flags & MEM_NOREPLY;
+  bool noreply= server->root->flags & MEM_NOREPLY;
 
   request.message.header.request.magic= PROTOCOL_BINARY_REQ;
   request.message.header.request.opcode= get_com_code(verb, noreply);
@@ -417,6 +423,15 @@ static memcached_return memcached_send_binary(memcached_server_st* server,
   
   flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
 
+  if ((server->root->flags & MEM_USE_UDP) && !flush)
+  {
+    size_t cmd_size= send_length + key_length + value_length;
+    if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
+      return MEMCACHED_WRITE_FAILURE;
+    if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
+      memcached_io_write(server,NULL,0, 1);
+  }
+  
   /* write the header */
   if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
       (memcached_io_write(server, key, key_length, 0) == -1) ||
index 7cfb073fdcd69799a703f88f61ee6519fc137aba..8dc9df3193c6e4f9a45ae6bccd930b7f3b32c1db 100644 (file)
@@ -24,6 +24,9 @@ memcached_return memcached_verbosity(memcached_st *ptr, unsigned int verbosity)
       continue;
     }
 
+    if (ptr->flags & MEM_USE_UDP)
+      continue;
+
     rrc= memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
     if (rrc != MEMCACHED_SUCCESS)
       rc= MEMCACHED_SOME_ERRORS;
index 5b2386f299655f2a8840f5beca98d705539f69d3..30bcb1bd28554638681c45618036cd93d3b71048 100644 (file)
@@ -10,6 +10,9 @@ static inline memcached_return memcached_version_textual(memcached_st *ptr);
 
 memcached_return memcached_version(memcached_st *ptr)
 {
+   if (ptr->flags & MEM_USE_UDP)
+    return MEMCACHED_NOT_SUPPORTED;
+
    if (ptr->flags & MEM_BINARY_PROTOCOL)
      return memcached_version_binary(ptr);
    else
index bb2d399310390781f62da698d0923e00f419a4bf..09d2673e6bda967cb4d85da96c1e08a2d004ce2b 100644 (file)
@@ -3173,6 +3173,349 @@ static test_return analyzer_test(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static void increment_request_id(uint16_t *id) {
+  (*id)++;
+  if ((*id & UDP_REQUEST_ID_THREAD_MASK) != 0)
+    *id= 0;
+}
+
+static uint16_t *get_udp_request_ids(memcached_st *memc)
+{
+  uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts);
+  assert(ids != NULL);
+  unsigned int i;
+  for (i= 0; i < memc->number_of_hosts; i++)
+    ids[i]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[i].write_buffer);
+
+  return ids;
+}
+
+static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_ids) {
+  unsigned int i;
+  memcached_server_st *cur_server = memc->hosts;
+  uint16_t *cur_req_ids = get_udp_request_ids(memc);
+  for (i= 0; i < memc->number_of_hosts; i++)
+  {
+    assert(cur_server[i].cursor_active == 0);
+    assert(cur_req_ids[i] == expected_req_ids[i]);
+  }
+  free(expected_req_ids);
+  free(cur_req_ids);
+  return TEST_SUCCESS;
+}
+
+/*
+** There is a little bit of a hack here, instead of removing
+** the servers, I just set num host to 0 and them add then new udp servers
+**/
+static memcached_return init_udp(memcached_st *memc)
+{
+  memcached_version(memc);
+  if (memc->hosts[0].major_version != 1 || memc->hosts[0].minor_version != 2)
+    return MEMCACHED_FAILURE;
+
+  uint32_t num_hosts= memc->number_of_hosts;
+  unsigned int i= 0;
+  memcached_server_st servers[num_hosts];
+  memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts);
+  memc->number_of_hosts= 0;
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1);
+  for (i= 0; i < num_hosts; i++)
+  {
+    assert(memcached_server_add_udp(memc, servers[i].hostname, servers[i].port) == MEMCACHED_SUCCESS);
+    assert(memc->hosts[i].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+  }
+  return MEMCACHED_SUCCESS;
+}
+
+static memcached_return binary_init_udp(memcached_st *memc)
+{
+  pre_binary(memc);
+  return init_udp(memc);
+}
+
+/* Make sure that I cant add a tcp server to a udp client */
+static test_return add_tcp_server_udp_client_test(memcached_st *memc)
+{
+  memcached_server_st server;
+  memcached_server_clone(&server, &memc->hosts[0]);
+  assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+  assert(memcached_server_add(memc, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+  return TEST_SUCCESS;
+}
+
+/* Make sure that I cant add a udp server to a tcp client */
+static test_return add_udp_server_tcp_client_test(memcached_st *memc)
+{
+  memcached_server_st server;
+  memcached_server_clone(&server, &memc->hosts[0]);
+  assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+
+  memcached_st tcp_client;
+  memcached_create(&tcp_client);
+  assert(memcached_server_add_udp(&tcp_client, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+  return TEST_SUCCESS;
+}
+
+static test_return set_udp_behavior_test(memcached_st *memc)
+{
+
+  memcached_quit(memc);
+  memc->number_of_hosts= 0;
+  run_distribution(memc);
+  assert(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1) == MEMCACHED_SUCCESS);
+  assert(memc->flags & MEM_USE_UDP);
+  assert(memc->flags & MEM_NOREPLY);;
+  
+  assert(memc->number_of_hosts == 0);
+
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,0);
+  assert(!(memc->flags & MEM_USE_UDP));
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY,0);
+  assert(!(memc->flags & MEM_NOREPLY));
+  return TEST_SUCCESS;
+}
+
+static test_return udp_set_test(memcached_st *memc)
+{
+  unsigned int i= 0;
+  unsigned int num_iters= 1025; //request id rolls over at 1024
+  for (i= 0; i < num_iters;i++)
+  {
+    memcached_return rc;
+    char *key= "foo";
+    char *value= "when we sanitize";
+    uint16_t *expected_ids= get_udp_request_ids(memc);
+    unsigned int server_key= memcached_generate_hash(memc,key,strlen(key));
+    size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+    rc= memcached_set(memc, key, strlen(key),
+                      value, strlen(value),
+                      (time_t)0, (uint32_t)0);
+    assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+    /** NB, the check below assumes that if new write_ptr is less than
+     *  the original write_ptr that we have flushed. For large payloads, this
+     *  maybe an invalid assumption, but for the small payload we have it is OK
+     */
+    if (rc == MEMCACHED_SUCCESS ||
+            memc->hosts[server_key].write_buffer_offset < init_offset)
+      increment_request_id(&expected_ids[server_key]);
+    
+    if (rc == MEMCACHED_SUCCESS) {
+      assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+    } else {
+      assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+      assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+    }
+    assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+  }
+  return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_set_test(memcached_st *memc)
+{
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+  return udp_set_test(memc);
+}
+
+static test_return udp_set_too_big_test(memcached_st *memc)
+{
+  memcached_return rc;
+  char *key= "bar";
+  char value[MAX_UDP_DATAGRAM_LENGTH];
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  rc= memcached_set(memc, key, strlen(key),
+                    value, MAX_UDP_DATAGRAM_LENGTH,
+                    (time_t)0, (uint32_t)0);
+  assert(rc == MEMCACHED_WRITE_FAILURE);
+  return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_delete_test(memcached_st *memc)
+{
+  unsigned int i= 0;
+  unsigned int num_iters= 1025; //request id rolls over at 1024
+  for (i= 0; i < num_iters;i++)
+  {
+    memcached_return rc;
+    char *key= "foo";
+    uint16_t *expected_ids=get_udp_request_ids(memc);
+    unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+    size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+    rc= memcached_delete(memc, key, strlen(key), 0);
+    assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+    if (rc == MEMCACHED_SUCCESS || memc->hosts[server_key].write_buffer_offset < init_offset)
+      increment_request_id(&expected_ids[server_key]);
+    if (rc == MEMCACHED_SUCCESS)
+      assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+    else
+    {
+      assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+      assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+    }
+    assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+  }
+  return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_delete_test(memcached_st *memc)
+{
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+  return udp_delete_test(memc);
+}
+
+test_return udp_verbosity_test(memcached_st *memc)
+{
+  memcached_return rc;
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  unsigned int x;
+  for (x= 0; x < memc->number_of_hosts;x++)
+    increment_request_id(&expected_ids[x]);
+
+  rc= memcached_verbosity(memc,3);
+  assert(rc == MEMCACHED_SUCCESS);
+  return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_quit_test(memcached_st *memc)
+{
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  memcached_quit(memc);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_flush_test(memcached_st *memc)
+{
+  memcached_return rc;
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  unsigned int x;
+  for (x= 0; x < memc->number_of_hosts;x++) 
+    increment_request_id(&expected_ids[x]);
+
+  rc= memcached_flush(memc,0);
+  assert(rc == MEMCACHED_SUCCESS);
+  return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_incr_test(memcached_st *memc)
+{
+  memcached_return rc;
+  char *key= "incr";
+  char *value= "1";
+  rc= memcached_set(memc, key, strlen(key), 
+                    value, strlen(value),
+                    (time_t)0, (uint32_t)0);
+  
+  assert(rc == MEMCACHED_SUCCESS);
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+  increment_request_id(&expected_ids[server_key]);
+  uint64_t newvalue;
+  rc= memcached_increment(memc, key, strlen(key), 1, &newvalue);
+  assert(rc == MEMCACHED_SUCCESS);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_decr_test(memcached_st *memc)
+{
+  memcached_return rc;
+  char *key= "decr";
+  char *value= "1";
+  rc= memcached_set(memc, key, strlen(key), 
+                    value, strlen(value),
+                    (time_t)0, (uint32_t)0);
+  
+  assert(rc == MEMCACHED_SUCCESS);
+  uint16_t *expected_ids= get_udp_request_ids(memc);
+  unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+  increment_request_id(&expected_ids[server_key]);
+  uint64_t newvalue;
+  rc= memcached_decrement(memc, key, strlen(key), 1, &newvalue);
+  assert(rc == MEMCACHED_SUCCESS);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+
+test_return udp_stat_test(memcached_st *memc)
+{
+  memcached_stat_st * rv= NULL;
+  memcached_return rc;
+  char args[]= "";
+  uint16_t *expected_ids = get_udp_request_ids(memc);
+  rv = memcached_stat(memc, args, &rc);
+  free(rv);
+  assert(rc == MEMCACHED_NOT_SUPPORTED);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_version_test(memcached_st *memc)
+{
+  memcached_return rc;
+  uint16_t *expected_ids = get_udp_request_ids(memc);
+  rc = memcached_version(memc);
+  assert(rc == MEMCACHED_NOT_SUPPORTED);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_get_test(memcached_st *memc)
+{
+  memcached_return rc;
+  char *key= "foo";
+  size_t vlen;
+  uint16_t *expected_ids = get_udp_request_ids(memc);
+  char *val= memcached_get(memc, key, strlen(key), &vlen, (uint32_t)0, &rc);
+  assert(rc == MEMCACHED_NOT_SUPPORTED);
+  assert(val == NULL);
+  return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_mixed_io_test(memcached_st *memc)
+{
+  test_st current_op;
+  test_st mixed_io_ops [] ={
+    {"udp_set_test", 0, udp_set_test},
+    {"udp_set_too_big_test", 0, udp_set_too_big_test},
+    {"udp_delete_test", 0, udp_delete_test},
+    {"udp_verbosity_test", 0, udp_verbosity_test},
+    {"udp_quit_test", 0, udp_quit_test},
+    {"udp_flush_test", 0, udp_flush_test},
+    {"udp_incr_test", 0, udp_incr_test},
+    {"udp_decr_test", 0, udp_decr_test},
+    {"udp_version_test", 0, udp_version_test}
+  };
+  unsigned int i= 0;
+  for (i= 0; i < 500; i++)
+  {
+    current_op= mixed_io_ops[random() % 9];
+    assert(current_op.function(memc) == TEST_SUCCESS);
+  }
+  return TEST_SUCCESS;
+}
+
+test_st udp_setup_server_tests[] ={
+  {"set_udp_behavior_test", 0, set_udp_behavior_test},
+  {"add_tcp_server_udp_client_test", 0, add_tcp_server_udp_client_test},
+  {"add_udp_server_tcp_client_test", 0, add_udp_server_tcp_client_test},
+  {0, 0, 0}
+};
+
+test_st upd_io_tests[] ={
+  {"udp_set_test", 0, udp_set_test},
+  {"udp_buffered_set_test", 0, udp_buffered_set_test},
+  {"udp_set_too_big_test", 0, udp_set_too_big_test},
+  {"udp_delete_test", 0, udp_delete_test},
+  {"udp_buffered_delete_test", 0, udp_buffered_delete_test},
+  {"udp_verbosity_test", 0, udp_verbosity_test},
+  {"udp_quit_test", 0, udp_quit_test},
+  {"udp_flush_test", 0, udp_flush_test},
+  {"udp_incr_test", 0, udp_incr_test},
+  {"udp_decr_test", 0, udp_decr_test},
+  {"udp_stat_test", 0, udp_stat_test},
+  {"udp_version_test", 0, udp_version_test},
+  {"udp_get_test", 0, udp_get_test},
+  {"udp_mixed_io_test", 0, udp_mixed_io_test},
+  {0, 0, 0}
+};
+
 /* Clean the server before beginning testing */
 test_st tests[] ={
   {"flush", 0, flush_test },
@@ -3317,6 +3660,9 @@ test_st consistent_weighted_tests[] ={
 };
 
 collection_st collection[] ={
+  {"udp_setup", init_udp, 0, udp_setup_server_tests},
+  {"udp_io", init_udp, 0, upd_io_tests},
+  {"udp_binary_io", binary_init_udp, 0, upd_io_tests},
   {"block", 0, 0, tests},
   {"binary", pre_binary, 0, tests},
   {"nonblock", pre_nonblock, 0, tests},
index 5023188c0dd088e533aa27e5b2ee60b4c8828d68..8a7970a0d48809032413aa9da5c95b6b9ed4acc7 100644 (file)
@@ -38,24 +38,14 @@ void server_startup(server_startup_st *construct)
         int count;
         int status;
 
-        if (construct->udp){
-          if(x == 0) {
-            sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u -m 128", 
-                    MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
-          } else {
-            sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -U %u", 
-                    MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
-          }
-        }
-        else{
-          if(x == 0) {
-            sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -m 128", 
-                    MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
-          } else {
-            sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u", 
-                    MEMCACHED_BINARY, x, x+ TEST_PORT_BASE);
-          }
+        if(x == 0) {
+          sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u -m 128",
+                    MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE);
+        } else {
+          sprintf(buffer, "%s -d -P /tmp/%umemc.pid -t 1 -p %u -U %u",
+                    MEMCACHED_BINARY, x, x + TEST_PORT_BASE, x + TEST_PORT_BASE);
         }
+        fprintf(stderr, "STARTING SERVER: %s\n", buffer);
         status= system(buffer);
         count= sprintf(end_ptr, "localhost:%u,", x + TEST_PORT_BASE);
         end_ptr+= count;