Fixed a typo that was causing a race condition error.
[awesomized/libmemcached] / libmemcached / util / pool.c
1 /* LibMemcached
2 * Copyright (C) 2010 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: a mutex-protected pool of memcached connection handles cloned from a master handle.
9 *
10 */
11
12 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
13 #include "libmemcached/common.h"
14 #include "libmemcached/memcached_util.h"
15
16 #include <errno.h>
17 #include <pthread.h>
18
19 struct memcached_pool_st
20 {
21 pthread_mutex_t mutex;
22 pthread_cond_t cond;
23 memcached_st *master;
24 memcached_st **mmc;
25 int firstfree;
26 uint32_t size;
27 uint32_t current_size;
28 char *version;
29 };
30
31 static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
32 {
33 int ret;
34 do
35 ret= pthread_mutex_lock(mutex);
36 while (ret == -1 && errno == EINTR);
37
38 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
39 }
40
41 static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
42 {
43 int ret;
44 do
45 ret= pthread_mutex_unlock(mutex);
46 while (ret == -1 && errno == EINTR);
47
48 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
49 }
50
51 /**
52 * Grow the connection pool by creating a connection structure and clone the
53 * original memcached handle.
54 */
55 static int grow_pool(memcached_pool_st* pool)
56 {
57 memcached_st *obj= calloc(1, sizeof(*obj));
58
59 if (obj == NULL)
60 return -1;
61
62 if (memcached_clone(obj, pool->master) == NULL)
63 {
64 free(obj);
65 return -1;
66 }
67
68 pool->mmc[++pool->firstfree] = obj;
69 pool->current_size++;
70
71 return 0;
72 }
73
74 memcached_pool_st *memcached_pool_create(memcached_st* mmc,
75 uint32_t initial, uint32_t max)
76 {
77 memcached_pool_st* ret = NULL;
78 memcached_pool_st object = { .mutex = PTHREAD_MUTEX_INITIALIZER,
79 .cond = PTHREAD_COND_INITIALIZER,
80 .master = mmc,
81 .mmc = calloc(max, sizeof(memcached_st*)),
82 .firstfree = -1,
83 .size = max,
84 .current_size = 0 };
85
86 if (object.mmc != NULL)
87 {
88 ret= calloc(1, sizeof(*ret));
89 if (ret == NULL)
90 {
91 free(object.mmc);
92 return NULL;
93 }
94
95 *ret = object;
96
97 /*
98 Try to create the initial size of the pool. An allocation failure at
99 this time is not fatal..
100 */
101 for (unsigned int ii= 0; ii < initial; ++ii)
102 {
103 if (grow_pool(ret) == -1)
104 break;
105 }
106 }
107
108 return ret;
109 }
110
111 memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
112 {
113 memcached_st *ret = pool->master;
114
115 for (int xx= 0; xx <= pool->firstfree; ++xx)
116 {
117 memcached_free(pool->mmc[xx]);
118 free(pool->mmc[xx]);
119 pool->mmc[xx] = NULL;
120 }
121
122 pthread_mutex_destroy(&pool->mutex);
123 pthread_cond_destroy(&pool->cond);
124 free(pool->mmc);
125 free(pool);
126
127 return ret;
128 }
129
130 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
131 bool block,
132 memcached_return_t *rc)
133 {
134 memcached_st *ret= NULL;
135 if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
136 return NULL;
137
138 do
139 {
140 if (pool->firstfree > -1)
141 ret= pool->mmc[pool->firstfree--];
142 else if (pool->current_size == pool->size)
143 {
144 if (!block)
145 {
146 *rc= mutex_exit(&pool->mutex);
147 return NULL;
148 }
149
150 if (pthread_cond_wait(&pool->cond, &pool->mutex) == -1)
151 {
152 int err = errno;
153 mutex_exit(&pool->mutex);
154 errno = err;
155 *rc= MEMCACHED_ERRNO;
156 return NULL;
157 }
158 }
159 else if (grow_pool(pool) == -1)
160 {
161 *rc= mutex_exit(&pool->mutex);
162 return NULL;
163 }
164 }
165 while (ret == NULL);
166
167 *rc= mutex_exit(&pool->mutex);
168
169 return ret;
170 }
171
172 memcached_return_t memcached_pool_push(memcached_pool_st* pool,
173 memcached_st *mmc)
174 {
175 memcached_return_t rc= mutex_enter(&pool->mutex);
176
177 if (rc != MEMCACHED_SUCCESS)
178 return rc;
179
180 char* version= memcached_get_user_data(mmc);
181 /* Someone updated the behavior on the object.. */
182 if (version != pool->version)
183 {
184 memcached_free(mmc);
185 memset(mmc, 0, sizeof(*mmc));
186 if (memcached_clone(mmc, pool->master) == NULL)
187 {
188 rc= MEMCACHED_SOME_ERRORS;
189 }
190 }
191
192 pool->mmc[++pool->firstfree]= mmc;
193
194 if (pool->firstfree == 0 && pool->current_size == pool->size)
195 {
196 /* we might have people waiting for a connection.. wake them up :-) */
197 pthread_cond_broadcast(&pool->cond);
198 }
199
200 memcached_return_t rval= mutex_exit(&pool->mutex);
201 if (rc == MEMCACHED_SOME_ERRORS)
202 return rc;
203
204 return rval;
205 }
206
207
208 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
209 memcached_behavior_t flag,
210 uint64_t data)
211 {
212
213 memcached_return_t rc= mutex_enter(&pool->mutex);
214 if (rc != MEMCACHED_SUCCESS)
215 return rc;
216
217 /* update the master */
218 rc= memcached_behavior_set(pool->master, flag, data);
219 if (rc != MEMCACHED_SUCCESS)
220 {
221 mutex_exit(&pool->mutex);
222 return rc;
223 }
224
225 ++pool->version;
226 memcached_set_user_data(pool->master, pool->version);
227 /* update the clones */
228 for (int xx= 0; xx <= pool->firstfree; ++xx)
229 {
230 rc= memcached_behavior_set(pool->mmc[xx], flag, data);
231 if (rc == MEMCACHED_SUCCESS)
232 memcached_set_user_data(pool->mmc[xx], pool->version);
233 else
234 {
235 memcached_free(pool->mmc[xx]);
236 memset(pool->mmc[xx], 0, sizeof(*pool->mmc[xx]));
237 if (memcached_clone(pool->mmc[xx], pool->master) == NULL)
238 {
239 /* I'm not sure what to do in this case.. this would happen
240 if we fail to push the server list inside the client..
241 I should add a testcase for this, but I believe the following
242 would work, except that you would add a hole in the pool list..
243 in theory you could end up with an empty pool....
244 */
245 free(pool->mmc[xx]);
246 pool->mmc[xx]= NULL;
247 }
248 }
249 }
250
251 return mutex_exit(&pool->mutex);
252 }
253
254 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
255 memcached_behavior_t flag,
256 uint64_t *value)
257 {
258 memcached_return_t rc= mutex_enter(&pool->mutex);
259
260 if (rc != MEMCACHED_SUCCESS)
261 {
262 return rc;
263 }
264
265 *value= memcached_behavior_get(pool->master, flag);
266
267 return mutex_exit(&pool->mutex);
268 }