Skip to content

Commit

Permalink
crazyflie_server_cpp: Use MultiThreadedExecutor
Browse files Browse the repository at this point in the history
* Decreases overall latency, especially when multiple drones are being operated
* Tested with up to 4 CFs (which caused crashes without that change)

5 Threads in total:
- sending motion capture data
- streaming commands to (e.g., cmd_vel) to *all* robots via broadcasts
- executing service calls for *all* robots (via broadcasts)
- streaming commands to (e.g., cmd_vel) to individual robots
- executing service calls for individual robots
  • Loading branch information
whoenig committed Aug 18, 2023
1 parent 2e752cc commit b0e4e72
Showing 1 changed file with 83 additions and 38 deletions.
121 changes: 83 additions & 38 deletions crazyflie/src/crazyflie_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class CrazyflieROS
const std::string& cf_type,
const std::string& name,
rclcpp::Node* node,
rclcpp::CallbackGroup::SharedPtr callback_group_cf_cmd,
rclcpp::CallbackGroup::SharedPtr callback_group_cf_srv,
bool enable_parameters = true)
: logger_(rclcpp::get_logger(name))
, cf_logger_(logger_)
Expand All @@ -118,17 +120,29 @@ class CrazyflieROS
, node_(node)
, tf_broadcaster_(node)
{
service_emergency_ = node->create_service<Empty>(name + "/emergency", std::bind(&CrazyflieROS::emergency, this, _1, _2));
service_start_trajectory_ = node->create_service<StartTrajectory>(name + "/start_trajectory", std::bind(&CrazyflieROS::start_trajectory, this, _1, _2));
service_takeoff_ = node->create_service<Takeoff>(name + "/takeoff", std::bind(&CrazyflieROS::takeoff, this, _1, _2));
service_land_ = node->create_service<Land>(name + "/land", std::bind(&CrazyflieROS::land, this, _1, _2));
service_go_to_ = node->create_service<GoTo>(name + "/go_to", std::bind(&CrazyflieROS::go_to, this, _1, _2));
service_upload_trajectory_ = node->create_service<UploadTrajectory>(name + "/upload_trajectory", std::bind(&CrazyflieROS::upload_trajectory, this, _1, _2));
service_notify_setpoints_stop_ = node->create_service<NotifySetpointsStop>(name + "/notify_setpoints_stop", std::bind(&CrazyflieROS::notify_setpoints_stop, this, _1, _2));

subscription_cmd_vel_legacy_ = node->create_subscription<geometry_msgs::msg::Twist>(name + "/cmd_vel_legacy", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_vel_legacy_changed, this, _1));
subscription_cmd_full_state_ = node->create_subscription<crazyflie_interfaces::msg::FullState>(name + "/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_full_state_changed, this, _1));
subscription_cmd_position_ = node->create_subscription<crazyflie_interfaces::msg::Position>(name + "/cmd_position", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_position_changed, this, _1));
auto sub_opt_cf_cmd = rclcpp::SubscriptionOptions();
sub_opt_cf_cmd.callback_group = callback_group_cf_cmd;

// Services
auto service_qos = rmw_qos_profile_services_default;

service_emergency_ = node->create_service<Empty>(name + "/emergency", std::bind(&CrazyflieROS::emergency, this, _1, _2), service_qos, callback_group_cf_srv);
service_start_trajectory_ = node->create_service<StartTrajectory>(name + "/start_trajectory", std::bind(&CrazyflieROS::start_trajectory, this, _1, _2), service_qos, callback_group_cf_srv);
service_takeoff_ = node->create_service<Takeoff>(name + "/takeoff", std::bind(&CrazyflieROS::takeoff, this, _1, _2), service_qos, callback_group_cf_srv);
service_land_ = node->create_service<Land>(name + "/land", std::bind(&CrazyflieROS::land, this, _1, _2), service_qos, callback_group_cf_srv);
service_go_to_ = node->create_service<GoTo>(name + "/go_to", std::bind(&CrazyflieROS::go_to, this, _1, _2), service_qos, callback_group_cf_srv);
service_upload_trajectory_ = node->create_service<UploadTrajectory>(name + "/upload_trajectory", std::bind(&CrazyflieROS::upload_trajectory, this, _1, _2), service_qos, callback_group_cf_srv);
service_notify_setpoints_stop_ = node->create_service<NotifySetpointsStop>(name + "/notify_setpoints_stop", std::bind(&CrazyflieROS::notify_setpoints_stop, this, _1, _2), service_qos, callback_group_cf_srv);

