Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chrono steady clock for frame timing #173

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/web_video_server/image_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#pragma once

#include <chrono>
#include <memory>
#include <string>

Expand Down Expand Up @@ -64,7 +65,7 @@ class ImageStreamer
/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age) = 0;

std::string getTopic()
{
Expand Down Expand Up @@ -94,8 +95,8 @@ class ImageTransportImageStreamer : public ImageStreamer

protected:
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time) = 0;
virtual void restreamFrame(double max_age);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
Expand All @@ -105,7 +106,7 @@ class ImageTransportImageStreamer : public ImageStreamer
std::string default_transport_;
std::string qos_profile_name_;

rclcpp::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
cv::Mat output_size_image;
std::mutex send_mutex_;

Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
~MjpegStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);

private:
MultipartStream stream_;
Expand All @@ -78,7 +78,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
~JpegSnapshotStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);

private:
int quality_;
Expand Down
6 changes: 4 additions & 2 deletions include/web_video_server/libav_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extern "C"
#include <libavutil/imgutils.h>
}

#include <chrono>
#include <memory>
#include <string>

Expand All @@ -66,7 +67,7 @@ class LibavStreamer : public ImageTransportImageStreamer

protected:
virtual void initializeEncoder();
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual void initialize(const cv::Mat &);
AVFormatContext * format_context_;
const AVCodec * codec_;
Expand All @@ -78,8 +79,9 @@ class LibavStreamer : public ImageTransportImageStreamer
private:
AVFrame * frame_;
struct SwsContext * sws_context_;
rclcpp::Time first_image_timestamp_;
std::mutex encode_mutex_;
bool first_image_received_;
std::chrono::steady_clock::time_point first_image_time_;

std::string format_name_;
std::string codec_name_;
Expand Down
15 changes: 8 additions & 7 deletions include/web_video_server/multipart_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,35 @@ namespace web_video_server

struct PendingFooter
{
rclcpp::Time timestamp;
std::chrono::steady_clock::time_point timestamp;
std::weak_ptr<std::string> contents;
};

class MultipartStream
{
public:
MultipartStream(
std::function<rclcpp::Time()> get_now,
async_web_server_cpp::HttpConnectionPtr & connection,
const std::string & boundry = "boundarydonotcross",
std::size_t max_queue_size = 1);

void sendInitialHeader();
void sendPartHeader(const rclcpp::Time & time, const std::string & type, size_t payload_size);
void sendPartFooter(const rclcpp::Time & time);
void sendPartHeader(
const std::chrono::steady_clock::time_point & time, const std::string & type,
size_t payload_size);
void sendPartFooter(const std::chrono::steady_clock::time_point & time);
void sendPartAndClear(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
std::vector<unsigned char> & data);
void sendPart(
const rclcpp::Time & time, const std::string & type, const boost::asio::const_buffer & buffer,
const std::chrono::steady_clock::time_point & time, const std::string & type,
const boost::asio::const_buffer & buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource);

private:
bool isBusy();

private:
std::function<rclcpp::Time()> get_now_;
const std::size_t max_queue_size_;
async_web_server_cpp::HttpConnectionPtr connection_;
std::string boundry_;
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/png_streamers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PngStreamer : public ImageTransportImageStreamer
~PngStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);

private:
Expand All @@ -78,7 +78,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
~PngSnapshotStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);

private:
Expand Down
6 changes: 3 additions & 3 deletions include/web_video_server/ros_compressed_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ class RosCompressedStreamer : public ImageStreamer
rclcpp::Node::SharedPtr node);
~RosCompressedStreamer();
virtual void start();
virtual void restreamFrame(double max_age);
virtual void restreamFrame(std::chrono::duration<double> max_age);

protected:
virtual void sendImage(
const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg,
const rclcpp::Time & time);
const std::chrono::steady_clock::time_point & time);

private:
void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg);
MultipartStream stream_;
rclcpp::Subscription<sensor_msgs::msg::CompressedImage>::SharedPtr image_sub_;
rclcpp::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg;
std::mutex send_mutex_;
std::string qos_profile_name_;
Expand Down
2 changes: 1 addition & 1 deletion include/web_video_server/web_video_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class WebVideoServer : public rclcpp::Node
const char * begin, const char * end);

private:
void restreamFrames(double max_age);
void restreamFrames(std::chrono::duration<double> max_age);
void cleanup_inactive_streams();

