Skip to content

Commit

Permalink
Refs #20165. Apply suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
richiware committed Sep 24, 2024
1 parent fa3eb30 commit ff9a699
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
is_type_identifier_known(type_identifier_with_size))
{
// The type is already known, invoke the callback
callback(temp_proxy_data.get());
callback(RETCODE_OK, temp_proxy_data.get());
return RETCODE_OK;
}

Expand Down Expand Up @@ -408,7 +408,8 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
}

void TypeLookupManager::notify_callbacks(
xtypes::TypeIdentfierWithSize type_identifier_with_size)
ReturnCode_t request_ret_status,
const xtypes::TypeIdentfierWithSize& type_identifier_with_size)
{
bool removed = false;
// Check that type is pending to be resolved
Expand All @@ -417,7 +418,7 @@ void TypeLookupManager::notify_callbacks(
{
for (auto& proxy_callback_pair : writer_callbacks_it->second)
{
proxy_callback_pair.second(proxy_callback_pair.first);
proxy_callback_pair.second(request_ret_status, proxy_callback_pair.first);
}
removed = true;
}
Expand All @@ -427,7 +428,7 @@ void TypeLookupManager::notify_callbacks(
{
for (auto& proxy_callback_pair : reader_callbacks_it->second)
{
proxy_callback_pair.second(proxy_callback_pair.first);
proxy_callback_pair.second(request_ret_status, proxy_callback_pair.first);
}
removed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ namespace builtin {
const SampleIdentity INVALID_SAMPLE_IDENTITY;

using AsyncGetTypeWriterCallback = std::function<
void (eprosima::fastdds::rtps::WriterProxyData*)>;
void (eprosima::fastdds::dds::ReturnCode_t, eprosima::fastdds::rtps::WriterProxyData*)>;
using AsyncGetTypeReaderCallback = std::function<
void (eprosima::fastdds::rtps::ReaderProxyData*)>;
void (eprosima::fastdds::dds::ReturnCode_t, eprosima::fastdds::rtps::ReaderProxyData*)>;

/**
* Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification.
Expand Down Expand Up @@ -229,7 +229,8 @@ class TypeLookupManager
* @param type_identifier_with_size[in] TypeIdentfierWithSize of the callbacks to notify.
*/
void notify_callbacks(
xtypes::TypeIdentfierWithSize type_identifier_with_size);
ReturnCode_t request_ret_status,
const xtypes::TypeIdentfierWithSize& type_identifier_with_size);

/**
* Adds a callback to the async_get_type_callbacks_ entry of the TypeIdentfierWithSize, or creates a new one if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,43 @@ void TypeLookupReplyListener::process_reply()
if (!replies_queue_.empty())
{
TypeLookup_Reply& reply = replies_queue_.front().reply;

// Check if the received reply SampleIdentity corresponds to an outstanding request
auto& request_id {reply.header().relatedRequestId()};
auto request_it = typelookup_manager_->async_get_type_requests_.find(request_id);
if (request_it != typelookup_manager_->async_get_type_requests_.end())
{
xtypes::TypeIdentfierWithSize type_id {request_it->second};
// Process the TypeLookup_Reply based on its type
switch (reply.return_value()._d())
{
case TypeLookup_getTypes_HashId:
{
if (RETCODE_OK == reply.return_value().getType()._d())
{
check_get_types_reply(reply.header().relatedRequestId(),
check_get_types_reply(request_id, type_id,
reply.return_value().getType().result(), reply.header().relatedRequestId());
}
else
{
typelookup_manager_->notify_callbacks(RETCODE_NO_DATA, type_id);
typelookup_manager_->remove_async_get_type_request(request_id);
}
break;
}
case TypeLookup_getDependencies_HashId:
{
if (RETCODE_OK == reply.return_value().getTypeDependencies()._d())
{
check_get_type_dependencies_reply(
reply.header().relatedRequestId(), replies_queue_.front().type_server,
request_id, type_id, replies_queue_.front().type_server,
reply.return_value().getTypeDependencies().result());
}
else
{
typelookup_manager_->notify_callbacks(RETCODE_NO_DATA, type_id);
typelookup_manager_->remove_async_get_type_request(request_id);
}
break;
}
default:
Expand All @@ -138,96 +154,85 @@ void TypeLookupReplyListener::process_reply()

void TypeLookupReplyListener::check_get_types_reply(
const SampleIdentity& request_id,
const xtypes::TypeIdentfierWithSize& type_id,
const TypeLookup_getTypes_Out& reply,
SampleIdentity related_request)
{
// Check if the received reply SampleIdentity corresponds to an outstanding request
auto requests_it = typelookup_manager_->async_get_type_requests_.find(request_id);
if (requests_it != typelookup_manager_->async_get_type_requests_.end())
{
ReturnCode_t register_result = RETCODE_OK;
ReturnCode_t register_result = RETCODE_OK;

if (0 != reply.types().size())
if (0 != reply.types().size())
{
for (xtypes::TypeIdentifierTypeObjectPair pair : reply.types())
{
for (xtypes::TypeIdentifierTypeObjectPair pair : reply.types())
xtypes::TypeIdentifierPair type_ids;
type_ids.type_identifier1(pair.type_identifier());
if (RETCODE_OK != fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().
register_type_object(pair.type_object(), type_ids, false))
{
xtypes::TypeIdentifierPair type_ids;
type_ids.type_identifier1(pair.type_identifier());
if (RETCODE_OK != fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().
register_type_object(pair.type_object(), type_ids, false))
{
// If any of the types is not registered, log error
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Error registering remote type.");
register_result = RETCODE_ERROR;
}
// If any of the types is not registered, log error
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Error registering remote type.");
register_result = RETCODE_ERROR;
}
}

if (RETCODE_OK == register_result)
if (RETCODE_OK == register_result)
{
// Check if the get_type_dependencies related to this reply required a continuation_point
std::unique_lock<std::mutex> guard(replies_with_continuation_mutex_);
auto it = std::find(replies_with_continuation_.begin(),
replies_with_continuation_.end(), related_request);
if (it != replies_with_continuation_.end())
{
// Check if the get_type_dependencies related to this reply required a continuation_point
std::unique_lock<std::mutex> guard(replies_with_continuation_mutex_);
auto it = std::find(replies_with_continuation_.begin(),
replies_with_continuation_.end(), related_request);
if (it != replies_with_continuation_.end())
{
// If it did, remove it from the list and continue
replies_with_continuation_.erase(it);
}
else
// If it did, remove it from the list and continue
replies_with_continuation_.erase(it);
}
else
{
// If it did not, check that the type that originated the request is consistent
// before notifying the callbacks associated with the request
try
{
// If it did not, check that the type that originated the request is consistent
// before notifying the callbacks associated with the request
try
{
xtypes::TypeObject type_object;
fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().get_type_object(
requests_it->second.type_id(), type_object);
xtypes::TypeObjectUtils::type_object_consistency(type_object);
xtypes::TypeIdentifierPair type_ids;
if (RETCODE_OK !=
fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().
register_type_object(type_object, type_ids, true))
{
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Cannot register minimal of remote type");
}

typelookup_manager_->notify_callbacks(requests_it->second);
}
catch (const std::exception& exception)
xtypes::TypeObject type_object;
fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().get_type_object(
type_id.type_id(), type_object);
xtypes::TypeObjectUtils::type_object_consistency(type_object);
xtypes::TypeIdentifierPair type_ids;
if (RETCODE_OK !=
fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().
register_type_object(type_object, type_ids, true))
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Error registering remote type: " << exception.what());
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Cannot register minimal of remote type");
}

typelookup_manager_->notify_callbacks(RETCODE_OK, type_id);
}
catch (const std::exception& exception)
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Error registering remote type: " << exception.what());
}
}
}
else
{
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Reply no contains any type.");
register_result = RETCODE_ERROR;
}

// Remove the processed SampleIdentity from the outstanding requests
typelookup_manager_->remove_async_get_type_request(request_id);
}
else
{
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REPLY_LISTENER,
"Received reply with no types.");
register_result = RETCODE_ERROR;
}

// Remove the processed SampleIdentity from the outstanding requests
typelookup_manager_->remove_async_get_type_request(request_id);
}

void TypeLookupReplyListener::check_get_type_dependencies_reply(
const SampleIdentity& request_id,
const xtypes::TypeIdentfierWithSize& type_id,
const fastdds::rtps::GUID_t type_server,
const TypeLookup_getTypeDependencies_Out& reply)
{
// Check if the received reply SampleIdentity corresponds to an outstanding request
auto requests_it = typelookup_manager_->async_get_type_requests_.find(request_id);
if (requests_it == typelookup_manager_->async_get_type_requests_.end())
{
// The reply is not associated with any outstanding request, ignore it
return;
}

// Add the dependent types to the list for the get_type request
xtypes::TypeIdentifierSeq needed_types;
std::unordered_set<xtypes::TypeIdentifier> unique_types;
Expand All @@ -251,18 +256,18 @@ void TypeLookupReplyListener::check_get_type_dependencies_reply(
// If there is no continuation point, add the parent type
if (reply.continuation_point().empty())
{
needed_types.push_back(requests_it->second.type_id());
needed_types.push_back(type_id.type_id());
}
// Make a new request with the continuation point
else
{
SampleIdentity next_request_id = typelookup_manager_->
get_type_dependencies({requests_it->second.type_id()}, type_server,
get_type_dependencies({type_id.type_id()}, type_server,
reply.continuation_point());
if (INVALID_SAMPLE_IDENTITY != next_request_id)
{
// Store the sent requests and associated TypeIdentfierWithSize
typelookup_manager_->add_async_get_type_request(next_request_id, requests_it->second);
typelookup_manager_->add_async_get_type_request(next_request_id, type_id);
}
else
{
Expand All @@ -277,7 +282,7 @@ void TypeLookupReplyListener::check_get_type_dependencies_reply(
if (INVALID_SAMPLE_IDENTITY != get_types_request)
{
// Store the type request
typelookup_manager_->add_async_get_type_request(get_types_request, requests_it->second);
typelookup_manager_->add_async_get_type_request(get_types_request, type_id);

// If this get_types request has a continuation_point, store it in the list
if (!reply.continuation_point().empty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class TypeLookupReplyListener : public fastdds::rtps::ReaderListener, public fas
*/
void check_get_types_reply(
const SampleIdentity& request_id,
const xtypes::TypeIdentfierWithSize& type_id,
const TypeLookup_getTypes_Out& reply,
SampleIdentity related_request);

Expand All @@ -114,6 +115,7 @@ class TypeLookupReplyListener : public fastdds::rtps::ReaderListener, public fas
*/
void check_get_type_dependencies_reply(
const SampleIdentity& request_id,
const xtypes::TypeIdentfierWithSize& type_id,
const fastdds::rtps::GUID_t type_server,
const TypeLookup_getTypeDependencies_Out& reply);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void TypeLookupRequestListener::check_get_types_request(
if (0 == request.type_ids().size())
{
EPROSIMA_LOG_WARNING(TYPELOOKUP_SERVICE_REQUEST_LISTENER,
"Request no contains any type identifier.");
"Received request with no type identifiers.");
}

// Iterate through requested type_ids
Expand Down
24 changes: 18 additions & 6 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ void EDPBasePUBListener::add_writer_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback =
[reader, change, edp, &network, writer_added_callback]
(eprosima::fastdds::rtps::WriterProxyData* temp_writer_data)
(eprosima::fastdds::dds::ReturnCode_t request_ret_status,
eprosima::fastdds::rtps::WriterProxyData* temp_writer_data)
{
//LOAD INFORMATION IN DESTINATION WRITER PROXY DATA
auto copy_data_fun = [&temp_writer_data, &network](
auto copy_data_fun = [&request_ret_status, &temp_writer_data, &network](
WriterProxyData* data,
bool updating,
const ParticipantProxyData& participant_data)
Expand All @@ -107,6 +108,11 @@ void EDPBasePUBListener::add_writer_from_change(
data->guid());
}
*data = *temp_writer_data;

if (request_ret_status != fastdds::dds::RETCODE_OK)
{
data->type_information().clear();
}
return true;
};

Expand Down Expand Up @@ -149,7 +155,7 @@ void EDPBasePUBListener::add_writer_from_change(
{
EPROSIMA_LOG_INFO(
RTPS_EDP, "EDPSimpleListener: No TypeLookupManager or TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_writer_data.get());
after_typelookup_callback(fastdds::dds::RETCODE_NO_DATA, temp_writer_data.get());
}
// Release temporary proxy
temp_writer_data.reset();
Expand Down Expand Up @@ -230,10 +236,11 @@ void EDPBaseSUBListener::add_reader_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback =
[reader, change, edp, &network, reader_added_callback]
(eprosima::fastdds::rtps::ReaderProxyData* temp_reader_data)
(eprosima::fastdds::dds::ReturnCode_t request_ret_status,
eprosima::fastdds::rtps::ReaderProxyData* temp_reader_data)
{
//LOAD INFORMATION IN DESTINATION READER PROXY DATA
auto copy_data_fun = [&temp_reader_data, &network](
auto copy_data_fun = [&request_ret_status, &temp_reader_data, &network](
ReaderProxyData* data,
bool updating,
const ParticipantProxyData& participant_data)
Expand All @@ -251,6 +258,11 @@ void EDPBaseSUBListener::add_reader_from_change(
data->guid());
}
*data = *temp_reader_data;

if (request_ret_status != fastdds::dds::RETCODE_OK)
{
data->type_information().clear();
}
return true;
};

Expand Down Expand Up @@ -294,7 +306,7 @@ void EDPBaseSUBListener::add_reader_from_change(
{
EPROSIMA_LOG_INFO(
RTPS_EDP, "EDPSimpleListener: No TypeLookupManager or TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_reader_data.get());
after_typelookup_callback(fastdds::dds::RETCODE_NO_DATA, temp_reader_data.get());
}
// Release the temporary proxy
temp_reader_data.reset();
Expand Down
Loading

0 comments on commit ff9a699

Please sign in to comment.