Update tests, improved UDP performance.
[awesomized/libmemcached] / libmemcached / connect.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2010 Brian Aker All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <libmemcached/common.h>
40
41 #include <cassert>
42 #include <ctime>
43 #include <sys/time.h>
44
45 static memcached_return_t connect_poll(memcached_server_st *server)
46 {
47 struct pollfd fds[1];
48 fds[0].fd= server->fd;
49 fds[0].events= POLLOUT;
50
51 size_t loop_max= 5;
52
53 if (server->root->poll_timeout == 0)
54 {
55 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
56 }
57
58 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
59 {
60 int error= poll(fds, 1, server->root->connect_timeout);
61 switch (error)
62 {
63 case 1:
64 {
65 int err;
66 socklen_t len= sizeof (err);
67 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
68 {
69 // We check the value to see what happened wth the socket.
70 if (err == 0)
71 {
72 return MEMCACHED_SUCCESS;
73 }
74 errno= err;
75 }
76
77 return memcached_set_errno(*server, err, MEMCACHED_AT);
78 }
79 case 0:
80 {
81 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
82 }
83
84 default: // A real error occurred and we need to completely bail
85 switch (get_socket_errno())
86 {
87 #ifdef TARGET_OS_LINUX
88 case ERESTART:
89 #endif
90 case EINTR:
91 continue;
92
93 case EFAULT:
94 case ENOMEM:
95 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
96
97 case EINVAL:
98 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
99
100 default: // This should not happen
101 if (fds[0].revents & POLLERR)
102 {
103 int err;
104 socklen_t len= sizeof(err);
105 if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
106 {
107 if (err == 0)
108 {
109 // This should never happen, if it does? Punt.
110 continue;
111 }
112 errno= err;
113 }
114 }
115
116 int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
117
118 assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
119 (void)closesocket(server->fd);
120 server->fd= INVALID_SOCKET;
121 server->state= MEMCACHED_SERVER_STATE_NEW;
122
123 return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
124 }
125 }
126 }
127
128 // This should only be possible from ERESTART or EINTR;
129 return memcached_set_errno(*server, get_socket_errno(), MEMCACHED_AT);
130 }
131
132 memcached_return_t set_hostinfo(memcached_server_st *server)
133 {
134 if (server->address_info)
135 {
136 freeaddrinfo(server->address_info);
137 server->address_info= NULL;
138 server->address_info_next= NULL;
139 }
140
141 char str_port[NI_MAXSERV];
142 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
143 if (length >= NI_MAXSERV or length < 0)
144 {
145 return MEMCACHED_FAILURE;
146 }
147
148 struct addrinfo hints;
149 memset(&hints, 0, sizeof(struct addrinfo));
150
151 #if 0
152 hints.ai_family= AF_INET;
153 #endif
154 if (memcached_is_udp(server->root))
155 {
156 hints.ai_protocol= IPPROTO_UDP;
157 hints.ai_socktype= SOCK_DGRAM;
158 }
159 else
160 {
161 hints.ai_socktype= SOCK_STREAM;
162 hints.ai_protocol= IPPROTO_TCP;
163 }
164
165 server->address_info= NULL;
166 int errcode;
167 switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
168 {
169 case 0:
170 break;
171
172 case EAI_AGAIN:
173 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
174
175 case EAI_SYSTEM:
176 return memcached_set_errno(*server, errno, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_SYSTEM)"));
177
178 case EAI_BADFLAGS:
179 return memcached_set_error(*server, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_BADFLAGS)"));
180
181 case EAI_MEMORY:
182 return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("getaddrinfo(EAI_MEMORY)"));
183
184 default:
185 {
186 return memcached_set_error(*server, MEMCACHED_HOST_LOOKUP_FAILURE, MEMCACHED_AT, memcached_string_make_from_cstr(gai_strerror(errcode)));
187 }
188 }
189 server->address_info_next= server->address_info;
190 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
191
192 return MEMCACHED_SUCCESS;
193 }
194
195 static inline void set_socket_nonblocking(memcached_server_st *server)
196 {
197 #ifdef WIN32
198 u_long arg= 1;
199 if (ioctlsocket(server->fd, FIONBIO, &arg) == SOCKET_ERROR)
200 {
201 memcached_set_errno(*server, get_socket_errno(), NULL);
202 }
203 #else
204 int flags;
205
206 do
207 {
208 flags= fcntl(server->fd, F_GETFL, 0);
209 } while (flags == -1 && (errno == EINTR || errno == EAGAIN));
210
211 if (flags == -1)
212 {
213 memcached_set_errno(*server, errno, NULL);
214 }
215 else if ((flags & O_NONBLOCK) == 0)
216 {
217 int rval;
218
219 do
220 {
221 rval= fcntl(server->fd, F_SETFL, flags | O_NONBLOCK);
222 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
223
224 if (rval == -1)
225 {
226 memcached_set_errno(*server, errno, NULL);
227 }
228 }
229 #endif
230 }
231
232 static void set_socket_options(memcached_server_st *server)
233 {
234 assert_msg(server->fd != INVALID_SOCKET, "invalid socket was passed to set_socket_options()");
235
236 if (memcached_is_udp(server->root))
237 {
238 return;
239 }
240
241 #ifdef HAVE_SNDTIMEO
242 if (server->root->snd_timeout)
243 {
244 int error;
245 struct timeval waittime;
246
247 waittime.tv_sec= 0;
248 waittime.tv_usec= server->root->snd_timeout;
249
250 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
251 &waittime, (socklen_t)sizeof(struct timeval));
252 WATCHPOINT_ASSERT(error == 0);
253 }
254 #endif
255
256 #ifdef HAVE_RCVTIMEO
257 if (server->root->rcv_timeout)
258 {
259 int error;
260 struct timeval waittime;
261
262 waittime.tv_sec= 0;
263 waittime.tv_usec= server->root->rcv_timeout;
264
265 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
266 &waittime, (socklen_t)sizeof(struct timeval));
267 WATCHPOINT_ASSERT(error == 0);
268 }
269 #endif
270
271
272 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
273 {
274 int set= 1;
275 int error= setsockopt(server->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
276
277 // This is not considered a fatal error
278 if (error == -1)
279 {
280 WATCHPOINT_ERRNO(get_socket_errno());
281 perror("setsockopt(SO_NOSIGPIPE)");
282 }
283 }
284 #endif
285
286 if (server->root->flags.no_block)
287 {
288 int error;
289 struct linger linger;
290
291 linger.l_onoff= 1;
292 linger.l_linger= 0; /* By default on close() just drop the socket */
293 error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
294 &linger, (socklen_t)sizeof(struct linger));
295 WATCHPOINT_ASSERT(error == 0);
296 }
297
298 if (server->root->flags.tcp_nodelay)
299 {
300 int flag= 1;
301 int error;
302
303 error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
304 &flag, (socklen_t)sizeof(int));
305 WATCHPOINT_ASSERT(error == 0);
306 }
307
308 if (server->root->flags.tcp_keepalive)
309 {
310 int flag= 1;
311 int error;
312
313 error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
314 &flag, (socklen_t)sizeof(int));
315 WATCHPOINT_ASSERT(error == 0);
316 }
317
318 #ifdef TCP_KEEPIDLE
319 if (server->root->tcp_keepidle > 0)
320 {
321 int error;
322
323 error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
324 &server->root->tcp_keepidle, (socklen_t)sizeof(int));
325 WATCHPOINT_ASSERT(error == 0);
326 }
327 #endif
328
329 if (server->root->send_size > 0)
330 {
331 int error;
332
333 error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
334 &server->root->send_size, (socklen_t)sizeof(int));
335 WATCHPOINT_ASSERT(error == 0);
336 }
337
338 if (server->root->recv_size > 0)
339 {
340 int error;
341
342 error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
343 &server->root->recv_size, (socklen_t)sizeof(int));
344 WATCHPOINT_ASSERT(error == 0);
345 }
346
347
348 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
349 set_socket_nonblocking(server);
350 }
351
352 static memcached_return_t unix_socket_connect(memcached_server_st *server)
353 {
354 #ifndef WIN32
355 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
356
357 if ((server->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
358 {
359 memcached_set_errno(*server, errno, NULL);
360 return MEMCACHED_CONNECTION_FAILURE;
361 }
362
363 struct sockaddr_un servAddr;
364
365 memset(&servAddr, 0, sizeof (struct sockaddr_un));
366 servAddr.sun_family= AF_UNIX;
367 strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
368
369 do {
370 if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
371 {
372 switch (errno)
373 {
374 case EINPROGRESS:
375 case EALREADY:
376 case EINTR:
377 continue;
378
379 case EISCONN: /* We were spinning waiting on connect */
380 {
381 WATCHPOINT_ASSERT(0); // Programmer error
382 break;
383 }
384
385 default:
386 WATCHPOINT_ERRNO(errno);
387 memcached_set_errno(*server, errno, MEMCACHED_AT);
388 return MEMCACHED_CONNECTION_FAILURE;
389 }
390 }
391 } while (0);
392 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
393
394 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
395
396 return MEMCACHED_SUCCESS;
397 #else
398 (void)server;
399 return MEMCACHED_NOT_SUPPORTED;
400 #endif
401 }
402
403 static memcached_return_t network_connect(memcached_server_st *server)
404 {
405 bool timeout_error_occured= false;
406
407 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
408 WATCHPOINT_ASSERT(server->cursor_active == 0);
409
410 /*
411 We want to check both of these because if address_info_next has been fully tried, we want to do a new lookup to make sure we have picked up on any new DNS information.
412 */
413 if (server->address_info == NULL or server->address_info_next == NULL)
414 {
415 WATCHPOINT_ASSERT(server->state == MEMCACHED_SERVER_STATE_NEW);
416 server->address_info_next= NULL;
417 memcached_return_t rc;
418 uint32_t counter= 5;
419 while (--counter)
420 {
421 if ((rc= set_hostinfo(server)) != MEMCACHED_TIMEOUT)
422 {
423 break;
424 }
425
426 #ifndef WIN32
427 struct timespec dream, rem;
428
429 dream.tv_nsec= 1000;
430 dream.tv_sec= 0;
431
432 nanosleep(&dream, &rem);
433 #endif
434 }
435
436 if (memcached_failed(rc))
437 {
438 return rc;
439 }
440 }
441
442 if (server->address_info_next == NULL)
443 {
444 server->address_info_next= server->address_info;
445 server->state= MEMCACHED_SERVER_STATE_ADDRINFO;
446 }
447
448 /* Create the socket */
449 while (server->address_info_next and server->fd == INVALID_SOCKET)
450 {
451 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
452 if (memcached_is_udp(server->root) and server->address_info_next->ai_family != AF_INET)
453 {
454 server->address_info_next= server->address_info_next->ai_next;
455 continue;
456 }
457
458 if ((server->fd= socket(server->address_info_next->ai_family,
459 server->address_info_next->ai_socktype,
460 server->address_info_next->ai_protocol)) < 0)
461 {
462 return memcached_set_errno(*server, get_socket_errno(), NULL);
463 }
464
465 set_socket_options(server);
466
467 /* connect to server */
468 if ((connect(server->fd, server->address_info_next->ai_addr, server->address_info_next->ai_addrlen) != SOCKET_ERROR))
469 {
470 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
471 return MEMCACHED_SUCCESS;
472 }
473
474 /* An error occurred */
475 switch (get_socket_errno())
476 {
477 case ETIMEDOUT:
478 timeout_error_occured= true;
479 break;
480
481 case EWOULDBLOCK:
482 case EINPROGRESS: // nonblocking mode - first return
483 case EALREADY: // nonblocking mode - subsequent returns
484 {
485 server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
486 memcached_return_t rc= connect_poll(server);
487
488 if (memcached_success(rc))
489 {
490 server->state= MEMCACHED_SERVER_STATE_CONNECTED;
491 return MEMCACHED_SUCCESS;
492 }
493
494 // A timeout here is treated as an error, we will not retry
495 if (rc == MEMCACHED_TIMEOUT)
496 {
497 timeout_error_occured= true;
498 }
499 }
500 break;
501
502 case EISCONN: // we are connected :-)
503 WATCHPOINT_ASSERT(0); // This is a programmer's error
504 break;
505
506 case EINTR: // Special case, we retry ai_addr
507 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
508 (void)closesocket(server->fd);
509 server->fd= INVALID_SOCKET;
510 continue;
511
512 default:
513 break;
514 }
515
516 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
517 (void)closesocket(server->fd);
518 server->fd= INVALID_SOCKET;
519 server->address_info_next= server->address_info_next->ai_next;
520 }
521
522 WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
523
524 if (timeout_error_occured)
525 {
526 if (server->fd != INVALID_SOCKET)
527 {
528 (void)closesocket(server->fd);
529 server->fd= INVALID_SOCKET;
530 }
531 }
532
533 WATCHPOINT_STRING("Never got a good file descriptor");
534
535 if (memcached_has_current_error(*server))
536 {
537 return memcached_server_error_return(server);
538 }
539
540 if (timeout_error_occured and server->state < MEMCACHED_SERVER_STATE_IN_PROGRESS)
541 {
542 return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
543 }
544
545 return memcached_set_error(*server, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT); /* The last error should be from connect() */
546 }
547
548
549 /*
550 backoff_handling()
551
552 Based on time/failure count fail the connect without trying. This prevents waiting in a state where
553 we get caught spending cycles just waiting.
554 */
555 static memcached_return_t backoff_handling(memcached_server_write_instance_st server, bool& in_timeout)
556 {
557 /*
558 If we hit server_failure_limit then something is completely wrong about the server.
559
560 1) If autoeject is enabled we do that.
561 2) If not? We go into timeout again, there is much else to do :(
562 */
563 if (server->server_failure_counter >= server->root->server_failure_limit)
564 {
565 /*
566 We just auto_eject if we hit this point
567 */
568 if (_is_auto_eject_host(server->root))
569 {
570 set_last_disconnected_host(server);
571 run_distribution((memcached_st *)server->root);
572
573 return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT);
574 }
575
576 server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT;
577
578 // Sanity check/setting
579 if (server->next_retry == 0)
580 {
581 server->next_retry= 1;
582 }
583 }
584
585 if (server->state == MEMCACHED_SERVER_STATE_IN_TIMEOUT)
586 {
587 struct timeval curr_time;
588 bool _gettime_success= (gettimeofday(&curr_time, NULL) == 0);
589
590 /*
591 If next_retry is less then our current time, then we reset and try everything again.
592 */
593 if (_gettime_success and server->next_retry < curr_time.tv_sec)
594 {
595 server->state= MEMCACHED_SERVER_STATE_NEW;
596 }
597 else
598 {
599 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
600 }
601
602 in_timeout= true;
603 }
604
605 return MEMCACHED_SUCCESS;
606 }
607
608 memcached_return_t memcached_connect(memcached_server_write_instance_st server)
609 {
610 if (server->fd != INVALID_SOCKET)
611 {
612 return MEMCACHED_SUCCESS;
613 }
614
615 LIBMEMCACHED_MEMCACHED_CONNECT_START();
616
617 bool in_timeout= false;
618 memcached_return_t rc;
619 if (memcached_failed(rc= backoff_handling(server, in_timeout)))
620 {
621 set_last_disconnected_host(server);
622 return rc;
623 }
624
625 if (LIBMEMCACHED_WITH_SASL_SUPPORT and server->root->sasl.callbacks and memcached_is_udp(server->root))
626 {
627 return memcached_set_error(*server, MEMCACHED_INVALID_HOST_PROTOCOL, MEMCACHED_AT, memcached_literal_param("SASL is not supported for UDP connections"));
628 }
629
630 /* We need to clean up the multi startup piece */
631 switch (server->type)
632 {
633 case MEMCACHED_CONNECTION_UDP:
634 case MEMCACHED_CONNECTION_TCP:
635 rc= network_connect(server);
636
637 if (LIBMEMCACHED_WITH_SASL_SUPPORT)
638 {
639 if (server->fd != INVALID_SOCKET and server->root->sasl.callbacks)
640 {
641 rc= memcached_sasl_authenticate_connection(server);
642 if (memcached_failed(rc) and server->fd != INVALID_SOCKET)
643 {
644 WATCHPOINT_ASSERT(server->fd != INVALID_SOCKET);
645 (void)closesocket(server->fd);
646 server->fd= INVALID_SOCKET;
647 }
648 }
649 }
650 break;
651
652 case MEMCACHED_CONNECTION_UNIX_SOCKET:
653 rc= unix_socket_connect(server);
654 break;
655 }
656
657 if (memcached_success(rc))
658 {
659 memcached_mark_server_as_clean(server);
660 return rc;
661 }
662
663 set_last_disconnected_host(server);
664 if (memcached_has_current_error(*server))
665 {
666 memcached_mark_server_for_timeout(server);
667 assert(memcached_failed(memcached_server_error_return(server)));
668 }
669 else
670 {
671 memcached_set_error(*server, rc, MEMCACHED_AT);
672 memcached_mark_server_for_timeout(server);
673 }
674
675 LIBMEMCACHED_MEMCACHED_CONNECT_END();
676
677 if (in_timeout)
678 {
679 return memcached_set_error(*server, MEMCACHED_SERVER_TEMPORARILY_DISABLED, MEMCACHED_AT);
680 }
681
682 return rc;
683 }