Non-blocking IO :)
authorBrian Aker <brian@tangent.org>
Wed, 3 Oct 2007 17:03:48 +0000 (10:03 -0700)
committerBrian Aker <brian@tangent.org>
Wed, 3 Oct 2007 17:03:48 +0000 (10:03 -0700)
Quit has also been modified to do an actual quit in cases where we need to
flush out any remaining write buffers.

Test system was refactored to make it a bit friendlier...

[brian@zim src]$ ./memslap --concurrency=5 --execute-number=5000 --initial-load=1 --non-blocking --servers=localhost --test=set
  Threads connecting to servers 5
  Took 10.252 seconds to load data

[brian@zim src]$ ./memslap --concurrency=5 --execute-number=5000 --initial-load=1 --servers=localhost --test=set

  Threads connecting to servers 5
  Took 200.373 seconds to load data

20 files changed:
include/memcached.h
lib/common.h
lib/memcached.c
lib/memcached_auto.c
lib/memcached_behavior.c
lib/memcached_delete.c
lib/memcached_flush.c
lib/memcached_get.c
lib/memcached_hosts.c
lib/memcached_io.c
lib/memcached_io.h
lib/memcached_quit.c
lib/memcached_response.c
lib/memcached_stats.c
lib/memcached_storage.c
lib/memcached_verbosity.c
src/client_options.h
src/memslap.c
tests/output.res
tests/test.c

