Skip to content

Commit

Permalink
Only apply content filter to ALIVE changes (#4876) (#4903)
Browse files Browse the repository at this point in the history
* [20815] Only apply content filter to ALIVE changes (#4876)

* Refs #20815: Add regression test

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Only apply filter to ALIVE changes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Rename change_is_relevant_for_filter argument

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Refactor test

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Cast loop index to uint16_t for assigning it to the key field

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Refactor test so PubSubWriter can be used directly

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Fix memory leak

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Apply Mario's suggestions

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Add type traits to PubSubWriterReader and PubSubParticipant

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Default move ctor and assignment in DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Add doxygen in DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Delete copy semantic from DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Use alias for TypeTraits::DataListType in PubSub* classes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

---------

Signed-off-by: eduponz <eduardoponz@eprosima.com>
(cherry picked from commit 9a64956)

# Conflicts:
#	src/cpp/rtps/reader/StatefulReader.cpp
#	src/cpp/rtps/reader/StatelessReader.cpp
#	test/unittest/statistics/dds/CMakeLists.txt
Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Avoid doxygen warning about undocumented param in deleted functions

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Rename some targets to be able to compile with thirdparty

Signed-off-by: eduponz <eduardoponz@eprosima.com>

---------

Signed-off-by: eduponz <eduardoponz@eprosima.com>
Co-authored-by: Eduardo Ponz Segrelles <eduardoponz@eprosima.com>
  • Loading branch information
mergify[bot] and EduPonz authored Jun 10, 2024
1 parent de8aeb5 commit 0fcad67
Show file tree
Hide file tree
Showing 18 changed files with 1,041 additions and 131 deletions.
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ if(BUILD_DOCUMENTATION)
endif()

# Target to create documentation directories
add_custom_target(docdirs
add_custom_target(fastdds_docdirs
COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/doc
COMMENT "Creating documentation directory" VERBATIM)

Expand All @@ -480,12 +480,12 @@ if(BUILD_DOCUMENTATION)
# Configure the template doxyfile for or specific project
configure_file(doxyfile.in ${PROJECT_BINARY_DIR}/doxyfile @ONLY IMMEDIATE)
# Add custom target to run doxygen when ever the project is build
add_custom_target(doxygen
add_custom_target(fastdds_doxygen
COMMAND "${DOXYGEN_EXECUTABLE}" "${PROJECT_BINARY_DIR}/doxyfile"
SOURCES "${PROJECT_BINARY_DIR}/doxyfile"
COMMENT "Generating API documentation with doxygen" VERBATIM)

add_dependencies(doxygen docdirs)
add_dependencies(fastdds_doxygen fastdds_docdirs)

### README html ########################

Expand Down Expand Up @@ -536,15 +536,15 @@ if(BUILD_DOCUMENTATION)
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/readthedocs_custom.cmake
)

add_dependencies(readthedocs docdirs)
add_dependencies(readthedocs fastdds_docdirs)
endif()

add_custom_target(doc ALL
add_custom_target(fastdds_docs ALL
COMMENT "Generated project documentation" VERBATIM)

add_dependencies(doc doxygen)
add_dependencies(fastdds_docs fastdds_doxygen)
if(NOT CHECK_DOCUMENTATION)
add_dependencies(doc readthedocs)
add_dependencies(fastdds_docs readthedocs)
endif()
endif()

Expand Down
180 changes: 180 additions & 0 deletions include/fastrtps/types/DynamicLoanableSequence.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_
#define _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_

#include <cassert>
#include <memory>

#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastrtps/types/DynamicData.h>
#include <fastrtps/types/DynamicPubSubType.h>
#include <fastrtps/types/DynamicTypePtr.h>

namespace eprosima {
namespace fastdds {
namespace dds {

/**
* @brief LoanableSequence specialization for DynamicData.
*
* This class provides a sequence container for handling loanable collections
* of DynamicData.
*
* @tparam _NonConstEnabler to enable data access through [] operator on non-const sequences.
*/
template<typename _NonConstEnabler>
class LoanableSequence<fastrtps::types::DynamicData, _NonConstEnabler>
: public LoanableTypedCollection<fastrtps::types::DynamicData, _NonConstEnabler>
{
public:

/// Type for the size of the sequence.
using size_type = LoanableCollection::size_type;

/// Type for the elements in the sequence.
using element_type = LoanableCollection::element_type;

/**
* @brief Construct a LoanableSequence with a specified dynamic type.
*
* @param[in] dyn_type Pointer to the DynamicType.
*/
LoanableSequence(
fastrtps::types::DynamicType_ptr dyn_type)
: dynamic_type_support_(new fastrtps::types::DynamicPubSubType(dyn_type))
{
}

/**
* @brief Construct a LoanableSequence with a specified maximum size.
*
* @param[in] max Maximum size of the sequence.
*/
LoanableSequence(
size_type max)
{
if (max <= 0)
{
return;
}

resize(max);
}

/**
* @brief Destructor for LoanableSequence.
*/
~LoanableSequence()
{
if (elements_ && !has_ownership_)
{
EPROSIMA_LOG_WARNING(SUBSCRIBER, "Sequence destroyed with active loan");
return;
}

release();
}

/// Deleted copy constructor for LoanableSequence.
LoanableSequence(
const LoanableSequence&) = delete;

/// Deleted copy assignment operator for LoanableSequence.
LoanableSequence& operator =(
const LoanableSequence&) = delete;

/// Move constructor for LoanableSequence.
LoanableSequence(
LoanableSequence&&) = default;

/**
* @brief Move assignment operator for LoanableSequence.
*
* @param[in] other The other LoanableSequence to move from.
*
* @return A reference to this LoanableSequence.
*/
LoanableSequence& operator =(
LoanableSequence&&) = default;

protected:

using LoanableCollection::maximum_;
using LoanableCollection::length_;
using LoanableCollection::elements_;
using LoanableCollection::has_ownership_;

private:

/**
* @brief Resize the sequence to a new maximum size.
*
* @param[in] maximum The new maximum size.
*/
void resize(
size_type maximum) override
{
assert(has_ownership_);

// Resize collection and get new pointer
data_.reserve(maximum);
data_.resize(maximum);
elements_ = reinterpret_cast<element_type*>(data_.data());

// Allocate individual elements
while (maximum_ < maximum)
{
data_[maximum_++] = static_cast<fastrtps::types::DynamicData*>(dynamic_type_support_->createData());
}
}

/**
* @brief Release all elements and clear the sequence.
*/
void release()
{
if (has_ownership_ && elements_)
{
for (size_type n = 0; n < maximum_; ++n)
{
fastrtps::types::DynamicData* elem = data_[n];
dynamic_type_support_->deleteData(elem);
}
std::vector<fastrtps::types::DynamicData*>().swap(data_);
}

maximum_ = 0u;
length_ = 0u;
elements_ = nullptr;
has_ownership_ = true;
}

/// Container for holding the DynamicData elements.
std::vector<fastrtps::types::DynamicData*> data_;

/// Pointer to the DynamicPubSubType type support.
std::unique_ptr<fastrtps::types::DynamicPubSubType> dynamic_type_support_;
};

/// Alias for LoanableSequence with DynamicData and true_type.
using DynamicLoanableSequence = LoanableSequence<fastrtps::types::DynamicData, std::true_type>;

} // namespace dds
} // namespace fastdds
} // namespace eprosima

#endif // _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_
6 changes: 4 additions & 2 deletions include/fastrtps/types/DynamicPubSubType.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#ifndef TYPES_DYNAMIC_PUB_SUB_TYPE_H
#define TYPES_DYNAMIC_PUB_SUB_TYPE_H

#include <fastrtps/types/TypesBase.h>
#include <fastdds/dds/topic/TopicDataType.hpp>
#include <fastrtps/types/DynamicData.h>
#include <fastrtps/types/DynamicTypePtr.h>
#include <fastrtps/types/DynamicDataPtr.h>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/utils/md5.h>

namespace eprosima {
Expand All @@ -37,6 +37,8 @@ class DynamicPubSubType : public eprosima::fastdds::dds::TopicDataType

public:

typedef DynamicData type;

RTPS_DllAPI DynamicPubSubType();

RTPS_DllAPI DynamicPubSubType(
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files
rtps/DataSharing/DataSharingListener.cpp
rtps/DataSharing/DataSharingNotification.cpp
rtps/reader/WriterProxy.cpp
rtps/reader/reader_utils.cpp
rtps/reader/StatefulReader.cpp
rtps/reader/StatelessReader.cpp
rtps/reader/RTPSReader.cpp
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,17 @@ class ReaderFilterCollection
// Copy the signature
std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature);

// Evaluate filter and update filtered_out_readers
bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
bool filter_result = true;
if (fastrtps::rtps::ALIVE == change.kind)
{
change.filtered_out_readers.emplace_back(it->first);
// Evaluate filter and update filtered_out_readers
filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
{
change.filtered_out_readers.emplace_back(it->first);
}
}

return filter_result;
};

Expand Down
38 changes: 21 additions & 17 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,28 @@
*/

#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/history/ReaderHistory.h>

#include <cassert>
#include <mutex>
#include <thread>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <fastrtps/utils/TimeConversion.h>
#include <rtps/history/HistoryAttributesExtension.hpp>
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <fastdds/rtps/builtin/BuiltinProtocols.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/VendorId_t.hpp>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/writer/LivelinessManager.h>
#include <fastrtps/utils/TimeConversion.h>

#include "rtps/RTPSDomainImpl.hpp"

#include <mutex>
#include <thread>

#include <cassert>
#include "reader_utils.hpp"
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <rtps/history/HistoryAttributesExtension.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <rtps/RTPSDomainImpl.hpp>

#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " <<

Expand Down Expand Up @@ -554,14 +556,15 @@ bool StatefulReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
if (pWP)
{
pWP->irrelevant_change_set(change->sequenceNumber);
NotifyChanges(pWP);
send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber);
}
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -737,7 +740,8 @@ bool StatefulReader::processDataFragMsg(

// Temporarilly assign the inline qos while evaluating the data filter
work_change->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid);
bool filtered_out =
!fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_);
work_change->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
Loading

0 comments on commit 0fcad67

Please sign in to comment.