From cd7996a9ed941a08cfb0d19b5d6eecf54558e9af Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Thu, 5 Dec 2024 17:10:00 +0200 Subject: [PATCH] refactor(consensus): move the sync check into run_height The goal is to remove the nested selects to make cancellation logic simpler. --- .../papyrus_consensus/src/manager.rs | 62 ++++++++++++------- .../papyrus_consensus/src/manager_test.rs | 18 ++++-- 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 880d71f877f..66a0b8954a6 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -67,32 +67,39 @@ where metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); let is_observer = current_height < start_active_height; - let run_height = manager.run_height( - &mut context, - current_height, - is_observer, - &mut broadcast_channels, - &mut inbound_proposal_receiver, - ); - - // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop - // it. We also cannot restart the height; when we dropped the future we dropped the state it - // built and risk equivocating. Therefore, we must only enter the other select branches if - // we are certain to leave this height. - tokio::select! { - decision = run_height => { - let decision = decision?; + match manager + .run_height( + &mut context, + current_height, + is_observer, + &mut broadcast_channels, + &mut inbound_proposal_receiver, + &mut sync_receiver, + ) + .await? + { + RunHeightRes::Decision(decision) => { context.decision_reached(decision.block, decision.precommits).await?; current_height = current_height.unchecked_next(); - }, - sync_height = sync_height(current_height, &mut sync_receiver) => { + } + RunHeightRes::Sync(sync_height) => { metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT); - current_height = sync_height?.unchecked_next(); + current_height = sync_height.unchecked_next(); } } } } +/// Run height can end either when consensus reaches a decision or when we learn, via sync, of the +/// decision. +// TODO(Matan): Sync may change when Shahak actually implements. +pub enum RunHeightRes { + /// Decision reached. + Decision(Decision), + /// Sync protocol returned a future height. + Sync(BlockNumber), +} + /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly /// part of the single height consensus algorithm (e.g. messages from future heights). #[derive(Debug, Default)] @@ -118,15 +125,19 @@ impl MultiHeightManager { /// /// Assumes that `height` is monotonically increasing across calls for the sake of filtering /// `cached_messaged`. - #[instrument(skip(self, context, broadcast_channels), level = "info")] - pub async fn run_height( + #[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")] + pub async fn run_height( &mut self, context: &mut ContextT, height: BlockNumber, is_observer: bool, broadcast_channels: &mut BroadcastConsensusMessageChannel, proposal_receiver: &mut mpsc::Receiver>, - ) -> Result { + sync_receiver: &mut SyncReceiverT, + ) -> Result + where + SyncReceiverT: Stream + Unpin, + { let validators = context.validators(height).await; info!("running consensus for height {height:?} with validator set {validators:?}"); let mut shc = SingleHeightConsensus::new( @@ -139,7 +150,7 @@ impl MultiHeightManager { let mut shc_events = FuturesUnordered::new(); match self.start_height(context, height, &mut shc).await? { - ShcReturn::Decision(decision) => return Ok(decision), + ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)), ShcReturn::Tasks(tasks) => { for task in tasks { shc_events.push(task.run()); @@ -169,10 +180,13 @@ impl MultiHeightManager { Some(shc_event) = shc_events.next() => { shc.handle_event(context, shc_event).await? }, + sync_height = sync_height(height, sync_receiver) => { + return Ok(RunHeightRes::Sync(sync_height?)); + } }; match shc_return { - ShcReturn::Decision(decision) => return Ok(decision), + ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)), ShcReturn::Tasks(tasks) => { for task in tasks { shc_events.push(task.run()); @@ -319,7 +333,7 @@ impl MultiHeightManager { // Return only when a height is reached that is greater than or equal to the current height. async fn sync_height( height: BlockNumber, - mut sync_receiver: SyncReceiverT, + sync_receiver: &mut SyncReceiverT, ) -> Result where SyncReceiverT: Stream + Unpin, diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index ebef7ddeb0d..46e92cc2016 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -27,7 +27,7 @@ use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; use tokio::sync::Notify; -use super::{run_consensus, MultiHeightManager}; +use super::{run_consensus, MultiHeightManager, RunHeightRes}; use crate::config::TimeoutsConfig; use crate::test_utils::{precommit, prevote, proposal_init}; use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId}; @@ -124,6 +124,13 @@ fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) { .times(1); } +fn assert_decision(res: RunHeightRes, id: Felt) { + match res { + RunHeightRes::Decision(decision) => assert_eq!(decision.block, BlockHash(id)), + _ => panic!("Expected decision"), + } +} + #[tokio::test] async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -174,10 +181,11 @@ async fn manager_multiple_heights_unordered() { false, &mut subscriber_channels, &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::ONE)); + assert_decision(decision, Felt::ONE); // Run the manager for height 2. expect_validate_proposal(&mut context, Felt::TWO); @@ -188,10 +196,11 @@ async fn manager_multiple_heights_unordered() { false, &mut subscriber_channels, &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::TWO)); + assert_decision(decision, Felt::TWO); } #[tokio::test] @@ -387,10 +396,11 @@ async fn test_timeouts() { false, &mut subscriber_channels.into(), &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::ONE)); + assert_decision(decision, Felt::ONE); }); // Wait for the timeout to be triggered.