index e9a61f28d650bf5279dd85a550f50a4f59b2cbc6..84eccee2acad60062bc2a85595466c2d1383026f 100644 (file)
@@ -71,6 +71,7 @@ typedef enum {
 
 typedef enum {
   MEMCACHED_BEHAVIOR_NO_BLOCK,
+  MEMCACHED_BEHAVIOR_BLOCK,
 } memcached_behavior;
 
 typedef enum {
@@ -132,7 +133,6 @@ struct memcached_st {
   char *read_ptr;
   char write_buffer[MEMCACHED_MAX_BUFFER];
   size_t write_buffer_offset;
-  size_t write_between_flush;
   char connected;
   int my_errno;
   unsigned int stack_responses;
@@ -161,7 +161,7 @@ memcached_return memcached_flush(memcached_st *ptr, time_t expiration);
 memcached_return memcached_verbosity(memcached_st *ptr, unsigned int verbosity);
 void memcached_quit(memcached_st *ptr);
 char *memcached_strerror(memcached_st *ptr, memcached_return rc);
-memcached_return memcached_behavior_set(memcached_st *ptr, memcached_behavior flag);
+memcached_return memcached_behavior_set(memcached_st *ptr, memcached_behavior flag, void *data);
 
 /* All of the functions for adding data to the server */
 memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length, 
@@ -233,11 +233,11 @@ memcached_return memcached_string_reset(memcached_st *ptr, memcached_string_st *
 void memcached_string_free(memcached_st *ptr, memcached_string_st *string);
 
 /* Some personal debugging functions */
-#define WATCHPOINT printf("WATCHPOINT %s:%d\n", __FILE__, __LINE__);fflush(stdout);
-#define WATCHPOINT_ERROR(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
-#define WATCHPOINT_STRING(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, A);fflush(stdout);
-#define WATCHPOINT_NUMBER(A) printf("WATCHPOINT %s:%d %d\n", __FILE__, __LINE__, A);fflush(stdout);
-#define WATCHPOINT_ERRNO(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, strerror(A));A= 0;fflush(stdout);
+#define WATCHPOINT printf("\nWATCHPOINT %s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
+#define WATCHPOINT_ERROR(A) printf("\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
+#define WATCHPOINT_STRING(A) printf("\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, A);fflush(stdout);
+#define WATCHPOINT_NUMBER(A) printf("\nWATCHPOINT %s:%d %d\n", __FILE__, __LINE__, A);fflush(stdout);
+#define WATCHPOINT_ERRNO(A) printf("\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, strerror(A));A= 0;fflush(stdout);
 
 
 #ifdef __cplusplus
index 153de06c59710de784790cb6683f98e5983c9589..21540b3dd88e0304099fdeb67724c2bc144137ca 100644 (file)
@@ -3,6 +3,7 @@
 */
 
 #include <memcached.h>
+#include <memcached_io.h>
 
 #include <libmemcached_config.h>
 
index 38d479ea00e3f37c4987148c388df64d0a256d15..0bab79580661fc79781e7286d54389e224a8e3b3 100644 (file)
@@ -27,6 +27,7 @@ void memcached_free(memcached_st *ptr)
 {
   if (ptr->hosts)
   {
+    memcached_quit(ptr);
     memcached_server_list_free(ptr->hosts);
     ptr->hosts= NULL;
   }
index 0bb91fd0cd0bf78ad04ea8562203479b93d617cf..29bcf2a6b340261906425dbb27aaca139613a42f 100644 (file)
@@ -24,7 +24,7 @@ static memcached_return memcached_auto(memcached_st *ptr,
                         offset);
   if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
     return MEMCACHED_WRITE_FAILURE;
-  sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0);
+  sent_length= memcached_io_write(ptr, server_key, buffer, send_length, 1);
 
   if (sent_length == -1 || sent_length != send_length)
     return MEMCACHED_WRITE_FAILURE;
index 1095168ccf840d3d30d4f9c9ec6234c12d89cf3f..a88bcfcf33f5badb688af1464a3efcc670ee68dd 100644 (file)
@@ -1,12 +1,21 @@
 #include <memcached.h>
 
-memcached_return memcached_behavior_set(memcached_st *ptr, memcached_behavior flag)
+memcached_return memcached_behavior_set(memcached_st *ptr, 
+                                        memcached_behavior flag, 
+                                        void *data)
 {
   switch (flag)
   {
   case MEMCACHED_BEHAVIOR_NO_BLOCK:
+    /* We quit all connections so we can reset the sockets */
+    memcached_quit(ptr);
     ptr->flags|= MEM_NO_BLOCK;
     break;
+  case MEMCACHED_BEHAVIOR_BLOCK:
+    /* We quit all connections so we can reset the sockets */
+    memcached_quit(ptr);
+    ptr->flags+= MEM_NO_BLOCK;
+    break;
   }
 
   return MEMCACHED_SUCCESS;
index ecd7d8dc1b203cad509f1616939981b1b0a1a48a..e44277d44128f62833efab0951fc14a1fc022cac 100644 (file)
@@ -31,7 +31,7 @@ memcached_return memcached_delete(memcached_st *ptr, char *key, size_t key_lengt
     goto error;
   }
 
-  sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0);
+  sent_length= memcached_io_write(ptr, server_key, buffer, send_length, 1);
 
   if (sent_length == -1 || sent_length != send_length)
   {
index 967354628ee48781996f84cbea4b71db72e7bcee..9b9c4baca91c3efb181961abef7ecf69a2aa6d62 100644 (file)
@@ -26,11 +26,13 @@ memcached_return memcached_flush(memcached_st *ptr, time_t expiration)
     if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
       return MEMCACHED_WRITE_FAILURE;
 
-    sent_length= send(ptr->hosts[x].fd, buffer, send_length, 0);
+    sent_length= memcached_io_write(ptr, x, buffer, send_length, 1);
 
     if (sent_length == -1 || sent_length != send_length)
       return MEMCACHED_WRITE_FAILURE;
 
+    if (ptr->flags & MEM_NO_BLOCK)
+      WATCHPOINT;
     rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, x);
 
     if (rc != MEMCACHED_SUCCESS)
index 4d2b10d2edad397ae2efd18bba7a25d1a81b9bed..49b64f3b23d97e0cdec9e74ead4c2021462b8a11 100644 (file)
@@ -137,7 +137,8 @@ char *memcached_get(memcached_st *ptr, char *key, size_t key_length,
 
   send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "get %.*s\r\n", 
                         (int)key_length, key);
-  if ((send(ptr->hosts[server_key].fd, buffer, send_length, 0) == -1))
+
+  if ((memcached_io_write(ptr, server_key, buffer, send_length, 1)) == -1)
   {
     *error= MEMCACHED_WRITE_FAILURE;
     goto error;
@@ -239,8 +240,8 @@ memcached_return memcached_mget(memcached_st *ptr,
       memcached_string_st *string= cursor_key_exec[x];
       memcached_string_append(ptr, string, "\r\n", 2);
 
-      if ((send(ptr->hosts[x].fd, string->string, 
-                 memcached_string_length(ptr, string), 0) == -1))
+      if ((memcached_io_write(ptr, x, string->string, 
+                              memcached_string_length(ptr, string), 1)) == -1)
       {
         memcached_quit(ptr);
         rc= MEMCACHED_SOME_ERRORS;
index ad7ed06b668585b96df7d67d68aad999eb6a43f9..64892212009361da7d5b47ac37acd8e5acf984fd 100644 (file)
@@ -147,12 +147,7 @@ void memcached_server_list_free(memcached_server_st *ptr)
   unsigned int x;
 
   for (x= 0; ptr[x].hostname; x++)
-  {
-    if (ptr[x].fd > 0)
-      close(ptr[x].fd);
-
     free(ptr[x].hostname);
-  }
 
   free(ptr);
 }
index 8615aa6e13eb38c0433bba15e95fab12a4473ceb..401039b4efbd831ee2c0eddc1ab0e3f8692aac23 100644 (file)
@@ -3,6 +3,8 @@
 */
 
 #include <memcached.h>
+#include "memcached_io.h"
+#include <sys/select.h>
 
 ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
                           char *buffer, size_t length)
@@ -25,7 +27,9 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
                         buffer_ptr, 
                         length - x, 0);
         if (data_read == -1)
+        {
           return -1;
+        }
         if (data_read == 0)
           return x;
 
@@ -35,9 +39,35 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
       }
       else
       {
-        ptr->read_buffer_length= recv(ptr->hosts[server_key].fd, 
-                                      ptr->read_buffer, 
-                                      MEMCACHED_MAX_BUFFER, 0);
+        size_t data_read;
+try_again:
+
+        if (ptr->flags & MEM_NO_BLOCK)
+        {
+          struct timeval local_tv;
+          fd_set set;
+
+          memset(&local_tv, 0, sizeof(struct timeval));
+
+          local_tv.tv_sec= 0;
+          local_tv.tv_usec= 300;
+
+          FD_ZERO(&set);
+          FD_SET(ptr->hosts[server_key].fd, &set);
+
+          select(1, &set, NULL, NULL, &local_tv);
+        }
+
+        data_read= recv(ptr->hosts[server_key].fd, 
+                        ptr->read_buffer, 
+                        MEMCACHED_MAX_BUFFER, 0);
+        if (data_read == -1)
+        {
+          if (errno == EAGAIN)
+            goto try_again;
+          return -1;
+        }
+        ptr->read_buffer_length= data_read;
         ptr->read_ptr= ptr->read_buffer;
       }
 
@@ -56,7 +86,7 @@ ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
 }
 
 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
-                        char *buffer, size_t length)
+                        char *buffer, size_t length, char with_flush)
 {
   unsigned long long x;
 
@@ -64,21 +94,21 @@ ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
   {
     ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
     ptr->write_buffer_offset++;
+
     if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
     {
       size_t sent_length;
 
-      if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
-                             MEMCACHED_MAX_BUFFER, 0)) == -1)
-        return -1;
+      sent_length= memcached_io_flush(ptr, server_key);
 
       assert(sent_length == MEMCACHED_MAX_BUFFER);
