#include <libmemcached/common.h>
#include <libmemcached/memcached_util.h>
+#include <libmemcached/error.hpp>
+
#include <cassert>
#include <cerrno>
#include <pthread.h>
#include <memory>
-static bool grow_pool(memcached_pool_st* pool);
-
struct memcached_pool_st
{
pthread_mutex_t mutex;
const uint32_t size;
uint32_t current_size;
bool _owns_master;
+ struct timespec _timeout;
memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
master(master_arg),
{
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
+ _timeout.tv_sec= 5;
+ _timeout.tv_nsec= 0;
}
- bool init(uint32_t initial)
+ const struct timespec& timeout() const
{
- server_pool= new (std::nothrow) memcached_st *[size];
- if (not server_pool)
- return false;
+ return _timeout;
+ }
- /*
- Try to create the initial size of the pool. An allocation failure at
- this time is not fatal..
- */
- for (unsigned int x= 0; x < initial; ++x)
- {
- if (not grow_pool(this))
- break;
- }
+ bool release(memcached_st*, memcached_return_t& rc);
- return true;
- }
+ memcached_st *fetch(memcached_return_t& rc);
+ memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
+
+ bool init(uint32_t initial);
~memcached_pool_st()
{
}
};
-static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
-{
- int ret;
- do
- {
- ret= pthread_mutex_lock(mutex);
- } while (ret == -1 && errno == EINTR);
-
- return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
-
-static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
-{
- int ret;
- do
- {
- ret= pthread_mutex_unlock(mutex);
- } while (ret == -1 && errno == EINTR);
-
- return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
-}
/**
* Grow the connection pool by creating a connection structure and clone the
*/
static bool grow_pool(memcached_pool_st* pool)
{
+ assert(pool);
+
memcached_st *obj;
if (not (obj= memcached_clone(NULL, pool->master)))
{
return true;
}
+bool memcached_pool_st::init(uint32_t initial)
+{
+ server_pool= new (std::nothrow) memcached_st *[size];
+ if (not server_pool)
+ return false;
+
+ /*
+ Try to create the initial size of the pool. An allocation failure at
+ this time is not fatal..
+ */
+ for (unsigned int x= 0; x < initial; ++x)
+ {
+ if (grow_pool(this) == false)
+ {
+ break;
+ }
+ }
+
+ return true;
+}
+
+
static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
{
- if (! initial || ! max || initial > max)
+ if (initial == 0 or max == 0 or (initial > max))
{
- errno= EINVAL;
return NULL;
}
memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
- if (not object)
+ if (object == NULL)
{
- errno= ENOMEM; // Set this for the failed calloc
return NULL;
}
{
memcached_st *memc= memcached(option_string, option_string_length);
- if (not memc)
+ if (memc == NULL)
+ {
return NULL;
+ }
- memcached_pool_st *self;
- self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
- if (not self)
+ memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
+ if (self == NULL)
{
memcached_free(memc);
- errno= ENOMEM;
return NULL;
}
- errno= 0;
self->_owns_master= true;
memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
{
- if (not pool)
+ if (pool == NULL)
+ {
return NULL;
+ }
// Legacy that we return the original structure
memcached_st *ret= NULL;
return ret;
}
-memcached_st* memcached_pool_pop(memcached_pool_st* pool,
- bool block,
- memcached_return_t *rc)
+memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
{
- assert(pool);
- assert(rc);
- if (not pool || not rc)
- {
- errno= EINVAL;
- return NULL;
- }
+ static struct timespec relative_time= { 0, 0 };
+ return fetch(relative_time, rc);
+}
+
+memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
+{
+ rc= MEMCACHED_SUCCESS;
- if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
+ if (pthread_mutex_lock(&mutex))
{
+ rc= MEMCACHED_IN_PROGRESS;
return NULL;
}
memcached_st *ret= NULL;
do
{
- if (pool->firstfree > -1)
+ if (firstfree > -1)
{
- ret= pool->server_pool[pool->firstfree--];
+ ret= server_pool[firstfree--];
}
- else if (pool->current_size == pool->size)
+ else if (current_size == size)
{
- if (not block)
+ if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
{
- *rc= mutex_exit(&pool->mutex); // this should be a different error
+ pthread_mutex_unlock(&mutex);
+ rc= MEMCACHED_NOTFOUND;
+
return NULL;
}
- if (pthread_cond_wait(&pool->cond, &pool->mutex) == -1)
+ struct timespec time_to_wait= {0, 0};
+ time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
+ time_to_wait.tv_nsec= relative_time.tv_nsec;
+
+ int thread_ret;
+ if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
{
- int err= errno;
- mutex_exit(&pool->mutex);
- errno= err;
- *rc= MEMCACHED_ERRNO;
+ pthread_mutex_unlock(&mutex);
+
+ if (thread_ret == ETIMEDOUT)
+ {
+ rc= MEMCACHED_TIMEOUT;
+ }
+ else
+ {
+ errno= thread_ret;
+ rc= MEMCACHED_ERRNO;
+ }
+
return NULL;
}
}
- else if (not grow_pool(pool))
+ else if (grow_pool(this) == false)
{
- (void)mutex_exit(&pool->mutex);
- *rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+ (void)pthread_mutex_unlock(&mutex);
return NULL;
}
- }
- while (ret == NULL);
+ } while (ret == NULL);
- *rc= mutex_exit(&pool->mutex);
+ pthread_mutex_unlock(&mutex);
return ret;
}
-memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
{
- if (not pool)
- return MEMCACHED_INVALID_ARGUMENTS;
-
- memcached_return_t rc= mutex_enter(&pool->mutex);
+ rc= MEMCACHED_SUCCESS;
+ if (released == NULL)
+ {
+ rc= MEMCACHED_INVALID_ARGUMENTS;
+ return false;
+ }
- if (rc != MEMCACHED_SUCCESS)
- return rc;
+ if (pthread_mutex_lock(&mutex))
+ {
+ rc= MEMCACHED_IN_PROGRESS;
+ return false;
+ }
- /* Someone updated the behavior on the object.. */
- if (not pool->compare_version(released))
+ /*
+ Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
+ */
+ if (compare_version(released) == false)
{
- memcached_free(released);
- if (not (released= memcached_clone(NULL, pool->master)))
+ memcached_st *memc;
+ if ((memc= memcached_clone(NULL, master)))
{
- rc= MEMCACHED_SOME_ERRORS;
+ memcached_free(released);
+ released= memc;
}
}
- pool->server_pool[++pool->firstfree]= released;
+ server_pool[++firstfree]= released;
- if (pool->firstfree == 0 && pool->current_size == pool->size)
+ if (firstfree == 0 and current_size == size)
{
/* we might have people waiting for a connection.. wake them up :-) */
- pthread_cond_broadcast(&pool->cond);
+ pthread_cond_broadcast(&cond);
}
- memcached_return_t rval= mutex_exit(&pool->mutex);
- if (rc == MEMCACHED_SOME_ERRORS)
- return rc;
+ (void)pthread_mutex_unlock(&mutex);
- return rval;
+ return true;
+}
+
+memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
+{
+ if (pool == NULL)
+ {
+ return NULL;
+ }
+
+ memcached_return_t unused;
+ if (rc == NULL)
+ {
+ rc= &unused;
+ }
+
+ if (relative_time == NULL)
+ {
+ return pool->fetch(*rc);
+ }
+
+ return pool->fetch(*relative_time, *rc);
+}
+
+memcached_st* memcached_pool_pop(memcached_pool_st* pool,
+ bool block,
+ memcached_return_t *rc)
+{
+ if (pool == NULL)
+ {
+ return NULL;
+ }
+
+ memcached_return_t unused;
+ if (rc == NULL)
+ {
+ rc= &unused;
+ }
+
+ memcached_st *memc;
+ if (block)
+ {
+ memc= pool->fetch(pool->timeout(), *rc);
+ }
+ else
+ {
+ memc= pool->fetch(*rc);
+ }
+
+ return memc;
+}
+
+memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
+{
+ if (pool == NULL)
+ {
+ return MEMCACHED_INVALID_ARGUMENTS;
+ }
+
+ memcached_return_t rc;
+
+ (void) pool->release(released, rc);
+
+ return rc;
+}
+
+memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
+{
+ return memcached_pool_release(pool, released);
}
memcached_behavior_t flag,
uint64_t data)
{
- if (not pool)
+ if (pool == NULL)
+ {
return MEMCACHED_INVALID_ARGUMENTS;
+ }
- memcached_return_t rc= mutex_enter(&pool->mutex);
- if (rc != MEMCACHED_SUCCESS)
- return rc;
+ if (pthread_mutex_lock(&pool->mutex))
+ {
+ return MEMCACHED_IN_PROGRESS;
+ }
/* update the master */
- rc= memcached_behavior_set(pool->master, flag, data);
- if (rc != MEMCACHED_SUCCESS)
+ memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
+ if (memcached_failed(rc))
{
- mutex_exit(&pool->mutex);
+ (void)pthread_mutex_unlock(&pool->mutex);
return rc;
}
/* update the clones */
for (int xx= 0; xx <= pool->firstfree; ++xx)
{
- rc= memcached_behavior_set(pool->server_pool[xx], flag, data);
- if (rc == MEMCACHED_SUCCESS)
+ if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
{
pool->server_pool[xx]->configure.version= pool->version();
}
else
{
- memcached_free(pool->server_pool[xx]);
- if (not (pool->server_pool[xx]= memcached_clone(NULL, pool->master)))
+ memcached_st *memc;
+ if ((memc= memcached_clone(NULL, pool->master)))
{
+ memcached_free(pool->server_pool[xx]);
+ pool->server_pool[xx]= memc;
/* I'm not sure what to do in this case.. this would happen
if we fail to push the server list inside the client..
I should add a testcase for this, but I believe the following
}
}
- return mutex_exit(&pool->mutex);
+ (void)pthread_mutex_unlock(&pool->mutex);
+
+ return rc;
}
memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
memcached_behavior_t flag,
uint64_t *value)
{
- if (! pool)
+ if (pool == NULL)
+ {
return MEMCACHED_INVALID_ARGUMENTS;
+ }
- memcached_return_t rc= mutex_enter(&pool->mutex);
- if (rc != MEMCACHED_SUCCESS)
+ if (pthread_mutex_lock(&pool->mutex))
{
- return rc;
+ return MEMCACHED_IN_PROGRESS;
}
*value= memcached_behavior_get(pool->master, flag);
- return mutex_exit(&pool->mutex);
+ (void)pthread_mutex_unlock(&pool->mutex);
+
+ return MEMCACHED_SUCCESS;
}