Merge
[m6w6/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int error= poll(fds, 1, server->root->connect_timeout);
61 switch (error)
62 {
63 case 1:
64 {
65 int err;
66 socklen_t len= sizeof (err);
67 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
68 {
69 // We check the value to see what happened wth the socket.
70 if (err == 0)
71 {
72 return MEMCACHED_SUCCESS;
73 }
74 errno= err;
75 }
76
77 return memcached_set_errno(*server, err, MEMCACHED_AT);
78 }
79 case 0:
80 {
81 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
82 }
83
84 default: // A real error occurred and we need to completely bail
85 switch (get_socket_errno())
86 {
87 #ifdef TARGET_OS_LINUX
88 case ERESTART:
89 #endif
90 case EINTR:
91 continue;
92
93 case EFAULT:
94 case ENOMEM:
95 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
96
97 case EINVAL:
98 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
99
100 default: // This should not happen
101 if (fds[0].revents & POLLERR)
102 {
103 int err;
104 socklen_t len= sizeof(err);
105 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
106 {
107 if (err == 0)
108 {
109 // This should never happen, if it does? Punt.
110 continue;
111 }
112 errno= err;
113 }
114 }
115
116 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
117
118 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server->fd);
120 server->fd= INVALID_SOCKET;
121 server->state= MEMCACHED_SERVER_STATE_NEW;
122
123 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
124 }
125 }
126 }
127
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
130 }
131
132 static memcached_return_t set_hostinfo(memcached_server_st *server)
133 {
134 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
135 if (server->address_info)
136 {
137 freeaddrinfo(server->address_info);
138 server->address_info= NULL;
139 server->address_info_next= NULL;
140 }
141
142 char str_port[NI_MAXSERV];
143 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
144 if (length >= NI_MAXSERV or length < 0)
145 {
146 return MEMCACHED_FAILURE;
147 }
148
149 struct addrinfo hints;
150 memset(&hints, 0, sizeof(struct addrinfo));
151
152 #if 0
153 hints.ai_family= AF_INET;
154 #endif
155 if (memcached_is_udp(server->root))
156 {
157 hints.ai_protocol= IPPROTO_UDP;
158 hints.ai_socktype= SOCK_DGRAM;
159 }
160 else
161 {
162 hints.ai_socktype= SOCK_STREAM;
163 hints.ai_protocol= IPPROTO_TCP;
164 }
165
166 server->address_info= NULL;
167 int errcode;
168 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
169 {
170 case 0:
171 break;
172
173 case EAI_AGAIN:
174 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
175
176 case EAI_SYSTEM:
177 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
178
179 case EAI_BADFLAGS:
180 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
181
182 case EAI_MEMORY:
183 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
184
185 default:
186 {
187 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
188 }
189 }
190 server->address_info_next= server->address_info;
191 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
192
193 return MEMCACHED_SUCCESS;
194 }
195
196 static inline void set_socket_nonblocking(memcached_server_st *server)
197 {
198 #ifdef WIN32
199 u_long arg= 1;
200 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
201 {
202 memcached_set_errno(*server, get_socket_errno(), NULL);
203 }
204 #else
205 int flags;
206
207 do
208 {
209 flags= fcntl(server->fd, F_GETFL, 0);
210 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
211
212 if (flags == -1)
213 {
214 memcached_set_errno(*server, errno, NULL);
215 }
216 else if ((flags & O_NONBLOCK) == 0)
217 {
218 int rval;
219
220 do
221 {
222 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
223 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
224
225 if (rval == -1)
226 {
227 memcached_set_errno(*server, errno, NULL);
228 }
229 }
230 #endif
231 }
232
233 static void set_socket_options(memcached_server_st *server)
234 {
235 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
236
237 if (memcached_is_udp(server->root))
238 {
239 return;
240 }
241
242 #ifdef HAVE_SNDTIMEO
243 if (server->root->snd_timeout)
244 {
245 int error;
246 struct timeval waittime;
247
248 waittime.tv_sec= 0;
249 waittime.tv_usec= server->root->snd_timeout;
250
251 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
252 &waittime, (socklen_t)sizeof(struct timeval));
253 WATCHPOINT_ASSERT(error == 0);
254 }
255 #endif
256
257 #ifdef HAVE_RCVTIMEO
258 if (server->root->rcv_timeout)
259 {
260 int error;
261 struct timeval waittime;
262
263 waittime.tv_sec= 0;
264 waittime.tv_usec= server->root->rcv_timeout;
265
266 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
267 &waittime, (socklen_t)sizeof(struct timeval));
268 WATCHPOINT_ASSERT(error == 0);
269 }
270 #endif
271
272
273 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
274 {
275 int set= 1;
276 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
277
278 // This is not considered a fatal error
279 if (error == -1)
280 {
281 WATCHPOINT_ERRNO(get_socket_errno());
282 perror("setsockopt(SO_NOSIGPIPE)");
283 }
284 }
285 #endif
286
287 if (server->root->flags.no_block)
288 {
289 int error;
290 struct linger linger;
291
292 linger.l_onoff= 1;
293 linger.l_linger= 0; /* By default on close() just drop the socket */
294 error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
295 &linger, (socklen_t)sizeof(struct linger));
296 WATCHPOINT_ASSERT(error == 0);
297 }
298
299 if (server->root->flags.tcp_nodelay)
300 {
301 int flag= 1;
302 int error;
303
304 error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
305 &flag, (socklen_t)sizeof(int));
306 WATCHPOINT_ASSERT(error == 0);
307 }
308
309 if (server->root->flags.tcp_keepalive)
310 {
311 int flag= 1;
312 int error;
313
314 error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
315 &flag, (socklen_t)sizeof(int));
316 WATCHPOINT_ASSERT(error == 0);
317 }
318
319 #ifdef TCP_KEEPIDLE
320 if (server->root->tcp_keepidle > 0)
321 {
322 int error;
323
324 error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
325 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
326 WATCHPOINT_ASSERT(error == 0);
327 }
328 #endif
329
330 if (server->root->send_size > 0)
331 {
332 int error;
333
334 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
335 &server->root->send_size, (socklen_t)sizeof(int));
336 WATCHPOINT_ASSERT(error == 0);
337 }
338
339 if (server->root->recv_size > 0)
340 {
341 int error;
342
343 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
344 &server->root->recv_size, (socklen_t)sizeof(int));
345 WATCHPOINT_ASSERT(error == 0);
346 }
347
348
349 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
350 set_socket_nonblocking(server);
351 }
352
353 static memcached_return_t unix_socket_connect(memcached_server_st *server)
354 {
355 #ifndef WIN32
356 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
357
358 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
359 {
360 memcached_set_errno(*server, errno, NULL);
361 return MEMCACHED_CONNECTION_FAILURE;
362 }
363
364 struct sockaddr_un servAddr;
365
366 memset(&servAddr, 0, sizeof (struct sockaddr_un));
367 servAddr.sun_family= AF_UNIX;
368 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
369
370 do {
371 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
372 {
373 switch (errno)
374 {
375 case EINPROGRESS:
376 case EALREADY:
377 case EINTR:
378 continue;
379
380 case EISCONN: /* We were spinning waiting on connect */
381 {
382 WATCHPOINT_ASSERT(0); // Programmer error
383 break;
384 }
385
386 default:
387 WATCHPOINT_ERRNO(errno);
388 memcached_set_errno(*server, errno, MEMCACHED_AT);
389 return MEMCACHED_CONNECTION_FAILURE;
390 }
391 }
392 } while (0);
393 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
394
395 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
396
397 return MEMCACHED_SUCCESS;
398 #else
399 (void)server;
400 return MEMCACHED_NOT_SUPPORTED;
401 #endif
402 }
403
404 static memcached_return_t network_connect(memcached_server_st *server)
405 {
406 bool timeout_error_occured= false;
407
408 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
409 WATCHPOINT_ASSERT(server->cursor_active == 0);
410
411 /*
412 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
413 */
414 if (server->address_info == NULL or server->address_info_next == NULL)
415 {
416 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
417 server->address_info_next= NULL;
418 memcached_return_t rc;
419 uint32_t counter= 5;
420 while (--counter)
421 {
422 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
423 {
424 break;
425 }
426
427 #ifndef WIN32
428 struct timespec dream, rem;
429
430 dream.tv_nsec= 1000;
431 dream.tv_sec= 0;
432
433 nanosleep(&dream, &rem);
434 #endif
435 }
436
437 if (memcached_failed(rc))
438 {
439 return rc;
440 }
441 }
442
443 if (server->address_info_next == NULL)
444 {
445 server->address_info_next= server->address_info;
446 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
447 }
448
449 /* Create the socket */
450 while (server->address_info_next and server->fd == INVALID_SOCKET)
451 {
452 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
453 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
454 {
455 server->address_info_next= server->address_info_next->ai_next;
456 continue;
457 }
458
459 if ((server->fd= socket(server->address_info_next->ai_family,
460 server->address_info_next->ai_socktype,
461 server->address_info_next->ai_protocol)) < 0)
462 {
463 return memcached_set_errno(*server, get_socket_errno(), NULL);
464 }
465
466 set_socket_options(server);
467
468 /* connect to server */
469 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
470 {
471 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
472 return MEMCACHED_SUCCESS;
473 }
474
475 /* An error occurred */
476 switch (get_socket_errno())
477 {
478 case ETIMEDOUT:
479 timeout_error_occured= true;
480 break;
481
482 case EAGAIN:
483 #if EWOULDBLOCK != EAGAIN
484 case EWOULDBLOCK:
485 #endif
486 case EINPROGRESS: // nonblocking mode - first return
487 case EALREADY: // nonblocking mode - subsequent returns
488 {
489 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
490 memcached_return_t rc= connect_poll(server);
491
492 if (memcached_success(rc))
493 {
494 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
495 return MEMCACHED_SUCCESS;
496 }
497
498 // A timeout here is treated as an error, we will not retry
499 if (rc == MEMCACHED_TIMEOUT)
500 {
501 timeout_error_occured= true;
502 }
503 }
504 break;
505
506 case EISCONN: // we are connected :-)
507 WATCHPOINT_ASSERT(0); // This is a programmer's error
508 break;
509
510 case EINTR: // Special case, we retry ai_addr
511 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
512 (void)closesocket(server->fd);
513 server->fd= INVALID_SOCKET;
514 continue;
515
516 default:
517 break;
518 }
519
520 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
521 (void)closesocket(server->fd);
522 server->fd= INVALID_SOCKET;
523 server->address_info_next= server->address_info_next->ai_next;
524 }
525
526 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
527
528 if (timeout_error_occured)
529 {
530 if (server->fd != INVALID_SOCKET)
531 {
532 (void)closesocket(server->fd);
533 server->fd= INVALID_SOCKET;
534 }
535 }
536
537 WATCHPOINT_STRING("Never got a good file descriptor");
538
539 if (memcached_has_current_error(*server))
540 {
541 return memcached_server_error_return(server);
542 }
543
544 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
545 {
546 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
547 }
548
549 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
550 }
551
552
553 /*
554 backoff_handling()
555
556 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
557 we get caught spending cycles just waiting.
558 */
559 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
560 {
561 struct timeval curr_time;
562 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
563
564 /*
565 If we hit server_failure_limit then something is completely wrong about the server.
566
567 1) If autoeject is enabled we do that.
568 2) If not? We go into timeout again, there is much else to do :(
569 */
570 if (server->server_failure_counter >= server->root->server_failure_limit)
571 {
572 /*
573 We just auto_eject if we hit this point
574 */
575 if (_is_auto_eject_host(server->root))
576 {
577 set_last_disconnected_host(server);
578
579 // Retry dead servers if requested
580 if (_gettime_success and server->root->dead_timeout > 0)
581 {
582 server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
583
584 // We only retry dead servers once before assuming failure again
585 server->server_failure_counter= server->root->server_failure_limit -1;
586 }
587
588 memcached_return_t rc;
589 if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
590 {
591 return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
592 }
593
594 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
595 }
596
597 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
598
599 // Sanity check/setting
600 if (server->next_retry == 0)
601 {
602 server->next_retry= 1;
603 }
604 }
605
606 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
607 {
608 /*
609 If next_retry is less then our current time, then we reset and try everything again.
610 */
611 if (_gettime_success and server->next_retry < curr_time.tv_sec)
612 {
613 server->state= MEMCACHED_SERVER_STATE_NEW;
614 }
615 else
616 {
617 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
618 }
619
620 in_timeout= true;
621 }
622
623 return MEMCACHED_SUCCESS;
624 }
625
626 static memcached_return_t _memcached_connect(memcached_server_write_instance_st server, const bool set_last_disconnected)
627 {
628 if (server->fd != INVALID_SOCKET)
629 {
630 return MEMCACHED_SUCCESS;
631 }
632
633 LIBMEMCACHED_MEMCACHED_CONNECT_START();
634
635 bool in_timeout= false;
636 memcached_return_t rc;
637 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
638 {
639 set_last_disconnected_host(server);
640 return rc;
641 }
642
643 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
644 {
645 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
646 }
647
648 if (server->hostname[0] == '/')
649 {
650 server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
651 }
652
653 /* We need to clean up the multi startup piece */
654 switch (server->type)
655 {
656 case MEMCACHED_CONNECTION_UDP:
657 case MEMCACHED_CONNECTION_TCP:
658 rc= network_connect(server);
659
660 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
661 {
662 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
663 {
664 rc= memcached_sasl_authenticate_connection(server);
665 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
666 {
667 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
668 (void)closesocket(server->fd);
669 server->fd= INVALID_SOCKET;
670 }
671 }
672 }
673 break;
674
675 case MEMCACHED_CONNECTION_UNIX_SOCKET:
676 rc= unix_socket_connect(server);
677 break;
678 }
679
680 if (memcached_success(rc))
681 {
682 memcached_mark_server_as_clean(server);
683 return rc;
684 }
685 else if (set_last_disconnected)
686 {
687 set_last_disconnected_host(server);
688 if (memcached_has_current_error(*server))
689 {
690 memcached_mark_server_for_timeout(server);
691 assert(memcached_failed(memcached_server_error_return(server)));
692 }
693 else
694 {
695 memcached_set_error(*server, rc, MEMCACHED_AT);
696 memcached_mark_server_for_timeout(server);
697 }
698
699 LIBMEMCACHED_MEMCACHED_CONNECT_END();
700
701 if (in_timeout)
702 {
703 char buffer[1024];
704 int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname, int(server->port));
705 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
706 }
707 }
708
709 return rc;
710 }
711
712 memcached_return_t memcached_connect_try(memcached_server_write_instance_st server)
713 {
714 return _memcached_connect(server, false);
715 }
716
717 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
718 {
719 return _memcached_connect(server, true);
720 }