Updating for 1.0.2 release
[awesomized/libmemcached] / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include <config.h>
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <sstream>
45 #include <iostream>
46 #include <netdb.h>
47 #include <poll.h>
48 #include <sys/socket.h>
49 #include <sys/types.h>
50 #include <netinet/in.h>
51
52
53 namespace datadifferential {
54 namespace util {
55
56 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
57 _host(hostname_arg),
58 _service(service_arg),
59 _sockfd(INVALID_SOCKET),
60 state(NOT_WRITING),
61 _addrinfo(0),
62 _addrinfo_next(0),
63 _finish_fn(NULL),
64 _operations()
65 {
66 }
67
68 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
69 _host(hostname_arg),
70 _sockfd(INVALID_SOCKET),
71 state(NOT_WRITING),
72 _addrinfo(0),
73 _addrinfo_next(0),
74 _finish_fn(NULL),
75 _operations()
76 {
77 char tmp[BUFSIZ];
78 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
79 _service= tmp;
80 }
81
82 Instance::~Instance()
83 {
84 close_socket();
85 free_addrinfo();
86 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
87 {
88 delete *iter;
89 }
90 _operations.clear();
91
92 delete _finish_fn;
93 }
94
/*
 * Drive the connection state machine until the operation queue is empty.
 *
 * The operation at the back of the queue is processed first. For each
 * operation the machine resolves the host (NOT_WRITING), walks the resolved
 * addresses until one connects (CONNECT / NEXT_CONNECT_ADDRINFO), sends the
 * operation's packet (CONNECTED / WRITING), collects a response when the
 * operation expects one (READING), and hands the result to the finish
 * callback, if one is set, before dropping the operation (FINISHED).
 *
 * Returns true once every queued operation has been processed. Returns
 * false when resolution fails, no address can be connected, send()/recv()
 * fails, or the finish callback reports failure. On the resolution,
 * connection, send and recv failures a description is stored in
 * _last_error. The state is left untouched on failure.
 */
bool Instance::run()
{
  while (not _operations.empty())
  {
    Operation::vector::value_type operation= _operations.back();

    switch (state)
    {
    case NOT_WRITING:
      {
        free_addrinfo();

        struct addrinfo ai;
        memset(&ai, 0, sizeof(struct addrinfo));
        ai.ai_socktype= SOCK_STREAM;
        ai.ai_protocol= IPPROTO_TCP;

        int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
        if (ret)
        {
          std::stringstream message;
          message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
          _last_error= message.str();
          return false;
        }
      }
      _addrinfo_next= _addrinfo;
      state= CONNECT;
      break;

    case NEXT_CONNECT_ADDRINFO:
      // Out of candidate addresses (or none were ever resolved): give up.
      if (_addrinfo_next == NULL or _addrinfo_next->ai_next == NULL)
      {
        std::stringstream message;
        message << "Error connecting to " << _host.c_str() << "." << std::endl;
        _last_error= message.str();
        return false;
      }
      _addrinfo_next= _addrinfo_next->ai_next;
      // Deliberate fall through: try to connect to the address just selected.

    case CONNECT:
      close_socket();

      _sockfd= socket(_addrinfo_next->ai_family,
                      _addrinfo_next->ai_socktype,
                      _addrinfo_next->ai_protocol);
      if (_sockfd == INVALID_SOCKET)
      {
        perror("socket");
        // Move on to the next address; leaving the state at CONNECT would
        // retry the same failing socket() call forever.
        state= NEXT_CONNECT_ADDRINFO;
        continue;
      }

      if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
      {
        switch(errno)
        {
        case EAGAIN:
        case EINTR:
          state= CONNECT;
          break;

        case EINPROGRESS:
          state= CONNECTING;
          break;

        case ECONNREFUSED:
        case ENETUNREACH:
        case ETIMEDOUT:
        default:
          state= NEXT_CONNECT_ADDRINFO;
          break;
        }
      }
      else
      {
        state= CONNECTING;
      }
      break;

    case CONNECTING:
      // Add logic for poll() for nonblocking.
      state= CONNECTED;
      break;

    case CONNECTED:
    case WRITING:
      {
        size_t packet_length= operation->size();
        const char *packet= operation->ptr();

        while (packet_length)
        {
          ssize_t write_size= send(_sockfd, packet, packet_length, 0);

          if (write_size < 0)
          {
            const int send_errno= errno;
            if (send_errno == EINTR)
            {
              // Interrupted before anything was written; retry the same bytes.
              continue;
            }

            // A negative count must never reach the length/pointer
            // arithmetic below, so report the failure and stop here.
            std::stringstream message;
            message << "Failed during send(" << strerror(send_errno) << ")";
            _last_error= message.str();
            std::cerr << _last_error << std::endl;
            return false;
          }

          packet_length-= static_cast<size_t>(write_size);
          packet+= static_cast<size_t>(write_size);
        }
      }
      state= READING;
      break;

    case READING:
      if (operation->has_response())
      {
        ssize_t read_length;

        do
        {
          char buffer[BUFSIZ];

          // Retry reads that were merely interrupted by a signal.
          do
          {
            read_length= recv(_sockfd, buffer, sizeof(buffer), 0);
          } while (read_length < 0 and errno == EINTR);

          if (read_length < 0)
          {
            std::stringstream message;
            message << "Error occured while reading data from " << _host.c_str();
            _last_error= message.str();
            std::cerr << _last_error << std::endl;
            return false;
          }

          if (read_length == 0)
          {
            // recv() returning 0 means the peer closed the stream; there is
            // nothing more to collect, so stop instead of polling again.
            break;
          }

          operation->push(buffer, static_cast<size_t>(read_length));
        } while (more_to_read());
      } // end has_response

      state= FINISHED;
      break;

    case FINISHED:
      {
        std::string response;
        bool success= operation->response(response);
        if (_finish_fn)
        {
          if (not _finish_fn->call(success, response))
          {
            // Error was sent from _finish_fn
            return false;
          }
        }

        // NOTE(review): the result of reconnect() is currently not acted on;
        // the call is kept as in the original.
        if (operation->reconnect())
        {
        }
        _operations.pop_back();
        delete operation;

        state= CONNECTED;
      }
      break;
    } // end switch
  }

  return true;
} // end run()
261
262 bool Instance::more_to_read() const
263 {
264 struct pollfd fds;
265 fds.fd= _sockfd;
266 fds.events = POLLIN;
267
268 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
269 {
270 return false;
271 }
272
273 return true;
274 }
275
276 void Instance::close_socket()
277 {
278 if (_sockfd == INVALID_SOCKET)
279 return;
280
281 /* in case of death shutdown to avoid blocking at close() */
282 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
283 {
284 perror("shutdown");
285 }
286 else if (closesocket(_sockfd) == SOCKET_ERROR)
287 {
288 perror("close");
289 }
290
291 _sockfd= INVALID_SOCKET;
292 }
293
294 void Instance::free_addrinfo()
295 {
296 if (not _addrinfo)
297 return;
298
299 freeaddrinfo(_addrinfo);
300 _addrinfo= NULL;
301 _addrinfo_next= NULL;
302 }
303
304 } /* namespace util */
305 } /* namespace datadifferential */