X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=5acbbd521ce6cd9f2c488d5e016cf4df50f1e23c;hb=30e386cd241ac56fd9205d12c143761475307705;hp=f1afc6695322fe961edc7a9dbd2c41a402def199;hpb=04ef974c23973986d4475e3cb8a876012264e2da;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index f1afc669..5acbbd52 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -1,18 +1,43 @@ -/* LibMemcached - * Copyright (C) 2006-2009 Brian Aker - * All rights reserved. +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * LibMemcached * - * Use and distribution licensed under the BSD license. See - * the COPYING file in the parent directory for full text. + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ + * Copyright (C) 2006-2009 Brian Aker + * All rights reserved. * - * Summary: Server IO, Not public! + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -#include "common.h" -#include -#include +#include "libmemcached/common.h" typedef enum { MEM_READ, @@ -20,6 +45,7 @@ typedef enum { } memc_read_or_write; static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, memcached_return_t *error); static void increment_udp_message_id(memcached_server_write_instance_st ptr); @@ -32,8 +58,15 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, }; int error; - unlikely (read_or_write == MEM_WRITE) /* write */ + if (read_or_write == MEM_WRITE) /* write */ + { fds.events= POLLOUT; + WATCHPOINT_SET(ptr->io_wait_count.write++); + } + else + { + WATCHPOINT_SET(ptr->io_wait_count.read++); + } /* ** We are going to block on write, but at least on Solaris we might block @@ -50,42 +83,61 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return MEMCACHED_FAILURE; } - int timeout= ptr->root->poll_timeout; - if (ptr->root->flags.no_block == false) - timeout= -1; - size_t loop_max= 5; - while (--loop_max) + while (--loop_max) // While loop is for ERESTART or EINTR { - error= poll(&fds, 1, timeout); + error= poll(&fds, 1, ptr->root->poll_timeout); switch (error) { - case 1: + case 1: // Success! + WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); + WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); + return MEMCACHED_SUCCESS; - case 0: + case 0: // Timeout occured, we let the while() loop do its thing. return MEMCACHED_TIMEOUT; + default: + WATCHPOINT_ERRNO(get_socket_errno()); + switch (get_socket_errno()) + { #ifdef TARGET_OS_LINUX - case ERESTART: + case ERESTART: #endif - case EINTR: - continue; - default: - ptr->cached_errno= error; - memcached_quit_server(ptr, true); + case EINTR: + break; + default: + if (fds.revents & POLLERR) + { + int err; + socklen_t len= sizeof (err); + (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); + ptr->cached_errno= (err == 0) ? get_socket_errno() : err; + } + else + { + ptr->cached_errno= get_socket_errno(); + } + memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return MEMCACHED_FAILURE; + } } } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); - ptr->cached_errno= error; + ptr->cached_errno= get_socket_errno(); memcached_quit_server(ptr, true); return MEMCACHED_FAILURE; } +memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +{ + return io_wait(ptr, MEM_WRITE); +} + /** * Try to fill the input buffer for a server with as much * data as possible. @@ -108,9 +160,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) { /* Just try a single read to grab what's available */ - ssize_t nr= read(ptr->fd, + ssize_t nr= recv(ptr->fd, ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length); + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + 0); if (nr > 0) { @@ -174,40 +227,6 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -static inline void memcached_io_cork_push(memcached_server_st *ptr) -{ - (void)ptr; -#ifdef CORK - if (ptr->root->flags.cork == false || ptr->state.is_corked) - return; - - int enable= 1; - int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, - &enable, (socklen_t)sizeof(int)); - if (! err) - ptr->state.is_corked= true; - - WATCHPOINT_ASSERT(ptr->state.is_corked == true); -#endif -} - -static inline void memcached_io_cork_pop(memcached_server_st *ptr) -{ - (void)ptr; -#ifdef CORK - if (ptr->root->flags.cork == false || ptr->state.is_corked == false) - return; - - int enable= 0; - int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, - &enable, (socklen_t)sizeof(int)); - if (! err) - ptr->state.is_corked= false; - - WATCHPOINT_ASSERT(ptr->state.is_corked == false); -#endif -} - #if 0 // Dead code, this should be removed. void memcached_io_preread(memcached_st *ptr) { @@ -222,10 +241,10 @@ void memcached_io_preread(memcached_st *ptr) { size_t data_read; - data_read= read(ptr->hosts[x].fd, + data_read= recv(ptr->hosts[x].fd, ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, - MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); - if (data_read == -1) + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0); + if (data_read == SOCKET_ERROR) continue; ptr->hosts[x].read_buffer_length+= data_read; @@ -250,16 +269,21 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, while (1) { - data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); + data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0); if (data_read > 0) + { break; - else if (data_read == -1) + } + else if (data_read == SOCKET_ERROR) { - ptr->cached_errno= errno; - memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE; - switch (errno) + ptr->cached_errno= get_socket_errno(); + memcached_return_t rc= MEMCACHED_ERRNO; + switch (get_socket_errno()) { + case EWOULDBLOCK: +#ifdef USE_EAGAIN case EAGAIN: +#endif case EINTR: #ifdef TARGET_OS_LINUX case ERESTART: @@ -287,6 +311,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, for blocking I/O we do not return 0 and for non-blocking case it will return EGAIN if data is not immediatly available. */ + WATCHPOINT_STRING("We had a zero length recv()"); memcached_quit_server(ptr, true); *nread= -1; return MEMCACHED_UNKNOWN_READ_FAILURE; @@ -326,23 +351,17 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -ssize_t memcached_io_write(memcached_server_write_instance_st ptr, - const void *buffer, size_t length, bool with_flush) +static ssize_t _io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) { size_t original_length; const char* buffer_ptr; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); original_length= length; buffer_ptr= buffer; - /* more writable data is coming if a flush isn't required, so delay send */ - if (! with_flush) - { - memcached_io_cork_push(ptr); - } - while (length) { char *write_ptr; @@ -355,7 +374,9 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, buffer_end= MAX_UDP_DATAGRAM_LENGTH; should_write= length; if (ptr->write_buffer_offset + should_write > buffer_end) + { return -1; + } } else { @@ -375,10 +396,12 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, memcached_return_t rc; ssize_t sent_length; - WATCHPOINT_ASSERT(ptr->fd != -1); - sent_length= io_flush(ptr, &rc); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + sent_length= io_flush(ptr, with_flush, &rc); if (sent_length == -1) + { return -1; + } /* If io_flush calls memcached_purge, sent_length may be 0 */ unlikely (sent_length != 0) @@ -391,36 +414,69 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, if (with_flush) { memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != -1); - if (io_flush(ptr, &rc) == -1) + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + if (io_flush(ptr, with_flush, &rc) == -1) { return -1; } - - memcached_io_cork_pop(ptr); } return (ssize_t) original_length; } +ssize_t memcached_io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) +{ + return _io_write(ptr, buffer, length, with_flush); +} + +ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, + const struct libmemcached_io_vector_st *vector, + size_t number_of, bool with_flush) +{ + ssize_t total= 0; + + for (size_t x= 0; x < number_of; x++, vector++) + { + ssize_t returnable; + + if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + { + return -1; + } + total+= returnable; + } + + if (with_flush) + { + if (memcached_io_write(ptr, NULL, 0, true) == -1) + { + return -1; + } + } + + return total; +} + + memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) { - if (ptr->fd == -1) + if (ptr->fd == INVALID_SOCKET) { return MEMCACHED_SUCCESS; } /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(ptr->fd, SHUT_RDWR) == -1 && errno != ENOTCONN) + if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(ptr->fd); - WATCHPOINT_ERRNO(errno); - WATCHPOINT_ASSERT(errno); + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_ASSERT(get_socket_errno()); } - if (close(ptr->fd) == -1) + if (closesocket(ptr->fd) == SOCKET_ERROR) { - WATCHPOINT_ERRNO(errno); + WATCHPOINT_ERRNO(get_socket_errno()); } return MEMCACHED_SUCCESS; @@ -471,7 +527,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st int err= poll(fds, host_index, memc->poll_timeout); switch (err) { case -1: - memc->cached_errno = errno; + memcached_set_errno(memc, get_socket_errno(), NULL); /* FALLTHROUGH */ case 0: break; @@ -496,6 +552,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st } static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, memcached_return_t *error) { /* @@ -505,11 +562,13 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, */ { memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { return -1; + } } ssize_t sent_length; size_t return_length; @@ -518,11 +577,13 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, *error= MEMCACHED_SUCCESS; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + { return -1; + } if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) @@ -538,21 +599,37 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, return_length= 0; while (write_length) { - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) increment_udp_message_id(ptr); - sent_length= write(ptr->fd, local_write_ptr, write_length); - if (sent_length == -1) + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + if (with_flush) + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + } + else + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + } + + if (sent_length == SOCKET_ERROR) { - ptr->cached_errno= errno; - switch (errno) + ptr->cached_errno= get_socket_errno(); +#if 0 // @todo I should look at why we hit this bit of code hard frequently + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_NUMBER(get_socket_errno()); +#endif + switch (get_socket_errno()) { case ENOBUFS: continue; + case EWOULDBLOCK: +#ifdef USE_EAGAIN case EAGAIN: +#endif { /* * We may be blocked on write because the input buffer @@ -573,9 +650,12 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, memcached_quit_server(ptr, true); return -1; } + case ENOTCONN: + case EPIPE: default: memcached_quit_server(ptr, true); *error= MEMCACHED_ERRNO; + WATCHPOINT_ASSERT(ptr->fd == -1); return -1; } }