Skip to content

Commit

Permalink
fix: debug the operator
Browse files Browse the repository at this point in the history
  • Loading branch information
eigmax committed Apr 22, 2024
1 parent 33f6b16 commit 7826a85
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 85 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ rm -rf /tmp/chain
reth init --datadir /tmp/chain --chain testdata/chain.json
RUST_LOG="debug,evm=trace,consensus::auto=trace,consensus::engine=trace,rpc::eth=trace" reth node -d --chain testdata/chain.json --datadir /tmp/chain --auto-mine --http --http.port 8546 --http.api debug,eth,net,trace,web3,rpc
RUST_LOG="rpc::eth=trace" RETH_DB_PATH=/tmp/chain cargo run -r
RUST_LOG="rpc::eth=trace" ZETH_DB_PATH=/tmp/chain ZETH_OPERATOR_DB=/tmp/operator PROVER_ADDR=localhost:50061 ZETH_L1_ADDR=http://localhost:8546 HOST=0.0.0.0:8182 cargo run -r
```


Expand Down
36 changes: 31 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ mod batchproposer;

use crate::rpc::eigen::EigenRpcExtApiServer;

use tokio::select;
use tokio::signal::unix::signal;
use tokio::signal::unix::SignalKind;
use tokio::sync::mpsc;

/// A custom payload attributes type.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CustomPayloadAttributes {
Expand Down Expand Up @@ -342,11 +347,31 @@ where

#[tokio::main]
async fn main() -> eyre::Result<()> {
env_logger::init();
let db_path = std::env::var("ZETH_OPERATOR_DB")?;
let l1addr = std::env::var("ZETH_L1_ADDR")?;
let op_task = tokio::spawn(async move {
let mut op = operator::Operator::new(&db_path, &l1addr);
op.start();
let prover_addr = std::env::var("PROVER_ADDR").unwrap_or("http://127.0.0.1:50061".to_string());
let mut op = operator::Operator::new(&db_path, &l1addr, &prover_addr);

let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigint = signal(SignalKind::interrupt()).unwrap();

let (stop_tx, stop_rx) = mpsc::channel::<()>(1);

tokio::spawn(async move {
loop {
select! {
_ = sigterm.recv() => {
println!("Recieve SIGTERM");
break;
}
_ = sigint.recv() => {
println!("Recieve SIGTERM");
break;
}
};
}
stop_tx.send(()).await.unwrap();
});

let _guard = RethTracer::new().init()?;
Expand Down Expand Up @@ -390,8 +415,9 @@ async fn main() -> eyre::Result<()> {
println!("Node started");
let _handle = server_args.start(server).await?;

//futures::future::pending::<()>().await;
tokio::join!(op_task);
// futures::future::pending::<()>().await;

op.run(stop_rx).await;
Ok(())

/*
Expand Down
94 changes: 58 additions & 36 deletions src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,80 @@
//! Initialize all components of the eigen-zeth full node.
//! They will be launched in Run CMD.
use crate::prover::ProverProvider;

use std::{
borrow::Cow,
io::Write,
sync::{Arc, Barrier},
thread::{self, JoinHandle},
};
use tokio::select;
use tokio::signal::unix::signal;
use tokio::signal::unix::SignalKind;
use tokio::sync::{
mpsc::{self, Receiver, Sender},
oneshot, watch,
};
use tokio::task;
use tokio::time::{interval, sleep, Duration, Instant};
use ethers_providers::{Provider, Http, Middleware, Ws};
use ethers_core::types::Address;

use crate::db::{Database, lfs};
use crate::prover::ProverChannel;
use ethers_providers::{Http, Provider};

use tokio::sync::mpsc::{self, Receiver};
use tokio::time::{interval, Duration};

use crate::db::{lfs, Database};

pub(crate) struct Operator {
db: Box<dyn Database>,
prover: ProverProvider,
prover: ProverChannel,
settler: Provider<Http>,
rx_proof: Receiver<Vec<u8>>,
}

impl Operator {
pub fn new(dir: &str, l1addr: &str) -> Self {
let prover = ProverProvider::new();
pub fn new(_db_path: &str, l1addr: &str, prover_addr: &str) -> Self {
let (sx, rx) = mpsc::channel(10);
let prover = ProverChannel::new(&prover_addr, sx);
let db = lfs::open_db(lfs::DBConfig::Memory).unwrap();
// TODO: abstract this in the settlement
let settler = Provider::<Http>::try_from(l1addr).unwrap();
Operator{ prover, db, settler }

Operator {
prover,
db,
settler,
rx_proof: rx,
}
}

pub async fn start(&mut self) {
pub async fn run(&mut self, mut stop_channel: Receiver<()>) {
let mut ticker = interval(Duration::from_millis(1000));
let batch_key = "next_batch".to_string().as_bytes().to_vec();
let proof_key = "batch_proof".to_string().as_bytes().to_vec();

loop {
select! {
tokio::select! {
_ = ticker.tick() => {
let block_or_none = self.settler.get_block(self.prover.current_batch()).await.unwrap();
if let Some(block) = block_or_none {
//write the new block to DB
if let Some(no) = block.number {
self.prover.prove(no.as_u64()).await.unwrap();
}
}
let next_batch = self.db.get(&batch_key);
let current_batch = self.prover.get_current_batch();
log::debug!("fetch block {:?}, {:?}", next_batch, current_batch);

match (next_batch, current_batch){
(None, None) => {
// insert the first block
self.db.put(batch_key.clone(), 1_u64.to_be_bytes().to_vec());
},
(Some(no), None) => {
let block_no = u64::from_be_bytes(no.try_into().unwrap());
self.prover.execute(block_no).await.unwrap();
},
(None, Some(no)) => todo!("Invalid branch, block: {no}"),
(Some(next), Some(cur)) => {
let block_no = u64::from_be_bytes(next.try_into().unwrap());
log::debug!("next: {block_no}, current: {cur}");
},
};
}
proof_data = self.rx_proof.recv() => {
log::debug!("fetch proof: {:?}", proof_data);
self.db.put(proof_key.clone(), proof_data.unwrap());

}
// trigger the next task
if let Some(current_batch) = self.db.get(&batch_key) {
let block_no = u64::from_be_bytes(current_batch.try_into().unwrap());
self.prover.execute(block_no).await.unwrap();
} else {
log::debug!("Wait for the new task coming in");
}
}
_ = stop_channel.recv() => {
break;
}
};
}
}
}

2 changes: 1 addition & 1 deletion src/prover/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub(crate) mod provider;
pub use provider::ProverProvider;
pub use provider::ProverChannel;
69 changes: 27 additions & 42 deletions src/prover/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::prover::provider::prover_service::{
Batch, GenAggregatedProofRequest, GenBatchProofRequest, GenFinalProofRequest, ProofResultCode,
ProverRequest,
};
use prost::Message;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -22,51 +23,20 @@ pub mod prover_service {
tonic::include_proto!("prover.v1"); // The string specified here must match the proto package name
}

/// ProverProvider is used to generate proof for the batch
pub struct ProverProvider {
/// prove SMT, Call the Prover step by step to generate proof for the batch
inner: ProverChannel,
}

impl ProverProvider {
pub fn new() -> Self {
let addr = std::env::var("PROVER_ADDR").unwrap_or("http://127.0.0.1:50051".to_string());
ProverProvider {
inner: ProverChannel::new(addr),
}
}

/// start the prover
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.inner.start().await
}

/// stop the prover
pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.inner.stop().await
}

/// generate proof for the batch
pub async fn prove(&mut self, batch: BlockNumber) -> Result<(), Box<dyn std::error::Error>> {
self.inner.execute(batch).await
}

/// get current batch number
pub fn current_batch(&self) -> u64 {
self.inner.current_batch.unwrap() as u64
}
}

/// ProverChannel ...
pub struct ProverChannel {
step: ProveStep,
/// the current batch to prove
current_batch: Option<BlockNumber>,
parent_batch: Option<BlockNumber>,
/// the endpoint to communicate with the prover
endpoint: ProverEndpoint,

/// used to receive response from the endpoint
response_receiver: Receiver<ResponseType>,

/// final proof
final_proof_sender: Sender<Vec<u8>>,
}

type BlockNumber = u64;
Expand All @@ -85,13 +55,15 @@ enum ProveStep {
}

impl ProverChannel {
pub fn new(addr: String) -> Self {
pub fn new(addr: &str, sender: Sender<Vec<u8>>) -> Self {
let (response_sender, response_receiver) = mpsc::channel(10);
ProverChannel {
step: ProveStep::Start,
current_batch: None,
parent_batch: None,
endpoint: ProverEndpoint::new(addr, response_sender),
response_receiver,
final_proof_sender: sender,
}
}

Expand All @@ -106,12 +78,12 @@ impl ProverChannel {
}

pub async fn execute(&mut self, batch: BlockNumber) -> Result<(), Box<dyn std::error::Error>> {
self.set_current_batch(batch).await?;
self.set_current_batch(batch)?;

// return proof for the batch
self.entry_step().await?;

self.clean_current_batch().await?;
self.clean_current_batch()?;

Ok(())
}
Expand Down Expand Up @@ -209,6 +181,13 @@ impl ProverChannel {
if gen_final_proof_response.result_code
== ProofResultCode::CompletedOk as i32
{
let mut final_proof = vec![];
gen_final_proof_response
.final_proof
.unwrap()
.encode(&mut final_proof)
.unwrap();
self.final_proof_sender.send(final_proof).await?;
ProveStep::End
} else {
// TODO: return error
Expand All @@ -233,18 +212,24 @@ impl ProverChannel {
}
}

pub async fn set_current_batch(
pub fn set_current_batch(
&mut self,
batch: BlockNumber,
) -> Result<(), Box<dyn std::error::Error>> {
self.parent_batch = self.current_batch.clone();
self.current_batch = Some(batch);
Ok(())
}

pub async fn clean_current_batch(&mut self) -> Result<(), Box<dyn std::error::Error>> {
pub fn clean_current_batch(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.parent_batch = self.current_batch.clone();
self.current_batch = None;
Ok(())
}

pub fn get_current_batch(&self) -> Option<u64> {
self.current_batch
}
}

/// ProverEndpoint used to communicate with the prover
Expand All @@ -265,11 +250,11 @@ pub struct ProverEndpoint {
}

impl ProverEndpoint {
pub fn new(addr: String, response_sender: Sender<ResponseType>) -> Self {
pub fn new(addr: &str, response_sender: Sender<ResponseType>) -> Self {
let (request_sender, request_receiver) = mpsc::channel(10);
let (stop_tx, stop_rx) = mpsc::channel(1);
ProverEndpoint {
addr,
addr: addr.to_string(),
request_sender,
request_receiver: Some(request_receiver),
stop_endpoint_tx: stop_tx,
Expand Down
1 change: 1 addition & 0 deletions src/settlement/ethereum/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

0 comments on commit 7826a85

Please sign in to comment.