Skip to content

Commit

Permalink
refactor(batcher): move build_block parameter into the block_builder …
Browse files Browse the repository at this point in the history
…object
  • Loading branch information
Yael-Starkware committed Nov 11, 2024
1 parent c58da37 commit f5917a3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 30 deletions.
63 changes: 42 additions & 21 deletions crates/batcher/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,41 +77,47 @@ pub struct BlockExecutionArtifacts {
#[cfg_attr(test, automock)]
#[async_trait]
pub trait BlockBuilderTrait: Send {
async fn build_block(
&self,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts>;
async fn build_block(&mut self) -> BlockBuilderResult<BlockExecutionArtifacts>;
}

pub struct BlockBuilder {
// TODO(Yael 14/10/2024): make the executor thread safe and delete this mutex.
executor: Mutex<Box<dyn TransactionExecutorTrait>>,
tx_chunk_size: usize,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
}

impl BlockBuilder {
pub fn new(executor: Box<dyn TransactionExecutorTrait>, tx_chunk_size: usize) -> Self {
Self { executor: Mutex::new(executor), tx_chunk_size }
pub fn new(
executor: Box<dyn TransactionExecutorTrait>,
tx_chunk_size: usize,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> Self {
Self {
executor: Mutex::new(executor),
tx_chunk_size,
deadline,
tx_provider,
output_content_sender,
fail_on_err,
}
}
}

#[async_trait]
impl BlockBuilderTrait for BlockBuilder {
async fn build_block(
&self,
deadline: tokio::time::Instant,
mut tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
async fn build_block(&mut self) -> BlockBuilderResult<BlockExecutionArtifacts> {
let mut block_is_full = false;
let mut execution_infos = IndexMap::new();
// TODO(yael 6/10/2024): delete the timeout condition once the executor has a timeout
while !block_is_full && tokio::time::Instant::now() < deadline {
let next_txs = tx_provider.get_txs(self.tx_chunk_size).await?;
while !block_is_full && tokio::time::Instant::now() < self.deadline {
let next_txs = self.tx_provider.get_txs(self.tx_chunk_size).await?;
let next_tx_chunk = match next_txs {
NextTxs::Txs(txs) => txs,
NextTxs::End => break,
Expand All @@ -134,8 +140,8 @@ impl BlockBuilderTrait for BlockBuilder {
next_tx_chunk,
results,
&mut execution_infos,
&output_content_sender,
fail_on_err,
&self.output_content_sender,
self.fail_on_err,
)
.await?;
}
Expand Down Expand Up @@ -190,6 +196,10 @@ pub trait BlockBuilderFactoryTrait {
&self,
height: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>>;
}

Expand Down Expand Up @@ -313,9 +323,20 @@ impl BlockBuilderFactoryTrait for BlockBuilderFactory {
&self,
height: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>> {
let executor =
self.preprocess_and_create_transaction_executor(height, retrospective_block_hash)?;
Ok(Box::new(BlockBuilder::new(Box::new(executor), self.block_builder_config.tx_chunk_size)))
Ok(Box::new(BlockBuilder::new(
Box::new(executor),
self.block_builder_config.tx_chunk_size,
deadline,
tx_provider,
output_content_sender,
fail_on_err,
)))
}
}
11 changes: 9 additions & 2 deletions crates/batcher/src/block_builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,18 @@ async fn run_build_block(
output_sender: Option<UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
let block_builder = BlockBuilder::new(Box::new(mock_transaction_executor), TX_CHUNK_SIZE);
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(BLOCK_GENERATION_DEADLINE_SECS);
let mut block_builder = BlockBuilder::new(
Box::new(mock_transaction_executor),
TX_CHUNK_SIZE,
deadline,
Box::new(tx_provider),
output_sender,
fail_on_err,
);

block_builder.build_block(deadline, Box::new(tx_provider), output_sender, fail_on_err).await
block_builder.build_block().await
}

// TODO: Add test case for failed transaction.
Expand Down
13 changes: 10 additions & 3 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,25 @@ impl ProposalManagerTrait for ProposalManager {
info!("Starting generation of a new proposal with id {}.", proposal_id);

let height = self.active_height.expect("No active height.");
let block_builder =
self.block_builder_factory.create_block_builder(height, retrospective_block_hash)?;

let tx_provider =
ProposeTransactionProvider { mempool_client: self.mempool_client.clone() };
let mut block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
deadline,
Box::new(tx_provider),
Some(tx_sender.clone()),
false,
)?;

let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block(deadline, Box::new(tx_provider), Some(tx_sender.clone()), false)
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));
Expand Down
8 changes: 4 additions & 4 deletions crates/batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ impl MockDependencies {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder
.expect_build_block()
.return_once(move |_, _, _, _| Ok(BlockExecutionArtifacts::create_for_testing()));
.return_once(move || Ok(BlockExecutionArtifacts::create_for_testing()));
Ok(Box::new(mock_block_builder))
};

self.block_builder_factory
.expect_create_block_builder()
.times(times)
.returning(move |_, _| simulate_build_block());
.returning(move |_, _, _, _, _, _| simulate_build_block());
}

// This function simulates a long build block operation. This is required for a test that
// tries to run other operations while a block is being built.
fn expect_long_build_block(&mut self, times: usize) {
let simulate_long_build_block = || -> BlockBuilderResult<Box<dyn BlockBuilderTrait>> {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder.expect_build_block().return_once(move |_, _, _, _| {
mock_block_builder.expect_build_block().return_once(move || {
std::thread::sleep(BLOCK_GENERATION_TIMEOUT * 10);
Ok(BlockExecutionArtifacts::create_for_testing())
});
Expand All @@ -73,7 +73,7 @@ impl MockDependencies {
self.block_builder_factory
.expect_create_block_builder()
.times(times)
.returning(move |_, _| simulate_long_build_block());
.returning(move |_, _, _, _, _, _| simulate_long_build_block());
}
}

Expand Down

0 comments on commit f5917a3

Please sign in to comment.