Fix typo for initializer
[awesomized/libmemcached] / util / instance.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * DataDifferential Utility Library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 #include "mem_config.h"
40
41 #include "util/instance.hpp"
42
43 #include <cstdio>
44 #include <iostream>
45 #include <netdb.h>
46 #include <netinet/in.h>
47 #include <poll.h>
48 #include <sstream>
49 #include <sys/socket.h>
50 #include <sys/types.h>
51
52 #ifdef HAVE_UNISTD_H
53 # include <unistd.h>
54 #endif
55
56 #ifndef INVALID_SOCKET
57 # define INVALID_SOCKET -1
58 #endif
59
60 #ifndef SOCKET_ERROR
61 # define SOCKET_ERROR -1
62 #endif
63
64 #ifndef get_socket_errno
65 # define get_socket_errno() errno
66 #endif
67
68 #ifndef closesocket
69 # define closesocket(a) close(a)
70 #endif
71
72
73 namespace datadifferential {
74 namespace util {
75
76 Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
77 _host(hostname_arg),
78 _service(service_arg),
79 _sockfd(INVALID_SOCKET),
80 state(NOT_WRITING),
81 _addrinfo(0),
82 _addrinfo_next(0),
83 _finish_fn(NULL),
84 _operations()
85 {
86 }
87
88 Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
89 _host(hostname_arg),
90 _sockfd(INVALID_SOCKET),
91 state(NOT_WRITING),
92 _addrinfo(0),
93 _addrinfo_next(0),
94 _finish_fn(NULL),
95 _operations()
96 {
97 char tmp[BUFSIZ];
98 snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
99 _service= tmp;
100 }
101
102 Instance::~Instance()
103 {
104 close_socket();
105 free_addrinfo();
106 for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
107 {
108 delete *iter;
109 }
110 _operations.clear();
111
112 delete _finish_fn;
113 }
114
115 bool Instance::run()
116 {
117 while (not _operations.empty())
118 {
119 Operation::vector::value_type operation= _operations.back();
120
121 switch (state)
122 {
123 case NOT_WRITING:
124 {
125 free_addrinfo();
126
127 struct addrinfo ai;
128 memset(&ai, 0, sizeof(struct addrinfo));
129 ai.ai_socktype= SOCK_STREAM;
130 ai.ai_protocol= IPPROTO_TCP;
131
132 int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
133 if (ret)
134 {
135 std::stringstream message;
136 message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
137 _last_error= message.str();
138 return false;
139 }
140 }
141 _addrinfo_next= _addrinfo;
142 state= CONNECT;
143 break;
144
145 case NEXT_CONNECT_ADDRINFO:
146 if (_addrinfo_next->ai_next == NULL)
147 {
148 std::stringstream message;
149 message << "Error connecting to " << _host.c_str() << "." << std::endl;
150 _last_error= message.str();
151 return false;
152 }
153 _addrinfo_next= _addrinfo_next->ai_next;
154
155
156 case CONNECT:
157 close_socket();
158
159 _sockfd= socket(_addrinfo_next->ai_family,
160 _addrinfo_next->ai_socktype,
161 _addrinfo_next->ai_protocol);
162 if (_sockfd == INVALID_SOCKET)
163 {
164 perror("socket");
165 continue;
166 }
167
168 if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
169 {
170 switch(errno)
171 {
172 case EAGAIN:
173 case EINTR:
174 state= CONNECT;
175 break;
176
177 case EINPROGRESS:
178 state= CONNECTING;
179 break;
180
181 case ECONNREFUSED:
182 case ENETUNREACH:
183 case ETIMEDOUT:
184 default:
185 state= NEXT_CONNECT_ADDRINFO;
186 break;
187 }
188 }
189 else
190 {
191 state= CONNECTING;
192 }
193 break;
194
195 case CONNECTING:
196 // Add logic for poll() for nonblocking.
197 state= CONNECTED;
198 break;
199
200 case CONNECTED:
201 case WRITING:
202 {
203 size_t packet_length= operation->size();
204 const char *packet= operation->ptr();
205
206 while(packet_length)
207 {
208 ssize_t write_size= send(_sockfd, packet, packet_length, 0);
209
210 if (write_size < 0)
211 {
212 switch(errno)
213 {
214 default:
215 std::cerr << "Failed dureng send(" << strerror(errno) << ")" << std::endl;
216 break;
217 }
218 }
219
220 packet_length-= static_cast<size_t>(write_size);
221 packet+= static_cast<size_t>(write_size);
222 }
223 }
224 state= READING;
225 break;
226
227 case READING:
228 if (operation->has_response())
229 {
230 ssize_t read_length;
231
232 do
233 {
234 char buffer[BUFSIZ];
235 read_length= ::recv(_sockfd, buffer, sizeof(buffer), 0);
236
237 if (read_length < 0)
238 {
239 switch(errno)
240 {
241 default:
242 _last_error.clear();
243 _last_error+= "Error occured while reading data from ";
244 _last_error+= _host;
245 return false;
246 }
247 }
248 else if (read_length == 0)
249 {
250 _last_error.clear();
251 _last_error+= "Socket was shutdown while reading from ";
252 _last_error+= _host;
253
254 return false;
255 }
256
257 operation->push(buffer, static_cast<size_t>(read_length));
258
259 } while (more_to_read());
260 } // end has_response
261
262 state= FINISHED;
263 break;
264
265 case FINISHED:
266 std::string response;
267 bool success= operation->response(response);
268 if (_finish_fn)
269 {
270 if (not _finish_fn->call(success, response))
271 {
272 // Error was sent from _finish_fn
273 return false;
274 }
275 }
276
277 if (operation->reconnect())
278 {
279 }
280 _operations.pop_back();
281 delete operation;
282
283 state= CONNECTED;
284 break;
285 } // end switch
286 }
287
288 return true;
289 } // end run()
290
291 bool Instance::more_to_read() const
292 {
293 struct pollfd fds;
294 fds.fd= _sockfd;
295 fds.events = POLLIN;
296
297 if (poll(&fds, 1, 5) < 1) // Default timeout is 5
298 {
299 return false;
300 }
301
302 return true;
303 }
304
305 void Instance::close_socket()
306 {
307 if (_sockfd == INVALID_SOCKET)
308 {
309 return;
310 }
311
312 /* in case of death shutdown to avoid blocking at close() */
313 if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
314 {
315 perror("shutdown");
316 }
317 else if (closesocket(_sockfd) == SOCKET_ERROR)
318 {
319 perror("close");
320 }
321
322 _sockfd= INVALID_SOCKET;
323 }
324
325 void Instance::free_addrinfo()
326 {
327 if (_addrinfo == NULL)
328 {
329 return;
330 }
331
332 freeaddrinfo(_addrinfo);
333 _addrinfo= NULL;
334 _addrinfo_next= NULL;
335 }
336
337 } /* namespace util */
338 } /* namespace datadifferential */