58a6cfd2756f037ff3d130e981c40fcfeb5674fc
[m6w6/libmemcached] / libmemcached / connect.c
1 /* LibMemcached
2 * Copyright (C) 2006-2010 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Server IO, Not public!
9 *
10 */
11
12 #include "common.h"
13 #include <sys/time.h>
14 #include <time.h>
15
16 static memcached_return_t connect_poll(memcached_server_st *ptr)
17 {
18 struct pollfd fds[1];
19 fds[0].fd = ptr->fd;
20 fds[0].events = POLLOUT;
21
22 int error;
23 size_t loop_max= 5;
24
25 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
26 {
27 error= poll(fds, 1, ptr->root->connect_timeout);
28
29 switch (error)
30 {
31 case 1:
32 {
33 int err;
34 socklen_t len= sizeof (err);
35 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
36
37 // We check the value to see what happened wth the socket.
38 if (err == 0)
39 {
40 return MEMCACHED_SUCCESS;
41 }
42 else
43 {
44 ptr->cached_errno= errno;
45
46 return MEMCACHED_ERRNO;
47 }
48 }
49 case 0:
50 return MEMCACHED_TIMEOUT;
51 default: // A real error occurred and we need to completely bail
52 WATCHPOINT_ERRNO(get_socket_errno());
53 switch (get_socket_errno())
54 {
55 #ifdef TARGET_OS_LINUX
56 case ERESTART:
57 #endif
58 case EINTR:
59 continue;
60 default:
61 if (fds[0].revents & POLLERR)
62 {
63 int err;
64 socklen_t len= sizeof (err);
65 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
66 ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
67 }
68 else
69 {
70 ptr->cached_errno= get_socket_errno();
71 }
72
73 (void)closesocket(ptr->fd);
74 ptr->fd= INVALID_SOCKET;
75
76 return MEMCACHED_ERRNO;
77 }
78 }
79 }
80
81 // This should only be possible from ERESTART or EINTR;
82 ptr->cached_errno= get_socket_errno();
83
84 return MEMCACHED_ERRNO;
85 }
86
87 static memcached_return_t set_hostinfo(memcached_server_st *server)
88 {
89 struct addrinfo *ai;
90 struct addrinfo hints;
91 char str_port[NI_MAXSERV];
92 uint32_t counter= 5;
93
94 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
95 if (length >= NI_MAXSERV || length < 0)
96 return MEMCACHED_FAILURE;
97
98 memset(&hints, 0, sizeof(hints));
99
100 // hints.ai_family= AF_INET;
101 if (server->type == MEMCACHED_CONNECTION_UDP)
102 {
103 hints.ai_protocol= IPPROTO_UDP;
104 hints.ai_socktype= SOCK_DGRAM;
105 }
106 else
107 {
108 hints.ai_socktype= SOCK_STREAM;
109 hints.ai_protocol= IPPROTO_TCP;
110 }
111
112 while (--counter)
113 {
114 int e= getaddrinfo(server->hostname, str_port, &hints, &ai);
115
116 if (e == 0)
117 {
118 break;
119 }
120 else if (e == EAI_AGAIN)
121 {
122 #ifndef WIN32
123 struct timespec dream, rem;
124
125 dream.tv_nsec= 1000;
126 dream.tv_sec= 0;
127
128 nanosleep(&dream, &rem);
129 #endif
130 continue;
131 }
132 else
133 {
134 WATCHPOINT_STRING(server->hostname);
135 WATCHPOINT_STRING(gai_strerror(e));
136 return MEMCACHED_HOST_LOOKUP_FAILURE;
137 }
138 }
139
140 if (server->address_info)
141 {
142 freeaddrinfo(server->address_info);
143 server->address_info= NULL;
144 }
145 server->address_info= ai;
146
147 return MEMCACHED_SUCCESS;
148 }
149
150 static inline memcached_return_t set_socket_nonblocking(memcached_server_st *ptr)
151 {
152 #ifdef WIN32
153 u_long arg = 1;
154 if (ioctlsocket(ptr->fd, FIONBIO, &arg) == SOCKET_ERROR)
155 {
156 ptr->cached_errno= get_socket_errno();
157 return MEMCACHED_CONNECTION_FAILURE;
158 }
159 #else
160 int flags;
161
162 do
163 flags= fcntl(ptr->fd, F_GETFL, 0);
164 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
165
166 unlikely (flags == -1)
167 {
168 ptr->cached_errno= errno;
169 return MEMCACHED_CONNECTION_FAILURE;
170 }
171 else if ((flags & O_NONBLOCK) == 0)
172 {
173 int rval;
174
175 do
176 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
177 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
178
179 unlikely (rval == -1)
180 {
181 ptr->cached_errno= errno;
182 return MEMCACHED_CONNECTION_FAILURE;
183 }
184 }
185 #endif
186 return MEMCACHED_SUCCESS;
187 }
188
189 static memcached_return_t set_socket_options(memcached_server_st *ptr)
190 {
191 WATCHPOINT_ASSERT(ptr->fd != -1);
192
193 if (ptr->type == MEMCACHED_CONNECTION_UDP)
194 return MEMCACHED_SUCCESS;
195
196 #ifdef HAVE_SNDTIMEO
197 if (ptr->root->snd_timeout)
198 {
199 int error;
200 struct timeval waittime;
201
202 waittime.tv_sec= 0;
203 waittime.tv_usec= ptr->root->snd_timeout;
204
205 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
206 &waittime, (socklen_t)sizeof(struct timeval));
207 WATCHPOINT_ASSERT(error == 0);
208 if (error)
209 return MEMCACHED_FAILURE;
210 }
211 #endif
212
213 #ifdef HAVE_RCVTIMEO
214 if (ptr->root->rcv_timeout)
215 {
216 int error;
217 struct timeval waittime;
218
219 waittime.tv_sec= 0;
220 waittime.tv_usec= ptr->root->rcv_timeout;
221
222 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
223 &waittime, (socklen_t)sizeof(struct timeval));
224 WATCHPOINT_ASSERT(error == 0);
225 if (error)
226 return MEMCACHED_FAILURE;
227 }
228 #endif
229
230
231 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
232 {
233 int set = 1;
234 int error= setsockopt(ptr->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
235
236 // This is not considered a fatal error
237 if (error == -1)
238 {
239 WATCHPOINT_ERRNO(get_socket_errno());
240 perror("setsockopt(SO_NOSIGPIPE)");
241 }
242 }
243 #endif
244
245 if (ptr->root->flags.no_block)
246 {
247 int error;
248 struct linger linger;
249
250 linger.l_onoff= 1;
251 linger.l_linger= 0; /* By default on close() just drop the socket */
252 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
253 &linger, (socklen_t)sizeof(struct linger));
254 WATCHPOINT_ASSERT(error == 0);
255 if (error)
256 return MEMCACHED_FAILURE;
257 }
258
259 if (ptr->root->flags.tcp_nodelay)
260 {
261 int flag= 1;
262 int error;
263
264 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
265 &flag, (socklen_t)sizeof(int));
266 WATCHPOINT_ASSERT(error == 0);
267 if (error)
268 return MEMCACHED_FAILURE;
269 }
270
271 if (ptr->root->flags.tcp_keepalive)
272 {
273 int flag= 1;
274 int error;
275
276 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
277 &flag, (socklen_t)sizeof(int));
278 WATCHPOINT_ASSERT(error == 0);
279 if (error)
280 return MEMCACHED_FAILURE;
281 }
282
283 #ifdef TCP_KEEPIDLE
284 if (ptr->root->tcp_keepidle > 0)
285 {
286 int error;
287
288 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_KEEPIDLE,
289 &ptr->root->tcp_keepidle, (socklen_t)sizeof(int));
290 WATCHPOINT_ASSERT(error == 0);
291 if (error)
292 return MEMCACHED_FAILURE;
293 }
294 #endif
295
296 if (ptr->root->send_size > 0)
297 {
298 int error;
299
300 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
301 &ptr->root->send_size, (socklen_t)sizeof(int));
302 WATCHPOINT_ASSERT(error == 0);
303 if (error)
304 return MEMCACHED_FAILURE;
305 }
306
307 if (ptr->root->recv_size > 0)
308 {
309 int error;
310
311 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
312 &ptr->root->recv_size, (socklen_t)sizeof(int));
313 WATCHPOINT_ASSERT(error == 0);
314 if (error)
315 return MEMCACHED_FAILURE;
316 }
317
318
319 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
320 return set_socket_nonblocking(ptr);
321 }
322
323 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
324 {
325 #ifndef WIN32
326 struct sockaddr_un servAddr;
327
328 WATCHPOINT_ASSERT(ptr->fd == -1);
329
330 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
331 {
332 ptr->cached_errno= errno;
333 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
334 }
335
336 memset(&servAddr, 0, sizeof (struct sockaddr_un));
337 servAddr.sun_family= AF_UNIX;
338 strncpy(servAddr.sun_path, ptr->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
339
340 test_connect:
341 if (connect(ptr->fd,
342 (struct sockaddr *)&servAddr,
343 sizeof(servAddr)) < 0)
344 {
345 switch (errno)
346 {
347 case EINPROGRESS:
348 case EALREADY:
349 case EINTR:
350 goto test_connect;
351 case EISCONN: /* We were spinning waiting on connect */
352 break;
353 default:
354 WATCHPOINT_ERRNO(errno);
355 ptr->cached_errno= errno;
356 return MEMCACHED_ERRNO;
357 }
358 }
359
360 WATCHPOINT_ASSERT(ptr->fd != -1);
361
362 return MEMCACHED_SUCCESS;
363 #else
364 (void)ptr;
365 return MEMCACHED_NOT_SUPPORTED;
366 #endif
367 }
368
369 static memcached_return_t network_connect(memcached_server_st *ptr)
370 {
371 bool timeout_error_occured= false;
372
373
374 WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
375 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
376
377 if (! ptr->options.sockaddr_inited || (!(ptr->root->flags.use_cache_lookups)))
378 {
379 memcached_return_t rc;
380
381 rc= set_hostinfo(ptr);
382 if (rc != MEMCACHED_SUCCESS)
383 return rc;
384 ptr->options.sockaddr_inited= true;
385 }
386
387 struct addrinfo *use= ptr->address_info;
388 /* Create the socket */
389 while (use != NULL)
390 {
391 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
392 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
393 {
394 use= use->ai_next;
395 continue;
396 }
397
398 if ((ptr->fd= socket(use->ai_family,
399 use->ai_socktype,
400 use->ai_protocol)) < 0)
401 {
402 ptr->cached_errno= get_socket_errno();
403 WATCHPOINT_ERRNO(get_socket_errno());
404 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
405 }
406
407 (void)set_socket_options(ptr);
408
409 /* connect to server */
410 if ((connect(ptr->fd, use->ai_addr, use->ai_addrlen) != SOCKET_ERROR))
411 {
412 break; // Success
413 }
414
415 /* An error occurred */
416 ptr->cached_errno= get_socket_errno();
417 if (ptr->cached_errno == EWOULDBLOCK ||
418 ptr->cached_errno == EINPROGRESS || /* nonblocking mode - first return, */
419 ptr->cached_errno == EALREADY) /* nonblocking mode - subsequent returns */
420 {
421 memcached_return_t rc;
422 rc= connect_poll(ptr);
423
424 if (rc == MEMCACHED_TIMEOUT)
425 timeout_error_occured= true;
426
427 if (rc == MEMCACHED_SUCCESS)
428 break;
429 }
430 else if (get_socket_errno() == EISCONN) /* we are connected :-) */
431 {
432 break;
433 }
434 else if (get_socket_errno() == EINTR) // Special case, we retry ai_addr
435 {
436 (void)closesocket(ptr->fd);
437 ptr->fd= INVALID_SOCKET;
438 continue;
439 }
440
441 (void)closesocket(ptr->fd);
442 ptr->fd= INVALID_SOCKET;
443 use= use->ai_next;
444 }
445
446 if (ptr->fd == INVALID_SOCKET)
447 {
448 WATCHPOINT_STRING("Never got a good file descriptor");
449
450 /* Failed to connect. schedule next retry */
451 if (ptr->root->retry_timeout)
452 {
453 struct timeval next_time;
454
455 if (gettimeofday(&next_time, NULL) == 0)
456 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
457 }
458
459 if (timeout_error_occured)
460 return MEMCACHED_TIMEOUT;
461
462 return MEMCACHED_ERRNO; /* The last error should be from connect() */
463 }
464
465 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
466 }
467
468 void set_last_disconnected_host(memcached_server_write_instance_st ptr)
469 {
470 // const_cast
471 memcached_st *root= (memcached_st *)ptr->root;
472
473 #if 0
474 WATCHPOINT_STRING(ptr->hostname);
475 WATCHPOINT_NUMBER(ptr->port);
476 WATCHPOINT_ERRNO(ptr->cached_errno);
477 #endif
478 if (root->last_disconnected_server)
479 memcached_server_free(root->last_disconnected_server);
480 root->last_disconnected_server= memcached_server_clone(NULL, ptr);
481 }
482
483 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
484 {
485 memcached_return_t rc= MEMCACHED_NO_SERVERS;
486
487 if (ptr->fd != INVALID_SOCKET)
488 return MEMCACHED_SUCCESS;
489
490 LIBMEMCACHED_MEMCACHED_CONNECT_START();
491
492 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
493 WATCHPOINT_ASSERT(ptr->root);
494 if (ptr->root->retry_timeout && ptr->next_retry)
495 {
496 struct timeval curr_time;
497
498 gettimeofday(&curr_time, NULL);
499
500 // We should optimize this to remove the allocation if the server was
501 // the last server to die
502 if (ptr->next_retry > curr_time.tv_sec)
503 {
504 set_last_disconnected_host(ptr);
505
506 return MEMCACHED_SERVER_MARKED_DEAD;
507 }
508 }
509
510 // If we are over the counter failure, we just fail. Reject host only
511 // works if you have a set number of failures.
512 if (ptr->root->server_failure_limit && ptr->server_failure_counter >= ptr->root->server_failure_limit)
513 {
514 set_last_disconnected_host(ptr);
515
516 // @todo fix this by fixing behavior to no longer make use of
517 // memcached_st
518 if (_is_auto_eject_host(ptr->root))
519 {
520 run_distribution((memcached_st *)ptr->root);
521 }
522
523 return MEMCACHED_SERVER_MARKED_DEAD;
524 }
525
526 /* We need to clean up the multi startup piece */
527 switch (ptr->type)
528 {
529 case MEMCACHED_CONNECTION_UNKNOWN:
530 WATCHPOINT_ASSERT(0);
531 rc= MEMCACHED_NOT_SUPPORTED;
532 break;
533 case MEMCACHED_CONNECTION_UDP:
534 case MEMCACHED_CONNECTION_TCP:
535 rc= network_connect(ptr);
536 #ifdef LIBMEMCACHED_WITH_SASL_SUPPORT
537 if (ptr->fd != INVALID_SOCKET && ptr->root->sasl.callbacks)
538 {
539 rc= memcached_sasl_authenticate_connection(ptr);
540 if (rc != MEMCACHED_SUCCESS)
541 {
542 (void)closesocket(ptr->fd);
543 ptr->fd= INVALID_SOCKET;
544 }
545 }
546 #endif
547 break;
548 case MEMCACHED_CONNECTION_UNIX_SOCKET:
549 rc= unix_socket_connect(ptr);
550 break;
551 case MEMCACHED_CONNECTION_MAX:
552 default:
553 WATCHPOINT_ASSERT(0);
554 }
555
556 if (rc == MEMCACHED_SUCCESS)
557 {
558 ptr->server_failure_counter= 0;
559 ptr->next_retry= 0;
560 }
561 else
562 {
563 ptr->server_failure_counter++;
564
565 set_last_disconnected_host(ptr);
566 }
567
568 LIBMEMCACHED_MEMCACHED_CONNECT_END();
569
570 return rc;
571 }