1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 * DataDifferential Utility Library
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 #include "mem_config.h"
41 #include "util/instance.hpp"
46 #include <netinet/in.h>
49 #ifdef HAVE_SYS_SOCKET_H
50 # include <sys/socket.h>
52 #include <sys/types.h>
58 #ifndef INVALID_SOCKET
59 # define INVALID_SOCKET -1
63 # define SOCKET_ERROR -1
66 #ifndef get_socket_errno
67 # define get_socket_errno() errno
71 # define closesocket(a) close(a)
75 namespace datadifferential
{
// Constructs an Instance from a host name and a service string.
//
// Visible initializers: _service is copied from service_arg and _sockfd
// starts out as INVALID_SOCKET (no socket open yet).
// NOTE(review): the remaining initializers and the constructor body are not
// visible in this excerpt; hostname_arg is presumably stored in _host --
// confirm against the full file.
78 Instance::Instance(const std::string
& hostname_arg
, const std::string
& service_arg
) :
80 _service(service_arg
),
81 _sockfd(INVALID_SOCKET
),
// Constructs an Instance from a host name and a numeric port.
//
// _sockfd starts out as INVALID_SOCKET. The port is rendered as an unsigned
// decimal string into the local buffer `tmp`.
// NOTE(review): the declaration of `tmp` and whatever consumes it are not
// visible here; presumably the text is assigned to _service so that both
// constructors feed getaddrinfo() the same way -- confirm.
90 Instance::Instance(const std::string
& hostname_arg
, const in_port_t port_arg
) :
92 _sockfd(INVALID_SOCKET
),
// Widen to unsigned int so the argument matches the "%u" conversion.
100 snprintf(tmp
, sizeof(tmp
), "%u", static_cast<unsigned int>(port_arg
));
// Destructor.
//
// Walks every element of _operations from begin() to end().
// NOTE(review): the loop body and any other cleanup statements are not
// visible in this excerpt, so what is done with each element (presumably
// releasing it) must be confirmed against the full file.
104 Instance::~Instance()
108 for (Operation::vector::iterator iter
= _operations
.begin(); iter
!= _operations
.end(); ++iter
)
// Main processing loop over the queued operations.
//
// NOTE(review): the signature line of the enclosing member function is not
// visible in this excerpt, and many interior lines (braces, branch bodies,
// return statements, the switch/do keywords) are missing. The comments below
// describe only what the visible statements establish.
//
// While _operations is non-empty, the operation at the back of the vector is
// taken, its packet is sent over _sockfd, its response is read when
// has_response() is true, the outcome is handed to _finish_fn, and the
// operation is popped from the vector.
119 while (not _operations
.empty())
121 Operation::vector::value_type operation
= _operations
.back();
// --- Address resolution ---------------------------------------------------
// Zero the hints structure, then ask for TCP stream sockets only.
// NOTE(review): the condition guarding this section is not visible;
// presumably it only runs when no socket is open yet -- confirm.
130 memset(&ai
, 0, sizeof(struct addrinfo
));
131 ai
.ai_socktype
= SOCK_STREAM
;
132 ai
.ai_protocol
= IPPROTO_TCP
;
// Resolve _host/_service; the resulting list is stored in _addrinfo.
134 int ret
= getaddrinfo(_host
.c_str(), _service
.c_str(), &ai
, &_addrinfo
);
// Failure path (guarding condition not visible): record a message that
// includes the gai_strerror() text for `ret` in _last_error.
137 std::stringstream message
;
138 message
<< "Failed to connect on " << _host
.c_str() << ":" << _service
.c_str() << " with " << gai_strerror(ret
);
139 _last_error
= message
.str();
// Start connection attempts at the head of the resolved address list.
143 _addrinfo_next
= _addrinfo
;
// --- Connection attempts --------------------------------------------------
// NEXT_CONNECT_ADDRINFO: move to the next resolved address. When there is
// no next entry, an error message is stored in _last_error instead.
147 case NEXT_CONNECT_ADDRINFO
:
148 if (_addrinfo_next
->ai_next
== NULL
)
150 std::stringstream message
;
151 message
<< "Error connecting to " << _host
.c_str() << "." << std::endl
;
152 _last_error
= message
.str();
155 _addrinfo_next
= _addrinfo_next
->ai_next
;
// Create a socket matching the candidate address's family/type/protocol.
161 _sockfd
= socket(_addrinfo_next
->ai_family
,
162 _addrinfo_next
->ai_socktype
,
163 _addrinfo_next
->ai_protocol
);
// Socket creation failed (handling not visible in this excerpt).
164 if (_sockfd
== INVALID_SOCKET
)
// Try to connect to the candidate address; a negative result is the
// failure case.
170 if (connect(_sockfd
, _addrinfo_next
->ai_addr
, _addrinfo_next
->ai_addrlen
) < 0)
// NOTE(review): presumably reached after a failed attempt so that the next
// address is tried -- the surrounding branches are not visible; confirm.
187 state
= NEXT_CONNECT_ADDRINFO
;
198 // Add logic for poll() for nonblocking.
// --- Send -----------------------------------------------------------------
// packet/packet_length track the part of the operation's buffer that has
// not been written yet.
205 size_t packet_length
= operation
->size();
206 const char *packet
= operation
->ptr();
210 ssize_t write_size
= send(_sockfd
, packet
, packet_length
, 0);
// Send failure is reported on std::cerr with the strerror(errno) text.
// NOTE(review): "dureng" in the message is a typo for "during"; it is a
// runtime string, so it is left unchanged here.
217 std::cerr
<< "Failed dureng send(" << strerror(errno
) << ")" << std::endl
;
// Partial write: shrink the remaining length and advance the cursor by the
// number of bytes send() accepted.
222 packet_length
-= static_cast<size_t>(write_size
);
223 packet
+= static_cast<size_t>(write_size
);
// --- Receive --------------------------------------------------------------
// Only operations that expect a reply are read from.
230 if (operation
->has_response())
// Read up to sizeof(buffer) bytes per iteration.
237 read_length
= ::recv(_sockfd
, buffer
, sizeof(buffer
), 0);
// Read error (condition not visible): append a description to _last_error.
// NOTE(review): "occured" is a typo for "occurred"; runtime string, left
// unchanged here.
245 _last_error
+= "Error occured while reading data from ";
// recv() returning 0 is treated as the socket having been shut down.
250 else if (read_length
== 0)
253 _last_error
+= "Socket was shutdown while reading from ";
// Hand the received bytes to the operation.
259 operation
->push(buffer
, static_cast<size_t>(read_length
));
// Keep reading for as long as more_to_read() reports pending data.
261 } while (more_to_read());
262 } // end has_response
// --- Completion -----------------------------------------------------------
// Ask the operation for its response text and success flag, then pass both
// to the _finish_fn callback.
268 std::string response
;
269 bool success
= operation
->response(response
);
272 if (not _finish_fn
->call(success
, response
))
274 // Error was sent from _finish_fn
// The operation can request a reconnect (handling not visible in this
// excerpt).
279 if (operation
->reconnect())
// Done with this operation: remove it from the back of the vector.
282 _operations
.pop_back();
// Reports whether more data is pending, by polling a single pollfd entry
// (`fds`). It is used as the loop condition of the receive loop above.
//
// NOTE(review): the setup of `fds` and both return statements are not
// visible in this excerpt; presumably a poll() result below 1 (timeout or
// error) yields false -- confirm.
293 bool Instance::more_to_read() const
299 if (poll(&fds
, 1, 5) < 1) // Default timeout is 5 (poll() timeouts are in milliseconds)
// Shuts down and closes _sockfd, then marks it as INVALID_SOCKET.
//
// NOTE(review): the bodies of the branches below are not visible in this
// excerpt; the first check presumably returns early when no socket is open.
307 void Instance::close_socket()
309 if (_sockfd
== INVALID_SOCKET
)
314 /* in case of death shutdown to avoid blocking at close() */
// ENOTCONN from shutdown() is tolerated (the socket was never connected or
// is already disconnected); any other shutdown error takes this branch.
315 if (shutdown(_sockfd
, SHUT_RDWR
) == SOCKET_ERROR
&& get_socket_errno() != ENOTCONN
)
// NOTE(review): closesocket() sits in the `else if` arm, so it is not
// attempted when shutdown() fails with an error other than ENOTCONN. If the
// reset of _sockfd below is unconditional, the descriptor would be leaked in
// that case -- confirm against the full file.
319 else if (closesocket(_sockfd
) == SOCKET_ERROR
)
// Forget the descriptor so later calls see that no socket is open.
324 _sockfd
= INVALID_SOCKET
;
// Releases the address list obtained from getaddrinfo() and clears the
// iteration cursor.
//
// NOTE(review): the body of the NULL check is not visible (presumably an
// early return), and no visible line resets _addrinfo itself after
// freeaddrinfo() -- confirm that the full file does so, otherwise the
// pointer would dangle.
327 void Instance::free_addrinfo()
329 if (_addrinfo
== NULL
)
334 freeaddrinfo(_addrinfo
);
// The cursor pointed into the list that was just freed.
336 _addrinfo_next
= NULL
;
339 } /* namespace util */
340 } /* namespace datadifferential */