diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index c578bf58fe..50d9f745f4 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -134,6 +134,7 @@ fn run_consensus( config.start_height, config.validator_id, config.consensus_delay, + config.timeouts.clone(), network_receiver, sync_receiver, ))) @@ -149,6 +150,7 @@ fn run_consensus( config.start_height, config.validator_id, config.consensus_delay, + config.timeouts.clone(), network_channels.broadcasted_messages_receiver, futures::stream::pending(), ))) diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index a2c12ef79c..826c7e69c2 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -8,6 +8,7 @@ use std::collections::BTreeMap; use std::time::Duration; use futures::channel::{mpsc, oneshot}; +use futures::stream::FuturesUnordered; use futures::{Stream, StreamExt}; use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; use papyrus_network::network_manager::ReportSender; @@ -16,7 +17,8 @@ use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; use tracing::{debug, info, instrument}; -use crate::single_height_consensus::SingleHeightConsensus; +use crate::config::TimeoutsConfig; +use crate::single_height_consensus::{ShcReturn, ShcTask, SingleHeightConsensus}; use crate::types::{ ConsensusBlock, ConsensusContext, @@ -27,13 +29,14 @@ use crate::types::{ }; // TODO(dvir): add test for this. -#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")] +#[instrument(skip_all, level = "info")] #[allow(missing_docs)] pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, consensus_delay: Duration, + timeouts: TimeoutsConfig, mut network_receiver: NetworkReceiverT, mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> @@ -46,17 +49,22 @@ where ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { - info!("Running consensus"); + info!( + "Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}", + start_height, + validator_id, + consensus_delay.as_secs(), + timeouts + ); // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; - let mut manager = MultiHeightManager::new(); + let mut manager = MultiHeightManager::new(validator_id, timeouts); loop { metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); - let run_height = - manager.run_height(&mut context, current_height, validator_id, &mut network_receiver); + let run_height = manager.run_height(&mut context, current_height, &mut network_receiver); // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop // it. We also cannot restart the height; when we dropped the future we dropped the state it @@ -85,25 +93,26 @@ pub struct ProposalWrapper(pub Proposal); /// part of the single height consensus algorithm (e.g. messages from future heights). #[derive(Debug, Default)] struct MultiHeightManager { + validator_id: ValidatorId, cached_messages: BTreeMap>, + timeouts: TimeoutsConfig, } impl MultiHeightManager { /// Create a new consensus manager. - pub fn new() -> Self { - Self { cached_messages: BTreeMap::new() } + pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self { + Self { validator_id, cached_messages: BTreeMap::new(), timeouts } } /// Run the consensus algorithm for a single height. /// /// Assumes that `height` is monotonically increasing across calls for the sake of filtering /// `cached_messaged`. - #[instrument(skip(self, context, validator_id, network_receiver), level = "info")] + #[instrument(skip(self, context, network_receiver), level = "info")] pub async fn run_height( &mut self, context: &mut ContextT, height: BlockNumber, - validator_id: ValidatorId, network_receiver: &mut NetworkReceiverT, ) -> Result, ConsensusError> where @@ -118,40 +127,86 @@ impl MultiHeightManager { )>, { let validators = context.validators(height).await; - let mut shc = SingleHeightConsensus::new(height, validator_id, validators); + let mut shc = SingleHeightConsensus::new( + height, + self.validator_id, + validators, + self.timeouts.clone(), + ); + let mut shc_tasks = FuturesUnordered::new(); - if let Some(decision) = shc.start(context).await? { - return Ok(decision); + match shc.start(context).await? { + ShcReturn::Decision(decision) => return Ok(decision), + ShcReturn::Tasks(tasks) => { + for task in tasks { + shc_tasks.push(create_task_handler(task)); + } + } } let mut current_height_messages = self.get_current_height_messages(height); loop { - let message = next_message(&mut current_height_messages, network_receiver).await?; - // TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints: - // 1. Malicious - must be capped so a malicious peer can't DoS us. - // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0). - // In general I think we will want to only cache (H+1, 0) messages. - if message.height() != height.0 { - debug!("Received a message for a different height. {:?}", message); - if message.height() > height.0 { - self.cached_messages.entry(message.height()).or_default().push(message); - } - continue; - } + let shc_return = tokio::select! { + message = next_message(&mut current_height_messages, network_receiver) => { + self.handle_message(context, height, &mut shc, message?).await? + }, + Some(shc_task) = shc_tasks.next() => { + shc.handle_task(context, shc_task).await? + }, + }; - let maybe_decision = match message { - ConsensusMessage::Proposal(proposal) => { - // Special case due to fake streaming. - let (proposal_init, content_receiver, fin_receiver) = - ProposalWrapper(proposal).into(); - shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver) - .await? + match shc_return { + ShcReturn::Decision(decision) => return Ok(decision), + ShcReturn::Tasks(tasks) => { + for task in tasks { + shc_tasks.push(create_task_handler(task)); + } } - _ => shc.handle_message(context, message).await?, - }; + } + } + } - if let Some(decision) = maybe_decision { - return Ok(decision); + // Handle a single consensus message. + async fn handle_message( + &mut self, + context: &mut ContextT, + height: BlockNumber, + shc: &mut SingleHeightConsensus, + message: ConsensusMessage, + ) -> Result, ConsensusError> + where + BlockT: ConsensusBlock, + ContextT: ConsensusContext, + ProposalWrapper: Into<( + ProposalInit, + mpsc::Receiver, + oneshot::Receiver, + )>, + { + // TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints: + // 1. Malicious - must be capped so a malicious peer can't DoS us. + // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0). + // In general I think we will want to only cache (H+1, 0) messages. + if message.height() != height.0 { + debug!("Received a message for a different height. {:?}", message); + if message.height() > height.0 { + self.cached_messages.entry(message.height()).or_default().push(message); + } + return Ok(ShcReturn::Tasks(vec![])); + } + match message { + ConsensusMessage::Proposal(proposal) => { + // Special case due to fake streaming. + let (proposal_init, content_receiver, fin_receiver) = + ProposalWrapper(proposal).into(); + let res = shc + .handle_proposal(context, proposal_init, content_receiver, fin_receiver) + .await?; + Ok(res) + } + _ => { + let res = shc.handle_message(context, message).await?; + Ok(res) } } } @@ -228,3 +283,8 @@ where } } } + +async fn create_task_handler(task: ShcTask) -> ShcTask { + tokio::time::sleep(task.duration).await; + task +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 233a486288..6eaa596f1d 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -15,6 +15,7 @@ use starknet_api::transaction::Transaction; use starknet_types_core::felt::Felt; use super::{run_consensus, MultiHeightManager}; +use crate::config::TimeoutsConfig; use crate::test_utils::{precommit, prevote, proposal}; use crate::types::{ ConsensusBlock, @@ -26,8 +27,11 @@ use crate::types::{ }; lazy_static! { - static ref VALIDATOR_ID: ValidatorId = 1_u32.into(); static ref PROPOSER_ID: ValidatorId = 0_u32.into(); + static ref VALIDATOR_ID: ValidatorId = 1_u32.into(); + static ref VALIDATOR_ID_2: ValidatorId = 2_u32.into(); + static ref VALIDATOR_ID_3: ValidatorId = 3_u32.into(); + static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } // TODO(matan): Switch to using TestBlock & MockTestContext in `test_utils` once streaming is @@ -102,8 +106,6 @@ async fn send(sender: &mut Sender, msg: ConsensusMessage) { #[tokio::test] async fn manager_multiple_heights_unordered() { - let mut context = MockTestContext::new(); - let (mut sender, mut receiver) = mpsc::unbounded(); // Send messages for height 2 followed by those for height 1. send(&mut sender, proposal(BlockHash(Felt::TWO), 2, 0, *PROPOSER_ID)).await; @@ -113,8 +115,7 @@ async fn manager_multiple_heights_unordered() { send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 0, *PROPOSER_ID)).await; send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 0, *PROPOSER_ID)).await; - let mut manager = MultiHeightManager::new(); - + let mut context = MockTestContext::new(); // Run the manager for height 1. context .expect_validate_proposal() @@ -127,10 +128,9 @@ async fn manager_multiple_heights_unordered() { context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_broadcast().returning(move |_| Ok(())); - let decision = manager - .run_height(&mut context, BlockNumber(1), *VALIDATOR_ID, &mut receiver) - .await - .unwrap(); + + let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); + let decision = manager.run_height(&mut context, BlockNumber(1), &mut receiver).await.unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::ONE)); // Run the manager for height 2. @@ -142,10 +142,7 @@ async fn manager_multiple_heights_unordered() { block_receiver }) .times(1); - let decision = manager - .run_height(&mut context, BlockNumber(2), *VALIDATOR_ID, &mut receiver) - .await - .unwrap(); + let decision = manager.run_height(&mut context, BlockNumber(2), &mut receiver).await.unwrap(); assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); } @@ -184,6 +181,7 @@ async fn run_consensus_sync() { BlockNumber(1), *VALIDATOR_ID, Duration::ZERO, + TIMEOUTS.clone(), &mut network_receiver, &mut sync_receiver, ) @@ -242,6 +240,7 @@ async fn run_consensus_sync_cancellation_safety() { BlockNumber(1), *VALIDATOR_ID, Duration::ZERO, + TIMEOUTS.clone(), &mut network_receiver, &mut sync_receiver, ) @@ -266,3 +265,56 @@ async fn run_consensus_sync_cancellation_safety() { drop(sync_sender); assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_)))); } + +#[tokio::test] +async fn test_timeouts() { + let (mut sender, mut receiver) = mpsc::unbounded(); + send(&mut sender, proposal(BlockHash(Felt::ONE), 1, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await; + send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_3)).await; + + let mut context = MockTestContext::new(); + context.expect_validate_proposal().returning(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap(); + block_receiver + }); + context + .expect_validators() + .returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + + let (timeout_send, timeout_receive) = oneshot::channel(); + // Node handled Timeout events and responded with NIL vote. + context + .expect_broadcast() + .times(1) + .withf(move |msg: &ConsensusMessage| msg == &prevote(None, 1, 1, *VALIDATOR_ID)) + .return_once(move |_| { + timeout_send.send(()).unwrap(); + Ok(()) + }); + context.expect_broadcast().returning(move |_| Ok(())); + + let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); + let manager_handle = tokio::spawn(async move { + let decision = + manager.run_height(&mut context, BlockNumber(1), &mut receiver).await.unwrap(); + assert_eq!(decision.block.id(), BlockHash(Felt::ONE)); + }); + + // Wait for the timeout to be triggered. + timeout_receive.await.unwrap(); + // Show that after the timeout is triggered we can still precommit in favor of the block and + // reach a decision. + send(&mut sender, proposal(BlockHash(Felt::ONE), 1, 1, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_3)).await; + send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_3)).await; + + manager_handle.await.unwrap(); +} diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index 1f6a57fb52..2524b0a2d8 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -4,12 +4,14 @@ mod single_height_consensus_test; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::time::Duration; use futures::channel::{mpsc, oneshot}; use papyrus_protobuf::consensus::{ConsensusMessage, Vote, VoteType}; use starknet_api::block::{BlockHash, BlockNumber}; use tracing::{debug, info, instrument, trace, warn}; +use crate::config::TimeoutsConfig; use crate::state_machine::{StateMachine, StateMachineEvent}; use crate::types::{ ConsensusBlock, @@ -21,6 +23,18 @@ use crate::types::{ ValidatorId, }; +#[derive(Debug, PartialEq)] +pub struct ShcTask { + pub duration: Duration, + pub event: StateMachineEvent, +} + +#[derive(Debug, PartialEq)] +pub enum ShcReturn { + Tasks(Vec), + Decision(Decision), +} + /// Struct which represents a single height of consensus. Each height is expected to be begun with a /// call to `start`, which is relevant if we are the proposer for this height's first round. /// SingleHeightConsensus receives messages directly as parameters to function calls. It can send @@ -29,6 +43,7 @@ pub(crate) struct SingleHeightConsensus { height: BlockNumber, validators: Vec, id: ValidatorId, + timeouts: TimeoutsConfig, state_machine: StateMachine, proposals: HashMap>, prevotes: HashMap<(Round, ValidatorId), Vote>, @@ -36,13 +51,19 @@ pub(crate) struct SingleHeightConsensus { } impl SingleHeightConsensus { - pub(crate) fn new(height: BlockNumber, id: ValidatorId, validators: Vec) -> Self { + pub(crate) fn new( + height: BlockNumber, + id: ValidatorId, + validators: Vec, + timeouts: TimeoutsConfig, + ) -> Self { // TODO(matan): Use actual weights, not just `len`. let state_machine = StateMachine::new(id, validators.len() as u32); Self { height, validators, id, + timeouts, state_machine, proposals: HashMap::new(), prevotes: HashMap::new(), @@ -54,7 +75,7 @@ impl SingleHeightConsensus { pub(crate) async fn start>( &mut self, context: &mut ContextT, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { info!("Starting consensus with validators {:?}", self.validators); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let events = self.state_machine.start(&leader_fn); @@ -74,7 +95,7 @@ impl SingleHeightConsensus { init: ProposalInit, p2p_messages_receiver: mpsc::Receiver<::ProposalChunk>, fin_receiver: oneshot::Receiver, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { debug!( "Received proposal: height={}, round={}, proposer={:?}", init.height.0, init.round, init.proposer @@ -91,7 +112,7 @@ impl SingleHeightConsensus { } let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else { warn!("Round {} already has a proposal, ignoring", init.round); - return Ok(None); + return Ok(ShcReturn::Tasks(vec![])); }; let block_receiver = context.validate_proposal(self.height, p2p_messages_receiver).await; @@ -128,7 +149,7 @@ impl SingleHeightConsensus { context: &mut ContextT, init: &ProposalInit, block_id: Option, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { let sm_proposal = StateMachineEvent::Proposal(block_id, init.round); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self.state_machine.handle_event(sm_proposal, &leader_fn); @@ -141,7 +162,7 @@ impl SingleHeightConsensus { &mut self, context: &mut ContextT, message: ConsensusMessage, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { debug!("Received message: {:?}", message); match message { ConsensusMessage::Proposal(_) => { @@ -151,12 +172,31 @@ impl SingleHeightConsensus { } } + pub async fn handle_task>( + &mut self, + context: &mut ContextT, + task: ShcTask, + ) -> Result, ConsensusError> { + debug!("Received task: {:?}", task); + match task.event { + StateMachineEvent::TimeoutPropose(_) + | StateMachineEvent::TimeoutPrevote(_) + | StateMachineEvent::TimeoutPrecommit(_) => { + let leader_fn = + |round: Round| -> ValidatorId { context.proposer(self.height, round) }; + let sm_events = self.state_machine.handle_event(task.event, &leader_fn); + self.handle_state_machine_events(context, sm_events).await + } + _ => unimplemented!("Unexpected task: {:?}", task), + } + } + #[instrument(skip_all)] async fn handle_vote>( &mut self, context: &mut ContextT, vote: Vote, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { if !self.validators.contains(&vote.voter) { return Err(ConsensusError::InvalidVote( vote.clone(), @@ -187,7 +227,7 @@ impl SingleHeightConsensus { )); } else { // Replay, ignore. - return Ok(None); + return Ok(ShcReturn::Tasks(vec![])); } } } @@ -202,7 +242,8 @@ impl SingleHeightConsensus { &mut self, context: &mut ContextT, mut events: VecDeque, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { + let mut ret_val = vec![]; while let Some(event) = events.pop_front() { trace!("Handling event: {:?}", event); match event { @@ -229,11 +270,18 @@ impl SingleHeightConsensus { self.handle_state_machine_vote(context, block_hash, round, VoteType::Precommit) .await?; } - _ => { //TODO(Asmaa): handle timeouts + StateMachineEvent::TimeoutPropose(_) => { + ret_val.push(ShcTask { duration: self.timeouts.proposal_timeout, event }); + } + StateMachineEvent::TimeoutPrevote(_) => { + ret_val.push(ShcTask { duration: self.timeouts.prevote_timeout, event }); + } + StateMachineEvent::TimeoutPrecommit(_) => { + ret_val.push(ShcTask { duration: self.timeouts.precommit_timeout, event }); } } } - Ok(None) + Ok(ShcReturn::Tasks(ret_val)) } #[instrument(skip(self, context), level = "debug")] @@ -278,7 +326,7 @@ impl SingleHeightConsensus { block_hash: Option, round: Round, vote_type: VoteType, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { let votes = match vote_type { VoteType::Prevote => &mut self.prevotes, VoteType::Precommit => &mut self.precommits, @@ -289,7 +337,7 @@ impl SingleHeightConsensus { panic!("State machine should not send repeat votes: old={:?}, new={:?}", old, vote); } context.broadcast(ConsensusMessage::Vote(vote)).await?; - Ok(None) + Ok(ShcReturn::Tasks(vec![])) } #[instrument(skip_all)] @@ -297,7 +345,7 @@ impl SingleHeightConsensus { &mut self, block_hash: BlockHash, round: Round, - ) -> Result>, ConsensusError> { + ) -> Result, ConsensusError> { let block = self .proposals .remove(&round) @@ -309,14 +357,11 @@ impl SingleHeightConsensus { .iter() .filter_map(|v| { let vote = self.precommits.get(&(round, *v))?; - if vote.block_hash != Some(block_hash) { - return None; - } - Some(vote.clone()) + if vote.block_hash == Some(block_hash) { Some(vote.clone()) } else { None } }) .collect(); // TODO(matan): Check actual weights. assert!(supporting_precommits.len() >= self.state_machine.quorum_size() as usize); - Ok(Some(Decision { precommits: supporting_precommits, block })) + Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block })) } } diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index 6e0082e1a5..ce67c52e08 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -9,6 +9,9 @@ use test_case::test_case; use tokio; use super::SingleHeightConsensus; +use crate::config::TimeoutsConfig; +use crate::single_height_consensus::{ShcReturn, ShcTask}; +use crate::state_machine::StateMachineEvent; use crate::test_utils::{precommit, prevote, MockTestContext, TestBlock}; use crate::types::{ConsensusBlock, ConsensusError, ProposalInit, ValidatorId}; @@ -23,13 +26,19 @@ lazy_static! { static ref BLOCK_ID: BlockHash = BLOCK.id(); static ref PROPOSAL_INIT: ProposalInit = ProposalInit { height: BlockNumber(0), round: 0, proposer: *PROPOSER_ID }; + static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } #[tokio::test] async fn proposer() { let mut context = MockTestContext::new(); - let mut shc = SingleHeightConsensus::new(BlockNumber(0), *VALIDATOR_ID_1, VALIDATORS.to_vec()); + let mut shc = SingleHeightConsensus::new( + BlockNumber(0), + *VALIDATOR_ID_1, + VALIDATORS.to_vec(), + TIMEOUTS.clone(), + ); context.expect_proposer().times(1).returning(move |_, _| *VALIDATOR_ID_1); context.expect_build_proposal().times(1).returning(move |_| { @@ -44,6 +53,7 @@ async fn proposer() { // Ignore content receiver, since this is the context's responsibility. assert_eq!(init.height, BlockNumber(0)); assert_eq!(init.proposer, *VALIDATOR_ID_1); + // This is done so that we can return immediately without dropping the receiver. fin_receiver_clone.set(fin_receiver).unwrap(); Ok(()) }); @@ -55,11 +65,10 @@ async fn proposer() { }) .returning(move |_| Ok(())); // Sends proposal and prevote. - assert!(matches!(shc.start(&mut context).await, Ok(None))); - + assert_eq!(shc.start(&mut context).await, Ok(ShcReturn::Tasks(vec![]))); assert_eq!( shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *PROPOSER_ID)).await, - Ok(None) + Ok(ShcReturn::Tasks(vec![])) ); // 3 of 4 Prevotes is enough to send a Precommit. context @@ -69,20 +78,39 @@ async fn proposer() { msg == &precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_1) }) .returning(move |_| Ok(())); + // The Node got a Prevote quorum. assert_eq!( shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2)).await, - Ok(None) + Ok(ShcReturn::Tasks(vec![ShcTask { + duration: TIMEOUTS.prevote_timeout, + event: StateMachineEvent::TimeoutPrevote(0) + }])) ); let precommits = vec![ - precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_1), - precommit(Some(BlockHash(Felt::TWO)), 0, 0, *VALIDATOR_ID_3), // Ignores since disagrees. precommit(Some(*BLOCK_ID), 0, 0, *PROPOSER_ID), + precommit(Some(BlockHash(Felt::TWO)), 0, 0, *VALIDATOR_ID_3), precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2), + precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_1), ]; - assert_eq!(shc.handle_message(&mut context, precommits[1].clone()).await, Ok(None)); - assert_eq!(shc.handle_message(&mut context, precommits[2].clone()).await, Ok(None)); - let decision = shc.handle_message(&mut context, precommits[3].clone()).await.unwrap().unwrap(); + assert_eq!( + shc.handle_message(&mut context, precommits[0].clone()).await, + Ok(ShcReturn::Tasks(vec![])) + ); + // The disagreeing vote counts towards the timeout, which uses a heterogeneous quorum, but not + // the decision, which uses a homogenous quorum. + assert_eq!( + shc.handle_message(&mut context, precommits[1].clone()).await, + Ok(ShcReturn::Tasks(vec![ShcTask { + duration: TIMEOUTS.precommit_timeout, + event: StateMachineEvent::TimeoutPrecommit(0) + }])) + ); + let ShcReturn::Decision(decision) = + shc.handle_message(&mut context, precommits[2].clone()).await.unwrap() + else { + panic!("Expected decision"); + }; assert_eq!(decision.block, *BLOCK); assert!( decision @@ -90,10 +118,6 @@ async fn proposer() { .into_iter() .all(|item| precommits.contains(&ConsensusMessage::Vote(item))) ); - - // Check the fin sent to the network. - let fin = Arc::into_inner(fin_receiver).unwrap().take().unwrap().await.unwrap(); - assert_eq!(fin, *BLOCK_ID); } #[test_case(false; "single_proposal")] @@ -103,7 +127,12 @@ async fn validator(repeat_proposal: bool) { let mut context = MockTestContext::new(); // Creation calls to `context.validators`. - let mut shc = SingleHeightConsensus::new(BlockNumber(0), *VALIDATOR_ID_1, VALIDATORS.to_vec()); + let mut shc = SingleHeightConsensus::new( + BlockNumber(0), + *VALIDATOR_ID_1, + VALIDATORS.to_vec(), + TIMEOUTS.clone(), + ); // Send the proposal from the peer. let (fin_sender, fin_receiver) = oneshot::channel(); @@ -130,7 +159,7 @@ async fn validator(repeat_proposal: bool) { fin_receiver, ) .await; - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); if repeat_proposal { // Send the same proposal again, which should be ignored (no expectations). let (fin_sender, fin_receiver) = oneshot::channel(); @@ -144,11 +173,11 @@ async fn validator(repeat_proposal: bool) { fin_receiver, ) .await; - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); } assert_eq!( shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *PROPOSER_ID)).await, - Ok(None) + Ok(ShcReturn::Tasks(vec![])) ); // 3 of 4 Prevotes is enough to send a Precommit. context @@ -158,9 +187,13 @@ async fn validator(repeat_proposal: bool) { msg == &precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_1) }) .returning(move |_| Ok(())); + // The Node got a Prevote quorum. assert_eq!( shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2)).await, - Ok(None) + Ok(ShcReturn::Tasks(vec![ShcTask { + duration: TIMEOUTS.prevote_timeout, + event: StateMachineEvent::TimeoutPrevote(0) + }])) ); let precommits = vec![ @@ -168,8 +201,15 @@ async fn validator(repeat_proposal: bool) { precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2), precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_1), ]; - assert_eq!(shc.handle_message(&mut context, precommits[0].clone()).await, Ok(None)); - let decision = shc.handle_message(&mut context, precommits[1].clone()).await.unwrap().unwrap(); + assert_eq!( + shc.handle_message(&mut context, precommits[0].clone()).await, + Ok(ShcReturn::Tasks(vec![])) + ); + let ShcReturn::Decision(decision) = + shc.handle_message(&mut context, precommits[1].clone()).await.unwrap() + else { + panic!("Expected decision"); + }; assert_eq!(decision.block, *BLOCK); assert!( decision @@ -185,7 +225,12 @@ async fn validator(repeat_proposal: bool) { async fn vote_twice(same_vote: bool) { let mut context = MockTestContext::new(); - let mut shc = SingleHeightConsensus::new(BlockNumber(0), *VALIDATOR_ID_1, VALIDATORS.to_vec()); + let mut shc = SingleHeightConsensus::new( + BlockNumber(0), + *VALIDATOR_ID_1, + VALIDATORS.to_vec(), + TIMEOUTS.clone(), + ); let (fin_sender, fin_receiver) = oneshot::channel(); fin_sender.send(*BLOCK_ID).unwrap(); @@ -209,10 +254,10 @@ async fn vote_twice(same_vote: bool) { fin_receiver, ) .await; - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); let res = shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *PROPOSER_ID)).await; - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); context .expect_broadcast() @@ -221,11 +266,18 @@ async fn vote_twice(same_vote: bool) { .returning(move |_| Ok(())); let res = shc.handle_message(&mut context, prevote(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2)).await; - assert_eq!(res, Ok(None)); + // The Node got a Prevote quorum. + assert_eq!( + res, + Ok(ShcReturn::Tasks(vec![ShcTask { + duration: TIMEOUTS.prevote_timeout, + event: StateMachineEvent::TimeoutPrevote(0) + }])) + ); let first_vote = precommit(Some(*BLOCK_ID), 0, 0, *PROPOSER_ID); let res = shc.handle_message(&mut context, first_vote.clone()).await; - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); let second_vote = if same_vote { first_vote.clone() @@ -234,15 +286,17 @@ async fn vote_twice(same_vote: bool) { }; let res = shc.handle_message(&mut context, second_vote.clone()).await; if same_vote { - assert_eq!(res, Ok(None)); + assert_eq!(res, Ok(ShcReturn::Tasks(vec![]))); } else { assert!(matches!(res, Err(ConsensusError::Equivocation(_, _, _)))); } - let decision = shc + let ShcReturn::Decision(decision) = shc .handle_message(&mut context, precommit(Some(*BLOCK_ID), 0, 0, *VALIDATOR_ID_2)) .await .unwrap() - .unwrap(); + else { + panic!("Expected decision"); + }; assert_eq!(decision.block, *BLOCK); }