Skip to content

Commit

Permalink
seamless mid-stream video parameter changes
Browse files Browse the repository at this point in the history
For #217. This handles the recording logic. May still need fixes to
playback and/or live stream logic.
  • Loading branch information
scottlamb committed Apr 13, 2022
1 parent 71d3f2a commit 3bc552b
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 101 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Each release is tagged in Git and on the Docker repository
## unreleased

* upgrade to Retina 0.3.9, improving camera interop and diagnostics
* [#217](https://github.com/scottlamb/moonfire-nvr/issues/217): no longer
drop the connection to the camera when it changes video parameters, instead
continuing the run seamlessly.

## `v0.7.3` (2022-03-22)

Expand Down
2 changes: 1 addition & 1 deletion server/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl VideoSampleEntry {
}
}

#[derive(PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct VideoSampleEntryToInsert {
pub data: Vec<u8>,
pub rfc6381_codec: String,
Expand Down
56 changes: 36 additions & 20 deletions server/db/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
video_sample_entry_id: i32,
state: WriterState<D::File>,
}

Expand All @@ -634,6 +633,7 @@ struct InnerWriter<F: FileWriter> {
r: Arc<Mutex<db::RecordingToInsert>>,
e: recording::SampleIndexEncoder,
id: CompositeId,
video_sample_entry_id: i32,

hasher: blake3::Hasher,

Expand Down Expand Up @@ -680,26 +680,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
video_sample_entry_id: i32,
) -> Self {
Writer {
dir,
db,
channel,
stream_id,
video_sample_entry_id,
state: WriterState::Unopened,
}
}

/// Opens a new writer.
/// Opens a new recording if not already open.
///
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
/// correcting this.
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
fn open(
&mut self,
shutdown_rx: &mut base::shutdown::Receiver,
video_sample_entry_id: i32,
) -> Result<(), Error> {
let prev = match self.state {
WriterState::Unopened => None,
WriterState::Open(_) => return Ok(()),
WriterState::Open(ref o) => {
if o.video_sample_entry_id != video_sample_entry_id {
bail!("inconsistent video_sample_entry_id");
}
return Ok(());
}
WriterState::Closed(prev) => Some(prev),
};
let (id, r) = self.db.lock().add_recording(
Expand All @@ -709,7 +717,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
start: prev
.map(|p| p.end)
.unwrap_or(recording::Time(i64::max_value())),
video_sample_entry_id: self.video_sample_entry_id,
video_sample_entry_id,
flags: db::RecordingFlags::Growing as i32,
..Default::default()
},
Expand All @@ -726,6 +734,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
hasher: blake3::Hasher::new(),
local_start: recording::Time(i64::max_value()),
unindexed_sample: None,
video_sample_entry_id,
});
Ok(())
}
Expand All @@ -747,8 +756,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
local_time: recording::Time,
pts_90k: i64,
is_key: bool,
video_sample_entry_id: i32,
) -> Result<(), Error> {
self.open(shutdown_rx)?;
self.open(shutdown_rx, video_sample_entry_id)?;
let w = match self.state {
WriterState::Open(ref mut w) => w,
_ => unreachable!(),
Expand Down Expand Up @@ -816,9 +826,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
Ok(())
}

/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
/// Cleanly closes a single recording within this writer, using a supplied
/// pts of the next sample for the last sample's duration (if known).
///
/// The `Writer` may be used again, causing another recording to be created
/// within the same run.
///
/// If the `Writer` is dropped without `close`, the `Drop` trait impl will
/// close, swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => {
Expand Down Expand Up @@ -1179,13 +1194,7 @@ mod tests {
rfc6381_codec: "avc1.000000".to_owned(),
})
.unwrap();
let mut w = Writer::new(
&h.dir,
&h.db,
&h.channel,
testutil::TEST_STREAM_ID,
video_sample_entry_id,
);
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
h.dir.expect(MockDirAction::Create(
CompositeId::new(1, 0),
Box::new(|_id| Err(nix::Error::EIO)),
Expand All @@ -1200,8 +1209,15 @@ mod tests {
));
f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true)
.unwrap();
w.write(
&mut h.shutdown_rx,
b"1",
recording::Time(1),
0,
true,
video_sample_entry_id,
)
.unwrap();

let e = w
.write(
Expand Down
15 changes: 11 additions & 4 deletions server/src/cmds/config/cameras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ fn press_test_inner(
transport: retina::client::Transport,
) -> Result<String, Error> {
let _enter = handle.enter();
let (extra_data, stream) = stream::OPENER.open(
let stream = stream::OPENER.open(
"test stream".to_owned(),
url,
retina::client::SessionOptions::default()
Expand All @@ -222,10 +222,17 @@ fn press_test_inner(
})
.transport(transport),
)?;
let video_sample_entry = stream.video_sample_entry();
Ok(format!(
"{}x{} video stream served by tool {:?}",
extra_data.width,
extra_data.height,
"codec: {}\n\
dimensions: {}x{}\n\
pixel aspect ratio: {}x{}\n\
tool: {:?}",
&video_sample_entry.rfc6381_codec,
video_sample_entry.width,
video_sample_entry.height,
video_sample_entry.pasp_h_spacing,
video_sample_entry.pasp_v_spacing,
stream.tool(),
))
}
Expand Down
25 changes: 11 additions & 14 deletions server/src/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2278,21 +2278,18 @@ mod tests {
}

fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
let (extra_data, input) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let input = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut input: Box<dyn stream::Stream> = Box::new(input);

// 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap();
let video_sample_entry_id = db
.db
.lock()
.insert_video_sample_entry(input.video_sample_entry().clone())
.unwrap();
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
let mut output = writer::Writer::new(
dir,
&db.db,
&db.syncer_channel,
TEST_STREAM_ID,
video_sample_entry_id,
);
let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID);

// end_pts is the pts of the end of the most recent frame (start + duration).
// It's needed because dir::Writer calculates a packet's duration from its pts and the
Expand Down Expand Up @@ -2321,6 +2318,7 @@ mod tests {
frame_time,
pkt.pts,
pkt.is_key,
video_sample_entry_id,
)
.unwrap();
end_pts = Some(pkt.pts + i64::from(pkt.duration));
Expand Down Expand Up @@ -2391,9 +2389,8 @@ mod tests {
}

fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let (orig_extra_data, orig) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap();
let orig = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let new = stream::testutil::Mp4Stream::open(new_filename).unwrap();

if pts_offset > 0 {
// The mp4 crate doesn't interpret the edit list. Manually inspect it.
Expand All @@ -2410,7 +2407,7 @@ mod tests {

let mut orig: Box<dyn stream::Stream> = Box::new(orig);
let mut new: Box<dyn stream::Stream> = Box::new(new);
assert_eq!(orig_extra_data, new_extra_data);
assert_eq!(orig.video_sample_entry(), new.video_sample_entry());
let mut final_durations = None;
for i in 0.. {
let orig_pkt = match orig.next() {
Expand Down
Loading

0 comments on commit 3bc552b

Please sign in to comment.