Skip to content

Commit

Permalink
refactor(batcher): extract common logic into a function for validate …
Browse files Browse the repository at this point in the history
…flow
  • Loading branch information
Yael-Starkware committed Nov 12, 2024
1 parent 4276a78 commit 5bd1781
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::batcher::BatcherStorageReaderTrait;
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts};
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockBuilderTrait, BlockExecutionArtifacts};
use crate::transaction_provider::ProposeTransactionProvider;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -173,7 +173,7 @@ impl ProposalManagerTrait for ProposalManager {

let tx_provider =
ProposeTransactionProvider { mempool_client: self.mempool_client.clone() };
let mut block_builder = self.block_builder_factory.create_block_builder(
let block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
deadline,
Expand All @@ -182,29 +182,7 @@ impl ProposalManagerTrait for ProposalManager {
false,
)?;

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));

executed_proposals.lock().await.insert(proposal_id, result);

// The proposal is done, clear the active proposal.
let mut active_proposal = active_proposal.lock().await;
if let Some(current_active_proposal_id) = *active_proposal {
if current_active_proposal_id == proposal_id {
active_proposal.take();
}
}
}
.in_current_span(),
));
self.spawn_build_block_task(proposal_id, block_builder).await?;

Ok(())
}
Expand Down Expand Up @@ -268,6 +246,37 @@ impl ProposalManager {
}
}

async fn spawn_build_block_task(
&mut self,
proposal_id: ProposalId,
mut block_builder: Box<dyn BlockBuilderTrait>,
) -> Result<(), BuildProposalError> {
let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));

executed_proposals.lock().await.insert(proposal_id, result);

// The proposal is done, clear the active proposal.
let mut active_proposal = active_proposal.lock().await;
if let Some(current_active_proposal_id) = *active_proposal {
if current_active_proposal_id == proposal_id {
active_proposal.take();
}
}
}
.in_current_span(),
));
Ok(())
}

async fn reset_active_height(&mut self) {
if let Some(_active_proposal) = self.active_proposal.lock().await.take() {
// TODO: Abort the block_builder.
Expand Down

0 comments on commit 5bd1781

Please sign in to comment.