X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=6594aa75d13972e2a84d920c921ce4d0b225e326;hb=28adf7b936c6f5c25b7526ff56ec1256da1246d4;hp=51032f0062879c631b25c47010fb44ca59f3a0de;hpb=a6c3e3a3d04f379b1480c8c88a8eae17e54b1449;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 51032f00..6594aa75 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -37,12 +37,13 @@ */ -#include "libmemcached/common.h" +#include +#include -typedef enum { +enum memc_read_or_write { MEM_READ, MEM_WRITE -} memc_read_or_write; +}; static ssize_t io_flush(memcached_server_write_instance_st ptr, const bool with_flush, @@ -80,13 +81,22 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, { memcached_return_t rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { return MEMCACHED_FAILURE; + } } size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { - error= poll(&fds, 1, ptr->root->poll_timeout); + if (ptr->root->poll_timeout) // Mimic 0 causes timeout behavior (not all platforms do this) + { + error= poll(&fds, 1, ptr->root->poll_timeout); + } + else + { + error= 0; + } switch (error) { @@ -95,8 +105,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); return MEMCACHED_SUCCESS; + case 0: // Timeout occured, we let the while() loop do its thing. - return MEMCACHED_TIMEOUT; + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + default: WATCHPOINT_ERRNO(get_socket_errno()); switch (get_socket_errno()) @@ -106,6 +118,14 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, #endif case EINTR: break; + + case EFAULT: + case ENOMEM: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + + case EINVAL: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + default: if (fds.revents & POLLERR) { @@ -120,7 +140,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, } memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } } @@ -130,7 +150,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, ptr->cached_errno= get_socket_errno(); memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT); } memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) @@ -159,18 +179,52 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr) /* There is room in the buffer, try to fill it! */ if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) { - /* Just try a single read to grab what's available */ - ssize_t nr= recv(ptr->fd, - ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length, - 0); + do { + /* Just try a single read to grab what's available */ + ssize_t nr= recv(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + MSG_DONTWAIT); + + switch (nr) + { + case SOCKET_ERROR: + { + switch (get_socket_errno()) + { + case EINTR: + continue; - if (nr > 0) - { - ptr->read_data_length+= (size_t)nr; - ptr->read_buffer_length+= (size_t)nr; - return true; - } + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + break; // No IO is fine, we can just move on + + default: + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + } + } + break; + + case 0: // Shutdown on the socket has occurred + { + memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); + } + break; + + default: + { + ptr->read_data_length+= size_t(nr); + ptr->read_buffer_length+= size_t(nr); + return true; + } + break; + } + } while (0); } return false; } @@ -227,80 +281,67 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr) return false; } -#if 0 // Dead code, this should be removed. -void memcached_io_preread(memcached_st *ptr) -{ - unsigned int x; - - return; - - for (x= 0; x < memcached_server_count(ptr); x++) - { - if (memcached_server_response_count(ptr, x) && - ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) - { - size_t data_read; - - data_read= recv(ptr->hosts[x].fd, - ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, - MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0); - if (data_read == SOCKET_ERROR) - continue; - - ptr->hosts[x].read_buffer_length+= data_read; - ptr->hosts[x].read_data_length+= data_read; - } - } -} -#endif - memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread) { - char *buffer_ptr; + assert(ptr); // Programmer error + char *buffer_ptr= static_cast(buffer); - buffer_ptr= static_cast(buffer); + if (ptr->fd == INVALID_SOCKET) + { + assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); + return MEMCACHED_CONNECTION_FAILURE; + } while (length) { if (not ptr->read_buffer_length) { ssize_t data_read; - - while (1) + do { - data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0); - if (data_read > 0) - { - break; - } - else if (data_read == SOCKET_ERROR) + data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) { - ptr->cached_errno= get_socket_errno(); - memcached_return_t rc= MEMCACHED_ERRNO; switch (get_socket_errno()) { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX case EWOULDBLOCK: #ifdef USE_EAGAIN case EAGAIN: #endif - case EINTR: #ifdef TARGET_OS_LINUX case ERESTART: #endif - if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) + if (memcached_success(io_wait(ptr, MEM_READ))) + { continue; + } + return MEMCACHED_IN_PROGRESS; + /* fall through */ + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert(ptr->fd != INVALID_SOCKET); + case EINVAL: + case EFAULT: + case ECONNREFUSED: default: { memcached_quit_server(ptr, true); *nread= -1; - return memcached_set_error(*ptr, rc, MEMCACHED_AT); + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } } - else + else if (data_read == 0) { /* EOF. Any data received so far is incomplete @@ -312,11 +353,12 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, it will return EGAIN if data is not immediatly available. */ WATCHPOINT_STRING("We had a zero length recv()"); + assert(0); memcached_quit_server(ptr, true); *nread= -1; return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT); } - } + } while (data_read <= 0); ptr->io_bytes_sent = 0; ptr->read_data_length= (size_t) data_read; @@ -351,6 +393,62 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } +memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) +{ + assert(ptr); // Programmer error + + if (ptr->fd == INVALID_SOCKET) + { + assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO)); + return MEMCACHED_CONNECTION_FAILURE; + } + + ssize_t data_read; + char buffer[MEMCACHED_MAX_BUFFER]; + do + { + data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) + { + switch (get_socket_errno()) + { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + if (memcached_success(io_wait(ptr, MEM_READ))) + { + continue; + } + return MEMCACHED_IN_PROGRESS; + + /* fall through */ + + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert(ptr->fd != INVALID_SOCKET); + case EINVAL: + case EFAULT: + case ECONNREFUSED: + default: + return MEMCACHED_CONNECTION_FAILURE; // We want this! + } + } + } while (data_read > 0); + + return MEMCACHED_CONNECTION_FAILURE; +} + static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { @@ -393,11 +491,10 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) { - memcached_return_t rc; - ssize_t sent_length; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - sent_length= io_flush(ptr, with_flush, &rc); + + memcached_return_t rc; + ssize_t sent_length= io_flush(ptr, with_flush, &rc); if (sent_length == -1) { return -1; @@ -459,11 +556,11 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, } -memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) +void memcached_io_close(memcached_server_write_instance_st ptr) { if (ptr->fd == INVALID_SOCKET) { - return MEMCACHED_SUCCESS; + return; } /* in case of death shutdown to avoid blocking at close() */ @@ -478,8 +575,8 @@ memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) { WATCHPOINT_ERRNO(get_socket_errno()); } - - return MEMCACHED_SUCCESS; + ptr->state= MEMCACHED_SERVER_STATE_NEW; + ptr->fd= INVALID_SOCKET; } memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) @@ -488,9 +585,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st struct pollfd fds[MAX_SERVERS_TO_POLL]; unsigned int host_index= 0; - for (uint32_t x= 0; - x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; - ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -583,12 +678,15 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, // UDP Sanity check, make sure that we are not sending somthing too big if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) { + *error= MEMCACHED_WRITE_FAILURE; return -1; } if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + { return 0; + } /* Looking for memory overflows */ #if defined(DEBUG) @@ -638,33 +736,42 @@ static ssize_t io_flush(memcached_server_write_instance_st ptr, * buffer for more data and retry the write before * waiting.. */ - if (repack_input_buffer(ptr) || + if (repack_input_buffer(ptr) or process_input_buffer(ptr)) + { continue; + } - memcached_return_t rc; - rc= io_wait(ptr, MEM_WRITE); - - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) + memcached_return_t rc= io_wait(ptr, MEM_WRITE); + if (memcached_success(rc)) + { continue; + } + else if (rc == MEMCACHED_TIMEOUT) + { + *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return -1; + } memcached_quit_server(ptr, true); + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); return -1; } case ENOTCONN: case EPIPE: default: memcached_quit_server(ptr, true); - *error= MEMCACHED_ERRNO; + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); WATCHPOINT_ASSERT(ptr->fd == -1); return -1; } } - if (ptr->type == MEMCACHED_CONNECTION_UDP && + if (ptr->type == MEMCACHED_CONNECTION_UDP and (size_t)sent_length != write_length) { memcached_quit_server(ptr, true); + *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); return -1; } @@ -711,10 +818,14 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, while (offset < size) { ssize_t nread; - memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset, - &nread); - if (rc != MEMCACHED_SUCCESS) + memcached_return_t rc; + + while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { }; + + if (memcached_failed(rc)) + { return rc; + } offset+= (size_t) nread; } @@ -729,7 +840,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, bool line_complete= false; size_t total_nr= 0; - while (!line_complete) + while (not line_complete) { if (ptr->read_buffer_length == 0) { @@ -740,8 +851,15 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, */ ssize_t nread; memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); - if (rc != MEMCACHED_SUCCESS) + if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) + { + memcached_quit_server(ptr, true); + return memcached_set_error(*ptr, rc, MEMCACHED_AT); + } + else if (memcached_failed(rc)) + { return rc; + } if (*buffer_ptr == '\n') line_complete= true;