From: Brian Aker Date: Wed, 30 Jan 2008 22:02:42 +0000 (-0800) Subject: Partial work on making UDP protocol actually work (flush_all working!) X-Git-Tag: _20~31 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=2c213592852ceb196be1b1760d17284150fc0678;p=m6w6%2Flibmemcached Partial work on making UDP protocol actually work (flush_all working!) --- diff --git a/lib/memcached_connect.c b/lib/memcached_connect.c index 9f0c6e7d..f7167f45 100644 --- a/lib/memcached_connect.c +++ b/lib/memcached_connect.c @@ -10,9 +10,18 @@ static memcached_return set_hostinfo(memcached_server_st *server) sprintf(str_port, "%u", server->port); memset(&hints, 0, sizeof(hints)); + hints.ai_family= AF_INET; - hints.ai_socktype= SOCK_STREAM; - hints.ai_protocol= IPPROTO_TCP; + if (server->type == MEMCACHED_CONNECTION_UDP) + { + hints.ai_protocol= IPPROTO_UDP; + hints.ai_socktype= SOCK_DGRAM; + } + else + { + hints.ai_socktype= SOCK_STREAM; + hints.ai_protocol= IPPROTO_TCP; + } e= getaddrinfo(server->hostname, str_port, &hints, &ai); if (e != 0) @@ -71,41 +80,7 @@ test_connect: return MEMCACHED_SUCCESS; } -static memcached_return udp_connect(memcached_server_st *ptr) -{ - if (ptr->fd == -1) - { - /* Old connection junk still is in the structure */ - WATCHPOINT_ASSERT(ptr->cursor_active == 0); - - /* - If we have not allocated the hosts object. - Or if the cache has not been set. - */ - if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED || - (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS))) - { - memcached_return rc; - - rc= set_hostinfo(ptr); - if (rc != MEMCACHED_SUCCESS) - return rc; - - ptr->sockaddr_inited= MEMCACHED_ALLOCATED; - } - - /* Create the socket */ - if ((ptr->fd= socket(AF_INET, SOCK_DGRAM, 0)) < 0) - { - ptr->cached_errno= errno; - return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE; - } - } - - return MEMCACHED_SUCCESS; -} - -static memcached_return tcp_connect(memcached_server_st *ptr) +static memcached_return network_connect(memcached_server_st *ptr) { if (ptr->fd == -1) { @@ -136,6 +111,9 @@ static memcached_return tcp_connect(memcached_server_st *ptr) return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE; } + if (ptr->type == MEMCACHED_CONNECTION_UDP) + return MEMCACHED_SUCCESS; + if (ptr->root->flags & MEM_NO_BLOCK) { int error; @@ -245,10 +223,8 @@ memcached_return memcached_connect(memcached_server_st *ptr) rc= MEMCACHED_NOT_SUPPORTED; break; case MEMCACHED_CONNECTION_UDP: - rc= udp_connect(ptr); - break; case MEMCACHED_CONNECTION_TCP: - rc= tcp_connect(ptr); + rc= network_connect(ptr); break; case MEMCACHED_CONNECTION_UNIX_SOCKET: rc= unix_socket_connect(ptr); diff --git a/lib/memcached_io.c b/lib/memcached_io.c index bb53f08e..d9e2602d 100644 --- a/lib/memcached_io.c +++ b/lib/memcached_io.c @@ -213,7 +213,7 @@ memcached_return memcached_io_close(memcached_server_st *ptr) } static ssize_t io_flush(memcached_server_st *ptr, - memcached_return *error) + memcached_return *error) { size_t sent_length; size_t return_length; @@ -229,15 +229,39 @@ static ssize_t io_flush(memcached_server_st *ptr, if (write_length == MEMCACHED_MAX_BUFFER) WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); + return_length= 0; while (write_length) { + WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) { - sent_length= sendto(ptr->fd, local_write_ptr, write_length, 0, - (struct sockaddr *)&ptr->address_info->ai_addr, - sizeof(struct sockaddr)); + struct addrinfo *ai; + + ai= ptr->address_info; + + /* Crappy test code */ + char buffer[HUGE_STRING_LEN + 8]; + memset(buffer, 0, HUGE_STRING_LEN + 8); + memcpy (buffer+8, local_write_ptr, write_length); + buffer[0]= 0; + buffer[1]= 0; + buffer[2]= 0; + buffer[3]= 0; + buffer[4]= 0; + buffer[5]= 1; + buffer[6]= 0; + buffer[7]= 0; + sent_length= sendto(ptr->fd, buffer, write_length + 8, 0, + (struct sockaddr *)ai->ai_addr, + ai->ai_addrlen); + if (sent_length == -1) + { + WATCHPOINT_ERRNO(errno); + WATCHPOINT_ASSERT(0); + } + sent_length-= 8; /* We remove the header */ } else { diff --git a/lib/memcached_response.c b/lib/memcached_response.c index 95610793..31b59117 100644 --- a/lib/memcached_response.c +++ b/lib/memcached_response.c @@ -19,6 +19,16 @@ memcached_return memcached_response(memcached_server_st *ptr, send_length= 0; + /* UDP at the moment is odd...*/ + if (ptr->type == MEMCACHED_CONNECTION_UDP) + { + char buffer[8]; + ssize_t read_length; + + return MEMCACHED_SUCCESS; + + read_length= memcached_io_read(ptr, buffer, 8); + } /* We may have old commands in the buffer not set, first purge */ if (ptr->root->flags & MEM_NO_BLOCK) @@ -30,6 +40,7 @@ memcached_return memcached_response(memcached_server_st *ptr, size_t total_length= 0; buffer_ptr= buffer; + while (1) { ssize_t read_length;