From 33df5384789dc8e2e8c87af1b7e6a9a6e10de364 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Sun, 11 Aug 2024 16:09:23 +0300 Subject: [PATCH] feat(consensus): add sync receiver to consensus This allows for the block propogation to tell consensus to move on if it's working on an old height. Special was taken to make this cancel safe. --- crates/papyrus_node/src/main.rs | 5 + .../papyrus_consensus/src/manager.rs | 54 ++++++-- .../papyrus_consensus/src/manager_test.rs | 124 +++++++++++++++++- .../sequencing/papyrus_consensus/src/types.rs | 2 + 4 files changed, 175 insertions(+), 10 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 8b28115ab33..79105414285 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::channel::mpsc; use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; @@ -109,6 +110,8 @@ fn run_consensus( config.num_validators, ); let start_height = config.start_height; + // TODO(matan): connect this to an actual channel. + let (_, sync_rx) = mpsc::channel(1); match config.test { Some(test_config) => { let network_receiver = NetworkReceiver::new( @@ -124,6 +127,7 @@ fn run_consensus( validator_id, config.consensus_delay, network_receiver, + sync_rx, ))) } None => Ok(tokio::spawn(papyrus_consensus::run_consensus( @@ -132,6 +136,7 @@ fn run_consensus( validator_id, config.consensus_delay, consensus_channels.broadcasted_messages_receiver, + sync_rx, ))), } } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index dcd6a25cedc..8a8324a2790 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use crate::single_height_consensus::SingleHeightConsensus; use crate::types::{ @@ -26,20 +26,22 @@ use crate::types::{ }; // TODO(dvir): add test for this. -#[instrument(skip(context, start_height, network_receiver), level = "info")] +#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")] #[allow(missing_docs)] -pub async fn run_consensus( +pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, consensus_delay: Duration, mut network_receiver: NetworkReceiverT, + mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where BlockT: ConsensusBlock, ContextT: ConsensusContext, NetworkReceiverT: Stream, ReportSender)> + Unpin, + SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { @@ -48,11 +50,23 @@ where let mut current_height = start_height; let mut manager = MultiHeightManager::new(); loop { - let decision = manager - .run_height(&mut context, current_height, validator_id, &mut network_receiver) - .await?; - context.decision(decision.block, decision.precommits).await?; - current_height = current_height.unchecked_next(); + let run_height = + manager.run_height(&mut context, current_height, validator_id, &mut network_receiver); + + // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop + // it. We also cannot restart the height; when we dropped the future we dropped the state it + // built and risk equivocating. Therefore, we must only enter the other select branches if + // we are certain to leave this height. + tokio::select! { + decision = run_height => { + let decision = decision?; + context.decision(decision.block, decision.precommits).await?; + current_height = current_height.unchecked_next(); + }, + sync_height = sync_height(current_height, &mut sync_receiver) => { + current_height = sync_height?.unchecked_next(); + } + } } } @@ -184,3 +198,27 @@ where } } } + +// Return only when a height is reached that is greater than or equal to the current height. +async fn sync_height( + height: BlockNumber, + mut sync_receiver: SyncReceiverT, +) -> Result +where + SyncReceiverT: Stream + Unpin, +{ + loop { + match sync_receiver.next().await { + Some(sync_height) if sync_height >= height => { + info!("Sync to height: {}. current_height={}", sync_height, height); + return Ok(sync_height); + } + Some(sync_height) => { + debug!("Ignoring sync to height: {}. current_height={}", sync_height, height); + } + None => { + return Err(ConsensusError::SyncError("Sync receiver closed".to_string())); + } + } + } +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index d12ce5c03c4..0d379f350f4 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::vec; use async_trait::async_trait; @@ -5,6 +6,7 @@ use futures::channel::{mpsc, oneshot}; use futures::SinkExt; use lazy_static::lazy_static; use mockall::mock; +use mockall::predicate::eq; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -12,7 +14,7 @@ use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; -use super::MultiHeightManager; +use super::{run_consensus, MultiHeightManager}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; lazy_static! { @@ -120,7 +122,7 @@ fn precommit(block_hash: Option, height: u64, voter: ValidatorId) -> } #[tokio::test] -async fn run_multiple_heights_unordered() { +async fn manager_multiple_heights_unordered() { let mut context = MockTestContext::new(); let (mut sender, mut receiver) = mpsc::unbounded(); @@ -167,3 +169,121 @@ async fn run_multiple_heights_unordered() { .unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); } + +#[tokio::test] +async fn run_consensus_sync() { + // Set expectations. + let mut context = MockTestContext::new(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(move |block, votes| { + assert_eq!(block.id(), BlockHash(Felt::TWO)); + assert_eq!(votes[0].height, 2); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + // Send messages for height 2. + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + send(&mut network_sender, proposal(BlockHash(Felt::TWO), 2)).await; + send(&mut network_sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + + // Start at height 1. + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send sync for height 1. + sync_sender.send(BlockNumber(1)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Decision for height 2. + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} + +// Check for cancellation safety when ignoring old heights. If the current height check was done +// within the select branch this test would hang. +#[tokio::test] +async fn run_consensus_sync_cancellation_safety() { + let mut context = MockTestContext::new(); + let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); + let (decision_tx, decision_rx) = oneshot::channel(); + + context.expect_validate_proposal().return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap(); + block_receiver + }); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context + .expect_broadcast() + .with(eq(prevote(Some(BlockHash(Felt::ONE)), 1, *VALIDATOR_ID))) + .return_once(move |_| { + proposal_handled_tx.send(()).unwrap(); + Ok(()) + }); + context.expect_broadcast().returning(move |_| Ok(())); + context.expect_decision().return_once(|block, votes| { + assert_eq!(block.id(), BlockHash(Felt::ONE)); + assert_eq!(votes[0].height, 1); + decision_tx.send(()).unwrap(); + Ok(()) + }); + + let (mut network_sender, mut network_receiver) = mpsc::unbounded(); + let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); + + let consensus_handle = tokio::spawn(async move { + run_consensus( + context, + BlockNumber(1), + *VALIDATOR_ID, + Duration::ZERO, + &mut network_receiver, + &mut sync_receiver, + ) + .await + }); + + // Send a proposal for height 1. + send(&mut network_sender, proposal(BlockHash(Felt::ONE), 1)).await; + proposal_handled_rx.await.unwrap(); + + // Send an old sync. This should not cancel the current height. + sync_sender.send(BlockNumber(0)).await.unwrap(); + // Make sure the sync is processed before the upcoming messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Finished messages for 1 + send(&mut network_sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + decision_rx.await.unwrap(); + + // Drop the sender to close consensus and gracefully shut down. + drop(sync_sender); + assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); +} diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 2cac44be55c..f6f8416dfd0 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -187,4 +187,6 @@ pub enum ConsensusError { // As opposed to an error between this node and peer nodes. #[error("{0}")] InternalNetworkError(String), + #[error("{0}")] + SyncError(String), }