Skip to content

Commit

Permalink
Fix destruction data-race on participant removal in intra-process (#5034
Browse files Browse the repository at this point in the history
)

* Refs #21293: Add BB test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Reinforce test to fail more frequently

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Add RefCountedPointer.hpp to utils

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Add unittests for RefCountedPointer

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: LocalReaderPointer.hpp

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: BaseReader aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: ReaderLocator aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr<LocalReaderPointer> and properly calls local_actions_on_reader_removed()

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Fix windows warnings

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Address Miguel's review

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: Apply last comment

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21293: NIT

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit 456e45f)
  • Loading branch information
Mario-DL authored and mergify[bot] committed Oct 29, 2024
1 parent c8c8020 commit 8d8f1ca
Show file tree
Hide file tree
Showing 17 changed files with 623 additions and 43 deletions.
5 changes: 3 additions & 2 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <rtps/network/utils/external_locators.hpp>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/reader/BaseReader.hpp>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/RTPSDomainImpl.hpp>
#include <rtps/transport/TCPv4Transport.h>
#include <rtps/transport/TCPv6Transport.h>
Expand Down Expand Up @@ -690,7 +691,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
return nullptr;
}

BaseReader* RTPSDomainImpl::find_local_reader(
std::shared_ptr<LocalReaderPointer> RTPSDomainImpl::find_local_reader(
const GUID_t& reader_guid)
{
auto instance = get_instance();
Expand All @@ -704,7 +705,7 @@ BaseReader* RTPSDomainImpl::find_local_reader(
}
}

return nullptr;
return std::shared_ptr<LocalReaderPointer>(nullptr);
}

