Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batcher): add validate flow to proposal manager #1951

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::proposal_manager::{
StartHeightError,
};
use crate::test_utils::test_txs;
use crate::transaction_provider::ProposeTransactionProvider;
use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider};

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const STREAMING_CHUNK_SIZE: usize = 3;
Expand Down Expand Up @@ -254,6 +254,14 @@ trait ProposalManagerTraitWrapper: Send + Sync {
tx_provider: ProposeTransactionProvider,
) -> BoxFuture<'_, Result<(), GenerateProposalError>>;

fn wrap_validate_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: ValidateTransactionProvider,
) -> BoxFuture<'_, Result<(), GenerateProposalError>>;

fn wrap_take_proposal_result(
&mut self,
proposal_id: ProposalId,
Expand Down Expand Up @@ -291,6 +299,22 @@ impl<T: ProposalManagerTraitWrapper> ProposalManagerTrait for T {
.await
}

async fn validate_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: ValidateTransactionProvider,
) -> Result<(), GenerateProposalError> {
self.wrap_validate_block_proposal(
proposal_id,
retrospective_block_hash,
deadline,
tx_provider,
)
.await
}

async fn take_proposal_result(
&mut self,
proposal_id: ProposalId,
Expand Down
43 changes: 42 additions & 1 deletion crates/starknet_batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::block_builder::{
BlockExecutionArtifacts,
BlockMetadata,
};
use crate::transaction_provider::ProposeTransactionProvider;
use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider};

#[derive(Debug, Error)]
pub enum StartHeightError {
Expand Down Expand Up @@ -87,6 +87,16 @@ pub trait ProposalManagerTrait: Send + Sync {
tx_provider: ProposeTransactionProvider,
) -> Result<(), GenerateProposalError>;

// TODO: delete allow dead code once the batcher uses this code.
#[allow(dead_code)]
async fn validate_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: ValidateTransactionProvider,
) -> Result<(), GenerateProposalError>;

