diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index fb51c7777a0..421c61c1995 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -158,6 +158,7 @@ set(${PROJECT_NAME}_source_files rtps/participant/RTPSParticipant.cpp rtps/participant/RTPSParticipantImpl.cpp rtps/persistence/PersistenceFactory.cpp + rtps/reader/reader_utils.cpp rtps/reader/RTPSReader.cpp rtps/reader/StatefulPersistentReader.cpp rtps/reader/StatefulReader.cpp diff --git a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp index 2ec981b9705..b7b8b2d2d8a 100644 --- a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp +++ b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp @@ -133,13 +133,17 @@ class ReaderFilterCollection // Copy the signature std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature); - // Evaluate filter and update filtered_out_readers - bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first); - if (!filter_result) + // Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant + bool filter_result = true; + if (fastrtps::rtps::ALIVE == change.kind) { - change.filtered_out_readers.emplace_back(it->first); + // Evaluate filter and update filtered_out_readers + filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first); + if (!filter_result) + { + change.filtered_out_readers.emplace_back(it->first); + } } - return filter_result; }; diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index a4f7c8301d5..272a90822c1 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -28,6 +28,8 @@ #include #include +#include "reader_utils.hpp" +#include "rtps/RTPSDomainImpl.hpp" #include #include #include @@ -37,9 +39,6 @@ #include #include #include - -#include "rtps/RTPSDomainImpl.hpp" - #ifdef FASTDDS_STATISTICS #include #endif // FASTDDS_STATISTICS @@ -588,7 +587,7 @@ bool StatefulReader::processDataMsg( return false; } - if (data_filter_ && !data_filter_->is_relevant(*change, m_guid)) + if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_)) { if (pWP) { @@ -596,6 +595,7 @@ bool StatefulReader::processDataMsg( NotifyChanges(pWP); send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber); } + // Change was filtered out, so there isn't anything else to do return true; } @@ -771,7 +771,8 @@ bool StatefulReader::processDataFragMsg( // Temporarilly assign the inline qos while evaluating the data filter work_change->inline_qos = incomingChange->inline_qos; - bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid); + bool filtered_out = + !fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_); work_change->inline_qos = SerializedPayload_t(); if (filtered_out) diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 2b1c6b91d17..d9affa04c7e 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -27,6 +27,8 @@ #include #include +#include "reader_utils.hpp" +#include "rtps/RTPSDomainImpl.hpp" #include #include #include @@ -34,9 +36,6 @@ #include #include #include - -#include "rtps/RTPSDomainImpl.hpp" - #ifdef FASTDDS_STATISTICS #include #endif // FASTDDS_STATISTICS @@ -582,9 +581,10 @@ bool StatelessReader::processDataMsg( return false; } - if (data_filter_ && !data_filter_->is_relevant(*change, m_guid)) + if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_)) { update_last_notified(change->writerGUID, change->sequenceNumber); + // Change was filtered out, so there isn't anything else to do return true; } @@ -797,7 +797,8 @@ bool StatelessReader::processDataFragMsg( { // Temporarilly assign the inline qos while evaluating the data filter change_completed->inline_qos = incomingChange->inline_qos; - bool filtered_out = data_filter_ && !data_filter_->is_relevant(*change_completed, m_guid); + bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*change_completed, m_guid, + data_filter_); change_completed->inline_qos = SerializedPayload_t(); if (filtered_out) diff --git a/src/cpp/rtps/reader/reader_utils.cpp b/src/cpp/rtps/reader/reader_utils.cpp new file mode 100644 index 00000000000..ed7a024f498 --- /dev/null +++ b/src/cpp/rtps/reader/reader_utils.cpp @@ -0,0 +1,45 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file reader_utils.cpp + */ + +#include "reader_utils.hpp" + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +bool change_is_relevant_for_filter( + const CacheChange& change, + const GUID& reader_guid, + const IReaderDataFilter* filter) +{ + bool ret = true; + + // Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant + if ((nullptr != filter) && (fastrtps::rtps::ALIVE == change.kind) && (!filter->is_relevant(change, reader_guid))) + { + ret = false; + } + + return ret; +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/rtps/reader/reader_utils.hpp b/src/cpp/rtps/reader/reader_utils.hpp new file mode 100644 index 00000000000..bd2d2f9bad9 --- /dev/null +++ b/src/cpp/rtps/reader/reader_utils.hpp @@ -0,0 +1,53 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file reader_utils.hpp + */ + +#ifndef _FASTDDS_RTPS_READER_READERUTILS_H_ +#define _FASTDDS_RTPS_READER_READERUTILS_H_ + +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +using CacheChange = fastrtps::rtps::CacheChange_t; +using GUID = fastrtps::rtps::GUID_t; + +/** + * @brief Check if a change is relevant for a reader. + * + * @param change The CacheChange_t to be evaluated. + * @param reader_guid Reader's GUID_t. + * @param filter The IReaderDataFilter to be used. + * + * @return true if relevant, false otherwise. + */ +bool change_is_relevant_for_filter( + const CacheChange& change, + const GUID& reader_guid, + const IReaderDataFilter* filter); + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + + +#endif // _FASTDDS_RTPS_READER_READERUTILS_H_ diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 7ee285458e5..289e4517857 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -20,11 +20,14 @@ #ifndef _TEST_BLACKBOX_PUBSUBREADER_HPP_ #define _TEST_BLACKBOX_PUBSUBREADER_HPP_ +#include #include #include #include #include +#include #include +#include #include #include @@ -36,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +50,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -292,6 +298,7 @@ class PubSubReader , listener_(*this) , participant_(nullptr) , topic_(nullptr) + , cf_topic_(nullptr) , subscriber_(nullptr) , datareader_(nullptr) , status_mask_(eprosima::fastdds::dds::StatusMask::all()) @@ -320,6 +327,8 @@ class PubSubReader , times_incompatible_qos_(0) , last_incompatible_qos_(eprosima::fastdds::dds::INVALID_QOS_POLICY_ID) , message_receive_count_(0) + , filter_expression_("") + , expression_parameters_({}) { // Load default QoS to permit testing with external XML profile files. DomainParticipantFactory::get_instance()->load_profiles(); @@ -354,6 +363,19 @@ class PubSubReader loan_sample_validation(false); } + PubSubReader( + const std::string& topic_name, + const std::string& filter_expression, + const std::vector& expression_parameters, + bool take = true, + bool statistics = false, + bool read = true) + : PubSubReader(topic_name, take, statistics, read) + { + filter_expression_ = filter_expression; + expression_parameters_ = expression_parameters; + } + virtual ~PubSubReader() { destroy(); @@ -413,6 +435,17 @@ class PubSubReader ASSERT_NE(topic_, nullptr); ASSERT_TRUE(topic_->is_enabled()); + // Create CFT if needed + if (!filter_expression_.empty()) + { + cf_topic_ = participant_->create_contentfilteredtopic( + topic_name_ + "_cft", + topic_, + filter_expression_, + expression_parameters_); + ASSERT_NE(cf_topic_, nullptr); + } + // Create publisher createSubscriber(); } @@ -426,11 +459,18 @@ class PubSubReader ASSERT_NE(subscriber_, nullptr); ASSERT_TRUE(subscriber_->is_enabled()); + using TopicDescriptionPtr = eprosima::fastdds::dds::TopicDescription*; + TopicDescriptionPtr topic_desc {(nullptr != + cf_topic_) ? static_cast(cf_topic_) : static_cast< + TopicDescriptionPtr>( + topic_)}; + if (!xml_file_.empty()) { if (!datareader_profile_.empty()) { - datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, &listener_, + datareader_ = subscriber_->create_datareader_with_profile(topic_desc, datareader_profile_, + &listener_, status_mask_); ASSERT_NE(datareader_, nullptr); ASSERT_TRUE(datareader_->is_enabled()); @@ -438,7 +478,7 @@ class PubSubReader } if (datareader_ == nullptr) { - datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, &listener_, status_mask_); + datareader_ = subscriber_->create_datareader(topic_desc, datareader_qos_, &listener_, status_mask_); } if (datareader_ != nullptr) @@ -474,22 +514,14 @@ class PubSubReader { if (participant_ != nullptr) { - if (datareader_) - { - subscriber_->delete_datareader(datareader_); - datareader_ = nullptr; - } - if (subscriber_) - { - participant_->delete_subscriber(subscriber_); - subscriber_ = nullptr; - } - if (topic_) - { - participant_->delete_topic(topic_); - topic_ = nullptr; - } - eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_); + ASSERT_EQ(eprosima::fastdds::dds::RETCODE_OK, participant_->delete_contained_entities()); + datareader_ = nullptr; + subscriber_ = nullptr; + cf_topic_ = nullptr; + topic_ = nullptr; + + ASSERT_EQ(eprosima::fastdds::dds::RETCODE_OK, + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_)); participant_ = nullptr; } @@ -524,6 +556,18 @@ class PubSubReader return get_last_sequence_received(); } + void startReception( + size_t expected_samples) + { + { + std::unique_lock lock(mutex_); + current_processed_count_ = 0; + number_samples_expected_ = expected_samples; + last_seq.clear(); + } + receiving_.store(true); + } + void stopReception() { receiving_.store(false); @@ -1746,6 +1790,13 @@ class PubSubReader return status; } + eprosima::fastdds::dds::SampleLostStatus get_sample_lost_status() const + { + eprosima::fastdds::dds::SampleLostStatus status; + datareader_->get_sample_lost_status(status); + return status; + } + bool is_matched() const { return matched_ > 0; @@ -1878,13 +1929,18 @@ class PubSubReader if (info.valid_data && info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); + if (!total_msgs_.empty()) + { + auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); + ASSERT_NE(it, total_msgs_.end()); + total_msgs_.erase(it); + } ++current_processed_count_; default_receive_print(data); cv_.notify_one(); } + + postprocess_sample(data, info); } } @@ -1929,14 +1985,19 @@ class PubSubReader if (valid_sample) { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); + if (!total_msgs_.empty()) + { + auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); + ASSERT_NE(it, total_msgs_.end()); + total_msgs_.erase(it); + } ++current_processed_count_; default_receive_print(data); cv_.notify_one(); } } + + postprocess_sample(data, info); } datareader->return_loan(datas, infos); @@ -1952,6 +2013,14 @@ class PubSubReader receive_(datareader, std::ref(returnedValue)); } + virtual void postprocess_sample( + const type& data, + const eprosima::fastdds::dds::SampleInfo& info) + { + static_cast(data); + static_cast(info); + } + void participant_matched() { std::unique_lock lock(mutexDiscovery_); @@ -2005,6 +2074,7 @@ class PubSubReader eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::DomainParticipantQos participant_qos_; eprosima::fastdds::dds::Topic* topic_; + eprosima::fastdds::dds::ContentFilteredTopic* cf_topic_; eprosima::fastdds::dds::Subscriber* subscriber_; eprosima::fastdds::dds::SubscriberQos subscriber_qos_; eprosima::fastdds::dds::DataReader* datareader_; @@ -2084,6 +2154,11 @@ class PubSubReader SampleLostStatusFunctor sample_lost_status_functor_; //! Functor called when called SampleRejectedStatus listener. SampleRejectedStatusFunctor sample_rejected_status_functor_; + + //! Expression for CFT + std::string filter_expression_; + //! Parameters for CFT expression + std::vector expression_parameters_; }; template diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 543190f3f6c..51abbef074e 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -32,10 +32,12 @@ #if _MSC_VER #include #endif // _MSC_VER +#include #include #include #include #include +#include #include #include #include @@ -46,11 +48,11 @@ #include #include #include +#include +#include #include #include #include -#include -#include #include using eprosima::fastdds::dds::DomainParticipantFactory; @@ -542,6 +544,14 @@ class PubSubWriter return datawriter_->write((void*)&msg); } + eprosima::fastdds::dds::ReturnCode_t send_sample( + type& msg, + const eprosima::fastdds::dds::InstanceHandle_t& instance_handle) + { + default_send_print(msg); + return datawriter_->write((void*)&msg, instance_handle); + } + void assert_liveliness() { datawriter_->assert_liveliness(); diff --git a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp index 1f4937a820a..64bfc42d98c 100644 --- a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp +++ b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp @@ -13,18 +13,24 @@ // limitations under the License. #include +#include +#include +#include #include +#include +#include #include #include #include #include +#include #include #include #include #include -#include #include +#include #include "../types/HelloWorldTypeObjectSupport.hpp" #include "../types/TestRegression3361PubSubTypes.h" @@ -612,6 +618,91 @@ TEST(DDSContentFilter, CorrectlyHandleAliasOtherHeader) EXPECT_NE(nullptr, filtered_topic); } +/* + * Regression test for https://eprosima.easyredmine.com/issues/20815 + * Check that the content filter is only applied to alive changes. + * The test creates a reliable writer and a reader with a content filter that only accepts messages with a specific + * string. After discovery, the writer sends 10 samples which pass the filer in 10 different instances, with the + * particularity that after each write, the instance is unregistered. + * The DATA(u) generated would not pass the filter if it was applied. To check that the filter is only applied to + * ALIVE changes (not unregister or disposed), the test checks that the reader receives 10 valid samples (one per + * sample sent) and 10 invalid samples (one per unregister). Furthermore, it also checks that no samples are lost.writer + */ +TEST(DDSContentFilter, OnlyFilterAliveChanges) +{ + /* PuBSubReader class to check reception of UNREGISTER samples */ + class CustomPubSubReader : public PubSubReader + { + public: + + CustomPubSubReader( + const std::string& topic_name, + const std::string& filter_expression, + const std::vector& expression_parameters) + : PubSubReader(topic_name, filter_expression, expression_parameters) + { + } + + std::atomic valid_samples{0}; + std::atomic invalid_samples{0}; + + private: + + void postprocess_sample( + const type& /* sample */, + const SampleInfo& info) override final + { + if (info.valid_data) + { + ++valid_samples; + } + else + { + ++invalid_samples; + } + } + + }; + + /* Create reader with CFT */ + std::string expression = "index = 1"; + CustomPubSubReader reader("TestTopic", expression, {}); + reader.reliability(RELIABLE_RELIABILITY_QOS).history_depth(2).init(); + ASSERT_TRUE(reader.isInitialized()); + + /* Create writer */ + PubSubWriter writer("TestTopic"); + writer.reliability(RELIABLE_RELIABILITY_QOS).history_depth(2).init(); + ASSERT_TRUE(writer.isInitialized()); + + /* Wait for discovery */ + writer.wait_discovery(); + reader.wait_discovery(); + + /* Send 10 samples, each on a different instance, unregistering instances after writing */ + const size_t num_samples = 10; + reader.startReception(num_samples); + + for (size_t i = 0; i < num_samples; ++i) + { + KeyedHelloWorld data; + data.key(static_cast(i)); + data.index(1u); // All samples pass the filter + InstanceHandle_t handle = writer.register_instance(data); + ASSERT_NE(HANDLE_NIL, handle); + ASSERT_EQ(RETCODE_OK, writer.send_sample(data, handle)); + ASSERT_EQ(true, writer.unregister_instance(data, handle)); + } + + // Wait until all samples are acknowledged + writer.waitForAllAcked(std::chrono::seconds(3)); + + /* Check that both samples and unregisters are received */ + ASSERT_EQ(reader.valid_samples.load(), 10u); + ASSERT_EQ(reader.invalid_samples.load(), 10u); + ASSERT_EQ(reader.get_sample_lost_status().total_count, 0); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 5ca597c4008..49ad3799e17 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -136,6 +136,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/participant/RTPSParticipant.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/participant/RTPSParticipantImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 674d08a34db..0f970d37843 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -239,6 +239,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp @@ -414,6 +415,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp