Implement the server failure rehash mechanism
[awesomized/libmemcached] / libmemcached / memcached_connect.c
1 #include "common.h"
2 #include <poll.h>
3 #include <sys/time.h>
4
5 static memcached_return set_hostinfo(memcached_server_st *server)
6 {
7 struct addrinfo *ai;
8 struct addrinfo hints;
9 int e;
10 char str_port[NI_MAXSERV];
11
12 sprintf(str_port, "%u", server->port);
13
14 memset(&hints, 0, sizeof(hints));
15
16 hints.ai_family= AF_INET;
17 if (server->type == MEMCACHED_CONNECTION_UDP)
18 {
19 hints.ai_protocol= IPPROTO_UDP;
20 hints.ai_socktype= SOCK_DGRAM;
21 }
22 else
23 {
24 hints.ai_socktype= SOCK_STREAM;
25 hints.ai_protocol= IPPROTO_TCP;
26 }
27
28 e= getaddrinfo(server->hostname, str_port, &hints, &ai);
29 if (e != 0)
30 {
31 WATCHPOINT_STRING(server->hostname);
32 WATCHPOINT_STRING(gai_strerror(e));
33 return MEMCACHED_HOST_LOOKUP_FAILURE;
34 }
35
36 if (server->address_info)
37 freeaddrinfo(server->address_info);
38 server->address_info= ai;
39
40 return MEMCACHED_SUCCESS;
41 }
42
43 static memcached_return set_socket_options(memcached_server_st *ptr)
44 {
45 if (ptr->type == MEMCACHED_CONNECTION_UDP)
46 return MEMCACHED_SUCCESS;
47
48 if (ptr->root->snd_timeout)
49 {
50 int error;
51 struct timeval waittime;
52
53 waittime.tv_sec= 0;
54 waittime.tv_usec= ptr->root->snd_timeout;
55
56 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
57 &waittime, (socklen_t)sizeof(struct timeval));
58 WATCHPOINT_ASSERT(error == 0);
59 }
60
61 if (ptr->root->rcv_timeout)
62 {
63 int error;
64 struct timeval waittime;
65
66 waittime.tv_sec= 0;
67 waittime.tv_usec= ptr->root->rcv_timeout;
68
69 error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
70 &waittime, (socklen_t)sizeof(struct timeval));
71 WATCHPOINT_ASSERT(error == 0);
72 }
73
74 {
75 int error;
76 struct linger linger;
77
78 linger.l_onoff= 1;
79 linger.l_linger= MEMCACHED_DEFAULT_TIMEOUT;
80 error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
81 &linger, (socklen_t)sizeof(struct linger));
82 WATCHPOINT_ASSERT(error == 0);
83 }
84
85 if (ptr->root->flags & MEM_TCP_NODELAY)
86 {
87 int flag= 1;
88 int error;
89
90 error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
91 &flag, (socklen_t)sizeof(int));
92 WATCHPOINT_ASSERT(error == 0);
93 }
94
95 if (ptr->root->send_size)
96 {
97 int error;
98
99 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
100 &ptr->root->send_size, (socklen_t)sizeof(int));
101 WATCHPOINT_ASSERT(error == 0);
102 }
103
104 if (ptr->root->recv_size)
105 {
106 int error;
107
108 error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
109 &ptr->root->recv_size, (socklen_t)sizeof(int));
110 WATCHPOINT_ASSERT(error == 0);
111 }
112
113 /* For the moment, not getting a nonblocking mode will not be fatal */
114 if (ptr->root->flags & MEM_NO_BLOCK)
115 {
116 int flags;
117
118 flags= fcntl(ptr->fd, F_GETFL, 0);
119 unlikely (flags != -1)
120 {
121 (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
122 }
123 }
124
125 return MEMCACHED_SUCCESS;
126 }
127
128 static memcached_return unix_socket_connect(memcached_server_st *ptr)
129 {
130 struct sockaddr_un servAddr;
131 socklen_t addrlen;
132
133 if (ptr->fd == -1)
134 {
135 if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
136 {
137 ptr->cached_errno= errno;
138 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
139 }
140
141 memset(&servAddr, 0, sizeof (struct sockaddr_un));
142 servAddr.sun_family= AF_UNIX;
143 strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
144
145 addrlen= strlen(servAddr.sun_path) + sizeof(servAddr.sun_family);
146
147 test_connect:
148 if (connect(ptr->fd,
149 (struct sockaddr *)&servAddr,
150 sizeof(servAddr)) < 0)
151 {
152 switch (errno) {
153 case EINPROGRESS:
154 case EALREADY:
155 case EINTR:
156 goto test_connect;
157 case EISCONN: /* We were spinning waiting on connect */
158 break;
159 default:
160 WATCHPOINT_ERRNO(errno);
161 ptr->cached_errno= errno;
162 return MEMCACHED_ERRNO;
163 }
164 }
165 }
166 return MEMCACHED_SUCCESS;
167 }
168
169 static memcached_return network_connect(memcached_server_st *ptr)
170 {
171 if (ptr->fd == -1)
172 {
173 struct addrinfo *use;
174
175 if(ptr->root->server_failure_limit != 0) {
176 if(ptr->server_failure_counter >= ptr->root->server_failure_limit) {
177 server_remove(ptr);
178 }
179 }
180 /* Old connection junk still is in the structure */
181 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
182
183 if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
184 (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
185 {
186 memcached_return rc;
187
188 rc= set_hostinfo(ptr);
189 if (rc != MEMCACHED_SUCCESS)
190 return rc;
191 ptr->sockaddr_inited= MEMCACHED_ALLOCATED;
192 }
193
194 use= ptr->address_info;
195 /* Create the socket */
196 while (use != NULL)
197 {
198 if ((ptr->fd= socket(use->ai_family,
199 use->ai_socktype,
200 use->ai_protocol)) < 0)
201 {
202 ptr->cached_errno= errno;
203 WATCHPOINT_ERRNO(errno);
204 return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
205 }
206
207 (void)set_socket_options(ptr);
208
209 /* connect to server */
210 test_connect:
211 if (connect(ptr->fd,
212 use->ai_addr,
213 use->ai_addrlen) < 0)
214 {
215 switch (errno) {
216 /* We are spinning waiting on connect */
217 case EALREADY:
218 case EINPROGRESS:
219 {
220 struct pollfd fds[1];
221 int error;
222
223 memset(&fds, 0, sizeof(struct pollfd));
224 fds[0].fd= ptr->fd;
225 fds[0].events= POLLOUT | POLLERR;
226 error= poll(fds, 1, ptr->root->connect_timeout);
227
228 if (error == 0)
229 {
230 goto handle_retry;
231 }
232 else if (error != 1 || fds[0].revents & POLLERR)
233 {
234 ptr->cached_errno= errno;
235 WATCHPOINT_ERRNO(ptr->cached_errno);
236 WATCHPOINT_NUMBER(ptr->root->connect_timeout);
237 close(ptr->fd);
238 ptr->fd= -1;
239 if (ptr->address_info)
240 {
241 freeaddrinfo(ptr->address_info);
242 ptr->address_info= NULL;
243 }
244
245 if (ptr->root->retry_timeout)
246 {
247 struct timeval next_time;
248
249 gettimeofday(&next_time, NULL);
250 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
251 }
252 ptr->server_failure_counter+= 1;
253 return MEMCACHED_ERRNO;
254 }
255
256 break;
257 }
258 /* We are spinning waiting on connect */
259 case EINTR:
260 goto test_connect;
261 case EISCONN: /* We were spinning waiting on connect */
262 break;
263 default:
264 handle_retry:
265 ptr->cached_errno= errno;
266 close(ptr->fd);
267 ptr->fd= -1;
268 if (ptr->root->retry_timeout)
269 {
270 struct timeval next_time;
271
272 gettimeofday(&next_time, NULL);
273 ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout;
274 }
275 }
276 }
277 else
278 {
279 WATCHPOINT_ASSERT(ptr->cursor_active == 0);
280 ptr->server_failure_counter= 0;
281 return MEMCACHED_SUCCESS;
282 }
283 use = use->ai_next;
284 }
285 }
286
287 if (ptr->fd == -1) {
288 ptr->server_failure_counter+= 1;
289 return MEMCACHED_ERRNO; /* The last error should be from connect() */
290 }
291
292 ptr->server_failure_counter= 0;
293 return MEMCACHED_SUCCESS; /* The last error should be from connect() */
294 }
295
296
297 memcached_return memcached_connect(memcached_server_st *ptr)
298 {
299 memcached_return rc= MEMCACHED_NO_SERVERS;
300 LIBMEMCACHED_MEMCACHED_CONNECT_START();
301
302 if (ptr->root->retry_timeout)
303 {
304 struct timeval next_time;
305
306 gettimeofday(&next_time, NULL);
307 if (next_time.tv_sec < ptr->next_retry)
308 return MEMCACHED_TIMEOUT;
309 }
310 /* We need to clean up the multi startup piece */
311 switch (ptr->type)
312 {
313 case MEMCACHED_CONNECTION_UNKNOWN:
314 WATCHPOINT_ASSERT(0);
315 rc= MEMCACHED_NOT_SUPPORTED;
316 break;
317 case MEMCACHED_CONNECTION_UDP:
318 case MEMCACHED_CONNECTION_TCP:
319 rc= network_connect(ptr);
320 break;
321 case MEMCACHED_CONNECTION_UNIX_SOCKET:
322 rc= unix_socket_connect(ptr);
323 break;
324 default:
325 WATCHPOINT_ASSERT(0);
326 }
327
328 LIBMEMCACHED_MEMCACHED_CONNECT_END();
329
330 return rc;
331 }