Merge Andre
[m6w6/libmemcached] / libmemcached / connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return_t set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return_t set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 if (error)
67 return MEMCACHED_FAILURE;
68 }
69 #endif
70
71 #ifdef HAVE_RCVTIMEO
72 if (ptr->root->rcv_timeout)
73 {
74 int error;
75 struct timeval waittime;
76
77 waittime.tv_sec= 0;
78 waittime.tv_usec= ptr->root->rcv_timeout;
79
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
81 &waittime, (socklen_t)sizeof(struct timeval));
82 WATCHPOINT_ASSERT(error == 0);
83 if (error)
84 return MEMCACHED_FAILURE;
85 }
86 #endif
87
88 if (ptr->root->flags.no_block)
89 {
90 int error;
91 struct linger linger;
92
93 linger.l_onoff= 1;
94 linger.l_linger= 0; /* By default on close() just drop the socket */
95 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
96 &linger, (socklen_t)sizeof(struct linger));
97 WATCHPOINT_ASSERT(error == 0);
98 if (error)
99 return MEMCACHED_FAILURE;
100 }
101
102 if (ptr->root->flags.tcp_nodelay)
103 {
104 int flag= 1;
105 int error;
106
107 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
108 &flag, (socklen_t)sizeof(int));
109 WATCHPOINT_ASSERT(error == 0);
110 if (error)
111 return MEMCACHED_FAILURE;
112 }
113
114 if (ptr->root->flags.tcp_keepalive)
115 {
116 int flag= 1;
117 int error;
118
119 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
120 &flag, (socklen_t)sizeof(int));
121 WATCHPOINT_ASSERT(error == 0);
122 if (error)
123 return MEMCACHED_FAILURE;
124 }
125
126 #ifdef TCP_KEEPIDLE
127 if (ptr->root->tcp_keepidle > 0)
128 {
129 int error;
130
131 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_KEEPIDLE,
132 &ptr->root->tcp_keepidle, (socklen_t)sizeof(int));
133 WATCHPOINT_ASSERT(error == 0);
134 if (error)
135 return MEMCACHED_FAILURE;
136 }
137 #endif
138
139 if (ptr->root->send_size > 0)
140 {
141 int error;
142
143 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
144 &ptr->root->send_size, (socklen_t)sizeof(int));
145 WATCHPOINT_ASSERT(error == 0);
146 if (error)
147 return MEMCACHED_FAILURE;
148 }
149
150 if (ptr->root->recv_size > 0)
151 {
152 int error;
153
154 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
155 &ptr->root->recv_size, (socklen_t)sizeof(int));
156 WATCHPOINT_ASSERT(error == 0);
157 if (error)
158 return MEMCACHED_FAILURE;
159 }
160
161 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
162 int flags;
163
164 do
165 flags= fcntl(ptr->fd, F_GETFL, 0);
166 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
167
168 unlikely (flags == -1)
169 {
170 return MEMCACHED_CONNECTION_FAILURE;
171 }
172 else if ((flags & O_NONBLOCK) == 0)
173 {
174 int rval;
175
176 do
177 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
178 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
179
180 unlikely (rval == -1)
181 {
182 return MEMCACHED_CONNECTION_FAILURE;
183 }
184 }
185
186 return MEMCACHED_SUCCESS;
187 }
188
189 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
190 {
191 struct sockaddr_un servAddr;
192
193 if (ptr->fd == -1)
194 {
195 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
196 {
197 ptr->cached_errno= errno;
198 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
199 }
200
201 memset(&servAddr, 0, sizeof (struct sockaddr_un));
202 servAddr.sun_family= AF_UNIX;
203 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
204
205 test_connect:
206 if (connect(ptr->fd,
207 (struct sockaddr *)&servAddr,
208 sizeof(servAddr)) < 0)
209 {
210 switch (errno)
211 {
212 case EINPROGRESS:
213 case EALREADY:
214 case EINTR:
215 goto test_connect;
216 case EISCONN: /* We were spinning waiting on connect */
217 break;
218 default:
219 WATCHPOINT_ERRNO(errno);
220 ptr->cached_errno= errno;
221 return MEMCACHED_ERRNO;
222 }
223 }
224 }
225
226 WATCHPOINT_ASSERT(ptr->fd != -1);
227 return MEMCACHED_SUCCESS;
228 }
229
230 static memcached_return_t network_connect(memcached_server_st *ptr)
231 {
232 if (ptr->fd == -1)
233 {
234 struct addrinfo *use;
235
236 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
237
238 if (! ptr->options.sockaddr_inited ||
239 (!(ptr->root->flags.use_cache_lookups)))
240 {
241 memcached_return_t rc;
242
243 rc= set_hostinfo(ptr);
244 if (rc != MEMCACHED_SUCCESS)
245 return rc;
246 ptr->options.sockaddr_inited= true;
247 }
248
249 use= ptr->address_info;
250 /* Create the socket */
251 while (use != NULL)
252 {
253 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
254 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
255 {
256 use= use->ai_next;
257 continue;
258 }
259
260 if ((ptr->fd= socket(use->ai_family,
261 use->ai_socktype,
262 use->ai_protocol)) < 0)
263 {
264 ptr->cached_errno= errno;
265 WATCHPOINT_ERRNO(errno);
266 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
267 }
268
269 (void)set_socket_options(ptr);
270
271 /* connect to server */
272 while (ptr->fd != -1 &&
273 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
274 {
275 ptr->cached_errno= errno;
276 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
277 errno == EALREADY) /* nonblocking mode - subsequent returns */
278 {
279 struct pollfd fds[1];
280 fds[0].fd = ptr->fd;
281 fds[0].events = POLLOUT;
282 int error= poll(fds, 1, ptr->root->connect_timeout);
283
284 if (error != 1 || fds[0].revents & POLLERR)
285 {
286 if (fds[0].revents & POLLERR)
287 {
288 int err;
289 socklen_t len = sizeof (err);
290 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
291 ptr->cached_errno= (err == 0) ? errno : err;
292 }
293
294 (void)close(ptr->fd);
295 ptr->fd= -1;
296 }
297 }
298 else if (errno == EISCONN) /* we are connected :-) */
299 {
300 break;
301 }
302 else if (errno != EINTR)
303 {
304 (void)close(ptr->fd);
305 ptr->fd= -1;
306 break;
307 }
308 }
309
310 #ifdef LIBMEMCACHED_WITH_SASL_SUPPORT
311 if (ptr->fd != -1 && ptr->root->sasl.callbacks != NULL)
312 {
313 memcached_return rc= memcached_sasl_authenticate_connection(ptr);
314 if (rc != MEMCACHED_SUCCESS)
315 {
316 (void)close(ptr->fd);
317 ptr->fd= -1;
318 return rc;
319 }
320 }
321 #endif
322
323
324 if (ptr->fd != -1)
325 {
326 ptr->server_failure_counter= 0;
327 return MEMCACHED_SUCCESS;
328 }
329 use = use->ai_next;
330 }
331 }
332
333 if (ptr->fd == -1)
334 {
335 /* Failed to connect. schedule next retry */
336 if (ptr->root->retry_timeout)
337 {
338 struct timeval next_time;
339
340 if (gettimeofday(&next_time, NULL) == 0)
341 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
342 }
343 ptr->server_failure_counter++;
344 if (ptr->cached_errno == 0)
345 return MEMCACHED_TIMEOUT;
346
347 return MEMCACHED_ERRNO; /* The last error should be from connect() */
348 }
349
350 ptr->server_failure_counter= 0;
351 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
352 }
353
354
355 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
356 {
357 memcached_return_t rc= MEMCACHED_NO_SERVERS;
358 LIBMEMCACHED_MEMCACHED_CONNECT_START();
359
360 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
361 WATCHPOINT_ASSERT(ptr->root);
362 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
363 {
364 struct timeval curr_time;
365
366 gettimeofday(&curr_time, NULL);
367
368 /* if we've had too many consecutive errors on this server, mark it dead. */
369 if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
370 {
371 ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
372 ptr->server_failure_counter= 0;
373 }
374
375 if (curr_time.tv_sec < ptr->next_retry)
376 {
377 memcached_st *root= (memcached_st *)ptr->root;
378 // @todo fix this by fixing behavior to no longer make use of
379 // memcached_st
380 if (memcached_behavior_get(root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
381 {
382 run_distribution(root);
383 }
384
385 root->last_disconnected_server = ptr;
386
387 return MEMCACHED_SERVER_MARKED_DEAD;
388 }
389 }
390
391 /* We need to clean up the multi startup piece */
392 switch (ptr->type)
393 {
394 case MEMCACHED_CONNECTION_UNKNOWN:
395 WATCHPOINT_ASSERT(0);
396 rc= MEMCACHED_NOT_SUPPORTED;
397 break;
398 case MEMCACHED_CONNECTION_UDP:
399 case MEMCACHED_CONNECTION_TCP:
400 rc= network_connect(ptr);
401 break;
402 case MEMCACHED_CONNECTION_UNIX_SOCKET:
403 rc= unix_socket_connect(ptr);
404 break;
405 case MEMCACHED_CONNECTION_MAX:
406 default:
407 WATCHPOINT_ASSERT(0);
408 }
409
410 unlikely ( rc != MEMCACHED_SUCCESS)
411 {
412 //@todo create interface around last_discontected_server
413 memcached_st *root= (memcached_st *)ptr->root;
414 root->last_disconnected_server = ptr;
415 }
416
417 LIBMEMCACHED_MEMCACHED_CONNECT_END();
418
419 return rc;
420 }