22e259f41f3335fdfb75dcd23a040bfef4d00cb5
[awesomized/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42
43 #ifndef SOCK_CLOEXEC
44 # define SOCK_CLOEXEC 0
45 #endif
46
47 #ifndef SOCK_NONBLOCK
48 # define SOCK_NONBLOCK 0
49 #endif
50
51 #ifndef FD_CLOEXEC
52 # define FD_CLOEXEC 0
53 #endif
54
55 #ifndef SO_NOSIGPIPE
56 # define SO_NOSIGPIPE 0
57 #endif
58
59 #ifndef TCP_NODELAY
60 # define TCP_NODELAY 0
61 #endif
62
63 #ifndef TCP_KEEPIDLE
64 # define TCP_KEEPIDLE 0
65 #endif
66
67 static memcached_return_t connect_poll(org::libmemcached::Instance* server, const int connection_error)
68 {
69 struct pollfd fds[1];
70 fds[0].fd= server->fd;
71 fds[0].events= server->events();
72 fds[0].revents= 0;
73
74 size_t loop_max= 5;
75
76 if (server->root->poll_timeout == 0)
77 {
78 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
79 memcached_literal_param("The time to wait for a connection to be established was set to zero, which means it will always timeout (MEMCACHED_TIMEOUT)."));
80 }
81
82 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
83 {
84 int number_of;
85 if ((number_of= poll(fds, 1, server->root->connect_timeout)) == -1)
86 {
87 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
88 switch (local_errno)
89 {
90 #ifdef TARGET_OS_LINUX
91 case ERESTART:
92 #endif
93 case EINTR:
94 continue;
95
96 case EFAULT:
97 case ENOMEM:
98 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
99
100 case EINVAL:
101 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
102 memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
103
104 default: // This should not happen
105 break;
106 #if 0
107 if (fds[0].revents & POLLERR)
108 {
109 int err;
110 socklen_t len= sizeof(err);
111 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == 0)
112 {
113 if (err == 0)
114 {
115 // This should never happen, if it does? Punt.
116 continue;
117 }
118 local_errno= err;
119 }
120 }
121 #endif
122 }
123
124 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
125 server->reset_socket();
126 server->state= MEMCACHED_SERVER_STATE_NEW;
127
128 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
129 }
130
131 if (number_of == 0)
132 {
133 if (connection_error == EINPROGRESS)
134 {
135 int err;
136 socklen_t len= sizeof(err);
137 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
138 {
139 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
140 }
141
142 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
143 if (err != 0)
144 {
145 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
146 }
147 }
148
149 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
150 }
151
152 #if 0
153 server->revents(fds[0].revents);
154 #endif
155
156 assert (number_of == 1);
157
158 if (fds[0].revents & POLLERR or
159 fds[0].revents & POLLHUP or
160 fds[0].revents & POLLNVAL)
161 {
162 int err;
163 socklen_t len= sizeof (err);
164 if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
165 {
166 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getsockopt() errored while looking up error state from poll()"));
167 }
168
169 // We check the value to see what happened wth the socket.
170 if (err == 0) // Should not happen
171 {
172 return MEMCACHED_SUCCESS;
173 }
174 errno= err;
175
176 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() during connect."));
177 }
178 assert(fds[0].revents & POLLOUT);
179
180 if (fds[0].revents & POLLOUT and connection_error == EINPROGRESS)
181 {
182 int err;
183 socklen_t len= sizeof(err);
184 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
185 {
186 return memcached_set_errno(*server, errno, MEMCACHED_AT);
187 }
188
189 if (err == 0)
190 {
191 return MEMCACHED_SUCCESS;
192 }
193
194 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
195 }
196
197 break; // We only have the loop setup for errno types that require restart
198 }
199
200 // This should only be possible from ERESTART or EINTR;
201 return memcached_set_errno(*server, connection_error, MEMCACHED_AT, memcached_literal_param("connect_poll() was exhausted"));
202 }
203
204 static memcached_return_t set_hostinfo(org::libmemcached::Instance* server)
205 {
206 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
207 server->clear_addrinfo();
208
209 char str_port[MEMCACHED_NI_MAXSERV];
210 int length= snprintf(str_port, MEMCACHED_NI_MAXSERV, "%u", uint32_t(server->port()));
211 if (length >= MEMCACHED_NI_MAXSERV or length <= 0)
212 {
213 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
214 memcached_literal_param("snprintf(NI_MAXSERV)"));
215 }
216
217 struct addrinfo hints;
218 memset(&hints, 0, sizeof(struct addrinfo));
219
220 hints.ai_family= AF_INET;
221 if (memcached_is_udp(server->root))
222 {
223 hints.ai_protocol= IPPROTO_UDP;
224 hints.ai_socktype= SOCK_DGRAM;
225 }
226 else
227 {
228 hints.ai_socktype= SOCK_STREAM;
229 hints.ai_protocol= IPPROTO_TCP;
230 }
231
232 assert(server->address_info == NULL);
233 assert(server->address_info_next == NULL);
234 int errcode;
235 assert(server->hostname());
236 switch(errcode= getaddrinfo(server->hostname(), str_port, &hints, &server->address_info))
237 {
238 case 0:
239 server->address_info_next= server->address_info;
240 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
241 break;
242
243 case EAI_AGAIN:
244 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
245
246 case EAI_SYSTEM:
247 server->clear_addrinfo();
248 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
249
250 case EAI_BADFLAGS:
251 server->clear_addrinfo();
252 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
253
254 case EAI_MEMORY:
255 server->clear_addrinfo();
256 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
257
258 default:
259 {
260 server->clear_addrinfo();
261 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
262 }
263 }
264
265 return MEMCACHED_SUCCESS;
266 }
267
268 static inline void set_socket_nonblocking(org::libmemcached::Instance* server)
269 {
270 #if defined(_WIN32)
271 u_long arg= 1;
272 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
273 {
274 memcached_set_errno(*server, get_socket_errno(), NULL);
275 }
276 #else
277 int flags;
278
279 if (SOCK_NONBLOCK == 0)
280 {
281 do
282 {
283 flags= fcntl(server->fd, F_GETFL, 0);
284 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
285
286 if (flags == -1)
287 {
288 memcached_set_errno(*server, errno, NULL);
289 }
290 else if ((flags & O_NONBLOCK) == 0)
291 {
292 int rval;
293
294 do
295 {
296 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
297 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
298
299 if (rval == -1)
300 {
301 memcached_set_errno(*server, errno, NULL);
302 }
303 }
304 }
305 #endif
306 }
307
308 static bool set_socket_options(org::libmemcached::Instance* server)
309 {
310 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
311
312 #ifdef HAVE_FCNTL
313 // If SOCK_CLOEXEC exists then we don't need to call the following
314 if (SOCK_CLOEXEC == 0)
315 {
316 if (FD_CLOEXEC)
317 {
318 int flags;
319 do
320 {
321 flags= fcntl(server->fd, F_GETFD, 0);
322 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
323
324 if (flags != -1)
325 {
326 int rval;
327 do
328 {
329 rval= fcntl (server->fd, F_SETFD, flags | FD_CLOEXEC);
330 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
331 // we currently ignore the case where rval is -1
332 }
333 }
334 }
335 #endif
336
337 if (memcached_is_udp(server->root))
338 {
339 return true;
340 }
341
342 #ifdef HAVE_SNDTIMEO
343 if (server->root->snd_timeout > 0)
344 {
345 struct timeval waittime;
346
347 waittime.tv_sec= server->root->snd_timeout / 1000000;
348 waittime.tv_usec= server->root->snd_timeout % 1000000;
349
350 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
351 (char*)&waittime, (socklen_t)sizeof(struct timeval));
352 (void)error;
353 assert(error == 0);
354 }
355 #endif
356
357 #ifdef HAVE_RCVTIMEO
358 if (server->root->rcv_timeout > 0)
359 {
360 struct timeval waittime;
361
362 waittime.tv_sec= server->root->rcv_timeout / 1000000;
363 waittime.tv_usec= server->root->rcv_timeout % 1000000;
364
365 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
366 (char*)&waittime, (socklen_t)sizeof(struct timeval));
367 (void)(error);
368 assert(error == 0);
369 }
370 #endif
371
372
373 #if defined(_WIN32)
374 #else
375 # if defined(SO_NOSIGPIPE)
376 if (SO_NOSIGPIPE)
377 {
378 int set= 1;
379 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
380
381 assert(error == 0);
382
383 // This is not considered a fatal error
384 if (error == -1)
385 {
386 #if 0
387 perror("setsockopt(SO_NOSIGPIPE)");
388 #endif
389 }
390 }
391 # endif // SO_NOSIGPIPE
392 #endif // _WIN32
393
394 if (server->root->flags.no_block)
395 {
396 struct linger linger;
397
398 linger.l_onoff= 1;
399 linger.l_linger= 0; /* By default on close() just drop the socket */
400 int error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
401 (char*)&linger, (socklen_t)sizeof(struct linger));
402 (void)(error);
403 assert(error == 0);
404 }
405
406 if (TCP_NODELAY)
407 {
408 if (server->root->flags.tcp_nodelay)
409 {
410 int flag= 1;
411
412 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
413 (char*)&flag, (socklen_t)sizeof(int));
414 (void)(error);
415 assert(error == 0);
416 }
417 }
418
419 if (server->root->flags.tcp_keepalive)
420 {
421 int flag= 1;
422
423 int error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
424 (char*)&flag, (socklen_t)sizeof(int));
425 (void)(error);
426 assert(error == 0);
427 }
428
429 if (TCP_KEEPIDLE)
430 {
431 if (server->root->tcp_keepidle > 0)
432 {
433 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
434 (char*)&server->root->tcp_keepidle, (socklen_t)sizeof(int));
435 (void)(error);
436 assert(error == 0);
437 }
438 }
439
440 if (server->root->send_size > 0)
441 {
442 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
443 (char*)&server->root->send_size, (socklen_t)sizeof(int));
444 (void)(error);
445 assert(error == 0);
446 }
447
448 if (server->root->recv_size > 0)
449 {
450 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
451 (char*)&server->root->recv_size, (socklen_t)sizeof(int));
452 (void)(error);
453 assert(error == 0);
454 }
455
456 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
457 set_socket_nonblocking(server);
458
459 return true;
460 }
461
462 static memcached_return_t unix_socket_connect(org::libmemcached::Instance* server)
463 {
464 #ifndef _WIN32
465 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
466
467 do {
468 int type= SOCK_STREAM;
469 if (SOCK_CLOEXEC)
470 {
471 type|= SOCK_CLOEXEC;
472 }
473
474 if (SOCK_NONBLOCK)
475 {
476 type|= SOCK_NONBLOCK;
477 }
478
479 if ((server->fd= socket(AF_UNIX, type, 0)) == -1)
480 {
481 return memcached_set_errno(*server, errno, NULL);
482 }
483
484 struct sockaddr_un servAddr;
485
486 memset(&servAddr, 0, sizeof (struct sockaddr_un));
487 servAddr.sun_family= AF_UNIX;
488 strncpy(servAddr.sun_path, server->hostname(), sizeof(servAddr.sun_path)); /* Copy filename */
489
490 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) == -1)
491 {
492 switch (errno)
493 {
494 case EINPROGRESS:
495 case EALREADY:
496 server->events(POLLOUT);
497 break;
498
499 case EINTR:
500 server->reset_socket();
501 continue;
502
503 case EISCONN: /* We were spinning waiting on connect */
504 {
505 assert(0); // Programmer error
506 server->reset_socket();
507 continue;
508 }
509
510 default:
511 WATCHPOINT_ERRNO(errno);
512 server->reset_socket();
513 return memcached_set_errno(*server, errno, MEMCACHED_AT);
514 }
515 }
516 } while (0);
517 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
518
519 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
520
521 return MEMCACHED_SUCCESS;
522 #else
523 (void)server;
524 return MEMCACHED_NOT_SUPPORTED;
525 #endif
526 }
527
528 static memcached_return_t network_connect(org::libmemcached::Instance* server)
529 {
530 bool timeout_error_occured= false;
531
532 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
533 WATCHPOINT_ASSERT(server->cursor_active_ == 0);
534
535 /*
536 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
537 */
538 if (server->address_info == NULL or server->address_info_next == NULL)
539 {
540 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
541 server->address_info_next= NULL;
542 memcached_return_t rc= set_hostinfo(server);
543
544 if (memcached_failed(rc))
545 {
546 return rc;
547 }
548 }
549
550 assert(server->address_info_next);
551 assert(server->address_info);
552
553 /* Create the socket */
554 while (server->address_info_next and server->fd == INVALID_SOCKET)
555 {
556 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
557 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
558 {
559 server->address_info_next= server->address_info_next->ai_next;
560 continue;
561 }
562
563 int type= server->address_info_next->ai_socktype;
564 if (SOCK_CLOEXEC)
565 {
566 type|= SOCK_CLOEXEC;
567 }
568
569 if (SOCK_NONBLOCK)
570 {
571 type|= SOCK_NONBLOCK;
572 }
573
574 server->fd= socket(server->address_info_next->ai_family,
575 type,
576 server->address_info_next->ai_protocol);
577
578 if (int(server->fd) == SOCKET_ERROR)
579 {
580 return memcached_set_errno(*server, get_socket_errno(), NULL);
581 }
582
583 if (set_socket_options(server) == false)
584 {
585 server->reset_socket();
586 return MEMCACHED_CONNECTION_FAILURE;
587 }
588
589 /* connect to server */
590 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
591 {
592 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
593 return MEMCACHED_SUCCESS;
594 }
595
596 /* An error occurred */
597 int local_error= get_socket_errno();
598 switch (local_error)
599 {
600 case ETIMEDOUT:
601 timeout_error_occured= true;
602 break;
603
604 case EAGAIN:
605 #if EWOULDBLOCK != EAGAIN
606 case EWOULDBLOCK:
607 #endif
608 case EINPROGRESS: // nonblocking mode - first return
609 case EALREADY: // nonblocking mode - subsequent returns
610 {
611 server->events(POLLOUT);
612 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
613 memcached_return_t rc= connect_poll(server, local_error);
614
615 if (memcached_success(rc))
616 {
617 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
618 return MEMCACHED_SUCCESS;
619 }
620
621 // A timeout here is treated as an error, we will not retry
622 if (rc == MEMCACHED_TIMEOUT)
623 {
624 timeout_error_occured= true;
625 }
626 }
627 break;
628
629 case EISCONN: // we are connected :-)
630 WATCHPOINT_ASSERT(0); // This is a programmer's error
631 break;
632
633 case EINTR: // Special case, we retry ai_addr
634 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
635 server->reset_socket();
636 continue;
637
638 case ECONNREFUSED:
639 // Probably not running service
640
641 default:
642 break;
643 }
644
645 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
646 server->reset_socket();
647 server->address_info_next= server->address_info_next->ai_next;
648 }
649
650 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
651
652 if (timeout_error_occured)
653 {
654 server->reset_socket();
655 }
656
657 WATCHPOINT_STRING("Never got a good file descriptor");
658
659 if (memcached_has_current_error(*server))
660 {
661 return memcached_instance_error_return(server);
662 }
663
664 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
665 {
666 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
667 }
668
669 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
670 }
671
672
673 /*
674 backoff_handling()
675
676 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
677 we get caught spending cycles just waiting.
678 */
679 static memcached_return_t backoff_handling(org::libmemcached::Instance* server, bool& in_timeout)
680 {
681 struct timeval curr_time;
682 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
683
684 /*
685 If we hit server_failure_limit then something is completely wrong about the server.
686
687 1) If autoeject is enabled we do that.
688 2) If not? We go into timeout again, there is much else to do :(
689 */
690 if (server->server_failure_counter >= server->root->server_failure_limit)
691 {
692 /*
693 We just auto_eject if we hit this point
694 */
695 if (_is_auto_eject_host(server->root))
696 {
697 set_last_disconnected_host(server);
698
699 // Retry dead servers if requested
700 if (_gettime_success and server->root->dead_timeout > 0)
701 {
702 server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
703
704 // We only retry dead servers once before assuming failure again
705 server->server_failure_counter= server->root->server_failure_limit -1;
706 }
707
708 memcached_return_t rc;
709 if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
710 {
711 return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
712 }
713
714 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
715 }
716
717 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
718
719 // Sanity check/setting
720 if (server->next_retry == 0)
721 {
722 server->next_retry= 1;
723 }
724 }
725
726 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
727 {
728 /*
729 If next_retry is less then our current time, then we reset and try everything again.
730 */
731 if (_gettime_success and server->next_retry < curr_time.tv_sec)
732 {
733 server->state= MEMCACHED_SERVER_STATE_NEW;
734 }
735 else
736 {
737 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
738 }
739
740 in_timeout= true;
741 }
742
743 return MEMCACHED_SUCCESS;
744 }
745
746 static memcached_return_t _memcached_connect(org::libmemcached::Instance* server, const bool set_last_disconnected)
747 {
748 assert(server);
749 if (server->fd != INVALID_SOCKET)
750 {
751 return MEMCACHED_SUCCESS;
752 }
753
754 LIBMEMCACHED_MEMCACHED_CONNECT_START();
755
756 bool in_timeout= false;
757 memcached_return_t rc;
758 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
759 {
760 set_last_disconnected_host(server);
761 return rc;
762 }
763
764 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
765 {
766 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
767 }
768
769 if (server->hostname()[0] == '/')
770 {
771 server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
772 }
773
774 /* We need to clean up the multi startup piece */
775 switch (server->type)
776 {
777 case MEMCACHED_CONNECTION_UDP:
778 case MEMCACHED_CONNECTION_TCP:
779 rc= network_connect(server);
780
781 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
782 {
783 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
784 {
785 rc= memcached_sasl_authenticate_connection(server);
786 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
787 {
788 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
789 server->reset_socket();
790 }
791 }
792 }
793 break;
794
795 case MEMCACHED_CONNECTION_UNIX_SOCKET:
796 rc= unix_socket_connect(server);
797 break;
798 }
799
800 if (memcached_success(rc))
801 {
802 server->mark_server_as_clean();
803 memcached_version_instance(server);
804 return rc;
805 }
806 else if (set_last_disconnected)
807 {
808 set_last_disconnected_host(server);
809 if (memcached_has_current_error(*server))
810 {
811 memcached_mark_server_for_timeout(server);
812 assert(memcached_failed(memcached_instance_error_return(server)));
813 }
814 else
815 {
816 memcached_set_error(*server, rc, MEMCACHED_AT);
817 memcached_mark_server_for_timeout(server);
818 }
819
820 LIBMEMCACHED_MEMCACHED_CONNECT_END();
821
822 if (in_timeout)
823 {
824 char buffer[1024];
825 int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname(), int(server->port()));
826 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
827 }
828 }
829
830 return rc;
831 }
832
833 memcached_return_t memcached_connect(org::libmemcached::Instance* server)
834 {
835 return _memcached_connect(server, true);
836 }