First pass for restructured IO
authorBrian Aker <brian@tangent.org>
Thu, 27 Dec 2012 11:32:03 +0000 (06:32 -0500)
committerBrian Aker <brian@tangent.org>
Thu, 27 Dec 2012 11:32:03 +0000 (06:32 -0500)
libmemcached/connect.cc
libmemcached/dump.cc
libmemcached/fetch.cc
libmemcached/instance.cc
libmemcached/instance.hpp
libmemcached/io.cc
libmemcached/io.hpp
libmemcached/is.h
libmemcached/version.cc

index 02631b60bf167081a8b57a94ab57b7beee62e84d..fa4b7317c02ad4324f9b46711418fc312e9388fb 100644 (file)
@@ -68,7 +68,8 @@ static memcached_return_t connect_poll(org::libmemcached::Instance* server)
 {
   struct pollfd fds[1];
   fds[0].fd= server->fd;
-  fds[0].events= POLLOUT;
+  fds[0].events= server->events();
+  fds[0].revents= 0;
 
   size_t loop_max= 5;
 
@@ -130,6 +131,10 @@ static memcached_return_t connect_poll(org::libmemcached::Instance* server)
       return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
     }
 
+#if 0
+    server->revents(fds[0].revents);
+#endif
+
     if (fds[0].revents & POLLERR or
         fds[0].revents & POLLHUP or 
         fds[0].revents & POLLNVAL)
