Skip to content

Commit

Permalink
TCP first message loss (#4454)
Browse files Browse the repository at this point in the history
* Refs #20508: add remaining add_logical_port calls

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add unittests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add blackbox test + uncomment section (commented for testing purposes)

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add blackbox tests + functional fixes

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add cv + fix windows tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508. Use lock_guard where apropiate.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508. Proxy is_local_port_opened.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508. Refactor wait_logical_port_under_negotiation.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix typo

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add wait_for_logical_port_negotiation_ms to transport descriptor

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Apply suggestions - firs message loss related

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Apply suggestions - transport descriptor related

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Reuse unused transport descriptor tcp_negotiation_timeout

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Change timeout behavior

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix xml parser

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Undo fastcdr commit change

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 8103cf0)

# Conflicts:
#	include/fastdds/rtps/transport/TCPTransportDescriptor.h
#	include/fastrtps/xmlparser/XMLParserCommon.h
#	resources/xsd/fastRTPS_profiles.xsd
#	src/cpp/rtps/transport/TCPChannelResource.cpp
#	src/cpp/rtps/transport/TCPTransportInterface.cpp
#	src/cpp/rtps/transport/TCPTransportInterface.h
#	src/cpp/rtps/xmlparser/XMLParser.cpp
#	src/cpp/rtps/xmlparser/XMLParserCommon.cpp
#	test/blackbox/api/dds-pim/PubSubReader.hpp
#	test/blackbox/api/dds-pim/PubSubWriter.hpp
#	test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
#	test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp
#	test/blackbox/common/BlackboxTestsTransportTCP.cpp
#	test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h
#	test/system/tools/xmlvalidation/XMLTesterExample_profile.xml
#	test/system/tools/xmlvalidation/all_profile.xml
#	test/unittest/transport/TCPv6Tests.cpp
#	test/unittest/transport/mock/MockTCPv4Transport.h
#	test/unittest/transport/mock/MockTCPv6Transport.h
#	test/unittest/xmlparser/XMLParserTests.cpp
  • Loading branch information
