fix(fortuna): retry blocks #1579

Merged: 4 commits, May 15, 2024
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.3.0"
version = "5.3.1"
edition = "2021"

[dependencies]
140 changes: 126 additions & 14 deletions apps/fortuna/src/keeper.rs
@@ -44,9 +44,12 @@ use {
},
registry::Registry,
},
-std::sync::{
-atomic::AtomicU64,
-Arc,
+std::{
+collections::HashMap,
+sync::{
+atomic::AtomicU64,
+Arc,
+},
},
tokio::{
spawn,
@@ -75,6 +78,8 @@ const BLOCK_BATCH_SIZE: u64 = 100;
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Track metrics in this interval
const TRACK_INTERVAL: Duration = Duration::from_secs(10);
/// Retry last N blocks
const RETRY_PREVIOUS_BLOCKS: u64 = 100;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct AccountLabel {
Expand All @@ -91,6 +96,7 @@ pub struct KeeperMetrics {
pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub requests: Family<AccountLabel, Counter>,
pub requests_processed: Family<AccountLabel, Counter>,
pub requests_reprocessed: Family<AccountLabel, Counter>,
pub reveals: Family<AccountLabel, Counter>,
}

@@ -147,6 +153,12 @@ impl KeeperMetrics {
keeper_metrics.total_gas_spent.clone(),
);

writable_registry.register(
"requests_reprocessed",
"Number of requests reprocessed",
keeper_metrics.requests_reprocessed.clone(),
);

keeper_metrics
}
}
@@ -157,6 +169,16 @@ pub struct BlockRange {
pub to: BlockNumber,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestState {
/// Fulfilled means that the request was either revealed or we are sure we
/// will not be able to reveal it.
Fulfilled,
/// We have already processed the request but couldn't fulfill it, and we are
/// unsure whether we will be able to fulfill it.
Processed,
}
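
The skip/reprocess decision this enum drives (see the cache check in `process_single_block_batch` further down) boils down to: fulfilled requests are never handled again, processed-but-unfulfilled requests are retried. Below is a minimal self-contained sketch of that decision using a plain `HashMap` rather than the `Arc<RwLock<…>>` the keeper actually shares between tasks; the `should_process` helper is hypothetical and not part of the PR.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestState {
    Fulfilled,
    Processed,
}

/// Hypothetical helper: returns true when an event should be handled (again).
fn should_process(cache: &HashMap<u64, RequestState>, sequence_number: u64) -> bool {
    match cache.get(&sequence_number) {
        // Already revealed, or known to be unrevealable: skip.
        Some(RequestState::Fulfilled) => false,
        // Seen before but not fulfilled: retry.
        Some(RequestState::Processed) => true,
        // Never seen: process.
        None => true,
    }
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(1, RequestState::Fulfilled);
    cache.insert(2, RequestState::Processed);

    assert!(!should_process(&cache, 1)); // skipped
    assert!(should_process(&cache, 2)); // reprocessed (bumps requests_reprocessed)
    assert!(should_process(&cache, 3)); // fresh request
}
```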

/// Get the latest safe block number for the chain. Retry internally if there is an error.
async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
loop {
@@ -203,6 +225,8 @@ pub async fn run_keeper_threads(
);
let keeper_address = contract.client().inner().inner().signer().address();

let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));

// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
spawn(
process_backlog(
@@ -214,6 +238,7 @@ chain_eth_config.gas_limit,
chain_eth_config.gas_limit,
chain_state.clone(),
keeper_metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span(),
);
@@ -237,6 +262,7 @@ pub async fn run_keeper_threads(
Arc::clone(&contract),
chain_eth_config.gas_limit,
keeper_metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span(),
);
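
For context on how the new cache is shared: `run_keeper_threads` creates a single `Arc<RwLock<HashMap<u64, RequestState>>>` and clones it into both the backlog task and the new-blocks task. The following standalone sketch shows that sharing pattern only; it is hypothetical code (assuming tokio with the `sync`, `rt` and `macros` features), not the PR's own.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestState {
    Fulfilled,
    Processed,
}

#[tokio::main]
async fn main() {
    // One cache, shared by every keeper task via Arc clones.
    let cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));

    // A "backlog" task records that a request was fulfilled.
    let backlog = {
        let cache = cache.clone();
        tokio::spawn(async move {
            cache.write().await.insert(42, RequestState::Fulfilled);
        })
    };
    backlog.await.unwrap();

    // A "new blocks" task later sees that entry and can skip the request.
    let fulfilled = {
        let cache = cache.clone();
        tokio::spawn(async move {
            matches!(cache.read().await.get(&42), Some(RequestState::Fulfilled))
        })
    };
    assert!(fulfilled.await.unwrap());
}
```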
@@ -290,8 +316,13 @@ pub async fn process_event(
contract: &Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) -> Result<()> {
if chain_config.provider_address != event.provider_address {
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}
let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
@@ -302,6 +333,10 @@ pub async fn process_event(
"Error while revealing with error: {:?}",
e
);
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}
};
@@ -330,6 +365,11 @@ pub async fn process_event(
sequence_number = &event.sequence_number,
"Gas estimate for reveal with callback is higher than the gas limit"
);

fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}

@@ -354,6 +394,10 @@ pub async fn process_event(
// ever. We will return an Ok(()) to signal that we have processed this reveal
// and concluded that it's OK to not reveal.
_ => {
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Processed);
tracing::error!(
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
@@ -399,6 +443,11 @@ pub async fn process_event(
address: chain_config.provider_address.to_string(),
})
.inc();

fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
Ok(())
}
None => {
@@ -425,8 +474,12 @@ pub async fn process_event(
None => {
tracing::info!(
sequence_number = &event.sequence_number,
"Not processing event"
"Not fulfilling event"
);
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Processed);
Ok(())
}
},
@@ -450,6 +503,7 @@ pub async fn process_block_range(
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
let BlockRange {
from: first_block,
@@ -462,6 +516,7 @@ pub async fn process_block_range(
to_block = last_block;
}

// TODO: this handles all blocks sequentially; we might want to handle them in parallel in the future.
process_single_block_batch(
BlockRange {
from: current_block,
@@ -471,6 +526,7 @@ gas_limit,
gas_limit,
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
@@ -480,14 +536,17 @@ pub async fn process_single_block_batch(
}

/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
-/// and then try to process them one by one. If the process fails, it will retry indefinitely.
+/// and then try to process them one by one. It checks the `fulfilled_requests_cache`: if a request was already
+/// fulfilled, it is not reprocessed; if it was only processed (but not fulfilled), it is reprocessed.
+/// If the process fails, it will retry indefinitely.
#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
pub async fn process_single_block_batch(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
loop {
let events_res = chain_state
@@ -499,6 +558,34 @@ pub async fn process_single_block_batch(
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
for event in &events {
if let Some(state) = fulfilled_requests_cache
.read()
.await
.get(&event.sequence_number)
{
match state {
RequestState::Fulfilled => {
tracing::info!(
sequence_number = &event.sequence_number,
"Skipping already fulfilled request",
);
continue;
}
RequestState::Processed => {
tracing::info!(
sequence_number = &event.sequence_number,
"Reprocessing already processed request",
);
metrics
.requests_reprocessed
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
}
}
}
metrics
.requests
.get_or_create(&AccountLabel {
@@ -513,6 +600,7 @@ pub async fn process_single_block_batch(
&contract,
gas_limit,
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await
@@ -627,26 +715,40 @@ pub async fn watch_blocks(

let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > *last_safe_block_processed {
let mut from = latest_safe_block
.checked_sub(RETRY_PREVIOUS_BLOCKS)
.unwrap_or(0);

// In a normal situation, the difference between the latest and the last processed safe block should not be more than 2-3 (for Arbitrum it can be 10).
// TODO: add a metric for this in a separate PR. We need alerts.
// But in extreme situations, where we were unable to send the block range multiple times, the difference between latest_safe_block and
// last_safe_block_processed can grow. It is fine to not have the retry mechanism for those earliest blocks, as we expect the RPC
// to be consistent after this much time.
if from > *last_safe_block_processed {
from = *last_safe_block_processed;
}
match tx
.send(BlockRange {
-from: *last_safe_block_processed + 1,
-to: latest_safe_block,
+from,
+to: latest_safe_block,
})
.await
{
Ok(_) => {
tracing::info!(
-from_block = *last_safe_block_processed + 1,
+from_block = from,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
*last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
from_block = from,
to_block = &latest_safe_block,
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
};
}
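
To make the retry-window arithmetic above concrete, here is a small worked sketch of how `from` is chosen. The `retry_from` helper and the numbers are hypothetical; the body mirrors the logic of `watch_blocks` in this diff.

```rust
/// Mirrors RETRY_PREVIOUS_BLOCKS in keeper.rs.
const RETRY_PREVIOUS_BLOCKS: u64 = 100;

/// Hypothetical helper reproducing the `from` computation in watch_blocks.
fn retry_from(latest_safe_block: u64, last_safe_block_processed: u64) -> u64 {
    let mut from = latest_safe_block
        .checked_sub(RETRY_PREVIOUS_BLOCKS)
        .unwrap_or(0);
    // Never start after the last processed block, so no gap is left behind.
    if from > last_safe_block_processed {
        from = last_safe_block_processed;
    }
    from
}

fn main() {
    // Keeper is caught up: re-scan the last 100 blocks anyway.
    assert_eq!(retry_from(1_000, 995), 900);
    // Keeper fell far behind: start from the last processed block instead.
    assert_eq!(retry_from(1_000, 850), 850);
    // Young chain with fewer than 100 blocks: clamp at 0.
    assert_eq!(retry_from(50, 40), 0);
}
```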
@@ -661,6 +763,7 @@ pub async fn process_new_blocks(
contract: Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
@@ -671,6 +774,7 @@ pub async fn process_new_blocks(
gas_limit,
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
@@ -686,11 +790,19 @@ pub async fn process_backlog(
gas_limit: U256,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
tracing::info!("Processing backlog");
-process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
-.in_current_span()
-.await;
+process_block_range(
+backlog_range,
+contract,
+gas_limit,
+chain_state,
+metrics,
+fulfilled_requests_cache,
+)
+.in_current_span()
+.await;
tracing::info!("Backlog processed");
}
