From 316e4d24b95f1572b4828fcd1b824fab01073cbd Mon Sep 17 00:00:00 2001 From: Asmaa Magdoub Date: Wed, 30 Oct 2024 15:26:34 +0200 Subject: [PATCH] feat(consensus): non-blocking proposal handling --- Cargo.lock | 1 + .../sequencing/papyrus_consensus/Cargo.toml | 1 + .../papyrus_consensus/src/manager.rs | 17 +- .../src/single_height_consensus.rs | 265 +++++++++++++----- .../src/single_height_consensus_test.rs | 138 +++++---- 5 files changed, 283 insertions(+), 139 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6117441cf0..b44a88465e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7266,6 +7266,7 @@ version = "0.0.0" dependencies = [ "async-trait", "clap", + "enum-as-inner", "fs2", "futures", "lazy_static", diff --git a/crates/sequencing/papyrus_consensus/Cargo.toml b/crates/sequencing/papyrus_consensus/Cargo.toml index 25be426ad6..7e76e5c9c1 100644 --- a/crates/sequencing/papyrus_consensus/Cargo.toml +++ b/crates/sequencing/papyrus_consensus/Cargo.toml @@ -28,6 +28,7 @@ tokio.workspace = true tracing.workspace = true [dev-dependencies] +enum-as-inner = "0.6.1" mockall.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_network_types = { workspace = true, features = ["testing"] } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 3a9f18bb7e..697726069c 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -18,7 +18,7 @@ use starknet_api::core::ContractAddress; use tracing::{debug, info, instrument}; use crate::config::TimeoutsConfig; -use crate::single_height_consensus::{ShcReturn, ShcTask, SingleHeightConsensus}; +use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus}; use crate::types::{ BroadcastConsensusMessageChannel, ConsensusContext, @@ -125,13 +125,13 @@ impl MultiHeightManager { validators, self.timeouts.clone(), ); - let mut shc_tasks = FuturesUnordered::new(); + let mut shc_events = FuturesUnordered::new(); match shc.start(context).await? { ShcReturn::Decision(decision) => return Ok(decision), ShcReturn::Tasks(tasks) => { for task in tasks { - shc_tasks.push(create_task_handler(task)); + shc_events.push(task.run()); } } } @@ -142,8 +142,8 @@ impl MultiHeightManager { message = next_message(&mut current_height_messages, broadcast_channels) => { self.handle_message(context, height, &mut shc, message?).await? }, - Some(shc_task) = shc_tasks.next() => { - shc.handle_event(context, shc_task.event).await? + Some(shc_event) = shc_events.next() => { + shc.handle_event(context, shc_event).await? }, }; @@ -151,7 +151,7 @@ impl MultiHeightManager { ShcReturn::Decision(decision) => return Ok(decision), ShcReturn::Tasks(tasks) => { for task in tasks { - shc_tasks.push(create_task_handler(task)); + shc_events.push(task.run()); } } } @@ -278,8 +278,3 @@ where } } } - -async fn create_task_handler(task: ShcTask) -> ShcTask { - tokio::time::sleep(task.duration).await; - task -} diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index dfc1c7195b..4956687c1c 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -6,6 +6,8 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::time::Duration; +#[cfg(test)] +use enum_as_inner::EnumAsInner; use futures::channel::{mpsc, oneshot}; use papyrus_protobuf::consensus::{ConsensusMessage, Vote, VoteType}; use starknet_api::block::BlockNumber; @@ -24,17 +26,123 @@ use crate::types::{ }; #[derive(Debug, PartialEq)] -pub struct ShcTask { - pub duration: Duration, - pub event: StateMachineEvent, -} - -#[derive(Debug, PartialEq)] +#[cfg_attr(test, derive(EnumAsInner))] pub enum ShcReturn { Tasks(Vec), Decision(Decision), } +#[derive(Debug, Clone)] +pub enum ShcEvent { + TimeoutPropose(StateMachineEvent), + TimeoutPrevote(StateMachineEvent), + TimeoutPrecommit(StateMachineEvent), + Prevote(StateMachineEvent), + Precommit(StateMachineEvent), + BuildProposal(StateMachineEvent), + // TODO: Replace ProposalContentId with the unvalidated signature from the proposer. + ValidateProposal(StateMachineEvent, Option), +} + +#[derive(Debug)] +#[cfg_attr(test, derive(EnumAsInner))] +pub enum ShcTask { + TimeoutPropose(Duration, StateMachineEvent), + TimeoutPrevote(Duration, StateMachineEvent), + TimeoutPrecommit(Duration, StateMachineEvent), + Prevote(Duration, StateMachineEvent), + Precommit(Duration, StateMachineEvent), + /// Building a proposal is handled in 3 stages: + /// 1. The SHC requests a block to be built from the context. + /// 2. SHC returns, allowing the context to build the block while the Manager awaits the result + /// without blocking consensus. + /// 3. Once building is complete, the manager returns the built block to the SHC as an event, + /// which can be sent to the SM. + /// * During this process, the SM is frozen; it will accept and buffer other events, only + /// processing them once it receives the built proposal. + BuildProposal(Round, oneshot::Receiver), + /// Validating a proposal is handled in 3 stages: + /// 1. The SHC validates `ProposalInit`, then starts block validation within the context. + /// 2. SHC returns, allowing the context to validate the content while the Manager await the + /// result without blocking consensus. + /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an + /// event, which can be sent to the SM. + ValidateProposal( + ProposalInit, + oneshot::Receiver, // Block built from the content. + oneshot::Receiver, // Fin sent by the proposer. + ), +} + +impl PartialEq for ShcTask { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ShcTask::TimeoutPropose(d1, e1), ShcTask::TimeoutPropose(d2, e2)) + | (ShcTask::TimeoutPrevote(d1, e1), ShcTask::TimeoutPrevote(d2, e2)) + | (ShcTask::TimeoutPrecommit(d1, e1), ShcTask::TimeoutPrecommit(d2, e2)) + | (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2)) + | (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2, + (ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2, + (ShcTask::ValidateProposal(pi1, _, _), ShcTask::ValidateProposal(pi2, _, _)) => { + pi1 == pi2 + } + _ => false, + } + } +} + +impl ShcTask { + pub async fn run(self) -> ShcEvent { + match self { + ShcTask::TimeoutPropose(duration, event) => { + tokio::time::sleep(duration).await; + ShcEvent::TimeoutPropose(event) + } + ShcTask::TimeoutPrevote(duration, event) => { + tokio::time::sleep(duration).await; + ShcEvent::TimeoutPrevote(event) + } + ShcTask::TimeoutPrecommit(duration, event) => { + tokio::time::sleep(duration).await; + ShcEvent::TimeoutPrecommit(event) + } + ShcTask::Prevote(duration, event) => { + tokio::time::sleep(duration).await; + ShcEvent::Prevote(event) + } + ShcTask::Precommit(duration, event) => { + tokio::time::sleep(duration).await; + ShcEvent::Precommit(event) + } + ShcTask::BuildProposal(round, receiver) => { + println!("Building proposal for round {}", round); + let proposal_id = receiver.await.expect("Block building failed."); + ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(proposal_id), round)) + } + ShcTask::ValidateProposal( + init, + id_built_from_content_receiver, + fin_from_proposer_receiver, + ) => { + let proposal_id = match id_built_from_content_receiver.await { + Ok(proposal_id) => Some(proposal_id), + // Proposal never received from peer. + Err(_) => None, + }; + let fin = match fin_from_proposer_receiver.await { + Ok(fin) => Some(fin), + // ProposalFin never received from peer. + Err(_) => None, + }; + ShcEvent::ValidateProposal( + StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round), + fin, + ) + } + } + } +} + /// Struct which represents a single height of consensus. Each height is expected to be begun with a /// call to `start`, which is relevant if we are the proposer for this height's first round. /// SingleHeightConsensus receives messages directly as parameters to function calls. It can send @@ -86,8 +194,8 @@ impl SingleHeightConsensus { self.handle_state_machine_events(context, events).await } - /// Receive a proposal from a peer node. Returns only once the proposal has been fully received - /// and processed. + /// Process the proposal init and initiate block validation. See [`ShcTask::ValidateProposal`] + /// for more details on the full proposal flow. #[instrument( skip_all, fields(height = %self.height), @@ -118,44 +226,20 @@ impl SingleHeightConsensus { warn!("Round {} already has a proposal, ignoring", init.round); return Ok(ShcReturn::Tasks(Vec::new())); }; - + // Since validating the proposal is non-blocking, we want to avoid validating the same round + // twice in parallel. This could be caused by a network repeat or a malicious spam attack. + proposal_entry.insert(None); let block_receiver = context .validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; - - let block = match block_receiver.await { - Ok(block) => block, - // ProposalFin never received from peer. - Err(_) => { - proposal_entry.insert(None); - return self.process_inbound_proposal(context, &init, None).await; - } - }; - - let fin = match fin_receiver.await { - Ok(fin) => fin, - // ProposalFin never received from peer. - Err(_) => { - proposal_entry.insert(None); - return self.process_inbound_proposal(context, &init, None).await; - } - }; - // TODO(matan): Switch to signature validation. - if block != fin { - proposal_entry.insert(None); - return self.process_inbound_proposal(context, &init, None).await; - } - proposal_entry.insert(Some(block)); - self.process_inbound_proposal(context, &init, Some(block)).await + Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)])) } async fn process_inbound_proposal( &mut self, context: &mut ContextT, - init: &ProposalInit, - proposal_id: Option, + sm_proposal: StateMachineEvent, ) -> Result { - let sm_proposal = StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self.state_machine.handle_event(sm_proposal, &leader_fn); self.handle_state_machine_events(context, sm_events).await @@ -180,19 +264,19 @@ impl SingleHeightConsensus { pub async fn handle_event( &mut self, context: &mut ContextT, - event: StateMachineEvent, + event: ShcEvent, ) -> Result { - debug!("Received Event: {:?}", event); + debug!("Received ShcEvent: {:?}", event); match event { - StateMachineEvent::TimeoutPropose(_) - | StateMachineEvent::TimeoutPrevote(_) - | StateMachineEvent::TimeoutPrecommit(_) => { + ShcEvent::TimeoutPropose(event) + | ShcEvent::TimeoutPrevote(event) + | ShcEvent::TimeoutPrecommit(event) => { let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self.state_machine.handle_event(event, &leader_fn); self.handle_state_machine_events(context, sm_events).await } - StateMachineEvent::Prevote(proposal_id, round) => { + ShcEvent::Prevote(StateMachineEvent::Prevote(proposal_id, round)) => { let Some(last_vote) = &self.last_prevote else { return Err(ConsensusError::InvalidEvent("No prevote to send".to_string())); }; @@ -200,12 +284,12 @@ impl SingleHeightConsensus { return Ok(ShcReturn::Tasks(Vec::new())); } context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?; - Ok(ShcReturn::Tasks(vec![ShcTask { - duration: self.timeouts.prevote_timeout, - event: StateMachineEvent::Prevote(proposal_id, round), - }])) + Ok(ShcReturn::Tasks(vec![ShcTask::Prevote( + self.timeouts.prevote_timeout, + StateMachineEvent::Prevote(proposal_id, round), + )])) } - StateMachineEvent::Precommit(proposal_id, round) => { + ShcEvent::Precommit(StateMachineEvent::Precommit(proposal_id, round)) => { let Some(last_vote) = &self.last_precommit else { return Err(ConsensusError::InvalidEvent("No precommit to send".to_string())); }; @@ -213,10 +297,48 @@ impl SingleHeightConsensus { return Ok(ShcReturn::Tasks(Vec::new())); } context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?; - Ok(ShcReturn::Tasks(vec![ShcTask { - duration: self.timeouts.precommit_timeout, - event: StateMachineEvent::Precommit(proposal_id, round), - }])) + Ok(ShcReturn::Tasks(vec![ShcTask::Precommit( + self.timeouts.precommit_timeout, + StateMachineEvent::Precommit(proposal_id, round), + )])) + } + ShcEvent::ValidateProposal( + StateMachineEvent::Proposal(proposal_id, round, valid_round), + fin, + ) => { + // TODO(matan): Switch to signature validation. + let id = if proposal_id != fin { + warn!( + "proposal_id built from content receiver does not match fin: {:#064x?} != \ + {:#064x?}", + proposal_id, fin + ); + None + } else { + proposal_id + }; + // Retaining the entry for this round prevents us from receiving another proposal on + // this round. If the validations failed, which can be caused by a network issue, we + // may want to re-open ourselves to this round. The downside is that this may open + // us to a spam attack. + // TODO(Asmaa): consider revisiting this decision. Spam attacks may not be a problem + // given that serial proposing anyways forces us to use interrupts. + self.proposals.insert(round, id); + self.process_inbound_proposal( + context, + StateMachineEvent::Proposal(id, round, valid_round), + ) + .await + } + ShcEvent::BuildProposal(StateMachineEvent::GetProposal(proposal_id, round)) => { + let old = self.proposals.insert(round, proposal_id); + assert!(old.is_none(), "There should be no entry for this round."); + let leader_fn = + |round: Round| -> ValidatorId { context.proposer(self.height, round) }; + let sm_events = self + .state_machine + .handle_event(StateMachineEvent::GetProposal(proposal_id, round), &leader_fn); + self.handle_state_machine_events(context, sm_events).await } _ => unimplemented!("Unexpected event: {:?}", event), } @@ -277,10 +399,8 @@ impl SingleHeightConsensus { trace!("Handling event: {:?}", event); match event { StateMachineEvent::GetProposal(proposal_id, round) => { - events.append( - &mut self - .handle_state_machine_get_proposal(context, proposal_id, round) - .await, + ret_val.extend( + self.handle_state_machine_get_proposal(context, proposal_id, round).await, ); } StateMachineEvent::Proposal(proposal_id, round, valid_round) => { @@ -313,26 +433,28 @@ impl SingleHeightConsensus { ); } StateMachineEvent::TimeoutPropose(_) => { - ret_val.push(ShcTask { duration: self.timeouts.proposal_timeout, event }); + ret_val.push(ShcTask::TimeoutPropose(self.timeouts.proposal_timeout, event)); } StateMachineEvent::TimeoutPrevote(_) => { - ret_val.push(ShcTask { duration: self.timeouts.prevote_timeout, event }); + ret_val.push(ShcTask::TimeoutPrevote(self.timeouts.prevote_timeout, event)); } StateMachineEvent::TimeoutPrecommit(_) => { - ret_val.push(ShcTask { duration: self.timeouts.precommit_timeout, event }); + ret_val.push(ShcTask::TimeoutPrecommit(self.timeouts.precommit_timeout, event)); } } } Ok(ShcReturn::Tasks(ret_val)) } + /// Initiate block building. See [`ShcTask::BuildProposal`] for more details on the full + /// proposal flow. #[instrument(skip(self, context), level = "debug")] async fn handle_state_machine_get_proposal( &mut self, context: &mut ContextT, proposal_id: Option, round: Round, - ) -> VecDeque { + ) -> Vec { assert!( proposal_id.is_none(), "ProposalContentId must be None since the state machine is requesting a \ @@ -345,12 +467,7 @@ impl SingleHeightConsensus { let init = ProposalInit { height: self.height, round, proposer: self.id, valid_round: None }; let fin_receiver = context.build_proposal(init, self.timeouts.proposal_timeout).await; - let block = fin_receiver.await.expect("Block building failed."); - let old = self.proposals.insert(round, Some(block)); - assert!(old.is_none(), "There should be no entry for this round."); - let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; - self.state_machine - .handle_event(StateMachineEvent::GetProposal(Some(block), round), &leader_fn) + vec![ShcTask::BuildProposal(round, fin_receiver)] } #[instrument(skip(self, context), level = "debug")] @@ -391,18 +508,22 @@ impl SingleHeightConsensus { round: Round, vote_type: VoteType, ) -> Result, ConsensusError> { - let (votes, last_vote, duration, event) = match vote_type { + let (votes, last_vote, task) = match vote_type { VoteType::Prevote => ( &mut self.prevotes, &mut self.last_prevote, - self.timeouts.prevote_timeout, - StateMachineEvent::Prevote(proposal_id, round), + ShcTask::Prevote( + self.timeouts.prevote_timeout, + StateMachineEvent::Prevote(proposal_id, round), + ), ), VoteType::Precommit => ( &mut self.precommits, &mut self.last_precommit, - self.timeouts.precommit_timeout, - StateMachineEvent::Precommit(proposal_id, round), + ShcTask::Precommit( + self.timeouts.precommit_timeout, + StateMachineEvent::Precommit(proposal_id, round), + ), ), }; let vote = Vote { @@ -421,7 +542,7 @@ impl SingleHeightConsensus { return Ok(Vec::new()); } *last_vote = Some(vote); - Ok(vec![ShcTask { duration, event }]) + Ok(vec![task]) } #[instrument(skip_all)] diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index e950b937dc..6758d09072 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -8,7 +8,7 @@ use tokio; use super::SingleHeightConsensus; use crate::config::TimeoutsConfig; -use crate::single_height_consensus::{ShcReturn, ShcTask}; +use crate::single_height_consensus::{ShcEvent, ShcReturn, ShcTask}; use crate::state_machine::StateMachineEvent; use crate::test_utils::{precommit, prevote, MockTestContext, TestBlock}; use crate::types::{ConsensusError, ProposalInit, ValidatorId}; @@ -28,31 +28,53 @@ lazy_static! { valid_round: None }; static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); + static ref VALIDATE_PROPOSAL_EVENT: ShcEvent = ShcEvent::ValidateProposal( + StateMachineEvent::Proposal(Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round,), + Some(BLOCK.id), + ); } fn prevote_task(block_felt: Option, round: u32) -> ShcTask { - ShcTask { - duration: TIMEOUTS.prevote_timeout, - event: StateMachineEvent::Prevote(block_felt.map(BlockHash), round), - } + ShcTask::Prevote( + TIMEOUTS.prevote_timeout, + StateMachineEvent::Prevote(block_felt.map(BlockHash), round), + ) } fn precommit_task(block_felt: Option, round: u32) -> ShcTask { - ShcTask { - duration: TIMEOUTS.precommit_timeout, - event: StateMachineEvent::Precommit(block_felt.map(BlockHash), round), - } + ShcTask::Precommit( + TIMEOUTS.precommit_timeout, + StateMachineEvent::Precommit(block_felt.map(BlockHash), round), + ) } fn timeout_prevote_task(round: u32) -> ShcTask { - ShcTask { duration: TIMEOUTS.prevote_timeout, event: StateMachineEvent::TimeoutPrevote(round) } + ShcTask::TimeoutPrevote(TIMEOUTS.prevote_timeout, StateMachineEvent::TimeoutPrevote(round)) } fn timeout_precommit_task(round: u32) -> ShcTask { - ShcTask { - duration: TIMEOUTS.precommit_timeout, - event: StateMachineEvent::TimeoutPrecommit(round), - } + ShcTask::TimeoutPrecommit( + TIMEOUTS.precommit_timeout, + StateMachineEvent::TimeoutPrecommit(round), + ) +} + +async fn handle_proposal( + shc: &mut SingleHeightConsensus, + context: &mut MockTestContext, +) -> ShcReturn { + // Send the proposal from the peer. + let (fin_sender, fin_receiver) = oneshot::channel(); + fin_sender.send(BLOCK.id).unwrap(); + + shc.handle_proposal( + context, + PROPOSAL_INIT.clone(), + mpsc::channel(1).1, // content - ignored by SHC. + fin_receiver, + ) + .await + .unwrap() } #[tokio::test] @@ -78,8 +100,14 @@ async fn proposer() { .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)) .returning(move |_| Ok(())); // Sends proposal and prevote. + let shc_ret = shc.start(&mut context).await.unwrap(); + assert_eq!(*shc_ret.as_tasks().unwrap()[0].as_build_proposal().unwrap().0, 0); assert_eq!( - shc.start(&mut context).await, + shc.handle_event( + &mut context, + ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(BLOCK.id), 0)), + ) + .await, Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)])) ); assert_eq!( @@ -144,10 +172,6 @@ async fn validator(repeat_proposal: bool) { TIMEOUTS.clone(), ); - // Send the proposal from the peer. - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(BLOCK.id).unwrap(); - context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); @@ -161,29 +185,16 @@ async fn validator(repeat_proposal: bool) { msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1) }) .returning(move |_| Ok(())); - let res = shc - .handle_proposal( - &mut context, - PROPOSAL_INIT.clone(), - mpsc::channel(1).1, // content - ignored by SHC. - fin_receiver, - ) - .await; - assert_eq!(res, Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0),]))); + let shc_ret = handle_proposal(&mut shc, &mut context).await; + assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT); + assert_eq!( + shc.handle_event(&mut context, VALIDATE_PROPOSAL_EVENT.clone()).await, + Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)])) + ); if repeat_proposal { // Send the same proposal again, which should be ignored (no expectations). - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(BLOCK.id).unwrap(); - - let res = shc - .handle_proposal( - &mut context, - PROPOSAL_INIT.clone(), - mpsc::channel(1).1, // content - ignored by SHC. - fin_receiver, - ) - .await; - assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new()))); + let shc_ret = handle_proposal(&mut shc, &mut context).await; + assert_eq!(shc_ret, ShcReturn::Tasks(Vec::new())); } assert_eq!( shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await, @@ -239,9 +250,6 @@ async fn vote_twice(same_vote: bool) { TIMEOUTS.clone(), ); - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(BLOCK.id).unwrap(); - context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); @@ -253,15 +261,12 @@ async fn vote_twice(same_vote: bool) { .times(1) // Shows the repeat vote is ignored. .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)) .returning(move |_| Ok(())); - let res = shc - .handle_proposal( - &mut context, - PROPOSAL_INIT.clone(), - mpsc::channel(1).1, // content - ignored by SHC. - fin_receiver, - ) - .await; - assert_eq!(res, Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0),]))); + let shc_ret = handle_proposal(&mut shc, &mut context).await; + assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT,); + assert_eq!( + shc.handle_event(&mut context, VALIDATE_PROPOSAL_EVENT.clone()).await, + Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)])) + ); let res = shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await; assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new()))); @@ -325,9 +330,15 @@ async fn rebroadcast_votes() { .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)) .returning(move |_| Ok(())); // Sends proposal and prevote. + let shc_ret = shc.start(&mut context).await.unwrap(); + assert_eq!(*shc_ret.as_tasks().unwrap()[0].as_build_proposal().unwrap().0, 0); assert_eq!( - shc.start(&mut context).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0),])) + shc.handle_event( + &mut context, + ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(BLOCK.id), 0)), + ) + .await, + Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)])) ); assert_eq!( shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await, @@ -348,7 +359,11 @@ async fn rebroadcast_votes() { ); // Re-broadcast vote. assert_eq!( - shc.handle_event(&mut context, StateMachineEvent::Precommit(Some(BLOCK.id), 0),).await, + shc.handle_event( + &mut context, + ShcEvent::Precommit(StateMachineEvent::Precommit(Some(BLOCK.id), 0)) + ) + .await, Ok(ShcReturn::Tasks(vec![precommit_task(Some(BLOCK.id.0), 0),])) ); } @@ -377,6 +392,12 @@ async fn repropose() { .returning(move |_| Ok(())); // Sends proposal and prevote. shc.start(&mut context).await.unwrap(); + shc.handle_event( + &mut context, + ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(BLOCK.id), 0)), + ) + .await + .unwrap(); shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)) .await .unwrap(); @@ -411,7 +432,12 @@ async fn repropose() { .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)) .returning(move |_| Ok(())); shc.handle_message(&mut context, precommits[2].clone()).await.unwrap(); - shc.handle_event(&mut context, StateMachineEvent::TimeoutPrecommit(0)).await.unwrap(); + shc.handle_event( + &mut context, + ShcEvent::TimeoutPrecommit(StateMachineEvent::TimeoutPrecommit(0)), + ) + .await + .unwrap(); let precommits = vec![ precommit(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_1),