-      ptr->write_between_flush+= MEMCACHED_MAX_BUFFER;
-
       ptr->write_buffer_offset= 0;
     }
   }
 
+  if (with_flush)
+    memcached_io_flush(ptr, server_key);
+
   return length;
 }
 
@@ -89,16 +119,28 @@ ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
   if (ptr->write_buffer_offset == 0)
     return 0;
 
+  if (ptr->flags & MEM_NO_BLOCK)
+  {
+    struct timeval local_tv;
+    fd_set set;
+
+    local_tv.tv_sec= 0;
+    local_tv.tv_usec= 300;
+
+    FD_ZERO(&set);
+    FD_SET(ptr->hosts[server_key].fd, &set);
+
+    select(1, NULL, &set, NULL, &local_tv);
+  }
   if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer, 
                          ptr->write_buffer_offset, 0)) == -1)
+  {
     return -1;
+  }
 
   assert(sent_length == ptr->write_buffer_offset);
 
-  sent_length+= ptr->write_between_flush;
-
   ptr->write_buffer_offset= 0;
-  ptr->write_between_flush= 0;
 
   return sent_length;
 }
index 0fac6d626620e39bffad8f194ffad5cf3539ed05..bb5b8fc539b3203ed6146287d38b89f998a7f17f 100644 (file)
@@ -3,7 +3,7 @@
 
 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key);
 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
-                        char *buffer, size_t length);
+                        char *buffer, size_t length, char with_flush);
 void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
 ssize_t memcached_io_read(memcached_st *ptr, unsigned  int server_key,
                           char *buffer, size_t length);
index 89a55ecf43c96f0739152c1c851917d48bfd8c03..64bfe27e25f60cb810acec811255876b4fb91301 100644 (file)
@@ -4,6 +4,9 @@
   This closes all connections (forces flush of input as well).
   
   Maybe add a host specific, or key specific version? 
