X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.cc;h=d7268940543cedeb0e7d8bc84145b442cfdcde74;hb=f0ec3e2ffaf483bbd1329e8c6aba37f729e6df4f;hp=74aa4a239d64bf3e9f894384860ce61694c19efe;hpb=ae6bc7501efd5aeaaee92dabe2da0ec2d1625c5b;p=awesomized%2Flibmemcached diff --git a/libmemcached/io.cc b/libmemcached/io.cc index 74aa4a23..d7268940 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -37,17 +37,159 @@ */ -#include "libmemcached/common.h" +#include -typedef enum { +enum memc_read_or_write { MEM_READ, MEM_WRITE -} memc_read_or_write; +}; -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error); -static void increment_udp_message_id(memcached_server_write_instance_st ptr); +/* + * The udp request id consists of two seperate sections + * 1) The thread id + * 2) The message number + * The thread id should only be set when the memcached_st struct is created + * and should not be changed. + * + * The message num is incremented for each new message we send, this function + * extracts the message number from message_id, increments it and then + * writes the new value back into the header + */ +static void increment_udp_message_id(memcached_server_write_instance_st ptr) +{ + struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; + uint16_t cur_req= get_udp_datagram_request_id(header); + int msg_num= get_msg_num_from_request_id(cur_req); + int thread_id= get_thread_id_from_request_id(cur_req); + + if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) + msg_num= 0; + + header->request_id= htons((uint16_t) (thread_id | msg_num)); +} + +/** + * Try to fill the input buffer for a server with as much + * data as possible. + * + * @param ptr the server to pack + */ +static bool repack_input_buffer(memcached_server_write_instance_st ptr) +{ + if (ptr->read_ptr != ptr->read_buffer) + { + /* Move all of the data to the beginning of the buffer so + ** that we can fit more data into the buffer... + */ + memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); + ptr->read_ptr= ptr->read_buffer; + ptr->read_data_length= ptr->read_buffer_length; + } + + /* There is room in the buffer, try to fill it! */ + if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) + { + do { + /* Just try a single read to grab what's available */ + ssize_t nr= recv(ptr->fd, + ptr->read_ptr + ptr->read_data_length, + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + MSG_DONTWAIT); + + switch (nr) + { + case SOCKET_ERROR: + { + switch (get_socket_errno()) + { + case EINTR: + continue; + + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + break; // No IO is fine, we can just move on + + default: + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + } + } + break; + + case 0: // Shutdown on the socket has occurred + { + memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); + } + break; + + default: + { + ptr->read_data_length+= size_t(nr); + ptr->read_buffer_length+= size_t(nr); + return true; + } + break; + } + } while (0); + } + return false; +} + +/** + * If the we have callbacks connected to this server structure + * we may start process the input queue and fire the callbacks + * for the incomming messages. This function is _only_ called + * when the input buffer is full, so that we _know_ that we have + * at least _one_ message to process. + * + * @param ptr the server to star processing iput messages for + * @return true if we processed anything, false otherwise + */ +static bool process_input_buffer(memcached_server_write_instance_st ptr) +{ + /* + ** We might be able to process some of the response messages if we + ** have a callback set up + */ + if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + { + /* + * We might have responses... try to read them out and fire + * callbacks + */ + memcached_callback_st cb= *ptr->root->callbacks; + + memcached_set_processing_input((memcached_st *)ptr->root, true); + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return_t error; + memcached_st *root= (memcached_st *)ptr->root; + error= memcached_response(ptr, buffer, sizeof(buffer), + &root->result); + + memcached_set_processing_input(root, false); + + if (error == MEMCACHED_SUCCESS) + { + for (unsigned int x= 0; x < cb.number_of_callback; x++) + { + error= (*cb.callback[x])(ptr->root, &root->result, cb.context); + if (error != MEMCACHED_SUCCESS) + break; + } + + /* @todo what should I do with the error message??? */ + } + /* @todo what should I do with other error messages?? */ + return true; + } + + return false; +} static memcached_return_t io_wait(memcached_server_write_instance_st ptr, memc_read_or_write read_or_write) @@ -56,8 +198,6 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, fds.fd= ptr->fd; fds.events= POLLIN; - int error; - if (read_or_write == MEM_WRITE) /* write */ { fds.events= POLLOUT; @@ -80,14 +220,21 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, { memcached_return_t rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) + { return MEMCACHED_FAILURE; + } + } + + if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this) + { + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); } size_t loop_max= 5; while (--loop_max) // While loop is for ERESTART or EINTR { - error= poll(&fds, 1, ptr->root->poll_timeout); + int error= poll(&fds, 1, ptr->root->poll_timeout); switch (error) { case 1: // Success! @@ -95,8 +242,10 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); return MEMCACHED_SUCCESS; + case 0: // Timeout occured, we let the while() loop do its thing. - return MEMCACHED_TIMEOUT; + return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + default: WATCHPOINT_ERRNO(get_socket_errno()); switch (get_socket_errno()) @@ -106,201 +255,253 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr, #endif case EINTR: break; + + case EFAULT: + case ENOMEM: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT); + + case EINVAL: + return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid")); + default: if (fds.revents & POLLERR) { int err; socklen_t len= sizeof (err); (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); - ptr->cached_errno= (err == 0) ? get_socket_errno() : err; + memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT); } else { - ptr->cached_errno= get_socket_errno(); + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } } - /* Imposssible for anything other then -1 */ - WATCHPOINT_ASSERT(error == -1); - ptr->cached_errno= get_socket_errno(); memcached_quit_server(ptr, true); - return MEMCACHED_FAILURE; + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } -memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +static ssize_t io_flush(memcached_server_write_instance_st ptr, + const bool with_flush, + memcached_return_t *error) { - return io_wait(ptr, MEM_WRITE); -} - -/** - * Try to fill the input buffer for a server with as much - * data as possible. - * - * @param ptr the server to pack + /* + ** We might want to purge the input buffer if we haven't consumed + ** any output yet... The test for the limits is the purge is inline + ** in the purge function to avoid duplicating the logic.. */ -static bool repack_input_buffer(memcached_server_write_instance_st ptr) -{ - if (ptr->read_ptr != ptr->read_buffer) - { - /* Move all of the data to the beginning of the buffer so - ** that we can fit more data into the buffer... - */ - memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length); - ptr->read_ptr= ptr->read_buffer; - ptr->read_data_length= ptr->read_buffer_length; - } - - /* There is room in the buffer, try to fill it! */ - if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) { - /* Just try a single read to grab what's available */ - ssize_t nr= recv(ptr->fd, - ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length, - 0); + memcached_return_t rc; + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + rc= memcached_purge(ptr); - if (nr > 0) + if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) { - ptr->read_data_length+= (size_t)nr; - ptr->read_buffer_length+= (size_t)nr; - return true; + return -1; } } - return false; -} + ssize_t sent_length; + size_t return_length; + char *local_write_ptr= ptr->write_buffer; + size_t write_length= ptr->write_buffer_offset; -/** - * If the we have callbacks connected to this server structure - * we may start process the input queue and fire the callbacks - * for the incomming messages. This function is _only_ called - * when the input buffer is full, so that we _know_ that we have - * at least _one_ message to process. - * - * @param ptr the server to star processing iput messages for - * @return true if we processed anything, false otherwise - */ -static bool process_input_buffer(memcached_server_write_instance_st ptr) -{ - /* - ** We might be able to process some of the response messages if we - ** have a callback set up - */ - if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false) + *error= MEMCACHED_SUCCESS; + + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + + // UDP Sanity check, make sure that we are not sending somthing too big + if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) { - /* - * We might have responses... try to read them out and fire - * callbacks - */ - memcached_callback_st cb= *ptr->root->callbacks; + *error= MEMCACHED_WRITE_FAILURE; + return -1; + } - memcached_set_processing_input((memcached_st *)ptr->root, true); + if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP + && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) + { + return 0; + } - char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; - memcached_return_t error; - memcached_st *root= (memcached_st *)ptr->root; - error= memcached_response(ptr, buffer, sizeof(buffer), - &root->result); + /* Looking for memory overflows */ +#if defined(DEBUG) + if (write_length == MEMCACHED_MAX_BUFFER) + WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); + WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); +#endif - memcached_set_processing_input(root, false); + return_length= 0; + while (write_length) + { + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + WATCHPOINT_ASSERT(write_length > 0); + sent_length= 0; + if (ptr->type == MEMCACHED_CONNECTION_UDP) + increment_udp_message_id(ptr); - if (error == MEMCACHED_SUCCESS) + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); + if (with_flush) { - for (unsigned int x= 0; x < cb.number_of_callback; x++) + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); + } + else + { + sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); + } + + if (sent_length == SOCKET_ERROR) + { + memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); +#if 0 // @todo I should look at why we hit this bit of code hard frequently + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_NUMBER(get_socket_errno()); +#endif + switch (get_socket_errno()) { - error= (*cb.callback[x])(ptr->root, &root->result, cb.context); - if (error != MEMCACHED_SUCCESS) - break; + case ENOBUFS: + continue; + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif + { + /* + * We may be blocked on write because the input buffer + * is full. Let's check if we have room in our input + * buffer for more data and retry the write before + * waiting.. + */ + if (repack_input_buffer(ptr) or + process_input_buffer(ptr)) + { + continue; + } + + memcached_return_t rc= io_wait(ptr, MEM_WRITE); + if (memcached_success(rc)) + { + continue; + } + else if (rc == MEMCACHED_TIMEOUT) + { + *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT); + return -1; + } + + memcached_quit_server(ptr, true); + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + return -1; + } + case ENOTCONN: + case EPIPE: + default: + memcached_quit_server(ptr, true); + *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); + WATCHPOINT_ASSERT(ptr->fd == -1); + return -1; } + } - /* @todo what should I do with the error message??? */ + if (ptr->type == MEMCACHED_CONNECTION_UDP and + (size_t)sent_length != write_length) + { + memcached_quit_server(ptr, true); + *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT); + return -1; } - /* @todo what should I do with other error messages?? */ - return true; - } - return false; -} + ptr->io_bytes_sent += (uint32_t) sent_length; -#if 0 // Dead code, this should be removed. -void memcached_io_preread(memcached_st *ptr) -{ - unsigned int x; + local_write_ptr+= sent_length; + write_length-= (uint32_t) sent_length; + return_length+= (uint32_t) sent_length; + } - return; + WATCHPOINT_ASSERT(write_length == 0); + // Need to study this assert() WATCHPOINT_ASSERT(return_length == + // ptr->write_buffer_offset); - for (x= 0; x < memcached_server_count(ptr); x++) - { - if (memcached_server_response_count(ptr, x) && - ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER ) - { - size_t data_read; + // if we are a udp server, the begining of the buffer is reserverd for + // the upd frame header + if (ptr->type == MEMCACHED_CONNECTION_UDP) + ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; + else + ptr->write_buffer_offset= 0; - data_read= recv(ptr->hosts[x].fd, - ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, - MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0); - if (data_read == SOCKET_ERROR) - continue; + return (ssize_t) return_length; +} - ptr->hosts[x].read_buffer_length+= data_read; - ptr->hosts[x].read_data_length+= data_read; - } - } +memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr) +{ + return io_wait(ptr, MEM_WRITE); } -#endif memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread) { - char *buffer_ptr; + assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error + char *buffer_ptr= static_cast(buffer); - buffer_ptr= static_cast(buffer); + if (ptr->fd == INVALID_SOCKET) + { + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state"); + return MEMCACHED_CONNECTION_FAILURE; + } while (length) { if (not ptr->read_buffer_length) { ssize_t data_read; - - while (1) + do { - data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0); - if (data_read > 0) - { - break; - } - else if (data_read == SOCKET_ERROR) + data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) { - ptr->cached_errno= get_socket_errno(); - memcached_return_t rc= MEMCACHED_ERRNO; switch (get_socket_errno()) { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX case EWOULDBLOCK: #ifdef USE_EAGAIN case EAGAIN: #endif - case EINTR: #ifdef TARGET_OS_LINUX case ERESTART: #endif - if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) + if (memcached_success(io_wait(ptr, MEM_READ))) + { continue; + } + return MEMCACHED_IN_PROGRESS; + /* fall through */ + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket"); + case EINVAL: + case EFAULT: + case ECONNREFUSED: default: { memcached_quit_server(ptr, true); *nread= -1; - return rc; + return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT); } } } - else + else if (data_read == 0) { /* EOF. Any data received so far is incomplete @@ -314,9 +515,9 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, WATCHPOINT_STRING("We had a zero length recv()"); memcached_quit_server(ptr, true); *nread= -1; - return MEMCACHED_UNKNOWN_READ_FAILURE; + return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT); } - } + } while (data_read <= 0); ptr->io_bytes_sent = 0; ptr->read_data_length= (size_t) data_read; @@ -346,11 +547,67 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, } } - ptr->server_failure_counter= 0; *nread = (ssize_t)(buffer_ptr - (char*)buffer); + return MEMCACHED_SUCCESS; } +memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr) +{ + assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st"); + + if (ptr->fd == INVALID_SOCKET) + { + assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state"); + return MEMCACHED_CONNECTION_FAILURE; + } + + ssize_t data_read; + char buffer[MEMCACHED_MAX_BUFFER]; + do + { + data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT); + if (data_read == SOCKET_ERROR) + { + switch (get_socket_errno()) + { + case EINTR: // We just retry + continue; + + case ETIMEDOUT: // OSX + case EWOULDBLOCK: +#ifdef USE_EAGAIN + case EAGAIN: +#endif +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + if (memcached_success(io_wait(ptr, MEM_READ))) + { + continue; + } + return MEMCACHED_IN_PROGRESS; + + /* fall through */ + + case ENOTCONN: // Programmer Error + WATCHPOINT_ASSERT(0); + case ENOTSOCK: + WATCHPOINT_ASSERT(0); + case EBADF: + assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state"); + case EINVAL: + case EFAULT: + case ECONNREFUSED: + default: + return MEMCACHED_CONNECTION_FAILURE; // We want this! + } + } + } while (data_read > 0); + + return MEMCACHED_CONNECTION_FAILURE; +} + static ssize_t _io_write(memcached_server_write_instance_st ptr, const void *buffer, size_t length, bool with_flush) { @@ -393,11 +650,10 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr, if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP) { - memcached_return_t rc; - ssize_t sent_length; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - sent_length= io_flush(ptr, with_flush, &rc); + + memcached_return_t rc; + ssize_t sent_length= io_flush(ptr, with_flush, &rc); if (sent_length == -1) { return -1; @@ -459,11 +715,11 @@ ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, } -memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) +void memcached_io_close(memcached_server_write_instance_st ptr) { if (ptr->fd == INVALID_SOCKET) { - return MEMCACHED_SUCCESS; + return; } /* in case of death shutdown to avoid blocking at close() */ @@ -478,8 +734,8 @@ memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) { WATCHPOINT_ERRNO(get_socket_errno()); } - - return MEMCACHED_SUCCESS; + ptr->state= MEMCACHED_SERVER_STATE_NEW; + ptr->fd= INVALID_SOCKET; } memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) @@ -488,9 +744,7 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st struct pollfd fds[MAX_SERVERS_TO_POLL]; unsigned int host_index= 0; - for (uint32_t x= 0; - x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; - ++x) + for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); @@ -524,13 +778,15 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st return NULL; } - int err= poll(fds, host_index, memc->poll_timeout); - switch (err) { + int error= poll(fds, host_index, memc->poll_timeout); + switch (error) + { case -1: - memcached_set_errno(memc, get_socket_errno(), NULL); + memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT); /* FALLTHROUGH */ case 0: break; + default: for (size_t x= 0; x < host_index; ++x) { @@ -551,143 +807,6 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st return NULL; } -static ssize_t io_flush(memcached_server_write_instance_st ptr, - const bool with_flush, - memcached_return_t *error) -{ - /* - ** We might want to purge the input buffer if we haven't consumed - ** any output yet... The test for the limits is the purge is inline - ** in the purge function to avoid duplicating the logic.. - */ - { - memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - rc= memcached_purge(ptr); - - if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) - { - return -1; - } - } - ssize_t sent_length; - size_t return_length; - char *local_write_ptr= ptr->write_buffer; - size_t write_length= ptr->write_buffer_offset; - - *error= MEMCACHED_SUCCESS; - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - - // UDP Sanity check, make sure that we are not sending somthing too big - if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) - { - return -1; - } - - if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP - && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH)) - return 0; - - /* Looking for memory overflows */ -#if defined(DEBUG) - if (write_length == MEMCACHED_MAX_BUFFER) - WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr); - WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length)); -#endif - - return_length= 0; - while (write_length) - { - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - WATCHPOINT_ASSERT(write_length > 0); - sent_length= 0; - if (ptr->type == MEMCACHED_CONNECTION_UDP) - increment_udp_message_id(ptr); - - WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); - if (with_flush) - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT); - } - else - { - sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE); - } - - if (sent_length == SOCKET_ERROR) - { - ptr->cached_errno= get_socket_errno(); -#if 0 // @todo I should look at why we hit this bit of code hard frequently - WATCHPOINT_ERRNO(get_socket_errno()); - WATCHPOINT_NUMBER(get_socket_errno()); -#endif - switch (get_socket_errno()) - { - case ENOBUFS: - continue; - case EWOULDBLOCK: -#ifdef USE_EAGAIN - case EAGAIN: -#endif - { - /* - * We may be blocked on write because the input buffer - * is full. Let's check if we have room in our input - * buffer for more data and retry the write before - * waiting.. - */ - if (repack_input_buffer(ptr) || - process_input_buffer(ptr)) - continue; - - memcached_return_t rc; - rc= io_wait(ptr, MEM_WRITE); - - if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) - continue; - - memcached_quit_server(ptr, true); - return -1; - } - case ENOTCONN: - case EPIPE: - default: - memcached_quit_server(ptr, true); - *error= MEMCACHED_ERRNO; - WATCHPOINT_ASSERT(ptr->fd == -1); - return -1; - } - } - - if (ptr->type == MEMCACHED_CONNECTION_UDP && - (size_t)sent_length != write_length) - { - memcached_quit_server(ptr, true); - return -1; - } - - ptr->io_bytes_sent += (uint32_t) sent_length; - - local_write_ptr+= sent_length; - write_length-= (uint32_t) sent_length; - return_length+= (uint32_t) sent_length; - } - - WATCHPOINT_ASSERT(write_length == 0); - // Need to study this assert() WATCHPOINT_ASSERT(return_length == - // ptr->write_buffer_offset); - - // if we are a udp server, the begining of the buffer is reserverd for - // the upd frame header - if (ptr->type == MEMCACHED_CONNECTION_UDP) - ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH; - else - ptr->write_buffer_offset= 0; - - return (ssize_t) return_length; -} - /* Eventually we will just kill off the server with the problem. */ @@ -710,10 +829,14 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, while (offset < size) { ssize_t nread; - memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset, - &nread); - if (rc != MEMCACHED_SUCCESS) + memcached_return_t rc; + + while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { }; + + if (memcached_failed(rc)) + { return rc; + } offset+= (size_t) nread; } @@ -723,12 +846,13 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, char *buffer_ptr, - size_t size) + size_t size, + size_t& total_nr) { + total_nr= 0; bool line_complete= false; - size_t total_nr= 0; - while (!line_complete) + while (not line_complete) { if (ptr->read_buffer_length == 0) { @@ -739,8 +863,15 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, */ ssize_t nread; memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread); - if (rc != MEMCACHED_SUCCESS) + if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) + { + memcached_quit_server(ptr, true); + return memcached_set_error(*ptr, rc, MEMCACHED_AT); + } + else if (memcached_failed(rc)) + { return rc; + } if (*buffer_ptr == '\n') line_complete= true; @@ -768,30 +899,6 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, return MEMCACHED_SUCCESS; } -/* - * The udp request id consists of two seperate sections - * 1) The thread id - * 2) The message number - * The thread id should only be set when the memcached_st struct is created - * and should not be changed. - * - * The message num is incremented for each new message we send, this function - * extracts the message number from message_id, increments it and then - * writes the new value back into the header - */ -static void increment_udp_message_id(memcached_server_write_instance_st ptr) -{ - struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; - uint16_t cur_req= get_udp_datagram_request_id(header); - int msg_num= get_msg_num_from_request_id(cur_req); - int thread_id= get_thread_id_from_request_id(cur_req); - - if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0) - msg_num= 0; - - header->request_id= htons((uint16_t) (thread_id | msg_num)); -} - memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)