Bug #442914: 'delete noreply' may hang the client
[m6w6/libmemcached] / libmemcached / memcached_connect.c
1 #include "common.h"
2 #include <netdb.h>
3 #include <poll.h>
4 #include <sys/time.h>
5
6 static memcached_return set_hostinfo(memcached_server_st *server)
7 {
8 struct addrinfo *ai;
9 struct addrinfo hints;
10 int e;
11 char str_port[NI_MAXSERV];
12
13 sprintf(str_port, "%u", server->port);
14
15 memset(&hints, 0, sizeof(hints));
16
17 // hints.ai_family= AF_INET;
18 if (server->type == MEMCACHED_CONNECTION_UDP)
19 {
20 hints.ai_protocol= IPPROTO_UDP;
21 hints.ai_socktype= SOCK_DGRAM;
22 }
23 else
24 {
25 hints.ai_socktype= SOCK_STREAM;
26 hints.ai_protocol= IPPROTO_TCP;
27 }
28
29 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
30 if (e != 0)
31 {
32 WATCHPOINT_STRING(server->hostname);
33 WATCHPOINT_STRING(gai_strerror(e));
34 return MEMCACHED_HOST_LOOKUP_FAILURE;
35 }
36
37 if (server->address_info)
38 {
39 freeaddrinfo(server->address_info);
40 server->address_info= NULL;
41 }
42 server->address_info= ai;
43
44 return MEMCACHED_SUCCESS;
45 }
46
47 static memcached_return set_socket_options(memcached_server_st *ptr)
48 {
49 WATCHPOINT_ASSERT(ptr->fd != -1);
50
51 if (ptr->type == MEMCACHED_CONNECTION_UDP)
52 return MEMCACHED_SUCCESS;
53
54 #ifdef HAVE_SNDTIMEO
55 if (ptr->root->snd_timeout)
56 {
57 int error;
58 struct timeval waittime;
59
60 waittime.tv_sec= 0;
61 waittime.tv_usec= ptr->root->snd_timeout;
62
63 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
64 &waittime, (socklen_t)sizeof(struct timeval));
65 WATCHPOINT_ASSERT(error == 0);
66 }
67 #endif
68
69 #ifdef HAVE_RCVTIMEO
70 if (ptr->root->rcv_timeout)
71 {
72 int error;
73 struct timeval waittime;
74
75 waittime.tv_sec= 0;
76 waittime.tv_usec= ptr->root->rcv_timeout;
77
78 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
79 &waittime, (socklen_t)sizeof(struct timeval));
80 WATCHPOINT_ASSERT(error == 0);
81 }
82 #endif
83
84 if (ptr->root->flags & MEM_NO_BLOCK)
85 {
86 int error;
87 struct linger linger;
88
89 linger.l_onoff= 1;
90 linger.l_linger= 0; /* By default on close() just drop the socket */
91 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
92 &linger, (socklen_t)sizeof(struct linger));
93 WATCHPOINT_ASSERT(error == 0);
94 }
95
96 if (ptr->root->flags & MEM_TCP_NODELAY)
97 {
98 int flag= 1;
99 int error;
100
101 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
102 &flag, (socklen_t)sizeof(int));
103 WATCHPOINT_ASSERT(error == 0);
104 }
105
106 if (ptr->root->send_size)
107 {
108 int error;
109
110 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
111 &ptr->root->send_size, (socklen_t)sizeof(int));
112 WATCHPOINT_ASSERT(error == 0);
113 }
114
115 if (ptr->root->recv_size)
116 {
117 int error;
118
119 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVBUF,
120 &ptr->root->recv_size, (socklen_t)sizeof(int));
121 WATCHPOINT_ASSERT(error == 0);
122 }
123
124 /* For the moment, not getting a nonblocking mode will not be fatal */
125 if ((ptr->root->flags & MEM_NO_BLOCK) || ptr->root->connect_timeout)
126 {
127 int flags;
128
129 flags= fcntl(ptr->fd, F_GETFL, 0);
130 unlikely (flags != -1)
131 {
132 (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
133 }
134 }
135
136 return MEMCACHED_SUCCESS;
137 }
138
139 static memcached_return unix_socket_connect(memcached_server_st *ptr)
140 {
141 struct sockaddr_un servAddr;
142 socklen_t addrlen;
143
144 if (ptr->fd == -1)
145 {
146 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
147 {
148 ptr->cached_errno= errno;
149 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
150 }
151
152 memset(&servAddr, 0, sizeof (struct sockaddr_un));
153 servAddr.sun_family= AF_UNIX;
154 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
155
156 addrlen= (socklen_t) (strlen(servAddr.sun_path) + sizeof(servAddr.sun_family));
157
158 test_connect:
159 if (connect(ptr->fd,
160 (struct sockaddr *)&servAddr,
161 sizeof(servAddr)) < 0)
162 {
163 switch (errno)
164 {
165 case EINPROGRESS:
166 case EALREADY:
167 case EINTR:
168 goto test_connect;
169 case EISCONN: /* We were spinning waiting on connect */
170 break;
171 default:
172 WATCHPOINT_ERRNO(errno);
173 ptr->cached_errno= errno;
174 return MEMCACHED_ERRNO;
175 }
176 }
177 }
178
179 WATCHPOINT_ASSERT(ptr->fd != -1);
180 return MEMCACHED_SUCCESS;
181 }
182
183 static memcached_return network_connect(memcached_server_st *ptr)
184 {
185 if (ptr->fd == -1)
186 {
187 struct addrinfo *use;
188
189 if (!ptr->sockaddr_inited ||
190 (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
191 {
192 memcached_return rc;
193
194 rc= set_hostinfo(ptr);
195 if (rc != MEMCACHED_SUCCESS)
196 return rc;
197 ptr->sockaddr_inited= true;
198 }
199
200 use= ptr->address_info;
201 /* Create the socket */
202 while (use != NULL)
203 {
204 /* Memcache server does not support IPV6 in udp mode, so skip if not ipv4 */
205 if (ptr->type == MEMCACHED_CONNECTION_UDP && use->ai_family != AF_INET)
206 {
207 use= use->ai_next;
208 continue;
209 }
210
211 if ((ptr->fd= socket(use->ai_family,
212 use->ai_socktype,
213 use->ai_protocol)) < 0)
214 {
215 ptr->cached_errno= errno;
216 WATCHPOINT_ERRNO(errno);
217 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
218 }
219
220 (void)set_socket_options(ptr);
221
222 int flags= 0;
223 if (ptr->root->connect_timeout)
224 {
225 flags= fcntl(ptr->fd, F_GETFL, 0);
226 if (flags != -1 && !(flags & O_NONBLOCK))
227 (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
228 }
229
230 /* connect to server */
231 while (ptr->fd != -1 &&
232 connect(ptr->fd, use->ai_addr, use->ai_addrlen) < 0)
233 {
234 ptr->cached_errno= errno;
235 if (errno == EINPROGRESS || /* nonblocking mode - first return, */
236 errno == EALREADY) /* nonblocking mode - subsequent returns */
237 {
238 struct pollfd fds[1];
239 fds[0].fd = ptr->fd;
240 fds[0].events = POLLOUT;
241 int error= poll(fds, 1, ptr->root->connect_timeout);
242
243 if (error != 1 || fds[0].revents & POLLERR)
244 {
245 if (fds[0].revents & POLLERR)
246 {
247 int err;
248 socklen_t len = sizeof (err);
249 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
250 ptr->cached_errno= (err == 0) ? errno : err;
251 }
252
253 (void)close(ptr->fd);
254 ptr->fd= -1;
255 }
256 }
257 else if (errno == EISCONN) /* we are connected :-) */
258 {
259 break;
260 }
261 else if (errno != EINTR)
262 {
263 (void)close(ptr->fd);
264 ptr->fd= -1;
265 break;
266 }
267 }
268
269 if (ptr->fd != -1)
270 {
271 /* restore flags */
272 if (ptr->root->connect_timeout && (ptr->root->flags & MEM_NO_BLOCK) == 0)
273 (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK);
274
275 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
276 ptr->server_failure_counter= 0;
277 return MEMCACHED_SUCCESS;
278 }
279 use = use->ai_next;
280 }
281 }
282
283 if (ptr->fd == -1)
284 {
285 /* Failed to connect. schedule next retry */
286 if (ptr->root->retry_timeout)
287 {
288 struct timeval next_time;
289
290 if (gettimeofday(&next_time, NULL) == 0)
291 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
292 }
293 ptr->server_failure_counter+= 1;
294 if (ptr->cached_errno == 0)
295 return MEMCACHED_TIMEOUT;
296 return MEMCACHED_ERRNO; /* The last error should be from connect() */
297 }
298
299 ptr->server_failure_counter= 0;
300 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
301 }
302
303
304 memcached_return memcached_connect(memcached_server_st *ptr)
305 {
306 memcached_return rc= MEMCACHED_NO_SERVERS;
307 LIBMEMCACHED_MEMCACHED_CONNECT_START();
308
309 /* both retry_timeout and server_failure_limit must be set in order to delay retrying a server on error. */
310 WATCHPOINT_ASSERT(ptr->root);
311 if (ptr->root->retry_timeout && ptr->root->server_failure_limit)
312 {
313 struct timeval next_time;
314
315 gettimeofday(&next_time, NULL);
316
317 /* if we've had too many consecutive errors on this server, mark it dead. */
318 if (ptr->server_failure_counter > ptr->root->server_failure_limit)
319 {
320 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
321 ptr->server_failure_counter= 0;
322 }
323
324 if (next_time.tv_sec < ptr->next_retry)
325 {
326 if (memcached_behavior_get(ptr->root, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS))
327 run_distribution(ptr->root);
328
329 return MEMCACHED_SERVER_MARKED_DEAD;
330 }
331 }
332
333 /* We need to clean up the multi startup piece */
334 switch (ptr->type)
335 {
336 case MEMCACHED_CONNECTION_UNKNOWN:
337 WATCHPOINT_ASSERT(0);
338 rc= MEMCACHED_NOT_SUPPORTED;
339 break;
340 case MEMCACHED_CONNECTION_UDP:
341 case MEMCACHED_CONNECTION_TCP:
342 rc= network_connect(ptr);
343 break;
344 case MEMCACHED_CONNECTION_UNIX_SOCKET:
345 rc= unix_socket_connect(ptr);
346 break;
347 default:
348 WATCHPOINT_ASSERT(0);
349 }
350
351 LIBMEMCACHED_MEMCACHED_CONNECT_END();
352
353 return rc;
354 }