From 2ff3249dba262aca44e6fc13136c385ace1c9ff7 Mon Sep 17 00:00:00 2001 From: dafnamatsry <92669167+dafnamatsry@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:21:00 +0200 Subject: [PATCH] refactor(batcher): move height mgmt out to the batcher (#2192) --- crates/starknet_batcher/src/batcher.rs | 59 +++++--- crates/starknet_batcher/src/batcher_test.rs | 131 +++++++++++++++--- .../starknet_batcher/src/proposal_manager.rs | 83 ++--------- .../src/proposal_manager_test.rs | 130 +++-------------- 4 files changed, 176 insertions(+), 227 deletions(-) diff --git a/crates/starknet_batcher/src/batcher.rs b/crates/starknet_batcher/src/batcher.rs index 5abd52ea1f..35b966b58c 100644 --- a/crates/starknet_batcher/src/batcher.rs +++ b/crates/starknet_batcher/src/batcher.rs @@ -39,7 +39,6 @@ use crate::proposal_manager::{ ProposalManager, ProposalManagerTrait, ProposalOutput, - StartHeightError, }; use crate::transaction_provider::{ DummyL1ProviderClient, @@ -55,6 +54,8 @@ pub struct Batcher { pub storage_reader: Arc, pub storage_writer: Box, pub mempool_client: SharedMempoolClient, + + active_height: Option, proposal_manager: Box, propose_tx_streams: HashMap, validate_tx_streams: HashMap, @@ -73,16 +74,43 @@ impl Batcher { storage_reader, storage_writer, mempool_client, + active_height: None, proposal_manager, propose_tx_streams: HashMap::new(), validate_tx_streams: HashMap::new(), } } + #[instrument(skip(self), err)] pub async fn start_height(&mut self, input: StartHeightInput) -> BatcherResult<()> { + if self.active_height == Some(input.height) { + return Err(BatcherError::HeightInProgress); + } + + let storage_height = + self.storage_reader.height().map_err(|_| BatcherError::InternalError)?; + if storage_height < input.height { + return Err(BatcherError::StorageNotSynced { + storage_height, + requested_height: input.height, + }); + } + if storage_height > input.height { + return Err(BatcherError::HeightAlreadyPassed { + storage_height, + requested_height: input.height, + }); + } + + // Clear all the proposals from the previous height. + self.proposal_manager.reset().await; self.propose_tx_streams.clear(); self.validate_tx_streams.clear(); - self.proposal_manager.start_height(input.height).await.map_err(BatcherError::from) + + info!("Starting to work on height {}.", input.height); + self.active_height = Some(input.height); + + Ok(()) } #[instrument(skip(self), err)] @@ -90,6 +118,8 @@ impl Batcher { &mut self, propose_block_input: ProposeBlockInput, ) -> BatcherResult<()> { + let active_height = self.active_height.ok_or(BatcherError::NoActiveHeight)?; + let proposal_id = propose_block_input.proposal_id; let deadline = deadline_as_instant(propose_block_input.deadline)?; @@ -103,6 +133,7 @@ impl Batcher { self.proposal_manager .propose_block( + active_height, proposal_id, propose_block_input.retrospective_block_hash, deadline, @@ -120,6 +151,8 @@ impl Batcher { &mut self, validate_block_input: ValidateBlockInput, ) -> BatcherResult<()> { + let active_height = self.active_height.ok_or(BatcherError::NoActiveHeight)?; + let proposal_id = validate_block_input.proposal_id; let deadline = deadline_as_instant(validate_block_input.deadline)?; @@ -133,6 +166,7 @@ impl Batcher { self.proposal_manager .validate_block( + active_height, proposal_id, validate_block_input.retrospective_block_hash, deadline, @@ -299,8 +333,7 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient }); let storage_reader = Arc::new(storage_reader); let storage_writer = Box::new(storage_writer); - let proposal_manager = - Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone())); + let proposal_manager = Box::new(ProposalManager::new(block_builder_factory)); Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager) } @@ -336,24 +369,6 @@ impl BatcherStorageWriterTrait for papyrus_storage::StorageWriter { } } -impl From for BatcherError { - fn from(err: StartHeightError) -> Self { - match err { - StartHeightError::HeightAlreadyPassed { storage_height, requested_height } => { - BatcherError::HeightAlreadyPassed { storage_height, requested_height } - } - StartHeightError::StorageError(err) => { - error!("{}", err); - BatcherError::InternalError - } - StartHeightError::StorageNotSynced { storage_height, requested_height } => { - BatcherError::StorageNotSynced { storage_height, requested_height } - } - StartHeightError::HeightInProgress => BatcherError::HeightInProgress, - } - } -} - impl From for BatcherError { fn from(err: GenerateProposalError) -> Self { match err { diff --git a/crates/starknet_batcher/src/batcher_test.rs b/crates/starknet_batcher/src/batcher_test.rs index 499cbcefaf..1f12fa9b26 100644 --- a/crates/starknet_batcher/src/batcher_test.rs +++ b/crates/starknet_batcher/src/batcher_test.rs @@ -45,7 +45,6 @@ use crate::proposal_manager::{ ProposalManagerTrait, ProposalOutput, ProposalResult, - StartHeightError, }; use crate::test_utils::test_txs; use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider}; @@ -100,11 +99,7 @@ fn batcher(proposal_manager: MockProposalManagerTraitWrapper) -> Batcher { fn mock_proposal_manager_common_expectations( proposal_manager: &mut MockProposalManagerTraitWrapper, ) { - proposal_manager - .expect_wrap_start_height() - .times(1) - .with(eq(INITIAL_HEIGHT)) - .return_once(|_| async { Ok(()) }.boxed()); + proposal_manager.expect_wrap_reset().times(1).return_once(|| async {}.boxed()); proposal_manager .expect_wrap_await_proposal_commitment() .times(1) @@ -118,8 +113,8 @@ fn mock_proposal_manager_validate_flow() -> MockProposalManagerTraitWrapper { proposal_manager .expect_wrap_validate_block() .times(1) - .with(eq(PROPOSAL_ID), eq(None), always(), always()) - .return_once(|_, _, _, tx_provider| { + .with(eq(INITIAL_HEIGHT), eq(PROPOSAL_ID), eq(None), always(), always()) + .return_once(|_, _, _, _, tx_provider| { { async move { // Spawn a task to keep tx_provider alive until the transactions sender is @@ -144,6 +139,89 @@ fn mock_proposal_manager_validate_flow() -> MockProposalManagerTraitWrapper { proposal_manager } +#[rstest] +#[case::height_already_passed( + INITIAL_HEIGHT.prev().unwrap(), + Result::Err(BatcherError::HeightAlreadyPassed { + storage_height: INITIAL_HEIGHT, + requested_height: INITIAL_HEIGHT.prev().unwrap() + } +))] +#[case::happy( + INITIAL_HEIGHT, + Result::Ok(()) +)] +#[case::storage_not_synced( + INITIAL_HEIGHT.unchecked_next(), + Result::Err(BatcherError::StorageNotSynced { + storage_height: INITIAL_HEIGHT, + requested_height: INITIAL_HEIGHT.unchecked_next() + } +))] +#[tokio::test] +async fn start_height( + #[case] height: BlockNumber, + #[case] expected_result: Result<(), BatcherError>, +) { + let mut proposal_manager = MockProposalManagerTraitWrapper::new(); + let reset_times = if expected_result.is_ok() { 1 } else { 0 }; + proposal_manager.expect_wrap_reset().times(reset_times).returning(|| async {}.boxed()); + + let mut batcher = batcher(proposal_manager); + let result = batcher.start_height(StartHeightInput { height }).await; + assert_eq!(format!("{:?}", result), format!("{:?}", expected_result)); +} + +#[rstest] +#[tokio::test] +async fn duplicate_start_height() { + let mut proposal_manager = MockProposalManagerTraitWrapper::new(); + proposal_manager.expect_wrap_reset().times(1).return_once(|| async {}.boxed()); + + let mut batcher = batcher(proposal_manager); + + assert_matches!( + batcher.start_height(StartHeightInput { height: INITIAL_HEIGHT }).await, + Ok(()) + ); + assert_matches!( + batcher.start_height(StartHeightInput { height: INITIAL_HEIGHT }).await, + Err(BatcherError::HeightInProgress) + ); +} + +#[rstest] +#[tokio::test] +async fn propose_block_fails_without_start_height() { + let proposal_manager = MockProposalManagerTraitWrapper::new(); + let mut batcher = batcher(proposal_manager); + + let result = batcher + .propose_block(ProposeBlockInput { + proposal_id: ProposalId(0), + retrospective_block_hash: None, + deadline: chrono::Utc::now() + chrono::Duration::seconds(1), + }) + .await; + assert_matches!(result, Err(BatcherError::NoActiveHeight)); +} + +#[rstest] +#[tokio::test] +async fn validate_proposal_fails_without_start_height() { + let proposal_manager = MockProposalManagerTraitWrapper::new(); + let mut batcher = batcher(proposal_manager); + + let err = batcher + .validate_block(ValidateBlockInput { + proposal_id: ProposalId(0), + retrospective_block_hash: None, + deadline: chrono::Utc::now() + chrono::Duration::seconds(1), + }) + .await; + assert_matches!(err, Err(BatcherError::NoActiveHeight)); +} + #[rstest] #[tokio::test] async fn validate_block_full_flow() { @@ -254,11 +332,12 @@ async fn send_txs_to_an_invalid_proposal() { #[tokio::test] async fn send_finish_to_an_invalid_proposal() { let mut proposal_manager = MockProposalManagerTraitWrapper::new(); + proposal_manager.expect_wrap_reset().times(1).return_once(|| async {}.boxed()); proposal_manager .expect_wrap_validate_block() .times(1) - .with(eq(PROPOSAL_ID), eq(None), always(), always()) - .return_once(|_, _, _, _| { async move { Ok(()) } }.boxed()); + .with(eq(INITIAL_HEIGHT), eq(PROPOSAL_ID), eq(None), always(), always()) + .return_once(|_, _, _, _, _| { async move { Ok(()) } }.boxed()); let proposal_error = GetProposalResultError::BlockBuilderError(Arc::new( BlockBuilderError::FailOnError(FailOnErrorCause::BlockFull), @@ -270,6 +349,7 @@ async fn send_finish_to_an_invalid_proposal() { .return_once(move |_| { async move { Err(proposal_error) } }.boxed()); let mut batcher = batcher(proposal_manager); + batcher.start_height(StartHeightInput { height: INITIAL_HEIGHT }).await.unwrap(); let validate_block_input = ValidateBlockInput { proposal_id: PROPOSAL_ID, @@ -294,7 +374,7 @@ async fn propose_block_full_flow() { let mut proposal_manager = MockProposalManagerTraitWrapper::new(); mock_proposal_manager_common_expectations(&mut proposal_manager); proposal_manager.expect_wrap_propose_block().times(1).return_once( - move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| { + move |_height, _proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| { simulate_build_block_proposal(tx_sender, txs_to_stream).boxed() }, ); @@ -437,13 +517,9 @@ async fn simulate_build_block_proposal( // A wrapper trait to allow mocking the ProposalManagerTrait in tests. #[automock] trait ProposalManagerTraitWrapper: Send + Sync { - fn wrap_start_height( - &mut self, - height: BlockNumber, - ) -> BoxFuture<'_, Result<(), StartHeightError>>; - fn wrap_propose_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -453,6 +529,7 @@ trait ProposalManagerTraitWrapper: Send + Sync { fn wrap_validate_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -475,16 +552,15 @@ trait ProposalManagerTraitWrapper: Send + Sync { ) -> BoxFuture<'_, ProposalResult>; fn wrap_abort_proposal(&mut self, proposal_id: ProposalId) -> BoxFuture<'_, ()>; + + fn wrap_reset(&mut self) -> BoxFuture<'_, ()>; } #[async_trait] impl ProposalManagerTrait for T { - async fn start_height(&mut self, height: BlockNumber) -> Result<(), StartHeightError> { - self.wrap_start_height(height).await - } - async fn propose_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -492,6 +568,7 @@ impl ProposalManagerTrait for T { tx_provider: ProposeTransactionProvider, ) -> Result<(), GenerateProposalError> { self.wrap_propose_block( + height, proposal_id, retrospective_block_hash, deadline, @@ -503,12 +580,20 @@ impl ProposalManagerTrait for T { async fn validate_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, tx_provider: ValidateTransactionProvider, ) -> Result<(), GenerateProposalError> { - self.wrap_validate_block(proposal_id, retrospective_block_hash, deadline, tx_provider).await + self.wrap_validate_block( + height, + proposal_id, + retrospective_block_hash, + deadline, + tx_provider, + ) + .await } async fn take_proposal_result( @@ -532,6 +617,10 @@ impl ProposalManagerTrait for T { async fn abort_proposal(&mut self, proposal_id: ProposalId) { self.wrap_abort_proposal(proposal_id).await } + + async fn reset(&mut self) { + self.wrap_reset().await + } } fn test_tx_hashes(range: std::ops::Range) -> HashSet { diff --git a/crates/starknet_batcher/src/proposal_manager.rs b/crates/starknet_batcher/src/proposal_manager.rs index 26ebbada3b..5f1fc2407c 100644 --- a/crates/starknet_batcher/src/proposal_manager.rs +++ b/crates/starknet_batcher/src/proposal_manager.rs @@ -14,7 +14,6 @@ use thiserror::Error; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, Instrument}; -use crate::batcher::BatcherStorageReaderTrait; use crate::block_builder::{ BlockBuilderError, BlockBuilderExecutionParams, @@ -25,24 +24,6 @@ use crate::block_builder::{ }; use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider}; -#[derive(Debug, Error)] -pub enum StartHeightError { - #[error( - "Requested height {requested_height} is lower than the current storage height \ - {storage_height}." - )] - HeightAlreadyPassed { storage_height: BlockNumber, requested_height: BlockNumber }, - #[error(transparent)] - StorageError(#[from] papyrus_storage::StorageError), - #[error( - "Storage is not synced. Storage height: {storage_height}, requested height: \ - {requested_height}." - )] - StorageNotSynced { storage_height: BlockNumber, requested_height: BlockNumber }, - #[error("Already working on height.")] - HeightInProgress, -} - #[derive(Debug, Error)] pub enum GenerateProposalError { #[error( @@ -80,10 +61,9 @@ pub(crate) enum InternalProposalStatus { #[async_trait] pub trait ProposalManagerTrait: Send + Sync { - async fn start_height(&mut self, height: BlockNumber) -> Result<(), StartHeightError>; - async fn propose_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -93,6 +73,7 @@ pub trait ProposalManagerTrait: Send + Sync { async fn validate_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -112,6 +93,9 @@ pub trait ProposalManagerTrait: Send + Sync { ) -> ProposalResult; async fn abort_proposal(&mut self, proposal_id: ProposalId); + + // Resets the proposal manager, aborting any active proposal. + async fn reset(&mut self); } // Represents a spawned task of building new block proposal. @@ -128,9 +112,6 @@ struct ProposalTask { /// /// Triggered by the consensus. pub(crate) struct ProposalManager { - storage_reader: Arc, - active_height: Option, - /// The block proposal that is currently being built, if any. /// At any given time, there can be only one proposal being actively executed (either proposed /// or validated). @@ -154,42 +135,13 @@ pub struct ProposalOutput { #[async_trait] impl ProposalManagerTrait for ProposalManager { - /// Starts working on the given height. - #[instrument(skip(self), err)] - async fn start_height(&mut self, height: BlockNumber) -> Result<(), StartHeightError> { - if self.active_height == Some(height) { - return Err(StartHeightError::HeightInProgress); - } - - let next_height = self.storage_reader.height()?; - if next_height < height { - error!( - "Storage is not synced. Storage height: {}, requested height: {}.", - next_height, height - ); - return Err(StartHeightError::StorageNotSynced { - storage_height: next_height, - requested_height: height, - }); - } - if next_height > height { - return Err(StartHeightError::HeightAlreadyPassed { - storage_height: next_height, - requested_height: height, - }); - } - - info!("Starting to work on height {}.", height); - self.reset_active_height(height).await; - Ok(()) - } - /// Starts a new block proposal generation task for the given proposal_id and height with /// transactions from the mempool. /// Requires tx_sender for sending the generated transactions to the caller. #[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))] async fn propose_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -202,7 +154,6 @@ impl ProposalManagerTrait for ProposalManager { // Create the block builder, and a channel to allow aborting the block building task. let (abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel(); - let height = self.active_height.expect("No active height."); let block_builder = self.block_builder_factory.create_block_builder( BlockMetadata { height, retrospective_block_hash }, @@ -223,6 +174,7 @@ impl ProposalManagerTrait for ProposalManager { #[instrument(skip(self, tx_provider), err, fields(self.active_height))] async fn validate_block( &mut self, + height: BlockNumber, proposal_id: ProposalId, retrospective_block_hash: Option, deadline: tokio::time::Instant, @@ -234,7 +186,6 @@ impl ProposalManagerTrait for ProposalManager { // Create the block builder, and a channel to allow aborting the block building task. let (abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel(); - let height = self.active_height.expect("No active height."); let block_builder = self.block_builder_factory.create_block_builder( BlockMetadata { height, retrospective_block_hash }, @@ -304,19 +255,19 @@ impl ProposalManagerTrait for ProposalManager { .insert(proposal_id, Err(GetProposalResultError::Aborted)); } } + + async fn reset(&mut self) { + self.abort_active_proposal().await; + self.executed_proposals.lock().await.clear(); + } } impl ProposalManager { - pub fn new( - block_builder_factory: Arc, - storage_reader: Arc, - ) -> Self { + pub fn new(block_builder_factory: Arc) -> Self { Self { - storage_reader, active_proposal: Arc::new(Mutex::new(None)), block_builder_factory, active_proposal_task: None, - active_height: None, executed_proposals: Arc::new(Mutex::new(HashMap::new())), } } @@ -350,12 +301,6 @@ impl ProposalManager { ) } - async fn reset_active_height(&mut self, new_height: BlockNumber) { - self.abort_active_proposal().await; - self.executed_proposals.lock().await.clear(); - self.active_height = Some(new_height); - } - // Sets a new active proposal task. // Fails if either there is no active height, there is another proposal being generated, or a // proposal with the same ID already exists. @@ -363,8 +308,6 @@ impl ProposalManager { &mut self, proposal_id: ProposalId, ) -> Result<(), GenerateProposalError> { - self.active_height.ok_or(GenerateProposalError::NoActiveHeight)?; - if self.executed_proposals.lock().await.contains_key(&proposal_id) { return Err(GenerateProposalError::ProposalAlreadyExists { proposal_id }); } diff --git a/crates/starknet_batcher/src/proposal_manager_test.rs b/crates/starknet_batcher/src/proposal_manager_test.rs index a5a81b3375..b3e52da2e3 100644 --- a/crates/starknet_batcher/src/proposal_manager_test.rs +++ b/crates/starknet_batcher/src/proposal_manager_test.rs @@ -7,7 +7,6 @@ use starknet_api::executable_transaction::Transaction; use starknet_batcher_types::batcher_types::ProposalId; use starknet_mempool_types::communication::MockMempoolClient; -use crate::batcher::MockBatcherStorageReaderTrait; use crate::block_builder::{ BlockBuilderResult, BlockBuilderTrait, @@ -21,7 +20,6 @@ use crate::proposal_manager::{ ProposalManager, ProposalManagerTrait, ProposalOutput, - StartHeightError, }; use crate::transaction_provider::{ MockL1ProviderClient, @@ -30,7 +28,6 @@ use crate::transaction_provider::{ }; const INITIAL_HEIGHT: BlockNumber = BlockNumber(3); -const NEXT_HEIGHT: BlockNumber = BlockNumber(4); const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1); const MAX_L1_HANDLER_TXS_PER_BLOCK_PROPOSAL: usize = 3; const INPUT_CHANNEL_SIZE: usize = 30; @@ -45,7 +42,6 @@ fn output_streaming() -> ( struct MockDependencies { block_builder_factory: MockBlockBuilderFactoryTrait, - storage_reader: MockBatcherStorageReaderTrait, } impl MockDependencies { @@ -82,20 +78,11 @@ impl MockDependencies { .times(times) .returning(move |_, _, _, _, _| simulate_long_build_block()); } - - fn expect_consecutive_storage_heights(&mut self) { - // First validate all expectations set so far, so we can add new ones. - self.storage_reader.checkpoint(); - self.storage_reader.expect_height().times(1).returning(|| Ok(INITIAL_HEIGHT)); - self.storage_reader.expect_height().times(1).returning(|| Ok(NEXT_HEIGHT)); - } } #[fixture] fn mock_dependencies() -> MockDependencies { - let mut storage_reader = MockBatcherStorageReaderTrait::new(); - storage_reader.expect_height().returning(|| Ok(INITIAL_HEIGHT)); - MockDependencies { block_builder_factory: MockBlockBuilderFactoryTrait::new(), storage_reader } + MockDependencies { block_builder_factory: MockBlockBuilderFactoryTrait::new() } } #[fixture] @@ -116,10 +103,7 @@ fn validate_tx_provider() -> ValidateTransactionProvider { } fn proposal_manager(mock_dependencies: MockDependencies) -> ProposalManager { - ProposalManager::new( - Arc::new(mock_dependencies.block_builder_factory), - Arc::new(mock_dependencies.storage_reader), - ) + ProposalManager::new(Arc::new(mock_dependencies.block_builder_factory)) } fn proposal_deadline() -> tokio::time::Instant { @@ -133,7 +117,14 @@ async fn propose_block_non_blocking( ) { let (output_sender, _receiver) = output_streaming(); proposal_manager - .propose_block(proposal_id, None, proposal_deadline(), output_sender, tx_provider) + .propose_block( + INITIAL_HEIGHT, + proposal_id, + None, + proposal_deadline(), + output_sender, + tx_provider, + ) .await .unwrap(); } @@ -153,91 +144,13 @@ async fn validate_block( proposal_id: ProposalId, ) { proposal_manager - .validate_block(proposal_id, None, proposal_deadline(), tx_provider) + .validate_block(INITIAL_HEIGHT, proposal_id, None, proposal_deadline(), tx_provider) .await .unwrap(); assert!(proposal_manager.await_active_proposal().await); } -#[rstest] -#[case::height_already_passed( - INITIAL_HEIGHT.prev().unwrap(), - Result::Err(StartHeightError::HeightAlreadyPassed { - storage_height: INITIAL_HEIGHT, - requested_height: INITIAL_HEIGHT.prev().unwrap() - } -))] -#[case::happy( - INITIAL_HEIGHT, - Result::Ok(()) -)] -#[case::storage_not_synced( - INITIAL_HEIGHT.unchecked_next(), - Result::Err(StartHeightError::StorageNotSynced { - storage_height: INITIAL_HEIGHT, - requested_height: INITIAL_HEIGHT.unchecked_next() - } -))] -#[tokio::test] -async fn start_height( - mock_dependencies: MockDependencies, - #[case] height: BlockNumber, - #[case] expected_result: Result<(), StartHeightError>, -) { - let mut proposal_manager = proposal_manager(mock_dependencies); - let result = proposal_manager.start_height(height).await; - // Unfortunately ProposalManagerError doesn't implement PartialEq. - assert_eq!(format!("{:?}", result), format!("{:?}", expected_result)); -} - -#[rstest] -#[tokio::test] -async fn duplicate_start_height(mock_dependencies: MockDependencies) { - let mut proposal_manager = proposal_manager(mock_dependencies); - assert_matches!(proposal_manager.start_height(INITIAL_HEIGHT).await, Ok(())); - assert_matches!( - proposal_manager.start_height(INITIAL_HEIGHT).await, - Err(StartHeightError::HeightInProgress) - ); -} - -#[rstest] -#[tokio::test] -async fn propose_block_fails_without_start_height( - mock_dependencies: MockDependencies, - propose_tx_provider: ProposeTransactionProvider, - output_streaming: ( - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - ), -) { - let mut proposal_manager = proposal_manager(mock_dependencies); - let err = proposal_manager - .propose_block( - ProposalId(0), - None, - proposal_deadline(), - output_streaming.0, - propose_tx_provider, - ) - .await; - assert_matches!(err, Err(GenerateProposalError::NoActiveHeight)); -} - -#[rstest] -#[tokio::test] -async fn validate_block_fails_without_start_height( - mock_dependencies: MockDependencies, - validate_tx_provider: ValidateTransactionProvider, -) { - let mut proposal_manager = proposal_manager(mock_dependencies); - let err = proposal_manager - .validate_block(ProposalId(0), None, proposal_deadline(), validate_tx_provider) - .await; - assert_matches!(err, Err(GenerateProposalError::NoActiveHeight)); -} - #[rstest] #[tokio::test] async fn propose_block_success( @@ -247,7 +160,6 @@ async fn propose_block_success( mock_dependencies.expect_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); propose_block(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await; proposal_manager.take_proposal_result(ProposalId(0)).await.unwrap(); } @@ -261,7 +173,6 @@ async fn validate_block_success( mock_dependencies.expect_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); validate_block(&mut proposal_manager, validate_tx_provider, ProposalId(0)).await; proposal_manager.take_proposal_result(ProposalId(0)).await.unwrap(); } @@ -275,8 +186,6 @@ async fn consecutive_proposal_generations_success( mock_dependencies.expect_build_block(4); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - // Build and validate multiple proposals consecutively (awaiting on them to // make sure they finished successfully). propose_block(&mut proposal_manager, propose_tx_provider.clone(), ProposalId(0)).await; @@ -298,12 +207,11 @@ async fn multiple_proposals_generation_fail( mock_dependencies.expect_long_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - // Build a proposal that will take a very long time to finish. let (output_sender_0, _rec_0) = output_streaming(); proposal_manager .propose_block( + INITIAL_HEIGHT, ProposalId(0), None, proposal_deadline(), @@ -317,6 +225,7 @@ async fn multiple_proposals_generation_fail( let (output_sender_1, _rec_1) = output_streaming(); let another_generate_request = proposal_manager .propose_block( + INITIAL_HEIGHT, ProposalId(1), None, proposal_deadline(), @@ -342,8 +251,6 @@ async fn take_proposal_result_no_active_proposal( mock_dependencies.expect_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); - propose_block(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await; let expected_proposal_output = @@ -367,7 +274,6 @@ async fn abort_active_proposal( mock_dependencies.expect_long_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); propose_block_non_blocking(&mut proposal_manager, propose_tx_provider, ProposalId(0)).await; proposal_manager.abort_proposal(ProposalId(0)).await; @@ -383,24 +289,20 @@ async fn abort_active_proposal( #[rstest] #[tokio::test] -async fn abort_and_start_new_height( +async fn reset( mut mock_dependencies: MockDependencies, propose_tx_provider: ProposeTransactionProvider, ) { - mock_dependencies.expect_consecutive_storage_heights(); - mock_dependencies.expect_build_block(1); mock_dependencies.expect_long_build_block(1); let mut proposal_manager = proposal_manager(mock_dependencies); - // Start the first height with 2 proposals. - proposal_manager.start_height(INITIAL_HEIGHT).await.unwrap(); + // Create 2 proposals, one will remain active. propose_block(&mut proposal_manager, propose_tx_provider.clone(), ProposalId(0)).await; propose_block_non_blocking(&mut proposal_manager, propose_tx_provider.clone(), ProposalId(1)) .await; - // Start a new height. This should abort and delete all existing proposals. - assert!(proposal_manager.start_height(NEXT_HEIGHT).await.is_ok()); + proposal_manager.reset().await; // Make sure executed proposals are deleted. assert_matches!(