// Topics

subscription_cmd_vel_legacy_ = node->create_subscription<geometry_msgs::msg::Twist>(name + "/cmd_vel_legacy", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_vel_legacy_changed, this, _1), sub_opt_cf_cmd);
subscription_cmd_full_state_ = node->create_subscription<crazyflie_interfaces::msg::FullState>(name + "/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_full_state_changed, this, _1), sub_opt_cf_cmd);
subscription_cmd_position_ = node->create_subscription<crazyflie_interfaces::msg::Position>(name + "/cmd_position", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieROS::cmd_position_changed, this, _1), sub_opt_cf_cmd);

// spinning timer
spin_timer_ = node->create_wall_timer(std::chrono::milliseconds(100), std::bind(&CrazyflieROS::spin_once, this), callback_group_cf_srv);


auto start = std::chrono::system_clock::now();

Expand Down Expand Up @@ -351,9 +365,9 @@ class CrazyflieROS
cf_.requestMemoryToc();
}

void spin_some()
void spin_once()
{
cf_.sendPing();
cf_.spin_once();
}

std::string broadcastUri() const
Expand Down Expand Up @@ -697,6 +711,11 @@ class CrazyflieROS

std::list<std::unique_ptr<LogBlockGeneric>> log_blocks_generic_;
std::list<rclcpp::Publisher<crazyflie_interfaces::msg::LogDataGeneric>::SharedPtr> publishers_generic_;

// multithreading
rclcpp::CallbackGroup::SharedPtr callback_group_cf_;
rclcpp::TimerBase::SharedPtr spin_timer_;

};

