Added buffered IO to write calls
authorBrian Aker <brian@tangent.org>
Tue, 2 Oct 2007 00:04:49 +0000 (17:04 -0700)
committerBrian Aker <brian@tangent.org>
Tue, 2 Oct 2007 00:04:49 +0000 (17:04 -0700)
ChangeLog
include/memcached.h
lib/Makefile.am
lib/memcached_delete.c
lib/memcached_io.c [new file with mode: 0644]
lib/memcached_io.h [new file with mode: 0644]
lib/memcached_storage.c
tests/test.c

index 8230194bf1deb0879483fdeff8d0db35fa49ef2a..f295a9ddc4c4a8877c0eb75b655109cf80fc64ec 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,5 @@
 0.4
+  * Added buffered IO to write calls for keys
   * memstat was broken (bad if/else on connect)
 
 0.3 Mon Oct  1 06:37:52 PDT 2007
index 9c2e2f849dd2af41d870fbfab9bd8862937a6a23..f4d0b25aefb2c2cba7f0436cacf263f5fc2e2f0a 100644 (file)
@@ -38,6 +38,8 @@ typedef struct memcached_server_st memcached_server_st;
 #define SMALL_STRING_LEN 1024
 #define HUGE_STRING_LEN 8196
 #define MEMCACHED_MAX_KEY 251 /* We add one to have it null terminated */
+//#define MEMCACHED_MAX_BUFFER 8196
+#define MEMCACHED_MAX_BUFFER 30
 
 typedef enum {
   MEMCACHED_SUCCESS,
@@ -118,6 +120,9 @@ struct memcached_st {
   memcached_server_st *hosts;
   unsigned int number_of_hosts;
   unsigned int cursor_server;
+  char write_buffer[MEMCACHED_MAX_BUFFER];
+  size_t write_buffer_offset;
+  size_t write_between_flush;
   char connected;
 };
 
index b474f28fadaf948ec353f8e2467943451e56f5fa..4ba350fb14dc2f15d3247721db7e089266454c94 100644 (file)
@@ -21,24 +21,26 @@ DTRACEFILES = memcached.o \
               memcached_stats.o
 
 noinst_HEADERS = libmemcached_probes.h \
+                memcached_io.h \
                  common.h
 
 lib_LTLIBRARIES = libmemcached.la
 libmemcached_la_SOURCES = memcached.c \
-                          memcached_strerror.c \
+                         memcached_auto.c \
                          memcached_connect.c \
-                         memcached_response.c \
-                         memcached_get.c \
-                         memcached_storage.c \
                          memcached_delete.c \
+                         memcached_flush.c \
+                         memcached_get.c \
                          memcached_hash.c \
-                         memcached_auto.c \
-                         memcached_verbosity.c \
+                         memcached_hosts.c \
+                         memcached_io.c \
                          memcached_quit.c \
-                         memcached_flush.c \
+                         memcached_response.c \
+                         memcached_storage.c \
                          memcached_string.c \
-                         memcached_hosts.c \
-                         memcached_stats.c
+                         memcached_stats.c \
+                          memcached_strerror.c \
+                         memcached_verbosity.c 
 libmemcached_la_LIBADD =
 
 if HAVE_DTRACE
index 64036c7998bbb84a0034765986853b799b099292..ecd7d8dc1b203cad509f1616939981b1b0a1a48a 100644 (file)
@@ -26,18 +26,26 @@ memcached_return memcached_delete(memcached_st *ptr, char *key, size_t key_lengt
                           "delete %.*s\r\n", (int)key_length, key);
 
   if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
-    return MEMCACHED_WRITE_FAILURE;
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
 
   sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0);
 
   if (sent_length == -1 || sent_length != send_length)
-    return MEMCACHED_WRITE_FAILURE;
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
 
   rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
-  LIBMEMCACHED_MEMCACHED_DELETE_END();
 
   if (rc == MEMCACHED_DELETED)
     rc= MEMCACHED_SUCCESS;
 
+  LIBMEMCACHED_MEMCACHED_DELETE_END();
+
+error:
   return rc;
 }
diff --git a/lib/memcached_io.c b/lib/memcached_io.c
new file mode 100644 (file)
index 0000000..7067237
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+  Basic socket buffered IO
+*/
+
+#include <memcached.h>
+
+ssize_t memcached_io_read(memcached_st *ptr, char *buf, size_t length)
+{
+  return -1;
+}
+
+ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
+                        char *buffer, size_t length)
+{
+  unsigned long long x;
+
+  for (x= 0; x < length; x++)
+  {
+    ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
+    ptr->write_buffer_offset++;
+    if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
+    {
+      size_t sent_length;
+
+      if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
+                             MEMCACHED_MAX_BUFFER, 0)) == -1)
+        return -1;
+
+      assert(sent_length == MEMCACHED_MAX_BUFFER);
+      ptr->write_between_flush+= MEMCACHED_MAX_BUFFER;
+
+      ptr->write_buffer_offset= 0;
+    }
+  }
+
+  return length;
+}
+
+ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
+{
+  size_t sent_length;
+
+  if (ptr->write_buffer_offset == 0)
+    return 0;
+
+  if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
+                         ptr->write_buffer_offset, 0)) == -1)
+    return -1;
+
+  assert(sent_length == ptr->write_buffer_offset);
+
+  sent_length+= ptr->write_between_flush;
+
+  ptr->write_buffer_offset= 0;
+  ptr->write_between_flush= 0;
+
+  return sent_length;
+}
+
+/* 
+  Eventually we will just kill off the server with the problem.
+*/
+void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
+{
+  ptr->write_buffer_offset= 0;
+  memcached_quit(ptr);
+}
diff --git a/lib/memcached_io.h b/lib/memcached_io.h
new file mode 100644 (file)
index 0000000..d9abd62
--- /dev/null
@@ -0,0 +1,7 @@
+/* Server IO, Not public! */
+#include <memcached.h>
+
+ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key);
+ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
+                        char *buffer, size_t length);
+void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
index c1ce2b830ad4c485776e5fb7e337eac7e09302c4..0c27c76681fdfc310c39bf99488decaec842becc 100644 (file)
@@ -8,6 +8,7 @@
 */
 
 #include "common.h"
