From 07a902009d4cb5524aa2b3365259e6b2a9a71444 Mon Sep 17 00:00:00 2001
From: 0xfirefist
Date: Wed, 15 May 2024 16:43:53 +0530
Subject: [PATCH 1/4] add retry blocks

---
 apps/fortuna/Cargo.lock    |   2 +-
 apps/fortuna/Cargo.toml    |   2 +-
 apps/fortuna/src/keeper.rs | 117 +++++++++++++++++++++++++++++++++----
 3 files changed, 107 insertions(+), 14 deletions(-)

diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock
index 50bb566de2..b584a452bc 100644
--- a/apps/fortuna/Cargo.lock
+++ b/apps/fortuna/Cargo.lock
@@ -1488,7 +1488,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "5.3.0"
+version = "5.3.1"
 dependencies = [
  "anyhow",
  "axum",
diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml
index c790bc8ba1..145700587a 100644
--- a/apps/fortuna/Cargo.toml
+++ b/apps/fortuna/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "5.3.0"
+version = "5.3.1"
 edition = "2021"
 
 [dependencies]
diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index 2ebcafcc08..fffff05f04 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -44,9 +44,12 @@ use {
         },
         registry::Registry,
     },
-    std::sync::{
-        atomic::AtomicU64,
-        Arc,
+    std::{
+        collections::HashMap,
+        sync::{
+            atomic::AtomicU64,
+            Arc,
+        },
     },
     tokio::{
         spawn,
@@ -91,6 +94,7 @@ pub struct KeeperMetrics {
     pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicF64>>,
     pub requests: Family<AccountLabel, Counter>,
     pub requests_processed: Family<AccountLabel, Counter>,
+    pub requests_reprocessed: Family<AccountLabel, Counter>,
     pub reveals: Family<AccountLabel, Counter>,
 }
 
@@ -147,6 +151,12 @@ impl KeeperMetrics {
             keeper_metrics.total_gas_spent.clone(),
         );
 
+        writable_registry.register(
+            "requests_reprocessed",
+            "Number of requests reprocessed",
+            keeper_metrics.requests_reprocessed.clone(),
+        );
+
         keeper_metrics
     }
 }
@@ -157,6 +167,16 @@ pub struct BlockRange {
     pub to: BlockNumber,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RequestState {
+    /// Fulfilled means that the request was either revealed or we are sure we
+    /// will not be able to reveal it.
+    Fulfilled,
+    /// We have already processed the request but couldn't fulfill it, and we are
+    /// unsure whether we will be able to fulfill it.
+    Processed,
+}
+
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
 async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
     loop {
@@ -203,6 +223,8 @@ pub async fn run_keeper_threads(
     );
     let keeper_address = contract.client().inner().inner().signer().address();
 
+    let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));
+
     // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
     spawn(
         process_backlog(
@@ -214,6 +236,7 @@ pub async fn run_keeper_threads(
             chain_eth_config.gas_limit,
             chain_state.clone(),
             keeper_metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span(),
     );
@@ -237,6 +260,7 @@ pub async fn run_keeper_threads(
             Arc::clone(&contract),
             chain_eth_config.gas_limit,
             keeper_metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span(),
     );
@@ -290,6 +314,7 @@ pub async fn process_event(
     contract: &Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
         return Ok(());
     }
@@ -302,6 +327,10 @@ pub async fn process_event(
                 "Error while revealing with error: {:?}",
                 e
             );
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Fulfilled);
             return Ok(());
         }
     };
@@ -330,6 +359,11 @@ pub async fn process_event(
             sequence_number = &event.sequence_number,
             "Gas estimate for reveal with callback is higher than the gas limit"
         );
+
+        fulfilled_requests_cache
+            .write()
+            .await
+            .insert(event.sequence_number, RequestState::Fulfilled);
         return Ok(());
     }
 
@@ -399,6 +433,11 @@ pub async fn process_event(
                     address: chain_config.provider_address.to_string(),
                 })
                 .inc();
+
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Fulfilled);
             Ok(())
         }
         None => {
@@ -450,6 +489,7 @@ pub async fn process_block_range(
     gas_limit: U256,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     let BlockRange {
         from: first_block,
@@ -471,6 +511,7 @@ pub async fn process_block_range(
             gas_limit,
             chain_state.clone(),
             metrics.clone(),
+            fulfilled_requests_cache.clone(),
         )
         .in_current_span()
         .await;
@@ -488,6 +529,7 @@ pub async fn process_single_block_batch(
     gas_limit: U256,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     loop {
         let events_res = chain_state
@@ -499,6 +541,34 @@ pub async fn process_single_block_batch(
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
+                    if let Some(state) = fulfilled_requests_cache
+                        .read()
+                        .await
+                        .get(&event.sequence_number)
+                    {
+                        match state {
+                            RequestState::Fulfilled => {
+                                tracing::info!(
+                                    sequence_number = &event.sequence_number,
+                                    "Skipping already fulfilled request",
+                                );
+                                continue;
+                            }
+                            RequestState::Processed => {
+                                tracing::info!(
+                                    sequence_number = &event.sequence_number,
+                                    "Reprocessing already processed request",
+                                );
+                                metrics
+                                    .requests_reprocessed
+                                    .get_or_create(&AccountLabel {
+                                        chain_id: chain_state.id.clone(),
+                                        address: chain_state.provider_address.to_string(),
+                                    })
+                                    .inc();
+                            }
+                        }
+                    }
                     metrics
                         .requests
                         .get_or_create(&AccountLabel {
@@ -513,6 +583,7 @@ pub async fn process_single_block_batch(
                         &contract,
                         gas_limit,
                         metrics.clone(),
+                        fulfilled_requests_cache.clone(),
                     )
                     .in_current_span()
                     .await
@@ -627,16 +698,26 @@ pub async fn watch_blocks(
         let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
 
         if latest_safe_block > *last_safe_block_processed {
+            let mut from = latest_safe_block.checked_sub(100).unwrap_or(0);
+
+            // In a normal situation, the difference between the latest and last safe block should not be more than 2-3 blocks (for Arbitrum it can be 10).
+            // TODO: add a metric for this in a separate PR. We need alerts.
+            // But in an extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and
+            // last_safe_block_processed can grow. It is fine not to have a retry mechanism for those earlier blocks, as we expect the RPC
+            // to be consistent after this much time.
+            if from > *last_safe_block_processed {
+                from = *last_safe_block_processed;
+            }
             match tx
                 .send(BlockRange {
-                    from: *last_safe_block_processed + 1,
-                    to: latest_safe_block,
+                    from,
+                    to: latest_safe_block,
                 })
                 .await
             {
                 Ok(_) => {
                     tracing::info!(
-                        from_block = *last_safe_block_processed + 1,
+                        from_block = from,
                         to_block = &latest_safe_block,
                         "Block range sent to handle events",
                     );
@@ -644,9 +725,11 @@ pub async fn watch_blocks(
                 }
                 Err(e) => {
                     tracing::error!(
-                        "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
-                        e
-                    );
+                        from_block = from,
+                        to_block = &latest_safe_block,
+                        "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
+                        e
+                    );
                 }
             };
         }
@@ -661,6 +744,7 @@ pub async fn process_new_blocks(
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
@@ -671,6 +755,7 @@ pub async fn process_new_blocks(
                 gas_limit,
                 chain_state.clone(),
                 metrics.clone(),
+                fulfilled_requests_cache.clone(),
             )
             .in_current_span()
             .await;
@@ -686,11 +771,19 @@ pub async fn process_backlog(
     gas_limit: U256,
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) {
     tracing::info!("Processing backlog");
-    process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
-        .in_current_span()
-        .await;
+    process_block_range(
+        backlog_range,
+        contract,
+        gas_limit,
+        chain_state,
+        metrics,
+        fulfilled_requests_cache,
+    )
+    .in_current_span()
+    .await;
     tracing::info!("Backlog processed");
 }
 

From e62cc203dfcf94f66de88862801812fa1bc3e74b Mon Sep 17 00:00:00 2001
From: 0xfirefist
Date: Wed, 15 May 2024 16:47:01 +0530
Subject: [PATCH 2/4] add comment

---
 apps/fortuna/src/keeper.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index fffff05f04..b9015189a7 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -521,7 +521,10 @@ pub async fn process_block_range(
 }
 
 /// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
-/// and then try to process them one by one. If the process fails, it will retry indefinitely.
+/// and then try to process them one by one. It checks the `fulfilled_requests_cache`. If the request was already fulfilled,
+/// it won't reprocess it. If the request was already processed, it will reprocess it.
+/// If the process fails, it will retry indefinitely.
+///
 #[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
 pub async fn process_single_block_batch(
     block_range: BlockRange,

From 563c4bad5256421616cb33db041111dc9df9f1d4 Mon Sep 17 00:00:00 2001
From: 0xfirefist
Date: Wed, 15 May 2024 17:15:05 +0530
Subject: [PATCH 3/4] add comment

---
 apps/fortuna/src/keeper.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index b9015189a7..215695ee4e 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -78,6 +78,8 @@ const BLOCK_BATCH_SIZE: u64 = 100;
 const POLL_INTERVAL: Duration = Duration::from_secs(2);
 /// Track metrics in this interval
 const TRACK_INTERVAL: Duration = Duration::from_secs(10);
+/// Retry the last N blocks
+const RETRY_PREVIOUS_BLOCKS: u64 = 100;
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct AccountLabel {
@@ -502,6 +504,7 @@ pub async fn process_block_range(
             to_block = last_block;
         }
 
+        // TODO: this handles all blocks sequentially; we might want to handle them in parallel in the future.
         process_single_block_batch(
             BlockRange {
                 from: current_block,
@@ -524,7 +527,6 @@ pub async fn process_block_range(
 /// and then try to process them one by one. It checks the `fulfilled_requests_cache`. If the request was already fulfilled,
 /// it won't reprocess it. If the request was already processed, it will reprocess it.
 /// If the process fails, it will retry indefinitely.
-///
 #[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
 pub async fn process_single_block_batch(
     block_range: BlockRange,
@@ -701,7 +703,9 @@ pub async fn watch_blocks(
         let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
 
         if latest_safe_block > *last_safe_block_processed {
-            let mut from = latest_safe_block.checked_sub(100).unwrap_or(0);
+            let mut from = latest_safe_block
+                .checked_sub(RETRY_PREVIOUS_BLOCKS)
+                .unwrap_or(0);
 
             // In a normal situation, the difference between the latest and last safe block should not be more than 2-3 blocks (for Arbitrum it can be 10).
             // TODO: add a metric for this in a separate PR. We need alerts.

From ae20dfad00dfaf3a3ebac08f4ab1d02621cb8fc6 Mon Sep 17 00:00:00 2001
From: 0xfirefist
Date: Wed, 15 May 2024 18:07:10 +0530
Subject: [PATCH 4/4] add processed

---
 apps/fortuna/src/keeper.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index 215695ee4e..67858f813d 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -319,6 +319,10 @@ pub async fn process_event(
     fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
+        fulfilled_requests_cache
+            .write()
+            .await
+            .insert(event.sequence_number, RequestState::Fulfilled);
         return Ok(());
     }
     let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
@@ -390,6 +394,10 @@ pub async fn process_event(
             // ever. We will return an Ok(()) to signal that we have processed this reveal
             // and concluded that its Ok to not reveal.
             _ => {
+                fulfilled_requests_cache
+                    .write()
+                    .await
+                    .insert(event.sequence_number, RequestState::Processed);
                 tracing::error!(
                     sequence_number = &event.sequence_number,
                     "Error while revealing with error: {:?}",
@@ -466,8 +474,12 @@ pub async fn process_event(
         None => {
             tracing::info!(
                 sequence_number = &event.sequence_number,
-                "Not processing event"
+                "Not fulfilling event"
             );
+            fulfilled_requests_cache
+                .write()
+                .await
+                .insert(event.sequence_number, RequestState::Processed);
             Ok(())
         }
     },
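
Reviewer note: the series above threads an `Arc<RwLock<HashMap<u64, RequestState>>>` through every processing path, with the rule "skip requests marked `Fulfilled`, re-run requests marked `Processed`". Below is a minimal, self-contained sketch of that state machine, not the keeper's actual code. It assumes a plain binary with `tokio` (features `macros`, `rt-multi-thread`, `sync`); the names `RequestCache`, `should_process`, and `record_outcome` are illustrative helpers that do not exist in the patches.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

/// Illustrative copy of the RequestState enum added in PATCH 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestState {
    Fulfilled,
    Processed,
}

/// Hypothetical alias for the shared cache type used throughout the series.
type RequestCache = Arc<RwLock<HashMap<u64, RequestState>>>;

/// Decide what to do with a request when a block range is (re)scanned:
/// skip it if it was fulfilled, otherwise process it (possibly again).
async fn should_process(cache: &RequestCache, sequence_number: u64) -> bool {
    match cache.read().await.get(&sequence_number).copied() {
        Some(RequestState::Fulfilled) => false,          // already done, skip
        Some(RequestState::Processed) | None => true,    // retry or first attempt
    }
}

/// Record the outcome of one processing attempt.
async fn record_outcome(cache: &RequestCache, sequence_number: u64, state: RequestState) {
    cache.write().await.insert(sequence_number, state);
}

#[tokio::main]
async fn main() {
    let cache: RequestCache = Arc::new(RwLock::new(HashMap::new()));

    // First pass: the reveal could not be completed yet, so mark it Processed.
    record_outcome(&cache, 42, RequestState::Processed).await;
    assert!(should_process(&cache, 42).await); // reprocessed on the next range

    // Later pass: the reveal landed (or is known to be impossible), so mark it Fulfilled.
    record_outcome(&cache, 42, RequestState::Fulfilled).await;
    assert!(!should_process(&cache, 42).await); // skipped from now on
}
```

In the patches themselves this decision is inlined in `process_single_block_batch` and the cache writes are inlined in `process_event`; the sketch only isolates the intended transitions.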
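Likewise, a small standalone sketch of the retry window introduced in PATCH 1 and named `RETRY_PREVIOUS_BLOCKS` in PATCH 3: `watch_blocks` re-sends up to the last 100 safe blocks, clamped so that no gap opens behind `last_safe_block_processed`. Here `BlockNumber` is just a `u64` alias and `next_from_block` is a hypothetical helper, not code from the patches.

```rust
type BlockNumber = u64;

const RETRY_PREVIOUS_BLOCKS: BlockNumber = 100;

/// Start of the next block range to send: re-scan up to the last
/// RETRY_PREVIOUS_BLOCKS blocks, but never start past the last block
/// that was already processed, so no blocks are skipped.
fn next_from_block(
    latest_safe_block: BlockNumber,
    last_safe_block_processed: BlockNumber,
) -> BlockNumber {
    let mut from = latest_safe_block
        .checked_sub(RETRY_PREVIOUS_BLOCKS)
        .unwrap_or(0);
    if from > last_safe_block_processed {
        from = last_safe_block_processed;
    }
    from
}

fn main() {
    // Normal case: the keeper is only a few blocks behind, so the window
    // simply re-covers the last 100 safe blocks for retries.
    assert_eq!(next_from_block(1_000, 998), 900);

    // Fallen far behind (e.g. sends kept failing): clamp to the last
    // processed block so the range stays contiguous.
    assert_eq!(next_from_block(1_000, 750), 750);

    // Early chain: checked_sub saturates to block 0.
    assert_eq!(next_from_block(40, 10), 0);
}
```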