diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs index c31fe7b9b74..49206cd6f5b 100644 --- a/crates/sequencing/papyrus_consensus/src/lib.rs +++ b/crates/sequencing/papyrus_consensus/src/lib.rs @@ -7,23 +7,17 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::Stream; +use manager::Manager; use papyrus_common::metrics as papyrus_metrics; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; -use single_height_consensus::SingleHeightConsensus; use starknet_api::block::{BlockHash, BlockNumber}; use tracing::{debug, info, instrument}; -use types::{ - ConsensusBlock, - ConsensusContext, - ConsensusError, - Decision, - ProposalInit, - ValidatorId, -}; +use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; pub mod config; +pub mod manager; #[allow(missing_docs)] pub mod papyrus_consensus_context; pub(crate) mod simulation_network_receiver; @@ -36,78 +30,6 @@ pub(crate) mod test_utils; #[allow(missing_docs)] pub mod types; -use futures::StreamExt; - -#[instrument(skip(context, validator_id, network_receiver, cached_messages), level = "info")] -#[allow(missing_docs)] -async fn run_height( - context: &mut ContextT, - height: BlockNumber, - validator_id: ValidatorId, - network_receiver: &mut NetworkReceiverT, - cached_messages: &mut Vec, -) -> Result, ConsensusError> -where - BlockT: ConsensusBlock, - ContextT: ConsensusContext, - NetworkReceiverT: - Stream, ReportSender)> + Unpin, - ProposalWrapper: - Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, -{ - let validators = context.validators(height).await; - let mut shc = SingleHeightConsensus::new(height, validator_id, validators); - - if let Some(decision) = shc.start(context).await? { - return Ok(decision); - } - - let mut current_height_messages = Vec::new(); - for msg in std::mem::take(cached_messages) { - match height.0.cmp(&msg.height()) { - std::cmp::Ordering::Less => cached_messages.push(msg), - std::cmp::Ordering::Equal => current_height_messages.push(msg), - std::cmp::Ordering::Greater => {} - } - } - - loop { - let message = if let Some(msg) = current_height_messages.pop() { - msg - } else { - // TODO(matan): Handle parsing failures and utilize ReportCallback. - network_receiver - .next() - .await - .expect("Network receiver closed unexpectedly") - .0 - .expect("Failed to parse consensus message") - }; - - if message.height() != height.0 { - debug!("Received a message for a different height. {:?}", message); - if message.height() > height.0 { - cached_messages.push(message); - } - continue; - } - - let maybe_decision = match message { - ConsensusMessage::Proposal(proposal) => { - // Special case due to fake streaming. - let (proposal_init, content_receiver, fin_receiver) = - ProposalWrapper(proposal).into(); - shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver).await? - } - _ => shc.handle_message(context, message).await?, - }; - - if let Some(decision) = maybe_decision { - return Ok(decision); - } - } -} - // TODO(dvir): add test for this. #[instrument(skip(context, start_height, network_receiver), level = "info")] #[allow(missing_docs)] @@ -129,16 +51,11 @@ where // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; - let mut future_messages = Vec::new(); + let mut manager = Manager::new(); loop { - let decision = run_height( - &mut context, - current_height, - validator_id, - &mut network_receiver, - &mut future_messages, - ) - .await?; + let decision = manager + .run_height(&mut context, current_height, validator_id, &mut network_receiver) + .await?; info!( "Finished consensus for height: {current_height}. Agreed on block with id: {:x}", diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs new file mode 100644 index 00000000000..43c148c2be0 --- /dev/null +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -0,0 +1,114 @@ +//! Consensus manager, see Manager struct. + +#[cfg(test)] +#[path = "manager_test.rs"] +mod manager_test; + +use futures::channel::{mpsc, oneshot}; +use futures::{Stream, StreamExt}; +use papyrus_network::network_manager::ReportSender; +use papyrus_protobuf::consensus::ConsensusMessage; +use papyrus_protobuf::converters::ProtobufConversionError; +use starknet_api::block::{BlockHash, BlockNumber}; +use tracing::{debug, instrument}; + +use crate::single_height_consensus::SingleHeightConsensus; +use crate::types::{ + ConsensusBlock, + ConsensusContext, + ConsensusError, + Decision, + ProposalInit, + ValidatorId, +}; +use crate::ProposalWrapper; + +/// Used run Tendermint. Handles issues which are not explicitly part of the single height consensus +/// algorithm (e.g. messages from future heights). +pub struct Manager { + cached_messages: Vec, +} + +impl Manager { + /// Create a new consensus manager. + pub fn new() -> Self { + Self { cached_messages: Vec::new() } + } + + /// Run the consensus algorithm for a single height. + /// + /// Assumes that `height` is monotonically increasing across calls for the sake of filtering + /// `cached_messaged`. + #[instrument(skip(self, context, validator_id, network_receiver), level = "info")] + pub async fn run_height( + &mut self, + context: &mut ContextT, + height: BlockNumber, + validator_id: ValidatorId, + network_receiver: &mut NetworkReceiverT, + ) -> Result, ConsensusError> + where + BlockT: ConsensusBlock, + ContextT: ConsensusContext, + NetworkReceiverT: Stream, ReportSender)> + + Unpin, + ProposalWrapper: Into<( + ProposalInit, + mpsc::Receiver, + oneshot::Receiver, + )>, + { + let validators = context.validators(height).await; + let mut shc = SingleHeightConsensus::new(height, validator_id, validators); + + if let Some(decision) = shc.start(context).await? { + return Ok(decision); + } + + let mut current_height_messages = Vec::new(); + for msg in std::mem::take(&mut self.cached_messages) { + match height.0.cmp(&msg.height()) { + std::cmp::Ordering::Less => self.cached_messages.push(msg), + std::cmp::Ordering::Equal => current_height_messages.push(msg), + std::cmp::Ordering::Greater => {} + } + } + + loop { + let message = if let Some(msg) = current_height_messages.pop() { + msg + } else { + // TODO(matan): Handle parsing failures and utilize ReportCallback. + network_receiver + .next() + .await + .expect("Network receiver closed unexpectedly") + .0 + .expect("Failed to parse consensus message") + }; + + if message.height() != height.0 { + debug!("Received a message for a different height. {:?}", message); + if message.height() > height.0 { + self.cached_messages.push(message); + } + continue; + } + + let maybe_decision = match message { + ConsensusMessage::Proposal(proposal) => { + // Special case due to fake streaming. + let (proposal_init, content_receiver, fin_receiver) = + ProposalWrapper(proposal).into(); + shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver) + .await? + } + _ => shc.handle_message(context, message).await?, + }; + + if let Some(decision) = maybe_decision { + return Ok(decision); + } + } + } +} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs new file mode 100644 index 00000000000..f43c728c6dc --- /dev/null +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -0,0 +1,163 @@ +use std::vec; + +use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::SinkExt; +use lazy_static::lazy_static; +use mockall::mock; +use papyrus_network::network_manager::ReportSender; +use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType}; +use papyrus_protobuf::converters::ProtobufConversionError; +use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::transaction::Transaction; +use starknet_types_core::felt::Felt; + +use super::Manager; +use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; + +lazy_static! { + static ref VALIDATOR_ID: ValidatorId = 1_u32.into(); + static ref PROPOSER_ID: ValidatorId = 0_u32.into(); +} + +// TODO(matan): Switch to using TestBlock & MockTestContext in `test_utils` once streaming is +// supported. Streaming should allow us to make the Manager generic over the content. +#[derive(Debug, PartialEq, Clone)] +pub struct TestBlock { + pub content: Vec, + pub id: BlockHash, +} + +impl ConsensusBlock for TestBlock { + type ProposalChunk = Transaction; + type ProposalIter = std::vec::IntoIter; + + fn id(&self) -> BlockHash { + self.id + } + + fn proposal_iter(&self) -> Self::ProposalIter { + self.content.clone().into_iter() + } +} + +mock! { + pub TestContext {} + + #[async_trait] + impl ConsensusContext for TestContext { + type Block = TestBlock; + + async fn build_proposal(&self, height: BlockNumber) -> ( + mpsc::Receiver, + oneshot::Receiver + ); + + async fn validate_proposal( + &self, + height: BlockNumber, + content: mpsc::Receiver + ) -> oneshot::Receiver; + + async fn validators(&self, height: BlockNumber) -> Vec; + + fn proposer(&self, validators: &[ValidatorId], height: BlockNumber) -> ValidatorId; + + async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError>; + + async fn propose( + &self, + init: ProposalInit, + content_receiver: mpsc::Receiver, + fin_receiver: oneshot::Receiver, + ) -> Result<(), ConsensusError>; + } +} + +type Sender = + mpsc::UnboundedSender<(Result, ReportSender)>; + +async fn send(sender: &mut Sender, msg: ConsensusMessage) { + sender + .send((Ok(msg.clone()), oneshot::channel().0)) + .await + .expect(&format!("Failed to send message: {msg:?}")); +} +fn proposal(block_hash: BlockHash, height: u64) -> ConsensusMessage { + ConsensusMessage::Proposal(Proposal { + height, + block_hash, + round: 0, + proposer: *PROPOSER_ID, + transactions: vec![], + }) +} + +fn prevote(block_hash: Option, height: u64, voter: ValidatorId) -> ConsensusMessage { + ConsensusMessage::Vote(Vote { + vote_type: VoteType::Prevote, + height, + round: 0, + block_hash, + voter, + }) +} + +fn precommit(block_hash: Option, height: u64, voter: ValidatorId) -> ConsensusMessage { + ConsensusMessage::Vote(Vote { + vote_type: VoteType::Precommit, + height, + round: 0, + block_hash, + voter, + }) +} + +#[tokio::test] +async fn run_multiple_heights() { + let mut context = MockTestContext::new(); + + let (mut sender, mut receiver) = mpsc::unbounded(); + // Send messages for height 2 followed by those for height 1. + send(&mut sender, proposal(BlockHash(Felt::TWO), 2)).await; + send(&mut sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await; + send(&mut sender, proposal(BlockHash(Felt::ONE), 1)).await; + send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await; + + let mut manager = Manager::new(); + + // Run the manager for height 1. + context + .expect_validate_proposal() + .return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap(); + block_receiver + }) + .times(1); + context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); + context.expect_proposer().returning(move |_, _| *PROPOSER_ID); + context.expect_broadcast().returning(move |_| Ok(())); + let decision = manager + .run_height(&mut context, BlockNumber(1), *VALIDATOR_ID, &mut receiver) + .await + .unwrap(); + assert_eq!(decision.block.id(), BlockHash(Felt::ONE)); + + // Run the manager for height 2. + context + .expect_validate_proposal() + .return_once(move |_, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap(); + block_receiver + }) + .times(1); + let decision = manager + .run_height(&mut context, BlockNumber(2), *VALIDATOR_ID, &mut receiver) + .await + .unwrap(); + assert_eq!(decision.block.id(), BlockHash(Felt::TWO)); +}