@@ -449,46 +454,55 @@ static memcached_return_t unix_socket_connect(org::libmemcached::Instance* serve
 #ifndef WIN32
   WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
 
-  int type= SOCK_STREAM;
-  if (SOCK_CLOEXEC)
-  {
-    type|= SOCK_CLOEXEC;
-  }
+  do {
+    int type= SOCK_STREAM;
+    if (SOCK_CLOEXEC)
+    {
+      type|= SOCK_CLOEXEC;
+    }
 
-  if (SOCK_NONBLOCK)
-  {
-    type|= SOCK_NONBLOCK;
-  }
+    if (SOCK_NONBLOCK)
+    {
+      type|= SOCK_NONBLOCK;
+    }
 
-  if ((server->fd= socket(AF_UNIX, type, 0)) < 0)
-  {
-    return memcached_set_errno(*server, errno, NULL);
-  }
+    if ((server->fd= socket(AF_UNIX, type, 0)) < 0)
+    {
+      return memcached_set_errno(*server, errno, NULL);
+    }
 
-  struct sockaddr_un servAddr;
+    struct sockaddr_un servAddr;
 
-  memset(&servAddr, 0, sizeof (struct sockaddr_un));
-  servAddr.sun_family= AF_UNIX;
-  strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
+    memset(&servAddr, 0, sizeof (struct sockaddr_un));
+    servAddr.sun_family= AF_UNIX;
+    strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
 
-  do {
     if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
     {
       switch (errno)
       {
       case EINPROGRESS:
       case EALREADY:
+        server->events(POLLOUT);
+        break;
+
       case EINTR:
+        (void)closesocket(server->fd);
+        server->fd= INVALID_SOCKET;
         continue;
 
       case EISCONN: /* We were spinning waiting on connect */
         {
           assert(0); // Programmer error
-          break;
+          (void)closesocket(server->fd);
+          server->fd= INVALID_SOCKET;
+          continue;
         }
 
       default:
         WATCHPOINT_ERRNO(errno);
+        (void)closesocket(server->fd);
+        server->fd= INVALID_SOCKET;
         return memcached_set_errno(*server, errno, MEMCACHED_AT);
       }
     }
@@ -589,6 +603,7 @@ static memcached_return_t network_connect(org::libmemcached::Instance* server)
     case EINPROGRESS: // nonblocking mode - first return
     case EALREADY: // nonblocking mode - subsequent returns
       {
+        server->events(POLLOUT);
         server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
         memcached_return_t rc= connect_poll(server);
 
index 618a63cd319b7e4d09cbe48a1e1cc224f2f23e3b..13c2580a8267aba049d836aa7ad7e828bcde8b69 100644 (file)
@@ -83,7 +83,8 @@ static memcached_return_t ascii_dump(memcached_st *memc, memcached_dump_fn *call
 
     // Collect the returned items
     org::libmemcached::Instance* instance;
-    while ((instance= memcached_io_get_readable_server(memc)))
+    memcached_return_t read_ret= MEMCACHED_SUCCESS;
+    while ((instance= memcached_io_get_readable_server(memc, read_ret)))
     {
       memcached_return_t response_rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
       if (response_rc == MEMCACHED_ITEM)
index 76663703cc6a97b68fee2bcc0d54182aa1e951ae..ff729f9e59942df907003c90cc0ce5254d85211f 100644 (file)
@@ -192,7 +192,8 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
 
   *error= MEMCACHED_MAXIMUM_RETURN; // We use this to see if we ever go into the loop
   org::libmemcached::Instance *server;
-  while ((server= memcached_io_get_readable_server(ptr)))
+  memcached_return_t read_ret= MEMCACHED_SUCCESS;
+  while ((server= memcached_io_get_readable_server(ptr, read_ret)))
   {
     char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
     *error= memcached_response(server, buffer, sizeof(buffer), result);
index 1414d7251c453ac127073001b7957f864b2cbcf6..ed840e55b31bdea89e77f288b8db8607b555f55d 100644 (file)
@@ -44,6 +44,9 @@ static inline void _server_init(org::libmemcached::Instance* self, memcached_st
 {
   self->options.is_shutting_down= false;
   self->options.is_dead= false;
+  self->options.ready= false;
+  self->_events= 0;
+  self->_revents= 0;
   self->cursor_active_= 0;
   self->port_= port;
   self->fd= INVALID_SOCKET;
@@ -108,6 +111,27 @@ static org::libmemcached::Instance* _server_create(org::libmemcached::Instance*
   return self;
 }
 
+void org::libmemcached::Instance::events(short arg)
+{
+  if ((_events | arg) == _events)
+  {
+    return;
+  }
+
+  _events|= arg;
+}
+
+void org::libmemcached::Instance::revents(short arg)
+{
+  if (arg)
+  {
+    options.ready= true;
+  }
+
+  _revents= arg;
+  _events&= short(~arg);
+}
+
 org::libmemcached::Instance* __instance_create_with(memcached_st *memc,
                                                     org::libmemcached::Instance* self,
                                                     const memcached_string_t& hostname,
index fba02b2f4da0f2be1afb39fe5fe5b1f70b8cbb69..097d350097d96578beeff4be78d9298ce62e3d66 100644 (file)
@@ -91,11 +91,29 @@ struct Instance {
   }
 
   struct {
-    bool is_allocated:1;
-    bool is_initialized:1;
-    bool is_shutting_down:1;
-    bool is_dead:1;
+    bool is_allocated;
+    bool is_initialized;
+    bool is_shutting_down;
+    bool is_dead;
+    bool ready;
   } options;
+
+  short _events;
+  short _revents;
+
+  short events(void)
+  {
+    return _events;
+  }
+
+  short revents(void)
+  {
+    return _revents;
+  }
+
+  void events(short);
+  void revents(short);
+
   uint32_t cursor_active_;
   in_port_t port_;
   memcached_socket_t fd;
index d4599f77c793e15095c3b2cf08f47a9f3d66e6fc..6d9cad9a51f3e815e56e959ee3814b609bcd4681 100644 (file)
@@ -727,7 +727,7 @@ void memcached_io_close(org::libmemcached::Instance* ptr)
   ptr->fd= INVALID_SOCKET;
 }
 
-org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc)
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&)
 {
 #define MAX_SERVERS_TO_POLL 100
   struct pollfd fds[MAX_SERVERS_TO_POLL];
index 5625ce94492134ea628a1a39b63dba64ca943774..ed0b82ad6714f2f50a21300d2143465c85e5c97f 100644 (file)
@@ -69,6 +69,6 @@ memcached_return_t memcached_safe_read(org::libmemcached::Instance* ptr,
                                        void *dta,
                                        const size_t size);
 
-org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc);
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&);
 
 memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr);
index 339f2e26f0fa3eb77ac5f3602be39e55fed7792d..cf56f65fa12c000ce151e64365e0ec9cfb89eb8a 100644 (file)
 #define memcached_is_auto_eject_hosts(__object) ((__object)->flags.auto_eject_hosts)
 #define memcached_is_use_sort_hosts(__object) ((__object)->flags.use_sort_hosts)
 
+#define memcached_is_ready(__object) ((__object)->options.ready)
+
 #define memcached_is_weighted_ketama(__object) ((__object)->ketama.weighted_)
 
+#define memcached_set_ready(__object, __flag) ((__object)->options.ready= (__flag))
+
 #define memcached_set_aes(__object, __flag) ((__object).flags.is_aes= __flag)
 #define memcached_set_udp(__object, __flag) ((__object).flags.use_udp= __flag)
 #define memcached_set_verify_key(__object, __flag) ((__object).flags.verify_key= __flag)
index e9a54050977de0843004f1e3e41dcc196d0f9c86..03098161a52895651a17871fbeffc9c56aa8f246 100644 (file)
@@ -74,7 +74,8 @@ static inline memcached_return_t memcached_version_textual(memcached_st *ptr)
   {
     // Collect the returned items
     org::libmemcached::Instance* instance;
-    while ((instance= memcached_io_get_readable_server(ptr)))
+    memcached_return_t readable_error;
+    while ((instance= memcached_io_get_readable_server(ptr, readable_error)))
     {
       memcached_return_t rrc= memcached_response(instance, NULL);
       if (memcached_failed(rrc))
@@ -128,7 +129,8 @@ static inline memcached_return_t memcached_version_binary(memcached_st *ptr)
   {
     // Collect the returned items
     org::libmemcached::Instance* instance;
-    while ((instance= memcached_io_get_readable_server(ptr)))
+    memcached_return_t readable_error;
+    while ((instance= memcached_io_get_readable_server(ptr, readable_error)))
     {
       char buffer[32];
       memcached_return_t rrc= memcached_response(instance, buffer, sizeof(buffer), NULL);