Skip to content

Commit

Permalink
Fix issues in Dynamic Network Interfaces (#5282)
Browse files Browse the repository at this point in the history
* Refs #21690. Parse `--rescan` argument on communication applications.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Implement rescan mechanism.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add docker infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add CMake infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Ensure same domain and topic name are used.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add `--loops` argument to publisher.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Publisher exits after publishing all samples.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve subscriber script.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Make publisher wait subscriber.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Possible fix.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Clear locators before recalculating them.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Move local participant proxy update to PDP.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve new method's logic.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Include what you use.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add empty method to update endpoint locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add implementation for `update_endpoint_locators_if_default_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Compare against old default locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Update locators in attributes.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Avoid early return on `PDP::local_participant_attributes_update_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Apply suggestions.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 91bd7c8)

# Conflicts:
#	include/fastdds/rtps/builtin/discovery/participant/PDP.h
#	src/cpp/rtps/builtin/discovery/participant/PDP.cpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
#	test/dds/communication/PublisherModule.hpp
#	test/dds/communication/SubscriberModule.hpp
#	test/dds/communication/security/PublisherModule.hpp
#	test/dds/communication/security/SubscriberModule.hpp
  • Loading branch information
MiguelCompany authored and mergify[bot] committed Oct 4, 2024
1 parent a8396fa commit 8bf975e
Show file tree
Hide file tree
Showing 21 changed files with 613 additions and 37 deletions.
49 changes: 49 additions & 0 deletions include/fastdds/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

<<<<<<< HEAD:include/fastdds/rtps/builtin/discovery/participant/PDP.h
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
Expand All @@ -37,6 +41,35 @@
#include <fastrtps/qos/QosPolicies.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <fastrtps/utils/ProxyPool.hpp>
=======
#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/core/Time_t.hpp>
#include <fastdds/dds/core/policy/ParameterTypes.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/rtps/attributes/ReaderAttributes.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
#include <fastdds/rtps/common/CDRMessage_t.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/Types.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.hpp>
#include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp>
#include <fastdds/rtps/writer/WriterDiscoveryStatus.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/builtin/data/ParticipantProxyData.hpp>
#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <statistics/rtps/monitor-service/interfaces/IProxyObserver.hpp>
#include <statistics/rtps/monitor-service/interfaces/IProxyQueryable.hpp>
#include <utils/ProxyPool.hpp>
>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282)):src/cpp/rtps/builtin/discovery/participant/PDP.h

namespace eprosima {

Expand All @@ -51,6 +84,7 @@ struct IProxyObserver;

namespace rtps {

<<<<<<< HEAD:include/fastdds/rtps/builtin/discovery/participant/PDP.h
class PDPServerListener;
class PDPEndpoints;

Expand All @@ -68,8 +102,13 @@ namespace rtps {

class RTPSWriter;
class RTPSReader;
=======
class BaseWriter;
class BaseReader;
>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282)):src/cpp/rtps/builtin/discovery/participant/PDP.h
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -79,6 +118,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down Expand Up @@ -493,6 +533,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

#endif // FASTDDS_STATISTICS

virtual void local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts);

virtual void update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

protected:

//!Pointer to the builtin protocols object.
Expand Down
161 changes: 161 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,52 @@
#include <mutex>
#include <chrono>

<<<<<<< HEAD
=======
#include <fastdds/config.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/rtps/builtin/data/BuiltinEndpoints.hpp>
#include <fastdds/rtps/builtin/data/ParticipantBuiltinTopicData.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/ReaderHistory.hpp>
#include <fastdds/rtps/history/WriterHistory.hpp>
#include <fastdds/rtps/participant/RTPSParticipantListener.hpp>
#include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp>
#include <fastdds/rtps/writer/WriterDiscoveryStatus.hpp>
#include <fastdds/utils/IPLocator.hpp>

#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
#include <fastdds/utils/TypePropagation.hpp>
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/data/ParticipantProxyData.hpp>
#include <rtps/builtin/data/ProxyDataConverters.hpp>
#include <rtps/builtin/data/ProxyHashTables.hpp>
#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/builtin/discovery/endpoint/EDPSimple.h>
#include <rtps/builtin/discovery/endpoint/EDPStatic.h>
#include <rtps/builtin/discovery/participant/PDPEndpoints.hpp>
#include <rtps/builtin/discovery/participant/PDPListener.h>
#include <rtps/builtin/liveliness/WLP.hpp>
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <rtps/network/utils/external_locators.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/StatefulReader.hpp>
#include <rtps/reader/StatelessReader.hpp>
#include <rtps/resources/TimedEvent.h>
#include <rtps/writer/StatelessWriter.hpp>
#if HAVE_SECURITY
#include <rtps/security/accesscontrol/ParticipantSecurityAttributes.h>
#endif // if HAVE_SECURITY
#include <utils/shared_mutex.hpp>
#include <utils/TimeConversion.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <rtps/reader/BaseReader.hpp>

