34e72f76b34231f520b8c170933a9df57f44adf2
[m6w6/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int number_of;
61 if ((number_of= poll(fds, 1, server->root->connect_timeout)) <= 0)
62 {
63 if (number_of == -1)
64 {
65 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
66 switch (local_errno)
67 {
68 #ifdef TARGET_OS_LINUX
69 case ERESTART:
70 #endif
71 case EINTR:
72 continue;
73
74 case EFAULT:
75 case ENOMEM:
76 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
77
78 case EINVAL:
79 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
80
81 default: // This should not happen
82 if (fds[0].revents & POLLERR)
83 {
84 int err;
85 socklen_t len= sizeof(err);
86 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
87 {
88 if (err == 0)
89 {
90 // This should never happen, if it does? Punt.
91 continue;
92 }
93 local_errno= err;
94 }
95 }
96
97 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
98 (void)closesocket(server->fd);
99 server->fd= INVALID_SOCKET;
100 server->state= MEMCACHED_SERVER_STATE_NEW;
101
102 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
103 }
104 }
105 assert(number_of == 0);
106
107 server->io_wait_count.timeouts++;
108 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
109 }
110
111 if (fds[0].revents & POLLERR or
112 fds[0].revents & POLLHUP or
113 fds[0].revents & POLLNVAL)
114 {
115 int err;
116 socklen_t len= sizeof (err);
117 if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
118 {
119 // We check the value to see what happened wth the socket.
120 if (err == 0)
121 {
122 return MEMCACHED_SUCCESS;
123 }
124 errno= err;
125 }
126
127 return memcached_set_errno(*server, err, MEMCACHED_AT);
128 }
129 assert(fds[0].revents & POLLIN or fds[0].revents & POLLOUT);
130
131 return MEMCACHED_SUCCESS;
132 }
133
134 // This should only be possible from ERESTART or EINTR;
135 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
136 }
137
138 static memcached_return_t set_hostinfo(memcached_server_st *server)
139 {
140 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
141 if (server->address_info)
142 {
143 freeaddrinfo(server->address_info);
144 server->address_info= NULL;
145 server->address_info_next= NULL;
146 }
147
148 char str_port[NI_MAXSERV];
149 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
150 if (length >= NI_MAXSERV or length < 0)
151 {
152 return MEMCACHED_FAILURE;
153 }
154
155 struct addrinfo hints;
156 memset(&hints, 0, sizeof(struct addrinfo));
157
158 #if 0
159 hints.ai_family= AF_INET;
160 #endif
161 if (memcached_is_udp(server->root))
162 {
163 hints.ai_protocol= IPPROTO_UDP;
164 hints.ai_socktype= SOCK_DGRAM;
165 }
166 else
167 {
168 hints.ai_socktype= SOCK_STREAM;
169 hints.ai_protocol= IPPROTO_TCP;
170 }
171
172 server->address_info= NULL;
173 int errcode;
174 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
175 {
176 case 0:
177 break;
178
179 case EAI_AGAIN:
180 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
181
182 case EAI_SYSTEM:
183 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
184
185 case EAI_BADFLAGS:
186 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
187
188 case EAI_MEMORY:
189 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
190
191 default:
192 {
193 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
194 }
195 }
196 server->address_info_next= server->address_info;
197 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
198
199 return MEMCACHED_SUCCESS;
200 }
201
202 static inline void set_socket_nonblocking(memcached_server_st *server)
203 {
204 #ifdef WIN32
205 u_long arg= 1;
206 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
207 {
208 memcached_set_errno(*server, get_socket_errno(), NULL);
209 }
210 #else
211 int flags;
212
213 do
214 {
215 flags= fcntl(server->fd, F_GETFL, 0);
216 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
217
218 if (flags == -1)
219 {
220 memcached_set_errno(*server, errno, NULL);
221 }
222 else if ((flags & O_NONBLOCK) == 0)
223 {
224 int rval;
225
226 do
227 {
228 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
229 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
230
231 if (rval == -1)
232 {
233 memcached_set_errno(*server, errno, NULL);
234 }
235 }
236 #endif
237 }
238
239 static void set_socket_options(memcached_server_st *server)
240 {
241 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
242
243 if (memcached_is_udp(server->root))
244 {
245 return;
246 }
247
248 #ifdef HAVE_SNDTIMEO
249 if (server->root->snd_timeout)
250 {
251 struct timeval waittime;
252
253 waittime.tv_sec= 0;
254 waittime.tv_usec= server->root->snd_timeout;
255
256 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
257 &waittime, (socklen_t)sizeof(struct timeval));
258 assert(error == 0);
259 }
260 #endif
261
262 #ifdef HAVE_RCVTIMEO
263 if (server->root->rcv_timeout)
264 {
265 struct timeval waittime;
266
267 waittime.tv_sec= 0;
268 waittime.tv_usec= server->root->rcv_timeout;
269
270 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
271 &waittime, (socklen_t)sizeof(struct timeval));
272 assert(error == 0);
273 }
274 #endif
275
276
277 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
278 {
279 int set= 1;
280 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
281
282 // This is not considered a fatal error
283 if (error == -1)
284 {
285 WATCHPOINT_ERRNO(get_socket_errno());
286 perror("setsockopt(SO_NOSIGPIPE)");
287 }
288 }
289 #endif
290
291 if (server->root->flags.no_block)
292 {
293 struct linger linger;
294
295 linger.l_onoff= 1;
296 linger.l_linger= 0; /* By default on close() just drop the socket */
297 int error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
298 &linger, (socklen_t)sizeof(struct linger));
299 assert(error == 0);
300 }
301
302 if (server->root->flags.tcp_nodelay)
303 {
304 int flag= 1;
305
306 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
307 &flag, (socklen_t)sizeof(int));
308 assert(error == 0);
309 }
310
311 if (server->root->flags.tcp_keepalive)
312 {
313 int flag= 1;
314
315 int error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
316 &flag, (socklen_t)sizeof(int));
317 assert(error == 0);
318 }
319
320 #ifdef TCP_KEEPIDLE
321 if (server->root->tcp_keepidle > 0)
322 {
323 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
324 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
325 assert(error == 0);
326 }
327 #endif
328
329 if (server->root->send_size > 0)
330 {
331 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
332 &server->root->send_size, (socklen_t)sizeof(int));
333 assert(error == 0);
334 }
335
336 if (server->root->recv_size > 0)
337 {
338 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
339 &server->root->recv_size, (socklen_t)sizeof(int));
340 assert(error == 0);
341 }
342
343
344 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
345 set_socket_nonblocking(server);
346 }
347
348 static memcached_return_t unix_socket_connect(memcached_server_st *server)
349 {
350 #ifndef WIN32
351 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
352
353 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
354 {
355 memcached_set_errno(*server, errno, NULL);
356 return MEMCACHED_CONNECTION_FAILURE;
357 }
358
359 struct sockaddr_un servAddr;
360
361 memset(&servAddr, 0, sizeof (struct sockaddr_un));
362 servAddr.sun_family= AF_UNIX;
363 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
364
365 do {
366 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
367 {
368 switch (errno)
369 {
370 case EINPROGRESS:
371 case EALREADY:
372 case EINTR:
373 continue;
374
375 case EISCONN: /* We were spinning waiting on connect */
376 {
377 assert(0); // Programmer error
378 break;
379 }
380
381 default:
382 WATCHPOINT_ERRNO(errno);
383 memcached_set_errno(*server, errno, MEMCACHED_AT);
384 return MEMCACHED_CONNECTION_FAILURE;
385 }
386 }
387 } while (0);
388 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
389
390 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
391
392 return MEMCACHED_SUCCESS;
393 #else
394 (void)server;
395 return MEMCACHED_NOT_SUPPORTED;
396 #endif
397 }
398
399 static memcached_return_t network_connect(memcached_server_st *server)
400 {
401 bool timeout_error_occured= false;
402
403 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
404 WATCHPOINT_ASSERT(server->cursor_active == 0);
405
406 /*
407 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
408 */
409 if (server->address_info == NULL or server->address_info_next == NULL)
410 {
411 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
412 server->address_info_next= NULL;
413 memcached_return_t rc;
414 uint32_t counter= 5;
415 while (--counter)
416 {
417 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
418 {
419 break;
420 }
421
422 #ifndef WIN32
423 struct timespec dream, rem;
424
425 dream.tv_nsec= 1000;
426 dream.tv_sec= 0;
427
428 nanosleep(&dream, &rem);
429 #endif
430 }
431
432 if (memcached_failed(rc))
433 {
434 return rc;
435 }
436 }
437
438 if (server->address_info_next == NULL)
439 {
440 server->address_info_next= server->address_info;
441 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
442 }
443
444 /* Create the socket */
445 while (server->address_info_next and server->fd == INVALID_SOCKET)
446 {
447 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
448 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
449 {
450 server->address_info_next= server->address_info_next->ai_next;
451 continue;
452 }
453
454 if ((server->fd= socket(server->address_info_next->ai_family,
455 server->address_info_next->ai_socktype,
456 server->address_info_next->ai_protocol)) < 0)
457 {
458 return memcached_set_errno(*server, get_socket_errno(), NULL);
459 }
460
461 set_socket_options(server);
462
463 /* connect to server */
464 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
465 {
466 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
467 return MEMCACHED_SUCCESS;
468 }
469
470 /* An error occurred */
471 switch (get_socket_errno())
472 {
473 case ETIMEDOUT:
474 timeout_error_occured= true;
475 break;
476
477 case EAGAIN:
478 #if EWOULDBLOCK != EAGAIN
479 case EWOULDBLOCK:
480 #endif
481 case EINPROGRESS: // nonblocking mode - first return
482 case EALREADY: // nonblocking mode - subsequent returns
483 {
484 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
485 memcached_return_t rc= connect_poll(server);
486
487 if (memcached_success(rc))
488 {
489 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
490 return MEMCACHED_SUCCESS;
491 }
492
493 // A timeout here is treated as an error, we will not retry
494 if (rc == MEMCACHED_TIMEOUT)
495 {
496 timeout_error_occured= true;
497 }
498 }
499 break;
500
501 case EISCONN: // we are connected :-)
502 WATCHPOINT_ASSERT(0); // This is a programmer's error
503 break;
504
505 case EINTR: // Special case, we retry ai_addr
506 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
507 (void)closesocket(server->fd);
508 server->fd= INVALID_SOCKET;
509 continue;
510
511 default:
512 break;
513 }
514
515 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
516 (void)closesocket(server->fd);
517 server->fd= INVALID_SOCKET;
518 server->address_info_next= server->address_info_next->ai_next;
519 }
520
521 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
522
523 if (timeout_error_occured)
524 {
525 if (server->fd != INVALID_SOCKET)
526 {
527 (void)closesocket(server->fd);
528 server->fd= INVALID_SOCKET;
529 }
530 }
531
532 WATCHPOINT_STRING("Never got a good file descriptor");
533
534 if (memcached_has_current_error(*server))
535 {
536 return memcached_server_error_return(server);
537 }
538
539 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
540 {
541 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
542 }
543
544 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
545 }
546
547
548 /*
549 backoff_handling()
550
551 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
552 we get caught spending cycles just waiting.
553 */
554 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
555 {
556 struct timeval curr_time;
557 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
558
559 /*
560 If we hit server_failure_limit then something is completely wrong about the server.
561
562 1) If autoeject is enabled we do that.
563 2) If not? We go into timeout again, there is much else to do :(
564 */
565 if (server->server_failure_counter >= server->root->server_failure_limit)
566 {
567 /*
568 We just auto_eject if we hit this point
569 */
570 if (_is_auto_eject_host(server->root))
571 {
572 set_last_disconnected_host(server);
573
574 // Retry dead servers if requested
575 if (_gettime_success and server->root->dead_timeout > 0)
576 {
577 server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
578
579 // We only retry dead servers once before assuming failure again
580 server->server_failure_counter= server->root->server_failure_limit -1;
581 }
582
583 memcached_return_t rc;
584 if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
585 {
586 return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
587 }
588
589 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
590 }
591
592 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
593
594 // Sanity check/setting
595 if (server->next_retry == 0)
596 {
597 server->next_retry= 1;
598 }
599 }
600
601 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
602 {
603 /*
604 If next_retry is less then our current time, then we reset and try everything again.
605 */
606 if (_gettime_success and server->next_retry < curr_time.tv_sec)
607 {
608 server->state= MEMCACHED_SERVER_STATE_NEW;
609 }
610 else
611 {
612 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
613 }
614
615 in_timeout= true;
616 }
617
618 return MEMCACHED_SUCCESS;
619 }
620
621 static memcached_return_t _memcached_connect(memcached_server_write_instance_st server, const bool set_last_disconnected)
622 {
623 if (server->fd != INVALID_SOCKET)
624 {
625 return MEMCACHED_SUCCESS;
626 }
627
628 LIBMEMCACHED_MEMCACHED_CONNECT_START();
629
630 bool in_timeout= false;
631 memcached_return_t rc;
632 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
633 {
634 set_last_disconnected_host(server);
635 return rc;
636 }
637
638 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
639 {
640 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
641 }
642
643 if (server->hostname[0] == '/')
644 {
645 server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
646 }
647
648 /* We need to clean up the multi startup piece */
649 switch (server->type)
650 {
651 case MEMCACHED_CONNECTION_UDP:
652 case MEMCACHED_CONNECTION_TCP:
653 rc= network_connect(server);
654
655 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
656 {
657 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
658 {
659 rc= memcached_sasl_authenticate_connection(server);
660 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
661 {
662 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
663 (void)closesocket(server->fd);
664 server->fd= INVALID_SOCKET;
665 }
666 }
667 }
668 break;
669
670 case MEMCACHED_CONNECTION_UNIX_SOCKET:
671 rc= unix_socket_connect(server);
672 break;
673 }
674
675 if (memcached_success(rc))
676 {
677 memcached_mark_server_as_clean(server);
678 return rc;
679 }
680 else if (set_last_disconnected)
681 {
682 set_last_disconnected_host(server);
683 if (memcached_has_current_error(*server))
684 {
685 memcached_mark_server_for_timeout(server);
686 assert(memcached_failed(memcached_server_error_return(server)));
687 }
688 else
689 {
690 memcached_set_error(*server, rc, MEMCACHED_AT);
691 memcached_mark_server_for_timeout(server);
692 }
693
694 LIBMEMCACHED_MEMCACHED_CONNECT_END();
695
696 if (in_timeout)
697 {
698 char buffer[1024];
699 int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname, int(server->port));
700 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
701 }
702 }
703
704 return rc;
705 }
706
707 memcached_return_t memcached_connect_try(memcached_server_write_instance_st server)
708 {
709 return _memcached_connect(server, false);
710 }
711
712 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
713 {
714 return _memcached_connect(server, true);
715 }