Add test case for situation where callback shouldn't be set because of behavior settings.
[awesomized/libmemcached] / libmemcached / util / pool.cc
index e89789d4aadccc63eba79816249f497c8700b963..47f46eec224b7d726ecc4fd3c9e446cbb04221e5 100644 (file)
 #include <libmemcached/common.h>
 #include <libmemcached/memcached_util.h>
 
+#include <libmemcached/error.hpp>
+
 #include <cassert>
 #include <cerrno>
 #include <pthread.h>
 #include <memory>
 
-static bool grow_pool(memcached_pool_st* pool);
-
 struct memcached_pool_st
 {
   pthread_mutex_t mutex;
@@ -56,6 +56,7 @@ struct memcached_pool_st
   const uint32_t size;
   uint32_t current_size;
   bool _owns_master;
+  struct timespec _timeout;
 
   memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
     master(master_arg),
@@ -67,26 +68,21 @@ struct memcached_pool_st
   {
     pthread_mutex_init(&mutex, NULL);
     pthread_cond_init(&cond, NULL);
+    _timeout.tv_sec= 5;
+    _timeout.tv_nsec= 0;
   }
 
-  bool init(uint32_t initial)
+  const struct timespec& timeout() const
   {
-    server_pool= new (std::nothrow) memcached_st *[size];
-    if (not server_pool)
-      return false;
+    return _timeout;
+  }
 
-    /*
-      Try to create the initial size of the pool. An allocation failure at
-      this time is not fatal..
-    */
-    for (unsigned int x= 0; x < initial; ++x)
-    {
-      if (not grow_pool(this))
-        break;
-    }
+  bool release(memcached_st*, memcached_return_t& rc);
 
-    return true;
-  }
+  memcached_st *fetch(memcached_return_t& rc);
+  memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
+
+  bool init(uint32_t initial);
 
   ~memcached_pool_st()
   {
@@ -121,27 +117,6 @@ struct memcached_pool_st
   }
 };
 
