f136d7b7d05f788fabd8a2a0537973ae3ffd39e9
[m6w6/libmemcached] / src / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include "mem_config.h"
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <iostream>
45 #include <netdb.h>
46 #include <netinet/in.h>
47 #include <poll.h>
48 #include <sstream>
49 #ifdef HAVE_SYS_SOCKET_H
50 # include <sys/socket.h>
51 #endif
52 #include <sys/types.h>
53
54 #ifdef HAVE_UNISTD_H
55 # include <unistd.h>
56 #endif
57
58 #ifndef INVALID_SOCKET
59 # define INVALID_SOCKET -1
60 #endif
61
62 #ifndef SOCKET_ERROR
63 # define SOCKET_ERROR -1
64 #endif
65
66 #ifndef get_socket_errno
67 # define get_socket_errno() errno
68 #endif
69
70 #ifndef closesocket
71 # define closesocket(a) close(a)
72 #endif
73
74
75 namespace datadifferential {
76 namespace util {
77
78 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
79 _host(hostname_arg),
80 _service(service_arg),
81 _sockfd(INVALID_SOCKET),
82 state(NOT_WRITING),
83 _addrinfo(0),
84 _addrinfo_next(0),
85 _finish_fn(NULL),
86 _operations()
87 {
88 }
89
90 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
91 _host(hostname_arg),
92 _sockfd(INVALID_SOCKET),
93 state(NOT_WRITING),
94 _addrinfo(0),
95 _addrinfo_next(0),
96 _finish_fn(NULL),
97 _operations()
98 {
99 char tmp[BUFSIZ];
100 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
101 _service= tmp;
102 }
103
104 Instance::~Instance()
105 {
106 close_socket();
107 free_addrinfo();
108 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
109 {
110 delete *iter;
111 }
112 _operations.clear();
113
114 delete _finish_fn;
115 }
116
117 bool Instance::run()
118 {
119 while (not _operations.empty())
120 {
121 Operation::vector::value_type operation= _operations.back();
122
123 switch (state)
124 {
125 case NOT_WRITING:
126 {
127 free_addrinfo();
128
129 struct addrinfo ai;
130 memset(&ai, 0, sizeof(struct addrinfo));
131 ai.ai_socktype= SOCK_STREAM;
132 ai.ai_protocol= IPPROTO_TCP;
133
134 int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
135 if (ret)
136 {
137 std::stringstream message;
138 message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
139 _last_error= message.str();
140 return false;
141 }
142 }
143 _addrinfo_next= _addrinfo;
144 state= CONNECT;
145 break;
146
147 case NEXT_CONNECT_ADDRINFO:
148 if (_addrinfo_next->ai_next == NULL)
149 {
150 std::stringstream message;
151 message << "Error connecting to " << _host.c_str() << "." << std::endl;
152 _last_error= message.str();
153 return false;
154 }
155 _addrinfo_next= _addrinfo_next->ai_next;
156 /* fall through */
157
158 case CONNECT:
159 close_socket();
160
161 _sockfd= socket(_addrinfo_next->ai_family,
162 _addrinfo_next->ai_socktype,
163 _addrinfo_next->ai_protocol);
164 if (_sockfd == INVALID_SOCKET)
165 {
166 perror("socket");
167 continue;
168 }
169
170 if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
171 {
172 switch(errno)
173 {
174 case EAGAIN:
175 case EINTR:
176 state= CONNECT;
177 break;
178
179 case EINPROGRESS:
180 state= CONNECTING;
181 break;
182
183 case ECONNREFUSED:
184 case ENETUNREACH:
185 case ETIMEDOUT:
186 default:
187 state= NEXT_CONNECT_ADDRINFO;
188 break;
189 }
190 }
191 else
192 {
193 state= CONNECTING;
194 }
195 break;
196
197 case CONNECTING:
198 // Add logic for poll() for nonblocking.
199 state= CONNECTED;
200 break;
201
202 case CONNECTED:
203 case WRITING:
204 {
205 size_t packet_length= operation->size();
206 const char *packet= operation->ptr();
207
208 while(packet_length)
209 {
210 ssize_t write_size= send(_sockfd, packet, packet_length, 0);
211
212 if (write_size < 0)
213 {
214 switch(errno)
215 {
216 default:
217 std::cerr << "Failed dureng send(" << strerror(errno) << ")" << std::endl;
218 break;
219 }
220 }
221
222 packet_length-= static_cast<size_t>(write_size);
223 packet+= static_cast<size_t>(write_size);
224 }
225 }
226 state= READING;
227 break;
228
229 case READING:
230 if (operation->has_response())
231 {
232 ssize_t read_length;
233
234 do
235 {
236 char buffer[BUFSIZ];
237 read_length= ::recv(_sockfd, buffer, sizeof(buffer), 0);
238
239 if (read_length < 0)
240 {
241 switch(errno)
242 {
243 default:
244 _last_error.clear();
245 _last_error+= "Error occured while reading data from ";
246 _last_error+= _host;
247 return false;
248 }
249 }
250 else if (read_length == 0)
251 {
252 _last_error.clear();
253 _last_error+= "Socket was shutdown while reading from ";
254 _last_error+= _host;
255
256 return false;
257 }
258
259 operation->push(buffer, static_cast<size_t>(read_length));
260
261 } while (more_to_read());
262 } // end has_response
263
264 state= FINISHED;
265 break;
266
267 case FINISHED:
268 std::string response;
269 bool success= operation->response(response);
270 if (_finish_fn)
271 {
272 if (not _finish_fn->call(success, response))
273 {
274 // Error was sent from _finish_fn
275 return false;
276 }
277 }
278
279 if (operation->reconnect())
280 {
281 }
282 _operations.pop_back();
283 delete operation;
284
285 state= CONNECTED;
286 break;
287 } // end switch
288 }
289
290 return true;
291 } // end run()
292
293 bool Instance::more_to_read() const
294 {
295 struct pollfd fds;
296 fds.fd= _sockfd;
297 fds.events = POLLIN;
298
299 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
300 {
301 return false;
302 }
303
304 return true;
305 }
306
307 void Instance::close_socket()
308 {
309 if (_sockfd == INVALID_SOCKET)
310 {
311 return;
312 }
313
314 /* in case of death shutdown to avoid blocking at close() */
315 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
316 {
317 perror("shutdown");
318 }
319 else if (closesocket(_sockfd) == SOCKET_ERROR)
320 {
321 perror("close");
322 }
323
324 _sockfd= INVALID_SOCKET;
325 }
326
327 void Instance::free_addrinfo()
328 {
329 if (_addrinfo == NULL)
330 {
331 return;
332 }
333
334 freeaddrinfo(_addrinfo);
335 _addrinfo= NULL;
336 _addrinfo_next= NULL;
337 }
338
339 } /* namespace util */
340 } /* namespace datadifferential */