diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index eb78a3c613..b40384a3f9 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -110,6 +110,7 @@ fn run_consensus( storage_reader.clone(), network_channels.messages_to_broadcast_sender, config.num_validators, + None, ); // TODO(matan): connect this to an actual channel. if let Some(test_config) = config.test.as_ref() { diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 8abd26f8c8..ac168536a6 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -11,13 +11,14 @@ pub struct Proposal { pub block_hash: BlockHash, } -#[derive(Debug, Hash, Clone, Eq, PartialEq)] +#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)] pub enum VoteType { Prevote, + #[default] Precommit, } -#[derive(Debug, Hash, Clone, Eq, PartialEq)] +#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)] pub struct Vote { pub vote_type: VoteType, pub height: u64, diff --git a/crates/papyrus_protobuf/src/converters/consensus.rs b/crates/papyrus_protobuf/src/converters/consensus.rs index 1e12b3ee77..ec185e2505 100644 --- a/crates/papyrus_protobuf/src/converters/consensus.rs +++ b/crates/papyrus_protobuf/src/converters/consensus.rs @@ -105,6 +105,8 @@ impl From for protobuf::Vote { } } +auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote); + impl TryFrom for ConsensusMessage { type Error = ProtobufConversionError; diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 7b3fe88d91..6f26a5d138 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -75,7 +75,7 @@ mock! { ) -> Result<(), ConsensusError>; async fn notify_decision( - &self, + &mut self, block: TestBlock, precommits: Vec, ) -> Result<(), ConsensusError>; diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 9aec17055c..33bd87b3ff 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -24,7 +24,7 @@ use crate::ProposalWrapper; // TODO: add debug messages and span to the tasks. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct PapyrusConsensusBlock { content: Vec, id: BlockHash, @@ -45,8 +45,9 @@ impl ConsensusBlock for PapyrusConsensusBlock { pub struct PapyrusConsensusContext { storage_reader: StorageReader, - broadcast_sender: BroadcastSubscriberSender, + network_broadcast_sender: BroadcastSubscriberSender, validators: Vec, + sync_broadcast_sender: Option>, } impl PapyrusConsensusContext { @@ -54,13 +55,15 @@ impl PapyrusConsensusContext { #[allow(dead_code)] pub fn new( storage_reader: StorageReader, - broadcast_sender: BroadcastSubscriberSender, + network_broadcast_sender: BroadcastSubscriberSender, num_validators: u64, + sync_broadcast_sender: Option>, ) -> Self { Self { storage_reader, - broadcast_sender, + network_broadcast_sender, validators: (0..num_validators).map(ContractAddress::from).collect(), + sync_broadcast_sender, } } } @@ -160,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext { panic!("Block in {height} was not found in storage despite waiting for it") }) .block_hash; + + // This can happen as a result of sync interrupting `run_height`. fin_sender .send(PapyrusConsensusBlock { content: transactions, id: block_hash }) - .expect("Send should succeed"); + .unwrap_or_else(|_| { + warn!("Failed to send block to consensus. height={height}"); + }) } .instrument(debug_span!("consensus_validate_proposal")), ); @@ -180,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext { async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> { debug!("Broadcasting message: {message:?}"); - self.broadcast_sender.send(message).await?; + self.network_broadcast_sender.send(message).await?; Ok(()) } @@ -190,7 +197,7 @@ impl ConsensusContext for PapyrusConsensusContext { mut content_receiver: mpsc::Receiver, fin_receiver: oneshot::Receiver, ) -> Result<(), ConsensusError> { - let mut broadcast_sender = self.broadcast_sender.clone(); + let mut network_broadcast_sender = self.network_broadcast_sender.clone(); tokio::spawn( async move { @@ -219,7 +226,7 @@ impl ConsensusContext for PapyrusConsensusContext { proposal.block_hash ); - broadcast_sender + network_broadcast_sender .send(ConsensusMessage::Proposal(proposal)) .await .expect("Failed to send proposal"); @@ -230,7 +237,7 @@ impl ConsensusContext for PapyrusConsensusContext { } async fn notify_decision( - &self, + &mut self, block: Self::Block, precommits: Vec, ) -> Result<(), ConsensusError> { @@ -239,6 +246,10 @@ impl ConsensusContext for PapyrusConsensusContext { "Finished consensus for height: {height}. Agreed on block with id: {:x}", block.id().0 ); + if let Some(sender) = &mut self.sync_broadcast_sender { + sender.send(precommits[0].clone()).await?; + } + Ok(()) } } diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs index a9f9c853e3..84f1ac505b 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs @@ -1,7 +1,11 @@ use futures::channel::{mpsc, oneshot}; use futures::StreamExt; -use papyrus_network::network_manager::{mock_register_broadcast_subscriber, BroadcastNetworkMock}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; +use papyrus_network::network_manager::{ + mock_register_broadcast_subscriber, + BroadcastNetworkMock, + BroadcastSubscriberSender, +}; +use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote}; use papyrus_storage::body::BodyStorageWriter; use papyrus_storage::header::HeaderStorageWriter; use papyrus_storage::test_utils::get_test_storage; @@ -10,7 +14,7 @@ use starknet_api::block::Block; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use crate::papyrus_consensus_context::PapyrusConsensusContext; +use crate::papyrus_consensus_context::{PapyrusConsensusBlock, PapyrusConsensusContext}; use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit}; // TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing @@ -20,7 +24,7 @@ const TEST_CHANNEL_SIZE: usize = 10; #[tokio::test] async fn build_proposal() { - let (block, papyrus_context, _mock_network) = test_setup(); + let (block, papyrus_context, _mock_network) = test_setup(None); let block_number = block.header.block_number; let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await; @@ -38,7 +42,7 @@ async fn build_proposal() { #[tokio::test] async fn validate_proposal_success() { - let (block, papyrus_context, _mock_network) = test_setup(); + let (block, papyrus_context, _mock_network) = test_setup(None); let block_number = block.header.block_number; let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); @@ -56,7 +60,7 @@ async fn validate_proposal_success() { #[tokio::test] async fn validate_proposal_fail() { - let (block, papyrus_context, _mock_network) = test_setup(); + let (block, papyrus_context, _mock_network) = test_setup(None); let block_number = block.header.block_number; let different_block = get_test_block(4, None, None, None); @@ -72,7 +76,7 @@ async fn validate_proposal_fail() { #[tokio::test] async fn propose() { - let (block, papyrus_context, mut mock_network) = test_setup(); + let (block, papyrus_context, mut mock_network) = test_setup(None); let block_number = block.header.block_number; let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); @@ -99,7 +103,23 @@ async fn propose() { assert_eq!(mock_network.messages_to_broadcast_receiver.next().await.unwrap(), expected_message); } -fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock) { +#[tokio::test] +async fn decision() { + let mut sync_channels = mock_register_broadcast_subscriber().unwrap(); + let (_, mut papyrus_context, _) = + test_setup(Some(sync_channels.subscriber_channels.messages_to_broadcast_sender)); + let block = PapyrusConsensusBlock::default(); + let precommit = Vote::default(); + papyrus_context.decision(block, vec![precommit.clone()]).await.unwrap(); + assert_eq!( + sync_channels.mock_network.messages_to_broadcast_receiver.next().await.unwrap(), + precommit + ); +} + +fn test_setup( + sync_broadcast_sender: Option>, +) -> (Block, PapyrusConsensusContext, BroadcastNetworkMock) { let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); let block = get_test_block(5, None, None, None); let block_number = block.header.block_number; @@ -118,6 +138,7 @@ fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock Result<(), ConsensusError>; async fn notify_decision( - &self, + &mut self, block: TestBlock, precommits: Vec, ) -> Result<(), ConsensusError>; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 41414fa849..bd81fe8c99 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -141,7 +141,7 @@ pub trait ConsensusContext { /// - `precommits` - All precommits must be for the same `(block.id(), height, round)` and form /// a quorum (>2/3 of the voting power) for this height. async fn notify_decision( - &self, + &mut self, block: Self::Block, precommits: Vec, ) -> Result<(), ConsensusError>;