X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fio.c;h=6513492d5007734ec46599bfa287f5193263cd29;hb=c63b3f26c9e8d0214d3e1c70fb761f7700d61d2d;hp=3a523213357aaf3c15a7c7f29ac3d472630c992e;hpb=b2768ceafc13f9338e1c43d1bf0f1a65adb7e889;p=m6w6%2Flibmemcached diff --git a/libmemcached/io.c b/libmemcached/io.c index 3a523213..6513492d 100644 --- a/libmemcached/io.c +++ b/libmemcached/io.c @@ -11,18 +11,17 @@ #include "common.h" -#include -#include typedef enum { MEM_READ, MEM_WRITE } memc_read_or_write; -static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error); -static void increment_udp_message_id(memcached_server_instance_st *ptr); +static ssize_t io_flush(memcached_server_write_instance_st ptr, + memcached_return_t *error); +static void increment_udp_message_id(memcached_server_write_instance_st ptr); -static memcached_return_t io_wait(memcached_server_instance_st *ptr, +static memcached_return_t io_wait(memcached_server_write_instance_st ptr, memc_read_or_write read_or_write) { struct pollfd fds= { @@ -31,8 +30,15 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, }; int error; - unlikely (read_or_write == MEM_WRITE) /* write */ + if (read_or_write == MEM_WRITE) /* write */ + { fds.events= POLLOUT; + WATCHPOINT_SET(ptr->io_wait_count.write++); + } + else + { + WATCHPOINT_SET(ptr->io_wait_count.read++); + } /* ** We are going to block on write, but at least on Solaris we might block @@ -53,16 +59,52 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, if (ptr->root->flags.no_block == false) timeout= -1; - error= poll(&fds, 1, timeout); + size_t loop_max= 5; + while (--loop_max) // While loop is for ERESTART or EINTR + { + error= poll(&fds, 1, timeout); - if (error == 1) - return MEMCACHED_SUCCESS; - else if (error == 0) - return MEMCACHED_TIMEOUT; + switch (error) + { + case 1: // Success! + WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max); + WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max); + + return MEMCACHED_SUCCESS; + case 0: // Timeout occured, we let the while() loop do its thing. + return MEMCACHED_TIMEOUT; + default: + WATCHPOINT_ERRNO(get_socket_errno()); + switch (get_socket_errno()) + { +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif + case EINTR: + break; + default: + if (fds.revents & POLLERR) + { + int err; + socklen_t len= sizeof (err); + (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len); + ptr->cached_errno= (err == 0) ? get_socket_errno() : err; + } + else + { + ptr->cached_errno= get_socket_errno(); + } + memcached_quit_server(ptr, true); + + return MEMCACHED_FAILURE; + } + } + } /* Imposssible for anything other then -1 */ WATCHPOINT_ASSERT(error == -1); - memcached_quit_server(ptr, 1); + ptr->cached_errno= get_socket_errno(); + memcached_quit_server(ptr, true); return MEMCACHED_FAILURE; } @@ -73,7 +115,7 @@ static memcached_return_t io_wait(memcached_server_instance_st *ptr, * * @param ptr the server to pack */ -static bool repack_input_buffer(memcached_server_instance_st *ptr) +static bool repack_input_buffer(memcached_server_write_instance_st ptr) { if (ptr->read_ptr != ptr->read_buffer) { @@ -89,9 +131,10 @@ static bool repack_input_buffer(memcached_server_instance_st *ptr) if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER) { /* Just try a single read to grab what's available */ - ssize_t nr= read(ptr->fd, + ssize_t nr= recv(ptr->fd, ptr->read_ptr + ptr->read_data_length, - MEMCACHED_MAX_BUFFER - ptr->read_data_length); + MEMCACHED_MAX_BUFFER - ptr->read_data_length, + 0); if (nr > 0) { @@ -113,7 +156,7 @@ static bool repack_input_buffer(memcached_server_instance_st *ptr) * @param ptr the server to star processing iput messages for * @return true if we processed anything, false otherwise */ -static bool process_input_buffer(memcached_server_instance_st *ptr) +static bool process_input_buffer(memcached_server_write_instance_st ptr) { /* ** We might be able to process some of the response messages if we @@ -127,20 +170,21 @@ static bool process_input_buffer(memcached_server_instance_st *ptr) */ memcached_callback_st cb= *ptr->root->callbacks; - ptr->root->options.is_processing_input= true; + memcached_set_processing_input((memcached_st *)ptr->root, true); char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; memcached_return_t error; + memcached_st *root= (memcached_st *)ptr->root; error= memcached_response(ptr, buffer, sizeof(buffer), - &ptr->root->result); + &root->result); - ptr->root->options.is_processing_input = false; + memcached_set_processing_input(root, false); if (error == MEMCACHED_SUCCESS) { for (unsigned int x= 0; x < cb.number_of_callback; x++) { - error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context); + error= (*cb.callback[x])(ptr->root, &root->result, cb.context); if (error != MEMCACHED_SUCCESS) break; } @@ -156,6 +200,7 @@ static bool process_input_buffer(memcached_server_instance_st *ptr) static inline void memcached_io_cork_push(memcached_server_st *ptr) { + (void)ptr; #ifdef CORK if (ptr->root->flags.cork == false || ptr->state.is_corked) return; @@ -167,13 +212,12 @@ static inline void memcached_io_cork_push(memcached_server_st *ptr) ptr->state.is_corked= true; WATCHPOINT_ASSERT(ptr->state.is_corked == true); -#else - (void)ptr; #endif } static inline void memcached_io_cork_pop(memcached_server_st *ptr) { + (void)ptr; #ifdef CORK if (ptr->root->flags.cork == false || ptr->state.is_corked == false) return; @@ -185,12 +229,10 @@ static inline void memcached_io_cork_pop(memcached_server_st *ptr) ptr->state.is_corked= false; WATCHPOINT_ASSERT(ptr->state.is_corked == false); -#else - (void)ptr; #endif } -#ifdef UNUSED +#if 0 // Dead code, this should be removed. void memcached_io_preread(memcached_st *ptr) { unsigned int x; @@ -204,10 +246,10 @@ void memcached_io_preread(memcached_st *ptr) { size_t data_read; - data_read= read(ptr->hosts[x].fd, + data_read= recv(ptr->hosts[x].fd, ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length, - MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length); - if (data_read == -1) + MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0); + if (data_read == SOCKET_ERROR) continue; ptr->hosts[x].read_buffer_length+= data_read; @@ -217,7 +259,7 @@ void memcached_io_preread(memcached_st *ptr) } #endif -memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, +memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr, void *buffer, size_t length, ssize_t *nread) { char *buffer_ptr; @@ -232,24 +274,32 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, while (1) { - data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER); + data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0); if (data_read > 0) + { break; - else if (data_read == -1) + } + else if (data_read == SOCKET_ERROR) { - ptr->cached_errno= errno; - memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE; - switch (errno) + ptr->cached_errno= get_socket_errno(); + memcached_return_t rc= MEMCACHED_ERRNO; + switch (get_socket_errno()) { + case EWOULDBLOCK: +#ifdef USE_EAGAIN case EAGAIN: +#endif case EINTR: +#ifdef TARGET_OS_LINUX + case ERESTART: +#endif if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS) continue; /* fall through */ default: { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); *nread= -1; return rc; } @@ -266,7 +316,8 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, for blocking I/O we do not return 0 and for non-blocking case it will return EGAIN if data is not immediatly available. */ - memcached_quit_server(ptr, 1); + WATCHPOINT_STRING("We had a zero length recv()"); + memcached_quit_server(ptr, true); *nread= -1; return MEMCACHED_UNKNOWN_READ_FAILURE; } @@ -305,13 +356,13 @@ memcached_return_t memcached_io_read(memcached_server_instance_st *ptr, return MEMCACHED_SUCCESS; } -ssize_t memcached_io_write(memcached_server_instance_st *ptr, - const void *buffer, size_t length, char with_flush) +static ssize_t _io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) { size_t original_length; const char* buffer_ptr; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); original_length= length; buffer_ptr= buffer; @@ -354,7 +405,7 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, memcached_return_t rc; ssize_t sent_length; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); sent_length= io_flush(ptr, &rc); if (sent_length == -1) return -1; @@ -370,7 +421,7 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, if (with_flush) { memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); if (io_flush(ptr, &rc) == -1) { return -1; @@ -382,30 +433,65 @@ ssize_t memcached_io_write(memcached_server_instance_st *ptr, return (ssize_t) original_length; } -memcached_return_t memcached_io_close(memcached_server_instance_st *ptr) +ssize_t memcached_io_write(memcached_server_write_instance_st ptr, + const void *buffer, size_t length, bool with_flush) { - if (ptr->fd == -1) + return _io_write(ptr, buffer, length, with_flush); +} + +ssize_t memcached_io_writev(memcached_server_write_instance_st ptr, + const struct libmemcached_io_vector_st *vector, + size_t number_of, bool with_flush) +{ + ssize_t total= 0; + + for (size_t x= 0; x < number_of; x++, vector++) + { + ssize_t returnable; + + if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1) + { + return -1; + } + total+= returnable; + } + + if (with_flush) + { + if (memcached_io_write(ptr, NULL, 0, true) == -1) + { + return -1; + } + } + + return total; +} + + +memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr) +{ + if (ptr->fd == INVALID_SOCKET) { return MEMCACHED_SUCCESS; } /* in case of death shutdown to avoid blocking at close() */ - if (shutdown(ptr->fd, SHUT_RDWR) == -1 && errno != ENOTCONN) + if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN) { WATCHPOINT_NUMBER(ptr->fd); - WATCHPOINT_ERRNO(errno); - WATCHPOINT_ASSERT(errno); + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_ASSERT(get_socket_errno()); } - if (close(ptr->fd) == -1) + if (closesocket(ptr->fd) == SOCKET_ERROR) { - WATCHPOINT_ERRNO(errno); + WATCHPOINT_ERRNO(get_socket_errno()); } return MEMCACHED_SUCCESS; } -memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *memc) +memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; @@ -415,7 +501,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (instance->read_buffer_length > 0) /* I have data in the buffer */ @@ -435,7 +521,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem /* We have 0 or 1 server with pending events.. */ for (uint32_t x= 0; x< memcached_server_count(memc); ++x) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x); if (memcached_server_response_count(instance) > 0) @@ -450,7 +536,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem int err= poll(fds, host_index, memc->poll_timeout); switch (err) { case -1: - memc->cached_errno = errno; + memc->cached_errno = get_socket_errno(); /* FALLTHROUGH */ case 0: break; @@ -461,7 +547,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem { for (uint32_t y= 0; y < memcached_server_count(memc); ++y) { - memcached_server_instance_st *instance= + memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y); if (instance->fd == fds[x].fd) @@ -474,7 +560,7 @@ memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *mem return NULL; } -static ssize_t io_flush(memcached_server_instance_st *ptr, +static ssize_t io_flush(memcached_server_write_instance_st ptr, memcached_return_t *error) { /* @@ -484,7 +570,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, */ { memcached_return_t rc; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); rc= memcached_purge(ptr); if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED) @@ -497,7 +583,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, *error= MEMCACHED_SUCCESS; - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); // UDP Sanity check, make sure that we are not sending somthing too big if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH) @@ -517,21 +603,26 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, return_length= 0; while (write_length) { - WATCHPOINT_ASSERT(ptr->fd != -1); + WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET); WATCHPOINT_ASSERT(write_length > 0); sent_length= 0; if (ptr->type == MEMCACHED_CONNECTION_UDP) increment_udp_message_id(ptr); - sent_length= write(ptr->fd, local_write_ptr, write_length); - if (sent_length == -1) + sent_length= send(ptr->fd, local_write_ptr, write_length, 0); + if (sent_length == SOCKET_ERROR) { - ptr->cached_errno= errno; - switch (errno) + ptr->cached_errno= get_socket_errno(); + WATCHPOINT_ERRNO(get_socket_errno()); + WATCHPOINT_NUMBER(get_socket_errno()); + switch (get_socket_errno()) { case ENOBUFS: continue; + case EWOULDBLOCK: +#ifdef USE_EAGAIN case EAGAIN: +#endif { /* * We may be blocked on write because the input buffer @@ -549,11 +640,11 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT) continue; - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); return -1; } default: - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); *error= MEMCACHED_ERRNO; return -1; } @@ -562,7 +653,7 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, if (ptr->type == MEMCACHED_CONNECTION_UDP && (size_t)sent_length != write_length) { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); return -1; } @@ -590,16 +681,16 @@ static ssize_t io_flush(memcached_server_instance_st *ptr, /* Eventually we will just kill off the server with the problem. */ -void memcached_io_reset(memcached_server_instance_st *ptr) +void memcached_io_reset(memcached_server_write_instance_st ptr) { - memcached_quit_server(ptr, 1); + memcached_quit_server(ptr, true); } /** * Read a given number of bytes from the server and place it into a specific * buffer. Reset the IO channel on this server if an error occurs. */ -memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr, +memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr, void *dta, size_t size) { @@ -620,7 +711,7 @@ memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr, return MEMCACHED_SUCCESS; } -memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr, +memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr, char *buffer_ptr, size_t size) { @@ -678,7 +769,7 @@ memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr, * extracts the message number from message_id, increments it and then * writes the new value back into the header */ -static void increment_udp_message_id(memcached_server_instance_st *ptr) +static void increment_udp_message_id(memcached_server_write_instance_st ptr) { struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer; uint16_t cur_req= get_udp_datagram_request_id(header); @@ -691,7 +782,7 @@ static void increment_udp_message_id(memcached_server_instance_st *ptr) header->request_id= htons((uint16_t) (thread_id | msg_num)); } -memcached_return_t memcached_io_init_udp_header(memcached_server_instance_st *ptr, uint16_t thread_id) +memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id) { if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID) return MEMCACHED_FAILURE;