diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 8b28115ab3..7910541428 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::channel::mpsc; use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; @@ -109,6 +110,8 @@ fn run_consensus( config.num_validators, ); let start_height = config.start_height; + // TODO(matan): connect this to an actual channel. + let (_, sync_rx) = mpsc::channel(1); match config.test { Some(test_config) => { let network_receiver = NetworkReceiver::new( @@ -124,6 +127,7 @@ fn run_consensus( validator_id, config.consensus_delay, network_receiver, + sync_rx, ))) } None => Ok(tokio::spawn(papyrus_consensus::run_consensus( @@ -132,6 +136,7 @@ fn run_consensus( validator_id, config.consensus_delay, consensus_channels.broadcasted_messages_receiver, + sync_rx, ))), } } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index dcd6a25ced..8a8324a279 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use crate::single_height_consensus::SingleHeightConsensus; use crate::types::{ @@ -26,20 +26,22 @@ use crate::types::{ }; // TODO(dvir): add test for this. -#[instrument(skip(context, start_height, network_receiver), level = "info")] +#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")] #[allow(missing_docs)] -pub async fn run_consensus( +pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, consensus_delay: Duration, mut network_receiver: NetworkReceiverT, + mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where BlockT: ConsensusBlock, ContextT: ConsensusContext, NetworkReceiverT: Stream, ReportSender)> + Unpin, + SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { @@ -48,11 +50,23 @@ where let mut current_height = start_height; let mut manager = MultiHeightManager::new(); loop { - let decision = manager - .run_height(&mut context, current_height, validator_id, &mut network_receiver) - .await?; - context.decision(decision.block, decision.precommits).await?; - current_height = current_height.unchecked_next(); + let run_height = + manager.run_height(&mut context, current_height, validator_id, &mut network_receiver); + + // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop + // it. We also cannot restart the height; when we dropped the future we dropped the state it + // built and risk equivocating. Therefore, we must only enter the other select branches if + // we are certain to leave this height. + tokio::select! { + decision = run_height => { + let decision = decision?; + context.decision(decision.block, decision.precommits).await?; + current_height = current_height.unchecked_next(); + }, + sync_height = sync_height(current_height, &mut sync_receiver) => { + current_height = sync_height?.unchecked_next(); + } + } } } @@ -184,3 +198,27 @@ where } } } + +// Return only when a height is reached that is greater than or equal to the current height. +async fn sync_height( + height: BlockNumber, + mut sync_receiver: SyncReceiverT, +) -> Result +where + SyncReceiverT: Stream + Unpin, +{ + loop { + match sync_receiver.next().await { + Some(sync_height) if sync_height >= height => { + info!("Sync to height: {}. current_height={}", sync_height, height); + return Ok(sync_height); + } + Some(sync_height) => { + debug!("Ignoring sync to height: {}. current_height={}", sync_height, height); + } + None => { + return Err(ConsensusError::SyncError("Sync receiver closed".to_string())); + } + } + } +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index d12ce5c03c..0d379f350f 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::vec; use async_trait::async_trait; @@ -5,6 +6,7 @@ use futures::channel::{mpsc, oneshot}; use futures::SinkExt; use lazy_static::lazy_static; use mockall::mock; +use mockall::predicate::eq; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -12,7 +14,7 @@ use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; -use super::MultiHeightManager; +use super::{run_consensus, MultiHeightManager}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; lazy_static! { @@ -120,7 +122,7 @@ fn precommit(block_hash: Option, height: u64, voter: ValidatorId) -> } #[tokio::test] -async fn run_multiple_heights_unordered() { +async fn manager_multiple_heights_unordered() { let mut context = MockTestContext::new(); let (mut sender, mut receiver) = mpsc::unbounded(); @@ -167,3 +169,121 @@ async fn run_multiple_heights_unordered() { .unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); } + +#[tokio::test] +async fn run_consensus_sync() { + // Set expectations. + let mut context = MockTestContext::new(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(move |block, votes| { + assert_eq!(block.id(), BlockHash(Felt::TWO)); + assert_eq!(votes[0].height, 2); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + // Send messages for height 2. + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + send(&mut network_sender, proposal(BlockHash(Felt::TWO), 2)).await; + send(&mut network_sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + + // Start at height 1. + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send sync for height 1. + sync_sender.send(BlockNumber(1)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Decision for height 2. + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} + +// Check for cancellation safety when ignoring old heights. If the current height check was done +// within the select branch this test would hang. +#[tokio::test] +async fn run_consensus_sync_cancellation_safety() { + let mut context = MockTestContext::new(); + let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context + .expect_broadcast() + .with(eq(prevote(Some(BlockHash(Felt::ONE)), 1, *VALIDATOR_ID))) + .return_once(move |_| { + proposal_handled_tx.send(()).unwrap(); + Ok(()) + }); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(|block, votes| { + assert_eq!(block.id(), BlockHash(Felt::ONE)); + assert_eq!(votes[0].height, 1); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send a proposal for height 1. + send(&mut network_sender, proposal(BlockHash(Felt::ONE), 1)).await; + proposal_handled_rx.await.unwrap(); + + // Send an old sync. This should not cancel the current height. + sync_sender.send(BlockNumber(0)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Finished messages for 1 + send(&mut network_sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index cdb179e423..0648234ae9 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -183,4 +183,6 @@ pub enum ConsensusError { // As opposed to an error between this node and peer nodes. #[error("{0}")] InternalNetworkError(String), + #[error("{0}")] + SyncError(String), }