9f17f0c3f26d971c49e15d95bbe26cb0349577d9
[awesomized/libmemcached] / libmemcached / connect.c
1 /* LibMemcached
2 * Copyright (C) 2006-2010 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Server IO, Not public!
9 *
10 */
11
12 #include <libmemcached/common.h>
13 #include <assert.h>
14 #include <sys/time.h>
15 #include <time.h>
16
17 static memcached_return_t connect_poll(memcached_server_st *ptr)
18 {
19 struct pollfd fds[1];
20 fds[0].fd = ptr->fd;
21 fds[0].events = POLLOUT;
22
23 int error;
24 size_t loop_max= 5;
25
26 while (--loop_max) // Should only loop on cases of ERESTART or EINTR
27 {
28 error= poll(fds, 1, ptr->root->connect_timeout);
29
30 switch (error)
31 {
32 case 1:
33 {
34 int err;
35 socklen_t len= sizeof (err);
36 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
37
38 // We check the value to see what happened wth the socket.
39 if (err == 0)
40 {
41 return MEMCACHED_SUCCESS;
42 }
43 else
44 {
45 ptr->cached_errno= errno;
46
47 return MEMCACHED_ERRNO;
48 }
49 }
50 case 0:
51 return MEMCACHED_TIMEOUT;
52 default: // A real error occurred and we need to completely bail
53 WATCHPOINT_ERRNO(get_socket_errno());
54 switch (get_socket_errno())
55 {
56 #ifdef TARGET_OS_LINUX
57 case ERESTART:
58 #endif
59 case EINTR:
60 continue;
61 default:
62 if (fds[0].revents & POLLERR)
63 {
64 int err;
65 socklen_t len= sizeof (err);
66 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
67 ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
68 }
69 else
70 {
71 ptr->cached_errno= get_socket_errno();
72 }
73
74 (void)closesocket(ptr->fd);
75 ptr->fd= INVALID_SOCKET;
76
77 return MEMCACHED_ERRNO;
78 }
79 }
80 }
81
82 // This should only be possible from ERESTART or EINTR;
83 ptr->cached_errno= get_socket_errno();
84
85 return MEMCACHED_ERRNO;
86 }
87
88 static memcached_return_t set_hostinfo(memcached_server_st *server)
89 {
90 struct addrinfo hints;
91 char str_port[NI_MAXSERV];
92
93 assert(! server->address_info); // We cover the case where a programming mistake has been made.
94 if (server->address_info)
95 {
96 freeaddrinfo(server->address_info);
97 server->address_info= NULL;
98 server->address_info_next= NULL;
99 }
100
101 int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
102 if (length >= NI_MAXSERV || length < 0)
103 return MEMCACHED_FAILURE;
104
105 memset(&hints, 0, sizeof(hints));
106
107 #if 0
108 hints.ai_family= AF_INET;
109 #endif
110 if (server->type == MEMCACHED_CONNECTION_UDP)
111 {
112 hints.ai_protocol= IPPROTO_UDP;
113 hints.ai_socktype= SOCK_DGRAM;
114 }
115 else
116 {
117 hints.ai_socktype= SOCK_STREAM;
118 hints.ai_protocol= IPPROTO_TCP;
119 }
120
121 uint32_t counter= 5;
122 while (--counter)
123 {
124 int e= getaddrinfo(server->hostname, str_port, &hints, &server->address_info);
125
126 if (e == 0)
127 {
128 break;
129 }
130 else if (e == EAI_AGAIN)
131 {
132 #ifndef WIN32
133 struct timespec dream, rem;
134
135 dream.tv_nsec= 1000;
136 dream.tv_sec= 0;
137
138 nanosleep(&dream, &rem);
139 #endif
140 continue;
141 }
142 else
143 {
144 WATCHPOINT_STRING(server->hostname);
145 WATCHPOINT_STRING(gai_strerror(e));
146 return MEMCACHED_HOST_LOOKUP_FAILURE;
147 }
148 }
149
150 server->address_info_next= server->address_info;
151
152 return MEMCACHED_SUCCESS;
153 }
154
155 static inline memcached_return_t set_socket_nonblocking(memcached_server_st *ptr)
156 {
157 #ifdef WIN32
158 u_long arg = 1;
159 if (ioctlsocket(ptr->fd, FIONBIO, &arg) == SOCKET_ERROR)
160 {
161 ptr->cached_errno= get_socket_errno();
162 return MEMCACHED_CONNECTION_FAILURE;
163 }
164 #else
165 int flags;
166
167 do
168 flags= fcntl(ptr->fd, F_GETFL, 0);
169 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
170
171 unlikely (flags == -1)
172 {
173 ptr->cached_errno= errno;
174 return MEMCACHED_CONNECTION_FAILURE;
175 }
176 else if ((flags & O_NONBLOCK) == 0)
177 {
178 int rval;
179
180 do
181 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
182 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
183
184 unlikely (rval == -1)
185 {
186 ptr->cached_errno= errno;
187 return MEMCACHED_CONNECTION_FAILURE;
188 }
189 }
190 #endif
191 return MEMCACHED_SUCCESS;
192 }
193
194 static memcached_return_t set_socket_options(memcached_server_st *ptr)
195 {
196 WATCHPOINT_ASSERT(ptr->fd != -1);
197
198 if (ptr->type == MEMCACHED_CONNECTION_UDP)
199 return MEMCACHED_SUCCESS;
200
201 #ifdef HAVE_SNDTIMEO
202 if (ptr->root->snd_timeout)
203 {
204 int error;
205 struct timeval waittime;
206
207 waittime.tv_sec= 0;
208 waittime.tv_usec= ptr->root->snd_timeout;
209
210 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
211 &waittime, (socklen_t)sizeof(struct timeval));
212 WATCHPOINT_ASSERT(error == 0);
213 if (error)
214 return MEMCACHED_FAILURE;
215 }
216 #endif
217
218 #ifdef HAVE_RCVTIMEO
219 if (ptr->root->rcv_timeout)
220 {
221 int error;
222 struct timeval waittime;
223
224 waittime.tv_sec= 0;
225 waittime.tv_usec= ptr->root->rcv_timeout;
226
227 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
228 &waittime, (socklen_t)sizeof(struct timeval));
229 WATCHPOINT_ASSERT(error == 0);
230 if (error)
231 return MEMCACHED_FAILURE;
232 }
233 #endif
234
235
236 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
237 {
238 int set = 1;
239 int error= setsockopt(ptr->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
240
241 // This is not considered a fatal error
242 if (error == -1)
243 {
244 WATCHPOINT_ERRNO(get_socket_errno());
245 perror("setsockopt(SO_NOSIGPIPE)");
246 }
247 }
248 #endif
249
250 if (ptr->root->flags.no_block)
251 {
252 int error;
253 struct linger linger;
254
255 linger.l_onoff= 1;
256 linger.l_linger= 0; /* By default on close() just drop the socket */
257 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
258 &linger, (socklen_t)sizeof(struct linger));
259 WATCHPOINT_ASSERT(error == 0);
260 if (error)
261 return MEMCACHED_FAILURE;
262 }
263
264 if (ptr->root->flags.tcp_nodelay)
265 {
266 int flag= 1;
267 int error;
268
269 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
270 &flag, (socklen_t)sizeof(int));
271 WATCHPOINT_ASSERT(error == 0);
272 if (error)
273 return MEMCACHED_FAILURE;
274 }
275
276 if (ptr->root->flags.tcp_keepalive)
277 {
278 int flag= 1;
279 int error;
280
281 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
282 &flag, (socklen_t)sizeof(int));
283 WATCHPOINT_ASSERT(error == 0);
284 if (error)
285 return MEMCACHED_FAILURE;
286 }
287
288 #ifdef TCP_KEEPIDLE
289 if (ptr->root->tcp_keepidle > 0)
290 {
291 int error;
292
293 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_KEEPIDLE,
294 &ptr->root->tcp_keepidle, (socklen_t)sizeof(int));
295 WATCHPOINT_ASSERT(error == 0);
296 if (error)
297 return MEMCACHED_FAILURE;
298 }
299 #endif
300
301 if (ptr->root->send_size > 0)
302 {
303 int error;
304
305 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
306 &ptr->root->send_size, (socklen_t)sizeof(int));
307 WATCHPOINT_ASSERT(error == 0);
308 if (error)
309 return MEMCACHED_FAILURE;
310 }
311
312 if (ptr->root->recv_size > 0)
313 {
314 int error;
315
316 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
317 &ptr->root->recv_size, (socklen_t)sizeof(int));
318 WATCHPOINT_ASSERT(error == 0);
319 if (error)
320 return MEMCACHED_FAILURE;
321 }
322
323
324 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
325 return set_socket_nonblocking(ptr);
326 }
327
328 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
329 {
330 #ifndef WIN32
331 struct sockaddr_un servAddr;
332
333 WATCHPOINT_ASSERT(ptr->fd == -1);
334
335 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
336 {
337 ptr->cached_errno= errno;
338 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
339 }
340
341 memset(&servAddr, 0, sizeof (struct sockaddr_un));
342 servAddr.sun_family= AF_UNIX;
343 strncpy(servAddr.sun_path, ptr->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
344
345 test_connect:
346 if (connect(ptr->fd,
347 (struct sockaddr *)&servAddr,
348 sizeof(servAddr)) < 0)
349 {
350 switch (errno)
351 {
352 case EINPROGRESS:
353 case EALREADY:
354 case EINTR:
355 goto test_connect;
356 case EISCONN: /* We were spinning waiting on connect */
357 break;
358 default:
359 WATCHPOINT_ERRNO(errno);
360 ptr->cached_errno= errno;
361 return MEMCACHED_ERRNO;
362 }
363 }
364
365 WATCHPOINT_ASSERT(ptr->fd != -1);
366
367 return MEMCACHED_SUCCESS;
368 #else
369 (void)ptr;
370 return MEMCACHED_NOT_SUPPORTED;
371 #endif
372 }
373
374 static memcached_return_t network_connect(memcached_server_st *ptr)
375 {
376 bool timeout_error_occured= false;
377
378 WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
379 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
380
381 if (! ptr->address_info)
382 {
383 memcached_return_t rc= set_hostinfo(ptr);
384 if (rc != MEMCACHED_SUCCESS)
385 return rc;
386 }
387
388 /* Create the socket */
389 while (ptr->address_info_next && ptr->fd == INVALID_SOCKET)
390 {
391 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
392 if (ptr->type == MEMCACHED_CONNECTION_UDP && ptr->address_info_next->ai_family != AF_INET)
393 {
394 ptr->address_info_next= ptr->address_info_next->ai_next;
395 continue;
396 }
397
398 if ((ptr->fd= socket(ptr->address_info_next->ai_family,
399 ptr->address_info_next->ai_socktype,
400 ptr->address_info_next->ai_protocol)) < 0)
401 {
402 ptr->cached_errno= get_socket_errno();
403 WATCHPOINT_ERRNO(get_socket_errno());
404 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
405 }
406
407 (void)set_socket_options(ptr);
408
409 /* connect to server */
410 if ((connect(ptr->fd, ptr->address_info_next->ai_addr, ptr->address_info_next->ai_addrlen) != SOCKET_ERROR))
411 {
412 break; // Success
413 }
414
415 /* An error occurred */
416 ptr->cached_errno= get_socket_errno();
417 switch (ptr->cached_errno)
418 {
419 case EWOULDBLOCK:
420 case EINPROGRESS: // nonblocking mode - first return
421 case EALREADY: // nonblocking mode - subsequent returns
422 {
423 memcached_return_t rc;
424 rc= connect_poll(ptr);
425
426 if (rc == MEMCACHED_TIMEOUT)
427 timeout_error_occured= true;
428
429 if (rc == MEMCACHED_SUCCESS)
430 break;
431 }
432
433 case EISCONN: // we are connected :-)
434 break;
435
436 case EINTR: // Special case, we retry ai_addr
437 (void)closesocket(ptr->fd);
438 ptr->fd= INVALID_SOCKET;
439 continue;
440
441 default:
442 (void)closesocket(ptr->fd);
443 ptr->fd= INVALID_SOCKET;
444 ptr->address_info_next= ptr->address_info_next->ai_next;
445 break;
446 }
447 }
448
449 if (ptr->fd == INVALID_SOCKET)
450 {
451 WATCHPOINT_STRING("Never got a good file descriptor");
452
453 /* Failed to connect. schedule next retry */
454 if (ptr->root->retry_timeout)
455 {
456 struct timeval next_time;
457
458 if (gettimeofday(&next_time, NULL) == 0)
459 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
460 }
461
462 if (timeout_error_occured)
463 return MEMCACHED_TIMEOUT;
464
465 return MEMCACHED_ERRNO; /* The last error should be from connect() */
466 }
467
468 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
469 }
470
471 void set_last_disconnected_host(memcached_server_write_instance_st ptr)
472 {
473 // const_cast
474 memcached_st *root= (memcached_st *)ptr->root;
475
476 #if 0
477 WATCHPOINT_STRING(ptr->hostname);
478 WATCHPOINT_NUMBER(ptr->port);
479 WATCHPOINT_ERRNO(ptr->cached_errno);
480 #endif
481 if (root->last_disconnected_server)
482 memcached_server_free(root->last_disconnected_server);
483 root->last_disconnected_server= memcached_server_clone(NULL, ptr);
484 }
485
486 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
487 {
488 memcached_return_t rc= MEMCACHED_NO_SERVERS;
489
490 if (ptr->fd != INVALID_SOCKET)
491 return MEMCACHED_SUCCESS;
492
493 LIBMEMCACHED_MEMCACHED_CONNECT_START();
494
495 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
496 WATCHPOINT_ASSERT(ptr->root);
497 if (ptr->root->retry_timeout && ptr->next_retry)
498 {
499 struct timeval curr_time;
500
501 gettimeofday(&curr_time, NULL);
502
503 // We should optimize this to remove the allocation if the server was
504 // the last server to die
505 if (ptr->next_retry > curr_time.tv_sec)
506 {
507 set_last_disconnected_host(ptr);
508
509 return MEMCACHED_SERVER_MARKED_DEAD;
510 }
511 }
512
513 // If we are over the counter failure, we just fail. Reject host only
514 // works if you have a set number of failures.
515 if (ptr->root->server_failure_limit && ptr->server_failure_counter >= ptr->root->server_failure_limit)
516 {
517 set_last_disconnected_host(ptr);
518
519 // @todo fix this by fixing behavior to no longer make use of
520 // memcached_st
521 if (_is_auto_eject_host(ptr->root))
522 {
523 run_distribution((memcached_st *)ptr->root);
524 }
525
526 return MEMCACHED_SERVER_MARKED_DEAD;
527 }
528
529 /* We need to clean up the multi startup piece */
530 switch (ptr->type)
531 {
532 case MEMCACHED_CONNECTION_UNKNOWN:
533 WATCHPOINT_ASSERT(0);
534 rc= MEMCACHED_NOT_SUPPORTED;
535 break;
536 case MEMCACHED_CONNECTION_UDP:
537 case MEMCACHED_CONNECTION_TCP:
538 rc= network_connect(ptr);
539 #ifdef LIBMEMCACHED_WITH_SASL_SUPPORT
540 if (ptr->fd != INVALID_SOCKET && ptr->root->sasl.callbacks)
541 {
542 rc= memcached_sasl_authenticate_connection(ptr);
543 if (rc != MEMCACHED_SUCCESS)
544 {
545 (void)closesocket(ptr->fd);
546 ptr->fd= INVALID_SOCKET;
547 }
548 }
549 #endif
550 break;
551 case MEMCACHED_CONNECTION_UNIX_SOCKET:
552 rc= unix_socket_connect(ptr);
553 break;
554 case MEMCACHED_CONNECTION_MAX:
555 default:
556 WATCHPOINT_ASSERT(0);
557 }
558
559 if (rc == MEMCACHED_SUCCESS)
560 {
561 ptr->server_failure_counter= 0;
562 ptr->next_retry= 0;
563 }
564 else
565 {
566 ptr->server_failure_counter++;
567
568 set_last_disconnected_host(ptr);
569 }
570
571 LIBMEMCACHED_MEMCACHED_CONNECT_END();
572
573 return rc;
574 }