+  
+  The reason we send "quit" is that in case we have buffered IO, this 
+  will force data to be completed.
 */
 void memcached_quit(memcached_st *ptr)
 {
@@ -15,6 +18,10 @@ void memcached_quit(memcached_st *ptr)
     {
       if (ptr->hosts[x].fd != -1)
       {
+        char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+
+        if (ptr->flags & MEM_NO_BLOCK)
+          memcached_io_write(ptr, x, "quit\r\n", 6, 1);
         close(ptr->hosts[x].fd);
         ptr->hosts[x].fd= -1;
       }
index 42a06411175db1e664ac80e5d5bef1cc748c0324..d649922b0da164354d91bd276e16fadf8f44838b 100644 (file)
@@ -12,28 +12,33 @@ memcached_return memcached_response(memcached_st *ptr,
                                     char *buffer, size_t buffer_length,
                                     unsigned int server_key)
 {
+  unsigned int x;
   size_t send_length;
   char *buffer_ptr;
 
   memset(buffer, 0, buffer_length);
   send_length= 0;
 
-  buffer_ptr= buffer;
-  while (1)
+  for (x= 0; x <= ptr->stack_responses; x++)
   {
-    unsigned int read_length;
+    buffer_ptr= buffer;
+    while (1)
+    {
+      unsigned int read_length;
 
-    read_length= memcached_io_read(ptr, server_key,
-                                   buffer_ptr, 1);
+      read_length= memcached_io_read(ptr, server_key,
+                                     buffer_ptr, 1);
 
-    if (read_length != 1)
-      return  MEMCACHED_UNKNOWN_READ_FAILURE;
+      if (read_length != 1)
+        return  MEMCACHED_UNKNOWN_READ_FAILURE;
 
-    if (*buffer_ptr == '\n')
-      break;
-    else
-      buffer_ptr++;
+      if (*buffer_ptr == '\n')
+        break;
+      else
+        buffer_ptr++;
+    }
   }
+  ptr->stack_responses= 0;
 
   switch(buffer[0])
   {
index 0e3c2efe877c91e0ac98a047a911ecc8ec65fe4f..644ca70d41d24f722731cab263a73482f6c7202e 100644 (file)
@@ -240,7 +240,7 @@ static memcached_return memcached_stats_fetch(memcached_st *ptr,
   if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
     return MEMCACHED_WRITE_FAILURE;
 
-  sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0);
+  sent_length= memcached_io_write(ptr, server_key, buffer, send_length, 1);
 
   if (sent_length == -1 || sent_length != send_length)
     return MEMCACHED_WRITE_FAILURE;
index 1e607c24e48f9692953fe4b62235c33af9faace5..fc3c3cbab73f199e0e0fce477f1dc690a2c735bd 100644 (file)
 #include "common.h"
 #include "memcached_io.h"
 
+typedef enum {
+  SET_OP,
+  REPLACE_OP,
+  ADD_OP,
+} memcached_storage_action;
+
+/* Inline this */
+#define storage_op_string(A) A == SET_OP ? "set" : ( A == REPLACE_OP ? "replace" : "add")
+
 static memcached_return memcached_send(memcached_st *ptr, 
                                        char *key, size_t key_length, 
                                        char *value, size_t value_length, 
                                        time_t expiration,
                                        uint16_t  flags,
-                                       char *verb)
+                                       memcached_storage_action verb)
 {
   size_t write_length;
   ssize_t sent_length;
@@ -38,7 +47,7 @@ static memcached_return memcached_send(memcached_st *ptr,
   server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts;
 
   write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                        "%s %.*s %x %llu %zu\r\n", verb,
+                        "%s %.*s %x %llu %zu\r\n", storage_op_string(verb),
                         (int)key_length, key, flags, 
                         (unsigned long long)expiration, value_length);
   if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
@@ -47,35 +56,37 @@ static memcached_return memcached_send(memcached_st *ptr,
     goto error;
   }
 
-  if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length)) == -1)
-  {
-    rc= MEMCACHED_WRITE_FAILURE;
-    goto error;
-  }
-
   /* 
     We have to flush after sending the command. Memcached is not smart enough
     to just keep reading from the socket :(
   */
-  if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
-    return MEMCACHED_WRITE_FAILURE;
-
-  if ((sent_length= memcached_io_write(ptr, server_key, value, value_length)) == -1)
+  if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length, 1)) == -1)
   {
     rc= MEMCACHED_WRITE_FAILURE;
     goto error;
   }
 
-  if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2)) == -1)
+  if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1)
   {
     rc= MEMCACHED_WRITE_FAILURE;
     goto error;
   }
 
-  if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
-    return MEMCACHED_WRITE_FAILURE;
+  if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, 1)) == -1)
+  {
+    rc= MEMCACHED_WRITE_FAILURE;
+    goto error;
+  }
 
-  rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
+  if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
+  {
+    rc= MEMCACHED_SUCCESS;
+    ptr->stack_responses++;
+  }
+  else
+  {
+    rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
+  }
 
   if (rc == MEMCACHED_STORED)
     return MEMCACHED_SUCCESS;
@@ -96,7 +107,7 @@ memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length,
   memcached_return rc;
   LIBMEMCACHED_MEMCACHED_SET_START();
   rc= memcached_send(ptr, key, key_length, value, value_length,
-                         expiration, flags, "set");
+                         expiration, flags, SET_OP);
   LIBMEMCACHED_MEMCACHED_SET_END();
   return rc;
 }
@@ -109,7 +120,7 @@ memcached_return memcached_add(memcached_st *ptr, char *key, size_t key_length,
   memcached_return rc;
   LIBMEMCACHED_MEMCACHED_ADD_START();
   rc= memcached_send(ptr, key, key_length, value, value_length,
-                         expiration, flags, "add");
+                         expiration, flags, ADD_OP);
   LIBMEMCACHED_MEMCACHED_ADD_END();
   return rc;
 }
