From 5b6bc33698294ca5f5dd3263264b3d5e9422c45b Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Apr 2024 16:33:29 +0200 Subject: [PATCH] Internal refactor on port handling (#3440) * Refs #18002. Avoid mutation of multicast ports. Signed-off-by: Miguel Company * Refs #18002. Move security initalization up. Signed-off-by: Miguel Company * Refs #18002. Prepare for refactor of locators setup. Signed-off-by: Miguel Company * Refs #18002. Refactor metatraffic related code. Signed-off-by: Miguel Company * Refs #18002. Refactor initial peers related code. Signed-off-by: Miguel Company * Refs #18002. Refactor user traffic related code. Signed-off-by: Miguel Company * Refs #18002. Refactor output traffic related code. Signed-off-by: Miguel Company * Refs #18002. Safe metatraffic unicast port on a new attribute. Signed-off-by: Miguel Company * Refs #18002. User unicast port calculated on participant instead of NetworkFactory. Signed-off-by: Miguel Company * Refs #18002. Update metatraffic_unicast_port_ inside applyLocatorAdaptRule. Signed-off-by: Miguel Company * Apply suggestion Signed-off-by: Miguel Company * Refs #18002. Always create unicast receiver resources first. Signed-off-by: Miguel Company * Refs #18002. Improve warning message. Signed-off-by: Miguel Company * Refs #18002. Refactor createReceiverResources Signed-off-by: Miguel Company * Refs #18002. Improve SHM.SamePortUnicastMulticast blackbox test Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company Signed-off-by: Miguel Company (cherry picked from commit ea5af48) --- src/cpp/rtps/network/NetworkFactory.cpp | 11 +- src/cpp/rtps/network/NetworkFactory.h | 23 +- .../rtps/participant/RTPSParticipantImpl.cpp | 252 ++++++++++-------- .../rtps/participant/RTPSParticipantImpl.h | 7 + .../common/BlackboxTestsTransportSHM.cpp | 12 +- 5 files changed, 177 insertions(+), 128 deletions(-) diff --git a/src/cpp/rtps/network/NetworkFactory.cpp b/src/cpp/rtps/network/NetworkFactory.cpp index 1773038cfa4..b85e9ba7fbd 100644 --- a/src/cpp/rtps/network/NetworkFactory.cpp +++ b/src/cpp/rtps/network/NetworkFactory.cpp @@ -402,30 +402,27 @@ bool NetworkFactory::configureInitialPeerLocator( } bool NetworkFactory::getDefaultUnicastLocators( - uint32_t domain_id, LocatorList_t& locators, - const RTPSParticipantAttributes& m_att) const + uint32_t port) const { bool result = false; for (auto& transport : mRegisteredTransports) { - result |= transport->getDefaultUnicastLocators(locators, calculate_well_known_port(domain_id, m_att, false)); + result |= transport->getDefaultUnicastLocators(locators, port); } return result; } bool NetworkFactory::fill_default_locator_port( - uint32_t domain_id, Locator_t& locator, - const RTPSParticipantAttributes& m_att, - bool is_multicast) const + uint32_t port) const { bool result = false; for (auto& transport : mRegisteredTransports) { if (transport->IsLocatorSupported(locator)) { - result |= transport->fillUnicastLocator(locator, calculate_well_known_port(domain_id, m_att, is_multicast)); + result |= transport->fillUnicastLocator(locator, port); } } return result; diff --git a/src/cpp/rtps/network/NetworkFactory.h b/src/cpp/rtps/network/NetworkFactory.h index 1a679b66754..e085b4ea76a 100644 --- a/src/cpp/rtps/network/NetworkFactory.h +++ b/src/cpp/rtps/network/NetworkFactory.h @@ -224,18 +224,15 @@ class NetworkFactory * Add locators to the default unicast configuration. * */ bool getDefaultUnicastLocators( - uint32_t domain_id, LocatorList_t& locators, - const RTPSParticipantAttributes& m_att) const; + uint32_t port) const; /** * Fill the locator with the default unicast configuration. * */ bool fill_default_locator_port( - uint32_t domain_id, Locator_t& locator, - const RTPSParticipantAttributes& m_att, - bool is_multicast) const; + uint32_t port) const; /** * Shutdown method to close the connections of the transports. @@ -259,6 +256,14 @@ class NetworkFactory const LocatorList_t& remote_participant_locators, const LocatorList_t& participant_initial_peers) const; + /** + * Calculate well-known ports. + */ + uint16_t calculate_well_known_port( + uint32_t domain_id, + const RTPSParticipantAttributes& att, + bool is_multicast) const; + private: std::vector> mRegisteredTransports; @@ -275,14 +280,6 @@ class NetworkFactory // Mask using transport kinds to indicate whether the transports allows localhost NetworkConfigSet_t network_configuration_; - - /** - * Calculate well-known ports. - */ - uint16_t calculate_well_known_port( - uint32_t domain_id, - const RTPSParticipantAttributes& att, - bool is_multicast) const; }; } // namespace rtps diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index e7cfecac29c..7ba7b8a875e 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -158,7 +158,12 @@ Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule( { // This is a completely made up rule // It is transport responsibility to interpret this new port. - loc.port += m_att.port.participantIDGain; + uint16_t delta = m_att.port.participantIDGain; + if (metatraffic_unicast_port_ == loc.port) + { + metatraffic_unicast_port_ += delta; + } + loc.port += delta; return loc; } @@ -323,13 +328,78 @@ RTPSParticipantImpl::RTPSParticipantImpl( return; } +#if HAVE_SECURITY + // Start security + if (!m_security_manager.init( + security_attributes_, + m_att.properties)) + { + // Participant will be deleted, no need to allocate buffers or create builtin endpoints + return; + } +#endif // if HAVE_SECURITY + + setup_meta_traffic(); + setup_user_traffic(); + setup_initial_peers(); + setup_output_traffic(); + +#if HAVE_SECURITY + if (m_security_manager.is_security_active()) + { + if (!m_security_manager.create_entities()) + { + return; + } + } +#endif // if HAVE_SECURITY + + // Copy NetworkFactory network_configuration to participant attributes prior to proxy creation + // NOTE: all transports already registered before + m_att.builtin.network_configuration = m_network_Factory.network_configuration(); + + mp_builtinProtocols = new BuiltinProtocols(); + + // Initialize builtin protocols + if (!mp_builtinProtocols->initBuiltinProtocols(this, m_att.builtin)) + { + EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "The builtin protocols were not correctly initialized"); + return; + } + + if (c_GuidPrefix_Unknown != persistence_guid) + { + EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, + "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix + << " and persistence guid: " << persistence_guid); + } + else + { + EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, + "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); + } + + initialized_ = true; +} + +RTPSParticipantImpl::RTPSParticipantImpl( + uint32_t domain_id, + const RTPSParticipantAttributes& PParam, + const GuidPrefix_t& guidP, + RTPSParticipant* par, + RTPSParticipantListener* plisten) + : RTPSParticipantImpl(domain_id, PParam, guidP, c_GuidPrefix_Unknown, par, plisten) +{ +} + +void RTPSParticipantImpl::setup_meta_traffic() +{ /* If metatrafficMulticastLocatorList is empty, add mandatory default Locators Else -> Take them */ // Creation of metatraffic locator and receiver resources uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_); - uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_, - static_cast(m_att.participantID)); + metatraffic_unicast_port_ = m_att.port.getUnicastPort(domain_id_, static_cast(m_att.participantID)); uint32_t meta_multicast_port_for_check = metatraffic_multicast_port; /* INSERT DEFAULT MANDATORY MULTICAST LOCATORS HERE */ @@ -355,28 +425,35 @@ RTPSParticipantImpl::RTPSParticipantImpl( std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(), m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) { - m_network_Factory.fillMetatrafficUnicastLocator(locator, metatraffic_unicast_port); + m_network_Factory.fillMetatrafficUnicastLocator(locator, metatraffic_unicast_port_); }); m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList); } - // Initial peers - if (m_att.builtin.initialPeersList.empty()) + if (is_intraprocess_only()) { - m_att.builtin.initialPeersList = m_att.builtin.metatrafficMulticastLocatorList; + m_att.builtin.metatrafficUnicastLocatorList.clear(); } - else - { - LocatorList_t initial_peers; - initial_peers.swap(m_att.builtin.initialPeersList); - std::for_each(initial_peers.begin(), initial_peers.end(), - [&](Locator_t& locator) - { - m_network_Factory.configureInitialPeerLocator(domain_id_, locator, m_att); - }); + createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false, true); + createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, false, false, true); + + // Check metatraffic multicast port + if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() && + m_att.builtin.metatrafficMulticastLocatorList.begin()->port != meta_multicast_port_for_check) + { + EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, + "Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened." + " It may is opened by another application. Discovery may fail."); } + namespace external_locators = fastdds::rtps::ExternalLocatorsProcessor; + external_locators::set_listening_locators(m_att.builtin.metatraffic_external_unicast_locators, + m_att.builtin.metatrafficUnicastLocatorList); +} + +void RTPSParticipantImpl::setup_user_traffic() +{ // Creation of user locator and receiver resources //If no default locators are defined we define some. /* The reasoning here is the following. @@ -398,58 +475,59 @@ RTPSParticipantImpl::RTPSParticipantImpl( else { // Locator with port 0, calculate port. + uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; std::for_each(m_att.defaultUnicastLocatorList.begin(), m_att.defaultUnicastLocatorList.end(), [&](Locator_t& loc) { - m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false); + m_network_Factory.fill_default_locator_port(loc, unicast_port); }); m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); + // Locator with port 0, calculate port. + uint32_t multicast_port = m_network_Factory.calculate_well_known_port(domain_id_, m_att, true); std::for_each(m_att.defaultMulticastLocatorList.begin(), m_att.defaultMulticastLocatorList.end(), [&](Locator_t& loc) { - m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true); + m_network_Factory.fill_default_locator_port(loc, multicast_port); }); } -#if HAVE_SECURITY - // Start security - if (!m_security_manager.init( - security_attributes_, - m_att.properties)) - { - // Participant will be deleted, no need to allocate buffers or create builtin endpoints - return; - } -#endif // if HAVE_SECURITY - if (is_intraprocess_only()) { - m_att.builtin.metatrafficUnicastLocatorList.clear(); m_att.defaultUnicastLocatorList.clear(); m_att.defaultMulticastLocatorList.clear(); } - createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false, true); - createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false, true); createReceiverResources(m_att.defaultUnicastLocatorList, true, false, true); - createReceiverResources(m_att.defaultMulticastLocatorList, true, false, true); + createReceiverResources(m_att.defaultMulticastLocatorList, false, false, true); - namespace ExternalLocatorsProcessor = fastdds::rtps::ExternalLocatorsProcessor; - ExternalLocatorsProcessor::set_listening_locators(m_att.builtin.metatraffic_external_unicast_locators, - m_att.builtin.metatrafficUnicastLocatorList); - ExternalLocatorsProcessor::set_listening_locators(m_att.default_external_unicast_locators, + namespace external_locators = fastdds::rtps::ExternalLocatorsProcessor; + external_locators::set_listening_locators(m_att.default_external_unicast_locators, m_att.defaultUnicastLocatorList); +} - // Check metatraffic multicast port - if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() && - m_att.builtin.metatrafficMulticastLocatorList.begin()->port != meta_multicast_port_for_check) +void RTPSParticipantImpl::setup_initial_peers() +{ + // Initial peers + if (m_att.builtin.initialPeersList.empty()) { - EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, - "Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened." - " It may is opened by another application. Discovery may fail."); + m_att.builtin.initialPeersList = m_att.builtin.metatrafficMulticastLocatorList; + } + else + { + LocatorList_t initial_peers; + initial_peers.swap(m_att.builtin.initialPeersList); + + std::for_each(initial_peers.begin(), initial_peers.end(), + [&](Locator_t& locator) + { + m_network_Factory.configureInitialPeerLocator(domain_id_, locator, m_att); + }); } +} +void RTPSParticipantImpl::setup_output_traffic() +{ bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; if (num_send_buffers == 0) @@ -483,53 +561,6 @@ RTPSParticipantImpl::RTPSParticipantImpl( { flow_controller_factory_.register_flow_controller(*flow_controller_desc.get()); } - -#if HAVE_SECURITY - if (m_security_manager.is_security_active()) - { - if (!m_security_manager.create_entities()) - { - return; - } - } -#endif // if HAVE_SECURITY - - // Copy NetworkFactory network_configuration to participant attributes prior to proxy creation - // NOTE: all transports already registered before - m_att.builtin.network_configuration = m_network_Factory.network_configuration(); - - mp_builtinProtocols = new BuiltinProtocols(); - - // Initialize builtin protocols - if (!mp_builtinProtocols->initBuiltinProtocols(this, m_att.builtin)) - { - EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "The builtin protocols were not correctly initialized"); - return; - } - - if (c_GuidPrefix_Unknown != persistence_guid) - { - EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, - "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix - << " and persistence guid: " << persistence_guid); - } - else - { - EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, - "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); - } - - initialized_ = true; -} - -RTPSParticipantImpl::RTPSParticipantImpl( - uint32_t domain_id, - const RTPSParticipantAttributes& PParam, - const GuidPrefix_t& guidP, - RTPSParticipant* par, - RTPSParticipantListener* plisten) - : RTPSParticipantImpl(domain_id, PParam, guidP, c_GuidPrefix_Unknown, par, plisten) -{ } void RTPSParticipantImpl::enable() @@ -1680,7 +1711,8 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( } // Try creating receiver resources - if (createReceiverResources(attributes.unicastLocatorList, false, true, false)) + LocatorList_t aux_locator_list = attributes.unicastLocatorList; + if (createReceiverResources(aux_locator_list, false, true, false)) { break; } @@ -1797,8 +1829,11 @@ bool RTPSParticipantImpl::createReceiverResources( bool RegisterReceiver, bool log_when_creation_fails) { + auto input_list = Locator_list; + Locator_list.clear(); + std::vector> newItemsBuffer; - bool ret_val = Locator_list.empty(); + bool ret_val = input_list.empty(); #if HAVE_SECURITY // An auxilary buffer is needed in the ReceiverResource to to decrypt the message, @@ -1809,23 +1844,31 @@ bool RTPSParticipantImpl::createReceiverResources( uint32_t max_receiver_buffer_size = (std::numeric_limits::max)(); #endif // if HAVE_SECURITY - for (auto it_loc = Locator_list.begin(); it_loc != Locator_list.end(); ++it_loc) + for (auto it_loc = input_list.begin(); it_loc != input_list.end(); ++it_loc) { - bool ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size); + Locator_t loc = *it_loc; + bool ret = m_network_Factory.BuildReceiverResources(loc, newItemsBuffer, max_receiver_buffer_size); if (!ret && ApplyMutation) { uint32_t tries = 0; while (!ret && (tries < m_att.builtin.mutation_tries)) { tries++; - applyLocatorAdaptRule(*it_loc); - ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size); + applyLocatorAdaptRule(loc); + ret = m_network_Factory.BuildReceiverResources(loc, newItemsBuffer, max_receiver_buffer_size); } } - if (!ret && log_when_creation_fails) + if (ret) { - EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Could not create the specified receiver resource"); + Locator_list.push_back(loc); + } + else if (log_when_creation_fails) + { + std::string postfix = ApplyMutation ? ". Applied mutation until: " + IPLocator::to_string(loc) : ""; + static_cast(postfix); // Might be unused if log is disabled + EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, + "Could not create the specified receiver resource for '" << *it_loc << "'" << postfix); } ret_val |= !newItemsBuffer.empty(); @@ -2077,13 +2120,15 @@ void RTPSParticipantImpl::normalize_endpoint_locators( EndpointAttributes& endpoint_att) { // Locators with port 0, calculate port. + uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; for (Locator_t& loc : endpoint_att.unicastLocatorList) { - m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false); + m_network_Factory.fill_default_locator_port(loc, unicast_port); } + uint32_t multicast_port = m_network_Factory.calculate_well_known_port(domain_id_, m_att, true); for (Locator_t& loc : endpoint_att.multicastLocatorList) { - m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true); + m_network_Factory.fill_default_locator_port(loc, multicast_port); } // Normalize unicast locators @@ -2661,21 +2706,20 @@ void RTPSParticipantImpl::environment_file_has_changed() void RTPSParticipantImpl::get_default_metatraffic_locators() { uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_); - uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_, - static_cast(m_att.participantID)); m_network_Factory.getDefaultMetatrafficMulticastLocators(m_att.builtin.metatrafficMulticastLocatorList, metatraffic_multicast_port); m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList); m_network_Factory.getDefaultMetatrafficUnicastLocators(m_att.builtin.metatrafficUnicastLocatorList, - metatraffic_unicast_port); + metatraffic_unicast_port_); m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList); } void RTPSParticipantImpl::get_default_unicast_locators() { - m_network_Factory.getDefaultUnicastLocators(domain_id_, m_att.defaultUnicastLocatorList, m_att); + uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; + m_network_Factory.getDefaultUnicastLocators(m_att.defaultUnicastLocatorList, unicast_port); m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 71b66ad35a5..f6b0d07855e 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -563,6 +563,8 @@ class RTPSParticipantImpl uint32_t domain_id_; //!Attributes of the RTPSParticipant RTPSParticipantAttributes m_att; + //! Metatraffic unicast port used by default on this participant + uint32_t metatraffic_unicast_port_ = 0; //!Guid of the RTPSParticipant. GUID_t m_guid; //! String containing the RTPSParticipant Guid. @@ -635,6 +637,11 @@ class RTPSParticipantImpl //! Protect ignored entities collection concurrent access mutable shared_mutex ignored_mtx_; + void setup_meta_traffic(); + void setup_user_traffic(); + void setup_initial_peers(); + void setup_output_traffic(); + RTPSParticipantImpl& operator =( const RTPSParticipantImpl&) = delete; diff --git a/test/blackbox/common/BlackboxTestsTransportSHM.cpp b/test/blackbox/common/BlackboxTestsTransportSHM.cpp index 448429735dd..10e0ad8b61c 100644 --- a/test/blackbox/common/BlackboxTestsTransportSHM.cpp +++ b/test/blackbox/common/BlackboxTestsTransportSHM.cpp @@ -120,11 +120,15 @@ TEST(SHM, SamePortUnicastMulticast) LocatorList reader_locators; participant.get_native_reader().get_listening_locators(reader_locators); - ASSERT_EQ(reader_locators.size(), 2u); + ASSERT_LE(reader_locators.size(), 2u); auto it = reader_locators.begin(); - auto first_port = it->port; - ++it; - auto second_port = it->port; + uint32_t first_port = it->port; + uint32_t second_port = 0; + if (reader_locators.size() == 2) + { + ++it; + second_port = it->port; + } EXPECT_NE(first_port, second_port); EXPECT_TRUE(first_port == global_port || second_port == global_port); }