Skip to content

Commit

Permalink
add support for lazy subscribers (#815)
Browse files Browse the repository at this point in the history
This implements #780 for ROS 2 distributions after Iron, where we have:

 * Connect/disconnect callbacks, per ros2/rmw#330 (this made it into Iron)
 * Updated APIs in ros-perception/image_common#272 (this is only in Rolling currently)
  • Loading branch information
mikeferguson authored Jan 18, 2024
1 parent 8f16860 commit 335e878
Show file tree
Hide file tree
Showing 18 changed files with 243 additions and 312 deletions.
2 changes: 0 additions & 2 deletions depth_image_proc/include/depth_image_proc/point_cloud_xyz.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class PointCloudXyzNode : public rclcpp::Node

image_geometry::PinholeCameraModel model_;

void connectCb();

void depthCb(
const Image::ConstSharedPtr & depth_msg,
const CameraInfo::ConstSharedPtr & info_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class PointCloudXyzRadialNode : public rclcpp::Node

cv::Mat transform_;

void connectCb();

void depthCb(
const sensor_msgs::msg::Image::ConstSharedPtr & depth_msg,
const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class PointCloudXyziNode : public rclcpp::Node

image_geometry::PinholeCameraModel model_;

void connectCb();

void imageCb(
const Image::ConstSharedPtr & depth_msg,
const Image::ConstSharedPtr & intensity_msg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ class PointCloudXyziRadialNode : public rclcpp::Node

cv::Mat transform_;

void connectCb();

void imageCb(
const Image::ConstSharedPtr & depth_msg,
const Image::ConstSharedPtr & intensity_msg_in,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ class PointCloudXyzrgbNode : public rclcpp::Node

image_geometry::PinholeCameraModel model_;

void connectCb();

void imageCb(
const Image::ConstSharedPtr & depth_msg,
const Image::ConstSharedPtr & rgb_msg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class PointCloudXyzrgbRadialNode : public rclcpp::Node

image_geometry::PinholeCameraModel model_;

void connectCb();

void imageCb(
const Image::ConstSharedPtr & depth_msg,
const Image::ConstSharedPtr & rgb_msg,
Expand Down
48 changes: 18 additions & 30 deletions depth_image_proc/src/convert_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,42 +58,30 @@ class ConvertMetricNode : public rclcpp::Node
std::mutex connect_mutex_;
image_transport::Publisher pub_depth_;

void connectCb();

void depthCb(const sensor_msgs::msg::Image::ConstSharedPtr & raw_msg);
};

ConvertMetricNode::ConvertMetricNode(const rclcpp::NodeOptions & options)
: Node("ConvertMetricNode", options)
{
// Monitor whether anyone is subscribed to the output
// TODO(ros2) Implement when SubscriberStatusCallback is available
// image_transport::SubscriberStatusCallback connect_cb =
// std::bind(&ConvertMetricNode::connectCb, this);
connectCb();

// Make sure we don't enter connectCb() between advertising and assigning to pub_depth_
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement when SubscriberStatusCallback is available
// pub_depth_ = it_->advertise("image", 1, connect_cb, connect_cb);
pub_depth_ = image_transport::create_publisher(this, "image");
}

// Handles (un)subscribing when clients (un)subscribe
void ConvertMetricNode::connectCb()
{
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement getNumSubscribers when rcl/rmw support it
// if (pub_depth_.getNumSubscribers() == 0)
if (0) {
sub_raw_.shutdown();
} else if (!sub_raw_) {
image_transport::TransportHints hints(this, "raw");
sub_raw_ = image_transport::create_subscription(
this, "image_raw",
std::bind(&ConvertMetricNode::depthCb, this, std::placeholders::_1),
hints.getTransport());
}
// Create publisher with connect callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[this](rclcpp::MatchedInfo &)
{
std::lock_guard<std::mutex> lock(connect_mutex_);
if (pub_depth_.getNumSubscribers() == 0) {
sub_raw_.shutdown();
} else if (!sub_raw_) {
image_transport::TransportHints hints(this, "raw");
sub_raw_ = image_transport::create_subscription(
this, "image_raw",
std::bind(&ConvertMetricNode::depthCb, this, std::placeholders::_1),
hints.getTransport());
}
};
pub_depth_ =
image_transport::create_publisher(this, "image", rmw_qos_profile_default, pub_options);
}

void ConvertMetricNode::depthCb(const sensor_msgs::msg::Image::ConstSharedPtr & raw_msg)
Expand Down
53 changes: 23 additions & 30 deletions depth_image_proc/src/point_cloud_xyz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,31 @@ PointCloudXyzNode::PointCloudXyzNode(const rclcpp::NodeOptions & options)
// Read parameters
queue_size_ = this->declare_parameter<int>("queue_size", 5);

// Monitor whether anyone is subscribed to the output
// TODO(ros2) Implement when SubscriberStatusCallback is available
// ros::SubscriberStatusCallback connect_cb = boost::bind(&PointCloudXyzNode::connectCb, this);
connectCb();

// Make sure we don't enter connectCb() between advertising and assigning to pub_point_cloud_
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement when SubscriberStatusCallback is available
// pub_point_cloud_ = nh.advertise<PointCloud>("points", 1, connect_cb, connect_cb);
pub_point_cloud_ = create_publisher<PointCloud2>("points", rclcpp::SensorDataQoS());
// Create publisher with connect callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[this](rclcpp::MatchedInfo & s)
{
std::lock_guard<std::mutex> lock(connect_mutex_);
if (s.current_count == 0) {
sub_depth_.shutdown();
} else if (!sub_depth_) {
auto custom_qos = rmw_qos_profile_system_default;
custom_qos.depth = queue_size_;

sub_depth_ = image_transport::create_camera_subscription(
this,
"image_rect",
std::bind(
&PointCloudXyzNode::depthCb, this, std::placeholders::_1,
std::placeholders::_2),
"raw",
custom_qos);
}
};
pub_point_cloud_ = create_publisher<PointCloud2>("points", rclcpp::SensorDataQoS(), pub_options);
}

// Handles (un)subscribing when clients (un)subscribe
void PointCloudXyzNode::connectCb()
{
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement getNumSubscribers when rcl/rmw support it
// if (pub_point_cloud_->getNumSubscribers() == 0)
if (0) {
sub_depth_.shutdown();
} else if (!sub_depth_) {
auto custom_qos = rmw_qos_profile_system_default;
custom_qos.depth = queue_size_;

sub_depth_ = image_transport::create_camera_subscription(
this,
"image_rect",
std::bind(&PointCloudXyzNode::depthCb, this, std::placeholders::_1, std::placeholders::_2),
"raw",
custom_qos);
}
}

void PointCloudXyzNode::depthCb(
const Image::ConstSharedPtr & depth_msg,
Expand Down
56 changes: 23 additions & 33 deletions depth_image_proc/src/point_cloud_xyz_radial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,40 +53,30 @@ PointCloudXyzRadialNode::PointCloudXyzRadialNode(const rclcpp::NodeOptions & opt
// Read parameters
queue_size_ = this->declare_parameter<int>("queue_size", 5);

// Monitor whether anyone is subscribed to the output
// TODO(ros2) Implement when SubscriberStatusCallback is available
// ros::SubscriberStatusCallback connect_cb =
// boost::bind(&PointCloudXyzRadialNode::connectCb, this);
connectCb();
// Make sure we don't enter connectCb() between advertising and assigning to pub_point_cloud_
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement when SubscriberStatusCallback is available
// pub_point_cloud_ = nh.advertise<PointCloud>("points", 1, connect_cb, connect_cb);
// Create publisher with connect callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[this](rclcpp::MatchedInfo & s)
{
std::lock_guard<std::mutex> lock(connect_mutex_);
if (s.current_count == 0) {
sub_depth_.shutdown();
} else if (!sub_depth_) {
auto custom_qos = rmw_qos_profile_system_default;
custom_qos.depth = queue_size_;

sub_depth_ = image_transport::create_camera_subscription(
this,
"image_raw",
std::bind(
&PointCloudXyzRadialNode::depthCb, this, std::placeholders::_1,
std::placeholders::_2),
"raw",
custom_qos);
}
};
pub_point_cloud_ = create_publisher<sensor_msgs::msg::PointCloud2>(
"points", rclcpp::SensorDataQoS());
}

// Handles (un)subscribing when clients (un)subscribe
void PointCloudXyzRadialNode::connectCb()
{
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement getNumSubscribers when rcl/rmw support it
// if (pub_point_cloud_.getNumSubscribers() == 0)
if (0) {
sub_depth_.shutdown();
} else if (!sub_depth_) {
auto custom_qos = rmw_qos_profile_system_default;
custom_qos.depth = queue_size_;

sub_depth_ = image_transport::create_camera_subscription(
this,
"image_raw",
std::bind(
&PointCloudXyzRadialNode::depthCb, this, std::placeholders::_1,
std::placeholders::_2),
"raw",
custom_qos);
}
"points", rclcpp::SensorDataQoS(), pub_options);
}

void PointCloudXyzRadialNode::depthCb(
Expand Down
59 changes: 25 additions & 34 deletions depth_image_proc/src/point_cloud_xyzi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,40 +71,31 @@ PointCloudXyziNode::PointCloudXyziNode(const rclcpp::NodeOptions & options)
std::placeholders::_2,
std::placeholders::_3));

// Monitor whether anyone is subscribed to the output
// TODO(ros2) Implement when SubscriberStatusCallback is available
// ros::SubscriberStatusCallback connect_cb = boost::bind(&PointCloudXyziNode::connectCb, this);
connectCb();
// Make sure we don't enter connectCb() between advertising and assigning to pub_point_cloud_
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement when SubscriberStatusCallback is available
// pub_point_cloud_ = depth_nh.advertise<PointCloud>("points", 1, connect_cb, connect_cb);
pub_point_cloud_ = create_publisher<PointCloud>("points", rclcpp::SensorDataQoS());
}

// Handles (un)subscribing when clients (un)subscribe
void PointCloudXyziNode::connectCb()
{
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement getNumSubscribers when rcl/rmw support it
// if (pub_point_cloud_->getNumSubscribers() == 0)
if (0) {
sub_depth_.unsubscribe();
sub_intensity_.unsubscribe();
sub_info_.unsubscribe();
} else if (!sub_depth_.getSubscriber()) {
// parameter for depth_image_transport hint
std::string depth_image_transport_param = "depth_image_transport";

// depth image can use different transport.(e.g. compressedDepth)
image_transport::TransportHints depth_hints(this, "raw", depth_image_transport_param);
sub_depth_.subscribe(this, "depth/image_rect", depth_hints.getTransport());

// intensity uses normal ros transport hints.
image_transport::TransportHints hints(this, "raw");
sub_intensity_.subscribe(this, "intensity/image_rect", hints.getTransport());
sub_info_.subscribe(this, "intensity/camera_info");
}
// Create publisher with connect callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[this](rclcpp::MatchedInfo & s)
{
std::lock_guard<std::mutex> lock(connect_mutex_);
if (s.current_count == 0) {
sub_depth_.unsubscribe();
sub_intensity_.unsubscribe();
sub_info_.unsubscribe();
} else if (!sub_depth_.getSubscriber()) {
// parameter for depth_image_transport hint
std::string depth_image_transport_param = "depth_image_transport";

// depth image can use different transport.(e.g. compressedDepth)
image_transport::TransportHints depth_hints(this, "raw", depth_image_transport_param);
sub_depth_.subscribe(this, "depth/image_rect", depth_hints.getTransport());

// intensity uses normal ros transport hints.
image_transport::TransportHints hints(this, "raw");
sub_intensity_.subscribe(this, "intensity/image_rect", hints.getTransport());
sub_info_.subscribe(this, "intensity/camera_info");
}
};
pub_point_cloud_ = create_publisher<PointCloud>("points", rclcpp::SensorDataQoS(), pub_options);
}

void PointCloudXyziNode::imageCb(
Expand Down
62 changes: 25 additions & 37 deletions depth_image_proc/src/point_cloud_xyzi_radial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,44 +69,32 @@ PointCloudXyziRadialNode::PointCloudXyziRadialNode(const rclcpp::NodeOptions & o
std::placeholders::_2,
std::placeholders::_3));

// Monitor whether anyone is subscribed to the output
// TODO(ros2) Implement when SubscriberStatusCallback is available
// ros::SubscriberStatusCallback connect_cb =
// boost::bind(&PointCloudXyziRadialNode::connectCb, this);
connectCb();

// Make sure we don't enter connectCb() between advertising and assigning to pub_point_cloud_
std::lock_guard<std::mutex> lock(connect_mutex_);
// TODO(ros2) Implement when SubscriberStatusCallback is available
// pub_point_cloud_ = nh.advertise<PointCloud>("points", 20, connect_cb, connect_cb);
// Create publisher with connect callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[this](rclcpp::MatchedInfo & s)
{
std::lock_guard<std::mutex> lock(connect_mutex_);
if (s.current_count == 0) {
sub_depth_.unsubscribe();
sub_intensity_.unsubscribe();
sub_info_.unsubscribe();
} else if (!sub_depth_.getSubscriber()) {
// parameter for depth_image_transport hint
std::string depth_image_transport_param = "depth_image_transport";

// depth image can use different transport.(e.g. compressedDepth)
image_transport::TransportHints depth_hints(this, "raw", depth_image_transport_param);
sub_depth_.subscribe(this, "depth/image_raw", depth_hints.getTransport());

// intensity uses normal ros transport hints.
image_transport::TransportHints hints(this, "raw");
sub_intensity_.subscribe(this, "intensity/image_raw", hints.getTransport());
sub_info_.subscribe(this, "intensity/camera_info");
}
};
pub_point_cloud_ = create_publisher<sensor_msgs::msg::PointCloud2>(
"points", rclcpp::SensorDataQoS());
}

// Handles (un)subscribing when clients (un)subscribe
void PointCloudXyziRadialNode::connectCb()
{
std::lock_guard<std::mutex> lock(connect_mutex_);

// TODO(ros2) Implement getNumSubscribers when rcl/rmw support it
// if (pub_point_cloud_.getNumSubscribers() == 0)
if (0) {
sub_depth_.unsubscribe();
sub_intensity_.unsubscribe();
sub_info_.unsubscribe();
} else if (!sub_depth_.getSubscriber()) {
// parameter for depth_image_transport hint
std::string depth_image_transport_param = "depth_image_transport";

// depth image can use different transport.(e.g. compressedDepth)
image_transport::TransportHints depth_hints(this, "raw", depth_image_transport_param);
sub_depth_.subscribe(this, "depth/image_raw", depth_hints.getTransport());

// intensity uses normal ros transport hints.
image_transport::TransportHints hints(this, "raw");
sub_intensity_.subscribe(this, "intensity/image_raw", hints.getTransport());
sub_info_.subscribe(this, "intensity/camera_info");
}
"points", rclcpp::SensorDataQoS(), pub_options);
}

void PointCloudXyziRadialNode::imageCb(
Expand Down
Loading

0 comments on commit 335e878

Please sign in to comment.