From e7c0b9a40a51c86753319088b5deeb423df1a748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20P=C3=A9rez?= <78275223+jepemi@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:14:32 +0100 Subject: [PATCH] Set real TCP non_blocking_send limitation (#4502) * Refs #20589: Set real non_blocking_send limitation Signed-off-by: Jesus Perez * Refs #20589: Readapt test Signed-off-by: Jesus Perez * Refs #20589: Fix failing test in macos/linux Signed-off-by: Jesus Perez * Refs #20589: Uncrustify Signed-off-by: Jesus Perez * Refs #20589: Fix unused variable Signed-off-by: Jesus Perez --------- Signed-off-by: Jesus Perez (cherry picked from commit 33eb8be4b42d735e84f1754bfecaa6f294e153b1) --- src/cpp/rtps/transport/TCPChannelResource.cpp | 3 +- test/unittest/transport/TCPv4Tests.cpp | 94 +++++++++++++------ test/unittest/transport/TCPv6Tests.cpp | 29 ++++-- 3 files changed, 87 insertions(+), 39 deletions(-) diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index 2bb1e4b5db6..efc7a6903b9 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -362,7 +362,8 @@ bool TCPChannelResource::check_socket_send_buffer( size_t future_queue_size = size_t(bytesInSendQueue) + msg_size; - if (future_queue_size > size_t(parent_->configuration()->sendBufferSize)) + // TCP actually allocates twice the size of the buffer requested. + if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize)) { return false; } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 8e9eec1beae..70a3385a267 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1242,17 +1242,21 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_with_sni) // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, secure_non_blocking_send) { + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); + uint16_t port = g_default_port; uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); + senderDescriptor.apply_security = true; senderDescriptor.sendBufferSize = msg_size; - senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - senderDescriptor.tls_config.verify_file = "ca.crt"; + senderDescriptor.tls_config.password = "fastddspwd"; + senderDescriptor.tls_config.cert_chain_file = "fastdds.crt"; + senderDescriptor.tls_config.private_key_file = "fastdds.key"; + senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem"; senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -1269,8 +1273,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1283,13 +1287,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) { return preverified; }); - ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose) - { - return "fastddspwd"; - }); - ssl_context.use_certificate_chain_file("fastdds.crt"); - ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem); - ssl_context.use_tmp_dh_file("dh_params.pem"); + ssl_context.load_verify_file("ca.crt"); uint32_t options = 0; options |= asio::ssl::context::default_workarounds; @@ -1298,8 +1296,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) options |= asio::ssl::context::no_compression; ssl_context.set_options(options); - // TCPChannelResourceSecure::connect() like connection asio::io_service io_service; + auto ioServiceFunction = [&]() + { +#if ASIO_VERSION >= 101200 + asio::executor_work_guard work(io_service.get_executor()); +#else + io_service::work work(io_service_); +#endif // if ASIO_VERSION >= 101200 + io_service.run(); + }; + std::thread ioServiceThread(ioServiceFunction); + + // TCPChannelResourceSecure::connect() like connection asio::ip::tcp::resolver resolver(io_service); auto endpoints = resolver.resolve( IPLocator::ip_to_string(serverLoc), @@ -1320,7 +1329,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) ) { ASSERT_TRUE(!ec); - asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server; + asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client; secure_socket->async_handshake(role, [](const std::error_code& ec) { @@ -1336,6 +1345,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) a connection. This channel will not be present in the server's channel_resources_ map as communication lacks most of the discovery messages using a raw socket as participant. */ + // auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); ASSERT_TRUE(sender_unbound_channel_resources.size() == 1); auto sender_channel_resource = @@ -1343,17 +1353,32 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = 0; + bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec); + ASSERT_EQ(ec, asio::error_code()); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); secure_socket->lowest_layer().close(ec); + io_service.stop(); + ioServiceThread.join(); } #endif // ifndef _WIN32 @@ -1789,7 +1814,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, non_blocking_send) { @@ -1810,8 +1835,8 @@ TEST_F(TCPv4Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1856,15 +1881,26 @@ TEST_F(TCPv4Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 2e56450a074..76422fc98a6 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -246,7 +246,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv6Tests, non_blocking_send) { @@ -267,8 +267,8 @@ TEST_F(TCPv6Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv6; IPLocator::setIPv6(serverLoc, "::1"); @@ -313,15 +313,26 @@ TEST_F(TCPv6Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel();