diff --git a/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp index 701cb59365b..d22d53ec9ad 100644 --- a/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp @@ -222,6 +222,88 @@ void DomainParticipantImpl::disable() efd::DomainParticipantImpl::disable(); } +<<<<<<< HEAD +======= +ReturnCode_t DomainParticipantImpl::delete_contained_entities() +{ + delete_statistics_builtin_entities(); + return efd::DomainParticipantImpl::delete_contained_entities(); +} + +ReturnCode_t DomainParticipantImpl::enable_monitor_service() +{ + ReturnCode_t ret = efd::RETCODE_OK; + + if (!rtps_participant_->is_monitor_service_created()) + { + status_observer_.store(rtps_participant_->create_monitor_service(*this)); + } + + if (!rtps_participant_->enable_monitor_service() || + nullptr == status_observer_) + { + ret = efd::RETCODE_ERROR; + } + + return ret; +} + +ReturnCode_t DomainParticipantImpl::disable_monitor_service() +{ + ReturnCode_t ret = efd::RETCODE_OK; + + if (!rtps_participant_->is_monitor_service_created() || + !rtps_participant_->disable_monitor_service()) + { + ret = efd::RETCODE_NOT_ENABLED; + } + + return ret; +} + +ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message( + fastrtps::rtps::ParticipantProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg) +{ + ReturnCode_t ret{efd::RETCODE_OK}; + + if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg)) + { + ret = efd::RETCODE_ERROR; + } + + return ret; +} + +ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message( + fastrtps::rtps::WriterProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg) +{ + ReturnCode_t ret{efd::RETCODE_OK}; + + if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg)) + { + ret = efd::RETCODE_ERROR; + } + + return ret; +} + +ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message( + fastrtps::rtps::ReaderProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg) +{ + ReturnCode_t ret{efd::RETCODE_OK}; + + if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg)) + { + ret = efd::RETCODE_ERROR; + } + + return ret; +} + +>>>>>>> 0d62335cc (Properly delete builtin statistics writers upon `delete_contained_entities()` (#4891)) efd::PublisherImpl* DomainParticipantImpl::create_publisher_impl( const efd::PublisherQos& qos, efd::PublisherListener* listener) diff --git a/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp index a2de962130f..1d96fff57e3 100644 --- a/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp @@ -99,6 +99,89 @@ class DomainParticipantImpl : public efd::DomainParticipantImpl static bool is_statistics_topic_name( const std::string& topic_name) noexcept; +<<<<<<< HEAD +======= + /** + * @brief This override calls the parent method and returns builtin publishers to nullptr + * + * @return RETCODE_OK if successful + * @note This method is meant to be used followed by a deletion of the participant as it implies + * the deletion of the builtin statistics publishers. + */ + efd::ReturnCode_t delete_contained_entities() override; + + /** + * Enables the monitor service in this DomainParticipant. + * + * @return RETCODE_OK if the monitor service could be correctly enabled. + * @return RETCODE_ERROR if the monitor service could not be enabled properly. + * @return RETCODE_UNSUPPORTED if FASTDDS_STATISTICS is not enabled. + * + */ + efd::ReturnCode_t enable_monitor_service(); + + /** + * Disables the monitor service in this DomainParticipant. Does nothing if the service was not enabled before. + * + * @return RETCODE_OK if the monitor service could be correctly disabled. + * @return RETCODE_NOT_ENABLED if the monitor service was not previously enabled. + * @return RETCODE_ERROR if the service could not be properly disabled. + * @return RETCODE_UNSUPPORTED if FASTDDS_STATISTICS is not enabled. + * + */ + efd::ReturnCode_t disable_monitor_service(); + + /** + * fills in the ParticipantProxyData from a MonitorService Message + * + * @param [out] data Proxy to fill + * @param [in] msg MonitorService Message to get the proxy information from. + * + * @return RETCODE_OK if the operation succeeds. + * @return RETCODE_ERROR if the operation fails. + */ + efd::ReturnCode_t fill_discovery_data_from_cdr_message( + fastrtps::rtps::ParticipantProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg); + + /** + * fills in the WriterProxyData from a MonitorService Message + * + * @param [out] data Proxy to fill. + * @param [in] msg MonitorService Message to get the proxy information from. + * + * @return RETCODE_OK if the operation succeeds. + * @return RETCODE_ERROR if the operation fails. + */ + efd::ReturnCode_t fill_discovery_data_from_cdr_message( + fastrtps::rtps::WriterProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg); + + /** + * fills in the ReaderProxyData from a MonitorService Message + * + * @param [out] data Proxy to fill. + * @param [in] msg MonitorService Message to get the proxy information from. + * + * @return RETCODE_OK if the operation succeeds. + * @return RETCODE_ERROR if the operation fails. + */ + efd::ReturnCode_t fill_discovery_data_from_cdr_message( + fastrtps::rtps::ReaderProxyData& data, + fastdds::statistics::MonitorServiceStatusData& msg); + + /** + * Gets the status observer for that entity + * + * @return status observer + */ + + const rtps::IStatusObserver* get_status_observer() + { + return status_observer_.load(); + } + +>>>>>>> 0d62335cc (Properly delete builtin statistics writers upon `delete_contained_entities()` (#4891)) protected: /** diff --git a/test/blackbox/common/DDSBlackboxTestsStatistics.cpp b/test/blackbox/common/DDSBlackboxTestsStatistics.cpp index a9dcd8ebcbb..8e7c0018a22 100644 --- a/test/blackbox/common/DDSBlackboxTestsStatistics.cpp +++ b/test/blackbox/common/DDSBlackboxTestsStatistics.cpp @@ -686,3 +686,97 @@ TEST(DDSStatistics, discovery_topic_physical_data_delete_physical_properties) test_discovery_topic_physical_data(DiscoveryTopicPhysicalDataTest::NO_PHYSICAL_DATA); #endif // FASTDDS_STATISTICS } + +class CustomStatisticsParticipantSubscriber : public PubSubReader +{ +public: + + CustomStatisticsParticipantSubscriber( + const std::string& topic_name) + : PubSubReader(topic_name) + { + } + + void destroy() override + { + participant_->delete_contained_entities(); + DomainParticipantFactory::get_instance()->delete_participant(participant_); + participant_ = nullptr; + } + +}; + +// Regression test for #20816. When an application is terminated with delete_contained_entities() +// it has to properly finish. The test creates a number of participants with some of them sharing the same topic. +// Each participant asynchronously sends and receive a number of samples. In the readers, when a minumm number of samples +// is received the destroy() method is called (abruptly). The test checks that the application finishes successfully +TEST(DDSStatistics, correct_deletion_upon_delete_contained_entities) +{ +#ifdef FASTDDS_STATISTICS + + //! Set environment variable and create participant using Qos set by code + const char* value = "HISTORY_LATENCY_TOPIC;NETWORK_LATENCY_TOPIC;" + "PUBLICATION_THROUGHPUT_TOPIC;SUBSCRIPTION_THROUGHPUT_TOPIC;RTPS_SENT_TOPIC;" + "RTPS_LOST_TOPIC;HEARTBEAT_COUNT_TOPIC;ACKNACK_COUNT_TOPIC;NACKFRAG_COUNT_TOPIC;" + "GAP_COUNT_TOPIC;DATA_COUNT_TOPIC;RESENT_DATAS_TOPIC;SAMPLE_DATAS_TOPIC;" + "PDP_PACKETS_TOPIC;EDP_PACKETS_TOPIC;DISCOVERY_TOPIC;PHYSICAL_DATA_TOPIC;"; + + #ifdef _WIN32 + ASSERT_EQ(0, _putenv_s("FASTDDS_STATISTICS", value)); + #else + ASSERT_EQ(0, setenv("FASTDDS_STATISTICS", value, 1)); + #endif // ifdef _WIN32 + + size_t n_participants = 5; + size_t n_participants_same_topic = 2; + + std::vector>> writers; + std::vector> readers; + + readers.reserve(n_participants); + writers.reserve(n_participants); + + std::vector> threads; + threads.reserve(2 * n_participants); + + for (size_t i = 0; i < n_participants; ++i) + { + size_t topic_number = (i < n_participants_same_topic) ? 0 : i; + + auto writer = std::make_shared>(TEST_TOPIC_NAME + std::to_string( + topic_number)); + auto reader = + std::make_shared(TEST_TOPIC_NAME + std::to_string(topic_number)); + + std::shared_ptr> data = std::make_shared>(default_helloworld_data_generator( + 10)); + + threads.emplace_back(std::make_shared([reader, data]() + { + reader->init(); + ASSERT_TRUE(reader->isInitialized()); + reader->startReception(data->size()); + reader->block_for_at_least(3); + reader->destroy(); + })); + + threads.emplace_back(std::make_shared([writer, data]() + { + writer->init(); + ASSERT_TRUE(writer->isInitialized()); + writer->wait_discovery(); + writer->send(*data, 10); + writer->destroy(); + })); + + writers.push_back(writer); + readers.push_back(reader); + } + + for (auto& thread : threads) + { + thread->join(); + } + +#endif // FASTDDS_STATISTICS +}