From 14ed08eb76405aed0747568326058ab57c5e69a2 Mon Sep 17 00:00:00 2001 From: Brian Aker Date: Thu, 27 Dec 2012 06:32:03 -0500 Subject: [PATCH] First pass for restructured IO --- libmemcached/connect.cc | 55 +++++++++++++++++++++++++-------------- libmemcached/dump.cc | 3 ++- libmemcached/fetch.cc | 3 ++- libmemcached/instance.cc | 24 +++++++++++++++++ libmemcached/instance.hpp | 26 +++++++++++++++--- libmemcached/io.cc | 2 +- libmemcached/io.hpp | 2 +- libmemcached/is.h | 4 +++ libmemcached/version.cc | 6 +++-- 9 files changed, 95 insertions(+), 30 deletions(-) diff --git a/libmemcached/connect.cc b/libmemcached/connect.cc index 02631b60..fa4b7317 100644 --- a/libmemcached/connect.cc +++ b/libmemcached/connect.cc @@ -68,7 +68,8 @@ static memcached_return_t connect_poll(org::libmemcached::Instance* server) { struct pollfd fds[1]; fds[0].fd= server->fd; - fds[0].events= POLLOUT; + fds[0].events= server->events(); + fds[0].revents= 0; size_t loop_max= 5; @@ -130,6 +131,10 @@ static memcached_return_t connect_poll(org::libmemcached::Instance* server) return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT); } +#if 0 + server->revents(fds[0].revents); +#endif + if (fds[0].revents & POLLERR or fds[0].revents & POLLHUP or fds[0].revents & POLLNVAL) @@ -449,46 +454,55 @@ static memcached_return_t unix_socket_connect(org::libmemcached::Instance* serve #ifndef WIN32 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET); - int type= SOCK_STREAM; - if (SOCK_CLOEXEC) - { - type|= SOCK_CLOEXEC; - } + do { + int type= SOCK_STREAM; + if (SOCK_CLOEXEC) + { + type|= SOCK_CLOEXEC; + } - if (SOCK_NONBLOCK) - { - type|= SOCK_NONBLOCK; - } + if (SOCK_NONBLOCK) + { + type|= SOCK_NONBLOCK; + } - if ((server->fd= socket(AF_UNIX, type, 0)) < 0) - { - return memcached_set_errno(*server, errno, NULL); - } + if ((server->fd= socket(AF_UNIX, type, 0)) < 0) + { + return memcached_set_errno(*server, errno, NULL); + } - struct sockaddr_un servAddr; + struct sockaddr_un servAddr; - memset(&servAddr, 0, sizeof (struct sockaddr_un)); - servAddr.sun_family= AF_UNIX; - strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */ + memset(&servAddr, 0, sizeof (struct sockaddr_un)); + servAddr.sun_family= AF_UNIX; + strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */ - do { if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) { switch (errno) { case EINPROGRESS: case EALREADY: + server->events(POLLOUT); + break; + case EINTR: + (void)closesocket(server->fd); + server->fd= INVALID_SOCKET; continue; case EISCONN: /* We were spinning waiting on connect */ { assert(0); // Programmer error - break; + (void)closesocket(server->fd); + server->fd= INVALID_SOCKET; + continue; } default: WATCHPOINT_ERRNO(errno); + (void)closesocket(server->fd); + server->fd= INVALID_SOCKET; return memcached_set_errno(*server, errno, MEMCACHED_AT); } } @@ -589,6 +603,7 @@ static memcached_return_t network_connect(org::libmemcached::Instance* server) case EINPROGRESS: // nonblocking mode - first return case EALREADY: // nonblocking mode - subsequent returns { + server->events(POLLOUT); server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS; memcached_return_t rc= connect_poll(server); diff --git a/libmemcached/dump.cc b/libmemcached/dump.cc index 618a63cd..13c2580a 100644 --- a/libmemcached/dump.cc +++ b/libmemcached/dump.cc @@ -83,7 +83,8 @@ static memcached_return_t ascii_dump(memcached_st *memc, memcached_dump_fn *call // Collect the returned items org::libmemcached::Instance* instance; - while ((instance= memcached_io_get_readable_server(memc))) + memcached_return_t read_ret= MEMCACHED_SUCCESS; + while ((instance= memcached_io_get_readable_server(memc, read_ret))) { memcached_return_t response_rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); if (response_rc == MEMCACHED_ITEM) diff --git a/libmemcached/fetch.cc b/libmemcached/fetch.cc index 76663703..ff729f9e 100644 --- a/libmemcached/fetch.cc +++ b/libmemcached/fetch.cc @@ -192,7 +192,8 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr, *error= MEMCACHED_MAXIMUM_RETURN; // We use this to see if we ever go into the loop org::libmemcached::Instance *server; - while ((server= memcached_io_get_readable_server(ptr))) + memcached_return_t read_ret= MEMCACHED_SUCCESS; + while ((server= memcached_io_get_readable_server(ptr, read_ret))) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; *error= memcached_response(server, buffer, sizeof(buffer), result); diff --git a/libmemcached/instance.cc b/libmemcached/instance.cc index 1414d725..ed840e55 100644 --- a/libmemcached/instance.cc +++ b/libmemcached/instance.cc @@ -44,6 +44,9 @@ static inline void _server_init(org::libmemcached::Instance* self, memcached_st { self->options.is_shutting_down= false; self->options.is_dead= false; + self->options.ready= false; + self->_events= 0; + self->_revents= 0; self->cursor_active_= 0; self->port_= port; self->fd= INVALID_SOCKET; @@ -108,6 +111,27 @@ static org::libmemcached::Instance* _server_create(org::libmemcached::Instance* return self; } +void org::libmemcached::Instance::events(short arg) +{ + if ((_events | arg) == _events) + { + return; + } + + _events|= arg; +} + +void org::libmemcached::Instance::revents(short arg) +{ + if (arg) + { + options.ready= true; + } + + _revents= arg; + _events&= short(~arg); +} + org::libmemcached::Instance* __instance_create_with(memcached_st *memc, org::libmemcached::Instance* self, const memcached_string_t& hostname, diff --git a/libmemcached/instance.hpp b/libmemcached/instance.hpp index fba02b2f..097d3500 100644 --- a/libmemcached/instance.hpp +++ b/libmemcached/instance.hpp @@ -91,11 +91,29 @@ struct Instance { } struct { - bool is_allocated:1; - bool is_initialized:1; - bool is_shutting_down:1; - bool is_dead:1; + bool is_allocated; + bool is_initialized; + bool is_shutting_down; + bool is_dead; + bool ready; } options; + + short _events; + short _revents; + + short events(void) + { + return _events; + } + + short revents(void) + { + return _revents; + } + + void events(short); + void revents(short); + uint32_t cursor_active_; in_port_t port_; memcached_socket_t fd; diff --git a/libmemcached/io.cc b/libmemcached/io.cc index d4599f77..6d9cad9a 100644 --- a/libmemcached/io.cc +++ b/libmemcached/io.cc @@ -727,7 +727,7 @@ void memcached_io_close(org::libmemcached::Instance* ptr) ptr->fd= INVALID_SOCKET; } -org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc) +org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&) { #define MAX_SERVERS_TO_POLL 100 struct pollfd fds[MAX_SERVERS_TO_POLL]; diff --git a/libmemcached/io.hpp b/libmemcached/io.hpp index 5625ce94..ed0b82ad 100644 --- a/libmemcached/io.hpp +++ b/libmemcached/io.hpp @@ -69,6 +69,6 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr, void *dta, const size_t size); -org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc); +org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&); memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr); diff --git a/libmemcached/is.h b/libmemcached/is.h index 339f2e26..cf56f65f 100644 --- a/libmemcached/is.h +++ b/libmemcached/is.h @@ -59,8 +59,12 @@ #define memcached_is_auto_eject_hosts(__object) ((__object)->flags.auto_eject_hosts) #define memcached_is_use_sort_hosts(__object) ((__object)->flags.use_sort_hosts) +#define memcached_is_ready(__object) ((__object)->options.ready) + #define memcached_is_weighted_ketama(__object) ((__object)->ketama.weighted_) +#define memcached_set_ready(__object, __flag) ((__object)->options.ready= (__flag)) + #define memcached_set_aes(__object, __flag) ((__object).flags.is_aes= __flag) #define memcached_set_udp(__object, __flag) ((__object).flags.use_udp= __flag) #define memcached_set_verify_key(__object, __flag) ((__object).flags.verify_key= __flag) diff --git a/libmemcached/version.cc b/libmemcached/version.cc index e9a54050..03098161 100644 --- a/libmemcached/version.cc +++ b/libmemcached/version.cc @@ -74,7 +74,8 @@ static inline memcached_return_t memcached_version_textual(memcached_st *ptr) { // Collect the returned items org::libmemcached::Instance* instance; - while ((instance= memcached_io_get_readable_server(ptr))) + memcached_return_t readable_error; + while ((instance= memcached_io_get_readable_server(ptr, readable_error))) { memcached_return_t rrc= memcached_response(instance, NULL); if (memcached_failed(rrc)) @@ -128,7 +129,8 @@ static inline memcached_return_t memcached_version_binary(memcached_st *ptr) { // Collect the returned items org::libmemcached::Instance* instance; - while ((instance= memcached_io_get_readable_server(ptr))) + memcached_return_t readable_error; + while ((instance= memcached_io_get_readable_server(ptr, readable_error))) { char buffer[32]; memcached_return_t rrc= memcached_response(instance, buffer, sizeof(buffer), NULL); -- 2.30.2