diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
index 401a661293..89945cec95 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
@@ -14,7 +14,7 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt};
 use papyrus_consensus::types::{
     ConsensusContext,
     ConsensusError,
@@ -23,7 +23,7 @@ use papyrus_consensus::types::{
     ValidatorId,
 };
 use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
-use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote};
+use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote};
 use papyrus_storage::body::BodyStorageReader;
 use papyrus_storage::header::HeaderStorageReader;
 use papyrus_storage::{StorageError, StorageReader};
@@ -36,10 +36,12 @@ use tracing::{debug, debug_span, info, warn, Instrument};
 
 type HeightToIdToContent = BTreeMap<BlockNumber, HashMap<BlockHash, Vec<Transaction>>>;
 
+const CHANNEL_SIZE: usize = 100;
+
 pub struct PapyrusConsensusContext {
     storage_reader: StorageReader,
     network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
-    _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
+    network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
     validators: Vec<ValidatorId>,
     sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
     // Proposal building/validating returns immediately, leaving the actual processing to a spawned
@@ -52,14 +54,14 @@ impl PapyrusConsensusContext {
     pub fn new(
         storage_reader: StorageReader,
         network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
-        _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
+        network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
         num_validators: u64,
         sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
     ) -> Self {
         Self {
             storage_reader,
             network_broadcast_client,
-            _network_proposal_sender,
+            network_proposal_sender,
             validators: (0..num_validators).map(ContractAddress::from).collect(),
             sync_broadcast_sender,
             valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
@@ -77,7 +79,7 @@ impl ConsensusContext for PapyrusConsensusContext {
         proposal_init: ProposalInit,
         _timeout: Duration,
     ) -> oneshot::Receiver<ProposalContentId> {
-        let mut network_broadcast_sender = self.network_broadcast_client.clone();
+        let mut proposal_sender_sender = self.network_proposal_sender.clone();
         let (fin_sender, fin_receiver) = oneshot::channel();
         let storage_reader = self.storage_reader.clone();
 
@@ -113,18 +115,27 @@ impl ConsensusContext for PapyrusConsensusContext {
                     })
                     .block_hash;
 
-                let proposal = Proposal {
-                    height: proposal_init.height.0,
-                    round: proposal_init.round,
-                    proposer: proposal_init.proposer,
-                    transactions: transactions.clone(),
-                    block_hash,
-                    valid_round: proposal_init.valid_round,
-                };
-                network_broadcast_sender
-                    .broadcast_message(ConsensusMessage::Proposal(proposal))
+                let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
+                let stream_id = proposal_init.height.0;
+                proposal_sender_sender
+                    .send((stream_id, proposal_receiver))
+                    .await
+                    .expect("Failed to send proposal receiver");
+                proposal_sender
+                    .send(ProposalPart::Init(proposal_init.clone()))
+                    .await
+                    .expect("Failed to send proposal init");
+                proposal_sender
+                    .send(ProposalPart::Transactions(TransactionBatch {
+                        transactions: transactions.clone(),
+                        tx_hashes: vec![],
+                    }))
+                    .await
+                    .expect("Failed to send transactions");
+                proposal_sender
+                    .send(ProposalPart::Fin(ProposalFin { proposal_content_id: block_hash }))
                     .await
-                    .expect("Failed to send proposal");
+                    .expect("Failed to send fin");
                 {
                     let mut proposals = valid_proposals
                         .lock()
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
index 50278b5df3..83ce87067f 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
@@ -11,7 +11,7 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt};
 use papyrus_consensus::types::{
     ConsensusContext,
     ConsensusError,
@@ -19,7 +19,7 @@ use papyrus_consensus::types::{
     Round,
     ValidatorId,
 };
-use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
+use papyrus_network::network_manager::BroadcastTopicClient;
 use papyrus_protobuf::consensus::{
     ConsensusMessage,
     ProposalFin,
@@ -51,6 +51,8 @@ use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
 type HeightToIdToContent =
     BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<ExecutableTransaction>, ProposalId)>>;
 
+const CHANNEL_SIZE: usize = 100;
+
 pub struct SequencerConsensusContext {
     batcher: Arc<dyn BatcherClient>,
     validators: Vec<ValidatorId>,
@@ -63,21 +65,21 @@ pub struct SequencerConsensusContext {
     // restarting.
     proposal_id: u64,
     current_height: Option<BlockNumber>,
-    network_broadcast_client: BroadcastTopicClient<ProposalPart>,
-    _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
+    _network_broadcast_client: BroadcastTopicClient<ProposalPart>,
+    outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
 }
 
 impl SequencerConsensusContext {
     pub fn new(
         batcher: Arc<dyn BatcherClient>,
-        network_broadcast_client: BroadcastTopicClient<ProposalPart>,
-        _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
+        _network_broadcast_client: BroadcastTopicClient<ProposalPart>,
+        outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
         num_validators: u64,
     ) -> Self {
         Self {
             batcher,
-            network_broadcast_client,
-            _outbound_proposal_sender,
+            _network_broadcast_client,
+            outbound_proposal_sender,
             validators: (0..num_validators).map(ValidatorId::from).collect(),
             valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
             proposal_id: 0,
@@ -130,11 +132,16 @@ impl ConsensusContext for SequencerConsensusContext {
             .await
             .expect("Failed to initiate proposal build");
         debug!("Broadcasting proposal init: {proposal_init:?}");
-        self.network_broadcast_client
-            .broadcast_message(ProposalPart::Init(proposal_init.clone()))
+        let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
+        let stream_id = proposal_init.height.0;
+        self.outbound_proposal_sender
+            .send((stream_id, proposal_receiver))
+            .await
+            .expect("Failed to send proposal receiver");
+        proposal_sender
+            .send(ProposalPart::Init(proposal_init.clone()))
             .await
-            .expect("Failed to broadcast proposal init");
-        let broadcast_client = self.network_broadcast_client.clone();
+            .expect("Failed to send proposal init");
         tokio::spawn(
             async move {
                 stream_build_proposal(
@@ -142,7 +149,7 @@ impl ConsensusContext for SequencerConsensusContext {
                     proposal_id,
                     batcher,
                     valid_proposals,
-                    broadcast_client,
+                    proposal_sender,
                     fin_sender,
                 )
                 .await;
@@ -270,16 +277,16 @@ impl SequencerConsensusContext {
 
 // Handles building a new proposal without blocking consensus:
 // 1. Receive chunks of content from the batcher.
-// 2. Forward these to consensus to be streamed out to the network.
+// 2. Forward these to the stream handler to be streamed out to the network.
 // 3. Once finished, receive the commitment from the batcher.
 // 4. Store the proposal for re-proposal.
-// 5. Send the commitment to consensus.
+// 5. Send the commitment to the stream handler (to send fin).
 async fn stream_build_proposal(
     height: BlockNumber,
     proposal_id: ProposalId,
     batcher: Arc<dyn BatcherClient>,
     valid_proposals: Arc<Mutex<HeightToIdToContent>>,
-    mut broadcast_client: BroadcastTopicClient<ProposalPart>,
+    mut proposal_sender: mpsc::Sender<ProposalPart>,
     fin_sender: oneshot::Sender<ProposalContentId>,
 ) {
     let mut content = Vec::new();
@@ -306,8 +313,8 @@ async fn stream_build_proposal(
             }
             debug!("Broadcasting proposal content: {transaction_hashes:?}");
             trace!("Broadcasting proposal content: {transactions:?}");
-            broadcast_client
-                .broadcast_message(ProposalPart::Transactions(TransactionBatch {
+            proposal_sender
+                .send(ProposalPart::Transactions(TransactionBatch {
                     transactions,
                     tx_hashes: transaction_hashes,
                 }))
@@ -325,8 +332,8 @@ async fn stream_build_proposal(
                 height
             );
             debug!("Broadcasting proposal fin: {proposal_content_id:?}");
-            broadcast_client
-                .broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
+            proposal_sender
+                .send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
                 .await
                 .expect("Failed to broadcast proposal fin");
             // Update valid_proposals before sending fin to avoid a race condition
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
index 85f034e3ac..aced9b6ef6 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
@@ -160,11 +160,7 @@ async fn validate_proposal_success() {
     let BroadcastTopicChannels {
         broadcasted_messages_receiver: inbound_network_receiver,
         broadcast_topic_client: outbound_network_sender,
     } = subscriber_channels;
-<<<<<<< HEAD
     let (outbound_internal_sender, _inbound_internal_receiver, _) =
-=======
-    let (outbound_internal_sender, _inbound_internal_receiver) =
->>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one)
         StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
     let mut context = SequencerConsensusContext::new(
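
Reviewer note: for context on how the new (stream_id, receiver) handoff from build_proposal is meant to be consumed, below is a minimal, self-contained sketch of the receiving side. It is not the actual StreamHandler implementation: the ProposalPart payloads, the broadcast callback, and run_stream_handler are simplified placeholders, and the real StreamHandler's own bookkeeping (message ordering, multiple concurrent streams) is omitted.

// A hypothetical, simplified consumer of the `(stream_id, receiver)` pairs.
// Placeholder types only; the real ProposalPart comes from papyrus_protobuf.
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[derive(Debug)]
enum ProposalPart {
    Init(u64),              // stands in for ProposalInit
    Transactions(Vec<u64>), // stands in for TransactionBatch
    Fin([u8; 32]),          // stands in for ProposalFin
}

// Drain each incoming proposal stream and forward its parts to the network,
// tagged with the stream id so the receiving side can tell proposals apart.
async fn run_stream_handler(
    mut inbound: mpsc::Receiver<(u64, mpsc::Receiver<ProposalPart>)>,
    broadcast: impl Fn(u64, ProposalPart),
) {
    while let Some((stream_id, mut parts)) = inbound.next().await {
        // Simplification: handle one proposal stream at a time. A real handler
        // would multiplex streams and track per-message ordering.
        while let Some(part) = parts.next().await {
            let is_fin = matches!(part, ProposalPart::Fin(_));
            broadcast(stream_id, part);
            if is_fin {
                break; // Fin marks the end of this proposal stream.
            }
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let (mut stream_tx, stream_rx) = mpsc::channel(8);
        let (mut part_tx, part_rx) = mpsc::channel(8);
        // The producer side mirrors build_proposal: register the stream, then
        // send Init, Transactions, and Fin into it.
        stream_tx.send((7u64, part_rx)).await.unwrap();
        part_tx.send(ProposalPart::Init(7)).await.unwrap();
        part_tx.send(ProposalPart::Transactions(vec![1, 2, 3])).await.unwrap();
        part_tx.send(ProposalPart::Fin([0u8; 32])).await.unwrap();
        drop(stream_tx); // Close the channel so the handler terminates.
        run_stream_handler(stream_rx, |id, part| println!("stream {id}: {part:?}")).await;
    });
}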