@@ -122,7 +133,7 @@ memcached_return memcached_replace(memcached_st *ptr, char *key, size_t key_leng
   memcached_return rc;
   LIBMEMCACHED_MEMCACHED_REPLACE_START();
   rc= memcached_send(ptr, key, key_length, value, value_length,
-                         expiration, flags, "replace");
+                         expiration, flags, REPLACE_OP);
   LIBMEMCACHED_MEMCACHED_REPLACE_END();
   return rc;
 }
index 0be5fd4bb6501cc38fa22186fed57a9126619ab3..5d308aa2380399ff0da0e15553e3604fa37d8af7 100644 (file)
@@ -21,7 +21,7 @@ memcached_return memcached_verbosity(memcached_st *ptr, unsigned int verbosity)
   {
     memcached_return rc;
 
-    if ((send(ptr->hosts[x].fd, buffer, send_length, 0) == -1))
+    if ((memcached_io_write(ptr, x, buffer, send_length, 1)) == -1)
     {
       continue;
       return MEMCACHED_SOME_ERRORS;
index 57c87db6d7285e530a3c5f9e127b47e78df82133..8bce24ef23c45558654c45d713ff7b76df58c90d 100644 (file)
@@ -13,4 +13,5 @@ typedef enum {
   OPT_SLAP_INITIAL_LOAD,
   OPT_SLAP_TEST,
   OPT_SLAP_CONCURRENCY,
+  OPT_SLAP_NON_BLOCK,
 } memcached_options;
index c9c65771d94fc020e0d2da741dffc86bd63697d4..b7925b73780d8379439f78daaa316a156eaf9e1b 100644 (file)
@@ -66,6 +66,7 @@ pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_
                             unsigned int *actual_loaded);
 
 static int opt_verbose= 0;
+static int opt_non_blocking_io= 0;
 static unsigned int opt_execute_number= 0;
 static unsigned int opt_createial_load= 0;
 static unsigned int opt_concurrency= 0;
@@ -204,6 +205,7 @@ void options_parse(int argc, char *argv[])
       {"flag", no_argument, &opt_displayflag, OPT_FLAG},
       {"help", no_argument, NULL, OPT_HELP},
       {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
+      {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
       {"servers", required_argument, NULL, OPT_SERVERS},
       {"test", required_argument, NULL, OPT_SLAP_TEST},
       {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
@@ -295,6 +297,8 @@ void *run_task(void *p)
   memcached_st *memc;
 
   memc= memcached_create(NULL);
+  if (opt_non_blocking_io)
+    memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
   
   memcached_server_push(memc, context->servers);
 
@@ -336,6 +340,8 @@ pairs_st *load_createial_data(memcached_server_st *servers, unsigned int number_
   pairs_st *pairs;
 
   memc= memcached_create(NULL);
+  /* We always used non-blocking IO for load since it is faster */
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
   memcached_server_push(memc, servers);
 
   pairs= pairs_generate(number_of);
index 974aa22d62c5b067a1fc2576a672e9d60d7f9959..e6a2f73b9c6c1059509c034d027e814a349c6251 100644 (file)
@@ -74,3 +74,83 @@ Found key bytes_read
 Found key bytes_written
 Found key limit_maxbytes
 Found key threads
+
+WATCHPOINT memcached_flush.c:35 (memcached_flush)
+Error 0 -> SUCCESS
+Error 1 -> FAILURE
+Error 2 -> HOSTNAME LOOKUP FAILURE
+Error 3 -> CONNECTION FAILURE
+Error 4 -> CONNECTION BIND FAILURE
+Error 5 -> WRITE FAILURE
+Error 6 -> READ FAILURE
+Error 7 -> UNKNOWN READ FAILURE
+Error 8 -> PROTOCOL ERROR
+Error 9 -> CLIENT ERROR
+Error 10 -> SERVER ERROR
+Error 11 -> CONNECTION SOCKET CREATE FAILURE
+Error 12 -> CONNECTION DATA EXISTS
+Error 13 -> CONNECTION DATA DOES NOT EXIST
+Error 14 -> NOT STORED
+Error 15 -> STORED
+Error 16 -> NOT FOUND
+Error 17 -> MEMORY ALLOCATION FAILURE
+Error 18 -> PARTIAL READ
+Error 19 -> SOME ERRORS WERE REPORTED
+Error 20 -> NO SERVERS DEFINED
+Error 21 -> SERVER END
+Error 22 -> SERVER DELETE
+Error 23 -> SERVER VALUE
+
+WATCHPOINT memcached_flush.c:35 (memcached_flush)
+Found key pid
+Found key uptime
+Found key time
+Found key version
+Found key pointer_size
+Found key rusage_user
+Found key rusage_system
+Found key rusage_user_seconds
+Found key rusage_user_microseconds
+Found key rusage_system_seconds
+Found key rusage_system_microseconds
+Found key curr_items
+Found key total_items
+Found key bytes
+Found key curr_connections
+Found key total_connections
+Found key connection_structures
+Found key cmd_get
+Found key cmd_set
+Found key get_hits
+Found key get_misses
+Found key evictions
+Found key bytes_read
+Found key bytes_written
+Found key limit_maxbytes
+Found key threads
+Found key pid
+Found key uptime
+Found key time
+Found key version
+Found key pointer_size
+Found key rusage_user
+Found key rusage_system
+Found key rusage_user_seconds
+Found key rusage_user_microseconds
+Found key rusage_system_seconds
+Found key rusage_system_microseconds
+Found key curr_items
+Found key total_items
+Found key bytes
+Found key curr_connections
+Found key total_connections
+Found key connection_structures
+Found key cmd_get
+Found key cmd_set
+Found key get_hits
+Found key get_misses
+Found key evictions
+Found key bytes_read
+Found key bytes_written
+Found key limit_maxbytes
+Found key threads
index 7a3b86291e354bdeb8f13c4fe3cfdf94736404b8..e12df9fc28724075ea7e8bc2e75501df4a2c09df 100644 (file)
@@ -7,7 +7,7 @@
 #include <stdlib.h>
 #include <string.h>
 
-void init_test(void)
+void init_test(memcached_st *not_used)
 {
   memcached_st memc;
 
@@ -15,7 +15,7 @@ void init_test(void)
   memcached_free(&memc);
 }
 
-void allocation_test(void)
+void allocation_test(memcached_st *not_used)
 {
   memcached_st *memc;
   memc= memcached_create(NULL);
@@ -23,99 +23,72 @@ void allocation_test(void)
   memcached_free(memc);
 }
 
-void connection_test(void)
+void connection_test(memcached_st *memc)
 {
   memcached_return rc;
-  memcached_st *memc;
-  memc= memcached_create(NULL);
-  assert(memc);
+
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
-  assert(memc);
-  memcached_free(memc);
 }
 
-void error_test(void)
+void error_test(memcached_st *memc)
 {
-  memcached_st *memc;
-  memc= memcached_create(NULL);
   memcached_return rc;
 
   for (rc= MEMCACHED_SUCCESS; rc < MEMCACHED_MAXIMUM_RETURN; rc++)
   {
     printf("Error %d -> %s\n", rc, memcached_strerror(memc, rc));
   }
-
-  assert(memc);
-  memcached_free(memc);
 }
 
-void set_test(void)
+void set_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   rc= memcached_set(memc, key, strlen(key), 
                     value, strlen(value),
                     (time_t)0, (uint16_t)0);
   assert(rc == MEMCACHED_SUCCESS);
-  
-  memcached_free(memc);
 }
 
-void add_test(void)
+void add_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   rc= memcached_add(memc, key, strlen(key), 
                     value, strlen(value),
                     (time_t)0, (uint16_t)0);
   assert(rc == MEMCACHED_NOTSTORED);
-  
-  memcached_free(memc);
 }
 
-void replace_test(void)
+void replace_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   rc= memcached_replace(memc, key, strlen(key), 
                     value, strlen(value),
                     (time_t)0, (uint16_t)0);
   assert(rc == MEMCACHED_SUCCESS);
-  
-  memcached_free(memc);
 }
 
-void delete_test(void)
+void delete_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   rc= memcached_set(memc, key, strlen(key), 
@@ -125,36 +98,26 @@ void delete_test(void)
 
   rc= memcached_delete(memc, key, strlen(key), (time_t)0);
   assert(rc == MEMCACHED_SUCCESS);
-  
-  memcached_free(memc);
 }
 
-void flush_test(void)
+void flush_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   rc= memcached_flush(memc, 0);
   assert(rc == MEMCACHED_SUCCESS);
-
-  memcached_free(memc);
 }
 
-void get_test(void)
+void get_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *string;
   size_t string_length;
   uint16_t flags;
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
   
@@ -164,13 +127,10 @@ void get_test(void)
   assert(rc == MEMCACHED_NOTFOUND);
   assert(string_length ==  0);
   assert(!string);
-
-  memcached_free(memc);
 }
 
