diff --git a/src/api/channel/by_gop.rs b/src/api/channel/by_gop.rs index 907ab5414b..a3de842b73 100644 --- a/src/api/channel/by_gop.rs +++ b/src/api/channel/by_gop.rs @@ -163,6 +163,7 @@ fn workerpool( wl.send.send(p).unwrap(); } Err(EncoderStatus::Encoded) => {} + Err(EncoderStatus::ImmediateExit) => break, _ => todo!("Error management {:?}", r), } } @@ -178,6 +179,7 @@ fn workerpool( Ok(p) => wl.send.send(p).unwrap(), Err(EncoderStatus::LimitReached) => break, Err(EncoderStatus::Encoded) => {} + Err(EncoderStatus::ImmediateExit) => break, _ => todo!("Error management"), } } diff --git a/src/api/config/mod.rs b/src/api/config/mod.rs index 228b775f2d..b083baa5d4 100644 --- a/src/api/config/mod.rs +++ b/src/api/config/mod.rs @@ -249,6 +249,10 @@ impl Config { inner.rc_state.setup_second_pass(s); } + if let Some(ref progress) = self.progress { + inner.progress = Arc::clone(progress); + } + Ok(inner) } diff --git a/src/api/context.rs b/src/api/context.rs index 88ea9da84e..ddfb7e9c2d 100644 --- a/src/api/context.rs +++ b/src/api/context.rs @@ -266,6 +266,10 @@ impl Context { /// Err(EncoderStatus::Failure) => { /// return Err(EncoderStatus::Failure); /// }, + /// Err(EncoderStatus::ImmediateExit) => { + /// // We did not set a callback to exit immediately + /// unreachable!(); + /// } /// } /// } /// diff --git a/src/api/internal.rs b/src/api/internal.rs index 14687b3bdf..f7b50200a7 100644 --- a/src/api/internal.rs +++ b/src/api/internal.rs @@ -35,6 +35,8 @@ use std::fs; use std::path::PathBuf; use std::sync::Arc; +use super::{DefaultProgress, GranularProgress}; + /// The set of options that controls frame re-ordering and reference picture /// selection. /// The options stored here are invariant over the whole encode. @@ -257,6 +259,9 @@ pub(crate) struct ContextInner { next_lookahead_output_frameno: u64, /// Optional opaque to be sent back to the user opaque_q: BTreeMap, + + /// Progress callback + pub(crate) progress: Arc, } impl ContextInner { @@ -309,6 +314,7 @@ impl ContextInner { next_lookahead_frame: 1, next_lookahead_output_frameno: 0, opaque_q: BTreeMap::new(), + progress: Arc::new(DefaultProgress {}) as Arc, } } @@ -1162,8 +1168,13 @@ impl ContextInner { if self.rc_state.needs_trial_encode(fti) { let mut trial_fs = frame_data.fs.clone(); - let data = - encode_frame(&frame_data.fi, &mut trial_fs, &self.inter_cfg); + let data = encode_frame( + &frame_data.fi, + &mut trial_fs, + &self.inter_cfg, + self.progress.as_ref(), + ) + .ok_or(EncoderStatus::ImmediateExit)?; self.rc_state.update_state( (data.len() * 8) as i64, fti, @@ -1181,8 +1192,14 @@ impl ContextInner { frame_data.fi.set_quantizers(&qps); } - let data = - encode_frame(&frame_data.fi, &mut frame_data.fs, &self.inter_cfg); + let data = encode_frame( + &frame_data.fi, + &mut frame_data.fs, + &self.inter_cfg, + self.progress.as_ref(), + ) + .ok_or(EncoderStatus::ImmediateExit)?; + let enc_stats = frame_data.fs.enc_stats.clone(); self.maybe_prev_log_base_q = Some(qps.log_base_q); // TODO: Add support for dropping frames. diff --git a/src/api/util.rs b/src/api/util.rs index c26295de88..a6f78d0fea 100644 --- a/src/api/util.rs +++ b/src/api/util.rs @@ -175,6 +175,9 @@ pub enum EncoderStatus { /// [`Context::twopass_out()`]: struct.Context.html#method.twopass_out #[error("not ready")] NotReady, + /// Immediate exit was requested + #[error("immediate exit")] + ImmediateExit, } /// Represents a packet. diff --git a/src/bin/rav1e.rs b/src/bin/rav1e.rs index 9bde699f8a..a27b266d12 100644 --- a/src/bin/rav1e.rs +++ b/src/bin/rav1e.rs @@ -191,6 +191,9 @@ fn process_frame( Err(e @ EncoderStatus::NotReady) => { (Err(e.context("Mismanaged handling of two-pass stats data")), false) } + Err(e @ EncoderStatus::ImmediateExit) => { + (Err(e.context("Immediate exit requested")), false) + } Err(EncoderStatus::Encoded) => (Ok(Some(frame_summaries)), true), }; diff --git a/src/capi.rs b/src/capi.rs index cda43d8d40..cdb366c08d 100644 --- a/src/capi.rs +++ b/src/capi.rs @@ -124,6 +124,8 @@ pub enum EncoderStatus { /// was provided in the second pass of a 2-pass encode to encode the next /// frame. NotReady = -2, + /// Immediate exit requested + ImmediateExit = -3, } impl EncoderStatus { @@ -137,6 +139,7 @@ impl EncoderStatus { Encoded => "A Frame had been encoded but not emitted yet\0".as_ptr(), Failure => "Generic fatal error\0".as_ptr(), NotReady => "First-pass stats data not retrieved or not enough second-pass data provided\0".as_ptr(), + ImmediateExit => "Immediate exit requested\0".as_ptr(), } } } @@ -152,6 +155,7 @@ impl From> for EncoderStatus { rav1e::EncoderStatus::Encoded => EncoderStatus::Encoded, rav1e::EncoderStatus::Failure => EncoderStatus::Failure, rav1e::EncoderStatus::NotReady => EncoderStatus::NotReady, + rav1e::EncoderStatus::ImmediateExit => EncoderStatus::ImmediateExit, }, } } diff --git a/src/encoder.rs b/src/encoder.rs index 43d696e9b7..8fe1fb1a72 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -2973,7 +2973,8 @@ fn get_initial_cdfcontext(fi: &FrameInvariants) -> CDFContext { #[hawktracer(encode_tile_group)] fn encode_tile_group( fi: &FrameInvariants, fs: &mut FrameState, inter_cfg: &InterConfig, -) -> Vec { + progress: &dyn GranularProgress, +) -> Option> { let planes = if fi.sequence.chroma_sampling == ChromaSampling::Cs400 { 1 } else { 3 }; let mut blocks = FrameBlocks::new(fi.w_in_b, fi.h_in_b); @@ -2988,12 +2989,18 @@ fn encode_tile_group( .zip(cdfs.iter_mut()) .collect::>() .into_par_iter() - .map(|(mut ctx, cdf)| { - let raw = encode_tile(fi, &mut ctx.ts, cdf, &mut ctx.tb, inter_cfg); - (raw, ctx.ts) + .map(|(ctx, cdf)| { + let TileContextMut { mut ts, mut tb, .. } = ctx; + let raw = encode_tile(fi, &mut ts, cdf, &mut tb, inter_cfg, progress); + raw.map(|raw| (raw, ts)) }) + .while_some() .unzip(); + if raw_tiles.len() != ti.tile_count() { + return None; + } + let stats = tile_states.into_iter().map(|ts| ts.enc_stats).collect::>(); for tile_stats in stats { @@ -3078,7 +3085,7 @@ fn encode_tile_group( debug_assert!(max_tile_size_bytes > 0 && max_tile_size_bytes <= 4); fs.max_tile_size_bytes = max_tile_size_bytes; - build_raw_tile_group(ti, &raw_tiles, max_tile_size_bytes) + Some(build_raw_tile_group(ti, &raw_tiles, max_tile_size_bytes)) } fn build_raw_tile_group( @@ -3206,8 +3213,8 @@ fn check_lf_queue( fn encode_tile<'a, T: Pixel>( fi: &FrameInvariants, ts: &mut TileStateMut<'_, T>, fc: &'a mut CDFContext, blocks: &'a mut TileBlocksMut<'a>, - inter_cfg: &InterConfig, -) -> Vec { + inter_cfg: &InterConfig, progress: &dyn GranularProgress, +) -> Option> { let mut w = WriterEncoder::new(); let planes = if fi.sequence.chroma_sampling == ChromaSampling::Cs400 { 1 } else { 3 }; @@ -3226,6 +3233,11 @@ fn encode_tile<'a, T: Pixel>( for sbx in 0..ts.sb_width { cw.fc_log.clear(); + let data = ProgressData {}; + if !progress.progress(&data) { + return None; + } + let tile_sbo = TileSuperBlockOffset(SuperBlockOffset { x: sbx, y: sby }); let mut sbs_qe = SBSQueueEntry { sbo: tile_sbo, @@ -3402,7 +3414,7 @@ fn encode_tile<'a, T: Pixel>( ts.sbo.0.x, ts.sbo.0.y ); - w.done() + Some(w.done()) } #[allow(unused)] @@ -3485,7 +3497,8 @@ fn get_initial_segmentation( pub fn encode_frame( fi: &FrameInvariants, fs: &mut FrameState, inter_cfg: &InterConfig, -) -> Vec { + progress: &dyn GranularProgress, +) -> Option> { debug_assert!(!fi.show_existing_frame); debug_assert!(!fi.invalid); let obu_extension = 0; @@ -3496,7 +3509,7 @@ pub fn encode_frame( fs.segmentation = get_initial_segmentation(fi); segmentation_optimize(fi, fs); } - let tile_group = encode_tile_group(fi, fs, inter_cfg); + let tile_group = encode_tile_group(fi, fs, inter_cfg, progress)?; if fi.frame_type == FrameType::KEY { write_key_frame_obus(&mut packet, fi, obu_extension).unwrap(); @@ -3527,7 +3540,7 @@ pub fn encode_frame( buf2.clear(); packet.write_all(&tile_group).unwrap(); - packet + Some(packet) } pub fn update_rec_buffer( diff --git a/src/fuzzing.rs b/src/fuzzing.rs index 8362783689..2c91d6432f 100644 --- a/src/fuzzing.rs +++ b/src/fuzzing.rs @@ -202,6 +202,9 @@ fn encode_frames( Err(EncoderStatus::Failure) => { return Err(EncoderStatus::Failure); } + Err(EncoderStatus::ImmediateExit) => { + unreachable!(); + } } }