Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(batcher): create the tx provider in the batcher #1879

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions crates/batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::proposal_manager::{
ProposalStatus,
StartHeightError,
};
use crate::transaction_provider::DummyL1ProviderClient;
use crate::transaction_provider::{DummyL1ProviderClient, ProposeTransactionProvider};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
type InputStreamSender = tokio::sync::mpsc::Sender<Transaction>;
Expand Down Expand Up @@ -92,13 +92,19 @@ impl Batcher {
)?);

let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel();
let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
// TODO: use a real L1 provider client.
l1_provider_client: Arc::new(DummyL1ProviderClient),
};

self.proposal_manager
.build_block_proposal(
build_proposal_input.proposal_id,
build_proposal_input.retrospective_block_hash,
deadline,
tx_sender,
tx_provider,
)
.await
.map_err(BatcherError::from)?;
Expand Down Expand Up @@ -248,7 +254,6 @@ impl Batcher {
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
let l1_provider_client = Arc::new(DummyL1ProviderClient);
let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone())
.expect("Failed to open batcher's storage");

Expand All @@ -259,12 +264,8 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient
});
let storage_reader = Arc::new(storage_reader);
let storage_writer = Box::new(storage_writer);
let proposal_manager = Box::new(ProposalManager::new(
l1_provider_client,
mempool_client.clone(),
block_builder_factory,
storage_reader.clone(),
));
let proposal_manager =
Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone()));
Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager)
}

