From ff9a6994a390b5c2680c218f19fa016ff3f53502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Tue, 24 Sep 2024 07:51:44 +0200 Subject: [PATCH] Refs #20165. Apply suggestions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- .../type_lookup_service/TypeLookupManager.cpp | 9 +- .../type_lookup_service/TypeLookupManager.hpp | 7 +- .../TypeLookupReplyListener.cpp | 151 +++++++++--------- .../TypeLookupReplyListener.hpp | 2 + .../TypeLookupRequestListener.cpp | 2 +- .../discovery/endpoint/EDPSimpleListeners.cpp | 24 ++- .../discovery/participant/PDPServer.cpp | 12 +- .../type_lookup_service/TypeLookupManager.hpp | 3 +- 8 files changed, 116 insertions(+), 94 deletions(-) diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp index cefce417707..a0ca795f5f3 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp @@ -365,7 +365,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( is_type_identifier_known(type_identifier_with_size)) { // The type is already known, invoke the callback - callback(temp_proxy_data.get()); + callback(RETCODE_OK, temp_proxy_data.get()); return RETCODE_OK; } @@ -408,7 +408,8 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( } void TypeLookupManager::notify_callbacks( - xtypes::TypeIdentfierWithSize type_identifier_with_size) + ReturnCode_t request_ret_status, + const xtypes::TypeIdentfierWithSize& type_identifier_with_size) { bool removed = false; // Check that type is pending to be resolved @@ -417,7 +418,7 @@ void TypeLookupManager::notify_callbacks( { for (auto& proxy_callback_pair : writer_callbacks_it->second) { - proxy_callback_pair.second(proxy_callback_pair.first); + proxy_callback_pair.second(request_ret_status, proxy_callback_pair.first); } removed = true; } @@ -427,7 +428,7 @@ void TypeLookupManager::notify_callbacks( { for (auto& proxy_callback_pair : reader_callbacks_it->second) { - proxy_callback_pair.second(proxy_callback_pair.first); + proxy_callback_pair.second(request_ret_status, proxy_callback_pair.first); } removed = true; } diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp index 3b72c54320d..b7d18e5ea22 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp @@ -105,9 +105,9 @@ namespace builtin { const SampleIdentity INVALID_SAMPLE_IDENTITY; using AsyncGetTypeWriterCallback = std::function< - void (eprosima::fastdds::rtps::WriterProxyData*)>; + void (eprosima::fastdds::dds::ReturnCode_t, eprosima::fastdds::rtps::WriterProxyData*)>; using AsyncGetTypeReaderCallback = std::function< - void (eprosima::fastdds::rtps::ReaderProxyData*)>; + void (eprosima::fastdds::dds::ReturnCode_t, eprosima::fastdds::rtps::ReaderProxyData*)>; /** * Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification. @@ -229,7 +229,8 @@ class TypeLookupManager * @param type_identifier_with_size[in] TypeIdentfierWithSize of the callbacks to notify. */ void notify_callbacks( - xtypes::TypeIdentfierWithSize type_identifier_with_size); + ReturnCode_t request_ret_status, + const xtypes::TypeIdentfierWithSize& type_identifier_with_size); /** * Adds a callback to the async_get_type_callbacks_ entry of the TypeIdentfierWithSize, or creates a new one if diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp index 940cbe0b516..00e6405a053 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp @@ -100,7 +100,13 @@ void TypeLookupReplyListener::process_reply() if (!replies_queue_.empty()) { TypeLookup_Reply& reply = replies_queue_.front().reply; + + // Check if the received reply SampleIdentity corresponds to an outstanding request + auto& request_id {reply.header().relatedRequestId()}; + auto request_it = typelookup_manager_->async_get_type_requests_.find(request_id); + if (request_it != typelookup_manager_->async_get_type_requests_.end()) { + xtypes::TypeIdentfierWithSize type_id {request_it->second}; // Process the TypeLookup_Reply based on its type switch (reply.return_value()._d()) { @@ -108,9 +114,14 @@ void TypeLookupReplyListener::process_reply() { if (RETCODE_OK == reply.return_value().getType()._d()) { - check_get_types_reply(reply.header().relatedRequestId(), + check_get_types_reply(request_id, type_id, reply.return_value().getType().result(), reply.header().relatedRequestId()); } + else + { + typelookup_manager_->notify_callbacks(RETCODE_NO_DATA, type_id); + typelookup_manager_->remove_async_get_type_request(request_id); + } break; } case TypeLookup_getDependencies_HashId: @@ -118,9 +129,14 @@ void TypeLookupReplyListener::process_reply() if (RETCODE_OK == reply.return_value().getTypeDependencies()._d()) { check_get_type_dependencies_reply( - reply.header().relatedRequestId(), replies_queue_.front().type_server, + request_id, type_id, replies_queue_.front().type_server, reply.return_value().getTypeDependencies().result()); } + else + { + typelookup_manager_->notify_callbacks(RETCODE_NO_DATA, type_id); + typelookup_manager_->remove_async_get_type_request(request_id); + } break; } default: @@ -138,96 +154,85 @@ void TypeLookupReplyListener::process_reply() void TypeLookupReplyListener::check_get_types_reply( const SampleIdentity& request_id, + const xtypes::TypeIdentfierWithSize& type_id, const TypeLookup_getTypes_Out& reply, SampleIdentity related_request) { - // Check if the received reply SampleIdentity corresponds to an outstanding request - auto requests_it = typelookup_manager_->async_get_type_requests_.find(request_id); - if (requests_it != typelookup_manager_->async_get_type_requests_.end()) - { - ReturnCode_t register_result = RETCODE_OK; + ReturnCode_t register_result = RETCODE_OK; - if (0 != reply.types().size()) + if (0 != reply.types().size()) + { + for (xtypes::TypeIdentifierTypeObjectPair pair : reply.types()) { - for (xtypes::TypeIdentifierTypeObjectPair pair : reply.types()) + xtypes::TypeIdentifierPair type_ids; + type_ids.type_identifier1(pair.type_identifier()); + if (RETCODE_OK != fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer(). + register_type_object(pair.type_object(), type_ids, false)) { - xtypes::TypeIdentifierPair type_ids; - type_ids.type_identifier1(pair.type_identifier()); - if (RETCODE_OK != fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer(). - register_type_object(pair.type_object(), type_ids, false)) - { - // If any of the types is not registered, log error - EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, - "Error registering remote type."); - register_result = RETCODE_ERROR; - } + // If any of the types is not registered, log error + EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, + "Error registering remote type."); + register_result = RETCODE_ERROR; } + } - if (RETCODE_OK == register_result) + if (RETCODE_OK == register_result) + { + // Check if the get_type_dependencies related to this reply required a continuation_point + std::unique_lock guard(replies_with_continuation_mutex_); + auto it = std::find(replies_with_continuation_.begin(), + replies_with_continuation_.end(), related_request); + if (it != replies_with_continuation_.end()) { - // Check if the get_type_dependencies related to this reply required a continuation_point - std::unique_lock guard(replies_with_continuation_mutex_); - auto it = std::find(replies_with_continuation_.begin(), - replies_with_continuation_.end(), related_request); - if (it != replies_with_continuation_.end()) - { - // If it did, remove it from the list and continue - replies_with_continuation_.erase(it); - } - else + // If it did, remove it from the list and continue + replies_with_continuation_.erase(it); + } + else + { + // If it did not, check that the type that originated the request is consistent + // before notifying the callbacks associated with the request + try { - // If it did not, check that the type that originated the request is consistent - // before notifying the callbacks associated with the request - try - { - xtypes::TypeObject type_object; - fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().get_type_object( - requests_it->second.type_id(), type_object); - xtypes::TypeObjectUtils::type_object_consistency(type_object); - xtypes::TypeIdentifierPair type_ids; - if (RETCODE_OK != - fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer(). - register_type_object(type_object, type_ids, true)) - { - EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, - "Cannot register minimal of remote type"); - } - - typelookup_manager_->notify_callbacks(requests_it->second); - } - catch (const std::exception& exception) + xtypes::TypeObject type_object; + fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().get_type_object( + type_id.type_id(), type_object); + xtypes::TypeObjectUtils::type_object_consistency(type_object); + xtypes::TypeIdentifierPair type_ids; + if (RETCODE_OK != + fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer(). + register_type_object(type_object, type_ids, true)) { - EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE_REPLY_LISTENER, - "Error registering remote type: " << exception.what()); + EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, + "Cannot register minimal of remote type"); } + + typelookup_manager_->notify_callbacks(RETCODE_OK, type_id); + } + catch (const std::exception& exception) + { + EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE_REPLY_LISTENER, + "Error registering remote type: " << exception.what()); } } } - else - { - EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, - "Reply no contains any type."); - register_result = RETCODE_ERROR; - } - - // Remove the processed SampleIdentity from the outstanding requests - typelookup_manager_->remove_async_get_type_request(request_id); } + else + { + EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER, + "Received reply with no types."); + register_result = RETCODE_ERROR; + } + + // Remove the processed SampleIdentity from the outstanding requests + typelookup_manager_->remove_async_get_type_request(request_id); } void TypeLookupReplyListener::check_get_type_dependencies_reply( const SampleIdentity& request_id, + const xtypes::TypeIdentfierWithSize& type_id, const fastdds::rtps::GUID_t type_server, const TypeLookup_getTypeDependencies_Out& reply) { - // Check if the received reply SampleIdentity corresponds to an outstanding request - auto requests_it = typelookup_manager_->async_get_type_requests_.find(request_id); - if (requests_it == typelookup_manager_->async_get_type_requests_.end()) - { - // The reply is not associated with any outstanding request, ignore it - return; - } - // Add the dependent types to the list for the get_type request xtypes::TypeIdentifierSeq needed_types; std::unordered_set unique_types; @@ -251,18 +256,18 @@ void TypeLookupReplyListener::check_get_type_dependencies_reply( // If there is no continuation point, add the parent type if (reply.continuation_point().empty()) { - needed_types.push_back(requests_it->second.type_id()); + needed_types.push_back(type_id.type_id()); } // Make a new request with the continuation point else { SampleIdentity next_request_id = typelookup_manager_-> - get_type_dependencies({requests_it->second.type_id()}, type_server, + get_type_dependencies({type_id.type_id()}, type_server, reply.continuation_point()); if (INVALID_SAMPLE_IDENTITY != next_request_id) { // Store the sent requests and associated TypeIdentfierWithSize - typelookup_manager_->add_async_get_type_request(next_request_id, requests_it->second); + typelookup_manager_->add_async_get_type_request(next_request_id, type_id); } else { @@ -277,7 +282,7 @@ void TypeLookupReplyListener::check_get_type_dependencies_reply( if (INVALID_SAMPLE_IDENTITY != get_types_request) { // Store the type request - typelookup_manager_->add_async_get_type_request(get_types_request, requests_it->second); + typelookup_manager_->add_async_get_type_request(get_types_request, type_id); // If this get_types request has a continuation_point, store it in the list if (!reply.continuation_point().empty()) diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.hpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.hpp index 5a15b7264a5..95bb4cdd776 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.hpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.hpp @@ -100,6 +100,7 @@ class TypeLookupReplyListener : public fastdds::rtps::ReaderListener, public fas */ void check_get_types_reply( const SampleIdentity& request_id, + const xtypes::TypeIdentfierWithSize& type_id, const TypeLookup_getTypes_Out& reply, SampleIdentity related_request); @@ -114,6 +115,7 @@ class TypeLookupReplyListener : public fastdds::rtps::ReaderListener, public fas */ void check_get_type_dependencies_reply( const SampleIdentity& request_id, + const xtypes::TypeIdentfierWithSize& type_id, const fastdds::rtps::GUID_t type_server, const TypeLookup_getTypeDependencies_Out& reply); diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp index 2cd8c846374..705464e8a73 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp @@ -210,7 +210,7 @@ void TypeLookupRequestListener::check_get_types_request( if (0 == request.type_ids().size()) { EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REQUEST_LISTENER, - "Request no contains any type identifier."); + "Received request with no type identifiers."); } // Iterate through requested type_ids diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp index c133fb3950c..dd81e438fdb 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp @@ -86,10 +86,11 @@ void EDPBasePUBListener::add_writer_from_change( // Callback function to continue after typelookup is complete fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback = [reader, change, edp, &network, writer_added_callback] - (eprosima::fastdds::rtps::WriterProxyData* temp_writer_data) + (eprosima::fastdds::dds::ReturnCode_t request_ret_status, + eprosima::fastdds::rtps::WriterProxyData* temp_writer_data) { //LOAD INFORMATION IN DESTINATION WRITER PROXY DATA - auto copy_data_fun = [&temp_writer_data, &network]( + auto copy_data_fun = [&request_ret_status, &temp_writer_data, &network]( WriterProxyData* data, bool updating, const ParticipantProxyData& participant_data) @@ -107,6 +108,11 @@ void EDPBasePUBListener::add_writer_from_change( data->guid()); } *data = *temp_writer_data; + + if (request_ret_status != fastdds::dds::RETCODE_OK) + { + data->type_information().clear(); + } return true; }; @@ -149,7 +155,7 @@ void EDPBasePUBListener::add_writer_from_change( { EPROSIMA_LOG_INFO( RTPS_EDP, "EDPSimpleListener: No TypeLookupManager or TypeInformation. Trying fallback mechanism"); - after_typelookup_callback(temp_writer_data.get()); + after_typelookup_callback(fastdds::dds::RETCODE_NO_DATA, temp_writer_data.get()); } // Release temporary proxy temp_writer_data.reset(); @@ -230,10 +236,11 @@ void EDPBaseSUBListener::add_reader_from_change( // Callback function to continue after typelookup is complete fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback = [reader, change, edp, &network, reader_added_callback] - (eprosima::fastdds::rtps::ReaderProxyData* temp_reader_data) + (eprosima::fastdds::dds::ReturnCode_t request_ret_status, + eprosima::fastdds::rtps::ReaderProxyData* temp_reader_data) { //LOAD INFORMATION IN DESTINATION READER PROXY DATA - auto copy_data_fun = [&temp_reader_data, &network]( + auto copy_data_fun = [&request_ret_status, &temp_reader_data, &network]( ReaderProxyData* data, bool updating, const ParticipantProxyData& participant_data) @@ -251,6 +258,11 @@ void EDPBaseSUBListener::add_reader_from_change( data->guid()); } *data = *temp_reader_data; + + if (request_ret_status != fastdds::dds::RETCODE_OK) + { + data->type_information().clear(); + } return true; }; @@ -294,7 +306,7 @@ void EDPBaseSUBListener::add_reader_from_change( { EPROSIMA_LOG_INFO( RTPS_EDP, "EDPSimpleListener: No TypeLookupManager or TypeInformation. Trying fallback mechanism"); - after_typelookup_callback(temp_reader_data.get()); + after_typelookup_callback(fastdds::dds::RETCODE_NO_DATA, temp_reader_data.get()); } // Release the temporary proxy temp_reader_data.reset(); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 90871fff974..6d74f6265ca 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -708,12 +708,6 @@ bool PDPServer::pairing_remote_reader_with_local_writer_after_security( void PDPServer::perform_builtin_endpoints_matching( const ParticipantProxyData& pdata) { - // Inform EDP of new RTPSParticipant data: - if (mp_EDP != nullptr) - { - mp_EDP->assignRemoteEndpoints(pdata, true); - } - if (mp_builtin->mp_WLP != nullptr) { mp_builtin->mp_WLP->assignRemoteEndpoints(pdata, true); @@ -723,6 +717,12 @@ void PDPServer::perform_builtin_endpoints_matching( { mp_builtin->typelookup_manager_->assign_remote_endpoints(pdata); } + + // Inform EDP of new RTPSParticipant data: + if (mp_EDP != nullptr) + { + mp_EDP->assignRemoteEndpoints(pdata, true); + } } void PDPServer::removeRemoteEndpoints( diff --git a/test/mock/rtps/TypeLookupManager/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp b/test/mock/rtps/TypeLookupManager/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp index 74c32cb751d..fdf8a870ae7 100644 --- a/test/mock/rtps/TypeLookupManager/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp +++ b/test/mock/rtps/TypeLookupManager/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp @@ -151,7 +151,8 @@ class TypeLookupManager eprosima::ProxyPool::smart_ptr&, const AsyncGetTypeReaderCallback&)); - MOCK_METHOD1(notify_callbacks, void( + MOCK_METHOD2(notify_callbacks, void( + ReturnCode_t, const xtypes::TypeIdentfierWithSize&)); MOCK_METHOD2(add_async_get_type_request, bool(