Merge Trond's protocol work.
[m6w6/libmemcached] / libmemcachedutil / memcached_pool.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "libmemcached/common.h"
3 #include "libmemcached/memcached_pool.h"
4
5 #include <errno.h>
6 #include <pthread.h>
7
8 struct memcached_pool_st
9 {
10 pthread_mutex_t mutex;
11 pthread_cond_t cond;
12 memcached_st *master;
13 memcached_st **mmc;
14 int firstfree;
15 uint32_t size;
16 uint32_t current_size;
17 char* version;
18 };
19
20 static memcached_return mutex_enter(pthread_mutex_t *mutex)
21 {
22 int ret;
23 do
24 ret= pthread_mutex_lock(mutex);
25 while (ret == -1 && errno == EINTR);
26
27 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
28 }
29
30 static memcached_return mutex_exit(pthread_mutex_t *mutex) {
31 int ret;
32 do
33 ret= pthread_mutex_unlock(mutex);
34 while (ret == -1 && errno == EINTR);
35
36 return (ret == -1) ? MEMCACHED_ERRNO : MEMCACHED_SUCCESS;
37 }
38
39 /**
40 * Grow the connection pool by creating a connection structure and clone the
41 * original memcached handle.
42 */
43 static int grow_pool(memcached_pool_st* pool) {
44 memcached_st *obj= calloc(1, sizeof(*obj));
45 if (obj == NULL)
46 return -1;
47
48 if (memcached_clone(obj, pool->master) == NULL)
49 {
50 free(obj);
51 return -1;
52 }
53
54 pool->mmc[++pool->firstfree] = obj;
55 pool->current_size++;
56
57 return 0;
58 }
59
60 memcached_pool_st *memcached_pool_create(memcached_st* mmc,
61 uint32_t initial, uint32_t max)
62 {
63 memcached_pool_st* ret = NULL;
64 memcached_pool_st object = { .mutex = PTHREAD_MUTEX_INITIALIZER,
65 .cond = PTHREAD_COND_INITIALIZER,
66 .master = mmc,
67 .mmc = calloc(max, sizeof(memcached_st*)),
68 .firstfree = -1,
69 .size = max,
70 .current_size = 0 };
71
72 if (object.mmc != NULL)
73 {
74 ret= calloc(1, sizeof(*ret));
75 if (ret == NULL)
76 {
77 free(object.mmc);
78 return NULL;
79 }
80
81 *ret = object;
82
83 /* Try to create the initial size of the pool. An allocation failure at
84 * this time is not fatal..
85 */
86 for (unsigned int ii=0; ii < initial; ++ii)
87 if (grow_pool(ret) == -1)
88 break;
89 }
90
91 return ret;
92 }
93
94 memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
95 {
96 memcached_st *ret = pool->master;
97
98 for (int xx= 0; xx <= pool->firstfree; ++xx)
99 {
100 memcached_free(pool->mmc[xx]);
101 free(pool->mmc[xx]);
102 pool->mmc[xx] = NULL;
103 }
104
105 pthread_mutex_destroy(&pool->mutex);
106 pthread_cond_destroy(&pool->cond);
107 free(pool->mmc);
108 free(pool);
109
110 return ret;
111 }
112
113 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
114 bool block,
115 memcached_return *rc)
116 {
117 memcached_st *ret= NULL;
118 if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
119 return NULL;
120
121 do
122 {
123 if (pool->firstfree > -1)
124 ret= pool->mmc[pool->firstfree--];
125 else if (pool->current_size == pool->size)
126 {
127 if (!block)
128 {
129 *rc= mutex_exit(&pool->mutex);
130 return NULL;
131 }
132
133 if (pthread_cond_wait(&pool->cond, &pool->mutex) == -1)
134 {
135 int err = errno;
136 mutex_exit(&pool->mutex);
137 errno = err;
138 *rc= MEMCACHED_ERRNO;
139 return NULL;
140 }
141 }
142 else if (grow_pool(pool) == -1)
143 {
144 *rc= mutex_exit(&pool->mutex);
145 return NULL;
146 }
147 }
148 while (ret == NULL);
149
150 *rc= mutex_exit(&pool->mutex);
151
152 return ret;
153 }
154
155 memcached_return memcached_pool_push(memcached_pool_st* pool,
156 memcached_st *mmc)
157 {
158 memcached_return rc= mutex_enter(&pool->mutex);
159
160 if (rc != MEMCACHED_SUCCESS)
161 return rc;
162
163 char* version = memcached_get_user_data(mmc);
164 /* Someone updated the behavior on the object.. */
165 if (version != pool->version)
166 {
167 memcached_free(mmc);
168 memset(mmc, 0, sizeof(*mmc));
169 if (memcached_clone(mmc, pool->master) == NULL)
170 {
171 rc= MEMCACHED_SOME_ERRORS;
172 }
173 }
174
175 pool->mmc[++pool->firstfree]= mmc;
176
177 if (pool->firstfree == 0 && pool->current_size == pool->size)
178 {
179 /* we might have people waiting for a connection.. wake them up :-) */
180 pthread_cond_broadcast(&pool->cond);
181 }
182
183 memcached_return rval= mutex_exit(&pool->mutex);
184 if (rc == MEMCACHED_SOME_ERRORS)
185 return rc;
186
187 return rval;
188 }
189
190
191 memcached_return memcached_pool_behavior_set(memcached_pool_st *pool,
192 memcached_behavior flag,
193 uint64_t data)
194 {
195
196 memcached_return rc= mutex_enter(&pool->mutex);
197 if (rc != MEMCACHED_SUCCESS)
198 return rc;
199
200 /* update the master */
201 rc= memcached_behavior_set(pool->master, flag, data);
202 if (rc != MEMCACHED_SUCCESS)
203 {
204 mutex_exit(&pool->mutex);
205 return rc;
206 }
207
208 ++pool->version;
209 memcached_set_user_data(pool->master, pool->version);
210 /* update the clones */
211 for (int xx= 0; xx <= pool->firstfree; ++xx)
212 {
213 rc= memcached_behavior_set(pool->mmc[xx], flag, data);
214 if (rc == MEMCACHED_SUCCESS)
215 memcached_set_user_data(pool->mmc[xx], pool->version);
216 else
217 {
218 memcached_free(pool->mmc[xx]);
219 memset(pool->mmc[xx], 0, sizeof(*pool->mmc[xx]));
220 if (memcached_clone(pool->mmc[xx], pool->master) == NULL)
221 {
222 /* I'm not sure what to do in this case.. this would happen
223 if we fail to push the server list inside the client..
224 I should add a testcase for this, but I believe the following
225 would work, except that you would add a hole in the pool list..
226 in theory you could end up with an empty pool....
227 */
228 free(pool->mmc[xx]);
229 pool->mmc[xx]= NULL;
230 }
231 }
232 }
233
234 return mutex_exit(&pool->mutex);
235 }
236
237 memcached_return memcached_pool_behavior_get(memcached_pool_st *pool,
238 memcached_behavior flag,
239 uint64_t *value)
240 {
241 memcached_return rc= mutex_enter(&pool->mutex);
242 if (rc != MEMCACHED_SUCCESS)
243 return rc;
244 *value= memcached_behavior_get(pool->master, flag);
245 return mutex_exit(&pool->mutex);
246 }