+#include "memcached_io.h"
 
 static memcached_return memcached_send(memcached_st *ptr, 
                                        char *key, size_t key_length, 
@@ -31,6 +32,8 @@ static memcached_return memcached_send(memcached_st *ptr,
   if (rc != MEMCACHED_SUCCESS)
     return rc;
 
+  assert(ptr->write_buffer_offset == 0);
+
   server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts;
 
   write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
@@ -38,27 +41,63 @@ static memcached_return memcached_send(memcached_st *ptr,
                         (int)key_length, key, flags, 
                         (unsigned long long)expiration, value_length);
   if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
-    return MEMCACHED_WRITE_FAILURE;
-  if ((sent_length= send(ptr->hosts[server_key].fd, buffer, write_length, 0)) == -1)
-    return MEMCACHED_WRITE_FAILURE;
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
+
+  if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length)) == -1)
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
   assert(write_length == sent_length);
 
-  if ((sent_length= send(ptr->hosts[server_key].fd, value, value_length, 0)) == -1)
+  /* 
+    We have to flush after sending the command. Memcached is not smart enough
+    to just keep reading from the socket :(
+  */
+  if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
     return MEMCACHED_WRITE_FAILURE;
+
+  if ((sent_length= memcached_io_write(ptr, server_key, value, value_length)) == -1)
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
   assert(value_length == sent_length);
 
-  if ((sent_length= send(ptr->hosts[server_key].fd, "\r\n", 2, 0)) == -1)
-    return MEMCACHED_WRITE_FAILURE;
+  if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2)) == -1)
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
+
   assert(2 == sent_length);
 
+  if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
+    return MEMCACHED_WRITE_FAILURE;
+
+  //assert(sent_length == write_length + value_length + 2);
+
   sent_length= recv(ptr->hosts[server_key].fd, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 0);
 
   if (sent_length && buffer[0] == 'S')  /* STORED */
     return MEMCACHED_SUCCESS;
   else if (write_length && buffer[0] == 'N')  /* NOT_STORED */
     return MEMCACHED_NOTSTORED;
+  else if (write_length && buffer[0] == 'E')  /* ERROR */
+  {
+    printf("BUFFER :%s:\n", buffer);
+    return MEMCACHED_PROTOCOL_ERROR;
+  }
   else
     return MEMCACHED_READ_FAILURE;
+
+error:
+  memcached_io_reset(ptr, server_key);
+
+  return rc;
 }
 
 memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, 
index 37ed800bce55415cefe62927c74daad19c80c88a..7a3b86291e354bdeb8f13c4fe3cfdf94736404b8 100644 (file)
@@ -201,6 +201,64 @@ void get_test2(void)
   memcached_free(memc);
 }
 
+void set_test2(void)
+{
+  memcached_st *memc;
+  memcached_return rc;
+  char *key= "foo";
+  char *value= "train in the brain";
+  size_t value_length= strlen(value);
+  unsigned int x;
+
+  memc= memcached_create(NULL);
+  assert(memc);
+  rc= memcached_server_add(memc, "localhost", 0);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  for (x= 0; x < 10; x++)
+  {
+    rc= memcached_set(memc, key, strlen(key), 
+                      value, value_length,
+                      (time_t)0, (uint16_t)0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  memcached_free(memc);
+}
+
+void set_test3(void)
+{
+  memcached_st *memc;
+  memcached_return rc;
+  char *key= "foo";
+  char *value;
+  size_t value_length= 8191;
+  unsigned int x;
+
+  value = (char*)malloc(value_length);
+  assert(value);
+
+  for (x= 0; x < value_length; x++)
+    value[x] = (char) (x % 127);
+
+  memc= memcached_create(NULL);
+  assert(memc);
+  rc= memcached_server_add(memc, "localhost", 0);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  for (x= 0; x < 1; x++)
+  {
+    rc= memcached_set(memc, key, strlen(key), 
+                      value, value_length,
+                      (time_t)0, (uint16_t)0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  free(value);
+
+  memcached_free(memc);
+}
+
 void get_test3(void)
 {
   memcached_st *memc;
@@ -618,6 +676,8 @@ int main(int argc, char *argv[])
   connection_test();
   error_test();
   set_test();
+  set_test2();
+  set_test3();
   add_test();
   replace_test();
   flush_test();
@@ -625,7 +685,7 @@ int main(int argc, char *argv[])
   flush_test();
   get_test();
   get_test2();
-  get_test3();
+  get_test3(); 
   get_test4();
   stats_servername_test();