67ceacab7a252312fb8095338d9b878ed7c621bb
[m6w6/libmemcached] / libmemcached / connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return_t set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return_t set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 if (error)
67 return MEMCACHED_FAILURE;
68 }
69 #endif
70
71 #ifdef HAVE_RCVTIMEO
72 if (ptr->root->rcv_timeout)
73 {
74 int error;
75 struct timeval waittime;
76
77 waittime.tv_sec= 0;
78 waittime.tv_usec= ptr->root->rcv_timeout;
79
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
81 &waittime, (socklen_t)sizeof(struct timeval));
82 WATCHPOINT_ASSERT(error == 0);
83 if (error)
84 return MEMCACHED_FAILURE;
85 }
86 #endif
87
88 #ifdef TCP_KEEPIDLE
89 if (ptr->root->tcp_keepidle)
90 {
91 int flag= 1;
92 int error;
93
94 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_KEEPIDLE,
95 &flag, (socklen_t)sizeof(int));
96 WATCHPOINT_ASSERT(error == 0);
97 if (error)
98 return MEMCACHED_FAILURE;
99 }
100 #endif
101
102 if (ptr->root->flags.no_block)
103 {
104 int error;
105 struct linger linger;
106
107 linger.l_onoff= 1;
108 linger.l_linger= 0; /* By default on close() just drop the socket */
109 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
110 &linger, (socklen_t)sizeof(struct linger));
111 WATCHPOINT_ASSERT(error == 0);
112 if (error)
113 return MEMCACHED_FAILURE;
114 }
115
116 if (ptr->root->flags.tcp_nodelay)
117 {
118 int flag= 1;
119 int error;
120
121 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
122 &flag, (socklen_t)sizeof(int));
123 WATCHPOINT_ASSERT(error == 0);
124 if (error)
125 return MEMCACHED_FAILURE;
126 }
127
128 if (ptr->root->flags.tcp_keepalive)
129 {
130 int flag= 1;
131 int error;
132
133 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
134 &flag, (socklen_t)sizeof(int));
135 WATCHPOINT_ASSERT(error == 0);
136 if (error)
137 return MEMCACHED_FAILURE;
138 }
139
140 if (ptr->root->send_size > 0)
141 {
142 int error;
143
144 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
145 &ptr->root->send_size, (socklen_t)sizeof(int));
146 WATCHPOINT_ASSERT(error == 0);
147 if (error)
148 return MEMCACHED_FAILURE;
149 }
150
151 if (ptr->root->recv_size > 0)
152 {
153 int error;
154
155 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
156 &ptr->root->recv_size, (socklen_t)sizeof(int));
157 WATCHPOINT_ASSERT(error == 0);
158 if (error)
159 return MEMCACHED_FAILURE;
160 }
161
162 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
163 int flags;
164
165 do
166 flags= fcntl(ptr->fd, F_GETFL, 0);
167 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
168
169 unlikely (flags == -1)
170 {
171 return MEMCACHED_CONNECTION_FAILURE;
172 }
173 else if ((flags & O_NONBLOCK) == 0)
174 {
175 int rval;
176
177 do
178 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
179 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
180
181 unlikely (rval == -1)
182 {
183 return MEMCACHED_CONNECTION_FAILURE;
184 }
185 }
186
187 return MEMCACHED_SUCCESS;
188 }
189
190 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
191 {
192 struct sockaddr_un servAddr;
193
194 if (ptr->fd == -1)
195 {
196 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
197 {
198 ptr->cached_errno= errno;
199 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
200 }
201
202 memset(&servAddr, 0, sizeof (struct sockaddr_un));
203 servAddr.sun_family= AF_UNIX;
204 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
205
206 test_connect:
207 if (connect(ptr->fd,
208 (struct sockaddr *)&servAddr,
209 sizeof(servAddr)) < 0)
210 {
211 switch (errno)
212 {
213 case EINPROGRESS:
214 case EALREADY:
215 case EINTR:
216 goto test_connect;
217 case EISCONN: /* We were spinning waiting on connect */
218 break;
219 default:
220 WATCHPOINT_ERRNO(errno);
221 ptr->cached_errno= errno;
222 return MEMCACHED_ERRNO;
223 }
224 }
225 }
226
227 WATCHPOINT_ASSERT(ptr->fd != -1);
228 return MEMCACHED_SUCCESS;
229 }
230
231 static memcached_return_t network_connect(memcached_server_st *ptr)
232 {
233 if (ptr->fd == -1)
234 {
235 struct addrinfo *use;
236
237 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
238
239 if (! ptr->options.sockaddr_inited ||
240 (!(ptr->root->flags.use_cache_lookups)))
241 {
242 memcached_return_t rc;
243
244 rc= set_hostinfo(ptr);
245 if (rc != MEMCACHED_SUCCESS)
246 return rc;
247 ptr->options.sockaddr_inited= true;
248 }
249
250 use= ptr->address_info;
251 /* Create the socket */
252 while (use != NULL)
253 {
254 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
255 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
256 {
257 use= use->ai_next;
258 continue;
259 }
260
261 if ((ptr->fd= socket(use->ai_family,
262 use->ai_socktype,
263 use->ai_protocol)) < 0)
264 {
265 ptr->cached_errno= errno;
266 WATCHPOINT_ERRNO(errno);
267 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
268 }
269
270 (void)set_socket_options(ptr);
271
272 /* connect to server */
273 while (ptr->fd != -1 &&
274 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
275 {
276 ptr->cached_errno= errno;
277 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
278 errno == EALREADY) /* nonblocking mode - subsequent returns */
279 {
280 struct pollfd fds[1];
281 fds[0].fd = ptr->fd;
282 fds[0].events = POLLOUT;
283 int error= poll(fds, 1, ptr->root->connect_timeout);
284
285 if (error != 1 || fds[0].revents & POLLERR)
286 {
287 if (fds[0].revents & POLLERR)
288 {
289 int err;
290 socklen_t len = sizeof (err);
291 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
292 ptr->cached_errno= (err == 0) ? errno : err;
293 }
294
295 (void)close(ptr->fd);
296 ptr->fd= -1;
297 }
298 }
299 else if (errno == EISCONN) /* we are connected :-) */
300 {
301 break;
302 }
303 else if (errno != EINTR)
304 {
305 (void)close(ptr->fd);
306 ptr->fd= -1;
307 break;
308 }
309 }
310
311 if (ptr->fd != -1)
312 {
313 ptr->server_failure_counter= 0;
314 return MEMCACHED_SUCCESS;
315 }
316 use = use->ai_next;
317 }
318 }
319
320 if (ptr->fd == -1)
321 {
322 /* Failed to connect. schedule next retry */
323 if (ptr->root->retry_timeout)
324 {
325 struct timeval next_time;
326
327 if (gettimeofday(&next_time, NULL) == 0)
328 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
329 }
330 ptr->server_failure_counter++;
331 if (ptr->cached_errno == 0)
332 return MEMCACHED_TIMEOUT;
333
334 return MEMCACHED_ERRNO; /* The last error should be from connect() */
335 }
336
337 ptr->server_failure_counter= 0;
338 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
339 }
340
341
342 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
343 {
344 memcached_return_t rc= MEMCACHED_NO_SERVERS;
345 LIBMEMCACHED_MEMCACHED_CONNECT_START();
346
347 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
348 WATCHPOINT_ASSERT(ptr->root);
349 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
350 {
351 struct timeval curr_time;
352
353 gettimeofday(&curr_time, NULL);
354
355 /* if we've had too many consecutive errors on this server, mark it dead. */
356 if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
357 {
358 ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
359 ptr->server_failure_counter= 0;
360 }
361
362 if (curr_time.tv_sec < ptr->next_retry)
363 {
364 memcached_st *root= (memcached_st *)ptr->root;
365 // @todo fix this by fixing behavior to no longer make use of
366 // memcached_st
367 if (memcached_behavior_get(root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
368 {
369 run_distribution(root);
370 }
371
372 root->last_disconnected_server = ptr;
373
374 return MEMCACHED_SERVER_MARKED_DEAD;
375 }
376 }
377
378 /* We need to clean up the multi startup piece */
379 switch (ptr->type)
380 {
381 case MEMCACHED_CONNECTION_UNKNOWN:
382 WATCHPOINT_ASSERT(0);
383 rc= MEMCACHED_NOT_SUPPORTED;
384 break;
385 case MEMCACHED_CONNECTION_UDP:
386 case MEMCACHED_CONNECTION_TCP:
387 rc= network_connect(ptr);
388 break;
389 case MEMCACHED_CONNECTION_UNIX_SOCKET:
390 rc= unix_socket_connect(ptr);
391 break;
392 case MEMCACHED_CONNECTION_MAX:
393 default:
394 WATCHPOINT_ASSERT(0);
395 }
396
397 unlikely ( rc != MEMCACHED_SUCCESS)
398 {
399 //@todo create interface around last_discontected_server
400 memcached_st *root= (memcached_st *)ptr->root;
401 root->last_disconnected_server = ptr;
402 }
403
404 LIBMEMCACHED_MEMCACHED_CONNECT_END();
405
406 return rc;
407 }