Skip to content

Commit

Permalink
Add XML configuration for FlowControllerDescriptor and remove Through…
Browse files Browse the repository at this point in the history
…putController (#4837)

* Refs #21054: Remove ThroughputController* references from project src/cpp files (except xmlparser)

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Refactor flow_controller names to be std::string

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Add flow_controller_descriptor_list to XSD

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Update XML related source files to the new flow_controller_descriptor_list

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Update unittests

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Update BlackBox tests

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Update FlowControllerExample

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Apply partial rev suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Remove ThroughputControllerDescriptor.cpp from test source

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Set flow_controller_name as optional in .xsd

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: fix windows unittests

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Minor corrections

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Update versions.md

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Apply rev suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Add scheduling policy and thread setting tags to existing XML flow controller snippets

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Add missing getXMLFlowControllerDescriptorList() unittesting

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #21054: Add checks for repeated tags and test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored May 31, 2024
1 parent 2b7c0f6 commit 15ffc4b
Show file tree
Hide file tree
Showing 62 changed files with 561 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>

using namespace eprosima::fastdds::dds;
using namespace eprosima::fastdds::rtps;
using namespace eprosima::fastrtps::rtps;

FlowControlExamplePublisher::FlowControlExamplePublisher()
Expand Down Expand Up @@ -72,6 +73,13 @@ bool FlowControlExamplePublisher::init()
pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
pqos.name("Participant_publisher"); //You can put here the name you want

// This controller allows 300kb per second.
auto slow_flow_controller_descriptor = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
slow_flow_controller_descriptor->name = "slow_flow_controller_descriptor";
slow_flow_controller_descriptor->max_bytes_per_period = 300000;
slow_flow_controller_descriptor->period_ms = static_cast<uint64_t>(1000);
pqos.flow_controllers().push_back(slow_flow_controller_descriptor);

participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

if (participant_ == nullptr)
Expand Down Expand Up @@ -121,10 +129,7 @@ bool FlowControlExamplePublisher::init()
// Create slow DataWriter
DataWriterQos wsqos;
wsqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;

// This controller allows 300kb per second.
ThroughputControllerDescriptor slowPublisherThroughputController{300000, 1000};
wsqos.throughput_controller() = slowPublisherThroughputController;
wsqos.publish_mode().flow_controller_name = slow_flow_controller_descriptor->name;

slow_writer_ = slow_publisher_->create_datawriter(topic_, wsqos, &m_listener);

Expand Down
24 changes: 12 additions & 12 deletions examples/cpp/dds/FlowControlExample/README.txt
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
To launch this example open two consoles:

1) "$ ./DDSFlowControlExample subscriber" (or "DDSFlowControlExample.exe subscriber" in Windows).
1) "$ ./DDSFlowControlExample subscriber" (or "DDSFlowControlExample.exe subscriber" in Windows).

2..*) "$ ./DDSFlowControlExample publisher" (or "DDSFlowControlExample.exe publisher" in Windows).

This example illustrates the flow control feature.
This example illustrates the flow control feature.

================
= Flow Control =
================

In Fast DDS, Flow Control is implemented through objects called Flow Controllers. In
particular, we will be looking at the simplest kind, the Throughput Controller.
In Fast DDS, Flow Control is implemented through objects called Flow Controllers. In
particular, we will be looking at the simplest kind, the Flow Controller.

A throughput controller is univocally defined by a Throughput Controller Descriptor,
A flow controller is univocally defined by a Flow Controller Descriptor,
which is a simple struct that includes two values:
-> A size in bytes.
-> A period in milliseconds.

Once instantiated from this descriptor, a throughput controller will make sure there is a
limit on the data it processes, so that no more than the specified size gets
Once instantiated from this descriptor, a flow controller will make sure there is a
limit on the data it processes, so that no more than the specified size gets
through it in the specified time. In other words, it limits data throughput.

