diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs index 9a1f1407ac..61ac39fbf5 100644 --- a/crates/consensus_manager/src/consensus_manager.rs +++ b/crates/consensus_manager/src/consensus_manager.rs @@ -8,9 +8,10 @@ use futures::SinkExt; use libp2p::PeerId; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; +use papyrus_network::gossipsub_impl::Topic; use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::ConsensusMessage; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; @@ -34,10 +35,17 @@ impl ConsensusManager { } pub async fn run(&self) -> Result<(), ConsensusError> { - let network_manager = + let mut network_manager = NetworkManager::new(self.config.consensus_config.network_config.clone(), None); + let proposals_broadcast_channels = network_manager + .register_broadcast_topic::( + Topic::new(NETWORK_TOPIC), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), + proposals_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, ); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index cf59a159a5..6bdb75cb51 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -20,7 +20,14 @@ use papyrus_consensus::types::{ Round, ValidatorId, }; -use papyrus_protobuf::consensus::{ConsensusMessage, Vote}; +use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + ProposalInit as ProtobufProposalInit, + ProposalPart, + TransactionBatch, + Vote, +}; use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber}; use starknet_api::executable_transaction::Transaction; use starknet_batcher_types::batcher_types::{ @@ -36,7 +43,7 @@ use starknet_batcher_types::batcher_types::{ ValidateProposalInput, }; use starknet_batcher_types::communication::BatcherClient; -use tracing::{debug, debug_span, error, info, warn, Instrument}; +use tracing::{debug, debug_span, error, info, trace, warn, Instrument}; // {height: {proposal_id: (content, [proposal_ids])}} // Note that multiple proposals IDs can be associated with the same content, but we only need to @@ -56,12 +63,18 @@ pub struct SequencerConsensusContext { // restarting. proposal_id: u64, current_height: Option, + network_broadcast_client: BroadcastTopicClient, } impl SequencerConsensusContext { - pub fn new(batcher: Arc, num_validators: u64) -> Self { + pub fn new( + batcher: Arc, + network_broadcast_client: BroadcastTopicClient, + num_validators: u64, + ) -> Self { Self { batcher, + network_broadcast_client, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -112,6 +125,18 @@ impl ConsensusContext for SequencerConsensusContext { .build_proposal(build_proposal_input) .await .expect("Failed to initiate proposal build"); + let protobuf_consensus_init = ProtobufProposalInit { + height: proposal_init.height.0, + round: proposal_init.round, + proposer: proposal_init.proposer, + valid_round: proposal_init.valid_round, + }; + debug!("Broadcasting proposal init: {protobuf_consensus_init:?}"); + self.network_broadcast_client + .broadcast_message(ProposalPart::Init(protobuf_consensus_init)) + .await + .expect("Failed to broadcast proposal init"); + let broadcast_client = self.network_broadcast_client.clone(); tokio::spawn( async move { stream_build_proposal( @@ -119,6 +144,7 @@ impl ConsensusContext for SequencerConsensusContext { proposal_id, batcher, valid_proposals, + broadcast_client, fin_sender, ) .await; @@ -255,6 +281,7 @@ async fn stream_build_proposal( proposal_id: ProposalId, batcher: Arc, valid_proposals: Arc>, + mut broadcast_client: BroadcastTopicClient, fin_sender: oneshot::Sender, ) { let mut content = Vec::new(); @@ -273,6 +300,20 @@ async fn stream_build_proposal( // TODO: Broadcast the transactions to the network. // TODO(matan): Convert to protobuf and make sure this isn't too large for a single // proto message (could this be a With adapter added to the channel in `new`?). + let mut transaction_hashes = Vec::with_capacity(txs.len()); + let mut transactions = Vec::with_capacity(txs.len()); + for tx in txs.into_iter() { + transaction_hashes.push(tx.tx_hash()); + transactions.push(tx.into()); + } + debug!("Broadcasting proposal content: {transaction_hashes:?}"); + trace!("Broadcasting proposal content: {transactions:?}"); + broadcast_client + .broadcast_message(ProposalPart::Transactions(TransactionBatch { + transactions, + })) + .await + .expect("Failed to broadcast proposal content"); } GetProposalContent::Finished(id) => { let proposal_content_id = BlockHash(id.state_diff_commitment.0.0); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 3806295482..f1d36e8b7e 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -6,6 +6,11 @@ use futures::channel::mpsc; use futures::SinkExt; use lazy_static::lazy_static; use papyrus_consensus::types::{ConsensusContext, ProposalInit}; +use papyrus_network::network_manager::test_utils::{ + mock_register_broadcast_topic, + TestSubscriberChannels, +}; +use papyrus_network::network_manager::BroadcastTopicChannels; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; use starknet_api::executable_transaction::{AccountTransaction, Transaction}; @@ -73,7 +78,12 @@ async fn build_proposal() { }), }) }); - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -121,7 +131,12 @@ async fn validate_proposal_success() { }) }, ); - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network: _, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; @@ -154,8 +169,12 @@ async fn repropose() { }) }, ); - - let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS); + let TestSubscriberChannels { mock_network: _, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = + subscriber_channels; + let mut context = + SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index e7888bb64a..45672a262c 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -105,6 +105,18 @@ impl Transaction { } } +impl From for Transaction { + fn from(tx: crate::executable_transaction::Transaction) -> Self { + match tx { + crate::executable_transaction::Transaction::Declare(tx) => Transaction::Declare(tx.tx), + crate::executable_transaction::Transaction::DeployAccount(tx) => { + Transaction::DeployAccount(tx.tx) + } + crate::executable_transaction::Transaction::Invoke(tx) => Transaction::Invoke(tx.tx), + } + } +} + #[derive(Copy, Clone, Debug, Eq, PartialEq, Default)] pub struct TransactionOptions { /// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to