-void get_test2(void)
+void get_test2(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "when we sanitize";
@@ -178,8 +138,6 @@ void get_test2(void)
   size_t string_length;
   uint16_t flags;
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -197,21 +155,16 @@ void get_test2(void)
   assert(!memcmp(string, value, string_length));
 
   free(string);
-
-  memcached_free(memc);
 }
 
-void set_test2(void)
+void set_test2(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value= "train in the brain";
   size_t value_length= strlen(value);
   unsigned int x;
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -222,13 +175,10 @@ void set_test2(void)
                       (time_t)0, (uint16_t)0);
     assert(rc == MEMCACHED_SUCCESS);
   }
-
-  memcached_free(memc);
 }
 
-void set_test3(void)
+void set_test3(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value;
@@ -241,8 +191,6 @@ void set_test3(void)
   for (x= 0; x < value_length; x++)
     value[x] = (char) (x % 127);
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -255,13 +203,10 @@ void set_test3(void)
   }
 
   free(value);
-
-  memcached_free(memc);
 }
 
-void get_test3(void)
+void get_test3(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value;
@@ -277,8 +222,6 @@ void get_test3(void)
   for (x= 0; x < value_length; x++)
     value[x] = (char) (x % 127);
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -297,13 +240,10 @@ void get_test3(void)
 
   free(string);
   free(value);
-
-  memcached_free(memc);
 }
 
