Skip to content

Commit

Permalink
feat(consensus): add sync sender to context to broadcast on decision
Browse files Browse the repository at this point in the history
This is optional since outside of testing mode sync should be its own component of the node.
  • Loading branch information
matan-starkware committed Aug 12, 2024
1 parent 37840e3 commit d61b95f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 22 deletions.
1 change: 1 addition & 0 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn run_consensus(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
None,
);
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
Expand Down
5 changes: 3 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ pub struct Proposal {
pub block_hash: BlockHash,
}

#[derive(Debug, Hash, Clone, Eq, PartialEq)]
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub enum VoteType {
Prevote,
#[default]
Precommit,
}

#[derive(Debug, Hash, Clone, Eq, PartialEq)]
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub struct Vote {
pub vote_type: VoteType,
pub height: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl From<Vote> for protobuf::Vote {
}
}

auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);

impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

Expand Down
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mock! {
) -> Result<(), ConsensusError>;

async fn decision(
&self,
&mut self,
block: TestBlock,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::ProposalWrapper;

// TODO: add debug messages and span to the tasks.

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct PapyrusConsensusBlock {
content: Vec<Transaction>,
id: BlockHash,
Expand All @@ -45,22 +45,25 @@ impl ConsensusBlock for PapyrusConsensusBlock {

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
}

impl PapyrusConsensusContext {
// TODO(dvir): remove the dead code attribute after we will use this function.
#[allow(dead_code)]
pub fn new(
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
) -> Self {
Self {
storage_reader,
broadcast_sender,
network_broadcast_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
}
}
}
Expand Down Expand Up @@ -160,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext {
panic!("Block in {height} was not found in storage despite waiting for it")
})
.block_hash;

// This can happen as a result of sync interrupting `run_height`.
fin_sender
.send(PapyrusConsensusBlock { content: transactions, id: block_hash })
.expect("Send should succeed");
.unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
})
}
.instrument(debug_span!("consensus_validate_proposal")),
);
Expand All @@ -180,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
debug!("Broadcasting message: {message:?}");
self.broadcast_sender.send(message).await?;
self.network_broadcast_sender.send(message).await?;
Ok(())
}

Expand All @@ -190,7 +197,7 @@ impl ConsensusContext for PapyrusConsensusContext {
mut content_receiver: mpsc::Receiver<Transaction>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<(), ConsensusError> {
let mut broadcast_sender = self.broadcast_sender.clone();
let mut network_broadcast_sender = self.network_broadcast_sender.clone();

tokio::spawn(
async move {
Expand Down Expand Up @@ -219,7 +226,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal.block_hash
);

broadcast_sender
network_broadcast_sender
.send(ConsensusMessage::Proposal(proposal))
.await
.expect("Failed to send proposal");
Expand All @@ -230,7 +237,7 @@ impl ConsensusContext for PapyrusConsensusContext {
}

async fn decision(
&self,
&mut self,
block: Self::Block,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError> {
Expand All @@ -239,6 +246,10 @@ impl ConsensusContext for PapyrusConsensusContext {
"Finished consensus for height: {height}. Agreed on block with id: {:x}",
block.id().0
);
if let Some(sender) = &mut self.sync_broadcast_sender {
sender.send(precommits[0].clone()).await?;
}

Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_network::network_manager::{mock_register_broadcast_subscriber, BroadcastNetworkMock};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_network::network_manager::{
mock_register_broadcast_subscriber,
BroadcastNetworkMock,
BroadcastSubscriberSender,
};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
use papyrus_storage::body::BodyStorageWriter;
use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
Expand All @@ -10,7 +14,7 @@ use starknet_api::block::Block;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

use crate::papyrus_consensus_context::PapyrusConsensusContext;
use crate::papyrus_consensus_context::{PapyrusConsensusBlock, PapyrusConsensusContext};
use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit};

// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing
Expand All @@ -20,7 +24,7 @@ const TEST_CHANNEL_SIZE: usize = 10;

#[tokio::test]
async fn build_proposal() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network) = test_setup(None);
let block_number = block.header.block_number;

let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await;
Expand All @@ -38,7 +42,7 @@ async fn build_proposal() {

#[tokio::test]
async fn validate_proposal_success() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network) = test_setup(None);
let block_number = block.header.block_number;

let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
Expand All @@ -56,7 +60,7 @@ async fn validate_proposal_success() {

#[tokio::test]
async fn validate_proposal_fail() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network) = test_setup(None);
let block_number = block.header.block_number;

let different_block = get_test_block(4, None, None, None);
Expand All @@ -72,7 +76,7 @@ async fn validate_proposal_fail() {

#[tokio::test]
async fn propose() {
let (block, papyrus_context, mut mock_network) = test_setup();
let (block, papyrus_context, mut mock_network) = test_setup(None);
let block_number = block.header.block_number;

let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
Expand All @@ -99,7 +103,23 @@ async fn propose() {
assert_eq!(mock_network.messages_to_broadcast_receiver.next().await.unwrap(), expected_message);
}

fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<ConsensusMessage>) {
#[tokio::test]
async fn decision() {
let mut sync_channels = mock_register_broadcast_subscriber().unwrap();
let (_, mut papyrus_context, _) =
test_setup(Some(sync_channels.subscriber_channels.messages_to_broadcast_sender));
let block = PapyrusConsensusBlock::default();
let precommit = Vote::default();
papyrus_context.decision(block, vec![precommit.clone()]).await.unwrap();
assert_eq!(
sync_channels.mock_network.messages_to_broadcast_receiver.next().await.unwrap(),
precommit
);
}

fn test_setup(
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
) -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<ConsensusMessage>) {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let block = get_test_block(5, None, None, None);
let block_number = block.header.block_number;
Expand All @@ -118,6 +138,7 @@ fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<Consens
storage_reader.clone(),
test_channels.subscriber_channels.messages_to_broadcast_sender,
4,
sync_broadcast_sender,
);
(block, papyrus_context, test_channels.mock_network)
}
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mock! {
) -> Result<(), ConsensusError>;

async fn decision(
&self,
&mut self,
block: TestBlock,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub trait ConsensusContext {
) -> Result<(), ConsensusError>;

async fn decision(
&self,
&mut self,
block: Self::Block,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down

0 comments on commit d61b95f

Please sign in to comment.