Add test case for situation where callback shouldn't be set because of behavior settings.
[awesomized/libmemcached] / libmemcached / util / pool.cc
index 17cff07726e80b49e6a5b921ad3cfa9ae4736cab..47f46eec224b7d726ecc4fd3c9e446cbb04221e5 100644 (file)
 #include <libmemcached/common.h>
 #include <libmemcached/memcached_util.h>
 
+#include <libmemcached/error.hpp>
+
 #include <cassert>
 #include <cerrno>
 #include <pthread.h>
 #include <memory>
 
-static bool grow_pool(memcached_pool_st* pool);
-
 struct memcached_pool_st
 {
   pthread_mutex_t mutex;
@@ -56,6 +56,7 @@ struct memcached_pool_st
   const uint32_t size;
   uint32_t current_size;
   bool _owns_master;
+  struct timespec _timeout;
 
   memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
     master(master_arg),
@@ -67,26 +68,21 @@ struct memcached_pool_st
   {
     pthread_mutex_init(&mutex, NULL);
     pthread_cond_init(&cond, NULL);
+    _timeout.tv_sec= 5;
+    _timeout.tv_nsec= 0;
   }
 
-  bool init(uint32_t initial)
+  const struct timespec& timeout() const
   {
-    server_pool= new (std::nothrow) memcached_st *[size];
-    if (not server_pool)
-      return false;
+    return _timeout;
+  }
 
-    /*
-      Try to create the initial size of the pool. An allocation failure at
-      this time is not fatal..
-    */
-    for (unsigned int x= 0; x < initial; ++x)
-    {
-      if (not grow_pool(this))
-        break;
-    }
+  bool release(memcached_st*, memcached_return_t& rc);
 
-    return true;
-  }
+  memcached_st *fetch(memcached_return_t& rc);
+  memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
+
+  bool init(uint32_t initial);
 
   ~memcached_pool_st()
   {
@@ -121,27 +117,6 @@ struct memcached_pool_st
   }
 };
 