class CrazyflieServer : public rclcpp::Node
Expand All @@ -706,17 +725,38 @@ class CrazyflieServer : public rclcpp::Node
: Node("crazyflie_server")
, logger_(rclcpp::get_logger("all"))
{
// topics for "all"
// Create callback groups (each group can run in a separate thread)
callback_group_mocap_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto sub_opt_mocap = rclcpp::SubscriptionOptions();
sub_opt_mocap.callback_group = callback_group_mocap_;

callback_group_all_cmd_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto sub_opt_all_cmd = rclcpp::SubscriptionOptions();
sub_opt_all_cmd.callback_group = callback_group_all_cmd_;

subscription_cmd_full_state_ = this->create_subscription<crazyflie_interfaces::msg::FullState>("all/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieServer::cmd_full_state_changed, this, _1));
callback_group_all_srv_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

callback_group_cf_cmd_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

callback_group_cf_srv_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

// topics for "all"
subscription_cmd_full_state_ = this->create_subscription<crazyflie_interfaces::msg::FullState>("all/cmd_full_state", rclcpp::SystemDefaultsQoS(), std::bind(&CrazyflieServer::cmd_full_state_changed, this, _1), sub_opt_all_cmd);

// services for "all"
service_emergency_ = this->create_service<Empty>("all/emergency", std::bind(&CrazyflieServer::emergency, this, _1, _2));
service_start_trajectory_ = this->create_service<StartTrajectory>("all/start_trajectory", std::bind(&CrazyflieServer::start_trajectory, this, _1, _2));
service_takeoff_ = this->create_service<Takeoff>("all/takeoff", std::bind(&CrazyflieServer::takeoff, this, _1, _2));
service_land_ = this->create_service<Land>("all/land", std::bind(&CrazyflieServer::land, this, _1, _2));
service_go_to_ = this->create_service<GoTo>("all/go_to", std::bind(&CrazyflieServer::go_to, this, _1, _2));
service_notify_setpoints_stop_ = this->create_service<NotifySetpointsStop>("all/notify_setpoints_stop", std::bind(&CrazyflieServer::notify_setpoints_stop, this, _1, _2));
auto service_qos = rmw_qos_profile_services_default;

service_emergency_ = this->create_service<Empty>("all/emergency", std::bind(&CrazyflieServer::emergency, this, _1, _2), service_qos, callback_group_all_srv_);
service_start_trajectory_ = this->create_service<StartTrajectory>("all/start_trajectory", std::bind(&CrazyflieServer::start_trajectory, this, _1, _2), service_qos, callback_group_all_srv_);
service_takeoff_ = this->create_service<Takeoff>("all/takeoff", std::bind(&CrazyflieServer::takeoff, this, _1, _2), service_qos, callback_group_all_srv_);
service_land_ = this->create_service<Land>("all/land", std::bind(&CrazyflieServer::land, this, _1, _2), service_qos, callback_group_all_srv_);
service_go_to_ = this->create_service<GoTo>("all/go_to", std::bind(&CrazyflieServer::go_to, this, _1, _2), service_qos, callback_group_all_srv_);
service_notify_setpoints_stop_ = this->create_service<NotifySetpointsStop>("all/notify_setpoints_stop", std::bind(&CrazyflieServer::notify_setpoints_stop, this, _1, _2), service_qos, callback_group_all_srv_);

// declare global params
this->declare_parameter("all.broadcasts.num_repeats", 15);
Expand Down Expand Up @@ -755,7 +795,13 @@ class CrazyflieServer : public rclcpp::Node
// if it is a Crazyflie, try to connect
if (constr == "crazyflie") {
std::string uri = parameter_overrides.at("robots." + name + ".uri").get<std::string>();
crazyflies_.emplace(name, std::make_unique<CrazyflieROS>(uri, cf_type, name, this));
crazyflies_.emplace(name, std::make_unique<CrazyflieROS>(
uri,
cf_type,
name,
this,
callback_group_cf_cmd_,
callback_group_cf_srv_));

auto broadcastUri = crazyflies_[name]->broadcastUri();
RCLCPP_INFO(logger_, "%s", broadcastUri.c_str());
Expand All @@ -782,7 +828,7 @@ class CrazyflieServer : public rclcpp::Node
sensor_data_qos.keep_last(1);
sensor_data_qos.deadline(rclcpp::Duration(0/*s*/, 1e9/poses_qos_deadline /*ns*/));
sub_poses_ = this->create_subscription<NamedPoseArray>(
"poses", sensor_data_qos, std::bind(&CrazyflieServer::posesChanged, this, _1));
"poses", sensor_data_qos, std::bind(&CrazyflieServer::posesChanged, this, _1), sub_opt_mocap);

// support for all.params

Expand All @@ -794,20 +840,14 @@ class CrazyflieServer : public rclcpp::Node
this->declare_parameter("warnings.frequency", 1.0);
float freq = this->get_parameter("warnings.frequency").get_parameter_value().get<float>();
if (freq >= 0.0) {
watchdog_timer_ = this->create_wall_timer(std::chrono::milliseconds((int)(1000.0/freq)), std::bind(&CrazyflieServer::on_watchdog_timer, this));
watchdog_timer_ = this->create_wall_timer(std::chrono::milliseconds((int)(1000.0/freq)), std::bind(&CrazyflieServer::on_watchdog_timer, this), callback_group_all_srv_);
}
this->declare_parameter("warnings.motion_capture.warning_if_rate_outside", std::vector<double>({80.0, 120.0}));
auto rate_range = this->get_parameter("warnings.motion_capture.warning_if_rate_outside").get_parameter_value().get<std::vector<double>>();
mocap_min_rate_ = rate_range[0];
mocap_max_rate_ = rate_range[1];
}

void spin_some()
{
for (auto& cf : crazyflies_) {
cf.second->spin_some();
}
}

private:
void emergency(const std::shared_ptr<Empty::Request> request,
Expand Down Expand Up @@ -1146,21 +1186,26 @@ class CrazyflieServer : public rclcpp::Node
float mocap_min_rate_;
float mocap_max_rate_;
std::vector<std::chrono::time_point<std::chrono::steady_clock>> mocap_data_received_timepoints_;

// multithreading
rclcpp::CallbackGroup::SharedPtr callback_group_mocap_;
rclcpp::CallbackGroup::SharedPtr callback_group_all_cmd_;
rclcpp::CallbackGroup::SharedPtr callback_group_all_srv_;
rclcpp::CallbackGroup::SharedPtr callback_group_cf_cmd_;
rclcpp::CallbackGroup::SharedPtr callback_group_cf_srv_;
};

int main(int argc, char *argv[])
{
rclcpp::init(argc, argv);

// rclcpp::spin(std::make_shared<CrazyflieServer>());

auto node = std::make_shared<CrazyflieServer>();
while (true)
{
node->spin_some();
rclcpp::spin_some(node);

std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();

rclcpp::shutdown();
return 0;
}

0 comments on commit b0e4e72

Please sign in to comment.