Throughput filters can be placed at different points in the system. In this example, you
can see a controller being placed on a particular Writer. Controllers allocated in this
way display a hierarchical behaviour, so in order for data to be sent, it must clear
Flow filters can be placed at different points in the system. In this example, you
can see a controller being placed on a particular Writer. Controllers allocated in this
way display a hierarchical behaviour, so in order for data to be sent, it must clear
both the Participant filter and the Writer filter, if available.

Looking at FlowControlExamplePublisher::init(), you can see the steps involved in
adding a size filter to the publisher parameters.
Looking at FlowControlExamplePublisher::init(), you can see the steps involved in
adding a size filter to the publisher parameters.

12 changes: 2 additions & 10 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ class PublishModeQosPolicy : public QosPolicy
*
* @since 2.4.0
*/
const char* flow_controller_name = fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT;
std::string flow_controller_name = fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT;

inline void clear() override
{
Expand All @@ -2059,7 +2059,7 @@ class PublishModeQosPolicy : public QosPolicy
const PublishModeQosPolicy& b) const
{
return (this->kind == b.kind) &&
0 == strcmp(flow_controller_name, b.flow_controller_name) &&
flow_controller_name == b.flow_controller_name.c_str() &&
QosPolicy::operator ==(b);
}

Expand Down Expand Up @@ -2679,7 +2679,6 @@ class WireProtocolConfigQos : public QosPolicy
(this->participant_id == b.participant_id) &&
(this->builtin == b.builtin) &&
(this->port == b.port) &&
(this->throughput_controller == b.throughput_controller) &&
(this->default_unicast_locator_list == b.default_unicast_locator_list) &&
(this->default_multicast_locator_list == b.default_multicast_locator_list) &&
(this->default_external_unicast_locators == b.default_external_unicast_locators) &&
Expand All @@ -2705,13 +2704,6 @@ class WireProtocolConfigQos : public QosPolicy
//! Port Parameters
fastrtps::rtps::PortParameters port;

/**
* @brief Throughput controller parameters. Leave default for uncontrolled flow.
*
* @deprecated Use flow_controllers() on DomainParticipantQoS
*/
fastrtps::rtps::ThroughputControllerDescriptor throughput_controller;

/**
* Default list of Unicast Locators to be used for any Endpoint defined inside this RTPSParticipant in the case
* that it was defined with NO UnicastLocators. At least ONE locator should be included in this list.
Expand Down
38 changes: 0 additions & 38 deletions include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class DataWriterQos
(this->reliable_writer_qos_ == b.reliable_writer_qos()) &&
(this->endpoint_ == b.endpoint()) &&
(this->writer_resource_limits_ == b.writer_resource_limits()) &&
(this->throughput_controller_ == b.throughput_controller()) &&
(this->data_sharing_ == b.data_sharing());
}

Expand Down Expand Up @@ -772,40 +771,6 @@ class DataWriterQos
writer_resource_limits_ = writer_resource_limits;
}

/**
* Getter for ThroughputControllerDescriptor
*
* @return ThroughputControllerDescriptor reference
* @deprecated Use flow_controllers() on DomainParticipantQoS
*/
FASTDDS_EXPORTED_API fastrtps::rtps::ThroughputControllerDescriptor& throughput_controller()
{
return throughput_controller_;
}

/**
* Getter for ThroughputControllerDescriptor
*
* @return ThroughputControllerDescriptor reference
* @deprecated Use flow_controllers() on DomainParticipantQoS
*/
FASTDDS_EXPORTED_API const fastrtps::rtps::ThroughputControllerDescriptor& throughput_controller() const
{
return throughput_controller_;
}

/**
* Setter for ThroughputControllerDescriptor
*
* @param throughput_controller new value for the ThroughputControllerDescriptor
* @deprecated Use flow_controllers() on DomainParticipantQoS
*/
FASTDDS_EXPORTED_API void throughput_controller(
const fastrtps::rtps::ThroughputControllerDescriptor& throughput_controller)
{
throughput_controller_ = throughput_controller;
}

