Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to parse Annex B stream in FU-A #100

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

272 changes: 265 additions & 7 deletions src/codec/h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) struct Depacketizer {
/// In state `PreMark`, an entry for each NAL.
/// Kept around (empty) in other states to re-use the backing allocation.
nals: Vec<Nal>,

/// RTP payload is an Annex B stream.
has_annex_b_stream: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -123,6 +126,7 @@ impl Depacketizer {
pending: None,
pieces: Vec::new(),
nals: Vec::new(),
has_annex_b_stream: false,
parameters,
})
}
Expand Down Expand Up @@ -323,18 +327,23 @@ impl Depacketizer {
match (start, access_unit.in_fu_a) {
(true, true) => return Err("FU-A with start bit while frag in progress".into()),
(true, false) => {
self.add_piece(data)?;
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: u32::MAX, // should be overwritten later.
len: 1 + u32_len,
});
if self.is_annex_b_stream(data.clone()) {
self.has_annex_b_stream = true;
self.read_annex_b_stream(nal_header, &mut data)?;
} else {
self.add_piece(data)?;
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: u32::MAX, // should be overwritten later.
len: 1 + u32_len,
});
}
access_unit.in_fu_a = true;
}
(false, true) => {
let pieces = self.add_piece(data)?;
let nal = self.nals.last_mut().expect("nals non-empty while in fu-a");
if u8::from(nal_header) != u8::from(nal.hdr) {
if !self.has_annex_b_stream && u8::from(nal_header) != u8::from(nal.hdr) {
return Err(format!(
"FU-A has inconsistent NAL type: {:?} then {:?}",
nal.hdr, nal_header,
Expand Down Expand Up @@ -394,6 +403,97 @@ impl Depacketizer {
u32::try_from(self.pieces.len()).map_err(|_| "more than u32::MAX pieces!".to_string())
}

/// Checks if Annex B start code is present in payload.
fn is_annex_b_stream(&mut self, piece: Bytes) -> bool {
// TODO: should we check for 3 byte start code too?
let _start_code_3_byte = [0x00, 0x00, 0x01];

let start_code_4_byte = [0x00, 0x00, 0x00, 0x01];
let start_code_4_byte_idx = piece
.windows(start_code_4_byte[..].len())
.position(|window| window == &start_code_4_byte[..]);
start_code_4_byte_idx.is_some()
}

/// Parses an Annex B steam, splitting NALUs in it and adding them to `Depacketizer`.
fn read_annex_b_stream(
&mut self,
nal_header: NalHeader,
piece: &mut Bytes,
) -> Result<(), String> {
const START_CODE_4_BYTE: [u8; 4] = [0x00, 0x00, 0x00, 0x01];

assert!(self.has_annex_b_stream);

// TODO: check for start code broken b/w two fragmented packets.

// Find start codes in payload.
let start_codes: Vec<_> = piece
.windows(START_CODE_4_BYTE[..].len())
.enumerate()
.filter_map(|(i, window)| {
if window == START_CODE_4_BYTE {
Some(i)
} else {
None
}
})
.collect();

let mut start_idx = 0;
for (idx, start_code_idx) in start_codes.into_iter().enumerate() {
let start_code_end_idx = start_code_idx + 3;
let mut nal_piece = piece.slice(start_idx..start_code_idx);
start_idx = start_code_end_idx + 1;

if idx == 0 {
if self.pieces.is_empty() {
let pieces = self.add_piece(nal_piece.clone())?;
// use nal_header from argument
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: pieces,
len: 1 + u32::try_from(nal_piece.clone().len())
.expect("NALU payload must be < u16::MAX"),
})
} else {
// TODO: we received a start code in a fragmented FU-A packet.
// Handle it differently i.e. update the last NALU instead of
// creating a new one.
}
} else {
let nal_header_byte = nal_piece[0];
nal_piece.advance(1);

let pieces = self.add_piece(nal_piece.clone())?;
let nal_header = NalHeader::new(nal_header_byte).expect("NalHeader is valid");
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: pieces,
len: 1 + u32::try_from(nal_piece.clone().len())
.expect("NALU payload must be < u16::MAX"),
})
}
}

// Handle payload after the last found start code.
if start_idx < piece.len() {
let mut nal_piece = piece.slice(start_idx..);
let nal_header_byte = nal_piece[0];
let nal_header = NalHeader::new(nal_header_byte).expect("NalHeader is valid");
nal_piece.advance(1);
self.add_piece(nal_piece.clone())?;
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: u32::MAX,
len: 1 + u32::try_from(nal_piece.clone().len())
.expect("NALU payload must be < u16::MAX"),
})
}

