diff --git a/crates/batcher/src/block_builder.rs b/crates/batcher/src/block_builder.rs index 9c471f5740..a18a3f64dd 100644 --- a/crates/batcher/src/block_builder.rs +++ b/crates/batcher/src/block_builder.rs @@ -36,7 +36,7 @@ use tracing::{debug, error, info, trace}; use crate::papyrus_state::PapyrusReader; use crate::transaction_executor::TransactionExecutorTrait; -use crate::transaction_provider::{TransactionProvider, TransactionProviderError}; +use crate::transaction_provider::{NextTxs, TransactionProvider, TransactionProviderError}; #[derive(Debug, Error)] pub enum BlockBuilderError { @@ -142,14 +142,18 @@ impl BlockBuilderTrait for BlockBuilder { async fn build_block( &self, deadline: tokio::time::Instant, - tx_provider: Box, + mut tx_provider: Box, output_content_sender: tokio::sync::mpsc::UnboundedSender, ) -> BlockBuilderResult { let mut block_is_full = false; let mut execution_infos = IndexMap::new(); // TODO(yael 6/10/2024): delete the timeout condition once the executor has a timeout while !block_is_full && tokio::time::Instant::now() < deadline { - let next_tx_chunk = tx_provider.get_txs(self.tx_chunk_size).await?; + let next_txs = tx_provider.get_txs(self.tx_chunk_size).await?; + let next_tx_chunk = match next_txs { + NextTxs::Txs(txs) => txs, + NextTxs::End => break, + }; debug!("Got {} transactions from the transaction provider.", next_tx_chunk.len()); if next_tx_chunk.is_empty() { // TODO: Consider what is the best sleep duration. diff --git a/crates/batcher/src/block_builder_test.rs b/crates/batcher/src/block_builder_test.rs index bbb580a7bd..d0aff83130 100644 --- a/crates/batcher/src/block_builder_test.rs +++ b/crates/batcher/src/block_builder_test.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use crate::block_builder::{BlockBuilder, BlockBuilderTrait, BlockExecutionArtifacts}; use crate::test_utils::test_txs; use crate::transaction_executor::MockTransactionExecutorTrait; -use crate::transaction_provider::MockTransactionProvider; +use crate::transaction_provider::{MockTransactionProvider, NextTxs}; const BLOCK_GENERATION_DEADLINE_SECS: u64 = 1; const TX_CHANNEL_SIZE: usize = 50; @@ -144,6 +144,26 @@ fn test_expectations_with_delay( (mock_transaction_executor, mock_tx_provider, expected_block_artifacts) } +fn stream_done_test_expectations( + input_txs: &[Transaction], +) -> (MockTransactionExecutorTrait, MockTransactionProvider, BlockExecutionArtifacts) { + let block_size = input_txs.len(); + let input_txs_cloned = input_txs.to_vec(); + let mut mock_transaction_executor = MockTransactionExecutorTrait::new(); + + mock_transaction_executor + .expect_add_txs_to_block() + .withf(move |blockifier_input| compare_tx_hashes(&input_txs_cloned, blockifier_input)) + .return_once(move |_| (0..block_size).map(|_| Ok(execution_info())).collect()); + + let expected_block_artifacts = + set_close_block_expectations(&mut mock_transaction_executor, block_size); + + let mock_tx_provider = mock_tx_provider_stream_done(input_txs.to_vec()); + + (mock_transaction_executor, mock_tx_provider, expected_block_artifacts) +} + // Fill the executor outputs with some non-default values to make sure the block_builder uses // them. fn block_builder_expected_output(execution_info_len: usize) -> BlockExecutionArtifacts { @@ -180,7 +200,24 @@ fn mock_tx_provider_limited_calls( .expect_get_txs() .times(n_calls) .with(eq(TX_CHUNK_SIZE)) - .returning(move |_n_txs| Ok(input_chunks.remove(0))); + .returning(move |_n_txs| Ok(NextTxs::Txs(input_chunks.remove(0)))); + mock_tx_provider +} + +fn mock_tx_provider_stream_done(input_chunk: Vec) -> MockTransactionProvider { + let mut mock_tx_provider = MockTransactionProvider::new(); + let mut seq = Sequence::new(); + mock_tx_provider + .expect_get_txs() + .times(1) + .in_sequence(&mut seq) + .with(eq(TX_CHUNK_SIZE)) + .returning(move |_n_txs| Ok(NextTxs::Txs(input_chunk.clone()))); + mock_tx_provider + .expect_get_txs() + .times(1) + .in_sequence(&mut seq) + .returning(|_n_txs| Ok(NextTxs::End)); mock_tx_provider } @@ -198,7 +235,10 @@ fn mock_tx_provider_limitless_calls( } fn add_limitless_empty_calls(mock_tx_provider: &mut MockTransactionProvider) { - mock_tx_provider.expect_get_txs().with(eq(TX_CHUNK_SIZE)).returning(|_n_txs| Ok(Vec::new())); + mock_tx_provider + .expect_get_txs() + .with(eq(TX_CHUNK_SIZE)) + .returning(|_n_txs| Ok(NextTxs::Txs(Vec::new()))); } fn compare_tx_hashes(input: &[Transaction], blockifier_input: &[BlockifierTransaction]) -> bool { @@ -247,6 +287,7 @@ async fn run_build_block( #[case::empty_block(0, vec![], empty_block_test_expectations())] #[case::block_full(1, test_txs(0..3), block_full_test_expectations(&input_txs, expected_block_size))] #[case::deadline_reached_after_first_chunk(3, test_txs(0..6), test_expectations_with_delay(&input_txs))] +#[case::stream_done(2, test_txs(0..2), stream_done_test_expectations(&input_txs))] #[tokio::test] async fn test_build_block( #[case] expected_block_size: usize, diff --git a/crates/batcher/src/transaction_provider.rs b/crates/batcher/src/transaction_provider.rs index e3084d197d..e2732d35ff 100644 --- a/crates/batcher/src/transaction_provider.rs +++ b/crates/batcher/src/transaction_provider.rs @@ -11,10 +11,16 @@ pub enum TransactionProviderError { MempoolError(#[from] MempoolClientError), } +#[derive(Debug, PartialEq)] +pub enum NextTxs { + Txs(Vec), + End, +} + #[cfg_attr(test, automock)] #[async_trait] pub trait TransactionProvider: Send + Sync { - async fn get_txs(&self, n_txs: usize) -> Result, TransactionProviderError>; + async fn get_txs(&mut self, n_txs: usize) -> Result; } pub struct ProposeTransactionProvider { @@ -23,8 +29,26 @@ pub struct ProposeTransactionProvider { #[async_trait] impl TransactionProvider for ProposeTransactionProvider { - async fn get_txs(&self, n_txs: usize) -> Result, TransactionProviderError> { + async fn get_txs(&mut self, n_txs: usize) -> Result { // TODO: Get also L1 transactions. - Ok(self.mempool_client.get_txs(n_txs).await?) + Ok(NextTxs::Txs(self.mempool_client.get_txs(n_txs).await?)) + } +} + +pub struct ValidateTransactionProvider { + pub tx_receiver: tokio::sync::mpsc::Receiver, +} + +#[async_trait] +impl TransactionProvider for ValidateTransactionProvider { + async fn get_txs(&mut self, n_txs: usize) -> Result { + let mut buffer = Vec::with_capacity(n_txs); + self.tx_receiver.recv_many(&mut buffer, n_txs).await; + // If the buffer is empty, it means that the stream was dropped, otherwise it would have + // been waiting for transactions. + if buffer.is_empty() { + return Ok(NextTxs::End); + } + Ok(NextTxs::Txs(buffer)) } }