Skip to content

Commit

Permalink
Set real TCP non_blocking_send limitation (#4502)
Browse files Browse the repository at this point in the history
* Refs #20589: Set real non_blocking_send limitation

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20589: Readapt test

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20589: Fix failing test in macos/linux

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20589: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20589: Fix unused variable

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
(cherry picked from commit 33eb8be)
  • Loading branch information
jepemi committed Apr 15, 2024
1 parent 748f85e commit e7c0b9a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 39 deletions.
3 changes: 2 additions & 1 deletion src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ bool TCPChannelResource::check_socket_send_buffer(


size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
if (future_queue_size > size_t(parent_->configuration()->sendBufferSize))
// TCP actually allocates twice the size of the buffer requested.
if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize))
{
return false;
}
Expand Down
94 changes: 65 additions & 29 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1242,17 +1242,21 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_with_sni)
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, secure_non_blocking_send)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info);

uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.apply_security = true;
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
senderDescriptor.tls_config.password = "fastddspwd";
senderDescriptor.tls_config.cert_chain_file = "fastdds.crt";
senderDescriptor.tls_config.private_key_file = "fastdds.key";
senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem";
senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE);
Expand All @@ -1269,8 +1273,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
Expand All @@ -1283,13 +1287,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
{
return preverified;
});
ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose)
{
return "fastddspwd";
});
ssl_context.use_certificate_chain_file("fastdds.crt");
ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem);
ssl_context.use_tmp_dh_file("dh_params.pem");
ssl_context.load_verify_file("ca.crt");

uint32_t options = 0;
options |= asio::ssl::context::default_workarounds;
Expand All @@ -1298,8 +1296,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
options |= asio::ssl::context::no_compression;
ssl_context.set_options(options);

// TCPChannelResourceSecure::connect() like connection
asio::io_service io_service;
auto ioServiceFunction = [&]()
{
#if ASIO_VERSION >= 101200
asio::executor_work_guard<asio::io_service::executor_type> work(io_service.get_executor());
#else
io_service::work work(io_service_);
#endif // if ASIO_VERSION >= 101200
io_service.run();
};
std::thread ioServiceThread(ioServiceFunction);

// TCPChannelResourceSecure::connect() like connection
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
Expand All @@ -1320,7 +1329,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
)
{
ASSERT_TRUE(!ec);
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server;
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client;
secure_socket->async_handshake(role,
[](const std::error_code& ec)
{
Expand All @@ -1336,24 +1345,40 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
// auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = 0;
bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec);
ASSERT_EQ(ec, asio::error_code());
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

secure_socket->lowest_layer().close(ec);
io_service.stop();
ioServiceThread.join();
}
#endif // ifndef _WIN32

Expand Down Expand Up @@ -1789,7 +1814,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, non_blocking_send)
{
Expand All @@ -1810,8 +1835,8 @@ TEST_F(TCPv4Tests, non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
Expand Down Expand Up @@ -1856,15 +1881,26 @@ TEST_F(TCPv4Tests, non_blocking_send)

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
Expand Down
29 changes: 20 additions & 9 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness)
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv6Tests, non_blocking_send)
{
Expand All @@ -267,8 +267,8 @@ TEST_F(TCPv6Tests, non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv6;
IPLocator::setIPv6(serverLoc, "::1");
Expand Down Expand Up @@ -313,15 +313,26 @@ TEST_F(TCPv6Tests, non_blocking_send)

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
Expand Down

0 comments on commit e7c0b9a

Please sign in to comment.