Merging Trond
authorBrian Aker <brian@gaz>
Wed, 14 Oct 2009 23:47:10 +0000 (16:47 -0700)
committerBrian Aker <brian@gaz>
Wed, 14 Oct 2009 23:47:10 +0000 (16:47 -0700)
12 files changed:
docs/Makefile.am
docs/memcached_get.pod
libmemcached/common.h
libmemcached/memcached.h
libmemcached/memcached_connect.c
libmemcached/memcached_fetch.c
libmemcached/memcached_get.c
libmemcached/memcached_get.h
libmemcached/memcached_io.c
libmemcached/memcached_key.c
libmemcached/memcached_types.h
tests/function.c

index 772b03443025d8d17650ab7ea7ef297d83407baa..9a4ee49ac8cdc28ed71a234833acc6517603e586 100644 (file)
@@ -74,6 +74,8 @@ man_MANS = libmemcached.3\
        memcached_increment_with_initial.3\
        memcached_mget.3\
        memcached_mget_by_key.3\
+        memcached_mget_execute.3 \
+        memcached_mget_execute_by_key.3 \
        memcached_prepend.3\
        memcached_prepend_by_key.3\
        memcached_replace.3\
@@ -212,6 +214,12 @@ memcached_mget.3: memcached_get.pod
 memcached_mget_by_key.3: memcached_get.pod
        ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_by_key.3
 
+memcached_mget_execute.3: memcached_get.pod
+       ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_execute.3
+
+memcached_mget_execute_by_key.3: memcached_get.pod
+       ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_mget_execute_by_key.3
+
 memcached_fetch.3: memcached_get.pod
        ${POD2MAN} -c "libmemcached" -r "" -s 3 ${top_srcdir}/docs/memcached_get.pod > memcached_fetch.3
 
index 1fe7298f1ebe6ff541f69c1825af65d5879d6990..db2c9b5ae9d8295b665bf1c0fd18758366d42b9a 100644 (file)
@@ -1,6 +1,7 @@
 =head1 NAME
 
-memcached_get, memcached_mget, memcached_fetch - Get a value
+memcached_get, memcached_mget, memcached_fetch, memcached_mget_execute,
+memcached_mget_execute_by_key - Get a value
 
 =head1 LIBRARY
 
