Fix for lp:962815, from https://launchpad.net/~harebox
[awesomized/libmemcached] / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <config.h>
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <iostream>
45 #include <netdb.h>
46 #include <netinet/in.h>
47 #include <poll.h>
48 #include <sstream>
49 #include <sys/socket.h>
50 #include <sys/types.h>
51
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56
57 namespace datadifferential {
58 namespace util {
59
60 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
61 _host(hostname_arg),
62 _service(service_arg),
63 _sockfd(INVALID_SOCKET),
64 state(NOT_WRITING),
65 _addrinfo(0),
66 _addrinfo_next(0),
67 _finish_fn(NULL),
68 _operations()
69 {
70 }
71
72 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
73 _host(hostname_arg),
74 _sockfd(INVALID_SOCKET),
75 state(NOT_WRITING),
76 _addrinfo(0),
77 _addrinfo_next(0),
78 _finish_fn(NULL),
79 _operations()
80 {
81 char tmp[BUFSIZ];
82 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
83 _service= tmp;
84 }
85
86 Instance::~Instance()
87 {
88 close_socket();
89 free_addrinfo();
90 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
91 {
92 delete *iter;
93 }
94 _operations.clear();
95
96 delete _finish_fn;
97 }
98
/**
 * Drive the connection state machine until the operation queue is empty.
 *
 * Each pass of the loop looks at the operation at the back of _operations
 * and advances `state` by one step:
 *
 *   NOT_WRITING            resolve _host/_service with getaddrinfo()
 *   NEXT_CONNECT_ADDRINFO  advance to the next resolved address, then
 *                          fall through to CONNECT
 *   CONNECT                create a socket and connect() to the current address
 *   CONNECTING             placeholder for non-blocking connect handling
 *   CONNECTED / WRITING    send the operation's packet in full
 *   READING                read the response while more_to_read() reports data
 *   FINISHED               hand the response to _finish_fn, pop and delete
 *                          the operation
 *
 * @return true once every queued operation has been processed; false on
 *         failure. Resolve, connect, send and receive failures store a
 *         message in _last_error; a failure reported by _finish_fn leaves
 *         _last_error untouched here.
 */
bool Instance::run()
{
  while (not _operations.empty())
  {
    Operation::vector::value_type operation= _operations.back();

    switch (state)
    {
    case NOT_WRITING:
      {
        free_addrinfo();

        struct addrinfo ai;
        memset(&ai, 0, sizeof(struct addrinfo));
        ai.ai_socktype= SOCK_STREAM;
        ai.ai_protocol= IPPROTO_TCP;

        int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
        if (ret)
        {
          std::stringstream message;
          message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
          _last_error= message.str();
          return false;
        }
      }
      _addrinfo_next= _addrinfo;
      state= CONNECT;
      break;

    case NEXT_CONNECT_ADDRINFO:
      if (_addrinfo_next->ai_next == NULL)
      {
        // Every resolved address has been tried.
        std::stringstream message;
        message << "Error connecting to " << _host.c_str() << "." << std::endl;
        _last_error= message.str();
        return false;
      }
      _addrinfo_next= _addrinfo_next->ai_next;
      // Intentional fall through: try to connect to the address just selected.

    case CONNECT:
      close_socket();

      _sockfd= socket(_addrinfo_next->ai_family,
                      _addrinfo_next->ai_socktype,
                      _addrinfo_next->ai_protocol);
      if (_sockfd == INVALID_SOCKET)
      {
        perror("socket");
        // Move on to the next address; retrying the same one could spin
        // forever when socket() keeps failing for this address family.
        state= NEXT_CONNECT_ADDRINFO;
        break;
      }

      if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
      {
        switch(errno)
        {
        case EAGAIN:
        case EINTR:
          // Transient failure: retry the same address.
          state= CONNECT;
          break;

        case EINPROGRESS:
          state= CONNECTING;
          break;

        case ECONNREFUSED:
        case ENETUNREACH:
        case ETIMEDOUT:
        default:
          state= NEXT_CONNECT_ADDRINFO;
          break;
        }
      }
      else
      {
        state= CONNECTING;
      }
      break;

    case CONNECTING:
      // Add logic for poll() for nonblocking.
      state= CONNECTED;
      break;

    case CONNECTED:
    case WRITING:
      {
        size_t packet_length= operation->size();
        const char *packet= operation->ptr();

        while (packet_length)
        {
          ssize_t write_size= send(_sockfd, packet, packet_length, 0);

          if (write_size < 0)
          {
            if (errno == EINTR)
            {
              // Interrupted before any data was sent; try again.
              continue;
            }

            // Never fold a negative result into the length/pointer
            // arithmetic below: report the failure and stop.
            std::stringstream message;
            message << "Failed during send(" << strerror(errno) << ")";
            _last_error= message.str();
            return false;
          }

          packet_length-= static_cast<size_t>(write_size);
          packet+= static_cast<size_t>(write_size);
        }
      }
      state= READING;
      break;

    case READING:
      if (operation->has_response())
      {
        ssize_t read_length;

        do
        {
          char buffer[BUFSIZ];
          read_length= ::recv(_sockfd, buffer, sizeof(buffer), 0);

          if (read_length < 0)
          {
            _last_error.clear();
            _last_error+= "Error occured while reading data from ";
            _last_error+= _host;
            return false;
          }
          else if (read_length == 0)
          {
            // recv() returning 0 means the peer closed the connection.
            _last_error.clear();
            _last_error+= "Socket was shutdown while reading from ";
            _last_error+= _host;

            return false;
          }

          operation->push(buffer, static_cast<size_t>(read_length));

        } while (more_to_read());
      } // end has_response

      state= FINISHED;
      break;

    case FINISHED:
      {
        std::string response;
        bool success= operation->response(response);
        if (_finish_fn)
        {
          if (not _finish_fn->call(success, response))
          {
            // Error was sent from _finish_fn
            return false;
          }
        }

        // NOTE(review): the reconnect request is queried but nothing acts
        // on it yet; the call is kept so behavior is unchanged.
        if (operation->reconnect())
        {
        }
        _operations.pop_back();
        delete operation;

        state= CONNECTED;
      }
      break;
    } // end switch
  }

  return true;
} // end run()
276
277 bool Instance::more_to_read() const
278 {
279 struct pollfd fds;
280 fds.fd= _sockfd;
281 fds.events = POLLIN;
282
283 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
284 {
285 return false;
286 }
287
288 return true;
289 }
290
291 void Instance::close_socket()
292 {
293 if (_sockfd == INVALID_SOCKET)
294 {
295 return;
296 }
297
298 /* in case of death shutdown to avoid blocking at close() */
299 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
300 {
301 perror("shutdown");
302 }
303 else if (closesocket(_sockfd) == SOCKET_ERROR)
304 {
305 perror("close");
306 }
307
308 _sockfd= INVALID_SOCKET;
309 }
310
311 void Instance::free_addrinfo()
312 {
313 if (_addrinfo == NULL)
314 {
315 return;
316 }
317
318 freeaddrinfo(_addrinfo);
319 _addrinfo= NULL;
320 _addrinfo_next= NULL;
321 }
322
323 } /* namespace util */
324 } /* namespace datadifferential */