From 9296302b7c805f0f4a895c22569b51e5d894f3d5 Mon Sep 17 00:00:00 2001 From: Yair <92672946+yair-starkware@users.noreply.github.com> Date: Tue, 12 Nov 2024 12:54:49 +0200 Subject: [PATCH] refactor(batcher): create the tx provider in the batcher (#1879) --- crates/batcher/src/batcher.rs | 17 ++--- crates/batcher/src/batcher_test.rs | 6 +- crates/batcher/src/proposal_manager.rs | 17 ++--- crates/batcher/src/proposal_manager_test.rs | 73 ++++++++++++++++----- 4 files changed, 74 insertions(+), 39 deletions(-) diff --git a/crates/batcher/src/batcher.rs b/crates/batcher/src/batcher.rs index 24f90d67aa..6c0629c709 100644 --- a/crates/batcher/src/batcher.rs +++ b/crates/batcher/src/batcher.rs @@ -40,7 +40,7 @@ use crate::proposal_manager::{ ProposalStatus, StartHeightError, }; -use crate::transaction_provider::DummyL1ProviderClient; +use crate::transaction_provider::{DummyL1ProviderClient, ProposeTransactionProvider}; type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver; type InputStreamSender = tokio::sync::mpsc::Sender; @@ -92,6 +92,11 @@ impl Batcher { )?); let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel(); + let tx_provider = ProposeTransactionProvider { + mempool_client: self.mempool_client.clone(), + // TODO: use a real L1 provider client. + l1_provider_client: Arc::new(DummyL1ProviderClient), + }; self.proposal_manager .build_block_proposal( @@ -99,6 +104,7 @@ impl Batcher { build_proposal_input.retrospective_block_hash, deadline, tx_sender, + tx_provider, ) .await .map_err(BatcherError::from)?; @@ -248,7 +254,6 @@ impl Batcher { } pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher { - let l1_provider_client = Arc::new(DummyL1ProviderClient); let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone()) .expect("Failed to open batcher's storage"); @@ -259,12 +264,8 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient }); let storage_reader = Arc::new(storage_reader); let storage_writer = Box::new(storage_writer); - let proposal_manager = Box::new(ProposalManager::new( - l1_provider_client, - mempool_client.clone(), - block_builder_factory, - storage_reader.clone(), - )); + let proposal_manager = + Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone())); Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager) } diff --git a/crates/batcher/src/batcher_test.rs b/crates/batcher/src/batcher_test.rs index 93552bbe59..e9c346ae3f 100644 --- a/crates/batcher/src/batcher_test.rs +++ b/crates/batcher/src/batcher_test.rs @@ -41,6 +41,7 @@ use crate::proposal_manager::{ StartHeightError, }; use crate::test_utils::test_txs; +use crate::transaction_provider::ProposeTransactionProvider; const INITIAL_HEIGHT: BlockNumber = BlockNumber(3); const STREAMING_CHUNK_SIZE: usize = 3; @@ -86,7 +87,7 @@ async fn get_stream_content( let mut proposal_manager = MockProposalManagerTraitWrapper::new(); proposal_manager.expect_wrap_start_height().return_once(|_| async { Ok(()) }.boxed()); proposal_manager.expect_wrap_build_block_proposal().return_once( - move |_proposal_id, _block_hash, _deadline, tx_sender| { + move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| { simulate_build_block_proposal(tx_sender, txs_to_stream).boxed() }, ); @@ -249,6 +250,7 @@ trait ProposalManagerTraitWrapper: Send + Sync { retrospective_block_hash: Option, deadline: tokio::time::Instant, output_content_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> BoxFuture<'_, Result<(), BuildProposalError>>; fn wrap_take_proposal_result( @@ -276,12 +278,14 @@ impl ProposalManagerTrait for T { retrospective_block_hash: Option, deadline: tokio::time::Instant, output_content_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError> { self.wrap_build_block_proposal( proposal_id, retrospective_block_hash, deadline, output_content_sender, + tx_provider, ) .await } diff --git a/crates/batcher/src/proposal_manager.rs b/crates/batcher/src/proposal_manager.rs index bb5321b72c..32e684d87b 100644 --- a/crates/batcher/src/proposal_manager.rs +++ b/crates/batcher/src/proposal_manager.rs @@ -10,14 +10,13 @@ use starknet_api::executable_transaction::Transaction; use starknet_api::state::ThinStateDiff; use starknet_api::transaction::TransactionHash; use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId}; -use starknet_mempool_types::communication::SharedMempoolClient; use thiserror::Error; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, Instrument}; use crate::batcher::BatcherStorageReaderTrait; use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts}; -use crate::transaction_provider::{ProposeTransactionProvider, SharedL1ProviderClient}; +use crate::transaction_provider::ProposeTransactionProvider; #[derive(Debug, Error)] pub enum StartHeightError { @@ -78,6 +77,7 @@ pub trait ProposalManagerTrait: Send + Sync { retrospective_block_hash: Option, deadline: tokio::time::Instant, tx_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError>; async fn take_proposal_result( @@ -101,8 +101,6 @@ pub trait ProposalManagerTrait: Send + Sync { /// /// Triggered by the consensus. pub(crate) struct ProposalManager { - l1_provider_client: SharedL1ProviderClient, - mempool_client: SharedMempoolClient, storage_reader: Arc, active_height: Option, /// The block proposal that is currently being proposed, if any. @@ -158,13 +156,14 @@ impl ProposalManagerTrait for ProposalManager { /// Starts a new block proposal generation task for the given proposal_id and height with /// transactions from the mempool. /// Requires tx_sender for sending the generated transactions to the caller. - #[instrument(skip(self, tx_sender), err, fields(self.active_height))] + #[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))] async fn build_block_proposal( &mut self, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, tx_sender: tokio::sync::mpsc::UnboundedSender, + tx_provider: ProposeTransactionProvider, ) -> Result<(), BuildProposalError> { self.set_active_proposal(proposal_id).await?; @@ -172,10 +171,6 @@ impl ProposalManagerTrait for ProposalManager { let height = self.active_height.expect("No active height."); - let tx_provider = ProposeTransactionProvider { - mempool_client: self.mempool_client.clone(), - l1_provider_client: self.l1_provider_client.clone(), - }; let mut block_builder = self.block_builder_factory.create_block_builder( height, retrospective_block_hash, @@ -256,14 +251,10 @@ impl ProposalManagerTrait for ProposalManager { impl ProposalManager { pub fn new( - l1_provider_client: SharedL1ProviderClient, - mempool_client: SharedMempoolClient, block_builder_factory: Arc, storage_reader: Arc, ) -> Self { Self { - l1_provider_client, - mempool_client, storage_reader, active_proposal: Arc::new(Mutex::new(None)), block_builder_factory, diff --git a/crates/batcher/src/proposal_manager_test.rs b/crates/batcher/src/proposal_manager_test.rs index d749a6b2fa..8562672a6f 100644 --- a/crates/batcher/src/proposal_manager_test.rs +++ b/crates/batcher/src/proposal_manager_test.rs @@ -23,7 +23,7 @@ use crate::proposal_manager::{ ProposalOutput, StartHeightError, }; -use crate::transaction_provider::MockL1ProviderClient; +use crate::transaction_provider::{MockL1ProviderClient, ProposeTransactionProvider}; const INITIAL_HEIGHT: BlockNumber = BlockNumber(3); const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1); @@ -39,8 +39,8 @@ fn output_streaming() -> ( struct MockDependencies { block_builder_factory: MockBlockBuilderFactoryTrait, - l1_provider_client: MockL1ProviderClient, - mempool_client: MockMempoolClient, + l1_provider_client: Arc, + mempool_client: Arc, storage_reader: MockBatcherStorageReaderTrait, } @@ -84,17 +84,22 @@ fn mock_dependencies() -> MockDependencies { let mut storage_reader = MockBatcherStorageReaderTrait::new(); storage_reader.expect_height().returning(|| Ok(INITIAL_HEIGHT)); MockDependencies { - l1_provider_client: MockL1ProviderClient::new(), + l1_provider_client: Arc::new(MockL1ProviderClient::new()), block_builder_factory: MockBlockBuilderFactoryTrait::new(), - mempool_client: MockMempoolClient::new(), + mempool_client: Arc::new(MockMempoolClient::new()), storage_reader, } } +fn propose_tx_provider(mock_dependencies: &MockDependencies) -> ProposeTransactionProvider { + ProposeTransactionProvider { + mempool_client: mock_dependencies.mempool_client.clone(), + l1_provider_client: mock_dependencies.l1_provider_client.clone(), + } +} + fn init_proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager { ProposalManager::new( - Arc::new(mock_dependencies.l1_provider_client), - Arc::new(mock_dependencies.mempool_client), Arc::new(mock_dependencies.block_builder_factory), Arc::new(mock_dependencies.storage_reader), ) @@ -106,11 +111,12 @@ fn proposal_deadline() -> tokio::time::Instant { async fn build_and_await_block_proposal( proposal_manager: &mut ProposalManager, + tx_provider: ProposeTransactionProvider, proposal_id: ProposalId, ) { let (output_sender, _receiver) = output_streaming(); proposal_manager - .build_block_proposal(proposal_id, None, proposal_deadline(), output_sender) + .build_block_proposal(proposal_id, None, proposal_deadline(), output_sender, tx_provider) .await .unwrap(); @@ -157,9 +163,16 @@ async fn proposal_generation_fails_without_start_height( tokio::sync::mpsc::UnboundedReceiver, ), ) { + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); let err = proposal_manager - .build_block_proposal(ProposalId(0), None, proposal_deadline(), output_streaming.0) + .build_block_proposal( + ProposalId(0), + None, + proposal_deadline(), + output_streaming.0, + tx_provider, + ) .await; assert_matches!(err, Err(BuildProposalError::NoActiveHeight)); } @@ -169,10 +182,11 @@ async fn proposal_generation_fails_without_start_height( async fn proposal_generation_success(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(1); + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await; } #[rstest] @@ -180,14 +194,16 @@ async fn proposal_generation_success(mut mock_dependencies: MockDependencies) { async fn consecutive_proposal_generations_success(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(2); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); // Generate two consecutive proposals (awaiting on them to make sure they finished // successfully). - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; - build_and_await_block_proposal(&mut proposal_manager, ProposalId(1)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_1, ProposalId(1)).await; } // This test checks that trying to generate a proposal while another one is being generated will @@ -199,6 +215,8 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc // Generate a block builder with a very long build block operation. mock_dependencies.expect_long_build_block(1); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); @@ -206,14 +224,26 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc // Build a proposal that will take a very long time to finish. let (output_sender_0, _rec_0) = output_streaming(); proposal_manager - .build_block_proposal(ProposalId(0), None, proposal_deadline(), output_sender_0) + .build_block_proposal( + ProposalId(0), + None, + proposal_deadline(), + output_sender_0, + tx_provider_0, + ) .await .unwrap(); // Try to generate another proposal while the first one is still being generated. let (output_sender_1, _rec_1) = output_streaming(); let another_generate_request = proposal_manager - .build_block_proposal(ProposalId(1), None, proposal_deadline(), output_sender_1) + .build_block_proposal( + ProposalId(1), + None, + proposal_deadline(), + output_sender_1, + tx_provider_1, + ) .await; assert_matches!( another_generate_request, @@ -229,11 +259,12 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc async fn test_take_proposal_result_no_active_proposal(mut mock_dependencies: MockDependencies) { mock_dependencies.expect_build_block(1); + let tx_provider = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await; let expected_proposal_output = ProposalOutput::from(BlockExecutionArtifacts::create_for_testing()); @@ -255,14 +286,22 @@ async fn test_abort_and_restart_height(mut mock_dependencies: MockDependencies) // Start a new height and create a proposal. let (output_tx_sender, _receiver) = output_streaming(); + let tx_provider_0 = propose_tx_provider(&mock_dependencies); + let tx_provider_1 = propose_tx_provider(&mock_dependencies); let mut proposal_manager = init_proposal_manager(mock_dependencies); proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await; + build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await; // Start a new proposal, which will remain active. assert!( proposal_manager - .build_block_proposal(ProposalId(1), None, proposal_deadline(), output_tx_sender) + .build_block_proposal( + ProposalId(1), + None, + proposal_deadline(), + output_tx_sender, + tx_provider_1 + ) .await .is_ok() );