/**
* Getter for DataSharingQosPolicy
*
Expand Down Expand Up @@ -902,9 +867,6 @@ class DataWriterQos
//!Writer Resource Limits Qos
WriterResourceLimitsQos writer_resource_limits_;

//!Throughput controller
fastrtps::rtps::ThroughputControllerDescriptor throughput_controller_;

//!DataSharing configuration
DataSharingQosPolicy data_sharing_;
};
Expand Down
9 changes: 0 additions & 9 deletions include/fastdds/rtps/attributes/RTPSParticipantAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>
#include <fastdds/rtps/flowcontrol/ThroughputControllerDescriptor.h>
#include <fastdds/rtps/resources/ResourceManagement.h>
#include <fastdds/rtps/transport/network/NetmaskFilterKind.hpp>
#include <fastdds/rtps/transport/TransportInterface.h>
Expand Down Expand Up @@ -432,7 +431,6 @@ class RTPSParticipantAttributes
(this->port == b.port) &&
(this->userData == b.userData) &&
(this->participantID == b.participantID) &&
(this->throughputController == b.throughputController) &&
(this->useBuiltinTransports == b.useBuiltinTransports) &&
(this->properties == b.properties) &&
(this->prefix == b.prefix) &&
Expand Down Expand Up @@ -516,13 +514,6 @@ class RTPSParticipantAttributes
//! Participant ID
int32_t participantID = -1;

/**
* @brief Throughput controller parameters. Leave default for uncontrolled flow.
*
* @deprecated Use flow_controllers on RTPSParticipantAttributes
*/
ThroughputControllerDescriptor throughputController;

//! User defined transports to use alongside or in place of builtins.
std::vector<std::shared_ptr<fastdds::rtps::TransportDescriptorInterface>> userTransports;

Expand Down
10 changes: 1 addition & 9 deletions include/fastdds/rtps/attributes/WriterAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/flowcontrol/FlowControllerConsts.hpp>
#include <fastdds/rtps/flowcontrol/ThroughputControllerDescriptor.h>
#include <fastdds/utils/collections/ResourceLimitedContainerConfig.hpp>

namespace eprosima {
Expand Down Expand Up @@ -123,13 +122,6 @@ class WriterAttributes
//!Indicates if the Writer is synchronous or asynchronous
RTPSWriterPublishMode mode;

/**
* @brief Throughput controller, always the last one to apply
*
* @deprecated Use flow_controllers on RTPSParticipantAttributes
*/
ThroughputControllerDescriptor throughputController;

//! Disable the sending of heartbeat piggybacks.
bool disable_heartbeat_piggyback;

Expand All @@ -143,7 +135,7 @@ class WriterAttributes
Duration_t keep_duration;

//! Flow controller name. Default: fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT.
const char* flow_controller_name = fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT;
std::string flow_controller_name = fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT;
};

} /* namespace rtps */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef FASTDDS_RTPS_FLOWCONTROL_FLOWCONTROLLERDESCRIPTOR_HPP
#define FASTDDS_RTPS_FLOWCONTROL_FLOWCONTROLLERDESCRIPTOR_HPP

#include <string>

#include <fastdds/rtps/attributes/ThreadSettings.hpp>

