diff --git a/crates/starknet_batcher/src/batcher.rs b/crates/starknet_batcher/src/batcher.rs index c6188a96de..5411ee3501 100644 --- a/crates/starknet_batcher/src/batcher.rs +++ b/crates/starknet_batcher/src/batcher.rs @@ -17,6 +17,7 @@ use starknet_batcher_types::batcher_types::{ GetProposalContent, GetProposalContentInput, GetProposalContentResponse, + ProposalCommitment, ProposalId, ProposalStatus, ProposeBlockInput, @@ -42,11 +43,11 @@ use crate::block_builder::{ use crate::config::BatcherConfig; use crate::proposal_manager::{ GenerateProposalError, - InternalProposalStatus, ProposalError, ProposalManager, ProposalManagerTrait, ProposalOutput, + ProposalResult, }; use crate::transaction_provider::{ DummyL1ProviderClient, @@ -224,75 +225,74 @@ impl Batcher { &mut self, send_proposal_content_input: SendProposalContentInput, ) -> BatcherResult { - // TODO(Dafna): this method should return an meaningful error if the given proposal_id - // is of a proposed block (and not validated). Currently it panics or returns a - // wrong error. - let proposal_id = send_proposal_content_input.proposal_id; + if !self.validate_tx_streams.contains_key(&proposal_id) { + return Err(BatcherError::ProposalNotFound { proposal_id }); + } match send_proposal_content_input.content { - SendProposalContent::Txs(txs) => self.send_txs_and_get_status(proposal_id, txs).await, - SendProposalContent::Finish => { - self.close_tx_channel_and_get_commitment(proposal_id).await - } - SendProposalContent::Abort => { - self.proposal_manager.abort_proposal(proposal_id).await; - Ok(SendProposalContentResponse { response: ProposalStatus::Aborted }) - } + SendProposalContent::Txs(txs) => self.handle_send_txs_request(proposal_id, txs).await, + SendProposalContent::Finish => self.handle_finish_proposal_request(proposal_id).await, + SendProposalContent::Abort => self.handle_abort_proposal_request(proposal_id).await, } } - async fn send_txs_and_get_status( + async fn handle_send_txs_request( &mut self, proposal_id: ProposalId, txs: Vec, ) -> BatcherResult { - match self.proposal_manager.get_proposal_status(proposal_id).await { - InternalProposalStatus::Processing => { - let tx_provider_sender = &self - .validate_tx_streams - .get(&proposal_id) - .expect("Expecting tx_provider_sender to exist during batching."); - for tx in txs { - tx_provider_sender.send(tx).await.map_err(|err| { - error!("Failed to send transaction to the tx provider: {}", err); - BatcherError::InternalError - })?; - } - Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) + if self.is_active(proposal_id).await { + // The proposal is active. Send the transactions through the tx provider. + let tx_provider_sender = &self + .validate_tx_streams + .get(&proposal_id) + .expect("Expecting tx_provider_sender to exist during batching."); + for tx in txs { + tx_provider_sender.send(tx).await.map_err(|err| { + error!("Failed to send transaction to the tx provider: {}", err); + BatcherError::InternalError + })?; } - // Proposal Got an Error while processing transactions. - InternalProposalStatus::Failed => { - Ok(SendProposalContentResponse { response: ProposalStatus::InvalidProposal }) - } - InternalProposalStatus::Finished => { - Err(BatcherError::ProposalAlreadyFinished { proposal_id }) - } - InternalProposalStatus::NotFound => Err(BatcherError::ProposalNotFound { proposal_id }), + return Ok(SendProposalContentResponse { response: ProposalStatus::Processing }); + } + + // The proposal is no longer active, can't send the transactions. + let proposal_result = + self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist."); + match proposal_result { + Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }), + Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }), } } - async fn close_tx_channel_and_get_commitment( + async fn handle_finish_proposal_request( &mut self, proposal_id: ProposalId, ) -> BatcherResult { debug!("Send proposal content done for {}", proposal_id); self.close_input_transaction_stream(proposal_id)?; + if self.is_active(proposal_id).await { + self.proposal_manager.await_active_proposal().await; + } - let response = match self.proposal_manager.await_proposal_commitment(proposal_id).await { - Some(Ok(proposal_commitment)) => ProposalStatus::Finished(proposal_commitment), - Some(Err(ProposalError::BlockBuilderError(err))) => match err.as_ref() { - BlockBuilderError::FailOnError(_) => ProposalStatus::InvalidProposal, - _ => return Err(BatcherError::InternalError), - }, - Some(Err(ProposalError::Aborted)) => return Err(BatcherError::ProposalAborted), - None => { - panic!("Proposal {} should exist in the proposal manager.", proposal_id) - } + let proposal_result = + self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist."); + let proposal_status = match proposal_result { + Ok(commitment) => ProposalStatus::Finished(commitment), + Err(err) => proposal_status_from(err)?, }; + Ok(SendProposalContentResponse { response: proposal_status }) + } - Ok(SendProposalContentResponse { response }) + async fn handle_abort_proposal_request( + &mut self, + proposal_id: ProposalId, + ) -> BatcherResult { + self.proposal_manager.abort_proposal(proposal_id).await; + self.close_input_transaction_stream(proposal_id)?; + Ok(SendProposalContentResponse { response: ProposalStatus::Aborted }) } fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> { @@ -337,14 +337,12 @@ impl Batcher { // TODO: Consider removing the proposal from the proposal manager and keep it in the batcher // for decision reached. self.propose_tx_streams.remove(&proposal_id); - let proposal_commitment = self - .proposal_manager - .await_proposal_commitment(proposal_id) + let commitment = self + .get_completed_proposal_result(proposal_id) .await - .ok_or(BatcherError::ProposalNotFound { proposal_id })??; - Ok(GetProposalContentResponse { - content: GetProposalContent::Finished(proposal_commitment), - }) + .expect("Proposal should exist.")?; + + Ok(GetProposalContentResponse { content: GetProposalContent::Finished(commitment) }) } #[instrument(skip(self), err)] @@ -379,6 +377,27 @@ impl Batcher { } Ok(()) } + + async fn is_active(&self, proposal_id: ProposalId) -> bool { + self.proposal_manager.get_active_proposal().await == Some(proposal_id) + } + + // Returns a completed proposal result, either its commitment or an error if the proposal + // failed. If the proposal doesn't exist, or it's still active, returns None. + async fn get_completed_proposal_result( + &self, + proposal_id: ProposalId, + ) -> Option> { + let completed_proposals = self.proposal_manager.get_completed_proposals().await; + let guard = completed_proposals.lock().await; + let proposal_result = guard.get(&proposal_id); + + match proposal_result { + Some(Ok(output)) => Some(Ok(output.commitment)), + Some(Err(e)) => Some(Err(e.clone())), + None => None, + } + } } pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher { @@ -501,3 +520,19 @@ fn verify_block_number(height: BlockNumber, block_number: BlockNumber) -> Batche } Ok(()) } + +// Return the appropriate ProposalStatus for a given ProposalError. +fn proposal_status_from(proposal_error: ProposalError) -> BatcherResult { + match proposal_error { + ProposalError::BlockBuilderError(err) => { + if let BlockBuilderError::FailOnError(_) = err.as_ref() { + // The proposal either failed due to bad input (e.g. invalid transactions), or + // couldn't finish in time. + Ok(ProposalStatus::InvalidProposal) + } else { + Err(BatcherError::InternalError) + } + } + ProposalError::Aborted => Err(BatcherError::ProposalAborted), + } +} diff --git a/crates/starknet_batcher/src/batcher_test.rs b/crates/starknet_batcher/src/batcher_test.rs index 78ac8e0989..0b968dfadd 100644 --- a/crates/starknet_batcher/src/batcher_test.rs +++ b/crates/starknet_batcher/src/batcher_test.rs @@ -37,6 +37,7 @@ use starknet_batcher_types::batcher_types::{ use starknet_batcher_types::errors::BatcherError; use starknet_mempool_types::communication::MockMempoolClient; use starknet_mempool_types::mempool_types::CommitBlockArgs; +use tokio::sync::Mutex; use crate::batcher::{Batcher, MockBatcherStorageReaderTrait, MockBatcherStorageWriterTrait}; use crate::block_builder::{ @@ -50,7 +51,6 @@ use crate::block_builder::{ use crate::config::BatcherConfig; use crate::proposal_manager::{ GenerateProposalError, - InternalProposalStatus, ProposalError, ProposalManagerTrait, ProposalOutput, @@ -74,6 +74,10 @@ fn proposal_commitment() -> ProposalCommitment { } } +fn proposal_output() -> ProposalOutput { + ProposalOutput { commitment: proposal_commitment(), ..Default::default() } +} + fn deadline() -> chrono::DateTime { chrono::Utc::now() + BLOCK_GENERATION_TIMEOUT } @@ -96,6 +100,12 @@ fn validate_block_input() -> ValidateBlockInput { } } +fn invalid_proposal_result() -> ProposalResult { + Err(ProposalError::BlockBuilderError(Arc::new(BlockBuilderError::FailOnError( + FailOnErrorCause::BlockFull, + )))) +} + struct MockDependencies { storage_reader: MockBatcherStorageReaderTrait, storage_writer: MockBatcherStorageWriterTrait, @@ -167,30 +177,57 @@ fn mock_create_builder_for_propose_block( block_builder_factory } -fn mock_proposal_manager_common_expectations( - proposal_manager: &mut MockProposalManagerTraitWrapper, -) { +fn mock_start_proposal(proposal_manager: &mut MockProposalManagerTraitWrapper) { proposal_manager.expect_wrap_reset().times(1).return_once(|| async {}.boxed()); proposal_manager - .expect_wrap_await_proposal_commitment() + .expect_wrap_spawn_proposal() .times(1) - .with(eq(PROPOSAL_ID)) - .return_once(move |_| { async move { Some(Ok(proposal_commitment())) } }.boxed()); + .with(eq(PROPOSAL_ID), always(), always()) + .return_once(|_, _, _| { async move { Ok(()) } }.boxed()); +} + +fn mock_completed_proposal( + proposal_manager: &mut MockProposalManagerTraitWrapper, + proposal_result: ProposalResult, +) { + proposal_manager.expect_wrap_get_completed_proposals().times(1).return_once(move || { + async move { Arc::new(Mutex::new(HashMap::from([(PROPOSAL_ID, proposal_result)]))) }.boxed() + }); +} + +async fn batcher_with_validated_proposal( + proposal_result: ProposalResult, +) -> Batcher { + let block_builder_factory = mock_create_builder_for_validate_block(); + let mut proposal_manager = MockProposalManagerTraitWrapper::new(); + mock_start_proposal(&mut proposal_manager); + mock_completed_proposal(&mut proposal_manager, proposal_result); + proposal_manager.expect_wrap_get_active_proposal().returning(|| async move { None }.boxed()); + + let mut batcher = create_batcher(MockDependencies { + proposal_manager, + block_builder_factory, + ..Default::default() + }); + + batcher.start_height(StartHeightInput { height: INITIAL_HEIGHT }).await.unwrap(); + + batcher.validate_block(validate_block_input()).await.unwrap(); + + batcher } fn mock_proposal_manager_validate_flow() -> MockProposalManagerTraitWrapper { let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - mock_proposal_manager_common_expectations(&mut proposal_manager); + mock_start_proposal(&mut proposal_manager); proposal_manager - .expect_wrap_spawn_proposal() - .times(1) - .with(eq(PROPOSAL_ID), always(), always()) - .return_once(|_, _, _| { async move { Ok(()) } }.boxed()); + .expect_wrap_get_active_proposal() + .returning(|| async move { Some(PROPOSAL_ID) }.boxed()); proposal_manager - .expect_wrap_get_proposal_status() + .expect_wrap_await_active_proposal() .times(1) - .with(eq(PROPOSAL_ID)) - .returning(move |_| async move { InternalProposalStatus::Processing }.boxed()); + .returning(|| async move { true }.boxed()); + mock_completed_proposal(&mut proposal_manager, Ok(proposal_output())); proposal_manager } @@ -293,14 +330,8 @@ async fn validate_block_full_flow() { #[rstest] #[tokio::test] async fn send_content_after_proposal_already_finished() { - let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - proposal_manager - .expect_wrap_get_proposal_status() - .with(eq(PROPOSAL_ID)) - .times(1) - .returning(|_| async move { InternalProposalStatus::Finished }.boxed()); - - let mut batcher = create_batcher(MockDependencies { proposal_manager, ..Default::default() }); + let successful_proposal_result = Ok(proposal_output()); + let mut batcher = batcher_with_validated_proposal(successful_proposal_result).await; // Send transactions after the proposal has finished. let send_proposal_input_txs = SendProposalContentInput { @@ -314,14 +345,7 @@ async fn send_content_after_proposal_already_finished() { #[rstest] #[tokio::test] async fn send_content_to_unknown_proposal() { - let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - proposal_manager - .expect_wrap_get_proposal_status() - .times(1) - .with(eq(PROPOSAL_ID)) - .return_once(move |_| async move { InternalProposalStatus::NotFound }.boxed()); - - let mut batcher = create_batcher(MockDependencies { proposal_manager, ..Default::default() }); + let mut batcher = create_batcher(MockDependencies::default()); // Send transactions to an unknown proposal. let send_proposal_input_txs = SendProposalContentInput { @@ -341,14 +365,7 @@ async fn send_content_to_unknown_proposal() { #[rstest] #[tokio::test] async fn send_txs_to_an_invalid_proposal() { - let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - proposal_manager - .expect_wrap_get_proposal_status() - .times(1) - .with(eq(PROPOSAL_ID)) - .return_once(move |_| async move { InternalProposalStatus::Failed }.boxed()); - - let mut batcher = create_batcher(MockDependencies { proposal_manager, ..Default::default() }); + let mut batcher = batcher_with_validated_proposal(invalid_proposal_result()).await; let send_proposal_input_txs = SendProposalContentInput { proposal_id: PROPOSAL_ID, @@ -361,31 +378,7 @@ async fn send_txs_to_an_invalid_proposal() { #[rstest] #[tokio::test] async fn send_finish_to_an_invalid_proposal() { - let block_builder_factory = mock_create_builder_for_validate_block(); - let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - proposal_manager.expect_wrap_reset().times(1).return_once(|| async {}.boxed()); - proposal_manager - .expect_wrap_spawn_proposal() - .times(1) - .with(eq(PROPOSAL_ID), always(), always()) - .return_once(|_, _, _| { async move { Ok(()) } }.boxed()); - - let proposal_error = ProposalError::BlockBuilderError(Arc::new( - BlockBuilderError::FailOnError(FailOnErrorCause::BlockFull), - )); - proposal_manager - .expect_wrap_await_proposal_commitment() - .times(1) - .with(eq(PROPOSAL_ID)) - .return_once(move |_| { async move { Some(Err(proposal_error)) } }.boxed()); - - let mut batcher = create_batcher(MockDependencies { - proposal_manager, - block_builder_factory, - ..Default::default() - }); - batcher.start_height(StartHeightInput { height: INITIAL_HEIGHT }).await.unwrap(); - batcher.validate_block(validate_block_input()).await.unwrap(); + let mut batcher = batcher_with_validated_proposal(invalid_proposal_result()).await; let send_proposal_input_txs = SendProposalContentInput { proposal_id: PROPOSAL_ID, content: SendProposalContent::Finish }; @@ -402,11 +395,8 @@ async fn propose_block_full_flow() { let block_builder_factory = mock_create_builder_for_propose_block(txs_to_stream); let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - mock_proposal_manager_common_expectations(&mut proposal_manager); - proposal_manager - .expect_wrap_spawn_proposal() - .times(1) - .return_once(|_, _, _| { async move { Ok(()) } }.boxed()); + mock_start_proposal(&mut proposal_manager); + mock_completed_proposal(&mut proposal_manager, Ok(proposal_output())); let mut batcher = create_batcher(MockDependencies { proposal_manager, @@ -484,7 +474,7 @@ async fn propose_block_without_retrospective_block_hash() { #[tokio::test] async fn get_content_from_unknown_proposal() { let mut proposal_manager = MockProposalManagerTraitWrapper::new(); - proposal_manager.expect_wrap_await_proposal_commitment().times(0); + proposal_manager.expect_wrap_get_completed_proposals().times(0); let mut batcher = create_batcher(MockDependencies { proposal_manager, ..Default::default() }); @@ -577,16 +567,6 @@ trait ProposalManagerTraitWrapper: Send + Sync { fn wrap_await_active_proposal(&mut self) -> BoxFuture<'_, bool>; - fn wrap_get_proposal_status( - &self, - proposal_id: ProposalId, - ) -> BoxFuture<'_, InternalProposalStatus>; - - fn wrap_await_proposal_commitment( - &self, - proposal_id: ProposalId, - ) -> BoxFuture<'_, Option>>; - fn wrap_abort_proposal(&mut self, proposal_id: ProposalId) -> BoxFuture<'_, ()>; fn wrap_reset(&mut self) -> BoxFuture<'_, ()>; @@ -624,17 +604,6 @@ impl ProposalManagerTrait for T { self.wrap_await_active_proposal().await } - async fn get_proposal_status(&self, proposal_id: ProposalId) -> InternalProposalStatus { - self.wrap_get_proposal_status(proposal_id).await - } - - async fn await_proposal_commitment( - &mut self, - proposal_id: ProposalId, - ) -> Option> { - self.wrap_await_proposal_commitment(proposal_id).await - } - async fn abort_proposal(&mut self, proposal_id: ProposalId) { self.wrap_abort_proposal(proposal_id).await } diff --git a/crates/starknet_batcher/src/proposal_manager.rs b/crates/starknet_batcher/src/proposal_manager.rs index 038879facd..69cc25540e 100644 --- a/crates/starknet_batcher/src/proposal_manager.rs +++ b/crates/starknet_batcher/src/proposal_manager.rs @@ -40,13 +40,6 @@ pub enum ProposalError { Aborted, } -pub(crate) enum InternalProposalStatus { - Processing, - Finished, - Failed, - NotFound, -} - #[async_trait] pub trait ProposalManagerTrait: Send + Sync { async fn spawn_proposal( @@ -61,23 +54,14 @@ pub trait ProposalManagerTrait: Send + Sync { proposal_id: ProposalId, ) -> Option>; - #[allow(dead_code)] async fn get_active_proposal(&self) -> Option; - #[allow(dead_code)] async fn get_completed_proposals( &self, ) -> Arc>>>; async fn await_active_proposal(&mut self) -> bool; - async fn get_proposal_status(&self, proposal_id: ProposalId) -> InternalProposalStatus; - - async fn await_proposal_commitment( - &mut self, - proposal_id: ProposalId, - ) -> Option>; - async fn abort_proposal(&mut self, proposal_id: ProposalId); // Resets the proposal manager, aborting any active proposal. @@ -109,7 +93,7 @@ pub(crate) struct ProposalManager { pub type ProposalResult = Result; -#[derive(Debug, PartialEq)] +#[derive(Debug, Default, PartialEq)] pub struct ProposalOutput { pub state_diff: ThinStateDiff, pub commitment: ProposalCommitment, @@ -186,37 +170,6 @@ impl ProposalManagerTrait for ProposalManager { false } - // Returns None if the proposal does not exist, otherwise, returns the status of the proposal. - async fn get_proposal_status(&self, proposal_id: ProposalId) -> InternalProposalStatus { - match self.executed_proposals.lock().await.get(&proposal_id) { - Some(Ok(_)) => InternalProposalStatus::Finished, - Some(Err(_)) => InternalProposalStatus::Failed, - None => { - if self.active_proposal.lock().await.as_ref() == Some(&proposal_id) { - InternalProposalStatus::Processing - } else { - InternalProposalStatus::NotFound - } - } - } - } - - async fn await_proposal_commitment( - &mut self, - proposal_id: ProposalId, - ) -> Option> { - if *self.active_proposal.lock().await == Some(proposal_id) { - self.await_active_proposal().await; - } - let proposals = self.executed_proposals.lock().await; - let output = proposals.get(&proposal_id); - match output { - Some(Ok(output)) => Some(Ok(output.commitment)), - Some(Err(e)) => Some(Err(e.clone())), - None => None, - } - } - // Aborts the proposal with the given ID, if active. // Should be used in validate flow, if the consensus decides to abort the proposal. async fn abort_proposal(&mut self, proposal_id: ProposalId) {