BaseWriter* RTPSDomainImpl::find_local_writer(
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fastdds/rtps/writer/RTPSWriter.hpp>

#include <rtps/reader/BaseReader.hpp>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <utils/shared_memory/BoostAtExitRegistry.hpp>
#include <utils/SystemInfo.hpp>
Expand Down Expand Up @@ -175,7 +176,7 @@ class RTPSDomainImpl
*
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
static BaseReader* find_local_reader(
static std::shared_ptr<LocalReaderPointer> find_local_reader(
const GUID_t& reader_guid);

/**
Expand Down
25 changes: 20 additions & 5 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ bool RTPSParticipantImpl::createReader(
return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
}

BaseReader* RTPSParticipantImpl::find_local_reader(
std::shared_ptr<LocalReaderPointer> RTPSParticipantImpl::find_local_reader(
const GUID_t& reader_guid)
{
shared_lock<shared_mutex> _(endpoints_list_mutex);
Expand All @@ -1359,11 +1359,11 @@ BaseReader* RTPSParticipantImpl::find_local_reader(
{
if (reader->getGuid() == reader_guid)
{
return reader;
return reader->get_local_pointer();
}
}

return nullptr;
return std::shared_ptr<LocalReaderPointer>();
}

BaseWriter* RTPSParticipantImpl::find_local_writer(
Expand Down Expand Up @@ -1960,6 +1960,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(

bool found = false, found_in_users = false;
Endpoint* p_endpoint = nullptr;
BaseReader* reader = nullptr;

if (endpoint.entityId.is_writer())
{
Expand Down Expand Up @@ -1994,6 +1995,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
{
reader = *rit;
m_userReaderList.erase(rit);
found_in_users = true;
break;
Expand All @@ -2004,6 +2006,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
{
reader = *rit;
p_endpoint = *rit;
m_allReaderList.erase(rit);
found = true;
Expand Down Expand Up @@ -2062,6 +2065,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
#endif // if HAVE_SECURITY
}

if (reader)
{
reader->local_actions_on_reader_removed();
}
delete(p_endpoint);
return true;
}
Expand Down Expand Up @@ -2149,6 +2156,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints()
}
#endif // if HAVE_SECURITY

if (kind == READER)
{
static_cast<BaseReader*>(endpoint)->local_actions_on_reader_removed();
}

// remove the endpoints
delete(endpoint);
}
Expand Down Expand Up @@ -2837,8 +2849,11 @@ bool RTPSParticipantImpl::register_in_reader(
}
else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId))
{
BaseReader* reader = find_local_reader(reader_guid);
res = reader->add_statistics_listener(listener);
LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid));
if (local_reader)
{
res = local_reader->add_statistics_listener(listener);
}
}

return res;
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include <rtps/messages/SendBuffersManager.hpp>
#include <rtps/network/NetworkFactory.hpp>
#include <rtps/network/ReceiverResource.h>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/resources/ResourceEvent.h>
#include <statistics/rtps/monitor-service/interfaces/IConnectionsObserver.hpp>
#include <statistics/rtps/monitor-service/interfaces/IConnectionsQueryable.hpp>
Expand Down Expand Up @@ -477,7 +478,7 @@ class RTPSParticipantImpl
/***
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
BaseReader* find_local_reader(
std::shared_ptr<LocalReaderPointer> find_local_reader(
const GUID_t& reader_guid);

/***
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/reader/BaseReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ BaseReader::BaseReader(
setup_datasharing(att);
}

void BaseReader::local_actions_on_reader_removed()
{
local_ptr_->deactivate();
}

BaseReader::~BaseReader()
{
EPROSIMA_LOG_INFO(RTPS_READER, "Removing reader " << this->getGuid().entityId);
Expand Down Expand Up @@ -272,6 +277,11 @@ void BaseReader::allow_unknown_writers()
accept_messages_from_unkown_writers_ = true;
}

std::shared_ptr<LocalReaderPointer> BaseReader::get_local_pointer()
{
return local_ptr_;
}

bool BaseReader::reserve_cache(
uint32_t cdr_payload_size,
fastdds::rtps::CacheChange_t*& change)
Expand Down Expand Up @@ -501,6 +511,8 @@ void BaseReader::init(
fixed_payload_size_ = history_->m_att.payloadMaxSize;
}

local_ptr_ = std::make_shared<LocalReaderPointer>(this);

EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly");
}

Expand Down
18 changes: 18 additions & 0 deletions src/cpp/rtps/reader/BaseReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
#include <fastdds/utils/TimedConditionVariable.hpp>

#include <rtps/reader/LocalReaderPointer.hpp>

namespace eprosima {

namespace fastdds {
Expand Down Expand Up @@ -163,6 +165,14 @@ class BaseReader
return datasharing_listener_;
}

/**
* @brief Retrieves the local pointer to this reader
* to be used by other local entities.
*
* @return Local pointer to this reader.
*/
std::shared_ptr<LocalReaderPointer> get_local_pointer();

/**
* @brief Reserve a CacheChange_t.
*
Expand Down Expand Up @@ -296,6 +306,11 @@ class BaseReader
const fastdds::rtps::SequenceNumberSet_t& gapList,
VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0;

/**
* @brief Waits for not being referenced/used by any other entity.
*/
virtual void local_actions_on_reader_removed();

#ifdef FASTDDS_STATISTICS

bool add_statistics_listener(
Expand Down Expand Up @@ -455,6 +470,9 @@ class BaseReader
/// Trusted writer (for Builtin)
fastdds::rtps::EntityId_t trusted_writer_entity_id_;

/// RefCountedPointer of this instance.
std::shared_ptr<LocalReaderPointer> local_ptr_;

private:

/**
Expand Down
36 changes: 36 additions & 0 deletions src/cpp/rtps/reader/LocalReaderPointer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file LocalReaderPointer.hpp
*/

#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP

#include <utils/RefCountedPointer.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

class BaseReader;

using LocalReaderPointer = RefCountedPointer<BaseReader>;

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
22 changes: 10 additions & 12 deletions src/cpp/rtps/writer/ReaderLocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ReaderLocator::ReaderLocator(
, async_locator_info_(max_unicast_locators, max_multicast_locators)
, expects_inline_qos_(false)
, is_local_reader_(false)
, local_reader_(nullptr)
, local_reader_()
, guid_prefix_as_vector_(1u)
, guid_as_vector_(1u)
, datasharing_notifier_(nullptr)
Expand Down Expand Up @@ -84,7 +84,7 @@ bool ReaderLocator::start(

is_local_reader_ = RTPSDomainImpl::should_intraprocess_between(owner_->getGuid(), remote_guid);
is_datasharing &= !is_local_reader_;
local_reader_ = nullptr;
local_reader_.reset();

if (!is_local_reader_ && !is_datasharing)
{
Expand Down Expand Up @@ -177,7 +177,7 @@ void ReaderLocator::stop()
guid_prefix_as_vector_.at(0) = c_GuidPrefix_Unknown;
expects_inline_qos_ = false;
is_local_reader_ = false;
local_reader_ = nullptr;
local_reader_.reset();
}

bool ReaderLocator::send(
Expand Down Expand Up @@ -206,13 +206,13 @@ bool ReaderLocator::send(
return true;
}

BaseReader* ReaderLocator::local_reader()
LocalReaderPointer::Instance ReaderLocator::local_reader()
{
if (!local_reader_)
{
local_reader_ = RTPSDomainImpl::find_local_reader(general_locator_info_.remote_guid);
}
return local_reader_;
return LocalReaderPointer::Instance(local_reader_);
}

bool ReaderLocator::is_datasharing_reader() const
Expand All @@ -222,15 +222,13 @@ bool ReaderLocator::is_datasharing_reader() const

void ReaderLocator::datasharing_notify()
{
RTPSReader* reader = nullptr;
if (is_local_reader())
{
reader = local_reader();
}

if (reader)
{
BaseReader::downcast(reader)->datasharing_listener()->notify(true);
LocalReaderPointer::Instance reader = local_reader();
if (reader)
{
reader->datasharing_listener()->notify(true);
}
}
else
{
Expand Down
8 changes: 5 additions & 3 deletions src/cpp/rtps/writer/ReaderLocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>

#include <rtps/reader/LocalReaderPointer.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {
Expand Down Expand Up @@ -67,10 +69,10 @@ class ReaderLocator : public RTPSMessageSenderInterface
return is_local_reader_;
}

BaseReader* local_reader();
LocalReaderPointer::Instance local_reader();

void local_reader(
BaseReader* local_reader)
std::shared_ptr<LocalReaderPointer> local_reader)
{
local_reader_ = local_reader;
}
Expand Down Expand Up @@ -260,7 +262,7 @@ class ReaderLocator : public RTPSMessageSenderInterface
LocatorSelectorEntry async_locator_info_;
bool expects_inline_qos_;
bool is_local_reader_;
BaseReader* local_reader_;
std::shared_ptr<LocalReaderPointer> local_reader_;
std::vector<GuidPrefix_t> guid_prefix_as_vector_;
std::vector<GUID_t> guid_as_vector_;
IDataSharingNotifier* datasharing_notifier_;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/ReaderProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class ReaderProxy
* Get the local reader on the same process (if any).
* @return The local reader on the same process.
*/
inline BaseReader* local_reader()
inline LocalReaderPointer::Instance local_reader()
{
return locator_info_.local_reader();
}
Expand Down
Loading

0 comments on commit 8d8f1ca

Please sign in to comment.