Added option to enable TCP_KEEPALIVE on created sockets.
[awesomized/libmemcached] / libmemcached / behavior.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Change the behavior of the memcached connection.
9 *
10 */
11
12 #include "common.h"
13 #include <time.h>
14 #include <sys/types.h>
15 #include <sys/socket.h>
16 #include <netinet/tcp.h>
17
/*
  Collapse a 64-bit behavior value into a boolean: zero maps to false,
  any non-zero value maps to true.
*/
static bool set_flag(uint64_t data)
{
  return data != 0;
}
23
24 /*
25 This function is used to modify the behavior of running client.
26
27 We quit all connections so we can reset the sockets.
28 */
29
30 memcached_return_t memcached_behavior_set(memcached_st *ptr,
31 const memcached_behavior_t flag,
32 uint64_t data)
33 {
34 switch (flag)
35 {
36 case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
37 ptr->number_of_replicas= (uint32_t)data;
38 break;
39 case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
40 ptr->io_msg_watermark= (uint32_t) data;
41 break;
42 case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
43 ptr->io_bytes_watermark= (uint32_t)data;
44 break;
45 case MEMCACHED_BEHAVIOR_IO_KEY_PREFETCH:
46 ptr->io_key_prefetch = (uint32_t)data;
47 break;
48 case MEMCACHED_BEHAVIOR_SND_TIMEOUT:
49 ptr->snd_timeout= (int32_t)data;
50 break;
51 case MEMCACHED_BEHAVIOR_RCV_TIMEOUT:
52 ptr->rcv_timeout= (int32_t)data;
53 break;
54 case MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT:
55 ptr->server_failure_limit= (uint32_t)data;
56 break;
57 case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
58 if (data)
59 {
60 ptr->flags.verify_key= false;
61 }
62 ptr->flags.binary_protocol= set_flag(data);
63 break;
64 case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
65 ptr->flags.support_cas= set_flag(data);
66 break;
67 case MEMCACHED_BEHAVIOR_NO_BLOCK:
68 ptr->flags.no_block= set_flag(data);
69 memcached_quit(ptr);
70 break;
71 case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS:
72 ptr->flags.buffer_requests= set_flag(data);
73 memcached_quit(ptr);
74 break;
75 case MEMCACHED_BEHAVIOR_USE_UDP:
76 if (memcached_server_count(ptr))
77 {
78 return MEMCACHED_FAILURE;
79 }
80 ptr->flags.use_udp= set_flag(data);
81 if (data)
82 {
83 ptr->flags.no_reply= set_flag(data);
84 }
85 break;
86 case MEMCACHED_BEHAVIOR_TCP_NODELAY:
87 ptr->flags.tcp_nodelay= set_flag(data);
88 memcached_quit(ptr);
89 break;
90 case MEMCACHED_BEHAVIOR_TCP_KEEPALIVE:
91 ptr->flags.tcp_keepalive= set_flag(data);
92 memcached_quit(ptr);
93 break;
94 case MEMCACHED_BEHAVIOR_DISTRIBUTION:
95 return memcached_behavior_set_distribution(ptr, (memcached_server_distribution_t)data);
96 case MEMCACHED_BEHAVIOR_KETAMA:
97 {
98 if (data)
99 {
100 (void)memcached_behavior_set_key_hash(ptr, MEMCACHED_HASH_MD5);
101 (void)memcached_behavior_set_distribution_hash(ptr, MEMCACHED_HASH_MD5);
102 (void)memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA);
103 }
104 else
105 {
106 (void)memcached_behavior_set_key_hash(ptr, MEMCACHED_HASH_DEFAULT);
107 (void)memcached_behavior_set_distribution_hash(ptr, MEMCACHED_HASH_DEFAULT);
108 (void)memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_MODULA);
109 }
110
111 break;
112 }
113 case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
114 {
115 (void)memcached_behavior_set_key_hash(ptr, MEMCACHED_HASH_MD5);
116 (void)memcached_behavior_set_distribution_hash(ptr, MEMCACHED_HASH_MD5);
117 ptr->flags.ketama_weighted= set_flag(data);
118 /**
119 @note We try to keep the same distribution going. This should be deprecated and rewritten.
120 */
121 return memcached_behavior_set_distribution(ptr, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA);
122 }
123 case MEMCACHED_BEHAVIOR_HASH:
124 return memcached_behavior_set_key_hash(ptr, (memcached_hash_t)(data));
125 case MEMCACHED_BEHAVIOR_KETAMA_HASH:
126 return memcached_behavior_set_distribution_hash(ptr, (memcached_hash_t)(data));
127 case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS:
128 ptr->flags.use_cache_lookups= set_flag(data);
129 memcached_quit(ptr);
130 break;
131 case MEMCACHED_BEHAVIOR_VERIFY_KEY:
132 if (ptr->flags.binary_protocol)
133 return MEMCACHED_FAILURE;
134 ptr->flags.verify_key= set_flag(data);
135 break;
136 case MEMCACHED_BEHAVIOR_SORT_HOSTS:
137 {
138 ptr->flags.use_sort_hosts= set_flag(data);
139 run_distribution(ptr);
140
141 break;
142 }
143 case MEMCACHED_BEHAVIOR_POLL_TIMEOUT:
144 ptr->poll_timeout= (int32_t)data;
145 break;
146 case MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT:
147 ptr->connect_timeout= (int32_t)data;
148 break;
149 case MEMCACHED_BEHAVIOR_RETRY_TIMEOUT:
150 ptr->retry_timeout= (int32_t)data;
151 break;
152 case MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE:
153 ptr->send_size= (int32_t)data;
154 memcached_quit(ptr);
155 break;
156 case MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE:
157 ptr->recv_size= (int32_t)data;
158 memcached_quit(ptr);
159 break;
160 case MEMCACHED_BEHAVIOR_USER_DATA:
161 return MEMCACHED_FAILURE;
162 case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
163 ptr->flags.hash_with_prefix_key= set_flag(data);
164 break;
165 case MEMCACHED_BEHAVIOR_NOREPLY:
166 ptr->flags.no_reply= set_flag(data);
167 break;
168 case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
169 ptr->flags.auto_eject_hosts= set_flag(data);
170 break;
171 case MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ:
172 srandom((uint32_t) time(NULL));
173 ptr->flags.randomize_replica_read= set_flag(data);
174 break;
175 case MEMCACHED_BEHAVIOR_CORK:
176 {
177 memcached_server_write_instance_st instance;
178 bool action= set_flag(data);
179
180 if (action == false)
181 {
182 ptr->flags.cork= set_flag(false);
183 return MEMCACHED_SUCCESS;
184 }
185
186 instance= memcached_server_instance_fetch(ptr, 0);
187 if (! instance)
188 return MEMCACHED_NO_SERVERS;
189
190
191 /* We just try the first host, and if it is down we return zero */
192 memcached_return_t rc;
193 rc= memcached_connect(instance);
194 if (rc != MEMCACHED_SUCCESS)
195 {
196 return rc;
197 }
198
199 /* Now we test! */
200 memcached_ternary_t enabled;
201 enabled= test_cork(instance, true);
202
203 switch (enabled)
204 {
205 case MEM_FALSE:
206 return ptr->cached_errno ? MEMCACHED_ERRNO : MEMCACHED_FAILURE ;
207 case MEM_TRUE:
208 {
209 enabled= test_cork(instance, false);
210
211 if (enabled == false) // Possible bug in OS?
212 {
213 memcached_quit_server(instance, false); // We should reset everything on this error.
214 return MEMCACHED_ERRNO; // Errno will be true because we will have already set it.
215 }
216 ptr->flags.cork= true;
217 ptr->flags.tcp_nodelay= true;
218 memcached_quit(ptr); // We go on and reset the connections.
219 }
220 break;
221 case MEM_NOT:
222 default:
223 return MEMCACHED_NOT_SUPPORTED;
224 }
225 }
226 break;
227 case MEMCACHED_BEHAVIOR_MAX:
228 default:
229 /* Shouldn't get here */
230 WATCHPOINT_ASSERT(0);
231 return MEMCACHED_FAILURE;
232 }
233
234 return MEMCACHED_SUCCESS;
235 }
236
237 inline bool _is_auto_eject_host(const memcached_st *ptr)
238 {
239 return ptr->flags.auto_eject_hosts;
240 }
241
242 uint64_t memcached_behavior_get(memcached_st *ptr,
243 const memcached_behavior_t flag)
244 {
245 switch (flag)
246 {
247 case MEMCACHED_BEHAVIOR_NUMBER_OF_REPLICAS:
248 return ptr->number_of_replicas;
249 case MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK:
250 return ptr->io_msg_watermark;
251 case MEMCACHED_BEHAVIOR_IO_BYTES_WATERMARK:
252 return ptr->io_bytes_watermark;
253 case MEMCACHED_BEHAVIOR_IO_KEY_PREFETCH:
254 return ptr->io_key_prefetch;
255 case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
256 return ptr->flags.binary_protocol;
257 case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
258 return ptr->flags.support_cas;
259 case MEMCACHED_BEHAVIOR_CACHE_LOOKUPS:
260 return ptr->flags.use_cache_lookups;
261 case MEMCACHED_BEHAVIOR_NO_BLOCK:
262 return ptr->flags.no_block;
263 case MEMCACHED_BEHAVIOR_BUFFER_REQUESTS:
264 return ptr->flags.buffer_requests;
265 case MEMCACHED_BEHAVIOR_USE_UDP:
266 return ptr->flags.use_udp;
267 case MEMCACHED_BEHAVIOR_TCP_NODELAY:
268 return ptr->flags.tcp_nodelay;
269 case MEMCACHED_BEHAVIOR_VERIFY_KEY:
270 return ptr->flags.verify_key;
271 case MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED:
272 return ptr->flags.ketama_weighted;
273 case MEMCACHED_BEHAVIOR_DISTRIBUTION:
274 return ptr->distribution;
275 case MEMCACHED_BEHAVIOR_KETAMA:
276 return (ptr->distribution == MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA) ? (uint64_t) 1 : 0;
277 case MEMCACHED_BEHAVIOR_HASH:
278 return hashkit_get_function(&ptr->hashkit);
279 case MEMCACHED_BEHAVIOR_KETAMA_HASH:
280 return hashkit_get_function(&ptr->distribution_hashkit);
281 case MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT:
282 return ptr->server_failure_limit;
283 case MEMCACHED_BEHAVIOR_SORT_HOSTS:
284 return ptr->flags.use_sort_hosts;
285 case MEMCACHED_BEHAVIOR_POLL_TIMEOUT:
286 return (uint64_t)ptr->poll_timeout;
287 case MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT:
288 return (uint64_t)ptr->connect_timeout;
289 case MEMCACHED_BEHAVIOR_RETRY_TIMEOUT:
290 return (uint64_t)ptr->retry_timeout;
291 case MEMCACHED_BEHAVIOR_SND_TIMEOUT:
292 return (uint64_t)ptr->snd_timeout;
293 case MEMCACHED_BEHAVIOR_RCV_TIMEOUT:
294 return (uint64_t)ptr->rcv_timeout;
295 case MEMCACHED_BEHAVIOR_SOCKET_SEND_SIZE:
296 {
297 int sock_size= 0;
298 socklen_t sock_length= sizeof(int);
299 memcached_server_write_instance_st instance;
300
301 if (ptr->send_size != -1) // If value is -1 then we are using the default
302 return (uint64_t) ptr->send_size;
303
304 instance= memcached_server_instance_fetch(ptr, 0);
305
306 if (instance) // If we have an instance we test, otherwise we just set and pray
307 {
308 /* REFACTOR */
309 /* We just try the first host, and if it is down we return zero */
310 if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
311 return 0;
312
313 if (getsockopt(instance->fd, SOL_SOCKET,
314 SO_SNDBUF, &sock_size, &sock_length))
315 return 0; /* Zero means error */
316 }
317
318 return (uint64_t) sock_size;
319 }
320 case MEMCACHED_BEHAVIOR_SOCKET_RECV_SIZE:
321 {
322 int sock_size= 0;
323 socklen_t sock_length= sizeof(int);
324 memcached_server_write_instance_st instance;
325
326 if (ptr->recv_size != -1) // If value is -1 then we are using the default
327 return (uint64_t) ptr->recv_size;
328
329 instance= memcached_server_instance_fetch(ptr, 0);
330
331 /**
332 @note REFACTOR
333 */
334 if (instance)
335 {
336 /* We just try the first host, and if it is down we return zero */
337 if ((memcached_connect(instance)) != MEMCACHED_SUCCESS)
338 return 0;
339
340 if (getsockopt(instance->fd, SOL_SOCKET,
341 SO_RCVBUF, &sock_size, &sock_length))
342 return 0; /* Zero means error */
343
344 }
345
346 return (uint64_t) sock_size;
347 }
348 case MEMCACHED_BEHAVIOR_USER_DATA:
349 return MEMCACHED_FAILURE;
350 case MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY:
351 return ptr->flags.hash_with_prefix_key;
352 case MEMCACHED_BEHAVIOR_NOREPLY:
353 return ptr->flags.no_reply;
354 case MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS:
355 return ptr->flags.auto_eject_hosts;
356 case MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ:
357 return ptr->flags.randomize_replica_read;
358 case MEMCACHED_BEHAVIOR_CORK:
359 return ptr->flags.cork;
360 case MEMCACHED_BEHAVIOR_MAX:
361 default:
362 WATCHPOINT_ASSERT(0); /* Programming mistake if it gets this far */
363 return 0;
364 }
365
366 /* NOTREACHED */
367 }
368
369
370 memcached_return_t memcached_behavior_set_distribution(memcached_st *ptr, memcached_server_distribution_t type)
371 {
372 if (type < MEMCACHED_DISTRIBUTION_CONSISTENT_MAX)
373 {
374 ptr->distribution= type;
375 run_distribution(ptr);
376 }
377 else
378 {
379 return MEMCACHED_FAILURE;
380 }
381
382 return MEMCACHED_SUCCESS;
383 }
384
385
386 memcached_server_distribution_t memcached_behavior_get_distribution(memcached_st *ptr)
387 {
388 return ptr->distribution;
389 }
390
391 memcached_return_t memcached_behavior_set_key_hash(memcached_st *ptr, memcached_hash_t type)
392 {
393 hashkit_return_t rc;
394 rc= hashkit_set_function(&ptr->hashkit, (hashkit_hash_algorithm_t)type);
395
396 return rc == HASHKIT_SUCCESS ? MEMCACHED_SUCCESS : MEMCACHED_FAILURE;
397 }
398
399 memcached_hash_t memcached_behavior_get_key_hash(memcached_st *ptr)
400 {
401 return (memcached_hash_t)hashkit_get_function(&ptr->hashkit);
402 }
403
404 memcached_return_t memcached_behavior_set_distribution_hash(memcached_st *ptr, memcached_hash_t type)
405 {
406 hashkit_return_t rc;
407 rc= hashkit_set_function(&ptr->distribution_hashkit, (hashkit_hash_algorithm_t)type);
408
409 return rc == HASHKIT_SUCCESS ? MEMCACHED_SUCCESS : MEMCACHED_FAILURE;
410 }
411
412 memcached_hash_t memcached_behavior_get_distribution_hash(memcached_st *ptr)
413 {
414 return (memcached_hash_t)hashkit_get_function(&ptr->distribution_hashkit);
415 }