Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20508] TCP first message loss (backport #4454) #4562

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
* - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms).
*
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -246,7 +248,11 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Increment between logical ports to try during RTCP negotiation
uint16_t logical_port_increment;

FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.")
/**
* Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the
* negotiation to finish up to this timeout before trying to send a message to that port.
* Zero value means no waiting (default).
*/
uint32_t tcp_negotiation_timeout;

//! Enables the TCP_NODELAY socket option
Expand Down
1 change: 1 addition & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ extern const char* METADATA_LOGICAL_PORT;
extern const char* LISTENING_PORTS;
extern const char* CALCULATE_CRC;
extern const char* CHECK_CRC;
extern const char* TCP_NEGOTIATION_TIMEOUT;
extern const char* SEGMENT_SIZE;
extern const char* PORT_QUEUE_CAPACITY;
extern const char* PORT_OVERFLOW_POLICY;
Expand Down
2 changes: 2 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
Expand Down Expand Up @@ -882,6 +883,7 @@
<xs:element name="check_crc" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="enable_tcp_nodelay" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tcp_negotiation_timeout" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_size" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32" minOccurs="0" maxOccurs="1"/>
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static std::shared_ptr<fastdds::rtps::TCPv4TransportDescriptor> create_tcpv4_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand All @@ -114,6 +115,7 @@ static std::shared_ptr<fastdds::rtps::TCPv6TransportDescriptor> create_tcpv6_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand Down
70 changes: 61 additions & 9 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request(

void TCPChannelResource::set_all_ports_pending()
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
logical_output_ports_.begin(),
logical_output_ports_.end());
Expand All @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending()
bool TCPChannelResource::is_logical_port_opened(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return is_logical_port_opened_nts(port);
}

bool TCPChannelResource::is_logical_port_opened_nts(
uint16_t port)
{
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
}

bool TCPChannelResource::is_logical_port_added(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
|| std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
!= pending_logical_output_ports_.end();
}

bool TCPChannelResource::wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);

// Early return if the port is already opened.
if (is_logical_port_opened_nts(port))
{
return true;
}

// Early return if the timeout is 0.
if (timeout == std::chrono::milliseconds(0))
{
return false;
}

// The port is under negotiation if it's in the pending list and in the negotiation list.
bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
negotiating_logical_ports_.begin(),
negotiating_logical_ports_.end(),
[port](const decltype(negotiating_logical_ports_)::value_type& item)
{
return item.second == port;
});

if (found_in_negotiating_list &&
pending_logical_output_ports_.end() != std::find(
pending_logical_output_ports_.begin(),
pending_logical_output_ports_.end(),
port))
{
// Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
// it should wait again with the respective remaining time.
auto wait_predicate = [this, port]() -> bool
{
return is_logical_port_opened_nts(port);
};
logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
}

return is_logical_port_opened_nts(port);
}

void TCPChannelResource::add_logical_port(
uint16_t port,
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
// Already opened?
if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
{
Expand All @@ -150,7 +201,7 @@ void TCPChannelResource::add_logical_port(
void TCPChannelResource::send_pending_open_logical_ports(
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!pending_logical_output_ports_.empty())
{
for (uint16_t port : pending_logical_output_ports_)
Expand Down Expand Up @@ -180,6 +231,7 @@ void TCPChannelResource::add_logical_port_response(
{
pending_logical_output_ports_.erase(portIt);
logical_output_ports_.push_back(port);
logical_output_ports_updated_cv.notify_all();
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
}
else
Expand Down Expand Up @@ -217,7 +269,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
// Don't add ports just tested and already pendings
if (p <= max_port && p != closedPort)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
if (pendingIt == pending_logical_output_ports_.end())
{
Expand All @@ -233,7 +285,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
else
{
TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
last_checked_logical_port_[id] = candidatePorts.back();
}
}
Expand Down Expand Up @@ -268,7 +320,7 @@ void TCPChannelResource::process_check_logical_ports_response(
void TCPChannelResource::set_logical_port_pending(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
if (it != logical_output_ports_.end())
{
Expand All @@ -280,7 +332,7 @@ void TCPChannelResource::set_logical_port_pending(
bool TCPChannelResource::remove_logical_port(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!is_logical_port_added(port))
{
return false;
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TCPChannelResource : public ChannelResource
std::map<TCPTransactionId, uint16_t> last_checked_logical_port_;
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
std::vector<uint16_t> logical_output_ports_;
std::condition_variable_any logical_output_ports_updated_cv;
std::mutex read_mutex_;
std::recursive_mutex pending_logical_mutex_;
std::atomic<eConnectionStatus> connection_status_;
Expand All @@ -94,6 +95,19 @@ class TCPChannelResource : public ChannelResource
bool is_logical_port_added(
uint16_t port);

/**
* This method checks if a logical port is under negotiation. If it is, it waits for the negotiation to finish up to a timeout.
* Independently if being under negotiation or not, it returns true if the port is opened, false otherwise.
*
* @param port The logical port to check.
* @param timeout The maximum time to wait for the negotiation to finish. Zero value means no wait
*
* @return true if the port is opened, false otherwise.
*/
bool wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout);

bool connection_established()
{
return connection_status_ == eConnectionStatus::eEstablished;
Expand Down Expand Up @@ -227,6 +241,9 @@ class TCPChannelResource : public ChannelResource

private:

bool is_logical_port_opened_nts(
uint16_t port);

void prepare_send_check_logical_ports_req(
uint16_t closedPort,
RTCPMessageManager* rtcp_manager);
Expand Down
Loading
Loading