Skip to content

Commit

Permalink
refactor(batcher): create the tx provider in the batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Nov 12, 2024
1 parent d556b2c commit cb88966
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 39 deletions.
17 changes: 9 additions & 8 deletions crates/batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::proposal_manager::{
ProposalStatus,
StartHeightError,
};
use crate::transaction_provider::DummyL1ProviderClient;
use crate::transaction_provider::{DummyL1ProviderClient, ProposeTransactionProvider};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
type InputStreamSender = tokio::sync::mpsc::Sender<Transaction>;
Expand Down Expand Up @@ -92,13 +92,19 @@ impl Batcher {
)?);

let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel();
let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
// TODO: use a real L1 provider client.
l1_provider_client: Arc::new(DummyL1ProviderClient),
};

self.proposal_manager
.build_block_proposal(
build_proposal_input.proposal_id,
build_proposal_input.retrospective_block_hash,
deadline,
tx_sender,
tx_provider,
)
.await
.map_err(BatcherError::from)?;
Expand Down Expand Up @@ -248,7 +254,6 @@ impl Batcher {
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
let l1_provider_client = Arc::new(DummyL1ProviderClient);
let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone())
.expect("Failed to open batcher's storage");

Expand All @@ -259,12 +264,8 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient
});
let storage_reader = Arc::new(storage_reader);
let storage_writer = Box::new(storage_writer);
let proposal_manager = Box::new(ProposalManager::new(
l1_provider_client,
mempool_client.clone(),
block_builder_factory,
storage_reader.clone(),
));
let proposal_manager =
Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone()));
Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager)
}

