Skip to content

Commit

Permalink
refactor(batcher): extract common logic into a function for validate …
Browse files Browse the repository at this point in the history
…flow
  • Loading branch information
Yael-Starkware committed Nov 12, 2024
1 parent e61c7ce commit 26dcb8a
Showing 1 changed file with 38 additions and 25 deletions.
63 changes: 38 additions & 25 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::batcher::BatcherStorageReaderTrait;
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts};
use crate::block_builder::{
BlockBuilderError,
BlockBuilderFactoryTrait,
BlockBuilderTrait,
BlockExecutionArtifacts,
};
use crate::transaction_provider::ProposeTransactionProvider;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -171,7 +176,7 @@ impl ProposalManagerTrait for ProposalManager {

let height = self.active_height.expect("No active height.");

let mut block_builder = self.block_builder_factory.create_block_builder(
let block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
deadline,
Expand All @@ -180,29 +185,7 @@ impl ProposalManagerTrait for ProposalManager {
false,
)?;

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));

executed_proposals.lock().await.insert(proposal_id, result);

// The proposal is done, clear the active proposal.
let mut active_proposal = active_proposal.lock().await;
if let Some(current_active_proposal_id) = *active_proposal {
if current_active_proposal_id == proposal_id {
active_proposal.take();
}
}
}
.in_current_span(),
));
self.spawn_build_block_task(proposal_id, block_builder).await;

Ok(())
}
Expand Down Expand Up @@ -264,6 +247,36 @@ impl ProposalManager {
}
}

async fn spawn_build_block_task(
&mut self,
proposal_id: ProposalId,
mut block_builder: Box<dyn BlockBuilderTrait>,
) {
let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));

executed_proposals.lock().await.insert(proposal_id, result);

// The proposal is done, clear the active proposal.
let mut active_proposal = active_proposal.lock().await;
if let Some(current_active_proposal_id) = *active_proposal {
if current_active_proposal_id == proposal_id {
active_proposal.take();
}
}
}
.in_current_span(),
));
}

async fn reset_active_height(&mut self) {
if let Some(_active_proposal) = self.active_proposal.lock().await.take() {
// TODO: Abort the block_builder.
Expand Down

0 comments on commit 26dcb8a

Please sign in to comment.