Skip to content

Commit

Permalink
make possible to create only on node for pub/sub in distributed cm
Browse files Browse the repository at this point in the history
  • Loading branch information
mamueluth committed Apr 24, 2023
1 parent 8c7a3f1 commit e797046
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ rclcpp::NodeOptions get_cm_node_options();

class ControllerManager : public rclcpp::Node
{
enum class controller_manager_type : std::uint8_t;

public:
static constexpr bool kWaitForAllResources = false;
static constexpr auto kInfiniteTimeout = 0;
Expand Down Expand Up @@ -190,6 +192,10 @@ class ControllerManager : public rclcpp::Node
CONTROLLER_MANAGER_PUBLIC
unsigned int get_update_rate() const;

// Per controller update rate support
CONTROLLER_MANAGER_PUBLIC
bool use_multiple_nodes() const;

// Per controller update rate support
CONTROLLER_MANAGER_PUBLIC
std::chrono::milliseconds distributed_interfaces_publish_period() const;
Expand All @@ -199,13 +205,21 @@ class ControllerManager : public rclcpp::Node
void init_services();

CONTROLLER_MANAGER_PUBLIC
void configure_controller_manager();
void get_and_initialize_distributed_parameters();

CONTROLLER_MANAGER_PUBLIC
controller_manager_type determine_controller_manager_type();

CONTROLLER_MANAGER_PUBLIC
void configure_controller_manager(const controller_manager_type & cm_type);

CONTROLLER_MANAGER_PUBLIC
void init_distributed_sub_controller_manager();

CONTROLLER_MANAGER_PUBLIC
void init_distributed_main_controller_services();
void init_distributed_central_controller_manager();

CONTROLLER_MANAGER_PUBLIC void init_distributed_central_controller_manager_services();

CONTROLLER_MANAGER_PUBLIC
void register_sub_controller_manager_srv_cb(
Expand Down Expand Up @@ -427,8 +441,18 @@ class ControllerManager : public rclcpp::Node
*/
rclcpp::CallbackGroup::SharedPtr best_effort_callback_group_;

enum class controller_manager_type : std::uint8_t
{
standard_controller_manager,
distributed_central_controller_manager,
distributed_sub_controller_manager,
unkown_type // indicating something went wrong and type could not be determined
};

bool distributed_ = false;
bool sub_controller_manager_ = false;
bool use_multiple_nodes_ = false;
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> distributed_pub_sub_node_ = nullptr;
std::chrono::milliseconds distributed_interfaces_publish_period_ = std::chrono::milliseconds(12);

rclcpp::CallbackGroup::SharedPtr distributed_system_srv_callback_group_;
Expand Down
118 changes: 87 additions & 31 deletions controller_manager/src/controller_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ ControllerManager::ControllerManager(
diagnostics_updater_.add(
"Controllers Activity", this, &ControllerManager::controller_activity_diagnostic_callback);
init_services();
configure_controller_manager();
get_and_initialize_distributed_parameters();
auto cm_type = determine_controller_manager_type();
configure_controller_manager(cm_type);
}

ControllerManager::ControllerManager(
Expand All @@ -199,7 +201,9 @@ ControllerManager::ControllerManager(
diagnostics_updater_.add(
"Controllers Activity", this, &ControllerManager::controller_activity_diagnostic_callback);
init_services();
configure_controller_manager();
get_and_initialize_distributed_parameters();
auto cm_type = determine_controller_manager_type();
configure_controller_manager(cm_type);
}

void ControllerManager::init_resource_manager(const std::string & robot_description)
Expand Down Expand Up @@ -304,11 +308,7 @@ void ControllerManager::init_services()
qos_services, best_effort_callback_group_);
}

// TODO(Manuel) don't like this, this is for fast poc
// probably better to create factory and handle creation of correct controller manager type
// there. Since asynchronous control should be supported im the future as well and we don't
// want dozen of ifs.
void ControllerManager::configure_controller_manager()
void ControllerManager::get_and_initialize_distributed_parameters()
{
if (!get_parameter("distributed", distributed_))
{
Expand All @@ -324,50 +324,102 @@ void ControllerManager::configure_controller_manager()
sub_controller_manager_ ? "true" : "false");
}

int64_t distributed_interfaces_publish_period;
if (get_parameter("distributed_interfaces_publish_period", distributed_interfaces_publish_period))
{
distributed_interfaces_publish_period_ =
std::chrono::milliseconds(distributed_interfaces_publish_period);
}

else
{
RCLCPP_WARN(
get_logger(),
"'distributed_interfaces_publish_period' parameter not set, using default value.");
}

if (!get_parameter("use_multiple_nodes", use_multiple_nodes_))
{
RCLCPP_WARN(
get_logger(), "'use_multiple_nodes' parameter not set, using default value:%s",
use_multiple_nodes_ ? "true" : "false");
}
}