jepemi authored and mergify[bot] committed Mar 15, 2024
1 parent 75d6958 commit e5d65ee
Show file tree
Hide file tree
Showing 24 changed files with 3,823 additions and 23 deletions.
17 changes: 17 additions & 0 deletions include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
<<<<<<< HEAD
=======
* - \c non_blocking_send: do not block on send operations. When it is set to true, send operations will return
* immediately if the buffer might get full, but no error will be returned to the upper layer. This means
* that the application will behave as if the datagram is sent and lost.
*
* - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms).
*
>>>>>>> 8103cf042 (TCP first message loss (#4454))
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -242,7 +251,15 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Increment between logical ports to try during RTCP negotiation
uint16_t logical_port_increment;

<<<<<<< HEAD
FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.")
=======
/**
* Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the
* negotiation to finish up to this timeout before trying to send a message to that port.
* Zero value means no waiting (default).
*/
>>>>>>> 8103cf042 (TCP first message loss (#4454))
uint32_t tcp_negotiation_timeout;

//! Enables the TCP_NODELAY socket option
Expand Down
6 changes: 6 additions & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ extern const char* METADATA_LOGICAL_PORT;
extern const char* LISTENING_PORTS;
extern const char* CALCULATE_CRC;
extern const char* CHECK_CRC;
<<<<<<< HEAD
=======
extern const char* KEEP_ALIVE_THREAD;
extern const char* ACCEPT_THREAD;
extern const char* TCP_NEGOTIATION_TIMEOUT;
>>>>>>> 8103cf042 (TCP first message loss (#4454))
extern const char* SEGMENT_SIZE;
extern const char* PORT_QUEUE_CAPACITY;
extern const char* PORT_OVERFLOW_POLICY;
Expand Down
144 changes: 144 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,138 @@
</xs:all>
</xs:complexType>

<<<<<<< HEAD
<xs:complexType name="rtpsTransportDescriptorType">
=======
<!--port:
├ portBase [uint16],
├ domainIDGain [uint16],
├ participantIDGain [uint16],
├ offsetd0 [uint16],
├ offsetd1 [uint16],
├ offsetd2 [uint16],
└ offsetd3 [uint16] -->
<xs:complexType name="portType">
<xs:all>
<xs:element name="portBase" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="domainIDGain" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="participantIDGain" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="offsetd0" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="offsetd1" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="offsetd2" type="uint16" minOccurs="0" maxOccurs="1"/>
<xs:element name="offsetd3" type="uint16" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>

<!--participant allocation type:
├ remote_locators [0~1],
| ├ max_unicast_locators [uint32],
| └ max_multicast_locators [uint32],
├ total_participants [0~1],
├ total_readers [0~1],
├ total_writers [0~1],
├ send_buffers [0~1],
├ max_properties [uint32],
├ max_user_data [uint32],
└ max_partitions [uint32] -->
<xs:complexType name="rtpsParticipantAllocationAttributesType">
<xs:all>
<xs:element name="remote_locators" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:all>
<xs:element name="max_unicast_locators" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="max_multicast_locators" type="uint32" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>
</xs:element>
<xs:element name="total_participants" type="allocationConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="total_readers" type="allocationConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="total_writers" type="allocationConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="send_buffers" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:all>
<xs:element name="preallocated_number" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="dynamic" type="boolean" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>
</xs:element>
<xs:element name="max_properties" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="max_user_data" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="max_partitions" type="uint32" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>




<!--| Data Writer / Publisher Definition |-->
<!--Data Writer Times:
├ initialHeartbeatDelay [durationType],
├ heartbeatPeriod [durationType],
├ nackResponseDelay [durationType],
└ nackSupressionDuration [durationType] -->
<xs:complexType name="writerTimesType">
<xs:all>
<xs:element name="initialHeartbeatDelay" type="durationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="heartbeatPeriod" type="durationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="nackResponseDelay" type="durationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="nackSupressionDuration" type="durationType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>


<!--| Data Reader / Subscriber Definition |-->
<!--Data Reader Times:
├ initialAcknackDelay [durationType],
└ heartbeatResponseDelay [durationType] -->
<xs:complexType name="readerTimesType">
<xs:all>
<xs:element name="initialAcknackDelay" type="durationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="heartbeatResponseDelay" type="durationType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>




<!--| Transport Descriptor Definition |-->
<!--Transport Descriptor:
├ transport_id [string],
├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"),
├ sendBufferSize [uint32],
├ receiveBufferSize [uint32],
├ maxMessageSize [uint32],
├ maxInitialPeersRange [uint32],
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
├ keep_alive_timeout_ms [uint32], (ONLY available for TCP type)
├ max_logical_port [uint16], (ONLY available for TCP type)
├ logical_port_range [uint16], (ONLY available for TCP type)
├ logical_port_increment [uint16], (ONLY available for TCP type)
├ listening_ports [0~*], (ONLY available for TCP type)
| └ port [uint16] (ONLY available for TCP type)
├ tls [0~1], (ONLY available for TCP type)
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ keep_alive_thread [threadSettingsType], (ONLY available for TCP type)
├ accept_thread [threadSettingsType], (ONLY available for TCP type)
├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
├ rtps_dump_file [string] (ONLY available for SHM type)
├ default_reception_threads [threadSettingsType]
├ reception_threads [receptionThreadsListType] (ONLY available for SHM type)
└ dump_thread [threadSettingsType] (ONLY available for SHM type) -->
<!-- TODO: How to ensure all elements are declared properly (UDP only, TCP only, etc...)? -->
<xs:complexType name="transportDescriptorType">
>>>>>>> 8103cf042 (TCP first message loss (#4454))
<xs:all minOccurs="0">
<xs:element name="transport_id" type="stringType"/>
<xs:element name="type" type="stringType"/>
Expand All @@ -854,10 +985,23 @@
<xs:element name="check_crc" type="boolType" minOccurs="0" maxOccurs="1"/>
<xs:element name="enable_tcp_nodelay" type="boolType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<<<<<<< HEAD
<xs:element name="segment_size" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="rtps_dump_file" type="stringType" minOccurs="0" maxOccurs="1"/>
=======
<xs:element name="keep_alive_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="accept_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tcp_negotiation_timeout" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_size" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="rtps_dump_file" type="string" minOccurs="0" maxOccurs="1"/>
<xs:element name="default_reception_threads" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="reception_threads" type="receptionThreadsListType" minOccurs="0" maxOccurs="1"/>
<xs:element name="dump_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
>>>>>>> 8103cf042 (TCP first message loss (#4454))
</xs:all>
</xs:complexType>

Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static std::shared_ptr<fastdds::rtps::TCPv4TransportDescriptor> create_tcpv4_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand All @@ -114,6 +115,7 @@ static std::shared_ptr<fastdds::rtps::TCPv6TransportDescriptor> create_tcpv6_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand Down
74 changes: 65 additions & 9 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request(

void TCPChannelResource::set_all_ports_pending()
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
logical_output_ports_.begin(),
logical_output_ports_.end());
Expand All @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending()
bool TCPChannelResource::is_logical_port_opened(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return is_logical_port_opened_nts(port);
}

bool TCPChannelResource::is_logical_port_opened_nts(
uint16_t port)
{
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
}

bool TCPChannelResource::is_logical_port_added(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
|| std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
!= pending_logical_output_ports_.end();
}

bool TCPChannelResource::wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);

// Early return if the port is already opened.
if (is_logical_port_opened_nts(port))
{
return true;
}

// Early return if the timeout is 0.
if (timeout == std::chrono::milliseconds(0))
{
return false;
}

// The port is under negotiation if it's in the pending list and in the negotiation list.
bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
negotiating_logical_ports_.begin(),
negotiating_logical_ports_.end(),
[port](const decltype(negotiating_logical_ports_)::value_type& item)
{
return item.second == port;
});

if (found_in_negotiating_list &&
pending_logical_output_ports_.end() != std::find(
pending_logical_output_ports_.begin(),
pending_logical_output_ports_.end(),
port))
{
// Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
// it should wait again with the respective remaining time.
auto wait_predicate = [this, port]() -> bool
{
return is_logical_port_opened_nts(port);
};
logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
}

return is_logical_port_opened_nts(port);
}

void TCPChannelResource::add_logical_port(
uint16_t port,
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
// Already opened?
if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
{
Expand Down Expand Up @@ -152,7 +203,7 @@ void TCPChannelResource::add_logical_port(
void TCPChannelResource::send_pending_open_logical_ports(
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!pending_logical_output_ports_.empty())
{
for (uint16_t port : pending_logical_output_ports_)
Expand Down Expand Up @@ -182,7 +233,12 @@ void TCPChannelResource::add_logical_port_response(
if (success)
{
logical_output_ports_.push_back(port);
<<<<<<< HEAD
logInfo(RTCP, "OpenedLogicalPort: " << port);
=======
logical_output_ports_updated_cv.notify_all();
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
>>>>>>> 8103cf042 (TCP first message loss (#4454))
}
else
{
Expand Down Expand Up @@ -219,7 +275,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
// Don't add ports just tested and already pendings
if (p <= max_port && p != closedPort)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
if (pendingIt == pending_logical_output_ports_.end())
{
Expand All @@ -235,7 +291,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
else
{
TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
last_checked_logical_port_[id] = candidatePorts.back();
}
}
Expand Down Expand Up @@ -270,7 +326,7 @@ void TCPChannelResource::process_check_logical_ports_response(
void TCPChannelResource::set_logical_port_pending(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
if (it != logical_output_ports_.end())
{
Expand All @@ -282,7 +338,7 @@ void TCPChannelResource::set_logical_port_pending(
bool TCPChannelResource::remove_logical_port(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!is_logical_port_added(port))
{
return false;
Expand Down
Loading

0 comments on commit e5d65ee

Please sign in to comment.