Skip to content

Commit

Permalink
Add missing TypeLookup listeners (#4335) (#4436)
Browse files Browse the repository at this point in the history
* Add listener classes to handle cache change acknowledgments

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>

* Uncrustify

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>

* Fix memory leaks in TypeLookupManager.cpp

* Apply changes

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>

* Apply changes

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>

* Refs #20311: Further refactor TypeLookupManager::create_endpoints

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>
Signed-off-by: EduPonz <eduardoponz@eprosima.com>
Co-authored-by: EduPonz <eduardoponz@eprosima.com>
(cherry picked from commit 6cb5ebb)

Co-authored-by: Irene Bandera Moreno <irenebandera@eprosima.com>
  • Loading branch information
mergify[bot] and irenebm authored Mar 5, 2024
1 parent 6dabb3e commit 5ed912b
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 28 deletions.
6 changes: 6 additions & 0 deletions include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TypeLookupManager
bool create_secure_endpoints();
#endif
*/

void request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);

void reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);
};

} /* namespace builtin */
Expand Down
17 changes: 14 additions & 3 deletions include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REPLY_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupReplyListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -85,5 +96,5 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REPLY_LISTENER_HPP_*/
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REQUEST_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupRequestListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -86,5 +97,5 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REQUEST_LISTENER_HPP_*/
105 changes: 83 additions & 22 deletions src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ ReaderHistory* TypeLookupManager::get_builtin_reply_reader_history()
*/
bool TypeLookupManager::create_endpoints()
{
bool ret = true;

const RTPSParticipantAttributes& pattr = participant_->getRTPSParticipantAttributes();

// Built-in history attributes.
Expand All @@ -348,14 +350,15 @@ bool TypeLookupManager::create_endpoints()
// Built-in request writer
if (builtin_protocols_->m_att.typelookup_config.use_client)
{
request_listener_ = new TypeLookupRequestListener(this);
builtin_request_writer_history_ = new WriterHistory(hatt);

RTPSWriter* req_writer;
if (participant_->createWriter(
&req_writer,
watt,
builtin_request_writer_history_,
nullptr,
request_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_request_writer,
true))
{
Expand All @@ -365,23 +368,22 @@ bool TypeLookupManager::create_endpoints()
else
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup request writer creation failed.");
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply writer
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
reply_listener_ = new TypeLookupReplyListener(this);
builtin_reply_writer_history_ = new WriterHistory(hatt);

RTPSWriter* rep_writer;
if (participant_->createWriter(
&rep_writer,
watt,
builtin_reply_writer_history_,
nullptr,
reply_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_reply_writer,
true))
{
Expand All @@ -391,9 +393,7 @@ bool TypeLookupManager::create_endpoints()
else
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup reply writer creation failed.");
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
return false;
ret = false;
}
}

Expand All @@ -410,9 +410,12 @@ bool TypeLookupManager::create_endpoints()
ratt.endpoint.durabilityKind = fastrtps::rtps::VOLATILE;

// Built-in request reader
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
request_listener_ = new TypeLookupRequestListener(this);
if (nullptr == request_listener_)
{
request_listener_ = new TypeLookupRequestListener(this);
}
builtin_request_reader_history_ = new ReaderHistory(hatt);

RTPSReader* req_reader;
Expand All @@ -430,18 +433,17 @@ bool TypeLookupManager::create_endpoints()
else
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup request reader creation failed.");
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
delete request_listener_;
request_listener_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply reader
if (builtin_protocols_->m_att.typelookup_config.use_client)
if (ret && builtin_protocols_->m_att.typelookup_config.use_client)
{
reply_listener_ = new TypeLookupReplyListener(this);
if (nullptr == reply_listener_)
{
reply_listener_ = new TypeLookupReplyListener(this);
}
builtin_reply_reader_history_ = new ReaderHistory(hatt);

RTPSReader* rep_reader;
Expand All @@ -459,15 +461,50 @@ bool TypeLookupManager::create_endpoints()
else
{
EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup reply reader creation failed.");
ret = false;
}
}

// Clean up if something failed.
if (!ret)
{
if (nullptr != builtin_request_writer_history_)
{
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
}

if (nullptr != builtin_reply_writer_history_)
{
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
}

if (nullptr != builtin_request_reader_history_)
{
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
}

if (nullptr != builtin_reply_reader_history_)
{
delete builtin_reply_reader_history_;
builtin_reply_reader_history_ = nullptr;
}

if (nullptr != request_listener_)
{
delete request_listener_;
request_listener_ = nullptr;
}
if (nullptr != reply_listener_)
{
delete reply_listener_;
reply_listener_ = nullptr;
return false;
}
}

return true;
return ret;
}

/* TODO Implement if security is needed.
Expand Down Expand Up @@ -567,7 +604,13 @@ bool TypeLookupManager::send_request(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && request_type_.serialize(&req, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION))

bool serialize_ret = request_type_.serialize(&req, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -610,7 +653,13 @@ bool TypeLookupManager::send_reply(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && reply_type_.serialize(&rep, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION))

bool serialize_ret = reply_type_.serialize(&rep, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -695,6 +744,18 @@ const fastrtps::rtps::GUID_t& TypeLookupManager::get_builtin_request_writer_guid
return c_Guid_Unknown;
}

void TypeLookupManager::request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_request_writer_history_->remove_change(change);
}

void TypeLookupManager::reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_reply_writer_history_->remove_change(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ void TypeLookupReplyListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupReplyListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->reply_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ void TypeLookupRequestListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupRequestListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->request_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down

0 comments on commit 5ed912b

Please sign in to comment.