void ControllerManager::configure_controller_manager(const controller_manager_type & cm_type)
{
switch (cm_type)
{
case controller_manager_type::distributed_central_controller_manager:
init_distributed_central_controller_manager();
break;

case controller_manager_type::distributed_sub_controller_manager:
init_distributed_sub_controller_manager();
break;
case controller_manager_type::standard_controller_manager:
//nothing special to configure
break;
default:
throw std::logic_error(
"Controller manager configuration not possible. Not a known controller manager type."
"Did you maybe set `distributed:false` and `sub_controller_manager:true`?"
"Note:Only distributed controller manager can be a sub controller manager.");
break;
}
}

// TODO(Manuel) don't like this, this is for fast poc
// probably better to create factory and handle creation of correct controller manager type
// there. Since asynchronous control should be supported im the future as well and we don't
// want dozen of ifs.
ControllerManager::controller_manager_type ControllerManager::determine_controller_manager_type()
{
bool std_controller_manager = !distributed_ && !sub_controller_manager_;
bool distributed_sub_controller_manager = distributed_ && sub_controller_manager_;
bool central_controller_manager = distributed_ && !sub_controller_manager_;
if (distributed_sub_controller_manager)
{
init_distributed_sub_controller_manager();
return controller_manager_type::distributed_sub_controller_manager;
}
// This means we are the central controller manager
else if (central_controller_manager)
{
init_distributed_main_controller_services();
return controller_manager_type::distributed_central_controller_manager;
}
// std controller manager or error. std controller manager needs no special setup.
else
else if (std_controller_manager)
{
if (!std_controller_manager)
{
throw std::logic_error(
"Controller manager configured with: distributed:false and sub_controller_manager:true. "
"Only distributed controller manager can be a sub controller manager.");
}
return controller_manager_type::standard_controller_manager;
}
return controller_manager_type::unkown_type;
}

void ControllerManager::init_distributed_sub_controller_manager()
{
int64_t distributed_interfaces_publish_period;
if (get_parameter("distributed_interfaces_publish_period", distributed_interfaces_publish_period))
{
distributed_interfaces_publish_period_ =
std::chrono::milliseconds(distributed_interfaces_publish_period);
}
else
if (!use_multiple_nodes())
{
RCLCPP_WARN(
get_logger(),
"'distributed_interfaces_publish_period' parameter not set, using default value.");
rclcpp::NodeOptions node_options;
distributed_pub_sub_node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
std::string(get_name()) + "_pub_sub_node", get_namespace(), node_options, false);
}
add_hardware_state_publishers();
add_hardware_command_forwarders();
register_sub_controller_manager();
}

void ControllerManager::init_distributed_main_controller_services()
void ControllerManager::init_distributed_central_controller_manager()
{
if (!use_multiple_nodes())
{
rclcpp::NodeOptions node_options;
distributed_pub_sub_node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
std::string(get_name()) + "_pub_sub_node", get_namespace(), node_options, false);
}
init_distributed_central_controller_manager_services();
}

