diff --git a/crates/papyrus_common/src/metrics.rs b/crates/papyrus_common/src/metrics.rs
index c618ab0678..b06f42fec8 100644
--- a/crates/papyrus_common/src/metrics.rs
+++ b/crates/papyrus_common/src/metrics.rs
@@ -38,5 +38,5 @@ pub const PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS: &str = "papyrus_num_active_outbo
 /// Global variable set by the main config to enable collecting profiling metrics.
 pub static COLLECT_PROFILING_METRICS: OnceLock<bool> = OnceLock::new();
 
-/// The height most recently decided by consensus.
+/// The height consensus is currently working on.
 pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height";
diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs
index 8b28115ab3..fdf952c6f6 100644
--- a/crates/papyrus_node/src/main.rs
+++ b/crates/papyrus_node/src/main.rs
@@ -7,8 +7,7 @@ use std::process::exit;
 use std::sync::Arc;
 use std::time::Duration;
 
-use futures::future::BoxFuture;
-use futures::FutureExt;
+use futures::{FutureExt, StreamExt};
 use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
 use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
 use papyrus_common::pending_classes::PendingClasses;
@@ -22,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
 use papyrus_consensus::types::ConsensusError;
 use papyrus_monitoring_gateway::MonitoringServer;
 use papyrus_network::gossipsub_impl::Topic;
-use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError};
+use papyrus_network::network_manager::NetworkManager;
 use papyrus_network::{network_manager, NetworkConfig};
 use papyrus_node::config::NodeConfig;
 use papyrus_node::version::VERSION_FULL;
@@ -34,7 +33,6 @@ use papyrus_p2p_sync::client::{
 };
 use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
 use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
-use papyrus_protobuf::consensus::ConsensusMessage;
 #[cfg(feature = "rpc")]
 use papyrus_rpc::run_server;
 use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
@@ -42,14 +40,14 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
 use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
 use papyrus_sync::sources::pending::PendingSource;
 use papyrus_sync::{StateSync, StateSyncError, SyncConfig};
-use starknet_api::block::BlockHash;
+use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::felt;
 use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
 use starknet_client::reader::PendingData;
 use tokio::sync::RwLock;
 use tokio::task::{JoinError, JoinHandle};
 use tracing::metadata::LevelFilter;
-use tracing::{debug_span, error, info, warn, Instrument};
+use tracing::{debug, debug_span, error, info, warn, Instrument};
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{fmt, EnvFilter};
 
@@ -97,42 +95,62 @@ async fn create_rpc_server_future(
 }
 
 fn run_consensus(
-    config: ConsensusConfig,
+    config: Option<ConsensusConfig>,
     storage_reader: StorageReader,
-    consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
+    maybe_network_manager: Option<&mut NetworkManager>,
 ) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
-    let validator_id = config.validator_id;
-    info!("Running consensus as validator {validator_id}");
-    let context = PapyrusConsensusContext::new(
-        storage_reader.clone(),
-        consensus_channels.messages_to_broadcast_sender,
-        config.num_validators,
-    );
-    let start_height = config.start_height;
-    match config.test {
-        Some(test_config) => {
-            let network_receiver = NetworkReceiver::new(
-                consensus_channels.broadcasted_messages_receiver,
-                test_config.cache_size,
-                test_config.random_seed,
-                test_config.drop_probability,
-                test_config.invalid_probability,
-            );
-            Ok(tokio::spawn(papyrus_consensus::run_consensus(
-                context,
-                start_height,
-                validator_id,
-                config.consensus_delay,
-                network_receiver,
-            )))
-        }
-        None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
+    let (Some(config), Some(network_manager)) = (config, maybe_network_manager) else {
+        info!("Consensus is disabled.");
+        return Ok(tokio::spawn(pending()));
+    };
+    debug!("Consensus config {:?}", config);
+
+    let network_channels =
+        network_manager.register_broadcast_topic(Topic::new(config.topic), BUFFER_SIZE)?;
+    if let Some(test_config) = config.test {
+        // TODO(matan): Move topic to config.
+        let sync_channels = network_manager
+            .register_broadcast_topic(Topic::new("consensus_test_sync".to_string()), BUFFER_SIZE)?;
+        let context = PapyrusConsensusContext::new(
+            storage_reader.clone(),
+            network_channels.messages_to_broadcast_sender,
+            config.num_validators,
+            Some(sync_channels.messages_to_broadcast_sender),
+        );
+        let sync_receiver =
+            sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
+                BlockNumber(vote.expect("Sync channel should never have errors").height)
+            });
+        let network_receiver = NetworkReceiver::new(
+            network_channels.broadcasted_messages_receiver,
+            test_config.cache_size,
+            test_config.random_seed,
+            test_config.drop_probability,
+            test_config.invalid_probability,
+        );
+        Ok(tokio::spawn(papyrus_consensus::run_consensus(
             context,
-            start_height,
-            validator_id,
+            config.start_height,
+            config.validator_id,
             config.consensus_delay,
-            consensus_channels.broadcasted_messages_receiver,
-        ))),
+            network_receiver,
+            sync_receiver,
+        )))
+    } else {
+        let context = PapyrusConsensusContext::new(
+            storage_reader.clone(),
+            network_channels.messages_to_broadcast_sender,
+            config.num_validators,
+            None,
+        );
+        Ok(tokio::spawn(papyrus_consensus::run_consensus(
+            context,
+            config.start_height,
+            config.validator_id,
+            config.consensus_delay,
+            network_channels.broadcasted_messages_receiver,
+            futures::stream::pending(),
+        )))
     }
 }
@@ -147,13 +165,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
 
     // P2P network.
     let (
-        network_future,
+        mut maybe_network_manager,
         maybe_sync_client_channels,
         maybe_sync_server_channels,
-        maybe_consensus_channels,
         local_peer_id,
-    ) = run_network(config.network.clone(), config.consensus.clone())?;
-    let network_handle = tokio::spawn(network_future);
+    ) = run_network(config.network.clone())?;
 
     // Monitoring server.
     let monitoring_server = MonitoringServer::new(
@@ -231,16 +247,15 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     let sync_handle = tokio::spawn(sync_future);
     let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future);
 
-    let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels {
-        run_consensus(
-            config.consensus.expect("If consensus_channels is Some, consensus must be Some too."),
-            storage_reader.clone(),
-            consensus_channels,
-        )?
-    } else {
-        tokio::spawn(pending())
-    };
+    let consensus_handle =
+        run_consensus(config.consensus, storage_reader.clone(), maybe_network_manager.as_mut())?;
+    let network_handle = tokio::spawn(async move {
+        match maybe_network_manager {
+            Some(manager) => manager.run().boxed().await,
+            None => pending().boxed().await,
+        }
+    });
     tokio::select! {
         res = storage_metrics_handle => {
             error!("collecting storage metrics stopped.");
@@ -324,20 +339,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     }
 }
 
-type NetworkRunReturn = (
-    BoxFuture<'static, Result<(), NetworkError>>,
-    Option<P2PSyncClientChannels>,
-    Option<P2PSyncServerChannels>,
-    Option<BroadcastSubscriberChannels<ConsensusMessage>>,
-    String,
-);
-
-fn run_network(
-    network_config: Option<NetworkConfig>,
-    consensus_config: Option<ConsensusConfig>,
-) -> anyhow::Result<NetworkRunReturn> {
+type NetworkRunReturn =
+    (Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);
+
+fn run_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
     let Some(network_config) = network_config else {
-        return Ok((pending().boxed(), None, None, None, "".to_string()));
+        return Ok((None, None, None, "".to_string()));
     };
     let mut network_manager = network_manager::NetworkManager::new(network_config.clone());
     let local_peer_id = network_manager.get_local_peer_id();
@@ -359,13 +366,6 @@ fn run_network(
     let event_server_channel =
         network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
 
-    let consensus_channels = match consensus_config {
-        Some(consensus_config) => Some(
-            network_manager
-                .register_broadcast_topic(Topic::new(consensus_config.topic), BUFFER_SIZE)?,
-        ),
-        None => None,
-    };
     let p2p_sync_client_channels = P2PSyncClientChannels::new(
         header_client_sender,
         state_diff_client_sender,
@@ -380,10 +380,9 @@ fn run_network(
     );
 
     Ok((
-        network_manager.run().boxed(),
+        Some(network_manager),
         Some(p2p_sync_client_channels),
         Some(p2p_sync_server_channels),
-        consensus_channels,
         local_peer_id,
     ))
 }
diff --git a/crates/papyrus_protobuf/src/converters/consensus.rs b/crates/papyrus_protobuf/src/converters/consensus.rs
index 1e12b3ee77..ec185e2505 100644
--- a/crates/papyrus_protobuf/src/converters/consensus.rs
+++ b/crates/papyrus_protobuf/src/converters/consensus.rs
@@ -105,6 +105,8 @@ impl From<Vote> for protobuf::Vote {
     }
 }
 
+auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);
+
 impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
     type Error = ProtobufConversionError;
 
diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index c10e4581a7..69d790a75f 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -9,7 +9,7 @@ use std::time::Duration;
 
 use futures::channel::{mpsc, oneshot};
 use futures::{Stream, StreamExt};
-use papyrus_common::metrics as papyrus_metrics;
+use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
 use papyrus_network::network_manager::ReportSender;
 use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
 use papyrus_protobuf::converters::ProtobufConversionError;
@@ -27,39 +27,50 @@ use crate::types::{
 };
 
 // TODO(dvir): add test for this.
-#[instrument(skip(context, start_height, network_receiver), level = "info")]
+#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")]
 #[allow(missing_docs)]
-pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT>(
+pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT, SyncReceiverT>(
     mut context: ContextT,
     start_height: BlockNumber,
     validator_id: ValidatorId,
     consensus_delay: Duration,
     mut network_receiver: NetworkReceiverT,
+    mut sync_receiver: SyncReceiverT,
 ) -> Result<(), ConsensusError>
 where
     BlockT: ConsensusBlock,
     ContextT: ConsensusContext<Block = BlockT>,
     NetworkReceiverT:
         Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
+    SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
     ProposalWrapper:
         Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
 {
+    info!("Running consensus");
     // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
     tokio::time::sleep(consensus_delay).await;
     let mut current_height = start_height;
     let mut manager = MultiHeightManager::new();
     loop {
-        let decision = manager
-            .run_height(&mut context, current_height, validator_id, &mut network_receiver)
-            .await?;
-
-        info!(
-            "Finished consensus for height: {current_height}. Agreed on block with id: {:x}",
-            decision.block.id().0
-        );
-        debug!("Decision: {:?}", decision);
-        metrics::gauge!(papyrus_metrics::PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);
-        current_height = current_height.unchecked_next();
+        metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);
+
+        let run_height =
+            manager.run_height(&mut context, current_height, validator_id, &mut network_receiver);
+
+        // `run_height` is not cancel safe: our implementation cannot pause and resume it, and we
+        // cannot simply restart the height, since dropping the future drops the state it built
+        // and rerunning from scratch risks equivocating. Therefore, we must only enter the other
+        // select branches if we are certain to leave this height.
+        tokio::select! {
+            decision = run_height => {
+                let decision = decision?;
+                context.decision(decision.block, decision.precommits).await?;
+                current_height = current_height.unchecked_next();
+            },
+            sync_height = future_height(current_height, &mut sync_receiver) => {
+                current_height = sync_height?.unchecked_next();
+            }
+        }
     }
 }
@@ -191,3 +202,27 @@ where
         }
     }
 }
+
+// Return the first sync height that is greater than or equal to the given height.
+async fn future_height<SyncReceiverT>(
+    height: BlockNumber,
+    mut sync_receiver: SyncReceiverT,
+) -> Result<BlockNumber, ConsensusError>
+where
+    SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
+{
+    loop {
+        match sync_receiver.next().await {
+            Some(sync_height) if sync_height >= height => {
+                info!("Sync to height: {}. current_height={}", sync_height, height);
+                return Ok(sync_height);
+            }
+            Some(sync_height) => {
+                debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
current_height={}", sync_height, height); + } + None => { + return Err(ConsensusError::SyncError("Sync receiver closed".to_string())); + } + } + } +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 119cee3f97..0d379f350f 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::vec; use async_trait::async_trait; @@ -5,6 +6,7 @@ use futures::channel::{mpsc, oneshot}; use futures::SinkExt; use lazy_static::lazy_static; use mockall::mock; +use mockall::predicate::eq; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -12,7 +14,7 @@ use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; -use super::MultiHeightManager; +use super::{run_consensus, MultiHeightManager}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; lazy_static! { @@ -71,6 +73,12 @@ mock! { content_receiver: mpsc::Receiver, fin_receiver: oneshot::Receiver, ) -> Result<(), ConsensusError>; + + async fn decision( + &self, + block: TestBlock, + precommits: Vec, + ) -> Result<(), ConsensusError>; } } @@ -114,7 +122,7 @@ fn precommit(block_hash: Option, height: u64, voter: ValidatorId) -> } #[tokio::test] -async fn run_multiple_heights_unordered() { +async fn manager_multiple_heights_unordered() { let mut context = MockTestContext::new(); let (mut sender, mut receiver) = mpsc::unbounded(); @@ -161,3 +169,121 @@ async fn run_multiple_heights_unordered() { .unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); } + +#[tokio::test] +async fn run_consensus_sync() { + // Set expectations. + let mut context = MockTestContext::new(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(move |block, votes| { + assert_eq!(block.id(), BlockHash(Felt::TWO)); + assert_eq!(votes[0].height, 2); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + // Send messages for height 2. + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + send(&mut network_sender, proposal(BlockHash(Felt::TWO), 2)).await; + send(&mut network_sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + + // Start at height 1. + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send sync for height 1. + sync_sender.send(BlockNumber(1)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. 
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Decision for height 2.
+    decision_rx.await.unwrap();
+
+    // Drop the sender to close consensus and gracefully shut down.
+    drop(sync_sender);
+    assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
+}
+
+// Check cancellation safety when ignoring old heights. If the current-height check were done
+// inside the select branch, this test would hang.
+#[tokio::test]
+async fn run_consensus_sync_cancellation_safety() {
+    let mut context = MockTestContext::new();
+    let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel();
+    let (decision_tx, decision_rx) = oneshot::channel();
+
+    context.expect_validate_proposal().return_once(move |_, _| {
+        let (block_sender, block_receiver) = oneshot::channel();
+        block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap();
+        block_receiver
+    });
+    context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
+    context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
+    context
+        .expect_broadcast()
+        .with(eq(prevote(Some(BlockHash(Felt::ONE)), 1, *VALIDATOR_ID)))
+        .return_once(move |_| {
+            proposal_handled_tx.send(()).unwrap();
+            Ok(())
+        });
+    context.expect_broadcast().returning(move |_| Ok(()));
+    context.expect_decision().return_once(|block, votes| {
+        assert_eq!(block.id(), BlockHash(Felt::ONE));
+        assert_eq!(votes[0].height, 1);
+        decision_tx.send(()).unwrap();
+        Ok(())
+    });
+
+    let (mut network_sender, mut network_receiver) = mpsc::unbounded();
+    let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
+
+    let consensus_handle = tokio::spawn(async move {
+        run_consensus(
+            context,
+            BlockNumber(1),
+            *VALIDATOR_ID,
+            Duration::ZERO,
+            &mut network_receiver,
+            &mut sync_receiver,
+        )
+        .await
+    });
+
+    // Send a proposal for height 1.
+    send(&mut network_sender, proposal(BlockHash(Felt::ONE), 1)).await;
+    proposal_handled_rx.await.unwrap();
+
+    // Send an old sync height. This should not cancel the current height.
+    sync_sender.send(BlockNumber(0)).await.unwrap();
+    // Make sure the sync is processed before the upcoming messages.
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Finish the messages for height 1.
+    send(&mut network_sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await;
+    send(&mut network_sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await;
+    decision_rx.await.unwrap();
+
+    // Drop the sender to close consensus and gracefully shut down.
+    drop(sync_sender);
+    assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
+}
diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
index a137259512..901ebdef46 100644
--- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
@@ -10,14 +10,14 @@ use futures::channel::{mpsc, oneshot};
 use futures::sink::SinkExt;
 use futures::StreamExt;
 use papyrus_network::network_manager::BroadcastSubscriberSender;
-use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
+use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
 use papyrus_storage::body::BodyStorageReader;
 use papyrus_storage::header::HeaderStorageReader;
 use papyrus_storage::{StorageError, StorageReader};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::ContractAddress;
 use starknet_api::transaction::Transaction;
-use tracing::{debug, debug_span, Instrument};
+use tracing::{debug, debug_span, info, warn, Instrument};
 
 use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};
 use crate::ProposalWrapper;
@@ -45,8 +45,9 @@ impl ConsensusBlock for PapyrusConsensusBlock {
 
 pub struct PapyrusConsensusContext {
     storage_reader: StorageReader,
-    broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
+    network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
     validators: Vec<ValidatorId>,
+    sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
 }
 
 impl PapyrusConsensusContext {
@@ -54,13 +55,15 @@ impl PapyrusConsensusContext {
     #[allow(dead_code)]
     pub fn new(
         storage_reader: StorageReader,
-        broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
+        network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
         num_validators: u64,
+        sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
     ) -> Self {
         Self {
             storage_reader,
-            broadcast_sender,
+            network_broadcast_sender,
             validators: (0..num_validators).map(ContractAddress::from).collect(),
+            sync_broadcast_sender,
         }
     }
 }
@@ -160,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext {
                         panic!("Block in {height} was not found in storage despite waiting for it")
                     })
                     .block_hash;
+
+                // This can happen as a result of sync interrupting `run_height`.
                 fin_sender
                     .send(PapyrusConsensusBlock { content: transactions, id: block_hash })
-                    .expect("Send should succeed");
+                    .unwrap_or_else(|_| {
+                        warn!("Failed to send block to consensus. height={height}");
+                    })
             }
             .instrument(debug_span!("consensus_validate_proposal")),
         );
@@ -180,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext {
 
     async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
         debug!("Broadcasting message: {message:?}");
-        self.broadcast_sender.send(message).await?;
+        self.network_broadcast_sender.send(message).await?;
         Ok(())
     }
 
@@ -190,7 +197,7 @@ impl ConsensusContext for PapyrusConsensusContext {
         mut content_receiver: mpsc::Receiver<Transaction>,
         fin_receiver: oneshot::Receiver<BlockHash>,
     ) -> Result<(), ConsensusError> {
-        let mut broadcast_sender = self.broadcast_sender.clone();
+        let mut broadcast_sender = self.network_broadcast_sender.clone();
 
         tokio::spawn(
             async move {
@@ -225,6 +232,22 @@ impl ConsensusContext for PapyrusConsensusContext {
         );
         Ok(())
     }
+
+    async fn decision(
+        &self,
+        block: Self::Block,
+        precommits: Vec<Vote>,
+    ) -> Result<(), ConsensusError> {
+        let height = precommits[0].height;
+        info!(
+            "Finished consensus for height: {height}. Agreed on block with id: {:x}",
+            block.id().0
+        );
+        if let Some(sender) = &self.sync_broadcast_sender {
+            sender.clone().send(precommits[0].clone()).await?;
+        }
+        Ok(())
+    }
 }
 
 const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10);
diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs
index a9f9c853e3..e4314e5010 100644
--- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs
@@ -1,7 +1,7 @@
 use futures::channel::{mpsc, oneshot};
 use futures::StreamExt;
 use papyrus_network::network_manager::{mock_register_broadcast_subscriber, BroadcastNetworkMock};
-use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
+use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
 use papyrus_storage::body::BodyStorageWriter;
 use papyrus_storage::header::HeaderStorageWriter;
 use papyrus_storage::test_utils::get_test_storage;
@@ -20,7 +20,7 @@ const TEST_CHANNEL_SIZE: usize = 10;
 
 #[tokio::test]
 async fn build_proposal() {
-    let (block, papyrus_context, _mock_network) = test_setup();
+    let (block, papyrus_context, _, _) = test_setup();
     let block_number = block.header.block_number;
 
     let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await;
@@ -38,7 +38,7 @@
 
 #[tokio::test]
 async fn validate_proposal_success() {
-    let (block, papyrus_context, _mock_network) = test_setup();
+    let (block, papyrus_context, _, _) = test_setup();
     let block_number = block.header.block_number;
 
     let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
@@ -56,7 +56,7 @@
 
 #[tokio::test]
 async fn validate_proposal_fail() {
-    let (block, papyrus_context, _mock_network) = test_setup();
+    let (block, papyrus_context, _, _) = test_setup();
     let block_number = block.header.block_number;
 
     let different_block = get_test_block(4, None, None, None);
@@ -72,7 +72,7 @@
 
 #[tokio::test]
 async fn propose() {
-    let (block, papyrus_context, mut mock_network) = test_setup();
+    let (block, papyrus_context, mut mock_network, _) = test_setup();
     let block_number = block.header.block_number;
 
     let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
@@ -99,7 +99,12 @@
     assert_eq!(mock_network.messages_to_broadcast_receiver.next().await.unwrap(), expected_message);
 }
 
-fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<ConsensusMessage>) {
+fn test_setup() -> (
+    Block,
+    PapyrusConsensusContext,
+    BroadcastNetworkMock<ConsensusMessage>,
+    BroadcastNetworkMock<Vote>,
+) {
     let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
     let block = get_test_block(5, None, None, None);
     let block_number = block.header.block_number;
@@ -113,11 +118,13 @@ fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<Consens
             fin_receiver: oneshot::Receiver<BlockHash>,
         ) -> Result<(), ConsensusError>;
+
+        async fn decision(
+            &self,
+            block: TestBlock,
+            precommits: Vec<Vote>,
+        ) -> Result<(), ConsensusError>;
     }
 }
diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs
index d213bc04af..0648234ae9 100644
--- a/crates/sequencing/papyrus_consensus/src/types.rs
+++ b/crates/sequencing/papyrus_consensus/src/types.rs
@@ -135,6 +135,12 @@ pub trait ConsensusContext {
         content_receiver: mpsc::Receiver<<Self::Block as ConsensusBlock>::ProposalChunk>,
         fin_receiver: oneshot::Receiver<BlockHash>,
     ) -> Result<(), ConsensusError>;
+
+    async fn decision(
+        &self,
+        block: Self::Block,
+        precommits: Vec<Vote>,
+    ) -> Result<(), ConsensusError>;
 }
 
 #[derive(PartialEq)]
@@ -177,4 +183,6 @@ pub enum ConsensusError {
     // As opposed to an error between this node and peer nodes.
     #[error("{0}")]
     InternalNetworkError(String),
+    #[error("{0}")]
+    SyncError(String),
 }
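
A few notes on the patterns this patch relies on, with self-contained sketches. These are illustrations under stated assumptions, not code from the patch.

The select in `run_consensus` is correct only because the staleness filter lives inside the awaited `future_height` future: a stale sync height is swallowed without completing the future, so the non-cancel-safe `run_height` branch is never dropped spuriously. A minimal sketch of that pattern, using a toy `Height` type in place of `BlockNumber` and a pending future standing in for `run_height`:

```rust
use futures::channel::mpsc;
use futures::{Stream, StreamExt};

// Toy stand-in for starknet_api's BlockNumber; everything here is a sketch.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
struct Height(u64);

// Mirrors `future_height` above: resolve only once the sync stream reports a
// height at or above `current`. Because stale heights are swallowed *inside*
// this future, the enclosing `tokio::select!` never wakes up for them, so the
// non-cancel-safe branch is never dropped by a stale message.
async fn future_height<S>(current: Height, sync: &mut S) -> Option<Height>
where
    S: Stream<Item = Height> + Unpin,
{
    while let Some(h) = sync.next().await {
        if h >= current {
            return Some(h);
        }
        // Stale height: the real code logs and ignores it; just keep polling.
    }
    None // The sync stream closed.
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    tx.unbounded_send(Height(0)).unwrap(); // stale: filtered out inside the future
    tx.unbounded_send(Height(2)).unwrap(); // >= current: resolves the branch

    tokio::select! {
        // Stand-in for `run_height`, which must not be cancelled spuriously.
        _ = std::future::pending::<()>() => unreachable!(),
        h = future_height(Height(1), &mut rx) => {
            assert_eq!(h, Some(Height(2)));
            println!("sync jumped consensus forward to {:?}", h);
        }
    }
}
```

This is also why `run_consensus_sync_cancellation_safety` sends `BlockNumber(0)` mid-height: the old height must be absorbed without ever resolving the sync branch.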
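The adapter in `main.rs` that turns the sync broadcast channel into the `Stream<Item = BlockNumber>` expected by `run_consensus` is just a `StreamExt::map` over the `(message, report_sender)` pairs the subscriber yields. A sketch of that shape, with toy stand-ins for the papyrus types:

```rust
use futures::channel::mpsc;
use futures::{executor, StreamExt};

// Toy stand-ins; only the shape of the adapter matters here.
struct Vote {
    height: u64,
}
#[derive(Debug, PartialEq)]
struct BlockNumber(u64);

fn main() {
    // The broadcast subscriber yields (Result<Vote, conversion error>, report_sender)
    // pairs; consensus only needs the decided height, so the stream is mapped
    // down to BlockNumber before being handed to run_consensus.
    let (tx, rx) = mpsc::unbounded::<(Result<Vote, ()>, ())>();
    let mut sync_receiver =
        rx.map(|(vote, _report_sender)| BlockNumber(vote.expect("no conversion errors").height));

    tx.unbounded_send((Ok(Vote { height: 7 }), ())).unwrap();
    drop(tx);
    assert_eq!(executor::block_on(sync_receiver.next()), Some(BlockNumber(7)));
}
```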
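Finally, `auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote)` is needed because broadcast topics carry raw bytes, so any type sent standalone on a topic (as `Vote` now is on `consensus_test_sync`) must convert to and from `Vec<u8>`. The macro presumably routes through the protobuf twin; the toy pair below shows only the shape of the two conversions (hand-rolled encoding, not the macro's actual expansion):

```rust
// Toy message type; the real macro converts via protobuf::Vote rather than
// this hand-rolled little-endian encoding.
struct Vote {
    height: u64,
}

impl From<Vote> for Vec<u8> {
    fn from(vote: Vote) -> Self {
        vote.height.to_le_bytes().to_vec()
    }
}

impl TryFrom<Vec<u8>> for Vote {
    type Error = &'static str;
    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        let arr: [u8; 8] = bytes.as_slice().try_into().map_err(|_| "bad length")?;
        Ok(Vote { height: u64::from_le_bytes(arr) })
    }
}

fn main() {
    let bytes: Vec<u8> = Vote { height: 3 }.into();
    let decoded = Vote::try_from(bytes).unwrap();
    assert_eq!(decoded.height, 3);
}
```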