Skip to content

Commit

Permalink
TCP non_blocking_send moved to TCPTransportDescriptor (#4415)
Browse files Browse the repository at this point in the history
* Refs #20502: non_blocking_send moved to tcp transport descriptor

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20502: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi authored Feb 28, 2024
1 parent 6668c19 commit 1c82f83
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 43 deletions.
18 changes: 18 additions & 0 deletions include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
* - \c non_blocking_send: do not block on send operations. When it is set to true, send operations will return
* immediately if the buffer might get full, but no error will be returned to the upper layer. This means
* that the application will behave as if the datagram is sent and lost.
*
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -276,6 +280,20 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Thread settings for the accept connections thread
ThreadSettings accept_thread;

/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer might get full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, which is the default, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send;

//! Add listener port to the listening_ports list
void add_listener_port(
uint16_t port)
Expand Down
2 changes: 1 addition & 1 deletion resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ size_t TCPChannelResourceBasic::send(
{
std::lock_guard<std::mutex> send_guard(send_mutex_);

if (parent_->get_non_blocking_send() &&
if (parent_->configuration()->non_blocking_send &&
!check_socket_send_buffer(header_size + size, socket_->native_handle()))
{
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResourceSecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ size_t TCPChannelResourceSecure::send(

if (eConnecting < connection_status_)
{
if (parent_->get_non_blocking_send() &&
if (parent_->configuration()->non_blocking_send &&
!check_socket_send_buffer(header_size + size,
secure_socket_->lowest_layer().native_handle()))
{
Expand Down
15 changes: 5 additions & 10 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ TCPTransportDescriptor::TCPTransportDescriptor()
, calculate_crc(true)
, check_crc(true)
, apply_security(false)
, non_blocking_send(false)
{
}

Expand All @@ -125,6 +126,7 @@ TCPTransportDescriptor::TCPTransportDescriptor(
, tls_config(t.tls_config)
, keep_alive_thread(t.keep_alive_thread)
, accept_thread(t.accept_thread)
, non_blocking_send(t.non_blocking_send)
{
}

Expand Down Expand Up @@ -152,6 +154,7 @@ TCPTransportDescriptor& TCPTransportDescriptor::operator =(
tls_config = t.tls_config;
keep_alive_thread = t.keep_alive_thread;
accept_thread = t.accept_thread;
non_blocking_send = t.non_blocking_send;
return *this;
}

Expand All @@ -173,14 +176,14 @@ bool TCPTransportDescriptor::operator ==(
this->tls_config == t.tls_config &&
this->keep_alive_thread == t.keep_alive_thread &&
this->accept_thread == t.accept_thread &&
this->non_blocking_send == t.non_blocking_send &&
SocketTransportDescriptor::operator ==(t));
}

TCPTransportInterface::TCPTransportInterface(
int32_t transport_kind)
: TransportInterface(transport_kind)
, alive_(true)
, non_blocking_send_(false)
#if TLS_FOUND
, ssl_context_(asio::ssl::context::sslv23)
#endif // if TLS_FOUND
Expand Down Expand Up @@ -422,7 +425,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch(
}

bool TCPTransportInterface::init(
const fastrtps::rtps::PropertyPolicy* properties)
const fastrtps::rtps::PropertyPolicy*)
{
if (!apply_tls_config())
{
Expand All @@ -447,14 +450,6 @@ bool TCPTransportInterface::init(
ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Get non_blocking_send property
if (properties)
{
auto s_non_blocking_send = eprosima::fastrtps::rtps::PropertyPolicyHelper::find_property(*properties,
"fastdds.tcp_transport.non_blocking_send");
non_blocking_send_ = s_non_blocking_send && *s_non_blocking_send == "true"? true : false;
}

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
Expand Down
18 changes: 0 additions & 18 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,6 @@ class TCPTransportInterface : public TransportInterface
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;
/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer is full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
Expand Down Expand Up @@ -473,11 +460,6 @@ class TCPTransportInterface : public TransportInterface
void fill_local_physical_port(
Locator& locator) const;

bool get_non_blocking_send() const
{
return non_blocking_send_;
}

};

} // namespace rtps
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/xmlparser/XMLParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,14 @@ XMLP_ret XMLParser::parseXMLCommonTCPTransportData(
return XMLP_ret::XML_ERROR;
}
}
// non_blocking_send - boolType
else if (strcmp(name, NON_BLOCKING_SEND) == 0)
{
if (XMLP_ret::XML_OK != getXMLBool(p_aux0, &pTCPDesc->non_blocking_send, 0))
{
return XMLP_ret::XML_ERROR;
}
}
else if (strcmp(name, LISTENING_PORTS) == 0)
{
// listening_ports uint16ListType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ typedef struct TCPTransportDescriptor : public SocketTransportDescriptor
bool calculate_crc;
bool check_crc;
bool apply_security;
bool non_blocking_send;

TLSConfig tls_config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>
<!-- UDP sample transport descriptor. Several options are common with TCP -->
<transport_descriptor>
Expand Down
1 change: 1 addition & 0 deletions test/system/tools/xmlvalidation/all_profile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>

<transport_descriptor>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>

<transport_descriptor>
Expand Down
11 changes: 4 additions & 7 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "mock/MockTCPChannelResource.h"
#include "mock/MockTCPv4Transport.h"
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/utils/IPFinder.h>
Expand Down Expand Up @@ -1426,6 +1425,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
Expand All @@ -1435,9 +1435,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
senderDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2);
senderDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION);
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down Expand Up @@ -1976,11 +1974,10 @@ TEST_F(TCPv4Tests, non_blocking_send)
// Create a TCP Server transport
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down
5 changes: 2 additions & 3 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,10 @@ TEST_F(TCPv6Tests, non_blocking_send)
// Create a TCP Server transport
TCPv6TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
MockTCPv6Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down
8 changes: 6 additions & 2 deletions test/unittest/xmlparser/XMLParserTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ TEST_F(XMLParserTests, parseXMLTransportData)
<calculate_crc>false</calculate_crc>\
<check_crc>false</check_crc>\
<enable_tcp_nodelay>false</enable_tcp_nodelay>\
<non_blocking_send>true</non_blocking_send>\
<tls><!-- TLS Section --></tls>\
<keep_alive_thread>\
<scheduling_policy>12</scheduling_policy>\
Expand Down Expand Up @@ -1086,6 +1087,7 @@ TEST_F(XMLParserTests, parseXMLTransportData)
EXPECT_EQ(pTCPv4Desc->listening_ports[0], 5100u);
EXPECT_EQ(pTCPv4Desc->listening_ports[1], 5200u);
EXPECT_EQ(pTCPv4Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->non_blocking_send, true);
EXPECT_EQ(pTCPv4Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->default_reception_threads(), modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->get_thread_config_for_port(12345), modified_thread_settings);
Expand Down Expand Up @@ -1115,8 +1117,9 @@ TEST_F(XMLParserTests, parseXMLTransportData)
EXPECT_EQ(pTCPv6Desc->logical_port_increment, 2u);
EXPECT_EQ(pTCPv6Desc->listening_ports[0], 5100u);
EXPECT_EQ(pTCPv6Desc->listening_ports[1], 5200u);
EXPECT_EQ(pTCPv4Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->non_blocking_send, true);
EXPECT_EQ(pTCPv6Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->default_reception_threads(), modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12345), modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12346), modified_thread_settings);
Expand Down Expand Up @@ -1236,6 +1239,7 @@ TEST_F(XMLParserTests, parseXMLTransportData_NegativeClauses)
"calculate_crc",
"check_crc",
"enable_tcp_nodelay",
"non_blocking_send",
"tls",
"keep_alive_thread",
"accept_thread",
Expand Down

0 comments on commit 1c82f83

Please sign in to comment.