Skip to content

Commit

Permalink
TCP first message loss (#4454)
Browse files Browse the repository at this point in the history
Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi committed Apr 1, 2024
1 parent d2e1e3b commit bf8ad84
Show file tree
Hide file tree
Showing 24 changed files with 566 additions and 45 deletions.
8 changes: 7 additions & 1 deletion include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
* - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms).
*
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -246,7 +248,11 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Increment between logical ports to try during RTCP negotiation
uint16_t logical_port_increment;

FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.")
/**
* Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the
* negotiation to finish up to this timeout before trying to send a message to that port.
* Zero value means no waiting (default).
*/
uint32_t tcp_negotiation_timeout;

//! Enables the TCP_NODELAY socket option
Expand Down
1 change: 1 addition & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ extern const char* METADATA_LOGICAL_PORT;
extern const char* LISTENING_PORTS;
extern const char* CALCULATE_CRC;
extern const char* CHECK_CRC;
extern const char* TCP_NEGOTIATION_TIMEOUT;
extern const char* SEGMENT_SIZE;
extern const char* PORT_QUEUE_CAPACITY;
extern const char* PORT_OVERFLOW_POLICY;
Expand Down
2 changes: 2 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
Expand Down Expand Up @@ -882,6 +883,7 @@
<xs:element name="check_crc" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="enable_tcp_nodelay" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tcp_negotiation_timeout" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_size" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32" minOccurs="0" maxOccurs="1"/>
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static std::shared_ptr<fastdds::rtps::TCPv4TransportDescriptor> create_tcpv4_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand All @@ -114,6 +115,7 @@ static std::shared_ptr<fastdds::rtps::TCPv6TransportDescriptor> create_tcpv6_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

return descriptor;
}
Expand Down
70 changes: 61 additions & 9 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request(

void TCPChannelResource::set_all_ports_pending()
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
logical_output_ports_.begin(),
logical_output_ports_.end());
Expand All @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending()
bool TCPChannelResource::is_logical_port_opened(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return is_logical_port_opened_nts(port);
}

bool TCPChannelResource::is_logical_port_opened_nts(
uint16_t port)
{
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
}

bool TCPChannelResource::is_logical_port_added(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
|| std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
!= pending_logical_output_ports_.end();
}

bool TCPChannelResource::wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);

// Early return if the port is already opened.
if (is_logical_port_opened_nts(port))
{
return true;
}

// Early return if the timeout is 0.
if (timeout == std::chrono::milliseconds(0))
{
return false;
}

// The port is under negotiation if it's in the pending list and in the negotiation list.
bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
negotiating_logical_ports_.begin(),
negotiating_logical_ports_.end(),
[port](const decltype(negotiating_logical_ports_)::value_type& item)
{
return item.second == port;
});

if (found_in_negotiating_list &&
pending_logical_output_ports_.end() != std::find(
pending_logical_output_ports_.begin(),
pending_logical_output_ports_.end(),
port))
{
// Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
// it should wait again with the respective remaining time.
auto wait_predicate = [this, port]() -> bool
{
return is_logical_port_opened_nts(port);
};
logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
}

return is_logical_port_opened_nts(port);
}

void TCPChannelResource::add_logical_port(
uint16_t port,
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
// Already opened?
if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
{
Expand All @@ -150,7 +201,7 @@ void TCPChannelResource::add_logical_port(
void TCPChannelResource::send_pending_open_logical_ports(
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!pending_logical_output_ports_.empty())
{
for (uint16_t port : pending_logical_output_ports_)
Expand Down Expand Up @@ -180,6 +231,7 @@ void TCPChannelResource::add_logical_port_response(
{
pending_logical_output_ports_.erase(portIt);
logical_output_ports_.push_back(port);
logical_output_ports_updated_cv.notify_all();
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
}
else
Expand Down Expand Up @@ -217,7 +269,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
// Don't add ports just tested and already pendings
if (p <= max_port && p != closedPort)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
if (pendingIt == pending_logical_output_ports_.end())
{
Expand All @@ -233,7 +285,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
else
{
TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
last_checked_logical_port_[id] = candidatePorts.back();
}
}
Expand Down Expand Up @@ -268,7 +320,7 @@ void TCPChannelResource::process_check_logical_ports_response(
void TCPChannelResource::set_logical_port_pending(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
if (it != logical_output_ports_.end())
{
Expand All @@ -280,7 +332,7 @@ void TCPChannelResource::set_logical_port_pending(
bool TCPChannelResource::remove_logical_port(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!is_logical_port_added(port))
{
return false;
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TCPChannelResource : public ChannelResource
std::map<TCPTransactionId, uint16_t> last_checked_logical_port_;
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
std::vector<uint16_t> logical_output_ports_;
std::condition_variable_any logical_output_ports_updated_cv;
std::mutex read_mutex_;
std::recursive_mutex pending_logical_mutex_;
std::atomic<eConnectionStatus> connection_status_;
Expand All @@ -94,6 +95,19 @@ class TCPChannelResource : public ChannelResource
bool is_logical_port_added(
uint16_t port);

/**
* This method checks if a logical port is under negotiation. If it is, it waits for the negotiation to finish up to a timeout.
* Independently if being under negotiation or not, it returns true if the port is opened, false otherwise.
*
* @param port The logical port to check.
* @param timeout The maximum time to wait for the negotiation to finish. Zero value means no wait
*
* @return true if the port is opened, false otherwise.
*/
bool wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout);

bool connection_established()
{
return connection_status_ == eConnectionStatus::eEstablished;
Expand Down Expand Up @@ -227,6 +241,9 @@ class TCPChannelResource : public ChannelResource

private:

bool is_logical_port_opened_nts(
uint16_t port);

void prepare_send_check_logical_ports_req(
uint16_t closedPort,
RTCPMessageManager* rtcp_manager);
Expand Down
Loading

0 comments on commit bf8ad84

Please sign in to comment.