Merge Thomason's cork patch.
[awesomized/libmemcached] / libmemcached / connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return_t set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return_t set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 if (error)
67 return MEMCACHED_FAILURE;
68 }
69 #endif
70
71 #ifdef HAVE_RCVTIMEO
72 if (ptr->root->rcv_timeout)
73 {
74 int error;
75 struct timeval waittime;
76
77 waittime.tv_sec= 0;
78 waittime.tv_usec= ptr->root->rcv_timeout;
79
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
81 &waittime, (socklen_t)sizeof(struct timeval));
82 WATCHPOINT_ASSERT(error == 0);
83 if (error)
84 return MEMCACHED_FAILURE;
85 }
86 #endif
87
88 if (ptr->root->flags.no_block)
89 {
90 int error;
91 struct linger linger;
92
93 linger.l_onoff= 1;
94 linger.l_linger= 0; /* By default on close() just drop the socket */
95 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
96 &linger, (socklen_t)sizeof(struct linger));
97 WATCHPOINT_ASSERT(error == 0);
98 if (error)
99 return MEMCACHED_FAILURE;
100 }
101
102 if (ptr->root->flags.tcp_nodelay)
103 {
104 int flag= 1;
105 int error;
106
107 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
108 &flag, (socklen_t)sizeof(int));
109 WATCHPOINT_ASSERT(error == 0);
110 if (error)
111 return MEMCACHED_FAILURE;
112 }
113
114 if (ptr->root->send_size > 0)
115 {
116 int error;
117
118 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
119 &ptr->root->send_size, (socklen_t)sizeof(int));
120 WATCHPOINT_ASSERT(error == 0);
121 if (error)
122 return MEMCACHED_FAILURE;
123 }
124
125 if (ptr->root->recv_size > 0)
126 {
127 int error;
128
129 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
130 &ptr->root->recv_size, (socklen_t)sizeof(int));
131 WATCHPOINT_ASSERT(error == 0);
132 if (error)
133 return MEMCACHED_FAILURE;
134 }
135
136 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
137 int flags;
138
139 do
140 flags= fcntl(ptr->fd, F_GETFL, 0);
141 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
142
143 unlikely (flags == -1)
144 {
145 return MEMCACHED_CONNECTION_FAILURE;
146 }
147 else if ((flags & O_NONBLOCK) == 0)
148 {
149 int rval;
150
151 do
152 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
153 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
154
155 unlikely (rval == -1)
156 {
157 return MEMCACHED_CONNECTION_FAILURE;
158 }
159 }
160
161 return MEMCACHED_SUCCESS;
162 }
163
164 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
165 {
166 struct sockaddr_un servAddr;
167
168 if (ptr->fd == -1)
169 {
170 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
171 {
172 ptr->cached_errno= errno;
173 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
174 }
175
176 memset(&servAddr, 0, sizeof (struct sockaddr_un));
177 servAddr.sun_family= AF_UNIX;
178 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
179
180 test_connect:
181 if (connect(ptr->fd,
182 (struct sockaddr *)&servAddr,
183 sizeof(servAddr)) < 0)
184 {
185 switch (errno)
186 {
187 case EINPROGRESS:
188 case EALREADY:
189 case EINTR:
190 goto test_connect;
191 case EISCONN: /* We were spinning waiting on connect */
192 break;
193 default:
194 WATCHPOINT_ERRNO(errno);
195 ptr->cached_errno= errno;
196 return MEMCACHED_ERRNO;
197 }
198 }
199 }
200
201 WATCHPOINT_ASSERT(ptr->fd != -1);
202 return MEMCACHED_SUCCESS;
203 }
204
205 static memcached_return_t network_connect(memcached_server_st *ptr)
206 {
207 if (ptr->fd == -1)
208 {
209 struct addrinfo *use;
210
211 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
212
213 if (! ptr->options.sockaddr_inited ||
214 (!(ptr->root->flags.use_cache_lookups)))
215 {
216 memcached_return_t rc;
217
218 rc= set_hostinfo(ptr);
219 if (rc != MEMCACHED_SUCCESS)
220 return rc;
221 ptr->options.sockaddr_inited= true;
222 }
223
224 use= ptr->address_info;
225 /* Create the socket */
226 while (use != NULL)
227 {
228 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
229 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
230 {
231 use= use->ai_next;
232 continue;
233 }
234
235 if ((ptr->fd= socket(use->ai_family,
236 use->ai_socktype,
237 use->ai_protocol)) < 0)
238 {
239 ptr->cached_errno= errno;
240 WATCHPOINT_ERRNO(errno);
241 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
242 }
243
244 (void)set_socket_options(ptr);
245
246 /* connect to server */
247 while (ptr->fd != -1 &&
248 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
249 {
250 ptr->cached_errno= errno;
251 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
252 errno == EALREADY) /* nonblocking mode - subsequent returns */
253 {
254 struct pollfd fds[1];
255 fds[0].fd = ptr->fd;
256 fds[0].events = POLLOUT;
257 int error= poll(fds, 1, ptr->root->connect_timeout);
258
259 if (error != 1 || fds[0].revents & POLLERR)
260 {
261 if (fds[0].revents & POLLERR)
262 {
263 int err;
264 socklen_t len = sizeof (err);
265 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
266 ptr->cached_errno= (err == 0) ? errno : err;
267 }
268
269 (void)close(ptr->fd);
270 ptr->fd= -1;
271 }
272 }
273 else if (errno == EISCONN) /* we are connected :-) */
274 {
275 break;
276 }
277 else if (errno != EINTR)
278 {
279 (void)close(ptr->fd);
280 ptr->fd= -1;
281 break;
282 }
283 }
284
285 if (ptr->fd != -1)
286 {
287 ptr->server_failure_counter= 0;
288 return MEMCACHED_SUCCESS;
289 }
290 use = use->ai_next;
291 }
292 }
293
294 if (ptr->fd == -1)
295 {
296 /* Failed to connect. schedule next retry */
297 if (ptr->root->retry_timeout)
298 {
299 struct timeval next_time;
300
301 if (gettimeofday(&next_time, NULL) == 0)
302 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
303 }
304 ptr->server_failure_counter++;
305 if (ptr->cached_errno == 0)
306 return MEMCACHED_TIMEOUT;
307
308 return MEMCACHED_ERRNO; /* The last error should be from connect() */
309 }
310
311 ptr->server_failure_counter= 0;
312 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
313 }
314
315
316 memcached_return_t memcached_connect(memcached_server_st *ptr)
317 {
318 memcached_return_t rc= MEMCACHED_NO_SERVERS;
319 LIBMEMCACHED_MEMCACHED_CONNECT_START();
320
321 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
322 WATCHPOINT_ASSERT(ptr->root);
323 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
324 {
325 struct timeval curr_time;
326
327 gettimeofday(&curr_time, NULL);
328
329 /* if we've had too many consecutive errors on this server, mark it dead. */
330 if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
331 {
332 ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
333 ptr->server_failure_counter= 0;
334 }
335
336 if (curr_time.tv_sec < ptr->next_retry)
337 {
338 if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
339 {
340 run_distribution(ptr->root);
341 }
342
343 ptr->root->last_disconnected_server = ptr;
344 return MEMCACHED_SERVER_MARKED_DEAD;
345 }
346 }
347
348 /* We need to clean up the multi startup piece */
349 switch (ptr->type)
350 {
351 case MEMCACHED_CONNECTION_UNKNOWN:
352 WATCHPOINT_ASSERT(0);
353 rc= MEMCACHED_NOT_SUPPORTED;
354 break;
355 case MEMCACHED_CONNECTION_UDP:
356 case MEMCACHED_CONNECTION_TCP:
357 rc= network_connect(ptr);
358 break;
359 case MEMCACHED_CONNECTION_UNIX_SOCKET:
360 rc= unix_socket_connect(ptr);
361 break;
362 case MEMCACHED_CONNECTION_MAX:
363 default:
364 WATCHPOINT_ASSERT(0);
365 }
366
367 unlikely ( rc != MEMCACHED_SUCCESS) ptr->root->last_disconnected_server = ptr;
368
369 LIBMEMCACHED_MEMCACHED_CONNECT_END();
370
371 return rc;
372 }