Skip to content

Commit

Permalink
Refs #21129: fix errors after reader refactor
Browse files Browse the repository at this point in the history
Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>
  • Loading branch information
elianalf committed Jun 12, 2024
1 parent 0693f61 commit 3a0ce6d
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 177 deletions.
2 changes: 1 addition & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class DataReaderImpl
bool trigger_value);

void update_subscription_matched_status(
const fastrtps::rtps::MatchingInfo& status);
const fastdds::rtps::MatchingInfo& status);

bool on_data_available(
const fastdds::rtps::GUID_t& writer_guid,
Expand Down
118 changes: 58 additions & 60 deletions src/cpp/rtps/reader/BaseReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,18 @@
namespace eprosima {
namespace fastdds {
namespace rtps {

using namespace fastrtps::rtps;


BaseReader::BaseReader(
fastrtps::rtps::RTPSParticipantImpl* pimpl,
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::ReaderAttributes& att,
fastrtps::rtps::ReaderHistory* hist,
fastrtps::rtps::ReaderListener* listen)
: fastrtps::rtps::RTPSReader(pimpl, guid, att, hist)
fastdds::rtps::RTPSParticipantImpl* pimpl,
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::ReaderAttributes& att,
fastdds::rtps::ReaderHistory* hist,
fastdds::rtps::ReaderListener* listen)
: fastdds::rtps::RTPSReader(pimpl, guid, att, hist)
, listener_(listen)
, accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers)
, expects_inline_qos_(att.expects_inline_qos)
, history_state_(new fastrtps::rtps::ReaderHistoryState(att.matched_writers_allocation.initial))
, history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial))
, liveliness_kind_(att.liveliness_kind)
, liveliness_lease_duration_(att.liveliness_lease_duration)
{
Expand All @@ -73,12 +71,12 @@ BaseReader::BaseReader(
}

BaseReader::BaseReader(
fastrtps::rtps::RTPSParticipantImpl* pimpl,
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::ReaderAttributes& att,
const std::shared_ptr<fastrtps::rtps::IPayloadPool>& payload_pool,
fastrtps::rtps::ReaderHistory* hist,
fastrtps::rtps::ReaderListener* listen)
fastdds::rtps::RTPSParticipantImpl* pimpl,
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::ReaderAttributes& att,
const std::shared_ptr<fastdds::rtps::IPayloadPool>& payload_pool,
fastdds::rtps::ReaderHistory* hist,
fastdds::rtps::ReaderListener* listen)
: BaseReader(
pimpl, guid, att, payload_pool,
std::make_shared<CacheChangePool>(PoolConfig::from_history_attributes(hist->m_att)),
Expand All @@ -87,18 +85,18 @@ BaseReader::BaseReader(
}

BaseReader::BaseReader(
fastrtps::rtps::RTPSParticipantImpl* pimpl,
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::ReaderAttributes& att,
const std::shared_ptr<fastrtps::rtps::IPayloadPool>& payload_pool,
const std::shared_ptr<fastrtps::rtps::IChangePool>& change_pool,
fastrtps::rtps::ReaderHistory* hist,
fastrtps::rtps::ReaderListener* listen)
: fastrtps::rtps::RTPSReader(pimpl, guid, att, hist)
fastdds::rtps::RTPSParticipantImpl* pimpl,
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::ReaderAttributes& att,
const std::shared_ptr<fastdds::rtps::IPayloadPool>& payload_pool,
const std::shared_ptr<fastdds::rtps::IChangePool>& change_pool,
fastdds::rtps::ReaderHistory* hist,
fastdds::rtps::ReaderListener* listen)
: fastdds::rtps::RTPSReader(pimpl, guid, att, hist)
, listener_(listen)
, accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers)
, expects_inline_qos_(att.expects_inline_qos)
, history_state_(new fastrtps::rtps::ReaderHistoryState(att.matched_writers_allocation.initial))
, history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial))
, liveliness_kind_(att.liveliness_kind)
, liveliness_lease_duration_(att.liveliness_lease_duration)
{
Expand Down Expand Up @@ -173,7 +171,7 @@ uint64_t BaseReader::get_unread_count(
{
for (auto it = history_->changesBegin(); 0 < total_unread_ && it != history_->changesEnd(); ++it)
{
fastrtps::rtps::CacheChange_t* change = *it;
fastdds::rtps::CacheChange_t* change = *it;
if (!change->isRead && get_last_notified(change->writerGUID) >= change->sequenceNumber)
{
change->isRead = true;
Expand All @@ -187,7 +185,7 @@ uint64_t BaseReader::get_unread_count(
}

bool BaseReader::wait_for_unread_cache(
const eprosima::fastrtps::Duration_t& timeout)
const eprosima::fastdds::Duration_t& timeout)
{
auto time_out = std::chrono::steady_clock::now() + std::chrono::seconds(timeout.seconds) +
std::chrono::nanoseconds(timeout.nanosec);
Expand Down Expand Up @@ -215,16 +213,16 @@ bool BaseReader::wait_for_unread_cache(

bool BaseReader::is_sample_valid(
const void* data,
const fastrtps::rtps::GUID_t& writer,
const fastrtps::rtps::SequenceNumber_t& sn) const
const fastdds::rtps::GUID_t& writer,
const fastdds::rtps::SequenceNumber_t& sn) const
{
if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writer))
{
// Check if the payload is dirty
// Note the Payloads used in loans include a mandatory RTPS 2.3 extra header
auto payload = static_cast<const fastrtps::rtps::octet*>(data);
payload -= fastrtps::rtps::SerializedPayload_t::representation_header_size;
if (!fastrtps::rtps::DataSharingPayloadPool::check_sequence_number(payload, sn))
auto payload = static_cast<const fastdds::rtps::octet*>(data);
payload -= fastdds::rtps::SerializedPayload_t::representation_header_size;
if (!fastdds::rtps::DataSharingPayloadPool::check_sequence_number(payload, sn))
{
return false;
}
Expand All @@ -233,34 +231,34 @@ bool BaseReader::is_sample_valid(
}

BaseReader* BaseReader::downcast(
fastrtps::rtps::RTPSReader* reader)
fastdds::rtps::RTPSReader* reader)
{
assert(nullptr != dynamic_cast<BaseReader*>(reader));
return static_cast<BaseReader*>(reader);
}

BaseReader* BaseReader::downcast(
fastrtps::rtps::Endpoint* endpoint)
fastdds::rtps::Endpoint* endpoint)
{
assert(nullptr != dynamic_cast<BaseReader*>(endpoint));
return static_cast<BaseReader*>(endpoint);
}

void BaseReader::allow_unknown_writers()
{
assert(fastrtps::rtps::EntityId_t::unknown() != trusted_writer_entity_id_);
assert(fastdds::rtps::EntityId_t::unknown() != trusted_writer_entity_id_);
accept_messages_from_unkown_writers_ = true;
}

bool BaseReader::reserve_cache(
uint32_t cdr_payload_size,
fastrtps::rtps::CacheChange_t*& change)
fastdds::rtps::CacheChange_t*& change)
{
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);

change = nullptr;

fastrtps::rtps::CacheChange_t* reserved_change = nullptr;
fastdds::rtps::CacheChange_t* reserved_change = nullptr;
if (!change_pool_->reserve_cache(reserved_change))
{
EPROSIMA_LOG_WARNING(RTPS_READER, "Problem reserving cache from pool");
Expand All @@ -280,11 +278,11 @@ bool BaseReader::reserve_cache(
}

void BaseReader::release_cache(
fastrtps::rtps::CacheChange_t* change)
fastdds::rtps::CacheChange_t* change)
{
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);

fastrtps::rtps::IPayloadPool* pool = change->payload_owner();
fastdds::rtps::IPayloadPool* pool = change->payload_owner();
if (pool)
{
pool->release_payload(*change);
Expand All @@ -293,7 +291,7 @@ void BaseReader::release_cache(
}

void BaseReader::update_liveliness_changed_status(
const fastrtps::rtps::GUID_t& writer,
const fastdds::rtps::GUID_t& writer,
int32_t alive_change,
int32_t not_alive_change)
{
Expand Down Expand Up @@ -343,10 +341,10 @@ bool BaseReader::may_remove_history_record(
}

void BaseReader::add_persistence_guid(
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::GUID_t& persistence_guid)
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::GUID_t& persistence_guid)
{
if (fastrtps::rtps::c_Guid_Unknown == persistence_guid || persistence_guid == guid)
if (fastdds::rtps::c_Guid_Unknown == persistence_guid || persistence_guid == guid)
{
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);
history_state_->persistence_guid_map[guid] = guid;
Expand All @@ -373,12 +371,12 @@ void BaseReader::add_persistence_guid(
}

void BaseReader::remove_persistence_guid(
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::GUID_t& persistence_guid,
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::GUID_t& persistence_guid,
bool removed_by_lease)
{
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);
auto persistence_guid_stored = (fastrtps::rtps::c_Guid_Unknown == persistence_guid) ? guid : persistence_guid;
auto persistence_guid_stored = (fastdds::rtps::c_Guid_Unknown == persistence_guid) ? guid : persistence_guid;
history_state_->persistence_guid_map.erase(guid);
auto count = --history_state_->persistence_guid_count[persistence_guid_stored];
if (count <= 0 && may_remove_history_record(removed_by_lease))
Expand All @@ -388,12 +386,12 @@ void BaseReader::remove_persistence_guid(
}
}

fastrtps::rtps::SequenceNumber_t BaseReader::get_last_notified(
const fastrtps::rtps::GUID_t& guid)
fastdds::rtps::SequenceNumber_t BaseReader::get_last_notified(
const fastdds::rtps::GUID_t& guid)
{
fastrtps::rtps::SequenceNumber_t ret_val;
fastdds::rtps::SequenceNumber_t ret_val;
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);
fastrtps::rtps::GUID_t guid_to_look = guid;
fastdds::rtps::GUID_t guid_to_look = guid;
auto p_guid = history_state_->persistence_guid_map.find(guid);
if (p_guid != history_state_->persistence_guid_map.end())
{
Expand All @@ -409,13 +407,13 @@ fastrtps::rtps::SequenceNumber_t BaseReader::get_last_notified(
return ret_val;
}

fastrtps::rtps::SequenceNumber_t BaseReader::update_last_notified(
const fastrtps::rtps::GUID_t& guid,
const fastrtps::rtps::SequenceNumber_t& seq)
fastdds::rtps::SequenceNumber_t BaseReader::update_last_notified(
const fastdds::rtps::GUID_t& guid,
const fastdds::rtps::SequenceNumber_t& seq)
{
fastrtps::rtps::SequenceNumber_t ret_val;
fastdds::rtps::SequenceNumber_t ret_val;
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);
fastrtps::rtps::GUID_t guid_to_look = guid;
fastdds::rtps::GUID_t guid_to_look = guid;
auto p_guid = history_state_->persistence_guid_map.find(guid);
if (p_guid != history_state_->persistence_guid_map.end())
{
Expand All @@ -439,16 +437,16 @@ fastrtps::rtps::SequenceNumber_t BaseReader::update_last_notified(
}

void BaseReader::persist_last_notified_nts(
const fastrtps::rtps::GUID_t& peristence_guid,
const fastrtps::rtps::SequenceNumber_t& seq)
const fastdds::rtps::GUID_t& peristence_guid,
const fastdds::rtps::SequenceNumber_t& seq)
{
// Empty base implementation since base behavior is to not persist data
static_cast<void>(peristence_guid);
static_cast<void>(seq);
}

bool BaseReader::is_datasharing_compatible_with(
const fastrtps::rtps::WriterProxyData& wdata)
const fastdds::rtps::WriterProxyData& wdata)
{
if (!is_datasharing_compatible_ ||
wdata.m_qos.data_sharing.kind() == fastdds::dds::DataSharingKind::OFF)
Expand Down Expand Up @@ -485,9 +483,9 @@ void BaseReader::init(
}

void BaseReader::setup_datasharing(
const fastrtps::rtps::ReaderAttributes& att)
const fastdds::rtps::ReaderAttributes& att)
{
using namespace fastrtps::rtps;
using namespace fastdds::rtps;

if (att.endpoint.data_sharing_configuration().kind() != fastdds::dds::DataSharingKind::OFF)
{
Expand Down
Loading

0 comments on commit 3a0ce6d

Please sign in to comment.