Skip to content

Commit

Permalink
Properly delete builtin statistics writers upon `delete_contained_ent…
Browse files Browse the repository at this point in the history
…ities()` (#4891)

* Refs #20816: Add BB test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20816: Fix

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20816: Apply Edu's suggestion

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit 0d62335)

# Conflicts:
#	src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp
#	src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp
  • Loading branch information
Mario-DL authored and mergify[bot] committed Jun 8, 2024
1 parent 0103a39 commit 548a445
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 0 deletions.
82 changes: 82 additions & 0 deletions src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,88 @@ void DomainParticipantImpl::disable()
efd::DomainParticipantImpl::disable();
}

<<<<<<< HEAD
=======
ReturnCode_t DomainParticipantImpl::delete_contained_entities()
{
delete_statistics_builtin_entities();
return efd::DomainParticipantImpl::delete_contained_entities();
}

ReturnCode_t DomainParticipantImpl::enable_monitor_service()
{
ReturnCode_t ret = efd::RETCODE_OK;

if (!rtps_participant_->is_monitor_service_created())
{
status_observer_.store(rtps_participant_->create_monitor_service(*this));
}

if (!rtps_participant_->enable_monitor_service() ||
nullptr == status_observer_)
{
ret = efd::RETCODE_ERROR;
}

return ret;
}

ReturnCode_t DomainParticipantImpl::disable_monitor_service()
{
ReturnCode_t ret = efd::RETCODE_OK;

if (!rtps_participant_->is_monitor_service_created() ||
!rtps_participant_->disable_monitor_service())
{
ret = efd::RETCODE_NOT_ENABLED;
}

return ret;
}

ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message(
fastrtps::rtps::ParticipantProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg)
{
ReturnCode_t ret{efd::RETCODE_OK};

if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg))
{
ret = efd::RETCODE_ERROR;
}

return ret;
}

ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message(
fastrtps::rtps::WriterProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg)
{
ReturnCode_t ret{efd::RETCODE_OK};

if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg))
{
ret = efd::RETCODE_ERROR;
}

return ret;
}

ReturnCode_t DomainParticipantImpl::fill_discovery_data_from_cdr_message(
fastrtps::rtps::ReaderProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg)
{
ReturnCode_t ret{efd::RETCODE_OK};

if (!get_rtps_participant()->fill_discovery_data_from_cdr_message(data, msg))
{
ret = efd::RETCODE_ERROR;
}

return ret;
}

