X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=5acbbd521ce6cd9f2c488d5e016cf4df50f1e23c;hb=30e386cd241ac56fd9205d12c143761475307705;hp=299242f68f328babc3ec92c4d84c56c2c6ae6f2a;hpb=5ab3cfccd05bf4375edf1a5f3cab85be97220b60;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index 299242f6..5acbbd52 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -1,16 +1,43 @@ -/* LibMemcached - * Copyright (C) 2006-2009 Brian Aker - * All rights reserved. +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: + * + * LibMemcached * - * Use and distribution licensed under the BSD license. See - * the COPYING file in the parent directory for full text. + * Copyright (C) 2011 Data Differential, http://datadifferential.com/ + * Copyright (C) 2006-2009 Brian Aker + * All rights reserved. * - * Summary: Server IO, Not public! + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * The names of its contributors may not be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -#include "common.h" +#include "libmemcached/common.h" typedef enum { MEM_READ, @@ -18,6 +45,7 @@ typedef enum { } memc_read_or_write; static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, memcached_return_t *error); static void increment_udp_message_id(memcached_server_write_instance_st ptr); @@ -55,14 +83,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return MEMCACHED_FAILURE; } - int timeout= ptr->root->poll_timeout; - if (ptr->root->flags.no_block == false) - timeout= -1; - size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { - error= poll(&fds, 1, timeout); + error= poll(&fds, 1, ptr->root->poll_timeout); switch (error) { @@ -109,6 +133,11 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, return MEMCACHED_FAILURE; } +memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +{ + return io_wait(ptr, MEM_WRITE); +} + /** * Try to fill the input buffer for a server with as much * data as possible. @@ -198,40 +227,6 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -static inline void memcached_io_cork_push(memcached_server_st *ptr) -{ - (void)ptr; -#ifdef CORK - if (ptr->root->flags.cork == false || ptr->state.is_corked) - return; - - int enable= 1; - int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, - &enable, (socklen_t)sizeof(int)); - if (! err) - ptr->state.is_corked= true; - - WATCHPOINT_ASSERT(ptr->state.is_corked == true); -#endif -} - -static inline void memcached_io_cork_pop(memcached_server_st *ptr) -{ - (void)ptr; -#ifdef CORK - if (ptr->root->flags.cork == false || ptr->state.is_corked == false) - return; - - int enable= 0; - int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK, - &enable, (socklen_t)sizeof(int)); - if (! err) - ptr->state.is_corked= false; - - WATCHPOINT_ASSERT(ptr->state.is_corked == false); -#endif -} - #if 0 // Dead code, this should be removed. void memcached_io_preread(memcached_st *ptr) { @@ -367,12 +362,6 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, original_length= length; buffer_ptr= buffer; - /* more writable data is coming if a flush isn't required, so delay send */ - if (! with_flush) - { - memcached_io_cork_push(ptr); - } - while (length) { char *write_ptr; @@ -385,7 +374,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, buffer_end= MAX_UDP_DATAGRAM_LENGTH; should_write= length; if (ptr->write_buffer_offset + should_write > buffer_end) + { return -1; + } } else { @@ -406,9 +397,11 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, ssize_t sent_length; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - sent_length= io_flush(ptr, &rc); + sent_length= io_flush(ptr, with_flush, &rc); if (sent_length == -1) + { return -1; + } /* If io_flush calls memcached_purge, sent_length may be 0 */ unlikely (sent_length != 0) @@ -422,12 +415,10 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, { memcached_return_t rc; WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (io_flush(ptr, &rc) == -1) + if (io_flush(ptr, with_flush, &rc) == -1) { return -1; } - - memcached_io_cork_pop(ptr); } return (ssize_t) original_length; @@ -440,7 +431,7 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr, } ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, - const struct __write_vector_st *vector, + const struct libmemcached_io_vector_st *vector, size_t number_of, bool with_flush) { ssize_t total= 0; @@ -536,7 +527,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st int err= poll(fds, host_index, memc->poll_timeout); switch (err) { case -1: - memc->cached_errno = get_socket_errno(); + memcached_set_errno(memc, get_socket_errno(), NULL); /* FALLTHROUGH */ case 0: break; @@ -561,6 +552,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st } static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, memcached_return_t *error) { /* @@ -574,7 +566,9 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { return -1; + } } ssize_t sent_length; size_t return_length; @@ -587,7 +581,9 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, // UDP Sanity check, make sure that we are not sending somthing too big if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) + { return -1; + } if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) @@ -609,7 +605,16 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, if (ptr->type == MEMCACHED_CONNECTION_UDP) increment_udp_message_id(ptr); - sent_length= send(ptr->fd, local_write_ptr, write_length, 0); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + if (with_flush) + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + } + else + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + } + if (sent_length == SOCKET_ERROR) { ptr->cached_errno= get_socket_errno(); @@ -645,9 +650,12 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, memcached_quit_server(ptr, true); return -1; } + case ENOTCONN: + case EPIPE: default: memcached_quit_server(ptr, true); *error= MEMCACHED_ERRNO; + WATCHPOINT_ASSERT(ptr->fd == -1); return -1; } }