>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282))
namespace eprosima {
namespace fastrtps {
namespace rtps {
Expand Down Expand Up @@ -1662,6 +1708,121 @@ void PDP::add_builtin_security_attributes(

#endif // HAVE_SECURITY

void PDP::local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts)
{
// Update user data
auto participant_data = getLocalParticipantProxyData();
participant_data->m_userData.data_vec(new_atts.userData);

// If we are intraprocess only, we do not need to update locators
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();
if (announce_locators)
{
// Clear all locators
participant_data->metatraffic_locators.unicast.clear();
participant_data->metatraffic_locators.multicast.clear();
participant_data->default_locators.unicast.clear();
participant_data->default_locators.multicast.clear();

// Update default locators
for (const Locator_t& loc : new_atts.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : new_atts.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}

// Update metatraffic locators
for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(locator);
}
if (!new_atts.builtin.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(locator);
}
}

fastdds::rtps::network::external_locators::add_external_locators(*participant_data,
new_atts.builtin.metatraffic_external_unicast_locators,
new_atts.default_external_unicast_locators);
}
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts)
{
// Check if default locators have changed
const auto& old_default_unicast = old_atts.defaultUnicastLocatorList;
const auto& old_default_multicast = old_atts.defaultMulticastLocatorList;
const auto& new_default_unicast = new_atts.defaultUnicastLocatorList;
const auto& new_default_multicast = new_atts.defaultMulticastLocatorList;

// Early return if there is no change in default unicast locators
if ((old_default_unicast == new_default_unicast) &&
(old_default_multicast == new_default_multicast))
{
return;
}

// Update proxies of endpoints with default configured locators
EDP* edp = get_edp();
for (BaseWriter* writer : writers)
{
if ((old_default_multicast == writer->getAttributes().multicastLocatorList) &&
(old_default_unicast == writer->getAttributes().unicastLocatorList))
{
writer->getAttributes().multicastLocatorList = new_default_multicast;
writer->getAttributes().unicastLocatorList = new_default_unicast;

WriterProxyData* wdata = nullptr;
GUID_t participant_guid;
wdata = addWriterProxyData(writer->getGuid(), participant_guid,
[](WriterProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(wdata != nullptr);
edp->process_writer_proxy_data(writer, wdata);
}
}
for (BaseReader* reader : readers)
{
if ((old_default_multicast == reader->getAttributes().multicastLocatorList) &&
(old_default_unicast == reader->getAttributes().unicastLocatorList))
{
reader->getAttributes().multicastLocatorList = new_default_multicast;
reader->getAttributes().unicastLocatorList = new_default_unicast;

ReaderProxyData* rdata = nullptr;
GUID_t participant_guid;
rdata = addReaderProxyData(reader->getGuid(), participant_guid,
[](ReaderProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(rdata != nullptr);
edp->process_reader_proxy_data(reader, rdata);
}
}
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
22 changes: 22 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1539,19 +1539,33 @@ void RTPSParticipantImpl::update_attributes(
// Check if new interfaces have been added
if (internal_metatraffic_locators_)
{
<<<<<<< HEAD
LocatorList_t metatraffic_unicast_locator_list = m_att.builtin.metatrafficUnicastLocatorList;
get_default_metatraffic_locators();
if (!(metatraffic_unicast_locator_list == m_att.builtin.metatrafficUnicastLocatorList))
=======
LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList;
temp_atts.builtin.metatrafficUnicastLocatorList.clear();
get_default_metatraffic_locators(temp_atts);
if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList))
>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282))
{
local_interfaces_changed = true;
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, m_att.getName() << " updated its metatraffic locators");
}
}
if (internal_default_locators_)
{
<<<<<<< HEAD
LocatorList_t default_unicast_locator_list = m_att.defaultUnicastLocatorList;
get_default_unicast_locators();
if (!(default_unicast_locator_list == m_att.defaultUnicastLocatorList))
=======
LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList;
temp_atts.defaultUnicastLocatorList.clear();
get_default_unicast_locators(temp_atts);
if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList))
>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282))
{
local_interfaces_changed = true;
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
Expand Down Expand Up @@ -1673,7 +1687,9 @@ void RTPSParticipantImpl::update_attributes(

{
std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex());
pdp->local_participant_attributes_update_nts(temp_atts);

<<<<<<< HEAD
// Update user data
auto local_participant_proxy_data = pdp->getLocalParticipantProxyData();
local_participant_proxy_data->m_userData.data_vec(m_att.userData);
Expand All @@ -1692,6 +1708,12 @@ void RTPSParticipantImpl::update_attributes(
for (auto locator : m_att.defaultUnicastLocatorList)
{
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
=======
if (local_interfaces_changed && internal_default_locators_)
{
std::lock_guard<shared_mutex> _(endpoints_list_mutex);
pdp->update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts);
>>>>>>> 91bd7c857 (Fix issues in Dynamic Network Interfaces (#5282))
}

if (local_interfaces_changed)
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,7 @@ if(Python3_Interpreter_FOUND)
endif()

endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -197,7 +197,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -208,7 +208,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
Loading

0 comments on commit 8bf975e

Please sign in to comment.