Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19942] Make remove unused entities compatible with the discovery trigger #83

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 92 additions & 41 deletions ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <ddspipe_core/communication/Bridge.hpp>
#include <ddspipe_core/communication/dds/Track.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/types/dds/Endpoint.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>
#include <ddspipe_core/types/topic/filter/ManualTopic.hpp>
#include <ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp>


namespace eprosima {
namespace ddspipe {
namespace core {
Expand All @@ -49,7 +51,10 @@ class DdsBridge : public Bridge
* @param participant_database: Collection of Participants to manage communication
* @param payload_pool: Payload Pool that handles the reservation/release of payloads throughout the DDS Router
* @param thread_pool: Shared pool of threads in charge of data transmission.
* @param enable: Whether the Bridge should be initialized as enabled
* @param routes_config: Configuration of the routes of the Bridge
* @param remove_unused_entities: Whether the Bridge should remove unused entities
* @param manual_topics: Topics that explicitally set a QoS attribute for this participant
* @param endpoint_kind: Kind of the endpoint that discovered the topic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I believe DdsBridge should be agnostic (or at least has been until now) to discovery events. So instead of passing the kind of the endpoint that discovered the topic or was discovered, I would pass the kind of the endpoint that should be created.

*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
Expand All @@ -61,7 +66,8 @@ class DdsBridge : public Bridge
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config,
const bool remove_unused_entities,
const std::vector<core::types::ManualTopic>& manual_topics);
const std::vector<core::types::ManualTopic>& manual_topics,
const types::EndpointKind& endpoint_kind);
juanlofer-eprosima marked this conversation as resolved.
Show resolved Hide resolved

DDSPIPE_CORE_DllAPI
~DdsBridge();
Expand All @@ -85,76 +91,120 @@ class DdsBridge : public Bridge
void disable() noexcept override;

/**
* Build the IReaders and IWriters inside the bridge for the new participant,
* and add them to the Tracks.
* Create a new endpoint in the bridge.
*
* Thread safe
* It will call either \c create_writer_and_its_tracks_nts_ or \c create_reader_and_its_track_nts_ depending on the
* \c discovered_endpoint_kind.
*
* @param participant_id: The id of the participant who is creating the writer.
* @param participant_id: The id of the participant who is creating the endpoint.
* @param discovered_endpoint_kind: The kind of endpoint that has been discovered.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
* Thread safe.
*/
DDSPIPE_CORE_DllAPI
void create_writer(
const types::ParticipantId& participant_id);
void create_endpoint(
const types::ParticipantId& participant_id,
const types::EndpointKind& discovered_endpoint_kind);

/**
* Remove the IWriter from all the Tracks in the bridge.
* Remove the IReaders and Tracks that don't have any IWriters.
* Remove an endpoint from the bridge.
*
* Thread safe
* It will call either \c remove_writer_and_its_tracks_nts_ or \c remove_reader_and_its_track_nts_ depending on the
* \c removed_endpoint_kind.
*
* @param participant_id: The id of the participant who is removing the writer.
* @param participant_id: The id of the participant who is removing the endpoint.
* @param removed_endpoint_kind: The kind of endpoint that has been removed.
*
* Thread safe.
*/
DDSPIPE_CORE_DllAPI
void remove_writer(
const types::ParticipantId& participant_id) noexcept;
void remove_endpoint(
const types::ParticipantId& participant_id,
const types::EndpointKind& removed_endpoint_kind);

protected:

/**
* Create the readers, writers, and tracks that are required by the routes.
* Create an IWriter for the new participant.
* Create the IReaders in the IWriter's route.
* Create the Tracks of the IReaders with the IWriter.
*
* Thread safe
* @param participant_id: The id of the participant who is creating the writer.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void create_all_tracks_();
void create_writer_and_its_tracks_nts_(
const types::ParticipantId& participant_id);

/**
* Add each Participant's IWriters to its Track.
* If the Participant's IReader doesn't exist, create it.
* If the Participant's Track doesn't exist, create it.
* Create an IReader for the new participant.
* Create the IWriters in the IReader's route.
* Create the Track with the IReader and IWriters.
*
* @param writers: The map of ids to writers that are required for the tracks.
* @param participant_id: The id of the participant who is creating the reader.
*
* @throw InitializationException in case \c IReaders creation fails.
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void add_writer_to_tracks_nts_(
const types::ParticipantId& participant_id,
std::shared_ptr<IWriter>& writer);
void create_reader_and_its_track_nts_(
const types::ParticipantId& participant_id);

/**
* Remove the IWriter from all the Tracks.
* Remove the IReaders and Tracks without IWriters.
*
* @param participant_id: The id of the participant who is removing the writer.
*/
void remove_writer_and_its_tracks_nts_(
const types::ParticipantId& participant_id) noexcept;

/**
* Add each Participant's IWriters to its Track.
* If the Participant's IReader doesn't exist, create it.
* If the Participant's Track doesn't exist, create it.
* Remove the IReader and its Track.
* Remove the IWriters that no longer belong to a Track.
*
* @param writers: The map of ids to writers that are required for the tracks.
* @param participant_id: The id of the participant who is removing the reader.
*/
void remove_reader_and_its_track_nts_(
const types::ParticipantId& participant_id) noexcept;

/**
* @brief Create a Track for an IReader and its IWriters.
*
* @throw InitializationException in case \c IReaders creation fails.
* @param id: The id of the participant who is creating the track.
* @param reader: The IReader of the track.
* @param writers: The IWriters of the track.
*/
DDSPIPE_CORE_DllAPI
void add_writers_to_tracks_nts_(
void create_track_nts_(
const types::ParticipantId& id,
const std::shared_ptr<IReader>& reader,
std::map<types::ParticipantId, std::shared_ptr<IWriter>>& writers);

/**
* @brief Create an IWriter for a participant in the topic.
*
* A tailored Topic is created for the participant, depending on the QoS configured for it.
*
* @param participant_id: The id of the participant who is creating the writer.
*/
std::shared_ptr<core::IWriter> create_writer_nts_(
const types::ParticipantId& participant_id);

/**
* @brief Create an IReader for a participant in the topic.
*
* A tailored Topic is created for the participant, depending on the QoS configured for it.
*
* @param participant_id: The id of the participant who is creating the reader.
*/
std::shared_ptr<core::IReader> create_reader_nts_(
const types::ParticipantId& participant_id);

/**
* @brief Impose the Topic QoS that have been pre-configured for a participant.
*
* First, it imposes the Topic QoS configured at \c manual_topics and then the ones configured at \c participants.
*
* @param participant: The participant to impose the QoS on.
*/
DDSPIPE_CORE_DllAPI
utils::Heritable<types::DistributedTopic> create_topic_for_participant_nts_(
const std::shared_ptr<IParticipant>& participant) noexcept;

Expand All @@ -166,17 +216,18 @@ class DdsBridge : public Bridge
utils::Heritable<types::DistributedTopic> topic_;

//! Routes associated to the Topic.
RoutesConfiguration::RoutesMap routes_;
RoutesConfiguration::RoutesMap writers_in_route_;
RoutesConfiguration::RoutesMap readers_in_route_;

//! Topics that explicitally set a QoS attribute for this participant.
std::vector<types::ManualTopic> manual_topics_;

/**
* Inside \c Tracks
* They are indexed by the Id of the participant that is source
*/
//! The tracks of the bridge indexed by the participant_id of their reader.
std::map<types::ParticipantId, std::unique_ptr<Track>> tracks_;

//! The writers of the bridge index by their participant_id.
std::map<types::ParticipantId, std::shared_ptr<IWriter>> writers_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design comment: up to now bridges contain tracks, and tracks the only ones holding reference to endpoints. Analyze implications (such as object lifespan) and try to maintain this principle if makes sense.


//! Mutex to prevent simultaneous calls to enable and/or disable
std::mutex mutex_;

Expand Down
7 changes: 4 additions & 3 deletions ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ class Track
*/
DDSPIPE_CORE_DllAPI
bool has_writer(
const types::ParticipantId& id) noexcept;
const types::ParticipantId& id) const noexcept;

/**
* Check if a track has at least one writer.
*
* Tread safe
*/
DDSPIPE_CORE_DllAPI
bool has_writers() noexcept;
bool has_writers() const noexcept;

protected:

Expand Down Expand Up @@ -209,8 +209,9 @@ class Track
/**
* Mutex to prevent simultaneous calls to \c enable and/or \c disable .
* It manages access to variable \c enabled_ .
* It is mutable so it can be used in const methods.
*/
std::mutex track_mutex_;
mutable std::mutex track_mutex_;

/////
// Transmit thread part
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,52 @@ struct RoutesConfiguration : public IConfiguration
// CONSTRUCTORS
/////////////////////////

DDSPIPE_CORE_DllAPI RoutesConfiguration() = default;
DDSPIPE_CORE_DllAPI
RoutesConfiguration() = default;

/////////////////////////
// METHODS
/////////////////////////

DDSPIPE_CORE_DllAPI virtual bool is_valid(
DDSPIPE_CORE_DllAPI
bool is_valid(
utils::Formatter& error_msg) const noexcept override;

DDSPIPE_CORE_DllAPI bool is_valid(
DDSPIPE_CORE_DllAPI
bool is_valid(
utils::Formatter& error_msg,
const std::map<types::ParticipantId, bool>& participants) const noexcept;

/**
* @brief Returns the writers in each reader's route.
*
* It returns a map with the readers as keys and the writers in each reader's route as values.
*
* The first time this method is called, it will calculate the routes and store them in the object.
* Subsequent calls will return the stored routes.
*/
DDSPIPE_CORE_DllAPI
RoutesMap routes_of_readers(
const std::map<types::ParticipantId, bool>& participant_ids) const noexcept;

/**
* @brief Returns the readers in each writer's route.
*
* It returns a map with the writers as keys and the readers in each writer's route as values.
*
* The first time this method is called, it will calculate the routes and store them in the object.
* Subsequent calls will return the stored routes.
*/
DDSPIPE_CORE_DllAPI
RoutesMap routes_of_writers(
const std::map<types::ParticipantId, bool>& participant_ids) const noexcept;

/////////////////////////
// OPERATORS
/////////////////////////

DDSPIPE_CORE_DllAPI RoutesMap operator () () const;
DDSPIPE_CORE_DllAPI
RoutesMap operator () () const;

/////////////////////////
// VARIABLES
Expand Down
11 changes: 9 additions & 2 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ddspipe_core/dynamic/DiscoveryDatabase.hpp>
#include <ddspipe_core/dynamic/ParticipantsDatabase.hpp>
#include <ddspipe_core/efficiency/payload/PayloadPool.hpp>
#include <ddspipe_core/types/dds/Endpoint.hpp>

#include <ddspipe_core/library/library_dll.h>

Expand Down Expand Up @@ -260,9 +261,11 @@ class DdsPipe
* @note This is the only method that adds topics to \c current_topics_
*
* @param [in] topic : topic discovered
* @param [in] endpoint_kind : kind of the endpoint
*/
void discovered_topic_nts_(
const utils::Heritable<types::DistributedTopic>& topic) noexcept;
const utils::Heritable<types::DistributedTopic>& topic,
const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept;
juanlofer-eprosima marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Method called every time a new endpoint (corresponding to a server) has been discovered/updated
Expand Down Expand Up @@ -301,9 +304,11 @@ class DdsPipe
* It is created enabled if the DdsPipe is enabled.
*
* @param [in] topic : new topic
* @param [in] enabled : whether to enable the bridge on creation or not
*/
void create_new_bridge_nts_(
const utils::Heritable<types::DistributedTopic>& topic,
const types::EndpointKind endpoint_kind = types::EndpointKind::reader,
bool enabled = false) noexcept;

/**
Expand All @@ -322,9 +327,11 @@ class DdsPipe
* If the topic did not exist before, the Bridge is created.
*
* @param [in] topic : Topic to be enabled
* @param [in] endpoint_kind : Kind of endpoint who discovered the topic
*/
void activate_topic_nts_(
const utils::Heritable<types::DistributedTopic>& topic) noexcept;
const utils::Heritable<types::DistributedTopic>& topic,
const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept;

/**
* @brief Disable a specific topic.
Expand Down
Loading
Loading