diff --git a/configs/settlement.toml b/configs/settlement.toml index 22ea0e4..1bb2122 100644 --- a/configs/settlement.toml +++ b/configs/settlement.toml @@ -18,4 +18,11 @@ private_key = "0xbe825d459385bbb3ba169ac8d59bd05b099b190d8a89aace7e5bc3518e2f1a9 chain_id = 12345 [ethereum_settlement_config.zeth_config.zeth_contracts_addr] -global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64" \ No newline at end of file +global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64" + +# A general configuration required for the settlement worker +[settlement_worker_config] +# unit: second +proof_worker_interval = 1 +verify_worker_interval = 1 +rollup_worker_interval = 1 \ No newline at end of file diff --git a/src/commands/run.rs b/src/commands/run.rs index 6d77984..7a52e54 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -8,6 +8,7 @@ use crate::custom_reth::TxFilterConfig; use crate::db::lfs; use crate::operator::Operator; use crate::settlement::ethereum::EthereumSettlementConfig; +use crate::settlement::worker::WorkerConfig; use crate::settlement::NetworkSpec; use anyhow::{anyhow, Result}; use tokio::select; @@ -194,6 +195,16 @@ impl RunCmd { } }; + let settlement_worker_config = match &self.settlement_conf { + None => { + return Err(anyhow::anyhow!( + "Custom node configuration is required for custom node" + )); + } + + Some(settlement_conf_path) => WorkerConfig::from_conf_path(settlement_conf_path)?, + }; + // Load the database configuration let db_config = match self.base_params.databases.database { Database::Memory => { @@ -259,6 +270,7 @@ impl RunCmd { a.as_str(), stop_rx, reth_started_signal_rx, + &settlement_worker_config, ) .await }); diff --git a/src/operator.rs b/src/operator.rs index 9bb76cd..298cb48 100644 --- a/src/operator.rs +++ b/src/operator.rs @@ -10,14 +10,16 @@ use ethers_providers::{Http, Provider}; // use serde::Serialize; use crate::db::Database; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; -use crate::settlement::worker::Settler; +use crate::settlement::worker::{Settler, WorkerConfig}; pub(crate) struct Operator; impl Operator { + #[allow(clippy::too_many_arguments)] pub async fn run( l2addr: &str, prover_addr: &str, @@ -26,6 +28,7 @@ impl Operator { aggregator_addr: &str, mut stop_rx: Receiver<()>, mut reth_started_signal_rx: Receiver<()>, + general_config: &WorkerConfig, ) -> Result<()> { // initialize all components of the eigen-zeth full node // initialize the prover @@ -59,11 +62,13 @@ impl Operator { let arc_db_for_verify_worker = rollup_db.clone(); let settlement_provider_for_verify_worker = arc_settlement_provider.clone(); let (verify_stop_tx, verify_stop_rx) = mpsc::channel::<()>(1); + let verify_interval = Duration::from_secs(general_config.verify_worker_interval); tokio::spawn(async move { Settler::verify_worker( arc_db_for_verify_worker, settlement_provider_for_verify_worker, verify_stop_rx, + verify_interval, ) .await }); @@ -71,20 +76,29 @@ impl Operator { // start the proof worker let arc_db_for_proof_worker = rollup_db.clone(); let (proof_stop_tx, proof_stop_rx) = mpsc::channel::<()>(1); + let proof_interval = Duration::from_secs(general_config.proof_worker_interval); tokio::spawn(async move { - Settler::proof_worker(arc_db_for_proof_worker, prover, proof_stop_rx).await + Settler::proof_worker( + arc_db_for_proof_worker, + prover, + proof_stop_rx, + proof_interval, + ) + .await }); let arc_db_for_submit_worker = rollup_db.clone(); let settlement_provider_for_submit_worker = arc_settlement_provider.clone(); let l2provider_for_submit_worker = l2provider.clone(); let (submit_stop_tx, submit_stop_rx) = mpsc::channel::<()>(1); + let rollup_interval = Duration::from_secs(general_config.rollup_worker_interval); tokio::spawn(async move { Settler::rollup( arc_db_for_submit_worker, l2provider_for_submit_worker, settlement_provider_for_submit_worker, submit_stop_rx, + rollup_interval, ) .await }); diff --git a/src/settlement/worker.rs b/src/settlement/worker.rs index b70302b..9f1c19b 100644 --- a/src/settlement/worker.rs +++ b/src/settlement/worker.rs @@ -4,22 +4,44 @@ use crate::prover::ProverChannel; use crate::settlement::{BatchData, Settlement}; use alloy_rlp::{length_of_length, BytesMut, Encodable, Header}; use anyhow::{anyhow, Result}; +use config::{Config, File}; use ethers::prelude::U64; use ethers_core::types::{BlockId, BlockNumber, Transaction}; use ethers_providers::{Http, Middleware, Provider}; use prost::bytes; use reqwest::Client; use reth_primitives::{Bytes, TransactionKind, TxLegacy}; +use serde::Deserialize; +use std::path::Path; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -const PROOF_INTERVAL: Duration = Duration::from_secs(30); -const VERIFY_INTERVAL: Duration = Duration::from_secs(30); -const SUBMIT_INTERVAL: Duration = Duration::from_secs(30); - pub(crate) struct Settler {} +/// A general configuration that needs to be included in the configuration structure of each implementation. +#[derive(Debug, Clone, Deserialize)] +pub struct WorkerConfig { + pub proof_worker_interval: u64, + pub verify_worker_interval: u64, + pub rollup_worker_interval: u64, +} + +impl WorkerConfig { + pub fn from_conf_path(conf_path: &str) -> Result { + log::info!("Load the settlement worker config from: {}", conf_path); + + let config = Config::builder() + .add_source(File::from(Path::new(conf_path))) + .build() + .map_err(|e| anyhow!("Failed to build config: {:?}", e))?; + + config + .get("settlement_worker_config") + .map_err(|e| anyhow!("Failed to parse WorkerConfig: {:?}", e)) + } +} + // TODO: Use channels, streams, and other methods to flow data between workers, // and use event driven replacement of rotation databases to drive Zeth to run, // avoiding frequent database access @@ -29,8 +51,9 @@ impl Settler { db: Arc>, mut prover: ProverChannel, mut stop_rx: mpsc::Receiver<()>, + worker_interval: Duration, ) -> Result<()> { - let mut ticker = tokio::time::interval(PROOF_INTERVAL); + let mut ticker = tokio::time::interval(worker_interval); prover.start().await.unwrap(); log::info!("Prove Worker started"); @@ -141,8 +164,9 @@ impl Settler { db: Arc>, settlement_provider: Arc>, mut stop_rx: mpsc::Receiver<()>, + worker_interval: Duration, ) -> Result<()> { - let mut ticker = tokio::time::interval(VERIFY_INTERVAL); + let mut ticker = tokio::time::interval(worker_interval); let bridge_service_client = Client::new(); log::info!("Verify Worker started"); loop { @@ -222,8 +246,9 @@ impl Settler { l2provider: Provider, settlement_provider: Arc>, mut stop_rx: mpsc::Receiver<()>, + worker_interval: Duration, ) -> Result<()> { - let mut ticker = tokio::time::interval(SUBMIT_INTERVAL); + let mut ticker = tokio::time::interval(worker_interval); log::info!("Submit Worker started"); loop { tokio::select! { @@ -531,7 +556,16 @@ mod tests { let (tx, rx) = mpsc::channel(1); let stop_rx = rx; - let submit_worker = Settler::rollup(arc_db, l2provider, arc_settlement_provider, stop_rx); + + let conf_path = "configs/settlement.toml"; + let config = WorkerConfig::from_conf_path(conf_path).unwrap(); + let submit_worker = Settler::rollup( + arc_db, + l2provider, + arc_settlement_provider, + stop_rx, + Duration::from_secs(config.rollup_worker_interval), + ); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(5)).await; @@ -608,7 +642,15 @@ mod tests { let (tx, rx) = mpsc::channel(1); let stop_rx: mpsc::Receiver<()> = rx; - let verify_worker = Settler::verify_worker(arc_db, arc_settlement_provider, stop_rx); + + let conf_path = "configs/settlement.toml"; + let config = WorkerConfig::from_conf_path(conf_path).unwrap(); + let verify_worker = Settler::verify_worker( + arc_db, + arc_settlement_provider, + stop_rx, + Duration::from_secs(config.rollup_worker_interval), + ); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(5)).await; @@ -640,4 +682,11 @@ mod tests { ); Ok(()) } + + #[test] + fn test_from_conf_path() { + let conf_path = "configs/settlement.toml"; + let config = WorkerConfig::from_conf_path(conf_path).unwrap(); + println!("{:#?}", config); + } }