Skip to content

Commit

Permalink
chore: add worker interval config (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
captainlee1024 authored Aug 16, 2024
1 parent 447d871 commit 80b9154
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 12 deletions.
9 changes: 8 additions & 1 deletion configs/settlement.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,11 @@ private_key = "0xbe825d459385bbb3ba169ac8d59bd05b099b190d8a89aace7e5bc3518e2f1a9
chain_id = 12345

[ethereum_settlement_config.zeth_config.zeth_contracts_addr]
global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64"
global_exit = "0xd0438bB2b2522D56d498F871fcf786006c180b64"

# A general configuration required for the settlement worker
[settlement_worker_config]
# unit: second
proof_worker_interval = 1
verify_worker_interval = 1
rollup_worker_interval = 1
12 changes: 12 additions & 0 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::custom_reth::TxFilterConfig;
use crate::db::lfs;
use crate::operator::Operator;
use crate::settlement::ethereum::EthereumSettlementConfig;
use crate::settlement::worker::WorkerConfig;
use crate::settlement::NetworkSpec;
use anyhow::{anyhow, Result};
use tokio::select;
Expand Down Expand Up @@ -194,6 +195,16 @@ impl RunCmd {
}
};

let settlement_worker_config = match &self.settlement_conf {
None => {
return Err(anyhow::anyhow!(
"Custom node configuration is required for custom node"
));
}

Some(settlement_conf_path) => WorkerConfig::from_conf_path(settlement_conf_path)?,
};

// Load the database configuration
let db_config = match self.base_params.databases.database {
Database::Memory => {
Expand Down Expand Up @@ -259,6 +270,7 @@ impl RunCmd {
a.as_str(),
stop_rx,
reth_started_signal_rx,
&settlement_worker_config,
)
.await
});
Expand Down
18 changes: 16 additions & 2 deletions src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ use ethers_providers::{Http, Provider};
// use serde::Serialize;
use crate::db::Database;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;

use crate::settlement::worker::Settler;
use crate::settlement::worker::{Settler, WorkerConfig};

pub(crate) struct Operator;