-void get_test4(void)
+void get_test4(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "foo";
   char *value;
@@ -319,8 +259,6 @@ void get_test4(void)
   for (x= 0; x < value_length; x++)
     value[x] = (char) (x % 127);
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -342,11 +280,9 @@ void get_test4(void)
   }
 
   free(value);
-
-  memcached_free(memc);
 }
 
-void stats_servername_test(void)
+void stats_servername_test(memcached_st *memc)
 {
   memcached_return rc;
   memcached_stat_st stat;
@@ -355,16 +291,13 @@ void stats_servername_test(void)
                                  MEMCACHED_DEFAULT_PORT);
 }
 
-void increment_test(void)
+void increment_test(memcached_st *memc)
 {
-  memcached_st *memc;
   unsigned int new_number;
   memcached_return rc;
   char *key= "number";
   char *value= "0";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -382,20 +315,15 @@ void increment_test(void)
                           1, &new_number);
   assert(rc == MEMCACHED_SUCCESS);
   assert(new_number == 2);
-
-  memcached_free(memc);
 }
 
-void decrement_test(void)
+void decrement_test(memcached_st *memc)
 {
-  memcached_st *memc;
   unsigned int new_number;
   memcached_return rc;
   char *key= "number";
   char *value= "3";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -413,19 +341,14 @@ void decrement_test(void)
                           1, &new_number);
   assert(rc == MEMCACHED_SUCCESS);
   assert(new_number == 1);
-
-  memcached_free(memc);
 }
 
-void quit_test(void)
+void quit_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *key= "fudge";
   char *value= "sanford and sun";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -439,13 +362,10 @@ void quit_test(void)
                     value, strlen(value),
                     (time_t)50, (uint16_t)9);
   assert(rc == MEMCACHED_SUCCESS);
-  
-  memcached_free(memc);
 }
 
-void mget_test(void)
+void mget_test(memcached_st *memc)
 {
-  memcached_st *memc;
   memcached_return rc;
   char *keys[]= {"fudge", "son", "food"};
   size_t key_length[]= {5, 3, 4};
@@ -457,8 +377,6 @@ void mget_test(void)
   char *return_value;
   size_t return_value_length;
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -500,41 +418,31 @@ void mget_test(void)
     free(return_value);
     x++;
   }
-
-  memcached_free(memc);
 }
 
-void get_stats_keys(void)
+void get_stats_keys(memcached_st *memc)
 {
  char **list;
  char **ptr;
- memcached_st *memc;
  memcached_stat_st stat;
  memcached_return rc;
 
- memc= memcached_create(NULL);
- assert(memc);
-
  list= memcached_stat_get_keys(memc, &stat, &rc);
  assert(rc == MEMCACHED_SUCCESS);
  for (ptr= list; *ptr; ptr++)
    printf("Found key %s\n", *ptr);
 
  free(list);
- memcached_free(memc);
 }
 
-void get_stats(void)
+void get_stats(memcached_st *memc)
 {
  unsigned int x;
  char **list;
  char **ptr;
  memcached_return rc;
- memcached_st *memc;
  memcached_stat_st *stat;
 
- memc= memcached_create(NULL);
- assert(memc);
  rc= memcached_server_add(memc, "localhost", 0);
  assert(rc == MEMCACHED_SUCCESS);
 
@@ -555,20 +463,16 @@ void get_stats(void)
  }
 
  free(stat);
- memcached_free(memc);
 }
 
-void get_stats_multiple(void)
+void get_stats_multiple(memcached_st *memc)
 {
  unsigned int x;
  char **list;
  char **ptr;
  memcached_return rc;
- memcached_st *memc;
  memcached_stat_st *stat;
 
- memc= memcached_create(NULL);
- assert(memc);
  rc= memcached_server_add(memc, "localhost", 0);
  assert(rc == MEMCACHED_SUCCESS);
  rc= memcached_server_add(memc, "localhost", 5555);
@@ -591,19 +495,15 @@ void get_stats_multiple(void)
  }
 
  free(stat);
