Initial implementation of memcached_mget_execute
authorTrond Norbye <trond.norbye@sun.com>
Tue, 13 Oct 2009 10:05:51 +0000 (12:05 +0200)
committerTrond Norbye <trond.norbye@sun.com>
Tue, 13 Oct 2009 10:05:51 +0000 (12:05 +0200)
libmemcached/memcached.h
libmemcached/memcached_connect.c
libmemcached/memcached_get.c
libmemcached/memcached_get.h
libmemcached/memcached_io.c
libmemcached/memcached_types.h
tests/function.c

index 08c66a2c2b7f8688d9beb43f2355fb48f3d682d2..5d58acbf27dce03a22ec53f4b7adf26818a34ecb 100644 (file)
@@ -113,6 +113,7 @@ struct memcached_st {
   memcached_trigger_delete_key delete_trigger;
   char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE];
   uint32_t number_of_replicas;
+  memcached_callback_st *callbacks;
 };
 
 LIBMEMCACHED_API
index cee75e4c7c24412752155464c4f5e263f2810d26..f3ee14810081af272b06c092005410bbaee131d8 100644 (file)
@@ -121,16 +121,25 @@ static memcached_return set_socket_options(memcached_server_st *ptr)
     WATCHPOINT_ASSERT(error == 0);
   }
 
