Partial work on making UDP protocol actually work (flush_all working!)
authorBrian Aker <brian@tangent.org>
Wed, 30 Jan 2008 22:02:42 +0000 (14:02 -0800)
committerBrian Aker <brian@tangent.org>
Wed, 30 Jan 2008 22:02:42 +0000 (14:02 -0800)
lib/memcached_connect.c
lib/memcached_io.c
lib/memcached_response.c

index 9f0c6e7dee863a7fccaaebcce87cd77bb73be2a4..f7167f451b8fbf8af1ca8a7808163b9acbf21243 100644 (file)
@@ -10,9 +10,18 @@ static memcached_return set_hostinfo(memcached_server_st *server)
   sprintf(str_port, "%u", server->port);
 
   memset(&hints, 0, sizeof(hints));
+
   hints.ai_family= AF_INET;
-  hints.ai_socktype= SOCK_STREAM;
-  hints.ai_protocol= IPPROTO_TCP;
+  if (server->type == MEMCACHED_CONNECTION_UDP)
+  {
+    hints.ai_protocol= IPPROTO_UDP;
+    hints.ai_socktype= SOCK_DGRAM;
+  }
+  else
+  {
+    hints.ai_socktype= SOCK_STREAM;
+    hints.ai_protocol= IPPROTO_TCP;
+  }
 
   e= getaddrinfo(server->hostname, str_port, &hints, &ai);
   if (e != 0)
@@ -71,41 +80,7 @@ test_connect:
   return MEMCACHED_SUCCESS;
 }
 
-static memcached_return udp_connect(memcached_server_st *ptr)
-{
-  if (ptr->fd == -1)
-  {
-    /* Old connection junk still is in the structure */
-    WATCHPOINT_ASSERT(ptr->cursor_active == 0);
-
-    /*
-      If we have not allocated the hosts object.
-      Or if the cache has not been set.
-    */
-    if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED || 
-        (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
-    {
-      memcached_return rc;
-
-      rc= set_hostinfo(ptr);
-      if (rc != MEMCACHED_SUCCESS)
-        return rc;
-
-      ptr->sockaddr_inited= MEMCACHED_ALLOCATED;
-    }
-
-    /* Create the socket */
-    if ((ptr->fd= socket(AF_INET, SOCK_DGRAM, 0)) < 0)
-    {
-      ptr->cached_errno= errno;
-      return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
-    }
-  }
-
-  return MEMCACHED_SUCCESS;
-}
-
-static memcached_return tcp_connect(memcached_server_st *ptr)
+static memcached_return network_connect(memcached_server_st *ptr)
 {
   if (ptr->fd == -1)
   {
@@ -136,6 +111,9 @@ static memcached_return tcp_connect(memcached_server_st *ptr)
       return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
     }
 
+    if (ptr->type == MEMCACHED_CONNECTION_UDP)
+      return MEMCACHED_SUCCESS;
+
     if (ptr->root->flags & MEM_NO_BLOCK)
     {
       int error;
@@ -245,10 +223,8 @@ memcached_return memcached_connect(memcached_server_st *ptr)
     rc= MEMCACHED_NOT_SUPPORTED;
     break;
   case MEMCACHED_CONNECTION_UDP:
-    rc= udp_connect(ptr);
-    break;
   case MEMCACHED_CONNECTION_TCP:
-    rc= tcp_connect(ptr);
+    rc= network_connect(ptr);
     break;
   case MEMCACHED_CONNECTION_UNIX_SOCKET:
     rc= unix_socket_connect(ptr);
index bb53f08ec023b9d1273812abd7ae74d12e7d3b90..d9e2602de3b7555717f1617ca552d86ef5f28101 100644 (file)
@@ -213,7 +213,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
 }
 
 static ssize_t io_flush(memcached_server_st *ptr,
-                                  memcached_return *error)
+                        memcached_return *error)
 {
   size_t sent_length;
   size_t return_length;
@@ -229,15 +229,39 @@ static ssize_t io_flush(memcached_server_st *ptr,
   if (write_length == MEMCACHED_MAX_BUFFER)
     WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
   WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+
   return_length= 0;
   while (write_length)
   {
+    WATCHPOINT_ASSERT(write_length > 0);
     sent_length= 0;
     if (ptr->type == MEMCACHED_CONNECTION_UDP)
     {
-      sent_length= sendto(ptr->fd, local_write_ptr, write_length, 0, 
-                          (struct sockaddr *)&ptr->address_info->ai_addr, 
-                          sizeof(struct sockaddr));
+      struct addrinfo *ai;
+
+      ai= ptr->address_info;
+
+      /* Crappy test code */
+      char buffer[HUGE_STRING_LEN + 8];
+      memset(buffer, 0, HUGE_STRING_LEN + 8);
+      memcpy (buffer+8, local_write_ptr, write_length);
+      buffer[0]= 0;
+      buffer[1]= 0;
+      buffer[2]= 0;
+      buffer[3]= 0;
+      buffer[4]= 0;
+      buffer[5]= 1;
+      buffer[6]= 0;
+      buffer[7]= 0;
+      sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, 
+                          (struct sockaddr *)ai->ai_addr, 
+                          ai->ai_addrlen);
+      if (sent_length == -1)
+      {
+        WATCHPOINT_ERRNO(errno);
+        WATCHPOINT_ASSERT(0);
+      }
+      sent_length-= 8; /* We remove the header */
     }
     else
     {
index 956107932615034853f4d12748137f052e759e40..31b5911737f2820a39d4d1f36c0ab55c9e23e344 100644 (file)
@@ -19,6 +19,16 @@ memcached_return memcached_response(memcached_server_st *ptr,
 
 
   send_length= 0;
+  /* UDP at the moment is odd...*/
+  if (ptr->type == MEMCACHED_CONNECTION_UDP)
+  {
+    char buffer[8];
+    ssize_t read_length;
+
+    return MEMCACHED_SUCCESS;
+
+    read_length= memcached_io_read(ptr, buffer, 8);
+  }
 
   /* We may have old commands in the buffer not set, first purge */
   if (ptr->root->flags & MEM_NO_BLOCK)
@@ -30,6 +40,7 @@ memcached_return memcached_response(memcached_server_st *ptr,
     size_t total_length= 0;
     buffer_ptr= buffer;
 
+
     while (1)
     {
       ssize_t read_length;