diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock
index 50bb566de2..b584a452bc 100644
--- a/apps/fortuna/Cargo.lock
+++ b/apps/fortuna/Cargo.lock
@@ -1488,7 +1488,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "5.3.0"
+version = "5.3.1"
 dependencies = [
  "anyhow",
  "axum",
diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml
index c790bc8ba1..145700587a 100644
--- a/apps/fortuna/Cargo.toml
+++ b/apps/fortuna/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "5.3.0"
+version = "5.3.1"
 edition = "2021"
 
 [dependencies]
diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index 2ebcafcc08..67858f813d 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -44,9 +44,12 @@ use {
         },
         registry::Registry,
     },
-    std::sync::{
-        atomic::AtomicU64,
-        Arc,
+    std::{
+        collections::HashMap,
+        sync::{
+            atomic::AtomicU64,
+            Arc,
+        },
     },
     tokio::{
         spawn,
@@ -75,6 +78,8 @@ const BLOCK_BATCH_SIZE: u64 = 100;
 const POLL_INTERVAL: Duration = Duration::from_secs(2);
 /// Track metrics in this interval
 const TRACK_INTERVAL: Duration = Duration::from_secs(10);
+/// Retry the last N blocks
+const RETRY_PREVIOUS_BLOCKS: u64 = 100;
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct AccountLabel {
@@ -91,6 +96,7 @@ pub struct KeeperMetrics {
     pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
     pub requests: Family<AccountLabel, Counter>,
     pub requests_processed: Family<AccountLabel, Counter>,
+    pub requests_reprocessed: Family<AccountLabel, Counter>,
     pub reveals: Family<AccountLabel, Counter>,
 }
 
@@ -147,6 +153,12 @@ impl KeeperMetrics {
             keeper_metrics.total_gas_spent.clone(),
         );
 
+        writable_registry.register(
+            "requests_reprocessed",
+            "Number of requests reprocessed",
+            keeper_metrics.requests_reprocessed.clone(),
+        );
+
         keeper_metrics
     }
 }
@@ -157,6 +169,16 @@ pub struct BlockRange {
     pub to: BlockNumber,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RequestState {
+    /// Fulfilled means that the request was either revealed or we are sure we
+    /// will not be able to reveal it.
+    Fulfilled,
+    /// We have already processed the request but could not fulfill it, and we
+    /// are not yet sure whether we will be able to fulfill it.
+    Processed,
+}
+
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
 async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
     loop {
@@ -203,6 +225,8 @@ pub async fn run_keeper_threads(
     );
     let keeper_address = contract.client().inner().inner().signer().address();
 
+    let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));
+
     // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
     spawn(
         process_backlog(
@@ -214,6 +238,7 @@ pub async fn run_keeper_threads(
             chain_eth_config.gas_limit,
             chain_state.clone(),
             keeper_metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span(),
     );
@@ -237,6 +262,7 @@ pub async fn run_keeper_threads(
             Arc::clone(&contract),
             chain_eth_config.gas_limit,
             keeper_metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span(),
     );
@@ -290,8 +316,13 @@ pub async fn process_event(
     contract: &Arc<SignablePythContract>,
     gas_limit: U256,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
+        fulfilled_requests_cache
+            .write()
+            .await
+            .insert(event.sequence_number, RequestState::Fulfilled);
         return Ok(());
     }
     let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
@@ -302,6 +333,10 @@ pub async fn process_event(
                 "Error while revealing with error: {:?}",
                 e
             );
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Fulfilled);
             return Ok(());
         }
     };
@@ -330,6 +365,11 @@ pub async fn process_event(
             sequence_number = &event.sequence_number,
             "Gas estimate for reveal with callback is higher than the gas limit"
         );
+
+        fulfilled_requests_cache
+            .write()
+            .await
+            .insert(event.sequence_number, RequestState::Fulfilled);
         return Ok(());
     }
 
@@ -354,6 +394,10 @@ pub async fn process_event(
                 // ever. We will return an Ok(()) to signal that we have processed this reveal
                 // and concluded that its Ok to not reveal.
                 _ => {
+                    fulfilled_requests_cache
+                        .write()
+                        .await
+                        .insert(event.sequence_number, RequestState::Processed);
                     tracing::error!(
                         sequence_number = &event.sequence_number,
                         "Error while revealing with error: {:?}",
@@ -399,6 +443,11 @@ pub async fn process_event(
                     address: chain_config.provider_address.to_string(),
                 })
                 .inc();
+
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Fulfilled);
             Ok(())
         }
         None => {
@@ -425,8 +474,12 @@ pub async fn process_event(
         None => {
             tracing::info!(
                 sequence_number = &event.sequence_number,
-                "Not processing event"
+                "Not fulfilling event"
             );
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Processed);
             Ok(())
         }
     },
