Skip to content

Commit

Permalink
Refs #20625: Add Monitor Service Fix. Use DataWriterHistory
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL committed Mar 15, 2024
1 parent 607025d commit 6566aed
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
*/

#include <statistics/rtps/monitor-service/MonitorService.hpp>
#include <statistics/rtps/StatisticsBase.hpp>

#include <fastdds/publisher/DataWriterHistory.hpp>
#include <fastdds/statistics/topic_names.hpp>
#include <fastrtps/utils/TimeConversion.h>

#include <rtps/history/PoolConfig.h>
#include <rtps/history/TopicPayloadPoolRegistry.hpp>

#include <statistics/rtps/StatisticsBase.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;

Expand Down Expand Up @@ -461,9 +462,13 @@ bool MonitorService::add_change(
MonitorServiceStatusData& status_data,
const bool& disposed)
{
InstanceHandle_t handle;
type_.getKey(&status_data, &handle, false);

CacheChange_t* change = status_writer_->new_change(
type_.getSerializedSizeProvider(&status_data),
(disposed ? fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED : fastrtps::rtps::ALIVE));
(disposed ? fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED : fastrtps::rtps::ALIVE),
handle);

if (nullptr != change)
{
Expand All @@ -476,8 +481,13 @@ bool MonitorService::add_change(
return false;
}

type_.getKey(&status_data, &change->instanceHandle);
status_writer_history_->add_change(change);
WriteParams wp;
auto datawriter_history = static_cast<eprosima::fastdds::dds::DataWriterHistory*>(status_writer_history_.get());

std::unique_lock<RecursiveTimedMutex> lock(status_writer_->getMutex());
auto max_blocking_time = std::chrono::steady_clock::now() +
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(Duration_t()));
datawriter_history->add_pub_change(change, wp, lock, max_blocking_time);
}
else
{
Expand Down Expand Up @@ -511,11 +521,25 @@ bool MonitorService::create_endpoint()
watts.endpoint.properties.properties().push_back(std::move(property));

HistoryAttributes hatt;
hatt.payloadMaxSize = BUILTIN_DATA_MAX_SIZE;
hatt.payloadMaxSize = type_.m_typeSize;
hatt.memoryPolicy = MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
hatt.initialReservedCaches = 25;

status_writer_history_.reset(new WriterHistory(hatt));
hatt.maximumReservedCaches = 0;

TopicAttributes tatt;
tatt.historyQos.kind = KEEP_LAST_HISTORY_QOS;
tatt.historyQos.depth = 1;
tatt.topicKind = WITH_KEY;
tatt.topicName = MONITOR_SERVICE_TOPIC;
tatt.resourceLimitsQos.max_instances = 0;
tatt.resourceLimitsQos.max_samples_per_instance = 1;

status_writer_history_.reset(new eprosima::fastdds::dds::DataWriterHistory(tatt, type_.m_typeSize,
MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE,
[](
const InstanceHandle_t& ) -> void
{
}));

PoolConfig writer_pool_cfg = PoolConfig::from_history_attributes(hatt);
status_writer_payload_pool_ = TopicPayloadPoolRegistry::get(MONITOR_SERVICE_TOPIC, writer_pool_cfg);
Expand Down

0 comments on commit 6566aed

Please sign in to comment.