e8be3c73e1850a8493e08b717d1d5da206df567d
[m6w6/libmemcached] / libmemcached / util / pool.c
1 /* LibMemcached
2 * Copyright (C) 2010 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: a mutex-protected pool of memcached handles cloned from a master handle.
9 *
10 */
11
12 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
13 #include "libmemcached/common.h"
14 #include "libmemcached/memcached_util.h"
15
16 #include <errno.h>
17 #include <pthread.h>
18
19 struct memcached_pool_st
20 {
21 pthread_mutex_t mutex;
22 pthread_cond_t cond;
23 memcached_st *master;
24 memcached_st **mmc;
25 int firstfree;
26 uint32_t size;
27 uint32_t current_size;
28 bool _owns_master;
29 char *version;
30 };
31
32 static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
33 {
34 int ret;
35 do
36 ret= pthread_mutex_lock(mutex);
37 while (ret == -1 && errno == EINTR);
38
39 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
40 }
41
42 static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
43 {
44 int ret;
45 do
46 ret= pthread_mutex_unlock(mutex);
47 while (ret == -1 && errno == EINTR);
48
49 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
50 }
51
52 /**
53 * Grow the connection pool by creating a connection structure and clone the
54 * original memcached handle.
55 */
56 static int grow_pool(memcached_pool_st* pool)
57 {
58 memcached_st *obj= calloc(1, sizeof(*obj));
59
60 if (obj == NULL)
61 return -1;
62
63 if (memcached_clone(obj, pool->master) == NULL)
64 {
65 free(obj);
66 return -1;
67 }
68
69 pool->mmc[++pool->firstfree] = obj;
70 pool->current_size++;
71
72 return EXIT_SUCCESS;
73 }
74
75 static inline memcached_pool_st *_pool_create(memcached_st* mmc, uint32_t initial, uint32_t max)
76 {
77 memcached_pool_st* ret= NULL;
78 memcached_pool_st object= { .mutex = PTHREAD_MUTEX_INITIALIZER,
79 .cond = PTHREAD_COND_INITIALIZER,
80 .master = mmc,
81 .mmc = calloc(max, sizeof(memcached_st*)),
82 .firstfree= -1,
83 .size = max,
84 .current_size= 0,
85 ._owns_master= false};
86
87 if (object.mmc != NULL)
88 {
89 ret= calloc(1, sizeof(*ret));
90 if (ret == NULL)
91 {
92 free(object.mmc);
93 return NULL;
94 }
95
96 *ret = object;
97
98 /*
99 Try to create the initial size of the pool. An allocation failure at
100 this time is not fatal..
101 */
102 for (unsigned int ii= 0; ii < initial; ++ii)
103 {
104 if (grow_pool(ret) == -1)
105 break;
106 }
107 }
108
109 return ret;
110 }
111
112 memcached_pool_st *memcached_pool_create(memcached_st* mmc, uint32_t initial, uint32_t max)
113 {
114 return _pool_create(mmc, initial, max);
115 }
116
117 memcached_pool_st * memcached_pool(const char *option_string, size_t option_string_length)
118 {
119 memcached_pool_st *self;
120 memcached_st *memc= memcached_create_with_options(option_string, option_string_length);
121
122 if (! memc)
123 return NULL;
124
125 self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
126 if (self)
127 {
128 self->_owns_master= true;
129 }
130 else
131 {
132 memcached_free(memc);
133 }
134
135 return self;
136 }
137
138 memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
139 {
140 if (! pool)
141 return NULL;
142
143 memcached_st *ret= pool->master;
144
145 for (int xx= 0; xx <= pool->firstfree; ++xx)
146 {
147 memcached_free(pool->mmc[xx]);
148 free(pool->mmc[xx]);
149 pool->mmc[xx] = NULL;
150 }
151
152 pthread_mutex_destroy(&pool->mutex);
153 pthread_cond_destroy(&pool->cond);
154 free(pool->mmc);
155 if (pool->_owns_master)
156 {
157 memcached_free(pool->master);
158 ret= NULL;
159 }
160 free(pool);
161
162 return ret;
163 }
164
165 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
166 bool block,
167 memcached_return_t *rc)
168 {
169 memcached_st *ret= NULL;
170 if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
171 return NULL;
172
173 do
174 {
175 if (pool->firstfree > -1)
176 {
177 ret= pool->mmc[pool->firstfree--];
178 }
179 else if (pool->current_size == pool->size)
180 {
181 if (!block)
182 {
183 *rc= mutex_exit(&pool->mutex);
184 return NULL;
185 }
186
187 if (pthread_cond_wait(&pool->cond, &pool->mutex) == -1)
188 {
189 int err= errno;
190 mutex_exit(&pool->mutex);
191 errno= err;
192 *rc= MEMCACHED_ERRNO;
193 return NULL;
194 }
195 }
196 else if (grow_pool(pool) == -1)
197 {
198 *rc= mutex_exit(&pool->mutex);
199 return NULL;
200 }
201 }
202 while (ret == NULL);
203
204 *rc= mutex_exit(&pool->mutex);
205
206 return ret;
207 }
208
209 memcached_return_t memcached_pool_push(memcached_pool_st* pool,
210 memcached_st *mmc)
211 {
212 memcached_return_t rc= mutex_enter(&pool->mutex);
213
214 if (rc != MEMCACHED_SUCCESS)
215 return rc;
216
217 char* version= memcached_get_user_data(mmc);
218 /* Someone updated the behavior on the object.. */
219 if (version != pool->version)
220 {
221 memcached_free(mmc);
222 memset(mmc, 0, sizeof(*mmc));
223 if (memcached_clone(mmc, pool->master) == NULL)
224 {
225 rc= MEMCACHED_SOME_ERRORS;
226 }
227 }
228
229 pool->mmc[++pool->firstfree]= mmc;
230
231 if (pool->firstfree == 0 && pool->current_size == pool->size)
232 {
233 /* we might have people waiting for a connection.. wake them up :-) */
234 pthread_cond_broadcast(&pool->cond);
235 }
236
237 memcached_return_t rval= mutex_exit(&pool->mutex);
238 if (rc == MEMCACHED_SOME_ERRORS)
239 return rc;
240
241 return rval;
242 }
243
244
245 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
246 memcached_behavior_t flag,
247 uint64_t data)
248 {
249
250 memcached_return_t rc= mutex_enter(&pool->mutex);
251 if (rc != MEMCACHED_SUCCESS)
252 return rc;
253
254 /* update the master */
255 rc= memcached_behavior_set(pool->master, flag, data);
256 if (rc != MEMCACHED_SUCCESS)
257 {
258 mutex_exit(&pool->mutex);
259 return rc;
260 }
261
262 ++pool->version;
263 memcached_set_user_data(pool->master, pool->version);
264 /* update the clones */
265 for (int xx= 0; xx <= pool->firstfree; ++xx)
266 {
267 rc= memcached_behavior_set(pool->mmc[xx], flag, data);
268 if (rc == MEMCACHED_SUCCESS)
269 {
270 memcached_set_user_data(pool->mmc[xx], pool->version);
271 }
272 else
273 {
274 memcached_free(pool->mmc[xx]);
275 memset(pool->mmc[xx], 0, sizeof(*pool->mmc[xx]));
276
277 if (memcached_clone(pool->mmc[xx], pool->master) == NULL)
278 {
279 /* I'm not sure what to do in this case.. this would happen
280 if we fail to push the server list inside the client..
281 I should add a testcase for this, but I believe the following
282 would work, except that you would add a hole in the pool list..
283 in theory you could end up with an empty pool....
284 */
285 free(pool->mmc[xx]);
286 pool->mmc[xx]= NULL;
287 }
288 }
289 }
290
291 return mutex_exit(&pool->mutex);
292 }
293
294 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
295 memcached_behavior_t flag,
296 uint64_t *value)
297 {
298 memcached_return_t rc= mutex_enter(&pool->mutex);
299
300 if (rc != MEMCACHED_SUCCESS)
301 {
302 return rc;
303 }
304
305 *value= memcached_behavior_get(pool->master, flag);
306
307 return mutex_exit(&pool->mutex);
308 }