Use poll() to determine next readable server
authorTrond Norbye <trond.norbye@sun.com>
Tue, 24 Mar 2009 14:33:38 +0000 (15:33 +0100)
committerTrond Norbye <trond.norbye@sun.com>
Tue, 24 Mar 2009 14:33:38 +0000 (15:33 +0100)
libmemcached/memcached_fetch.c
libmemcached/memcached_io.c
libmemcached/memcached_io.h

index e810b64ae326608c0d222699d9f4c72271f98be6..ac76dfc24ad32ff48a77a6773ff7f1bc17a4ef41 100644 (file)
@@ -189,47 +189,32 @@ char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length,
   return NULL;
 }
 
-memcached_result_st *memcached_fetch_result(memcached_st *ptr, 
+memcached_result_st *memcached_fetch_result(memcached_st *ptr,
                                             memcached_result_st *result,
                                             memcached_return *error)
 {
+  memcached_server_st *server;
 
   if (ptr->flags & MEM_USE_UDP)
   {
     *error= MEMCACHED_NOT_SUPPORTED;
     return NULL;
   }
-  
-  if (result == NULL)
-    result= memcached_result_create(ptr, NULL);
 
-#ifdef UNUSED
-  if (ptr->flags & MEM_NO_BLOCK)
-    memcached_io_preread(ptr);
-#endif
+  if (result == NULL)
+    if ((result= memcached_result_create(ptr, NULL)) == NULL)
+      return NULL;
 
-  while (ptr->cursor_server < ptr->number_of_hosts)
-  {
+  while ((server = memcached_io_get_readable_server(ptr)) != NULL) {
     char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    *error= memcached_response(server, buffer, sizeof(buffer), result);
 
-    if (memcached_server_response_count(&ptr->hosts[ptr->cursor_server]) == 0)
-    {
-      ptr->cursor_server++;
-      continue;
-    }
-
-    *error= memcached_response(&ptr->hosts[ptr->cursor_server], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result);
-    
-    if (*error == MEMCACHED_END) /* END means that we move on to the next */
-    {
-      memcached_server_response_reset(&ptr->hosts[ptr->cursor_server]);
-      ptr->cursor_server++;
-      continue;
-    }
-    else if (*error == MEMCACHED_SUCCESS)
+    if (*error == MEMCACHED_SUCCESS)
       return result;
+    else if (*error == MEMCACHED_END)
+      memcached_server_response_reset(server);
     else
-      return NULL;
+      break;
   }
 
   /* We have completed reading data */
@@ -238,6 +223,6 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
   else
     memcached_string_reset(&result->value);
 
-  ptr->cursor_server= 0;
   return NULL;
 }
+
index dc12ebe7d405cc1630444f19d730abc86a5ea45e..43fb509e64c18f44f8dd6352d78e21511b8fc2be 100644 (file)
@@ -276,6 +276,54 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
   return MEMCACHED_SUCCESS;
 }
 
+memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
+{
+#define MAX_SERVERS_TO_POLL 100
+  struct pollfd fds[MAX_SERVERS_TO_POLL];
+  int index= 0;
+
+  for (int x= 0; x< memc->number_of_hosts && index < MAX_SERVERS_TO_POLL; ++x)
+  {
+    if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */
+      return &memc->hosts[x];
+
+    if (memcached_server_response_count(&memc->hosts[x]) > 0)
+    {
+      fds[index].events = POLLIN;
+      fds[index].revents = 0;
+      fds[index].fd = memc->hosts[x].fd;
+      ++index;
+    }
+  }
+
+  if (index < 2)
+  {
+    /* We have 0 or 1 server with pending events.. */
+    for (int x= 0; x< memc->number_of_hosts; ++x)
+      if (memcached_server_response_count(&memc->hosts[x]) > 0)
+        return &memc->hosts[x];
+
+    return NULL;
+  }
+
+  int err= poll(fds, index, memc->poll_timeout);
+  switch (err) {
+  case -1:
+    memc->cached_errno = errno;
+    /* FALLTHROUGH */
+  case 0:
+    break;
+  default:
+    for (int x= 0; x < index; ++x)
+      if (fds[x].revents & POLLIN)
+        for (int y= 0; y < memc->number_of_hosts; ++y)
+          if (memc->hosts[y].fd == fds[x].fd)
+            return &memc->hosts[y];
+  }
+
+  return NULL;
+}
+
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
index f1a622c0e5451a686c37330ee0d8a50c4bd0464c..5442277447d658020b5a66a28427fad353bd04d8 100644 (file)
@@ -43,4 +43,7 @@ memcached_return memcached_read_one_response(memcached_server_st *ptr,
                                              memcached_result_st *result);
 memcached_return memcached_io_init_udp_header(memcached_server_st *ptr,
                                               uint16_t thread_id);
+
+memcached_server_st *memcached_io_get_readable_server(memcached_st *memc);
+
 #endif /* __MEMCACHED_IO_H__ */