X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_io.c;h=72a02a0f97f52b030b3c1ab45cefa84b742c4bf4;hb=95084fee7261488e27be38d9c50957f82a75f416;hp=ea8b25816d9d981fce5f4d40cc29f3cc0a84be2f;hpb=2fe4e1635c70f4b2815aaf9bc147fee0b2c93de3;p=awesomized%2Flibmemcached diff --git a/libmemcached/memcached_io.c b/libmemcached/memcached_io.c index ea8b2581..72a02a0f 100644 --- a/libmemcached/memcached_io.c +++ b/libmemcached/memcached_io.c @@ -30,6 +30,21 @@ static memcached_return io_wait(memcached_server_st *ptr, fds[0].fd= ptr->fd; fds[0].events= flags; + /* + ** We are going to block on write, but at least on Solaris we might block + ** on write if we haven't read anything from our input buffer.. + ** Try to purge the input buffer if we don't do any flow control in the + ** application layer (just sending a lot of data etc) + ** The test is moved down in the purge function to avoid duplication of + ** the test. + */ + if (read_or_write == MEM_WRITE) + { + memcached_return rc=memcached_purge(ptr); + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return MEMCACHED_FAILURE; + } + error= poll(fds, 1, ptr->root->poll_timeout); if (error == 1) @@ -83,7 +98,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr, while (length) { - uint8_t found_eof= 0; if (!ptr->read_buffer_length) { ssize_t data_read; @@ -101,6 +115,7 @@ ssize_t memcached_io_read(memcached_server_st *ptr, switch (errno) { case EAGAIN: + case EINTR: { memcached_return rc; @@ -119,11 +134,21 @@ ssize_t memcached_io_read(memcached_server_st *ptr, } else { - found_eof= 1; - break; + /* + EOF. Any data received so far is incomplete + so discard it. This always reads by byte in case of TCP + and protocol enforcement happens at memcached_response() + looking for '\n'. We do not care for UDB which requests 8 bytes + at once. Generally, this means that connection went away. Since + for blocking I/O we do not return 0 and for non-blocking case + it will return EGAIN if data is not immediatly available. + */ + memcached_quit_server(ptr, 1); + return -1; } } + ptr->io_bytes_sent = 0; ptr->read_data_length= data_read; ptr->read_buffer_length= data_read; ptr->read_ptr= ptr->read_buffer; @@ -149,9 +174,6 @@ ssize_t memcached_io_read(memcached_server_st *ptr, buffer_ptr++; break; } - - if (found_eof) - break; } return (size_t)(buffer_ptr - (char*)buffer); @@ -163,6 +185,8 @@ ssize_t memcached_io_write(memcached_server_st *ptr, size_t original_length; const char* buffer_ptr; + WATCHPOINT_ASSERT(ptr->fd != -1); + original_length= length; buffer_ptr= buffer; @@ -186,17 +210,21 @@ ssize_t memcached_io_write(memcached_server_st *ptr, memcached_return rc; ssize_t sent_length; + WATCHPOINT_ASSERT(ptr->fd != -1); sent_length= io_flush(ptr, &rc); if (sent_length == -1) return -1; - WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); + /* If io_flush calls memcached_purge, sent_length may be 0 */ + if (sent_length != 0) + WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER); } } if (with_flush) { memcached_return rc; + WATCHPOINT_ASSERT(ptr->fd != -1); if (io_flush(ptr, &rc) == -1) return -1; } @@ -204,14 +232,33 @@ ssize_t memcached_io_write(memcached_server_st *ptr, return original_length; } -memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death) +memcached_return memcached_io_close(memcached_server_st *ptr) { + int r; + + if (ptr->fd == -1) + return MEMCACHED_SUCCESS; + /* in case of death shutdown to avoid blocking at close() */ + if (1) + { + r= shutdown(ptr->fd, SHUT_RDWR); - if (io_death) - shutdown(ptr->fd, SHUT_RDWR); - else - close(ptr->fd); +#ifdef HAVE_DEBUG + if (r && errno != ENOTCONN) + { + WATCHPOINT_NUMBER(ptr->fd); + WATCHPOINT_ERRNO(errno); + WATCHPOINT_ASSERT(errno); + } +#endif + } + + r= close(ptr->fd); +#ifdef HAVE_DEBUG + if (r != 0) + WATCHPOINT_ERRNO(errno); +#endif return MEMCACHED_SUCCESS; } @@ -219,24 +266,42 @@ memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death) static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error) { - size_t sent_length; + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. + */ + { + memcached_return rc; + WATCHPOINT_ASSERT(ptr->fd != -1); + rc= memcached_purge(ptr); + + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + return -1; + } + ssize_t sent_length; size_t return_length; char *local_write_ptr= ptr->write_buffer; size_t write_length= ptr->write_buffer_offset; *error= MEMCACHED_SUCCESS; + WATCHPOINT_ASSERT(ptr->fd != -1); + if (ptr->write_buffer_offset == 0) return 0; /* Looking for memory overflows */ +#if defined(HAVE_DEBUG) if (write_length == MEMCACHED_MAX_BUFFER) WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); +#endif return_length= 0; while (write_length) { + WATCHPOINT_ASSERT(ptr->fd != -1); WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) @@ -269,9 +334,11 @@ static ssize_t io_flush(memcached_server_st *ptr, } else { - if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr, - write_length)) == -1) + WATCHPOINT_ASSERT(ptr->fd != -1); + if ((sent_length= write(ptr->fd, local_write_ptr, + write_length)) == -1) { + ptr->cached_errno= errno; switch (errno) { case ENOBUFS: @@ -281,7 +348,7 @@ static ssize_t io_flush(memcached_server_st *ptr, memcached_return rc; rc= io_wait(ptr, MEM_WRITE); - if (rc == MEMCACHED_SUCCESS) + if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) continue; memcached_quit_server(ptr, 1); @@ -289,20 +356,22 @@ static ssize_t io_flush(memcached_server_st *ptr, } default: memcached_quit_server(ptr, 1); - ptr->cached_errno= errno; *error= MEMCACHED_ERRNO; return -1; } } } + ptr->io_bytes_sent += sent_length; + local_write_ptr+= sent_length; write_length-= sent_length; return_length+= sent_length; } WATCHPOINT_ASSERT(write_length == 0); - WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset); + // Need to study this assert() WATCHPOINT_ASSERT(return_length == + // ptr->write_buffer_offset); ptr->write_buffer_offset= 0; return return_length; @@ -313,5 +382,75 @@ static ssize_t io_flush(memcached_server_st *ptr, */ void memcached_io_reset(memcached_server_st *ptr) { - memcached_quit_server(ptr, 0); + memcached_quit_server(ptr, 1); +} + +/** + * Read a given number of bytes from the server and place it into a specific + * buffer. Reset the IO channel on this server if an error occurs. + */ +memcached_return memcached_safe_read(memcached_server_st *ptr, + void *dta, + size_t size) +{ + size_t offset= 0; + char *data= dta; + + while (offset < size) + { + ssize_t nread= memcached_io_read(ptr, data + offset, size - offset); + if (nread <= 0) + { + memcached_io_reset(ptr); + return MEMCACHED_UNKNOWN_READ_FAILURE; + } + offset+= nread; + } + + return MEMCACHED_SUCCESS; +} + +memcached_return memcached_io_readline(memcached_server_st *ptr, + char *buffer_ptr, + size_t size) +{ + bool line_complete= false; + int total_nr= 0; + + while (!line_complete) + { + if (ptr->read_buffer_length == 0) + { + /* + * We don't have any data in the buffer, so let's fill the read + * buffer. Call the standard read function to avoid duplicating + * the logic. + */ + if (memcached_io_read(ptr, buffer_ptr, 1) != 1) + return MEMCACHED_UNKNOWN_READ_FAILURE; + + if (*buffer_ptr == '\n') + line_complete= true; + + ++buffer_ptr; + ++total_nr; + } + + /* Now let's look in the buffer and copy as we go! */ + while (ptr->read_buffer_length && total_nr < size && !line_complete) + { + *buffer_ptr = *ptr->read_ptr; + if (*buffer_ptr == '\n') + line_complete = true; + --ptr->read_buffer_length; + ++ptr->read_ptr; + ++total_nr; + ++buffer_ptr; + } + + if (total_nr == size) + return MEMCACHED_PROTOCOL_ERROR; + } + + return MEMCACHED_SUCCESS; }