rclcpp::TimerBase::SharedPtr cleanup_timer_;
Expand Down
10 changes: 5 additions & 5 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
void ImageTransportImageStreamer::restreamFrame(std::chrono::duration<double> max_age)
{
if (inactive_ || !initialized_) {
return;
}
try {
if (last_frame + rclcpp::Duration::from_seconds(max_age) < node_->now() ) {
if (last_frame_ + max_age < std::chrono::steady_clock::now()) {
std::scoped_lock lock(send_mutex_);
// don't update last_frame, it may remain an old value.
sendImage(output_size_image, node_->now());
sendImage(output_size_image, std::chrono::steady_clock::now());
}
} catch (boost::system::system_error & e) {
// happens when client disconnects
Expand Down Expand Up @@ -199,8 +199,8 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C
initialized_ = true;
}

last_frame = node_->now();
sendImage(output_size_image, msg->header.stamp);
last_frame_ = std::chrono::steady_clock::now();
sendImage(output_size_image, last_frame_);
} catch (cv_bridge::Exception & e) {
auto & clk = *node_->get_clock();
RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what());
Expand Down
14 changes: 10 additions & 4 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ MjpegStreamer::MjpegStreamer(
const async_web_server_cpp::HttpRequest & request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node)
: ImageTransportImageStreamer(request, connection, node),
stream_(std::bind(&rclcpp::Node::now, node), connection)
stream_(connection)
{
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
stream_.sendInitialHeader();
Expand All @@ -50,7 +50,9 @@ MjpegStreamer::~MjpegStreamer()
std::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void MjpegStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
encode_params.push_back(cv::IMWRITE_JPEG_QUALITY);
Expand Down Expand Up @@ -94,7 +96,9 @@ JpegSnapshotStreamer::~JpegSnapshotStreamer()
std::scoped_lock lock(send_mutex_); // protects sendImage.
}

void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void JpegSnapshotStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
encode_params.push_back(cv::IMWRITE_JPEG_QUALITY);
Expand All @@ -104,7 +108,9 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & t
cv::imencode(".jpeg", img, encoded_buffer, encode_params);

char stamp[20];
snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds());
snprintf(
stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
Expand Down
18 changes: 11 additions & 7 deletions src/libav_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ LibavStreamer::LibavStreamer(
const std::string & format_name, const std::string & codec_name,
const std::string & content_type)
: ImageTransportImageStreamer(request, connection, node), format_context_(0), codec_(0),
codec_context_(0), video_stream_(0), frame_(0), sws_context_(0), first_image_timestamp_(0),
format_name_(format_name), codec_name_(codec_name), content_type_(content_type), opt_(0),
io_buffer_(0)
codec_context_(0), video_stream_(0), frame_(0), sws_context_(0),
first_image_received_(false), first_image_time_(), format_name_(format_name),
codec_name_(codec_name), content_type_(content_type), opt_(0), io_buffer_(0)
{
bitrate_ = request.get_query_param_value_or_default<int>("bitrate", 100000);
qmin_ = request.get_query_param_value_or_default<int>("qmin", 10);
Expand Down Expand Up @@ -215,11 +215,14 @@ void LibavStreamer::initializeEncoder()
{
}

void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void LibavStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::scoped_lock lock(encode_mutex_);
if (0 == first_image_timestamp_.nanoseconds()) {
first_image_timestamp_ = time;
if (!first_image_received_) {
first_image_received_ = true;
first_image_time_ = time;
}

AVPixelFormat input_coding_format = AV_PIX_FMT_BGR24;
Expand Down Expand Up @@ -274,7 +277,8 @@ void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
std::size_t size;
uint8_t * output_buf;

double seconds = (time - first_image_timestamp_).seconds();
double seconds = std::chrono::duration_cast<std::chrono::duration<double>>(
time - first_image_time_).count();
// Encode video at 1/0.95 to minimize delay
pkt->pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 0.95);
if (pkt->pts <= 0) {
Expand Down
23 changes: 13 additions & 10 deletions src/multipart_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ namespace web_video_server
{

MultipartStream::MultipartStream(
std::function<rclcpp::Time()> get_now,
async_web_server_cpp::HttpConnectionPtr & connection,
const std::string & boundry,
std::size_t max_queue_size)
: get_now_(get_now), connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size)
: connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size)
{}

void MultipartStream::sendInitialHeader()
Expand All @@ -58,11 +57,13 @@ void MultipartStream::sendInitialHeader()
}

void MultipartStream::sendPartHeader(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
size_t payload_size)
{
char stamp[20];
snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds());
snprintf(
stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
std::shared_ptr<std::vector<async_web_server_cpp::HttpHeader>> headers(
new std::vector<async_web_server_cpp::HttpHeader>());
headers->push_back(async_web_server_cpp::HttpHeader("Content-type", type));
Expand All @@ -73,7 +74,7 @@ void MultipartStream::sendPartHeader(
connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers);
}

void MultipartStream::sendPartFooter(const rclcpp::Time & time)
void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point & time)
{
std::shared_ptr<std::string> str(new std::string("\r\n--" + boundry_ + "\r\n"));
PendingFooter pf;
Expand All @@ -84,7 +85,7 @@ void MultipartStream::sendPartFooter(const rclcpp::Time & time)
}

void MultipartStream::sendPartAndClear(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
std::vector<unsigned char> & data)
{
if (!isBusy()) {
Expand All @@ -95,7 +96,7 @@ void MultipartStream::sendPartAndClear(
}

void MultipartStream::sendPart(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
const boost::asio::const_buffer & buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource)
{
Expand All @@ -108,13 +109,15 @@ void MultipartStream::sendPart(

bool MultipartStream::isBusy()
{
rclcpp::Time currentTime = get_now_();
auto current_time = std::chrono::steady_clock::now();
while (!pending_footers_.empty()) {
if (pending_footers_.front().contents.expired()) {
pending_footers_.pop();
} else {
rclcpp::Time footerTime = pending_footers_.front().timestamp;
if ((currentTime - footerTime).seconds() > 0.5) {
auto footer_time = pending_footers_.front().timestamp;
if (std::chrono::duration_cast<std::chrono::duration<double>>(
(current_time - footer_time)).count() > 0.5)
{
pending_footers_.pop();
} else {
break;
Expand Down
Loading
Loading