Skip to content

Commit

Permalink
Working async controllers and components [not synchronized] (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
VX792 committed May 8, 2024
1 parent 607755e commit 2cbe470
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 78 deletions.
111 changes: 111 additions & 0 deletions controller_interface/include/controller_interface/async_controller.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2024 ros2_control development team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef CONTROLLER_INTERFACE__ASYNC_CONTROLLER_HPP_
#define CONTROLLER_INTERFACE__ASYNC_CONTROLLER_HPP_

#include <atomic>
#include <memory>
#include <thread>

#include "controller_interface_base.hpp"
#include "lifecycle_msgs/msg/state.hpp"

namespace controller_interface
{

class AsyncControllerThread
{
public:
/// Constructor for the AsyncControllerThread object.
/**
*
* \param[in] controller shared pointer to a controller.
* \param[in] cm_update_rate the controller manager's update rate.
*/
AsyncControllerThread(
std::shared_ptr<controller_interface::ControllerInterfaceBase> & controller, int cm_update_rate)
: terminated_(false), controller_(controller), thread_{}, cm_update_rate_(cm_update_rate)
{
}

AsyncControllerThread(const AsyncControllerThread & t) = delete;
AsyncControllerThread(AsyncControllerThread && t) = delete;

// Destructor, called when the component is erased from its map.
~AsyncControllerThread()
{
terminated_.store(true, std::memory_order_seq_cst);
if (thread_.joinable())
{
thread_.join();
}
}

/// Creates the controller's thread.
/**
* Called when the controller is activated.
*
*/
void activate()
{
thread_ = std::thread(&AsyncControllerThread::controller_update_callback, this);
}

/// Periodically execute the controller's update method.
/**
* Callback of the async controller's thread.
* **Not synchronized with the controller manager's write and read currently**
*
*/
void controller_update_callback()
{
using TimePoint = std::chrono::system_clock::time_point;
unsigned int used_update_rate =
controller_->get_update_rate() == 0 ? cm_update_rate_ : controller_->get_update_rate();

auto previous_time = controller_->get_node()->now();
while (!terminated_.load(std::memory_order_relaxed))
{
auto const period = std::chrono::nanoseconds(1'000'000'000 / used_update_rate);
TimePoint next_iteration_time =
TimePoint(std::chrono::nanoseconds(controller_->get_node()->now().nanoseconds()));

if (controller_->get_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE)
{
auto const current_time = controller_->get_node()->now();
auto const measured_period = current_time - previous_time;
previous_time = current_time;
controller_->update(
controller_->get_node()->now(),
(controller_->get_update_rate() != cm_update_rate_ && controller_->get_update_rate() != 0)
? rclcpp::Duration::from_seconds(1.0 / controller_->get_update_rate())
: measured_period);
}

next_iteration_time += period;
std::this_thread::sleep_until(next_iteration_time);
}
}

private:
std::atomic<bool> terminated_;
std::shared_ptr<controller_interface::ControllerInterfaceBase> controller_;
std::thread thread_;
unsigned int cm_update_rate_;
};

} // namespace controller_interface

#endif // CONTROLLER_INTERFACE__ASYNC_CONTROLLER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>
#include <vector>

#include "controller_interface/async_controller.hpp"
#include "controller_interface/chainable_controller_interface.hpp"
#include "controller_interface/controller_interface.hpp"
#include "controller_interface/controller_interface_base.hpp"
Expand Down Expand Up @@ -196,6 +197,15 @@ class ControllerManager : public rclcpp::Node
CONTROLLER_MANAGER_PUBLIC
unsigned int get_update_rate() const;

/// Deletes all async controllers and components.
/**
* Needed to join the threads immediately after the control loop is ended
* to avoid unnecessary iterations. Otherwise
* the threads will be joined only when the controller manager gets destroyed.
*/
CONTROLLER_MANAGER_PUBLIC
void shutdown_async_controllers_and_components();

protected:
CONTROLLER_MANAGER_PUBLIC
void init_services();
Expand Down Expand Up @@ -563,65 +573,7 @@ class ControllerManager : public rclcpp::Node

SwitchParams switch_params_;

class ControllerThreadWrapper
{
public:
ControllerThreadWrapper(
std::shared_ptr<controller_interface::ControllerInterfaceBase> & controller,
int cm_update_rate)
: terminated_(false), controller_(controller), thread_{}, cm_update_rate_(cm_update_rate)
{
}

ControllerThreadWrapper(const ControllerThreadWrapper & t) = delete;
ControllerThreadWrapper(ControllerThreadWrapper && t) = default;
~ControllerThreadWrapper()
{
terminated_.store(true, std::memory_order_seq_cst);
if (thread_.joinable())
{
thread_.join();
}
}

void activate()
{
thread_ = std::thread(&ControllerThreadWrapper::call_controller_update, this);
}

void call_controller_update()
{
using TimePoint = std::chrono::system_clock::time_point;
unsigned int used_update_rate =
controller_->get_update_rate() == 0
? cm_update_rate_
: controller_
->get_update_rate(); // determines if the controller's or CM's update rate is used

while (!terminated_.load(std::memory_order_relaxed))
{
auto const period = std::chrono::nanoseconds(1'000'000'000 / used_update_rate);
TimePoint next_iteration_time =
TimePoint(std::chrono::nanoseconds(controller_->get_node()->now().nanoseconds()));

if (controller_->get_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE)
{
// critical section, not implemented yet
}

next_iteration_time += period;
std::this_thread::sleep_until(next_iteration_time);
}
}

private:
std::atomic<bool> terminated_;
std::shared_ptr<controller_interface::ControllerInterfaceBase> controller_;
std::thread thread_;
unsigned int cm_update_rate_;
};

std::unordered_map<std::string, std::unique_ptr<ControllerThreadWrapper>>
std::unordered_map<std::string, std::unique_ptr<controller_interface::AsyncControllerThread>>
async_controller_threads_;
};

Expand Down
10 changes: 9 additions & 1 deletion controller_manager/src/controller_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ controller_interface::return_type ControllerManager::configure_controller(
if (controller->is_async())
{
async_controller_threads_.emplace(
controller_name, std::make_unique<ControllerThreadWrapper>(controller, update_rate_));
controller_name,
std::make_unique<controller_interface::AsyncControllerThread>(controller, update_rate_));
}

const auto controller_update_rate = controller->get_update_rate();
Expand Down Expand Up @@ -2317,6 +2318,13 @@ std::pair<std::string, std::string> ControllerManager::split_command_interface(

unsigned int ControllerManager::get_update_rate() const { return update_rate_; }

void ControllerManager::shutdown_async_controllers_and_components()
{
async_controller_threads_.erase(
async_controller_threads_.begin(), async_controller_threads_.end());
resource_manager_->shutdown_async_components();
}

void ControllerManager::propagate_deactivation_of_chained_mode(
const std::vector<ControllerSpec> & controllers)
{
Expand Down
2 changes: 2 additions & 0 deletions controller_manager/src/ros2_control_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ int main(int argc, char ** argv)
next_iteration_time += period;
std::this_thread::sleep_until(next_iteration_time);
}

cm->shutdown_async_controllers_and_components();
});

executor->add_node(cm);
Expand Down
44 changes: 27 additions & 17 deletions hardware_interface/include/hardware_interface/async_components.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <atomic>
#include <thread>
#include <type_traits>
#include <variant>

#include "hardware_interface/actuator.hpp"
Expand All @@ -34,29 +35,23 @@ class AsyncComponentThread
{
public:
explicit AsyncComponentThread(
Actuator * component, unsigned int update_rate,
unsigned int update_rate,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr clock_interface)
: hardware_component_(component), cm_update_rate_(update_rate), clock_interface_(clock_interface)
: cm_update_rate_(update_rate), clock_interface_(clock_interface)
{
}

explicit AsyncComponentThread(
System * component, unsigned int update_rate,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr clock_interface)
: hardware_component_(component), cm_update_rate_(update_rate), clock_interface_(clock_interface)
{
}

explicit AsyncComponentThread(
Sensor * component, unsigned int update_rate,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr clock_interface)
: hardware_component_(component), cm_update_rate_(update_rate), clock_interface_(clock_interface)
// Fills the internal variant with the desired component.
template <typename T>
void register_component(T * component)
{
hardware_component_ = component;
}

AsyncComponentThread(const AsyncComponentThread & t) = delete;
AsyncComponentThread(AsyncComponentThread && t) = default;
AsyncComponentThread(AsyncComponentThread && t) = delete;

// Destructor, called when the component is erased from its map.
~AsyncComponentThread()
{
terminated_.store(true, std::memory_order_seq_cst);
Expand All @@ -65,9 +60,19 @@ class AsyncComponentThread
write_and_read_.join();
}
}

/// Creates the component's thread.
/**
* Called when the component is activated.
*
*/
void activate() { write_and_read_ = std::thread(&AsyncComponentThread::write_and_read, this); }

/// Periodically execute the component's write and read methods.
/**
* Callback of the async component's thread.
* **Not synchronized with the controller manager's update currently**
*
*/
void write_and_read()
{
using TimePoint = std::chrono::system_clock::time_point;
Expand All @@ -88,8 +93,12 @@ class AsyncComponentThread
auto measured_period = current_time - previous_time;
previous_time = current_time;

// write
// read
if (!first_iteration)
{
component->write(clock_interface_->get_clock()->now(), measured_period);
}
component->read(clock_interface_->get_clock()->now(), measured_period);
first_iteration = false;
}
next_iteration_time += period;
std::this_thread::sleep_until(next_iteration_time);
Expand All @@ -104,6 +113,7 @@ class AsyncComponentThread
std::thread write_and_read_{};

unsigned int cm_update_rate_;
bool first_iteration = true;
rclcpp::node_interfaces::NodeClockInterface::SharedPtr clock_interface_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ class HARDWARE_INTERFACE_PUBLIC ResourceManager
return_type set_component_state(
const std::string & component_name, rclcpp_lifecycle::State & target_state);

/// Deletes all async components from the resource storage
/**
* Needed to join the threads immediately after the control loop is ended.
*/
void shutdown_async_components();

/// Reads all loaded hardware components.
/**
* Reads from all active hardware components.
Expand Down
11 changes: 10 additions & 1 deletion hardware_interface/src/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ class ResourceStorage
{
async_component_threads_.emplace(
std::piecewise_construct, std::forward_as_tuple(hardware.get_name()),
std::forward_as_tuple(&hardware, cm_update_rate_, clock_interface_));
std::forward_as_tuple(cm_update_rate_, clock_interface_));

async_component_threads_.at(hardware.get_name()).register_component(&hardware);
}
}
return result;
Expand Down Expand Up @@ -1324,6 +1326,13 @@ return_type ResourceManager::set_component_state(
return result;
}

void ResourceManager::shutdown_async_components()
{
resource_storage_->async_component_threads_.erase(
resource_storage_->async_component_threads_.begin(),
resource_storage_->async_component_threads_.end());
}

// CM API: Called in "update"-thread
HardwareReadWriteStatus ResourceManager::read(
const rclcpp::Time & time, const rclcpp::Duration & period)
Expand Down

0 comments on commit 2cbe470

Please sign in to comment.