@@ -23,7 +24,8 @@ C Client Library for memcached (libmemcached, -lmemcached)
 
   memcached_return
   memcached_mget (memcached_st *ptr, 
-                  char **keys, size_t *key_length, 
+                  const char * const *keys, 
+                  const size_t *key_length, 
                   size_t number_of_keys);
   char *
   memcached_get_by_key(memcached_st *ptr, 
@@ -36,7 +38,8 @@ C Client Library for memcached (libmemcached, -lmemcached)
   memcached_return 
   memcached_mget_by_key(memcached_st *ptr, 
                         const char *master_key, size_t master_key_length,
-                        char **keys, size_t *key_length, 
+                        const char * const *keys, 
+                        const size_t *key_length, 
                         size_t number_of_keys);
 
   char *memcached_fetch (memcached_st *ptr,
@@ -44,12 +47,35 @@ C Client Library for memcached (libmemcached, -lmemcached)
                          size_t *value_length,
                          uint32_t *flags, 
                          memcached_return *error);
+
   memcached_return 
   memcached_fetch_execute(memcached_st *ptr, 
                           memcached_return (*callback[])(memcached_st *ptr, memcached_result_st *result, void *context),
                           void *context,
                           unsigned int number_of_callbacks);
 
+
+  memcached_return
+  memcached_mget_execute(memcached_st *ptr,
+                         const char * const *keys,
+                         const size_t *key_length,
+                         size_t number_of_keys,
+                         memcached_execute_function *callback,
+                         void *context,
+                         unsigned int number_of_callbacks);
+
+  memcached_return 
+  memcached_mget_execute_by_key(memcached_st *ptr,
+                                const char *master_key,
+                                size_t master_key_length,
+                                const char * const *keys,
+                                const size_t *key_length,
+                                size_t number_of_keys,
+                                memcached_execute_function *callback,
+                                void *context,
+                                unsigned int number_of_callbacks);
+
+
 =head1 DESCRIPTION
 
 memcached_get() is used to fetch an individual value from the server. You
@@ -99,6 +125,16 @@ you supply the calling function. Currently only one value is being passed
 to each function call. In the future there will be an option to allow this
 to be an array.
 
+memcached_mget_execute() and memcached_mget_execute_by_key() is
+similar to memcached_mget(), but it may trigger the supplied callbacks
+with result sets while sending out the queries. If you try to perform
+a really large multiget with memcached_mget() you may encounter a
+deadlock in the OS kernel (we fail to write data to the socket because
+the input buffer is full). memcached_mget_execute() solves this
+problem by processing some of the results before continuing sending
+out requests. Please note that this function is only available in the
+binary protocol.
+
 memcached_get_by_key() and memcached_mget_by_key() behave in a similar nature
 as memcached_get() and memcached_mget(). The difference is that they take
 a master key that is used for determining which server an object was stored
index 96b834632ccfaefa23d3ce5f1bd97acf1e84703d..30fc4e12a9fbfb6203075b55b948dcd4af523b3d 100644 (file)
@@ -136,7 +136,8 @@ LIBMEMCACHED_LOCAL
 void server_list_free(memcached_st *ptr, memcached_server_st *servers);
 
 LIBMEMCACHED_LOCAL
-memcached_return memcached_key_test(const char **keys, size_t *key_length,
+memcached_return memcached_key_test(const char * const *keys, 
+                                    const size_t *key_length,
                                     size_t number_of_keys);
 
 
index 7c25fb3c9db9d15891e1a6f30d6e201fcfc8ec6b..225b20990d1f2d6a4bec4de5cb1e8781b3fc8908 100644 (file)
@@ -113,6 +113,7 @@ struct memcached_st {
   memcached_trigger_delete_key delete_trigger;
   char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
   uint32_t number_of_replicas;
+  memcached_callback_st *callbacks;
 };
 
 LIBMEMCACHED_API
index cee75e4c7c24412752155464c4f5e263f2810d26..f3ee14810081af272b06c092005410bbaee131d8 100644 (file)
@@ -121,16 +121,25 @@ static memcached_return set_socket_options(memcached_server_st *ptr)
     WATCHPOINT_ASSERT(error == 0);
   }
 
-  /* For the moment, not getting a nonblocking mode will not be fatal */
-  if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
-  {
-    int flags;
+  /* libmemcached will always use nonblocking IO to avoid write deadlocks */
+  int flags;
 
+  do
     flags= fcntl(ptr->fd, F_GETFL, 0);
-    unlikely (flags != -1)
-    {
-      (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
-    }
+  while (flags == -1 && (errno == EINTR || errno == EAGAIN));
+
+  unlikely (flags == -1)
+    return MEMCACHED_CONNECTION_FAILURE;
+  else if ((flags & O_NONBLOCK) == 0)
+  {
+    int rval;
+
+    do
+      rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
+    while (rval == -1 && (errno == EINTR || errno == EAGAIN));
+
+    unlikely (rval == -1)
+      return MEMCACHED_CONNECTION_FAILURE;
   }
 
   return MEMCACHED_SUCCESS;
@@ -219,14 +228,6 @@ static memcached_return network_connect(memcached_server_st *ptr)
 
       (void)set_socket_options(ptr);
 
-      int flags= 0;
-      if (ptr->root->connect_timeout)
-      {
-        flags= fcntl(ptr->fd, F_GETFL, 0);
-        if (flags != -1 && !(flags & O_NONBLOCK))
-          (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
-      }
-
       /* connect to server */
       while (ptr->fd != -1 &&
              connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
@@ -268,10 +269,6 @@ static memcached_return network_connect(memcached_server_st *ptr)
 
       if (ptr->fd != -1)
       {
-        /* restore flags */
-        if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0)
-          (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
-
         WATCHPOINT_ASSERT(ptr->cursor_active == 0);
         ptr->server_failure_counter= 0;
         return MEMCACHED_SUCCESS;
@@ -290,9 +287,10 @@ static memcached_return network_connect(memcached_server_st *ptr)
       if (gettimeofday(&next_time, NULL) == 0)
         ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
     }
-    ptr->server_failure_counter+= 1;
+    ptr->server_failure_counter++;
     if (ptr->cached_errno == 0)
       return MEMCACHED_TIMEOUT;
+
     return MEMCACHED_ERRNO; /* The last error should be from connect() */
   }
 
index d3f012197d6a177a30356b90161650c4739c7653..9c31e2b01a6cc6e260080eaaa4c7e79a6535cc3b 100644 (file)
@@ -64,7 +64,7 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
       return result;
     else if (*error == MEMCACHED_END)
       memcached_server_response_reset(server);
-    else
+    else if (*error != MEMCACHED_NOTFOUND)
       break;
   }
 
index 7ee714fe2e3ead8dc5f887b2f4d5bdc04c36903c..3436186f24a4803a32bf107e9c0f773489f8f53f 100644 (file)
@@ -15,12 +15,12 @@ char *memcached_get(memcached_st *ptr, const char *key,
 }
 
 static memcached_return memcached_mget_by_key_real(memcached_st *ptr, 
-                                       const char *master_key, 
-                                       size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
-                                       size_t number_of_keys,
-                                      bool mget_mode);
+                                                   const char *master_key, 
+                                                   size_t master_key_length,
+                                                   const char * const *keys, 
+                                                   const size_t *key_length, 
+                                                   size_t number_of_keys,
+                                                   bool mget_mode);
 
 char *memcached_get_by_key(memcached_st *ptr, 
                            const char *master_key, 
@@ -42,10 +42,9 @@ char *memcached_get_by_key(memcached_st *ptr,
   }
 
   /* Request the key */
-  *error= memcached_mget_by_key_real(ptr, 
-                                master_key, 
-                                master_key_length, 
-                                (const char **)&key, &key_length, 1, false);
+  *error= memcached_mget_by_key_real(ptr, master_key, master_key_length, 
+                                     (const char * const *)&key, 
+                                     &key_length, 1, false);
 
   value= memcached_fetch(ptr, NULL, NULL, 
                          value_length, flags, error);
@@ -110,7 +109,8 @@ char *memcached_get_by_key(memcached_st *ptr,
 }
 
 memcached_return memcached_mget(memcached_st *ptr, 
-                                const char **keys, size_t *key_length, 
+                                const char * const *keys, 
+                                const size_t *key_length, 
                                 size_t number_of_keys)
 {
   return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
@@ -119,17 +119,18 @@ memcached_return memcached_mget(memcached_st *ptr,
 static memcached_return binary_mget_by_key(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length,
+                                           const char * const *keys, 
+                                           const size_t *key_length,
                                            size_t number_of_keys,
                                           bool mget_mode);
 
 static memcached_return memcached_mget_by_key_real(memcached_st *ptr, 
-                                       const char *master_key, 
-                                       size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
-                                       size_t number_of_keys,
-                                      bool mget_mode)
+                                                   const char *master_key, 
+                                                   size_t master_key_length,
+                                                   const char * const *keys, 
+                                                   const size_t *key_length, 
+                                                   size_t number_of_keys,
+                                                   bool mget_mode)
 {
   unsigned int x;
   memcached_return rc= MEMCACHED_NOTFOUND;
@@ -155,7 +156,7 @@ static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
 
   if (master_key && master_key_length)
   {
-    if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+    if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
       return MEMCACHED_BAD_KEY_PROVIDED;
     master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
     is_master_key_set= true;
@@ -269,18 +270,60 @@ static memcached_return memcached_mget_by_key_real(memcached_st *ptr,
 memcached_return memcached_mget_by_key(memcached_st *ptr, 
                                        const char *master_key, 
                                        size_t master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
+                                       const char * const *keys, 
+                                       const size_t *key_length, 
                                        size_t number_of_keys)
 {
   return memcached_mget_by_key_real(ptr, master_key, master_key_length, keys, 
                                     key_length, number_of_keys, true);
 }
 
+memcached_return memcached_mget_execute(memcached_st *ptr,
+                                        const char * const *keys,
+                                        const size_t *key_length,
+                                        size_t number_of_keys,
+                                        memcached_execute_function *callback,
+                                        void *context,
+                                        unsigned int number_of_callbacks)
+{
+  return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
+                                       number_of_keys, callback,
+                                       context, number_of_callbacks);
+}
+
+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
+                                               const char *master_key,
+                                               size_t master_key_length,
+                                               const char * const *keys,
+                                               const size_t *key_length,
+                                               size_t number_of_keys,
+                                               memcached_execute_function *callback,
+                                               void *context,
+                                               unsigned int number_of_callbacks)
+{
+  if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0)
+    return MEMCACHED_NOT_SUPPORTED;
+
+  memcached_return rc;
+  memcached_callback_st *original_callbacks= ptr->callbacks;
+  memcached_callback_st cb= {
+    .callback= callback,
+    .context= context,
+    .number_of_callback= number_of_callbacks
+  };
+
+  ptr->callbacks= &cb;
+  rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
+                            key_length, number_of_keys);
+  ptr->callbacks= original_callbacks;
+  return rc;
+}
+
 static memcached_return simple_binary_mget(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length, 
+                                           const char * const *keys, 
+                                           const size_t *key_length, 
                                            size_t number_of_keys, bool mget_mode)
 {
   memcached_return rc= MEMCACHED_NOTFOUND;
@@ -382,19 +425,19 @@ static memcached_return simple_binary_mget(memcached_st *ptr,
 }
 
 static memcached_return replication_binary_mget(memcached_st *ptr,
-                                             uint32_t* hash, bool* dead_servers,
-                                             const char **keys, size_t *key_length,
-                                             size_t number_of_keys, bool mget_mode)
+                                                uint32_t* hash, 
+                                                bool* dead_servers,
+                                                const char *const *keys, 
+                                                const size_t *key_length,
+                                                size_t number_of_keys)
 {
   memcached_return rc= MEMCACHED_NOTFOUND;
   uint32_t x;
 
-  int flush= number_of_keys == 1;
-
   for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
   {
     bool success= true;    
-    
+
     for (x= 0; x < number_of_keys; ++x)
     {
       if (hash[x] == ptr->number_of_hosts)
@@ -419,58 +462,39 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
         }
       }
 
-      protocol_binary_request_getk request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      if (mget_mode)
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
-      else
-        request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
-
-      request.message.header.request.keylen= htons((uint16_t)key_length[x]);
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
-      request.message.header.request.bodylen= htonl((uint32_t) key_length[x]);
+      protocol_binary_request_getk request= {
+        .message.header.request= {
+          .magic= PROTOCOL_BINARY_REQ,
+          .opcode= PROTOCOL_BINARY_CMD_GETK,
+          .keylen= htons((uint16_t)key_length[x]),
+          .datatype= PROTOCOL_BINARY_RAW_BYTES,
+          .bodylen= htonl((uint32_t)key_length[x])
+        }
+      };
 
+      /*
+       * We need to disable buffering to actually know that the request was
+       * successfully sent to the server (so that we should expect a result
+       * back). It would be nice to do this in buffered mode, but then it
+       * would be complex to handle all error situations if we got to send
+       * some of the messages, and then we failed on writing out some others
+       * and we used the callback interface from memcached_mget_execute so
+       * that we might have processed some of the responses etc. For now,
+       * just make sure we work _correctly_
+       */
       if ((memcached_io_write(&ptr->hosts[server], request.bytes,
                               sizeof(request.bytes), 0) == -1) ||
           (memcached_io_write(&ptr->hosts[server], keys[x],
-                              key_length[x], (char) flush) == -1))
+                              key_length[x], 1) == -1))
       {
         memcached_io_reset(&ptr->hosts[server]);
         dead_servers[server]= true;
         success= false;
         continue;
       }
-      /* we just want one pending response per server */
-      memcached_server_response_reset(&ptr->hosts[server]); 
-      memcached_server_response_increment(&ptr->hosts[server]);
-    }
-
-    if (mget_mode)
-    {
-      /*
-       * Send a noop command to flush the buffers
-       */
-      protocol_binary_request_noop request= {.bytes= {0}};
-      request.message.header.request.magic= PROTOCOL_BINARY_REQ;
-      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
-      request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
 
-      for (x= 0; x < ptr->number_of_hosts; x++)
-        if (memcached_server_response_count(&ptr->hosts[x]))
-        {
-          if (memcached_io_write(&ptr->hosts[x], request.bytes,
-                                 sizeof(request.bytes), 1) == -1)
-          {
-            memcached_io_reset(&ptr->hosts[x]);
-            dead_servers[x]= true;
-            success= false;
-          }
-
-          /* mark all of the messages bound for this server as sent! */
-          for (x= 0; x < number_of_keys; ++x)
-            if (hash[x] == x)
-              hash[x]= ptr->number_of_hosts;
-        }
+      memcached_server_response_increment(&ptr->hosts[server]);
+      hash[x]= ptr->number_of_hosts;
     }
 
     if (success)
