Updating for 1.0.2 release
[awesomized/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int error= poll(fds, 1, server->root->connect_timeout);
61 switch (error)
62 {
63 case 1:
64 {
65 int err;
66 socklen_t len= sizeof (err);
67 (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
68
69 // We check the value to see what happened wth the socket.
70 if (err == 0)
71 {
72 return MEMCACHED_SUCCESS;
73 }
74
75 return memcached_set_errno(*server, err, MEMCACHED_AT);
76 }
77 case 0:
78 {
79 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
80 }
81
82 default: // A real error occurred and we need to completely bail
83 switch (get_socket_errno())
84 {
85 #ifdef TARGET_OS_LINUX
86 case ERESTART:
87 #endif
88 case EINTR:
89 continue;
90
91 case EFAULT:
92 case ENOMEM:
93 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
94
95 case EINVAL:
96 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
97
98 default: // This should not happen
99 if (fds[0].revents & POLLERR)
100 {
101 int err;
102 socklen_t len= sizeof (err);
103 (void)getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len);
104 memcached_set_errno(*server, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
105 }
106 else
107 {
108 memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
109 }
110
111 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
112 (void)closesocket(server->fd);
113 server->fd= INVALID_SOCKET;
114 server->state= MEMCACHED_SERVER_STATE_NEW;
115
116 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
117 }
118 }
119 }
120
121 // This should only be possible from ERESTART or EINTR;
122 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
123 }
124
125 static memcached_return_t set_hostinfo(memcached_server_st *server)
126 {
127 if (server->address_info)
128 {
129 freeaddrinfo(server->address_info);
130 server->address_info= NULL;
131 server->address_info_next= NULL;
132 }
133
134 char str_port[NI_MAXSERV];
135 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
136 if (length >= NI_MAXSERV or length < 0)
137 {
138 return MEMCACHED_FAILURE;
139 }
140
141 struct addrinfo hints;
142 memset(&hints, 0, sizeof(struct addrinfo));
143
144 #if 0
145 hints.ai_family= AF_INET;
146 #endif
147 if (memcached_is_udp(server->root))
148 {
149 hints.ai_protocol= IPPROTO_UDP;
150 hints.ai_socktype= SOCK_DGRAM;
151 }
152 else
153 {
154 hints.ai_socktype= SOCK_STREAM;
155 hints.ai_protocol= IPPROTO_TCP;
156 }
157
158 server->address_info= NULL;
159 int errcode;
160 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
161 {
162 case 0:
163 break;
164
165 case EAI_AGAIN:
166 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
167
168 case EAI_SYSTEM:
169 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
170
171 case EAI_BADFLAGS:
172 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
173
174 case EAI_MEMORY:
175 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
176
177 default:
178 {
179 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
180 }
181 }
182 server->address_info_next= server->address_info;
183 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
184
185 return MEMCACHED_SUCCESS;
186 }
187
188 static inline void set_socket_nonblocking(memcached_server_st *server)
189 {
190 #ifdef WIN32
191 u_long arg= 1;
192 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
193 {
194 memcached_set_errno(*server, get_socket_errno(), NULL);
195 }
196 #else
197 int flags;
198
199 do
200 {
201 flags= fcntl(server->fd, F_GETFL, 0);
202 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
203
204 if (flags == -1)
205 {
206 memcached_set_errno(*server, errno, NULL);
207 }
208 else if ((flags & O_NONBLOCK) == 0)
209 {
210 int rval;
211
212 do
213 {
214 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
215 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
216
217 if (rval == -1)
218 {
219 memcached_set_errno(*server, errno, NULL);
220 }
221 }
222 #endif
223 }
224
225 static void set_socket_options(memcached_server_st *server)
226 {
227 assert_msg(server->fd != -1, "invalid socket was passed to set_socket_options()");
228
229 if (memcached_is_udp(server->root))
230 {
231 return;
232 }
233
234 #ifdef HAVE_SNDTIMEO
235 if (server->root->snd_timeout)
236 {
237 int error;
238 struct timeval waittime;
239
240 waittime.tv_sec= 0;
241 waittime.tv_usec= server->root->snd_timeout;
242
243 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
244 &waittime, (socklen_t)sizeof(struct timeval));
245 WATCHPOINT_ASSERT(error == 0);
246 }
247 #endif
248
249 #ifdef HAVE_RCVTIMEO
250 if (server->root->rcv_timeout)
251 {
252 int error;
253 struct timeval waittime;
254
255 waittime.tv_sec= 0;
256 waittime.tv_usec= server->root->rcv_timeout;
257
258 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
259 &waittime, (socklen_t)sizeof(struct timeval));
260 WATCHPOINT_ASSERT(error == 0);
261 }
262 #endif
263
264
265 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
266 {
267 int set= 1;
268 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
269
270 // This is not considered a fatal error
271 if (error == -1)
272 {
273 WATCHPOINT_ERRNO(get_socket_errno());
274 perror("setsockopt(SO_NOSIGPIPE)");
275 }
276 }
277 #endif
278
279 if (server->root->flags.no_block)
280 {
281 int error;
282 struct linger linger;
283
284 linger.l_onoff= 1;
285 linger.l_linger= 0; /* By default on close() just drop the socket */
286 error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
287 &linger, (socklen_t)sizeof(struct linger));
288 WATCHPOINT_ASSERT(error == 0);
289 }
290
291 if (server->root->flags.tcp_nodelay)
292 {
293 int flag= 1;
294 int error;
295
296 error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
297 &flag, (socklen_t)sizeof(int));
298 WATCHPOINT_ASSERT(error == 0);
299 }
300
301 if (server->root->flags.tcp_keepalive)
302 {
303 int flag= 1;
304 int error;
305
306 error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
307 &flag, (socklen_t)sizeof(int));
308 WATCHPOINT_ASSERT(error == 0);
309 }
310
311 #ifdef TCP_KEEPIDLE
312 if (server->root->tcp_keepidle > 0)
313 {
314 int error;
315
316 error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
317 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
318 WATCHPOINT_ASSERT(error == 0);
319 }
320 #endif
321
322 if (server->root->send_size > 0)
323 {
324 int error;
325
326 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
327 &server->root->send_size, (socklen_t)sizeof(int));
328 WATCHPOINT_ASSERT(error == 0);
329 }
330
331 if (server->root->recv_size > 0)
332 {
333 int error;
334
335 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
336 &server->root->recv_size, (socklen_t)sizeof(int));
337 WATCHPOINT_ASSERT(error == 0);
338 }
339
340
341 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
342 set_socket_nonblocking(server);
343 }
344
345 static memcached_return_t unix_socket_connect(memcached_server_st *server)
346 {
347 #ifndef WIN32
348 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
349
350 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
351 {
352 memcached_set_errno(*server, errno, NULL);
353 return MEMCACHED_CONNECTION_FAILURE;
354 }
355
356 struct sockaddr_un servAddr;
357
358 memset(&servAddr, 0, sizeof (struct sockaddr_un));
359 servAddr.sun_family= AF_UNIX;
360 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
361
362 do {
363 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
364 {
365 switch (errno)
366 {
367 case EINPROGRESS:
368 case EALREADY:
369 case EINTR:
370 continue;
371
372 case EISCONN: /* We were spinning waiting on connect */
373 {
374 WATCHPOINT_ASSERT(0); // Programmer error
375 break;
376 }
377
378 default:
379 WATCHPOINT_ERRNO(errno);
380 memcached_set_errno(*server, errno, MEMCACHED_AT);
381 return MEMCACHED_CONNECTION_FAILURE;
382 }
383 }
384 } while (0);
385 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
386
387 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
388
389 return MEMCACHED_SUCCESS;
390 #else
391 (void)server;
392 return MEMCACHED_NOT_SUPPORTED;
393 #endif
394 }
395
396 static memcached_return_t network_connect(memcached_server_st *server)
397 {
398 bool timeout_error_occured= false;
399
400 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
401 WATCHPOINT_ASSERT(server->cursor_active == 0);
402
403 /*
404 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
405 */
406 if (server->address_info == NULL or server->address_info_next == NULL)
407 {
408 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
409 server->address_info_next= NULL;
410 memcached_return_t rc;
411 uint32_t counter= 5;
412 while (--counter)
413 {
414 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
415 {
416 break;
417 }
418
419 #ifndef WIN32
420 struct timespec dream, rem;
421
422 dream.tv_nsec= 1000;
423 dream.tv_sec= 0;
424
425 nanosleep(&dream, &rem);
426 #endif
427 }
428
429 if (memcached_failed(rc))
430 {
431 return rc;
432 }
433 }
434
435 if (server->address_info_next == NULL)
436 {
437 server->address_info_next= server->address_info;
438 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
439 }
440
441 /* Create the socket */
442 while (server->address_info_next and server->fd == INVALID_SOCKET)
443 {
444 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
445 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
446 {
447 server->address_info_next= server->address_info_next->ai_next;
448 continue;
449 }
450
451 if ((server->fd= socket(server->address_info_next->ai_family,
452 server->address_info_next->ai_socktype,
453 server->address_info_next->ai_protocol)) < 0)
454 {
455 return memcached_set_errno(*server, get_socket_errno(), NULL);
456 }
457
458 set_socket_options(server);
459
460 /* connect to server */
461 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
462 {
463 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
464 return MEMCACHED_SUCCESS;
465 }
466
467 /* An error occurred */
468 switch (get_socket_errno())
469 {
470 case ETIMEDOUT:
471 timeout_error_occured= true;
472 break;
473
474 case EWOULDBLOCK:
475 case EINPROGRESS: // nonblocking mode - first return
476 case EALREADY: // nonblocking mode - subsequent returns
477 {
478 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
479 memcached_return_t rc= connect_poll(server);
480
481 if (memcached_success(rc))
482 {
483 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
484 return MEMCACHED_SUCCESS;
485 }
486
487 // A timeout here is treated as an error, we will not retry
488 if (rc == MEMCACHED_TIMEOUT)
489 {
490 timeout_error_occured= true;
491 }
492 }
493 break;
494
495 case EISCONN: // we are connected :-)
496 WATCHPOINT_ASSERT(0); // This is a programmer's error
497 break;
498
499 case EINTR: // Special case, we retry ai_addr
500 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
501 (void)closesocket(server->fd);
502 server->fd= INVALID_SOCKET;
503 continue;
504
505 default:
506 break;
507 }
508
509 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
510 (void)closesocket(server->fd);
511 server->fd= INVALID_SOCKET;
512 server->address_info_next= server->address_info_next->ai_next;
513 }
514
515 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
516
517 if (timeout_error_occured)
518 {
519 if (server->fd != INVALID_SOCKET)
520 {
521 (void)closesocket(server->fd);
522 server->fd= INVALID_SOCKET;
523 }
524 }
525
526 WATCHPOINT_STRING("Never got a good file descriptor");
527
528 if (memcached_has_current_error(*server))
529 {
530 return memcached_server_error_return(server);
531 }
532
533 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
534 {
535 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
536 }
537
538 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
539 }
540
541
542 /*
543 backoff_handling()
544
545 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
546 we get caught spending cycles just waiting.
547 */
548 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
549 {
550 /*
551 If we hit server_failure_limit then something is completely wrong about the server.
552
553 1) If autoeject is enabled we do that.
554 2) If not? We go into timeout again, there is much else to do :(
555 */
556 if (server->server_failure_counter >= server->root->server_failure_limit)
557 {
558 /*
559 We just auto_eject if we hit this point
560 */
561 if (_is_auto_eject_host(server->root))
562 {
563 set_last_disconnected_host(server);
564 run_distribution((memcached_st *)server->root);
565
566 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
567 }
568
569 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
570
571 // Sanity check/setting
572 if (server->next_retry == 0)
573 {
574 server->next_retry= 1;
575 }
576 }
577
578 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
579 {
580 struct timeval curr_time;
581 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
582
583 /*
584 If next_retry is less then our current time, then we reset and try everything again.
585 */
586 if (_gettime_success and server->next_retry < curr_time.tv_sec)
587 {
588 server->state= MEMCACHED_SERVER_STATE_NEW;
589 }
590 else
591 {
592 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
593 }
594
595 in_timeout= true;
596 }
597
598 return MEMCACHED_SUCCESS;
599 }
600
601 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
602 {
603 if (server->fd != INVALID_SOCKET)
604 {
605 return MEMCACHED_SUCCESS;
606 }
607
608 LIBMEMCACHED_MEMCACHED_CONNECT_START();
609
610 bool in_timeout= false;
611 memcached_return_t rc;
612 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
613 {
614 set_last_disconnected_host(server);
615 return rc;
616 }
617
618 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
619 {
620 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
621 }
622
623 /* We need to clean up the multi startup piece */
624 switch (server->type)
625 {
626 case MEMCACHED_CONNECTION_UDP:
627 case MEMCACHED_CONNECTION_TCP:
628 rc= network_connect(server);
629
630 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
631 {
632 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
633 {
634 rc= memcached_sasl_authenticate_connection(server);
635 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
636 {
637 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
638 (void)closesocket(server->fd);
639 server->fd= INVALID_SOCKET;
640 }
641 }
642 }
643 break;
644
645 case MEMCACHED_CONNECTION_UNIX_SOCKET:
646 rc= unix_socket_connect(server);
647 break;
648 }
649
650 if (memcached_success(rc))
651 {
652 memcached_mark_server_as_clean(server);
653 return rc;
654 }
655
656 set_last_disconnected_host(server);
657 if (memcached_has_current_error(*server))
658 {
659 memcached_mark_server_for_timeout(server);
660 assert(memcached_failed(memcached_server_error_return(server)));
661 }
662 else
663 {
664 memcached_set_error(*server, rc, MEMCACHED_AT);
665 memcached_mark_server_for_timeout(server);
666 }
667
668 LIBMEMCACHED_MEMCACHED_CONNECT_END();
669
670 if (in_timeout)
671 {
672 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
673 }
674
675 return rc;
676 }