Expand Down
6 changes: 5 additions & 1 deletion crates/batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::proposal_manager::{
StartHeightError,
};
use crate::test_utils::test_txs;
use crate::transaction_provider::ProposeTransactionProvider;

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const STREAMING_CHUNK_SIZE: usize = 3;
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn get_stream_content(
let mut proposal_manager = MockProposalManagerTraitWrapper::new();
proposal_manager.expect_wrap_start_height().return_once(|_| async { Ok(()) }.boxed());
proposal_manager.expect_wrap_build_block_proposal().return_once(
move |_proposal_id, _block_hash, _deadline, tx_sender| {
move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| {
simulate_build_block_proposal(tx_sender, txs_to_stream).boxed()
},
);
Expand Down Expand Up @@ -249,6 +250,7 @@ trait ProposalManagerTraitWrapper: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> BoxFuture<'_, Result<(), BuildProposalError>>;

fn wrap_take_proposal_result(
Expand Down Expand Up @@ -276,12 +278,14 @@ impl<T: ProposalManagerTraitWrapper> ProposalManagerTrait for T {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
self.wrap_build_block_proposal(
proposal_id,
retrospective_block_hash,
deadline,
output_content_sender,
tx_provider,
)
.await
}
Expand Down
17 changes: 4 additions & 13 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId};
use starknet_mempool_types::communication::SharedMempoolClient;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::batcher::BatcherStorageReaderTrait;
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts};
use crate::transaction_provider::{ProposeTransactionProvider, SharedL1ProviderClient};
use crate::transaction_provider::ProposeTransactionProvider;

#[derive(Debug, Error)]
pub enum StartHeightError {
Expand Down Expand Up @@ -78,6 +77,7 @@ pub trait ProposalManagerTrait: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError>;

async fn take_proposal_result(
Expand All @@ -101,8 +101,6 @@ pub trait ProposalManagerTrait: Send + Sync {
///
/// Triggered by the consensus.
pub(crate) struct ProposalManager {
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
active_height: Option<BlockNumber>,
/// The block proposal that is currently being proposed, if any.
Expand Down Expand Up @@ -158,24 +156,21 @@ impl ProposalManagerTrait for ProposalManager {
/// Starts a new block proposal generation task for the given proposal_id and height with
/// transactions from the mempool.
/// Requires tx_sender for sending the generated transactions to the caller.
#[instrument(skip(self, tx_sender), err, fields(self.active_height))]
#[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))]
async fn build_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
self.set_active_proposal(proposal_id).await?;

info!("Starting generation of a new proposal with id {}.", proposal_id);

let height = self.active_height.expect("No active height.");

let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
l1_provider_client: self.l1_provider_client.clone(),
};
let mut block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
Expand Down Expand Up @@ -256,14 +251,10 @@ impl ProposalManagerTrait for ProposalManager {

impl ProposalManager {
pub fn new(
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Arc<dyn BlockBuilderFactoryTrait + Send + Sync>,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
) -> Self {
Self {
l1_provider_client,
mempool_client,
storage_reader,
active_proposal: Arc::new(Mutex::new(None)),
block_builder_factory,
Expand Down
73 changes: 56 additions & 17 deletions crates/batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::proposal_manager::{
ProposalOutput,
StartHeightError,
};
use crate::transaction_provider::MockL1ProviderClient;
use crate::transaction_provider::{MockL1ProviderClient, ProposeTransactionProvider};

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);
Expand All @@ -39,8 +39,8 @@ fn output_streaming() -> (

struct MockDependencies {
block_builder_factory: MockBlockBuilderFactoryTrait,
l1_provider_client: MockL1ProviderClient,
mempool_client: MockMempoolClient,
l1_provider_client: Arc<MockL1ProviderClient>,
mempool_client: Arc<MockMempoolClient>,
storage_reader: MockBatcherStorageReaderTrait,
}

Expand Down Expand Up @@ -84,17 +84,22 @@ fn mock_dependencies() -> MockDependencies {
let mut storage_reader = MockBatcherStorageReaderTrait::new();
storage_reader.expect_height().returning(|| Ok(INITIAL_HEIGHT));
MockDependencies {
l1_provider_client: MockL1ProviderClient::new(),
l1_provider_client: Arc::new(MockL1ProviderClient::new()),
block_builder_factory: MockBlockBuilderFactoryTrait::new(),
mempool_client: MockMempoolClient::new(),
mempool_client: Arc::new(MockMempoolClient::new()),
storage_reader,
}
}

fn propose_tx_provider(mock_dependencies: &MockDependencies) -> ProposeTransactionProvider {
ProposeTransactionProvider {
mempool_client: mock_dependencies.mempool_client.clone(),
l1_provider_client: mock_dependencies.l1_provider_client.clone(),
}
}

fn init_proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager {
ProposalManager::new(
Arc::new(mock_dependencies.l1_provider_client),
Arc::new(mock_dependencies.mempool_client),
Arc::new(mock_dependencies.block_builder_factory),
Arc::new(mock_dependencies.storage_reader),
)
Expand All @@ -106,11 +111,12 @@ fn proposal_deadline() -> tokio::time::Instant {

async fn build_and_await_block_proposal(
proposal_manager: &mut ProposalManager,
tx_provider: ProposeTransactionProvider,
proposal_id: ProposalId,
) {
let (output_sender, _receiver) = output_streaming();
proposal_manager
.build_block_proposal(proposal_id, None, proposal_deadline(), output_sender)
.build_block_proposal(proposal_id, None, proposal_deadline(), output_sender, tx_provider)
.await
.unwrap();

Expand Down Expand Up @@ -157,9 +163,16 @@ async fn proposal_generation_fails_without_start_height(
tokio::sync::mpsc::UnboundedReceiver<Transaction>,
),
) {
let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);
let err = proposal_manager
.build_block_proposal(ProposalId(0), None, proposal_deadline(), output_streaming.0)
.build_block_proposal(
ProposalId(0),
None,
proposal_deadline(),
output_streaming.0,
tx_provider,
)
.await;
assert_matches!(err, Err(BuildProposalError::NoActiveHeight));
}
Expand All @@ -169,25 +182,28 @@ async fn proposal_generation_fails_without_start_height(
async fn proposal_generation_success(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(1);

let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await;
}

#[rstest]
#[tokio::test]
async fn consecutive_proposal_generations_success(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(2);

let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

// Generate two consecutive proposals (awaiting on them to make sure they finished
// successfully).
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, ProposalId(1)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_1, ProposalId(1)).await;
}

// This test checks that trying to generate a proposal while another one is being generated will
Expand All @@ -199,21 +215,35 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc
// Generate a block builder with a very long build block operation.
mock_dependencies.expect_long_build_block(1);

let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

// Build a proposal that will take a very long time to finish.
let (output_sender_0, _rec_0) = output_streaming();
proposal_manager
.build_block_proposal(ProposalId(0), None, proposal_deadline(), output_sender_0)
.build_block_proposal(
ProposalId(0),
None,
proposal_deadline(),
output_sender_0,
tx_provider_0,
)
.await
.unwrap();

// Try to generate another proposal while the first one is still being generated.
let (output_sender_1, _rec_1) = output_streaming();
let another_generate_request = proposal_manager
.build_block_proposal(ProposalId(1), None, proposal_deadline(), output_sender_1)
.build_block_proposal(
ProposalId(1),
None,
proposal_deadline(),
output_sender_1,
tx_provider_1,
)
.await;
assert_matches!(
another_generate_request,
Expand All @@ -229,11 +259,12 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc
async fn test_take_proposal_result_no_active_proposal(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(1);

let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await;

let expected_proposal_output =
ProposalOutput::from(BlockExecutionArtifacts::create_for_testing());
Expand All @@ -255,14 +286,22 @@ async fn test_abort_and_restart_height(mut mock_dependencies: MockDependencies)

// Start a new height and create a proposal.
let (output_tx_sender, _receiver) = output_streaming();
let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);
proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await;

// Start a new proposal, which will remain active.
assert!(
proposal_manager
.build_block_proposal(ProposalId(1), None, proposal_deadline(), output_tx_sender)
.build_block_proposal(
ProposalId(1),
None,
proposal_deadline(),
output_tx_sender,
tx_provider_1
)
.await
.is_ok()
);
Expand Down

0 comments on commit cb88966

Please sign in to comment.