Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20953] Fix Proxy block in TypeLookup Service #4901

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
typename eprosima::ProxyPool<ProxyType>::smart_ptr& temp_proxy_data,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks)
{
xtypes::TypeIdentfierWithSize type_identifier_with_size =
Expand All @@ -341,7 +341,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
is_type_identifier_known(type_identifier_with_size))
{
// The type is already known, invoke the callback
callback(temp_proxy_data);
callback(temp_proxy_data.get());
return RETCODE_OK;
}

Expand All @@ -352,7 +352,9 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
if (it != async_get_type_callbacks.end())
{
// TypeIdentfierWithSize exists, add the callback
it->second.push_back(std::make_pair(std::move(temp_proxy_data), callback));
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
it->second.push_back(std::make_pair(temp_proxy_data_copy, callback));
// Return without sending new request
return RETCODE_NO_DATA;
}
Expand All @@ -365,8 +367,10 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
{
// Store the sent requests and callback
add_async_get_type_request(get_type_dependencies_request, type_identifier_with_size);
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr, AsyncCallback>> types;
types.push_back(std::make_pair(std::move(temp_proxy_data), callback));
std::vector<std::pair<ProxyType*, AsyncCallback>> types;
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
types.push_back(std::make_pair(temp_proxy_data_copy, callback));
async_get_type_callbacks.emplace(type_identifier_with_size, std::move(types));

return RETCODE_NO_DATA;
Expand Down Expand Up @@ -442,13 +446,23 @@ bool TypeLookupManager::remove_async_get_type_callback(
auto writer_it = async_get_type_writer_callbacks_.find(type_identifier_with_size);
if (writer_it != async_get_type_writer_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : writer_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_writer_callbacks_.erase(writer_it);
removed = true;
}
// Check if the key is in the reader map
auto reader_it = async_get_type_reader_callbacks_.find(type_identifier_with_size);
if (reader_it != async_get_type_reader_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : reader_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_reader_callbacks_.erase(reader_it);
removed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ namespace builtin {
const SampleIdentity INVALID_SAMPLE_IDENTITY;

using AsyncGetTypeWriterCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::WriterProxyData*)>;
using AsyncGetTypeReaderCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::ReaderProxyData*)>;

/**
* Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification.
Expand Down Expand Up @@ -211,7 +211,7 @@ class TypeLookupManager
typename eprosima::ProxyPool<ProxyType>::smart_ptr& temp_proxy_data,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks);

/**
Expand Down Expand Up @@ -427,12 +427,12 @@ class TypeLookupManager

//! Collection of all the WriterProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::WriterProxyData*,
AsyncGetTypeWriterCallback>>> async_get_type_writer_callbacks_;

//! Collection of all the ReaderProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::ReaderProxyData*,
AsyncGetTypeReaderCallback>>> async_get_type_reader_callbacks_;

//! Collection of all SampleIdentity and the TypeIdentfierWithSize it originated from, hashed by its SampleIdentity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ void TypeLookupReplyListener::onNewCacheChangeAdded(
return;
}

// Add reply to the processing queue
replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID});
{
// Notify processor
std::unique_lock<std::mutex> guard(replies_processor_cv_mutex_);
// Add reply to the processing queue
replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID});
// Notify processor
replies_processor_cv_.notify_all();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,11 @@ void TypeLookupRequestListener::onNewCacheChangeAdded(
TypeLookup_Request request;
if (typelookup_manager_->receive(*change, request))
{
// Add request to the processing queue
requests_queue_.push(request);
{
// Notify processor
std::unique_lock<std::mutex> guard(request_processor_cv_mutex_);
// Add request to the processing queue
requests_queue_.push(request);
// Notify processor
request_processor_cv_.notify_all();
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void EDPBasePUBListener::add_writer_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback =
[reader, change, edp, &network, writer_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr& temp_writer_data)
(eprosima::fastrtps::rtps::WriterProxyData* temp_writer_data)
{
//LOAD INFORMATION IN DESTINATION WRITER PROXY DATA
auto copy_data_fun = [&temp_writer_data, &network](
Expand Down Expand Up @@ -114,9 +114,6 @@ void EDPBasePUBListener::add_writer_from_change(
WriterProxyData* writer_data =
edp->mp_PDP->addWriterProxyData(temp_writer_data->guid(), participant_guid, copy_data_fun);

// release temporary proxy
temp_writer_data.reset();

if (writer_data != nullptr)
{
edp->pairing_writer_proxy_with_any_local_reader(participant_guid, writer_data);
Expand Down Expand Up @@ -148,8 +145,11 @@ void EDPBasePUBListener::add_writer_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_writer_data);
after_typelookup_callback(temp_writer_data.get());
}
// Release temporary proxy
temp_writer_data.reset();


// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down Expand Up @@ -226,8 +226,9 @@ void EDPBaseSUBListener::add_reader_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback =
[reader, change, edp, &network, reader_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr& temp_reader_data)
(eprosima::fastrtps::rtps::ReaderProxyData* temp_reader_data)
{
//LOAD INFORMATION IN DESTINATION READER PROXY DATA
auto copy_data_fun = [&temp_reader_data, &network](
ReaderProxyData* data,
bool updating,
Expand All @@ -254,9 +255,6 @@ void EDPBaseSUBListener::add_reader_from_change(
ReaderProxyData* reader_data =
edp->mp_PDP->addReaderProxyData(temp_reader_data->guid(), participant_guid, copy_data_fun);

// Release the temporary proxy
temp_reader_data.reset();

if (reader_data != nullptr) //ADDED NEW DATA
{
edp->pairing_reader_proxy_with_any_local_writer(participant_guid, reader_data);
Expand Down Expand Up @@ -288,8 +286,10 @@ void EDPBaseSUBListener::add_reader_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_reader_data);
after_typelookup_callback(temp_reader_data.get());
}
// Release the temporary proxy
temp_reader_data.reset();

// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down
Loading