Update util and fix a few cppcheck warnings.
[m6w6/libmemcached] / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <config.h>
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <iostream>
45 #include <netdb.h>
46 #include <netinet/in.h>
47 #include <poll.h>
48 #include <sstream>
49 #include <sys/socket.h>
50 #include <sys/types.h>
51
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56
57 namespace datadifferential {
58 namespace util {
59
60 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
61 _host(hostname_arg),
62 _service(service_arg),
63 _sockfd(INVALID_SOCKET),
64 state(NOT_WRITING),
65 _addrinfo(0),
66 _addrinfo_next(0),
67 _finish_fn(NULL),
68 _operations()
69 {
70 }
71
72 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
73 _host(hostname_arg),
74 _sockfd(INVALID_SOCKET),
75 state(NOT_WRITING),
76 _addrinfo(0),
77 _addrinfo_next(0),
78 _finish_fn(NULL),
79 _operations()
80 {
81 char tmp[BUFSIZ];
82 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
83 _service= tmp;
84 }
85
86 Instance::~Instance()
87 {
88 close_socket();
89 free_addrinfo();
90 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
91 {
92 delete *iter;
93 }
94 _operations.clear();
95
96 delete _finish_fn;
97 }
98
/**
 * Drive the connection state machine until the operation queue is empty.
 *
 * The operation at the back of _operations is processed first. For each
 * operation the machine resolves the host (NOT_WRITING), tries the resolved
 * addresses in order (CONNECT / NEXT_CONNECT_ADDRINFO), sends the request
 * (CONNECTED / WRITING), reads the reply when the operation expects one
 * (READING), and finally passes the result to _finish_fn when one is set
 * (FINISHED). A finished operation is removed from the queue and deleted.
 *
 * @return true once the queue has been drained; false as soon as name
 *         resolution, connecting, sending or receiving fails, or when
 *         _finish_fn reports failure. _last_error is set for the failures
 *         detected inside this function.
 */
bool Instance::run()
{
  while (not _operations.empty())
  {
    Operation::vector::value_type operation= _operations.back();

    switch (state)
    {
    case NOT_WRITING:
      {
        free_addrinfo();

        struct addrinfo ai;
        memset(&ai, 0, sizeof(struct addrinfo));
        ai.ai_socktype= SOCK_STREAM;
        ai.ai_protocol= IPPROTO_TCP;

        int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
        if (ret)
        {
          std::stringstream message;
          message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
          _last_error= message.str();
          return false;
        }
      }
      _addrinfo_next= _addrinfo;
      state= CONNECT;
      break;

    case NEXT_CONNECT_ADDRINFO:
      if (_addrinfo_next->ai_next == NULL)
      {
        // Every resolved address has been tried without success.
        std::stringstream message;
        message << "Error connecting to " << _host.c_str() << "." << std::endl;
        _last_error= message.str();
        return false;
      }
      _addrinfo_next= _addrinfo_next->ai_next;
      // Intentional fall through: connect to the address just selected.

    case CONNECT:
      close_socket();

      _sockfd= socket(_addrinfo_next->ai_family,
                      _addrinfo_next->ai_socktype,
                      _addrinfo_next->ai_protocol);
      if (_sockfd == INVALID_SOCKET)
      {
        perror("socket");
        // Advance to the next address. Leaving the state untouched would
        // retry socket() on the same entry forever when the failure is
        // persistent.
        state= NEXT_CONNECT_ADDRINFO;
        continue;
      }

      if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
      {
        switch(errno)
        {
        case EAGAIN:
        case EINTR:
          state= CONNECT;
          break;

        case EINPROGRESS:
          state= CONNECTING;
          break;

        case ECONNREFUSED:
        case ENETUNREACH:
        case ETIMEDOUT:
        default:
          state= NEXT_CONNECT_ADDRINFO;
          break;
        }
      }
      else
      {
        state= CONNECTING;
      }
      break;

    case CONNECTING:
      // Add logic for poll() for nonblocking.
      state= CONNECTED;
      break;

    case CONNECTED:
    case WRITING:
      {
        size_t packet_length= operation->size();
        const char *packet= operation->ptr();

        // send() may accept only part of the buffer, so keep going until the
        // whole packet has been handed to the socket.
        while (packet_length)
        {
          ssize_t write_size= send(_sockfd, packet, packet_length, 0);

          if (write_size < 0)
          {
            const int send_errno= errno;
            if (send_errno == EINTR)
            {
              // Interrupted before anything was transferred; try again.
              continue;
            }

            // A negative result must never reach the arithmetic below:
            // converting it to size_t would wrap packet_length and move
            // packet far outside the buffer.
            std::stringstream message;
            message << "Failed during send(" << strerror(send_errno) << ") to " << _host.c_str();
            _last_error= message.str();
            return false;
          }

          packet_length-= static_cast<size_t>(write_size);
          packet+= static_cast<size_t>(write_size);
        }
      }
      state= READING;
      break;

    case READING:
      if (operation->has_response())
      {
        ssize_t read_length;

        do
        {
          char buffer[BUFSIZ];
          read_length= ::recv(_sockfd, buffer, sizeof(buffer), 0);

          if (read_length < 0)
          {
            _last_error.clear();
            _last_error+= "Error occured while reading data from ";
            _last_error+= _host;
            return false;
          }
          else if (read_length == 0)
          {
            // recv() returning 0 means the peer closed the connection.
            _last_error.clear();
            _last_error+= "Socket was shutdown while reading from ";
            _last_error+= _host;

            return false;
          }

          operation->push(buffer, static_cast<size_t>(read_length));

        } while (more_to_read());
      } // end has_response

      state= FINISHED;
      break;

    case FINISHED:
      {
        std::string response;
        bool success= operation->response(response);
        if (_finish_fn)
        {
          if (not _finish_fn->call(success, response))
          {
            // Error was sent from _finish_fn
            return false;
          }
        }

        if (operation->reconnect())
        {
          // NOTE(review): a reconnect request is not acted on here; confirm
          // whether the connection should be re-established.
        }
        _operations.pop_back();
        delete operation;

        // The socket stays open, so the next queued operation starts at
        // the write step.
        state= CONNECTED;
      }
      break;
    } // end switch
  }

  return true;
} // end run()
274
275 bool Instance::more_to_read() const
276 {
277 struct pollfd fds;
278 fds.fd= _sockfd;
279 fds.events = POLLIN;
280
281 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
282 {
283 return false;
284 }
285
286 return true;
287 }
288
289 void Instance::close_socket()
290 {
291 if (_sockfd == INVALID_SOCKET)
292 {
293 return;
294 }
295
296 /* in case of death shutdown to avoid blocking at close() */
297 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
298 {
299 perror("shutdown");
300 }
301 else if (closesocket(_sockfd) == SOCKET_ERROR)
302 {
303 perror("close");
304 }
305
306 _sockfd= INVALID_SOCKET;
307 }
308
309 void Instance::free_addrinfo()
310 {
311 if (_addrinfo == NULL)
312 {
313 return;
314 }
315
316 freeaddrinfo(_addrinfo);
317 _addrinfo= NULL;
318 _addrinfo_next= NULL;
319 }
320
321 } /* namespace util */
322 } /* namespace datadifferential */