diff --git a/src/contracts/scanner.rs b/src/contracts/scanner.rs index 19029099..9d880864 100644 --- a/src/contracts/scanner.rs +++ b/src/contracts/scanner.rs @@ -5,6 +5,11 @@ pub struct BlockScanner { read_provider: T, current_block: u64, window_size: u64, + + // How many blocks from the chain head to scan to + // e.g. if latest block is 20 and offset is set to 3 + // then the scanner will scan until block 17 + chain_head_offset: u64, } impl BlockScanner @@ -19,15 +24,22 @@ where read_provider, current_block: latest_block.as_u64(), window_size, + chain_head_offset: 0, }) } + pub fn with_offset(mut self, chain_head_offset: u64) -> Self { + self.chain_head_offset = chain_head_offset; + self + } + pub async fn next( &mut self, address: Option>, topics: [Option; 4], ) -> anyhow::Result> { let latest_block = self.read_provider.get_block_number().await?.as_u64(); + let latest_block = latest_block.saturating_sub(self.chain_head_offset); if self.current_block >= latest_block { return Ok(Vec::new()); diff --git a/src/task_monitor.rs b/src/task_monitor.rs index 82d3d464..c709bedd 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -103,6 +103,10 @@ pub struct Options { #[clap(long, env, default_value = "100")] pub scanning_window_size: u64, + /// The offset from the latest block to scan + #[clap(long, env, default_value = "0")] + pub scanning_chain_head_offset: u64, + /// The number of seconds to wait between fetching logs #[clap(long, env, default_value = "30")] pub time_between_scans_seconds: u64, @@ -131,6 +135,7 @@ pub struct TaskMonitor { // Finalization params scanning_window_size: u64, + scanning_chain_head_offset: u64, time_between_scans: Duration, max_epoch_duration: Duration, // TODO: docs @@ -150,6 +155,7 @@ impl TaskMonitor { let Options { batch_timeout_seconds, scanning_window_size, + scanning_chain_head_offset, time_between_scans_seconds, max_epoch_duration_seconds, monitored_txs_capacity, @@ -164,6 +170,7 @@ impl TaskMonitor { tree_state, batch_insert_timeout_secs: batch_timeout_seconds, scanning_window_size, + scanning_chain_head_offset, time_between_scans: Duration::from_secs(time_between_scans_seconds), batch_deletion_timeout_seconds, min_batch_deletion_size, @@ -200,6 +207,7 @@ impl TaskMonitor { self.tree_state.get_processed_tree(), self.tree_state.get_mined_tree(), self.scanning_window_size, + self.scanning_chain_head_offset, self.time_between_scans, self.max_epoch_duration, ); diff --git a/src/task_monitor/tasks/finalize_identities.rs b/src/task_monitor/tasks/finalize_identities.rs index b5b21f31..d24f8aec 100644 --- a/src/task_monitor/tasks/finalize_identities.rs +++ b/src/task_monitor/tasks/finalize_identities.rs @@ -23,9 +23,10 @@ pub struct FinalizeRoots { processed_tree: TreeVersion, finalized_tree: TreeVersion, - scanning_window_size: u64, - time_between_scans: Duration, - max_epoch_duration: Duration, + scanning_window_size: u64, + scanning_chain_head_offset: u64, + time_between_scans: Duration, + max_epoch_duration: Duration, } impl FinalizeRoots { @@ -35,6 +36,7 @@ impl FinalizeRoots { processed_tree: TreeVersion, finalized_tree: TreeVersion, scanning_window_size: u64, + scanning_chain_head_offset: u64, time_between_scans: Duration, max_epoch_duration: Duration, ) -> Arc { @@ -44,6 +46,7 @@ impl FinalizeRoots { processed_tree, finalized_tree, scanning_window_size, + scanning_chain_head_offset, time_between_scans, max_epoch_duration, }) @@ -56,6 +59,7 @@ impl FinalizeRoots { &self.processed_tree, &self.finalized_tree, self.scanning_window_size, + self.scanning_chain_head_offset, self.time_between_scans, self.max_epoch_duration, ) @@ -69,6 +73,7 @@ async fn finalize_roots_loop( processed_tree: &TreeVersion, finalized_tree: &TreeVersion, scanning_window_size: u64, + scanning_chain_head_offset: u64, time_between_scans: Duration, max_epoch_duration: Duration, ) -> AnyhowResult<()> { @@ -76,7 +81,10 @@ async fn finalize_roots_loop( let secondary_abis = identity_manager.secondary_abis(); let mut mainnet_scanner = - BlockScanner::new_latest(mainnet_abi.client().clone(), scanning_window_size).await?; + BlockScanner::new_latest(mainnet_abi.client().clone(), scanning_window_size) + .await? + .with_offset(scanning_chain_head_offset); + let mut secondary_scanners = init_secondary_scanners(secondary_abis, scanning_window_size).await?;