Added missing constant.
[m6w6/libmemcached] / libmemcached / connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return_t set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return_t set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 if (error)
67 return MEMCACHED_FAILURE;
68 }
69 #endif
70
71 #ifdef HAVE_RCVTIMEO
72 if (ptr->root->rcv_timeout)
73 {
74 int error;
75 struct timeval waittime;
76
77 waittime.tv_sec= 0;
78 waittime.tv_usec= ptr->root->rcv_timeout;
79
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
81 &waittime, (socklen_t)sizeof(struct timeval));
82 WATCHPOINT_ASSERT(error == 0);
83 if (error)
84 return MEMCACHED_FAILURE;
85 }
86 #endif
87
88 if (ptr->root->flags.no_block)
89 {
90 int error;
91 struct linger linger;
92
93 linger.l_onoff= 1;
94 linger.l_linger= 0; /* By default on close() just drop the socket */
95 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
96 &linger, (socklen_t)sizeof(struct linger));
97 WATCHPOINT_ASSERT(error == 0);
98 if (error)
99 return MEMCACHED_FAILURE;
100 }
101
102 if (ptr->root->flags.tcp_nodelay)
103 {
104 int flag= 1;
105 int error;
106
107 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
108 &flag, (socklen_t)sizeof(int));
109 WATCHPOINT_ASSERT(error == 0);
110 if (error)
111 return MEMCACHED_FAILURE;
112 }
113
114 if (ptr->root->flags.tcp_keepalive)
115 {
116 int flag= 1;
117 int error;
118
119 error= setsockopt(ptr->fd, SOL_SOCKET, SO_KEEPALIVE,
120 &flag, (socklen_t)sizeof(int));
121 WATCHPOINT_ASSERT(error == 0);
122 if (error)
123 return MEMCACHED_FAILURE;
124 }
125
126 #ifdef TCP_KEEPIDLE
127 if (ptr->root->tcp_keepidle > 0)
128 {
129 int error;
130
131 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_KEEPIDLE,
132 &ptr->root->tcp_keepidle, (socklen_t)sizeof(int));
133 WATCHPOINT_ASSERT(error == 0);
134 if (error)
135 return MEMCACHED_FAILURE;
136 }
137 #endif
138
139 if (ptr->root->send_size > 0)
140 {
141 int error;
142
143 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
144 &ptr->root->send_size, (socklen_t)sizeof(int));
145 WATCHPOINT_ASSERT(error == 0);
146 if (error)
147 return MEMCACHED_FAILURE;
148 }
149
150 if (ptr->root->recv_size > 0)
151 {
152 int error;
153
154 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
155 &ptr->root->recv_size, (socklen_t)sizeof(int));
156 WATCHPOINT_ASSERT(error == 0);
157 if (error)
158 return MEMCACHED_FAILURE;
159 }
160
161 /* libmemcached will always use nonblocking IO to avoid write deadlocks */
162 int flags;
163
164 do
165 flags= fcntl(ptr->fd, F_GETFL, 0);
166 while (flags == -1 && (errno == EINTR || errno == EAGAIN));
167
168 unlikely (flags == -1)
169 {
170 return MEMCACHED_CONNECTION_FAILURE;
171 }
172 else if ((flags & O_NONBLOCK) == 0)
173 {
174 int rval;
175
176 do
177 rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
178 while (rval == -1 && (errno == EINTR || errno == EAGAIN));
179
180 unlikely (rval == -1)
181 {
182 return MEMCACHED_CONNECTION_FAILURE;
183 }
184 }
185
186 return MEMCACHED_SUCCESS;
187 }
188
189 static memcached_return_t unix_socket_connect(memcached_server_st *ptr)
190 {
191 struct sockaddr_un servAddr;
192
193 if (ptr->fd == -1)
194 {
195 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
196 {
197 ptr->cached_errno= errno;
198 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
199 }
200
201 memset(&servAddr, 0, sizeof (struct sockaddr_un));
202 servAddr.sun_family= AF_UNIX;
203 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
204
205 test_connect:
206 if (connect(ptr->fd,
207 (struct sockaddr *)&servAddr,
208 sizeof(servAddr)) < 0)
209 {
210 switch (errno)
211 {
212 case EINPROGRESS:
213 case EALREADY:
214 case EINTR:
215 goto test_connect;
216 case EISCONN: /* We were spinning waiting on connect */
217 break;
218 default:
219 WATCHPOINT_ERRNO(errno);
220 ptr->cached_errno= errno;
221 return MEMCACHED_ERRNO;
222 }
223 }
224 }
225
226 WATCHPOINT_ASSERT(ptr->fd != -1);
227 return MEMCACHED_SUCCESS;
228 }
229
230 static memcached_return_t network_connect(memcached_server_st *ptr)
231 {
232 if (ptr->fd == -1)
233 {
234 struct addrinfo *use;
235
236 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
237
238 if (! ptr->options.sockaddr_inited ||
239 (!(ptr->root->flags.use_cache_lookups)))
240 {
241 memcached_return_t rc;
242
243 rc= set_hostinfo(ptr);
244 if (rc != MEMCACHED_SUCCESS)
245 return rc;
246 ptr->options.sockaddr_inited= true;
247 }
248
249 use= ptr->address_info;
250 /* Create the socket */
251 while (use != NULL)
252 {
253 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
254 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
255 {
256 use= use->ai_next;
257 continue;
258 }
259
260 if ((ptr->fd= socket(use->ai_family,
261 use->ai_socktype,
262 use->ai_protocol)) < 0)
263 {
264 ptr->cached_errno= errno;
265 WATCHPOINT_ERRNO(errno);
266 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
267 }
268
269 (void)set_socket_options(ptr);
270
271 /* connect to server */
272 while (ptr->fd != -1 &&
273 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
274 {
275 ptr->cached_errno= errno;
276 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
277 errno == EALREADY) /* nonblocking mode - subsequent returns */
278 {
279 struct pollfd fds[1];
280 fds[0].fd = ptr->fd;
281 fds[0].events = POLLOUT;
282 int error= poll(fds, 1, ptr->root->connect_timeout);
283
284 if (error != 1 || fds[0].revents & POLLERR)
285 {
286 if (fds[0].revents & POLLERR)
287 {
288 int err;
289 socklen_t len = sizeof (err);
290 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
291 ptr->cached_errno= (err == 0) ? errno : err;
292 }
293
294 (void)close(ptr->fd);
295 ptr->fd= -1;
296 }
297 }
298 else if (errno == EISCONN) /* we are connected :-) */
299 {
300 break;
301 }
302 else if (errno != EINTR)
303 {
304 (void)close(ptr->fd);
305 ptr->fd= -1;
306 break;
307 }
308 }
309
310 if (ptr->fd != -1)
311 {
312 ptr->server_failure_counter= 0;
313 return MEMCACHED_SUCCESS;
314 }
315 use = use->ai_next;
316 }
317 }
318
319 if (ptr->fd == -1)
320 {
321 /* Failed to connect. schedule next retry */
322 if (ptr->root->retry_timeout)
323 {
324 struct timeval next_time;
325
326 if (gettimeofday(&next_time, NULL) == 0)
327 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
328 }
329 ptr->server_failure_counter++;
330 if (ptr->cached_errno == 0)
331 return MEMCACHED_TIMEOUT;
332
333 return MEMCACHED_ERRNO; /* The last error should be from connect() */
334 }
335
336 ptr->server_failure_counter= 0;
337 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
338 }
339
340
341 memcached_return_t memcached_connect(memcached_server_write_instance_st ptr)
342 {
343 memcached_return_t rc= MEMCACHED_NO_SERVERS;
344 LIBMEMCACHED_MEMCACHED_CONNECT_START();
345
346 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
347 WATCHPOINT_ASSERT(ptr->root);
348 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
349 {
350 struct timeval curr_time;
351
352 gettimeofday(&curr_time, NULL);
353
354 /* if we've had too many consecutive errors on this server, mark it dead. */
355 if (ptr->server_failure_counter >= ptr->root->server_failure_limit)
356 {
357 ptr->next_retry= curr_time.tv_sec + ptr->root->retry_timeout;
358 ptr->server_failure_counter= 0;
359 }
360
361 if (curr_time.tv_sec < ptr->next_retry)
362 {
363 memcached_st *root= (memcached_st *)ptr->root;
364 // @todo fix this by fixing behavior to no longer make use of
365 // memcached_st
366 if (memcached_behavior_get(root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
367 {
368 run_distribution(root);
369 }
370
371 root->last_disconnected_server = ptr;
372
373 return MEMCACHED_SERVER_MARKED_DEAD;
374 }
375 }
376
377 /* We need to clean up the multi startup piece */
378 switch (ptr->type)
379 {
380 case MEMCACHED_CONNECTION_UNKNOWN:
381 WATCHPOINT_ASSERT(0);
382 rc= MEMCACHED_NOT_SUPPORTED;
383 break;
384 case MEMCACHED_CONNECTION_UDP:
385 case MEMCACHED_CONNECTION_TCP:
386 rc= network_connect(ptr);
387 break;
388 case MEMCACHED_CONNECTION_UNIX_SOCKET:
389 rc= unix_socket_connect(ptr);
390 break;
391 case MEMCACHED_CONNECTION_MAX:
392 default:
393 WATCHPOINT_ASSERT(0);
394 }
395
396 unlikely ( rc != MEMCACHED_SUCCESS)
397 {
398 //@todo create interface around last_discontected_server
399 memcached_st *root= (memcached_st *)ptr->root;
400 root->last_disconnected_server = ptr;
401 }
402
403 LIBMEMCACHED_MEMCACHED_CONNECT_END();
404
405 return rc;
406 }