Rollup from build trunk.
[awesomized/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int error= poll(fds, 1, server->root->connect_timeout);
61 switch (error)
62 {
63 case 1:
64 {
65 int err;
66 socklen_t len= sizeof (err);
67 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
68 {
69 // We check the value to see what happened wth the socket.
70 if (err == 0)
71 {
72 return MEMCACHED_SUCCESS;
73 }
74 errno= err;
75 }
76
77 return memcached_set_errno(*server, err, MEMCACHED_AT);
78 }
79 case 0:
80 {
81 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
82 }
83
84 default: // A real error occurred and we need to completely bail
85 switch (get_socket_errno())
86 {
87 #ifdef TARGET_OS_LINUX
88 case ERESTART:
89 #endif
90 case EINTR:
91 continue;
92
93 case EFAULT:
94 case ENOMEM:
95 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
96
97 case EINVAL:
98 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
99
100 default: // This should not happen
101 if (fds[0].revents & POLLERR)
102 {
103 int err;
104 socklen_t len= sizeof(err);
105 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
106 {
107 if (err == 0)
108 {
109 // This should never happen, if it does? Punt.
110 continue;
111 }
112 errno= err;
113 }
114 }
115
116 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
117
118 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server->fd);
120 server->fd= INVALID_SOCKET;
121 server->state= MEMCACHED_SERVER_STATE_NEW;
122
123 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
124 }
125 }
126 }
127
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
130 }
131
132 static memcached_return_t set_hostinfo(memcached_server_st *server)
133 {
134 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
135 if (server->address_info)
136 {
137 freeaddrinfo(server->address_info);
138 server->address_info= NULL;
139 server->address_info_next= NULL;
140 }
141
142 char str_port[NI_MAXSERV];
143 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
144 if (length >= NI_MAXSERV or length < 0)
145 {
146 return MEMCACHED_FAILURE;
147 }
148
149 struct addrinfo hints;
150 memset(&hints, 0, sizeof(struct addrinfo));
151
152 #if 0
153 hints.ai_family= AF_INET;
154 #endif
155 if (memcached_is_udp(server->root))
156 {
157 hints.ai_protocol= IPPROTO_UDP;
158 hints.ai_socktype= SOCK_DGRAM;
159 }
160 else
161 {
162 hints.ai_socktype= SOCK_STREAM;
163 hints.ai_protocol= IPPROTO_TCP;
164 }
165
166 server->address_info= NULL;
167 int errcode;
168 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
169 {
170 case 0:
171 break;
172
173 case EAI_AGAIN:
174 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
175
176 case EAI_SYSTEM:
177 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
178
179 case EAI_BADFLAGS:
180 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
181
182 case EAI_MEMORY:
183 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
184
185 default:
186 {
187 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
188 }
189 }
190 server->address_info_next= server->address_info;
191 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
192
193 return MEMCACHED_SUCCESS;
194 }
195
196 static inline void set_socket_nonblocking(memcached_server_st *server)
197 {
198 #ifdef WIN32
199 u_long arg= 1;
200 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
201 {
202 memcached_set_errno(*server, get_socket_errno(), NULL);
203 }
204 #else
205 int flags;
206
207 do
208 {
209 flags= fcntl(server->fd, F_GETFL, 0);
210 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
211
212 if (flags == -1)
213 {
214 memcached_set_errno(*server, errno, NULL);
215 }
216 else if ((flags & O_NONBLOCK) == 0)
217 {
218 int rval;
219
220 do
221 {
222 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
223 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
224
225 if (rval == -1)
226 {
227 memcached_set_errno(*server, errno, NULL);
228 }
229 }
230 #endif
231 }
232
233 static void set_socket_options(memcached_server_st *server)
234 {
235 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
236
237 if (memcached_is_udp(server->root))
238 {
239 return;
240 }
241
242 #ifdef HAVE_SNDTIMEO
243 if (server->root->snd_timeout)
244 {
245 int error;
246 struct timeval waittime;
247
248 waittime.tv_sec= 0;
249 waittime.tv_usec= server->root->snd_timeout;
250
251 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
252 &waittime, (socklen_t)sizeof(struct timeval));
253 WATCHPOINT_ASSERT(error == 0);
254 }
255 #endif
256
257 #ifdef HAVE_RCVTIMEO
258 if (server->root->rcv_timeout)
259 {
260 int error;
261 struct timeval waittime;
262
263 waittime.tv_sec= 0;
264 waittime.tv_usec= server->root->rcv_timeout;
265
266 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
267 &waittime, (socklen_t)sizeof(struct timeval));
268 WATCHPOINT_ASSERT(error == 0);
269 }
270 #endif
271
272
273 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
274 {
275 int set= 1;
276 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
277
278 // This is not considered a fatal error
279 if (error == -1)
280 {
281 WATCHPOINT_ERRNO(get_socket_errno());
282 perror("setsockopt(SO_NOSIGPIPE)");
283 }
284 }
285 #endif
286
287 if (server->root->flags.no_block)
288 {
289 int error;
290 struct linger linger;
291
292 linger.l_onoff= 1;
293 linger.l_linger= 0; /* By default on close() just drop the socket */
294 error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
295 &linger, (socklen_t)sizeof(struct linger));
296 WATCHPOINT_ASSERT(error == 0);
297 }
298
299 if (server->root->flags.tcp_nodelay)
300 {
301 int flag= 1;
302 int error;
303
304 error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
305 &flag, (socklen_t)sizeof(int));
306 WATCHPOINT_ASSERT(error == 0);
307 }
308
309 if (server->root->flags.tcp_keepalive)
310 {
311 int flag= 1;
312 int error;
313
314 error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
315 &flag, (socklen_t)sizeof(int));
316 WATCHPOINT_ASSERT(error == 0);
317 }
318
319 #ifdef TCP_KEEPIDLE
320 if (server->root->tcp_keepidle > 0)
321 {
322 int error;
323
324 error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
325 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
326 WATCHPOINT_ASSERT(error == 0);
327 }
328 #endif
329
330 if (server->root->send_size > 0)
331 {
332 int error;
333
334 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
335 &server->root->send_size, (socklen_t)sizeof(int));
336 WATCHPOINT_ASSERT(error == 0);
337 }
338
339 if (server->root->recv_size > 0)
340 {
341 int error;
342
343 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
344 &server->root->recv_size, (socklen_t)sizeof(int));
345 WATCHPOINT_ASSERT(error == 0);
346 }
347
348
349 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
350 set_socket_nonblocking(server);
351 }
352
353 static memcached_return_t unix_socket_connect(memcached_server_st *server)
354 {
355 #ifndef WIN32
356 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
357
358 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
359 {
360 memcached_set_errno(*server, errno, NULL);
361 return MEMCACHED_CONNECTION_FAILURE;
362 }
363
364 struct sockaddr_un servAddr;
365
366 memset(&servAddr, 0, sizeof (struct sockaddr_un));
367 servAddr.sun_family= AF_UNIX;
368 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
369
370 do {
371 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
372 {
373 switch (errno)
374 {
375 case EINPROGRESS:
376 case EALREADY:
377 case EINTR:
378 continue;
379
380 case EISCONN: /* We were spinning waiting on connect */
381 {
382 WATCHPOINT_ASSERT(0); // Programmer error
383 break;
384 }
385
386 default:
387 WATCHPOINT_ERRNO(errno);
388 memcached_set_errno(*server, errno, MEMCACHED_AT);
389 return MEMCACHED_CONNECTION_FAILURE;
390 }
391 }
392 } while (0);
393 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
394
395 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
396
397 return MEMCACHED_SUCCESS;
398 #else
399 (void)server;
400 return MEMCACHED_NOT_SUPPORTED;
401 #endif
402 }
403
404 static memcached_return_t network_connect(memcached_server_st *server)
405 {
406 bool timeout_error_occured= false;
407
408 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
409 WATCHPOINT_ASSERT(server->cursor_active == 0);
410
411 /*
412 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
413 */
414 if (server->address_info == NULL or server->address_info_next == NULL)
415 {
416 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
417 server->address_info_next= NULL;
418 memcached_return_t rc;
419 uint32_t counter= 5;
420 while (--counter)
421 {
422 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
423 {
424 break;
425 }
426
427 #ifndef WIN32
428 struct timespec dream, rem;
429
430 dream.tv_nsec= 1000;
431 dream.tv_sec= 0;
432
433 nanosleep(&dream, &rem);
434 #endif
435 }
436
437 if (memcached_failed(rc))
438 {
439 return rc;
440 }
441 }
442
443 if (server->address_info_next == NULL)
444 {
445 server->address_info_next= server->address_info;
446 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
447 }
448
449 /* Create the socket */
450 while (server->address_info_next and server->fd == INVALID_SOCKET)
451 {
452 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
453 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
454 {
455 server->address_info_next= server->address_info_next->ai_next;
456 continue;
457 }
458
459 if ((server->fd= socket(server->address_info_next->ai_family,
460 server->address_info_next->ai_socktype,
461 server->address_info_next->ai_protocol)) < 0)
462 {
463 return memcached_set_errno(*server, get_socket_errno(), NULL);
464 }
465
466 set_socket_options(server);
467
468 /* connect to server */
469 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
470 {
471 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
472 return MEMCACHED_SUCCESS;
473 }
474
475 /* An error occurred */
476 switch (get_socket_errno())
477 {
478 case ETIMEDOUT:
479 timeout_error_occured= true;
480 break;
481
482 case EWOULDBLOCK:
483 case EINPROGRESS: // nonblocking mode - first return
484 case EALREADY: // nonblocking mode - subsequent returns
485 {
486 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
487 memcached_return_t rc= connect_poll(server);
488
489 if (memcached_success(rc))
490 {
491 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
492 return MEMCACHED_SUCCESS;
493 }
494
495 // A timeout here is treated as an error, we will not retry
496 if (rc == MEMCACHED_TIMEOUT)
497 {
498 timeout_error_occured= true;
499 }
500 }
501 break;
502
503 case EISCONN: // we are connected :-)
504 WATCHPOINT_ASSERT(0); // This is a programmer's error
505 break;
506
507 case EINTR: // Special case, we retry ai_addr
508 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
509 (void)closesocket(server->fd);
510 server->fd= INVALID_SOCKET;
511 continue;
512
513 default:
514 break;
515 }
516
517 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
518 (void)closesocket(server->fd);
519 server->fd= INVALID_SOCKET;
520 server->address_info_next= server->address_info_next->ai_next;
521 }
522
523 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
524
525 if (timeout_error_occured)
526 {
527 if (server->fd != INVALID_SOCKET)
528 {
529 (void)closesocket(server->fd);
530 server->fd= INVALID_SOCKET;
531 }
532 }
533
534 WATCHPOINT_STRING("Never got a good file descriptor");
535
536 if (memcached_has_current_error(*server))
537 {
538 return memcached_server_error_return(server);
539 }
540
541 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
542 {
543 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
544 }
545
546 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
547 }
548
549
550 /*
551 backoff_handling()
552
553 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
554 we get caught spending cycles just waiting.
555 */
556 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
557 {
558 struct timeval curr_time;
559 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
560
561 /*
562 If we hit server_failure_limit then something is completely wrong about the server.
563
564 1) If autoeject is enabled we do that.
565 2) If not? We go into timeout again, there is much else to do :(
566 */
567 if (server->server_failure_counter >= server->root->server_failure_limit)
568 {
569 /*
570 We just auto_eject if we hit this point
571 */
572 if (_is_auto_eject_host(server->root))
573 {
574 set_last_disconnected_host(server);
575
576 // Retry dead servers if requested
577 if (_gettime_success and server->root->dead_timeout > 0)
578 {
579 server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
580
581 // We only retry dead servers once before assuming failure again
582 server->server_failure_counter= server->root->server_failure_limit -1;
583 }
584
585 memcached_return_t rc;
586 if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
587 {
588 return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
589 }
590
591 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
592 }
593
594 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
595
596 // Sanity check/setting
597 if (server->next_retry == 0)
598 {
599 server->next_retry= 1;
600 }
601 }
602
603 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
604 {
605 /*
606 If next_retry is less then our current time, then we reset and try everything again.
607 */
608 if (_gettime_success and server->next_retry < curr_time.tv_sec)
609 {
610 server->state= MEMCACHED_SERVER_STATE_NEW;
611 }
612 else
613 {
614 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
615 }
616
617 in_timeout= true;
618 }
619
620 return MEMCACHED_SUCCESS;
621 }
622
623 static memcached_return_t _memcached_connect(memcached_server_write_instance_st server, const bool set_last_disconnected)
624 {
625 if (server->fd != INVALID_SOCKET)
626 {
627 return MEMCACHED_SUCCESS;
628 }
629
630 LIBMEMCACHED_MEMCACHED_CONNECT_START();
631
632 bool in_timeout= false;
633 memcached_return_t rc;
634 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
635 {
636 set_last_disconnected_host(server);
637 return rc;
638 }
639
640 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
641 {
642 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
643 }
644
645 if (server->hostname[0] == '/')
646 {
647 server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
648 }
649
650 /* We need to clean up the multi startup piece */
651 switch (server->type)
652 {
653 case MEMCACHED_CONNECTION_UDP:
654 case MEMCACHED_CONNECTION_TCP:
655 rc= network_connect(server);
656
657 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
658 {
659 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
660 {
661 rc= memcached_sasl_authenticate_connection(server);
662 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
663 {
664 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
665 (void)closesocket(server->fd);
666 server->fd= INVALID_SOCKET;
667 }
668 }
669 }
670 break;
671
672 case MEMCACHED_CONNECTION_UNIX_SOCKET:
673 rc= unix_socket_connect(server);
674 break;
675 }
676
677 if (memcached_success(rc))
678 {
679 memcached_mark_server_as_clean(server);
680 return rc;
681 }
682 else if (set_last_disconnected)
683 {
684 set_last_disconnected_host(server);
685 if (memcached_has_current_error(*server))
686 {
687 memcached_mark_server_for_timeout(server);
688 assert(memcached_failed(memcached_server_error_return(server)));
689 }
690 else
691 {
692 memcached_set_error(*server, rc, MEMCACHED_AT);
693 memcached_mark_server_for_timeout(server);
694 }
695
696 LIBMEMCACHED_MEMCACHED_CONNECT_END();
697
698 if (in_timeout)
699 {
700 char buffer[1024];
701 int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname, int(server->port));
702 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
703 }
704 }
705
706 return rc;
707 }
708
709 memcached_return_t memcached_connect_try(memcached_server_write_instance_st server)
710 {
711 return _memcached_connect(server, false);
712 }
713
714 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
715 {
716 return _memcached_connect(server, true);
717 }