diff --git a/crates/batcher/src/batcher.rs b/crates/batcher/src/batcher.rs index 6855c4d481..cbd28092dd 100644 --- a/crates/batcher/src/batcher.rs +++ b/crates/batcher/src/batcher.rs @@ -37,7 +37,7 @@ use crate::proposal_manager::{ ProposalOutput, StartHeightError, }; -use crate::transaction_provider::DummyL1ProviderClient; +use crate::transaction_provider::{DummyL1ProviderClient, ProposeTransactionProvider}; struct Proposal { tx_stream: OutputStream, @@ -87,6 +87,11 @@ impl Batcher { )?); let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel(); + let tx_provider = ProposeTransactionProvider { + mempool_client: self.mempool_client.clone(), + // TODO: use a real L1 provider client. + l1_provider_client: Arc::new(DummyL1ProviderClient), + }; self.proposal_manager .build_block_proposal( @@ -94,6 +99,7 @@ impl Batcher { build_proposal_input.retrospective_block_hash, deadline, tx_sender, + tx_provider, ) .await .map_err(BatcherError::from)?; @@ -184,7 +190,6 @@ impl Batcher { } pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher { - let l1_provider_client = Arc::new(DummyL1ProviderClient); let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone()) .expect("Failed to open batcher's storage"); @@ -195,12 +200,8 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient }); let storage_reader = Arc::new(storage_reader); let storage_writer = Box::new(storage_writer); - let proposal_manager = Box::new(ProposalManager::new( - l1_provider_client, - mempool_client.clone(), - block_builder_factory, - storage_reader.clone(), - )); + let proposal_manager = + Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone())); Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager) } diff --git a/crates/batcher/src/batcher_test.rs b/crates/batcher/src/batcher_test.rs index 7ab6c0a19a..96166f9aee 100644 --- a/crates/batcher/src/batcher_test.rs +++ b/crates/batcher/src/batcher_test.rs @@ -40,6 +40,7 @@ use crate::proposal_manager::{ StartHeightError, }; use crate::test_utils::test_txs; +use crate::transaction_provider::ProposeTransactionProvider; const INITIAL_HEIGHT: BlockNumber = BlockNumber(3); const STREAMING_CHUNK_SIZE: usize = 3; @@ -85,7 +86,7 @@ async fn get_stream_content( let mut proposal_manager = MockProposalManagerTraitWrapper::new(); proposal_manager.expect_wrap_start_height().return_once(|_| async { Ok(()) }.boxed()); proposal_manager.expect_wrap_build_block_proposal().return_once( - move |_proposal_id, _block_hash, _deadline, tx_sender| { + move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| { simulate_build_block_proposal(tx_sender, txs_to_stream).boxed() }, ); @@ -248,6 +249,7 @@ trait ProposalManagerTraitWrapper: Send + Sync { retrospective_block_hash: Option, deadline: tokio::time::Instant, output_content_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> BoxFuture<'_, Result<(), BuildProposalError>>; fn wrap_take_proposal_result( @@ -273,12 +275,14 @@ impl ProposalManagerTrait for T { retrospective_block_hash: Option, deadline: tokio::time::Instant, output_content_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError> { self.wrap_build_block_proposal( proposal_id, retrospective_block_hash, deadline, output_content_sender, + tx_provider, ) .await } diff --git a/crates/batcher/src/block_builder.rs b/crates/batcher/src/block_builder.rs index e7463d8b7d..dc72f43580 100644 --- a/crates/batcher/src/block_builder.rs +++ b/crates/batcher/src/block_builder.rs @@ -76,9 +76,8 @@ pub struct BlockExecutionArtifacts { #[async_trait] pub trait BlockBuilderTrait: Send { async fn build_block( - &self, + &mut self, deadline: tokio::time::Instant, - tx_provider: Box, output_content_sender: tokio::sync::mpsc::UnboundedSender, ) -> BlockBuilderResult; } @@ -87,27 +86,31 @@ pub struct BlockBuilder { // TODO(Yael 14/10/2024): make the executor thread safe and delete this mutex. executor: Mutex>, tx_chunk_size: usize, + tx_provider: Box, } impl BlockBuilder { - pub fn new(executor: Box, tx_chunk_size: usize) -> Self { - Self { executor: Mutex::new(executor), tx_chunk_size } + pub fn new( + executor: Box, + tx_chunk_size: usize, + tx_provider: Box, + ) -> Self { + Self { executor: Mutex::new(executor), tx_chunk_size, tx_provider } } } #[async_trait] impl BlockBuilderTrait for BlockBuilder { async fn build_block( - &self, + &mut self, deadline: tokio::time::Instant, - mut tx_provider: Box, output_content_sender: tokio::sync::mpsc::UnboundedSender, ) -> BlockBuilderResult { let mut block_is_full = false; let mut execution_infos = IndexMap::new(); // TODO(yael 6/10/2024): delete the timeout condition once the executor has a timeout while !block_is_full && tokio::time::Instant::now() < deadline { - let next_txs = tx_provider.get_txs(self.tx_chunk_size).await?; + let next_txs = self.tx_provider.get_txs(self.tx_chunk_size).await?; let next_tx_chunk = match next_txs { NextTxs::Txs(txs) => txs, NextTxs::End => break, @@ -179,6 +182,7 @@ pub trait BlockBuilderFactoryTrait { &self, height: BlockNumber, retrospective_block_hash: Option, + tx_provider: Box, ) -> BlockBuilderResult>; } @@ -302,9 +306,14 @@ impl BlockBuilderFactoryTrait for BlockBuilderFactory { &self, height: BlockNumber, retrospective_block_hash: Option, + tx_provider: Box, ) -> BlockBuilderResult> { let executor = self.preprocess_and_create_transaction_executor(height, retrospective_block_hash)?; - Ok(Box::new(BlockBuilder::new(Box::new(executor), self.block_builder_config.tx_chunk_size))) + Ok(Box::new(BlockBuilder::new( + Box::new(executor), + self.block_builder_config.tx_chunk_size, + tx_provider, + ))) } } diff --git a/crates/batcher/src/block_builder_test.rs b/crates/batcher/src/block_builder_test.rs index d0aff83130..49835e37f5 100644 --- a/crates/batcher/src/block_builder_test.rs +++ b/crates/batcher/src/block_builder_test.rs @@ -273,11 +273,15 @@ async fn run_build_block( tx_provider: MockTransactionProvider, output_sender: UnboundedSender, ) -> BlockExecutionArtifacts { - let block_builder = BlockBuilder::new(Box::new(mock_transaction_executor), TX_CHUNK_SIZE); + let mut block_builder = BlockBuilder::new( + Box::new(mock_transaction_executor), + TX_CHUNK_SIZE, + Box::new(tx_provider), + ); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(BLOCK_GENERATION_DEADLINE_SECS); - block_builder.build_block(deadline, Box::new(tx_provider), output_sender).await.unwrap() + block_builder.build_block(deadline, output_sender).await.unwrap() } // TODO: Add test case for failed transaction. diff --git a/crates/batcher/src/proposal_manager.rs b/crates/batcher/src/proposal_manager.rs index 2cc5bdc37e..ad8e4ec1c9 100644 --- a/crates/batcher/src/proposal_manager.rs +++ b/crates/batcher/src/proposal_manager.rs @@ -10,14 +10,13 @@ use starknet_api::executable_transaction::Transaction; use starknet_api::state::ThinStateDiff; use starknet_api::transaction::TransactionHash; use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId}; -use starknet_mempool_types::communication::SharedMempoolClient; use thiserror::Error; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, Instrument}; use crate::batcher::BatcherStorageReaderTrait; use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts}; -use crate::transaction_provider::{ProposeTransactionProvider, SharedL1ProviderClient}; +use crate::transaction_provider::ProposeTransactionProvider; #[derive(Debug, Error)] pub enum StartHeightError { @@ -73,6 +72,7 @@ pub trait ProposalManagerTrait: Send + Sync { retrospective_block_hash: Option, deadline: tokio::time::Instant, tx_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError>; async fn take_proposal_result( @@ -94,8 +94,6 @@ pub trait ProposalManagerTrait: Send + Sync { /// /// Triggered by the consensus. pub(crate) struct ProposalManager { - l1_provider_client: SharedL1ProviderClient, - mempool_client: SharedMempoolClient, storage_reader: Arc, active_height: Option, /// The block proposal that is currently being proposed, if any. @@ -151,13 +149,14 @@ impl ProposalManagerTrait for ProposalManager { /// Starts a new block proposal generation task for the given proposal_id and height with /// transactions from the mempool. /// Requires tx_sender for sending the generated transactions to the caller. - #[instrument(skip(self, tx_sender), err, fields(self.active_height))] + #[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))] async fn build_block_proposal( &mut self, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, tx_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError> { let height = self.active_height.ok_or(BuildProposalError::NoActiveHeight)?; if self.executed_proposals.lock().await.contains_key(&proposal_id) { @@ -165,20 +164,19 @@ impl ProposalManagerTrait for ProposalManager { } info!("Starting generation of a new proposal with id {}.", proposal_id); self.set_active_proposal(proposal_id).await?; - let block_builder = - self.block_builder_factory.create_block_builder(height, retrospective_block_hash)?; + let mut block_builder = self.block_builder_factory.create_block_builder( + height, + retrospective_block_hash, + Box::new(tx_provider), + )?; - let tx_provider = ProposeTransactionProvider { - mempool_client: self.mempool_client.clone(), - l1_provider_client: self.l1_provider_client.clone(), - }; let active_proposal = self.active_proposal.clone(); let executed_proposals = self.executed_proposals.clone(); self.active_proposal_handle = Some(tokio::spawn( async move { let result = block_builder - .build_block(deadline, Box::new(tx_provider), tx_sender.clone()) + .build_block(deadline, tx_sender.clone()) .await .map(ProposalOutput::from) .map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e))); @@ -221,14 +219,10 @@ impl ProposalManagerTrait for ProposalManager { impl ProposalManager { pub fn new( - l1_provider_client: SharedL1ProviderClient, - mempool_client: SharedMempoolClient, block_builder_factory: Arc, storage_reader: Arc, ) -> Self { Self { - l1_provider_client, - mempool_client, storage_reader, active_proposal: Arc::new(Mutex::new(None)), block_builder_factory, diff --git a/crates/batcher/src/proposal_manager_test.rs b/crates/batcher/src/proposal_manager_test.rs index 5bbf081196..0b6d85f569 100644 --- a/crates/batcher/src/proposal_manager_test.rs +++ b/crates/batcher/src/proposal_manager_test.rs @@ -23,7 +23,7 @@ use crate::proposal_manager::{ ProposalOutput, StartHeightError, }; -use crate::transaction_provider::MockL1ProviderClient; +use crate::transaction_provider::{MockL1ProviderClient, ProposeTransactionProvider}; const INITIAL_HEIGHT: BlockNumber = BlockNumber(3); const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1); @@ -39,8 +39,8 @@ fn output_streaming() -> ( struct MockDependencies { block_builder_factory: MockBlockBuilderFactoryTrait, - l1_provider_client: MockL1ProviderClient, - mempool_client: MockMempoolClient, + l1_provider_client: Arc, + mempool_client: Arc, storage_reader: MockBatcherStorageReaderTrait, } @@ -50,14 +50,14 @@ impl MockDependencies { let mut mock_block_builder = MockBlockBuilderTrait::new(); mock_block_builder .expect_build_block() - .return_once(move |_, _, _| Ok(BlockExecutionArtifacts::create_for_testing())); + .return_once(move |_, _| Ok(BlockExecutionArtifacts::create_for_testing())); Ok(Box::new(mock_block_builder)) }; self.block_builder_factory .expect_create_block_builder() .times(times) - .returning(move |_, _| simulate_build_block()); + .returning(move |_, _, _| simulate_build_block()); } // This function simulates a long build block operation. This is required for a test that @@ -65,7 +65,7 @@ impl MockDependencies { fn expect_long_build_block(&mut self, times: usize) { let simulate_long_build_block = || -> BlockBuilderResult> { let mut mock_block_builder = MockBlockBuilderTrait::new(); - mock_block_builder.expect_build_block().return_once(move |_, _, _| { + mock_block_builder.expect_build_block().return_once(move |_, _| { std::thread::sleep(BLOCK_GENERATION_TIMEOUT * 10); Ok(BlockExecutionArtifacts::create_for_testing()) }); @@ -75,7 +75,7 @@ impl MockDependencies { self.block_builder_factory .expect_create_block_builder() .times(times) - .returning(move |_, _| simulate_long_build_block()); + .returning(move |_, _, _| simulate_long_build_block()); } } @@ -84,17 +84,22 @@ fn mock_dependencies() -> MockDependencies { let mut storage_reader = MockBatcherStorageReaderTrait::new(); storage_reader.expect_height().returning(|| Ok(INITIAL_HEIGHT)); MockDependencies { - l1_provider_client: MockL1ProviderClient::new(), + l1_provider_client: Arc::new(MockL1ProviderClient::new()), block_builder_factory: MockBlockBuilderFactoryTrait::new(), - mempool_client: MockMempoolClient::new(), + mempool_client: Arc::new(MockMempoolClient::new()), storage_reader, } } +fn propose_tx_provider(mock_dependencies: &MockDependencies) -> ProposeTransactionProvider { + ProposeTransactionProvider { + mempool_client: mock_dependencies.mempool_client.clone(), + l1_provider_client: mock_dependencies.l1_provider_client.clone(), + } +} + fn init_proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager { ProposalManager::new( - Arc::new(mock_dependencies.l1_provider_client), - Arc::new(mock_dependencies.mempool_client), Arc::new(mock_dependencies.block_builder_factory), Arc::new(mock_dependencies.storage_reader), ) @@ -106,11 +111,12 @@ fn proposal_deadline() -> tokio::time::Instant { async fn build_and_await_block_proposal( proposal_manager: &mut ProposalManager, + tx_provider: ProposeTransactionProvider, proposal_id: ProposalId, ) { let (output_sender, _receiver) = output_streaming(); proposal_manager - .build_block_proposal(proposal_id, None, proposal_deadline(), output_sender) + .build_block_proposal(proposal_id, None, proposal_deadline(), output_sender, tx_provider) .await .unwrap(); @@ -157,9 +163,16 @@ async fn proposal_generation_fails_without_start_height( tokio::sync::mpsc::UnboundedReceiver, ), ) { + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); let err = proposal_manager - .build_block_proposal(ProposalId(0), None, proposal_deadline(), output_streaming.0) + .build_block_proposal( + ProposalId(0), + None, + proposal_deadline(), + output_streaming.0, + tx_provider, + ) .await; assert_matches!(err, Err(BuildProposalError::NoActiveHeight)); } @@ -169,10 +182,11 @@ async fn proposal_generation_fails_without_start_height( async fn proposal_generation_success(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(1); + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await; } #[rstest] @@ -180,14 +194,16 @@ async fn proposal_generation_success(mut mock_dependencies: MockDependencies) { async fn consecutive_proposal_generations_success(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(2); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); // Generate two consecutive proposals (awaiting on them to make sure they finished // successfully). - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; - build_and_await_block_proposal(&mut proposal_manager, ProposalId(1)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_1, ProposalId(1)).await; } // This test checks that trying to generate a proposal while another one is being generated will @@ -199,6 +215,8 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc // Generate a block builder with a very long build block operation. mock_dependencies.expect_long_build_block(1); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); @@ -206,14 +224,26 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc // Build a proposal that will take a very long time to finish. let (output_sender_0, _rec_0) = output_streaming(); proposal_manager - .build_block_proposal(ProposalId(0), None, proposal_deadline(), output_sender_0) + .build_block_proposal( + ProposalId(0), + None, + proposal_deadline(), + output_sender_0, + tx_provider_0, + ) .await .unwrap(); // Try to generate another proposal while the first one is still being generated. let (output_sender_1, _rec_1) = output_streaming(); let another_generate_request = proposal_manager - .build_block_proposal(ProposalId(1), None, proposal_deadline(), output_sender_1) + .build_block_proposal( + ProposalId(1), + None, + proposal_deadline(), + output_sender_1, + tx_provider_1, + ) .await; assert_matches!( another_generate_request, @@ -229,11 +259,12 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc async fn test_take_proposal_result_no_active_proposal(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(1); + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await; let expected_proposal_output = ProposalOutput::from(BlockExecutionArtifacts::create_for_testing()); @@ -255,14 +286,22 @@ async fn test_abort_and_restart_height(mut mock_dependencies: MockDependencies) // Start a new height and create a proposal. let (output_tx_sender, _receiver) = output_streaming(); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await; // Start a new proposal, which will remain active. assert!( proposal_manager - .build_block_proposal(ProposalId(1), None, proposal_deadline(), output_tx_sender) + .build_block_proposal( + ProposalId(1), + None, + proposal_deadline(), + output_tx_sender, + tx_provider_1 + ) .await .is_ok() );