Merge in all of the build tree.
[m6w6/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int error= poll(fds, 1, server->root->connect_timeout);
61 switch (error)
62 {
63 case 1:
64 {
65 int err;
66 socklen_t len= sizeof (err);
67 (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
68
69 // We check the value to see what happened wth the socket.
70 if (err == 0)
71 {
72 return MEMCACHED_SUCCESS;
73 }
74
75 return memcached_set_errno(*server, err, MEMCACHED_AT);
76 }
77 case 0:
78 {
79 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
80 }
81
82 default: // A real error occurred and we need to completely bail
83 switch (get_socket_errno())
84 {
85 #ifdef TARGET_OS_LINUX
86 case ERESTART:
87 #endif
88 case EINTR:
89 continue;
90
91 case EFAULT:
92 case ENOMEM:
93 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
94
95 case EINVAL:
96 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
97
98 default: // This should not happen
99 if (fds[0].revents & POLLERR)
100 {
101 int err;
102 socklen_t len= sizeof (err);
103 (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
104 memcached_set_errno(*server, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
105 }
106 else
107 {
108 memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
109 }
110
111 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
112 (void)closesocket(server->fd);
113 server->fd= INVALID_SOCKET;
114 server->state= MEMCACHED_SERVER_STATE_NEW;
115
116 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
117 }
118 }
119 }
120
121 // This should only be possible from ERESTART or EINTR;
122 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
123 }
124
125 static memcached_return_t set_hostinfo(memcached_server_st *server)
126 {
127 if (server->address_info)
128 {
129 freeaddrinfo(server->address_info);
130 server->address_info= NULL;
131 server->address_info_next= NULL;
132 }
133
134 char str_port[NI_MAXSERV];
135 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
136 if (length >= NI_MAXSERV or length < 0)
137 {
138 return MEMCACHED_FAILURE;
139 }
140
141 struct addrinfo hints;
142 memset(&hints, 0, sizeof(struct addrinfo));
143
144 #if 0
145 hints.ai_family= AF_INET;
146 #endif
147 if (server->type == MEMCACHED_CONNECTION_UDP)
148 {
149 hints.ai_protocol= IPPROTO_UDP;
150 hints.ai_socktype= SOCK_DGRAM;
151 }
152 else
153 {
154 hints.ai_socktype= SOCK_STREAM;
155 hints.ai_protocol= IPPROTO_TCP;
156 }
157
158 server->address_info= NULL;
159 int errcode;
160 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
161 {
162 case 0:
163 break;
164
165 case EAI_AGAIN:
166 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
167
168 case EAI_SYSTEM:
169 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
170
171 case EAI_BADFLAGS:
172 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
173
174 case EAI_MEMORY:
175 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
176
177 default:
178 {
179 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
180 }
181 }
182 server->address_info_next= server->address_info;
183 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
184
185 return MEMCACHED_SUCCESS;
186 }
187
188 static inline void set_socket_nonblocking(memcached_server_st *server)
189 {
190 #ifdef WIN32
191 u_long arg= 1;
192 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
193 {
194 memcached_set_errno(*server, get_socket_errno(), NULL);
195 }
196 #else
197 int flags;
198
199 do
200 {
201 flags= fcntl(server->fd, F_GETFL, 0);
202 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
203
204 if (flags == -1)
205 {
206 memcached_set_errno(*server, errno, NULL);
207 }
208 else if ((flags & O_NONBLOCK) == 0)
209 {
210 int rval;
211
212 do
213 {
214 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
215 } while (rval == -1 && (errno == EINTR || errno == EAGAIN));
216
217 unlikely (rval == -1)
218 {
219 memcached_set_errno(*server, errno, NULL);
220 }
221 }
222 #endif
223 }
224
225 static void set_socket_options(memcached_server_st *server)
226 {
227 assert_msg(server->fd != -1, "invalid socket was passed to set_socket_options()");
228
229 if (server->type == MEMCACHED_CONNECTION_UDP)
230 {
231 return;
232 }
233
234 #ifdef HAVE_SNDTIMEO
235 if (server->root->snd_timeout)
236 {
237 int error;
238 struct timeval waittime;
239
240 waittime.tv_sec= 0;
241 waittime.tv_usec= server->root->snd_timeout;
242
243 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
244 &waittime, (socklen_t)sizeof(struct timeval));
245 WATCHPOINT_ASSERT(error == 0);
246 }
247 #endif
248
249 #ifdef HAVE_RCVTIMEO
250 if (server->root->rcv_timeout)
251 {
252 int error;
253 struct timeval waittime;
254
255 waittime.tv_sec= 0;
256 waittime.tv_usec= server->root->rcv_timeout;
257
258 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
259 &waittime, (socklen_t)sizeof(struct timeval));
260 WATCHPOINT_ASSERT(error == 0);
261 }
262 #endif
263
264
265 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
266 {
267 int set= 1;
268 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
269
270 // This is not considered a fatal error
271 if (error == -1)
272 {
273 WATCHPOINT_ERRNO(get_socket_errno());
274 perror("setsockopt(SO_NOSIGPIPE)");
275 }
276 }
277 #endif
278
279 if (server->root->flags.no_block)
280 {
281 int error;
282 struct linger linger;
283
284 linger.l_onoff= 1;
285 linger.l_linger= 0; /* By default on close() just drop the socket */
286 error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
287 &linger, (socklen_t)sizeof(struct linger));
288 WATCHPOINT_ASSERT(error == 0);
289 }
290
291 if (server->root->flags.tcp_nodelay)
292 {
293 int flag= 1;
294 int error;
295
296 error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
297 &flag, (socklen_t)sizeof(int));
298 WATCHPOINT_ASSERT(error == 0);
299 }
300
301 if (server->root->flags.tcp_keepalive)
302 {
303 int flag= 1;
304 int error;
305
306 error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
307 &flag, (socklen_t)sizeof(int));
308 WATCHPOINT_ASSERT(error == 0);
309 }
310
311 #ifdef TCP_KEEPIDLE
312 if (server->root->tcp_keepidle > 0)
313 {
314 int error;
315
316 error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
317 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
318 WATCHPOINT_ASSERT(error == 0);
319 }
320 #endif
321
322 if (server->root->send_size > 0)
323 {
324 int error;
325
326 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
327 &server->root->send_size, (socklen_t)sizeof(int));
328 WATCHPOINT_ASSERT(error == 0);
329 }
330
331 if (server->root->recv_size > 0)
332 {
333 int error;
334
335 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
336 &server->root->recv_size, (socklen_t)sizeof(int));
337 WATCHPOINT_ASSERT(error == 0);
338 }
339
340
341 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
342 set_socket_nonblocking(server);
343 }
344
345 static memcached_return_t unix_socket_connect(memcached_server_st *server)
346 {
347 #ifndef WIN32
348 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
349
350 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
351 {
352 memcached_set_errno(*server, errno, NULL);
353 return MEMCACHED_CONNECTION_FAILURE;
354 }
355
356 struct sockaddr_un servAddr;
357
358 memset(&servAddr, 0, sizeof (struct sockaddr_un));
359 servAddr.sun_family= AF_UNIX;
360 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
361
362 do {
363 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
364 {
365 switch (errno)
366 {
367 case EINPROGRESS:
368 case EALREADY:
369 case EINTR:
370 continue;
371
372 case EISCONN: /* We were spinning waiting on connect */
373 {
374 WATCHPOINT_ASSERT(0); // Programmer error
375 break;
376 }
377
378 default:
379 WATCHPOINT_ERRNO(errno);
380 memcached_set_errno(*server, errno, MEMCACHED_AT);
381 return MEMCACHED_CONNECTION_FAILURE;
382 }
383 }
384 } while (0);
385 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
386
387 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
388
389 return MEMCACHED_SUCCESS;
390 #else
391 (void)server;
392 return MEMCACHED_NOT_SUPPORTED;
393 #endif
394 }
395
396 static memcached_return_t network_connect(memcached_server_st *server)
397 {
398 bool timeout_error_occured= false;
399
400 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
401 WATCHPOINT_ASSERT(server->cursor_active == 0);
402
403 if (server->address_info == NULL or server->address_info_next == NULL)
404 {
405 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
406 memcached_return_t rc;
407 uint32_t counter= 5;
408 while (--counter)
409 {
410 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
411 {
412 break;
413 }
414
415 #ifndef WIN32
416 struct timespec dream, rem;
417
418 dream.tv_nsec= 1000;
419 dream.tv_sec= 0;
420
421 nanosleep(&dream, &rem);
422 #endif
423 }
424
425 if (memcached_failed(rc))
426 {
427 return rc;
428 }
429 }
430
431 /* Create the socket */
432 while (server->address_info_next and server->fd == INVALID_SOCKET)
433 {
434 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
435 if (server->type == MEMCACHED_CONNECTION_UDP && server->address_info_next->ai_family != AF_INET)
436 {
437 server->address_info_next= server->address_info_next->ai_next;
438 continue;
439 }
440
441 if ((server->fd= socket(server->address_info_next->ai_family,
442 server->address_info_next->ai_socktype,
443 server->address_info_next->ai_protocol)) < 0)
444 {
445 return memcached_set_errno(*server, get_socket_errno(), NULL);
446 }
447
448 set_socket_options(server);
449
450 /* connect to server */
451 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
452 {
453 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
454 return MEMCACHED_SUCCESS;
455 }
456
457 /* An error occurred */
458 switch (get_socket_errno())
459 {
460 case ETIMEDOUT:
461 timeout_error_occured= true;
462 break;
463
464 case EWOULDBLOCK:
465 case EINPROGRESS: // nonblocking mode - first return
466 case EALREADY: // nonblocking mode - subsequent returns
467 {
468 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
469 memcached_return_t rc= connect_poll(server);
470
471 if (memcached_success(rc))
472 {
473 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
474 return MEMCACHED_SUCCESS;
475 }
476
477 // A timeout here is treated as an error, we will not retry
478 if (rc == MEMCACHED_TIMEOUT)
479 {
480 timeout_error_occured= true;
481 }
482 }
483 break;
484
485 case EISCONN: // we are connected :-)
486 WATCHPOINT_ASSERT(0); // This is a programmer's error
487 break;
488
489 case EINTR: // Special case, we retry ai_addr
490 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
491 (void)closesocket(server->fd);
492 server->fd= INVALID_SOCKET;
493 continue;
494
495 default:
496 break;
497 }
498
499 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
500 (void)closesocket(server->fd);
501 server->fd= INVALID_SOCKET;
502 server->address_info_next= server->address_info_next->ai_next;
503 }
504
505 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
506
507 if (timeout_error_occured)
508 {
509 if (server->fd != INVALID_SOCKET)
510 {
511 (void)closesocket(server->fd);
512 server->fd= INVALID_SOCKET;
513 }
514 }
515
516 WATCHPOINT_STRING("Never got a good file descriptor");
517
518 if (memcached_has_current_error(*server))
519 {
520 return memcached_server_error_return(server);
521 }
522
523 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
524 {
525 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
526 }
527
528 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
529 }
530
531
532 /*
533 backoff_handling()
534
535 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
536 we get caught spending cycles just waiting.
537 */
538 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
539 {
540 /*
541 If we hit server_failure_limit then something is completely wrong about the server.
542
543 1) If autoeject is enabled we do that.
544 2) If not? We go into timeout again, there is much else to do :(
545 */
546 if (server->server_failure_counter >= server->root->server_failure_limit)
547 {
548 /*
549 We just auto_eject if we hit this point
550 */
551 if (_is_auto_eject_host(server->root))
552 {
553 set_last_disconnected_host(server);
554 run_distribution((memcached_st *)server->root);
555
556 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
557 }
558
559 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
560
561 // Sanity check/setting
562 if (server->next_retry == 0)
563 {
564 server->next_retry= 1;
565 }
566 }
567
568 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
569 {
570 struct timeval curr_time;
571 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
572
573 /*
574 If next_retry is less then our current time, then we reset and try everything again.
575 */
576 if (_gettime_success and server->next_retry < curr_time.tv_sec)
577 {
578 server->state= MEMCACHED_SERVER_STATE_NEW;
579 }
580 else
581 {
582 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
583 }
584
585 in_timeout= true;
586 }
587
588 return MEMCACHED_SUCCESS;
589 }
590
591 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
592 {
593 if (server->fd != INVALID_SOCKET)
594 {
595 return MEMCACHED_SUCCESS;
596 }
597
598 LIBMEMCACHED_MEMCACHED_CONNECT_START();
599
600 bool in_timeout= false;
601 memcached_return_t rc;
602 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
603 {
604 set_last_disconnected_host(server);
605 return rc;
606 }
607
608 /* We need to clean up the multi startup piece */
609 switch (server->type)
610 {
611 case MEMCACHED_CONNECTION_UDP:
612 case MEMCACHED_CONNECTION_TCP:
613 rc= network_connect(server);
614
615 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
616 {
617 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
618 {
619 rc= memcached_sasl_authenticate_connection(server);
620 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
621 {
622 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
623 (void)closesocket(server->fd);
624 server->fd= INVALID_SOCKET;
625 }
626 }
627 }
628 break;
629
630 case MEMCACHED_CONNECTION_UNIX_SOCKET:
631 rc= unix_socket_connect(server);
632 break;
633 }
634
635 if (memcached_success(rc))
636 {
637 memcached_mark_server_as_clean(server);
638 return rc;
639 }
640
641 set_last_disconnected_host(server);
642 if (memcached_has_current_error(*server))
643 {
644 memcached_mark_server_for_timeout(server);
645 assert(memcached_failed(memcached_server_error_return(server)));
646 }
647 else
648 {
649 memcached_set_error(*server, rc, MEMCACHED_AT);
650 memcached_mark_server_for_timeout(server);
651 }
652
653 LIBMEMCACHED_MEMCACHED_CONNECT_END();
654
655 if (in_timeout)
656 {
657 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
658 }
659
660 return rc;
661 }