-static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
-{
-  int ret;
-  do
-  {
-    ret= pthread_mutex_lock(mutex);
-  } while (ret == -1 && errno == EINTR);
-
-  return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
-
-static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
-{
-  int ret;
-  do
-  {
-    ret= pthread_mutex_unlock(mutex);
-  } while (ret == -1 && errno == EINTR);
-
-  return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
 
 /**
  * Grow the connection pool by creating a connection structure and clone the
@@ -149,6 +124,8 @@ static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
  */
 static bool grow_pool(memcached_pool_st* pool)
 {
+  assert(pool);
+
   memcached_st *obj;
   if (not (obj= memcached_clone(NULL, pool->master)))
   {
@@ -162,18 +139,38 @@ static bool grow_pool(memcached_pool_st* pool)
   return true;
 }
 
+bool memcached_pool_st::init(uint32_t initial)
+{
+  server_pool= new (std::nothrow) memcached_st *[size];
+  if (not server_pool)
+    return false;
+
+  /*
+    Try to create the initial size of the pool. An allocation failure at
+    this time is not fatal..
+  */
+  for (unsigned int x= 0; x < initial; ++x)
+  {
+    if (grow_pool(this) == false)
+    {
+      break;
+    }
+  }
+
+  return true;
+}
+
+
 static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
 {
   if (initial == 0 or max == 0 or (initial > max))
   {
-    errno= EINVAL;
     return NULL;
   }
 
   memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
   if (object == NULL)
   {
-    errno= ENOMEM; // Set this for the failed calloc
     return NULL;
   }
 
@@ -208,10 +205,8 @@ memcached_pool_st * memcached_pool(const char *option_string, size_t option_stri
   if (self == NULL)
   {
     memcached_free(memc);
-    errno= ENOMEM;
     return NULL;
   }
-  errno= 0;
 
   self->_owns_master= true;
 
@@ -239,121 +234,180 @@ memcached_st*  memcached_pool_destroy(memcached_pool_st* pool)
   return ret;
 }
 
-memcached_st* memcached_pool_pop(memcached_pool_st* pool,
-                                 bool block,
-                                 memcached_return_t *rc)
+memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
 {
-  memcached_return_t not_used;
-  if (rc == NULL)
-  {
-    rc= &not_used;
-  }
-
-  if (pool == NULL)
-  {
-    *rc= MEMCACHED_INVALID_ARGUMENTS;
-    errno= EINVAL;
+  static struct timespec relative_time= { 0, 0 };
+  return fetch(relative_time, rc);
+}
 
-    return NULL;
-  }
+memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
+{
+  rc= MEMCACHED_SUCCESS;
 
-  if (memcached_failed((*rc= mutex_enter(&pool->mutex))))
+  if (pthread_mutex_lock(&mutex))
   {
+    rc= MEMCACHED_IN_PROGRESS;
     return NULL;
   }
 
   memcached_st *ret= NULL;
   do
   {
-    if (pool->firstfree > -1)
+    if (firstfree > -1)
     {
-      ret= pool->server_pool[pool->firstfree--];
+      ret= server_pool[firstfree--];
     }
-    else if (pool->current_size == pool->size)
+    else if (current_size == size)
     {
-      if (block == false)
+      if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
       {
-        *rc= mutex_exit(&pool->mutex); // this should be a different error
+        pthread_mutex_unlock(&mutex);
+        rc= MEMCACHED_NOTFOUND;
+
         return NULL;
       }
 
       struct timespec time_to_wait= {0, 0};
-      time_to_wait.tv_sec = time(NULL) +5;
+      time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
+      time_to_wait.tv_nsec= relative_time.tv_nsec;
+
       int thread_ret;
-      if ((thread_ret= pthread_cond_timedwait(&pool->cond, &pool->mutex, &time_to_wait)) != 0)
+      if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
       {
-        mutex_exit(&pool->mutex);
-        errno= thread_ret;
+        pthread_mutex_unlock(&mutex);
 
         if (thread_ret == ETIMEDOUT)
         {
-          *rc= MEMCACHED_TIMEOUT;
+          rc= MEMCACHED_TIMEOUT;
         }
         else
         {
-          *rc= MEMCACHED_ERRNO;
+          errno= thread_ret;
+          rc= MEMCACHED_ERRNO;
         }
 
         return NULL;
       }
     }
-    else if (not grow_pool(pool))
+    else if (grow_pool(this) == false)
     {
-      (void)mutex_exit(&pool->mutex);
-      *rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+      (void)pthread_mutex_unlock(&mutex);
       return NULL;
     }
   } while (ret == NULL);
 
-  *rc= mutex_exit(&pool->mutex);
+  pthread_mutex_unlock(&mutex);
 
   return ret;
 }
 
-memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
 {
-  if (pool == NULL)
-  {
-    return MEMCACHED_INVALID_ARGUMENTS;
-  }
-
+  rc= MEMCACHED_SUCCESS;
   if (released == NULL)
   {
-    return MEMCACHED_INVALID_ARGUMENTS;
+    rc= MEMCACHED_INVALID_ARGUMENTS;
+    return false;
   }
 
-  memcached_return_t rc= mutex_enter(&pool->mutex);
-
-  if (rc != MEMCACHED_SUCCESS)
+  if (pthread_mutex_lock(&mutex))
   {
-    return rc;
+    rc= MEMCACHED_IN_PROGRESS;
+    return false;
   }
 
-  /* Someone updated the behavior on the object.. */
-  if (pool->compare_version(released) == false)
+  /* 
+    Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
+  */
+  if (compare_version(released) == false)
   {
-    memcached_free(released);
-    if (not (released= memcached_clone(NULL, pool->master)))
+    memcached_st *memc;
+    if ((memc= memcached_clone(NULL, master)))
     {
-      rc= MEMCACHED_SOME_ERRORS;
+      memcached_free(released);
+      released= memc;
     }
   }
 
-  pool->server_pool[++pool->firstfree]= released;
+  server_pool[++firstfree]= released;
 
-  if (pool->firstfree == 0 and pool->current_size == pool->size)
+  if (firstfree == 0 and current_size == size)
   {
     /* we might have people waiting for a connection.. wake them up :-) */
-    pthread_cond_broadcast(&pool->cond);
+    pthread_cond_broadcast(&cond);
   }
 
-  memcached_return_t rval= mutex_exit(&pool->mutex);
-  if (rc == MEMCACHED_SOME_ERRORS)
+  (void)pthread_mutex_unlock(&mutex);
+
+  return true;
+}
+
+memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
+{
+  if (pool == NULL)
   {
-    return rc;
+    return NULL;
+  }
+
+  memcached_return_t unused;
+  if (rc == NULL)
+  {
+    rc= &unused;
   }
 
-  return rval;
+  if (relative_time == NULL)
+  {
+    return pool->fetch(*rc);
+  }
+
+  return pool->fetch(*relative_time, *rc);
+}
+
+memcached_st* memcached_pool_pop(memcached_pool_st* pool,
+                                 bool block,
+                                 memcached_return_t *rc)
+{
+  if (pool == NULL)
+  {
+    return NULL;
+  }
+
+  memcached_return_t unused;
+  if (rc == NULL)
+  {
+    rc= &unused;
+  }
+
+  memcached_st *memc;
+  if (block)
+  {
+    memc= pool->fetch(pool->timeout(), *rc);
+  }
+  else
+  {
+    memc= pool->fetch(*rc);
+  }
+
+  return memc;
+}
+
+memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
+{
+  if (pool == NULL)
+  {
+    return MEMCACHED_INVALID_ARGUMENTS;
+  }
+
+  memcached_return_t rc;
+
+  (void) pool->release(released, rc);
+
+  return rc;
+}
+
+memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+{
+  return memcached_pool_release(pool, released);
 }
 
 
@@ -366,15 +420,16 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
     return MEMCACHED_INVALID_ARGUMENTS;
   }
 
-  memcached_return_t rc= mutex_enter(&pool->mutex);
-  if (rc != MEMCACHED_SUCCESS)
-    return rc;
+  if (pthread_mutex_lock(&pool->mutex))
+  {
+    return MEMCACHED_IN_PROGRESS;
+  }
 
   /* update the master */
-  rc= memcached_behavior_set(pool->master, flag, data);
-  if (rc != MEMCACHED_SUCCESS)
+  memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
+  if (memcached_failed(rc))
   {
-    mutex_exit(&pool->mutex);
+    (void)pthread_mutex_unlock(&pool->mutex);
     return rc;
   }
 
@@ -382,16 +437,17 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
   /* update the clones */
   for (int xx= 0; xx <= pool->firstfree; ++xx)
   {
-    rc= memcached_behavior_set(pool->server_pool[xx], flag, data);
-    if (rc == MEMCACHED_SUCCESS)
+    if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
     {
       pool->server_pool[xx]->configure.version= pool->version();
     }
     else
     {
-      memcached_free(pool->server_pool[xx]);
-      if (not (pool->server_pool[xx]= memcached_clone(NULL, pool->master)))
+      memcached_st *memc;
+      if ((memc= memcached_clone(NULL, pool->master)))
       {
+        memcached_free(pool->server_pool[xx]);
+        pool->server_pool[xx]= memc;
         /* I'm not sure what to do in this case.. this would happen
           if we fail to push the server list inside the client..
           I should add a testcase for this, but I believe the following
@@ -402,7 +458,9 @@ memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
     }
   }
 
-  return mutex_exit(&pool->mutex);
+  (void)pthread_mutex_unlock(&pool->mutex);
+
+  return rc;
 }
 
 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
@@ -414,13 +472,14 @@ memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
     return MEMCACHED_INVALID_ARGUMENTS;
   }
 
-  memcached_return_t rc= mutex_enter(&pool->mutex);
-  if (rc != MEMCACHED_SUCCESS)
+  if (pthread_mutex_lock(&pool->mutex))
   {
-    return rc;
+    return MEMCACHED_IN_PROGRESS;
   }
 
   *value= memcached_behavior_get(pool->master, flag);
 
-  return mutex_exit(&pool->mutex);
+  (void)pthread_mutex_unlock(&pool->mutex);
+
+  return MEMCACHED_SUCCESS;
 }