-  /* For the moment, not getting a nonblocking mode will not be fatal */
-  if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
-  {
-    int flags;
+  /* libmemcached will always use nonblocking IO to avoid write deadlocks */
+  int flags;
 
+  do
     flags= fcntl(ptr->fd, F_GETFL, 0);
-    unlikely (flags != -1)
-    {
-      (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
-    }
+  while (flags == -1 && (errno == EINTR || errno == EAGAIN));
+
+  unlikely (flags == -1)
+    return MEMCACHED_CONNECTION_FAILURE;
+  else if ((flags & O_NONBLOCK) == 0)
+  {
+    int rval;
+
+    do
+      rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
+    while (rval == -1 && (errno == EINTR || errno == EAGAIN));
+
+    unlikely (rval == -1)
+      return MEMCACHED_CONNECTION_FAILURE;
   }
 
   return MEMCACHED_SUCCESS;
@@ -219,14 +228,6 @@ static memcached_return network_connect(memcached_server_st *ptr)
 
       (void)set_socket_options(ptr);
 
-      int flags= 0;
-      if (ptr->root->connect_timeout)
-      {
-        flags= fcntl(ptr->fd, F_GETFL, 0);
-        if (flags != -1 && !(flags & O_NONBLOCK))
-          (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
-      }
-
       /* connect to server */
       while (ptr->fd != -1 &&
              connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
@@ -268,10 +269,6 @@ static memcached_return network_connect(memcached_server_st *ptr)
 
       if (ptr->fd != -1)
       {
-        /* restore flags */
-        if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0)
-          (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
-
         WATCHPOINT_ASSERT(ptr->cursor_active == 0);
         ptr->server_failure_counter= 0;
         return MEMCACHED_SUCCESS;
@@ -290,9 +287,10 @@ static memcached_return network_connect(memcached_server_st *ptr)
       if (gettimeofday(&next_time, NULL) == 0)
         ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
     }
-    ptr->server_failure_counter+= 1;
+    ptr->server_failure_counter++;
     if (ptr->cached_errno == 0)
       return MEMCACHED_TIMEOUT;
+
     return MEMCACHED_ERRNO; /* The last error should be from connect() */
   }
 
index 7ee714fe2e3ead8dc5f887b2f4d5bdc04c36903c..53c76f4bd67279599c5d6296098a8f970582d683 100644 (file)
@@ -277,6 +277,34 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
                                     key_length, number_of_keys, true);
 }
 
+memcached_return memcached_mget_execute(memcached_st *ptr,
+                                        const char *master_key,
+                                        size_t master_key_length,
+                                        const char **keys,
+                                        size_t *key_length,
+                                        size_t number_of_keys,
+                                        memcached_execute_function *callback,
+                                        void *context,
+                                        unsigned int number_of_callbacks)
+{
+  if ((ptr->flags & MEM_BINARY_PROTOCOL) == 0)
+    return MEMCACHED_NOT_SUPPORTED;
+
+  memcached_return rc;
+  memcached_callback_st *original_callbacks= ptr->callbacks;
+  memcached_callback_st cb= {
+    .callback= callback,
+    .context= context,
+    .number_of_callback= 1
+  };
+
+  ptr->callbacks= &cb;
+  rc= memcached_mget_by_key(ptr, master_key, master_key_length, keys,
+                            key_length, number_of_keys);
+  ptr->callbacks= original_callbacks;
+  return rc;
+}
+
 static memcached_return simple_binary_mget(memcached_st *ptr,
                                            unsigned int master_server_key,
                                            bool is_master_key_set,
index 4ea04a9457b10ff05ac2071d9eda7f2f6a2e04de..6e10ad182bc030b742f79eb6035b58eb2ed5c5d0 100644 (file)
@@ -6,8 +6,8 @@
  * Author: Brian Aker
  */
 
-#ifndef __MEMCACHED_GET_H__
-#define __MEMCACHED_GET_H__
+#ifndef LIBMEMCACHED_MEMCACHED_GET_H
+#define LIBMEMCACHED_MEMCACHED_GET_H
 
 #ifdef __cplusplus
 extern "C" {
@@ -53,10 +53,19 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr,
                                             memcached_result_st *result,
                                             memcached_return *error);
 
-
+LIBMEMCACHED_API
+memcached_return memcached_mget_execute(memcached_st *ptr,
+                                        const char *master_key,
+                                        size_t master_key_length,
+                                        const char **keys,
+                                        size_t *key_length,
+                                        size_t number_of_keys,
+                                        memcached_execute_function *callback,
+                                        void *context,
+                                        unsigned int number_of_callbacks);
 
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* __MEMCACHED_GET_H__ */
+#endif /* LIBMEMCACHED_MEMCACHED_GET_H */
index 411040f68e6adccaa7bf602c021b84523fcb25c9..6e517144cb966358a4a0f8be55a80b84e9c6dc19 100644 (file)
@@ -9,7 +9,7 @@
 
 typedef enum {
   MEM_READ,
-  MEM_WRITE,
+  MEM_WRITE
 } memc_read_or_write;
 
 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
@@ -18,18 +18,15 @@ static void increment_udp_message_id(memcached_server_st *ptr);
 static memcached_return io_wait(memcached_server_st *ptr,
                                 memc_read_or_write read_or_write)
 {
-  struct pollfd fds[1];
+  struct pollfd fds[1]= {
+     [0].fd= ptr->fd,
+     [0].events = POLLIN
+  };
   short flags= 0;
   int error;
 
-  if (read_or_write == MEM_WRITE) /* write */
-    flags= POLLOUT;
-  else
-    flags= POLLIN;
-
-  memset(&fds, 0, sizeof(struct pollfd));
-  fds[0].fd= ptr->fd;
-  fds[0].events= flags;
+  unlikely (read_or_write == MEM_WRITE) /* write */
+    fds[0].events= POLLOUT;
 
   /*
   ** We are going to block on write, but at least on Solaris we might block
@@ -41,26 +38,109 @@ static memcached_return io_wait(memcached_server_st *ptr,
   */
   if (read_or_write == MEM_WRITE)
   {
-    memcached_return rc=memcached_purge(ptr);
+    memcached_return rc= memcached_purge(ptr);
     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
-       return MEMCACHED_FAILURE;
+      return MEMCACHED_FAILURE;
   }
 
-  error= poll(fds, 1, ptr->root->poll_timeout);
+  int timeout= ptr->root->poll_timeout;
+  if ((ptr->root->flags & MEM_NO_BLOCK) == 0)
+    timeout= -1;
+
+  error= poll(fds, 1, timeout);
 
   if (error == 1)
     return MEMCACHED_SUCCESS;
   else if (error == 0)
-  {
     return MEMCACHED_TIMEOUT;
-  }
 
   /* Imposssible for anything other then -1 */
   WATCHPOINT_ASSERT(error == -1);
   memcached_quit_server(ptr, 1);
 
   return MEMCACHED_FAILURE;
+}
+
+/**
+ * Try to fill the input buffer for a server with as much
+ * data as possible.
+ *
+ * @param ptr the server to pack
+ */
+static bool repack_input_buffer(memcached_server_st *ptr)
+{
+  if (ptr->read_ptr != ptr->read_buffer)
+  {
+    /* Move all of the data to the beginning of the buffer so
+    ** that we can fit more data into the buffer...
+    */
+    memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
+    ptr->read_ptr= ptr->read_buffer;
+    ptr->read_data_length= ptr->read_buffer_length;
+  }
+
+  /* There is room in the buffer, try to fill it! */
+  if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+  {
+    /* Just try a single read to grab what's available */
+    ssize_t nr= read(ptr->fd,
+                     ptr->read_ptr + ptr->read_data_length,
+                     MEMCACHED_MAX_BUFFER - ptr->read_data_length);
 
+    if (nr > 0)
+    {
+      ptr->read_data_length+= (size_t)nr;
+      ptr->read_buffer_length+= (size_t)nr;
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * If the we have callbacks connected to this server structure
+ * we may start process the input queue and fire the callbacks
+ * for the incomming messages. This function is _only_ called
+ * when the input buffer is full, so that we _know_ that we have
+ * at least _one_ message to process.
+ *
+ * @param ptr the server to star processing iput messages for
+ * @return true if we processed anything, false otherwise
+ */
+static bool process_input_buffer(memcached_server_st *ptr)
+{
+  /*
+  ** We might be able to process some of the response messages if we
+  ** have a callback set up
+  */
+  if (ptr->root->callbacks != NULL && (ptr->root->flags & MEM_USE_UDP) == 0)
+  {
+    /*
+     * We might have responses... try to read them out and fire
+     * callbacks
+     */
+    memcached_callback_st cb= *ptr->root->callbacks;
+
+    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    memcached_return error;
+    error= memcached_response(ptr, buffer, sizeof(buffer),
+                              &ptr->root->result);
+    if (error == MEMCACHED_SUCCESS)
+    {
+      for (int x= 0; x < cb.number_of_callback; x++)
+      {
+        error= (*cb.callback[x])(ptr->root, &ptr->root->result, cb.context);
+        if (error != MEMCACHED_SUCCESS)
+          break;
+      }
+
+      /* @todo what should I do with the error message??? */
+    }
+    /* @todo what should I do with other error messages?? */
+    return true;
+  }
+
+  return false;
 }
 
 #ifdef UNUSED
@@ -385,6 +465,16 @@ static ssize_t io_flush(memcached_server_st *ptr,
         continue;
       case EAGAIN:
       {
+        /*
+         * We may be blocked on write because the input buffer
+         * is full. Let's check if we have room in our input
+         * buffer for more data and retry the write before
+         * waiting..
+         */
+        if (repack_input_buffer(ptr) ||
+            process_input_buffer(ptr))
+          continue;
+
         memcached_return rc;
         rc= io_wait(ptr, MEM_WRITE);
 
@@ -429,7 +519,7 @@ static ssize_t io_flush(memcached_server_st *ptr,
   return (ssize_t) return_length;
 }
 
-/* 
+/*
   Eventually we will just kill off the server with the problem.
 */
 void memcached_io_reset(memcached_server_st *ptr)
@@ -439,7 +529,7 @@ void memcached_io_reset(memcached_server_st *ptr)
 
 /**
  * Read a given number of bytes from the server and place it into a specific
- * buffer. Reset the IO channel on this server if an error occurs. 
+ * buffer. Reset the IO channel on this server if an error occurs.
  */
 memcached_return memcached_safe_read(memcached_server_st *ptr,
                                      void *dta,
@@ -526,7 +616,7 @@ static void increment_udp_message_id(memcached_server_st *ptr)
   uint16_t cur_req= get_udp_datagram_request_id(header);
   int msg_num= get_msg_num_from_request_id(cur_req);
   int thread_id= get_thread_id_from_request_id(cur_req);
-  
+
   if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
     msg_num= 0;
 
index d7bb470ab915d9868df4133e8b7f26260f267294..9d2537d4064f8a5804c0584a7c4e087238f35617 100644 (file)
@@ -6,8 +6,8 @@
  * Author: Brian Aker
  */
 
-#ifndef __MEMCACHED_TYPES_H__
-#define __MEMCACHED_TYPES_H__
+#ifndef LIBMEMCACHED_MEMCACHED_TYPES_H
+#define LIBMEMCACHED_MEMCACHED_TYPES_H
 
 #ifdef __cplusplus
 extern "C" {
@@ -37,8 +37,14 @@ typedef memcached_return (*memcached_trigger_delete_key)(memcached_st *ptr,
 typedef memcached_return (*memcached_dump_func)(memcached_st *ptr,  
                                                 const char *key, size_t key_length, void *context);
 
+typedef struct {
+  memcached_execute_function *callback;
+  void *context;
+  unsigned int number_of_callback;
+} memcached_callback_st;
+
 #ifdef __cplusplus
 }
 #endif
 
-#endif /* __MEMCACHED_TYPES_H__ */
+#endif /* LIBMEMCACHED_MEMCACHED_TYPES_H */
index 8e33968e03c8f35c8fd89851c70f5904e8af429d..13daaa29f32909a5ddf1b0860425d2d07839848c 100644 (file)
@@ -1449,6 +1449,71 @@ static test_return_t  mget_test(memcached_st *memc)
   return TEST_SUCCESS;
 }
 
+static test_return_t mget_execute(memcached_st *memc)
+{
+  bool binary= false;
+  if (memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) != 0)
+    binary= true;
+
+  /*
+   * I only want to hit _one_ server so I know the number of requests I'm
+   * sending in the pipeline.
+   */
+  uint32_t number_of_hosts= memc->number_of_hosts;
+  memc->number_of_hosts= 1;
+
+  int max_keys= binary ? 20480 : 1;
+  
+
+  char **keys= calloc(max_keys, sizeof(char*));
+  size_t *key_length=calloc(max_keys, sizeof(size_t));
+
+  /* First add all of the items.. */
+  char blob[1024] = {0};
+  memcached_return rc;
+  for (int x= 0; x < max_keys; ++x)
+  {
+    char k[251];
+    key_length[x]= snprintf(k, sizeof(k), "0200%u", x);
+    keys[x]= strdup(k);
+    assert(keys[x] != NULL);
+    rc= memcached_add(memc, keys[x], key_length[x], blob, sizeof(blob), 0, 0);
+    assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+  }
+
+  /* Try to get all of them with a large multiget */
+  unsigned int counter= 0;
+  memcached_execute_function callbacks[1]= { [0]= &callback_counter };
+  rc= memcached_mget_execute(memc, NULL, 0,
+                             (const char**)keys, key_length,
+                             max_keys, callbacks, &counter, 1);
+
+  if (binary)
+  {
+    assert(rc == MEMCACHED_SUCCESS);
+
+    rc= memcached_fetch_execute(memc, callbacks, (void *)&counter, 1);
+    assert(rc == MEMCACHED_END);
+
+    /* Verify that we got all of the items */
+    assert(counter == max_keys);
+  }
+  else
+  {
+    assert(rc == MEMCACHED_NOT_SUPPORTED);
+    assert(counter == 0);
+  }
+
+  /* Release all allocated resources */
+  for (int x= 0; x < max_keys; ++x)
+    free(keys[x]);
+  free(keys);
+  free(key_length);
+
+  memc->number_of_hosts= number_of_hosts;
+  return TEST_SUCCESS;
+}
+
 static test_return_t  get_stats_keys(memcached_st *memc)
 {
  char **list;
@@ -4892,6 +4957,7 @@ test_st tests[] ={
   {"mget_result", 1, mget_result_test },
   {"mget_result_alloc", 1, mget_result_alloc_test },
   {"mget_result_function", 1, mget_result_function },
+  {"mget_execute", 1, mget_execute },
   {"mget_end", 0, mget_end },
   {"get_stats", 0, get_stats },
   {"add_host_test", 0, add_host_test },