Skip to content

Commit

Permalink
feat(batcher): add validate support to build_block
Browse files Browse the repository at this point in the history
  • Loading branch information
Yael-Starkware committed Nov 11, 2024
1 parent 9e35673 commit 1b9e37f
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
23 changes: 17 additions & 6 deletions crates/batcher/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum BlockBuilderError {
TransactionExecutionError(#[from] BlockifierTransactionExecutionError),
#[error(transparent)]
StreamTransactionsError(#[from] tokio::sync::mpsc::error::SendError<Transaction>),
#[error("Build block with fail_on_err mode, failed on error {}.", _0)]
FailOnError(BlockifierTransactionExecutorError),
}

pub type BlockBuilderResult<T> = Result<T, BlockBuilderError>;
Expand All @@ -79,7 +81,8 @@ pub trait BlockBuilderTrait: Send {
&self,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts>;
}

Expand All @@ -101,7 +104,8 @@ impl BlockBuilderTrait for BlockBuilder {
&self,
deadline: tokio::time::Instant,
mut tx_provider: Box<dyn TransactionProvider>,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
let mut block_is_full = false;
let mut execution_infos = IndexMap::new();
Expand Down Expand Up @@ -131,6 +135,7 @@ impl BlockBuilderTrait for BlockBuilder {
results,
&mut execution_infos,
&output_content_sender,
fail_on_err,
)
.await?;
}
Expand All @@ -150,22 +155,28 @@ async fn collect_execution_results_and_stream_txs(
tx_chunk: Vec<Transaction>,
results: Vec<TransactionExecutorResult<TransactionExecutionInfo>>,
execution_infos: &mut IndexMap<TransactionHash, TransactionExecutionInfo>,
output_content_sender: &tokio::sync::mpsc::UnboundedSender<Transaction>,
output_content_sender: &Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<bool> {
for (input_tx, result) in tx_chunk.into_iter().zip(results.into_iter()) {
match result {
Ok(tx_execution_info) => {
execution_infos.insert(input_tx.tx_hash(), tx_execution_info);
output_content_sender.send(input_tx)?;
if let Some(output_content_sender) = output_content_sender {
output_content_sender.send(input_tx)?;
}
}
// TODO(yael 18/9/2024): add timeout error handling here once this
// feature is added.
Err(BlockifierTransactionExecutorError::BlockFull) => {
Err(BlockifierTransactionExecutorError::BlockFull) if !fail_on_err => {
info!("Block is full");
return Ok(true);
}
Err(err) => {
debug!("Transaction {:?} failed with error: {}.", input_tx, err)
debug!("Transaction {:?} failed with error: {}.", input_tx, err);
if fail_on_err {
return Err(BlockBuilderError::FailOnError(err));
}
}
}
}
Expand Down
71 changes: 61 additions & 10 deletions crates/batcher/src/block_builder_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use blockifier::blockifier::transaction_executor::TransactionExecutorError;
use assert_matches::assert_matches;
use blockifier::blockifier::transaction_executor::{
TransactionExecutorError,
TransactionExecutorError as BlockifierTransactionExecutorError,
};
use blockifier::bouncer::BouncerWeights;
use blockifier::transaction::objects::TransactionExecutionInfo;
use blockifier::transaction::transaction_execution::Transaction as BlockifierTransaction;
Expand All @@ -11,7 +15,13 @@ use starknet_api::felt;
use starknet_api::transaction::TransactionHash;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::block_builder::{BlockBuilder, BlockBuilderTrait, BlockExecutionArtifacts};
use crate::block_builder::{
BlockBuilder,
BlockBuilderError,
BlockBuilderResult,
BlockBuilderTrait,
BlockExecutionArtifacts,
};
use crate::test_utils::test_txs;
use crate::transaction_executor::MockTransactionExecutorTrait;
use crate::transaction_provider::{MockTransactionProvider, NextTxs};
Expand Down Expand Up @@ -44,6 +54,18 @@ fn one_chunk_test_expectations(
input_txs: &[Transaction],
) -> (MockTransactionExecutorTrait, MockTransactionProvider, BlockExecutionArtifacts) {
let block_size = input_txs.len();
let (mock_transaction_executor, expected_block_artifacts) =
one_chunk_mock_executor(input_txs, block_size);

let mock_tx_provider = mock_tx_provider_limitless_calls(1, vec![input_txs.to_vec()]);

(mock_transaction_executor, mock_tx_provider, expected_block_artifacts)
}

fn one_chunk_mock_executor(
input_txs: &[Transaction],
block_size: usize,
) -> (MockTransactionExecutorTrait, BlockExecutionArtifacts) {
let input_txs_cloned = input_txs.to_vec();
let mut mock_transaction_executor = MockTransactionExecutorTrait::new();

Expand All @@ -54,10 +76,7 @@ fn one_chunk_test_expectations(

let expected_block_artifacts =
set_close_block_expectations(&mut mock_transaction_executor, block_size);

let mock_tx_provider = mock_tx_provider_limitless_calls(1, vec![input_txs.to_vec()]);

(mock_transaction_executor, mock_tx_provider, expected_block_artifacts)
(mock_transaction_executor, expected_block_artifacts)
}

fn two_chunks_test_expectations(
Expand Down Expand Up @@ -271,13 +290,14 @@ async fn verify_build_block_output(
async fn run_build_block(
mock_transaction_executor: MockTransactionExecutorTrait,
tx_provider: MockTransactionProvider,
output_sender: UnboundedSender<Transaction>,
) -> BlockExecutionArtifacts {
output_sender: Option<UnboundedSender<Transaction>>,
fail_on_err: bool,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
let block_builder = BlockBuilder::new(Box::new(mock_transaction_executor), TX_CHUNK_SIZE);
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(BLOCK_GENERATION_DEADLINE_SECS);

block_builder.build_block(deadline, Box::new(tx_provider), output_sender).await.unwrap()
block_builder.build_block(deadline, Box::new(tx_provider), output_sender, fail_on_err).await
}

// TODO: Add test case for failed transaction.
Expand All @@ -303,7 +323,9 @@ async fn test_build_block(
let (output_tx_sender, output_tx_receiver) = output_channel();

let result_block_artifacts =
run_build_block(mock_transaction_executor, mock_tx_provider, output_tx_sender).await;
run_build_block(mock_transaction_executor, mock_tx_provider, Some(output_tx_sender), false)
.await
.unwrap();

verify_build_block_output(
input_txs,
Expand All @@ -314,3 +336,32 @@ async fn test_build_block(
)
.await;
}

#[tokio::test]
async fn test_validate_block() {
let input_txs = test_txs(0..3);
let (mock_transaction_executor, expected_block_artifacts) =
one_chunk_mock_executor(&input_txs, input_txs.len());
let mock_tx_provider = mock_tx_provider_stream_done(input_txs);

let result_block_artifacts =
run_build_block(mock_transaction_executor, mock_tx_provider, None, true).await.unwrap();

assert_eq!(result_block_artifacts, expected_block_artifacts);
}

#[tokio::test]
async fn test_validate_block_with_error() {
let input_txs = test_txs(0..3);
let expected_block_size = 1;
let (mock_transaction_executor, mock_tx_provider, _) =
block_full_test_expectations(&input_txs, expected_block_size);

let result =
run_build_block(mock_transaction_executor, mock_tx_provider, None, true).await.unwrap_err();

assert_matches!(
result,
BlockBuilderError::FailOnError(BlockifierTransactionExecutorError::BlockFull)
);
}
2 changes: 1 addition & 1 deletion crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl ProposalManagerTrait for ProposalManager {
self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block(deadline, Box::new(tx_provider), tx_sender.clone())
.build_block(deadline, Box::new(tx_provider), Some(tx_sender.clone()), false)
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));
Expand Down
4 changes: 2 additions & 2 deletions crates/batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl MockDependencies {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder
.expect_build_block()
.return_once(move |_, _, _| Ok(BlockExecutionArtifacts::create_for_testing()));
.return_once(move |_, _, _, _| Ok(BlockExecutionArtifacts::create_for_testing()));
Ok(Box::new(mock_block_builder))
};

Expand All @@ -63,7 +63,7 @@ impl MockDependencies {
fn expect_long_build_block(&mut self, times: usize) {
let simulate_long_build_block = || -> BlockBuilderResult<Box<dyn BlockBuilderTrait>> {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder.expect_build_block().return_once(move |_, _, _| {
mock_block_builder.expect_build_block().return_once(move |_, _, _, _| {
std::thread::sleep(BLOCK_GENERATION_TIMEOUT * 10);
Ok(BlockExecutionArtifacts::create_for_testing())
});
Expand Down

0 comments on commit 1b9e37f

Please sign in to comment.