diff --git a/include/fastdds/rtps/transport/TCPTransportDescriptor.h b/include/fastdds/rtps/transport/TCPTransportDescriptor.h index 43b7737b019..c9619b10b69 100644 --- a/include/fastdds/rtps/transport/TCPTransportDescriptor.h +++ b/include/fastdds/rtps/transport/TCPTransportDescriptor.h @@ -48,6 +48,15 @@ namespace rtps { * * - \c tls_config: Configuration for TLS. * +<<<<<<< HEAD +======= + * - \c non_blocking_send: do not block on send operations. When it is set to true, send operations will return + * immediately if the buffer might get full, but no error will be returned to the upper layer. This means + * that the application will behave as if the datagram is sent and lost. + * + * - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms). + * +>>>>>>> 8103cf042 (TCP first message loss (#4454)) * @ingroup TRANSPORT_MODULE */ struct TCPTransportDescriptor : public SocketTransportDescriptor @@ -246,7 +255,15 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor //! Increment between logical ports to try during RTCP negotiation uint16_t logical_port_increment; +<<<<<<< HEAD FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.") +======= + /** + * Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the + * negotiation to finish up to this timeout before trying to send a message to that port. + * Zero value means no waiting (default). + */ +>>>>>>> 8103cf042 (TCP first message loss (#4454)) uint32_t tcp_negotiation_timeout; //! Enables the TCP_NODELAY socket option diff --git a/include/fastrtps/xmlparser/XMLParserCommon.h b/include/fastrtps/xmlparser/XMLParserCommon.h index 0aa98e49607..586ae2a4149 100644 --- a/include/fastrtps/xmlparser/XMLParserCommon.h +++ b/include/fastrtps/xmlparser/XMLParserCommon.h @@ -72,6 +72,12 @@ extern const char* METADATA_LOGICAL_PORT; extern const char* LISTENING_PORTS; extern const char* CALCULATE_CRC; extern const char* CHECK_CRC; +<<<<<<< HEAD +======= +extern const char* KEEP_ALIVE_THREAD; +extern const char* ACCEPT_THREAD; +extern const char* TCP_NEGOTIATION_TIMEOUT; +>>>>>>> 8103cf042 (TCP first message loss (#4454)) extern const char* SEGMENT_SIZE; extern const char* PORT_QUEUE_CAPACITY; extern const char* PORT_OVERFLOW_POLICY; diff --git a/resources/xsd/fastRTPS_profiles.xsd b/resources/xsd/fastRTPS_profiles.xsd index c44dc28c2a9..80047a774c2 100644 --- a/resources/xsd/fastRTPS_profiles.xsd +++ b/resources/xsd/fastRTPS_profiles.xsd @@ -805,6 +805,7 @@ +======= + ├ transport_id [string], + ├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"), + ├ sendBufferSize [uint32], + ├ receiveBufferSize [uint32], + ├ maxMessageSize [uint32], + ├ maxInitialPeersRange [uint32], + ├ interfaceWhiteList [0~*], (NOT available for SHM type) + | └ address [ipv4Address|ipv6Address] + ├ TTL [uint8], (ONLY available for UDP type) + ├ non_blocking_send [boolean], (NOT available for SHM type) + ├ output_port [uint16], (ONLY available for UDP type) + ├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type) + ├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type) + ├ keep_alive_timeout_ms [uint32], (ONLY available for TCP type) + ├ max_logical_port [uint16], (ONLY available for TCP type) + ├ logical_port_range [uint16], (ONLY available for TCP type) + ├ logical_port_increment [uint16], (ONLY available for TCP type) + ├ listening_ports [0~*], (ONLY available for TCP type) + | └ port [uint16] (ONLY available for TCP type) + ├ tls [0~1], (ONLY available for TCP type) + ├ calculate_crc [bool], (ONLY available for TCP type) + ├ check_crc [bool], (ONLY available for TCP type) + ├ enable_tcp_nodelay [bool], (ONLY available for TCP type) + ├ keep_alive_thread [threadSettingsType], (ONLY available for TCP type) + ├ accept_thread [threadSettingsType], (ONLY available for TCP type) + ├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type) + ├ segment_size [uint32], (ONLY available for SHM type) + ├ port_queue_capacity [uint32], (ONLY available for SHM type) + ├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type) + ├ rtps_dump_file [string] (ONLY available for SHM type) + ├ default_reception_threads [threadSettingsType] + ├ reception_threads [receptionThreadsListType] (ONLY available for SHM type) + └ dump_thread [threadSettingsType] (ONLY available for SHM type) --> +>>>>>>> 8103cf042 (TCP first message loss (#4454)) @@ -882,6 +918,12 @@ +<<<<<<< HEAD +======= + + + +>>>>>>> 8103cf042 (TCP first message loss (#4454)) diff --git a/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp b/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp index e2f9df11abb..0d0647c8e32 100644 --- a/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp +++ b/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp @@ -98,6 +98,7 @@ static std::shared_ptr create_tcpv4_tra descriptor->check_crc = false; descriptor->apply_security = false; descriptor->enable_tcp_nodelay = true; + descriptor->tcp_negotiation_timeout = 0; return descriptor; } @@ -114,6 +115,7 @@ static std::shared_ptr create_tcpv6_tra descriptor->check_crc = false; descriptor->apply_security = false; descriptor->enable_tcp_nodelay = true; + descriptor->tcp_negotiation_timeout = 0; return descriptor; } diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index 315e9f6cbe0..2bb1e4b5db6 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request( void TCPChannelResource::set_all_ports_pending() { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); pending_logical_output_ports_.insert(pending_logical_output_ports_.end(), logical_output_ports_.begin(), logical_output_ports_.end()); @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending() bool TCPChannelResource::is_logical_port_opened( uint16_t port) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); + return is_logical_port_opened_nts(port); +} + +bool TCPChannelResource::is_logical_port_opened_nts( + uint16_t port) +{ return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end(); } bool TCPChannelResource::is_logical_port_added( uint16_t port) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end() || std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port) != pending_logical_output_ports_.end(); } +bool TCPChannelResource::wait_logical_port_under_negotiation( + uint16_t port, + const std::chrono::milliseconds& timeout) +{ + std::unique_lock scopedLock(pending_logical_mutex_); + + // Early return if the port is already opened. + if (is_logical_port_opened_nts(port)) + { + return true; + } + + // Early return if the timeout is 0. + if (timeout == std::chrono::milliseconds(0)) + { + return false; + } + + // The port is under negotiation if it's in the pending list and in the negotiation list. + bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if( + negotiating_logical_ports_.begin(), + negotiating_logical_ports_.end(), + [port](const decltype(negotiating_logical_ports_)::value_type& item) + { + return item.second == port; + }); + + if (found_in_negotiating_list && + pending_logical_output_ports_.end() != std::find( + pending_logical_output_ports_.begin(), + pending_logical_output_ports_.end(), + port)) + { + // Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case, + // it should wait again with the respective remaining time. + auto wait_predicate = [this, port]() -> bool + { + return is_logical_port_opened_nts(port); + }; + logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate); + } + + return is_logical_port_opened_nts(port); +} + void TCPChannelResource::add_logical_port( uint16_t port, RTCPMessageManager* rtcp_manager) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); // Already opened? if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end()) { @@ -150,7 +201,7 @@ void TCPChannelResource::add_logical_port( void TCPChannelResource::send_pending_open_logical_ports( RTCPMessageManager* rtcp_manager) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); if (!pending_logical_output_ports_.empty()) { for (uint16_t port : pending_logical_output_ports_) @@ -180,6 +231,7 @@ void TCPChannelResource::add_logical_port_response( { pending_logical_output_ports_.erase(portIt); logical_output_ports_.push_back(port); + logical_output_ports_updated_cv.notify_all(); EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port); } else @@ -217,7 +269,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req( // Don't add ports just tested and already pendings if (p <= max_port && p != closedPort) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p); if (pendingIt == pending_logical_output_ports_.end()) { @@ -233,7 +285,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req( else { TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts); - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); last_checked_logical_port_[id] = candidatePorts.back(); } } @@ -268,7 +320,7 @@ void TCPChannelResource::process_check_logical_ports_response( void TCPChannelResource::set_logical_port_pending( uint16_t port) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port); if (it != logical_output_ports_.end()) { @@ -280,7 +332,7 @@ void TCPChannelResource::set_logical_port_pending( bool TCPChannelResource::remove_logical_port( uint16_t port) { - std::unique_lock scopedLock(pending_logical_mutex_); + std::lock_guard scopedLock(pending_logical_mutex_); if (!is_logical_port_added(port)) { return false; diff --git a/src/cpp/rtps/transport/TCPChannelResource.h b/src/cpp/rtps/transport/TCPChannelResource.h index 4bceb696bf2..172f99e8ddc 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.h +++ b/src/cpp/rtps/transport/TCPChannelResource.h @@ -70,6 +70,7 @@ class TCPChannelResource : public ChannelResource std::map last_checked_logical_port_; std::vector pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_ std::vector logical_output_ports_; + std::condition_variable_any logical_output_ports_updated_cv; std::mutex read_mutex_; std::recursive_mutex pending_logical_mutex_; std::atomic connection_status_; @@ -94,6 +95,19 @@ class TCPChannelResource : public ChannelResource bool is_logical_port_added( uint16_t port); + /** + * This method checks if a logical port is under negotiation. If it is, it waits for the negotiation to finish up to a timeout. + * Independently if being under negotiation or not, it returns true if the port is opened, false otherwise. + * + * @param port The logical port to check. + * @param timeout The maximum time to wait for the negotiation to finish. Zero value means no wait + * + * @return true if the port is opened, false otherwise. + */ + bool wait_logical_port_under_negotiation( + uint16_t port, + const std::chrono::milliseconds& timeout); + bool connection_established() { return connection_status_ == eConnectionStatus::eEstablished; @@ -201,6 +215,9 @@ class TCPChannelResource : public ChannelResource private: + bool is_logical_port_opened_nts( + uint16_t port); + void prepare_send_check_logical_ports_req( uint16_t closedPort, RTCPMessageManager* rtcp_manager); diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index aff82c3fd95..77e506f89c6 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -55,10 +55,13 @@ static const int s_default_keep_alive_frequency = 5000; // 5 SECONDS static const int s_default_keep_alive_timeout = 15000; // 15 SECONDS //static const int s_clean_deleted_sockets_pool_timeout = 100; // 100 MILLISECONDS +<<<<<<< HEAD FASTDDS_TODO_BEFORE(3, 0, "Eliminate s_default_tcp_negotitation_timeout, variable used to initialize deprecate attribute.") static const int s_default_tcp_negotitation_timeout = 5000; // 5 Seconds +======= +>>>>>>> 8103cf042 (TCP first message loss (#4454)) TCPTransportDescriptor::TCPTransportDescriptor() : SocketTransportDescriptor(s_maximumMessageSize, s_maximumInitialPeersRange) , keep_alive_frequency_ms(s_default_keep_alive_frequency) @@ -66,7 +69,7 @@ TCPTransportDescriptor::TCPTransportDescriptor() , max_logical_port(100) , logical_port_range(20) , logical_port_increment(2) - , tcp_negotiation_timeout(s_default_tcp_negotitation_timeout) + , tcp_negotiation_timeout(0) , enable_tcp_nodelay(false) , wait_for_tcp_negotiation(false) , calculate_crc(true) @@ -644,6 +647,8 @@ bool TCPTransportInterface::OpenOutputChannel( Locator physical_locator = IPLocator::toPhysicalLocator(locator); + std::lock_guard socketsLock(sockets_map_mutex_); + // We try to find a SenderResource that has this locator. // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport // already reuses a SenderResource. @@ -656,7 +661,26 @@ bool TCPTransportInterface::OpenOutputChannel( IPLocator::WanToLanLocator(physical_locator) == tcp_sender_resource->locator()))) { - // If missing, logical port will be added in first send() + // Add logical port to channel if it's not there yet + auto channel_resource = channel_resources_.find(physical_locator); + + // Maybe as WAN? + if (channel_resource == channel_resources_.end() && IPLocator::hasWan(locator)) + { + Locator wan_locator = IPLocator::WanToLanLocator(locator); + channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator)); + } + + if (channel_resource != channel_resources_.end()) + { + channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + else + { + std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); + channel_pending_logical_ports_[physical_locator].insert(logical_port); + } + statistics_info_.add_entry(locator); return true; } @@ -670,7 +694,6 @@ bool TCPTransportInterface::OpenOutputChannel( << IPLocator::getLogicalPort( locator) << ") @ " << IPLocator::to_string(locator)); - std::lock_guard socketsLock(sockets_map_mutex_); auto channel_resource = channel_resources_.find(physical_locator); // Maybe as WAN? @@ -760,6 +783,8 @@ bool TCPTransportInterface::OpenOutputChannel( EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [WAIT_CONNECTION] (physical: " << IPLocator::getPhysicalPort(locator) << "; logical: " << IPLocator::getLogicalPort(locator) << ") @ " << IPLocator::to_string(locator)); + std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); + channel_pending_logical_ports_[physical_locator].insert(logical_port); } } @@ -1234,7 +1259,7 @@ bool TCPTransportInterface::send( bool success = false; - std::lock_guard scoped_lock(sockets_map_mutex_); + std::unique_lock scoped_lock(sockets_map_mutex_); auto channel_resource = channel_resources_.find(locator); if (channel_resource == channel_resources_.end()) { @@ -1262,31 +1287,42 @@ bool TCPTransportInterface::send( if (channel->is_logical_port_added(logical_port)) { - if (channel->is_logical_port_opened(logical_port)) + // If tcp_negotiation_timeout is setted, wait until logical port is opened or timeout. Negative timeout means + // waiting indefinitely. + if (!channel->is_logical_port_opened(logical_port)) { - TCPHeader tcp_header; - statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size); - fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port); - + // Logical port might be under negotiation. Wait a little and check again. This prevents from + // losing first messages. + scoped_lock.unlock(); + bool logical_port_opened = channel->wait_logical_port_under_negotiation(logical_port, std::chrono::milliseconds( + configuration()->tcp_negotiation_timeout)); + if (!logical_port_opened) { - asio::error_code ec; - size_t sent = channel->send( - (octet*)&tcp_header, - static_cast(TCPHeader::size()), - send_buffer, - send_buffer_size, - ec); - - if (sent != static_cast(TCPHeader::size() + send_buffer_size) || ec) - { - EPROSIMA_LOG_WARNING(DEBUG, "Failed to send RTCP message (" << sent << " of " << - TCPHeader::size() + send_buffer_size << " b): " << ec.message()); - success = false; - } - else - { - success = true; - } + return success; + } + scoped_lock.lock(); + } + TCPHeader tcp_header; + statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size); + fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port); + { + asio::error_code ec; + size_t sent = channel->send( + (octet*)&tcp_header, + static_cast(TCPHeader::size()), + send_buffer, + send_buffer_size, + ec); + + if (sent != static_cast(TCPHeader::size() + send_buffer_size) || ec) + { + EPROSIMA_LOG_WARNING(DEBUG, "Failed to send RTCP message (" << sent << " of " << + TCPHeader::size() + send_buffer_size << " b): " << ec.message()); + success = false; + } + else + { + success = true; } } } @@ -1758,6 +1794,78 @@ void TCPTransportInterface::fill_local_physical_port( } } +<<<<<<< HEAD +======= +void TCPTransportInterface::CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const +{ + // Since send resources handle physical locators, we need to convert the remote participant locators to physical + std::set remote_participant_physical_locators; + for (const Locator& remote_participant_locator : remote_participant_locators) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator)); + + // Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario, + //initial peer can also work with the WANtoLANLocator of the remote participant. + if (IPLocator::hasWan(remote_participant_locator)) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator( + remote_participant_locator))); + } + } + + // Exlude initial peers. + for (const auto& initial_peer : participant_initial_peers) + { + if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(), + IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end()) + { + remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer)); + } + } + + for (const auto& remote_participant_physical_locator : remote_participant_physical_locators) + { + if (!IsLocatorSupported(remote_participant_physical_locator)) + { + continue; + } + // Remove send resources for the associated remote participant locator + for (auto it = send_resource_list.begin(); it != send_resource_list.end();) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get()); + + if (tcp_sender_resource) + { + if (tcp_sender_resource->locator() == remote_participant_physical_locator) + { + it = send_resource_list.erase(it); + continue; + } + } + ++it; + } + } +} + +void TCPTransportInterface::send_channel_pending_logical_ports( + std::shared_ptr& channel) +{ + std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); + auto logical_ports = channel_pending_logical_ports_.find(channel->locator()); + if (logical_ports != channel_pending_logical_ports_.end()) + { + for (auto logical_port : logical_ports->second) + { + channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + channel_pending_logical_ports_.erase(channel->locator()); + } +} + +>>>>>>> 8103cf042 (TCP first message loss (#4454)) } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index c56fe0245d3..25b34b75f10 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -118,6 +118,12 @@ class TCPTransportInterface : public TransportInterface eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_; + // Map containging the logical ports that must be added to a channel that has not been created yet. This could happen + // with acceptor channels that are created after their output channel has been opened (LARGE_DATA case). + // The key is the physical locator associated with the sender resource, and later to the channel. + std::map> channel_pending_logical_ports_; + std::mutex channel_pending_logical_ports_mutex_; + TCPTransportInterface( int32_t transport_kind); @@ -454,11 +460,34 @@ class TCPTransportInterface : public TransportInterface void fill_local_physical_port( Locator& locator) const; +<<<<<<< HEAD bool get_non_blocking_send() const { return non_blocking_send_; } +======= + /** + * Close the output channel associated to the given remote participant but if its locators belong to the + * given list of initial peers. + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators Set of locators associated to the remote participant. + * @param participant_initial_peers List of locators associated to the initial peers of the local participant. + */ + void CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const; + + /** + * Method to add the logical ports associated to a channel that was not available + * when the logical ports were obtained. + * @param channel Channel that might add the logical ports if available. + */ + void send_channel_pending_logical_ports( + std::shared_ptr& channel); +>>>>>>> 8103cf042 (TCP first message loss (#4454)) }; } // namespace rtps diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index 63d56ee9198..11bc4d76e2f 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -468,6 +468,9 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest( sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code); + // Add pending logical ports to the channel + mTransport->send_channel_pending_logical_ports(channel); + return RETCODE_OK; } diff --git a/src/cpp/rtps/xmlparser/XMLParser.cpp b/src/cpp/rtps/xmlparser/XMLParser.cpp index 189175e458d..b831a312515 100644 --- a/src/cpp/rtps/xmlparser/XMLParser.cpp +++ b/src/cpp/rtps/xmlparser/XMLParser.cpp @@ -243,6 +243,7 @@ XMLP_ret XMLParser::parseXMLTransportData( + @@ -366,7 +367,57 @@ XMLP_ret XMLParser::parseXMLTransportData( return ret; } } +<<<<<<< HEAD else +======= + } + + XMLProfileManager::insertTransportById(sId, pDescriptor); + return ret; +} + +XMLP_ret XMLParser::validateXMLTransportElements( + tinyxml2::XMLElement& p_root) +{ + XMLP_ret ret = XMLP_ret::XML_OK; + for (tinyxml2::XMLElement* p_aux0 = p_root.FirstChildElement(); p_aux0 != nullptr; + p_aux0 = p_aux0->NextSiblingElement()) + { + const char* name = p_aux0->Name(); + if (!(strcmp(name, TRANSPORT_ID) == 0 || + strcmp(name, TYPE) == 0 || + strcmp(name, SEND_BUFFER_SIZE) == 0 || + strcmp(name, RECEIVE_BUFFER_SIZE) == 0 || + strcmp(name, MAX_MESSAGE_SIZE) == 0 || + strcmp(name, MAX_INITIAL_PEERS_RANGE) == 0 || + strcmp(name, WHITE_LIST) == 0 || + strcmp(name, TTL) == 0 || + strcmp(name, NON_BLOCKING_SEND) == 0 || + strcmp(name, UDP_OUTPUT_PORT) == 0 || + strcmp(name, TCP_WAN_ADDR) == 0 || + strcmp(name, KEEP_ALIVE_FREQUENCY) == 0 || + strcmp(name, KEEP_ALIVE_TIMEOUT) == 0 || + strcmp(name, MAX_LOGICAL_PORT) == 0 || + strcmp(name, LOGICAL_PORT_RANGE) == 0 || + strcmp(name, LOGICAL_PORT_INCREMENT) == 0 || + strcmp(name, LISTENING_PORTS) == 0 || + strcmp(name, CALCULATE_CRC) == 0 || + strcmp(name, CHECK_CRC) == 0 || + strcmp(name, KEEP_ALIVE_THREAD) == 0 || + strcmp(name, ACCEPT_THREAD) == 0 || + strcmp(name, ENABLE_TCP_NODELAY) == 0 || + strcmp(name, TCP_NEGOTIATION_TIMEOUT) == 0 || + strcmp(name, TLS) == 0 || + strcmp(name, SEGMENT_SIZE) == 0 || + strcmp(name, PORT_QUEUE_CAPACITY) == 0 || + strcmp(name, HEALTHY_CHECK_TIMEOUT_MS) == 0 || + strcmp(name, RTPS_DUMP_FILE) == 0 || + strcmp(name, DEFAULT_RECEPTION_THREADS) == 0 || + strcmp(name, RECEPTION_THREADS) == 0 || + strcmp(name, DUMP_THREAD) == 0 || + strcmp(name, PORT_OVERFLOW_POLICY) == 0 || + strcmp(name, SEGMENT_OVERFLOW_POLICY) == 0)) +>>>>>>> 8103cf042 (TCP first message loss (#4454)) { EPROSIMA_LOG_ERROR(XMLPARSER, "Invalid transport type: '" << sType << "'"); return XMLP_ret::XML_ERROR; @@ -663,6 +714,16 @@ XMLP_ret XMLParser::parseXMLCommonTCPTransportData( "Invalid element found into 'rtpsTransportDescriptorType'. Name: " << name); return XMLP_ret::XML_ERROR; } + else if (strcmp(name, TCP_NEGOTIATION_TIMEOUT) == 0) + { + // tcp_negotiation_timeout - uint32Type + int iTimeout(0); + if (XMLP_ret::XML_OK != getXMLInt(p_aux0, &iTimeout, 0)) + { + return XMLP_ret::XML_ERROR; + } + pTCPDesc->tcp_negotiation_timeout = static_cast(iTimeout); + } } } else diff --git a/src/cpp/rtps/xmlparser/XMLParserCommon.cpp b/src/cpp/rtps/xmlparser/XMLParserCommon.cpp index d54b966560e..9fdf6a7e5cf 100644 --- a/src/cpp/rtps/xmlparser/XMLParserCommon.cpp +++ b/src/cpp/rtps/xmlparser/XMLParserCommon.cpp @@ -59,6 +59,12 @@ const char* METADATA_LOGICAL_PORT = "metadata_logical_port"; const char* LISTENING_PORTS = "listening_ports"; const char* CALCULATE_CRC = "calculate_crc"; const char* CHECK_CRC = "check_crc"; +<<<<<<< HEAD +======= +const char* KEEP_ALIVE_THREAD = "keep_alive_thread"; +const char* ACCEPT_THREAD = "accept_thread"; +const char* TCP_NEGOTIATION_TIMEOUT = "tcp_negotiation_timeout"; +>>>>>>> 8103cf042 (TCP first message loss (#4454)) const char* SEGMENT_SIZE = "segment_size"; const char* PORT_QUEUE_CAPACITY = "port_queue_capacity"; const char* PORT_OVERFLOW_POLICY = "port_overflow_policy"; diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 977714c09f3..29faaaf0174 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -956,7 +956,8 @@ class PubSubReader PubSubReader& setup_large_data_tcp( bool v6 = false, - const uint16_t& port = 0) + const uint16_t& port = 0, + const uint32_t& tcp_negotiation_timeout = 0) { participant_qos_.transport().use_builtin_transports = false; @@ -972,6 +973,11 @@ class PubSubReader auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_qos_.transport().user_transports.push_back(data_transport); } else @@ -981,6 +987,11 @@ class PubSubReader auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_qos_.transport().user_transports.push_back(data_transport); } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 4f1eab7bdd9..5c4fb3ae52f 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -928,7 +928,8 @@ class PubSubWriter PubSubWriter& setup_large_data_tcp( bool v6 = false, - const uint16_t& port = 0) + const uint16_t& port = 0, + const uint32_t& tcp_negotiation_timeout = 0) { participant_qos_.transport().use_builtin_transports = false; @@ -944,6 +945,11 @@ class PubSubWriter auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_qos_.transport().user_transports.push_back(data_transport); } else @@ -953,6 +959,11 @@ class PubSubWriter auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_qos_.transport().user_transports.push_back(data_transport); } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 9eafaab8da2..18f450e5ac1 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -744,7 +744,8 @@ class PubSubReader PubSubReader& setup_large_data_tcp( bool v6 = false, - const uint16_t& port = 0) + const uint16_t& port = 0, + const uint32_t& tcp_negotiation_timeout = 0) { participant_attr_.rtps.useBuiltinTransports = false; @@ -760,6 +761,11 @@ class PubSubReader auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_attr_.rtps.userTransports.push_back(data_transport); } else @@ -769,6 +775,11 @@ class PubSubReader auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_attr_.rtps.userTransports.push_back(data_transport); } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index cf7fbe4f3b6..af0070f9a99 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -756,7 +756,8 @@ class PubSubWriter PubSubWriter& setup_large_data_tcp( bool v6 = false, - const uint16_t& port = 0) + const uint16_t& port = 0, + const uint32_t& tcp_negotiation_timeout = 0) { participant_attr_.rtps.useBuiltinTransports = false; @@ -772,6 +773,11 @@ class PubSubWriter auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_attr_.rtps.userTransports.push_back(data_transport); } else @@ -781,6 +787,11 @@ class PubSubWriter auto data_transport = std::make_shared(); data_transport->add_listener_port(tcp_listening_port); + data_transport->calculate_crc = false; + data_transport->check_crc = false; + data_transport->apply_security = false; + data_transport->enable_tcp_nodelay = true; + data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; participant_attr_.rtps.userTransports.push_back(data_transport); } diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index b6f31643955..b291b1380e5 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -838,6 +838,491 @@ TEST_P(TransportTCP, large_data_topology) writers.clear(); } +<<<<<<< HEAD +======= +// This test verifies that if having a server with several listening ports, only the first one is used. +TEST_P(TransportTCP, multiple_listening_ports) +{ + // Create a server with several listening ports + PubSubReader* server = new PubSubReader(TEST_TOPIC_NAME); + uint16_t server_port_1 = 10000; + uint16_t server_port_2 = 10001; + + std::shared_ptr server_transport; + if (use_ipv6) + { + server_transport = std::make_shared(); + } + else + { + server_transport = std::make_shared(); + } + server_transport->add_listener_port(server_port_1); + server_transport->add_listener_port(server_port_2); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Create two clients each one connecting to a different port + PubSubWriter* client_1 = new PubSubWriter(TEST_TOPIC_NAME); + PubSubWriter* client_2 = new PubSubWriter(TEST_TOPIC_NAME); + std::shared_ptr client_transport_1; + std::shared_ptr client_transport_2; + Locator_t initialPeerLocator_1; + Locator_t initialPeerLocator_2; + if (use_ipv6) + { + client_transport_1 = std::make_shared(); + client_transport_2 = std::make_shared(); + initialPeerLocator_1.kind = LOCATOR_KIND_TCPv6; + initialPeerLocator_2.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator_1, "::1"); + IPLocator::setIPv6(initialPeerLocator_2, "::1"); + } + else + { + client_transport_1 = std::make_shared(); + client_transport_2 = std::make_shared(); + initialPeerLocator_1.kind = LOCATOR_KIND_TCPv4; + initialPeerLocator_2.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator_1, 127, 0, 0, 1); + IPLocator::setIPv4(initialPeerLocator_2, 127, 0, 0, 1); + } + client_1->disable_builtin_transport().add_user_transport_to_pparams(client_transport_1); + client_2->disable_builtin_transport().add_user_transport_to_pparams(client_transport_2); + initialPeerLocator_1.port = server_port_1; + initialPeerLocator_2.port = server_port_2; + LocatorList_t initial_peer_list_1; + LocatorList_t initial_peer_list_2; + initial_peer_list_1.push_back(initialPeerLocator_1); + initial_peer_list_2.push_back(initialPeerLocator_2); + client_1->initial_peers(initial_peer_list_1); + client_2->initial_peers(initial_peer_list_2); + client_1->init(); + client_2->init(); + ASSERT_TRUE(client_1->isInitialized()); + ASSERT_TRUE(client_2->isInitialized()); + + // Wait for discovery. + server->wait_discovery(); + client_1->wait_discovery(); + client_2->wait_discovery(std::chrono::seconds(1)); + EXPECT_EQ(server->get_matched(), 1U); + EXPECT_EQ(client_1->get_matched(), 1U); + EXPECT_EQ(client_2->get_matched(), 0U); + + // Send data + auto data = default_helloworld_data_generator(); + server->startReception(data); + client_1->send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block server until reception finished. + server->block_for_all(); + // Wait for all data to be acked. + EXPECT_TRUE(client_1->waitForAllAcked(std::chrono::milliseconds(100))); + + // Release TCP client and server resources. + delete client_1; + delete client_2; + delete server; +} + +// Test TCP send resource cleaning. This test matches a server with a client and then releases the +// client resources. After PDP unbind message, the server removes the client +// from the send resource list. +TEST_P(TransportTCP, send_resource_cleanup) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubWriter( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Server + // Create a server with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. The low level transport of this chaining transport will be UDP. + // This will allow us to get send_resource_list_ from the server participant when UDP transport gets its OpenOutputChannel() + // method called. This should happen after TCP transports connection is established. We can then see how many TCP send + // resources exist. + // For the cleanup test we follow that same procedure. Firstly we destroy both participants and then instantiate a new + // UDP participant. The send resource list will get updated with no TCP send resource. + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | Empty <-------------------------------------------------- clean transports | + // | | | | | + // | 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 0 get_send_resource_list() | | | + // |__________________________________________________________| |_____________________| + // + uint16_t server_port = 10000; + test_transport_->add_listener_port(server_port); + auto low_level_transport = std::make_shared(); + auto server_chaining_transport = std::make_shared(low_level_transport); + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + server_chaining_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Client + auto initialize_client = [&](PubSubWriter* client) + { + std::shared_ptr client_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + client->disable_builtin_transport().add_user_transport_to_pparams(client_transport); + initialPeerLocator.port = server_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + client->init(); + }; + auto initialize_udp_participant = [&](PubSubWriter* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_client(client.get()); + ASSERT_TRUE(client->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(1, std::chrono::seconds(0)); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 2); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = server_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + client.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + server->wait_writer_undiscovery(); + + // Create new udp client. + udp_participant.reset(new PubSubWriter(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // Check that the send_resource_list has size 0. This means that the send resource + // for the client has been removed. + send_resource_list = server_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 0); + send_resource_list.clear(); +} + +// Test TCP send resource cleaning. In this case, since the send resource has been created from an initial_peer, +// the send resource should not be removed. +TEST_P(TransportTCP, send_resource_cleanup_initial_peer) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubReader( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Client + // Create a client with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. This will allow us to get send_resource_list_ + // from the client participant when its transport gets its OpenOutputChannel() method called. + + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | 1 TCP (initial peer) <-------------------------------------------------- clean transports | + // | | | | | + // | 1 TCP + 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | (initial peer) | | | + // |__________________________________________________________| |_____________________| + // + + uint16_t server_port = 10000; + LocatorList_t initial_peer_list; + Locator_t initialPeerLocator; + if (use_ipv6) + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + initialPeerLocator.port = server_port; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + + auto low_level_transport = std::make_shared(); + auto client_chaining_transport = std::make_shared(low_level_transport); + client->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + client_chaining_transport).init(); + ASSERT_TRUE(client->isInitialized()); + + // Server + auto initialize_server = [&](PubSubReader* server) + { + std::shared_ptr server_transport; + if (use_ipv6) + { + server_transport = std::make_shared(); + } + else + { + server_transport = std::make_shared(); + } + server_transport->add_listener_port(server_port); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport); + server->init(); + }; + auto initialize_udp_participant = [&](PubSubReader* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + client->wait_discovery(1, std::chrono::seconds(0)); + server->wait_discovery(std::chrono::seconds(0), 1); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(2, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = client_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + server.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + client->wait_reader_undiscovery(); + + // Create new client instances. + udp_participant.reset(new PubSubReader(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(1, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // Check that the send_resource_list has size 1. This means that the send resource + // for the first client hasn't been removed because it was created from an initial_peer. + send_resource_list = client_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + send_resource_list.clear(); + + // If relaunching the server, the client should connect again. + server.reset(new PubSubReader(TEST_TOPIC_NAME)); + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(2, std::chrono::seconds(0)); +} + +// Test TCP transport on large message with best effort reliability +TEST_P(TransportTCP, large_message_send_receive) +{ + // Prepare data to be sent before participants discovery so it is ready to be sent as soon as possible. + std::list data; + data = default_data300kb_data_generator(1); + + uint16_t writer_port = global_port; + + /* Test configuration */ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::shared_ptr writer_transport; + std::shared_ptr reader_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + reader_transport = std::make_shared(); + writer_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + reader_transport = std::make_shared(); + writer_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + writer_transport->tcp_negotiation_timeout = 100; + reader_transport->tcp_negotiation_timeout = 100; + + // Add listener port to server + writer_transport->add_listener_port(writer_port); + + // Add initial peer to client + initialPeerLocator.port = writer_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + + // Setup participants + writer.disable_builtin_transport() + .add_user_transport_to_pparams(writer_transport); + + reader.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(reader_transport); + + // Init participants + writer.init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(0)); + reader.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + reader.startReception(data); + + writer.send(data); + EXPECT_TRUE(data.empty()); + + reader.block_for_all(); +} + +// Test TCP transport on large message with best effort reliability and LARGE_DATA mode +TEST_P(TransportTCP, large_message_large_data_send_receive) +{ + // Prepare data to be sent. before participants discovery so it is ready to be sent as soon as possible. + // The writer might try to send the data before the reader has negotiated the connection. + // If the negotiation timeout is too short, the writer will fail to send the data and the reader will not receive it. + // LARGE_DATA participant discovery is tipically faster than tcp negotiation. + std::list data; + data = default_data300kb_data_generator(1); + + /* Test configuration */ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + uint32_t tcp_negotiation_timeout = 100; + writer.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout); + reader.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout); + + // Init participants + writer.init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(0)); + reader.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + reader.startReception(data); + + writer.send(data); + EXPECT_TRUE(data.empty()); + + reader.block_for_all(); +} + +>>>>>>> 8103cf042 (TCP first message loss (#4454)) #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h b/test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h index ef1d97811c1..c45fb2ca3e1 100644 --- a/test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h +++ b/test/mock/rtps/TCPTransportDescriptor/fastrtps/transport/TCPTransportDescriptor.h @@ -175,6 +175,14 @@ typedef struct TCPTransportDescriptor : public SocketTransportDescriptor TLSConfig tls_config; +<<<<<<< HEAD +======= + fastdds::rtps::ThreadSettings keep_alive_thread; + fastdds::rtps::ThreadSettings accept_thread; + + uint32_t tcp_negotiation_timeout; + +>>>>>>> 8103cf042 (TCP first message loss (#4454)) void add_listener_port( uint16_t port) { diff --git a/test/system/tools/xmlvalidation/XMLTesterExample_profile.xml b/test/system/tools/xmlvalidation/XMLTesterExample_profile.xml index c559ea5af1c..4e7d23291de 100644 --- a/test/system/tools/xmlvalidation/XMLTesterExample_profile.xml +++ b/test/system/tools/xmlvalidation/XMLTesterExample_profile.xml @@ -51,6 +51,11 @@ false false false +<<<<<<< HEAD +======= + false + 100 +>>>>>>> 8103cf042 (TCP first message loss (#4454)) diff --git a/test/system/tools/xmlvalidation/all_profile.xml b/test/system/tools/xmlvalidation/all_profile.xml index 84bc8b4e8e3..0be4d70639f 100644 --- a/test/system/tools/xmlvalidation/all_profile.xml +++ b/test/system/tools/xmlvalidation/all_profile.xml @@ -888,6 +888,11 @@ false false false +<<<<<<< HEAD +======= + false + 100 +>>>>>>> 8103cf042 (TCP first message loss (#4454)) @@ -938,6 +943,7 @@ false false false + 100 diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 800a4c218bb..4b937a54d57 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -2073,6 +2073,107 @@ TEST_F(TCPv4Tests, opening_output_channel_with_same_locator_as_local_listening_p ASSERT_EQ(send_resource_list.size(), 2); } +// This test verifies the logical port passed to OpenOutputChannel is correctly added to the channel pending list or the +// trasnport's pending channel logical ports map. +TEST_F(TCPv4Tests, add_logical_port_on_send_resource_creation) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + // TCP Client + { + uint16_t port = 12345; + TCPv4TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // OpenOutputChannel + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + IPLocator::setLogicalPort(initialPeerLocator, 7411); + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + ASSERT_TRUE(channel->is_logical_port_added(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7411)); + auto channel_pending_logical_ports = clientTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_TRUE(channel_pending_logical_ports.empty()); + + client_resource_list.clear(); + } + + // TCP Server - LARGE_DATA + { + uint16_t port = 12345; + // Discovered participant physical port has to have a lower value than the listening port to behave as a server + uint16_t participantPhysicalLocator = 12344; + // Create a TCP Server transport + TCPv4TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new MockTCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Add participant discovered (from UDP discovery for example) + Locator_t discoveredParticipantLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", participantPhysicalLocator, + discoveredParticipantLocator); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7410); + + // OpenOutputChannel + SendResourceList server_resource_list; + ASSERT_TRUE(serverTransportUnderTest->OpenOutputChannel(server_resource_list, discoveredParticipantLocator)); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7411); + ASSERT_TRUE(serverTransportUnderTest->OpenOutputChannel(server_resource_list, discoveredParticipantLocator)); + ASSERT_FALSE(server_resource_list.empty()); + ASSERT_TRUE(serverTransportUnderTest->get_channel_resources().empty()); + auto channel_pending_logical_ports = serverTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_EQ(channel_pending_logical_ports.size(), 1); + ASSERT_EQ(channel_pending_logical_ports.begin()->second.size(), 2); + ASSERT_TRUE(channel_pending_logical_ports.begin()->second.find( + 7410) != channel_pending_logical_ports.begin()->second.end()); + ASSERT_TRUE(channel_pending_logical_ports.begin()->second.find( + 7411) != channel_pending_logical_ports.begin()->second.end()); + + server_resource_list.clear(); + } + + // TCP Client - LARGE_DATA + { + uint16_t port = 12345; + // Discovered participant physical port has to have a larger value than the listening port to behave as a client + uint16_t participantPhysicalLocator = 12346; + // Create a TCP Client transport + TCPv4TransportDescriptor clientDescriptor; + clientDescriptor.add_listener_port(port); + std::unique_ptr clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add participant discovered (from UDP discovery for example) + Locator_t discoveredParticipantLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", participantPhysicalLocator, + discoveredParticipantLocator); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7410); + + // OpenOutputChannel + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, discoveredParticipantLocator)); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7411); + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, discoveredParticipantLocator)); + ASSERT_FALSE(client_resource_list.empty()); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + ASSERT_TRUE(channel->is_logical_port_added(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7411)); + auto channel_pending_logical_ports = clientTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_TRUE(channel_pending_logical_ports.empty()); + + client_resource_list.clear(); + } +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 14793895937..658050a8641 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -501,12 +501,116 @@ TEST_F(TCPv6Tests, opening_output_channel_with_same_locator_as_local_listening_p ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(outputLocator)); octet message[5] = { 'H','e','l','l','o' }; +<<<<<<< HEAD Semaphore sem; std::function recCallback = [&]() { EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); sem.post(); }; +======= +// This test verifies the logical port passed to OpenOutputChannel is correctly added to the channel pending list or the +// trasnport's pending channel logical ports map. +TEST_F(TCPv6Tests, add_logical_port_on_send_resource_creation) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + // TCP Client + { + uint16_t port = 12345; + TCPv6TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // OpenOutputChannel + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + IPLocator::setLogicalPort(initialPeerLocator, 7411); + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + ASSERT_TRUE(channel->is_logical_port_added(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7411)); + auto channel_pending_logical_ports = clientTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_TRUE(channel_pending_logical_ports.empty()); + + client_resource_list.clear(); + } + + // TCP Server - LARGE_DATA + { + uint16_t port = 12345; + // Discovered participant physical port has to have a lower value than the listening port to behave as a server + uint16_t participantPhysicalLocator = 12344; + // Create a TCP Server transport + TCPv6TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new MockTCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Add participant discovered (from UDP discovery for example) + Locator_t discoveredParticipantLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", participantPhysicalLocator, discoveredParticipantLocator); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7410); + + // OpenOutputChannel + SendResourceList server_resource_list; + ASSERT_TRUE(serverTransportUnderTest->OpenOutputChannel(server_resource_list, discoveredParticipantLocator)); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7411); + ASSERT_TRUE(serverTransportUnderTest->OpenOutputChannel(server_resource_list, discoveredParticipantLocator)); + ASSERT_FALSE(server_resource_list.empty()); + ASSERT_TRUE(serverTransportUnderTest->get_channel_resources().empty()); + auto channel_pending_logical_ports = serverTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_EQ(channel_pending_logical_ports.size(), 1); + ASSERT_EQ(channel_pending_logical_ports.begin()->second.size(), 2); + ASSERT_TRUE(channel_pending_logical_ports.begin()->second.find( + 7410) != channel_pending_logical_ports.begin()->second.end()); + ASSERT_TRUE(channel_pending_logical_ports.begin()->second.find( + 7411) != channel_pending_logical_ports.begin()->second.end()); + + server_resource_list.clear(); + } + + // TCP Client - LARGE_DATA + { + uint16_t port = 12345; + // Discovered participant physical port has to have a larger value than the listening port to behave as a client + uint16_t participantPhysicalLocator = 12346; + // Create a TCP Client transport + TCPv6TransportDescriptor clientDescriptor; + clientDescriptor.add_listener_port(port); + std::unique_ptr clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add participant discovered (from UDP discovery for example) + Locator_t discoveredParticipantLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", participantPhysicalLocator, discoveredParticipantLocator); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7410); + + // OpenOutputChannel + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, discoveredParticipantLocator)); + IPLocator::setLogicalPort(discoveredParticipantLocator, 7411); + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, discoveredParticipantLocator)); + ASSERT_FALSE(client_resource_list.empty()); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + ASSERT_TRUE(channel->is_logical_port_added(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7411)); + auto channel_pending_logical_ports = clientTransportUnderTest->get_channel_pending_logical_ports(); + ASSERT_TRUE(channel_pending_logical_ports.empty()); + + client_resource_list.clear(); + } +} + +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_ports) +>>>>>>> 8103cf042 (TCP first message loss (#4454)) msg_recv->setCallback(recCallback); diff --git a/test/unittest/transport/mock/MockTCPv4Transport.h b/test/unittest/transport/mock/MockTCPv4Transport.h index 08569dbd08a..e2aa8e0e9b1 100644 --- a/test/unittest/transport/mock/MockTCPv4Transport.h +++ b/test/unittest/transport/mock/MockTCPv4Transport.h @@ -65,6 +65,11 @@ class MockTCPv4Transport : public TCPv4Transport return TCPv4Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator); } + const std::map>& get_channel_pending_logical_ports() const + { + return channel_pending_logical_ports_; + } + }; } // namespace rtps diff --git a/test/unittest/transport/mock/MockTCPv6Transport.h b/test/unittest/transport/mock/MockTCPv6Transport.h index 3ecbfaf3db9..cc23f039a5a 100644 --- a/test/unittest/transport/mock/MockTCPv6Transport.h +++ b/test/unittest/transport/mock/MockTCPv6Transport.h @@ -65,6 +65,11 @@ class MockTCPv6Transport : public TCPv6Transport return TCPv6Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator); } + const std::map>& get_channel_pending_logical_ports() const + { + return channel_pending_logical_ports_; + } + }; } // namespace rtps diff --git a/test/unittest/xmlparser/XMLParserTests.cpp b/test/unittest/xmlparser/XMLParserTests.cpp index 4867b6fffc8..1671986b85a 100644 --- a/test/unittest/xmlparser/XMLParserTests.cpp +++ b/test/unittest/xmlparser/XMLParserTests.cpp @@ -969,6 +969,11 @@ TEST_F(XMLParserTests, parseXMLTransportData) false\ false\ false\ +<<<<<<< HEAD +======= + true\ + 100\ +>>>>>>> 8103cf042 (TCP first message loss (#4454)) \ \ "; @@ -1002,6 +1007,16 @@ TEST_F(XMLParserTests, parseXMLTransportData) EXPECT_EQ(pTCPv4Desc->logical_port_increment, 2u); EXPECT_EQ(pTCPv4Desc->listening_ports[0], 5100u); EXPECT_EQ(pTCPv4Desc->listening_ports[1], 5200u); +<<<<<<< HEAD +======= + EXPECT_EQ(pTCPv4Desc->keep_alive_thread, modified_thread_settings); + EXPECT_EQ(pTCPv4Desc->non_blocking_send, true); + EXPECT_EQ(pTCPv4Desc->accept_thread, modified_thread_settings); + EXPECT_EQ(pTCPv4Desc->tcp_negotiation_timeout, 100u); + EXPECT_EQ(pTCPv4Desc->default_reception_threads(), modified_thread_settings); + EXPECT_EQ(pTCPv4Desc->get_thread_config_for_port(12345), modified_thread_settings); + EXPECT_EQ(pTCPv4Desc->get_thread_config_for_port(12346), modified_thread_settings); +>>>>>>> 8103cf042 (TCP first message loss (#4454)) xmlparser::XMLProfileManager::DeleteInstance(); // TCPv6 @@ -1027,6 +1042,16 @@ TEST_F(XMLParserTests, parseXMLTransportData) EXPECT_EQ(pTCPv6Desc->logical_port_increment, 2u); EXPECT_EQ(pTCPv6Desc->listening_ports[0], 5100u); EXPECT_EQ(pTCPv6Desc->listening_ports[1], 5200u); +<<<<<<< HEAD +======= + EXPECT_EQ(pTCPv6Desc->keep_alive_thread, modified_thread_settings); + EXPECT_EQ(pTCPv6Desc->non_blocking_send, true); + EXPECT_EQ(pTCPv6Desc->accept_thread, modified_thread_settings); + EXPECT_EQ(pTCPv6Desc->tcp_negotiation_timeout, 100u); + EXPECT_EQ(pTCPv6Desc->default_reception_threads(), modified_thread_settings); + EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12345), modified_thread_settings); + EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12346), modified_thread_settings); +>>>>>>> 8103cf042 (TCP first message loss (#4454)) xmlparser::XMLProfileManager::DeleteInstance(); } @@ -1113,6 +1138,14 @@ TEST_F(XMLParserTests, parseXMLTransportData_NegativeClauses) "check_crc", "enable_tcp_nodelay", "tls", +<<<<<<< HEAD +======= + "keep_alive_thread", + "accept_thread", + "tcp_negotiation_timeout", + "default_reception_threads", + "reception_threads", +>>>>>>> 8103cf042 (TCP first message loss (#4454)) "bad_element" };