1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 * DataDifferential Utility Library
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the distribution.
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior written permission.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 #include "util/instance.hpp"
46 #include <netinet/in.h>
49 #include <sys/socket.h>
50 #include <sys/types.h>
57 namespace datadifferential
{
60 Instance::Instance(const std::string
& hostname_arg
, const std::string
& service_arg
) :
62 _service(service_arg
),
63 _sockfd(INVALID_SOCKET
),
72 Instance::Instance(const std::string
& hostname_arg
, const in_port_t port_arg
) :
74 _sockfd(INVALID_SOCKET
),
82 snprintf(tmp
, sizeof(tmp
), "%u", static_cast<unsigned int>(port_arg
));
90 for (Operation::vector::iterator iter
= _operations
.begin(); iter
!= _operations
.end(); ++iter
)
101 while (not _operations
.empty())
103 Operation::vector::value_type operation
= _operations
.back();
112 memset(&ai
, 0, sizeof(struct addrinfo
));
113 ai
.ai_socktype
= SOCK_STREAM
;
114 ai
.ai_protocol
= IPPROTO_TCP
;
116 int ret
= getaddrinfo(_host
.c_str(), _service
.c_str(), &ai
, &_addrinfo
);
119 std::stringstream message
;
120 message
<< "Failed to connect on " << _host
.c_str() << ":" << _service
.c_str() << " with " << gai_strerror(ret
);
121 _last_error
= message
.str();
125 _addrinfo_next
= _addrinfo
;
129 case NEXT_CONNECT_ADDRINFO
:
130 if (_addrinfo_next
->ai_next
== NULL
)
132 std::stringstream message
;
133 message
<< "Error connecting to " << _host
.c_str() << "." << std::endl
;
134 _last_error
= message
.str();
137 _addrinfo_next
= _addrinfo_next
->ai_next
;
143 _sockfd
= socket(_addrinfo_next
->ai_family
,
144 _addrinfo_next
->ai_socktype
,
145 _addrinfo_next
->ai_protocol
);
146 if (_sockfd
== INVALID_SOCKET
)
152 if (connect(_sockfd
, _addrinfo_next
->ai_addr
, _addrinfo_next
->ai_addrlen
) < 0)
169 state
= NEXT_CONNECT_ADDRINFO
;
180 // Add logic for poll() for nonblocking.
187 size_t packet_length
= operation
->size();
188 const char *packet
= operation
->ptr();
192 ssize_t write_size
= send(_sockfd
, packet
, packet_length
, 0);
199 std::cerr
<< "Failed during send(" << strerror(errno
) << ")" << std::endl
;
204 packet_length
-= static_cast<size_t>(write_size
);
205 packet
+= static_cast<size_t>(write_size
);
212 if (operation
->has_response())
220 read_length
= ::recv(_sockfd
, buffer
, sizeof(buffer
), 0);
228 _last_error
+= "Error occurred while reading data from ";
233 else if (read_length
== 0)
236 _last_error
+= "Socket was shutdown while reading from ";
242 operation
->push(buffer
, static_cast<size_t>(read_length
));
243 total_read
+= static_cast<size_t>(read_length
);
245 } while (more_to_read());
246 } // end has_response
252 std::string response
;
253 bool success
= operation
->response(response
);
256 if (not _finish_fn
->call(success
, response
))
258 // Error was sent from _finish_fn
263 if (operation
->reconnect())
266 _operations
.pop_back();
277 bool Instance::more_to_read() const
283 if (poll(&fds
, 1, 5) < 1) // Default timeout is 5 milliseconds
291 void Instance::close_socket()
293 if (_sockfd
== INVALID_SOCKET
)
298 /* in case of death shutdown to avoid blocking at close() */
299 if (shutdown(_sockfd
, SHUT_RDWR
) == SOCKET_ERROR
&& get_socket_errno() != ENOTCONN
)
303 else if (closesocket(_sockfd
) == SOCKET_ERROR
)
308 _sockfd
= INVALID_SOCKET
;
311 void Instance::free_addrinfo()
313 if (_addrinfo
== NULL
)
318 freeaddrinfo(_addrinfo
);
320 _addrinfo_next
= NULL
;
323 } /* namespace util */
324 } /* namespace datadifferential */