From 06159c4a35e195efb580e8d182194923cc8c9451 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Sun, 11 Aug 2024 16:09:23 +0300 Subject: [PATCH] feat(consensus): add sync receiver to consensus This allows for the block propagation to tell consensus to move on if it's working on an old height. Special care was taken to make this cancel safe. --- crates/papyrus_node/src/main.rs | 5 + .../papyrus_consensus/src/manager.rs | 54 ++++++-- .../papyrus_consensus/src/manager_test.rs | 124 +++++++++++++++++- .../sequencing/papyrus_consensus/src/types.rs | 2 + 4 files changed, 175 insertions(+), 10 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 8b28115ab3..7910541428 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::channel::mpsc; use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; @@ -109,6 +110,8 @@ fn run_consensus( config.num_validators, ); let start_height = config.start_height; + // TODO(matan): connect this to an actual channel. 
+ let (_, sync_rx) = mpsc::channel(1); match config.test { Some(test_config) => { let network_receiver = NetworkReceiver::new( @@ -124,6 +127,7 @@ fn run_consensus( validator_id, config.consensus_delay, network_receiver, + sync_rx, ))) } None => Ok(tokio::spawn(papyrus_consensus::run_consensus( @@ -132,6 +136,7 @@ fn run_consensus( validator_id, config.consensus_delay, consensus_channels.broadcasted_messages_receiver, + sync_rx, ))), } } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 9154cf1db4..7c96b11808 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use crate::single_height_consensus::SingleHeightConsensus; use crate::types::{ @@ -26,20 +26,22 @@ use crate::types::{ }; // TODO(dvir): add test for this. 
-#[instrument(skip(context, start_height, network_receiver), level = "info")] +#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")] #[allow(missing_docs)] -pub async fn run_consensus( +pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, consensus_delay: Duration, mut network_receiver: NetworkReceiverT, + mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where BlockT: ConsensusBlock, ContextT: ConsensusContext, NetworkReceiverT: Stream, ReportSender)> + Unpin, + SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { @@ -48,11 +50,23 @@ where let mut current_height = start_height; let mut manager = MultiHeightManager::new(); loop { - let decision = manager - .run_height(&mut context, current_height, validator_id, &mut network_receiver) - .await?; - context.notify_decision(decision.block, decision.precommits).await?; - current_height = current_height.unchecked_next(); + let run_height = + manager.run_height(&mut context, current_height, validator_id, &mut network_receiver); + + // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop + // it. We also cannot restart the height; when we dropped the future we dropped the state it + // built and risk equivocating. Therefore, we must only enter the other select branches if + // we are certain to leave this height. + tokio::select! { + decision = run_height => { + let decision = decision?; + context.notify_decision(decision.block, decision.precommits).await?; + current_height = current_height.unchecked_next(); + }, + sync_height = sync_height(current_height, &mut sync_receiver) => { + current_height = sync_height?.unchecked_next(); + } + } } } @@ -184,3 +198,27 @@ where } } } + +// Return only when a height is reached that is greater than or equal to the current height. 
+async fn sync_height( + height: BlockNumber, + mut sync_receiver: SyncReceiverT, +) -> Result +where + SyncReceiverT: Stream + Unpin, +{ + loop { + match sync_receiver.next().await { + Some(sync_height) if sync_height >= height => { + info!("Sync to height: {}. current_height={}", sync_height, height); + return Ok(sync_height); + } + Some(sync_height) => { + debug!("Ignoring sync to height: {}. current_height={}", sync_height, height); + } + None => { + return Err(ConsensusError::SyncError("Sync receiver closed".to_string())); + } + } + } +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 36daac0ab9..5788131308 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::vec; use async_trait::async_trait; @@ -5,6 +6,7 @@ use futures::channel::{mpsc, oneshot}; use futures::SinkExt; use lazy_static::lazy_static; use mockall::mock; +use mockall::predicate::eq; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -12,7 +14,7 @@ use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; -use super::MultiHeightManager; +use super::{run_consensus, MultiHeightManager}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; lazy_static! 
{ @@ -120,7 +122,7 @@ fn precommit(block_hash: Option, height: u64, voter: ValidatorId) -> } #[tokio::test] -async fn run_multiple_heights_unordered() { +async fn manager_multiple_heights_unordered() { let mut context = MockTestContext::new(); let (mut sender, mut receiver) = mpsc::unbounded(); @@ -167,3 +169,121 @@ async fn run_multiple_heights_unordered() { .unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); } + +#[tokio::test] +async fn run_consensus_sync() { + // Set expectations. + let mut context = MockTestContext::new(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(move |block, votes| { + assert_eq!(block.id(), BlockHash(Felt::TWO)); + assert_eq!(votes[0].height, 2); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + // Send messages for height 2. + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + send(&mut network_sender, proposal(BlockHash(Felt::TWO), 2)).await; + send(&mut network_sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + + // Start at height 1. + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send sync for height 1. 
+ sync_sender.send(BlockNumber(1)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Decision for height 2. + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} + +// Check for cancellation safety when ignoring old heights. If the current height check was done +// within the select branch this test would hang. +#[tokio::test] +async fn run_consensus_sync_cancellation_safety() { + let mut context = MockTestContext::new(); + let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context + .expect_broadcast() + .with(eq(prevote(Some(BlockHash(Felt::ONE)), 1, *VALIDATOR_ID))) + .return_once(move |_| { + proposal_handled_tx.send(()).unwrap(); + Ok(()) + }); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(|block, votes| { + assert_eq!(block.id(), BlockHash(Felt::ONE)); + assert_eq!(votes[0].height, 1); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send a proposal for height 1. 
+ send(&mut network_sender, proposal(BlockHash(Felt::ONE), 1)).await; + proposal_handled_rx.await.unwrap(); + + // Send an old sync. This should not cancel the current height. + sync_sender.send(BlockNumber(0)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Finished messages for 1 + send(&mut network_sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index b685c551f3..41414fa849 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -187,4 +187,6 @@ pub enum ConsensusError { // As opposed to an error between this node and peer nodes. #[error("{0}")] InternalNetworkError(String), + #[error("{0}")] + SyncError(String), }