>>>>>>> 0d62335cc (Properly delete builtin statistics writers upon `delete_contained_entities()` (#4891))
efd::PublisherImpl* DomainParticipantImpl::create_publisher_impl(
const efd::PublisherQos& qos,
efd::PublisherListener* listener)
Expand Down
83 changes: 83 additions & 0 deletions src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,89 @@ class DomainParticipantImpl : public efd::DomainParticipantImpl
static bool is_statistics_topic_name(
const std::string& topic_name) noexcept;

<<<<<<< HEAD
=======
/**
* @brief This override calls the parent method and returns builtin publishers to nullptr
*
* @return RETCODE_OK if successful
* @note This method is meant to be used followed by a deletion of the participant as it implies
* the deletion of the builtin statistics publishers.
*/
efd::ReturnCode_t delete_contained_entities() override;

/**
* Enables the monitor service in this DomainParticipant.
*
* @return RETCODE_OK if the monitor service could be correctly enabled.
* @return RETCODE_ERROR if the monitor service could not be enabled properly.
* @return RETCODE_UNSUPPORTED if FASTDDS_STATISTICS is not enabled.
*
*/
efd::ReturnCode_t enable_monitor_service();

/**
* Disables the monitor service in this DomainParticipant. Does nothing if the service was not enabled before.
*
* @return RETCODE_OK if the monitor service could be correctly disabled.
* @return RETCODE_NOT_ENABLED if the monitor service was not previously enabled.
* @return RETCODE_ERROR if the service could not be properly disabled.
* @return RETCODE_UNSUPPORTED if FASTDDS_STATISTICS is not enabled.
*
*/
efd::ReturnCode_t disable_monitor_service();

/**
* fills in the ParticipantProxyData from a MonitorService Message
*
* @param [out] data Proxy to fill
* @param [in] msg MonitorService Message to get the proxy information from.
*
* @return RETCODE_OK if the operation succeeds.
* @return RETCODE_ERROR if the operation fails.
*/
efd::ReturnCode_t fill_discovery_data_from_cdr_message(
fastrtps::rtps::ParticipantProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg);

/**
* fills in the WriterProxyData from a MonitorService Message
*
* @param [out] data Proxy to fill.
* @param [in] msg MonitorService Message to get the proxy information from.
*
* @return RETCODE_OK if the operation succeeds.
* @return RETCODE_ERROR if the operation fails.
*/
efd::ReturnCode_t fill_discovery_data_from_cdr_message(
fastrtps::rtps::WriterProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg);

/**
* fills in the ReaderProxyData from a MonitorService Message
*
* @param [out] data Proxy to fill.
* @param [in] msg MonitorService Message to get the proxy information from.
*
* @return RETCODE_OK if the operation succeeds.
* @return RETCODE_ERROR if the operation fails.
*/
efd::ReturnCode_t fill_discovery_data_from_cdr_message(
fastrtps::rtps::ReaderProxyData& data,
fastdds::statistics::MonitorServiceStatusData& msg);

/**
* Gets the status observer for that entity
*
* @return status observer
*/

const rtps::IStatusObserver* get_status_observer()
{
return status_observer_.load();
}

>>>>>>> 0d62335cc (Properly delete builtin statistics writers upon `delete_contained_entities()` (#4891))
protected:

/**
Expand Down
94 changes: 94 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,97 @@ TEST(DDSStatistics, discovery_topic_physical_data_delete_physical_properties)
test_discovery_topic_physical_data(DiscoveryTopicPhysicalDataTest::NO_PHYSICAL_DATA);
#endif // FASTDDS_STATISTICS
}

class CustomStatisticsParticipantSubscriber : public PubSubReader<HelloWorldPubSubType>
{
public:

CustomStatisticsParticipantSubscriber(
const std::string& topic_name)
: PubSubReader<HelloWorldPubSubType>(topic_name)
{
}

void destroy() override
{
participant_->delete_contained_entities();
DomainParticipantFactory::get_instance()->delete_participant(participant_);
participant_ = nullptr;
}

};

// Regression test for #20816. When an application is terminated with delete_contained_entities()
// it has to properly finish. The test creates a number of participants with some of them sharing the same topic.
// Each participant asynchronously sends and receive a number of samples. In the readers, when a minumm number of samples
// is received the destroy() method is called (abruptly). The test checks that the application finishes successfully
TEST(DDSStatistics, correct_deletion_upon_delete_contained_entities)
{
#ifdef FASTDDS_STATISTICS

//! Set environment variable and create participant using Qos set by code
const char* value = "HISTORY_LATENCY_TOPIC;NETWORK_LATENCY_TOPIC;"
"PUBLICATION_THROUGHPUT_TOPIC;SUBSCRIPTION_THROUGHPUT_TOPIC;RTPS_SENT_TOPIC;"
"RTPS_LOST_TOPIC;HEARTBEAT_COUNT_TOPIC;ACKNACK_COUNT_TOPIC;NACKFRAG_COUNT_TOPIC;"
"GAP_COUNT_TOPIC;DATA_COUNT_TOPIC;RESENT_DATAS_TOPIC;SAMPLE_DATAS_TOPIC;"
"PDP_PACKETS_TOPIC;EDP_PACKETS_TOPIC;DISCOVERY_TOPIC;PHYSICAL_DATA_TOPIC;";

#ifdef _WIN32
ASSERT_EQ(0, _putenv_s("FASTDDS_STATISTICS", value));
#else
ASSERT_EQ(0, setenv("FASTDDS_STATISTICS", value, 1));
#endif // ifdef _WIN32

size_t n_participants = 5;
size_t n_participants_same_topic = 2;

std::vector<std::shared_ptr<PubSubWriter<HelloWorldPubSubType>>> writers;
std::vector<std::shared_ptr<CustomStatisticsParticipantSubscriber>> readers;

readers.reserve(n_participants);
writers.reserve(n_participants);

std::vector<std::shared_ptr<std::thread>> threads;
threads.reserve(2 * n_participants);

for (size_t i = 0; i < n_participants; ++i)
{
size_t topic_number = (i < n_participants_same_topic) ? 0 : i;

auto writer = std::make_shared<PubSubWriter<HelloWorldPubSubType>>(TEST_TOPIC_NAME + std::to_string(
topic_number));
auto reader =
std::make_shared<CustomStatisticsParticipantSubscriber>(TEST_TOPIC_NAME + std::to_string(topic_number));

std::shared_ptr<std::list<HelloWorld>> data = std::make_shared<std::list<HelloWorld>>(default_helloworld_data_generator(
10));

threads.emplace_back(std::make_shared<std::thread>([reader, data]()
{
reader->init();
ASSERT_TRUE(reader->isInitialized());
reader->startReception(data->size());
reader->block_for_at_least(3);
reader->destroy();
}));

threads.emplace_back(std::make_shared<std::thread>([writer, data]()
{
writer->init();
ASSERT_TRUE(writer->isInitialized());
writer->wait_discovery();
writer->send(*data, 10);
writer->destroy();
}));

writers.push_back(writer);
readers.push_back(reader);
}

for (auto& thread : threads)
{
thread->join();
}

#endif // FASTDDS_STATISTICS
}

0 comments on commit 548a445

Please sign in to comment.