Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-230
[awesomized/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42
43 #ifndef SOCK_CLOEXEC
44 # define SOCK_CLOEXEC 0
45 #endif
46
47 #ifndef SOCK_NONBLOCK
48 # define SOCK_NONBLOCK 0
49 #endif
50
51 #ifndef FD_CLOEXEC
52 # define FD_CLOEXEC 0
53 #endif
54
55 #ifndef SO_NOSIGPIPE
56 # define SO_NOSIGPIPE 0
57 #endif
58
59 #ifndef TCP_NODELAY
60 # define TCP_NODELAY 0
61 #endif
62
63 #ifndef TCP_KEEPIDLE
64 # define TCP_KEEPIDLE 0
65 #endif
66
67 static memcached_return_t connect_poll(org::libmemcached::Instance* server, const int connection_error)
68 {
69 struct pollfd fds[1];
70 fds[0].fd= server->fd;
71 fds[0].events= server->events();
72 fds[0].revents= 0;
73
74 size_t loop_max= 5;
75
76 if (server->root->poll_timeout == 0)
77 {
78 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
79 memcached_literal_param("The time to wait for a connection to be established was set to zero, which means it will always timeout (MEMCACHED_TIMEOUT)."));
80 }
81
82 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
83 {
84 int number_of;
85 if ((number_of= poll(fds, 1, server->root->connect_timeout)) == -1)
86 {
87 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
88 switch (local_errno)
89 {
90 #ifdef __linux__
91 case ERESTART:
92 #endif
93 case EINTR:
94 continue;
95
96 case EFAULT:
97 case ENOMEM:
98 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
99
100 case EINVAL:
101 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
102 memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
103
104 default: // This should not happen
105 break;
106 }
107
108 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
109 server->reset_socket();
110 server->state= MEMCACHED_SERVER_STATE_NEW;
111
112 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
113 }
114
115 if (number_of == 0)
116 {
117 if (connection_error == EINPROGRESS)
118 {
119 int err;
120 socklen_t len= sizeof(err);
121 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
122 {
123 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getsockopt() error'ed while looking for error connect_poll(EINPROGRESS)"));
124 }
125
126 // If Zero, my hero, we just fail to a generic MEMCACHED_TIMEOUT error
127 if (err != 0)
128 {
129 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
130 }
131 }
132
133 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_literal_param("(number_of == 0)"));
134 }
135
136 assert (number_of == 1);
137
138 if (fds[0].revents & POLLERR or
139 fds[0].revents & POLLHUP or
140 fds[0].revents & POLLNVAL)
141 {
142 int err;
143 socklen_t len= sizeof (err);
144 if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
145 {
146 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getsockopt() errored while looking up error state from poll()"));
147 }
148
149 // We check the value to see what happened wth the socket.
150 if (err == 0) // Should not happen
151 {
152 return MEMCACHED_SUCCESS;
153 }
154 errno= err;
155
156 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() during connect."));
157 }
158 assert(fds[0].revents & POLLOUT);
159
160 if (fds[0].revents & POLLOUT and connection_error == EINPROGRESS)
161 {
162 int err;
163 socklen_t len= sizeof(err);
164 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, (char*)&err, &len) == -1)
165 {
166 return memcached_set_errno(*server, errno, MEMCACHED_AT);
167 }
168
169 if (err == 0)
170 {
171 return MEMCACHED_SUCCESS;
172 }
173
174 return memcached_set_errno(*server, err, MEMCACHED_AT, memcached_literal_param("getsockopt() found the error from poll() after connect() returned EINPROGRESS."));
175 }
176
177 break; // We only have the loop setup for errno types that require restart
178 }
179
180 // This should only be possible from ERESTART or EINTR;
181 return memcached_set_errno(*server, connection_error, MEMCACHED_AT, memcached_literal_param("connect_poll() was exhausted"));
182 }
183
184 static memcached_return_t set_hostinfo(org::libmemcached::Instance* server)
185 {
186 assert(server->type != MEMCACHED_CONNECTION_UNIX_SOCKET);
187 server->clear_addrinfo();
188
189 char str_port[MEMCACHED_NI_MAXSERV];
190 int length= snprintf(str_port, MEMCACHED_NI_MAXSERV, "%u", uint32_t(server->port()));
191 if (length >= MEMCACHED_NI_MAXSERV or length <= 0)
192 {
193 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
194 memcached_literal_param("snprintf(NI_MAXSERV)"));
195 }
196
197 struct addrinfo hints;
198 memset(&hints, 0, sizeof(struct addrinfo));
199
200 hints.ai_family= AF_INET;
201 if (memcached_is_udp(server->root))
202 {
203 hints.ai_protocol= IPPROTO_UDP;
204 hints.ai_socktype= SOCK_DGRAM;
205 }
206 else
207 {
208 hints.ai_socktype= SOCK_STREAM;
209 hints.ai_protocol= IPPROTO_TCP;
210 }
211
212 assert(server->address_info == NULL);
213 assert(server->address_info_next == NULL);
214 int errcode;
215 assert(server->hostname());
216 switch(errcode= getaddrinfo(server->hostname(), str_port, &hints, &server->address_info))
217 {
218 case 0:
219 server->address_info_next= server->address_info;
220 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
221 break;
222
223 case EAI_AGAIN:
224 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
225
226 case EAI_SYSTEM:
227 server->clear_addrinfo();
228 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
229
230 case EAI_BADFLAGS:
231 server->clear_addrinfo();
232 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
233
234 case EAI_MEMORY:
235 server->clear_addrinfo();
236 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
237
238 default:
239 {
240 server->clear_addrinfo();
241 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
242 }
243 }
244
245 return MEMCACHED_SUCCESS;
246 }
247
248 static inline void set_socket_nonblocking(org::libmemcached::Instance* server)
249 {
250 #if defined(_WIN32)
251 u_long arg= 1;
252 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
253 {
254 memcached_set_errno(*server, get_socket_errno(), NULL);
255 }
256 #else
257 int flags;
258
259 if (SOCK_NONBLOCK == 0)
260 {
261 do
262 {
263 flags= fcntl(server->fd, F_GETFL, 0);
264 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
265
266 if (flags == -1)
267 {
268 memcached_set_errno(*server, errno, NULL);
269 }
270 else if ((flags & O_NONBLOCK) == 0)
271 {
272 int rval;
273
274 do
275 {
276 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
277 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
278
279 if (rval == -1)
280 {
281 memcached_set_errno(*server, errno, NULL);
282 }
283 }
284 }
285 #endif
286 }
287
288 static bool set_socket_options(org::libmemcached::Instance* server)
289 {
290 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
291
292 #ifdef HAVE_FCNTL
293 // If SOCK_CLOEXEC exists then we don't need to call the following
294 if (SOCK_CLOEXEC == 0)
295 {
296 if (FD_CLOEXEC)
297 {
298 int flags;
299 do
300 {
301 flags= fcntl(server->fd, F_GETFD, 0);
302 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
303
304 if (flags != -1)
305 {
306 int rval;
307 do
308 {
309 rval= fcntl (server->fd, F_SETFD, flags | FD_CLOEXEC);
310 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
311 // we currently ignore the case where rval is -1
312 }
313 }
314 }
315 #endif
316
317 if (memcached_is_udp(server->root))
318 {
319 return true;
320 }
321
322 #ifdef HAVE_SNDTIMEO
323 if (server->root->snd_timeout > 0)
324 {
325 struct timeval waittime;
326
327 waittime.tv_sec= server->root->snd_timeout / 1000000;
328 waittime.tv_usec= server->root->snd_timeout % 1000000;
329
330 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
331 (char*)&waittime, (socklen_t)sizeof(struct timeval));
332 (void)error;
333 assert(error == 0);
334 }
335 #endif
336
337 #ifdef HAVE_RCVTIMEO
338 if (server->root->rcv_timeout > 0)
339 {
340 struct timeval waittime;
341
342 waittime.tv_sec= server->root->rcv_timeout / 1000000;
343 waittime.tv_usec= server->root->rcv_timeout % 1000000;
344
345 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
346 (char*)&waittime, (socklen_t)sizeof(struct timeval));
347 (void)(error);
348 assert(error == 0);
349 }
350 #endif
351
352
353 #if defined(_WIN32)
354 #else
355 # if defined(SO_NOSIGPIPE)
356 if (SO_NOSIGPIPE)
357 {
358 int set= 1;
359 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
360
361 assert(error == 0);
362
363 // This is not considered a fatal error
364 if (error == -1)
365 {
366 #if 0
367 perror("setsockopt(SO_NOSIGPIPE)");
368 #endif
369 }
370 }
371 # endif // SO_NOSIGPIPE
372 #endif // _WIN32
373
374 if (server->root->flags.no_block)
375 {
376 struct linger linger;
377
378 linger.l_onoff= 1;
379 linger.l_linger= 0; /* By default on close() just drop the socket */
380 int error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
381 (char*)&linger, (socklen_t)sizeof(struct linger));
382 (void)(error);
383 assert(error == 0);
384 }
385
386 if (TCP_NODELAY)
387 {
388 if (server->root->flags.tcp_nodelay)
389 {
390 int flag= 1;
391
392 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
393 (char*)&flag, (socklen_t)sizeof(int));
394 (void)(error);
395 assert(error == 0);
396 }
397 }
398
399 if (server->root->flags.tcp_keepalive)
400 {
401 int flag= 1;
402
403 int error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
404 (char*)&flag, (socklen_t)sizeof(int));
405 (void)(error);
406 assert(error == 0);
407 }
408
409 if (TCP_KEEPIDLE)
410 {
411 if (server->root->tcp_keepidle > 0)
412 {
413 int error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
414 (char*)&server->root->tcp_keepidle, (socklen_t)sizeof(int));
415 (void)(error);
416 assert(error == 0);
417 }
418 }
419
420 if (server->root->send_size > 0)
421 {
422 int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
423 (char*)&server->root->send_size, (socklen_t)sizeof(int));
424 (void)(error);
425 assert(error == 0);
426 }
427
428 if (server->root->recv_size > 0)
429 {
430 int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
431 (char*)&server->root->recv_size, (socklen_t)sizeof(int));
432 (void)(error);
433 assert(error == 0);
434 }
435
436 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
437 set_socket_nonblocking(server);
438
439 return true;
440 }
441
442 static memcached_return_t unix_socket_connect(org::libmemcached::Instance* server)
443 {
444 #ifndef _WIN32
445 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
446
447 do {
448 int type= SOCK_STREAM;
449 if (SOCK_CLOEXEC)
450 {
451 type|= SOCK_CLOEXEC;
452 }
453
454 if (SOCK_NONBLOCK)
455 {
456 type|= SOCK_NONBLOCK;
457 }
458
459 if ((server->fd= socket(AF_UNIX, type, 0)) == -1)
460 {
461 return memcached_set_errno(*server, errno, NULL);
462 }
463
464 struct sockaddr_un servAddr;
465
466 memset(&servAddr, 0, sizeof (struct sockaddr_un));
467 servAddr.sun_family= AF_UNIX;
468 strncpy(servAddr.sun_path, server->hostname(), sizeof(servAddr.sun_path)); /* Copy filename */
469
470 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) == -1)
471 {
472 switch (errno)
473 {
474 case EINPROGRESS:
475 case EALREADY:
476 server->events(POLLOUT);
477 break;
478
479 case EINTR:
480 server->reset_socket();
481 continue;
482
483 case EISCONN: /* We were spinning waiting on connect */
484 {
485 assert(0); // Programmer error
486 server->reset_socket();
487 continue;
488 }
489
490 default:
491 WATCHPOINT_ERRNO(errno);
492 server->reset_socket();
493 return memcached_set_errno(*server, errno, MEMCACHED_AT);
494 }
495 }
496 } while (0);
497 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
498
499 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
500
501 return MEMCACHED_SUCCESS;
502 #else
503 (void)server;
504 return MEMCACHED_NOT_SUPPORTED;
505 #endif
506 }
507
508 static memcached_return_t network_connect(org::libmemcached::Instance* server)
509 {
510 bool timeout_error_occured= false;
511
512 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
513 WATCHPOINT_ASSERT(server->cursor_active_ == 0);
514
515 /*
516 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
517 */
518 if (server->address_info == NULL or server->address_info_next == NULL)
519 {
520 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
521 server->address_info_next= NULL;
522 memcached_return_t rc= set_hostinfo(server);
523
524 if (memcached_failed(rc))
525 {
526 return rc;
527 }
528 }
529
530 assert(server->address_info_next);
531 assert(server->address_info);
532
533 /* Create the socket */
534 while (server->address_info_next and server->fd == INVALID_SOCKET)
535 {
536 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
537 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
538 {
539 server->address_info_next= server->address_info_next->ai_next;
540 continue;
541 }
542
543 int type= server->address_info_next->ai_socktype;
544 if (SOCK_CLOEXEC)
545 {
546 type|= SOCK_CLOEXEC;
547 }
548
549 if (SOCK_NONBLOCK)
550 {
551 type|= SOCK_NONBLOCK;
552 }
553
554 server->fd= socket(server->address_info_next->ai_family,
555 type,
556 server->address_info_next->ai_protocol);
557
558 if (int(server->fd) == SOCKET_ERROR)
559 {
560 return memcached_set_errno(*server, get_socket_errno(), NULL);
561 }
562
563 if (set_socket_options(server) == false)
564 {
565 server->reset_socket();
566 return MEMCACHED_CONNECTION_FAILURE;
567 }
568
569 /* connect to server */
570 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
571 {
572 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
573 return MEMCACHED_SUCCESS;
574 }
575
576 /* An error occurred */
577 int local_error= get_socket_errno();
578 switch (local_error)
579 {
580 case ETIMEDOUT:
581 timeout_error_occured= true;
582 break;
583
584 case EAGAIN:
585 #if EWOULDBLOCK != EAGAIN
586 case EWOULDBLOCK:
587 #endif
588 case EINPROGRESS: // nonblocking mode - first return
589 case EALREADY: // nonblocking mode - subsequent returns
590 {
591 server->events(POLLOUT);
592 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
593 memcached_return_t rc= connect_poll(server, local_error);
594
595 if (memcached_success(rc))
596 {
597 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
598 return MEMCACHED_SUCCESS;
599 }
600
601 // A timeout here is treated as an error, we will not retry
602 if (rc == MEMCACHED_TIMEOUT)
603 {
604 timeout_error_occured= true;
605 }
606 }
607 break;
608
609 case EISCONN: // we are connected :-)
610 WATCHPOINT_ASSERT(0); // This is a programmer's error
611 break;
612
613 case EINTR: // Special case, we retry ai_addr
614 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
615 server->reset_socket();
616 continue;
617
618 case ECONNREFUSED:
619 // Probably not running service
620
621 default:
622 break;
623 }
624
625 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
626 server->reset_socket();
627 server->address_info_next= server->address_info_next->ai_next;
628 }
629
630 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
631
632 if (timeout_error_occured)
633 {
634 server->reset_socket();
635 }
636
637 WATCHPOINT_STRING("Never got a good file descriptor");
638
639 if (memcached_has_current_error(*server))
640 {
641 return memcached_instance_error_return(server);
642 }
643
644 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
645 {
646 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT,
647 memcached_literal_param("if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)"));
648 }
649
650 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
651 }
652
653
654 /*
655 backoff_handling()
656
657 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
658 we get caught spending cycles just waiting.
659 */
660 static memcached_return_t backoff_handling(org::libmemcached::Instance* server, bool& in_timeout)
661 {
662 struct timeval curr_time;
663 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
664
665 /*
666 If we hit server_failure_limit then something is completely wrong about the server.
667
668 1) If autoeject is enabled we do that.
669 2) If not? We go into timeout again, there is much else to do :(
670 */
671 if (server->server_failure_counter >= server->root->server_failure_limit)
672 {
673 /*
674 We just auto_eject if we hit this point
675 */
676 if (_is_auto_eject_host(server->root))
677 {
678 set_last_disconnected_host(server);
679
680 // Retry dead servers if requested
681 if (_gettime_success and server->root->dead_timeout > 0)
682 {
683 server->next_retry= curr_time.tv_sec +server->root->dead_timeout;
684
685 // We only retry dead servers once before assuming failure again
686 server->server_failure_counter= server->root->server_failure_limit -1;
687 }
688
689 memcached_return_t rc;
690 if (memcached_failed(rc= run_distribution((memcached_st *)server->root)))
691 {
692 return memcached_set_error(*server, rc, MEMCACHED_AT, memcached_literal_param("Backoff handling failed during run_distribution"));
693 }
694
695 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
696 }
697
698 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
699
700 // Sanity check/setting
701 if (server->next_retry == 0)
702 {
703 server->next_retry= 1;
704 }
705 }
706
707 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
708 {
709 /*
710 If next_retry is less then our current time, then we reset and try everything again.
711 */
712 if (_gettime_success and server->next_retry < curr_time.tv_sec)
713 {
714 server->state= MEMCACHED_SERVER_STATE_NEW;
715 }
716 else
717 {
718 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
719 }
720
721 in_timeout= true;
722 }
723
724 return MEMCACHED_SUCCESS;
725 }
726
727 static memcached_return_t _memcached_connect(org::libmemcached::Instance* server, const bool set_last_disconnected)
728 {
729 assert(server);
730 if (server->fd != INVALID_SOCKET)
731 {
732 return MEMCACHED_SUCCESS;
733 }
734
735 LIBMEMCACHED_MEMCACHED_CONNECT_START();
736
737 bool in_timeout= false;
738 memcached_return_t rc;
739 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
740 {
741 set_last_disconnected_host(server);
742 return rc;
743 }
744
745 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
746 {
747 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
748 }
749
750 if (server->hostname()[0] == '/')
751 {
752 server->type= MEMCACHED_CONNECTION_UNIX_SOCKET;
753 }
754
755 /* We need to clean up the multi startup piece */
756 switch (server->type)
757 {
758 case MEMCACHED_CONNECTION_UDP:
759 case MEMCACHED_CONNECTION_TCP:
760 rc= network_connect(server);
761
762 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
763 {
764 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
765 {
766 rc= memcached_sasl_authenticate_connection(server);
767 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
768 {
769 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
770 server->reset_socket();
771 }
772 }
773 }
774 break;
775
776 case MEMCACHED_CONNECTION_UNIX_SOCKET:
777 rc= unix_socket_connect(server);
778 break;
779 }
780
781 if (memcached_success(rc))
782 {
783 server->mark_server_as_clean();
784 memcached_version_instance(server);
785 return rc;
786 }
787 else if (set_last_disconnected)
788 {
789 set_last_disconnected_host(server);
790 if (memcached_has_current_error(*server))
791 {
792 memcached_mark_server_for_timeout(server);
793 assert(memcached_failed(memcached_instance_error_return(server)));
794 }
795 else
796 {
797 memcached_set_error(*server, rc, MEMCACHED_AT);
798 memcached_mark_server_for_timeout(server);
799 }
800
801 LIBMEMCACHED_MEMCACHED_CONNECT_END();
802
803 if (in_timeout)
804 {
805 char buffer[1024];
806 int snprintf_length= snprintf(buffer, sizeof(buffer), "%s:%d", server->hostname(), int(server->port()));
807 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT, buffer, snprintf_length);
808 }
809 }
810
811 return rc;
812 }
813
814 memcached_return_t memcached_connect(org::libmemcached::Instance* server)
815 {
816 return _memcached_connect(server, true);
817 }