@@ -450,6 +503,7 @@ pub async fn process_block_range(
     gas_limit: U256,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     let BlockRange {
         from: first_block,
@@ -462,6 +516,7 @@ pub async fn process_block_range(
             to_block = last_block;
         }
 
+        // TODO: this handles all blocks sequentially; we may want to handle them in parallel in the future.
         process_single_block_batch(
             BlockRange {
                 from: current_block,
@@ -471,6 +526,7 @@ pub async fn process_block_range(
             gas_limit,
             chain_state.clone(),
             metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span()
         .await;
@@ -480,7 +536,9 @@ pub async fn process_block_range(
 }
 
 /// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
-/// and then try to process them one by one. If the process fails, it will retry indefinitely.
+/// and then try to process them one by one. It checks the `fulfilled_requests_cache` to decide what to do with each event:
+/// requests that are already fulfilled are skipped, while requests that were only processed are reprocessed.
+/// If the process fails, it will retry indefinitely.
 #[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
 pub async fn process_single_block_batch(
     block_range: BlockRange,
@@ -488,6 +546,7 @@ pub async fn process_single_block_batch(
     gas_limit: U256,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     loop {
         let events_res = chain_state
@@ -499,6 +558,34 @@ pub async fn process_single_block_batch(
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
+                    if let Some(state) = fulfilled_requests_cache
+                        .read()
+                        .await
+                        .get(&event.sequence_number)
+                    {
+                        match state {
+                            RequestState::Fulfilled => {
+                                tracing::info!(
+                                    sequence_number = &event.sequence_number,
+                                    "Skipping already fulfilled request",
+                                );
+                                continue;
+                            }
+                            RequestState::Processed => {
+                                tracing::info!(
+                                    sequence_number = &event.sequence_number,
+                                    "Reprocessing already processed request",
+                                );
+                                metrics
+                                    .requests_reprocessed
+                                    .get_or_create(&AccountLabel {
+                                        chain_id: chain_state.id.clone(),
+                                        address: chain_state.provider_address.to_string(),
+                                    })
+                                    .inc();
+                            }
+                        }
+                    }
                     metrics
                         .requests
                         .get_or_create(&AccountLabel {
@@ -513,6 +600,7 @@ pub async fn process_single_block_batch(
                         &contract,
                         gas_limit,
                         metrics.clone(),
+                        fulfilled_requests_cache.clone(),
                     )
                     .in_current_span()
                     .await
@@ -627,16 +715,28 @@ pub async fn watch_blocks(
         let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
 
         if latest_safe_block > *last_safe_block_processed {
+            let mut from = latest_safe_block
+                .checked_sub(RETRY_PREVIOUS_BLOCKS)
+                .unwrap_or(0);
+
+            // In normal situations, the difference between the latest and the last safe block should not be more than 2-3 blocks (for Arbitrum it can be around 10).
+            // TODO: add a metric for this in a separate PR. We need alerts.
+            // In extreme situations, however, where we were unable to send the block range multiple times, the difference between latest_safe_block and
+            // last_safe_block_processed can grow. It is fine not to retry those earliest blocks, as we expect the RPC
+            // to be consistent again after this much time.
+            if from > *last_safe_block_processed {
+                from = *last_safe_block_processed;
+            }
             match tx
                 .send(BlockRange {
-                    from: *last_safe_block_processed + 1,
-                    to: latest_safe_block,
+                    from,
+                    to: latest_safe_block,
                 })
                 .await
             {
                 Ok(_) => {
                     tracing::info!(
-                        from_block = *last_safe_block_processed + 1,
+                        from_block = from,
                         to_block = &latest_safe_block,
                         "Block range sent to handle events",
                     );
@@ -644,9 +744,11 @@ pub async fn watch_blocks(
                 }
                 Err(e) => {
                     tracing::error!(
-                        "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
-                        e
-                    );
+                        from_block = from,
+                        to_block = &latest_safe_block,
+                        "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
error: {:?}", + e + ); } }; } @@ -661,6 +763,7 @@ pub async fn process_new_blocks( contract: Arc, gas_limit: U256, metrics: Arc, + fulfilled_requests_cache: Arc>>, ) { tracing::info!("Waiting for new block ranges to process"); loop { @@ -671,6 +774,7 @@ pub async fn process_new_blocks( gas_limit, chain_state.clone(), metrics.clone(), + fulfilled_requests_cache.clone(), ) .in_current_span() .await; @@ -686,11 +790,19 @@ pub async fn process_backlog( gas_limit: U256, chain_state: BlockchainState, metrics: Arc, + fulfilled_requests_cache: Arc>>, ) { tracing::info!("Processing backlog"); - process_block_range(backlog_range, contract, gas_limit, chain_state, metrics) - .in_current_span() - .await; + process_block_range( + backlog_range, + contract, + gas_limit, + chain_state, + metrics, + fulfilled_requests_cache, + ) + .in_current_span() + .await; tracing::info!("Backlog processed"); }