diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs index bbabb8f6d0..17ebec9843 100644 --- a/crates/consensus_manager/src/consensus_manager.rs +++ b/crates/consensus_manager/src/consensus_manager.rs @@ -8,9 +8,10 @@ use futures::SinkExt; use libp2p::PeerId; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; +use papyrus_network::gossipsub_impl::Topic; use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::ConsensusMessage; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; @@ -34,10 +35,17 @@ impl ConsensusManager { } pub async fn run(&self) -> Result<(), ConsensusError> { - let network_manager = + let mut network_manager = NetworkManager::new(self.config.consensus_config.network_config.clone(), None); + let proposals_broadcast_channels = network_manager + .register_broadcast_topic::( + Topic::new(NETWORK_TOPIC), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), + proposals_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, ); let network_handle = tokio::task::spawn(network_manager.run()); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index cf59a159a5..4e5d170ef5 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -20,7 +20,14 @@ use papyrus_consensus::types::{ Round, ValidatorId, }; -use papyrus_protobuf::consensus::{ConsensusMessage, Vote}; +use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + ProposalInit as ProtobufProposalInit, + ProposalPart, + TransactionBatch, + Vote, +}; use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber}; use starknet_api::executable_transaction::Transaction; use starknet_batcher_types::batcher_types::{ @@ -36,7 +43,7 @@ use starknet_batcher_types::batcher_types::{ ValidateProposalInput, }; use starknet_batcher_types::communication::BatcherClient; -use tracing::{debug, debug_span, error, info, warn, Instrument}; +use tracing::{debug, debug_span, error, info, trace, warn, Instrument}; // {height: {proposal_id: (content, [proposal_ids])}} // Note that multiple proposals IDs can be associated with the same content, but we only need to @@ -56,12 +63,18 @@ pub struct SequencerConsensusContext { // restarting. proposal_id: u64, current_height: Option, + network_broadcast_client: BroadcastTopicClient, } impl SequencerConsensusContext { - pub fn new(batcher: Arc, num_validators: u64) -> Self { + pub fn new( + batcher: Arc, + network_broadcast_client: BroadcastTopicClient, + num_validators: u64, + ) -> Self { Self { batcher, + network_broadcast_client, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -112,6 +125,18 @@ impl ConsensusContext for SequencerConsensusContext { .build_proposal(build_proposal_input) .await .expect("Failed to initiate proposal build"); + let protobuf_consensus_init = ProtobufProposalInit { + height: proposal_init.height.0, + round: proposal_init.round, + proposer: proposal_init.proposer, + valid_round: proposal_init.valid_round, + }; + debug!("Broadcasting proposal init: {protobuf_consensus_init:?}"); + self.network_broadcast_client + .broadcast_message(ProposalPart::Init(protobuf_consensus_init)) + .await + .expect("Failed to broadcast proposal init"); + let broadcast_client = self.network_broadcast_client.clone(); tokio::spawn( async move { stream_build_proposal( @@ -119,6 +144,7 @@ impl ConsensusContext for SequencerConsensusContext { proposal_id, batcher, valid_proposals, + broadcast_client, fin_sender, ) .await; @@ -255,6 +281,7 @@ async fn stream_build_proposal( proposal_id: ProposalId, batcher: Arc, valid_proposals: Arc>, + mut broadcast_client: BroadcastTopicClient, fin_sender: oneshot::Sender, ) { let mut content = Vec::new(); @@ -273,6 +300,20 @@ async fn stream_build_proposal( // TODO: Broadcast the transactions to the network. // TODO(matan): Convert to protobuf and make sure this isn't too large for a single // proto message (could this be a With adapter added to the channel in `new`?). + let mut transaction_hashes = Vec::with_capacity(txs.len()); + let mut transactions = Vec::with_capacity(txs.len()); + txs.into_iter().for_each(|tx| { + transaction_hashes.push(tx.tx_hash()); + transactions.push(tx.tx()); + }); + debug!("Broadcasting proposal content: {transaction_hashes:?}"); + trace!("Broadcasting proposal content: {transactions:?}"); + broadcast_client + .broadcast_message(ProposalPart::Transactions(TransactionBatch { + transactions, + })) + .await + .expect("Failed to broadcast proposal content"); } GetProposalContent::Finished(id) => { let proposal_content_id = BlockHash(id.state_diff_commitment.0.0); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index dea03b4000..3d21219dc0 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -6,6 +6,12 @@ use futures::channel::mpsc; use futures::SinkExt; use lazy_static::lazy_static; use papyrus_consensus::types::{ConsensusContext, ProposalInit}; +use papyrus_network::network_manager::test_utils::{ + mock_register_broadcast_topic, + BroadcastNetworkMock, + TestSubscriberChannels, +}; +use papyrus_network::network_manager::BroadcastTopicChannels; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; use starknet_api::executable_transaction::Transaction; @@ -73,7 +79,15 @@ async fn build_proposal() { }), }) }); - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let BroadcastNetworkMock { + broadcasted_messages_sender: _mock_broadcasted_messages_sender, .. + } = mock_network; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -121,7 +135,12 @@ async fn validate_proposal_success() { }) }, ); - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network: _, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; @@ -154,8 +173,12 @@ async fn repropose() { }) }, ); - - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network: _, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); diff --git a/crates/starknet_api/src/executable_transaction.rs b/crates/starknet_api/src/executable_transaction.rs index 0553553267..876c92ec04 100644 --- a/crates/starknet_api/src/executable_transaction.rs +++ b/crates/starknet_api/src/executable_transaction.rs @@ -51,6 +51,16 @@ pub enum Transaction { } impl Transaction { + pub fn tx(self) -> crate::transaction::Transaction { + match self { + Transaction::Declare(tx_data) => crate::transaction::Transaction::Declare(tx_data.tx), + Transaction::DeployAccount(tx_data) => { + crate::transaction::Transaction::DeployAccount(tx_data.tx) + } + Transaction::Invoke(tx_data) => crate::transaction::Transaction::Invoke(tx_data.tx), + } + } + pub fn contract_address(&self) -> ContractAddress { match self { Transaction::Declare(tx_data) => tx_data.tx.sender_address(),