#include "FlowControllerConsts.hpp"
Expand All @@ -33,7 +35,7 @@ namespace rtps {
struct FlowControllerDescriptor
{
//! Name of the flow controller.
const char* name = nullptr;
std::string name = FASTDDS_FLOW_CONTROLLER_DEFAULT;

//! Scheduler policy used by the flow controller.
//!
Expand Down
56 changes: 0 additions & 56 deletions include/fastdds/rtps/flowcontrol/ThroughputControllerDescriptor.h

This file was deleted.

41 changes: 40 additions & 1 deletion resources/xsd/fastdds_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
├ allocation [0~1],
├ userData [0~1],
├ prefix [0~1],
├ flow_controller_descriptor_list [flowControllerDescriptorListType],
├ builtin_controllers_sender_thread [threadSettingsType],
├ timed_events_thread [threadSettingsType],
├ discovery_server_thread [threadSettingsType],
Expand Down Expand Up @@ -184,6 +185,7 @@
<xs:element name="allocation" type="rtpsParticipantAllocationAttributesType" minOccurs="0" maxOccurs="1"/>
<xs:element name="userData" type="octectVectorQosPolicyType" minOccurs="0" maxOccurs="1"/>
<xs:element name="prefix" type="prefixType" minOccurs="0" maxOccurs="1"/>
<xs:element name="flow_controller_descriptor_list" type="flowControllerDescriptorListType" minOccurs="0" maxOccurs="1"/>
<xs:element name="builtin_controllers_sender_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="timed_events_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="discovery_server_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
Expand Down Expand Up @@ -1747,7 +1749,8 @@
</xs:complexType>

<!--QoS Publish Mode:
└ kind [string] ("ASYNCHRONOUS", "SYNCHRONOUS") -->
├ kind [string] ("ASYNCHRONOUS", "SYNCHRONOUS")
└ flow_controller_name [string] -->
<xs:complexType name="publishModeQosPolicyType">
<xs:all>
<xs:element name="kind" minOccurs="0" maxOccurs="1">
Expand All @@ -1758,6 +1761,7 @@
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="flow_controller_name" type="string" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>

Expand Down Expand Up @@ -1888,6 +1892,41 @@
</xs:all>
</xs:complexType>

<!--Flow Controller Descriptor List Type:
└ flow_controller_descriptor [1~*]-->
<xs:complexType name="flowControllerDescriptorListType">
<xs:sequence minOccurs="1" maxOccurs="unbounded"> <!-- multiple instances of the elements -->
<xs:element name="flow_controller_descriptor" type="flowControllerDescriptorType"/>
</xs:sequence>
</xs:complexType>

<!--Flow Controller Descriptor Type:
├ name [string] req,
├ scheduler [flowControllerSchedulerPolicy],
├ max_bytes_per_period [int32],
├ period_ms [uint64],
└ sender_thread [threadSettingsType]-->
<xs:complexType name="flowControllerDescriptorType">
<xs:all>
<xs:element name="name" type="string" minOccurs="1" maxOccurs="1"/>
<xs:element name="scheduler" type="flowControllerSchedulerPolicy" minOccurs="0" maxOccurs="1"/>
<xs:element name="max_bytes_per_period" type="int32" minOccurs="0" maxOccurs="1"/>
<xs:element name="period_ms" type="uint64" minOccurs="0" maxOccurs="1"/>
<xs:element name="sender_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>

<!--Flow Controller Scheduler Policy Type [string]:
("FIFO", "ROUND_ROBIN", "HIGH_PRIORITY", "PRIORITY_WITH_RESERVATION")-->
<xs:simpleType name="flowControllerSchedulerPolicy">
<xs:restriction base="xs:string">
<xs:enumeration value="FIFO" />
<xs:enumeration value="ROUND_ROBIN" />
<xs:enumeration value="HIGH_PRIORITY" />
<xs:enumeration value="PRIORITY_WITH_RESERVATION" />
</xs:restriction>
</xs:simpleType>

<!--Thread Settings Type:
├ scheduling_policy [int32],
├ priority [int32],
Expand Down
1 change: 0 additions & 1 deletion src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ set(${PROJECT_NAME}_source_files
rtps/exceptions/Exception.cpp
rtps/flowcontrol/FlowControllerConsts.cpp
rtps/flowcontrol/FlowControllerFactory.cpp
rtps/flowcontrol/ThroughputControllerDescriptor.cpp
rtps/history/CacheChangePool.cpp
rtps/history/History.cpp
rtps/history/ReaderHistory.cpp
Expand Down
Loading

0 comments on commit 15ffc4b

Please sign in to comment.