Expand Down
6 changes: 5 additions & 1 deletion crates/batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::proposal_manager::{
StartHeightError,
};
use crate::test_utils::test_txs;
use crate::transaction_provider::ProposeTransactionProvider;

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const STREAMING_CHUNK_SIZE: usize = 3;
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn get_stream_content(
let mut proposal_manager = MockProposalManagerTraitWrapper::new();
proposal_manager.expect_wrap_start_height().return_once(|_| async { Ok(()) }.boxed());
proposal_manager.expect_wrap_build_block_proposal().return_once(
move |_proposal_id, _block_hash, _deadline, tx_sender| {
move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| {
simulate_build_block_proposal(tx_sender, txs_to_stream).boxed()
},
);
Expand Down Expand Up @@ -249,6 +250,7 @@ trait ProposalManagerTraitWrapper: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> BoxFuture<'_, Result<(), BuildProposalError>>;

fn wrap_take_proposal_result(
Expand Down Expand Up @@ -276,12 +278,14 @@ impl<T: ProposalManagerTraitWrapper> ProposalManagerTrait for T {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
self.wrap_build_block_proposal(
proposal_id,
retrospective_block_hash,
deadline,
output_content_sender,
tx_provider,
)
.await
}
Expand Down
17 changes: 4 additions & 13 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId};
use starknet_mempool_types::communication::SharedMempoolClient;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::batcher::BatcherStorageReaderTrait;
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts};
use crate::transaction_provider::{ProposeTransactionProvider, SharedL1ProviderClient};
use crate::transaction_provider::ProposeTransactionProvider;

#[derive(Debug, Error)]
pub enum StartHeightError {
Expand Down Expand Up @@ -78,6 +77,7 @@ pub trait ProposalManagerTrait: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError>;

async fn take_proposal_result(
Expand All @@ -101,8 +101,6 @@ pub trait ProposalManagerTrait: Send + Sync {
///
/// Triggered by the consensus.
pub(crate) struct ProposalManager {
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
active_height: Option<BlockNumber>,
/// The block proposal that is currently being proposed, if any.
Expand Down Expand Up @@ -158,24 +156,21 @@ impl ProposalManagerTrait for ProposalManager {
/// Starts a new block proposal generation task for the given proposal_id and height with
/// transactions from the mempool.
/// Requires tx_sender for sending the generated transactions to the caller.
#[instrument(skip(self, tx_sender), err, fields(self.active_height))]
#[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))]
async fn build_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
self.set_active_proposal(proposal_id).await?;

info!("Starting generation of a new proposal with id {}.", proposal_id);

let height = self.active_height.expect("No active height.");

let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
l1_provider_client: self.l1_provider_client.clone(),
};
let mut block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
Expand Down Expand Up @@ -256,14 +251,10 @@ impl ProposalManagerTrait for ProposalManager {

impl ProposalManager {
pub fn new(
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Arc<dyn BlockBuilderFactoryTrait + Send + Sync>,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
) -> Self {
Self {
l1_provider_client,
mempool_client,
storage_reader,
active_proposal: Arc::new(Mutex::new(None)),
block_builder_factory,
Expand Down
73 changes: 56 additions & 17 deletions crates/batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::proposal_manager::{
ProposalOutput,
StartHeightError,
};
use crate::transaction_provider::MockL1ProviderClient;
use crate::transaction_provider::{MockL1ProviderClient, ProposeTransactionProvider};

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);
Expand All @@ -39,8 +39,8 @@ fn output_streaming() -> (

struct MockDependencies {
block_builder_factory: MockBlockBuilderFactoryTrait,
l1_provider_client: MockL1ProviderClient,
mempool_client: MockMempoolClient,
l1_provider_client: Arc<MockL1ProviderClient>,
mempool_client: Arc<MockMempoolClient>,
storage_reader: MockBatcherStorageReaderTrait,
}

Expand Down Expand Up @@ -84,17 +84,22 @@ fn mock_dependencies() -> MockDependencies {
let mut storage_reader = MockBatcherStorageReaderTrait::new();
storage_reader.expect_height().returning(|| Ok(INITIAL_HEIGHT));
MockDependencies {
l1_provider_client: MockL1ProviderClient::new(),
l1_provider_client: Arc::new(MockL1ProviderClient::new()),
block_builder_factory: MockBlockBuilderFactoryTrait::new(),
mempool_client: MockMempoolClient::new(),
mempool_client: Arc::new(MockMempoolClient::new()),
storage_reader,
}
}

fn propose_tx_provider(mock_dependencies: &MockDependencies) -> ProposeTransactionProvider {
ProposeTransactionProvider {
mempool_client: mock_dependencies.mempool_client.clone(),
l1_provider_client: mock_dependencies.l1_provider_client.clone(),
}
}

fn init_proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager {
ProposalManager::new(
Arc::new(mock_dependencies.l1_provider_client),
Arc::new(mock_dependencies.mempool_client),
Arc::new(mock_dependencies.block_builder_factory),
Arc::new(mock_dependencies.storage_reader),
)
Expand All @@ -106,11 +111,12 @@ fn proposal_deadline() -> tokio::time::Instant {

async fn build_and_await_block_proposal(
proposal_manager: &mut ProposalManager,
tx_provider: ProposeTransactionProvider,
proposal_id: ProposalId,
) {
let (output_sender, _receiver) = output_streaming();
proposal_manager
.build_block_proposal(proposal_id, None, proposal_deadline(), output_sender)
.build_block_proposal(proposal_id, None, proposal_deadline(), output_sender, tx_provider)
.await
.unwrap();

Expand Down Expand Up @@ -157,9 +163,16 @@ async fn proposal_generation_fails_without_start_height(
tokio::sync::mpsc::UnboundedReceiver<Transaction>,
),
) {
let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);
let err = proposal_manager
.build_block_proposal(ProposalId(0), None, proposal_deadline(), output_streaming.0)
.build_block_proposal(
ProposalId(0),
None,
proposal_deadline(),
output_streaming.0,
tx_provider,
)
.await;
assert_matches!(err, Err(BuildProposalError::NoActiveHeight));
}
Expand All @@ -169,25 +182,28 @@ async fn proposal_generation_fails_without_start_height(
async fn proposal_generation_success(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(1);

let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await;
}

#[rstest]
#[tokio::test]
async fn consecutive_proposal_generations_success(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(2);

let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

// Generate two consecutive proposals (awaiting on them to make sure they finished
// successfully).
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, ProposalId(1)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_1, ProposalId(1)).await;
}

// This test checks that trying to generate a proposal while another one is being generated will
Expand All @@ -199,21 +215,35 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc
// Generate a block builder with a very long build block operation.
mock_dependencies.expect_long_build_block(1);

let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

// Build a proposal that will take a very long time to finish.
let (output_sender_0, _rec_0) = output_streaming();
proposal_manager
.build_block_proposal(ProposalId(0), None, proposal_deadline(), output_sender_0)
.build_block_proposal(
ProposalId(0),
None,
proposal_deadline(),
output_sender_0,
tx_provider_0,
)
.await
.unwrap();

// Try to generate another proposal while the first one is still being generated.
let (output_sender_1, _rec_1) = output_streaming();
let another_generate_request = proposal_manager
.build_block_proposal(ProposalId(1), None, proposal_deadline(), output_sender_1)
.build_block_proposal(
ProposalId(1),
None,
proposal_deadline(),
output_sender_1,
tx_provider_1,
)
.await;
assert_matches!(
another_generate_request,
Expand All @@ -229,11 +259,12 @@ async fn multiple_proposals_generation_fail(mut mock_dependencies: MockDependenc
async fn test_take_proposal_result_no_active_proposal(mut mock_dependencies: MockDependencies) {
mock_dependencies.expect_build_block(1);

let tx_provider = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider, ProposalId(0)).await;

let expected_proposal_output =
ProposalOutput::from(BlockExecutionArtifacts::create_for_testing());
Expand All @@ -255,14 +286,22 @@ async fn test_abort_and_restart_height(mut mock_dependencies: MockDependencies)

// Start a new height and create a proposal.
let (output_tx_sender, _receiver) = output_streaming();
let tx_provider_0 = propose_tx_provider(&mock_dependencies);
let tx_provider_1 = propose_tx_provider(&mock_dependencies);
let mut proposal_manager = init_proposal_manager(mock_dependencies);
proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(&mut proposal_manager, ProposalId(0)).await;
build_and_await_block_proposal(&mut proposal_manager, tx_provider_0, ProposalId(0)).await;

// Start a new proposal, which will remain active.
assert!(
proposal_manager
.build_block_proposal(ProposalId(1), None, proposal_deadline(), output_tx_sender)
.build_block_proposal(
ProposalId(1),
None,
proposal_deadline(),
output_tx_sender,
tx_provider_1
)
.await
.is_ok()
);
Expand Down
Loading