Skip to content

Commit

Permalink
Separate web_video_server into a component and an executable
Browse files Browse the repository at this point in the history
The [ROS 2 (Humble)
docs](https://docs.ros.org/en/humble/Concepts/Intermediate/About-Composition.html#writing-a-component)
recommend writing applications as components:

> By making the process layout a deploy-time decision the user can
> choose between:
>
> * running multiple nodes in separate processes with the benefits
> of process/fault isolation as well as easier debugging of individual
> nodes and
>
> * running multiple nodes in a single process with the lower overhead
> and optionally more efficient communication.

The default deployment scheme for the `web_video_server` node remains
the same: i.e., using
`ros2 run web_video_server web_video_server {parameter-args-here}`.

Having a component library available adds flexibility for users,
particularly for those users looking to use intra-process memory
for published camera topics and the web video server.
  • Loading branch information
jwdinius committed Oct 28, 2024
1 parent 48215bb commit cea8e8e
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 67 deletions.
42 changes: 35 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ find_package(async_web_server_cpp REQUIRED)
find_package(cv_bridge REQUIRED)
find_package(image_transport REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(sensor_msgs REQUIRED)

find_package(OpenCV REQUIRED)
Expand Down Expand Up @@ -43,7 +44,7 @@ include_directories(include
)

## Declare a cpp executable
add_executable(${PROJECT_NAME}
add_library(${PROJECT_NAME}_lib SHARED
src/web_video_server.cpp
src/image_streamer.cpp
src/libav_streamer.cpp
Expand All @@ -57,12 +58,8 @@ add_executable(${PROJECT_NAME}
src/utils.cpp
)

ament_target_dependencies(${PROJECT_NAME}
sensor_msgs
)

## Specify libraries to link a library or executable target against
target_link_libraries(${PROJECT_NAME}
target_link_libraries(${PROJECT_NAME}_lib
async_web_server_cpp::async_web_server_cpp
cv_bridge::cv_bridge
image_transport::image_transport
Expand All @@ -76,12 +73,43 @@ target_link_libraries(${PROJECT_NAME}
${swscale_LIBRARIES}
)

## Declare a cpp executable
add_executable(${PROJECT_NAME}
src/web_video_server_node.cpp
)

target_link_libraries(${PROJECT_NAME}
${PROJECT_NAME}_lib
)

ament_target_dependencies(${PROJECT_NAME}_lib
rclcpp_components
sensor_msgs
)

rclcpp_components_register_nodes(${PROJECT_NAME}_lib "web_video_server::WebVideoServer")

#############
## Install ##
#############

## Mark executables and/or libraries for installation
install(TARGETS ${PROJECT_NAME}
install(
DIRECTORY include/
DESTINATION include/${PROJECT_NAME}
)

install(
TARGETS ${PROJECT_NAME}_lib
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)

install(
TARGETS ${PROJECT_NAME}
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
10 changes: 2 additions & 8 deletions include/web_video_server/web_video_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,20 @@ namespace web_video_server
* @class WebVideoServer
* @brief
*/
class WebVideoServer
class WebVideoServer : public rclcpp::Node
{
public:
/**
* @brief Constructor
* @return
*/
explicit WebVideoServer(rclcpp::Node::SharedPtr & node);
explicit WebVideoServer(const rclcpp::NodeOptions & options);

/**
* @brief Destructor - Cleans up
*/
virtual ~WebVideoServer();

/**
* @brief Starts the server and spins
*/
void spin();

void setup_cleanup_inactive_streams();

bool handle_request(
Expand Down Expand Up @@ -104,7 +99,6 @@ class WebVideoServer
void restreamFrames(double max_age);
void cleanup_inactive_streams();

rclcpp::Node::SharedPtr node_;
rclcpp::WallTimer<rclcpp::VoidCallbackType>::SharedPtr cleanup_timer_;

// Parameters
Expand Down
2 changes: 2 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>

<build_depend>rclcpp</build_depend>
<build_depend>rclcpp_components</build_depend>
<build_depend>cv_bridge</build_depend>
<build_depend>image_transport</build_depend>
<build_depend>async_web_server_cpp</build_depend>
<build_depend>ffmpeg</build_depend>
<build_depend>sensor_msgs</build_depend>

<exec_depend>rclcpp</exec_depend>
<exec_depend>rclcpp_components</exec_depend>
<exec_depend>cv_bridge</exec_depend>
<exec_depend>image_transport</exec_depend>
<exec_depend>async_web_server_cpp</exec_depend>
Expand Down
94 changes: 42 additions & 52 deletions src/web_video_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <boost/algorithm/string/predicate.hpp>
#include <opencv2/opencv.hpp>

#include "rclcpp/rclcpp.hpp"

#include "sensor_msgs/image_encodings.hpp"
#include "web_video_server/ros_compressed_streamer.hpp"
#include "web_video_server/jpeg_streamers.hpp"
Expand All @@ -51,26 +53,26 @@ using namespace boost::placeholders; // NOLINT
namespace web_video_server
{

WebVideoServer::WebVideoServer(rclcpp::Node::SharedPtr & node)
: node_(node), handler_group_(
WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options)
: rclcpp::Node("web_video_server", options), handler_group_(
async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found))
{
node_->declare_parameter("port", 8080);
node_->declare_parameter("verbose", true);
node_->declare_parameter("address", "0.0.0.0");
node_->declare_parameter("server_threads", 1);
node_->declare_parameter("ros_threads", 2);
node_->declare_parameter("publish_rate", -1.0);
node_->declare_parameter("default_stream_type", "mjpeg");

node_->get_parameter("port", port_);
node_->get_parameter("verbose", verbose_);
node_->get_parameter("address", address_);
declare_parameter("port", 8080);
declare_parameter("verbose", true);
declare_parameter("address", "0.0.0.0");
declare_parameter("server_threads", 1);
declare_parameter("ros_threads", 2);
declare_parameter("publish_rate", -1.0);
declare_parameter("default_stream_type", "mjpeg");

get_parameter("port", port_);
get_parameter("verbose", verbose_);
get_parameter("address", address_);
int server_threads;
node_->get_parameter("server_threads", server_threads);
node_->get_parameter("ros_threads", ros_threads_);
node_->get_parameter("publish_rate", publish_rate_);
node_->get_parameter("default_stream_type", default_stream_type_);
get_parameter("server_threads", server_threads);
get_parameter("ros_threads", ros_threads_);
get_parameter("publish_rate", publish_rate_);
get_parameter("default_stream_type", default_stream_type_);

stream_types_["mjpeg"] = std::make_shared<MjpegStreamerType>();
stream_types_["png"] = std::make_shared<PngStreamerType>();
Expand Down Expand Up @@ -102,33 +104,26 @@ WebVideoServer::WebVideoServer(rclcpp::Node::SharedPtr & node)
);
} catch (boost::exception & e) {
RCLCPP_ERROR(
node_->get_logger(), "Exception when creating the web server! %s:%d",
get_logger(), "Exception when creating the web server! %s:%d",
address_.c_str(), port_);
throw;
}
RCLCPP_INFO(get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_);
if (publish_rate_ > 0) {
create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1.0 / publish_rate_);});
}
server_->run();
}

WebVideoServer::~WebVideoServer()
{
server_->stop();
}

void WebVideoServer::setup_cleanup_inactive_streams()
{
std::function<void()> callback = std::bind(&WebVideoServer::cleanup_inactive_streams, this);
cleanup_timer_ = node_->create_wall_timer(500ms, callback);
}

void WebVideoServer::spin()
{
server_->run();
RCLCPP_INFO(node_->get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_);
rclcpp::executors::MultiThreadedExecutor spinner(rclcpp::ExecutorOptions(), ros_threads_);
spinner.add_node(node_);
if (publish_rate_ > 0) {
node_->create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1.0 / publish_rate_);});
}
spinner.spin();
server_->stop();
cleanup_timer_ = create_wall_timer(500ms, callback);
}

void WebVideoServer::restreamFrames(double max_age)
Expand All @@ -149,7 +144,7 @@ void WebVideoServer::cleanup_inactive_streams()
[](const std::shared_ptr<ImageStreamer> & streamer) {return !streamer->isInactive();});
if (verbose_) {
for (auto itr = new_end; itr < image_subscribers_.end(); ++itr) {
RCLCPP_INFO(node_->get_logger(), "Removed Stream: %s", (*itr)->getTopic().c_str());
RCLCPP_INFO(get_logger(), "Removed Stream: %s", (*itr)->getTopic().c_str());
}
}
image_subscribers_.erase(new_end, image_subscribers_.end());
Expand All @@ -162,13 +157,13 @@ bool WebVideoServer::handle_request(
const char * end)
{
if (verbose_) {
RCLCPP_INFO(node_->get_logger(), "Handling Request: %s", request.uri.c_str());
RCLCPP_INFO(get_logger(), "Handling Request: %s", request.uri.c_str());
}

try {
return handler_group_(request, connection, begin, end);
} catch (std::exception & e) {
RCLCPP_WARN(node_->get_logger(), "Error Handling Request: %s", e.what());
RCLCPP_WARN(get_logger(), "Error Handling Request: %s", e.what());
return false;
}
}
Expand All @@ -184,7 +179,7 @@ bool WebVideoServer::handle_stream(
// Fallback for topics without corresponding compressed topics
if (type == std::string("ros_compressed")) {
std::string compressed_topic_name = topic + "/compressed";
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
bool did_find_compressed_topic = false;
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
Expand All @@ -201,13 +196,13 @@ bool WebVideoServer::handle_stream(
}
if (!did_find_compressed_topic) {
RCLCPP_WARN(
node_->get_logger(),
get_logger(),
"Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str());
type = "mjpeg";
}
}
std::shared_ptr<ImageStreamer> streamer = stream_types_[type]->create_streamer(
request, connection, node_);
request, connection, shared_from_this());
streamer->start();
std::scoped_lock lock(subscriber_mutex_);
image_subscribers_.push_back(streamer);
Expand All @@ -224,7 +219,7 @@ bool WebVideoServer::handle_snapshot(
const char * end)
{
std::shared_ptr<ImageStreamer> streamer = std::make_shared<JpegSnapshotStreamer>(
request, connection, node_);
request, connection, shared_from_this());
streamer->start();

std::scoped_lock lock(subscriber_mutex_);
Expand All @@ -243,7 +238,7 @@ bool WebVideoServer::handle_stream_viewer(
// Fallback for topics without corresponding compressed topics
if (type == std::string("ros_compressed")) {
std::string compressed_topic_name = topic + "/compressed";
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
bool did_find_compressed_topic = false;
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
Expand All @@ -260,7 +255,7 @@ bool WebVideoServer::handle_stream_viewer(
}
if (!did_find_compressed_topic) {
RCLCPP_WARN(
node_->get_logger(),
get_logger(),
"Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str());
type = "mjpeg";
}
Expand Down Expand Up @@ -292,7 +287,7 @@ bool WebVideoServer::handle_list_streams(
{
std::vector<std::string> image_topics;
std::vector<std::string> camera_info_topics;
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
// skip over topics with more than one type
Expand Down Expand Up @@ -380,14 +375,9 @@ bool WebVideoServer::handle_list_streams(

} // namespace web_video_server

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<rclcpp::Node>("web_video_server");

web_video_server::WebVideoServer server(node);
server.setup_cleanup_inactive_streams();
server.spin();
#include "rclcpp_components/register_node_macro.hpp"

return 0;
}
// Register the component with class_loader.
// This acts as a sort of entry point, allowing the component to be discoverable when its library
// is being loaded into a running process.
RCLCPP_COMPONENTS_REGISTER_NODE(web_video_server::WebVideoServer)
47 changes: 47 additions & 0 deletions src/web_video_server_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2014, Worcester Polytechnic Institute
// Copyright (c) 2024, The Robot Web Tools Contributors
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
#include "rclcpp/rclcpp.hpp"

#include "web_video_server/web_video_server.hpp"

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<web_video_server::WebVideoServer>(rclcpp::NodeOptions());
node->setup_cleanup_inactive_streams();

int ros_threads;
node->get_parameter("ros_threads", ros_threads);
rclcpp::executors::MultiThreadedExecutor spinner(rclcpp::ExecutorOptions(), ros_threads);
spinner.add_node(node);
spinner.spin();

return 0;
}

0 comments on commit cea8e8e

Please sign in to comment.