Ok(())
}

/// Logs information about each access unit.
/// Currently, "bad" access units (violating certain specification rules)
/// are logged at debug priority, and others are logged at trace priority.
Expand Down Expand Up @@ -1002,6 +1102,7 @@ mod tests {

use crate::testutil::init_logging;
use crate::{codec::CodecItem, rtp::ReceivedPacketBuilder};
use h264_reader::nal::UnitType;

/*
* This test requires
Expand Down Expand Up @@ -1527,4 +1628,161 @@ mod tests {
assert!(frame.has_new_parameters);
assert!(d.parameters().is_some());
}

// FU-A packet containing Annex B stream (https://github.com/scottlamb/retina/issues/68)
#[test]
fn parse_annex_b_stream_in_fu_a() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("profile-level-id=TQAf;packetization-mode=1;sprop-parameter-sets=J00AH+dAKALdgKUFBQXwAAADABAAAAMCiwEAAtxoAAIlUX//AoA=,KO48gA==")).unwrap();
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(
ReceivedPacketBuilder {
// FU-A start fragment which includes Annex B stream of 3 NALs
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build(
*b"\
\x3c\x87\
\x4d\x00\x1f\xe7\x40\x28\x02\xdd\x80\xa5\x05\x05\x05\xf0\x00\x00\x03\x00\x10\x00\x00\x03\x02\x8b\x01\x00\x02\xdc\x68\x00\x02\x25\x51\x7f\xff\x02\x80\
\x00\x00\x00\x01\
\x28\
\xee\x3c\x80\
\x00\x00\x00\x01\
\x25\
idr-slice, "
)
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());

// should've parsed Annex B stream from first FU-A frag into 3 NALs (SPS, PPS & IDR slice)
let number_of_nals_in_first_frag = 3;
assert!(d.nals.len() == number_of_nals_in_first_frag);
assert!(d.pieces.len() == number_of_nals_in_first_frag);
assert!(d.nals[0].hdr.nal_unit_type() == UnitType::SeqParameterSet);
assert!(d.nals[1].hdr.nal_unit_type() == UnitType::PicParameterSet);
assert!(d.nals[2].hdr.nal_unit_type() == UnitType::SliceLayerWithoutPartitioningIdr);

d.push(
ReceivedPacketBuilder {
// FU-A packet, middle.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x3c\x07idr-slice continued, ")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());

// For Annex B stream in FU-A, make sure we append next frags to the last nal
// instead of creating a new one from the frag header, since the header is of the starting
// NAL of previous frag, but instead is supposed to be the continuation of the last NAL from Annex B stream.

// This test will also test that retina shouldn't panic on receiving different nal headers in frags of
// a FU-A when the FU-A contains an Annex B stream.

// no new nals are to be created
assert!(d.nals.len() == number_of_nals_in_first_frag);
// data from frag will get appended
assert!(d.pieces.len() == number_of_nals_in_first_frag + 1);

d.push(
ReceivedPacketBuilder {
// FU-A packet, end.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x3c\x47idr-slice end")
.unwrap(),
)
.unwrap();

let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
_ => panic!(),
};
assert_eq!(
frame.data(),
b"\
\x00\x00\x00\x26\
\x27\
\x4d\x00\x1f\xe7\x40\x28\x02\xdd\x80\xa5\x05\x05\x05\xf0\x00\x00\x03\x00\x10\x00\x00\x03\x02\x8b\x01\x00\x02\xdc\x68\x00\x02\x25\x51\x7f\xff\x02\x80\
\x00\x00\x00\
\x04\x28\
\xee\x3c\x80\
\x00\x00\x00\
\x2e\x25\
idr-slice, idr-slice continued, idr-slice end"
);
}

#[test]
fn exit_on_inconsistent_headers_between_fu_a() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("profile-level-id=TQAf;packetization-mode=1;sprop-parameter-sets=J00AH+dAKALdgKUFBQXwAAADABAAAAMCiwEAAtxoAAIlUX//AoA=,KO48gA==")).unwrap();
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(
ReceivedPacketBuilder {
// FU-A start fragment
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x3c\x81start of non-idr")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());

let push_result = d.push(
ReceivedPacketBuilder {
// FU-A packet, middle.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x3c\x07a wild sps appeared")
.unwrap(),
);
assert!(push_result.is_err());
}
}
Loading