@@ -483,8 +507,10 @@ static memcached_return replication_binary_mget(memcached_st *ptr,
 static memcached_return binary_mget_by_key(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
-                                           const char **keys, size_t *key_length,
-                                           size_t number_of_keys, bool mget_mode)
+                                           const char * const *keys, 
+                                           const size_t *key_length,
+                                           size_t number_of_keys, 
+                                           bool mget_mode)
 {
   memcached_return rc;
 
@@ -516,7 +542,7 @@ static memcached_return binary_mget_by_key(memcached_st *ptr,
         hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
 
     rc= replication_binary_mget(ptr, hash, dead_servers, keys, 
-                                key_length, number_of_keys, mget_mode);
+                                key_length, number_of_keys);
 
     ptr->call_free(ptr, hash);
     ptr->call_free(ptr, dead_servers);
index 4ea04a9457b10ff05ac2071d9eda7f2f6a2e04de..bf3aeb5333728459bf243ee4af50c0daa13004bb 100644 (file)
@@ -6,8 +6,8 @@
  * Author: Brian Aker
  */
 
-#ifndef __MEMCACHED_GET_H__
-#define __MEMCACHED_GET_H__
+#ifndef LIBMEMCACHED_MEMCACHED_GET_H
+#define LIBMEMCACHED_MEMCACHED_GET_H
 
 #ifdef __cplusplus
 extern "C" {
@@ -23,7 +23,8 @@ char *memcached_get(memcached_st *ptr,
 
 LIBMEMCACHED_API
 memcached_return memcached_mget(memcached_st *ptr, 
-                                const char **keys, size_t *key_length, 
+                                const char * const *keys, 
+                                const size_t *key_length, 
                                 size_t number_of_keys);
 
 LIBMEMCACHED_API
@@ -38,8 +39,8 @@ LIBMEMCACHED_API
 memcached_return memcached_mget_by_key(memcached_st *ptr, 
                                        const char *master_key, size_t 
                                        master_key_length,
-                                       const char **keys, 
-                                       size_t *key_length, 
+                                       const char * const *keys, 
+                                       const size_t *key_length, 
                                        size_t number_of_keys);
 
 LIBMEMCACHED_API
@@ -53,10 +54,28 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
                                             memcached_result_st *result,
                                             memcached_return *error);
 
+LIBMEMCACHED_API
+memcached_return memcached_mget_execute(memcached_st *ptr,
+                                        const char * const *keys,
+                                        const size_t *key_length,
+                                        size_t number_of_keys,
+                                        memcached_execute_function *callback,
+                                        void *context,
+                                        unsigned int number_of_callbacks);
 
+LIBMEMCACHED_API
+memcached_return memcached_mget_execute_by_key(memcached_st *ptr,
+                                               const char *master_key,
+                                               size_t master_key_length,
+                                               const char * const *keys,
+                                               const size_t *key_length,
+                                               size_t number_of_keys,
+                                               memcached_execute_function *callback,
+                                               void *context,
+                                               unsigned int number_of_callbacks);
 
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* __MEMCACHED_GET_H__ */
+#endif /* LIBMEMCACHED_MEMCACHED_GET_H */
index 411040f68e6adccaa7bf602c021b84523fcb25c9..693ce95c54ddc052642acc7fd98137c54fb8ebbc 100644 (file)
@@ -9,7 +9,7 @@
 
 typedef enum {
   MEM_READ,
-  MEM_WRITE,
+  MEM_WRITE
 } memc_read_or_write;
 
 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
@@ -18,18 +18,14 @@ static void increment_udp_message_id(memcached_server_st *ptr);
 static memcached_return io_wait(memcached_server_st *ptr,
                                 memc_read_or_write read_or_write)
 {
-  struct pollfd fds[1];
-  short flags= 0;
+  struct pollfd fds= {
+     .fd= ptr->fd,
+     .events = POLLIN
+  };
   int error;
 
-  if (read_or_write == MEM_WRITE) /* write */
-    flags= POLLOUT;
-  else
-    flags= POLLIN;
-
-  memset(&fds, 0, sizeof(struct pollfd));
-  fds[0].fd= ptr->fd;
-  fds[0].events= flags;
+  unlikely (read_or_write == MEM_WRITE) /* write */
+    fds.events= POLLOUT;
 
   /*
   ** We are going to block on write, but at least on Solaris we might block
@@ -41,26 +37,109 @@ static memcached_return io_wait(memcached_server_st *ptr,
   */
   if (read_or_write == MEM_WRITE)
   {
-    memcached_return rc=memcached_purge(ptr);
+    memcached_return rc= memcached_purge(ptr);
     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
-       return MEMCACHED_FAILURE;
+      return MEMCACHED_FAILURE;
   }
 
-  error= poll(fds, 1, ptr->root->poll_timeout);
+  int timeout= ptr->root->poll_timeout;
+  if ((ptr->root->flags & MEM_NO_BLOCK) == 0)
+    timeout= -1;
+
+  error= poll(&fds, 1, timeout);
 
   if (error == 1)
     return MEMCACHED_SUCCESS;
   else if (error == 0)
-  {
     return MEMCACHED_TIMEOUT;
-  }
 
   /* Imposssible for anything other then -1 */
   WATCHPOINT_ASSERT(error == -1);
   memcached_quit_server(ptr, 1);
 
   return MEMCACHED_FAILURE;
+}
+
+/**
+ * Try to fill the input buffer for a server with as much
+ * data as possible.
+ *
+ * @param ptr the server to pack
+ */
+static bool repack_input_buffer(memcached_server_st *ptr)
+{
+  if (ptr->read_ptr != ptr->read_buffer)
+  {
+    /* Move all of the data to the beginning of the buffer so
+    ** that we can fit more data into the buffer...
+    */
+    memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
+    ptr->read_ptr= ptr->read_buffer;
+    ptr->read_data_length= ptr->read_buffer_length;
+  }
+
+  /* There is room in the buffer, try to fill it! */
+  if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+  {
+    /* Just try a single read to grab what's available */
+    ssize_t nr= read(ptr->fd,
+                     ptr->read_ptr + ptr->read_data_length,
+                     MEMCACHED_MAX_BUFFER - ptr->read_data_length);
 
+    if (nr > 0)
+    {
+      ptr->read_data_length+= (size_t)nr;
+      ptr->read_buffer_length+= (size_t)nr;
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * If the we have callbacks connected to this server structure
+ * we may start process the input queue and fire the callbacks
+ * for the incomming messages. This function is _only_ called
+ * when the input buffer is full, so that we _know_ that we have
+ * at least _one_ message to process.
+ *
+ * @param ptr the server to star processing iput messages for
+ * @return true if we processed anything, false otherwise
+ */
+static bool process_input_buffer(memcached_server_st *ptr)
+{
+  /*
+  ** We might be able to process some of the response messages if we
+  ** have a callback set up
+  */
+  if (ptr->root->callbacks != NULL && (ptr->root->flags & MEM_USE_UDP) == 0)
+  {
+    /*
+     * We might have responses... try to read them out and fire
+     * callbacks
+     */
+    memcached_callback_st cb= *ptr->root->callbacks;
+
+    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    memcached_return error;
+    error= memcached_response(ptr, buffer, sizeof(buffer),
+                              &ptr->root->result);
+    if (error == MEMCACHED_SUCCESS)
+    {
+      for (unsigned int x= 0; x < cb.number_of_callback; x++)
+      {
+        error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context);
+        if (error != MEMCACHED_SUCCESS)
+          break;
+      }
+
+      /* @todo what should I do with the error message??? */
+    }
+    /* @todo what should I do with other error messages?? */
+    return true;
+  }
+
+  return false;
 }
 
 #ifdef UNUSED
@@ -385,6 +464,16 @@ static ssize_t io_flush(memcached_server_st *ptr,
         continue;
       case EAGAIN:
       {
+        /*
+         * We may be blocked on write because the input buffer
+         * is full. Let's check if we have room in our input
+         * buffer for more data and retry the write before
+         * waiting..
+         */
+        if (repack_input_buffer(ptr) ||
+            process_input_buffer(ptr))
+          continue;
+
         memcached_return rc;
         rc= io_wait(ptr, MEM_WRITE);
 
@@ -429,7 +518,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
   return (ssize_t) return_length;
 }
 
-/* 
+/*
   Eventually we will just kill off the server with the problem.
 */
 void memcached_io_reset(memcached_server_st *ptr)
@@ -439,7 +528,7 @@ void memcached_io_reset(memcached_server_st *ptr)
 
 /**
  * Read a given number of bytes from the server and place it into a specific
- * buffer. Reset the IO channel on this server if an error occurs. 
+ * buffer. Reset the IO channel on this server if an error occurs.
  */
 memcached_return memcached_safe_read(memcached_server_st *ptr,
                                      void *dta,
@@ -526,7 +615,7 @@ static void increment_udp_message_id(memcached_server_st *ptr)
   uint16_t cur_req= get_udp_datagram_request_id(header);
   int msg_num= get_msg_num_from_request_id(cur_req);
   int thread_id= get_thread_id_from_request_id(cur_req);
-  
+
   if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
     msg_num= 0;
 
index c8a3a0eb4f5d59d325f88614adc7ce3dad099827..9aac1ebcfd24f0828b66e8d75ce1e5cd8f35716a 100644 (file)
@@ -1,6 +1,7 @@
 #include "common.h"
 
-memcached_return memcached_key_test(const char **keys, size_t *key_length, 
+memcached_return memcached_key_test(const char * const *keys, 
+                                    const size_t *key_length, 
                                     size_t number_of_keys)
 {
   uint32_t x;
@@ -14,8 +15,6 @@ memcached_return memcached_key_test(const char **keys, size_t *key_length,
     if (rc != MEMCACHED_SUCCESS)
       return rc;
     
-
-
     for (y= 0; y < *(key_length + x); y++)
     {
       if ((isgraph(keys[x][y])) == 0)
index d7bb470ab915d9868df4133e8b7f26260f267294..9d2537d4064f8a5804c0584a7c4e087238f35617 100644 (file)
@@ -6,8 +6,8 @@
  * Author: Brian Aker
  */
 
-#ifndef __MEMCACHED_TYPES_H__
-#define __MEMCACHED_TYPES_H__
+#ifndef LIBMEMCACHED_MEMCACHED_TYPES_H
+#define LIBMEMCACHED_MEMCACHED_TYPES_H
 
 #ifdef __cplusplus
 extern "C" {
@@ -37,8 +37,14 @@ typedef memcached_return (*memcached_trigger_delete_key)(memcached_st *ptr,
 typedef memcached_return (*memcached_dump_func)(memcached_st *ptr,  
                                                 const char *key, size_t key_length, void *context);
 
+typedef struct {
+  memcached_execute_function *callback;
+  void *context;
+  unsigned int number_of_callback;
+} memcached_callback_st;
+
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* __MEMCACHED_TYPES_H__ */
+#endif /* LIBMEMCACHED_MEMCACHED_TYPES_H */
index 8e33968e03c8f35c8fd89851c70f5904e8af429d..2bc0192b3f0cf39f510eefcb729ae4b2852e2ce2 100644 (file)
@@ -1449,6 +1449,70 @@ static test_return_t  mget_test(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static test_return_t mget_execute(memcached_st *memc)
+{
+  bool binary= false;
+  if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) != 0)
+    binary= true;
+
+  /*
+   * I only want to hit _one_ server so I know the number of requests I'm
+   * sending in the pipeline.
+   */
+  uint32_t number_of_hosts= memc->number_of_hosts;
+  memc->number_of_hosts= 1;
+
+  int max_keys= binary ? 20480 : 1;
+  
+
+  char **keys= calloc((size_t)max_keys, sizeof(char*));
+  size_t *key_length=calloc((size_t)max_keys, sizeof(size_t));
+
+  /* First add all of the items.. */
+  char blob[1024] = {0};
+  memcached_return rc;
+  for (int x= 0; x < max_keys; ++x)
+  {
+    char k[251];
+    key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
+    keys[x]= strdup(k);
+    assert(keys[x] != NULL);
+    rc= memcached_add(memc, keys[x], key_length[x], blob, sizeof(blob), 0, 0);
+    assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+  }
+
+  /* Try to get all of them with a large multiget */
+  unsigned int counter= 0;
+  memcached_execute_function callbacks[1]= { [0]= &callback_counter };
+  rc= memcached_mget_execute(memc, (const char**)keys, key_length,
+                             (size_t)max_keys, callbacks, &counter, 1);
+
+  if (binary)
+  {
+    assert(rc == MEMCACHED_SUCCESS);
+
+    rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+    assert(rc == MEMCACHED_END);
+
+    /* Verify that we got all of the items */
+    assert(counter == (unsigned int)max_keys);
+  }
+  else
+  {
+    assert(rc == MEMCACHED_NOT_SUPPORTED);
+    assert(counter == 0);
+  }
+
+  /* Release all allocated resources */
+  for (int x= 0; x < max_keys; ++x)
+    free(keys[x]);
+  free(keys);
+  free(key_length);
+
+  memc->number_of_hosts= number_of_hosts;
+  return TEST_SUCCESS;
+}
+
 static test_return_t  get_stats_keys(memcached_st *memc)
 {
  char **list;
@@ -4732,6 +4796,114 @@ static test_return_t regression_bug_442914(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static test_return_t regression_bug_447342(memcached_st *memc)
+{
+  if (memc->number_of_hosts < 3 || pre_replication(memc) != MEMCACHED_SUCCESS)
+    return TEST_SKIPPED;
+
+  memcached_return rc;
+
+  rc= memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS, 2);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  const size_t max_keys= 100;
+  char **keys= calloc(max_keys, sizeof(char*));
+  size_t *key_length=calloc(max_keys, sizeof(size_t));
+
+  for (int x= 0; x < (int)max_keys; ++x)
+  {
+    char k[251];
+    key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
+    keys[x]= strdup(k);
+    assert(keys[x] != NULL);
+    rc= memcached_set(memc, k, key_length[x], k, key_length[x], 0, 0);
+    assert(rc == MEMCACHED_SUCCESS);
+  }
+
+  /*
+  ** We are using the quiet commands to store the replicas, so we need
+  ** to ensure that all of them are processed before we can continue.
+  ** In the test we go directly from storing the object to trying to
+  ** receive the object from all of the different servers, so we
+  ** could end up in a race condition (the memcached server hasn't yet
+  ** processed the quiet command from the replication set when it process
+  ** the request from the other client (created by the clone)). As a
+  ** workaround for that we call memcached_quit to send the quit command
+  ** to the server and wait for the response ;-) If you use the test code
+  ** as an example for your own code, please note that you shouldn't need
+  ** to do this ;-)
+  */
+  memcached_quit(memc);
+  
+  /* Verify that all messages are stored, and we didn't stuff too much 
+   * into the servers
+   */
+  rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  unsigned int counter= 0;
+  memcached_execute_function callbacks[1]= { [0]= &callback_counter };
+  rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+  /* Verify that we received all of the key/value pairs */
+  assert(counter == (unsigned int)max_keys);
+
+  memcached_quit(memc);
+  /*
+   * Don't do the following in your code. I am abusing the internal details
+   * within the library, and this is not a supported interface.
+   * This is to verify correct behavior in the library. Fake that two servers
+   * are dead..
+   */
+  unsigned int port0= memc->hosts[0].port;
+  unsigned int port2= memc->hosts[2].port;
+  memc->hosts[0].port= 0;
+  memc->hosts[2].port= 0;
+
+  rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  counter= 0;
+  rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+  assert(counter == (unsigned int)max_keys);
+
+  /* restore the memc handle */
+  memc->hosts[0].port= port0;
+  memc->hosts[2].port= port2;
+
+  memcached_quit(memc);
+
+  /* Remove half of the objects */
+  for (int x= 0; x < (int)max_keys; ++x)
+    if (x & 1)
+    {
+      rc= memcached_delete(memc, keys[x], key_length[x], 0);
+      assert(rc == MEMCACHED_SUCCESS);
+    }
+
+  memcached_quit(memc);
+  memc->hosts[0].port= 0;
+  memc->hosts[2].port= 0;
+
+  /* now retry the command, this time we should have cache misses */
+  rc= memcached_mget(memc, (const char* const *)keys, key_length, max_keys);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  counter= 0;
+  rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+  assert(counter == (unsigned int)(max_keys >> 1));
+
+  /* Release allocated resources */
+  for (size_t x= 0; x < max_keys; ++x)
+    free(keys[x]);
+  free(keys);
+  free(key_length);
+
+  /* restore the memc handle */
+  memc->hosts[0].port= port0;
+  memc->hosts[2].port= port2;
+  return TEST_SUCCESS;
+}
+
 /* Test memcached_server_get_last_disconnect
  * For a working server set, shall be NULL
  * For a set of non existing server, shall not be NULL
@@ -4892,6 +5064,7 @@ test_st tests[] ={
   {"mget_result", 1, mget_result_test },
   {"mget_result_alloc", 1, mget_result_alloc_test },
   {"mget_result_function", 1, mget_result_function },
+  {"mget_execute", 1, mget_execute },
   {"mget_end", 0, mget_end },
   {"get_stats", 0, get_stats },
   {"add_host_test", 0, add_host_test },
@@ -4998,6 +5171,7 @@ test_st regression_tests[]= {
   {"lp:434843 buffered", 1, regression_bug_434843_buffered },
   {"lp:421108", 1, regression_bug_421108 },
   {"lp:442914", 1, regression_bug_442914 },
+  {"lp:447342", 1, regression_bug_447342 },
   {0, 0, 0}
 };