Skip to content

Commit

Permalink
TCP first message loss (#4454)
Browse files Browse the repository at this point in the history
* Refs #20508: add remaining add_logical_port calls

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add unittests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add blackbox test + uncomment section (commented for testing purposes)

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add blackbox tests + functional fixes

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add cv + fix windows tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508. Use lock_guard where apropiate.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508. Proxy is_local_port_opened.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508. Refactor wait_logical_port_under_negotiation.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20508: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix typo

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Add wait_for_logical_port_negotiation_ms to transport descriptor

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Apply suggestions - firs message loss related

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Apply suggestions - transport descriptor related

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Reuse unused transport descriptor tcp_negotiation_timeout

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Change timeout behavior

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Fix xml parser

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20508: Undo fastcdr commit change

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 8103cf0)

# Conflicts:
#	include/fastdds/rtps/transport/TCPTransportDescriptor.h
#	include/fastrtps/xmlparser/XMLParserCommon.h
#	resources/xsd/fastRTPS_profiles.xsd
#	src/cpp/rtps/transport/TCPTransportInterface.cpp
#	src/cpp/rtps/transport/TCPTransportInterface.h
#	src/cpp/rtps/xmlparser/XMLParser.cpp
#	src/cpp/rtps/xmlparser/XMLParserCommon.cpp
#	test/blackbox/common/BlackboxTestsTransportTCP.cpp
#	test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h
#	test/system/tools/xmlvalidation/XMLTesterExample_profile.xml
#	test/system/tools/xmlvalidation/all_profile.xml
#	test/unittest/transport/TCPv4Tests.cpp
#	test/unittest/transport/TCPv6Tests.cpp
#	test/unittest/transport/mock/MockTCPv4Transport.h
#	test/unittest/transport/mock/MockTCPv6Transport.h
#	test/unittest/xmlparser/XMLParserTests.cpp
  • Loading branch information
