Skip to content

Commit

Permalink
[20276] Discard already processed samples on PDPListener (#4268)
Browse files Browse the repository at this point in the history
* Refs #20276: Discard processing already processed samples on PDPListener

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20276: Fix failing tests

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20276: Address Miguel's comments

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: EduPonz <eduardoponz@eprosima.com>
(cherry picked from commit 4864393)
  • Loading branch information
EduPonz authored and mergify[bot] committed Jan 24, 2024
1 parent 432d620 commit ce953ce
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/fastdds/rtps/builtin/data/ParticipantProxyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ParticipantProxyData
//!
ProxyHashTable<WriterProxyData>* m_writers = nullptr;

SampleIdentity m_sample_identity;

/**
* Update the data.
* @param pdata Object to copy the data from
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/builtin/data/ParticipantProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ ParticipantProxyData::ParticipantProxyData(
, should_check_lease_duration(false)
, m_readers(new ProxyHashTable<ReaderProxyData>(allocation.readers))
, m_writers(new ProxyHashTable<WriterProxyData>(allocation.writers))
, m_sample_identity()
{
m_userData.set_max_size(static_cast<uint32_t>(allocation.data_limits.max_user_data));
}
Expand Down Expand Up @@ -99,6 +100,7 @@ ParticipantProxyData::ParticipantProxyData(
// so there is no need to copy m_readers and m_writers
, m_readers(nullptr)
, m_writers(nullptr)
, m_sample_identity(pdata.m_sample_identity)
, lease_duration_(pdata.lease_duration_)
{
}
Expand Down Expand Up @@ -729,6 +731,7 @@ void ParticipantProxyData::copy(
isAlive = pdata.isAlive;
m_userData = pdata.m_userData;
m_properties = pdata.m_properties;
m_sample_identity = pdata.m_sample_identity;

// This method is only called when a new participant is discovered.The destination of the copy
// will always be a new ParticipantProxyData or one from the pool, so there is no need for
Expand Down
19 changes: 18 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,33 @@ void PDPListener::onNewCacheChangeAdded(

// Check if participant already exists (updated info)
ParticipantProxyData* pdata = nullptr;
bool already_processed = false;
for (ParticipantProxyData* it : parent_pdp_->participant_proxies_)
{
if (guid == it->m_guid)
{
pdata = it;

// This means this is the same DATA(p) that we have already processed.
// We do not compare sample_identity directly because it is not properly filled
// in the change during desearialization.
if (it->m_sample_identity.writer_guid() == change->writerGUID &&
it->m_sample_identity.sequence_number() == change->sequenceNumber)
{
already_processed = true;
}

break;
}
}

process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
// Only process the DATA(p) if it is not a repeated one
if (!already_processed)
{
temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID);
temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber);
process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
}
}
}
else if (reader->matched_writer_is_matched(writer_guid))
Expand Down

0 comments on commit ce953ce

Please sign in to comment.