From b0e4e728d5ac828b45137c4f2a817551d1a60b01 Mon Sep 17 00:00:00 2001 From: Wolfgang Hoenig Date: Mon, 27 Mar 2023 09:49:27 +0200 Subject: [PATCH] crazyflie_server_cpp: Use MultiThreadedExecutor * Decreases overall latency, especially when multiple drones are being operated * Tested with up to 4 CFs (which caused crashes without that change) 5 Threads in total: - sending motion capture data - streaming commands to (e.g., cmd_vel) to *all* robots via broadcasts - executing service calls for *all* robots (via broadcasts) - streaming commands to (e.g., cmd_vel) to individual robots - executing service calls for individual robots --- crazyflie/src/crazyflie_server.cpp | 121 ++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 38 deletions(-) diff --git a/crazyflie/src/crazyflie_server.cpp b/crazyflie/src/crazyflie_server.cpp index 1abdd3c15..d0910f3b5 100644 --- a/crazyflie/src/crazyflie_server.cpp +++ b/crazyflie/src/crazyflie_server.cpp @@ -107,6 +107,8 @@ class CrazyflieROS const std::string& cf_type, const std::string& name, rclcpp::Node* node, + rclcpp::CallbackGroup::SharedPtr callback_group_cf_cmd, + rclcpp::CallbackGroup::SharedPtr callback_group_cf_srv, bool enable_parameters = true) : logger_(rclcpp::get_logger(name)) , cf_logger_(logger_) @@ -118,17 +120,29 @@ class CrazyflieROS , node_(node) , tf_broadcaster_(node) { - service_emergency_ = node->create_service(name + "/emergency", std::bind(&CrazyflieROS::emergency, this, _1, _2)); - service_start_trajectory_ = node->create_service(name + "/start_trajectory", std::bind(&CrazyflieROS::start_trajectory, this, _1, _2)); - service_takeoff_ = node->create_service(name + "/takeoff", std::bind(&CrazyflieROS::takeoff, this, _1, _2)); - service_land_ = node->create_service(name + "/land", std::bind(&CrazyflieROS::land, this, _1, _2)); - service_go_to_ = node->create_service(name + "/go_to", std::bind(&CrazyflieROS::go_to, this, _1, _2)); - service_upload_trajectory_ = node->create_service(name + "/upload_trajectory", std::bind(&CrazyflieROS::upload_trajectory, this, _1, _2)); - service_notify_setpoints_stop_ = node->create_service(name + "/notify_setpoints_stop", std::bind(&CrazyflieROS::notify_setpoints_stop, this, _1, _2)); - - subscription_cmd_vel_legacy_ = node->create_subscription(name + "/cmd_vel_legacy", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_vel_legacy_changed, this, _1)); - subscription_cmd_full_state_ = node->create_subscription(name + "/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_full_state_changed, this, _1)); - subscription_cmd_position_ = node->create_subscription(name + "/cmd_position", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_position_changed, this, _1)); + auto sub_opt_cf_cmd = rclcpp::SubscriptionOptions(); + sub_opt_cf_cmd.callback_group = callback_group_cf_cmd; + + // Services + auto service_qos = rmw_qos_profile_services_default; + + service_emergency_ = node->create_service(name + "/emergency", std::bind(&CrazyflieROS::emergency, this, _1, _2), service_qos, callback_group_cf_srv); + service_start_trajectory_ = node->create_service(name + "/start_trajectory", std::bind(&CrazyflieROS::start_trajectory, this, _1, _2), service_qos, callback_group_cf_srv); + service_takeoff_ = node->create_service(name + "/takeoff", std::bind(&CrazyflieROS::takeoff, this, _1, _2), service_qos, callback_group_cf_srv); + service_land_ = node->create_service(name + "/land", std::bind(&CrazyflieROS::land, this, _1, _2), service_qos, callback_group_cf_srv); + service_go_to_ = node->create_service(name + "/go_to", std::bind(&CrazyflieROS::go_to, this, _1, _2), service_qos, callback_group_cf_srv); + service_upload_trajectory_ = node->create_service(name + "/upload_trajectory", std::bind(&CrazyflieROS::upload_trajectory, this, _1, _2), service_qos, callback_group_cf_srv); + service_notify_setpoints_stop_ = node->create_service(name + "/notify_setpoints_stop", std::bind(&CrazyflieROS::notify_setpoints_stop, this, _1, _2), service_qos, callback_group_cf_srv); + + // Topics + + subscription_cmd_vel_legacy_ = node->create_subscription(name + "/cmd_vel_legacy", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_vel_legacy_changed, this, _1), sub_opt_cf_cmd); + subscription_cmd_full_state_ = node->create_subscription(name + "/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_full_state_changed, this, _1), sub_opt_cf_cmd); + subscription_cmd_position_ = node->create_subscription(name + "/cmd_position", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_position_changed, this, _1), sub_opt_cf_cmd); + + // spinning timer + spin_timer_ = node->create_wall_timer(std::chrono::milliseconds(100), std::bind(&CrazyflieROS::spin_once, this), callback_group_cf_srv); + auto start = std::chrono::system_clock::now(); @@ -351,9 +365,9 @@ class CrazyflieROS cf_.requestMemoryToc(); } - void spin_some() + void spin_once() { - cf_.sendPing(); + cf_.spin_once(); } std::string broadcastUri() const @@ -697,6 +711,11 @@ class CrazyflieROS std::list> log_blocks_generic_; std::list::SharedPtr> publishers_generic_; + + // multithreading + rclcpp::CallbackGroup::SharedPtr callback_group_cf_; + rclcpp::TimerBase::SharedPtr spin_timer_; + }; class CrazyflieServer : public rclcpp::Node @@ -706,17 +725,38 @@ class CrazyflieServer : public rclcpp::Node : Node("crazyflie_server") , logger_(rclcpp::get_logger("all")) { - // topics for "all" + // Create callback groups (each group can run in a separate thread) + callback_group_mocap_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + auto sub_opt_mocap = rclcpp::SubscriptionOptions(); + sub_opt_mocap.callback_group = callback_group_mocap_; + + callback_group_all_cmd_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + auto sub_opt_all_cmd = rclcpp::SubscriptionOptions(); + sub_opt_all_cmd.callback_group = callback_group_all_cmd_; - subscription_cmd_full_state_ = this->create_subscription("all/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieServer::cmd_full_state_changed, this, _1)); + callback_group_all_srv_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + callback_group_cf_cmd_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + callback_group_cf_srv_ = this->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // topics for "all" + subscription_cmd_full_state_ = this->create_subscription("all/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieServer::cmd_full_state_changed, this, _1), sub_opt_all_cmd); // services for "all" - service_emergency_ = this->create_service("all/emergency", std::bind(&CrazyflieServer::emergency, this, _1, _2)); - service_start_trajectory_ = this->create_service("all/start_trajectory", std::bind(&CrazyflieServer::start_trajectory, this, _1, _2)); - service_takeoff_ = this->create_service("all/takeoff", std::bind(&CrazyflieServer::takeoff, this, _1, _2)); - service_land_ = this->create_service("all/land", std::bind(&CrazyflieServer::land, this, _1, _2)); - service_go_to_ = this->create_service("all/go_to", std::bind(&CrazyflieServer::go_to, this, _1, _2)); - service_notify_setpoints_stop_ = this->create_service("all/notify_setpoints_stop", std::bind(&CrazyflieServer::notify_setpoints_stop, this, _1, _2)); + auto service_qos = rmw_qos_profile_services_default; + + service_emergency_ = this->create_service("all/emergency", std::bind(&CrazyflieServer::emergency, this, _1, _2), service_qos, callback_group_all_srv_); + service_start_trajectory_ = this->create_service("all/start_trajectory", std::bind(&CrazyflieServer::start_trajectory, this, _1, _2), service_qos, callback_group_all_srv_); + service_takeoff_ = this->create_service("all/takeoff", std::bind(&CrazyflieServer::takeoff, this, _1, _2), service_qos, callback_group_all_srv_); + service_land_ = this->create_service("all/land", std::bind(&CrazyflieServer::land, this, _1, _2), service_qos, callback_group_all_srv_); + service_go_to_ = this->create_service("all/go_to", std::bind(&CrazyflieServer::go_to, this, _1, _2), service_qos, callback_group_all_srv_); + service_notify_setpoints_stop_ = this->create_service("all/notify_setpoints_stop", std::bind(&CrazyflieServer::notify_setpoints_stop, this, _1, _2), service_qos, callback_group_all_srv_); // declare global params this->declare_parameter("all.broadcasts.num_repeats", 15); @@ -755,7 +795,13 @@ class CrazyflieServer : public rclcpp::Node // if it is a Crazyflie, try to connect if (constr == "crazyflie") { std::string uri = parameter_overrides.at("robots." + name + ".uri").get(); - crazyflies_.emplace(name, std::make_unique(uri, cf_type, name, this)); + crazyflies_.emplace(name, std::make_unique( + uri, + cf_type, + name, + this, + callback_group_cf_cmd_, + callback_group_cf_srv_)); auto broadcastUri = crazyflies_[name]->broadcastUri(); RCLCPP_INFO(logger_, "%s", broadcastUri.c_str()); @@ -782,7 +828,7 @@ class CrazyflieServer : public rclcpp::Node sensor_data_qos.keep_last(1); sensor_data_qos.deadline(rclcpp::Duration(0/*s*/, 1e9/poses_qos_deadline /*ns*/)); sub_poses_ = this->create_subscription( - "poses", sensor_data_qos, std::bind(&CrazyflieServer::posesChanged, this, _1)); + "poses", sensor_data_qos, std::bind(&CrazyflieServer::posesChanged, this, _1), sub_opt_mocap); // support for all.params @@ -794,7 +840,7 @@ class CrazyflieServer : public rclcpp::Node this->declare_parameter("warnings.frequency", 1.0); float freq = this->get_parameter("warnings.frequency").get_parameter_value().get(); if (freq >= 0.0) { - watchdog_timer_ = this->create_wall_timer(std::chrono::milliseconds((int)(1000.0/freq)), std::bind(&CrazyflieServer::on_watchdog_timer, this)); + watchdog_timer_ = this->create_wall_timer(std::chrono::milliseconds((int)(1000.0/freq)), std::bind(&CrazyflieServer::on_watchdog_timer, this), callback_group_all_srv_); } this->declare_parameter("warnings.motion_capture.warning_if_rate_outside", std::vector({80.0, 120.0})); auto rate_range = this->get_parameter("warnings.motion_capture.warning_if_rate_outside").get_parameter_value().get>(); @@ -802,12 +848,6 @@ class CrazyflieServer : public rclcpp::Node mocap_max_rate_ = rate_range[1]; } - void spin_some() - { - for (auto& cf : crazyflies_) { - cf.second->spin_some(); - } - } private: void emergency(const std::shared_ptr request, @@ -1146,21 +1186,26 @@ class CrazyflieServer : public rclcpp::Node float mocap_min_rate_; float mocap_max_rate_; std::vector> mocap_data_received_timepoints_; + + // multithreading + rclcpp::CallbackGroup::SharedPtr callback_group_mocap_; + rclcpp::CallbackGroup::SharedPtr callback_group_all_cmd_; + rclcpp::CallbackGroup::SharedPtr callback_group_all_srv_; + rclcpp::CallbackGroup::SharedPtr callback_group_cf_cmd_; + rclcpp::CallbackGroup::SharedPtr callback_group_cf_srv_; }; int main(int argc, char *argv[]) { rclcpp::init(argc, argv); - // rclcpp::spin(std::make_shared()); + auto node = std::make_shared(); - while (true) - { - node->spin_some(); - rclcpp::spin_some(node); - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_node(node); + executor.spin(); + rclcpp::shutdown(); return 0; }