Added tag 0.5 for changeset 76a1ddcb86f3
[awesomized/libmemcached] / lib / memcached_io.c
index 77df4319076b8286203cc7d6adfdbb4ea3660c00..9277b6ba3a883a387829acc9130046843f3756f4 100644 (file)
@@ -2,7 +2,9 @@
   Basic socket buffered IO
 */
 
-#include <memcached.h>
+#include "common.h"
+#include "memcached_io.h"
+#include <sys/select.h>
 
 ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
                           char *buffer, size_t length)
@@ -17,10 +19,57 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
   {
     if (!ptr->read_buffer_length)
     {
-      ptr->read_buffer_length= recv(ptr->hosts[server_key].fd, 
-                                    ptr->read_buffer, 
-                                    MEMCACHED_MAX_BUFFER, 0);
-      ptr->read_ptr= ptr->read_buffer;
+      if (length > 1)
+      {
+
+        size_t data_read;
+        data_read= recv(ptr->hosts[server_key].fd, 
+                        buffer_ptr, 
+                        length - x, 0);
+        if (data_read == -1)
+        {
+          return -1;
+        }
+        if (data_read == 0)
+          return x;
+
+        data_read+= x;
+
+        return data_read;
+      }
+      else
+      {
+        size_t data_read;
+try_again:
+
+        if (ptr->flags & MEM_NO_BLOCK)
+        {
+          struct timeval local_tv;
+          fd_set set;
+
+          memset(&local_tv, 0, sizeof(struct timeval));
+
+          local_tv.tv_sec= 0;
+          local_tv.tv_usec= 300;
+
+          FD_ZERO(&set);
+          FD_SET(ptr->hosts[server_key].fd, &set);
+
+          select(1, &set, NULL, NULL, &local_tv);
+        }
+
+        data_read= recv(ptr->hosts[server_key].fd, 
+                        ptr->read_buffer, 
+                        MEMCACHED_MAX_BUFFER, 0);
+        if (data_read == -1)
+        {
+          if (errno == EAGAIN)
+            goto try_again;
+          return -1;
+        }
+        ptr->read_buffer_length= data_read;
+        ptr->read_ptr= ptr->read_buffer;
+      }
 
       if (ptr->read_buffer_length == -1)
         return -1;
@@ -37,7 +86,7 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
 }
 
 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
-                        char *buffer, size_t length)
+                        char *buffer, size_t length, char with_flush)
 {
   unsigned long long x;
 
@@ -45,21 +94,21 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
   {
     ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
     ptr->write_buffer_offset++;
+
     if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
     {
       size_t sent_length;
 
-      if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
-                             MEMCACHED_MAX_BUFFER, 0)) == -1)
-        return -1;
+      sent_length= memcached_io_flush(ptr, server_key);
 
       assert(sent_length == MEMCACHED_MAX_BUFFER);
-      ptr->write_between_flush+= MEMCACHED_MAX_BUFFER;
-
       ptr->write_buffer_offset= 0;
     }
   }
 
+  if (with_flush)
+    memcached_io_flush(ptr, server_key);
+
   return length;
 }
 
@@ -70,16 +119,28 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
   if (ptr->write_buffer_offset == 0)
     return 0;
 
+  if (ptr->flags & MEM_NO_BLOCK)
+  {
+    struct timeval local_tv;
+    fd_set set;
+
+    local_tv.tv_sec= 0;
+    local_tv.tv_usec= 300;
+
+    FD_ZERO(&set);
+    FD_SET(ptr->hosts[server_key].fd, &set);
+
+    select(1, NULL, &set, NULL, &local_tv);
+  }
   if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
                          ptr->write_buffer_offset, 0)) == -1)
+  {
     return -1;
+  }
 
   assert(sent_length == ptr->write_buffer_offset);
 
-  sent_length+= ptr->write_between_flush;
-
   ptr->write_buffer_offset= 0;
-  ptr->write_between_flush= 0;
 
   return sent_length;
 }