jepemi authored and mergify[bot] committed Mar 15, 2024
1 parent fa2c790 commit 87642e7
Show file tree
Hide file tree
Showing 24 changed files with 1,413 additions and 40 deletions.
17 changes: 17 additions & 0 deletions include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
<<<<<<< HEAD
=======
* - \c non_blocking_send: do not block on send operations. When it is set to true, send operations will return
* immediately if the buffer might get full, but no error will be returned to the upper layer. This means
* that the application will behave as if the datagram is sent and lost.
*
* - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms).
*
>>>>>>> 8103cf042 (TCP first message loss (#4454))
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -246,7 +255,15 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Increment between logical ports to try during RTCP negotiation
uint16_t logical_port_increment;

<<<<<<< HEAD
FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.")
=======
/**
* Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the
* negotiation to finish up to this timeout before trying to send a message to that port.
* Zero value means no waiting (default).
*/
>>>>>>> 8103cf042 (TCP first message loss (#4454))
uint32_t tcp_negotiation_timeout;

//! Enables the TCP_NODELAY socket option
Expand Down
6 changes: 6 additions & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ extern const char* METADATA_LOGICAL_PORT;
extern const char* LISTENING_PORTS;
extern const char* CALCULATE_CRC;
extern const char* CHECK_CRC;
<<<<<<< HEAD
=======
extern const char* KEEP_ALIVE_THREAD;
extern const char* ACCEPT_THREAD;
extern const char* TCP_NEGOTIATION_TIMEOUT;
>>>>>>> 8103cf042 (TCP first message loss (#4454))
extern const char* SEGMENT_SIZE;
extern const char* PORT_QUEUE_CAPACITY;
extern const char* PORT_OVERFLOW_POLICY;
Expand Down
42 changes: 42 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@

<!--| Transport Descriptor Definition |-->
<!--Transport Descriptor:
<<<<<<< HEAD
├ transport_id [string],
├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"),
├ sendBufferSize [uint32],
Expand Down Expand Up @@ -843,6 +844,41 @@
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
└ rtps_dump_file [string] (ONLY available for SHM type) -->
=======
├ transport_id [string],
├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"),
├ sendBufferSize [uint32],
├ receiveBufferSize [uint32],
├ maxMessageSize [uint32],
├ maxInitialPeersRange [uint32],
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
├ keep_alive_timeout_ms [uint32], (ONLY available for TCP type)
├ max_logical_port [uint16], (ONLY available for TCP type)
├ logical_port_range [uint16], (ONLY available for TCP type)
├ logical_port_increment [uint16], (ONLY available for TCP type)
├ listening_ports [0~*], (ONLY available for TCP type)
| └ port [uint16] (ONLY available for TCP type)
├ tls [0~1], (ONLY available for TCP type)
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ keep_alive_thread [threadSettingsType], (ONLY available for TCP type)
├ accept_thread [threadSettingsType], (ONLY available for TCP type)
├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
├ rtps_dump_file [string] (ONLY available for SHM type)
├ default_reception_threads [threadSettingsType]
├ reception_threads [receptionThreadsListType] (ONLY available for SHM type)
└ dump_thread [threadSettingsType] (ONLY available for SHM type) -->
>>>>>>> 8103cf042 (TCP first message loss (#4454))
<!-- TODO: How to ensure all elements are declared properly (UDP only, TCP only, etc...)? -->
<xs:complexType name="transportDescriptorType">
<xs:all minOccurs="0">
Expand Down Expand Up @@ -896,6 +932,12 @@
<xs:element name="check_crc" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="enable_tcp_nodelay" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<<<<<<< HEAD
=======
<xs:element name="keep_alive_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="accept_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tcp_negotiation_timeout" type="uint32" minOccurs="0" maxOccurs="1"/>
>>>>>>> 8103cf042 (TCP first message loss (#4454))
<xs:element name="segment_size" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32" minOccurs="0" maxOccurs="1"/>
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static std::shared_ptr<fastdds::rtps::TCPv4TransportDescriptor> create_tcpv4_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand All @@ -114,6 +115,7 @@ static std::shared_ptr<fastdds::rtps::TCPv6TransportDescriptor> create_tcpv6_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand Down
70 changes: 61 additions & 9 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request(

void TCPChannelResource::set_all_ports_pending()
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
logical_output_ports_.begin(),
logical_output_ports_.end());
Expand All @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending()
bool TCPChannelResource::is_logical_port_opened(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return is_logical_port_opened_nts(port);
}

bool TCPChannelResource::is_logical_port_opened_nts(
uint16_t port)
{
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
}

bool TCPChannelResource::is_logical_port_added(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
|| std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
!= pending_logical_output_ports_.end();
}

bool TCPChannelResource::wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);

// Early return if the port is already opened.
if (is_logical_port_opened_nts(port))
{
return true;
}

// Early return if the timeout is 0.
if (timeout == std::chrono::milliseconds(0))
{
return false;
}

// The port is under negotiation if it's in the pending list and in the negotiation list.
bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
negotiating_logical_ports_.begin(),
negotiating_logical_ports_.end(),
[port](const decltype(negotiating_logical_ports_)::value_type& item)
{
return item.second == port;
});

if (found_in_negotiating_list &&
pending_logical_output_ports_.end() != std::find(
pending_logical_output_ports_.begin(),
pending_logical_output_ports_.end(),
port))
{
// Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
// it should wait again with the respective remaining time.
auto wait_predicate = [this, port]() -> bool
{
return is_logical_port_opened_nts(port);
};
logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
}

return is_logical_port_opened_nts(port);
}

void TCPChannelResource::add_logical_port(
uint16_t port,
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
// Already opened?
if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
{
Expand All @@ -150,7 +201,7 @@ void TCPChannelResource::add_logical_port(
void TCPChannelResource::send_pending_open_logical_ports(
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!pending_logical_output_ports_.empty())
{
for (uint16_t port : pending_logical_output_ports_)
Expand Down Expand Up @@ -180,6 +231,7 @@ void TCPChannelResource::add_logical_port_response(
if (success)
{
logical_output_ports_.push_back(port);
logical_output_ports_updated_cv.notify_all();
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
}
else
Expand Down Expand Up @@ -217,7 +269,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
// Don't add ports just tested and already pendings
if (p <= max_port && p != closedPort)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
if (pendingIt == pending_logical_output_ports_.end())
{
Expand All @@ -233,7 +285,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
else
{
TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
last_checked_logical_port_[id] = candidatePorts.back();
}
}
Expand Down Expand Up @@ -268,7 +320,7 @@ void TCPChannelResource::process_check_logical_ports_response(
void TCPChannelResource::set_logical_port_pending(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
if (it != logical_output_ports_.end())
{
Expand All @@ -280,7 +332,7 @@ void TCPChannelResource::set_logical_port_pending(
bool TCPChannelResource::remove_logical_port(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!is_logical_port_added(port))
{
return false;
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TCPChannelResource : public ChannelResource
std::map<TCPTransactionId, uint16_t> last_checked_logical_port_;
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
std::vector<uint16_t> logical_output_ports_;
std::condition_variable_any logical_output_ports_updated_cv;
std::mutex read_mutex_;
std::recursive_mutex pending_logical_mutex_;
std::atomic<eConnectionStatus> connection_status_;
Expand All @@ -94,6 +95,19 @@ class TCPChannelResource : public ChannelResource
bool is_logical_port_added(
uint16_t port);

/**
* This method checks if a logical port is under negotiation. If it is, it waits for the negotiation to finish up to a timeout.
* Independently if being under negotiation or not, it returns true if the port is opened, false otherwise.
*
* @param port The logical port to check.
* @param timeout The maximum time to wait for the negotiation to finish. Zero value means no wait
*
* @return true if the port is opened, false otherwise.
*/
bool wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout);

bool connection_established()
{
return connection_status_ == eConnectionStatus::eEstablished;
Expand Down Expand Up @@ -201,6 +215,9 @@ class TCPChannelResource : public ChannelResource

private:

bool is_logical_port_opened_nts(
uint16_t port);

void prepare_send_check_logical_ports_req(
uint16_t closedPort,
RTCPMessageManager* rtcp_manager);
Expand Down
Loading

0 comments on commit 87642e7

Please sign in to comment.