Skip to content

Commit

Permalink
Merge pull request #88 from voysys/mjpeg-support
Browse files Browse the repository at this point in the history
Add MJPEG support
  • Loading branch information
scottlamb authored Apr 21, 2024
2 parents 9d1ac89 + f32192d commit cd5d25b
Show file tree
Hide file tree
Showing 8 changed files with 1,087 additions and 11 deletions.
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ Progress:
* [ ] async-std
* [ ] synchronous with std only
* codec depacketization
* [x] video: H.264
([RFC 6184](https://datatracker.ietf.org/doc/html/rfc6184))
* [ ] SVC
* [ ] periodic infra refresh
* [x] multiple slices per picture
* [ ] multiple SPS/PPS
* [ ] interleaved mode
* [x] AAC output format
* [ ] Annex B output format ([#44](https://github.com/scottlamb/retina/issues/44))
* [x] video
* [x] H.264
* [ ] SVC
* [ ] periodic infra refresh
* [x] multiple slices per picture
* [ ] multiple SPS/PPS
* [ ] interleaved mode
* [x] AAC output format
* [ ] Annex B output format ([#44](https://github.com/scottlamb/retina/issues/44))
* [x] ([RFC 6184](https://datatracker.ietf.org/doc/html/rfc6184))
* [x] MJPEG
* [x] ([RFC 2435](https://datatracker.ietf.org/doc/html/rfc2435))
* audio
* [x] AAC
* [ ] interleaving
Expand Down Expand Up @@ -80,6 +83,7 @@ Where CMD:
* **info** - Gets info about available streams and exits.
* **mp4** - Writes RTSP streams to mp4 file; exit with Ctrl+C.
* **onvif** - Gets realtime onvif metadata if available; exit with Ctrl+C.
* **jpeg** - Writes depacketized JPEG images to disk; exit with CTRL+C.

## Example WebRTC proxy

Expand Down
158 changes: 158 additions & 0 deletions examples/client/src/jpeg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright (C) 2023 Niclas Olmenius <niclas@voysys.se>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Proof-of-concept `.jpeg` writer.
//!
//! This writes depacketized RTSP MJPEG images to a specified output directory.

use std::{num::NonZeroU32, path::PathBuf, pin::Pin, sync::Arc};

use anyhow::{anyhow, bail, Error};
use clap::Parser;
use futures::{Future, StreamExt};
use log::info;
use retina::{client::SetupOptions, codec::CodecItem};

#[derive(Parser)]
pub struct Opts {
#[command(flatten)]
src: super::Source,

/// Policy for handling the `rtptime` parameter normally seem in the `RTP-Info` header.
/// One of `default`, `require`, `ignore`, `permissive`.
#[arg(default_value_t, long)]
initial_timestamp: retina::client::InitialTimestampPolicy,

/// Allow lost packets mid-stream without aborting.
#[arg(long)]
allow_loss: bool,

/// When to issue a `TEARDOWN` request: `auto`, `always`, or `never`.
#[arg(default_value_t, long)]
teardown: retina::client::TeardownPolicy,

/// Duration after which to exit automatically, in seconds.
#[arg(long, name = "secs")]
duration: Option<u64>,

/// The transport to use: `tcp` or `udp` (experimental).
///
/// Note: `--allow-loss` is strongly recommended with `udp`.
#[arg(default_value_t, long)]
transport: retina::client::Transport,

/// Path to directory to write JPEG images.
out_dir: PathBuf,
}

/// Writes `.jpeg` files to the specified directory.
async fn write_jpeg(
opts: &Opts,
session: retina::client::Session<retina::client::Described>,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
) -> Result<(), Error> {
let mut session = session
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)
.await?
.demuxed()?;

std::fs::create_dir_all(&opts.out_dir)?;

let sleep = match opts.duration {
Some(secs) => {
futures::future::Either::Left(tokio::time::sleep(std::time::Duration::from_secs(secs)))
}
None => futures::future::Either::Right(futures::future::pending()),
};
tokio::pin!(stop_signal);
tokio::pin!(sleep);

let mut frame_count = 0;

loop {
tokio::select! {
pkt = session.next() => {
match pkt.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::VideoFrame(f) => {
let out_path = opts.out_dir.join(&format!("{frame_count:05}.jpeg"));
std::fs::write(out_path, f.data())?;

frame_count += 1;
},
CodecItem::Rtcp(rtcp) => {
if let (Some(t), Some(Ok(Some(sr)))) = (rtcp.rtp_timestamp(), rtcp.pkts().next().map(retina::rtcp::PacketRef::as_sender_report)) {
println!("{}: SR ts={}", t, sr.ntp_timestamp());
}
},
_ => continue,
};
},
_ = &mut stop_signal => {
info!("Stopping due to signal");
break;
},
_ = &mut sleep => {
info!("Stopping after {} seconds", opts.duration.unwrap());
break;
},
}
}

Ok(())
}

pub async fn run(opts: Opts) -> Result<(), Error> {
let creds = super::creds(opts.src.username.clone(), opts.src.password.clone());
let stop_signal = Box::pin(tokio::signal::ctrl_c());
let session_group = Arc::new(retina::client::SessionGroup::default());
let mut session = retina::client::Session::describe(
opts.src.url.clone(),
retina::client::SessionOptions::default()
.creds(creds)
.session_group(session_group.clone())
.user_agent("Retina jpeg example".to_owned())
.teardown(opts.teardown),
)
.await?;
let video_stream_i = {
let s = session.streams().iter().position(|s| {
if s.media() == "image" || s.media() == "video" {
if s.encoding_name() == "jpeg" {
log::info!("Using jpeg video stream");
return true;
}
log::info!(
"Ignoring {} video stream because it's unsupported",
s.encoding_name(),
);
}
false
});
if s.is_none() {
log::info!("No suitable video stream found");
}
s
};

if let Some(i) = video_stream_i {
session
.setup(i, SetupOptions::default().transport(opts.transport.clone()))
.await?;
}
if video_stream_i.is_none() {
bail!("Exiting because no video or audio stream was selected; see info log messages above");
}

let result = write_jpeg(&opts, session, stop_signal).await;

// Session has now been dropped, on success or failure. A TEARDOWN should
// be pending if necessary. session_group.await_teardown() will wait for it.
if let Err(e) = session_group.await_teardown().await {
log::error!("TEARDOWN failed: {}", e);
}
result
}
4 changes: 4 additions & 0 deletions examples/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! RTSP client examples.

mod info;
mod jpeg;
mod mp4;
mod onvif;

Expand Down Expand Up @@ -35,6 +36,8 @@ enum Cmd {
Mp4(mp4::Opts),
/// Follows ONVIF metadata stream; use Ctrl+C to stop.
Onvif(onvif::Opts),
/// Writes depacketized JPEG images to disk; use CTRL+C to stop.
Jpeg(jpeg::Opts),
}

fn init_logging() -> mylog::Handle {
Expand Down Expand Up @@ -85,5 +88,6 @@ async fn main_inner() -> Result<(), Error> {
Cmd::Info(opts) => info::run(opts).await,
Cmd::Mp4(opts) => mp4::run(opts).await,
Cmd::Onvif(opts) => onvif::run(opts).await,
Cmd::Jpeg(opts) => jpeg::run(opts).await,
}
}
16 changes: 14 additions & 2 deletions fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ path = "fuzz_targets/roundtrip_h264.rs"
test = false
doc = false

[[bin]]
name = "depacketize_jpeg"
path = "fuzz_targets/depacketize_jpeg.rs"
test = false
doc = false
60 changes: 60 additions & 0 deletions fuzz/fuzz_targets/depacketize_jpeg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2023 Niclas Olmenius <niclas@voysys.se>
// SPDX-License-Identifier: MIT OR Apache-2.0

#![no_main]
use libfuzzer_sys::fuzz_target;
use std::num::NonZeroU32;

fuzz_target!(|data: &[u8]| {
let mut data = data;
let mut depacketizer =
retina::codec::Depacketizer::new("video", "jpeg", 90_000, None, None).unwrap();
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
let mut sequence_number: u16 = 0;
let conn_ctx = retina::ConnectionContext::dummy();
let stream_ctx = retina::StreamContext::dummy();
let pkt_ctx = retina::PacketContext::dummy();
loop {
let (hdr, rest) = match data.split_first() {
Some(r) => r,
None => return,
};
let ts_change = (hdr & 0b001) != 0;
let mark = (hdr & 0b010) != 0;
let loss = (hdr & 0b100) != 0;
let len = usize::from(hdr >> 3);
if rest.len() < len {
return;
}
let (payload, rest) = rest.split_at(len);
data = rest;
if loss {
sequence_number = sequence_number.wrapping_add(1);
}
if ts_change {
timestamp = timestamp.try_add(1).unwrap();
}
let pkt = retina::rtp::ReceivedPacketBuilder {
ctx: pkt_ctx,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number,
loss: u16::from(loss),
payload_type: 96,
mark,
}
.build(payload.iter().copied())
.unwrap();
//println!("pkt: {:#?}", pkt);
if depacketizer.push(pkt).is_err() {
return;
}
while let Some(item) = depacketizer.pull(&conn_ctx, &stream_ctx).transpose() {
if item.is_err() {
return;
}
}
sequence_number = sequence_number.wrapping_add(1);
}
});
Loading

0 comments on commit cd5d25b

Please sign in to comment.