void ControllerManager::init_distributed_central_controller_manager_services()
{
distributed_system_srv_callback_group_ =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
Expand Down Expand Up @@ -398,7 +450,8 @@ void ControllerManager::register_sub_controller_manager_srv_cb(
distributed_state_interfaces;
distributed_state_interfaces.reserve(sub_ctrl_mng_wrapper->get_state_publisher_count());
distributed_state_interfaces =
resource_manager_->import_state_interfaces_of_sub_controller_manager(sub_ctrl_mng_wrapper);
resource_manager_->import_state_interfaces_of_sub_controller_manager(
sub_ctrl_mng_wrapper, get_namespace(), distributed_pub_sub_node_);

for (const auto & state_interface : distributed_state_interfaces)
{
Expand All @@ -421,7 +474,8 @@ void ControllerManager::register_sub_controller_manager_srv_cb(
distributed_command_interfaces;
distributed_command_interfaces.reserve(sub_ctrl_mng_wrapper->get_command_forwarder_count());
distributed_command_interfaces =
resource_manager_->import_command_interfaces_of_sub_controller_manager(sub_ctrl_mng_wrapper);
resource_manager_->import_command_interfaces_of_sub_controller_manager(
sub_ctrl_mng_wrapper, get_namespace(), distributed_pub_sub_node_);

for (const auto & command_interface : distributed_command_interfaces)
{
Expand Down Expand Up @@ -458,7 +512,7 @@ void ControllerManager::add_hardware_state_publishers()
std::vector<std::shared_ptr<distributed_control::StatePublisher>> state_publishers_vec;
state_publishers_vec.reserve(resource_manager_->available_state_interfaces().size());
state_publishers_vec = resource_manager_->create_hardware_state_publishers(
get_namespace(), distributed_interfaces_publish_period());
get_namespace(), distributed_interfaces_publish_period(), distributed_pub_sub_node_);

for (auto const & state_publisher : state_publishers_vec)
{
Expand All @@ -480,7 +534,7 @@ void ControllerManager::add_hardware_command_forwarders()
std::vector<std::shared_ptr<distributed_control::CommandForwarder>> command_forwarder_vec;
command_forwarder_vec.reserve(resource_manager_->available_command_interfaces().size());
command_forwarder_vec = resource_manager_->create_hardware_command_forwarders(
get_namespace(), distributed_interfaces_publish_period());
get_namespace(), distributed_interfaces_publish_period(), distributed_pub_sub_node_);

for (auto const & command_forwarder : command_forwarder_vec)
{
Expand Down Expand Up @@ -2204,6 +2258,8 @@ std::pair<std::string, std::string> ControllerManager::split_command_interface(

unsigned int ControllerManager::get_update_rate() const { return update_rate_; }

bool ControllerManager::use_multiple_nodes() const { return use_multiple_nodes_; }

std::chrono::milliseconds ControllerManager::distributed_interfaces_publish_period() const
{
return distributed_interfaces_publish_period_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class CommandForwarder final
public:
explicit CommandForwarder(
std::unique_ptr<hardware_interface::LoanedCommandInterface> loaned_command_interface_ptr,
const std::string & ns, std::chrono::milliseconds period_in_ms);
const std::string & ns, std::chrono::milliseconds period_in_ms,
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node);

CommandForwarder() = delete;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class StatePublisher final
public:
explicit StatePublisher(
std::unique_ptr<hardware_interface::LoanedStateInterface> loaned_state_interface_ptr,
const std::string & ns, std::chrono::milliseconds period_in_ms);

const std::string & ns, std::chrono::milliseconds period_in_ms,
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node);
StatePublisher() = delete;

~StatePublisher() {}
Expand Down
47 changes: 28 additions & 19 deletions hardware_interface/include/hardware_interface/handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ class ReadOnlyHandle : public HandleInterface, public ReadHandleInterface
public:
ReadOnlyHandle(
const std::string & prefix_name, const std::string & interface_name,
double * value_ptr = nullptr)
: HandleInterface(prefix_name, interface_name, value_ptr)
double * value_ptr = nullptr, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node = nullptr)
: HandleInterface(prefix_name, interface_name, value_ptr, node)
{
}

Expand Down Expand Up @@ -190,18 +190,22 @@ class DistributedReadOnlyHandle : public ReadOnlyHandle
// TODO(Manuel): We should pass the initial value via service call, so that the value_ of ReadOnlyHandle
// is initialized with a feasible value.
DistributedReadOnlyHandle(
const distributed_control::PublisherDescription & description, const std::string & ns = "/")
: ReadOnlyHandle(description.prefix_name(), description.interface_name(), &value_),
const distributed_control::PublisherDescription & description, const std::string & ns,
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node)
: ReadOnlyHandle(description.prefix_name(), description.interface_name(), &value_, node),
get_value_topic_name_(description.topic_name()),
namespace_(ns),
interface_namespace_(description.get_namespace())
{
rclcpp::NodeOptions node_options;
// if no node has been passed
// create node for subscribing to StatePublisher described in StatePublisherDescription
node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
get_underscore_separated_name() + "_state_interface_subscriber", namespace_, node_options,
false);

if (!node_.get())
{
rclcpp::NodeOptions node_options;
node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
get_underscore_separated_name() + "_state_interface_subscriber", namespace_, node_options,
false);
}
// subscribe to topic provided by StatePublisher
state_value_sub_ = node_->create_subscription<std_msgs::msg::Float64>(
get_value_topic_name_, 10,
Expand Down Expand Up @@ -280,8 +284,8 @@ class ReadWriteHandle : public HandleInterface,
public:
ReadWriteHandle(
const std::string & prefix_name, const std::string & interface_name,
double * value_ptr = nullptr)
: HandleInterface(prefix_name, interface_name, value_ptr)
double * value_ptr = nullptr, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node = nullptr)
: HandleInterface(prefix_name, interface_name, value_ptr, node)
{
}

Expand Down Expand Up @@ -332,20 +336,25 @@ class DistributedReadWriteHandle : public ReadWriteHandle
{
public:
DistributedReadWriteHandle(
const distributed_control::PublisherDescription & description, const std::string & ns = "/")
: ReadWriteHandle(description.prefix_name(), description.interface_name(), &value_),
const distributed_control::PublisherDescription & description, const std::string & ns,
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node)
: ReadWriteHandle(description.prefix_name(), description.interface_name(), &value_, node),
get_value_topic_name_(description.topic_name()),
namespace_(ns),
interface_namespace_(description.get_namespace()),
forward_command_topic_name_(get_underscore_separated_name() + "_command_forwarding")
{
// create node for subscribing to StatePublisher described in StatePublisherDescription
rclcpp::NodeOptions node_options;
node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
get_underscore_separated_name() + "_distributed_command_interface", namespace_, node_options,
false);
// if no node has been passed
// create node for subscribing to CommandForwarder described in CommandForwarderDescription
if (!node_.get())
{
rclcpp::NodeOptions node_options;
node_ = std::make_shared<rclcpp_lifecycle::LifecycleNode>(
get_underscore_separated_name() + "_distributed_command_interface", namespace_,
node_options, false);
}

// subscribe to topic provided by StatePublisher
// subscribe to topic provided by CommandForwarder
command_value_sub_ = node_->create_subscription<std_msgs::msg::Float64>(
get_value_topic_name_, 10,
std::bind(&DistributedReadWriteHandle::get_value_cb, this, std::placeholders::_1));
Expand Down
Loading

0 comments on commit e797046

Please sign in to comment.