Skip to content

Commit

Permalink
TCP non-blocking send (#4237)
Browse files Browse the repository at this point in the history
* Refs #20119: Check send buffer queue before sending new data

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Add non_blocking_send attribute to tcp

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Add test

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Add non-blocking send to secure socket

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Update versions.md

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: non_blocking_send moved to properties

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20119: Fix Windows&Mac build warnings

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi committed Feb 23, 2024
1 parent 869217d commit 755abc6
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ static void setup_transports_large_data(

auto tcp_transport = create_tcpv4_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv4;
Expand Down Expand Up @@ -234,6 +235,7 @@ static void setup_transports_large_datav6(

auto tcp_transport = create_tcpv6_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv6;
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,30 @@ bool TCPChannelResource::remove_logical_port(
return true;
}

bool TCPChannelResource::check_socket_send_buffer(
const size_t& msg_size,
const asio::ip::tcp::socket::native_handle_type& socket_native_handle)
{
int bytesInSendQueue = 0;

#ifndef _WIN32
if (ioctl(socket_native_handle, TIOCOUTQ, &bytesInSendQueue) == -1)
{
bytesInSendQueue = 0;
}
#else // ifdef _WIN32
static_cast<void>(socket_native_handle);
#endif // ifndef _WIN32


size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
if (future_queue_size > size_t(parent_->configuration()->sendBufferSize))
{
return false;
}
return true;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
4 changes: 4 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ class TCPChannelResource : public ChannelResource
const std::vector<uint16_t>& availablePorts,
RTCPMessageManager* rtcp_manager);

bool check_socket_send_buffer(
const size_t& msg_size,
const asio::ip::tcp::socket::native_handle_type& socket_native_handle);

TCPConnectionType tcp_connection_type_;

friend class TCPTransportInterface;
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ size_t TCPChannelResourceBasic::send(
if (eConnecting < connection_status_)
{
std::lock_guard<std::mutex> send_guard(send_mutex_);

if (parent_->get_non_blocking_send() &&
!check_socket_send_buffer(header_size + size, socket_->native_handle()))
{
return 0;
}

if (header_size > 0)
{
std::array<asio::const_buffer, 2> buffers;
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceSecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ size_t TCPChannelResourceSecure::send(

if (eConnecting < connection_status_)
{
if (parent_->get_non_blocking_send() &&
!check_socket_send_buffer(header_size + size,
secure_socket_->lowest_layer().native_handle()))
{
return 0;
}

std::vector<asio::const_buffer> buffers;
if (header_size > 0)
{
Expand Down
11 changes: 10 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ TCPTransportInterface::TCPTransportInterface(
int32_t transport_kind)
: TransportInterface(transport_kind)
, alive_(true)
, non_blocking_send_(false)
#if TLS_FOUND
, ssl_context_(asio::ssl::context::sslv23)
#endif // if TLS_FOUND
Expand Down Expand Up @@ -362,7 +363,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch(
}

bool TCPTransportInterface::init(
const fastrtps::rtps::PropertyPolicy*)
const fastrtps::rtps::PropertyPolicy* properties)
{
if (!apply_tls_config())
{
Expand All @@ -387,6 +388,14 @@ bool TCPTransportInterface::init(
ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Get non_blocking_send property
if (properties)
{
auto s_non_blocking_send = eprosima::fastrtps::rtps::PropertyPolicyHelper::find_property(*properties,
"fastdds.tcp_transport.non_blocking_send");
non_blocking_send_ = s_non_blocking_send && *s_non_blocking_send == "true"? true : false;
}

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
Expand Down
19 changes: 19 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ class TCPTransportInterface : public TransportInterface
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;
/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer is full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
Expand Down Expand Up @@ -437,6 +450,12 @@ class TCPTransportInterface : public TransportInterface
*/
void fill_local_physical_port(
Locator& locator) const;

bool get_non_blocking_send() const
{
return non_blocking_send_;
}

};

} // namespace rtps
Expand Down
206 changes: 206 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "mock/MockTCPChannelResource.h"
#include "mock/MockTCPv4Transport.h"
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/utils/IPFinder.h>
Expand Down Expand Up @@ -1154,6 +1155,127 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server)
sem.wait();
}
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, secure_non_blocking_send)
{
uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE);
senderDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2);
senderDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION);
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
// feasible. For the saturation of the sending socket, it's necessary first to
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
serverLoc.port = port;
IPLocator::setLogicalPort(serverLoc, 7410);

// Socket TLS config
asio::ssl::context ssl_context(asio::ssl::context::sslv23);
ssl_context.set_verify_callback([](bool preverified, asio::ssl::verify_context&)
{
return preverified;
});
ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose)
{
return "fastddspwd";
});
ssl_context.use_certificate_chain_file("fastdds.crt");
ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem);
ssl_context.use_tmp_dh_file("dh_params.pem");

uint32_t options = 0;
options |= asio::ssl::context::default_workarounds;
options |= asio::ssl::context::single_dh_use;
options |= asio::ssl::context::no_sslv2;
options |= asio::ssl::context::no_compression;
ssl_context.set_options(options);

// TCPChannelResourceSecure::connect() like connection
asio::io_service io_service;
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
std::to_string(IPLocator::getPhysicalPort(serverLoc)));

auto secure_socket = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(io_service, ssl_context);
asio::ssl::verify_mode vm = 0x00;
vm |= asio::ssl::verify_peer;
secure_socket->set_verify_mode(vm);

asio::async_connect(secure_socket->lowest_layer(), endpoints,
[secure_socket](const std::error_code& ec
#if ASIO_VERSION >= 101200
, asio::ip::tcp::endpoint
#else
, const tcp::resolver::iterator& /*endpoint*/
#endif // if ASIO_VERSION >= 101200
)
{
ASSERT_TRUE(!ec);
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server;
secure_socket->async_handshake(role,
[](const std::error_code& ec)
{
ASSERT_TRUE(!ec);
});
});

std::this_thread::sleep_for(std::chrono::milliseconds(300));

/*
Get server's accepted channel. This is retrieved from the unbound_channel_resources_,
which is a vector where client channels are pushed immediately after the server accepts
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}

secure_socket->lowest_layer().close(ec);
}
#endif // ifndef _WIN32

#endif //TLS_FOUND

TEST_F(TCPv4Tests, send_and_receive_between_allowed_localhost_interfaces_ports)
Expand Down Expand Up @@ -1585,6 +1707,90 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, non_blocking_send)
{
uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.sendBufferSize = msg_size;
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
// feasible. For the saturation of the sending socket, it's necessary first to
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
serverLoc.port = port;
IPLocator::setLogicalPort(serverLoc, 7410);

// TCPChannelResourceBasic::connect() like connection
asio::io_service io_service;
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
std::to_string(IPLocator::getPhysicalPort(serverLoc)));

asio::ip::tcp::socket socket = asio::ip::tcp::socket (io_service);
asio::async_connect(
socket,
endpoints,
[](std::error_code ec
#if ASIO_VERSION >= 101200
, asio::ip::tcp::endpoint
#else
, asio::ip::tcp::resolver::iterator
#endif // if ASIO_VERSION >= 101200
)
{
ASSERT_TRUE(!ec);
}
);

std::this_thread::sleep_for(std::chrono::milliseconds(100));

/*
Get server's accepted channel. This is retrieved from the unbound_channel_resources_,
which is a vector where client channels are pushed immediately after the server accepts
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
socket.close();
}
#endif // ifndef _WIN32

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
Loading

0 comments on commit 755abc6

Please sign in to comment.