diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index e1af73ec..1fadfb64 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -19,10 +19,12 @@ #include #include #include +#include #include #include #include + namespace eprosima { namespace ddspipe { namespace core { @@ -49,7 +51,10 @@ class DdsBridge : public Bridge * @param participant_database: Collection of Participants to manage communication * @param payload_pool: Payload Pool that handles the reservation/release of payloads throughout the DDS Router * @param thread_pool: Shared pool of threads in charge of data transmission. - * @param enable: Whether the Bridge should be initialized as enabled + * @param routes_config: Configuration of the routes of the Bridge + * @param remove_unused_entities: Whether the Bridge should remove unused entities + * @param manual_topics: Topics that explicitally set a QoS attribute for this participant + * @param endpoint_kind: Kind of the endpoint that discovered the topic * * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ @@ -61,7 +66,8 @@ class DdsBridge : public Bridge const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, const bool remove_unused_entities, - const std::vector& manual_topics); + const std::vector& manual_topics, + const types::EndpointKind& endpoint_kind); DDSPIPE_CORE_DllAPI ~DdsBridge(); @@ -85,76 +91,120 @@ class DdsBridge : public Bridge void disable() noexcept override; /** - * Build the IReaders and IWriters inside the bridge for the new participant, - * and add them to the Tracks. + * Create a new endpoint in the bridge. * - * Thread safe + * It will call either \c create_writer_and_its_tracks_nts_ or \c create_reader_and_its_track_nts_ depending on the + * \c discovered_endpoint_kind. * - * @param participant_id: The id of the participant who is creating the writer. + * @param participant_id: The id of the participant who is creating the endpoint. + * @param discovered_endpoint_kind: The kind of endpoint that has been discovered. * - * @throw InitializationException in case \c IWriters or \c IReaders creation fails. + * Thread safe. */ DDSPIPE_CORE_DllAPI - void create_writer( - const types::ParticipantId& participant_id); + void create_endpoint( + const types::ParticipantId& participant_id, + const types::EndpointKind& discovered_endpoint_kind); /** - * Remove the IWriter from all the Tracks in the bridge. - * Remove the IReaders and Tracks that don't have any IWriters. + * Remove an endpoint from the bridge. * - * Thread safe + * It will call either \c remove_writer_and_its_tracks_nts_ or \c remove_reader_and_its_track_nts_ depending on the + * \c removed_endpoint_kind. * - * @param participant_id: The id of the participant who is removing the writer. + * @param participant_id: The id of the participant who is removing the endpoint. + * @param removed_endpoint_kind: The kind of endpoint that has been removed. + * + * Thread safe. */ DDSPIPE_CORE_DllAPI - void remove_writer( - const types::ParticipantId& participant_id) noexcept; + void remove_endpoint( + const types::ParticipantId& participant_id, + const types::EndpointKind& removed_endpoint_kind); protected: /** - * Create the readers, writers, and tracks that are required by the routes. + * Create an IWriter for the new participant. + * Create the IReaders in the IWriter's route. + * Create the Tracks of the IReaders with the IWriter. * - * Thread safe + * @param participant_id: The id of the participant who is creating the writer. * * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ - DDSPIPE_CORE_DllAPI - void create_all_tracks_(); + void create_writer_and_its_tracks_nts_( + const types::ParticipantId& participant_id); /** - * Add each Participant's IWriters to its Track. - * If the Participant's IReader doesn't exist, create it. - * If the Participant's Track doesn't exist, create it. + * Create an IReader for the new participant. + * Create the IWriters in the IReader's route. + * Create the Track with the IReader and IWriters. * - * @param writers: The map of ids to writers that are required for the tracks. + * @param participant_id: The id of the participant who is creating the reader. * - * @throw InitializationException in case \c IReaders creation fails. + * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ - DDSPIPE_CORE_DllAPI - void add_writer_to_tracks_nts_( - const types::ParticipantId& participant_id, - std::shared_ptr& writer); + void create_reader_and_its_track_nts_( + const types::ParticipantId& participant_id); + + /** + * Remove the IWriter from all the Tracks. + * Remove the IReaders and Tracks without IWriters. + * + * @param participant_id: The id of the participant who is removing the writer. + */ + void remove_writer_and_its_tracks_nts_( + const types::ParticipantId& participant_id) noexcept; /** - * Add each Participant's IWriters to its Track. - * If the Participant's IReader doesn't exist, create it. - * If the Participant's Track doesn't exist, create it. + * Remove the IReader and its Track. + * Remove the IWriters that no longer belong to a Track. * - * @param writers: The map of ids to writers that are required for the tracks. + * @param participant_id: The id of the participant who is removing the reader. + */ + void remove_reader_and_its_track_nts_( + const types::ParticipantId& participant_id) noexcept; + + /** + * @brief Create a Track for an IReader and its IWriters. * - * @throw InitializationException in case \c IReaders creation fails. + * @param id: The id of the participant who is creating the track. + * @param reader: The IReader of the track. + * @param writers: The IWriters of the track. */ - DDSPIPE_CORE_DllAPI - void add_writers_to_tracks_nts_( + void create_track_nts_( + const types::ParticipantId& id, + const std::shared_ptr& reader, std::map>& writers); + /** + * @brief Create an IWriter for a participant in the topic. + * + * A tailored Topic is created for the participant, depending on the QoS configured for it. + * + * @param participant_id: The id of the participant who is creating the writer. + */ + std::shared_ptr create_writer_nts_( + const types::ParticipantId& participant_id); + + /** + * @brief Create an IReader for a participant in the topic. + * + * A tailored Topic is created for the participant, depending on the QoS configured for it. + * + * @param participant_id: The id of the participant who is creating the reader. + */ + std::shared_ptr create_reader_nts_( + const types::ParticipantId& participant_id); + /** * @brief Impose the Topic QoS that have been pre-configured for a participant. * * First, it imposes the Topic QoS configured at \c manual_topics and then the ones configured at \c participants. + * + * @param participant: The participant to impose the QoS on. */ - DDSPIPE_CORE_DllAPI utils::Heritable create_topic_for_participant_nts_( const std::shared_ptr& participant) noexcept; @@ -166,17 +216,18 @@ class DdsBridge : public Bridge utils::Heritable topic_; //! Routes associated to the Topic. - RoutesConfiguration::RoutesMap routes_; + RoutesConfiguration::RoutesMap writers_in_route_; + RoutesConfiguration::RoutesMap readers_in_route_; //! Topics that explicitally set a QoS attribute for this participant. std::vector manual_topics_; - /** - * Inside \c Tracks - * They are indexed by the Id of the participant that is source - */ + //! The tracks of the bridge indexed by the participant_id of their reader. std::map> tracks_; + //! The writers of the bridge index by their participant_id. + std::map> writers_; + //! Mutex to prevent simultaneous calls to enable and/or disable std::mutex mutex_; diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp index fcbd1d2c..b1b15e66 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp @@ -124,7 +124,7 @@ class Track */ DDSPIPE_CORE_DllAPI bool has_writer( - const types::ParticipantId& id) noexcept; + const types::ParticipantId& id) const noexcept; /** * Check if a track has at least one writer. @@ -132,7 +132,7 @@ class Track * Tread safe */ DDSPIPE_CORE_DllAPI - bool has_writers() noexcept; + bool has_writers() const noexcept; protected: @@ -209,8 +209,9 @@ class Track /** * Mutex to prevent simultaneous calls to \c enable and/or \c disable . * It manages access to variable \c enabled_ . + * It is mutable so it can be used in const methods. */ - std::mutex track_mutex_; + mutable std::mutex track_mutex_; ///// // Transmit thread part diff --git a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp index e9e5ff01..f6f44cab 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp @@ -40,24 +40,52 @@ struct RoutesConfiguration : public IConfiguration // CONSTRUCTORS ///////////////////////// - DDSPIPE_CORE_DllAPI RoutesConfiguration() = default; + DDSPIPE_CORE_DllAPI + RoutesConfiguration() = default; ///////////////////////// // METHODS ///////////////////////// - DDSPIPE_CORE_DllAPI virtual bool is_valid( + DDSPIPE_CORE_DllAPI + bool is_valid( utils::Formatter& error_msg) const noexcept override; - DDSPIPE_CORE_DllAPI bool is_valid( + DDSPIPE_CORE_DllAPI + bool is_valid( utils::Formatter& error_msg, const std::map& participants) const noexcept; + /** + * @brief Returns the writers in each reader's route. + * + * It returns a map with the readers as keys and the writers in each reader's route as values. + * + * The first time this method is called, it will calculate the routes and store them in the object. + * Subsequent calls will return the stored routes. + */ + DDSPIPE_CORE_DllAPI + RoutesMap routes_of_readers( + const std::map& participant_ids) const noexcept; + + /** + * @brief Returns the readers in each writer's route. + * + * It returns a map with the writers as keys and the readers in each writer's route as values. + * + * The first time this method is called, it will calculate the routes and store them in the object. + * Subsequent calls will return the stored routes. + */ + DDSPIPE_CORE_DllAPI + RoutesMap routes_of_writers( + const std::map& participant_ids) const noexcept; + ///////////////////////// // OPERATORS ///////////////////////// - DDSPIPE_CORE_DllAPI RoutesMap operator () () const; + DDSPIPE_CORE_DllAPI + RoutesMap operator () () const; ///////////////////////// // VARIABLES diff --git a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp index 57687b03..36322db3 100644 --- a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp +++ b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -260,9 +261,11 @@ class DdsPipe * @note This is the only method that adds topics to \c current_topics_ * * @param [in] topic : topic discovered + * @param [in] endpoint_kind : kind of the endpoint */ void discovered_topic_nts_( - const utils::Heritable& topic) noexcept; + const utils::Heritable& topic, + const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; /** * @brief Method called every time a new endpoint (corresponding to a server) has been discovered/updated @@ -301,9 +304,11 @@ class DdsPipe * It is created enabled if the DdsPipe is enabled. * * @param [in] topic : new topic + * @param [in] enabled : whether to enable the bridge on creation or not */ void create_new_bridge_nts_( const utils::Heritable& topic, + const types::EndpointKind endpoint_kind = types::EndpointKind::reader, bool enabled = false) noexcept; /** @@ -322,9 +327,11 @@ class DdsPipe * If the topic did not exist before, the Bridge is created. * * @param [in] topic : Topic to be enabled + * @param [in] endpoint_kind : Kind of endpoint who discovered the topic */ void activate_topic_nts_( - const utils::Heritable& topic) noexcept; + const utils::Heritable& topic, + const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; /** * @brief Disable a specific topic. diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index d1247c95..46b81cdc 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include @@ -30,22 +32,29 @@ DdsBridge::DdsBridge( const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, const bool remove_unused_entities, - const std::vector& manual_topics) + const std::vector& manual_topics, + const EndpointKind& endpoint_kind) : Bridge(participants_database, payload_pool, thread_pool) , topic_(topic) , manual_topics_(manual_topics) { logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << "."); - routes_ = routes_config(); + const auto& participants_repeater_map = participants_database->get_participants_repeater_map(); + + writers_in_route_ = routes_config.routes_of_readers(participants_repeater_map); + readers_in_route_ = routes_config.routes_of_writers(participants_repeater_map); if (remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { - create_writer(topic->topic_discoverer()); + create_endpoint(topic->topic_discoverer(), endpoint_kind); } else { - create_all_tracks_(); + for (const ParticipantId& id : participants_->get_participants_ids()) + { + create_reader_and_its_track_nts_(id); + } } logDebug(DDSPIPE_DDSBRIDGE, "DdsBridge " << *this << " created."); @@ -69,7 +78,6 @@ void DdsBridge::enable() noexcept { EPROSIMA_LOG_INFO(DDSPIPE_DDSBRIDGE, "Enabling DdsBridge for topic " << topic_ << "."); - // ATTENTION: reference needed or it would copy Track for (auto& track_it : tracks_) { track_it.second->enable(); @@ -87,7 +95,6 @@ void DdsBridge::disable() noexcept { EPROSIMA_LOG_INFO(DDSPIPE_DDSBRIDGE, "Disabling DdsBridge for topic " << topic_ << "."); - // ATTENTION: reference needed or it would copy Track for (auto& track_it : tracks_) { track_it.second->disable(); @@ -97,187 +104,185 @@ void DdsBridge::disable() noexcept } } -void DdsBridge::create_all_tracks_() +void DdsBridge::create_endpoint( + const ParticipantId& participant_id, + const EndpointKind& discovered_endpoint_kind) { std::lock_guard lock(mutex_); - const auto& ids = participants_->get_participants_ids(); + switch (discovered_endpoint_kind) + { + case EndpointKind::reader: + create_writer_and_its_tracks_nts_(participant_id); + break; + case EndpointKind::writer: + create_reader_and_its_track_nts_(participant_id); + break; + default: + logError(DDSPIPE_DDSBRIDGE, "Invalid kind " << discovered_endpoint_kind << " to create an endpoint."); + break; + } +} - // Figure out what writers need to be created - std::set writers_to_create; +void DdsBridge::remove_endpoint( + const ParticipantId& participant_id, + const EndpointKind& removed_endpoint_kind) +{ + std::lock_guard lock(mutex_); - for (const ParticipantId& id : ids) + switch (removed_endpoint_kind) { - const auto& routes_it = routes_.find(id); + case EndpointKind::reader: + remove_writer_and_its_tracks_nts_(participant_id); + break; + case EndpointKind::writer: + remove_reader_and_its_track_nts_(participant_id); + break; + default: + logError(DDSPIPE_DDSBRIDGE, "Invalid kind " << removed_endpoint_kind << " to remove an endpoint."); + break; + } +} - if (routes_it != routes_.end()) - { - // The reader has a route. Create only the writers in the route. +void DdsBridge::create_writer_and_its_tracks_nts_( + const ParticipantId& participant_id) +{ + assert(participant_id != DEFAULT_PARTICIPANT_ID); + + // Create the writer + auto writer = create_writer_nts_(participant_id); + + // Save the writer + writers_[participant_id] = writer; - // We are not going to modify the writers_ids in this route. We can get the writers_ids by reference. - const auto& writers_ids = routes_it->second; - writers_to_create.insert(writers_ids.begin(), writers_ids.end()); + // Find or create the tracks in the writer's route + for (const auto& id : readers_in_route_[participant_id]) + { + if (tracks_.count(id)) + { + // The track already exists. Add the writer. + tracks_[id]->add_writer(participant_id, writer); } else { - // The reader doesn't have a route. Create every writer (+ itself if repeater) - auto writers_ids = ids; + // The track doesn't exist. Create it. + auto reader = create_reader_nts_(id); - if (!participants_->get_participant(id)->is_repeater()) - { - // The participant is not a repeater. Do not add its writer. - writers_ids.erase(id); - } + std::map> writers; + writers[participant_id] = writer; - writers_to_create.insert(writers_ids.begin(), writers_ids.end()); + create_track_nts_(id, reader, writers); } } - - // Create the writers. - std::map> writers; - - for (const auto& id : writers_to_create) - { - std::shared_ptr participant = participants_->get_participant(id); - const auto topic = create_topic_for_participant_nts_(participant); - writers[id] = participant->create_writer(*topic); - } - - // Add the writers to the tracks they have routes for. - add_writers_to_tracks_nts_(writers); } -void DdsBridge::create_writer( +void DdsBridge::create_reader_and_its_track_nts_( const ParticipantId& participant_id) { assert(participant_id != DEFAULT_PARTICIPANT_ID); - std::lock_guard lock(mutex_); + // Create the reader + auto reader = create_reader_nts_(participant_id); - // Create the writer. - std::shared_ptr participant = participants_->get_participant(participant_id); - const auto topic = create_topic_for_participant_nts_(participant); - auto writer = participant->create_writer(*topic); + // Create the missing writers in the reader's route + for (const auto& id : writers_in_route_[participant_id]) + { + if (writers_.count(id) == 0) + { + // The writer doesn't exist. Create it. + writers_[id] = create_writer_nts_(id); + } + } - // Add the writer to the tracks it has routes for. - add_writer_to_tracks_nts_(participant_id, writer); + // Create the track + create_track_nts_(participant_id, reader, writers_); } -void DdsBridge::remove_writer( +void DdsBridge::remove_writer_and_its_tracks_nts_( const ParticipantId& participant_id) noexcept { assert(participant_id != DEFAULT_PARTICIPANT_ID); - std::lock_guard lock(mutex_); - + // Remove the writer from the tracks and remove the tracks without writers for (auto it = tracks_.cbegin(), next_it = it; it != tracks_.cend(); it = next_it) { ++next_it; - const auto& track = it->second; - // If the writer is in the track, remove it. track->remove_writer(participant_id); if (!track->has_writers()) { - // The track doesn't have any writers. Remove it. tracks_.erase(it); } } + + // Remove the writer + writers_.erase(participant_id); } -void DdsBridge::add_writer_to_tracks_nts_( - const ParticipantId& participant_id, - std::shared_ptr& writer) +void DdsBridge::remove_reader_and_its_track_nts_( + const ParticipantId& participant_id) noexcept { - // Create the writer. - std::map> writers; - writers[participant_id] = writer; + assert(participant_id != DEFAULT_PARTICIPANT_ID); + + // Remove the writers that don't belong to another track + for (const auto& writer_id : writers_in_route_[participant_id]) + { + const auto& different_track_doesnt_contain_writer = [&](const auto& track_it) + { + return track_it.first == participant_id || !track_it.second->has_writer(writer_id); + }; + + if (std::all_of(tracks_.begin(), tracks_.end(), different_track_doesnt_contain_writer)) + { + writers_.erase(writer_id); + } + } - // Add the writer to the tracks it has routes for. - add_writers_to_tracks_nts_(writers); + // Remove the track + tracks_.erase(participant_id); } -void DdsBridge::add_writers_to_tracks_nts_( +void DdsBridge::create_track_nts_( + const ParticipantId& id, + const std::shared_ptr& reader, std::map>& writers) { - // Add writers to the tracks of the readers in their route. - // If the readers in their route don't exist, create them with their tracks. - for (const ParticipantId& id : participants_->get_participants_ids()) - { - // Select the necessary writers - std::map> writers_of_track; - - const auto& routes_it = routes_.find(id); + tracks_[id] = std::make_unique( + topic_, + id, + std::move(reader), + std::move(writers), + payload_pool_, + thread_pool_); - if (routes_it != routes_.end()) - { - // The reader has a route. Add only the writers in the route. - const auto& writers_in_route = routes_it->second; + if (enabled_) + { + tracks_[id]->enable(); + } +} - for (const auto& writer_id : writers_in_route) - { - if (writers.count(writer_id) >= 1) - { - writers_of_track[writer_id] = writers[writer_id]; - } - } - } - else - { - // The reader doesn't have a route. Add every writer (+ itself if repeater) - writers_of_track = writers; - - if (!participants_->get_participant(id)->is_repeater()) - { - // The participant is not a repeater. Do not add its writer. - writers_of_track.erase(id); - } - } +std::shared_ptr DdsBridge::create_writer_nts_( + const ParticipantId& participant_id) +{ + // Find the participant and the topic + std::shared_ptr participant = participants_->get_participant(participant_id); + const auto topic = create_topic_for_participant_nts_(participant); - if (writers_of_track.size() == 0) - { - // There are no writers in the route. There is nothing to do. Skip participant. - continue; - } + // Create the writer + return participant->create_writer(*topic); +} - if (tracks_.count(id)) - { - // The track already exists. Add the writers to it. - for (const auto& writers_of_track_it : writers_of_track) - { - const auto& writer_id = writers_of_track_it.first; - const auto& writer = writers_of_track_it.second; +std::shared_ptr DdsBridge::create_reader_nts_( + const ParticipantId& participant_id) +{ + // Find the participant and the topic + std::shared_ptr participant = participants_->get_participant(participant_id); + const auto topic = create_topic_for_participant_nts_(participant); - if (!tracks_[id]->has_writer(writer_id)) - { - // Add the writer to the track - tracks_[id]->add_writer(writer_id, writer); - } - } - } - else - { - // The track doesn't exist. Create it. - std::shared_ptr participant = participants_->get_participant(id); - const auto topic = create_topic_for_participant_nts_(participant); - auto reader = participant->create_reader(*topic); - - tracks_[id] = std::make_unique( - topic_, - id, - std::move(reader), - std::move(writers_of_track), - payload_pool_, - thread_pool_); - - if (enabled_) - { - tracks_[id]->enable(); - } - } - } + // Create the reader + return participant->create_reader(*topic); } utils::Heritable DdsBridge::create_topic_for_participant_nts_( diff --git a/ddspipe_core/src/cpp/communication/dds/Track.cpp b/ddspipe_core/src/cpp/communication/dds/Track.cpp index 1f964a03..e44e531f 100644 --- a/ddspipe_core/src/cpp/communication/dds/Track.cpp +++ b/ddspipe_core/src/cpp/communication/dds/Track.cpp @@ -68,7 +68,7 @@ Track::~Track() { logDebug(DDSPIPE_TRACK, "Destroying Track " << *this << "."); - // Disable reader and writers + // Disable the track disable(); // Unset callback on the Reader (this is needed as Reader will live longer than Track) @@ -131,11 +131,7 @@ void Track::disable() noexcept // Disabling Reader reader_->disable(); - // Disabling Writers - for (auto& writer_it : writers_) - { - writer_it.second->disable(); - } + // Don't disable writers, as they may be used by other tracks } } @@ -163,16 +159,16 @@ void Track::remove_writer( } bool Track::has_writer( - const ParticipantId& id) noexcept + const ParticipantId& id) const noexcept { std::lock_guard lock(track_mutex_); return writers_.count(id) != 0; } -bool Track::has_writers() noexcept +bool Track::has_writers() const noexcept { std::lock_guard lock(track_mutex_); - return writers_.size() > 0; + return !writers_.empty(); } bool Track::should_transmit_() noexcept @@ -265,10 +261,11 @@ void Track::transmit_() noexcept { EPROSIMA_LOG_WARNING( DDSPIPE_TRACK, - "Error writting data in Track " << topic_->serialize() - << " for writer " << writer_it.second.get() - << ". Error code " << ret - << ". Skipping data for this writer and continue."); + "Error writing data in Track " << *this + << " for writer " << writer_it.second.get() + << " in participant " << writer_it.first + << ". Error code " << ret + << ". Skipping data for this writer and continue."); continue; } } diff --git a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp index 5e3f8561..39d30008 100644 --- a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp @@ -29,12 +29,6 @@ namespace core { bool DdsPipeConfiguration::is_valid( utils::Formatter& error_msg) const noexcept { - if (remove_unused_entities && discovery_trigger != DiscoveryTrigger::READER) - { - error_msg << "A discovery-trigger different from reader is incompatible with remove-unused-entities."; - return false; - } - return routes.is_valid(error_msg) && topic_routes.is_valid(error_msg); } diff --git a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp index 59dae06e..ecba2383 100644 --- a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp @@ -40,7 +40,7 @@ bool RoutesConfiguration::is_valid( bool RoutesConfiguration::is_valid( utils::Formatter& error_msg, - const std::map& participant_ids) const noexcept + const std::map& participant_ids) const noexcept { if (!is_valid(error_msg)) { @@ -101,6 +101,89 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::operator ()() const return routes; } +RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_readers( + const std::map& participant_ids) const noexcept +{ + static RoutesConfiguration::RoutesMap readers_routes; + + if (!readers_routes.empty()) + { + return readers_routes; + } + + for (const auto& it : participant_ids) + { + const auto& reader_id = it.first; + const auto& is_repeater = it.second; + + const auto& routes_it = routes.find(reader_id); + + if (routes_it != routes.end()) + { + // The reader has a route. Add only the writers in the route. + readers_routes[reader_id] = routes_it->second; + } + else + { + // The reader doesn't have a route. Add every writer (+ itself if repeater). + for (const auto& it : participant_ids) + { + const auto& writer_id = it.first; + + if (reader_id != writer_id || is_repeater) + { + readers_routes[reader_id].insert(writer_id); + } + } + } + } + + return readers_routes; +} + +RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers( + const std::map& participant_ids) const noexcept +{ + static RoutesConfiguration::RoutesMap writers_routes; + + if (!writers_routes.empty()) + { + return writers_routes; + } + + for (const auto& it : participant_ids) + { + const auto& reader_id = it.first; + const auto& is_repeater = it.second; + + const auto& routes_it = routes.find(reader_id); + + if (routes_it != routes.end()) + { + // The reader has a route. Add the reader to the route of the writers in its route. + for (const auto& writer_id : routes_it->second) + { + writers_routes[writer_id].insert(reader_id); + } + } + else + { + // The reader doesn't have a route. Add the reader to the route of every writer (+ itself if repeater). + for (const auto& it : participant_ids) + { + const auto& writer_id = it.first; + + if (reader_id != writer_id || is_repeater) + { + writers_routes[writer_id].insert(reader_id); + } + } + } + } + + return writers_routes; +} + } /* namespace core */ } /* namespace ddspipe */ } /* namespace eprosima */ diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index 5a4eb898..5eb292b9 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -292,7 +292,9 @@ void DdsPipe::discovered_endpoint_nts_( } else if (is_endpoint_relevant_(endpoint)) { - discovered_topic_nts_(utils::Heritable::make_heritable(endpoint.topic)); + discovered_topic_nts_( + utils::Heritable::make_heritable(endpoint.topic), + endpoint.kind); } } @@ -320,7 +322,7 @@ void DdsPipe::removed_endpoint_nts_( if (it_bridge != bridges_.end() && endpoint.discoverer_participant_id != DEFAULT_PARTICIPANT_ID) { - it_bridge->second->remove_writer(endpoint.discoverer_participant_id); + it_bridge->second->remove_endpoint(endpoint.discoverer_participant_id, endpoint.kind); } } } @@ -389,13 +391,13 @@ bool DdsPipe::is_endpoint_relevant_( if (endpoint.active) { - // An active reader is relevant when it is the only active reader in a topic + // An active endpoint is relevant when it is the only active endpoint in a topic // with a discoverer participant id. return relevant_endpoints.size() == 1 && relevant_endpoints.count(endpoint.guid); } else { - // An inactive reader is relevant when there aren't any active readers in a topic + // An inactive endpoint is relevant when there aren't any active endpoints in a topic // with a discoverer participant id. return relevant_endpoints.size() == 0; } @@ -407,12 +409,13 @@ void DdsPipe::init_bridges_nts_( for (const auto& topic : builtin_topics) { discovered_topic_nts_(topic); - create_new_bridge_nts_(topic, false); + create_new_bridge_nts_(topic, EndpointKind::reader, false); } } void DdsPipe::discovered_topic_nts_( - const utils::Heritable& topic) noexcept + const utils::Heritable& topic, + const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Discovered topic: " << topic << " by: " << topic->topic_discoverer() << "."); @@ -427,13 +430,12 @@ void DdsPipe::discovered_topic_nts_( // If Pipe is enabled and topic allowed, activate it if (enabled_ && allowed_topics_->is_topic_allowed(*topic)) { - activate_topic_nts_(topic); + activate_topic_nts_(topic, endpoint_kind); } } else if (configuration_.remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { - // The bridge already exists. Create a writer in the participant who discovered it. - it_bridge->second->create_writer(topic->topic_discoverer()); + it_bridge->second->create_endpoint(topic->topic_discoverer(), endpoint_kind); } } @@ -485,6 +487,7 @@ void DdsPipe::removed_service_nts_( void DdsPipe::create_new_bridge_nts_( const utils::Heritable& topic, + const EndpointKind endpoint_kind /*= EndpointKind::reader*/, bool enabled /*= false*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Creating Bridge for topic: " << topic << "."); @@ -501,7 +504,8 @@ void DdsPipe::create_new_bridge_nts_( thread_pool_, routes_config, configuration_.remove_unused_entities, - manual_topics); + manual_topics, + endpoint_kind); if (enabled) { @@ -528,7 +532,8 @@ void DdsPipe::create_new_service_nts_( } void DdsPipe::activate_topic_nts_( - const utils::Heritable& topic) noexcept + const utils::Heritable& topic, + const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Activating topic: " << topic << "."); @@ -541,7 +546,7 @@ void DdsPipe::activate_topic_nts_( if (it_bridge == bridges_.end()) { // The Bridge did not exist - create_new_bridge_nts_(topic, true); + create_new_bridge_nts_(topic, endpoint_kind, true); } else { diff --git a/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp b/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp index a678610d..f1e5a025 100644 --- a/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp @@ -64,7 +64,7 @@ utils::ReturnCode SchemaWriter::write_nts_( { EPROSIMA_LOG_WARNING( DDSPIPE_SCHEMA_WRITER, - "Error writting data in topic " << topic_ << " : <" << e.what() << ">."); + "Error writing data in topic " << topic_ << " : <" << e.what() << ">."); return utils::ReturnCode::RETCODE_ERROR; }