Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22022] Fix assertion deleting a Datawriter configured with persistent and flow controller #5364

Merged
merged 4 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
bool found = false, found_in_users = false;
Endpoint* p_endpoint = nullptr;
BaseReader* reader = nullptr;
BaseWriter* writer {nullptr};

if (endpoint.entityId.is_writer())
{
Expand All @@ -1970,6 +1971,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
{
writer = *wit;
m_userWriterList.erase(wit);
found_in_users = true;
break;
Expand All @@ -1980,6 +1982,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
{
writer = *wit;
p_endpoint = *wit;
m_allWriterList.erase(wit);
found = true;
Expand Down Expand Up @@ -2069,6 +2072,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
reader->local_actions_on_reader_removed();
}
else if (writer)
{
writer->local_actions_on_writer_removed();
}
delete(p_endpoint);
return true;
}
Expand Down Expand Up @@ -2160,6 +2167,10 @@ void RTPSParticipantImpl::deleteAllUserEndpoints()
{
static_cast<BaseReader*>(endpoint)->local_actions_on_reader_removed();
}
else if (WRITER == kind)
{
static_cast<BaseWriter*>(endpoint)->local_actions_on_writer_removed();
}

// remove the endpoints
delete(endpoint);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/BaseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ void BaseWriter::init(
}
}

void BaseWriter::deinit()
void BaseWriter::local_actions_on_writer_removed()
{
// First, unregister changes from FlowController. This action must be protected.
{
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/writer/BaseWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class BaseWriter

bool is_async() const final;

virtual void local_actions_on_writer_removed();

#ifdef FASTDDS_STATISTICS

bool add_statistics_listener(
Expand Down Expand Up @@ -354,8 +356,6 @@ class BaseWriter
void init(
const WriterAttributes& att);

void deinit();

void add_guid(
LocatorSelectorSender& locator_selector,
const GUID_t& remote_guid);
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/writer/StatefulPersistentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ StatefulPersistentWriter::StatefulPersistentWriter(

StatefulPersistentWriter::~StatefulPersistentWriter()
{
deinit();
}

/*
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ void StatefulWriter::init(
StatefulWriter::~StatefulWriter()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter destructor");
}

void StatefulWriter::local_actions_on_writer_removed()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter local_actions_on_writer_removed");

// Disable timed events, because their callbacks use cache changes
if (disable_positive_acks_)
Expand All @@ -267,7 +272,7 @@ StatefulWriter::~StatefulWriter()
}

// This must be the next action, as it frees CacheChange_t from the async thread.
deinit();
BaseWriter::local_actions_on_writer_removed();

// Stop all active proxies and pass them to the pool
{
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatefulWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StatefulWriter : public BaseWriter

virtual ~StatefulWriter();

void local_actions_on_writer_removed() override;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/writer/StatelessPersistentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ StatelessPersistentWriter::StatelessPersistentWriter(

StatelessPersistentWriter::~StatelessPersistentWriter()
{
deinit();
}

/*
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ void StatelessWriter::init(
StatelessWriter::~StatelessWriter()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter destructor"; );
deinit();
}

void StatelessWriter::local_actions_on_writer_removed()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter local_actions_on_writer_removed"; );
BaseWriter::local_actions_on_writer_removed();
}

void StatelessWriter::get_builtin_guid()
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatelessWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StatelessWriter : public BaseWriter

virtual ~StatelessWriter();

void local_actions_on_writer_removed() override;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
Expand Down
55 changes: 55 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdio>
#include <thread>

#include <gtest/gtest.h>
Expand Down Expand Up @@ -228,6 +229,60 @@ TEST_P(PubSubFlowControllers, AsyncMultipleWritersFlowController64kb)
entities.block_for_all();
}

TEST_P(PubSubFlowControllers, AsyncPubSubAsReliableData64kbWithParticipantFlowControlAndPersistence)
{
PubSubReader<Data64kbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data64kbPubSubType> writer(TEST_TOPIC_NAME);

// Get info about current test
auto info = ::testing::UnitTest::GetInstance()->current_test_info();

// Create DB file name from test name and PID
std::ostringstream ss;
std::string test_case_name(info->test_case_name());
std::string test_name(info->name());
ss <<
test_case_name.replace(test_case_name.find_first_of('/'), 1, "_") << "_" <<
test_name.replace(test_name.find_first_of('/'), 1, "_") << "_" << GET_PID() << ".db";
std::string db_file_name = {ss.str()};

reader.history_depth(3).
reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS).init();

ASSERT_TRUE(reader.isInitialized());

uint32_t bytesPerPeriod = 65000;
uint32_t periodInMs = 500;
writer.add_flow_controller_descriptor_to_pparams(scheduler_policy_, bytesPerPeriod, periodInMs);

writer.history_depth(3)
.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
.make_transient(db_file_name, "33.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_data64kb_data_generator(3);

reader.startReception(data);

// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();

reader.destroy();
writer.destroy();
std::remove(db_file_name.c_str());
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/common/DDSBlackboxTestsPersistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdio>
#include <thread>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
Expand Down
Loading