23c8aec4f5947997d871bf766f70636c00851dca
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 * DataDifferential Utility Library
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 #include "util/instance.hpp"
48 #include <sys/socket.h>
49 #include <sys/types.h>
50 #include <netinet/in.h>
53 namespace datadifferential
{
// Constructor overload taking the remote host name and a service string.
// Visible initializers: _service is initialized from service_arg and
// _sockfd starts as INVALID_SOCKET, i.e. no socket is open yet.
// NOTE(review): the initializer that consumes hostname_arg, the remainder
// of the initializer list and the constructor body are not visible in this
// listing -- confirm against the full file.
56 Instance::Instance(const std::string
& hostname_arg
, const std::string
& service_arg
) :
58 _service(service_arg
),
59 _sockfd(INVALID_SOCKET
),
// Constructor overload taking the remote host name and a numeric port.
// _sockfd starts as INVALID_SOCKET, i.e. no socket is open yet.
// NOTE(review): the other initializers and the declaration of tmp are not
// visible in this listing.
68 Instance::Instance(const std::string
& hostname_arg
, const in_port_t port_arg
) :
70 _sockfd(INVALID_SOCKET
),
// Render the port as unsigned decimal text into tmp; snprintf is bounded by
// sizeof(tmp), so the output cannot overrun the buffer.
// NOTE(review): presumably tmp is then stored as the service string --
// the assignment is not visible here, confirm.
78 snprintf(tmp
, sizeof(tmp
), "%u", static_cast<unsigned int>(port_arg
));
// Visit every element of _operations in order, from begin() to end(),
// using a pre-incremented iterator.
// NOTE(review): neither the enclosing function header nor the loop body is
// visible in this listing -- confirm what is done with each element.
86 for (Operation::vector::iterator iter
= _operations
.begin(); iter
!= _operations
.end(); ++iter
)
// Keep processing until the _operations container is empty.
// NOTE(review): the enclosing function header is not visible in this
// listing.
97 while (not _operations
.empty())
// Work on the element at the back of the container. The value is later
// dereferenced with ->, so value_type is pointer-like.
99 Operation::vector::value_type operation
= _operations
.back();
// Build the getaddrinfo() hints: zero the whole structure, then restrict
// the lookup to TCP stream sockets. ai_family is left at zero by memset.
108 memset(&ai
, 0, sizeof(struct addrinfo
));
109 ai
.ai_socktype
= SOCK_STREAM
;
110 ai
.ai_protocol
= IPPROTO_TCP
;
// Resolve _host/_service; the resulting address list is stored in
// _addrinfo.
112 int ret
= getaddrinfo(_host
.c_str(), _service
.c_str(), &ai
, &_addrinfo
);
// Format a human readable failure message using gai_strerror(ret) and keep
// it in _last_error.
// NOTE(review): the condition on ret that guards this message is not
// visible in this listing -- presumably ret != 0.
115 std::stringstream message
;
116 message
<< "Failed to connect on " << _host
.c_str() << ":" << _service
.c_str() << " with " << gai_strerror(ret
);
117 _last_error
= message
.str();
// Start the connection attempts at the head of the resolved address list.
121 _addrinfo_next
= _addrinfo
;
// State: move on to the next resolved address.
// NOTE(review): the enclosing switch statement is not visible in this
// listing.
125 case NEXT_CONNECT_ADDRINFO
:
// No further entry in the address list: record an error in _last_error.
126 if (_addrinfo_next
->ai_next
== NULL
)
128 std::stringstream message
;
129 message
<< "Error connecting to " << _host
.c_str() << "." << std::endl
;
130 _last_error
= message
.str();
// Advance the cursor to the following address entry.
// NOTE(review): presumably only reached when ai_next is non-NULL; the
// control flow between these statements is not visible here.
133 _addrinfo_next
= _addrinfo_next
->ai_next
;
// Create a socket matching the family/type/protocol of the current
// address entry.
139 _sockfd
= socket(_addrinfo_next
->ai_family
,
140 _addrinfo_next
->ai_socktype
,
141 _addrinfo_next
->ai_protocol
);
// socket() failed for this entry; the handling is not visible here.
142 if (_sockfd
== INVALID_SOCKET
)
// Try to connect the new socket to the current address; a negative return
// value means the attempt failed.
148 if (connect(_sockfd
, _addrinfo_next
->ai_addr
, _addrinfo_next
->ai_addrlen
) < 0)
// Schedule a retry with the next address entry.
// NOTE(review): presumably executed on a failed connect -- confirm.
165 state
= NEXT_CONNECT_ADDRINFO
;
// Transmit the operation's request bytes over _sockfd.
176 // Add logic for poll() for nonblocking.
// packet points at the bytes still to be sent, packet_length counts them.
183 size_t packet_length
= operation
->size();
184 const char *packet
= operation
->ptr();
// send() may transmit fewer bytes than requested; write_size holds the
// amount actually written (or a negative value on error).
188 ssize_t write_size
= send(_sockfd
, packet
, packet_length
, 0);
// Report the errno text on stderr.
// NOTE(review): the condition guarding this message is not visible in this
// listing -- presumably a negative write_size.
195 std::cerr
<< "Failed during send(" << strerror(errno
) << ")" << std::endl
;
// Account for the bytes that went out: shrink the remaining length and
// advance the data pointer by the same amount.
200 packet_length
-= static_cast<size_t>(write_size
);
201 packet
+= static_cast<size_t>(write_size
);
// Only read from the socket when the operation expects a reply.
208 if (operation
->has_response())
// Receive up to sizeof(buffer) bytes into buffer.
// NOTE(review): the declarations of buffer, read_length and total_read are
// not visible in this listing.
216 read_length
= recv(_sockfd
, buffer
, sizeof(buffer
), 0);
// Report a read failure on stderr.
// NOTE(review): the condition guarding this message is not visible here.
223 std::cerr
<< "Error occured while reading data from " << _host
.c_str() << std::endl
;
// Hand the received bytes to the operation and keep a running byte count.
228 operation
->push(buffer
, static_cast<size_t>(read_length
));
229 total_read
+= static_cast<size_t>(read_length
);
// Repeat for as long as more_to_read() returns true.
230 } while (more_to_read());
231 } // end has_response
// Ask the operation for its response text; success receives the bool that
// operation->response() returns, response is filled in as an out-argument.
237 std::string response
;
238 bool success
= operation
->response(response
);
// Pass the outcome to the _finish_fn callback; a false return from call()
// takes the error branch.
241 if (not _finish_fn
->call(success
, response
))
243 // Error was sent from _finish_fn
// Branch taken when operation->reconnect() returns true.
// NOTE(review): what is done in this branch is not visible in this listing.
248 if (operation
->reconnect())
// Remove the element at the back of the queue, the position the current
// operation was fetched from.
251 _operations
.pop_back();
// Const member used as the continuation test of the receive loop.
// NOTE(review): the setup of fds and the return statements are not visible
// in this listing.
262 bool Instance::more_to_read() const
// Poll a single descriptor with a timeout argument of 5 (poll() takes
// milliseconds). A result below 1 means a timeout (0) or an error (-1).
268 if (poll(&fds
, 1, 5) < 1) // Default timeout is 5
// Shuts down and closes _sockfd, then marks it as INVALID_SOCKET.
// NOTE(review): the bodies of the branches below are not visible in this
// listing.
276 void Instance::close_socket()
// Nothing is open when _sockfd already holds INVALID_SOCKET.
278 if (_sockfd
== INVALID_SOCKET
)
281 /* in case of death shutdown to avoid blocking at close() */
// A shutdown() failure is only treated as an error when the reason is
// something other than ENOTCONN (socket was not connected).
282 if (shutdown(_sockfd
, SHUT_RDWR
) == SOCKET_ERROR
&& get_socket_errno() != ENOTCONN
)
// Otherwise close the descriptor and check closesocket() for failure.
286 else if (closesocket(_sockfd
) == SOCKET_ERROR
)
// Record that no socket is open any more.
291 _sockfd
= INVALID_SOCKET
;
// Releases the address list obtained from getaddrinfo() and clears the
// iteration cursor into it.
// NOTE(review): any guard around freeaddrinfo() and the reset of _addrinfo
// itself are not visible in this listing -- confirm.
294 void Instance::free_addrinfo()
299 freeaddrinfo(_addrinfo
);
// The cursor pointed into the list that was just freed, so drop it.
301 _addrinfo_next
= NULL
;
304 } /* namespace util */
305 } /* namespace datadifferential */