impl Operator {
#[allow(clippy::too_many_arguments)]
pub async fn run(
l2addr: &str,
prover_addr: &str,
Expand All @@ -26,6 +28,7 @@ impl Operator {
aggregator_addr: &str,
mut stop_rx: Receiver<()>,
mut reth_started_signal_rx: Receiver<()>,
general_config: &WorkerConfig,
) -> Result<()> {
// initialize all components of the eigen-zeth full node
// initialize the prover
Expand Down Expand Up @@ -59,32 +62,43 @@ impl Operator {
let arc_db_for_verify_worker = rollup_db.clone();
let settlement_provider_for_verify_worker = arc_settlement_provider.clone();
let (verify_stop_tx, verify_stop_rx) = mpsc::channel::<()>(1);
let verify_interval = Duration::from_secs(general_config.verify_worker_interval);
tokio::spawn(async move {
Settler::verify_worker(
arc_db_for_verify_worker,
settlement_provider_for_verify_worker,
verify_stop_rx,
verify_interval,
)
.await
});

// start the proof worker
let arc_db_for_proof_worker = rollup_db.clone();
let (proof_stop_tx, proof_stop_rx) = mpsc::channel::<()>(1);
let proof_interval = Duration::from_secs(general_config.proof_worker_interval);
tokio::spawn(async move {
Settler::proof_worker(arc_db_for_proof_worker, prover, proof_stop_rx).await
Settler::proof_worker(
arc_db_for_proof_worker,
prover,
proof_stop_rx,
proof_interval,
)
.await
});

let arc_db_for_submit_worker = rollup_db.clone();
let settlement_provider_for_submit_worker = arc_settlement_provider.clone();
let l2provider_for_submit_worker = l2provider.clone();
let (submit_stop_tx, submit_stop_rx) = mpsc::channel::<()>(1);
let rollup_interval = Duration::from_secs(general_config.rollup_worker_interval);
tokio::spawn(async move {
Settler::rollup(
arc_db_for_submit_worker,
l2provider_for_submit_worker,
settlement_provider_for_submit_worker,
submit_stop_rx,
rollup_interval,
)
.await
});
Expand Down
67 changes: 58 additions & 9 deletions src/settlement/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,44 @@ use crate::prover::ProverChannel;
use crate::settlement::{BatchData, Settlement};
use alloy_rlp::{length_of_length, BytesMut, Encodable, Header};
use anyhow::{anyhow, Result};
use config::{Config, File};
use ethers::prelude::U64;
use ethers_core::types::{BlockId, BlockNumber, Transaction};
use ethers_providers::{Http, Middleware, Provider};
use prost::bytes;
use reqwest::Client;
use reth_primitives::{Bytes, TransactionKind, TxLegacy};
use serde::Deserialize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

const PROOF_INTERVAL: Duration = Duration::from_secs(30);
const VERIFY_INTERVAL: Duration = Duration::from_secs(30);
const SUBMIT_INTERVAL: Duration = Duration::from_secs(30);

pub(crate) struct Settler {}

/// A general configuration that needs to be included in the configuration structure of each implementation.
#[derive(Debug, Clone, Deserialize)]
pub struct WorkerConfig {
pub proof_worker_interval: u64,
pub verify_worker_interval: u64,
pub rollup_worker_interval: u64,
}

impl WorkerConfig {
pub fn from_conf_path(conf_path: &str) -> Result<Self> {
log::info!("Load the settlement worker config from: {}", conf_path);

let config = Config::builder()
.add_source(File::from(Path::new(conf_path)))
.build()
.map_err(|e| anyhow!("Failed to build config: {:?}", e))?;

config
.get("settlement_worker_config")
.map_err(|e| anyhow!("Failed to parse WorkerConfig: {:?}", e))
}
}

// TODO: Use channels, streams, and other methods to flow data between workers,
// and use event driven replacement of rotation databases to drive Zeth to run,
// avoiding frequent database access
Expand All @@ -29,8 +51,9 @@ impl Settler {
db: Arc<Box<dyn Database>>,
mut prover: ProverChannel,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(PROOF_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
prover.start().await.unwrap();

log::info!("Prove Worker started");
Expand Down Expand Up @@ -141,8 +164,9 @@ impl Settler {
db: Arc<Box<dyn Database>>,
settlement_provider: Arc<Box<dyn Settlement>>,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(VERIFY_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
let bridge_service_client = Client::new();
log::info!("Verify Worker started");
loop {
Expand Down Expand Up @@ -222,8 +246,9 @@ impl Settler {
l2provider: Provider<Http>,
settlement_provider: Arc<Box<dyn Settlement>>,
mut stop_rx: mpsc::Receiver<()>,
worker_interval: Duration,
) -> Result<()> {
let mut ticker = tokio::time::interval(SUBMIT_INTERVAL);
let mut ticker = tokio::time::interval(worker_interval);
log::info!("Submit Worker started");
loop {
tokio::select! {
Expand Down Expand Up @@ -531,7 +556,16 @@ mod tests {

let (tx, rx) = mpsc::channel(1);
let stop_rx = rx;
let submit_worker = Settler::rollup(arc_db, l2provider, arc_settlement_provider, stop_rx);

let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
let submit_worker = Settler::rollup(
arc_db,
l2provider,
arc_settlement_provider,
stop_rx,
Duration::from_secs(config.rollup_worker_interval),
);

tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down Expand Up @@ -608,7 +642,15 @@ mod tests {

let (tx, rx) = mpsc::channel(1);
let stop_rx: mpsc::Receiver<()> = rx;
let verify_worker = Settler::verify_worker(arc_db, arc_settlement_provider, stop_rx);

let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
let verify_worker = Settler::verify_worker(
arc_db,
arc_settlement_provider,
stop_rx,
Duration::from_secs(config.rollup_worker_interval),
);

tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down Expand Up @@ -640,4 +682,11 @@ mod tests {
);
Ok(())
}

#[test]
fn test_from_conf_path() {
let conf_path = "configs/settlement.toml";
let config = WorkerConfig::from_conf_path(conf_path).unwrap();
println!("{:#?}", config);
}
}

0 comments on commit 80b9154

Please sign in to comment.