Merge in util library.
[m6w6/libmemcached] / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <config.h>
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <sstream>
45 #include <netdb.h>
46 #include <poll.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <netinet/in.h>
50
51
52 namespace datadifferential {
53 namespace util {
54
55 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
56 _host(hostname_arg),
57 _service(service_arg),
58 _sockfd(INVALID_SOCKET),
59 state(NOT_WRITING),
60 _addrinfo(0),
61 _addrinfo_next(0),
62 _finish_fn(NULL),
63 _operations()
64 {
65 }
66
67 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
68 _host(hostname_arg),
69 _sockfd(INVALID_SOCKET),
70 state(NOT_WRITING),
71 _addrinfo(0),
72 _addrinfo_next(0),
73 _finish_fn(NULL),
74 _operations()
75 {
76 char tmp[BUFSIZ];
77 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
78 _service= tmp;
79 }
80
81 Instance::~Instance()
82 {
83 close_socket();
84 free_addrinfo();
85 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
86 {
87 delete *iter;
88 }
89 _operations.clear();
90
91 delete _finish_fn;
92 }
93
/**
 * Drive the connection state machine until every queued operation has been
 * sent and, when it expects one, its response has been collected.
 *
 * Operations are taken from the back of _operations. For each one the
 * machine resolves the host (NOT_WRITING), walks the address list until a
 * connect() is accepted (CONNECT / NEXT_CONNECT_ADDRINFO / CONNECTING),
 * sends the packet (CONNECTED / WRITING), reads the reply (READING) and
 * finally hands the result to _finish_fn and deletes the operation
 * (FINISHED). After an operation finishes the state goes back to CONNECTED,
 * so the next operation is written to the same socket.
 *
 * @return true once _operations is empty. false when name resolution
 *         fails, when the address list is exhausted without a connection,
 *         when send() or recv() reports an error, or when
 *         _finish_fn->call() returns false. _last_error is filled in for
 *         the resolution, connect, send and recv failures.
 */
bool Instance::run()
{
  while (not _operations.empty())
  {
    Operation::vector::value_type operation= _operations.back();

    switch (state)
    {
    case NOT_WRITING:
      {
        // Drop any previous lookup before resolving again.
        free_addrinfo();

        struct addrinfo ai;
        memset(&ai, 0, sizeof(struct addrinfo));
        ai.ai_socktype= SOCK_STREAM;
        ai.ai_protocol= IPPROTO_TCP;

        int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
        if (ret)
        {
          std::stringstream message;
          message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with "  << gai_strerror(ret);
          _last_error= message.str();
          return false;
        }
      }
      _addrinfo_next= _addrinfo;
      state= CONNECT;
      break;

    case NEXT_CONNECT_ADDRINFO:
      if (_addrinfo_next->ai_next == NULL)
      {
        // Every resolved address has been tried.
        std::stringstream message;
        message << "Error connecting to " << _host.c_str() << "." << std::endl;
        _last_error= message.str();
        return false;
      }
      _addrinfo_next= _addrinfo_next->ai_next;
      // Deliberately continues into CONNECT to try the address just selected.
      // fall through

    case CONNECT:
      close_socket();

      _sockfd= socket(_addrinfo_next->ai_family,
                      _addrinfo_next->ai_socktype,
                      _addrinfo_next->ai_protocol);
      if (_sockfd == INVALID_SOCKET)
      {
        perror("socket");
        // Retrying the very same address would loop forever if socket()
        // keeps failing, so move on to the next candidate instead.
        state= NEXT_CONNECT_ADDRINFO;
        break;
      }

      if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
      {
        switch(errno)
        {
        case EAGAIN:
        case EINTR:
          state= CONNECT;
          break;

        case EINPROGRESS:
          state= CONNECTING;
          break;

        case ECONNREFUSED:
        case ENETUNREACH:
        case ETIMEDOUT:
        default:
          state= NEXT_CONNECT_ADDRINFO;
          break;
        }
      }
      else
      {
        state= CONNECTING;
      }
      break;

    case CONNECTING:
      // Add logic for poll() for nonblocking.
      state= CONNECTED;
      break;

    case CONNECTED:
    case WRITING:
      {
        size_t packet_length= operation->size();
        const char *packet= operation->ptr();

        // send() may accept only part of the buffer; keep going until the
        // whole packet has been handed to the socket.
        while (packet_length)
        {
          ssize_t write_size;
          do
          {
            write_size= send(_sockfd, packet, packet_length, 0);
          } while (write_size < 0 and errno == EINTR);

          if (write_size < 0)
          {
            // Capture errno before anything else can overwrite it.
            const int send_errno= errno;

            std::stringstream message;
            message << "Failed during send(" << strerror(send_errno) << ")";
            _last_error= message.str();
            std::cerr << message.str() << std::endl;

            // A negative count must never reach the arithmetic below: cast
            // to size_t it would corrupt both the length and the pointer.
            return false;
          }

          packet_length-= static_cast<size_t>(write_size);
          packet+= static_cast<size_t>(write_size);
        }
      }
      state= READING;
      break;

    case READING:
      if (operation->has_response())
      {
        ssize_t read_length;

        do
        {
          char buffer[BUFSIZ];
          do
          {
            read_length= recv(_sockfd, buffer, sizeof(buffer), 0);
          } while (read_length < 0 and errno == EINTR);

          if (read_length < 0)
          {
            std::stringstream message;
            message << "Error occured while reading data from " << _host.c_str();
            _last_error= message.str();
            std::cerr << message.str() << std::endl;
            return false;
          }

          if (read_length == 0)
          {
            // recv() returning 0 means the peer closed the stream; polling
            // again would keep reporting the socket readable.
            break;
          }

          operation->push(buffer, static_cast<size_t>(read_length));
        } while (more_to_read());
      } // end has_response

      state= FINISHED;
      break;

    case FINISHED:
      {
        std::string response;
        bool success= operation->response(response);
        if (_finish_fn)
        {
          if (not _finish_fn->call(success, response))
          {
            // Error was sent from _finish_fn
            return false;
          }
        }

        if (operation->reconnect())
        {
          // NOTE(review): nothing is done when the operation asks for a
          // reconnect; the socket is reused as-is. Confirm intended behavior.
        }
        _operations.pop_back();
        delete operation;

        state= CONNECTED;
      }
      break;
    } // end switch
  }

  return true;
} // end run()
260
261 bool Instance::more_to_read() const
262 {
263 struct pollfd fds;
264 fds.fd= _sockfd;
265 fds.events = POLLIN;
266
267 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
268 {
269 return false;
270 }
271
272 return true;
273 }
274
275 void Instance::close_socket()
276 {
277 if (_sockfd == INVALID_SOCKET)
278 return;
279
280 /* in case of death shutdown to avoid blocking at close() */
281 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
282 {
283 perror("shutdown");
284 }
285 else if (closesocket(_sockfd) == SOCKET_ERROR)
286 {
287 perror("close");
288 }
289
290 _sockfd= INVALID_SOCKET;
291 }
292
293 void Instance::free_addrinfo()
294 {
295 if (not _addrinfo)
296 return;
297
298 freeaddrinfo(_addrinfo);
299 _addrinfo= NULL;
300 _addrinfo_next= NULL;
301 }
302
303 } /* namespace util */
304 } /* namespace datadifferential */