41bd7fb03429832d200ca3639a760e46aaa01041
[awesomized/libmemcached] / libmemcached / connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return_t set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return_t set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 if (error)
67 return MEMCACHED_FAILURE;
68 }
69 #endif
70
71 #ifdef HAVE_RCVTIMEO
72 if (ptr->root->rcv_timeout)
73 {
74 int error;
75 struct timeval waittime;
76
77 waittime.tv_sec= 0;
78 waittime.tv_usec= ptr->root->rcv_timeout;
79
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
81 &waittime, (socklen_t)sizeof(struct timeval));
82 WATCHPOINT_ASSERT(error == 0);
83 if (error)
84 return MEMCACHED_FAILURE;
85 }
86 #endif
87
88 if (ptr->root->flags.no_block)
89 {
90 int error;
91 struct linger linger;
92
93 linger.l_onoff= 1;
94 linger.l_linger= 0; /* By default on close() just drop the socket */
95 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
96 &linger, (socklen_t)sizeof(struct linger));
97 WATCHPOINT_ASSERT(error == 0);
98 if (error)
99 return MEMCACHED_FAILURE;
100 }
101
102 if (ptr->root->flags.tcp_nodelay)
103 {
104 int flag= 1;
105 int error;
106
107 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
108 &flag, (socklen_t)sizeof(int));
109 WATCHPOINT_ASSERT(error == 0);
110 if (error)
111 return MEMCACHED_FAILURE;
112 }
113
114 if (ptr->root->flags.tcp_keepalive)
115 {
116 int flag= 1;
117 int error;
118
119 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
120 &flag, (socklen_t)sizeof(int));
121 WATCHPOINT_ASSERT(error == 0);
122 if (error)
123 return MEMCACHED_FAILURE;
124 }
125
126 if (ptr->root->send_size > 0)
127 {
128 int error;
129
130 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
131 &ptr->root->send_size, (socklen_t)sizeof(int));
132 WATCHPOINT_ASSERT(error == 0);
133 if (error)
134 return MEMCACHED_FAILURE;
135 }
136
137 if (ptr->root->recv_size > 0)
138 {
139 int error;
140
141 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
142 &ptr->root->recv_size, (socklen_t)sizeof(int));
143 WATCHPOINT_ASSERT(error == 0);
144 if (error)
145 return MEMCACHED_FAILURE;
146 }
147
148 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
149 int flags;
150
151 do
152 flags= fcntl(ptr->fd, F_GETFL, 0);
153 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
154
155 unlikely (flags == -1)
156 {
157 return MEMCACHED_CONNECTION_FAILURE;
158 }
159 else if ((flags & O_NONBLOCK) == 0)
160 {
161 int rval;
162
163 do
164 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
165 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
166
167 unlikely (rval == -1)
168 {
169 return MEMCACHED_CONNECTION_FAILURE;
170 }
171 }
172
173 return MEMCACHED_SUCCESS;
174 }
175
176 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
177 {
178 struct sockaddr_un servAddr;
179
180 if (ptr->fd == -1)
181 {
182 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
183 {
184 ptr->cached_errno= errno;
185 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
186 }
187
188 memset(&servAddr, 0, sizeof (struct sockaddr_un));
189 servAddr.sun_family= AF_UNIX;
190 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
191
192 test_connect:
193 if (connect(ptr->fd,
194 (struct sockaddr *)&servAddr,
195 sizeof(servAddr)) < 0)
196 {
197 switch (errno)
198 {
199 case EINPROGRESS:
200 case EALREADY:
201 case EINTR:
202 goto test_connect;
203 case EISCONN: /* We were spinning waiting on connect */
204 break;
205 default:
206 WATCHPOINT_ERRNO(errno);
207 ptr->cached_errno= errno;
208 return MEMCACHED_ERRNO;
209 }
210 }
211 }
212
213 WATCHPOINT_ASSERT(ptr->fd != -1);
214 return MEMCACHED_SUCCESS;
215 }
216
217 static memcached_return_t network_connect(memcached_server_st *ptr)
218 {
219 if (ptr->fd == -1)
220 {
221 struct addrinfo *use;
222
223 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
224
225 if (! ptr->options.sockaddr_inited ||
226 (!(ptr->root->flags.use_cache_lookups)))
227 {
228 memcached_return_t rc;
229
230 rc= set_hostinfo(ptr);
231 if (rc != MEMCACHED_SUCCESS)
232 return rc;
233 ptr->options.sockaddr_inited= true;
234 }
235
236 use= ptr->address_info;
237 /* Create the socket */
238 while (use != NULL)
239 {
240 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
241 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
242 {
243 use= use->ai_next;
244 continue;
245 }
246
247 if ((ptr->fd= socket(use->ai_family,
248 use->ai_socktype,
249 use->ai_protocol)) < 0)
250 {
251 ptr->cached_errno= errno;
252 WATCHPOINT_ERRNO(errno);
253 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
254 }
255
256 (void)set_socket_options(ptr);
257
258 /* connect to server */
259 while (ptr->fd != -1 &&
260 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
261 {
262 ptr->cached_errno= errno;
263 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
264 errno == EALREADY) /* nonblocking mode - subsequent returns */
265 {
266 struct pollfd fds[1];
267 fds[0].fd = ptr->fd;
268 fds[0].events = POLLOUT;
269 int error= poll(fds, 1, ptr->root->connect_timeout);
270
271 if (error != 1 || fds[0].revents & POLLERR)
272 {
273 if (fds[0].revents & POLLERR)
274 {
275 int err;
276 socklen_t len = sizeof (err);
277 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
278 ptr->cached_errno= (err == 0) ? errno : err;
279 }
280
281 (void)close(ptr->fd);
282 ptr->fd= -1;
283 }
284 }
285 else if (errno == EISCONN) /* we are connected :-) */
286 {
287 break;
288 }
289 else if (errno != EINTR)
290 {
291 (void)close(ptr->fd);
292 ptr->fd= -1;
293 break;
294 }
295 }
296
297 if (ptr->fd != -1)
298 {
299 ptr->server_failure_counter= 0;
300 return MEMCACHED_SUCCESS;
301 }
302 use = use->ai_next;
303 }
304 }
305
306 if (ptr->fd == -1)
307 {
308 /* Failed to connect. schedule next retry */
309 if (ptr->root->retry_timeout)
310 {
311 struct timeval next_time;
312
313 if (gettimeofday(&next_time, NULL) == 0)
314 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
315 }
316 ptr->server_failure_counter++;
317 if (ptr->cached_errno == 0)
318 return MEMCACHED_TIMEOUT;
319
320 return MEMCACHED_ERRNO; /* The last error should be from connect() */
321 }
322
323 ptr->server_failure_counter= 0;
324 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
325 }
326
327
328 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
329 {
330 memcached_return_t rc= MEMCACHED_NO_SERVERS;
331 LIBMEMCACHED_MEMCACHED_CONNECT_START();
332
333 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
334 WATCHPOINT_ASSERT(ptr->root);
335 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
336 {
337 struct timeval curr_time;
338
339 gettimeofday(&curr_time, NULL);
340
341 /* if we've had too many consecutive errors on this server, mark it dead. */
342 if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
343 {
344 ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
345 ptr->server_failure_counter= 0;
346 }
347
348 if (curr_time.tv_sec < ptr->next_retry)
349 {
350 memcached_st *root= (memcached_st *)ptr->root;
351 // @todo fix this by fixing behavior to no longer make use of
352 // memcached_st
353 if (memcached_behavior_get(root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
354 {
355 run_distribution(root);
356 }
357
358 root->last_disconnected_server = ptr;
359
360 return MEMCACHED_SERVER_MARKED_DEAD;
361 }
362 }
363
364 /* We need to clean up the multi startup piece */
365 switch (ptr->type)
366 {
367 case MEMCACHED_CONNECTION_UNKNOWN:
368 WATCHPOINT_ASSERT(0);
369 rc= MEMCACHED_NOT_SUPPORTED;
370 break;
371 case MEMCACHED_CONNECTION_UDP:
372 case MEMCACHED_CONNECTION_TCP:
373 rc= network_connect(ptr);
374 break;
375 case MEMCACHED_CONNECTION_UNIX_SOCKET:
376 rc= unix_socket_connect(ptr);
377 break;
378 case MEMCACHED_CONNECTION_MAX:
379 default:
380 WATCHPOINT_ASSERT(0);
381 }
382
383 unlikely ( rc != MEMCACHED_SUCCESS)
384 {
385 //@todo create interface around last_discontected_server
386 memcached_st *root= (memcached_st *)ptr->root;
387 root->last_disconnected_server = ptr;
388 }
389
390 LIBMEMCACHED_MEMCACHED_CONNECT_END();
391
392 return rc;
393 }