Skip to content

Commit

Permalink
l1_watcher: error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fborello-lambda committed Nov 22, 2024
1 parent c39f934 commit 216f0c4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 29 deletions.
2 changes: 1 addition & 1 deletion crates/l2/proposer/l1_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Committer {
}
}

pub async fn main_logic(&self) -> Result<(), CommitterError> {
async fn main_logic(&self) -> Result<(), CommitterError> {
let last_committed_block =
EthClient::get_last_committed_block(&self.eth_client, self.on_chain_proposer_address)
.await?;
Expand Down
71 changes: 43 additions & 28 deletions crates/l2/proposer/l1_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,13 @@ use keccak_hash::keccak;
use secp256k1::SecretKey;
use std::{cmp::min, ops::Mul, time::Duration};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

pub async fn start_l1_watcher(store: Store) {
let eth_config = EthConfig::from_env().expect("EthConfig::from_env()");
let watcher_config = L1WatcherConfig::from_env().expect("L1WatcherConfig::from_env()");
let sleep_duration = Duration::from_millis(watcher_config.check_interval_ms);
let mut l1_watcher = L1Watcher::new_from_config(watcher_config, eth_config);
loop {
sleep(sleep_duration).await;

let logs = match l1_watcher.get_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting logs from L1: {}", error);
continue;
}
};
if logs.is_empty() {
continue;
}

let pending_deposits_logs = match l1_watcher.get_pending_deposit_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting L1 pending deposit logs: {}", error);
continue;
}
};
let _deposit_txs = l1_watcher
.process_logs(logs, &pending_deposits_logs, &store)
.await
.expect("l1_watcher.process_logs()");
}
l1_watcher.run(&store).await;
}

pub struct L1Watcher {
Expand All @@ -58,6 +32,7 @@ pub struct L1Watcher {
max_block_step: U256,
last_block_fetched: U256,
l2_proposer_pk: SecretKey,
check_interval: Duration,
}

impl L1Watcher {
Expand All @@ -69,6 +44,46 @@ impl L1Watcher {
max_block_step: watcher_config.max_block_step,
last_block_fetched: U256::zero(),
l2_proposer_pk: watcher_config.l2_proposer_private_key,
check_interval: Duration::from_millis(watcher_config.check_interval_ms),
}
}

pub async fn run(&mut self, store: &Store) {
loop {
if let Err(err) = self.main_logic(store.clone()).await {
error!("L1 Watcher Error: {}", err);
}

sleep(Duration::from_millis(200)).await;
}
}

async fn main_logic(&mut self, store: Store) -> Result<(), L1WatcherError> {
loop {
sleep(self.check_interval).await;

let logs = match self.get_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting logs from L1: {}", error);
continue;
}
};
if logs.is_empty() {
continue;
}

let pending_deposits_logs = match self.get_pending_deposit_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting L1 pending deposit logs: {}", error);
continue;
}
};
let _deposit_txs = self
.process_logs(logs, &pending_deposits_logs, &store)
.await
.expect("l1_watcher.process_logs()");
}
}

Expand Down

0 comments on commit 216f0c4

Please sign in to comment.