async fn take_proposal_result(
&mut self,
proposal_id: ProposalId,
Expand Down Expand Up @@ -193,6 +203,37 @@ impl ProposalManagerTrait for ProposalManager {
Ok(())
}

/// Starts validation of a block proposal for the given proposal_id and height with
/// transactions from tx_receiver channel.
#[instrument(skip(self, tx_provider), err, fields(self.active_height))]
async fn validate_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_provider: ValidateTransactionProvider,
) -> Result<(), GenerateProposalError> {
self.set_active_proposal(proposal_id).await?;

info!("Starting validation of proposal with id {}.", proposal_id);

// Create the block builder, and a channel to allow aborting the block building task.
let (_abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel();
let height = self.active_height.expect("No active height.");

let block_builder = self.block_builder_factory.create_block_builder(
BlockMetadata { height, retrospective_block_hash },
BlockBuilderExecutionParams { deadline, fail_on_err: true },
Box::new(tx_provider),
None,
abort_signal_receiver,
)?;

self.spawn_build_block_task(proposal_id, block_builder).await;

Ok(())
}

async fn take_proposal_result(
&mut self,
proposal_id: ProposalId,
Expand Down
93 changes: 70 additions & 23 deletions crates/starknet_batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ use crate::proposal_manager::{
ProposalOutput,
StartHeightError,
};
use crate::transaction_provider::{MockL1ProviderClient, ProposeTransactionProvider};
use crate::transaction_provider::{
MockL1ProviderClient,
ProposeTransactionProvider,
ValidateTransactionProvider,
};

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);
const MAX_L1_HANDLER_TXS_PER_BLOCK_PROPOSAL: usize = 3;
const INPUT_CHANNEL_SIZE: usize = 30;

#[fixture]
fn output_streaming() -> (
Expand Down Expand Up @@ -94,6 +99,14 @@ fn propose_tx_provider() -> ProposeTransactionProvider {
)
}

#[fixture]
fn validate_tx_provider() -> ValidateTransactionProvider {
ValidateTransactionProvider {
tx_receiver: tokio::sync::mpsc::channel(INPUT_CHANNEL_SIZE).1,
l1_provider_client: Arc::new(MockL1ProviderClient::new()),
}
}

fn proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager {
ProposalManager::new(
Arc::new(mock_dependencies.block_builder_factory),
Expand All @@ -105,7 +118,7 @@ fn proposal_deadline() -> tokio::time::Instant {
tokio::time::Instant::now() + BLOCK_GENERATION_TIMEOUT
}

async fn build_and_await_block_proposal(
async fn build_proposal(
proposal_manager: &mut ProposalManager,
tx_provider: ProposeTransactionProvider,
proposal_id: ProposalId,
Expand All @@ -119,6 +132,19 @@ async fn build_and_await_block_proposal(
assert!(proposal_manager.await_active_proposal().await);
}

async fn validate_proposal(
proposal_manager: &mut ProposalManager,
tx_provider: ValidateTransactionProvider,
proposal_id: ProposalId,
) {
proposal_manager
.validate_block_proposal(proposal_id, None, proposal_deadline(), tx_provider)
.await
.unwrap();

assert!(proposal_manager.await_active_proposal().await);
}

#[rstest]
#[case::height_already_passed(
INITIAL_HEIGHT.prev().unwrap(),
Expand Down Expand Up @@ -146,13 +172,13 @@ async fn start_height(
) {
let mut proposal_manager = proposal_manager(mock_dependencies);
let result = proposal_manager.start_height(height).await;
// Unfortunatelly ProposalManagerError doesn't implement PartialEq.
// Unfortunately ProposalManagerError doesn't implement PartialEq.
assert_eq!(format!("{:?}", result), format!("{:?}", expected_result));
}

#[rstest]
#[tokio::test]
async fn proposal_generation_fails_without_start_height(
async fn build_proposal_fails_without_start_height(
mock_dependencies: MockDependencies,
propose_tx_provider: ProposeTransactionProvider,
output_streaming: (
Expand All @@ -175,15 +201,43 @@ async fn proposal_generation_fails_without_start_height(

#[rstest]
#[tokio::test]
async fn proposal_generation_success(
async fn validate_proposal_fails_without_start_height(
mock_dependencies: MockDependencies,
validate_tx_provider: ValidateTransactionProvider,
) {
let mut proposal_manager = proposal_manager(mock_dependencies);
let err = proposal_manager
.validate_block_proposal(ProposalId(0), None, proposal_deadline(), validate_tx_provider)
.await;
assert_matches!(err, Err(GenerateProposalError::NoActiveHeight));
}

#[rstest]
#[tokio::test]
async fn build_proposal_success(
mut mock_dependencies: MockDependencies,
propose_tx_provider: ProposeTransactionProvider,
) {
mock_dependencies.expect_build_block(1);
let mut proposal_manager = proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await;
build_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await;
proposal_manager.take_proposal_result(ProposalId(0)).await.unwrap();
}

#[rstest]
#[tokio::test]
async fn validate_proposal_success(
mut mock_dependencies: MockDependencies,
validate_tx_provider: ValidateTransactionProvider,
) {
mock_dependencies.expect_build_block(1);
let mut proposal_manager = proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
validate_proposal(&mut proposal_manager, validate_tx_provider, ProposalId(0)).await;
proposal_manager.take_proposal_result(ProposalId(0)).await.unwrap();
}

#[rstest]
Expand All @@ -192,20 +246,17 @@ async fn consecutive_proposal_generations_success(
mut mock_dependencies: MockDependencies,
propose_tx_provider: ProposeTransactionProvider,
) {
mock_dependencies.expect_build_block(2);
mock_dependencies.expect_build_block(4);
let mut proposal_manager = proposal_manager(mock_dependencies);

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

// Generate two consecutive proposals (awaiting on them to make sure they finished
// successfully).
build_and_await_block_proposal(
&mut proposal_manager,
propose_tx_provider.clone(),
ProposalId(0),
)
.await;
build_and_await_block_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(1)).await;
// Build and validate multiple proposals consecutively (awaiting on them to
// make sure they finished successfully).
build_proposal(&mut proposal_manager, propose_tx_provider.clone(), ProposalId(0)).await;
validate_proposal(&mut proposal_manager, validate_tx_provider(), ProposalId(1)).await;
build_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(2)).await;
validate_proposal(&mut proposal_manager, validate_tx_provider(), ProposalId(3)).await;
}

// This test checks that trying to generate a proposal while another one is being generated will
Expand Down Expand Up @@ -267,7 +318,7 @@ async fn test_take_proposal_result_no_active_proposal(

proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();

build_and_await_block_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await;
build_proposal(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await;

let expected_proposal_output =
ProposalOutput::from(BlockExecutionArtifacts::create_for_testing());
Expand All @@ -294,12 +345,8 @@ async fn test_abort_and_restart_height(
// Start a new height and create a proposal.
let (output_tx_sender, _receiver) = output_streaming();
proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap();
build_and_await_block_proposal(
&mut proposal_manager,
propose_tx_provider.clone(),
ProposalId(0),
)
.await;

build_proposal(&mut proposal_manager, propose_tx_provider.clone(), ProposalId(0)).await;

// Start a new proposal, which will remain active.
assert!(
Expand Down
Loading