- memcached_free(memc);
 }
 
-void add_host_test(void)
+void add_host_test(memcached_st *memc)
 {
   unsigned int x;
-  memcached_st *memc;
   memcached_server_st *servers;
   memcached_return rc;
   char servername[]= "0.example.com";
 
-  memc= memcached_create(NULL);
-  assert(memc);
   rc= memcached_server_add(memc, "localhost", 0);
   assert(rc == MEMCACHED_SUCCESS);
 
@@ -628,19 +528,14 @@ void add_host_test(void)
   assert(rc == MEMCACHED_SUCCESS);
 
   memcached_server_list_free(servers);
-  memcached_free(memc);
 }
 
-void add_host_test1(void)
+void add_host_test1(memcached_st *memc)
 {
   unsigned int x;
-  memcached_st *memc;
-  memcached_server_st *servers;
   memcached_return rc;
   char servername[]= "0.example.com";
-
-  memc= memcached_create(NULL);
-  assert(memc);
+  memcached_server_st *servers;
 
   servers= memcached_server_list_append(NULL, servername, 400, &rc);
   assert(servers);
@@ -663,47 +558,92 @@ void add_host_test1(void)
   assert(rc == MEMCACHED_SUCCESS);
 
   memcached_server_list_free(servers);
-  memcached_free(memc);
 }
 
+typedef struct test_st test_st;
+
+struct test_st {
+  char *function_name;
+  unsigned int requires_flush;
+  void (*function)(memcached_st *memc);
+};
 
 int main(int argc, char *argv[])
 {
+  unsigned int x;
+
   /* Clean the server before beginning testing */
-  flush_test();
-  init_test();
-  allocation_test();
-  connection_test();
-  error_test();
-  set_test();
-  set_test2();
-  set_test3();
-  add_test();
-  replace_test();
-  flush_test();
-  delete_test();
-  flush_test();
-  get_test();
-  get_test2();
-  get_test3(); 
-  get_test4();
-  stats_servername_test();
-
-  increment_test();
-  decrement_test();
-  quit_test();
-  mget_test();
-  get_stats();
-  add_host_test();
+  test_st tests[] ={
+    {"flush", 0, flush_test },
+    {"init", 0, init_test },
+    {"allocation", 0, allocation_test },
+    {"error", 0, error_test },
+    {"set", 0, set_test },
+    {"set2", 0, set_test2 },
+    {"set3", 0, set_test3 },
+    {"add", 0, add_test },
+    {"replace", 0, replace_test },
+    {"delete", 1, delete_test },
+    {"get", 0, get_test },
+    {"get2", 0, get_test2 },
+    {"get3", 0, get_test3 },
+    {"get4", 0, get_test4 },
+    {"stats_servername", 0, stats_servername_test },
+    {"increment", 0, increment_test },
+    {"decrement", 0, decrement_test },
+    {"quit", 0, quit_test },
+    {"mget", 0, mget_test },
+    {"get_stats", 0, get_stats },
+    {"add_host_test", 0, add_host_test },
+    {"get_stats_keys", 0, get_stats_keys },
+    {0, 0, 0}
+  };
+
+  fprintf(stderr, "\nBlock tests\n\n");
+  for (x= 0; tests[x].function_name; x++)
+  {
+    memcached_st *memc;
+    memc= memcached_create(NULL);
+    assert(memc);
+    fprintf(stderr, "Testing %s", tests[x].function_name);
+    tests[x].function(memc);
+    fprintf(stderr, "\t\t\t\t\t[ ok ]\n");
+    assert(memc);
+    memcached_free(memc);
+  }
+
+  fprintf(stderr, "\nNonblock tests\n\n");
+  for (x= 0; tests[x].function_name; x++)
+  {
+    memcached_st *memc;
+    memc= memcached_create(NULL);
+    assert(memc);
+    fprintf(stderr, "Testing %s", tests[x].function_name);
+    memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL);
+    tests[x].function(memc);
+    fprintf(stderr, "\t\t\t\t\t[ ok ]\n");
+    assert(memc);
+    memcached_free(memc);
+  }
+
 
   /* The multiple tests */
   if (argc == 2)
   {
-    get_stats_multiple();
+    memcached_st *memc;
+    memc= memcached_create(NULL);
+    assert(memc);
+    get_stats_multiple(memc);
+    memcached_free(memc);
   }
-  get_stats_keys();
 
   /* Clean up whatever we might have left */
-  flush_test();
+  {
+    memcached_st *memc;
+    memc= memcached_create(NULL);
+    assert(memc);
+    flush_test(memc);
+    memcached_free(memc);
+  }
   return 0;
 }