-static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
-{
-  int ret;
-  do
-  {
-    ret= pthread_mutex_lock(mutex);
-  } while (ret == -1 && errno == EINTR);
-
-  return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
-
-static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
-{
-  int ret;
-  do
-  {
-    ret= pthread_mutex_unlock(mutex);
-  } while (ret == -1 && errno == EINTR);
-
-  return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
 
 /**
  * Grow the connection pool by creating a connection structure and clone the
@@ -149,6 +124,8 @@ static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
  */
 static bool grow_pool(memcached_pool_st* pool)
 {
+  assert(pool);
+
   memcached_st *obj;
   if (not (obj= memcached_clone(NULL, pool->master)))
   {
@@ -162,18 +139,38 @@ static bool grow_pool(memcached_pool_st* pool)
   return true;
 }
 
+bool memcached_pool_st::init(uint32_t initial)
+{
+  server_pool= new (std::nothrow) memcached_st *[size];
+  if (not server_pool)
+    return false;
+
+  /*
+    Try to create the initial size of the pool. An allocation failure at
+    this time is not fatal..
+  */
+  for (unsigned int x= 0; x < initial; ++x)
+  {
+    if (grow_pool(this) == false)
+    {
+      break;
+    }
+  }
+
+  return true;
+}
+
+
 static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
 {
-  if (! initial || ! max || initial > max)
+  if (initial == 0 or max == 0 or (initial > max))
   {
-    errno= EINVAL;
     return NULL;
   }
 
   memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
-  if (not object)
+  if (object == NULL)
   {
-    errno= ENOMEM; // Set this for the failed calloc
     return NULL;
   }
 
@@ -199,18 +196,17 @@ memcached_pool_st * memcached_pool(const char *option_string, size_t option_stri
 {
   memcached_st *memc= memcached(option_string, option_string_length);
 
-  if (not memc)
+  if (memc == NULL)
+  {
     return NULL;
+  }
 
-  memcached_pool_st *self;
-  self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
-  if (not self)
+  memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
+  if (self == NULL)
   {
     memcached_free(memc);
-    errno= ENOMEM;
     return NULL;
   }
-  errno= 0;
 
   self->_owns_master= true;
 
@@ -219,8 +215,10 @@ memcached_pool_st * memcached_pool(const char *option_string, size_t option_stri
 
 memcached_st*  memcached_pool_destroy(memcached_pool_st* pool)
 {
-  if (not pool)
+  if (pool == NULL)
+  {
     return NULL;
+  }
 
   // Legacy that we return the original structure
   memcached_st *ret= NULL;
@@ -236,94 +234,180 @@ memcached_st*  memcached_pool_destroy(memcached_pool_st* pool)
   return ret;
 }
 
-memcached_st* memcached_pool_pop(memcached_pool_st* pool,
-                                 bool block,
-                                 memcached_return_t *rc)
+memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
 {
-  assert(pool);
-  assert(rc);
-  if (not pool ||  not rc)
-  {
-    errno= EINVAL;
-    return NULL;
-  }
+  static struct timespec relative_time= { 0, 0 };
+  return fetch(relative_time, rc);
+}
+
+memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
+{
+  rc= MEMCACHED_SUCCESS;
 
-  if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
+  if (pthread_mutex_lock(&mutex))
   {
+    rc= MEMCACHED_IN_PROGRESS;
     return NULL;
   }
 
   memcached_st *ret= NULL;
   do
   {
-    if (pool->firstfree > -1)
+    if (firstfree > -1)
     {
-      ret= pool->server_pool[pool->firstfree--];
+      ret= server_pool[firstfree--];
     }
-    else if (pool->current_size == pool->size)
+    else if (current_size == size)
     {
-      if (not block)
+      if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
       {
-        *rc= mutex_exit(&pool->mutex); // this should be a different error
+        pthread_mutex_unlock(&mutex);
+        rc= MEMCACHED_NOTFOUND;
+
         return NULL;
       }
 
-      if (pthread_cond_wait(&pool->cond, &pool->mutex) == -1)
+      struct timespec time_to_wait= {0, 0};
+      time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
+      time_to_wait.tv_nsec= relative_time.tv_nsec;
+
+      int thread_ret;
+      if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
       {
-        int err= errno;
-        mutex_exit(&pool->mutex);
-        errno= err;
-        *rc= MEMCACHED_ERRNO;
+        pthread_mutex_unlock(&mutex);
+
+        if (thread_ret == ETIMEDOUT)
+        {
+          rc= MEMCACHED_TIMEOUT;
+        }
+        else
+        {
+          errno= thread_ret;
+          rc= MEMCACHED_ERRNO;
+        }
+
         return NULL;
       }
     }
-    else if (not grow_pool(pool))
+    else if (grow_pool(this) == false)
     {
-      (void)mutex_exit(&pool->mutex);
-      *rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+      (void)pthread_mutex_unlock(&mutex);
       return NULL;
     }
-  }
-  while (ret == NULL);
+  } while (ret == NULL);
 
-  *rc= mutex_exit(&pool->mutex);
+  pthread_mutex_unlock(&mutex);
 
   return ret;
 }
 
-memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
 {
-  if (not pool)
-    return MEMCACHED_INVALID_ARGUMENTS;
-
-  memcached_return_t rc= mutex_enter(&pool->mutex);
+  rc= MEMCACHED_SUCCESS;
+  if (released == NULL)
+  {
+    rc= MEMCACHED_INVALID_ARGUMENTS;
+    return false;
+  }
 
-  if (rc != MEMCACHED_SUCCESS)
-    return rc;
+  if (pthread_mutex_lock(&mutex))
+  {
+    rc= MEMCACHED_IN_PROGRESS;
+    return false;
+  }
 
-  /* Someone updated the behavior on the object.. */
-  if (not pool->compare_version(released))
+  /* 
+    Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
+  */
+  if (compare_version(released) == false)
   {
-    memcached_free(released);
-    if (not (released= memcached_clone(NULL, pool->master)))
+    memcached_st *memc;
+    if ((memc= memcached_clone(NULL, master)))
     {
-      rc= MEMCACHED_SOME_ERRORS;
+      memcached_free(released);
+      released= memc;
     }
   }
 
-  pool->server_pool[++pool->firstfree]= released;
+  server_pool[++firstfree]= released;
 
-  if (pool->firstfree == 0 && pool->current_size == pool->size)
+  if (firstfree == 0 and current_size == size)
   {
     /* we might have people waiting for a connection.. wake them up :-) */
-    pthread_cond_broadcast(&pool->cond);
+    pthread_cond_broadcast(&cond);
   }
 
-  memcached_return_t rval= mutex_exit(&pool->mutex);
-  if (rc == MEMCACHED_SOME_ERRORS)
-    return rc;
+  (void)pthread_mutex_unlock(&mutex);
 
-  return rval;
+  return true;
+}
+
+memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
+{
+  if (pool == NULL)
+  {
+    return NULL;
+  }
+
+  memcached_return_t unused;
+  if (rc == NULL)
+  {
+    rc= &unused;
+  }
+
+  if (relative_time == NULL)
+  {
+    return pool->fetch(*rc);
+  }
+
+  return pool->fetch(*relative_time, *rc);
+}
+
+memcached_st* memcached_pool_pop(memcached_pool_st* pool,
+                                 bool block,
+                                 memcached_return_t *rc)
+{
+  if (pool == NULL)
+  {
+    return NULL;
+  }
+
+  memcached_return_t unused;
+  if (rc == NULL)
+  {
+    rc= &unused;
+  }
+
+  memcached_st *memc;
+  if (block)
+  {
+    memc= pool->fetch(pool->timeout(), *rc);
+  }
+  else
+  {
+    memc= pool->fetch(*rc);
+  }
+
+  return memc;
+}
+
+memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
+{
+  if (pool == NULL)
+  {
+    return MEMCACHED_INVALID_ARGUMENTS;
+  }
+
+  memcached_return_t rc;
+
+  (void) pool->release(released, rc);
+
+  return rc;
+}
+
+memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+{
+  return memcached_pool_release(pool, released);
 }
 
 
@@ -331,18 +415,21 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
                                                memcached_behavior_t flag,
                                                uint64_t data)
 {
-  if (not pool)
+  if (pool == NULL)
+  {
     return MEMCACHED_INVALID_ARGUMENTS;
+  }
 
-  memcached_return_t rc= mutex_enter(&pool->mutex);
-  if (rc != MEMCACHED_SUCCESS)
-    return rc;
+  if (pthread_mutex_lock(&pool->mutex))
+  {
+    return MEMCACHED_IN_PROGRESS;
+  }
 
   /* update the master */
-  rc= memcached_behavior_set(pool->master, flag, data);
-  if (rc != MEMCACHED_SUCCESS)
+  memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
+  if (memcached_failed(rc))
   {
-    mutex_exit(&pool->mutex);
+    (void)pthread_mutex_unlock(&pool->mutex);
     return rc;
   }
 
@@ -350,16 +437,17 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
   /* update the clones */
   for (int xx= 0; xx <= pool->firstfree; ++xx)
   {
-    rc= memcached_behavior_set(pool->server_pool[xx], flag, data);
-    if (rc == MEMCACHED_SUCCESS)
+    if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
     {
       pool->server_pool[xx]->configure.version= pool->version();
     }
     else
     {
-      memcached_free(pool->server_pool[xx]);
-      if (not (pool->server_pool[xx]= memcached_clone(NULL, pool->master)))
+      memcached_st *memc;
+      if ((memc= memcached_clone(NULL, pool->master)))
       {
+        memcached_free(pool->server_pool[xx]);
+        pool->server_pool[xx]= memc;
         /* I'm not sure what to do in this case.. this would happen
           if we fail to push the server list inside the client..
           I should add a testcase for this, but I believe the following
@@ -370,23 +458,28 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
     }
   }
 
-  return mutex_exit(&pool->mutex);
+  (void)pthread_mutex_unlock(&pool->mutex);
+
+  return rc;
 }
 
 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
                                                memcached_behavior_t flag,
                                                uint64_t *value)
 {
-  if (! pool)
+  if (pool == NULL)
+  {
     return MEMCACHED_INVALID_ARGUMENTS;
+  }
 
-  memcached_return_t rc= mutex_enter(&pool->mutex);
-  if (rc != MEMCACHED_SUCCESS)
+  if (pthread_mutex_lock(&pool->mutex))
   {
-    return rc;
+    return MEMCACHED_IN_PROGRESS;
   }
 
   *value= memcached_behavior_get(pool->master, flag);
 
-  return mutex_exit(&pool->mutex);
+  (void)pthread_mutex_unlock(&pool->mutex);
+
+  return MEMCACHED_SUCCESS;
 }