Added memcached_generate_hash() and more tests! :)
[awesomized/libmemcached] / libmemcached / memcached_io.c
index dc12ebe7d405cc1630444f19d730abc86a5ea45e..7bdb7238fa7737d6cbd8619bc33f65dce2ad98e3 100644 (file)
@@ -23,9 +23,9 @@ static memcached_return io_wait(memcached_server_st *ptr,
   int error;
 
   if (read_or_write == MEM_WRITE) /* write */
-    flags= POLLOUT |  POLLERR;
+    flags= POLLOUT;
   else
-    flags= POLLIN | POLLERR;
+    flags= POLLIN;
 
   memset(&fds, 0, sizeof(struct pollfd));
   fds[0].fd= ptr->fd;
@@ -90,8 +90,8 @@ void memcached_io_preread(memcached_st *ptr)
 }
 #endif
 
-ssize_t memcached_io_read(memcached_server_st *ptr,
-                          void *buffer, size_t length)
+memcached_return memcached_io_read(memcached_server_st *ptr,
+                                   void *buffer, size_t length, ssize_t *nread)
 {
   char *buffer_ptr;
 
@@ -105,31 +105,26 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
 
       while (1)
       {
-        data_read= read(ptr->fd, 
-                        ptr->read_buffer, 
-                        MEMCACHED_MAX_BUFFER);
+        data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER);
         if (data_read > 0)
           break;
         else if (data_read == -1)
         {
           ptr->cached_errno= errno;
+          memcached_return rc= MEMCACHED_UNKNOWN_READ_FAILURE;
           switch (errno)
           {
           case EAGAIN:
-          case EINTR: 
-            {
-              memcached_return rc;
-
-              rc= io_wait(ptr, MEM_READ);
+          case EINTR:
+            if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
+              continue;
+          /* fall through */
 
-              if (rc == MEMCACHED_SUCCESS)
-                continue;
-            }
-          /* fall trough */
           default:
             {
               memcached_quit_server(ptr, 1);
-              return -1;
+              *nread= -1;
+              return rc;
             }
           }
         }
@@ -145,7 +140,8 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
             it will return EGAIN if data is not immediatly available.
           */
           memcached_quit_server(ptr, 1);
-          return -1;
+          *nread= -1;
+          return MEMCACHED_UNKNOWN_READ_FAILURE;
         }
       }
 
@@ -177,7 +173,9 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
     }
   }
 
-  return (size_t)(buffer_ptr - (char*)buffer);
+  ptr->server_failure_counter= 0;
+  *nread = (size_t)(buffer_ptr - (char*)buffer);
+  return MEMCACHED_SUCCESS;
 }
 
 ssize_t memcached_io_write(memcached_server_st *ptr,
@@ -229,8 +227,10 @@ ssize_t memcached_io_write(memcached_server_st *ptr,
         return -1;
 
       /* If io_flush calls memcached_purge, sent_length may be 0 */
-      if (sent_length != 0)
-        WATCHPOINT_ASSERT(sent_length == buffer_end);
+      unlikely (sent_length != 0)
+      {
+        WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
+      }
     }
   }
 
@@ -276,6 +276,56 @@ memcached_return memcached_io_close(memcached_server_st *ptr)
   return MEMCACHED_SUCCESS;
 }
 
+memcached_server_st *memcached_io_get_readable_server(memcached_st *memc)
+{
+#define MAX_SERVERS_TO_POLL 100
+  struct pollfd fds[MAX_SERVERS_TO_POLL];
+  unsigned int index= 0;
+
+  for (unsigned int x= 0;
+       x< memc->number_of_hosts && index < MAX_SERVERS_TO_POLL;
+       ++x)
+  {
+    if (memc->hosts[x].read_buffer_length > 0) /* I have data in the buffer */
+      return &memc->hosts[x];
+
+    if (memcached_server_response_count(&memc->hosts[x]) > 0)
+    {
+      fds[index].events = POLLIN;
+      fds[index].revents = 0;
+      fds[index].fd = memc->hosts[x].fd;
+      ++index;
+    }
+  }
+
+  if (index < 2)
+  {
+    /* We have 0 or 1 server with pending events.. */
+    for (unsigned int x= 0; x< memc->number_of_hosts; ++x)
+      if (memcached_server_response_count(&memc->hosts[x]) > 0)
+        return &memc->hosts[x];
+
+    return NULL;
+  }
+
+  int err= poll(fds, index, memc->poll_timeout);
+  switch (err) {
+  case -1:
+    memc->cached_errno = errno;
+    /* FALLTHROUGH */
+  case 0:
+    break;
+  default:
+    for (unsigned int x= 0; x < index; ++x)
+      if (fds[x].revents & POLLIN)
+        for (unsigned int y= 0; y < memc->number_of_hosts; ++y)
+          if (memc->hosts[y].fd == fds[x].fd)
+            return &memc->hosts[y];
+  }
+
+  return NULL;
+}
+
 static ssize_t io_flush(memcached_server_st *ptr,
                         memcached_return *error)
 {
@@ -351,7 +401,8 @@ static ssize_t io_flush(memcached_server_st *ptr,
       }
     }
 
-    if (ptr->type == MEMCACHED_CONNECTION_UDP && sent_length != write_length)
+    if (ptr->type == MEMCACHED_CONNECTION_UDP &&
+        (size_t)sent_length != write_length)
     {
       memcached_quit_server(ptr, 1);
       return -1;
@@ -399,12 +450,12 @@ memcached_return memcached_safe_read(memcached_server_st *ptr,
 
   while (offset < size)
   {
-    ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
-    if (nread <= 0)
-    {
-      memcached_io_reset(ptr);
-      return MEMCACHED_UNKNOWN_READ_FAILURE;
-    }
+    ssize_t nread;
+    memcached_return rc= memcached_io_read(ptr, data + offset, size - offset,
+                                           &nread);
+    if (rc != MEMCACHED_SUCCESS)
+      return rc;
+
     offset+= nread;
   }
 
@@ -416,7 +467,7 @@ memcached_return memcached_io_readline(memcached_server_st *ptr,
                                        size_t size)
 {
   bool line_complete= false;
-  int total_nr= 0;
+  size_t total_nr= 0;
 
   while (!line_complete)
   {
@@ -427,8 +478,10 @@ memcached_return memcached_io_readline(memcached_server_st *ptr,
        * buffer. Call the standard read function to avoid duplicating
        * the logic.
        */
-      if (memcached_io_read(ptr, buffer_ptr, 1) != 1)
-        return MEMCACHED_UNKNOWN_READ_FAILURE;
+      ssize_t nread;
+      memcached_return rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
+      if (rc != MEMCACHED_SUCCESS)
+        return rc;
 
       if (*buffer_ptr == '\n')
         line_complete= true;